extractor_prod.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys, getopt
3 import os
4 from subprocess import Popen, PIPE
5 import threading
6 import queue
7 import json
8 import abc
9 import ifdh
10 import argparse
11 import samweb_client
12 import collections
13 # Temporary solution for crc computation until we can move the calculation into code inside duneutil.
14 # Then we should be able to remove the larbatch dependency altogether.
15 import root_metadata
16 
17 # Function to wait for a subprocess to finish and fetch return code,
18 # standard output, and standard error.
19 # Call this function like this:
20 #
21 # q = queue.Queue()
22 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
23 # wait_for_subprocess(jobinfo, q)
24 # rc = q.get() # Return code.
25 # jobout = q.get() # Standard output
26 # joberr = q.get() # Standard error
27 
28 """extractor_prod.py
29 Purpose: To extract metadata from output file on worker node, generate JSON file
30 """
31 
32 
class MetaData(object):
    """Base class to hold / interpret general metadata.

    Runs the external ``sam_metadata_dumper`` command on the input file,
    collects its output under a timeout, and decodes the JSON it prints into
    a Python dictionary.
    """
    # NOTE(review): ``__metaclass__`` is the Python 2 spelling and has no
    # effect under Python 3, so ``__init__`` is not actually enforced as
    # abstract.  Left untouched so existing subclasses keep working.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self, inputfile):
        """Remember the path (or URL) of the file to extract metadata from."""
        self.inputfile = inputfile

    def extract_metadata_to_pipe(self):
        """Extract metadata from inputfile into a pipe for further processing.

        Returns:
            The ``Popen`` object for ``sam_metadata_dumper <inputfile>`` with
            stdout and stderr connected to pipes.  The process is started but
            not waited on; pass it to ``get_job``.
        """
        # The original had separate "local" and "url" branches that ran the
        # same command; the url branch referenced an undefined name and the
        # temporary-copy cleanup could never trigger, so one call suffices.
        return Popen(["sam_metadata_dumper", self.inputfile],
                     stdout=PIPE, stderr=PIPE)

    def get_job(self, proc, timeout=7200):
        """Wait for proc with a timeout and return its (stdout, stderr).

        Args:
            proc: a started subprocess (as returned by
                ``extract_metadata_to_pipe``).
            timeout: seconds to wait before terminating the subprocess.

        Returns:
            Tuple ``(jobout, joberr)`` as produced by ``proc.communicate()``.

        Raises:
            RuntimeError: if the subprocess exit status is not zero (this
                includes the case where it was terminated on timeout).
        """
        q = queue.Queue()
        thread = threading.Thread(target=self.wait_for_subprocess,
                                  args=[proc, q])
        thread.start()
        thread.join(timeout=timeout)
        if thread.is_alive():
            print('Terminating subprocess because of timeout.')
            proc.terminate()
            # communicate() returns once the process has exited, so the
            # worker thread finishes and fills the queue.
            thread.join()
        # Items were queued in the order: return code, stdout, stderr.
        rc = q.get()
        jobout = q.get()
        joberr = q.get()
        if rc != 0:
            raise RuntimeError('sam_metadata_dumper returned nonzero exit status {}.'.format(rc))
        return jobout, joberr

    @staticmethod
    def wait_for_subprocess(jobinfo, q):
        """Wait for jobinfo, then put return code, stdout and stderr into q."""
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        for item in (rc, jobout, joberr):
            q.put(item)

    @staticmethod
    def mdart_gen(jobtuple):
        """Take jobout and joberr (in jobtuple) and return the art metadata.

        Args:
            jobtuple: ``(jobout, joberr)``; only ``jobout`` (bytes) is read.

        Returns:
            The value stored under the first top-level key of the decoded
            JSON document.

        Exits the interpreter with status 1 when the document has no
        top-level key.
        """
        # Clean up the dumper output before decoding: lines whose third- and
        # second-to-last characters are ' ,' are dropped and ', ,' sequences
        # are collapsed to ','.  Presumably this repairs empty entries that
        # would otherwise make the JSON invalid -- confirm against the
        # dumper's output format.
        mdtext = ''.join(
            line.replace(", ,", ",")
            for line in jobtuple[0].decode().split('\n')
            if line[-3:-1] != ' ,'
        )
        mdtop = json.loads(mdtext)
        top_keys = list(mdtop.keys())
        if not top_keys:
            print('No top-level key in extracted metadata.')
            sys.exit(1)
        return mdtop[top_keys[0]]

    # NOTE(review): the ``def`` line of this method was missing from the
    # listing this file was recovered from; the name below is reconstructed
    # and should be confirmed against the upstream source.
    @staticmethod
    def md_handle_application(md):
        """Return md['application'], creating it as an empty dict if absent."""
        if 'application' not in md:
            md['application'] = {}
        return md['application']
100 
101 
102 
104 
class MetaDataKey:
    """Names of experiment-specific art metadata keys and their SAM names."""

    def __init__(self):
        # Prefix prepended to every key name returned by metadataList().
        self.expname = ''

    def metadataList(self):
        """Return the experiment-specific key names, each prefixed by expname."""
        return [self.expname + elt for elt in ('lbneMCGenerators','lbneMCName','lbneMCDetectorType','StageName')]

    def translateKey(self, key):
        """Translate an art metadata key into its SAM metadata name.

        Two keys have fixed translations.  Any other key is split into its
        first four characters (the prefix) and the rest; the result is
        ``<prefix>_MC.<text following the first "MC" in the rest>``.

        Raises:
            IndexError: if the part after the prefix contains no "MC".
        """
        if key == 'lbneMCDetectorType':
            return 'lbne_MC.detector_type'
        elif key == 'StageName':
            return 'lbne_MC.miscellaneous'
        else:
            prefix = key[:4]
            stem = key[4:]
            projNoun = stem.split("MC")
            return prefix + "_MC." + projNoun[1]
121 
122 
123 
class expMetaData(MetaData):
    """Class to hold/interpret experiment-specific metadata.

    Converts the art metadata dictionary produced by ``mdart_gen`` into a
    SAM metadata dictionary: some keys are dropped, some are renamed, some
    values are converted, and everything else is copied unchanged.
    """

    # art keys that are never copied into the SAM metadata.
    #  - run_type: the primary field is ignored; run_type comes from the
    #    runs field instead.
    #  - process_name: ignored as of 2018-09-22 because it is not in SAM yet.
    _SKIPPED_KEYS = frozenset((
        'file_format_version',
        'file_format_era',
        'run_type',
        'application.version',
        'application.family',
        'application.name',
        'process_name',
    ))

    # art key -> field of the nested md['application'] dictionary.
    _APPLICATION_KEYS = {
        'applicationFamily': 'family',
        'StageName': 'name',
        'applicationName': 'name',
        'applicationVersion': 'version',
    }

    # art keys whose value is stored as element [2] of the art value.
    # NOTE(review): presumably the art value is a (run, subrun, event)
    # triple -- confirm against sam_metadata_dumper output.
    _EVENT_TUPLE_KEYS = frozenset(('art.first_event', 'art.last_event'))

    # art key -> SAM key, value copied unchanged.
    _RENAMED_KEYS = {
        'lbneMCGenerators': 'lbne_MC.generators',
        'lbneMCOscillationP': 'lbne_MC.oscillationP',
        'lbneMCTriggerListVersion': 'lbne_MC.trigger-list-version',
        'lbneMCBeamEnergy': 'lbne_MC.beam_energy',
        'lbneMCBeamFluxID': 'lbne_MC.beam_flux_ID',
        'lbneMCName': 'lbne_MC.name',
        'lbneMCDetectorType': 'lbne_MC.detector_type',
        'lbneMCNeutrinoFlavors': 'lbne_MC.neutrino_flavors',
        'lbneMCMassHierarchy': 'lbne_MC.mass_hierarchy',
        'lbneMCMiscellaneous': 'lbne_MC.miscellaneous',
        'lbneMCGeometryVersion': 'lbne_MC.geometry_version',
        'lbneMCOverlay': 'lbne_MC.overlay',
        'lbneDataRunMode': 'lbne_data.run_mode',
        'lbneDataDetectorType': 'lbne_data.detector_type',
        'lbneDataName': 'lbne_data.name',
        'dunemeta.data_stream': 'data_stream',
        'dunemeta.dune_data.daqconfigname': 'DUNE_data.DAQConfigName',
        'dunemeta.dune_data.detector_config': 'DUNE_data.detector_config',
    }

    # art key -> (SAM key, converter applied to the value).
    _CONVERTED_KEYS = {
        'dunemeta.dune_data.accouple': ('DUNE_data.accouple', int),
        'dunemeta.dune_data.calibpulsemode': ('DUNE_data.calibpulsemode', int),
        'dunemeta.dune_data.febaselinehigh': ('DUNE_data.febaselinehigh', int),
        'dunemeta.dune_data.fegain': ('DUNE_data.fegain', int),
        'dunemeta.dune_data.feleak10x': ('DUNE_data.feleak10x', int),
        'dunemeta.dune_data.feleakhigh': ('DUNE_data.feleakhigh', int),
        'dunemeta.dune_data.feshapingtime': ('DUNE_data.feshapingtime', int),
        'dunemeta.dune_data.inconsistent_hw_config': ('DUNE_data.inconsistent_hw_config', int),
        'dunemeta.dune_data.is_fake_data': ('DUNE_data.is_fake_data', int),
        'dunemeta.dune_data.readout_window': ('DUNE_data.readout_window', float),
    }

    def __init__(self, expname, inputfile):
        """Store the experiment name and input file, and load the key table.

        Args:
            expname: experiment name (main() passes $SAM_EXPERIMENT).
            inputfile: path of the file whose metadata is extracted.
        """
        MetaData.__init__(self, inputfile)
        self.expname = expname
        # MetaDataKey is taken from this module; an earlier version tried to
        # import it from an experiment_utilities module instead.
        metaDataModule = MetaDataKey()
        self.metadataList = metaDataModule.metadataList()
        self.translateKeyf = metaDataModule.translateKey

    def translateKey(self, key):
        """Return the output of the stored translateKey function called on key."""
        return self.translateKeyf(key)

    @staticmethod
    def _cli_flag(name):
        """Return the boolean command-line option *name*.

        The options live in the module-level ``args`` that main() sets.  When
        the module is used without main() (no ``args`` global), the flag is
        treated as not set instead of raising NameError.
        """
        return bool(getattr(globals().get('args'), name, False))

    def md_gen(self, mdart, md0=None):
        """Loop through art metadata and generate the SAM metadata dictionary.

        Args:
            mdart: art metadata dictionary (as returned by ``mdart_gen``).
            md0: optional dictionary of precomputed values; ``file_size`` and
                ``crc`` are taken from it when present instead of being
                computed from the input file.

        Returns:
            The SAM metadata dictionary, also stored as ``self.md``.
        """
        md0 = {} if md0 is None else md0
        strip_parents = self._cli_flag('strip_parents')
        no_crc = self._cli_flag('no_crc')

        md = {}
        for mdkey, mdval in mdart.items():
            if mdkey in self._SKIPPED_KEYS:
                continue

            if mdkey == 'data_stream':
                # Only use this value if dunemeta.data_stream is not present.
                if 'dunemeta.data_stream' not in mdart:
                    md['data_stream'] = mdval
            elif mdkey in self._APPLICATION_KEYS:
                # Application family/name/version go in a nested dictionary.
                md.setdefault('application', {})[self._APPLICATION_KEYS[mdkey]] = mdval
            elif mdkey == 'parents':
                # --strip_parents yields an empty parent list.
                if strip_parents:
                    md['parents'] = []
                else:
                    md['parents'] = [{'file_name': parent} for parent in mdval]
            elif mdkey in self._EVENT_TUPLE_KEYS:
                md[mdkey] = mdval[2]
            elif mdkey in self._RENAMED_KEYS:
                md[self._RENAMED_KEYS[mdkey]] = mdval
            elif mdkey in self._CONVERTED_KEYS:
                samkey, convert = self._CONVERTED_KEYS[mdkey]
                md[samkey] = convert(mdval)
            else:
                # All other keys (first_event, last_event, detector.*, daq.*,
                # beam.*, data_quality.*, run tuples, time stamps, ...) are
                # copied directly from art metadata to SAM metadata.
                md[mdkey] = mdval

        # Remaining fields describe the file itself and override any art
        # entries with the same names.
        md['file_name'] = self.inputfile.split("/")[-1]
        if 'file_size' in md0:
            md['file_size'] = md0['file_size']
        else:
            md['file_size'] = os.path.getsize(self.inputfile)
        if not no_crc:
            if 'crc' in md0:
                md['crc'] = md0['crc']
            else:
                md['crc'] = root_metadata.fileEnstoreChecksum(self.inputfile)

        # Kept on the instance so it can be inspected later as instance.md.
        self.md = md
        return self.md

    def getmetadata(self, md0=None):
        """Get metadata from the input file and return it as a dictionary.

        Runs the dumper, waits for it, decodes its output and converts it
        with ``md_gen``; ``md0`` is passed through to ``md_gen``.
        """
        proc = self.extract_metadata_to_pipe()
        jobt = self.get_job(proc)
        mdart = self.mdart_gen(jobt)
        return self.md_gen(mdart, md0)
347 
def main():
    """Build SAM metadata for --infile, print it as JSON, and exit.

    Steps: parse the command line, extract metadata from the input file,
    merge in the optional --input_json file and command-line overrides,
    optionally declare the file to SAM (--declare) and mark its parents as
    processed (--set_processed), then print the JSON and exit with status 0.
    """
    # md_gen reads the parsed options through this module-level name.
    global args

    argparser = argparse.ArgumentParser('Parse arguments')
    argparser.add_argument('--infile', help='path to input file', required=True, type=str)
    argparser.add_argument('--declare', help='validate and declare the metadata for the file specified in --infile to SAM', action='store_true')
    argparser.add_argument('--appname', help='application name for SAM metadata', type=str)
    argparser.add_argument('--appversion', help='application version for SAM metadata', type=str)
    argparser.add_argument('--appfamily', help='application family for SAM metadata', type=str)
    argparser.add_argument('--campaign', help='Value for DUNE.campaign for SAM metadata', type=str)
    argparser.add_argument('--data_stream', help='Value for data_stream for SAM metadata', type=str)
    argparser.add_argument('--requestid', help='Value for DUNE.requestid for SAM metadata', type=str)
    argparser.add_argument('--set_processed', help='Set for parent file as processed in SAM metadata', action="store_true")
    argparser.add_argument('--strip_parents', help='Do not include the file\'s parents in SAM metadata for declaration', action="store_true")
    argparser.add_argument('--no_crc', help='Leave the crc out of the generated json', action="store_true")
    # NOTE(review): --skip_dumper is accepted but not consulted anywhere in
    # the code visible here.
    argparser.add_argument('--skip_dumper', help='Skip running sam_metadata_dumper on the input file', action="store_true")
    argparser.add_argument('--input_json', help='Input json file containing metadata to be added to output (can contain ANY valid SAM metadata parameters)', type=str)

    args = argparser.parse_args()

    ih = ifdh.ifdh()

    try:
        expSpecificMetadata = expMetaData(os.environ['SAM_EXPERIMENT'], args.infile)
        mddict = expSpecificMetadata.getmetadata()

        # If --input_json is supplied, merge its entries into the output;
        # they override extracted values with the same key.
        if args.input_json is not None:
            if os.path.exists(args.input_json):
                try:
                    with open(args.input_json) as injson:
                        arbjson = json.load(injson)
                    for key in arbjson.keys():
                        mddict[key] = arbjson[key]
                except Exception:
                    print('Error loading input json file.')
                    raise
            else:
                print('Error, specified input file does not exist.')
                sys.exit(1)

        # Fill in application fields that the extracted metadata lacks.
        if 'application' in mddict:
            application = mddict['application']
            for field, value in (('name', args.appname),
                                 ('version', args.appversion),
                                 ('family', args.appfamily)):
                if field not in application and value is not None:
                    application[field] = value
        # When all three are given on the command line they replace the
        # application entry entirely.
        if args.appfamily is not None and args.appname is not None and args.appversion is not None:
            mddict['application'] = {
                'family': args.appfamily,
                'name': args.appname,
                'version': args.appversion,
            }
        if 'DUNE.campaign' not in mddict and args.campaign is not None:
            mddict['DUNE.campaign'] = args.campaign
        if args.data_stream is not None:
            mddict['data_stream'] = args.data_stream
        if args.requestid is not None:
            mddict['DUNE.requestid'] = args.requestid

    except TypeError:
        print('You have not implemented a defineMetaData function by providing an experiment.')
        print('No metadata keys will be saved')
        raise

    mdtext = json.dumps(mddict, indent=2, sort_keys=True)

    if args.declare:
        ih.declareFile(mdtext)

    if args.set_processed:
        swc = samweb_client.SAMWebClient('dune')
        moddict = {"DUNE.production_status": "processed"}
        # The parents live in the generated metadata (a list of
        # {'file_name': ...} dictionaries), not in the modification dict.
        for parent in mddict.get('parents', []):
            fname = parent['file_name']
            try:
                swc.modifyFileMetadata(fname, moddict)
            except Exception:
                print('Error modidying metadata for %s' % fname)
                raise

    print(mdtext)
    sys.exit(0)
430 
431 
432 
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
435 
436 
def get_job(self, proc)
int open(const char *, int)
Opens a file descriptor.
static bool format(QChar::Decomposition tag, QString &str, int index, int len)
Definition: qstring.cpp:11496
def __init__(self, expname, inputfile)
def extract_metadata_to_pipe(self)
def md_gen(self, mdart, md0={})
def __init__(self, inputfile)
void decode(std::any const &a, Hep2Vector &result)
Definition: CLHEP_ps.h:12
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
def getmetadata(self, md0={})
def wait_for_subprocess(jobinfo, q)