extractor_dict.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import absolute_import
3 from __future__ import print_function
4 import sys, getopt
5 import os
6 from subprocess import Popen, PIPE
7 import threading
8 try:
9  import queue
10 except ImportError:
11  import Queue as queue
12 import project_utilities, root_metadata
13 from larbatch_utilities import convert_str
14 import json
15 import abc
16 
17 # Function to wait for a subprocess to finish and fetch return code,
18 # standard output, and standard error.
19 # Call this function like this:
20 #
21 # q = Queue.Queue()
22 # jobinfo = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
23 # wait_for_subprocess(jobinfo, q)
24 # rc = q.get() # Return code.
25 # jobout = q.get() # Standard output
26 # joberr = q.get() # Standard error
27 
28 """extractor_dict.py
29 Purpose: To extract metadata from output file on worker node, generate JSON file
30 """
31 
32 
33 class MetaData(object):
34  """Base class to hold / interpret general metadata"""
35  __metaclass__ = abc.ABCMeta
36 
37  @abc.abstractmethod
38  def __init__(self, inputfile):
39  self.inputfile = inputfile
40 
42  """Extract metadata from inputfile into a pipe for further processing."""
43  local = project_utilities.path_to_local(self.inputfile)
44  if len(local) > 0:
45  proc = Popen(["sam_metadata_dumper", local], stdout=PIPE,
46  stderr=PIPE)
47  else:
48  url = project_utilities.path_to_url(inputfile)
49  proc = Popen(["sam_metadata_dumper", url], stdout=PIPE,
50  stderr=PIPE)
51  if len(local) > 0 and local != self.inputfile:
52  os.remove(local)
53  return proc
54 
55  def get_job(self, proc):
56  """Run the proc in a 60-sec timeout queue, return stdout, stderr"""
57  q = queue.Queue()
58  thread = threading.Thread(target=self.wait_for_subprocess, args=[proc, q])
59  thread.start()
60  thread.join(timeout=60)
61  if thread.is_alive():
62  print('Terminating subprocess because of timeout.')
63  proc.terminate()
64  thread.join()
65  rc = q.get()
66  jobout = convert_str(q.get())
67  joberr = convert_str(q.get())
68  if rc != 0:
69  raise RuntimeError('sam_metadata_dumper returned nonzero exit status {}.'.format(rc))
70  return jobout, joberr
71 
72  @staticmethod
73  def wait_for_subprocess(jobinfo, q):
74  """Run jobinfo, put the return code, stdout, and stderr into a queue"""
75  jobout, joberr = jobinfo.communicate()
76  jobout = convert_str(jobout)
77  joberr = convert_str(joberr)
78  rc = jobinfo.poll()
79  for item in (rc, jobout, joberr):
80  q.put(item)
81  return
82 
83  @staticmethod
84  def mdart_gen(jobtuple):
85  """Take Jobout and Joberr (in jobtuple) and return mdart object from that"""
86  mdtext = ''.join(line.replace(", ,", ",") for line in jobtuple[0].split('\n') if line[-3:-1] != ' ,')
87  mdtop = json.JSONDecoder().decode(mdtext)
88  if len(list(mdtop.keys())) == 0:
89  print('No top-level key in extracted metadata.')
90  sys.exit(1)
91  file_name = list(mdtop.keys())[0]
92  return mdtop[file_name]
93 
94  @staticmethod
96  """If there's no application key in md dict, create the key with a blank dictionary.
97  Then return md['application'], along with mdval"""
98  if 'application' not in md:
99  md['application'] = {}
100  return md['application']
101 
102 
104  """Class to hold/interpret experiment-specific metadata"""
105  def __init__(self, expname, inputfile):
106  MetaData.__init__(self, inputfile)
107  self.expname = expname
108  #self.exp_md_keyfile = expname + '_metadata_key'
109  try:
110  #translateMetaData = __import__("experiment_utilities", "MetaDataKey")
111  from experiment_utilities import MetaDataKey
112  except ImportError:
113  print("You have not defined an experiment-specific metadata and key-translating module in experiment_utilities. Exiting")
114  raise
115 
116  metaDataModule = MetaDataKey()
117  self.metadataList, self.translateKeyf = metaDataModule.metadataList(), metaDataModule.translateKey
118 
119  def translateKey(self, key):
120  """Returns the output of the imported translateKey function (as translateKeyf) called on key"""
121  return self.translateKeyf(key)
122 
123  def md_gen(self, mdart, md0={}):
124  """Loop through art metdata, generate metadata dictionary"""
125  # define an empty python dictionary which will hold sam metadata.
126  # Some fields can be copied directly from art metadata to sam metadata.
127  # Other fields require conversion.
128  md = {}
129 
130 
131 
132  # Loop over art metadata.
133  mixparents = []
134  for mdkey, mdval in list(mdart.items()):
135  # mdval = mdart[mdkey]
136 
137  # Skip some art-specific fields.
138  # Ignore primary run_type field (if any).
139  # Instead, get run_type from runs field.
140  if mdkey in ['file_format_version', 'file_format_era', 'run_type']:
141  pass
142  elif mdkey in ['art.file_format_version', 'art.file_format_era', 'art.run_type']:
143  pass
144 
145  # Ignore new-style first/last event (old-style first/last handled below).
146  elif mdkey in ['art.first_event', 'art.last_event']:
147  pass
148 
149  # Ignore data_stream if it begins with "out".
150  # These kinds of stream names are probably junk module labels.
151 
152  # First check if the data_stream is just "out" Catches an edge case
153  # where the stream does not have a number
154 
155  elif mdkey == 'data_stream' and mdval == 'out':
156  pass
157 
158  elif mdkey == 'data_stream' and mdval[:3] == 'out' and \
159  mdval[3] >= '0' and mdval[3] <= '9':
160  pass
161 
162  # Application family/name/version.
163  elif mdkey == 'applicationFamily' or mdkey == 'application.family':
164  md['application'], md['application']['family'] = self.md_handle_application(md), mdval
165  elif mdkey == 'process_name' or mdkey == 'art.process_name':
166  md['application'], md['application']['name'] = self.md_handle_application(md), mdval
167  elif mdkey == 'applicationVersion' or mdkey == 'application.version':
168  md['application'], md['application']['version'] = self.md_handle_application(md), mdval
169 
170  # Parents.
171  elif mdkey == 'parents':
172  md['parents'] = [{'file_name': parent} for parent in mdval]
173 
174  elif mdkey.startswith('mixparent'):
175  mixparents.append(mdval.strip(' ,"') )
176 
177  # Other fields where the key or value requires minor conversion.
178  elif mdkey in ['first_event', 'last_event']:
179  if (type(mdval) == type([]) or type(mdval) == type(())) and len(mdval) >= 3:
180  md[mdkey] = mdval[2]
181  else:
182  md[mdkey] = mdval
183  elif mdkey in self.metadataList:
184  #print mdkey
185  trkey = self.translateKey(mdkey)
186  #print trkey
187  md[trkey] = mdval
188  elif mdkey == 'fclName':
189  md['fcl.name'] = mdval
190  elif mdkey == 'fclVersion':
191  md['fcl.version'] = mdval
192 
193  #For all other keys, copy art metadata directly to sam metadata.
194  #This works for run-tuple (run, subrun, runtype) and time stamps.
195  else:
196  md[mdkey] = mdval
197 
198  # Merge mix parents into normal parents.
199 
200  for mixparent in mixparents:
201  mixparent_dict = {'file_name': mixparent}
202  md['parents'].append(mixparent_dict)
203 
204  # Get the other meta data field parameters.
205 
206  md['file_name'] = self.inputfile.split("/")[-1]
207  if 'file_size' in md0:
208  md['file_size'] = md0['file_size']
209  else:
210  md['file_size'] = os.path.getsize(self.inputfile)
211  if 'crc' in md0:
212  md['crc'] = md0['crc']
213  else:
214  md['crc'] = root_metadata.fileEnstoreChecksum(self.inputfile)
215 
216  # In case we ever want to check out what md is for any instance of MetaData by calling instance.md
217  self.md = md
218  return self.md
219 
220  def getmetadata(self, md0={}):
221  """ Get metadata from input file and return as python dictionary.
222  Calls other methods in class and returns metadata dictionary"""
223  proc = self.extract_metadata_to_pipe()
224  jobt = self.get_job(proc)
225  mdart = self.mdart_gen(jobt)
226  return self.md_gen(mdart, md0)
227 
228 def main():
229  try:
230  expSpecificMetadata = expMetaData(os.environ['SAM_EXPERIMENT'], str(sys.argv[1]))
231  except TypeError:
232  print('You have not implemented a defineMetaData function by providing an experiment.')
233  print('No metadata keys will be saved')
234  raise
235  mdtext = json.dumps(expSpecificMetadata.getmetadata(), indent=2, sort_keys=True)
236  print(mdtext)
237  sys.exit(0)
238 
239 
240 
241 if __name__ == "__main__":
242  main()
243 
244 
def __init__(self, expname, inputfile)
static bool format(QChar::Decomposition tag, QString &str, int index, int len)
Definition: qstring.cpp:11496
def wait_for_subprocess(jobinfo, q)
void decode(std::any const &a, Hep2Vector &result)
Definition: CLHEP_ps.h:12
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
def __init__(self, inputfile)
def md_gen(self, mdart, md0={})
static QCString str