#! /usr/bin/env python
######################################################################
#
# Name: stagedef.py
#
# Purpose: Python class StageDef (used by project.py).
#
# Created: 12-Dec-2014  Herbert Greenlee
#
######################################################################

from __future__ import absolute_import
from __future__ import print_function
import sys, os, stat, math, subprocess, random
import threading
try:
    import queue
except ImportError:
    import Queue as queue
import samweb_cli
import project_utilities
import larbatch_utilities
from larbatch_utilities import convert_str
import larbatch_posix
import uuid
from project_modules.xmlerror import XMLError
from project_modules.pubsinputerror import PubsInputError
from project_modules.pubsdeadenderror import PubsDeadEndError

# Stage definition class.

class StageDef:

    # Constructor.

    def __init__(self, stage_element, base_stage, default_input_lists, default_previous_stage,
                 default_num_jobs, default_num_events, default_max_files_per_job, default_merge,
                 default_anamerge,
                 default_cpu, default_disk, default_memory, default_validate_on_worker,
                 default_copy_to_fts, default_cvmfs, default_stash, default_singularity,
                 default_script, default_start_script, default_stop_script,
                 default_site, default_blacklist, check=True):

        # Assign default values.

        if base_stage is not None:
            self.name = base_stage.name
            self.batchname = base_stage.batchname
            self.fclname = base_stage.fclname
            self.outdir = base_stage.outdir
            self.logdir = base_stage.logdir
            self.workdir = base_stage.workdir
            self.bookdir = base_stage.bookdir
            self.dirsize = base_stage.dirsize
            self.dirlevels = base_stage.dirlevels
            self.dynamic = base_stage.dynamic
            self.inputfile = base_stage.inputfile
            self.inputlist = base_stage.inputlist
            self.inputmode = base_stage.inputmode
            self.basedef = base_stage.basedef
            self.inputdef = base_stage.inputdef
            self.inputstream = base_stage.inputstream
            self.previousstage = base_stage.previousstage
            self.mixinputdef = base_stage.mixinputdef
            self.pubs_input_ok = base_stage.pubs_input_ok
            self.pubs_input = base_stage.pubs_input
            self.input_run = base_stage.input_run
            self.input_subruns = base_stage.input_subruns
            self.input_version = base_stage.input_version
            self.pubs_output = base_stage.pubs_output
            self.output_run = base_stage.output_run
            self.output_subruns = base_stage.output_subruns
            self.output_version = base_stage.output_version
            self.ana = base_stage.ana
            self.recur = base_stage.recur
            self.recurtype = base_stage.recurtype
            self.recurlimit = base_stage.recurlimit
            self.singlerun = base_stage.singlerun
            self.filelistdef = base_stage.filelistdef
            self.prestart = base_stage.prestart
            self.activebase = base_stage.activebase
            self.dropboxwait = base_stage.dropboxwait
            self.prestagefraction = base_stage.prestagefraction
            self.maxfluxfilemb = base_stage.maxfluxfilemb
            self.num_jobs = base_stage.num_jobs
            self.num_events = base_stage.num_events
            self.max_files_per_job = base_stage.max_files_per_job
            self.target_size = base_stage.target_size
            self.defname = base_stage.defname
            self.ana_defname = base_stage.ana_defname
            self.data_tier = base_stage.data_tier
            self.data_stream = base_stage.data_stream
            self.ana_data_tier = base_stage.ana_data_tier
            self.ana_data_stream = base_stage.ana_data_stream
            self.submit_script = base_stage.submit_script
            self.init_script = base_stage.init_script
            self.init_source = base_stage.init_source
            self.end_script = base_stage.end_script
            self.mid_source = base_stage.mid_source
            self.mid_script = base_stage.mid_script
            self.project_name = base_stage.project_name
            self.stage_name = base_stage.stage_name
            self.project_version = base_stage.project_version
            self.merge = base_stage.merge
            self.anamerge = base_stage.anamerge
            self.resource = base_stage.resource
            self.lines = base_stage.lines
            self.site = base_stage.site
            self.blacklist = base_stage.blacklist
            self.cpu = base_stage.cpu
            self.disk = base_stage.disk
            self.datafiletypes = base_stage.datafiletypes
            self.memory = base_stage.memory
            self.parameters = base_stage.parameters
            self.output = base_stage.output
            self.TFileName = base_stage.TFileName
            self.jobsub = base_stage.jobsub
            self.jobsub_start = base_stage.jobsub_start
            self.jobsub_timeout = base_stage.jobsub_timeout
            self.exe = base_stage.exe
            self.schema = base_stage.schema
            self.validate_on_worker = base_stage.validate_on_worker
            self.copy_to_fts = base_stage.copy_to_fts
            self.cvmfs = base_stage.cvmfs
            self.stash = base_stage.stash
            self.singularity = base_stage.singularity
            self.script = base_stage.script
            self.start_script = base_stage.start_script
            self.stop_script = base_stage.stop_script
        else:
            self.name = ''            # Stage name.
            self.batchname = ''       # Batch job name.
            self.fclname = []         # Fcl file names.
            self.outdir = ''          # Output directory.
            self.logdir = ''          # Log directory.
            self.workdir = ''         # Work directory.
            self.bookdir = ''         # Bookkeeping directory.
            self.dirsize = 0          # Maximum directory size.
            self.dirlevels = 0        # Number of extra directory levels.
            self.dynamic = 0          # Dynamic output/log directory.
            self.inputfile = ''       # Single input file.
            self.inputlist = ''       # Input file list.
            self.inputmode = ''       # Input file type (none or textfile).
            self.basedef = ''         # Base sam dataset definition.
            self.inputdef = ''        # Input sam dataset definition.
            self.inputstream = ''     # Input file stream.
            self.previousstage = ''   # Previous stage name.
            self.mixinputdef = ''     # Mix input sam dataset definition.
            self.pubs_input_ok = 1    # Is pubs input allowed?
            self.pubs_input = 0       # Pubs input mode.
            self.input_run = 0        # Pubs input run.
            self.input_subruns = []   # Pubs input subrun number(s).
            self.input_version = 0    # Pubs input version number.
            self.pubs_output = 0      # Pubs output mode.
            self.output_run = 0       # Pubs output run.
            self.output_subruns = []  # Pubs output subrun number(s).
            self.output_version = 0   # Pubs output version number.
            self.ana = 0              # Analysis flag.
            self.recur = 0            # Recursive flag.
            self.recurtype = ''       # Recursive type.
            self.recurlimit = 0       # Recursive limit.
            self.singlerun = 0        # Single run mode.
            self.filelistdef = 0      # Convert sam input def to file list.
            self.prestart = 0         # Prestart flag.
            self.activebase = ''      # Active projects base name.
            self.dropboxwait = 0.     # Dropbox waiting interval.
            self.prestagefraction = 0.  # Prestage fraction.
            self.maxfluxfilemb = 0    # MaxFluxFileMB (size of genie flux files to fetch).
            self.num_jobs = default_num_jobs      # Number of jobs.
            self.num_events = default_num_events  # Number of events.
            self.max_files_per_job = default_max_files_per_job  # Maximum number of files per job.
            self.target_size = 0      # Target size for output files.
            self.defname = ''         # Sam dataset definition name.
            self.ana_defname = ''     # Sam analysis dataset definition name.
            self.data_tier = ''       # Sam data tier.
            self.data_stream = []     # Sam data stream.
            self.ana_data_tier = ''   # Sam analysis data tier.
            self.ana_data_stream = [] # Sam analysis data stream.
            self.submit_script = ''   # Submit script.
            self.init_script = []     # Worker initialization scripts.
            self.init_source = []     # Worker initialization bash source scripts.
            self.end_script = []      # Worker end-of-job scripts.
            self.mid_source = {}      # Worker midstage source init scripts.
            self.mid_script = {}      # Worker midstage finalization scripts.
            self.project_name = []    # Project name overrides.
            self.stage_name = []      # Stage name overrides.
            self.project_version = [] # Project version overrides.
            self.merge = default_merge  # Histogram merging program.
            self.anamerge = default_anamerge  # Analysis merge flag.
            self.resource = ''        # Jobsub resources.
            self.lines = ''           # Arbitrary condor commands.
            self.site = default_site  # Site.
            self.blacklist = default_blacklist  # Blacklisted sites.
            self.cpu = default_cpu    # Number of cpus.
            self.disk = default_disk  # Disk space (string value+unit).
            self.datafiletypes = ["root"]  # Data file types.
            self.memory = default_memory   # Amount of memory (integer MB).
            self.parameters = {}      # Dictionary of metadata parameters.
            self.output = []          # Art output file names.
            self.TFileName = ''       # TFile output file name.
            self.jobsub = ''          # Arbitrary jobsub_submit options.
            self.jobsub_start = ''    # Arbitrary jobsub_submit options for sam start/stop jobs.
            self.jobsub_timeout = 0   # Jobsub submit timeout.
            self.exe = []             # Art-like executables.
            self.schema = ''          # Sam schema.
            self.validate_on_worker = default_validate_on_worker  # Validate-on-worker flag.
            self.copy_to_fts = default_copy_to_fts  # Upload-on-worker flag.
            self.cvmfs = default_cvmfs  # Default cvmfs flag.
            self.stash = default_stash  # Default stash cache flag.
            self.singularity = default_singularity  # Default singularity flag.
            self.script = default_script  # Batch script.
            self.start_script = default_start_script  # Start project script.
            self.stop_script = default_stop_script    # Stop project script.

        # Extract values from xml.

        # Stage name (attribute).

        if 'name' in dict(stage_element.attributes):
            self.name = str(stage_element.attributes['name'].firstChild.data)
        if self.name == '':
            raise XMLError("Stage name not specified.")

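        # For orientation, a minimal <stage> element of the kind this
        # constructor parses might look like the following sketch (the
        # directory and file names are hypothetical placeholders; only
        # subelements actually parsed below are shown):
        #
        #   <stage name="gen">
        #     <fcl>prod_gen.fcl</fcl>
        #     <outdir>/pnfs/exp/scratch/user/gen/out</outdir>
        #     <logdir>/pnfs/exp/scratch/user/gen/log</logdir>
        #     <workdir>/exp/app/user/work/gen</workdir>
        #     <numjobs>10</numjobs>
        #     <numevents>1000</numevents>
        #     <datatier>generated</datatier>
        #   </stage>
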
        # Batch job name (subelement).

        batchname_elements = stage_element.getElementsByTagName('batchname')
        if batchname_elements:
            self.batchname = str(batchname_elements[0].firstChild.data)

        # Fcl file name (repeatable subelement).

        fclname_elements = stage_element.getElementsByTagName('fcl')
        if len(fclname_elements) > 0:
            self.fclname = []
            for fcl in fclname_elements:
                self.fclname.append(str(fcl.firstChild.data).strip())
        if len(self.fclname) == 0:
            raise XMLError('No Fcl names specified for stage %s.' % self.name)

        # Output directory (subelement).

        outdir_elements = stage_element.getElementsByTagName('outdir')
        if outdir_elements:
            self.outdir = str(outdir_elements[0].firstChild.data)
        if self.outdir == '':
            raise XMLError('Output directory not specified for stage %s.' % self.name)

        # Log directory (subelement).

        logdir_elements = stage_element.getElementsByTagName('logdir')
        if logdir_elements:
            self.logdir = str(logdir_elements[0].firstChild.data)
        if self.logdir == '':
            self.logdir = self.outdir

        # Work directory (subelement).

        workdir_elements = stage_element.getElementsByTagName('workdir')
        if workdir_elements:
            self.workdir = str(workdir_elements[0].firstChild.data)
        if self.workdir == '':
            raise XMLError('Work directory not specified for stage %s.' % self.name)

        # Bookkeeping directory (subelement).

        bookdir_elements = stage_element.getElementsByTagName('bookdir')
        if bookdir_elements:
            self.bookdir = str(bookdir_elements[0].firstChild.data)
        if self.bookdir == '':
            self.bookdir = self.logdir

        # Maximum directory size (subelement).

        dirsize_elements = stage_element.getElementsByTagName('dirsize')
        if dirsize_elements:
            self.dirsize = int(dirsize_elements[0].firstChild.data)

        # Extra directory levels (subelement).

        dirlevels_elements = stage_element.getElementsByTagName('dirlevels')
        if dirlevels_elements:
            self.dirlevels = int(dirlevels_elements[0].firstChild.data)

        # Single input file (subelement).

        inputfile_elements = stage_element.getElementsByTagName('inputfile')
        if inputfile_elements:
            self.inputfile = str(inputfile_elements[0].firstChild.data)

        # Input file list (subelement).

        inputlist_elements = stage_element.getElementsByTagName('inputlist')
        if inputlist_elements:
            self.inputlist = str(inputlist_elements[0].firstChild.data)

        # Input file type (subelement).

        inputmode_elements = stage_element.getElementsByTagName('inputmode')
        if inputmode_elements:
            self.inputmode = str(inputmode_elements[0].firstChild.data)

        # Input sam dataset definition (subelement).

        inputdef_elements = stage_element.getElementsByTagName('inputdef')
        if inputdef_elements:
            self.inputdef = str(inputdef_elements[0].firstChild.data)

        # Analysis flag (subelement).

        ana_elements = stage_element.getElementsByTagName('ana')
        if ana_elements:
            self.ana = int(ana_elements[0].firstChild.data)

        # Recursive flag (subelement).

        recur_elements = stage_element.getElementsByTagName('recur')
        if recur_elements:
            self.recur = int(recur_elements[0].firstChild.data)

        # Recursive type (subelement).

        recurtype_elements = stage_element.getElementsByTagName('recurtype')
        if recurtype_elements:
            self.recurtype = str(recurtype_elements[0].firstChild.data)

        # Recursive limit (subelement).

        recurlimit_elements = stage_element.getElementsByTagName('recurlimit')
        if recurlimit_elements:
            self.recurlimit = int(recurlimit_elements[0].firstChild.data)

        # Recursive input sam dataset definition (subelement).

        recurdef_elements = stage_element.getElementsByTagName('recurdef')
        if recurdef_elements:
            self.basedef = self.inputdef
            self.inputdef = str(recurdef_elements[0].firstChild.data)
            self.recur = 1

        # Single run flag (subelement).

        singlerun_elements = stage_element.getElementsByTagName('singlerun')
        if singlerun_elements:
            self.singlerun = int(singlerun_elements[0].firstChild.data)

        # File list definition flag (subelement).

        filelistdef_elements = stage_element.getElementsByTagName('filelistdef')
        if filelistdef_elements:
            self.filelistdef = int(filelistdef_elements[0].firstChild.data)

        # Prestart flag.

        prestart_elements = stage_element.getElementsByTagName('prestart')
        if prestart_elements:
            self.prestart = int(prestart_elements[0].firstChild.data)

        # Active projects base name.

        activebase_elements = stage_element.getElementsByTagName('activebase')
        if activebase_elements:
            self.activebase = str(activebase_elements[0].firstChild.data)

        # Dropbox wait interval.

        dropboxwait_elements = stage_element.getElementsByTagName('dropboxwait')
        if dropboxwait_elements:
            self.dropboxwait = float(dropboxwait_elements[0].firstChild.data)

        # Prestage fraction (subelement).

        prestagefraction_elements = stage_element.getElementsByTagName('prestagefraction')
        if prestagefraction_elements:
            self.prestagefraction = float(prestagefraction_elements[0].firstChild.data)

        # Input stream (subelement).

        inputstream_elements = stage_element.getElementsByTagName('inputstream')
        if inputstream_elements:
            self.inputstream = str(inputstream_elements[0].firstChild.data)

        # Previous stage name (subelement).

        previousstage_elements = stage_element.getElementsByTagName('previousstage')
        if previousstage_elements:
            self.previousstage = str(previousstage_elements[0].firstChild.data)

            # If a base stage was specified, nullify any input inherited from the base.

            if base_stage is not None:
                self.inputfile = ''
                self.inputlist = ''
                self.inputdef = ''

            # It never makes sense to specify a previous stage together with
            # some other input.

            if self.inputfile != '' or self.inputlist != '' or self.inputdef != '':
                raise XMLError('Previous stage and input specified for stage %s.' % self.name)

        # Mix input sam dataset (subelement).

        mixinputdef_elements = stage_element.getElementsByTagName('mixinputdef')
        if mixinputdef_elements:
            self.mixinputdef = str(mixinputdef_elements[0].firstChild.data)

        # It is an error to specify both input file and input list.

        if self.inputfile != '' and self.inputlist != '':
            raise XMLError('Input file and input list both specified for stage %s.' % self.name)

        # It is an error to specify either input file or input list together
        # with a sam input dataset.

        if self.inputdef != '' and (self.inputfile != '' or self.inputlist != ''):
            raise XMLError('Input dataset and input files specified for stage %s.' % self.name)

        # It is an error to use textfile input mode without an input list or
        # input file.

        if self.inputmode == 'textfile' and self.inputlist == '' and self.inputfile == '':
            raise XMLError('Input list (inputlist) or input file (inputfile) is needed for textfile input mode.')

        # If none of input definition, input file, nor input list was specified,
        # set the input list to the default input list.  If an input stream was
        # specified, insert it in front of the file type.

        if self.inputfile == '' and self.inputlist == '' and self.inputdef == '':

            # Get the default input list according to the previous stage.

            default_input_list = ''
            previous_stage_name = default_previous_stage
            if self.previousstage != '':
                previous_stage_name = self.previousstage
            if previous_stage_name in default_input_lists:
                default_input_list = default_input_lists[previous_stage_name]

            # Modify the default input list according to the input stream, if any.

            if self.inputstream == '' or default_input_list == '':
                self.inputlist = default_input_list
            else:
                n = default_input_list.rfind('.')
                if n < 0:
                    n = len(default_input_list)
                self.inputlist = '%s_%s%s' % (default_input_list[:n],
                                              self.inputstream,
                                              default_input_list[n:])

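        # For example (hypothetical names), if the default input list is
        # 'files.list' and the input stream is 'out1', the splice above
        # produces the input list name 'files_out1.list'.
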
        # Pubs input flag.

        pubs_input_ok_elements = stage_element.getElementsByTagName('pubsinput')
        if pubs_input_ok_elements:
            self.pubs_input_ok = int(pubs_input_ok_elements[0].firstChild.data)

        # MaxFluxFileMB GENIEHelper fcl parameter (subelement).

        maxfluxfilemb_elements = stage_element.getElementsByTagName('maxfluxfilemb')
        if maxfluxfilemb_elements:
            self.maxfluxfilemb = int(maxfluxfilemb_elements[0].firstChild.data)
        else:

            # If this is a generator job, give the maxfluxfilemb parameter a
            # default nonzero value.

            if self.inputfile == '' and self.inputlist == '' and self.inputdef == '':
                self.maxfluxfilemb = 500

        # Number of jobs (subelement).

        num_jobs_elements = stage_element.getElementsByTagName('numjobs')
        if num_jobs_elements:
            self.num_jobs = int(num_jobs_elements[0].firstChild.data)

        # Number of events (subelement).

        num_events_elements = stage_element.getElementsByTagName('numevents')
        if num_events_elements:
            self.num_events = int(num_events_elements[0].firstChild.data)

        # Maximum number of files per job (subelement).

        max_files_per_job_elements = stage_element.getElementsByTagName('maxfilesperjob')
        if max_files_per_job_elements:
            self.max_files_per_job = int(max_files_per_job_elements[0].firstChild.data)

        # Run number (MC generation only).
        # Overridden by --pubs <run> if running in pubs mode.

        run_number = stage_element.getElementsByTagName('runnumber')
        if run_number:
            self.output_run = int(run_number[0].firstChild.data)

        # Target size for output files (subelement).

        target_size_elements = stage_element.getElementsByTagName('targetsize')
        if target_size_elements:
            self.target_size = int(target_size_elements[0].firstChild.data)

        # Sam dataset definition name (subelement).

        defname_elements = stage_element.getElementsByTagName('defname')
        if defname_elements:
            self.defname = str(defname_elements[0].firstChild.data)

        # Sam analysis dataset definition name (subelement).

        ana_defname_elements = stage_element.getElementsByTagName('anadefname')
        if ana_defname_elements:
            self.ana_defname = str(ana_defname_elements[0].firstChild.data)

        # Sam data tier (subelement).

        data_tier_elements = stage_element.getElementsByTagName('datatier')
        if data_tier_elements:
            self.data_tier = str(data_tier_elements[0].firstChild.data)

        # Sam data stream (subelement).

        data_stream_elements = stage_element.getElementsByTagName('datastream')
        if len(data_stream_elements) > 0:
            self.data_stream = []
            for data_stream in data_stream_elements:
                self.data_stream.append(str(data_stream.firstChild.data))

        # Sam analysis data tier (subelement).

        ana_data_tier_elements = stage_element.getElementsByTagName('anadatatier')
        if ana_data_tier_elements:
            self.ana_data_tier = str(ana_data_tier_elements[0].firstChild.data)

        # Sam analysis data stream (subelement).

        ana_data_stream_elements = stage_element.getElementsByTagName('anadatastream')
        if len(ana_data_stream_elements) > 0:
            self.ana_data_stream = []
            for ana_data_stream in ana_data_stream_elements:
                self.ana_data_stream.append(str(ana_data_stream.firstChild.data))

        # Submit script (subelement).

        submit_script_elements = stage_element.getElementsByTagName('submitscript')
        if submit_script_elements:
            self.submit_script = str(submit_script_elements[0].firstChild.data).split()

        # Make sure the submit script exists, and convert it into a full path.

        if check:
            if len(self.submit_script) > 0:
                if larbatch_posix.exists(self.submit_script[0]):
                    self.submit_script[0] = os.path.realpath(self.submit_script[0])
                else:

                    # Look for the script on the execution path.

                    try:
                        jobinfo = subprocess.Popen(['which', self.submit_script[0]],
                                                   stdout=subprocess.PIPE,
                                                   stderr=subprocess.PIPE)
                        jobout, joberr = jobinfo.communicate()
                        jobout = convert_str(jobout)
                        joberr = convert_str(joberr)
                        rc = jobinfo.poll()
                        self.submit_script[0] = jobout.splitlines()[0].strip()
                    except:
                        pass
                if not larbatch_posix.exists(self.submit_script[0]):
                    raise IOError('Submit script %s not found.' % self.submit_script[0])

        # Worker initialization script (repeatable subelement).

        init_script_elements = stage_element.getElementsByTagName('initscript')
        if len(init_script_elements) > 0:
            for init_script_element in init_script_elements:
                init_script = str(init_script_element.firstChild.data)

                # Make sure the init script exists, and convert it into a full path.

                if check:
                    if init_script != '':
                        if larbatch_posix.exists(init_script):
                            init_script = os.path.realpath(init_script)
                        else:

                            # Look for the script on the execution path.

                            try:
                                jobinfo = subprocess.Popen(['which', init_script],
                                                           stdout=subprocess.PIPE,
                                                           stderr=subprocess.PIPE)
                                jobout, joberr = jobinfo.communicate()
                                rc = jobinfo.poll()
                                init_script = convert_str(jobout.splitlines()[0].strip())
                            except:
                                pass

                        if not larbatch_posix.exists(init_script):
                            raise IOError('Init script %s not found.' % init_script)

                self.init_script.append(init_script)

        # Worker initialization source script (repeatable subelement).

        init_source_elements = stage_element.getElementsByTagName('initsource')
        if len(init_source_elements) > 0:
            for init_source_element in init_source_elements:
                init_source = str(init_source_element.firstChild.data)

                # Make sure the init source script exists, and convert it into a full path.

                if init_source != '':
                    if check:
                        if larbatch_posix.exists(init_source):
                            init_source = os.path.realpath(init_source)
                        else:

                            # Look for the script on the execution path.

                            try:
                                jobinfo = subprocess.Popen(['which', init_source],
                                                           stdout=subprocess.PIPE,
                                                           stderr=subprocess.PIPE)
                                jobout, joberr = jobinfo.communicate()
                                rc = jobinfo.poll()
                                init_source = convert_str(jobout.splitlines()[0].strip())
                            except:
                                pass

                        if not larbatch_posix.exists(init_source):
                            raise IOError('Init source script %s not found.' % init_source)

                    # The <initsource> element can occur at the top level of the <stage>
                    # element, or inside a <fcl> element.
                    # Update the StageDef object differently in these two cases.

                    parent_element = init_source_element.parentNode
                    if parent_element.nodeName == 'fcl':

                        # This <initsource> is located inside a <fcl> element.
                        # Find the index of this fcl file.
                        # Python will raise an exception if the fcl can't be found
                        # (shouldn't happen).

                        fcl = str(parent_element.firstChild.data).strip()
                        n = self.fclname.index(fcl)
                        if n not in self.mid_source:
                            self.mid_source[n] = []
                        self.mid_source[n].append(init_source)

                    else:

                        # This is a <stage> level <initsource> element.

                        self.init_source.append(init_source)

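        # For example (hypothetical fcl names), with self.fclname equal to
        # ['gen.fcl', 'g4.fcl'], an <initsource> nested inside <fcl>g4.fcl</fcl>
        # lands in self.mid_source[1], while a stage-level <initsource> is
        # appended to self.init_source.
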
        # Worker end-of-job script (repeatable subelement).

        end_script_elements = stage_element.getElementsByTagName('endscript')
        if len(end_script_elements) > 0:
            for end_script_element in end_script_elements:
                end_script = str(end_script_element.firstChild.data)

                # Make sure the end-of-job script exists, and convert it into a full path.

                if end_script != '':
                    if check:
                        if larbatch_posix.exists(end_script):
                            end_script = os.path.realpath(end_script)
                        else:

                            # Look for the script on the execution path.

                            try:
                                jobinfo = subprocess.Popen(['which', end_script],
                                                           stdout=subprocess.PIPE,
                                                           stderr=subprocess.PIPE)
                                jobout, joberr = jobinfo.communicate()
                                rc = jobinfo.poll()
                                end_script = convert_str(jobout.splitlines()[0].strip())
                            except:
                                pass

                        if not larbatch_posix.exists(end_script):
                            raise IOError('End-of-job script %s not found.' % end_script)

                    # The <endscript> element can occur at the top level of the <stage>
                    # element, or inside a <fcl> element.
                    # Update the StageDef object differently in these two cases.

                    parent_element = end_script_element.parentNode
                    if parent_element.nodeName == 'fcl':

                        # This <endscript> is located inside a <fcl> element.
                        # Find the index of this fcl file.
                        # Python will raise an exception if the fcl can't be found
                        # (shouldn't happen).

                        fcl = str(parent_element.firstChild.data).strip()
                        n = self.fclname.index(fcl)
                        if n not in self.mid_script:
                            self.mid_script[n] = []
                        self.mid_script[n].append(end_script)

                    else:

                        # This is a <stage> level <endscript> element.

                        self.end_script.append(end_script)

        # Project name overrides (repeatable subelement).

        project_name_elements = stage_element.getElementsByTagName('projectname')
        if len(project_name_elements) > 0:
            for project_name_element in project_name_elements:

                # Match this project name with its parent fcl element.

                fcl_element = project_name_element.parentNode
                if fcl_element.nodeName != 'fcl':
                    raise XMLError("Found <projectname> element outside <fcl> element.")
                fcl = str(fcl_element.firstChild.data).strip()

                # Find the index of this fcl file.
                # Python will raise an exception if the fcl can't be found (shouldn't happen).

                n = self.fclname.index(fcl)

                # Make sure the project_name list is long enough.

                while len(self.project_name) < n+1:
                    self.project_name.append('')

                # Extract the project name and add it to the list.

                project_name = str(project_name_element.firstChild.data)
                self.project_name[n] = project_name

        # Make sure that the project_name list (if present) is at least as
        # long as the fclname list.
        # If not, extend it by adding empty strings.

        if len(self.project_name) > 0:
            while len(self.project_name) < len(self.fclname):
                self.project_name.append('')

        # Stage name overrides (repeatable subelement).

        stage_name_elements = stage_element.getElementsByTagName('stagename')
        if len(stage_name_elements) > 0:
            for stage_name_element in stage_name_elements:

                # Match this stage name with its parent fcl element.

                fcl_element = stage_name_element.parentNode
                if fcl_element.nodeName != 'fcl':
                    raise XMLError("Found <stagename> element outside <fcl> element.")
                fcl = str(fcl_element.firstChild.data).strip()

                # Find the index of this fcl file.
                # Python will raise an exception if the fcl can't be found (shouldn't happen).

                n = self.fclname.index(fcl)

                # Make sure the stage_name list is long enough.

                while len(self.stage_name) < n+1:
                    self.stage_name.append('')

                # Extract the stage name and add it to the list.

                stage_name = str(stage_name_element.firstChild.data)
                self.stage_name[n] = stage_name

        # Make sure that the stage_name list (if present) is at least as
        # long as the fclname list.
        # If not, extend it by adding empty strings.

        if len(self.stage_name) > 0:
            while len(self.stage_name) < len(self.fclname):
                self.stage_name.append('')

        # Project version overrides (repeatable subelement).

        project_version_elements = stage_element.getElementsByTagName('version')
        if len(project_version_elements) > 0:
            for project_version_element in project_version_elements:

                # Match this project version with its parent fcl element.

                fcl_element = project_version_element.parentNode
                if fcl_element.nodeName != 'fcl':
                    raise XMLError("Found stage level <version> element outside <fcl> element.")
                fcl = str(fcl_element.firstChild.data).strip()

                # Find the index of this fcl file.
                # Python will raise an exception if the fcl can't be found (shouldn't happen).

                n = self.fclname.index(fcl)

                # Make sure the project_version list is long enough.

                while len(self.project_version) < n+1:
                    self.project_version.append('')

                # Extract the project version and add it to the list.

                project_version = str(project_version_element.firstChild.data)
                self.project_version[n] = project_version

        # Make sure that the project_version list (if present) is at least as
        # long as the fclname list.
        # If not, extend it by adding empty strings.

        if len(self.project_version) > 0:
            while len(self.project_version) < len(self.fclname):
                self.project_version.append('')

        # Histogram merging program.

        merge_elements = stage_element.getElementsByTagName('merge')
        if merge_elements:
            self.merge = str(merge_elements[0].firstChild.data)

        # Analysis merge flag.

        anamerge_elements = stage_element.getElementsByTagName('anamerge')
        if anamerge_elements:
            self.anamerge = str(anamerge_elements[0].firstChild.data)

        # Resource (subelement).

        resource_elements = stage_element.getElementsByTagName('resource')
        if resource_elements:
            self.resource = str(resource_elements[0].firstChild.data)
            self.resource = ''.join(self.resource.split())

        # Lines (subelement).

        lines_elements = stage_element.getElementsByTagName('lines')
        if lines_elements:
            self.lines = str(lines_elements[0].firstChild.data)

        # Site (subelement).

        site_elements = stage_element.getElementsByTagName('site')
        if site_elements:
            self.site = str(site_elements[0].firstChild.data)
            self.site = ''.join(self.site.split())

        # Blacklist (subelement).

        blacklist_elements = stage_element.getElementsByTagName('blacklist')
        if blacklist_elements:
            self.blacklist = str(blacklist_elements[0].firstChild.data)
            self.blacklist = ''.join(self.blacklist.split())

        # Cpu (subelement).

        cpu_elements = stage_element.getElementsByTagName('cpu')
        if cpu_elements:
            self.cpu = int(cpu_elements[0].firstChild.data)

        # Disk (subelement).

        disk_elements = stage_element.getElementsByTagName('disk')
        if disk_elements:
            self.disk = str(disk_elements[0].firstChild.data)
            self.disk = ''.join(self.disk.split())

        # Data file types (subelement).

        datafiletypes_elements = stage_element.getElementsByTagName('datafiletypes')
        if datafiletypes_elements:
            data_file_types_str = str(datafiletypes_elements[0].firstChild.data)
            data_file_types_str = ''.join(data_file_types_str.split())
            self.datafiletypes = data_file_types_str.split(',')

        # Memory (subelement).

        memory_elements = stage_element.getElementsByTagName('memory')
        if memory_elements:
            self.memory = int(memory_elements[0].firstChild.data)

        # Dictionary of metadata parameters (repeatable subelement).

        param_elements = stage_element.getElementsByTagName('parameter')
        if len(param_elements) > 0:
            self.parameters = {}
            for param_element in param_elements:
                name = str(param_element.attributes['name'].firstChild.data)
                value = str(param_element.firstChild.data)
                self.parameters[name] = value

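        # For example (hypothetical values), the element
        # <parameter name="MCName">prodgenie</parameter> yields
        # self.parameters['MCName'] == 'prodgenie'.
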
        # Output file name (repeatable subelement).

        output_elements = stage_element.getElementsByTagName('output')
        if len(output_elements) > 0:

            # The output element can occur once at the top level of the <stage>
            # element, or inside a <fcl> element.  The former applies globally.
            # The latter applies only to that fcl substage.

            # Loop over global output elements.

            for output_element in output_elements:
                parent_element = output_element.parentNode
                if parent_element.nodeName != 'fcl':
                    output = str(output_element.firstChild.data)
                    self.output = []
                    while len(self.output) < len(self.fclname):
                        self.output.append(output)

            # Loop over fcl output elements.

            for output_element in output_elements:
                parent_element = output_element.parentNode
                if parent_element.nodeName == 'fcl':

                    # Match this output name with its parent fcl element.

                    fcl = str(parent_element.firstChild.data).strip()
                    n = self.fclname.index(fcl)

                    # Make sure the output list is long enough.

                    while len(self.output) < n+1:
                        self.output.append('')

                    # Extract the output name and add it to the list.

                    output = str(output_element.firstChild.data)
                    self.output[n] = output

        # Make sure that the output list (if present) is at least as
        # long as the fclname list.
        # If not, extend it by adding empty strings.

        if len(self.output) > 0:
            while len(self.output) < len(self.fclname):
                self.output.append('')

        # TFileName (subelement).

        TFileName_elements = stage_element.getElementsByTagName('TFileName')
        if TFileName_elements:
            self.TFileName = str(TFileName_elements[0].firstChild.data)

        # Jobsub.

        jobsub_elements = stage_element.getElementsByTagName('jobsub')
        if jobsub_elements:
            self.jobsub = str(jobsub_elements[0].firstChild.data)

        # Jobsub start/stop.

        jobsub_start_elements = stage_element.getElementsByTagName('jobsub_start')
        if jobsub_start_elements:
            self.jobsub_start = str(jobsub_start_elements[0].firstChild.data)

        # Jobsub submit timeout.

        jobsub_timeout_elements = stage_element.getElementsByTagName('jobsub_timeout')
        if jobsub_timeout_elements:
            self.jobsub_timeout = int(jobsub_timeout_elements[0].firstChild.data)

        # Name of art-like executables (repeatable subelement).

        exe_elements = stage_element.getElementsByTagName('exe')
        if len(exe_elements) > 0:

            # The exe element can occur once at the top level of the <stage>
            # element, or inside a <fcl> element.  The former applies globally.
            # The latter applies only to that fcl substage.

            # Loop over global exe elements.

            for exe_element in exe_elements:
                parent_element = exe_element.parentNode
                if parent_element.nodeName != 'fcl':
                    exe = str(exe_element.firstChild.data)
                    self.exe = []
                    while len(self.exe) < len(self.fclname):
                        self.exe.append(exe)

            # Loop over fcl exe elements.

            for exe_element in exe_elements:
                parent_element = exe_element.parentNode
                if parent_element.nodeName == 'fcl':

                    # Match this exe name with its parent fcl element.

                    fcl = str(parent_element.firstChild.data).strip()
                    n = self.fclname.index(fcl)

                    # Make sure the exe list is long enough.

                    while len(self.exe) < n+1:
                        self.exe.append('')

                    # Extract the exe name and add it to the list.

                    exe = str(exe_element.firstChild.data)
                    self.exe[n] = exe

        # Make sure that the exe list (if present) is at least as
        # long as the fclname list.
        # If not, extend it by adding empty strings.

        if len(self.exe) > 0:
            while len(self.exe) < len(self.fclname):
                self.exe.append('')

        # Sam schema.

        schema_elements = stage_element.getElementsByTagName('schema')
        if schema_elements:
            self.schema = str(schema_elements[0].firstChild.data)

        # Validate-on-worker flag.

        validate_on_worker_elements = stage_element.getElementsByTagName('check')
        if validate_on_worker_elements:
            self.validate_on_worker = int(validate_on_worker_elements[0].firstChild.data)

        # Upload-on-worker flag.

        copy_to_fts_elements = stage_element.getElementsByTagName('copy')
        if copy_to_fts_elements:
            self.copy_to_fts = int(copy_to_fts_elements[0].firstChild.data)

        # Cvmfs flag.

        cvmfs_elements = stage_element.getElementsByTagName('cvmfs')
        if cvmfs_elements:
            self.cvmfs = int(cvmfs_elements[0].firstChild.data)

        # Stash flag.

        stash_elements = stage_element.getElementsByTagName('stash')
        if stash_elements:
            self.stash = int(stash_elements[0].firstChild.data)

        # Singularity flag.

        singularity_elements = stage_element.getElementsByTagName('singularity')
        if singularity_elements:
            self.singularity = int(singularity_elements[0].firstChild.data)

        # Batch script.

        script_elements = stage_element.getElementsByTagName('script')
        if script_elements:
            self.script = script_elements[0].firstChild.data

        # Make sure the batch script exists, and convert it into a full path.

        if check:
            script_path = ''
            try:
                jobinfo = subprocess.Popen(['which', self.script],
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
                jobout, joberr = jobinfo.communicate()
                jobout = convert_str(jobout)
                joberr = convert_str(joberr)
                rc = jobinfo.poll()
                script_path = jobout.splitlines()[0].strip()
            except:
                pass
            if script_path == '' or not larbatch_posix.access(script_path, os.X_OK):
                raise IOError('Script %s not found.' % self.script)
            self.script = script_path

        # Start script.

        start_script_elements = stage_element.getElementsByTagName('startscript')
        if start_script_elements:
            self.start_script = start_script_elements[0].firstChild.data

        # Make sure the start project batch script exists, and convert it into a full path.

        if check:
            script_path = ''
            try:
                jobinfo = subprocess.Popen(['which', self.start_script],
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
                jobout, joberr = jobinfo.communicate()
                jobout = convert_str(jobout)
                joberr = convert_str(joberr)
                rc = jobinfo.poll()
                script_path = jobout.splitlines()[0].strip()
            except:
                pass
            self.start_script = script_path

        # Stop script.

        stop_script_elements = stage_element.getElementsByTagName('stopscript')
        if stop_script_elements:
            self.stop_script = stop_script_elements[0].firstChild.data

        # Make sure the stop project batch script exists, and convert it into a full path.

        if check:
            script_path = ''
            try:
                jobinfo = subprocess.Popen(['which', self.stop_script],
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE)
                jobout, joberr = jobinfo.communicate()
                jobout = convert_str(jobout)
                joberr = convert_str(joberr)
                rc = jobinfo.poll()
                script_path = jobout.splitlines()[0].strip()
            except:
                pass
            self.stop_script = script_path

        # Done.

        return

    # String conversion.

    def __str__(self):
        result = 'Stage name = %s\n' % self.name
        result += 'Batch job name = %s\n' % self.batchname
        #result += 'Fcl filename = %s\n' % self.fclname
        for fcl in self.fclname:
            result += 'Fcl filename = %s\n' % fcl
        result += 'Output directory = %s\n' % self.outdir
        result += 'Log directory = %s\n' % self.logdir
        result += 'Work directory = %s\n' % self.workdir
        result += 'Bookkeeping directory = %s\n' % self.bookdir
        result += 'Maximum directory size = %d\n' % self.dirsize
        result += 'Extra directory levels = %d\n' % self.dirlevels
        result += 'Dynamic directories = %d\n' % self.dynamic
        result += 'Input file = %s\n' % self.inputfile
        result += 'Input list = %s\n' % self.inputlist
        result += 'Input mode = %s\n' % self.inputmode
        result += 'Input sam dataset = %s' % self.inputdef
        if self.recur:
            result += ' (recursive)'
        result += '\n'
        result += 'Base sam dataset = %s\n' % self.basedef
        result += 'Analysis flag = %d\n' % self.ana
        result += 'Recursive flag = %d\n' % self.recur
        result += 'Recursive type = %s\n' % self.recurtype
        result += 'Recursive limit = %d\n' % self.recurlimit
        result += 'Single run flag = %d\n' % self.singlerun
        result += 'File list definition flag = %d\n' % self.filelistdef
        result += 'Prestart flag = %d\n' % self.prestart
        result += 'Active projects base name = %s\n' % self.activebase
        result += 'Dropbox waiting interval = %f\n' % self.dropboxwait
        result += 'Prestage fraction = %f\n' % self.prestagefraction
        result += 'Input stream = %s\n' % self.inputstream
        result += 'Previous stage name = %s\n' % self.previousstage
        result += 'Mix input sam dataset = %s\n' % self.mixinputdef
        result += 'Pubs input allowed = %d\n' % self.pubs_input_ok
        result += 'Pubs input mode = %d\n' % self.pubs_input
        result += 'Pubs input run number = %d\n' % self.input_run
        for subrun in self.input_subruns:
            result += 'Pubs input subrun number = %d\n' % subrun
        result += 'Pubs input version number = %d\n' % self.input_version
        result += 'Pubs output mode = %d\n' % self.pubs_output
        result += 'Pubs output run number = %d\n' % self.output_run
        for subrun in self.output_subruns:
            result += 'Pubs output subrun number = %d\n' % subrun
        result += 'Pubs output version number = %d\n' % self.output_version
        result += 'Output file name = %s\n' % self.output
        result += 'TFileName = %s\n' % self.TFileName
        result += 'Number of jobs = %d\n' % self.num_jobs
        result += 'Number of events = %d\n' % self.num_events
        result += 'Max flux MB = %d\n' % self.maxfluxfilemb
        result += 'Max files per job = %d\n' % self.max_files_per_job
        result += 'Output file target size = %d\n' % self.target_size
        result += 'Dataset definition name = %s\n' % self.defname
        result += 'Analysis dataset definition name = %s\n' % self.ana_defname
        result += 'Data tier = %s\n' % self.data_tier
        result += 'Data stream = %s\n' % self.data_stream
        result += 'Analysis data tier = %s\n' % self.ana_data_tier
        result += 'Analysis data stream = %s\n' % self.ana_data_stream
        result += 'Submit script = %s\n' % self.submit_script
        result += 'Worker initialization script = %s\n' % self.init_script
        result += 'Worker initialization source script = %s\n' % self.init_source
        result += 'Worker end-of-job script = %s\n' % self.end_script
        result += 'Worker midstage source initialization scripts = %s\n' % self.mid_source
        result += 'Worker midstage finalization scripts = %s\n' % self.mid_script
        result += 'Project name overrides = %s\n' % self.project_name
        result += 'Stage name overrides = %s\n' % self.stage_name
        result += 'Project version overrides = %s\n' % self.project_version
        result += 'Special histogram merging program = %s\n' % self.merge
        result += 'Analysis merge flag = %s\n' % self.anamerge
        result += 'Resource = %s\n' % self.resource
        result += 'Lines = %s\n' % self.lines
        result += 'Site = %s\n' % self.site
        result += 'Blacklist = %s\n' % self.blacklist
        result += 'Cpu = %d\n' % self.cpu
        result += 'Disk = %s\n' % self.disk
        result += 'Datafiletypes = %s\n' % self.datafiletypes
        result += 'Memory = %d MB\n' % self.memory
        result += 'Metadata parameters:\n'
        for key in self.parameters:
            result += '%s: %s\n' % (key, self.parameters[key])
        result += 'Output file name = %s\n' % self.output
        result += 'TFile name = %s\n' % self.TFileName
        result += 'Jobsub_submit options = %s\n' % self.jobsub
        result += 'Jobsub_submit start/stop options = %s\n' % self.jobsub_start
        result += 'Jobsub submit timeout = %d\n' % self.jobsub_timeout
        result += 'Executables = %s\n' % self.exe
        result += 'Schema = %s\n' % self.schema
        result += 'Validate-on-worker = %d\n' % self.validate_on_worker
        result += 'Upload-on-worker = %d\n' % self.copy_to_fts
        result += 'Cvmfs flag = %d\n' % self.cvmfs
        result += 'Stash cache flag = %d\n' % self.stash
        result += 'Singularity flag = %d\n' % self.singularity
        result += 'Batch script = %s\n' % self.script
        result += 'Start script = %s\n' % self.start_script
        result += 'Stop script = %s\n' % self.stop_script
        return result

    # The purpose of this method is to limit input to the specified run
    # and subruns.  There are several use cases.
    #
    # 1.  If xml element pubsinput is false (0), do not try to limit the
    #     input in any way.  Just use whatever is the normal input.
    #
    # 2.  If input is from a sam dataset, create a restricted dataset that
    #     is limited to the specified run and subruns.
    #
    # 3.  If input is from a file list (and pubsinput is true), modify the
    #     input list assuming that the input area has the standard pubs
    #     directory structure.  There are several subcases depending on
    #     whether there is one or multiple subruns.
    #
    #     a) Single subrun.  Reset the input list to point to an input
    #        list from the pubs input directory (assuming the input area
    #        has the standard pubs structure).  Raise PubsInputError if
    #        the input files.list does not exist.
    #
    #     b) Multiple subruns.  Generate a new input list with a
    #        unique name which will get input from the union of the
    #        specified run and subruns input lists (assuming pubs
    #        directory structure).
    #
    # In each of cases 2, 3(a), and 3(b), the original stage parameter
    # num_jobs is ignored.  Instead, num_jobs is recalculated as
    #
    #     num_jobs = (len(subruns) + max_files_per_job - 1) // max_files_per_job
    #
    # This formula has the feature that if len(subruns) == 1, or
    # if max_files_per_job is equal to or greater than the number
    # of subruns (or effectively infinite), then the final
    # num_jobs parameter will always be one, meaning all input
    # files will be read in a single job.  On the other hand, if
    # max_files_per_job == 1, then num_jobs will be equal to the
    # number of subruns, so each batch job will process one subrun.
    #
    # 4.  If input is from a single file (and pubsinput is true), raise
    #     an exception.  This use case doesn't make sense and isn't
    #     supported.
    #
    # 5.  If no input is specified, don't do anything, since there is no
    #     input to limit.
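    #
    # As a worked example of the num_jobs formula: with 10 subruns and
    # max_files_per_job = 4, num_jobs = (10 + 4 - 1) // 4 = 3.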
    def pubsify_input(self, run, subruns, version):

        # Don't do anything if pubs input is disabled.

        if not self.pubs_input_ok:
            return

        # It never makes sense to specify pubs input mode if there are no
        # input files (i.e. generation jobs).  This is not considered an error.

        if self.inputfile == '' and self.inputlist == '' and self.inputdef == '':
            return

        # The case of input from a single file is not supported.  Raise an exception.

        if self.inputfile != '':
            raise RuntimeError('Pubs input for single file input is not supported.')

        # Set pubs input mode.

        self.pubs_input = 1

        # Save the run, subrun, and version numbers.

        self.input_run = run
        self.input_subruns = subruns
        self.input_version = version

        # If input is from a sam dataset, create a restricted dataset that
        # limits input files to the selected run and subruns.

        if self.inputdef != '':
            newdef = project_utilities.create_limited_dataset(self.inputdef,
                                                              run,
                                                              subruns)
            if not newdef:
                raise PubsInputError(run, subruns[0], version)
            self.inputdef = newdef

            # Set the number of submitted jobs assuming each worker will get
            # self.max_files_per_job files.

            files_per_job = self.max_files_per_job
            if files_per_job == 0:
                files_per_job = 1
            self.num_jobs = (len(subruns) + files_per_job - 1) // files_per_job

            # Done.

            return

        # If we get to here, we have input from a file list and a previous stage
        # exists.  This normally indicates a daisy chain.  This is where subcases
        # 3(a) and 3(b) are handled.

        # Case 3(a), single subrun.

        if len(subruns) == 1:

            # Insert the run and subrun into the input file list path.

            if version is None:
                pubs_path = '%d/%d' % (run, subruns[0])
            else:
                pubs_path = '%d/%d/%d' % (version, run, subruns[0])
            dir = os.path.dirname(self.inputlist)
            base = os.path.basename(self.inputlist)
            self.inputlist = os.path.join(dir, pubs_path, base)

            # Verify that the input list exists and is not empty.

            lines = []
            try:
                lines = larbatch_posix.readlines(self.inputlist)
            except:
                lines = []
            if len(lines) == 0:
                raise PubsInputError(run, subruns[0], version)

            # Verify that the input files actually exist.

            for line in lines:
                input_file = line.strip()
                if not larbatch_posix.exists(input_file):
                    raise PubsInputError(run, subruns[0], version)

            # Specify that there will be exactly one job submitted.

            self.num_jobs = 1

            # Everything OK (case 3(a)).

            return

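        # For example (hypothetical numbers), with run 123, subrun 4, and
        # version 2, an input list '/dir/files.list' becomes
        # '/dir/2/123/4/files.list'; without a version it becomes
        # '/dir/123/4/files.list'.
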
        # Case 3(b), multiple subruns.

        if len(subruns) > 1:

            # Generate a new input file list with a unique name and place
            # it in the same directory as the original input list.  Note that
            # the input list may not actually exist at this point.  If it
            # doesn't exist, just use the original name.  If it already exists,
            # generate a different name.

            dir = os.path.dirname(self.inputlist)
            base = os.path.basename(self.inputlist)
            new_inputlist_path = self.inputlist
            if larbatch_posix.exists(new_inputlist_path):
                new_inputlist_path = '%s/%s_%s.list' % (dir, base, str(uuid.uuid4()))
            self.inputlist = new_inputlist_path

            # Defer opening the new input list file until after the original
            # input file is successfully opened.

            new_inputlist_file = None

            # Loop over subruns.  Read the contents of the pubs input list
            # for each subrun.

            nsubruns = 0
            total_size = 0
            actual_subruns = []
            truncate = False
            for subrun in subruns:

                if truncate:
                    break

                nsubruns += 1

                if version is None:
                    pubs_path = '%d/%d' % (run, subrun)
                else:
                    pubs_path = '%d/%d/%d' % (version, run, subrun)

                subrun_inputlist = os.path.join(dir, pubs_path, base)
                lines = []
                try:
                    lines = larbatch_posix.readlines(subrun_inputlist)
                except:
                    lines = []
                if len(lines) == 0:
                    raise PubsInputError(run, subruns[0], version)
                for line in lines:
                    subrun_inputfile = line.strip()

                    # Test the size and accessibility of this input file.

                    sr_size = -1
                    try:
                        sr = larbatch_posix.stat(subrun_inputfile)
                        sr_size = sr.st_size
                    except:
                        sr_size = -1

                    if sr_size > 0:
                        actual_subruns.append(subrun)
                        if new_inputlist_file is None:
                            print('Generating new input list %s\n' % new_inputlist_path)
                            new_inputlist_file = larbatch_posix.open(new_inputlist_path, 'w')
                        new_inputlist_file.write('%s\n' % subrun_inputfile)
                        total_size += sr.st_size

                        # If at this point the total size exceeds the target size,
                        # truncate the list of subruns and break out of the loop.

                        if self.max_files_per_job > 1 and self.target_size != 0 \
                                and total_size >= self.target_size:
                            truncate = True
                            break

            # Done looping over subruns.

            new_inputlist_file.close()

            # Raise an exception if the actual list of subruns is empty.

            if len(actual_subruns) == 0:
                raise PubsInputError(run, subruns[0], version)

            # Update the list of subruns to be the actual list of subruns.

            if len(actual_subruns) != len(subruns):
                print('Truncating subrun list: %s' % str(actual_subruns))
                del subruns[:]
                subruns.extend(actual_subruns)

            # Set the number of submitted jobs assuming each worker will get
            # self.max_files_per_job files.

            files_per_job = self.max_files_per_job
            if files_per_job == 0:
                files_per_job = 1
            self.num_jobs = (len(subruns) + files_per_job - 1) // files_per_job

            # Everything OK (case 3(b)).

            return

        # Shouldn't ever fall through to here.

        return

    # Function to convert this stage for pubs output.

    def pubsify_output(self, run, subruns, version):

        # Set pubs mode.

        self.pubs_output = 1

        # Save the run, subrun, and version numbers.

        self.output_run = run
        self.output_subruns = subruns
        self.output_version = version

        # Append the run and subrun to workdir, outdir, logdir, and bookdir.
        # In case of multiple subruns, encode the subrun directory as "@s",
        # which informs the batch worker to determine the subrun dynamically.

        if len(subruns) == 1:
            if version is None:
                pubs_path = '%d/%d' % (run, subruns[0])
            else:
                pubs_path = '%d/%d/%d' % (version, run, subruns[0])
            self.workdir = os.path.join(self.workdir, pubs_path)
        else:
            if version is None:
                pubs_path = '%d/@s' % run
            else:
                pubs_path = '%d/%d/@s' % (version, run)
            self.workdir = os.path.join(self.workdir, str(uuid.uuid4()))
            self.dynamic = 1
        self.outdir = os.path.join(self.outdir, pubs_path)
        self.logdir = os.path.join(self.logdir, pubs_path)
        self.bookdir = os.path.join(self.bookdir, pubs_path)

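    # For example (hypothetical numbers): for run 123 with multiple subruns
    # and no version, outdir, logdir, and bookdir gain the suffix '123/@s',
    # where '@s' is resolved to the actual subrun by the batch worker at run
    # time; workdir instead gets a unique uuid subdirectory.
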
1506  # Run presubmission check script, if any.
1507  # Dump output and return exit status.
1508  # A nonzero exit status generally means that jobs shouldn't be submitted.
1509 
1510  def checksubmit(self):
1511 
1512  rc = 0
1513  if len(self.submit_script) > 0:
1514  print('Running presubmission check script', end=' ')
1515  for word in self.submit_script:
1516  print(word, end=' ')
1517  print()
1518  jobinfo = subprocess.Popen(self.submit_script,
1519  stdout=subprocess.PIPE,
1520  stderr=subprocess.PIPE)
1521  q = queue.Queue()
1522  thread = threading.Thread(target=larbatch_utilities.wait_for_subprocess,
1523  args=[jobinfo, q])
1524  thread.start()
1525  thread.join(timeout=60)
1526  if thread.is_alive():
1527  print('Submit script timed out, terminating.')
1528  jobinfo.terminate()
1529  thread.join()
1530  rc = q.get()
1531  jobout = convert_str(q.get())
1532  joberr = convert_str(q.get())
1533  print('Script exit status = %d' % rc)
1534  print('Script standard output:')
1535  print(jobout)
1536  print('Script diagnostic output:')
1537  print(joberr)
1538 
1539  # Done.
1540  # Return exit status.
1541 
1542  return rc
1543 
1544 
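# ----------------------------------------------------------------------
# Illustrative sketch (not part of stagedef.py): the timeout pattern used
# by checksubmit() above. A worker thread waits on the subprocess and
# posts its results to a queue; the caller joins the thread with a
# timeout and terminates the process if it is still running. Here
# _wait_for_subprocess is a stand-in for
# larbatch_utilities.wait_for_subprocess, which (judging from the three
# q.get() calls above) queues the exit status followed by stdout and
# stderr.

import subprocess
import threading
try:
    import queue
except ImportError:
    import Queue as queue

def _wait_for_subprocess(proc, q):
    out, err = proc.communicate()
    q.put(proc.returncode)
    q.put(out)
    q.put(err)

def run_with_timeout(argv, timeout=60):
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    q = queue.Queue()
    t = threading.Thread(target=_wait_for_subprocess, args=[proc, q])
    t.start()
    t.join(timeout=timeout)
    if t.is_alive():
        proc.terminate()
        t.join()
    return q.get(), q.get(), q.get()   # (exit status, stdout, stderr)

# Example: rc, out, err = run_with_timeout(['echo', 'hello'])
# ----------------------------------------------------------------------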
1545  # Raise an exception if any specified input file/list doesn't exist.
1546  # (We don't currently check sam input datasets).
1547 
1548  def checkinput(self, checkdef=False):
1549 
1550  if self.inputfile != '' and not larbatch_posix.exists(self.inputfile):
1551  raise IOError('Input file %s does not exist.' % self.inputfile)
1552  if self.inputlist != '' and not larbatch_posix.exists(self.inputlist):
1553  raise IOError('Input list %s does not exist.' % self.inputlist)
1554 
1555  checkok = False
1556 
1557  # Define or update the active projects dataset, if requested.
1558 
1559  if self.activebase != '':
1560  activedef = '%s_active' % self.activebase
1561  waitdef = '%s_wait' % self.activebase
1562  project_utilities.make_active_project_dataset(self.activebase,
1563  self.dropboxwait,
1564  activedef,
1565  waitdef)
1566 
1567  # If target size is nonzero, and input is from a file list, calculate
1568  # the ideal number of output jobs and override the current number
1569  # of jobs.
1570 
1571  if self.target_size != 0 and self.inputlist != '':
1572  input_filenames = larbatch_posix.readlines(self.inputlist)
1573  size_tot = 0
1574  for line in input_filenames:
1575  filename = line.split()[0]
1576  filesize = larbatch_posix.stat(filename).st_size
1577  size_tot = size_tot + filesize
1578  new_num_jobs = size_tot // self.target_size
1579  if new_num_jobs < 1:
1580  new_num_jobs = 1
1581  if new_num_jobs > self.num_jobs:
1582  new_num_jobs = self.num_jobs
1583  print("Ideal number of jobs based on target file size is %d." % new_num_jobs)
1584  if new_num_jobs != self.num_jobs:
1585  print("Updating number of jobs from %d to %d." % (self.num_jobs, new_num_jobs))
1586  self.num_jobs = new_num_jobs
1587 
1588  # If singlerun mode is requested, pick a random file from the input
1589  # dataset and create (if necessary) a new dataset definition which
1590  # limits files to be only from that run. Don't do anything here if
1591  # the input dataset is empty.
1592 
1593  if self.singlerun and checkdef:
1594 
1595  samweb = project_utilities.samweb()
1596  print("Doing single run processing.")
1597 
1598  # First find an input file.
1599 
1600  #dim = 'defname: %s with limit 1' % self.inputdef
1601  dim = 'defname: %s' % self.inputdef
1602  if self.filelistdef:
1603  input_files = list(project_utilities.listFiles(dim))
1604  else:
1605  input_files = samweb.listFiles(dimensions=dim)
1606  if len(input_files) > 0:
1607  random_file = random.choice(input_files)
1608  print('Example file: %s' % random_file)
1609 
1610  # Extract run number.
1611 
1612  md = samweb.getMetadata(random_file)
1613  run_tuples = md['runs']
1614  if len(run_tuples) > 0:
1615  run = run_tuples[0][0]
1616  print('Input files will be limited to run %d.' % run)
1617 
1618  # Make a new dataset definition.
1619  # If this definition already exists, assume it is correct.
1620 
1621  newdef = '%s_run_%d' % (samweb.makeProjectName(self.inputdef), run)
1622  def_exists = False
1623  try:
1624  desc = samweb.descDefinition(defname=newdef)
1625  def_exists = True
1626  except samweb_cli.exceptions.DefinitionNotFound:
1627  pass
1628  if not def_exists:
1629  print('Creating dataset definition %s' % newdef)
1630  newdim = 'defname: %s and run_number %d' % (self.inputdef, run)
1631  samweb.createDefinition(defname=newdef, dims=newdim)
1632  self.inputdef = newdef
1633 
1634  else:
1635  print('Problem extracting run number from example file.')
1636  return 1
1637 
1638  else:
1639  print('Input dataset is empty.')
1640  return 1
1641 
1642  # If target size is nonzero, and input is from a sam dataset definition,
1643  # and maxfilesperjob is not one, calculate the ideal number of jobs and
1644  # maxfilesperjob.
1645 
1646  if self.target_size != 0 and self.max_files_per_job != 1 and self.inputdef != '':
1647 
1648  # Query sam to determine size and number of files in input
1649  # dataset.
1650 
1651  samweb = project_utilities.samweb()
1652  dim = 'defname: %s' % self.inputdef
1653  nfiles = 0
1654  files = []
1655  if self.filelistdef:
1656  files = project_utilities.listFiles(dim)
1657  nfiles = len(files)
1658  else:
1659  summary = samweb.listFilesSummary(dimensions=dim)
1660  nfiles = summary['file_count']
1661  print('Input dataset %s has %d files.' % (self.inputdef, nfiles))
1662  if nfiles > 0:
1663  checkok = True
1664  max_files = self.max_files_per_job * self.num_jobs
1665  size_tot = 0
1666  if max_files > 0 and max_files < nfiles:
1667  if self.filelistdef:
1668  while len(files) > max_files:
1669  files.pop()
1670  dim = 'defname: %s' % project_utilities.makeFileListDefinition(files)
1671  else:
1672  dim += ' with limit %d' % max_files
1673  summary = samweb.listFilesSummary(dimensions=dim)
1674  size_tot = summary['total_file_size']
1675  nfiles = summary['file_count']
1676  else:
1677  if self.filelistdef:
1678  dim = 'defname: %s' % project_utilities.makeFileListDefinition(files)
1679  summary = samweb.listFilesSummary(dimensions=dim)
1680  size_tot = summary['total_file_size']
1681  nfiles = summary['file_count']
1682 
1683  # Calculate updated job parameters.
1684 
1685  new_num_jobs = int(math.ceil(float(size_tot) / float(self.target_size)))
1686  if new_num_jobs < 1:
1687  new_num_jobs = 1
1688  if new_num_jobs > self.num_jobs:
1689  new_num_jobs = self.num_jobs
1690 
1691  new_max_files_per_job = int(math.ceil(float(self.target_size) * float(nfiles) / float(size_tot)))
1692  if self.max_files_per_job > 0 and new_max_files_per_job > self.max_files_per_job:
1693  new_max_files_per_job = self.max_files_per_job
1694  new_num_jobs = (nfiles + self.max_files_per_job - 1) // self.max_files_per_job
1695  if new_num_jobs < 1:
1696  new_num_jobs = 1
1697  if new_num_jobs > self.num_jobs:
1698  new_num_jobs = self.num_jobs
1699 
1700  print("Ideal number of jobs based on target file size is %d." % new_num_jobs)
1701  if new_num_jobs != self.num_jobs:
1702  print("Updating number of jobs from %d to %d." % (self.num_jobs, new_num_jobs))
1703  self.num_jobs = new_num_jobs
1704  print("Ideal number of files per job is %d." % new_max_files_per_job)
1705  if new_max_files_per_job != self.max_files_per_job:
1706  print("Updating maximum files per job from %d to %d." % (
1707  self.max_files_per_job, new_max_files_per_job))
1708  self.max_files_per_job = new_max_files_per_job
1709  else:
1710  print('Input dataset is empty.')
1711  return 1
1712 
1713  # If requested, do a final check of the input dataset.
1714  # Limit the number of jobs to at most the number of files, since it
1715  # never makes sense to have more jobs than files.
1716  # If the number of input files is zero, return an error.
1717 
1718  if self.inputdef != '' and checkdef and not checkok:
1719  samweb = project_utilities.samweb()
1720  n = 0
1721  if self.filelistdef:
1722  files = project_utilities.listFiles('defname: %s' % self.inputdef)
1723  n = len(files)
1724  else:
1725  summary = samweb.listFilesSummary(defname=self.inputdef)
1726  n = summary['file_count']
1727  print('Input dataset %s contains %d files.' % (self.inputdef, n))
1728  if n < self.num_jobs:
1729  self.num_jobs = n
1730  if n == 0:
1731  return 1
1732 
1733  # Done (all good).
1734 
1735  return 0
1736 
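# ----------------------------------------------------------------------
# Illustrative sketch (not part of stagedef.py): the single-run recipe
# used by checkinput() above, reduced to its essentials. It samples one
# file from the input definition, reads its run number from the file
# metadata, and creates (if necessary) a derived definition restricted
# to that run. Only samweb calls already used above are assumed; the
# definition name here is simplified relative to the original, which
# goes through samweb.makeProjectName().

import random
import samweb_cli

def make_single_run_def(samweb, inputdef):
    files = samweb.listFiles(dimensions='defname: %s' % inputdef)
    if len(files) == 0:
        return None                          # empty dataset
    md = samweb.getMetadata(random.choice(files))
    if len(md['runs']) == 0:
        return None                          # no run metadata
    run = md['runs'][0][0]
    newdef = '%s_run_%d' % (inputdef, run)   # simplified name
    try:
        samweb.descDefinition(defname=newdef)
    except samweb_cli.exceptions.DefinitionNotFound:
        samweb.createDefinition(defname=newdef,
                                dims='defname: %s and run_number %d'
                                % (inputdef, run))
    return newdef
# ----------------------------------------------------------------------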
1737 
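# ----------------------------------------------------------------------
# Illustrative sketch (not part of stagedef.py): the target-size
# arithmetic used above, condensed into one hypothetical function.
# Given the total input size, the number of input files, and the target
# output size per job, it derives the job count and files-per-job
# ceiling, clamped by the configured maxima.

import math

def size_based_job_parameters(size_tot, nfiles, target_size,
                              max_num_jobs, max_files_per_job):
    num_jobs = int(math.ceil(float(size_tot) / float(target_size)))
    num_jobs = max(1, min(num_jobs, max_num_jobs))
    files_per_job = int(math.ceil(float(target_size) * float(nfiles)
                                  / float(size_tot)))
    if max_files_per_job > 0 and files_per_job > max_files_per_job:
        files_per_job = max_files_per_job
        num_jobs = (nfiles + files_per_job - 1) // files_per_job
        num_jobs = max(1, min(num_jobs, max_num_jobs))
    return num_jobs, files_per_job

# E.g. 100 GB in 1000 files, 2 GB target, up to 200 jobs of 20 files:
print(size_based_job_parameters(100e9, 1000, 2e9, 200, 20))  # (50, 20)
# ----------------------------------------------------------------------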
1738  # Raise an exception if output directory or log directory doesn't exist.
1739 
1740  def check_output_dirs(self):
1741  if not larbatch_posix.exists(self.outdir):
1742  raise IOError('Output directory %s does not exist.' % self.outdir)
1743  if not larbatch_posix.exists(self.logdir):
1744  raise IOError('Log directory %s does not exist.' % self.logdir)
1745  return
1746 
1747  # Raise an exception if output, log, work, or bookkeeping directory doesn't exist.
1748 
1749  def checkdirs(self):
1750  if not larbatch_posix.exists(self.outdir):
1751  raise IOError('Output directory %s does not exist.' % self.outdir)
1752  if self.logdir != self.outdir and not larbatch_posix.exists(self.logdir):
1753  raise IOError('Log directory %s does not exist.' % self.logdir)
1754  if not larbatch_posix.exists(self.workdir):
1755  raise IOError('Work directory %s does not exist.' % self.workdir)
1756  if self.bookdir != self.logdir and not larbatch_posix.exists(self.bookdir):
1757  raise IOError('Bookkeeping directory %s does not exist.' % self.bookdir)
1758  return
1759 
1760  # Make output, log, work, and bookkeeping directory, if they don't exist.
1761 
1762  def makedirs(self):
1763  if not larbatch_posix.exists(self.outdir):
1764  larbatch_posix.makedirs(self.outdir)
1765  if self.logdir != self.outdir and not larbatch_posix.exists(self.logdir):
1766  larbatch_posix.makedirs(self.logdir)
1767  if not larbatch_posix.exists(self.workdir):
1768  larbatch_posix.makedirs(self.workdir)
1769  if self.bookdir != self.logdir and not larbatch_posix.exists(self.bookdir):
1770  larbatch_posix.makedirs(self.bookdir)
1771 
1772  # If output is on dcache, make output directory group-writable.
1773 
1774  if self.outdir.startswith('/pnfs/'):
1775  mode = stat.S_IMODE(larbatch_posix.stat(self.outdir).st_mode)
1776  if not mode & stat.S_IWGRP:
1777  mode = mode | stat.S_IWGRP
1778  larbatch_posix.chmod(self.outdir, mode)
1779  if self.logdir.startswith('/pnfs/'):
1780  mode = stat.S_IMODE(larbatch_posix.stat(self.logdir).st_mode)
1781  if not mode & stat.S_IWGRP:
1782  mode = mode | stat.S_IWGRP
1783  larbatch_posix.chmod(self.logdir, mode)
1784 
1785  self.checkdirs()
1786  return
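# ----------------------------------------------------------------------
# Illustrative sketch (not part of stagedef.py): the group-write fixup
# applied above for dCache output, expressed with the standard os
# module. stat.S_IMODE strips the file-type bits so that only the
# permission bits are compared and rewritten.

import os
import stat

def ensure_group_writable(path):
    mode = stat.S_IMODE(os.stat(path).st_mode)
    if not mode & stat.S_IWGRP:
        os.chmod(path, mode | stat.S_IWGRP)

# Example (hypothetical path):
# ensure_group_writable('/pnfs/experiment/scratch/users/me/out')
# ----------------------------------------------------------------------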