diff --git a/.github/workflows/mcdj.yml b/.github/workflows/mcdj.yml deleted file mode 100644 index 7be56e0..0000000 --- a/.github/workflows/mcdj.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: mcdj - -on: - pull_request: - push: - branches: [ main ] - tags: [ '*' ] - -jobs: - mcgen: - runs-on: ubuntu-latest - container: - image: codecr.jlab.org/hallb/clas12/container-forge/simulation:latest - steps: - - uses: actions/checkout@v6 - - name: json - run: ./mcdj -h diff --git a/README.md b/README.md index d16512f..ddcfd08 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Standard gcard/yaml combinations for the different data sets and data-processing * _GEMC versions prior to 5.4 do not fully support binary magnetic field maps and CCDB for RF configuration._ * _COATJAVA versions prior to 10.0.7 do not support AI- denoising or track-finding._ - +* _mcdj has moved to [its own repository](https://code.jlab.org/hallb/clas12/mcdj)._ ## Run Configurations diff --git a/mcdj b/mcdj deleted file mode 100755 index adfc207..0000000 --- a/mcdj +++ /dev/null @@ -1,354 +0,0 @@ -#!/usr/bin/env python3 - -import logging - -def cli(): - import argparse - p = argparse.ArgumentParser( - description='mcdj runs the full CLAS12 simulation pipeline, ' - ' starting from a clas12-mcgen geneator, the GEMC particle gun, or LUND files.', - epilog='All executables must be in $PATH. A double-dash should be used to separate ' - 'the generator arguments. For example, to run 12 parallel jobs of ' - 'the dvcsgen generator with its t-min cut: ' - '`mcdj -j 12 -g ./gemc.xml -y ./recon.yml -- dvcsgen -t 0.1` ' - 'Or, to process LUND files 3-at-a-time: ' - '`mcdj -j 3 ... -- lund1.txt lund2.txt lund3.txt ... ` ' - 'Or, to use GEMC\'s particle gun: ' - '`mcdj -- gemc -BEAM_P="e-, 6*GeV, 15*deg, 20*deg"` ' - 'Or, to run 100 jobs, 10 at a time: ' - '`mcdj -j 10 -J 10 ...`') - p.add_argument('-n','--nevents',default=10,type=int,help='number of events per job (default=10)',metavar='#') - p.add_argument('-j','--jobs',default=1,type=int,help='number of parallel jobs (default=1)',metavar='#') - p.add_argument('-J','--Jobs',default=1,type=int,help='number of serial jobs (default=1)',metavar='#') - p.add_argument('-g','--gcard',required=True,type=str,help='GEMC gcard configuration file',metavar='PATH') - p.add_argument('-y','--yaml',required=True,type=str,help='COATJAVA yaml configuration file',metavar='PATH') - p.add_argument('-r','--run',default=11,type=int,help='run number (default=11)') - p.add_argument('-m','--match',default=False,action='store_true',help='enable truth matching') - p.add_argument('-s','--seed',default=0,type=int,help='random number seed (default=clock)') - p.add_argument('-d','--dst',default=False,action='store_true',help='run standalone dst-maker') - p.add_argument('-R','--recon',default=True,action='store_false',help='disable reconstruction') - p.add_argument('-q','--quiet',default=False,action='store_true',help='silence GEANT4 exceptions') - p.add_argument('-v','--verbose',default=0,action='count',help='increase verbosity (repeatable)') - p.add_argument('-c','--cleanup',default=False,action='store_true',help='delete intermediate outputs') - p.add_argument('-b','--back',default=[],nargs='+',help='background files for merging',metavar='PATH') - p.add_argument('--denoise',default=False,action='store_true',help='enable old denoising (use YAML for new)') - p.add_argument('gen',nargs='+',help='generator command line or LUND file(s)') - return p - -class ColoredFormatter(logging.Formatter): - def __init__(self, msg): - self.length = 20 - self.colors = {'INFO':32,'WARNING':35,'CRITICAL':31,'ERROR':31} - self.reset_seq = '\033[0m' - self.color_seq = '\033[1;%dm' - logging.Formatter.__init__(self, msg) - def format(self, record): - if record.levelname in self.colors: - l = record.levelname - record.levelname = '%-20s'%( (self.color_seq % self.colors[l]) + l + self.reset_seq) - record.msg = self.color_seq % (self.colors[l]) + record.msg + self.reset_seq - else: - l = self.length - len(self.reset_seq) - len(self.color_seq) - record.levelname = ('%%-%ds'%l) % record.levelname - return logging.Formatter.format(self, record) - -class ColoredLogger(logging.Logger): - def __init__(self, name): - logging.Logger.__init__(self, name, logging.INFO) - console = ExitingStreamHandler() - console.setFormatter(ColoredFormatter('[%(levelname)s] %(message)s')) - self.addHandler(console) - return - -class ExitingStreamHandler(logging.StreamHandler): - def emit(self, record): - self.setFormatter(ColoredFormatter('[%(levelname)s] %(message)s')) - super().emit(record) - if record.levelno >= logging.CRITICAL: - import sys - sys.exit(record.levelno) - -# get a 32-bit RNG seed from the system clock: -def get_rng_clock_seed(): - import time - return int(time.time()*1e6) & 0xFFFFFFFF - -# run a subprocess and wait for it to return: -def run(cfg, cwd, cmd): - import subprocess - logging.getLogger(__name__).info(' '.join(cmd)) - p = subprocess.Popen(cmd,cwd=cwd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,encoding='latin-1') - hush = False - for line in iter(p.stdout.readline, ''): - if len(line.strip())>0: - if cfg and cfg.quiet: - if line.find('*********************************************************************') >= 0: - continue - if line.find('-------- WWWW ------- G4Exception-START -------- WWWW -------') >= 0: - hush = True - continue - elif line.find('-------- WWWW -------- G4Exception-END --------- WWWW -------') >= 0: - hush = False - continue - if not hush: - print(line.rstrip()) - p.wait() - return p.returncode - -def get_numa_cpus(node): - import subprocess - cmd = ['numactl','-H'] - logging.getLogger(__name__).info(' '.join(cmd)) - p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,universal_newlines=True,encoding='latin-1') - for line in iter(p.stdout.readline, ''): - if line.startswith(f'node {node} cpus:'): - return line.strip().split()[3:] - -# a list of all the output files we've made: -_outputs = [] - -# choose output filename based on input filename and new prefix/suffix: -def get_out(cfg, inp, odir, prefix=None, suffix=None): - import os - o = os.path.basename(inp) - if suffix: - if o.rfind('.') > 0: - o = o[:o.rfind('.')] - o = o + suffix - if prefix: - o = prefix + o - _outputs.append(odir+'/'+o) - return o - -# run a clas12-mcgen event genetator: -def run_generator(cfg, cwd): - import time - o = cfg.gen[0]+'.dat' - cmd = [cfg.gen[0]] - seed = get_rng_clock_seed() if cfg.seed==0 else cfg.seed - cmd.extend(['--docker','--trig',str(cfg.nevents),'--seed',str(seed)]) - cmd.extend(cfg.gen[1:]) - return run(cfg,cwd,cmd), o - -# run GEMC, with LUND input or internal generator: -def run_gemc(cfg, cwd, lund=None): - cmd = ['gemc',cfg.gcard,f'-RUNNO={cfg.run}','-USE_GUI=0',f'-N={cfg.nevents}'] - seed = get_rng_clock_seed() if cfg.seed==0 else cfg.seed - cmd.extend([f'-RANDOM={seed}']) - if cfg.match: - cmd.extend(['-SAVE_ALL_MOTHERS=1','-SKIPREJECTEDHITS=1','-INTEGRATEDRAW="*"','-NGENP=50']) - if lund: - o = get_out(cfg, lund, cwd, suffix='.hipo') - cmd.append(f'-INPUT_GEN_FILE=LUND,{lund}') - else: - o = cwd+'/gemc.hipo' - cmd.extend(cfg.gen[1:]) - cmd.append(f'-OUTPUT=hipo,{o}') - return run(cfg,cwd,cmd), o - -# run background merging: -import threading -_bglock = threading.Lock() -def run_bgmerge(cfg, cwd, inp): - o = get_out(cfg, inp, cwd, prefix='bg_') - with _bglock: - cmd=['bg-merger','-i',inp,'-o',o,cfg.back[cfg.iback[cfg.nback]]] - cfg.nback = 0 if cfg.nback+1 >= len(cfg.back) else cfg.nback+1 - return run(cfg,cwd,cmd), o - -# run denoising: -def run_denoise(cfg, cwd, inp): - o = get_out(cfg, inp, cwd, prefix='dn_') - cmd=['denoise2.exe','-o',o,'-i',inp] - return run(cfg,cwd, cmd), o - -# run reconstruction: -def run_recon(cfg, cwd, inp): - o = get_out(cfg, inp, cwd, prefix='rec_') - cmd=['recon-util','-y',cfg.yaml,'-i',inp,'-o',o] - return run(cfg,cwd,cmd), o - -# run dst maker: -def run_dst(cfg, cwd, inp): - o = get_out(cfg, inp, cwd, prefix='dst_') - cmd=['dst-maker','-o',o,inp] - return run(cfg,cwd,cmd), o - -# run the reconstruction portion of the simulation pipeline: -def recon_pipeline(cfg, cwd, hipo): - ret,o = 0,hipo - if len(cfg.back) > 0: - ret,o = run_bgmerge(cfg, cwd, o) - if ret != 0: - logging.getLogger(__name__).error('background merging failed.') - return ret,o - if cfg.denoise: - ret,o = run_denoise(cfg, cwd, o) - if ret != 0: - logging.getLogger(__name__).error('denoising failed.') - return ret,o - if cfg.recon: - ret,o = run_recon(cfg, cwd, o) - if ret != 0: - logging.getLogger(__name__).error('reconstruction failed.') - return ret,o - if cfg.dst: - ret,o = run_dst(cfg, cwd, o) - if ret != 0: - logging.getLogger(__name__).error('dst making failed.') - return ret,o - return ret,o - -# run a GEMC particle-gun simulation pipeline: -def gemc_pipeline(cfg, cwd): - ret,o = run_gemc(cfg, cwd) - if ret != 0: - logging.getLogger(__name__).error('gemc failed.') - return ret,o - return recon_pipeline(cfg, cwd, o) - -# run a LUND simulation pipeline: -def lund_pipeline(cfg, cwd, lund): - ret,o = run_gemc(cfg, cwd, lund) - if ret != 0: - logging.getLogger(__name__).error('gemc failed.') - return ret,o - return recon_pipeline(cfg, cwd, o) - -# run a clas12-mcgen event-generator simulation pipeline: -def generator_pipeline(cfg, cwd): - ret,o = run_generator(cfg, cwd) - if ret != 0: - logging.getLogger(__name__).error(f'generator {cfg.gen[0]} failed.') - return ret,o - return lund_pipeline(cfg, cwd, o) - -# choose job directory and make it if necessary: -def get_job_dir(cfg, i): - import os - d = '.' if cfg.jobs==1 and cfg.Jobs==1 else f'./mcdj-{cfg.gen[0]}-{i}' - if not os.path.exists(d): os.makedirs(d) - return os.path.abspath(d) - -# cleanup the filesystem mess we made: -def cleanup(cfg, outputs): - import os - import shutil - # move and rename final output files: - if cfg.jobs > 1: - for i,o in enumerate(outputs): - b,s = os.path.splitext(os.path.basename(o)) - os.rename(o, f'{b}-{i}{s}') - if cfg.cleanup: - # remove intermediate output files: - [ os.remove(o) for o in set(_outputs) - set(outputs) if os.path.isfile(o) ] - if cfg.jobs > 1: - # remove job subdirectories: - [ shutil.rmtree(os.path.dirname(o)) for o in outputs ] - -# spawn parallel pipelines and wait for them to finish: -def launch_pipelines(cfg): - import os - import time - import itertools - import concurrent.futures as cf - from types import SimpleNamespace - ijob,ojob = 0,0 - futures = [] - results = cfg.jobs * cfg.Jobs * [SimpleNamespace(stat=1, out=None, cwd=None)] - with cf.ThreadPoolExecutor(max_workers=cfg.jobs) as exe: - while True: - # collect finished tasks: - for i,f in enumerate(futures): - if f.done(): - if f.exception() is None: - results[ojob].stat,results[ojob].out = f.result() - ojob += 1 - else: - print(f.exception()) - futures.pop(i) - # spawn new tasks: - while len(futures) < cfg.jobs and ijob < cfg.jobs*cfg.Jobs \ - and ( cfg.gen[0] != 'lund' or ijob+1 < len(cfg.gen) ): - results[ijob].cwd = get_job_dir(cfg, ijob) - if cfg.gen[0] == 'lund': - futures.append( exe.submit(lund_pipeline, cfg, results[ijob].cwd, cfg.gen[1:][ijob]) ) - elif cfg.gen[0] == 'gemc': - futures.append( exe.submit(gemc_pipeline, cfg, results[ijob].cwd) ) - else: - futures.append( exe.submit(generator_pipeline, cfg, results[ijob].cwd) ) - ijob += 1 - # done, nothing left to do: - if len(futures) == 0: - break - time.sleep(1) - # cleanup filesystem: - cleanup(cfg, [f'{o.cwd}/{o.out}' for o in results if o.out is not None]) - # return sum of all statuses: - return sum([o.stat for o in results if o.stat is not None]) - -def configure(cfg): - - logging.getLogger(__name__).setLevel(20-10*cfg.verbose) - - # check existence of executables in $PATH: - import os - import shutil - executables = ['gemc','bg-merger','recon-util','dst-maker'] - for exe in filter(lambda x: not shutil.which(x), executables): - logging.getLogger(__name__).critical('executable not found in $PATH: '+exe) - if cfg.denoise and not shutil.which('denoise2.exe'): - logging.getLogger(__name__).critical('executable not found in $PATH: denoise2.exe') - - # determine event generator: - if cfg.gen[0] == 'gemc': - logging.getLogger(__name__).warning('using GEMC internal generator.') - logging.getLogger(__name__).warning('using generator options: '+' '.join(cfg.gen[1:])) - elif shutil.which(cfg.gen[0]): - logging.getLogger(__name__).warning('using generator found in $PATH: '+shutil.which(cfg.gen[0])) - logging.getLogger(__name__).warning('using generator options: '+' '.join(cfg.gen[1:])) - else: - logging.getLogger(__name__).warning('generator not found in $PATH, interpreting as LUND file(s) ...') - for f in filter(lambda x: not os.path.isfile(x),cfg.gen[1:]): - logging.getLogger(__name__).critical(f'LUND file does not exist: {f}.') - cfg.gen.insert(0, 'lund') - - # convert all input paths to absolute paths: - import os - cfg.gcard = os.path.abspath(cfg.gcard) - cfg.yaml = os.path.abspath(cfg.yaml) - cfg.back = [ os.path.abspath(b) for b in cfg.back ] - if cfg.gen[0] == 'lund': - for i in range(1,len(cfg.gen)): - cfg.gen[i] = os.path.abspath(cfg.gen[i]) - - # check existence of input files: - if not os.path.isfile(cfg.gcard): - logging.getLogger(__name__).critical(f'invalid gcard: {cfg.gcard}') - if not os.path.isfile(cfg.yaml): - logging.getLogger(__name__).critical(f'invalid yaml: {cfg.yaml}') - for b in [ b for b in cfg.back if not os.path.isfile(b) ]: - logging.getLogger(__name__).critical(f'invalid background file: {b}') - if cfg.gen[0] == 'lund': - for l in [ l for l in cfg.gen[1:] if not os.path.isfile(l) ]: - logging.getLogger(__name__).critical(f'invalid lund file: {l}') - - logging.getLogger(__name__).debug('config: '+str(cfg)) - - # generate static random sequence of background files: - import random - cfg.iback = random.sample(range(len(cfg.back)), len(cfg.back)) - cfg.nback = 0 - - return cfg - -if __name__ == '__main__': - - import logging - logging.setLoggerClass(ColoredLogger) - - cfg = cli().parse_args() - cfg = configure(cfg) - import sys - sys.exit(launch_pipelines(cfg)) -