from __future__ import absolute_import
from __future__ import print_function
import sys, os, stat, subprocess, shutil, json, getpass, uuid, tempfile, hashlib
import threading
try:
    import urllib.request as urlrequest
except ImportError:
    import urllib as urlrequest
import larbatch_posix
try:
    import queue
except ImportError:
    import Queue as queue
from xml.dom.minidom import parse
import project_utilities, root_metadata
from project_modules.projectdef import ProjectDef
from project_modules.projectstatus import ProjectStatus
from project_modules.batchstatus import BatchStatus
from project_modules.jobsuberror import JobsubError
from project_modules.ifdherror import IFDHError
import larbatch_utilities
from larbatch_utilities import convert_str
from larbatch_utilities import convert_bytes
# Metadata extractor, initialized lazily.

extractor_dict = None


# Initialize the samweb client and metadata extractor, if not already done.

def import_samweb():

    global samweb
    global extractor_dict
    global expMetaData

    samweb = project_utilities.samweb()
    from extractor_dict import expMetaData
def docleanx(projects, projectname, stagename, clean_descendants = True):

    print(projectname, stagename)

    # Current user ids (assumed os.getuid/os.geteuid initialization), used
    # below to verify directory ownership before deleting.

    uid = os.getuid()
    euid = os.geteuid()

    cleaned_bookdirs = []

    # Loop over stages until no more stages can be cleaned.

    done_cleaning = False
    while not done_cleaning:

        cleaned_something = False

        # Loop over projects and stages.

        for project in projects:
            for stage in project.stages:

                clean_this_stage = False

                # Skip this stage if its book directory has already been cleaned.

                if not stage.bookdir in cleaned_bookdirs:

                    # Clean this stage if the project and stage names match.

                    if (projectname == '' or project.name == projectname) and \
                       (stagename == '' or stage.name == stagename):

                        clean_this_stage = True

                    # Also clean this stage if its input comes from a stage
                    # that has already been cleaned.

                    elif clean_descendants and stage.inputlist != '' and \
                         os.path.dirname(stage.inputlist) in cleaned_bookdirs:

                        clean_this_stage = True

                # Clean this stage.

                if clean_this_stage:

                    cleaned_something = True
                    cleaned_bookdirs.append(stage.bookdir)

                    print('Clean project %s, stage %s' % (project.name, stage.name))

                    # Clean output directory.

                    if larbatch_posix.exists(stage.outdir):
                        dir_uid = larbatch_posix.stat(stage.outdir).st_uid
                        if dir_uid == uid or dir_uid == euid:
                            print('Clean directory %s.' % stage.outdir)
                            larbatch_posix.rmtree(stage.outdir)
                        else:
                            raise RuntimeError('Owner mismatch, delete %s manually.' % stage.outdir)

                    # Clean log directory.

                    if larbatch_posix.exists(stage.logdir):
                        dir_uid = larbatch_posix.stat(stage.logdir).st_uid
                        if dir_uid == uid or dir_uid == euid:
                            print('Clean directory %s.' % stage.logdir)
                            larbatch_posix.rmtree(stage.logdir)
                        else:
                            raise RuntimeError('Owner mismatch, delete %s manually.' % stage.logdir)

                    # Clean work directory.

                    if larbatch_posix.exists(stage.workdir):
                        dir_uid = larbatch_posix.stat(stage.workdir).st_uid
                        if dir_uid == uid or dir_uid == euid:
                            print('Clean directory %s.' % stage.workdir)
                            larbatch_posix.rmtree(stage.workdir)
                        else:
                            raise RuntimeError('Owner mismatch, delete %s manually.' % stage.workdir)

                    # Clean book directory.

                    if larbatch_posix.exists(stage.bookdir):
                        dir_uid = larbatch_posix.stat(stage.bookdir).st_uid
                        if dir_uid == uid or dir_uid == euid:
                            print('Clean directory %s.' % stage.bookdir)
                            larbatch_posix.rmtree(stage.bookdir)
                        else:
                            raise RuntimeError('Owner mismatch, delete %s manually.' % stage.bookdir)

        done_cleaning = not cleaned_something
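
# Hypothetical usage sketch: clean every stage named 'reco' (plus any stages
# that take their input from a cleaned stage) across all projects in an xml file:
#
#   projects = get_projects('my_project.xml')
#   docleanx(projects, '', 'reco', clean_descendants=True)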
    project_utilities.test_kca()

    project_status = ProjectStatus(prjs)
    batch_status = BatchStatus(prjs)

    # Loop over projects and stages, printing a status summary for each stage.

    for project in prjs:

        print('\nProject %s:' % project.name)

        for stage in project.stages:

            stagename = stage.name
            stage_status = project_status.get_stage_status(stagename)
            b_stage_status = batch_status.get_stage_status(stagename)
            if stage_status.exists:
                print('\nStage %s: %d art files, %d events, %d analysis files, %d errors, %d missing files.' % (
                    stagename, stage_status.nfile, stage_status.nev, stage_status.nana,
                    stage_status.nerror, stage_status.nmiss))
            else:
                print('\nStage %s output directory does not exist.' % stagename)
            print('Stage %s batch jobs: %d idle, %d running, %d held, %d other.' % (
                stagename, b_stage_status[0], b_stage_status[1], b_stage_status[2], b_stage_status[3]))
    # A top-level 'project' element defines a single project with no default input.

    if element.nodeName == 'project':
        default_input_by_stage = {}
        project = ProjectDef(element, '', default_input_by_stage, check=check)
        projects.append(project)

    else:

        # Otherwise loop over contained 'project' elements, chaining each
        # stage's output file list as the default input of later stages.

        default_input_by_stage = {}
        subelements = element.getElementsByTagName('project')
        for subelement in subelements:
            project = ProjectDef(subelement, default_input, default_input_by_stage, check=check)
            projects.append(project)
            for stage in project.stages:
                stage_list = os.path.join(stage.bookdir, 'files.list')
                default_input_by_stage[stage.name] = stage_list
                default_input = stage_list
    # Check the cache first.

    if xmlfile in get_projects.cache:
        return get_projects.cache[xmlfile]

    # Open the xml file: '-' means stdin, a plain path is opened directly,
    # and anything with a scheme is fetched as a url.

    xml = None
    if xmlfile == '-':
        xml = sys.stdin
    elif xmlfile.find(':') < 0:
        xml = open(xmlfile)
    else:
        xml = urlrequest.urlopen(xmlfile)

    # Parse the xml document.

    doc = parse(xml)
    root = doc.documentElement

    # Cache and return the parsed projects.

    get_projects.cache[xmlfile] = projects
    return projects


# Initialize the cache of parsed xml files.

get_projects.cache = {}
    # Loop over projects and stages, returning the first match.

    for project in projects:
        if projectname == '' or projectname == project.name:
            for stage in project.stages:
                if stagename == '' or stagename == stage.name:
                    return project
    return None


# Get the project definition matching the given project and stage names.

def get_project(xmlfile, projectname='', stagename='', check=True):

    projects = get_projects(xmlfile, check=check)
    for project in projects:
        if projectname == '' or projectname == project.name:
            for stage in project.stages:
                if stagename == '' or stagename == stage.name:
                    return project
    return None
# Find the stage after the named stage (function names assumed; the match-flag
# bodies reconstruct elided lines).

def next_stage(projects, stagename, circular=False):

    found = False
    for project in projects:
        for stage in project.stages:
            if found:
                return stage
            if stage.name == stagename:
                found = True

    # In circular mode, wrap around to the first stage.

    if circular and len(projects) > 0 and len(projects[0].stages) > 0:
        return projects[0].stages[0]
    return None


# Find the stage before the named stage.

def previous_stage(projects, stagename, circular=False):

    # In circular mode, the default result is the last stage.

    result = None
    if circular and len(projects) > 0 and len(projects[-1].stages) > 0:
        result = projects[-1].stages[-1]

    for project in projects:
        for stage in project.stages:
            if stage.name == stagename:
                return result
            result = stage
    return result
    if project == None:
        raise RuntimeError('No project selected for projectname=%s, stagename=%s' % (
            projectname, stagename))
    stage = project.get_stage(stagename)
    if stage == None:
        raise RuntimeError('No stage selected for projectname=%s, stagename=%s' % (
            projectname, stagename))

    # Clear the project cache around the pubs conversion, since pubsify_input
    # and pubsify_output modify the cached project and stage objects.

    get_projects.cache = {}
    stage.pubsify_input(run, subruns, version)
    stage.pubsify_output(run, subruns, version)
    get_projects.cache = {}
    return project, stage
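
# Hypothetical usage sketch, assuming the enclosing function is larbatch's
# get_pubs_stage(xmlfile, projectname, stagename, run, subruns, version):
#
#   project, stage = get_pubs_stage('my_project.xml', '', 'reco', 123, [1, 2], 'v1')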
    # The root file must exist.

    if not larbatch_posix.exists(path):
        return result

    # Look for a companion .json metadata file in the log directory.

    json_path = os.path.join(logdir, os.path.basename(path) + '.json')
    if larbatch_posix.exists(json_path):

        # Read the json file and convert it to a metadata dictionary.

        lines = larbatch_posix.readlines(json_path)
        s = ''
        for line in lines:
            s = s + line
        md = json.loads(s)

        # Extract the event count and data stream from the metadata.

        if len(list(md.keys())) > 0:
            stream = ''
            nevroot = int(md['events'])
            if 'data_stream' in md:
                stream = md['data_stream']
            result = (nevroot, stream)
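
    # A minimal sketch of the companion json metadata read above (hypothetical
    # values; only 'events' and 'data_stream' are used here):
    #
    #   {"file_name": "reco_hist.root", "events": 100, "data_stream": "physics"}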
    # Check root files (data and analysis) in the specified directory.

    nev = -1
    roots = []
    hists = []

    print('Checking root files in directory %s.' % outdir)
    filenames = larbatch_posix.listdir(outdir)
    for filename in filenames:
        name, ext = os.path.splitext(filename)
        if len(ext) > 0 and ext[1:] in data_file_types:
            path = os.path.join(outdir, filename)
            nevroot, stream = check_root_file(path, logdir)
            if nevroot >= 0:
                if nev < 0:
                    nev = 0
                nev = nev + nevroot
                roots.append((os.path.join(outdir, filename), nevroot, stream))
            elif nevroot == -1:

                # Valid root file, but not an art file: count it as an
                # analysis (histogram/ntuple) file.

                hists.append(os.path.join(outdir, filename))
            else:
                print('Warning: File %s in directory %s is not a valid root file.' % (filename, outdir))

    return (nev, roots, hists)
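
# Usage sketch, following the call pattern used by the checking code below:
#
#   nev, roots, hists = check_root(stage.outdir, stage.bookdir, stage.datafiletypes)
#
# nev is the total art event count (-1 if no art files were found), roots is a
# list of (path, events, stream) tuples, and hists lists non-art root files.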
    # Single input file.

    if stage.inputfile != '':
        result.append(stage.inputfile)

    # Input file list: take the first word of each line.

    elif stage.inputlist != '' and larbatch_posix.exists(stage.inputlist):
        input_filenames = larbatch_posix.readlines(stage.inputlist)
        for line in input_filenames:
            words = line.split()
            result.append(words[0])

    # Sam input dataset: ask sam for the file list.

    elif stage.inputdef != '':
        import_samweb()
        result = samweb.listFiles(defname=stage.inputdef)
    # Walk the output directory tree, shortening over-long root file names.

    for out_subpath, subdirs, files in larbatch_posix.walk(stage.outdir):

        # Only examine files in leaf directories.

        if len(subdirs) != 0:
            continue
        subdir = os.path.relpath(out_subpath, stage.outdir)
        log_subpath = os.path.join(stage.bookdir, subdir)

        for file in files:
            if file[-5:] == '.root':
                if len(file) >= 200:

                    # Rename the root file to a unique shorter name.

                    file_path = os.path.join(out_subpath, file)
                    shortfile = file[:150] + str(uuid.uuid4()) + '.root'
                    shortfile_path = os.path.join(out_subpath, shortfile)
                    print('%s\n->%s\n' % (file_path, shortfile_path))
                    larbatch_posix.rename(file_path, shortfile_path)

                    # Also rename the corresponding json metadata file, if any.

                    json_path = os.path.join(log_subpath, file + '.json')
                    if larbatch_posix.exists(json_path):
                        shortjson = shortfile + '.json'
                        shortjson_path = os.path.join(log_subpath, shortjson)
                        print('%s\n->%s\n' % (json_path, shortjson_path))
                        larbatch_posix.rename(json_path, shortjson_path)
    # Walk the log directory tree, copying and extracting log tarballs into
    # the book directory.

    for log_subpath, subdirs, files in larbatch_posix.walk(stage.logdir):

        # Only examine leaf directories.

        if len(subdirs) != 0:
            continue
        subdir = os.path.relpath(log_subpath, stage.logdir)
        book_subpath = os.path.join(stage.bookdir, subdir)
        for file in files:
            if file.startswith('log') and file.endswith('.tar'):
                src = '%s/%s' % (log_subpath, file)
                dst = '%s/%s' % (book_subpath, file)
                flag = '%s.done' % dst

                # Copy the tarball into the book directory, unless this has
                # already been done (flag file exists).

                if dst != src and not larbatch_posix.exists(flag):
                    print('Copying tarball %s into %s' % (src, book_subpath))
                    if not larbatch_posix.isdir(book_subpath):
                        larbatch_posix.makedirs(book_subpath)
                    larbatch_posix.copy(src, dst)

                # Extract the tarball, then create the flag file and delete
                # the tarball.

                if not larbatch_posix.exists(flag):
                    print('Extracting tarball %s' % dst)
                    jobinfo = subprocess.Popen(['tar', '-xf', dst, '-C', book_subpath,
                                                '--exclude=beam*.dat',
                                                '--exclude=beam*.info'],
                                               stdout=subprocess.PIPE,
                                               stderr=subprocess.PIPE)
                    jobout, joberr = jobinfo.communicate()
                    rc = jobinfo.poll()
                    if rc != 0:
                        print('Failed to extract log tarball in %s' % dst)
                    f = larbatch_posix.open(flag, 'w')
                    f.close()
                    larbatch_posix.remove(dst)
    # In quick mode, delegate to the quick check (assumed dispatch).

    if quick == 1 and not ana:
        return doquickcheck(project, stage, ana)
    # Check that output and book directories exist.

    if not larbatch_posix.exists(stage.outdir):
        print('Output directory %s does not exist.' % stage.outdir)
        return 1

    if not larbatch_posix.exists(stage.bookdir):
        print('Log directory %s does not exist.' % stage.bookdir)
        return 1

    has_metadata = project.file_type != '' or project.run_type != ''
    has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
    print('Checking directory %s' % stage.bookdir)
    # Initialize bookkeeping lists and counters.

    nev_tot = 0
    nroot_tot = 0
    nerror = 0
    nmiss = 0
    procmap = {}
    processes = []
    filesana = []
    sam_projects = []
    cpids = []
    uris = []
    bad_workers = []
    streams = {}

    # Walk the book directory tree, checking each leaf subdirectory.

    for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):

        # Only examine leaf directories.

        if len(subdirs) != 0:
            continue
        subdir = os.path.relpath(log_subpath, stage.bookdir)
        out_subpath = os.path.join(stage.outdir, subdir)
        dirok = project_utilities.fast_isdir(log_subpath)

        # Start-project subdirectories: just collect the sam project name.

        if dirok and log_subpath[-6:] == '_start':
            filename = os.path.join(log_subpath, 'sam_project.txt')
            if larbatch_posix.exists(filename):
                sam_project = larbatch_posix.readlines(filename)[0].strip()
                if sam_project != '' and not sam_project in sam_projects:
                    sam_projects.append(sam_project)

        # Regular worker subdirectories (not start, stop, or log).

        if dirok and not subdir[-6:] == '_start' and not subdir[-5:] == '_stop' \
                and not subdir == 'log':

            bad = 0

            # Make sure the corresponding output directory exists.

            if not project_utilities.fast_isdir(out_subpath):
                print('No output directory corresponding to subdirectory %s.' % subdir)
                bad = 1

            # Check the lar exit status, if any.

            if not bad:
                stat_filename = os.path.join(log_subpath, 'lar.stat')
                if larbatch_posix.exists(stat_filename):
                    try:
                        status = int(larbatch_posix.readlines(stat_filename)[0].strip())
                        if status != 0:
                            print('Job in subdirectory %s ended with non-zero exit status %d.' % (
                                subdir, status))
                            bad = 1
                    except:
                        print('Bad file lar.stat in subdirectory %s.' % subdir)
                        bad = 1

            # Check root files in the output subdirectory.

            if not bad:
                nev, roots, subhists = check_root(out_subpath, log_subpath, stage.datafiletypes)
                if not ana:
                    if len(roots) == 0 or nev < 0:
                        print('Problem with root file(s) in subdirectory %s.' % subdir)
                        bad = 1
                elif nev < -1 or len(subhists) == 0:
                    print('Problem with analysis root file(s) in subdirectory %s.' % subdir)
                    bad = 1

            # Check for duplicate filenames.

            if not bad and has_metadata:
                for root in roots:
                    rootname = os.path.basename(root[0])
                    for s in list(procmap.keys()):
                        oldroots = procmap[s]
                        for oldroot in oldroots:
                            oldrootname = os.path.basename(oldroot[0])
                            if rootname == oldrootname:
                                print('Duplicate filename %s in subdirectory %s' % (rootname,
                                                                                    subdir))
                                olddir = os.path.basename(os.path.dirname(oldroot[0]))
                                print('Previous subdirectory %s' % olddir)
                                bad = 1

            # Check for filenames that are too long.

            if not bad and has_metadata:
                for root in roots:
                    rootname = os.path.basename(root[0])
                    if len(rootname) >= 200:
                        print('Filename %s in subdirectory %s is longer than 200 characters.' % (
                            rootname, subdir))
                        bad = 1

            # For sam input, check sam_project.txt and cpid.txt.

            if not bad and stage.inputdef != '':
                filename1 = os.path.join(log_subpath, 'sam_project.txt')
                if not larbatch_posix.exists(filename1):
                    print('Could not find file sam_project.txt')
                    bad = 1
                filename2 = os.path.join(log_subpath, 'cpid.txt')
                if not larbatch_posix.exists(filename2):
                    print('Could not find file cpid.txt')
                    bad = 1
                if not bad:
                    sam_project = larbatch_posix.readlines(filename1)[0].strip()
                    if not sam_project in sam_projects:
                        sam_projects.append(sam_project)
                    cpid = larbatch_posix.readlines(filename2)[0].strip()
                    if not cpid in cpids:
                        cpids.append(cpid)

            # For file input, check transferred_uris.list.

            if not bad and (stage.inputlist != '' or stage.inputfile != ''):
                filename = os.path.join(log_subpath, 'transferred_uris.list')
                if not larbatch_posix.exists(filename):
                    print('Could not find file transferred_uris.list')
                    bad = 1
                if not bad:
                    lines = larbatch_posix.readlines(filename)
                    for line in lines:
                        uri = line.strip()
                        if uri != '':
                            uris.append(uri)

            # Check for duplicate process numbers, from subdirectory names of
            # the form <name>_<process>.

            if not bad:
                subdir_split = subdir.split('_')
                if len(subdir_split) > 1:
                    process = int(subdir_split[1])
                    if process in processes:
                        print('Duplicate process number')
                        bad = 1
                    else:
                        processes.append(process)

            # Save information about this good subdirectory.

            if not bad:
                procmap[subdir] = roots
                filesana.extend(subhists)
                nev_tot = nev_tot + nev
                nroot_tot = nroot_tot + len(roots)

            # Update the error count and the list of bad workers.

            if bad:
                nerror = nerror + 1
                bad_workers.append(subdir)
                print('Bad subdirectory %s.' % subdir)
    # Check for a dead book directory.

    contents = larbatch_posix.listdir(stage.bookdir)
    if len(contents) == 0:
        print('Directory %s may be dead.' % stage.bookdir)
        print('Returning error status without creating any bookkeeping files.')
        return 1
    # Open bookkeeping files.

    filelistname = os.path.join(stage.bookdir, 'files.list')
    filelist = safeopen(filelistname)

    eventslistname = os.path.join(stage.bookdir, 'events.list')
    eventslist = safeopen(eventslistname)

    badfilename = os.path.join(stage.bookdir, 'bad.list')
    badfile = safeopen(badfilename)

    missingfilesname = os.path.join(stage.bookdir, 'missing_files.list')
    missingfiles = safeopen(missingfilesname)

    filesanalistname = os.path.join(stage.bookdir, 'filesana.list')
    filesanalist = safeopen(filesanalistname)

    urislistname = os.path.join(stage.bookdir, 'transferred_uris.list')
    urislist = safeopen(urislistname)

    # Write good files to files.list and events.list, and to the per-stream
    # file lists.

    nproc = len(procmap)
    for s in list(procmap.keys()):
        for root in procmap[s]:
            filelist.write('%s\n' % root[0])
            eventslist.write('%s %d\n' % root[:2])

            # Write to the per-stream file list, opening it on first use.

            stream = root[2]
            if stream != '':
                if stream not in streams:
                    streamlistname = os.path.join(stage.bookdir, 'files_%s.list' % stream)
                    streams[stream] = safeopen(streamlistname)
                streams[stream].write('%s\n' % root[0])

    # Write bad workers to bad.list.

    for bad_worker in bad_workers:
        badfile.write('%s\n' % bad_worker)

    # Write missing files to missing_files.list.

    if stage.inputdef == '' and not stage.pubs_output:
        input_files = get_input_files(stage)
        if len(input_files) > 0:
            missing_files = list(set(input_files) - set(uris))
            for missing_file in missing_files:
                missingfiles.write('%s\n' % missing_file)
            nmiss = len(missing_files)
    else:

        # For sam or pubs input, write one placeholder line per missing worker.

        nmiss = stage.num_jobs - len(procmap)
        for n in range(nmiss):
            missingfiles.write('/dev/null\n')

    # Write analysis files to filesana.list.

    for hist in filesana:
        filesanalist.write('%s\n' % hist)

    # Write transferred uris to transferred_uris.list.

    for uri in uris:
        urislist.write('%s\n' % uri)

    # Print a summary.

    if ana:
        print("%d processes completed successfully." % nproc)
        print("%d total good histogram files." % len(filesana))
    else:
        print("%d total good events." % nev_tot)
        print("%d total good root files." % nroot_tot)
        print("%d total good histogram files." % len(filesana))

    # Close bookkeeping files.

    filelist.close()
    project_utilities.addLayerTwo(filelistname)
    eventslist.close()
    project_utilities.addLayerTwo(eventslistname)
    badfile.close()
    missingfiles.write('\n')
    missingfiles.close()
    filesanalist.close()
    if len(filesana) == 0:
        project_utilities.addLayerTwo(filesanalistname)
    urislist.write('\n')
    urislist.close()
    for stream in list(streams.keys()):
        streams[stream].close()
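
    # Bookkeeping files produced above, all in stage.bookdir:
    #
    #   files.list             - good art root files (one path per line)
    #   files_<stream>.list    - good art root files, split by data stream
    #   events.list            - good art root files with their event counts
    #   filesana.list          - good analysis (histogram/ntuple) root files
    #   bad.list               - subdirectories of failed workers
    #   missing_files.list     - input files not successfully processed
    #   transferred_uris.list  - input files transferred to workers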
    # For sam input, check consumed files.

    if stage.inputdef != '' and not stage.pubs_input:

        # Write the list of sam projects to sam_projects.list.

        sam_projects_filename = os.path.join(stage.bookdir, 'sam_projects.list')
        sam_projects_file = safeopen(sam_projects_filename)
        for sam_project in sam_projects:
            sam_projects_file.write('%s\n' % sam_project)
        sam_projects_file.close()
        if len(sam_projects) == 0:
            project_utilities.addLayerTwo(sam_projects_filename)

        # Write consumer process ids to cpids.list.

        cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
        cpids_file = safeopen(cpids_filename)
        for cpid in cpids:
            cpids_file.write('%s\n' % cpid)
        cpids_file.close()
        if len(cpids) == 0:
            project_utilities.addLayerTwo(cpids_filename)

        # Count the number of consumed files.

        cpids_list = ''
        sep = ''
        for cpid in cpids:
            cpids_list = cpids_list + '%s%s' % (sep, cpid)
            sep = ','
        nconsumed = 0
        if cpids_list != '':
            dim = 'consumer_process_id %s and consumed_status consumed' % cpids_list
            nconsumed = samweb.countFiles(dimensions=dim)

        # Count unconsumed files.

        if cpids_list != '':
            udim = '(defname: %s) minus (%s)' % (stage.inputdef, dim)
        else:
            udim = 'defname: %s' % stage.inputdef
        nunconsumed = samweb.countFiles(dimensions=udim)
        nerror = nerror + nunconsumed

        # Print a summary.

        print('%d sam projects.' % len(sam_projects))
        print('%d successful consumer process ids.' % len(cpids))
        print('%d files consumed.' % nconsumed)
        print('%d files not consumed.' % nunconsumed)
        # Check the status of each sam project.

        for sam_project in sam_projects:
            print('\nChecking sam project %s' % sam_project)
            url = samweb.findProject(sam_project, project_utilities.get_experiment())
            result = samweb.projectSummary(url)

            # Tally processes and file counts.

            nd = 0
            nc = 0
            nf = 0
            nproc = 0
            nact = 0
            if 'processes' in result:
                processes = result['processes']
                nproc = len(processes)
                for process in processes:
                    if 'status' in process:
                        if process['status'] == 'active':
                            nact = nact + 1
                    if 'counts' in process:
                        counts = process['counts']
                        if 'delivered' in counts:
                            nd = nd + counts['delivered']
                        if 'consumed' in counts:
                            nc = nc + counts['consumed']
                        if 'failed' in counts:
                            nf = nf + counts['failed']
            print('Status: %s' % result['project_status'])
            print('%d total processes' % nproc)
            print('%d active processes' % nact)
            print('%d files in snapshot' % result['files_in_snapshot'])
            print('%d files delivered' % (nd + nc))
            print('%d files consumed' % nc)
            print('%d files failed' % nf)
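
        # For reference, the samweb projectSummary result is a dictionary; a
        # minimal sketch of the fields read above (hypothetical values):
        #
        #   {'project_status': 'ended complete',
        #    'files_in_snapshot': 100,
        #    'processes': [{'status': 'completed',
        #                   'counts': {'delivered': 50, 'consumed': 50, 'failed': 0}}]}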
    # Create the checked flag file.

    checkfilename = os.path.join(stage.bookdir, 'checked')
    checkfile = safeopen(checkfilename)
    checkfile.write('\n')
    checkfile.close()
    project_utilities.addLayerTwo(checkfilename)

    # Print the error summary.

    if stage.inputdef == '' or stage.pubs_input:
        print('%d processes with errors.' % nerror)
        print('%d missing files.' % nmiss)
    else:
        print('%d unconsumed files.' % nerror)
    # Return error status if there are no good root files or no good workers.

    if not ana and nroot_tot == 0:
        return 1
    if len(procmap) == 0:
        return 1
    # Quick check: verify that output and book directories exist.

    if not larbatch_posix.isdir(stage.outdir):
        print('Output directory %s does not exist.' % stage.outdir)
        return 1

    if not larbatch_posix.isdir(stage.bookdir):
        print('Log directory %s does not exist.' % stage.bookdir)
        return 1

    print('Checking directory %s' % stage.bookdir)
    # Initialize lists of good files gathered from the worker bookkeeping files.

    transferredFiles = []
    goodFiles = []
    goodAnaFiles = []
    eventLists = []
    badLists = []
    missingLists = []
    streamLists = {}
    goodLogDirs = set()
    sam_projects = []
    cpids = []
    nErrors = 0

    # Walk the book directory tree.

    for log_subpath, subdirs, files in larbatch_posix.walk(stage.bookdir):

        # Only examine leaf directories.

        if len(subdirs) != 0:
            continue

        # Start/stop project directories: just collect the sam project name.

        if log_subpath[-6:] == '_start' or log_subpath[-5:] == '_stop':
            filename = os.path.join(log_subpath, 'sam_project.txt')
            if larbatch_posix.exists(filename):
                sam_project = larbatch_posix.readlines(filename)[0].strip()
                if sam_project != '' and not sam_project in sam_projects:
                    sam_projects.append(sam_project)
            continue

        print('Doing quick check of directory %s.' % log_subpath)

        subdir = os.path.relpath(log_subpath, stage.bookdir)
        out_subpath = os.path.join(stage.outdir, subdir)
        dirok = project_utilities.fast_isdir(log_subpath)

        validateOK = 1

        # Check the missing-file count written by the worker's validation step.

        missingfilesname = os.path.join(log_subpath, 'missing_files.list')
        if larbatch_posix.exists(missingfilesname):
            missingfiles = project_utilities.saferead(missingfilesname)
        else:
            print('Cannot open file: %s' % missingfilesname)
            validateOK = 0

        if validateOK == 1 and len(missingfiles) == 0:
            print('%s exists, but is empty' % missingfilesname)
            validateOK = 0

        if validateOK == 1:
            line = missingfiles[0]
            line = line.strip('\n')
            if( int(line) != 0 ):
                validateOK = 0

        # For sam input, collect the sam project name and consumer process id.

        if stage.inputdef != '':

            filename1 = os.path.join(log_subpath, 'sam_project.txt')
            if not larbatch_posix.exists(filename1):
                print('Could not find file sam_project.txt')
                validateOK = 0
            else:
                sam_project = larbatch_posix.readlines(filename1)[0].strip()
                if not sam_project in sam_projects:
                    sam_projects.append(sam_project)

            filename2 = os.path.join(log_subpath, 'cpid.txt')
            if not larbatch_posix.exists(filename2):
                print('Could not find file cpid.txt')
                validateOK = 0
            else:
                cpid = larbatch_posix.readlines(filename2)[0].strip()
                if not cpid in cpids:
                    cpids.append(cpid)

        # Scan the per-worker bookkeeping files (scan_file returns [ -1 ] on error).

        filelistsrc = os.path.join(log_subpath, 'files.list')
        tmpArray = scan_file(filelistsrc)
        if( tmpArray == [ -1 ] ):
            validateOK = 0
        else:
            goodFiles.extend(tmpArray)

        fileanalistsrc = os.path.join(log_subpath, 'filesana.list')
        tmpArray = scan_file(fileanalistsrc)
        if( not tmpArray == [ -1 ] ):
            goodAnaFiles.extend(tmpArray)

        eventlistsrc = os.path.join(log_subpath, 'events.list')
        tmpArray = scan_file(eventlistsrc)
        if( tmpArray == [ -1 ] ):
            validateOK = 0
        else:
            eventLists.extend(tmpArray)

        badfilesrc = os.path.join(log_subpath, 'bad.list')
        tmpArray = scan_file(badfilesrc)
        if( tmpArray == [ -1 ] ):
            validateOK = 0
        else:
            badLists.extend(tmpArray)

        missingfilesrc = os.path.join(log_subpath, 'missing_files.list')
        tmpArray = scan_file(missingfilesrc)
        if( tmpArray == [ -1 ] ):
            validateOK = 0
        else:
            missingLists.extend(tmpArray)

        urislistsrc = os.path.join(log_subpath, 'transferred_uris.list')
        tmpArray = scan_file(urislistsrc)
        if( tmpArray == [ -1 ] ):
            validateOK = 0
        else:
            transferredFiles.extend(tmpArray)

        # Scan the per-stream file lists (files_<stream>.list).

        streamList = larbatch_posix.listdir(log_subpath)
        for stream in streamList:
            if( stream[:6] != "files_" ):
                continue
            streamfilesrc = os.path.join(log_subpath, stream)
            tmpArray = scan_file(streamfilesrc)
            if( tmpArray == [ -1 ] ):
                validateOK = 0
            else:
                if(streamLists.get(stream, "empty") == "empty" ):
                    streamLists[stream] = tmpArray
                else:
                    streamLists[stream].extend(tmpArray)

        # Remember this log directory if everything validated.

        if validateOK == 1:
            goodLogDirs.add(log_subpath)
        else:
            nErrors = nErrors + 1
    # Create the checked flag file.

    checkfilename = os.path.join(stage.bookdir, 'checked')
    checkfile = safeopen(checkfilename)
    checkfile.write('\n')
    checkfile.close()
    # files.list: symlink if there is a single good log directory, otherwise
    # concatenate the per-worker lists.

    filelistdest = os.path.join(stage.bookdir, 'files.list')
    if larbatch_posix.exists(filelistdest):
        larbatch_posix.remove(filelistdest)
    if len(goodLogDirs) == 1:
        src = '%s/files.list' % goodLogDirs.copy().pop()
        larbatch_posix.symlink(src, filelistdest)
    else:
        inputList = safeopen(filelistdest)
        for goodFile in goodFiles:
            inputList.write("%s\n" % goodFile)
        inputList.close()
        if len(goodFiles) == 0:
            project_utilities.addLayerTwo(filelistdest)

    # filesana.list.

    fileanalistdest = os.path.join(stage.bookdir, 'filesana.list')
    if larbatch_posix.exists(fileanalistdest):
        larbatch_posix.remove(fileanalistdest)
    if len(goodLogDirs) == 1:
        src = '%s/filesana.list' % goodLogDirs.copy().pop()
        larbatch_posix.symlink(src, fileanalistdest)
    else:
        anaList = safeopen(fileanalistdest)
        for goodAnaFile in goodAnaFiles:
            anaList.write("%s\n" % goodAnaFile)
        anaList.close()
        if len(goodAnaFiles) == 0:
            project_utilities.addLayerTwo(fileanalistdest)

    # events.list.

    eventlistdest = os.path.join(stage.bookdir, 'events.list')
    if larbatch_posix.exists(eventlistdest):
        larbatch_posix.remove(eventlistdest)
    if len(goodLogDirs) == 1:
        src = '%s/events.list' % goodLogDirs.copy().pop()
        larbatch_posix.symlink(src, eventlistdest)
    else:
        eventsOutList = safeopen(eventlistdest)
        for event in eventLists:
            eventsOutList.write("%s\n" % event)
        eventsOutList.close()
        if len(eventLists) == 0:
            project_utilities.addLayerTwo(eventlistdest)

    # bad.list: only written if there were bad workers.

    if(len(badLists) > 0):
        badlistdest = os.path.join(stage.bookdir, 'bad.list')
        badOutList = safeopen(badlistdest)
        for bad in badLists:
            badOutList.write("%s\n" % bad)
        badOutList.close()

    # missing_files.list (local input only).

    if stage.inputdef == '' and not stage.pubs_output:
        input_files = get_input_files(stage)
        if len(input_files) > 0:
            missing_files = list(set(input_files) - set(transferredFiles))
            if len(missing_files) > 0:
                missinglistdest = os.path.join(stage.bookdir, 'missing_files.list')
                missingOutList = safeopen(missinglistdest)
                for missing in missing_files:
                    missingOutList.write("%s\n" % missing)
                missingOutList.close()

    # transferred_uris.list.

    urilistdest = os.path.join(stage.bookdir, 'transferred_uris.list')
    if larbatch_posix.exists(urilistdest):
        larbatch_posix.remove(urilistdest)
    if len(goodLogDirs) == 1 and len(transferredFiles) > 0:
        src = '%s/transferred_uris.list' % goodLogDirs.copy().pop()
        larbatch_posix.symlink(src, urilistdest)
    else:
        uriOutList = safeopen(urilistdest)
        for uri in transferredFiles:
            uriOutList.write("%s\n" % uri)
        uriOutList.close()
        if len(transferredFiles) == 0:
            project_utilities.addLayerTwo(urilistdest)

    # sam_projects.list and cpids.list (sam input only).

    if stage.inputdef != '':

        samprojectdest = os.path.join(stage.bookdir, 'sam_projects.list')
        if larbatch_posix.exists(samprojectdest):
            larbatch_posix.remove(samprojectdest)
        if len(goodLogDirs) == 1:
            src = '%s/sam_project.txt' % goodLogDirs.copy().pop()
            larbatch_posix.symlink(src, samprojectdest)
        else:
            samprojectfile = safeopen(samprojectdest)
            for sam in sam_projects:
                samprojectfile.write("%s\n" % sam)
            samprojectfile.close()
            if len(sam_projects) == 0:
                project_utilities.addLayerTwo(samprojectdest)

        cpiddest = os.path.join(stage.bookdir, 'cpids.list')
        if larbatch_posix.exists(cpiddest):
            larbatch_posix.remove(cpiddest)
        if len(goodLogDirs) == 1:
            src = '%s/cpid.txt' % goodLogDirs.copy().pop()
            larbatch_posix.symlink(src, cpiddest)
        else:
            cpidfile = safeopen(cpiddest)
            for cp in cpids:
                cpidfile.write("%s \n" % cp)
            cpidfile.close()
            if len(cpids) == 0:
                project_utilities.addLayerTwo(cpiddest)

    # Per-stream file lists.

    for stream in streamLists:
        streamdest = os.path.join(stage.bookdir, stream)
        if larbatch_posix.exists(streamdest):
            larbatch_posix.remove(streamdest)
        if len(goodLogDirs) == 1:
            src = '%s/%s' % (goodLogDirs.copy().pop(), stream)
            larbatch_posix.symlink(src, streamdest)
        else:
            streamOutList = safeopen(streamdest)
            for line in streamLists[stream]:
                streamOutList.write("%s\n" % line)
            streamOutList.close()
            if len(streamLists[stream]) == 0:
                project_utilities.addLayerTwo(streamdest)

    print('Number of errors = %d' % nErrors)
    # Walk the book directory tree looking for env.txt files, from which
    # batch job ids can be extracted.

    logids = []
    for dirpath, dirnames, filenames in larbatch_posix.walk(stage.bookdir):
        for filename in filenames:
            if filename == 'env.txt':

                # Read the saved environment of this worker.

                envpath = os.path.join(dirpath, filename)
                vars = larbatch_posix.readlines(envpath)

                # First look for JOBSUBPARENTJOBID.

                logid = ''
                for var in vars:
                    varsplit = var.split('=', 1)
                    name = varsplit[0].strip()
                    if name == 'JOBSUBPARENTJOBID':
                        logid = varsplit[1].strip()

                        # Convert the job id into the log id by setting the
                        # process number to zero.

                        logsplit = logid.split('@', 1)
                        cluster_process = logsplit[0]
                        server = logsplit[1]
                        cluster = cluster_process.split('.', 1)[0]
                        logid = cluster + '.0' + '@' + server
                        logids.append(logid)
                        break

                # Fall back to JOBSUBJOBID if no parent job id was found.

                if logid == '':
                    for var in vars:
                        varsplit = var.split('=', 1)
                        name = varsplit[0].strip()
                        if name == 'JOBSUBJOBID':
                            logid = varsplit[1].strip()
                            logsplit = logid.split('@', 1)
                            cluster_process = logsplit[0]
                            server = logsplit[1]
                            cluster = cluster_process.split('.', 1)[0]
                            logid = cluster + '.0' + '@' + server
                            logids.append(logid)
                            break

    # Delete and recreate the log subdirectory of the book directory.

    logdir = os.path.join(stage.bookdir, 'log')
    if larbatch_posix.exists(logdir):
        larbatch_posix.rmtree(logdir)
    larbatch_posix.mkdir(logdir)
    # Fetch log files for each unique job id.

    for logid in set(logids):
        print('Fetching log files for id %s' % logid)
        command = ['jobsub_fetchlog']
        if project.server != '-' and project.server != '':
            command.append('--jobsub-server=%s' % project.server)
        command.append('--jobid=%s' % logid)
        command.append('--dest-dir=%s' % logdir)
        jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        if rc != 0:
            raise JobsubError(command, rc, jobout, joberr)

    # Report failure if no job ids (hence no log files) could be found.

    if len(logids) == 0:
        print('Failed to fetch log files.')
        return 1
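
    # The fetch command has the form (hypothetical job id, server, and paths):
    #
    #   jobsub_fetchlog --jobsub-server=https://fifebatch.fnal.gov \
    #       --jobid=12345678.0@jobsub01.fnal.gov --dest-dir=/path/to/bookdir/log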
    # Choose which file list to declare: art files or analysis files.

    listname = 'files.list'
    if ana:
        listname = 'filesana.list'
    fnlist = os.path.join(logdir, listname)
    if larbatch_posix.exists(fnlist):
        roots = larbatch_posix.readlines(fnlist)
    else:
        raise RuntimeError('No %s file found %s, run project.py --check' % (listname, fnlist))
    # Loop over root files and declare them to sam if not already declared.

    for path in roots:
        path = path.strip()
        fn = os.path.basename(path)
        dirpath = os.path.dirname(path)
        dirname = os.path.relpath(dirpath, outdir)

        # Check whether this file is already declared.

        has_metadata = False
        try:
            md = samweb.getMetadata(filenameorid=fn)
            has_metadata = True
        except samweb_cli.exceptions.FileNotFound:
            pass

        if has_metadata:
            print('Metadata OK: %s' % fn)
        else:
            print('Declaring: %s' % fn)

            # Get metadata from the companion json file, if any.

            md = {}
            jsonfile = os.path.join(logdir, os.path.join(dirname, fn)) + '.json'
            if larbatch_posix.exists(jsonfile):
                mdlines = larbatch_posix.readlines(jsonfile)
                mdtext = ''
                for line in mdlines:
                    mdtext = mdtext + line
                md = json.loads(mdtext)
            else:

                # No json file: extract metadata from the artroot file itself.

                expSpecificMetaData = expMetaData(os.environ['SAM_EXPERIMENT'],
                                                  larbatch_posix.root_stream(path))
                md = expSpecificMetaData.getmetadata(mdjson)

            if len(md) > 0:
                project_utilities.test_kca()
                try:
                    samweb.declareFile(md=md)
                except:
                    print('SAM declare failed.')
            else:
                print('No sam metadata found for %s.' % fn)
                print('Not declared: %s' % fn)
    # Print a summary of the files matching the dimension query.

    result = samweb.listFilesSummary(dimensions=dim)
    for key in list(result.keys()):
        print('%s: %s' % (key, result[key]))

    # Check whether the dataset definition already exists.

    def_exists = False
    try:
        desc = samweb.descDefinition(defname=defname)
        def_exists = True
    except samweb_cli.exceptions.DefinitionNotFound:
        pass

    # Create the definition if it does not exist ('define' is assumed here to
    # be the caller's create-vs-report flag).

    if def_exists:
        print('Definition already exists: %s' % defname)
    elif define:
        print('Creating definition %s.' % defname)
        project_utilities.test_kca()
        samweb.createDefinition(defname=defname, dims=dim)
    else:
        print('Definition should be created: %s' % defname)

    # Print a summary of the files in the definition.

    result = samweb.listFilesSummary(defname=defname)
    for key in list(result.keys()):
        print('%s: %s' % (key, result[key]))
    # Delete the dataset definition, if it exists.

    def_exists = False
    try:
        desc = samweb.descDefinition(defname=defname)
        def_exists = True
    except samweb_cli.exceptions.DefinitionNotFound:
        pass

    if def_exists:
        print('Deleting definition: %s' % defname)
        project_utilities.test_kca()
        samweb.deleteDefinition(defname=defname)
    else:
        print('No such definition: %s' % defname)
    # Report the requested action (add/clean/remove/upload are the caller's
    # action flags; the names are assumed from their use below).

    if add:
        print('Adding disk locations.')
    elif clean:
        print('Cleaning disk locations.')
    elif remove:
        print('Removing disk locations.')
    elif upload:
        print('Uploading to FTS.')
    else:
        print('Checking disk locations.')
    # Get the list of files and initialize their disk location lists.

    filelist = samweb.listFiles(dimensions=dim, stream=False)
    disk_dict = {}
    for filename in filelist:
        disk_dict[filename] = []

    # Walk the output directory and record the disk location(s) of each file.

    for out_subpath, subdirs, files in larbatch_posix.walk(outdir):

        # Only examine leaf directories.

        if len(subdirs) != 0:
            continue
        for fn in files:
            if fn in disk_dict:
                disk_dict[fn].append(out_subpath)
    # Loop over files and compare disk locations with sam locations.

    for filename in filelist:
        disk_locs = disk_dict[filename]
        sam_locs = samweb.locateFile(filenameorid=filename)
        if len(sam_locs) == 0 and not upload:
            print('No location: %s' % filename)

        # Disk locations to add: on disk, but not known to sam.

        locs_to_add = []
        for disk_loc in disk_locs:
            on_sam = False
            for sam_loc in sam_locs:
                if sam_loc['location_type'] == 'disk':
                    if disk_loc == sam_loc['location'].split(':')[-1]:
                        on_sam = True
            if not on_sam:
                locs_to_add.append(disk_loc)

        # Sam disk locations to remove: all of them in remove mode, otherwise
        # only those whose file no longer exists on disk.

        locs_to_remove = []
        if remove:
            for sam_loc in sam_locs:
                if sam_loc['location_type'] == 'disk':
                    locs_to_remove.append(sam_loc['location'])
        else:
            for sam_loc in sam_locs:
                if sam_loc['location_type'] == 'disk':
                    local_path = os.path.join(sam_loc['location'].split(':')[-1], filename)
                    if not larbatch_posix.exists(local_path):
                        locs_to_remove.append(sam_loc['location'])

        # Upload to the FTS dropbox if requested, the file is on disk, and it
        # has no tape location yet.

        locs_to_upload = {}
        should_upload = False
        if upload and len(disk_locs) > 0:
            should_upload = True
            for sam_loc in sam_locs:
                if sam_loc['location_type'] == 'tape':
                    should_upload = False
            if should_upload:
                dropbox = project_utilities.get_dropbox(filename)
                if not larbatch_posix.exists(dropbox):
                    print('Making dropbox directory %s.' % dropbox)
                    larbatch_posix.makedirs(dropbox)
                locs_to_upload[disk_locs[0]] = dropbox
        # Add locations (or report what would be added).

        for loc in locs_to_add:
            node = project_utilities.get_bluearc_server()
            if loc[0:6] == '/pnfs/':
                node = project_utilities.get_dcache_server()
            loc = node + loc.split(':')[-1]
            if add:
                print('Adding location: %s.' % loc)
                project_utilities.test_kca()
                samweb.addFileLocation(filenameorid=filename, location=loc)
            else:
                print('Can add location: %s.' % loc)

        # Remove locations (or report what would be removed).

        for loc in locs_to_remove:
            if clean or remove:
                print('Removing location: %s.' % loc)
                project_utilities.test_kca()
                samweb.removeFileLocation(filenameorid=filename, location=loc)
            else:
                print('Should remove location: %s.' % loc)
        # Upload files to the FTS dropbox.

        for loc in list(locs_to_upload.keys()):
            dropbox = locs_to_upload[loc]

            # Make sure the dropbox directory exists.

            if not larbatch_posix.isdir(dropbox):
                print('Dropbox directory %s does not exist.' % dropbox)
            else:

                # Skip if the file is already in the dropbox.

                dropbox_filename = os.path.join(dropbox, filename)
                if larbatch_posix.exists(dropbox_filename):
                    print('File %s already exists in dropbox %s.' % (filename, dropbox))
                else:

                    # Symlink if the dropbox and the disk location are on the
                    # same filesystem, otherwise copy the file.

                    loc_filename = os.path.join(loc, filename)
                    if project_utilities.mountpoint(loc_filename) == \
                       project_utilities.mountpoint(dropbox_filename):
                        print('Symlinking %s to dropbox directory %s.' % (filename, dropbox))
                        relpath = os.path.relpath(os.path.realpath(loc_filename), dropbox)
                        print('relpath=', relpath)
                        print('dropbox_filename=', dropbox_filename)
                        larbatch_posix.symlink(relpath, dropbox_filename)
                    else:
                        print('Copying %s to dropbox directory %s.' % (filename, dropbox))
                        larbatch_posix.copy(loc_filename, dropbox_filename)
    # Check tape locations of files.

    filelist = samweb.listFiles(dimensions=dim, stream=True)
    ntot = 0
    nbad = 0
    while True:
        try:
            filename = next(filelist)
        except StopIteration:
            break

        # Got a filename; check whether it has a tape location.

        ntot = ntot + 1
        on_tape = False
        sam_locs = samweb.locateFile(filenameorid=filename)
        for sam_loc in sam_locs:
            if sam_loc['location_type'] == 'tape':
                on_tape = True
        if on_tape:
            print('On tape: %s' % filename)
        else:
            nbad = nbad + 1
            print('Not on tape: %s' % filename)

    print('%d files.' % ntot)
    print('%d files need to be stored on tape.' % nbad)
    # Make temporary directories: one for submission-level files and one for
    # the contents of the work tarball.

    tmpdir = tempfile.mkdtemp()
    tmpworkdir = tempfile.mkdtemp()

    jobsub_workdir_files_args = []

    # Copy the input file list into the work directory.

    input_list_name = ''
    if stage.inputlist != '':
        input_list_name = os.path.basename(stage.inputlist)
        work_list_name = os.path.join(tmpworkdir, input_list_name)
        if stage.inputlist != work_list_name:
            input_files = larbatch_posix.readlines(stage.inputlist)
            print('Making input list.')
            work_list = safeopen(work_list_name)
            for input_file in input_files:
                print('Adding input file %s' % input_file)
                work_list.write('%s\n' % input_file.strip())
            work_list.close()
            print('Done making input list.')
    # Copy the fcl file(s) into the work directory.

    fcls = project.get_fcl(stage.fclname)
    for fcl in fcls:
        workfcl = os.path.join(tmpworkdir, os.path.basename(fcl))
        if os.path.abspath(fcl) != os.path.abspath(workfcl):
            larbatch_posix.copy(fcl, workfcl)

    # Generate the wrapper fcl file, with one block per fcl/stage.

    wrapper_fcl_name = os.path.join(tmpworkdir, 'wrapper.fcl')
    wrapper_fcl = safeopen(wrapper_fcl_name)

    original_project_name = project.name
    original_stage_name = stage.name
    original_project_version = project.version

    stageNum = 0
    for fcl in fcls:
        wrapper_fcl.write('#---STAGE %d\n' % stageNum)
        wrapper_fcl.write('#include "%s"\n' % os.path.basename(fcl))
        wrapper_fcl.write('\n')
        # Add metadata overrides, if this xml file defines metadata.

        xml_has_metadata = project.file_type != '' or \
                           project.run_type != ''
        if xml_has_metadata:

            # FileCatalogMetadata overrides.

            if project.release_tag != '':
                wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "%s"\n' % \
                                  project.release_tag)
            else:
                wrapper_fcl.write('services.FileCatalogMetadata.applicationVersion: "test"\n')
            if project.file_type:
                wrapper_fcl.write('services.FileCatalogMetadata.fileType: "%s"\n' % \
                                  project.file_type)
            if project.run_type:
                wrapper_fcl.write('services.FileCatalogMetadata.runType: "%s"\n' % \
                                  project.run_type)

            # Experiment-specific sam metadata, using any per-stage project,
            # stage, and version name overrides.

            if stageNum < len(stage.project_name) and stage.project_name[stageNum] != '':
                project.name = stage.project_name[stageNum]
            if stageNum < len(stage.stage_name) and stage.stage_name[stageNum] != '':
                stage.name = stage.stage_name[stageNum]
            if stageNum < len(stage.project_version) and stage.project_version[stageNum] != '':
                project.version = stage.project_version[stageNum]
            sam_metadata = project_utilities.get_sam_metadata(project, stage)
            if sam_metadata:
                wrapper_fcl.write(sam_metadata)
            project.name = original_project_name
            stage.name = original_stage_name
            project.version = original_project_version

        # Override the run number for generator jobs.

        if (not stage.pubs_input and stage.pubs_output) or stage.output_run:
            wrapper_fcl.write('source.firstRun: %d\n' % stage.output_run)

        # Flux-related overrides for the first stage.

        if stage.maxfluxfilemb != 0 and stageNum == 0:
            wrapper_fcl.write('physics.producers.generator.FluxCopyMethod: "IFDH"\n')
            wrapper_fcl.write('physics.producers.generator.MaxFluxFileMB: %d\n' % stage.maxfluxfilemb)
        wrapper_fcl.write('#---END_STAGE\n')
        stageNum = 1 + stageNum

    wrapper_fcl.close()
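
    # Sketch of a generated wrapper.fcl with one stage (hypothetical fcl name
    # and version; one #---STAGE block is emitted per fcl file):
    #
    #   #---STAGE 0
    #   #include "standard_reco.fcl"
    #
    #   services.FileCatalogMetadata.applicationVersion: "v09_00_00"
    #   services.FileCatalogMetadata.fileType: "mc"
    #   #---END_STAGE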
    # Copy the experiment setup script into the work directory, unless it
    # comes from cvmfs (in which case the worker sources it directly).

    setupscript = ''
    abssetupscript = project_utilities.get_setup_script_path()
    if not abssetupscript.startswith('/cvmfs/'):
        setupscript = os.path.join(stage.workdir, 'setup_experiment.sh')
        larbatch_posix.copy(abssetupscript, setupscript)
        jobsub_workdir_files_args.extend(['-f', setupscript])
    # Determine the name of the batch worker script.

    if stage.batchname != '':
        workname = stage.batchname
    else:
        workname = '%s-%s-%s' % (stage.name, project.name, project.release_tag)
        workname = workname + os.path.splitext(stage.script)[1]

    # Copy the batch script.

    workscript = os.path.join(tmpdir, workname)
    if stage.script != workscript:
        larbatch_posix.copy(stage.script, workscript)

    # Copy the sam start-project script, if any.

    workstartname = ''
    workstartscript = ''
    if stage.start_script != '':
        workstartname = 'start-%s' % workname
        workstartscript = os.path.join(tmpdir, workstartname)
        if stage.start_script != workstartscript:
            larbatch_posix.copy(stage.start_script, workstartscript)

    # Copy the sam stop-project script, if any.

    workstopname = ''
    workstopscript = ''
    if stage.stop_script != '':
        workstopname = 'stop-%s' % workname
        workstopscript = os.path.join(tmpdir, workstopname)
        if stage.stop_script != workstopscript:
            larbatch_posix.copy(stage.stop_script, workstopscript)
    # Copy worker initialization scripts into the work directory.

    for init_script in stage.init_script:
        if init_script != '':
            if not larbatch_posix.exists(init_script):
                raise RuntimeError('Worker initialization script %s does not exist.\n' % \
                                   init_script)
            work_init_script = os.path.join(tmpworkdir, os.path.basename(init_script))
            if init_script != work_init_script:
                larbatch_posix.copy(init_script, work_init_script)

    # Zero or one initialization script can be passed to the batch script
    # directly; multiple scripts are run via a generated wrapper.

    n = len(stage.init_script)
    if n == 0:
        stage.init_script = ''
    elif n == 1:
        stage.init_script = stage.init_script[0]
    else:

        # Generate a wrapper that runs each initialization script in turn,
        # stopping at the first failure.

        work_init_wrapper = os.path.join(tmpworkdir, 'init_wrapper.sh')
        f = open(work_init_wrapper, 'w')
        f.write('#! /bin/bash\n')
        for init_script in stage.init_script:
            f.write('echo "Executing %s"\n' % os.path.basename(init_script))
            f.write('./%s\n' % os.path.basename(init_script))
            f.write('status=$?\n')
            f.write('echo "%s finished with status $status"\n' % os.path.basename(init_script))
            f.write('if [ $status -ne 0 ]; then\n')
            f.write('  exit $status\n')
            f.write('fi\n')
        f.write('echo "Done executing initialization scripts."\n')
        f.close()
        stage.init_script = work_init_wrapper
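
    # Sketch of a generated init_wrapper.sh for one script (hypothetical name;
    # the block repeats for each script):
    #
    #   #! /bin/bash
    #   echo "Executing init_a.sh"
    #   ./init_a.sh
    #   status=$?
    #   echo "init_a.sh finished with status $status"
    #   if [ $status -ne 0 ]; then
    #     exit $status
    #   fi
    #   echo "Done executing initialization scripts."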
    # Copy worker initialization source scripts into the work directory.

    for init_source in stage.init_source:
        if init_source != '':
            if not larbatch_posix.exists(init_source):
                raise RuntimeError('Worker initialization source script %s does not exist.\n' % \
                                   init_source)
            work_init_source = os.path.join(tmpworkdir, os.path.basename(init_source))
            if init_source != work_init_source:
                larbatch_posix.copy(init_source, work_init_source)

    # Zero or one source script can be passed directly; multiple scripts are
    # sourced via a generated wrapper.

    n = len(stage.init_source)
    if n == 0:
        stage.init_source = ''
    elif n == 1:
        stage.init_source = stage.init_source[0]
    else:

        # Generate a wrapper that sources each initialization source script
        # (no shebang line, since this wrapper is itself sourced).

        work_init_source_wrapper = os.path.join(tmpworkdir, 'init_source_wrapper.sh')
        f = open(work_init_source_wrapper, 'w')
        for init_source in stage.init_source:
            f.write('echo "Sourcing %s"\n' % os.path.basename(init_source))
            f.write('source %s\n' % os.path.basename(init_source))
        f.write('echo "Done sourcing initialization scripts."\n')
        f.close()
        stage.init_source = work_init_source_wrapper
    # Copy worker end-of-job scripts into the work directory.

    for end_script in stage.end_script:
        if end_script != '':
            if not larbatch_posix.exists(end_script):
                raise RuntimeError('Worker end-of-job script %s does not exist.\n' % end_script)
            work_end_script = os.path.join(tmpworkdir, os.path.basename(end_script))
            if end_script != work_end_script:
                larbatch_posix.copy(end_script, work_end_script)

    # Zero or one end-of-job script can be passed directly; multiple scripts
    # are run via a generated wrapper.

    n = len(stage.end_script)
    if n == 0:
        stage.end_script = ''
    elif n == 1:
        stage.end_script = stage.end_script[0]
    else:

        # Generate a wrapper that runs each end-of-job script in turn,
        # stopping at the first failure.

        work_end_wrapper = os.path.join(tmpworkdir, 'end_wrapper.sh')
        f = open(work_end_wrapper, 'w')
        f.write('#! /bin/bash\n')
        for end_script in stage.end_script:
            f.write('echo "Executing %s"\n' % os.path.basename(end_script))
            f.write('./%s\n' % os.path.basename(end_script))
            f.write('status=$?\n')
            f.write('echo "%s finished with status $status"\n' % os.path.basename(end_script))
            f.write('if [ $status -ne 0 ]; then\n')
            f.write('  exit $status\n')
            f.write('fi\n')
        f.write('echo "Done executing finalization scripts."\n')
        f.close()
        stage.end_script = work_end_wrapper
    # Copy worker midstage initialization source scripts into the work directory.

    for istage in stage.mid_source:
        for mid_source in stage.mid_source[istage]:
            if mid_source != '':
                if not larbatch_posix.exists(mid_source):
                    raise RuntimeError('Worker midstage initialization source script %s does not exist.\n' % mid_source)
                work_mid_source = os.path.join(tmpworkdir, os.path.basename(mid_source))
                if mid_source != work_mid_source:
                    larbatch_posix.copy(mid_source, work_mid_source)

    # Generate a wrapper that sources each midstage initialization script for
    # the appropriate stage, selected by the $stage variable.

    if len(stage.mid_source) > 0:
        work_mid_source_wrapper = os.path.join(tmpworkdir, 'mid_source_wrapper.sh')
        f = open(work_mid_source_wrapper, 'w')
        for istage in stage.mid_source:
            for mid_source in stage.mid_source[istage]:
                f.write('if [ $stage -eq %d ]; then\n' % istage)
                f.write('  echo "Sourcing %s"\n' % os.path.basename(mid_source))
                f.write('  source %s\n' % os.path.basename(mid_source))
                f.write('fi\n')
        f.write('echo "Done sourcing midstage source initialization scripts for stage $stage."\n')
        f.close()
        stage.mid_source = work_mid_source_wrapper
    else:
        stage.mid_source = ''

    # Copy worker midstage finalization scripts into the work directory.

    for istage in stage.mid_script:
        for mid_script in stage.mid_script[istage]:
            if mid_script != '':
                if not larbatch_posix.exists(mid_script):
                    raise RuntimeError('Worker midstage finalization script %s does not exist.\n' % mid_script)
                work_mid_script = os.path.join(tmpworkdir, os.path.basename(mid_script))
                if mid_script != work_mid_script:
                    larbatch_posix.copy(mid_script, work_mid_script)

    # Generate a midstage finalization wrapper, selected by the $stage argument.

    if len(stage.mid_script) > 0:
        work_mid_wrapper = os.path.join(tmpworkdir, 'mid_wrapper.sh')
        f = open(work_mid_wrapper, 'w')
        f.write('#! /bin/bash\n')
        f.write('stage=$1\n')
        for istage in stage.mid_script:
            for mid_script in stage.mid_script[istage]:
                f.write('if [ $stage -eq %d ]; then\n' % istage)
                f.write('  echo "Executing %s"\n' % os.path.basename(mid_script))
                f.write('  ./%s\n' % os.path.basename(mid_script))
                f.write('  status=$?\n')
                f.write('  echo "%s finished with status $status"\n' % os.path.basename(mid_script))
                f.write('  if [ $status -ne 0 ]; then\n')
                f.write('    exit $status\n')
                f.write('  fi\n')
                f.write('fi\n')
        f.write('echo "Done executing midstage finalization scripts for stage $stage."\n')
        f.close()
        stage.mid_script = work_mid_wrapper
    else:
        stage.mid_script = ''
    # Copy helper scripts into the work directory, locating each one on the
    # execution path.

    helpers = ('root_metadata.py',
               'validate_in_job.py')

    for helper in helpers:
        jobinfo = subprocess.Popen(['which', helper],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        if rc == 0:
            helper_path = jobout.splitlines()[0].strip()
            work_helper = os.path.join(tmpworkdir, helper)
            if helper_path != work_helper:
                larbatch_posix.copy(helper_path, work_helper)
        else:
            print('Helper script %s not found.' % helper)
    # Copy helper python modules into the work directory, locating each one
    # by importing it in a python subprocess.

    helper_modules = ('larbatch_posix',
                      'project_utilities',
                      'larbatch_utilities',
                      'experiment_utilities')

    for helper_module in helper_modules:
        jobinfo = subprocess.Popen(['python'],
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        cmd = 'import %s\nprint(%s.__file__)\n' % (helper_module, helper_module)
        jobinfo.stdin.write(convert_bytes(cmd))
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        if rc == 0:
            helper_path = jobout.splitlines()[-1].strip()
            work_helper = os.path.join(tmpworkdir, os.path.basename(helper_path))
            if helper_path != work_helper:
                larbatch_posix.copy(helper_path, work_helper)
        else:
            print('Helper python module %s not found.' % helper_module)
    # For makeup jobs, require a previous check and delete bad subdirectories.

    checked_file = os.path.join(stage.bookdir, 'checked')
    if not larbatch_posix.exists(checked_file):
        raise RuntimeError('Wait for any running jobs to finish and run project.py --check')

    # Delete bad output, log, and book subdirectories.

    bad_filename = os.path.join(stage.bookdir, 'bad.list')
    if larbatch_posix.exists(bad_filename):
        lines = larbatch_posix.readlines(bad_filename)
        for line in lines:
            bad_subdir = line.strip()
            if bad_subdir != '':
                bad_path = os.path.join(stage.outdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
                bad_path = os.path.join(stage.logdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
                bad_path = os.path.join(stage.bookdir, bad_subdir)
                if larbatch_posix.exists(bad_path):
                    print('Deleting %s' % bad_path)
                    larbatch_posix.rmtree(bad_path)
    # Count missing files to determine how many makeup jobs to submit.

    makeup_count = 0
    missing_files = []
    if stage.inputdef == '':
        missing_filename = os.path.join(stage.bookdir, 'missing_files.list')
        if larbatch_posix.exists(missing_filename):
            lines = larbatch_posix.readlines(missing_filename)
            for line in lines:
                words = line.split()
                if len(words) > 0:
                    missing_files.append(words[0])
        makeup_count = len(missing_files)
        print('Makeup list contains %d files.' % makeup_count)

        # Rewrite the work input list to contain only the missing files.

        if input_list_name != '':
            work_list_name = os.path.join(tmpworkdir, input_list_name)
            if larbatch_posix.exists(work_list_name):
                larbatch_posix.remove(work_list_name)
            work_list = safeopen(work_list_name)
            for missing_file in missing_files:
                work_list.write('%s\n' % missing_file)
            work_list.close()
    # For generator makeup jobs, compute the process numbers of the missing
    # workers, so that new jobs reuse the missing process numbers.

    if stage.inputdef == '' and stage.inputfile == '' and stage.inputlist == '':
        procs = set(range(stage.num_jobs))

        # Remove the process numbers of existing good output files.

        output_files = os.path.join(stage.bookdir, 'files.list')
        if larbatch_posix.exists(output_files):
            lines = larbatch_posix.readlines(output_files)
            for line in lines:
                dir = os.path.basename(os.path.dirname(line))
                dir_parts = dir.split('_')
                if len(dir_parts) > 1:
                    proc = int(dir_parts[1])
                    if proc in procs:
                        procs.remove(proc)
        if len(procs) != makeup_count:
            raise RuntimeError('Makeup process list has different length than makeup count.')

        # Write the process map file, which communicates the missing process
        # numbers to the batch workers.

        procmap = 'procmap.txt'
        procmap_path = os.path.join(tmpworkdir, procmap)
        procmap_file = safeopen(procmap_path)
        for proc in procs:
            procmap_file.write('%d\n' % proc)
        procmap_file.close()
    # Read consumer process ids from the previous run.

    cpids = []
    cpids_filename = os.path.join(stage.bookdir, 'cpids.list')
    if larbatch_posix.exists(cpids_filename):
        cpids_files = larbatch_posix.readlines(cpids_filename)
        for line in cpids_files:
            cpids.append(line.strip())

    # Create a makeup dataset definition: files from the input definition
    # that were never successfully consumed.

    makeup_defname = ''
    if len(cpids) > 0:
        project_utilities.test_kca()
        makeup_defname = samweb.makeProjectName(stage.inputdef) + '_makeup'

        cpids_list = ''
        sep = ''
        for cpid in cpids:
            cpids_list = cpids_list + '%s%s' % (sep, cpid)
            sep = ','

        dim = '(defname: %s) minus (consumer_process_id %s and consumed_status consumed)' % (stage.inputdef, cpids_list)

        print('Creating makeup sam dataset definition %s' % makeup_defname)
        project_utilities.test_kca()
        samweb.createDefinition(defname=makeup_defname, dims=dim)
        makeup_count = samweb.countFiles(defname=makeup_defname)
        print('Makeup dataset contains %d files.' % makeup_count)
    # Make a tarball of the work directory (fixed mtime so that identical
    # contents produce an identical tarball).

    tmptar = '%s/work.tar' % tmpworkdir
    jobinfo = subprocess.Popen(['tar', '-cf', tmptar, '-C', tmpworkdir,
                                '--mtime=2018-01-01',
                                '--exclude=work.tar', '.'],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    jobout, joberr = jobinfo.communicate()
    rc = jobinfo.poll()
    if rc != 0:
        raise RuntimeError('Failed to create work tarball in %s' % tmpworkdir)

    # Calculate the md5 checksum of the tarball.

    hasher = hashlib.md5()
    f = open(tmptar, 'rb')
    buf = f.read(1024)
    while len(buf) > 0:
        hasher.update(buf)
        buf = f.read(1024)
    f.close()
    hash = hasher.hexdigest()

    # Copy the tarball into the stage work directory under a hash-based name.

    hashtar = '%s/work%s.tar' % (stage.workdir, hash)
    if not larbatch_posix.exists(hashtar):
        larbatch_posix.copy(tmptar, hashtar)
    jobsub_workdir_files_args.extend(['-f', hashtar])
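
    # The tarball is content-addressed by its md5 hash, e.g. (hypothetical hash):
    #
    #   <stage.workdir>/workd41d8cd98f00b204e9800998ecf8427e.tar
    #
    # so resubmissions with an unchanged work environment reuse the existing
    # tarball instead of copying a new one.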
    # Choose the input dataset definition (makeup definition for makeup jobs).

    inputdef = stage.inputdef
    if makeup and makeup_defname != '':
        inputdef = makeup_defname

    # Generate sam project names for the input and mix datasets.

    prjname = ''
    mixprjname = ''
    prj_started = False
    if inputdef != '':
        project_utilities.test_kca()
        prjname = samweb.makeProjectName(inputdef)
    if stage.mixinputdef != '':
        project_utilities.test_kca()
        mixprjname = 'mix_%s' % samweb.makeProjectName(stage.mixinputdef)

    # Optionally prestart the sam project.

    if prjname != '' and stage.prestart != 0:
        ok = project_utilities.start_project(inputdef, prjname,
                                             stage.num_jobs * stage.max_files_per_job,
                                             stage.recur, stage.filelistdef)
        if ok != 0:
            print('Failed to start project.')
        else:
            prj_started = True

    # Also start the mix project, if any.

    if mixprjname != '' and prj_started:
        ok = project_utilities.start_project(stage.mixinputdef, mixprjname, 0, 0, stage.filelistdef)
        if ok != 0:
            print('Failed to start mix project.')
            prj_started = False
    # Get the role (analysis or production).

    role = project_utilities.get_role()
    if project.role != '':
        role = project.role

    # Construct the jobsub_submit command.

    command = ['jobsub_submit']

    # Jobsub options.

    command.append('--group=%s' % project_utilities.get_experiment())
    command.append('--role=%s' % role)
    command.extend(jobsub_workdir_files_args)
    if project.server != '-' and project.server != '':
        command.append('--jobsub-server=%s' % project.server)
    if stage.resource != '':
        command.append('--resource-provides=usage_model=%s' % stage.resource)
    elif project.resource != '':
        command.append('--resource-provides=usage_model=%s' % project.resource)
    if stage.lines != '':
        command.append('--lines=%s' % stage.lines)
    elif project.lines != '':
        command.append('--lines=%s' % project.lines)
    if stage.site != '':
        command.append('--site=%s' % stage.site)
    if stage.blacklist != '':
        command.append('--blacklist=%s' % stage.blacklist)
    if stage.cpu != 0:
        command.append('--cpu=%d' % stage.cpu)
    if stage.disk != '':
        command.append('--disk=%s' % stage.disk)
    if stage.memory != 0:
        command.append('--memory=%d' % stage.memory)
    if project.os != '':
        if stage.singularity == 0:
            command.append('--OS=%s' % project.os)
        else:

            # Singularity: pass the image via a condor classad line.  Extra
            # escaping is needed when the command goes through a dag.

            p = project_utilities.get_singularity(project.os)
            if p != '':
                if (stage.num_jobs > 1 or project.force_dag) and \
                   (inputdef != '' or stage.mixinputdef != '') :
                    command.append(r"""--lines='+SingularityImage=\"%s\"'""" % p)
                else:
                    command.append(r"""--lines='+SingularityImage="%s"'""" % p)
            else:
                raise RuntimeError('No singularity image found for %s' % project.os)
    # Set the number of submitted jobs.

    if not stage.pubs_output:
        if not makeup:
            command_njobs = stage.num_jobs
            command.extend(['-N', '%d' % command_njobs])
        else:
            command_njobs = min(makeup_count, stage.num_jobs)
            command.extend(['-N', '%d' % command_njobs])
    else:
        if stage.inputdef != '':
            command_njobs = stage.num_jobs
        else:
            command_njobs = stage.num_jobs
        command.extend(['-N', '%d' % command_njobs])

    # Additional jobsub options from the xml file.

    if stage.jobsub != '':
        for word in stage.jobsub.split():
            command.append(word)

    # Default jobsub_submit options.

    opt = project_utilities.default_jobsub_submit_options()
    for word in opt.split():
        command.append(word)
    if stage.cvmfs != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
    if stage.stash != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
    if stage.singularity != 0:
        command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

    workurl = "file://%s" % workscript
    command.append(workurl)
    # Batch script options.

    if stage.max_files_per_job != 0:
        command_max_files_per_job = stage.max_files_per_job
        command.extend(['--nfile', '%d' % command_max_files_per_job])

    command.extend([' --group', project_utilities.get_experiment()])
    command.extend([' -g'])
    command.extend([' -c', 'wrapper.fcl'])
    command.extend([' --ups', ','.join(project.ups)])
    if project.release_tag != '':
        command.extend([' -r', project.release_tag])
    command.extend([' -b', project.release_qual])
    if project.local_release_dir != '':
        command.extend([' --localdir', project.local_release_dir])
    if project.local_release_tar != '':
        command.extend([' --localtar', project.local_release_tar])
    command.extend([' --workdir', stage.workdir])
    command.extend([' --outdir', stage.outdir])
    command.extend([' --logdir', stage.logdir])
    if stage.dirsize > 0:
        command.extend([' --dirsize', '%d' % stage.dirsize])
    if stage.dirlevels > 0:
        command.extend([' --dirlevels', '%d' % stage.dirlevels])
    if type(stage.exe) == type([]):
        command.extend([' --exe', ':'.join(stage.exe)])
    else:
        command.extend([' --exe', stage.exe])
    if stage.schema != '':
        command.extend([' --sam_schema', stage.schema])
    if project.os != '':
        command.extend([' --os', project.os])

    # Specify the process number for single-subrun pubs output.

    if not stage.pubs_input and stage.pubs_output and stage.output_subruns[0] > 0:
        command.extend(['--process', '%d' % (stage.output_subruns[0]-1)])

    # Single worker mode for pubs input.

    if stage.pubs_input:
        command.append('--single')

    # Input options.

    if stage.inputfile != '':
        command.extend([' -s', stage.inputfile])
    elif input_list_name != '':
        command.extend([' -S', input_list_name])
    elif inputdef != '':
        command.extend([' --sam_defname', inputdef,
                        ' --sam_project', prjname])
        if stage.recur:
            command.extend([' --recur'])
    if stage.mixinputdef != '':
        command.extend([' --mix_defname', stage.mixinputdef,
                        ' --mix_project', mixprjname])
    if stage.inputmode != '':
        command.extend([' --inputmode', stage.inputmode])
    command.extend([' -n', '%d' % stage.num_events])
    if stage.inputdef == '':
        command.extend([' --njobs', '%d' % stage.num_jobs ])
    for ftype in stage.datafiletypes:
        command.extend(['--data_file_type', ftype])
    if procmap != '':
        command.extend([' --procmap', procmap])

    # Output options.

    if type(stage.output) == type([]):
        command.extend([' --output', ':'.join(stage.output)])
    else:
        command.extend([' --output', stage.output])
    if stage.TFileName != '':
        command.extend([' --TFileName', stage.TFileName])
    if stage.init_script != '':
        command.extend([' --init-script', os.path.basename(stage.init_script)])
    if stage.init_source != '':
        command.extend([' --init-source', os.path.basename(stage.init_source)])
    if stage.end_script != '':
        command.extend([' --end-script', os.path.basename(stage.end_script)])
    if stage.mid_source != '':
        command.extend([' --mid-source', os.path.basename(stage.mid_source)])
    if stage.mid_script != '':
        command.extend([' --mid-script', os.path.basename(stage.mid_script)])
    if abssetupscript != '':
        command.extend([' --init', abssetupscript])

    # Validation options.

    if stage.validate_on_worker == 1:
        print('Validation will be done on the worker node %d' % stage.validate_on_worker)
        command.extend([' --validate'])
        command.extend([' --declare'])

        # For multi-stage fcl lists, tell the worker to maintain parentage.

        if type(stage.fclname) == type([]) and len(stage.fclname) > 1:
            command.extend([' --maintain_parentage'])

    if stage.copy_to_fts == 1:
        command.extend([' --copy'])

    # Tell the batch script to start the sam project itself, if there is sam
    # input, only one job, no dag, and the project has not been prestarted.

    if (prjname != '' or mixprjname != '') and command_njobs == 1 and not project.force_dag and not prj_started:
        command.extend([' --sam_start',
                        ' --sam_station', project_utilities.get_experiment(),
                        ' --sam_group', project_utilities.get_experiment()])
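
    # Sketch of an assembled submission (hypothetical values; jobsub options
    # come first, then the batch script url, then batch script options):
    #
    #   jobsub_submit --group=uboone --role=Analysis -N 10 \
    #       --resource-provides=usage_model=OPPORTUNISTIC \
    #       file:///tmp/tmpXXXXXX/reco-myproj-v1.sh \
    #       -c wrapper.fcl --sam_defname mydef --sam_project mydef_prj -n 100000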
    # If this submission requires a dag (more than one job with sam input, or
    # a forced dag), collect the sam projects that need start and stop jobs.

    start_commands = []
    stop_commands = []
    if command_njobs > 1 or project.force_dag:

        dag_prjs = []
        if inputdef != '':
            dag_prjs.append([inputdef, prjname])
        if stage.mixinputdef != '':
            dag_prjs.append([stage.mixinputdef, mixprjname])

        for dag_prj in dag_prjs:

            # Make sure the start and stop project scripts exist.

            if workstartname == '' or workstopname == '':
                raise RuntimeError('Sam start or stop project script not found.')
            # Construct the start-project command.

            start_command = ['jobsub']

            # Jobsub options.

            start_command.append('--group=%s' % project_utilities.get_experiment())
            if setupscript != '':
                start_command.append('-f %s' % setupscript)
            if stage.resource != '':
                start_command.append('--resource-provides=usage_model=%s' % stage.resource)
            elif project.resource != '':
                start_command.append('--resource-provides=usage_model=%s' % project.resource)
            if stage.lines != '':
                start_command.append('--lines=%s' % stage.lines)
            elif project.lines != '':
                start_command.append('--lines=%s' % project.lines)
            if stage.site != '':
                start_command.append('--site=%s' % stage.site)
            if stage.blacklist != '':
                start_command.append('--blacklist=%s' % stage.blacklist)
            if project.os != '':
                if stage.singularity == 0:
                    start_command.append('--OS=%s' % project.os)
                else:
                    p = project_utilities.get_singularity(project.os)
                    if p != '':
                        start_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
                    else:
                        raise RuntimeError('No singularity image found for %s' % project.os)
            if stage.jobsub_start != '':
                for word in stage.jobsub_start.split():
                    start_command.append(word)
            opt = project_utilities.default_jobsub_submit_options()
            for word in opt.split():
                start_command.append(word)
            if stage.cvmfs != 0:
                start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
            if stage.stash != 0:
                start_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
            if stage.singularity != 0:
                start_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')

            # Start-project script.

            workstarturl = "file://%s" % workstartscript
            start_command.append(workstarturl)

            # Start-project script options.

            start_command.extend([' --sam_station', project_utilities.get_experiment(),
                                  ' --sam_group', project_utilities.get_experiment(),
                                  ' --sam_defname', dag_prj[0],
                                  ' --sam_project', dag_prj[1]])
            if stage.recur:
                start_command.extend([' --recur'])

            if abssetupscript != '':
                start_command.extend([' --init', abssetupscript])

            if stage.num_jobs > 0 and stage.max_files_per_job > 0:
                start_command.extend([' --max_files',
                                      '%d' % (stage.num_jobs * stage.max_files_per_job)])

            if stage.prestagefraction > 0.:
                start_command.extend([' --prestage_fraction',
                                      '%f' % stage.prestagefraction])

            # Output directory.

            start_command.extend([' --logdir', stage.logdir])

            # Only submit a start-project job if the project was not already
            # prestarted (or prestaging was requested).

            if not prj_started or stage.prestagefraction > 0.:
                start_commands.append(start_command)
        stop_command = ['jobsub']
        stop_command.append('--group=%s' % project_utilities.get_experiment())
        if setupscript != '':
            stop_command.append('-f %s' % setupscript)
        if stage.resource != '':
            stop_command.append('--resource-provides=usage_model=%s' % stage.resource)
        elif project.resource != '':
            stop_command.append('--resource-provides=usage_model=%s' % project.resource)
        if stage.lines != '':
            stop_command.append('--lines=%s' % stage.lines)
        elif project.lines != '':
            stop_command.append('--lines=%s' % project.lines)
        if stage.site != '':
            stop_command.append('--site=%s' % stage.site)
        if stage.blacklist != '':
            stop_command.append('--blacklist=%s' % stage.blacklist)
        if project.os != '':
            if stage.singularity == 0:
                stop_command.append('--OS=%s' % project.os)
            else:
                p = project_utilities.get_singularity(project.os)
                if p != '':
                    stop_command.append('--lines=\'+SingularityImage=\\"%s\\"\'' % p)
                else:
                    raise RuntimeError('No singularity image found for %s' % project.os)
        if stage.jobsub_start != '':
            for word in stage.jobsub_start.split():
                stop_command.append(word)
        opt = project_utilities.default_jobsub_submit_options()
        for word in opt.split():
            stop_command.append(word)
        if stage.cvmfs != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_opensciencegrid_org==true)\'' % project_utilities.get_experiment())
        if stage.stash != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_CVMFS_%s_osgstorage_org==true)\'' % project_utilities.get_experiment())
        if stage.singularity != 0:
            stop_command.append('--append_condor_requirements=\'(TARGET.HAS_SINGULARITY=?=true)\'')
        workstopurl = "file://%s" % workstopscript
        stop_command.append(workstopurl)
        stop_command.extend([' --sam_station', project_utilities.get_experiment(),
                             ' --sam_project', dag_prj[1]])
        stop_command.extend([' --logdir', stage.logdir])
        if abssetupscript != '':
            stop_command.extend([' --init', abssetupscript])
        stop_commands.append(stop_command)
    if len(start_commands) > 0 or len(stop_commands) > 0:

        dagfilepath = os.path.join(tmpdir, 'submit.dag')
        dag = safeopen(dagfilepath)
        dag.write('<serial>\n')

        if len(start_commands) > 0:
            dag.write('\n<parallel>\n\n')
            for start_command in start_commands:
                for word in start_command:
                    if word[:6] == 'jobsub':
                        ...
                    ...
            dag.write('</parallel>\n')

        dag.write('\n<parallel>\n\n')
        for process in range(command_njobs):
            for word in command:
                if word[:6] == 'jobsub':
                    ...
                if word[:7] == '--role=':
                    ...
                if word.startswith('--jobsub-server='):
                    ...
                word = project_utilities.dollar_escape(word)
                ...
            if word[:6] == 'jobsub':
                dag.write(' --process %d\n' % process)
        dag.write('\n</parallel>\n')

        if len(stop_commands) > 0:
            dag.write('\n<parallel>\n\n')
            for stop_command in stop_commands:
                for word in stop_command:
                    if word[:6] == 'jobsub':
                        ...
                    ...
            dag.write('</parallel>\n')

        dag.write('\n</serial>\n')
        dag.close()
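# The dag.write calls above produce a jobsub DAG of the form
#
#     <serial>
#     <parallel> ...sam start-project jobs... </parallel>
#     <parallel> ...worker jobs... </parallel>
#     <parallel> ...sam stop-project jobs... </parallel>
#     </serial>
#
# A self-contained sketch of that skeleton, with the per-word escaping and
# --process handling omitted:

def _example_write_dag(path, start_commands, worker_commands, stop_commands):
    """Write a <serial> DAG containing up to three <parallel> sections."""
    with open(path, 'w') as dag:
        dag.write('<serial>\n')
        for commands in (start_commands, worker_commands, stop_commands):
            if len(commands) > 0:
                dag.write('\n<parallel>\n\n')
                for command in commands:
                    dag.write(' '.join(command))
                    dag.write('\n')
                dag.write('\n</parallel>\n')
        dag.write('\n</serial>\n')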
        command = ['jobsub_submit_dag']
        command.append('--group=%s' % project_utilities.get_experiment())
        if project.server != '-' and project.server != '':
            command.append('--jobsub-server=%s' % project.server)
        command.append('--role=%s' % role)
        dagfileurl = 'file://' + dagfilepath
        command.append(dagfileurl)

checked_file = os.path.join(stage.bookdir, 'checked')
submit_timeout = 3600000
submit_timeout += 1.0 * command_njobs
if stage.jobsub_timeout > submit_timeout:
    submit_timeout = stage.jobsub_timeout

print('Invoke jobsub_submit')
print(' '.join(command))

q = queue.Queue()
jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
thread = threading.Thread(target=project_utilities.wait_for_subprocess, args=[jobinfo, q])
thread.start()
thread.join(timeout=submit_timeout)
if thread.is_alive():
    jobinfo.terminate()
    thread.join()
rc = q.get()
jobout = convert_str(q.get())
joberr = convert_str(q.get())
if rc != 0:
    if larbatch_posix.exists(checked_file):
        larbatch_posix.remove(checked_file)
    if larbatch_posix.isdir(tmpdir):
        larbatch_posix.rmtree(tmpdir)
    if larbatch_posix.isdir(tmpworkdir):
        larbatch_posix.rmtree(tmpworkdir)
    raise JobsubError(command, rc, jobout, joberr)
jobid = ''
for line in jobout.split('\n'):
    if "JobsubJobId" in line:
        jobid = line.strip().split()[-1]
if jobid == '':
    raise JobsubError(command, rc, jobout, joberr)
print('jobsub_submit finished.')
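# The submission above guards a potentially hanging jobsub call with a
# timeout by running it on a worker thread.  A self-contained version of
# the same pattern, where the waiter function stands in for
# project_utilities.wait_for_subprocess (assumed to push return code and
# output onto the queue):

import subprocess
import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def _example_run_with_timeout(command, timeout):
    """Run command; return (rc, stdout, stderr), or None on timeout."""
    q = queue.Queue()

    def waiter(proc, outq):
        out, err = proc.communicate()
        outq.put((proc.returncode, out, err))

    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    thread = threading.Thread(target=waiter, args=(proc, q))
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        proc.terminate()   # caller should clean up scratch areas and raise
        thread.join()
        return None
    return q.get()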
if makeup_count > 0:

    print(' '.join(command))

    q = queue.Queue()
    jobinfo = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    thread = threading.Thread(target=project_utilities.wait_for_subprocess,
                              args=[jobinfo, q])
    thread.start()
    thread.join(timeout=submit_timeout)
    if thread.is_alive():
        jobinfo.terminate()
        thread.join()
    rc = q.get()
    jobout = convert_str(q.get())
    joberr = convert_str(q.get())
    if rc != 0:
        if larbatch_posix.exists(checked_file):
            larbatch_posix.remove(checked_file)
        if larbatch_posix.isdir(tmpdir):
            larbatch_posix.rmtree(tmpdir)
        if larbatch_posix.isdir(tmpworkdir):
            larbatch_posix.rmtree(tmpworkdir)
        raise JobsubError(command, rc, jobout, joberr)
    jobid = ''
    for line in jobout.split('\n'):
        if "JobsubJobId" in line:
            jobid = line.strip().split()[-1]
    if jobid == '':
        raise JobsubError(command, rc, jobout, joberr)

else:
    print('Makeup action aborted because makeup job count is zero.')
def dosubmit(project, stage, makeup=False, recur=False, dryrun=False):

    project_utilities.test_kca()
    larbatch_utilities.test_jobsub()

    ok = stage.checksubmit()
    if ok != 0:
        print('No jobs submitted.')
        return
    if stage.pubs_output and not stage.dynamic:
        if larbatch_posix.exists(stage.workdir):
            larbatch_posix.rmtree(stage.workdir)
        if larbatch_posix.exists(stage.outdir):
            larbatch_posix.rmtree(stage.outdir)
        if larbatch_posix.exists(stage.logdir):
            larbatch_posix.rmtree(stage.logdir)
        if larbatch_posix.exists(stage.bookdir):
            larbatch_posix.rmtree(stage.bookdir)
    ok = stage.checkinput(checkdef=True)
    if ok != 0:
        print('No jobs submitted.')
        return

    if not makeup and not recur and not stage.dynamic:
        if len(larbatch_posix.listdir(stage.outdir)) != 0:
            raise RuntimeError('Output directory %s is not empty.' % stage.outdir)
        if len(larbatch_posix.listdir(stage.logdir)) != 0:
            raise RuntimeError('Log directory %s is not empty.' % stage.logdir)
        if len(larbatch_posix.listdir(stage.bookdir)) != 0:
            raise RuntimeError('Book directory %s is not empty.' % stage.bookdir)
    jobid = dojobsub(project, stage, makeup, recur, dryrun)

    jobids_filename = os.path.join(stage.bookdir, 'jobids.list')
    jobids = []
    if larbatch_posix.exists(jobids_filename):
        lines = larbatch_posix.readlines(jobids_filename)
        ...
    jobids.append(jobid)

    jobid_file = safeopen(jobids_filename)
    for jobid in jobids:
        jobid_file.write('%s\n' % jobid)
    jobid_file.close()
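# jobids.list bookkeeping in isolation: read any existing ids, append the
# new one, and rewrite the whole file.  safeopen (defined near the end of
# this file) deletes before reopening, so a plain rewrite is equivalent:

import os

def _example_append_jobid(jobids_filename, jobid):
    """Append jobid to a one-id-per-line bookkeeping file."""
    jobids = []
    if os.path.exists(jobids_filename):
        with open(jobids_filename) as f:
            jobids = [ln.strip() for ln in f if ln.strip()]
    if jobid != '':
        jobids.append(jobid)
    with open(jobids_filename, 'w') as f:
        for jid in jobids:
            f.write('%s\n' % jid)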
def domerge(stage, mergehist, mergentuple):

    hnlist = os.path.join(stage.bookdir, 'filesana.list')
    if larbatch_posix.exists(hnlist):
        hlist = larbatch_posix.readlines(hnlist)
    else:
        raise RuntimeError('No filesana.list file found %s, run project.py --checkana' % hnlist)

    histurlsname_temp = 'histurls.list'
    histurls = safeopen(histurlsname_temp)
    for hist in hlist:
        histurls.write('%s\n' % hist)
    histurls.close()
    name = os.path.join(stage.outdir, 'anahist.root')
    if name[0:6] == '/pnfs/':
        tempdir = '%s/mergentuple_%d_%d' % (project_utilities.get_scratch_dir(),
                                            os.getuid(), os.getpid())
        if not larbatch_posix.isdir(tempdir):
            larbatch_posix.makedirs(tempdir)
        name_temp = '%s/anahist.root' % tempdir
    else:
        name_temp = name

    mergecom = "hadd -T"
    ...
    mergecom = stage.merge

    print("Merging %d root files using %s." % (len(hlist), mergecom))
    if larbatch_posix.exists(name_temp):
        larbatch_posix.remove(name_temp)
    comlist = mergecom.split()
    comlist.extend(["-f", "-k", name_temp, '@' + histurlsname_temp])
    rc = subprocess.call(comlist, stdout=sys.stdout, stderr=sys.stderr)
    print("%s exit status %d" % (mergecom, rc))
    if name != name_temp:
        if larbatch_posix.exists(name):
            larbatch_posix.remove(name)
        if larbatch_posix.exists(name_temp):
            larbatch_posix.copy(name_temp, name)
            larbatch_posix.rmtree(tempdir)
    larbatch_posix.remove(histurlsname_temp)
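# The merge shells out to ROOT's hadd: '-f' overwrites the output, '-k'
# skips corrupt input files, '-T' skips TTrees (histograms only), and the
# '@file' argument names a text file listing one input URL per line.
# Reduced to its core:

import subprocess
import sys

def _example_merge_histograms(output, histurls_file, mergecom='hadd -T'):
    """Merge the ROOT files listed (one per line) in histurls_file."""
    comlist = mergecom.split()
    comlist.extend(['-f', '-k', output, '@' + histurls_file])
    return subprocess.call(comlist, stdout=sys.stdout, stderr=sys.stderr)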
    stage_has_input = stage.inputfile != '' or stage.inputlist != '' or stage.inputdef != ''
    if not stage_has_input:
        raise RuntimeError('No auditing for generator stage.')

    if stage.defname != '':
        query = 'isparentof: (defname: %s) and availability: anylocation' % (stage.defname)
        try:
            outparentlist = samweb.listFiles(dimensions=query)
            outputlist = samweb.listFiles(defname=stage.defname)
        except:
            raise RuntimeError('Error accessing sam information for definition %s.\nDoes definition exist?' % stage.defname)
    else:
        raise RuntimeError('Output definition not found.')
    if stage.inputdef != '':
        inputlist = samweb.listFiles(defname=stage.inputdef)
    elif stage.inputlist != '':
        inputlist = []
        if larbatch_posix.exists(stage.inputlist):
            ilist = larbatch_posix.readlines(stage.inputlist)
            for i in ilist:
                inputlist.append(os.path.basename(i.strip()))
    else:
        raise RuntimeError('Input definition and/or input list does not exist.')

    difflist = set(inputlist) ^ set(outparentlist)
    mc = 0
    me = 0
    for item in difflist:
        if item in inputlist:
            mc += 1
            if mc == 1:
                missingfilelistname = os.path.join(stage.bookdir, 'missingfiles.list')
                missingfilelist = safeopen(missingfilelistname)
            missingfilelist.write("%s\n" % item)
        elif item in outparentlist:
            me += 1
            childcmd = 'samweb list-files "ischildof: (file_name=%s) and availability: physical"' % (item)
            children = convert_str(subprocess.check_output(childcmd, shell=True)).splitlines()
            rmfile = list(set(children) & set(outputlist))[0]
            fnlist = os.path.join(stage.bookdir, 'files.list')
            if larbatch_posix.exists(fnlist):
                flist = larbatch_posix.readlines(fnlist)
                slist = []
                for line in flist:
                    slist.append(line.split()[0])
            else:
                raise RuntimeError('No files.list file found %s, run project.py --check' % fnlist)
            sdict = {'content_status': 'bad'}
            project_utilities.test_kca()
            samweb.modifyFileMetadata(rmfile, sdict)
            print('\nDeclaring the status of the following file as bad:', rmfile)

            fn = [x for x in slist if os.path.basename(x.strip()) != rmfile]
            thefile = safeopen(fnlist)
            for item in fn:
                thefile.write("%s\n" % item)
    if len(difflist) == 0:
        print("Everything in order.")
    else:
        print('Missing parent file(s) = ', mc)
        print('Extra parent file(s) = ', me)

        if mc > 0:
            missingfilelist.close()
            print("Creating missingfiles.list in the output directory....done!")
        if me > 0:
            thefile.close()
            print("For extra parent files, files.list redefined and content status declared as bad in SAM...done!")
filename = sys.argv[0]
file = open(filename, 'r')
doprint = 0
for line in file.readlines():
    if line[2:12] == 'project.py':
        doprint = 1
    elif line[0:6] == '######' and doprint:
        doprint = 0
    if doprint:
        print(line[2:], end=' ')
normxmlfile = xmlfile
if xmlfile.find(':') < 0 and \
        not xmlfile.startswith('/') and \
        not xmlfile.startswith('./') and \
        not xmlfile.startswith('../'):

    dirs = [os.getcwd()]
    if 'XMLPATH' in os.environ:
        dirs.extend(os.environ['XMLPATH'].split(':'))
    for dir in dirs:
        xmlpath = os.path.join(dir, xmlfile)
        if os.path.exists(xmlpath):
            normxmlfile = xmlpath
            break
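# Relative xml file names are resolved PATH-style: the current directory
# first, then each colon-separated entry of $XMLPATH; the first existing
# match wins.  The same search as a standalone helper:

import os

def _example_resolve_xml(xmlfile):
    """Return the first existing match for xmlfile on cwd + $XMLPATH."""
    dirs = [os.getcwd()]
    if 'XMLPATH' in os.environ:
        dirs.extend(os.environ['XMLPATH'].split(':'))
    for d in dirs:
        candidate = os.path.join(d, xmlfile)
        if os.path.exists(candidate):
            return candidate
    return xmlfile   # fall back to the name as given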
filename = sys.argv[0]
file = open(filename, 'r')
doprint = 0
for line in file.readlines():
    if line[2:20] == 'XML file structure':
        doprint = 1
    elif line[0:6] == '######' and doprint:
        doprint = 0
    if doprint:
        print(line[2:], end=' ')
check_declarations = 0
check_declarations_ana = 0
test_declarations = 0
test_declarations_ana = 0
check_definition = 0
check_definition_ana = 0
test_definition_ana = 0
add_locations_ana = 0
check_locations_ana = 0
clean_locations_ana = 0
remove_locations = 0
remove_locations_ana = 0
while len(args) > 0:
    if args[0] == '-h' or args[0] == '--help':
        ...
    elif args[0] == '-xh' or args[0] == '--xmlhelp':
        ...
    elif args[0] == '--xml' and len(args) > 1:
        ...
    elif args[0] == '--project' and len(args) > 1:
        projectname = args[1]
        del args[0:2]
    elif args[0] == '--stage' and len(args) > 1:
        stagenames = args[1].split(',')
        del args[0:2]
    elif args[0] == '--tmpdir' and len(args) > 1:
        os.environ['TMPDIR'] = args[1]
        del args[0:2]
    elif args[0] == '--lines' and len(args) > 1:
        ...
    elif args[0] == '--site' and len(args) > 1:
        ...
    elif args[0] == '--cpu' and len(args) > 1:
        ...
    elif args[0] == '--disk' and len(args) > 1:
        ...
    elif args[0] == '--memory' and len(args) > 1:
        memory = int(args[1])
        del args[0:2]
    elif args[0] == '--inputdef' and len(args) > 1:
        ...
    elif args[0] == '--submit':
        ...
    elif args[0] == '--recur':
        ...
    elif args[0] == '--pubs' and len(args) > 2:
        pubs_run = int(args[1])
        pubs_subruns = project_utilities.parseInt(args[2])
        del args[0:3]
        if len(args) > 0 and args[0] != '' and args[0][0] != '-':
            pubs_version = int(args[0])
            del args[0]
    elif args[0] == '--check':
        ...
    elif args[0] == '--checkana':
        ...
    elif args[0] == '--shorten':
        ...
    elif args[0] == '--fetchlog':
        ...
    elif args[0] == '--merge':
        ...
    elif args[0] == '--mergehist':
        ...
    elif args[0] == '--mergentuple':
        ...
    elif args[0] == '--audit':
        ...
    elif args[0] == '--status':
        ...
    elif args[0] == '--makeup':
        ...
    elif args[0] == '--clean':
        ...
    elif args[0] == '--clean_one':
        ...
    elif args[0] == '--dump_project':
        ...
    elif args[0] == '--dump_stage':
        ...
    elif args[0] == '--dryrun':
        ...
    elif args[0] == '--nocheck':
        ...
    elif args[0] == '--outdir':
        ...
    elif args[0] == '--logdir':
        ...
    elif args[0] == '--workdir':
        ...
    elif args[0] == '--bookdir':
        ...
    elif args[0] == '--fcl':
        ...
    elif args[0] == '--defname':
        ...
    elif args[0] == '--input_files':
        ...
    elif args[0] == '--check_submit':
        ...
    elif args[0] == '--check_input':
        ...
    elif args[0] == '--declare':
        ...
    elif args[0] == '--declare_ana':
        ...
    elif args[0] == '--define':
        ...
    elif args[0] == '--define_ana':
        ...
    elif args[0] == '--undefine':
        ...
    elif args[0] == '--check_declarations':
        check_declarations = 1
        del args[0]
    elif args[0] == '--check_declarations_ana':
        check_declarations_ana = 1
        del args[0]
    elif args[0] == '--test_declarations':
        test_declarations = 1
        del args[0]
    elif args[0] == '--test_declarations_ana':
        test_declarations_ana = 1
        del args[0]
    elif args[0] == '--check_definition':
        check_definition = 1
        del args[0]
    elif args[0] == '--check_definition_ana':
        check_definition_ana = 1
        del args[0]
    elif args[0] == '--test_definition':
        ...
    elif args[0] == '--test_definition_ana':
        test_definition_ana = 1
        del args[0]
    elif args[0] == '--add_locations':
        ...
    elif args[0] == '--add_locations_ana':
        add_locations_ana = 1
        del args[0]
    elif args[0] == '--check_locations':
        ...
    elif args[0] == '--check_locations_ana':
        check_locations_ana = 1
        del args[0]
    elif args[0] == '--upload':
        ...
    elif args[0] == '--upload_ana':
        ...
    elif args[0] == '--check_tape':
        ...
    elif args[0] == '--check_tape_ana':
        ...
    elif args[0] == '--clean_locations':
        ...
    elif args[0] == '--clean_locations_ana':
        clean_locations_ana = 1
        del args[0]
    elif args[0] == '--remove_locations':
        remove_locations = 1
        del args[0]
    elif args[0] == '--remove_locations_ana':
        remove_locations_ana = 1
        del args[0]
    else:
        print('Unknown option %s' % args[0])
        return 1
if xmlfile == '':
    print('No xml file specified. Type "project.py -h" for help.')
    return 1

num_action = submit + check + checkana + fetchlog + merge + mergehist + mergentuple + audit + stage_status + makeup + define + define_ana + undefine + declare + declare_ana
if num_action > 1:
    print('More than one action was specified.')
    return 1
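# The option loop above accepts exactly one action per invocation, for
# example (stage and project names are placeholders):
#
#     project.py --xml myproject.xml --stage gen --submit
#     project.py --xml myproject.xml --stage gen --check
#     project.py --xml myproject.xml --stage reco --makeup
#     project.py --xml myproject.xml --clean
#
# Combining two actions (say --submit --check) trips the num_action test
# above and the program exits after printing the error message.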
for stagename in stagenames:
    project = select_project(projects, projectname, stagename)
    if project != None:
        if projectname == '':
            projectname = project.name
    else:
        raise RuntimeError('No project selected.\n')
if clean:
    for stagename in stagenames:
        docleanx(projects, projectname, stagename, clean_descendants=True)

if clean_one:
    for stagename in stagenames:
        docleanx(projects, projectname, stagename, clean_descendants=False)
stages = {}
for stagename in stagenames:
    stage = project.get_stage(stagename)
    stages[stagename] = stage
    stage.memory = memory
    ...
    stage.inputdef = inputdef
    stage.inputfile = ''
    stage.inputlist = ''
    ...
    stage.pubsify_input(pubs_run, pubs_subruns, pubs_version)
    stage.pubsify_output(pubs_run, pubs_subruns, pubs_version)
    if stage.recur and stage.inputdef != '' and stage.basedef != '':

        try:
            desc = samweb.descDefinition(defname=stage.inputdef)
            ...
        except samweb_cli.exceptions.DefinitionNotFound:
            ...

        project_utilities.test_kca()

        project_wildcard = '%s_%%' % samweb.makeProjectName(stage.inputdef).rsplit('_', 1)[0]
        if stage.recurtype == 'snapshot':
            dim = 'defname: %s minus snapshot_for_project_name %s' % \
                  (stage.basedef, project_wildcard)
        elif stage.recurtype == 'consumed':
            dim = 'defname: %s minus (project_name %s and consumed_status consumed)' % \
                  (stage.basedef, project_wildcard)

        elif stage.recurtype == 'child':

            if stage.data_stream != None and len(stage.data_stream) > 0:
                nstream = len(stage.data_stream)
            ...
            for istream in range(nstream):
                idim = project_utilities.dimensions_datastream(project, stage,
                                                               ana=False, index=istream)
                if idim.find('anylocation') > 0:
                    idim = idim.replace('anylocation', 'physical')
                else:
                    idim += ' with availability physical'
                ...
                dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)
            if stage.activebase != '':
                activedef = '%s_active' % stage.activebase
                waitdef = '%s_wait' % stage.activebase
                dim += ' minus defname: %s' % activedef
                dim += ' minus defname: %s' % waitdef
                project_utilities.makeDummyDef(activedef)
                project_utilities.makeDummyDef(waitdef)
        elif stage.recurtype == 'anachild':

            if stage.ana_data_stream != None and len(stage.ana_data_stream) > 0:
                nstream = len(stage.ana_data_stream)
            ...
            for istream in range(nstream):
                idim = project_utilities.dimensions_datastream(project, stage,
                                                               ana=True, index=istream)
                if idim.find('anylocation') > 0:
                    idim = idim.replace('anylocation', 'physical')
                else:
                    idim += ' with availability physical'
                ...
                dim += '(defname: %s minus isparentof:( %s ) )' % (stage.basedef, idim)

            if stage.activebase != '':
                activedef = '%s_active' % stage.activebase
                waitdef = '%s_wait' % stage.activebase
                dim += ' minus defname: %s' % activedef
                dim += ' minus defname: %s' % waitdef
                project_utilities.makeDummyDef(activedef)
                project_utilities.makeDummyDef(waitdef)
        elif stage.recurtype != '' and stage.recurtype != 'none':
            raise RuntimeError('Unknown recursive type %s.' % stage.recurtype)

        if stage.recurlimit != 0:
            dim += ' with limit %d' % stage.recurlimit

        print('Creating recursive dataset definition %s' % stage.inputdef)
        project_utilities.test_kca()
        samweb.createDefinition(defname=stage.inputdef, dims=dim)
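# The dimension strings above make the recursive definition self-pruning:
# each yields only files of the base definition not yet handled by this
# project family.  A sketch of the two simplest cases, with the dimension
# syntax copied from the code above:

def _example_recur_dim(recurtype, basedef, project_wildcard, limit=0):
    """Build a SAM dimension string for a recursive input dataset."""
    if recurtype == 'snapshot':
        dim = 'defname: %s minus snapshot_for_project_name %s' % \
              (basedef, project_wildcard)
    elif recurtype == 'consumed':
        dim = 'defname: %s minus (project_name %s and consumed_status consumed)' % \
              (basedef, project_wildcard)
    else:
        raise RuntimeError('Unknown recursive type %s.' % recurtype)
    if limit != 0:
        dim += ' with limit %d' % limit
    return dim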
for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    print(stage.workdir)

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    print(stage.bookdir)

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    if stage.defname != '':
        print(stage.defname)

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    input_files = get_input_files(stage)
    for input_file in input_files:
        print(input_file)

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    stage.checkinput(checkdef=True)

for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...
if submit or makeup:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        if project_utilities.check_running(xmlfile, stagename):
            print('Skipping job submission because similar job submission process is running.')
        else:
            stage = stages[stagename]
            dosubmit(project, stage, makeup, stage.recur, dryrun)
if check or checkana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        docheck(project, stage, checkana or stage.ana, stage.validate_on_worker)
for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...
if mergehist or mergentuple or merge:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        domerge(stage, mergehist, mergentuple)
for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    ...
if check_definition or define:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        if stage.ana:
            if stage.ana_defname == '':
                print('No sam analysis dataset definition name specified for this stage.')
                ...
            dim = project_utilities.dimensions_datastream(project, stage, ana=True)
            ...
        else:
            if stage.defname == '':
                print('No sam dataset definition name specified for this stage.')
                ...
            dim = project_utilities.dimensions_datastream(project, stage, ana=False)
            ...
if check_definition_ana or define_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        if stage.ana_defname == '':
            print('No sam analysis dataset definition name specified for this stage.')
            ...
        dim = project_utilities.dimensions_datastream(project, stage, ana=True)
        ...
for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    if stage.ana_defname == '':
        print('No sam dataset definition name specified for this stage.')
        ...
    if stage.defname == '':
        print('No sam dataset definition name specified for this stage.')
        ...
if test_definition_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        if stage.ana_defname == '':
            print('No sam dataset definition name specified for this stage.')
            ...
for stagename in stagenames:
    print('Stage %s:' % stagename)
    stage = stages[stagename]
    if stage.defname == '':
        print('No sam dataset definition name specified for this stage.')
        ...
if check_declarations or declare:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        ...
if check_declarations_ana or declare_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        ...
if test_declarations:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
        dotest_declarations(dim)
if test_declarations_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=True)
        dotest_declarations(dim)
if check_locations or add_locations or clean_locations or remove_locations or upload:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
        docheck_locations(dim, stage.outdir,
                          add_locations, clean_locations, remove_locations,
                          upload)
if check_locations_ana or add_locations_ana or clean_locations_ana or \
        remove_locations_ana or upload_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=True)
        docheck_locations(dim, stage.outdir,
                          add_locations_ana, clean_locations_ana, remove_locations_ana,
                          upload_ana)
if check_tape:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=stage.ana)
        ...

if check_tape_ana:

    for stagename in stagenames:
        print('Stage %s:' % stagename)
        stage = stages[stagename]
        dim = project_utilities.dimensions_datastream(project, stage, ana=True)
        ...
def safeopen(destination):
    if larbatch_posix.exists(destination):
        larbatch_posix.remove(destination)
    file = larbatch_posix.open(destination, 'w')
    return file
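# safeopen deletes any existing destination before opening for write, so
# callers never truncate or append to a file in place (overwriting in
# place can fail on dCache/pnfs paths).  Typical use, as seen throughout
# this file:
#
#     jobid_file = safeopen(jobids_filename)
#     jobid_file.write('%s\n' % jobid)
#     jobid_file.close()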
fileList = project_utilities.saferead(fileName)
returnArray = []
if len(fileList) > 0:
    for line in fileList:
        returnArray.append(line.strip())
if __name__ == '__main__':
    sys.exit(main(sys.argv))
inp = open(stage.inputlist, "r")
for line in inp.readlines():
    columns = line.split("/")
    columns = [col.strip() for col in columns]
    inputlist.append(columns[8])
def next_stage(projects, stagename, circular=False)
def docleanx(projects, projectname, stagename, clean_descendants=True)
def dotest_declarations(dim)
def doquickcheck(project, stage, ana)
def dotest_definition(defname)
def previous_stage(projects, stagename, circular=False)
def get_pubs_stage(xmlfile, projectname, stagename, run, subruns, version=None)
def dofetchlog(project, stage)
def dojobsub(project, stage, makeup, recur, dryrun)
def get_project(xmlfile, projectname='', stagename='', check=True)
def docheck_definition(defname, dim, define)
def get_input_files(stage)
def domerge(stage, mergehist, mergentuple)
def dosubmit(project, stage, makeup=False, recur=False, dryrun=False)
def docheck_declarations(logdir, outdir, declare, ana=False)
def docheck_locations(dim, outdir, add, clean, remove, upload)
def find_projects(element, check=True)
def select_project(projects, projectname, stagename)
def safeopen(destination)
def check_root_file(path, logdir)
def check_root(outdir, logdir, data_file_types)
def docheck(project, stage, ana, quick=False)
def get_projects(xmlfile, check=True)