12 from __future__
import absolute_import
13 from __future__
import print_function
14 import sys, os, stat, math, subprocess, random
21 import project_utilities
22 import larbatch_utilities
23 from larbatch_utilities
import convert_str
27 from project_modules.xmlerror
import XMLError
28 from project_modules.pubsinputerror
import PubsInputError
29 from project_modules.pubsdeadenderror
import PubsDeadEndError
37 def __init__(self, stage_element, base_stage, default_input_lists, default_previous_stage,
38 default_num_jobs, default_num_events, default_max_files_per_job, default_merge,
40 default_cpu, default_disk, default_memory, default_validate_on_worker,
41 default_copy_to_fts, default_cvmfs, default_stash, default_singularity,
42 default_script, default_start_script, default_stop_script,
43 default_site, default_blacklist, check=
True):
47 if base_stage !=
None:
48 self.
name = base_stage.name
75 self.
ana = base_stage.ana
189 self.
merge = default_merge
193 self.
site = default_site
195 self.
cpu = default_cpu
196 self.
disk = default_disk
198 self.
memory = default_memory
209 self.
cvmfs = default_cvmfs
210 self.
stash = default_stash
212 self.
script = default_script
220 if 'name' in dict(stage_element.attributes):
221 self.
name =
str(stage_element.attributes[
'name'].firstChild.data)
223 raise XMLError(
"Stage name not specified.")
227 batchname_elements = stage_element.getElementsByTagName(
'batchname')
228 if batchname_elements:
229 self.
batchname =
str(batchname_elements[0].firstChild.data)
233 fclname_elements = stage_element.getElementsByTagName(
'fcl')
234 if len(fclname_elements) > 0:
236 for fcl
in fclname_elements:
237 self.fclname.append(
str(fcl.firstChild.data).strip())
239 raise XMLError(
'No Fcl names specified for stage %s.' % self.
name)
243 outdir_elements = stage_element.getElementsByTagName(
'outdir')
245 self.
outdir =
str(outdir_elements[0].firstChild.data)
247 raise XMLError(
'Output directory not specified for stage %s.' % self.
name)
251 logdir_elements = stage_element.getElementsByTagName(
'logdir')
253 self.
logdir =
str(logdir_elements[0].firstChild.data)
259 workdir_elements = stage_element.getElementsByTagName(
'workdir')
261 self.
workdir =
str(workdir_elements[0].firstChild.data)
263 raise XMLError(
'Work directory not specified for stage %s.' % self.
name)
267 bookdir_elements = stage_element.getElementsByTagName(
'bookdir')
269 self.
bookdir =
str(bookdir_elements[0].firstChild.data)
275 dirsize_elements = stage_element.getElementsByTagName(
'dirsize')
277 self.
dirsize =
int(dirsize_elements[0].firstChild.data)
281 dirlevels_elements = stage_element.getElementsByTagName(
'dirlevels')
282 if dirlevels_elements:
283 self.
dirlevels =
int(dirlevels_elements[0].firstChild.data)
287 inputfile_elements = stage_element.getElementsByTagName(
'inputfile')
288 if inputfile_elements:
289 self.
inputfile =
str(inputfile_elements[0].firstChild.data)
293 inputlist_elements = stage_element.getElementsByTagName(
'inputlist')
294 if inputlist_elements:
295 self.
inputlist =
str(inputlist_elements[0].firstChild.data)
299 inputmode_elements = stage_element.getElementsByTagName(
'inputmode')
300 if inputmode_elements:
301 self.
inputmode =
str(inputmode_elements[0].firstChild.data)
305 inputdef_elements = stage_element.getElementsByTagName(
'inputdef')
306 if inputdef_elements:
307 self.
inputdef =
str(inputdef_elements[0].firstChild.data)
311 ana_elements = stage_element.getElementsByTagName(
'ana')
313 self.
ana =
int(ana_elements[0].firstChild.data)
317 recur_elements = stage_element.getElementsByTagName(
'recur')
319 self.
recur =
int(recur_elements[0].firstChild.data)
323 recurtype_elements = stage_element.getElementsByTagName(
'recurtype')
324 if recurtype_elements:
325 self.
recurtype =
str(recurtype_elements[0].firstChild.data)
329 recurlimit_elements = stage_element.getElementsByTagName(
'recurlimit')
330 if recurlimit_elements:
331 self.
recurlimit =
int(recurlimit_elements[0].firstChild.data)
335 recurdef_elements = stage_element.getElementsByTagName(
'recurdef')
336 if recurdef_elements:
338 self.
inputdef =
str(recurdef_elements[0].firstChild.data)
343 singlerun_elements = stage_element.getElementsByTagName(
'singlerun')
344 if singlerun_elements:
345 self.
singlerun =
int(singlerun_elements[0].firstChild.data)
349 filelistdef_elements = stage_element.getElementsByTagName(
'filelistdef')
350 if filelistdef_elements:
355 prestart_elements = stage_element.getElementsByTagName(
'prestart')
356 if prestart_elements:
357 self.
prestart =
int(prestart_elements[0].firstChild.data)
361 activebase_elements = stage_element.getElementsByTagName(
'activebase')
362 if activebase_elements:
363 self.
activebase =
str(activebase_elements[0].firstChild.data)
367 dropboxwait_elements = stage_element.getElementsByTagName(
'dropboxwait')
368 if dropboxwait_elements:
373 prestagefraction_elements = stage_element.getElementsByTagName(
'prestagefraction')
374 if prestagefraction_elements:
379 inputstream_elements = stage_element.getElementsByTagName(
'inputstream')
380 if inputstream_elements:
385 previousstage_elements = stage_element.getElementsByTagName(
'previousstage')
386 if previousstage_elements:
391 if base_stage !=
None:
399 raise XMLError(
'Previous stage and input specified for stage %s.' % self.
name)
403 mixinputdef_elements = stage_element.getElementsByTagName(
'mixinputdef')
404 if mixinputdef_elements:
410 raise XMLError(
'Input file and input list both specified for stage %s.' % self.
name)
416 raise XMLError(
'Input dataset and input files specified for stage %s.' % self.
name)
420 raise XMLError(
'Input list (inputlist) or inputfile is needed for textfile model.')
430 default_input_list =
'' 431 previous_stage_name = default_previous_stage
434 if previous_stage_name
in default_input_lists:
435 default_input_list = default_input_lists[previous_stage_name]
439 if self.
inputstream ==
'' or default_input_list ==
'':
442 n = default_input_list.rfind(
'.')
444 n = len(default_input_list)
445 self.
inputlist =
'%s_%s%s' % (default_input_list[:n],
447 default_input_list[n:])
451 pubs_input_ok_elements = stage_element.getElementsByTagName(
'pubsinput')
452 if pubs_input_ok_elements:
457 maxfluxfilemb_elements = stage_element.getElementsByTagName(
'maxfluxfilemb')
458 if maxfluxfilemb_elements:
470 num_jobs_elements = stage_element.getElementsByTagName(
'numjobs')
471 if num_jobs_elements:
472 self.
num_jobs =
int(num_jobs_elements[0].firstChild.data)
476 num_events_elements = stage_element.getElementsByTagName(
'numevents')
477 if num_events_elements:
478 self.
num_events =
int(num_events_elements[0].firstChild.data)
482 max_files_per_job_elements = stage_element.getElementsByTagName(
'maxfilesperjob')
483 if max_files_per_job_elements:
489 run_number = stage_element.getElementsByTagName(
'runnumber')
495 target_size_elements = stage_element.getElementsByTagName(
'targetsize')
496 if target_size_elements:
502 defname_elements = stage_element.getElementsByTagName(
'defname')
504 self.
defname =
str(defname_elements[0].firstChild.data)
508 ana_defname_elements = stage_element.getElementsByTagName(
'anadefname')
509 if ana_defname_elements:
514 data_tier_elements = stage_element.getElementsByTagName(
'datatier')
515 if data_tier_elements:
516 self.
data_tier =
str(data_tier_elements[0].firstChild.data)
520 data_stream_elements = stage_element.getElementsByTagName(
'datastream')
521 if len(data_stream_elements) > 0:
523 for data_stream
in data_stream_elements:
524 self.data_stream.append(
str(data_stream.firstChild.data))
528 ana_data_tier_elements = stage_element.getElementsByTagName(
'anadatatier')
529 if ana_data_tier_elements:
534 ana_data_stream_elements = stage_element.getElementsByTagName(
'anadatastream')
535 if len(ana_data_stream_elements) > 0:
537 for ana_data_stream
in ana_data_stream_elements:
538 self.ana_data_stream.append(
str(ana_data_stream.firstChild.data))
542 submit_script_elements = stage_element.getElementsByTagName(
'submitscript')
543 if submit_script_elements:
558 stdout=subprocess.PIPE,
559 stderr=subprocess.PIPE)
560 jobout, joberr = jobinfo.communicate()
568 raise IOError(
'Submit script %s not found.' % self.
submit_script[0])
572 init_script_elements = stage_element.getElementsByTagName(
'initscript')
573 if len(init_script_elements) > 0:
574 for init_script_element
in init_script_elements:
575 init_script =
str(init_script_element.firstChild.data)
580 if init_script !=
'':
581 if larbatch_posix.exists(init_script):
582 init_script = os.path.realpath(init_script)
588 jobinfo = subprocess.Popen([
'which', init_script],
589 stdout=subprocess.PIPE,
590 stderr=subprocess.PIPE)
591 jobout, joberr = jobinfo.communicate()
593 init_script =
convert_str(jobout.splitlines()[0].strip())
597 if not larbatch_posix.exists(init_script):
598 raise IOError(
'Init script %s not found.' % init_script)
600 self.init_script.append(init_script)
604 init_source_elements = stage_element.getElementsByTagName(
'initsource')
605 if len(init_source_elements) > 0:
606 for init_source_element
in init_source_elements:
607 init_source =
str(init_source_element.firstChild.data)
611 if init_source !=
'':
613 if larbatch_posix.exists(init_source):
614 init_source = os.path.realpath(init_source)
620 jobinfo = subprocess.Popen([
'which', init_source],
621 stdout=subprocess.PIPE,
622 stderr=subprocess.PIPE)
623 jobout, joberr = jobinfo.communicate()
625 init_source =
convert_str(jobout.splitlines()[0].strip())
629 if not larbatch_posix.exists(init_source):
630 raise IOError(
'Init source script %s not found.' % init_source)
636 parent_element = init_source_element.parentNode
637 if parent_element.nodeName ==
'fcl':
644 fcl =
str(parent_element.firstChild.data).strip()
645 n = self.fclname.index(fcl)
654 self.init_source.append(init_source)
658 end_script_elements = stage_element.getElementsByTagName(
'endscript')
659 if len(end_script_elements) > 0:
660 for end_script_element
in end_script_elements:
661 end_script =
str(end_script_element.firstChild.data)
667 if larbatch_posix.exists(end_script):
668 end_script = os.path.realpath(end_script)
674 jobinfo = subprocess.Popen([
'which', end_script],
675 stdout=subprocess.PIPE,
676 stderr=subprocess.PIPE)
677 jobout, joberr = jobinfo.communicate()
679 end_script =
convert_str(jobout.splitlines()[0].strip())
683 if not larbatch_posix.exists(end_script):
684 raise IOError(
'End-of-job script %s not found.' % end_script)
690 parent_element = end_script_element.parentNode
691 if parent_element.nodeName ==
'fcl':
698 fcl =
str(parent_element.firstChild.data).strip()
699 n = self.fclname.index(fcl)
708 self.end_script.append(end_script)
712 project_name_elements = stage_element.getElementsByTagName(
'projectname')
713 if len(project_name_elements) > 0:
714 for project_name_element
in project_name_elements:
718 fcl_element = project_name_element.parentNode
719 if fcl_element.nodeName !=
'fcl':
720 raise XMLError(
"Found <projectname> element outside <fcl> element.")
721 fcl =
str(fcl_element.firstChild.data).strip()
726 n = self.fclname.index(fcl)
731 self.project_name.append(
'')
735 project_name =
str(project_name_element.firstChild.data)
744 self.project_name.append(
'')
748 stage_name_elements = stage_element.getElementsByTagName(
'stagename')
749 if len(stage_name_elements) > 0:
750 for stage_name_element
in stage_name_elements:
754 fcl_element = stage_name_element.parentNode
755 if fcl_element.nodeName !=
'fcl':
756 raise XMLError(
"Found <stagename> element outside <fcl> element.")
757 fcl =
str(fcl_element.firstChild.data).strip()
762 n = self.fclname.index(fcl)
767 self.stage_name.append(
'')
771 stage_name =
str(stage_name_element.firstChild.data)
780 self.stage_name.append(
'')
784 project_version_elements = stage_element.getElementsByTagName(
'version')
785 if len(project_version_elements) > 0:
786 for project_version_element
in project_version_elements:
790 fcl_element = project_version_element.parentNode
791 if fcl_element.nodeName !=
'fcl':
792 raise XMLError(
"Found stage level <version> element outside <fcl> element.")
793 fcl =
str(fcl_element.firstChild.data).strip()
798 n = self.fclname.index(fcl)
803 self.project_version.append(
'')
807 project_version =
str(project_version_element.firstChild.data)
816 self.project_version.append(
'')
820 merge_elements = stage_element.getElementsByTagName(
'merge')
822 self.
merge =
str(merge_elements[0].firstChild.data)
826 anamerge_elements = stage_element.getElementsByTagName(
'anamerge')
827 if anamerge_elements:
828 self.
anamerge =
str(anamerge_elements[0].firstChild.data)
832 resource_elements = stage_element.getElementsByTagName(
'resource')
833 if resource_elements:
834 self.
resource =
str(resource_elements[0].firstChild.data)
835 self.
resource =
''.join(self.resource.split())
839 lines_elements = stage_element.getElementsByTagName(
'lines')
841 self.
lines =
str(lines_elements[0].firstChild.data)
845 site_elements = stage_element.getElementsByTagName(
'site')
847 self.
site =
str(site_elements[0].firstChild.data)
848 self.
site =
''.join(self.site.split())
852 blacklist_elements = stage_element.getElementsByTagName(
'blacklist')
853 if blacklist_elements:
854 self.
blacklist =
str(blacklist_elements[0].firstChild.data)
855 self.
blacklist =
''.join(self.blacklist.split())
859 cpu_elements = stage_element.getElementsByTagName(
'cpu')
861 self.
cpu =
int(cpu_elements[0].firstChild.data)
865 disk_elements = stage_element.getElementsByTagName(
'disk')
867 self.
disk =
str(disk_elements[0].firstChild.data)
868 self.
disk =
''.join(self.disk.split())
872 datafiletypes_elements = stage_element.getElementsByTagName(
'datafiletypes')
873 if datafiletypes_elements:
874 data_file_types_str =
str(datafiletypes_elements[0].firstChild.data)
875 data_file_types_str =
''.join(data_file_types_str.split())
880 memory_elements = stage_element.getElementsByTagName(
'memory')
882 self.
memory =
int(memory_elements[0].firstChild.data)
886 param_elements = stage_element.getElementsByTagName(
'parameter')
887 if len(param_elements) > 0:
889 for param_element
in param_elements:
890 name =
str(param_element.attributes[
'name'].firstChild.data)
891 value =
str(param_element.firstChild.data)
896 output_elements = stage_element.getElementsByTagName(
'output')
897 if len(output_elements) > 0:
905 for output_element
in output_elements:
906 parent_element = output_element.parentNode
907 if parent_element.nodeName !=
'fcl':
908 output =
str(output_element.firstChild.data)
911 self.output.append(output)
915 for output_element
in output_elements:
916 parent_element = output_element.parentNode
917 if parent_element.nodeName ==
'fcl':
921 fcl =
str(parent_element.firstChild.data).strip()
922 n = self.fclname.index(fcl)
926 while len(self.
output) < n+1:
927 self.output.append(
'')
931 output =
str(output_element.firstChild.data)
940 self.output.append(
'')
944 TFileName_elements = stage_element.getElementsByTagName(
'TFileName')
945 if TFileName_elements:
946 self.
TFileName =
str(TFileName_elements[0].firstChild.data)
950 jobsub_elements = stage_element.getElementsByTagName(
'jobsub')
952 self.
jobsub =
str(jobsub_elements[0].firstChild.data)
956 jobsub_start_elements = stage_element.getElementsByTagName(
'jobsub_start')
957 if jobsub_start_elements:
962 jobsub_timeout_elements = stage_element.getElementsByTagName(
'jobsub_timeout')
963 if jobsub_timeout_elements:
968 exe_elements = stage_element.getElementsByTagName(
'exe')
969 if len(exe_elements) > 0:
977 for exe_element
in exe_elements:
978 parent_element = exe_element.parentNode
979 if parent_element.nodeName !=
'fcl':
980 exe =
str(exe_element.firstChild.data)
987 for exe_element
in exe_elements:
988 parent_element = exe_element.parentNode
989 if parent_element.nodeName ==
'fcl':
993 fcl =
str(parent_element.firstChild.data).strip()
994 n = self.fclname.index(fcl)
998 while len(self.
exe) < n+1:
1003 exe =
str(exe_element.firstChild.data)
1010 if len(self.
exe) > 0:
1016 schema_elements = stage_element.getElementsByTagName(
'schema')
1018 self.
schema =
str(schema_elements[0].firstChild.data)
1022 validate_on_worker_elements = stage_element.getElementsByTagName(
'check')
1023 if validate_on_worker_elements:
1028 copy_to_fts_elements = stage_element.getElementsByTagName(
'copy')
1029 if copy_to_fts_elements:
1034 cvmfs_elements = stage_element.getElementsByTagName(
'cvmfs')
1036 self.
cvmfs =
int(cvmfs_elements[0].firstChild.data)
1040 stash_elements = stage_element.getElementsByTagName(
'stash')
1042 self.
stash =
int(stash_elements[0].firstChild.data)
1046 singularity_elements = stage_element.getElementsByTagName(
'singularity')
1047 if singularity_elements:
1052 script_elements = stage_element.getElementsByTagName(
'script')
1054 self.
script = script_elements[0].firstChild.data
1061 jobinfo = subprocess.Popen([
'which', self.
script],
1062 stdout=subprocess.PIPE,
1063 stderr=subprocess.PIPE)
1064 jobout, joberr = jobinfo.communicate()
1068 script_path = jobout.splitlines()[0].strip()
1071 if script_path ==
'' or not larbatch_posix.access(script_path, os.X_OK):
1072 raise IOError(
'Script %s not found.' % self.
script)
1073 self.
script = script_path
1077 start_script_elements = stage_element.getElementsByTagName(
'startscript')
1078 if start_script_elements:
1079 self.
start_script = start_script_elements[0].firstChild.data
1086 jobinfo = subprocess.Popen([
'which', self.
start_script],
1087 stdout=subprocess.PIPE,
1088 stderr=subprocess.PIPE)
1089 jobout, joberr = jobinfo.communicate()
1093 script_path = jobout.splitlines()[0].strip()
1100 stop_script_elements = stage_element.getElementsByTagName(
'stopscript')
1101 if stop_script_elements:
1102 self.
stop_script = stop_script_elements[0].firstChild.data
1109 jobinfo = subprocess.Popen([
'which', self.
stop_script],
1110 stdout=subprocess.PIPE,
1111 stderr=subprocess.PIPE)
1112 jobout, joberr = jobinfo.communicate()
1116 script_path = jobout.splitlines()[0].strip()
1128 result =
'Stage name = %s\n' % self.
name 1129 result =
'Batch job name = %s\n' % self.
batchname 1132 result +=
'Fcl filename = %s\n' % fcl
1133 result +=
'Output directory = %s\n' % self.
outdir 1134 result +=
'Log directory = %s\n' % self.
logdir 1135 result +=
'Work directory = %s\n' % self.
workdir 1136 result +=
'Bookkeeping directory = %s\n' % self.
bookdir 1137 result +=
'Maximum directory size = %d\n' % self.
dirsize 1138 result +=
'Extra directory levels = %d\n' % self.
dirlevels 1139 result +=
'Dynamic directories = %d\n' % self.
dynamic 1140 result +=
'Input file = %s\n' % self.
inputfile 1141 result +=
'Input list = %s\n' % self.
inputlist 1142 result +=
'Input mode = %s\n' % self.
inputmode 1143 result +=
'Input sam dataset = %s' % self.
inputdef 1145 result +=
' (recursive)' 1147 result +=
'Base sam dataset = %s\n' % self.
basedef 1148 result +=
'Analysis flag = %d\n' % self.
ana 1149 result +=
'Recursive flag = %d\n' % self.
recur 1150 result +=
'Recursive type = %s\n' % self.
recurtype 1151 result +=
'Recursive limit = %d\n' % self.
recurlimit 1152 result +=
'Single run flag = %d\n' % self.
singlerun 1153 result +=
'File list definition flag = %d\n' % self.
filelistdef 1154 result +=
'Prestart flag = %d\n' % self.
prestart 1155 result +=
'Active projects base name = %s\n' % self.
activebase 1156 result +=
'Dropbox waiting interval = %f\n' % self.
dropboxwait 1158 result +=
'Input stream = %s\n' % self.
inputstream 1160 result +=
'Mix input sam dataset = %s\n' % self.
mixinputdef 1162 result +=
'Pubs input mode = %d\n' % self.
pubs_input 1163 result +=
'Pubs input run number = %d\n' % self.
input_run 1165 result +=
'Pubs input subrun number = %d\n' % subrun
1166 result +=
'Pubs input version number = %d\n' % self.
input_version 1167 result +=
'Pubs output mode = %d\n' % self.
pubs_output 1168 result +=
'Pubs output run number = %d\n' % self.
output_run 1170 result +=
'Pubs output subrun number = %d\n' % subrun
1171 result +=
'Pubs output version number = %d\n' % self.
output_version 1172 result +=
'Output file name = %s\n' % self.
output 1173 result +=
'TFileName = %s\n' % self.
TFileName 1174 result +=
'Number of jobs = %d\n' % self.
num_jobs 1175 result +=
'Number of events = %d\n' % self.
num_events 1178 result +=
'Output file target size = %d\n' % self.
target_size 1179 result +=
'Dataset definition name = %s\n' % self.
defname 1180 result +=
'Analysis dataset definition name = %s\n' % self.
ana_defname 1181 result +=
'Data tier = %s\n' % self.
data_tier 1186 result +=
'Worker initialization script = %s\n' % self.
init_script 1187 result +=
'Worker initialization source script = %s\n' % self.
init_source 1188 result +=
'Worker end-of-job script = %s\n' % self.
end_script 1189 result +=
'Worker midstage source initialization scripts = %s\n' % self.
mid_source 1190 result +=
'Worker midstage finalization scripts = %s\n' % self.
mid_script 1191 result +=
'Project name overrides = %s\n' % self.
project_name 1192 result +=
'Stage name overrides = %s\n' % self.
stage_name 1194 result +=
'Special histogram merging program = %s\n' % self.
merge 1195 result +=
'Analysis merge flag = %s\n' % self.
anamerge 1196 result +=
'Resource = %s\n' % self.
resource 1197 result +=
'Lines = %s\n' % self.
lines 1198 result +=
'Site = %s\n' % self.
site 1199 result +=
'Blacklist = %s\n' % self.
blacklist 1200 result +=
'Cpu = %d\n' % self.
cpu 1201 result +=
'Disk = %s\n' % self.
disk 1203 result +=
'Memory = %d MB\n' % self.
memory 1204 result +=
'Metadata parameters:\n' 1206 result +=
'%s: %s\n' % (key,self.
parameters[key])
1207 result +=
'Output file name = %s\n' % self.
output 1208 result +=
'TFile name = %s\n' % self.
TFileName 1209 result +=
'Jobsub_submit options = %s\n' % self.
jobsub 1210 result +=
'Jobsub_submit start/stop options = %s\n' % self.
jobsub_start 1212 result +=
'Executables = %s\n' % self.
exe 1213 result +=
'Schema = %s\n' % self.
schema 1215 result +=
'Upload-on-worker = %d\n' % self.
copy_to_fts 1216 result +=
'Cvmfs flag = %d\n' % self.
cvmfs 1217 result +=
'Stash cache flag = %d\n' % self.
stash 1218 result +=
'Singularity flag = %d\n' % self.
singularity 1219 result +=
'Batch script = %s\n' % self.
script 1284 raise RuntimeError(
'Pubs input for single file input is not supported.')
1300 newdef = project_utilities.create_limited_dataset(self.
inputdef,
1304 raise PubsInputError(run, subruns[0], version)
1311 if files_per_job == 0:
1313 self.
num_jobs = (len(subruns) + files_per_job - 1) / files_per_job
1325 if len(subruns) == 1:
1330 pubs_path =
'%d/%d' % (run, subruns[0])
1332 pubs_path =
'%d/%d/%d' % (version, run, subruns[0])
1335 self.
inputlist = os.path.join(dir, pubs_path, base)
1341 lines = larbatch_posix.readlines(self.
inputlist)
1345 raise PubsInputError(run, subruns[0], version)
1350 input_file = line.strip()
1351 if not larbatch_posix.exists(input_file):
1352 raise PubsInputError(run, subruns[0], version)
1364 if len(subruns) > 1:
1375 if larbatch_posix.exists(new_inputlist_path):
1376 new_inputlist_path =
'%s/%s_%s.list' % (dir, base,
str(uuid.uuid4()))
1382 new_inputlist_file =
None 1390 for subrun
in subruns:
1398 pubs_path =
'%d/%d' % (run, subrun)
1400 pubs_path =
'%d/%d/%d' % (version, run, subrun)
1402 subrun_inputlist = os.path.join(dir, pubs_path, base)
1405 lines = larbatch_posix.readlines(subrun_inputlist)
1409 raise PubsInputError(run, subruns[0], version)
1411 subrun_inputfile = line.strip()
1417 sr = larbatch_posix.stat(subrun_inputfile)
1418 sr_size = sr.st_size
1423 actual_subruns.append(subrun)
1424 if new_inputlist_file ==
None:
1425 print(
'Generating new input list %s\n' % new_inputlist_path)
1426 new_inputlist_file = larbatch_posix.open(new_inputlist_path,
'w')
1427 new_inputlist_file.write(
'%s\n' % subrun_inputfile)
1428 total_size += sr.st_size
1440 new_inputlist_file.close()
1444 if len(actual_subruns) == 0:
1445 raise PubsInputError(run, subruns[0], version)
1449 if len(actual_subruns) != len(subruns):
1450 print(
'Truncating subrun list: %s' %
str(actual_subruns))
1452 subruns.extend(actual_subruns)
1458 if files_per_job == 0:
1460 self.
num_jobs = (len(subruns) + files_per_job - 1) / files_per_job
1489 if len(subruns) == 1:
1491 pubs_path =
'%d/%d' % (run, subruns[0])
1493 pubs_path =
'%d/%d/%d' % (version, run, subruns[0])
1497 pubs_path =
'%d/@s' % run
1499 pubs_path =
'%d/%d/@s' % (version, run)
1514 print(
'Running presubmission check script', end=
' ')
1516 print(word, end=
' ')
1519 stdout=subprocess.PIPE,
1520 stderr=subprocess.PIPE)
1522 thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess,
1525 thread.join(timeout=60)
1526 if thread.is_alive():
1527 print(
'Submit script timed out, terminating.')
1533 print(
'Script exit status = %d' % rc)
1534 print(
'Script standard output:')
1536 print(
'Script diagnostic output:')
1551 raise IOError(
'Input file %s does not exist.' % self.
inputfile)
1553 raise IOError(
'Input list %s does not exist.' % self.
inputlist)
1562 project_utilities.make_active_project_dataset(self.
activebase,
1572 input_filenames = larbatch_posix.readlines(self.
inputlist)
1574 for line
in input_filenames:
1575 filename = line.split()[0]
1576 filesize = larbatch_posix.stat(filename).st_size
1577 size_tot = size_tot + filesize
1579 if new_num_jobs < 1:
1583 print(
"Ideal number of jobs based on target file size is %d." % new_num_jobs)
1585 print(
"Updating number of jobs from %d to %d." % (self.
num_jobs, new_num_jobs))
1595 samweb = project_utilities.samweb()
1596 print(
"Doing single run processing.")
1601 dim =
'defname: %s' % self.
inputdef 1603 input_files = list(project_utilities.listFiles(dim))
1605 input_files = samweb.listFiles(dimensions=dim)
1606 if len(input_files) > 0:
1607 random_file = random.choice(input_files)
1608 print(
'Example file: %s' % random_file)
1612 md = samweb.getMetadata(random_file)
1613 run_tuples = md[
'runs']
1614 if len(run_tuples) > 0:
1615 run = run_tuples[0][0]
1616 print(
'Input files will be limited to run %d.' % run)
1621 newdef =
'%s_run_%d' % (samweb.makeProjectName(self.
inputdef), run)
1624 desc = samweb.descDefinition(defname=newdef)
1626 except samweb_cli.exceptions.DefinitionNotFound:
1629 print(
'Creating dataset definition %s' % newdef)
1630 newdim =
'defname: %s and run_number %d' % (self.
inputdef, run)
1631 samweb.createDefinition(defname=newdef, dims=newdim)
1635 print(
'Problem extracting run number from example file.')
1639 print(
'Input dataset is empty.')
1651 samweb = project_utilities.samweb()
1652 dim =
'defname: %s' % self.
inputdef 1656 files = project_utilities.listFiles(dim)
1659 sum = samweb.listFilesSummary(dimensions=dim)
1660 nfiles = sum[
'file_count']
1661 print(
'Input dataset %s has %d files.' % (self.
inputdef, nfiles))
1666 if max_files > 0
and max_files < nfiles:
1668 while len(files) > max_files:
1670 dim =
'defname: %s' % project_utilities.makeFileListDefinition(files)
1672 dim +=
' with limit %d' % max_files
1673 sum = samweb.listFilesSummary(dimensions=dim)
1674 size_tot = sum[
'total_file_size']
1675 nfiles = sum[
'file_count']
1678 dim =
'defname: %s' % project_utilities.makeFileListDefinition(files)
1679 sum = samweb.listFilesSummary(dimensions=dim)
1680 size_tot = sum[
'total_file_size']
1681 nfiles = sum[
'file_count']
1686 if new_num_jobs < 1:
1695 if new_num_jobs < 1:
1700 print(
"Ideal number of jobs based on target file size is %d." % new_num_jobs)
1702 print(
"Updating number of jobs from %d to %d." % (self.
num_jobs, new_num_jobs))
1704 print(
"Ideal number of files per job is %d." % new_max_files_per_job)
1706 print(
"Updating maximum files per job from %d to %d." % (
1710 print(
'Input dataset is empty.')
1718 if self.
inputdef !=
'' and checkdef
and not checkok:
1719 samweb = project_utilities.samweb()
1722 files = project_utilities.listFiles(
'defname: %s' % self.
inputdef)
1725 sum = samweb.listFilesSummary(defname=self.
inputdef)
1726 n = sum[
'file_count']
1727 print(
'Input dataset %s contains %d files.' % (self.
inputdef, n))
1741 if not larbatch_posix.exists(self.
outdir):
1742 raise IOError(
'Output directory %s does not exist.' % self.
outdir)
1743 if not larbatch_posix.exists(self.
logdir):
1744 raise IOError(
'Log directory %s does not exist.' % self.
logdir)
1750 if not larbatch_posix.exists(self.
outdir):
1751 raise IOError(
'Output directory %s does not exist.' % self.
outdir)
1753 raise IOError(
'Log directory %s does not exist.' % self.
logdir)
1754 if not larbatch_posix.exists(self.
workdir):
1755 raise IOError(
'Work directory %s does not exist.' % self.
workdir)
1757 raise IOError(
'Bookkeeping directory %s does not exist.' % self.
bookdir)
1763 if not larbatch_posix.exists(self.
outdir):
1764 larbatch_posix.makedirs(self.
outdir)
1766 larbatch_posix.makedirs(self.
logdir)
1767 if not larbatch_posix.exists(self.
workdir):
1768 larbatch_posix.makedirs(self.
workdir)
1770 larbatch_posix.makedirs(self.
bookdir)
1774 if self.
outdir[0:6] ==
'/pnfs/':
1775 mode = stat.S_IMODE(larbatch_posix.stat(self.
outdir).st_mode)
1776 if not mode & stat.S_IWGRP:
1777 mode = mode | stat.S_IWGRP
1778 larbatch_posix.chmod(self.
outdir, mode)
1779 if self.
logdir[0:6] ==
'/pnfs/':
1780 mode = stat.S_IMODE(larbatch_posix.stat(self.
logdir).st_mode)
1781 if not mode & stat.S_IWGRP:
1782 mode = mode | stat.S_IWGRP
1783 larbatch_posix.chmod(self.
logdir, mode)
def checkinput(self, checkdef=False)
def __init__(self, stage_element, base_stage, default_input_lists, default_previous_stage, default_num_jobs, default_num_events, default_max_files_per_job, default_merge, default_anamerge, default_cpu, default_disk, default_memory, default_validate_on_worker, default_copy_to_fts, default_cvmfs, default_stash, default_singularity, default_script, default_start_script, default_stop_script, default_site, default_blacklist, check=True)
def check_output_dirs(self)
def pubsify_output(self, run, subruns, version)
void split(std::string const &s, char c, OutIter dest)
def pubsify_input(self, run, subruns, version)