tmpdir = tempfile.mkdtemp()
tmpworkdir = tempfile.mkdtemp()
jobsub_workdir_files_args = []
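# Copy the stage input file list, if any, into the temporary work directory.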
input_list_name = ''
if stage.inputlist != '':
    input_list_name = os.path.basename(stage.inputlist)
    work_list_name = os.path.join(tmpworkdir, input_list_name)
    if stage.inputlist != work_list_name:
        input_files = larbatch_posix.readlines(stage.inputlist)
        print('Making input list.')
        work_list = safeopen(work_list_name)
        for input_file in input_files:
            print('Adding input file %s' % input_file)
            work_list.write('%s\n' % input_file.strip())
        work_list.close()
        print('Done making input list.')
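# Copy the stage fcl file(s) into the work directory, then build wrapper.fcl,
# which stitches the stages together.  Roughly, the generated file looks like:
#   #---STAGE 0
#   #include "first_stage.fcl"
#   <metadata and per-stage overrides>
#   #---END_STAGE
#   ... one block per stage fcl ...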
fcls = project.get_fcl(stage.fclname)
for fcl in fcls:
    workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
    if os.path.abspath(fcl) != os.path.abspath(workfcl):
        larbatch_posix.copy(fcl, workfcl)
wrapper_fcl_name = os.path.join(tmpworkdir, 'wrapper.fcl')
wrapper_fcl = safeopen(wrapper_fcl_name)
stageNum = 0
original_project_name = project.name
original_stage_name = stage.name
original_project_version = project.version

for fcl in fcls:
    wrapper_fcl.write('#---STAGE %d\n' % stageNum)
    wrapper_fcl.write('#include "%s"\n' % os.path.basename(fcl))
    wrapper_fcl.write('\n')
    xml_has_metadata = project.file_type != '' or \
                       project.run_type != ''
    if xml_has_metadata:
        if project.release_tag != '':
            wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
                              project.release_tag)
        else:
            wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "test"\n')
        if project.file_type:
            wrapper_fcl.write('services.FileCatalogMetadata.fileType: "%s"\n' % \
                              project.file_type)
        if project.run_type:
            wrapper_fcl.write('services.FileCatalogMetadata.runType: "%s"\n' % \
                              project.run_type)
    if stageNum < len(stage.project_name) and stage.project_name[stageNum] != '':
        project.name = stage.project_name[stageNum]
    if stageNum < len(stage.stage_name) and stage.stage_name[stageNum] != '':
        stage.name = stage.stage_name[stageNum]
    if stageNum < len(stage.project_version) and stage.project_version[stageNum] != '':
        project.version = stage.project_version[stageNum]
    sam_metadata = project_utilities.get_sam_metadata(project, stage)
    if sam_metadata:
        wrapper_fcl.write(sam_metadata)
    project.name = original_project_name
    stage.name = original_stage_name
    project.version = original_project_version
    if (not stage.pubs_input and stage.pubs_output) or stage.output_run:
        wrapper_fcl.write('source.firstRun: %d\n' % stage.output_run)

    if stage.maxfluxfilemb != 0 and stageNum == 0:
        wrapper_fcl.write('physics.producers.generator.FluxCopyMethod: "IFDH"\n')
        wrapper_fcl.write('physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
    wrapper_fcl.write('#---END_STAGE\n')
    stageNum = 1 + stageNum

wrapper_fcl.close()
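# Copy the experiment setup script into the work directory unless it already lives on cvmfs.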
abssetupscript = project_utilities.get_setup_script_path()
setupscript = ''
if not abssetupscript.startswith('/cvmfs/'):
    setupscript = os.path.join(stage.workdir, 'setup_experiment.sh')
    larbatch_posix.copy(abssetupscript, setupscript)
    jobsub_workdir_files_args.extend(['-f', setupscript])
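# Copy and rename the batch worker script into the temporary submit directory.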
if stage.batchname != '':
    workname = stage.batchname
else:
    workname = '%s-%s-%s' % (stage.name, project.name, project.release_tag)
workname = workname + os.path.splitext(stage.script)[1]
workscript = os.path.join(tmpdir, workname)
if stage.script != workscript:
    larbatch_posix.copy(stage.script, workscript)
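# Copy and rename the sam start/stop project scripts, if configured.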
workstartscript = ''
workstartname = ''
if stage.start_script != '':
    workstartname = 'start-%s' % workname
    workstartscript = os.path.join(tmpdir, workstartname)
    if stage.start_script != workstartscript:
        larbatch_posix.copy(stage.start_script, workstartscript)

workstopscript = ''
workstopname = ''
if stage.stop_script != '':
    workstopname = 'stop-%s' % workname
    workstopscript = os.path.join(tmpdir, workstopname)
    if stage.stop_script != workstopscript:
        larbatch_posix.copy(stage.stop_script, workstopscript)
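# Copy worker initialization scripts; if there is more than one, generate
# init_wrapper.sh to run them in sequence.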
for init_script in stage.init_script:
    if init_script != '':
        if not larbatch_posix.exists(init_script):
            raise RuntimeError('Worker initialization script %s does not exist.\n' % \
                               init_script)
        work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
        if init_script != work_init_script:
            larbatch_posix.copy(init_script, work_init_script)
n = len(stage.init_script)
if n == 0:
    stage.init_script = ''
elif n == 1:
    stage.init_script = stage.init_script[0]
else:
    work_init_wrapper = os.path.join(tmpworkdir, 'init_wrapper.sh')
    f = open(work_init_wrapper, 'w')
    f.write('#! /bin/bash\n')
    for init_script in stage.init_script:
        f.write('echo "Executing %s"\n' % os.path.basename(init_script))
        f.write('./%s\n' % os.path.basename(init_script))
        f.write('status=$?\n')
        f.write('echo "%s finished with status $status"\n' % os.path.basename(init_script))
        f.write('if [ $status -ne 0 ]; then\n')
        f.write('  exit $status\n')
        f.write('fi\n')
    f.write('echo "Done executing initialization scripts."\n')
    f.close()
    stage.init_script = work_init_wrapper
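# Same treatment for worker initialization source scripts (these are sourced rather than executed).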
for init_source in stage.init_source:
    if init_source != '':
        if not larbatch_posix.exists(init_source):
            raise RuntimeError('Worker initialization source script %s does not exist.\n' % \
                               init_source)
        work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
        if init_source != work_init_source:
            larbatch_posix.copy(init_source, work_init_source)

n = len(stage.init_source)
if n == 0:
    stage.init_source = ''
elif n == 1:
    stage.init_source = stage.init_source[0]
else:
    work_init_source_wrapper = os.path.join(tmpworkdir, 'init_source_wrapper.sh')
    f = open(work_init_source_wrapper, 'w')
    for init_source in stage.init_source:
        f.write('echo "Sourcing %s"\n' % os.path.basename(init_source))
        f.write('source %s\n' % os.path.basename(init_source))
    f.write('echo "Done sourcing initialization scripts."\n')
    f.close()
    stage.init_source = work_init_source_wrapper
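# Copy worker end-of-job scripts; if there is more than one, generate end_wrapper.sh.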
for end_script in stage.end_script:
    if end_script != '':
        if not larbatch_posix.exists(end_script):
            raise RuntimeError('Worker end-of-job script %s does not exist.\n' % end_script)
        work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
        if end_script != work_end_script:
            larbatch_posix.copy(end_script, work_end_script)

n = len(stage.end_script)
if n == 0:
    stage.end_script = ''
elif n == 1:
    stage.end_script = stage.end_script[0]
else:
    work_end_wrapper = os.path.join(tmpworkdir, 'end_wrapper.sh')
    f = open(work_end_wrapper, 'w')
    f.write('#! /bin/bash\n')
    for end_script in stage.end_script:
        f.write('echo "Executing %s"\n' % os.path.basename(end_script))
        f.write('./%s\n' % os.path.basename(end_script))
        f.write('status=$?\n')
        f.write('echo "%s finished with status $status"\n' % os.path.basename(end_script))
        f.write('if [ $status -ne 0 ]; then\n')
        f.write('  exit $status\n')
        f.write('fi\n')
    f.write('echo "Done executing finalization scripts."\n')
    f.close()
    stage.end_script = work_end_wrapper
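# Copy midstage initialization source scripts, keyed by stage number, and wrap
# them in mid_source_wrapper.sh.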
for istage in stage.mid_source:
    for mid_source in stage.mid_source[istage]:
        if mid_source != '':
            if not larbatch_posix.exists(mid_source):
                raise RuntimeError('Worker midstage initialization source script %s does not exist.\n' % mid_source)
            work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
            if mid_source != work_mid_source:
                larbatch_posix.copy(mid_source, work_mid_source)
if len(stage.mid_source) > 0:
    work_mid_source_wrapper = os.path.join(tmpworkdir, 'mid_source_wrapper.sh')
    f = open(work_mid_source_wrapper, 'w')
    for istage in stage.mid_source:
        for mid_source in stage.mid_source[istage]:
            f.write('if [ $stage -eq %d ]; then\n' % istage)
            f.write('  echo "Sourcing %s"\n' % os.path.basename(mid_source))
            f.write('  source %s\n' % os.path.basename(mid_source))
            f.write('fi\n')
    f.write('echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
    f.close()
    stage.mid_source = work_mid_source_wrapper
else:
    stage.mid_source = ''
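# Copy midstage finalization scripts, keyed by stage number, and wrap them in mid_wrapper.sh.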
for istage in stage.mid_script:
    for mid_script in stage.mid_script[istage]:
        if mid_script != '':
            if not larbatch_posix.exists(mid_script):
                raise RuntimeError('Worker midstage finalization script %s does not exist.\n' % mid_script)
            work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
            if mid_script != work_mid_script:
                larbatch_posix.copy(mid_script, work_mid_script)
if len(stage.mid_script) > 0:
    work_mid_wrapper = os.path.join(tmpworkdir, 'mid_wrapper.sh')
    f = open(work_mid_wrapper, 'w')
    f.write('#! /bin/bash\n')
    f.write('stage=$1\n')
    for istage in stage.mid_script:
        for mid_script in stage.mid_script[istage]:
            f.write('if [ $stage -eq %d ]; then\n' % istage)
            f.write('  echo "Executing %s"\n' % os.path.basename(mid_script))
            f.write('  ./%s\n' % os.path.basename(mid_script))
            f.write('  status=$?\n')
            f.write('  echo "%s finished with status $status"\n' % os.path.basename(mid_script))
            f.write('  if [ $status -ne 0 ]; then\n')
            f.write('    exit $status\n')
            f.write('  fi\n')
            f.write('fi\n')
    f.write('echo "Done executing midstage finalization scripts for stage $stage."\n')
    f.close()
    stage.mid_script = work_mid_wrapper
else:
    stage.mid_script = ''
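# Copy helper scripts found on the execution PATH into the work directory.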
helpers = ('root_metadata.py',
           'validate_in_job.py')
for helper in helpers:
    jobinfo = subprocess.Popen(['which', helper],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    jobout, joberr = jobinfo.communicate()
    rc = jobinfo.poll()
    if rc == 0:
        helper_path = jobout.splitlines()[0].strip()
        work_helper = os.path.join(tmpworkdir, helper)
        if helper_path != work_helper:
            larbatch_posix.copy(helper_path, work_helper)
    else:
        print('Helper script %s not found.' % helper)
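# Copy helper python modules into the work directory, locating each one by
# importing it in a python subprocess and reading back its __file__ path.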
helper_modules = ('larbatch_posix',
                  'project_utilities',
                  'larbatch_utilities',
                  'experiment_utilities')
for helper_module in helper_modules:
    jobinfo = subprocess.Popen(['python'],
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    cmd = 'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
    jobout, joberr = jobinfo.communicate(cmd.encode())
    rc = jobinfo.poll()
    if rc == 0:
        helper_path = jobout.splitlines()[-1].strip()
        work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
        if helper_path != work_helper:
            larbatch_posix.copy(helper_path, work_helper)
    else:
        print('Helper python module %s not found.' % helper_module)
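# Makeup bookkeeping (only when resubmitting failed jobs): require a previous
# project.py --check, then delete output, log, and book directories of jobs
# listed in bad.list.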
procmap = ''
makeup_defname = ''
makeup_count = 0
if makeup:

    checked_file = os.path.join(stage.bookdir, 'checked')
    if not larbatch_posix.exists(checked_file):
        raise RuntimeError('Wait for any running jobs to finish and run project.py --check')

    bad_filename = os.path.join(stage.bookdir, 'bad.list')
    if larbatch_posix.exists(bad_filename):
        lines = larbatch_posix.readlines(bad_filename)
        for line in lines:
            bad_subdir = line.strip()
            if bad_subdir != '':
                bad_path = os.path.join(stage.outdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
                bad_path = os.path.join(stage.logdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
                bad_path = os.path.join(stage.bookdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
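    # Rebuild the worker input list from missing_files.list when the stage uses file list input.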
    if stage.inputdef == '':
        missing_files = []
        missing_filename = os.path.join(stage.bookdir, 'missing_files.list')
        if larbatch_posix.exists(missing_filename):
            lines = larbatch_posix.readlines(missing_filename)
            for line in lines:
                words = line.split()
                missing_files.append(words[0])
        makeup_count = len(missing_files)
        print('Makeup list contains %d files.' % makeup_count)

        if input_list_name != '':
            work_list_name = os.path.join(tmpworkdir, input_list_name)
            if larbatch_posix.exists(work_list_name):
                larbatch_posix.remove(work_list_name)
            work_list = safeopen(work_list_name)
            for missing_file in missing_files:
                work_list.write('%s\n' % missing_file)
            work_list.close()
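    # Build a process map so makeup jobs reuse the process numbers of failed jobs (no-input case).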
    if stage.inputdef == '' and stage.inputfile == '' and stage.inputlist == '':
        procs = set(range(stage.num_jobs))

        output_files = os.path.join(stage.bookdir, 'files.list')
        if larbatch_posix.exists(output_files):
            lines = larbatch_posix.readlines(output_files)
            for line in lines:
                dir = os.path.basename(os.path.dirname(line))
                dir_parts = dir.split('_')
                if len(dir_parts) > 1:
                    proc = int(dir_parts[1])
                    if proc in procs:
                        procs.remove(proc)
        if len(procs) != makeup_count:
            raise RuntimeError('Makeup process list has different length than makeup count.')

        procmap = 'procmap.txt'
        procmap_path = os.path.join(tmpworkdir, procmap)
        procmap_file = safeopen(procmap_path)
        for proc in sorted(procs):
            procmap_file.write('%d\n' % proc)
        procmap_file.close()
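    # For sam input, create a makeup dataset definition containing unconsumed files.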
    cpids = []
    cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
    if larbatch_posix.exists(cpids_filename):
        cpids_files = larbatch_posix.readlines(cpids_filename)
        for line in cpids_files:
            cpids.append(line.strip())

    if len(cpids) > 0:
        project_utilities.test_kca()
        makeup_defname = samweb.makeProjectName(stage.inputdef) + '_makeup'

        cpids_list = ''
        sep = ''
        for cpid in cpids:
            cpids_list = cpids_list + '%s%s' % (sep, cpid)
            sep = ','

        dim = '(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)

        print('Creating makeup sam dataset definition %s' % makeup_defname)
        project_utilities.test_kca()
        samweb.createDefinition(defname=makeup_defname, dims=dim)
        makeup_count = samweb.countFiles(defname=makeup_defname)
        print('Makeup dataset contains %d files.' % makeup_count)
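# Tar up the work directory and stash it in the work area under a name containing its md5 hash.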
tmptar = '%s/work.tar' % tmpworkdir
jobinfo = subprocess.Popen(['tar', '-cf', tmptar, '-C', tmpworkdir,
                            '--mtime=2018-01-01',
                            '--exclude=work.tar', '.'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
jobout, joberr = jobinfo.communicate()
rc = jobinfo.poll()
if rc != 0:
    raise RuntimeError('Failed to create work tarball in %s' % tmpworkdir)

hasher = hashlib.md5()
f = open(tmptar, 'rb')
buf = f.read(1024)
while len(buf) > 0:
    hasher.update(buf)
    buf = f.read(1024)
f.close()
hash = hasher.hexdigest()

hashtar = '%s/work%s.tar' % (stage.workdir, hash)
if not larbatch_posix.exists(hashtar):
    larbatch_posix.copy(tmptar, hashtar)
jobsub_workdir_files_args.extend(['-f', hashtar])
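# Choose the input dataset definition and sam project name(s) for this submission,
# and optionally prestart the sam project(s).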
inputdef = stage.inputdef
if makeup and makeup_defname != '':
    inputdef = makeup_defname

prjname = ''
if inputdef != '':
    project_utilities.test_kca()
    prjname = samweb.makeProjectName(inputdef)

mixprjname = ''
if stage.mixinputdef != '':
    project_utilities.test_kca()
    mixprjname = 'mix_%s' % samweb.makeProjectName(stage.mixinputdef)

prj_started = False
if prjname != '' and stage.prestart != 0:
    ok = project_utilities.start_project(inputdef, prjname,
                                         stage.num_jobs * stage.max_files_per_job,
                                         stage.recur, stage.filelistdef)
    if ok != 0:
        print('Failed to start project.')
    else:
        prj_started = True

if mixprjname != '' and prj_started:
    ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
    if ok != 0:
        print('Failed to start mix project.')
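# Construct the jobsub_submit command.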
role = project_utilities.get_role()
if project.role != '':
    role = project.role

command = ['jobsub_submit']
command.append('--group=%s' % project_utilities.get_experiment())
command.append('--role=%s' % role)
command.extend(jobsub_workdir_files_args)
if project.server != '-' and project.server != '':
    command.append('--jobsub-server=%s' % project.server)
if stage.resource != '':
    command.append('--resource-provides=usage_model=%s' % stage.resource)
elif project.resource != '':
    command.append('--resource-provides=usage_model=%s' % project.resource)
if stage.lines != '':
    command.append('--lines=%s' % stage.lines)
elif project.lines != '':
    command.append('--lines=%s' % project.lines)
if stage.site != '':
    command.append('--site=%s' % stage.site)
if stage.blacklist != '':
    command.append('--blacklist=%s' % stage.blacklist)
if stage.cpu != 0:
    command.append('--cpu=%d' % stage.cpu)
if stage.disk != '':
    command.append('--disk=%s' % stage.disk)
if stage.memory != 0:
    command.append('--memory=%d' % stage.memory)
if project.os != '':
    if stage.singularity == 0:
        command.append('--OS=%s' % project.os)
    else:
        p = project_utilities.get_singularity(project.os)
        if p != '':
            if (stage.num_jobs > 1 or project.force_dag) and \
               (inputdef != '' or stage.mixinputdef != ''):
                command.append(r"""--lines='+SingularityImage=\"%s\"'""" % p)
            else:
                command.append(r"""--lines='+SingularityImage="%s"'""" % p)
        else:
            raise RuntimeError('No singularity image found for %s' % project.os)
if not stage.pubs_output:
    if not makeup:
        command_njobs = stage.num_jobs
        command.extend(['-N', '%d' % command_njobs])
    else:
        command_njobs = min(makeup_count, stage.num_jobs)
        command.extend(['-N', '%d' % command_njobs])
else:
    if stage.inputdef != '':
        command_njobs = stage.num_jobs
    else:
        command_njobs = stage.num_jobs
    command.extend(['-N', '%d' % command_njobs])

if stage.jobsub != '':
    for word in stage.jobsub.split():
        command.append(word)
opt = project_utilities.default_jobsub_submit_options()
for word in opt.split():
    command.append(word)
if stage.cvmfs != 0:
    command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
if stage.stash != 0:
    command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
if stage.singularity != 0:
    command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

workurl = "file://%s" % workscript
command.append(workurl)
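# Remaining arguments are options passed through to the batch worker script.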
if stage.max_files_per_job != 0:
    command_max_files_per_job = stage.max_files_per_job
    command.extend(['--nfile', '%d' % command_max_files_per_job])

command.extend([' --group', project_utilities.get_experiment()])
command.extend([' -g'])
command.extend([' -c', 'wrapper.fcl'])
command.extend([' --ups', ','.join(project.ups)])
if project.release_tag != '':
    command.extend([' -r', project.release_tag])
command.extend([' -b', project.release_qual])
if project.local_release_dir != '':
    command.extend([' --localdir', project.local_release_dir])
if project.local_release_tar != '':
    command.extend([' --localtar', project.local_release_tar])
command.extend([' --workdir', stage.workdir])
command.extend([' --outdir', stage.outdir])
command.extend([' --logdir', stage.logdir])
if stage.dirsize > 0:
    command.extend([' --dirsize', '%d' % stage.dirsize])
if stage.dirlevels > 0:
    command.extend([' --dirlevels', '%d' % stage.dirlevels])
if type(stage.exe) == type([]):
    command.extend([' --exe', ':'.join(stage.exe)])
else:
    command.extend([' --exe', stage.exe])
if stage.schema != '':
    command.extend([' --sam_schema', stage.schema])
if project.os != '':
    command.extend([' --os', project.os])

if not stage.pubs_input and stage.pubs_output and stage.output_subruns[0] > 0:
    command.extend(['--process', '%d' % (stage.output_subruns[0]-1)])
    command.append('--single')
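# Input source options (single file, file list, or sam dataset).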
if stage.inputfile != '':
    command.extend([' -s', stage.inputfile])
elif input_list_name != '':
    command.extend([' -S', input_list_name])
elif inputdef != '':
    command.extend([' --sam_defname', inputdef,
                    ' --sam_project', prjname])
    if stage.recur:
        command.extend([' --recur'])
if stage.mixinputdef != '':
    command.extend([' --mix_defname', stage.mixinputdef,
                    ' --mix_project', mixprjname])
if stage.inputmode != '':
    command.extend([' --inputmode', stage.inputmode])
command.extend([' -n', '%d' % stage.num_events])
if stage.inputdef == '':
    command.extend([' --njobs', '%d' % stage.num_jobs])
for ftype in stage.datafiletypes:
    command.extend(['--data_file_type', ftype])
if procmap != '':
    command.extend([' --procmap', procmap])
if type(stage.output) == type([]):
    command.extend([' --output', ':'.join(stage.output)])
else:
    command.extend([' --output', stage.output])
if stage.TFileName != '':
    command.extend([' --TFileName', stage.TFileName])
if stage.init_script != '':
    command.extend([' --init-script', os.path.basename(stage.init_script)])
if stage.init_source != '':
    command.extend([' --init-source', os.path.basename(stage.init_source)])
if stage.end_script != '':
    command.extend([' --end-script', os.path.basename(stage.end_script)])
if stage.mid_source != '':
    command.extend([' --mid-source', os.path.basename(stage.mid_source)])
if stage.mid_script != '':
    command.extend([' --mid-script', os.path.basename(stage.mid_script)])
if abssetupscript != '':
    command.extend([' --init', abssetupscript])

if stage.validate_on_worker == 1:
    print('Validation will be done on the worker node %d' % stage.validate_on_worker)
    command.extend([' --validate'])
    command.extend([' --declare'])
if type(stage.fclname) == type([]) and len(stage.fclname) > 1:
    command.extend([' --maintain_parentage'])

if stage.copy_to_fts == 1:
    command.extend([' --copy'])
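# For a single job with sam input and no dag, let the worker start the sam project itself.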
if (prjname != '' or mixprjname != '') and command_njobs == 1 and not project.force_dag and not prj_started:
    command.extend([' --sam_start',
                    ' --sam_station', project_utilities.get_experiment(),
                    ' --sam_group', project_utilities.get_experiment()])

start_commands = []
stop_commands = []
dag_prjs = []
if command_njobs > 1 or project.force_dag:
    if inputdef != '':
        dag_prjs.append([inputdef, prjname])
    if stage.mixinputdef != '':
        dag_prjs.append([stage.mixinputdef, mixprjname])
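# For dag submission, build jobsub commands that start and stop the sam project(s).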
for dag_prj in dag_prjs:

    # Sam start and stop project scripts must exist.
    if workstartname == '' or workstopname == '':
        raise RuntimeError('Sam start or stop project script not found.')

    start_command = ['jobsub']
    start_command.append('--group=%s' % project_utilities.get_experiment())
    if setupscript != '':
        start_command.append('-f %s' % setupscript)
    if stage.resource != '':
        start_command.append('--resource-provides=usage_model=%s' % stage.resource)
    elif project.resource != '':
        start_command.append('--resource-provides=usage_model=%s' % project.resource)
    if stage.lines != '':
        start_command.append('--lines=%s' % stage.lines)
    elif project.lines != '':
        start_command.append('--lines=%s' % project.lines)
    if stage.site != '':
        start_command.append('--site=%s' % stage.site)
    if stage.blacklist != '':
        start_command.append('--blacklist=%s' % stage.blacklist)
    if project.os != '':
        if stage.singularity == 0:
            start_command.append('--OS=%s' % project.os)
        else:
            p = project_utilities.get_singularity(project.os)
            if p != '':
                start_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
            else:
                raise RuntimeError('No singularity image found for %s' % project.os)
    if stage.jobsub_start != '':
        for word in stage.jobsub_start.split():
            start_command.append(word)
    opt = project_utilities.default_jobsub_submit_options()
    for word in opt.split():
        start_command.append(word)
    if stage.cvmfs != 0:
        start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
    if stage.stash != 0:
        start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
    if stage.singularity != 0:
        start_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

    workstarturl = "file://%s" % workstartscript
    start_command.append(workstarturl)

    start_command.extend([' --sam_station', project_utilities.get_experiment(),
                          ' --sam_group', project_utilities.get_experiment(),
                          ' --sam_defname', dag_prj[0],
                          ' --sam_project', dag_prj[1]])
    if stage.recur:
        start_command.extend([' --recur'])
    if abssetupscript != '':
        start_command.extend([' --init', abssetupscript])

    if stage.num_jobs > 0 and stage.max_files_per_job > 0:
        start_command.extend([' --max_files', '%d' % (stage.num_jobs * stage.max_files_per_job)])

    if stage.prestagefraction > 0.:
        start_command.extend([' --prestage_fraction', '%f' % stage.prestagefraction])

    start_command.extend([' --logdir', stage.logdir])

    # Only start the project from the dag if it was not already prestarted.
    if not prj_started or stage.prestagefraction > 0.:
        start_commands.append(start_command)
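    # Build the corresponding stop project command.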
    stop_command = ['jobsub']
    stop_command.append('--group=%s' % project_utilities.get_experiment())
    if setupscript != '':
        stop_command.append('-f %s' % setupscript)
    if stage.resource != '':
        stop_command.append('--resource-provides=usage_model=%s' % stage.resource)
    elif project.resource != '':
        stop_command.append('--resource-provides=usage_model=%s' % project.resource)
    if stage.lines != '':
        stop_command.append('--lines=%s' % stage.lines)
    elif project.lines != '':
        stop_command.append('--lines=%s' % project.lines)
    if stage.site != '':
        stop_command.append('--site=%s' % stage.site)
    if stage.blacklist != '':
        stop_command.append('--blacklist=%s' % stage.blacklist)
    if project.os != '':
        if stage.singularity == 0:
            stop_command.append('--OS=%s' % project.os)
        else:
            p = project_utilities.get_singularity(project.os)
            if p != '':
                stop_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
            else:
                raise RuntimeError('No singularity image found for %s' % project.os)
    if stage.jobsub_start != '':
        for word in stage.jobsub_start.split():
            stop_command.append(word)
    opt = project_utilities.default_jobsub_submit_options()
    for word in opt.split():
        stop_command.append(word)
    if stage.cvmfs != 0:
        stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
    if stage.stash != 0:
        stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
    if stage.singularity != 0:
        stop_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

    workstopurl = "file://%s" % workstopscript
    stop_command.append(workstopurl)

    stop_command.extend([' --sam_station', project_utilities.get_experiment(),
                         ' --sam_project', dag_prj[1]])

    stop_command.extend([' --logdir', stage.logdir])

    if abssetupscript != '':
        stop_command.extend([' --init', abssetupscript])

    stop_commands.append(stop_command)
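# Write the dag input file consumed by jobsub_submit_dag.  Roughly, the layout is:
#   <serial>
#     <parallel> start-project jobsub commands </parallel>
#     <parallel> one worker jobsub command per process </parallel>
#     <parallel> stop-project jobsub commands </parallel>
#   </serial>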
if len(start_commands) > 0 or len(stop_commands) > 0:

    dagfilepath = os.path.join(tmpdir, 'submit.dag')
    dag = safeopen(dagfilepath)
    dag.write('<serial>\n')

    # Write start project commands.
    if len(start_commands) > 0:
        dag.write('\n<parallel>\n\n')
        for start_command in start_commands:
            first = True
            for word in start_command:
                if not first:
                    dag.write(' ')
                dag.write(word)
                if word[:6] == 'jobsub':
                    dag.write(' -n')
                first = False
            dag.write('\n\n')
        dag.write('</parallel>\n')

    # Write main worker submissions, one per process.
    dag.write('\n<parallel>\n\n')
    for process in range(command_njobs):
        first = True
        for word in command:
            if not first:
                dag.write(' ')
            if word[:6] == 'jobsub':
                word = 'jobsub'
            if word[:7] == '--role=':
                word = ''
            if word.startswith('--jobsub-server='):
                word = ''
            word = project_utilities.dollar_escape(word)
            dag.write(word)
            if word[:6] == 'jobsub':
                dag.write(' -n')
            first = False
        dag.write(' --process %d\n' % process)
        dag.write('\n')
    dag.write('\n</parallel>\n')

    # Write stop project commands.
    if len(stop_commands) > 0:
        dag.write('\n<parallel>\n\n')
        for stop_command in stop_commands:
            first = True
            for word in stop_command:
                if not first:
                    dag.write(' ')
                dag.write(word)
                if word[:6] == 'jobsub':
                    dag.write(' -n')
                first = False
            dag.write('\n\n')
        dag.write('</parallel>\n')

    dag.write('\n</serial>\n')
    dag.close()

    # Submit the dag instead of a single job.
    command = ['jobsub_submit_dag']
    command.append('--group=%s' % project_utilities.get_experiment())
    if project.server != '-' and project.server != '':
        command.append('--jobsub-server=%s' % project.server)
    command.append('--role=%s' % role)
    dagfileurl = 'file://' + dagfilepath
    command.append(dagfileurl)
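# Calculate the submission timeout and invoke jobsub_submit (or jobsub_submit_dag).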
checked_file = os.path.join(stage.bookdir, 'checked')

submit_timeout = 3600000
submit_timeout += 1.0 * command_njobs
if stage.jobsub_timeout > submit_timeout:
    submit_timeout = stage.jobsub_timeout

jobid = ''
if not makeup:

    print('Invoke jobsub_submit')
    if dryrun:
        print(' '.join(command))
    else:
        q = queue.Queue()
        jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=submit_timeout)
        if thread.is_alive():
            print('Submit timed out, terminating submit process.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = q.get()
        joberr = q.get()
        if larbatch_posix.exists(checked_file):
            larbatch_posix.remove(checked_file)
        if larbatch_posix.isdir(tmpdir):
            larbatch_posix.rmtree(tmpdir)
        if larbatch_posix.isdir(tmpworkdir):
            larbatch_posix.rmtree(tmpworkdir)
        if rc != 0:
            raise JobsubError(command, rc, jobout, joberr)
        for line in jobout.split('\n'):
            if "JobsubJobId" in line:
                jobid = line.strip().split()[-1]
        if jobid == '':
            raise JobsubError(command, rc, jobout, joberr)
        print('jobsub_submit finished.')
else:

    # Makeup action: submit only if there is something to make up.
    if makeup_count > 0:
        if dryrun:
            print(' '.join(command))
        else:
            q = queue.Queue()
            jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            thread = threading.Thread(target=project_utilities.wait_for_subprocess,
                                      args=[jobinfo, q])
            thread.start()
            thread.join(timeout=submit_timeout)
            if thread.is_alive():
                print('Submit timed out, terminating submit process.')
                jobinfo.terminate()
                thread.join()
            rc = q.get()
            jobout = q.get()
            joberr = q.get()
            if larbatch_posix.exists(checked_file):
                larbatch_posix.remove(checked_file)
            if larbatch_posix.isdir(tmpdir):
                larbatch_posix.rmtree(tmpdir)
            if larbatch_posix.isdir(tmpworkdir):
                larbatch_posix.rmtree(tmpworkdir)
            if rc != 0:
                raise JobsubError(command, rc, jobout, joberr)
            for line in jobout.split('\n'):
                if "JobsubJobId" in line:
                    jobid = line.strip().split()[-1]
            if jobid == '':
                raise JobsubError(command, rc, jobout, joberr)
    else:
        print('Makeup action aborted because makeup job count is zero.')