projectdef.py
Go to the documentation of this file.
1 #! /usr/bin/env python
2 ######################################################################
3 #
4 # Name: projectdef.py
5 #
6 # Purpose: Python class ProjectDef (used by project.py script).
7 #
8 # Created: 12-Dec-2014 Herbert Greenlee
9 #
10 ######################################################################
11 
12 from __future__ import absolute_import
13 from __future__ import print_function
14 import sys, os, subprocess
15 from project_modules.xmlerror import XMLError
16 from project_modules.stagedef import StageDef
17 import larbatch_posix
18 from larbatch_utilities import convert_str
19 from larbatch_utilities import get_ups_products
20 
21 # Project definition class contains data parsed from project definition xml file.
22 
class ProjectDef:
    """Project-level settings parsed from a project xml element.

    An instance holds the project-wide attributes (name, job counts, batch
    resources, larsoft release, scripts, fcl search path, metadata
    parameters) together with the list of StageDef objects built from the
    <stage> subelements.
    """

    def __init__(self, project_element, default_first_input_list, default_input_lists, check=True):
        """Parse a project xml element.

        Arguments:

        project_element          - Xml (DOM) element describing the project.
                                   It is dereferenced unconditionally, so it
                                   must not be None.
        default_first_input_list - Value stored in default_input_lists under
                                   the key '' before any stage is built.
        default_input_lists      - Dictionary that is updated in place.  After
                                   each stage is built, an entry
                                   <stage name> -> <bookdir>/files.list is added.
        check                    - If true, resolve the batch/start/stop scripts
                                   with `which`, add existing directories from
                                   $FHICL_FILE_PATH to the fcl search path, and
                                   verify that the local release tarball, the
                                   batch script and all fcl directories exist.
                                   The flag is also forwarded to StageDef.

        Raises:

        XMLError    - Project name or number of events missing, or an element
                      that must contain text is empty.
        IOError     - (check only) Local release, batch script or fcl
                      directory not found.
        LookupError - A stage names a base stage that has not been defined.
        """

        # Assign default values.

        self.name = ''                    # Project name.
        self.num_events = 0               # Total events (all jobs).
        self.num_jobs = 1                 # Number of jobs.
        self.max_files_per_job = 0        # Max number of files per job.
        self.ups = []                     # Top level ups products.
        self.os = ''                      # Batch OS.
        self.resource = 'DEDICATED,OPPORTUNISTIC' # Jobsub resources.
        self.role = ''                    # Role (normally Analysis or Production).
        self.lines = ''                   # Arbitrary condor commands.
        self.server = '-'                 # Jobsub server.
        self.site = ''                    # Site.
        self.blacklist = ''               # Blacklist.
        self.cpu = 0                      # Number of cpus.
        self.disk = ''                    # Disk space (string value+unit).
        self.memory = 0                   # Amount of memory (integer MB).
        self.merge = 'hadd -T'            # histogram merging program.
        self.anamerge = ''                # Analysis merge flag.
        self.release_tag = ''             # Larsoft release tag.
        self.release_qual = 'debug'       # Larsoft release qualifier.
        self.version = ''                 # Project version.
        self.local_release_dir = ''       # Larsoft local release directory.
        self.local_release_tar = ''       # Larsoft local release tarball.
        self.file_type = ''               # Sam file type.
        self.run_type = ''                # Sam run type.
        self.run_number = 0               # Sam run number.
        self.script = 'condor_lar.sh'     # Batch script.
        self.validate_on_worker = 0       # Run post-job validation on the worker node
        self.copy_to_fts = 0              # Copy a copy of the file to a dropbox scanned by fts.
                                          # Note that a copy is still sent to <outdir>
        self.cvmfs = 1                    # Cvmfs flag.
        self.stash = 1                    # Stash cache flag.
        self.singularity = 1              # Singularity flag.
        self.start_script = 'condor_start_project.sh' # Sam start project script.
        self.stop_script = 'condor_stop_project.sh'   # Sam stop project script.
        self.force_dag = 0                # Force dag for sam input jobs.
        self.fclpath = []                 # Fcl search path.
        self.stages = []                  # List of stages (StageDef objects).
        self.parameters = {}              # Dictionary of metadata parameters.

        # Extract values from xml.
        #
        # Elements fetched with _direct_children are only honored when they
        # are immediate children of the project element (the same tag nested
        # deeper, e.g. inside a stage, is ignored here).  If such an element
        # is repeated, the last one wins.

        # Project name (attribute).

        if project_element.hasAttribute('name'):
            self.name = self._text(project_element.attributes['name'])
        if self.name == '':
            raise XMLError('Project name not specified.')

        # Total events (subelement).

        for element in self._direct_children(project_element, 'numevents'):
            self.num_events = int(self._text(element))
        if self.num_events == 0:
            raise XMLError('Number of events not specified.')

        # Number of jobs (subelement).

        for element in self._direct_children(project_element, 'numjobs'):
            self.num_jobs = int(self._text(element))

        # Max Number of files per jobs.

        for element in self._direct_children(project_element, 'maxfilesperjob'):
            self.max_files_per_job = int(self._text(element))

        # Top level ups product (repeatable subelement).

        for element in project_element.getElementsByTagName('ups'):
            self.ups.append(self._text(element))

        # If ups products list is empty, set default.

        if not self.ups:
            self.ups = get_ups_products().split(',')

        # OS (subelement).  All whitespace is removed from the value.

        for element in self._direct_children(project_element, 'os'):
            self.os = ''.join(self._text(element).split())

        # Resource (subelement).  All whitespace is removed from the value.

        for element in self._direct_children(project_element, 'resource'):
            self.resource = ''.join(self._text(element).split())

        # Role (subelement).

        for element in self._direct_children(project_element, 'role'):
            self.role = self._text(element)

        # Lines (subelement).

        for element in self._direct_children(project_element, 'lines'):
            self.lines = self._text(element)

        # Server (subelement).

        for element in self._direct_children(project_element, 'server'):
            self.server = self._text(element)

        # Site (subelement).  All whitespace is removed from the value.

        for element in self._direct_children(project_element, 'site'):
            self.site = ''.join(self._text(element).split())

        # Blacklist (subelement).  All whitespace is removed from the value.

        for element in self._direct_children(project_element, 'blacklist'):
            self.blacklist = ''.join(self._text(element).split())

        # Cpu (subelement).

        for element in self._direct_children(project_element, 'cpu'):
            self.cpu = int(self._text(element))

        # Disk (subelement).  All whitespace is removed from the value.

        for element in self._direct_children(project_element, 'disk'):
            self.disk = ''.join(self._text(element).split())

        # Memory (subelement).

        for element in self._direct_children(project_element, 'memory'):
            self.memory = int(self._text(element))

        # merge (subelement).  An empty element selects the empty string.

        for element in self._direct_children(project_element, 'merge'):
            self.merge = self._text(element, default='')

        # anamerge (subelement).  An empty element selects the empty string.

        for element in self._direct_children(project_element, 'anamerge'):
            self.anamerge = self._text(element, default='')

        # Larsoft (subelement).  Only the first <larsoft> element is used.

        larsoft_elements = project_element.getElementsByTagName('larsoft')
        if larsoft_elements:
            larsoft_element = larsoft_elements[0]

            # Release tag (subelement).  An empty element leaves the tag empty.

            tag_elements = larsoft_element.getElementsByTagName('tag')
            if tag_elements:
                self.release_tag = self._text(tag_elements[0], default='')

            # Release qualifier (subelement).

            qual_elements = larsoft_element.getElementsByTagName('qual')
            if qual_elements:
                self.release_qual = self._text(qual_elements[0])

            # Local release directory or tarball (subelement).
            # Anything that is not an existing directory is treated as a tarball.

            local_elements = larsoft_element.getElementsByTagName('local')
            if local_elements:
                local = self._text(local_elements[0])
                if larbatch_posix.isdir(local):
                    self.local_release_dir = local
                else:
                    self.local_release_tar = local

        # Version (subelement).  Defaults to the release tag.

        version_elements = project_element.getElementsByTagName('version')
        if version_elements:
            self.version = self._text(version_elements[0])
        else:
            self.version = self.release_tag

        # Make sure local test release directory/tarball exists, if specified.
        # Existence of non-null local_release_dir has already been tested.

        if check and self.local_release_tar != '' and \
           not larbatch_posix.exists(self.local_release_tar):
            raise IOError("Local release directory/tarball %s does not exist."
                          % self.local_release_tar)

        # Sam file type (subelement).

        file_type_elements = project_element.getElementsByTagName('filetype')
        if file_type_elements:
            self.file_type = self._text(file_type_elements[0])

        # Sam run type (subelement).

        run_type_elements = project_element.getElementsByTagName('runtype')
        if run_type_elements:
            self.run_type = self._text(run_type_elements[0])

        # Sam run number (subelement).

        run_number_elements = project_element.getElementsByTagName('runnumber')
        if run_number_elements:
            self.run_number = int(self._text(run_number_elements[0]))

        # Batch script (subelement).

        for element in self._direct_children(project_element, 'script'):
            self.script = self._text(element)

        # Make sure batch script exists, and convert into a full path.

        if check:
            script_path = self._which(self.script)
            if script_path == '' or not larbatch_posix.access(script_path, os.X_OK):
                raise IOError('Script %s not found.' % self.script)
            self.script = script_path

        # Validate-on-worker flag (subelement).

        for element in self._direct_children(project_element, 'check'):
            self.validate_on_worker = int(self._text(element))

        # Copy to FTS flag (subelement).

        for element in self._direct_children(project_element, 'copy'):
            self.copy_to_fts = int(self._text(element))

        # Cvmfs flag (subelement).

        for element in self._direct_children(project_element, 'cvmfs'):
            self.cvmfs = int(self._text(element))

        # Stash flag (subelement).

        for element in self._direct_children(project_element, 'stash'):
            self.stash = int(self._text(element))

        # Singularity flag (subelement).

        for element in self._direct_children(project_element, 'singularity'):
            self.singularity = int(self._text(element))

        # Start project batch script (subelement).
        # Specifying a custom script also forces a dag.

        for element in self._direct_children(project_element, 'startscript'):
            self.start_script = self._text(element)
            self.force_dag = 1

        # Convert start project batch script into a full path.
        # A script that can not be found results in an empty string (no error).

        if check:
            self.start_script = self._which(self.start_script)

        # Stop project batch script (subelement).
        # Specifying a custom script also forces a dag.

        for element in self._direct_children(project_element, 'stopscript'):
            self.stop_script = self._text(element)
            self.force_dag = 1

        # Convert stop project batch script into a full path.
        # A script that can not be found results in an empty string (no error).

        if check:
            self.stop_script = self._which(self.stop_script)

        # Fcl search path (repeatable subelement).

        for element in project_element.getElementsByTagName('fcldir'):
            self.fclpath.append(self._text(element))

        # Add existing directories from $FHICL_FILE_PATH.

        if check and 'FHICL_FILE_PATH' in os.environ:
            for fcldir in os.environ['FHICL_FILE_PATH'].split(':'):
                if larbatch_posix.exists(fcldir):
                    self.fclpath.append(fcldir)

        # Make sure all directories of fcl search path exist.

        if check:
            for fcldir in self.fclpath:
                if not larbatch_posix.exists(fcldir):
                    raise IOError("Fcl search directory %s does not exist." % fcldir)

        # Project stages (repeatable subelement).
        # default_input_lists is updated as stages are built, so that each
        # stage can see the file list of the stages defined before it.

        default_previous_stage = ''
        default_input_lists[default_previous_stage] = default_first_input_list
        for stage_element in project_element.getElementsByTagName('stage'):

            # Get base stage, if any.  The base stage must already be defined.

            base_stage = None
            if stage_element.hasAttribute('base'):
                base_name = self._text(stage_element.attributes['base'])
                if base_name != '':
                    for stage in self.stages:
                        if stage.name == base_name:
                            base_stage = stage
                            break

                    if base_stage is None:
                        raise LookupError('Base stage %s not found.' % base_name)

            self.stages.append(StageDef(stage_element,
                                        base_stage,
                                        default_input_lists,
                                        default_previous_stage,
                                        self.num_jobs,
                                        self.num_events,
                                        self.max_files_per_job,
                                        self.merge,
                                        self.anamerge,
                                        self.cpu,
                                        self.disk,
                                        self.memory,
                                        self.validate_on_worker,
                                        self.copy_to_fts,
                                        self.cvmfs,
                                        self.stash,
                                        self.singularity,
                                        self.script,
                                        self.start_script,
                                        self.stop_script,
                                        self.site,
                                        self.blacklist,
                                        check=check))
            default_previous_stage = self.stages[-1].name
            default_input_lists[default_previous_stage] = os.path.join(self.stages[-1].bookdir,
                                                                       'files.list')

        # Dictionary of metadata parameters.

        for param_element in project_element.getElementsByTagName('parameter'):
            name = self._text(param_element.attributes['name'])
            self.parameters[name] = self._text(param_element)

    # Private helpers.

    @staticmethod
    def _text(node, default=None):
        """Return the data of the first child of an xml node as a str.

        If the node has no children, return default if one was given,
        otherwise raise XMLError naming the offending node.
        """
        child = node.firstChild
        if child is None:
            if default is None:
                raise XMLError('Element <%s> is empty.' % node.nodeName)
            return default
        return str(child.data)

    @staticmethod
    def _direct_children(parent, tag):
        """Return the elements with the given tag whose parent node is parent."""
        return [element for element in parent.getElementsByTagName(tag)
                if element.parentNode == parent]

    @staticmethod
    def _which(script):
        """Return the first line printed by `which script`, or '' on failure.

        This is a best-effort lookup: any error (which not available, non-zero
        exit status, no output) yields the empty string instead of an exception.
        """
        try:
            proc = subprocess.Popen(['which', script],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            out, _ = proc.communicate()
            if proc.returncode != 0:
                return ''
            return convert_str(out).splitlines()[0].strip()
        except Exception:
            # Deliberately not a bare except, so that KeyboardInterrupt and
            # SystemExit still propagate.
            return ''

    # String conversion.

    def __str__(self):
        """Return a multi-line, human readable dump of the project and its stages."""
        parts = [
            'Project name = %s\n' % self.name,
            'Total events = %d\n' % self.num_events,
            'Number of jobs = %d\n' % self.num_jobs,
            'Max files per job = %d\n' % self.max_files_per_job,
            'Top level ups products:\n',
        ]
        parts.extend(' %s\n' % prod for prod in self.ups)
        parts.extend([
            'OS = %s\n' % self.os,
            'Resource = %s\n' % self.resource,
            'Role = %s\n' % self.role,
            'Lines = %s\n' % self.lines,
            'Jobsub server = %s\n' % self.server,
            'Site = %s\n' % self.site,
            'Blacklist = %s\n' % self.blacklist,
            'Cpu = %d\n' % self.cpu,
            'Disk = %s\n' % self.disk,
            'Memory = %d MB\n' % self.memory,
            'Histogram merging program = %s\n' % self.merge,
            'Analysis merge flag = %s\n' % self.anamerge,
            'Larsoft release tag = %s\n' % self.release_tag,
            'Larsoft release qualifier = %s\n' % self.release_qual,
            'Version = %s\n' % self.version,
            'Local test release directory = %s\n' % self.local_release_dir,
            'Local test release tarball = %s\n' % self.local_release_tar,
            'File type = %s\n' % self.file_type,
            'Run type = %s\n' % self.run_type,
            'Run number = %d\n' % self.run_number,
            'Batch script = %s\n' % self.script,
            'Start sam project script = %s\n' % self.start_script,
            'Stop sam project script = %s\n' % self.stop_script,
            'Force dag = %d\n' % self.force_dag,
            'Fcl search path:\n',
        ])
        parts.extend(' %s\n' % fcldir for fcldir in self.fclpath)
        parts.append('Metadata parameters:\n')
        parts.extend('%s: %s\n' % (key, self.parameters[key]) for key in self.parameters)
        parts.append('\nStages:')
        parts.extend('\n\n' + str(stage) for stage in self.stages)
        return ''.join(parts)

    # Get the specified stage.

    def get_stage(self, stagename):
        """Return the stage named stagename.

        An empty stagename selects the only stage of a single-stage project.

        Raises LookupError if the project has no stages, and RuntimeError if
        no stage matches (including an empty stagename for a project with
        more than one stage).
        """

        if not self.stages:
            raise LookupError("Project does not have any stages.")

        if stagename == '' and len(self.stages) == 1:
            return self.stages[0]

        for stage in self.stages:
            if stagename == stage.name:
                return stage

        # If we fell through to here, we didn't find an appropriate stage.

        raise RuntimeError('No stage %s.' % stagename)

    # Find fcl file on fcl search path.

    def get_fcl(self, fclname):
        """Locate fcl files on the fcl search path.

        fclname is an iterable of fcl file names.  Returns a list with, for
        each name, the path in the first search directory where the file
        exists.  Raises IOError for the first name that is not found in any
        directory.
        """

        fcl_list = []
        for name in fclname:
            for fcldir in self.fclpath:
                fcl = os.path.join(fcldir, name)
                if larbatch_posix.exists(fcl):
                    fcl_list.append(fcl)
                    break
            else:
                # Search path exhausted (or empty) without finding the file.
                raise IOError('Could not find fcl file %s.' % name)
        return fcl_list
def __init__(self, project_element, default_first_input_list, default_input_lists, check=True)
Definition: projectdef.py:28
def get_stage(self, stagename)
Definition: projectdef.py:506
def get_fcl(self, fclname)
Definition: projectdef.py:525
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
static QCString str