project_utilities.py
#!/usr/bin/env python
#----------------------------------------------------------------------
#
# Name: project_utilities.py
#
# Purpose: A python module containing various python utility functions
#          and classes used by project.py and other python scripts.
#
# Created: 28-Oct-2013  H. Greenlee
#
#----------------------------------------------------------------------

from __future__ import absolute_import
from __future__ import print_function
import sys, os, stat, time, types
try:
    import urllib.request as urlrequest
except ImportError:
    import urllib as urlrequest
import datetime
import socket
import subprocess
import shutil
import threading
try:
    import queue
except ImportError:
    import Queue as queue
import uuid
import samweb_cli
from project_modules.ifdherror import IFDHError
import larbatch_posix
import larbatch_utilities
from larbatch_utilities import get_experiment, get_user, get_role, get_prouser
from larbatch_utilities import test_ticket, test_kca, test_proxy, get_kca, get_proxy
from larbatch_utilities import dimensions
from larbatch_utilities import dimensions_datastream
from larbatch_utilities import wait_for_subprocess
from larbatch_utilities import get_bluearc_server
from larbatch_utilities import get_dcache_server
from larbatch_utilities import get_dropbox
from larbatch_utilities import get_sam_metadata
from larbatch_utilities import get_ups_products
from larbatch_utilities import get_setup_script_path
from larbatch_utilities import check_running
from larbatch_utilities import convert_str

# Prevent root from printing garbage on initialization.
if 'TERM' in os.environ:
    del os.environ['TERM']

# Hide command line arguments from ROOT module.
myargv = sys.argv
sys.argv = myargv[0:1]
import ROOT
ROOT.gErrorIgnoreLevel = ROOT.kError
sys.argv = myargv

# Global variables.

samweb_obj = None      # Initialized SAMWebClient object
samcache = {}          # Sam query cache (samcache[dimension] = set(...)).


# Like os.path.isdir, but faster by avoiding unnecessary i/o.

def fast_isdir(path):
    result = False
    if path[-5:] != '.list' and \
       path[-5:] != '.root' and \
       path[-4:] != '.txt' and \
       path[-4:] != '.fcl' and \
       path[-4:] != '.out' and \
       path[-4:] != '.err' and \
       path[-3:] != '.sh' and \
       path[-5:] != '.stat' and \
       larbatch_posix.isdir(path):
        result = True
    return result

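# Illustrative usage (a sketch, not part of the original source; the paths
# are hypothetical): the extension checks let fast_isdir reject common file
# types without touching the filesystem.
#
#   fast_isdir('/pnfs/myexp/scratch/hist.root')   # False, no i/o performed
#   fast_isdir('/pnfs/myexp/scratch/subdir')      # falls through to isdir()
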
# Wait for file to appear on local filesystem.

def wait_for_stat(path):

    ntry = 60
    while ntry > 0:
        if larbatch_posix.access(path, os.R_OK):
            return 0
        print('Waiting ...')

        # Reading the parent directory seems to make files be visible faster.

        larbatch_posix.listdir(os.path.dirname(path))
        time.sleep(1)
        ntry = ntry - 1

    # Timed out.

    return 1

# Function to return the path of a scratch directory which can be used
# for creating large temporary files.  The scratch directory should not
# be in dCache.  The default implementation here uses the following algorithm.
#
# 1.  Environment variable TMPDIR.
#
# 2.  Environment variable SCRATCH.
#
# 3.  Path /scratch/<experiment>/<user>
#
# 4.  Path /<experiment>/data/users/<user>
#
# Raise an exception if the scratch directory doesn't exist or is not writeable.

def get_scratch_dir():

    scratch = ''

    # Get scratch directory path.

    if 'TMPDIR' in os.environ:
        scratch = os.environ['TMPDIR']

    elif 'SCRATCH' in os.environ:
        scratch = os.environ['SCRATCH']

    else:
        scratch = '/scratch/%s/%s' % (get_experiment(), get_user())
        if not larbatch_posix.isdir(scratch) or not larbatch_posix.access(scratch, os.W_OK):
            scratch = '/%s/data/users/%s' % (get_experiment(), get_user())

    # Check the result.

    if scratch == '':
        raise RuntimeError('No scratch directory specified.')

    if not larbatch_posix.isdir(scratch) or not larbatch_posix.access(scratch, os.W_OK):
        raise RuntimeError('Scratch directory %s does not exist or is not writeable.' % scratch)

    return scratch

# Function to return the mountpoint of a given path.

def mountpoint(path):

    # Handle symbolic links and relative paths.

    path = os.path.realpath(path)

    # Find mountpoint.

    while not os.path.ismount(path):
        dir = os.path.dirname(path)
        if len(dir) >= len(path):
            return dir
        path = dir

    return path

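# Illustrative usage (a sketch; the path is hypothetical):
#
#   mountpoint('/pnfs/myexp/scratch/users')
#
# resolves symlinks, then climbs one dirname at a time until
# os.path.ismount() is satisfied, e.g. returning '/pnfs'.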

# Function to escape dollar signs in string by prepending backslash (\).

def dollar_escape(s):

    result = ''
    for c in s:
        if c == '$' and ( len(result) == 0 or result[-1] != '\\'):
            result += '\\'
        result += c
    return result

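# Illustrative usage (a sketch, not part of the original source):
#
#   dollar_escape('echo $PATH')    # -> 'echo \$PATH' (backslash prepended)
#   dollar_escape('echo \$PATH')   # unchanged; '$' is already escaped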

# Function to parse a string containing a comma- and hyphen-separated
# representation of a collection of positive integers into a sorted list
# of ints.  Raise ValueError exception in case of unparseable string.

def parseInt(s):

    result = set()

    # First split string into tokens separated by commas.

    for token in s.split(','):

        # Plain integers handled here.

        if token.strip().isdigit():
            result.add(int(token))
            continue

        # Hyphenated ranges handled here.

        limits = token.split('-')
        if len(limits) == 2 and limits[0].strip().isdigit() and limits[1].strip().isdigit():
            result |= set(range(int(limits[0]), int(limits[1])+1))
            continue

        # Don't understand.

        raise ValueError('Unparseable range token %s.' % token)

    # Return result in form of a sorted list.

    return sorted(result)

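# Illustrative usage (a sketch, not part of the original source):
#
#   parseInt('1,3-5,12')   # -> [1, 3, 4, 5, 12]
#   parseInt('7-5')        # -> [] (an empty range is allowed)
#   parseInt('1,x')        # raises ValueError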

# Function to construct a new dataset definition from an existing definition
# such that the new dataset definition will be limited to a specified run and
# set of subruns.
#
# The name of the new definition is returned as the return value of
# the function.
#
# If the new query does not return any files, the new dataset is not created,
# and the function returns the empty string ('').

def create_limited_dataset(defname, run, subruns):

    if len(subruns) == 0:
        return ''

    # Construct comma-separated list of run-subrun pairs in a form that is
    # acceptable as sam dimension constraint.

    run_subrun_dim = ''
    for subrun in subruns:
        if run_subrun_dim != '':
            run_subrun_dim += ','
        run_subrun_dim += "%d.%d" % (run, subrun)

    # Construct dimension including run and subrun constraints.

    dim = "defname: %s and run_number %s" % (defname, run_subrun_dim)

    # Test the new dimension.

    nfiles = samweb().countFiles(dimensions=dim)
    if nfiles == 0:
        return ''

    # Make sure we have a kca certificate.

    test_kca()

    # Construct a new unique definition name.

    newdefname = defname + '_' + str(uuid.uuid4())

    # Create definition.

    samweb().createDefinition(newdefname, dim, user=get_user(), group=get_experiment())

    # Done (return definition name).

    return newdefname

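# Illustrative example (hypothetical names): for defname 'mydef', run 1234,
# and subruns [5, 6], the dimension constructed above is
#
#   "defname: mydef and run_number 1234.5,1234.6"
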
# Return initialized SAMWebClient object.

def samweb():

    global samweb_obj

    if samweb_obj is None:
        samweb_obj = samweb_cli.SAMWebClient(experiment=get_experiment())

    os.environ['SSL_CERT_DIR'] = '/etc/grid-security/certificates'

    return samweb_obj

# Start sam project.

def start_project(defname, default_prjname, max_files, force_snapshot, filelistdef):

    # Check project name.

    s = samweb()
    prjname = default_prjname
    if prjname == '':
        prjname = s.makeProjectName(defname)
    print('Starting project %s' % prjname)

    # Make sure we have a certificate.

    test_kca()

    # Figure out how many files are in the input dataset.

    nf = 0
    if filelistdef:
        files = listFiles('defname: %s' % defname)
        nf = len(files)
    else:
        nf = s.countFiles('defname: %s' % defname)
    print('Input dataset has %d files.' % nf)
    if nf == 0:
        return 1

    # Make limited dataset?

    if max_files > 0 and nf > max_files:
        limitdef = '%s_limit_%d' % (prjname, max_files)

        # Figure out whether limitdef already exists.

        if defExists(limitdef) and not filelistdef:
            print('Using already created limited dataset definition %s.' % limitdef)
        else:
            dim = 'defname: %s with limit %d' % (defname, max_files)
            if filelistdef:
                limitdef = makeFileListDefinition(dim)
            else:
                print('Creating limited dataset definition %s.' % limitdef)
                s.createDefinition(limitdef, dim, user=get_user(), group=get_experiment())

        defname = limitdef
        nf = max_files

    elif filelistdef:

        dim = 'defname: %s' % defname
        defname = makeFileListDefinition(dim)

    # Force snapshot?

    if force_snapshot:
        print('Forcing snapshot.')
        defname = '%s:force' % defname

    # Start the project.

    print('Starting project %s.' % prjname)
    s.startProject(prjname,
                   defname=defname,
                   station=get_experiment(),
                   group=get_experiment(),
                   user=get_user())

    # Done.

    return 0

# Return a list of active projects associated with a particular dataset definition stem
# based on project start and end times.  The particular criteria used in this function
# are:
#
# 1.  Project started within the last 72 hours.
#
# 2.  Project has no end time, or its end time is within the dropbox wait interval.

def active_projects2(defname = '', dropboxwait = 0.):

    result = set()

    # Get project name stem.

    s = samweb()
    prjstem = ''
    if defname != '':
        prjstem = '%s_' % s.makeProjectName(defname).rsplit('_',1)[0]

    # Query a list of projects started within the last 72 hours.

    dt = datetime.timedelta(3, 0)
    tmin = datetime.datetime.utcnow() - dt
    tminstr = tmin.strftime('%Y-%m-%dT%H:%M:%S')
    prjnames = s.listProjects(started_after = tminstr)

    # Loop over projects to check end times.

    for prjname in prjnames:
        if prjstem == '' or prjname.startswith(prjstem):

            # This project is a candidate for inclusion in result.
            # Check end time.

            age = 0
            prjurl = s.findProject(project=prjname, station=get_experiment())
            if prjurl != '':
                prjsum = s.projectSummary(prjurl)
                if 'project_end_time' in prjsum:
                    tendstr = prjsum['project_end_time']
                    if len(tendstr) >= 19:
                        try:
                            tend = datetime.datetime.strptime(tendstr[:19], '%Y-%m-%dT%H:%M:%S')
                            tage = datetime.datetime.utcnow() - tend
                            age = tage.total_seconds()
                        except:
                            pass

            # Keep this project if there is no end time (age stays zero), or
            # if the end time is within the dropbox wait interval.

            if age <= dropboxwait * 86400:
                result.add(prjname)

    # Done.

    return result


# Return a list of active projects associated with a particular dataset definition stem.
# If the definition argument is the empty string, return all active projects.

def active_projects(defname = ''):

    result = set()

    # Get project name stem.

    s = samweb()
    prjstem = ''
    if defname != '':
        prjstem = '%s_' % s.makeProjectName(defname).rsplit('_',1)[0]

    # Dump station.

    url = '%s/dumpStation?station=%s' % (s.get_baseurl(), get_experiment())
    furl = urlrequest.urlopen(url)

    # Parse response.

    for line in furl.readlines():
        words = line.split()
        if len(words) > 5:
            prjname = convert_str(words[0])
            if prjstem == '' or prjname.startswith(prjstem):
                result.add(prjname)

    # Done.

    return result

# Make active projects dataset definition.
#
# defname        - Dataset definition associated with active projects.
# dropboxwait    - Dropbox wait interval (float days).
# active_defname - Name of dataset definition to create.
# wait_defname   - Name of dropbox waiting dataset to create.

def make_active_project_dataset(defname, dropboxwait, active_defname, wait_defname):

    s = samweb()
    test_kca()

    # Get list of active projects.

    prjs = active_projects(defname) | active_projects2(defname, dropboxwait)

    # Make sam dimension.

    dim = ''
    for prj in prjs:
        if dim == '':
            dim = 'snapshot_for_project_name %s' % prj
        else:
            dim += ',%s' % prj

    # If there were no matching projects, make up some legal dimension that won't
    # match any files.

    if dim == '':
        dim = 'file_id 0'

    # Create or update active_defname.

    if defExists(active_defname):
        print('Updating dataset definition %s' % active_defname)
        s.deleteDefinition(active_defname)
    else:
        print('Creating dataset definition %s' % active_defname)

    s.createDefinition(active_defname, dim, user=get_user(), group=get_experiment())

    # If the dropbox waiting interval is nonzero, create a dataset for
    # dropbox waiting files.

    dim = ''
    if dropboxwait > 0.:

        # Convert the waiting interval to a datetime.timedelta object.

        dt = datetime.timedelta(int(dropboxwait), int(dropboxwait % 1 * 86400))

        # Get the earliest allowed time.

        tmin = datetime.datetime.utcnow() - dt

        # Format time in a form acceptable to sam.

        tminstr = tmin.strftime('%Y-%m-%dT%H:%M:%S')

        # Append sam dimension.

        dim = "isparentof: (create_date > '%s' and availability: virtual)" % tminstr

    else:

        # Otherwise make dummy dataset.

        dim = 'file_id 0'

    # Create or update wait_defname.

    if defExists(wait_defname):
        print('Updating dataset definition %s' % wait_defname)
        s.deleteDefinition(wait_defname)
    else:
        print('Creating dataset definition %s' % wait_defname)

    s.createDefinition(wait_defname, dim, user=get_user(), group=get_experiment())

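# Worked example (illustrative): the timedelta conversion above splits the
# float day count into whole days plus seconds, so for dropboxwait = 1.5,
#
#   datetime.timedelta(int(1.5), int(1.5 % 1 * 86400))
#       == datetime.timedelta(days=1, seconds=43200)   # 1.5 days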

# Function to check whether a sam dataset definition exists.

def defExists(defname):
    def_exists = False
    try:
        samweb().descDefinition(defname)
        def_exists = True
    except:
        def_exists = False
    return def_exists


# Function to make a dummy sam dataset definition (doesn't match files) in case one doesn't exist.

def makeDummyDef(defname):

    if not defExists(defname):

        # Make dummy definition.

        print('Making dummy dataset definition %s' % defname)
        test_kca()
        samweb().createDefinition(defname, 'file_id 0', user=get_user(), group=get_experiment())


# Function to ensure that files in dCache have layer two.
# This function is included here as a workaround for bugs in the dCache nfs interface.

def addLayerTwo(path, recreate=True):

    # Don't do anything if this file is not located in dCache (/pnfs/...)
    # or has nonzero size.

    if larbatch_posix.exists(path) and path[0:6] == '/pnfs/' and larbatch_posix.stat(path).st_size == 0:

        if recreate:
            print('Adding layer two for path %s.' % path)
        else:
            print('Deleting empty file %s.' % path)

        # At this point we have a zero-size file in dCache; such files may be
        # missing layer two.  Delete the file and recreate it using ifdh.

        larbatch_posix.remove(path)
        if not recreate:
            return
        test_proxy()

        # Make sure environment variables X509_USER_CERT and X509_USER_KEY
        # are not defined (they confuse ifdh).

        save_vars = {}
        for var in ('X509_USER_CERT', 'X509_USER_KEY'):
            if var in os.environ:
                save_vars[var] = os.environ[var]
                del os.environ[var]

        # Do ifdh cp.

        command = ['ifdh', 'cp', '/dev/null', path]
        jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        q = queue.Queue()
        thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess.')
            jobinfo.terminate()
            thread.join()
        rc = q.get()
        jobout = convert_str(q.get())
        joberr = convert_str(q.get())
        if rc != 0:
            for var in list(save_vars.keys()):
                os.environ[var] = save_vars[var]
            raise IFDHError(command, rc, jobout, joberr)

        # Restore environment variables.

        for var in list(save_vars.keys()):
            os.environ[var] = save_vars[var]

# This function returns jobsub_submit options that should be included for
# all batch submissions.

def default_jobsub_submit_options():
    opt = ''
    return opt

# Check the health status of the batch system and any other resources that
# are required to submit batch jobs successfully.  The idea is that this
# function may be called before submitting batch jobs.  If this function
# returns false, batch jobs should not be submitted, and this failure should
# not be counted as an error.  The default implementation here always returns
# true, but may be overridden in experiment_utilities.

def batch_status_check():
    return True

# The following functions are included for backward compatibility.
# The actual implementations have been moved to larbatch_posix or
# larbatch_utilities, with a different name.

def path_to_srm_url(path):
    return larbatch_utilities.srm_uri(path)

def safeexist(path):
    return larbatch_posix.exists(path)

def saferead(path):
    if safeexist(path):
        return larbatch_posix.readlines(path)
    else:
        return []

def safecopy(src, dest):
    return larbatch_posix.copy(src, dest)

# The following functions are deprecated and function as no-ops.
# They are included for backward compatibility.

def path_to_url(path):
    return path

def path_to_local(path):
    return path

# Class SafeTFile is retired.  For compatibility, calls to the former
# constructor of class SafeTFile are now simply passed to the ROOT
# TFile open method.  Note that class SafeTFile only ever supported
# opening root files for reading.

def SafeTFile(path):
    return ROOT.TFile.Open(path)

# Expand "defname:" clauses in a sam dimension.

def expandDefnames(dim):

    result = ''
    isdefname = False
    words = dim.split()

    for word in words:
        if isdefname:
            isdefname = False
            desc = samweb().descDefinitionDict(word)
            descdim = desc['dimensions']

            # If this definition doesn't contain a top level "or" or "minus" clause,
            # leave it unexpanded.

            if descdim.find(' or ') < 0 and descdim.find(' minus ') < 0:
                result += ' defname: %s' % word
            else:
                result += ' ( %s )' % desc['dimensions']

        else:
            if word == 'defname:':
                isdefname = True
            else:
                result += ' %s' % word

    return result

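# Illustrative example (hypothetical definition names): if definition B has
# dimensions 'defname: X or defname: Y', then
#
#   expandDefnames('defname: A minus defname: B')
#
# returns ' defname: A minus ( defname: X or defname: Y )'.  Definition A is
# left unexpanded because its dimensions contain no top level "or"/"minus".
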
# This function converts a sam dimension into a tokenized rpn list.
#
# The following kinds of tokens are recognized.
#
# 1.  Grouping symbols "(", ")", "isparentof:(", "ischildof:("
#
# 2.  Operators "or", "minus".  Operators have equal precedence and
#     associate from left to right.
#
# 3.  "with limit N" clause (must come at end).
#
# 4.  Any string expression that does not fall in above categories.
#
# The returned value of this function is a list consisting of sam dimensions,
# "or" and "minus" operators, and possibly a final "with limit" clause.

def tokenizeRPN(dim):

    temp = []
    result = []
    exp = ''

    # Split off final "with limit" clause, if any.

    head = dim
    tail = ''
    n = dim.find('with limit')
    if n >= 0:
        head = dim[:n]
        tail = dim[n:]

    # Space out parentheses.

    head = head.replace('(', ' ( ')
    head = head.replace(')', ' ) ')

    # But not isxxx:

    head = head.replace('isparentof: ', 'isparentof:')
    head = head.replace('ischildof: ', 'ischildof:')

    for word in head.split():

        if word == '(' or word == 'isparentof:(' or word == 'ischildof:(':
            if len(exp) > 0:
                result.append(exp)
                exp = ''
            temp.append(word)

        elif word == 'or' or word == 'minus':

            if len(exp) > 0:
                result.append(exp)
                exp = ''

            done = False
            while len(temp) > 0 and not done:
                last = temp.pop()
                if last == '(' or last == 'isparentof:(' or last == 'ischildof:(':
                    temp.append(last)
                    done = True
                else:
                    result.append(last)
            temp.append(word)

        elif word == ')':

            if len(exp) > 0:
                result.append(exp)
                exp = ''

            done = False
            while not done:
                last = temp.pop()
                if last == '(':
                    done = True
                elif last == 'isparentof:(':
                    if len(result) == 0 or result[-1] == 'or' or result[-1] == 'minus':
                        raise RuntimeError('isparentof: parse error')
                    last = result.pop()
                    result.append('isparentof:( %s )' % last)
                    done = True
                elif last == 'ischildof:(':
                    if len(result) == 0 or result[-1] == 'or' or result[-1] == 'minus':
                        raise RuntimeError('ischildof: parse error')
                    last = result.pop()
                    result.append('ischildof:( %s )' % last)
                    done = True
                else:
                    result.append(last)

        else:
            if len(exp) == 0:
                exp = word
            else:
                exp += ' %s' % word

    # Clear remaining items.

    if len(exp) > 0:
        result.append(exp)
    while len(temp) > 0:
        result.append(temp.pop())

    # Add final "with limit" clause, if any.

    if len(tail) > 0:
        result.append(tail)

    return result

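# Illustrative example (hypothetical definition names):
#
#   tokenizeRPN('defname: A or defname: B minus defname: C with limit 10')
#
# returns the rpn list
#
#   ['defname: A', 'defname: B', 'or', 'defname: C', 'minus', 'with limit 10']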

# This function mostly mimics the samweb listFiles function.  It evaluates a sam dimension
# and returns a completed list of files in the form of a python set.
#
# This function exists to work around inefficiencies in the default sam implementation
# of listFiles by performing various set operations (set unions and set differences, as
# indicated by sam "or" and "minus" clauses) on completed python sets, rather than as
# database queries.
#
# Additionally, this function caches the results of queries.

def listFiles(dim):

    global samcache

    print('Generating completed set of files using dimension "%s".' % dim)

    # Check cache.

    if dim in samcache:
        print('Fetching result from sam cache.')
        return samcache[dim]

    # As a first step, expand out "defname:" clauses containing top level "or" or "minus"
    # clauses.

    done = False
    while not done:
        newdim = expandDefnames(dim)
        if newdim == dim:
            done = True
        else:
            dim = newdim

    # Parse dimension into rpn list of sam dimensions and set operations.

    rpn = tokenizeRPN(dim)

    # Evaluate rpn.

    stack = []
    for item in rpn:

        if item == 'or':

            # Take the set union of the top two items on the stack.

            set1 = stack.pop()
            set2 = stack.pop()
            union = set1 | set2
            print('Set union %d files' % len(union))
            stack.append(union)

        elif item == 'minus':

            # Take the set difference of the top two items on the stack.

            set1 = stack.pop()
            set2 = stack.pop()
            diff = set2 - set1
            print('Set difference %d files' % len(diff))
            stack.append(diff)

        elif item.startswith('with limit'):

            # Truncate set on top of stack.

            n = int(item[10:])
            while len(stack[-1]) > n:
                stack[-1].pop()
            print('Truncated to %d files' % len(stack[-1]))

        else:

            # Treat this item as a sam dimension.
            # Evaluate this dimension as a completed set, and push this set
            # onto the stack.

            print('Evaluating "%s"' % item)
            if item in samcache:
                print('Fetching result from cache.')
                files = samcache[item]
            else:
                files = set(samweb().listFiles(item))
                samcache[item] = files
            print('Result %d files' % len(files))
            stack.append(files)

    # Done.

    print('Final result %d files' % len(stack[-1]))
    samcache[dim] = stack[-1]
    return stack[-1]

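# Illustrative walk-through (hypothetical definitions): for the dimension
# 'defname: A minus defname: B', the rpn loop above pushes the completed
# sets for A and B onto the stack, then replaces them with the python set
# difference A - B, instead of asking sam to evaluate the "minus" clause.
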
# Make a sam dataset definition consisting of a list of files.  The file
# list can be passed directly as an argument, or be evaluated by function
# listFiles.  The name of the newly created dataset definition
# is returned as the return value of the function.

def makeFileListDefinition(list_or_dim):

    # Make sure we have a kca certificate.

    test_kca()

    # Make file list dimension.

    flist = []
    if type(list_or_dim) == type([]) or type(list_or_dim) == type(set()):
        flist = list_or_dim
        print('Making file list definition from %s with %d elements.' % (type(list_or_dim),
                                                                         len(list_or_dim)))
    else:
        flist = listFiles(list_or_dim)
        print('Making file list definition using dimension "%s"' % list_or_dim)

    listdim = ''
    for filename in flist:
        if listdim == '':
            listdim = 'file_name %s' % filename
        else:
            listdim += ', %s' % filename
    if listdim == '':
        listdim = 'file_id 0'

    # Construct a new unique definition name.

    defname = get_user() + '_filelist_' + str(uuid.uuid4())

    # Create definition.

    samweb().createDefinition(defname, listdim, user=get_user(), group=get_experiment())

    # Done.

    return defname

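# Illustrative example (hypothetical file names): passing ['f1.root', 'f2.root']
# produces the file list dimension
#
#   'file_name f1.root, f2.root'
#
# which is registered under a unique name of the form <user>_filelist_<uuid>.
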
# Get full path of specified singularity container image file.
# The argument can be an absolute or relative path of the image file,
# or the argument can be an alias, such as 'sl7'.
# Alias arguments are used to find standard Fermilab singularity images.
# If no image file can be found, return the empty string.

def get_singularity(name):

    result = ''
    dir = '/cvmfs/singularity.opensciencegrid.org/fermilab'
    lcname = name.lower()

    # See if the argument makes sense as a path.

    if os.path.exists(os.path.abspath(name)):
        result = os.path.abspath(name)

    # Otherwise, try to interpret the argument as an alias.

    elif os.path.exists('%s/%s' % (dir, lcname)):
        result = '%s/%s' % (dir, lcname)
    elif os.path.exists('%s/fnal-wn-%s' % (dir, lcname)):
        result = '%s/fnal-wn-%s' % (dir, lcname)
    elif os.path.exists('%s/fnal-wn-%s:latest' % (dir, lcname)):
        result = '%s/fnal-wn-%s:latest' % (dir, lcname)

    # Done.

    return result
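
# Illustrative usage (a sketch; the result depends on what exists on cvmfs):
#
#   get_singularity('SL7')
#
# checks, in order, .../fermilab/sl7, .../fermilab/fnal-wn-sl7, and
# .../fermilab/fnal-wn-sl7:latest, returning the first existing path,
# or '' if none is found.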