extractor_dict.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from __future__ import print_function
4 
5 import sys, getopt
6 import os
7 from subprocess import Popen, PIPE
8 import threading
9 import queue
10 import project_utilities, root_metadata
11 import json
12 import abc
13 
14 # Function to wait for a subprocess to finish and fetch return code,
15 # standard output, and standard error.
16 # Call this function like this:
17 #
18 # q = queue.Queue()
19 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
20 # MetaData.wait_for_subprocess(jobinfo, q)
21 # rc = q.get() # Return code.
22 # jobout = q.get() # Standard output
23 # joberr = q.get() # Standard error
24 
25 """extractor_dict.py
26 Purpose: To extract metadata from output file on worker node, generate JSON file
27 """
28 
29 
class MetaData(object):
    """Base class to hold / interpret general metadata.

    Wraps the ``sam_metadata_dumper`` command line tool: it launches the
    dumper on ``self.inputfile``, collects its output under a timeout, and
    decodes that output into a Python dictionary.
    """
    # NOTE(review): the ``__metaclass__`` attribute is only honored by
    # Python 2.  Under Python 3 it is a plain class attribute, so the
    # ``abstractmethod`` marker on ``__init__`` is not enforced there.  It is
    # left as-is so existing callers that instantiate this class keep working.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self, inputfile):
        """Remember the path (or URL) of the file whose metadata is wanted."""
        self.inputfile = inputfile

    def extract_metadata_to_pipe(self):
        """Extract metadata from inputfile into a pipe for further processing.

        Returns:
            The ``Popen`` object of the started ``sam_metadata_dumper``
            process, with stdout and stderr connected to pipes.
        """
        local = project_utilities.path_to_local(self.inputfile)
        if local:
            target = local
        else:
            # No local copy available: let the dumper read through a URL.
            # (The original referenced a bare, undefined ``inputfile`` here.)
            target = project_utilities.path_to_url(self.inputfile)
        proc = Popen(["sam_metadata_dumper", target], stdout=PIPE, stderr=PIPE)

        # Clean up the local copy when it is distinct from the input file.
        # NOTE(review): this runs right after the process is started, without
        # waiting for it -- confirm the dumper cannot lose the file before it
        # has opened it.
        if local and local != self.inputfile:
            os.remove(local)
        return proc

    def get_job(self, proc):
        """Run the proc in a 60-sec timeout queue, return stdout, stderr.

        Args:
            proc: A started subprocess object (``communicate``, ``poll`` and
                ``terminate`` are used).

        Returns:
            Tuple ``(jobout, joberr)`` as produced by ``proc.communicate()``.

        Raises:
            RuntimeError: If the subprocess return code is not zero (which
                includes the case where it was terminated on timeout).
        """
        q = queue.Queue()
        thread = threading.Thread(target=self.wait_for_subprocess,
                                  args=[proc, q])
        thread.start()
        thread.join(timeout=60)
        if thread.is_alive():
            print('Terminating subprocess because of timeout.')
            proc.terminate()
            thread.join()

        # The worker enqueues exactly three items, in this order.
        rc = q.get()
        jobout = q.get()
        joberr = q.get()
        if rc != 0:
            raise RuntimeError(
                'sam_metadata_dumper returned nonzero exit status {}.'.format(rc))
        return jobout, joberr

    @staticmethod
    def wait_for_subprocess(jobinfo, q):
        """Run jobinfo, put the return code, stdout, and stderr into a queue.

        Args:
            jobinfo: Subprocess object to wait on.
            q: Queue receiving, in order: return code, stdout, stderr.
        """
        jobout, joberr = jobinfo.communicate()
        rc = jobinfo.poll()
        for item in (rc, jobout, joberr):
            q.put(item)

    @staticmethod
    def mdart_gen(jobtuple):
        """Take Jobout and Joberr (in jobtuple) and return mdart object from that.

        Args:
            jobtuple: Sequence whose first element is the dumper's standard
                output, either as text or as bytes.

        Returns:
            The value stored under the first top-level key of the decoded
            JSON document.

        Exits the interpreter with status 1 when the decoded document has no
        top-level key.
        """
        jobout = jobtuple[0]
        # Pipes opened without text mode deliver bytes; the line clean-up
        # below needs text.
        if isinstance(jobout, bytes):
            jobout = jobout.decode('utf-8')

        # Drop lines whose second- and third-to-last characters are " ," and
        # collapse ", ," sequences so that the text decodes as JSON.
        mdtext = ''.join(line.replace(", ,", ",")
                         for line in jobout.split('\n')
                         if line[-3:-1] != ' ,')
        mdtop = json.JSONDecoder().decode(mdtext)
        if not mdtop:
            print('No top-level key in extracted metadata.')
            sys.exit(1)
        # dict views are not indexable on Python 3, so take the first key
        # through an iterator instead of ``keys()[0]``.
        file_name = next(iter(mdtop))
        return mdtop[file_name]

    @staticmethod
    def md_handle_application(md):
        """Return md['application'], creating it as an empty dict if absent.

        Args:
            md: Metadata dictionary; modified in place when the key is missing.
        """
        return md.setdefault('application', {})
96 
97 
class expMetaData(MetaData):
    """Class to hold/interpret experiment-specific metadata"""

    # art-specific keys that are never copied to the sam metadata.  The
    # primary ``run_type`` is skipped because the run type is taken from the
    # runs field instead; ``data_stream`` and ``process_name`` are ignored
    # for now.
    _SKIPPED_KEYS = frozenset([
        'file_format_version',
        'file_format_era',
        'run_type',
        'data_stream',
        'process_name',
    ])

    # art key -> field name inside the nested md['application'] dictionary.
    _APPLICATION_KEYS = {
        'applicationFamily': 'family',
        'StageName': 'name',
        'applicationVersion': 'version',
    }

    # art keys whose third element (index 2) is the sam value.
    _EVENT_KEYS = frozenset(['first_event', 'last_event'])

    # art key -> sam key for values that are copied under a different name.
    _RENAMED_KEYS = {
        'lbneMCGenerators': 'lbne_MC.generators',
        'lbneMCOscillationP': 'lbne_MC.oscillationP',
        'lbneMCTriggerListVersion': 'lbne_MC.trigger-list-version',
        'lbneMCBeamEnergy': 'lbne_MC.beam_energy',
        'lbneMCBeamFluxID': 'lbne_MC.beam_flux_ID',
        'lbneMCName': 'lbne_MC.name',
        'lbneMCDetectorType': 'lbne_MC.detector_type',
        'lbneMCNeutrinoFlavors': 'lbne_MC.neutrino_flavors',
        'lbneMCMassHierarchy': 'lbne_MC.mass_hierarchy',
        'lbneMCMiscellaneous': 'lbne_MC.miscellaneous',
        'lbneMCGeometryVersion': 'lbne_MC.geometry_version',
        'lbneMCOverlay': 'lbne_MC.overlay',
        'lbneDataRunMode': 'lbne_data.run_mode',
        'lbneDataDetectorType': 'lbne_data.detector_type',
        'lbneDataName': 'lbne_data.name',
    }

    def __init__(self, expname, inputfile):
        """Store the experiment name and load its metadata-key module.

        Args:
            expname: Name of the experiment.
            inputfile: Path (or URL) of the file to extract metadata from.

        Raises:
            ImportError: If ``experiment_utilities.MetaDataKey`` cannot be
                imported (a message is printed before re-raising).
        """
        MetaData.__init__(self, inputfile)
        self.expname = expname
        try:
            from experiment_utilities import MetaDataKey
        except ImportError:
            print("You have not defined an experiment-specific metadata and key-translating module in experiment_utilities. Exiting")
            raise

        metaDataModule = MetaDataKey()
        self.metadataList = metaDataModule.metadataList()
        self.translateKeyf = metaDataModule.translateKey

    def translateKey(self, key):
        """Returns the output of the imported translateKey function (as translateKeyf) called on key"""
        return self.translateKeyf(key)

    def md_gen(self, mdart, md0=None):
        """Loop through art metadata, generate metadata dictionary.

        Some fields are copied directly from art metadata to sam metadata,
        others are skipped, renamed, or converted.

        Args:
            mdart: Dictionary of art metadata.
            md0: Optional dictionary that may supply precomputed ``file_size``
                and ``crc`` values; when a value is absent it is computed
                from ``self.inputfile``.

        Returns:
            The sam metadata dictionary (also stored as ``self.md``).
        """
        if md0 is None:
            md0 = {}
        md = {}

        for mdkey, mdval in mdart.items():
            if mdkey in self._SKIPPED_KEYS:
                continue

            if mdkey in self._APPLICATION_KEYS:
                # Application family/name/version live in a nested dict.
                application = self.md_handle_application(md)
                application[self._APPLICATION_KEYS[mdkey]] = mdval
            elif mdkey == 'parents':
                md['parents'] = [{'file_name': parent} for parent in mdval]
            elif mdkey in self._EVENT_KEYS:
                md[mdkey] = mdval[2]
            elif mdkey in self._RENAMED_KEYS:
                md[self._RENAMED_KEYS[mdkey]] = mdval
            else:
                # For all other keys, copy art metadata directly to sam
                # metadata.  This works for run-tuple (run, subrun, runtype)
                # and time stamps.
                md[mdkey] = mdval

        # Get the other meta data field parameters.
        md['file_name'] = self.inputfile.split("/")[-1]
        if 'file_size' in md0:
            md['file_size'] = md0['file_size']
        else:
            md['file_size'] = os.path.getsize(self.inputfile)
        if 'crc' in md0:
            md['crc'] = md0['crc']
        else:
            md['crc'] = root_metadata.fileEnstoreChecksum(self.inputfile)

        # Kept on the instance so the result can be inspected later through
        # instance.md.
        self.md = md
        return self.md

    def getmetadata(self, md0=None):
        """Get metadata from input file and return as python dictionary.

        Calls other methods in class and returns metadata dictionary.

        Args:
            md0: Optional dictionary forwarded to ``md_gen``.
        """
        proc = self.extract_metadata_to_pipe()
        jobt = self.get_job(proc)
        mdart = self.mdart_gen(jobt)
        return self.md_gen(mdart, md0)
242 
def main():
    """Print the metadata of the file named on the command line as JSON.

    The experiment name is read from the ``SAM_EXPERIMENT`` environment
    variable and the input file from the first command line argument.  The
    process exits with status 0 after printing.
    """
    experiment = os.environ['SAM_EXPERIMENT']
    input_path = str(sys.argv[1])
    try:
        extractor = expMetaData(experiment, input_path)
    except TypeError:
        print('You have not implemented a defineMetaData function by providing an experiment.')
        print('No metadata keys will be saved')
        raise
    metadata = extractor.getmetadata()
    print(json.dumps(metadata, indent=2, sort_keys=True))
    sys.exit(0)
253 
254 
255 
# Run the extractor only when this file is executed as a script.
if __name__ == "__main__":
    main()
258 
259 
static bool format(QChar::Decomposition tag, QString &str, int index, int len)
Definition: qstring.cpp:11496
def get_job(self, proc)
def getmetadata(self, md0={})
def md_gen(self, mdart, md0={})
def wait_for_subprocess(jobinfo, q)
def extract_metadata_to_pipe(self)
void decode(std::any const &a, Hep2Vector &result)
Definition: CLHEP_ps.h:12
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
def __init__(self, inputfile)
def __init__(self, expname, inputfile)
static QCString str