from __future__ import absolute_import
from __future__ import print_function
import sys, os, stat, time, types
try:
    import urllib.request as urlrequest
except ImportError:
    # Python 2 fallback.
    import urllib as urlrequest
from project_modules.ifdherror import IFDHError
import larbatch_utilities
from larbatch_utilities import get_experiment, get_user, get_role, get_prouser
from larbatch_utilities import test_ticket, test_kca, test_proxy, get_kca, get_proxy
from larbatch_utilities import dimensions
from larbatch_utilities import dimensions_datastream
from larbatch_utilities import wait_for_subprocess
from larbatch_utilities import get_bluearc_server
from larbatch_utilities import get_dcache_server
from larbatch_utilities import get_dropbox
from larbatch_utilities import get_sam_metadata
from larbatch_utilities import get_ups_products
from larbatch_utilities import get_setup_script_path
from larbatch_utilities import check_running
from larbatch_utilities import convert_str
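# Module-level ROOT setup (excerpted): clearing TERM and truncating sys.argv keep the
# ROOT import quiet, and gErrorIgnoreLevel suppresses ROOT messages below the error level.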
if 'TERM' in os.environ:
    del os.environ['TERM']

sys.argv = myargv[0:1]

ROOT.gErrorIgnoreLevel = ROOT.kError
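# The path is handled as a directory only when it does not end in one of the recognized
# file extensions.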
if path[-5:] != '.list' and \
   path[-5:] != '.root' and \
   path[-4:] != '.txt' and \
   path[-4:] != '.fcl' and \
   path[-4:] != '.out' and \
   path[-4:] != '.err' and \
   path[-3:] != '.sh' and \
   path[-5:] != '.stat' and \
   larbatch_posix.isdir(path):
if larbatch_posix.access(path, os.R_OK):

larbatch_posix.listdir(os.path.dirname(path))
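# Scratch directory selection: prefer $TMPDIR, then $SCRATCH, and raise if the chosen
# directory is missing or not writeable.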
if 'TMPDIR' in os.environ:
    scratch = os.environ['TMPDIR']
elif 'SCRATCH' in os.environ:
    scratch = os.environ['SCRATCH']

if not larbatch_posix.isdir(scratch) or not larbatch_posix.access(scratch, os.W_OK):

raise RuntimeError('No scratch directory specified.')

if not larbatch_posix.isdir(scratch) or not larbatch_posix.access(scratch, os.W_OK):
    raise RuntimeError('Scratch directory %s does not exist or is not writeable.' % scratch)
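# Walk up the resolved path until a mount point is reached, stopping if dirname no longer
# shortens the path.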
path = os.path.realpath(path)

while not os.path.ismount(path):
    dir = os.path.dirname(path)
    if len(dir) >= len(path):
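# Recognize '$' as special only when it is not preceded by a backslash escape.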
if c == '$' and (len(result) == 0 or result[-1] != '\\'):
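# Parse a comma-separated list of numbers and hyphenated ranges (e.g. '1,5-8') into a
# sorted list of integers.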
for token in s.split(','):

    if token.strip().isdigit():
        result.add(int(token))

    limits = token.split('-')
    if len(limits) == 2 and limits[0].strip().isdigit() and limits[1].strip().isdigit():
        result |= set(range(int(limits[0]), int(limits[1])+1))

    raise ValueError('Unparseable range token %s.' % token)

return sorted(result)
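# create_limited_dataset (excerpted): build a run_number dimension from 'run.subrun'
# pairs and register it under a uniquely named dataset definition.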
if len(subruns) == 0:

for subrun in subruns:
    if run_subrun_dim != '':
        run_subrun_dim += ','
    run_subrun_dim += "%d.%d" % (run, subrun)

dim = "defname: %s and run_number %s" % (defname, run_subrun_dim)

newdefname = defname + '_' + str(uuid.uuid4())
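# samweb client setup (excerpted): samweb_obj is apparently constructed on first use,
# with SSL_CERT_DIR pointed at the grid CA certificates.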
if samweb_obj is None:

os.environ['SSL_CERT_DIR'] = '/etc/grid-security/certificates'


def start_project(defname, default_prjname, max_files, force_snapshot, filelistdef):
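# start_project (excerpted): count the input files, optionally create a '<project>_limit_N'
# definition when max_files caps the input, optionally append ':force' to force a snapshot,
# then start the SAM project.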
    prjname = default_prjname

    prjname = s.makeProjectName(defname)
    print('Starting project %s' % prjname)

    files = listFiles('defname: %s' % defname)

    nf = s.countFiles('defname: %s' % defname)
    print('Input dataset has %d files.' % nf)

    if max_files > 0 and nf > max_files:
        limitdef = '%s_limit_%d' % (prjname, max_files)

        if defExists(limitdef) and not filelistdef:
            print('Using already created limited dataset definition %s.' % limitdef)

        dim = 'defname: %s with limit %d' % (defname, max_files)

        print('Creating limited dataset definition %s.' % limitdef)

    dim = 'defname: %s' % defname

    print('Forcing snapshot.')
    defname = '%s:force' % defname

    print('Starting project %s.' % prjname)
    s.startProject(prjname,
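# active_projects2 (excerpted): list SAM projects started within the last three days whose
# names match the definition stem and whose end time falls inside the dropbox wait period.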
prjstem = '%s_' % s.makeProjectName(defname).rsplit('_', 1)[0]

dt = datetime.timedelta(3, 0)
tmin = datetime.datetime.utcnow() - dt
tminstr = tmin.strftime('%Y-%m-%dT%H:%M:%S')
prjnames = s.listProjects(started_after = tminstr)

for prjname in prjnames:
    if prjstem == '' or prjname.startswith(prjstem):

        prjsum = s.projectSummary(prjurl)
        if 'project_end_time' in prjsum:
            tendstr = prjsum['project_end_time']
            if len(tendstr) >= 19:

                tend = datetime.datetime.strptime(tendstr[:19], '%Y-%m-%dT%H:%M:%S')
                tage = datetime.datetime.utcnow() - tend
                age = tage.total_seconds()

                if age <= dropboxwait * 86400:
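# active_projects (excerpted): read the station's dumpStation page and collect project
# names that match the definition stem.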
prjstem = '%s_' % s.makeProjectName(defname).rsplit('_', 1)[0]

url = '%s/dumpStation?station=%s' % (s.get_baseurl(), get_experiment())
furl = urlrequest.urlopen(url)

for line in furl.readlines():

    if prjstem == '' or prjname.startswith(prjstem):
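# make_active_project_dataset (excerpted): recreate the 'active' definition from project
# snapshots and the 'wait' definition from parents of files created within the dropbox
# wait window.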
dim = 'snapshot_for_project_name %s' % prj

print('Updating dataset definition %s' % active_defname)
s.deleteDefinition(active_defname)

print('Creating dataset definition %s' % active_defname)

dt = datetime.timedelta(int(dropboxwait), int(dropboxwait % 1 * 86400))

tmin = datetime.datetime.utcnow() - dt

tminstr = tmin.strftime('%Y-%m-%dT%H:%M:%S')

dim = "isparentof: (create_date > '%s' and availability: virtual)" % tminstr

print('Updating dataset definition %s' % wait_defname)
s.deleteDefinition(wait_defname)

print('Creating dataset definition %s' % wait_defname)
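# makeDummyDef (excerpted): descDefinition presumably checks whether the definition
# already exists; otherwise a placeholder definition is created.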
samweb().descDefinition(defname)

print('Making dummy dataset definition %s' % defname)
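# addLayerTwo (excerpted): a zero-length file under /pnfs/ (missing dCache layer 2) is
# deleted and rewritten with 'ifdh cp', with X509_USER_CERT/X509_USER_KEY saved and later
# restored, and the copy run in a watchdog thread with a 60 second timeout.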
if larbatch_posix.exists(path) and path[0:6] == '/pnfs/' and larbatch_posix.stat(path).st_size == 0:

    print('Adding layer two for path %s.' % path)

    print('Deleting empty file %s.' % path)

    larbatch_posix.remove(path)

    for var in ('X509_USER_CERT', 'X509_USER_KEY'):
        if var in os.environ:
            save_vars[var] = os.environ[var]

    command = ['ifdh', 'cp', '/dev/null', path]
    jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)

    thread = threading.Thread(target=wait_for_subprocess, args=[jobinfo, q])

    thread.join(timeout=60)
    if thread.is_alive():
        print('Terminating subprocess.')

    for var in list(save_vars.keys()):
        os.environ[var] = save_vars[var]
    raise IFDHError(command, rc, jobout, joberr)

    for var in list(save_vars.keys()):
        os.environ[var] = save_vars[var]
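# Thin wrapper functions delegating to larbatch_utilities, larbatch_posix, and ROOT.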
return larbatch_utilities.srm_uri(path)

return larbatch_posix.exists(path)

return larbatch_posix.readlines(path)

return larbatch_posix.copy(src, dest)

return ROOT.TFile.Open(path)
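# When a dataset definition appears inside a dimension string, simple definitions are
# referenced by name while compound ones (containing ' or ' / ' minus ') are inlined in
# parentheses.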
desc = samweb().descDefinitionDict(word)
descdim = desc['dimensions']

if descdim.find(' or ') < 0 and descdim.find(' minus ') < 0:
    result += ' defname: %s' % word

    result += ' ( %s )' % desc['dimensions']

if word == 'defname:':

    result += ' %s' % word
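# Dimension tokenizer (excerpted): pad parentheses with spaces, reattach 'isparentof:' and
# 'ischildof:' to their opening parenthesis, then scan tokens, treating 'or' and 'minus'
# as operators.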
n = dim.find('with limit')

head = head.replace('(', ' ( ')
head = head.replace(')', ' ) ')

head = head.replace('isparentof: ', 'isparentof:')
head = head.replace('ischildof: ', 'ischildof:')

for word in head.split():

    if word == '(' or word == 'isparentof:(' or word == 'ischildof:(':

    elif word == 'or' or word == 'minus':

while len(temp) > 0 and not done:

    if last == '(' or last == 'isparentof:(' or last == 'ischildof:':

    elif last == 'isparentof:(':
        if len(result) == 0 or result[-1] == 'or' or result[-1] == 'minus':
            raise RuntimeError('isparentof: parse error')

        result.append('isparentof:( %s )' % last)

    elif last == 'ischildof:(':
        if len(result) == 0 or result[-1] == 'or' or result[-1] == 'minus':
            raise RuntimeError('ischildof: parse error')

        result.append('ischildof:( %s )' % last)

    result.append(temp.pop())
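# Evaluate the tokenized dimension as set operations on file lists: 'or' is a union,
# 'minus' a difference, and 'with limit N' truncates the current set; intermediate
# results are cached in samcache.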
print('Generating completed set of files using dimension "%s".' % dim)

print('Fetching result from sam cache.')

    print('Set union %d files' % len(union))

elif item == 'minus':

    print('Set difference %d files' % len(diff))

elif item.startswith('with limit'):

    while len(stack[-1]) > n:

    print('Truncated to %d files' % len(stack[-1]))

print('Evaluating "%s"' % item)

print('Fetching result from cache.')
files = samcache[item]

samcache[item] = files
print('Result %d files' % len(files))

print('Final result %d files' % len(stack[-1]))
samcache[dim] = stack[-1]
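# makeFileListDefinition (excerpted): build a 'file_name ...' dimension from an explicit
# file list (apparently falling back to 'file_id 0' for an empty list) and register it
# under a per-user definition name containing a UUID.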
print('Making file list definition from %s with %d elements.' % (type(list_or_dim),

print('Making file list definition using dimension "%s"' % list_or_dim)

for filename in flist:

    listdim = 'file_name %s' % filename

    listdim += ', %s' % filename

listdim = 'file_id 0'

defname = get_user() + '_filelist_' + str(uuid.uuid4())
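# get_singularity (excerpted): resolve a container name to a local path, falling back to
# the fermilab images on /cvmfs, with and without the 'fnal-wn-' prefix and ':latest' tag.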
dir = '/cvmfs/singularity.opensciencegrid.org/fermilab'
lcname = name.lower()

if os.path.exists(os.path.abspath(name)):
    result = os.path.abspath(name)

elif os.path.exists('%s/%s' % (dir, lcname)):
    result = '%s/%s' % (dir, lcname)
elif os.path.exists('%s/fnal-wn-%s' % (dir, lcname)):
    result = '%s/fnal-wn-%s' % (dir, lcname)
elif os.path.exists('%s/fnal-wn-%s:latest' % (dir, lcname)):
    result = '%s/fnal-wn-%s:latest' % (dir, lcname)
def make_active_project_dataset(defname, dropboxwait, active_defname, wait_defname)
def addLayerTwo(path, recreate=True)
def start_project(defname, default_prjname, max_files, force_snapshot, filelistdef)
def active_projects(defname='')
def default_jobsub_submit_options()
def create_limited_dataset(defname, run, subruns)
def makeFileListDefinition(list_or_dim)
def get_singularity(name)
def active_projects2(defname='', dropboxwait=0.)
def makeDummyDef(defname)
def path_to_srm_url(path)
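# Hypothetical usage sketch (not part of the listing above): one way the helpers might be
# combined, assuming this module is importable as project_utilities and that the dataset
# definition named below exists.  All names and argument values are illustrative.
import project_utilities as pu

defname = 'my_input_def'                       # assumed existing SAM dataset definition
files = pu.listFiles('defname: %s' % defname)  # expand the dimension into a file list
listdef = pu.makeFileListDefinition(files)     # wrap the list in a temporary definition
pu.start_project(listdef,                      # input definition
                 '',                           # default_prjname: let SAM choose a name
                 0,                            # max_files: no limit
                 False,                        # force_snapshot
                 True)                         # filelistdef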