submit_mcc.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import argparse
4 import logging
5 import os
6 import sys
7 import json
8 import subprocess
9 import time
10 from datetime import datetime
11 from ECLAPI import ECLConnection, ECLEntry
12 
13 ECL_URL="http://dbweb6.fnal.gov:8080/ECL/dunepro"
14 now = datetime.now().strftime('%Y%m%d%H%M%S')
15 
16 try:
17  from cStringIO import StringIO # Python 2
18 except ImportError:
19  from io import StringIO
20 
21 # set up the logger
22 log_stream = StringIO()
23 FORMAT = '%(asctime)s - submit_mcc.py - %(levelname)s - %(message)s'
24 logging.basicConfig(stream=log_stream, level=logging.INFO, format=FORMAT)
25 
26 def call_project_py(xml, stg, act, test=False):
27  if test:
28  tmpdir = "/pnfs/dune/scratch/dunepro/mcc10_test/test_{}".format(now)
29  subprocess.call("mkdir -p {}".format(tmpdir), shell=True)
30  overrides = ["wrap_proj", "-Onumevents=10",
31  "-Ostage.numjobs=1", "-Ostage.outdir={tmp}",
32  "-Ostage.workdir={tmp}".format(tmp=tmpdir)]
33  logging.info("Submitting a test job with:")
34  logging.info(" numberjobs = 1;")
35  logging.info(" numevents = 1;")
36  logging.info(" outdir = {};".format(tmpdir))
37  logging.info(" workdir = {};".format(tmpdir))
38  else:
39  overrides = []
40  prj_cmd = overrides + ["project.py", "--xml", xml, "--stage", stg,
41  "--{}".format(act)]
42  try:
43  prj_out = subprocess.check_output(prj_cmd, stderr=subprocess.STDOUT)
44  logging.info("project.py was run successfully.")
45  logging.info("project.py output: {}".format(prj_out))
46  except subprocess.CalledProcessError as e:
47  logging.error("project.py exit with {}.".format(e.returncode))
48  logging.error("project.py output: {}.".format(e.output))
49  logging.error("Exiting now!")
50  print(log_stream.getvalue())
51  sys.exit(40)
52  return
53 
54 def handle_stage(workflow_def, xml, workflow, stage, nocheck=False, test=False):
55  wf_dict = json.load(open(workflow_def))
56  workflows = wf_dict["workflows"]
57  if workflow not in workflows:
58  logging.error("Workflow chosen is not defined in the json file.")
59  print(log_stream.getvalue())
60  sys.exit(30)
61  if stage not in workflows[workflow]["stages"]:
62  logging.error("Stage is not defined in the workflow.")
63  print(log_stream.getvalue())
64  sys.exit(31)
65  stages = workflows[workflow]["stages"]
66  actions = workflows[workflow]["actions"]
67  stage_index = stages.index(stage)
68  action = actions[stage_index]
69  # not checking on previous stage if:
70  # 1) '--no-check' is enabled;
71  # 2) '--test' is enabled, cannot check on test jobs since xml file is
72  # generated on the fly with parameter overrides;
73  # 3) if the stage is the first stage of a workflow.
74  if stage_index == 0 or test:
75  nocheck=True
76  if not nocheck:
77  prev_stage = stages[stage_index-1]
78  logging.info("Checking previous stage: {}".format(prev_stage))
79  call_project_py(xml, prev_stage, "check")
80  time.sleep(10)
81  logging.info("Handle stage: {} with action: {}".format(stage, action))
82  call_project_py(xml, stage, action, test)
83  return
84 
85 if __name__ == '__main__':
86  parser = argparse.ArgumentParser(description='')
87  groupArg = parser.add_argument_group('require arguments')
88 
89  groupArg.add_argument('--xml', metavar='xml_file', type=str,
90  help='path to xml file for project.py with path', required = True)
91  groupArg.add_argument('--workflow', metavar='MCC_workflow_name', type=str,
92  help='MCC workflow name', required = True)
93  groupArg.add_argument('--stage', metavar='stage_name', type=str,
94  help='stage name for project.py', required = True)
95 
96  parser.add_argument('--test', action='store_true',
97  help='submit test jobs only')
98  parser.add_argument('--no-check', action='store_true',
99  help='turn off checking on previous stage')
100  parser.add_argument('--workflow-def', type=str, default="workflows.json",
101  metavar='workflow_definition_json_file',
102  help='path to json file which defines the workflows')
103  parser.add_argument('--post-ecl', action='store_true',
104  help='post the output to ECL')
105  parser.add_argument('--ecl-user', type=str, default="dunepro",
106  help='ECL user')
107  parser.add_argument('--ecl-password', type=str, help='ECL user password')
108  parser.add_argument('--ecl-subject', type=str,
109  default="MCC Submission Entry Subject",
110  help='ECL entry subject name')
111  parser.add_argument('--ecl-category', type=str,
112  default="mcc10", help='ECL category name')
113  parser.add_argument('--ecl-comment', type=str,
114  default="submit_mcc.py entry comment", help='ECL entry comment')
115 
116  args = parser.parse_args()
117  cwd = os.getcwd()
118 
119  logging.info("Current dir : {}".format(cwd))
120  logging.info("XML file used : {}".format(args.xml))
121  logging.info("Workflow json : {}".format(args.workflow_def))
122  logging.info("Workflow name : {}".format(args.workflow))
123  logging.info("Stage name : {}".format(args.stage))
124 
125  if not os.path.isfile(args.workflow_def):
126  logging.error("Workflow definition file is not existed. Exit now!")
127  print(log_stream.getvalue())
128  sys.exit(11)
129  if not os.path.isfile(args.xml):
130  logging.error("XML file for project py is not existed. Exit now!")
131  print(log_stream.getvalue())
132  sys.exit(12)
133 
134  if args.test:
135  logging.warn("Submitting testing jobs")
136  if args.no_check:
137  logging.warn("Will not check previous stage with project.py")
138  if args.post_ecl:
139  logging.info("Will make an ECL entry in dunepro ECL.")
140  if not args.ecl_password:
141  logging.error("ECL user password must be supplied. Exiting now!")
142  print(log_stream.getvalue())
143  sys.exit(20)
144 
145  # do submit_stage work here
146  handle_stage(args.workflow_def, args.xml, args.workflow, args.stage,
147  args.no_check, args.test)
148 
149  if args.post_ecl:
150  ecl_entry = ECLEntry(
151  category=args.ecl_category,
152  tags=['MCCSubmission'],
153  formname='MCC Submission',
154  preformatted=False)
155 
156  logging.info("ECL subject : {}".format(args.ecl_subject))
157 
158  ecl_entry.addSubject(args.ecl_subject)
159  ecl_entry.setValue(name="work_dir", value=cwd)
160  ecl_entry.setValue(name="xml_file", value=args.xml)
161  ecl_entry.setValue(name="workflow_file", value=args.workflow_def)
162  ecl_entry.setValue(name="workflow", value=args.workflow)
163  ecl_entry.setValue(name="stage", value=args.stage)
164  ecl_entry.setValue(name="comment", value=args.ecl_comment)
165 
166  logfile = ('log_mcc_submit_{}.txt'.format(now))
167  with open(logfile, 'w') as lfile:
168  lfile.write(log_stream.getvalue())
169 
170  ecl_entry.addAttachment(name=logfile, filename="{}/{}".format(
171  cwd, logfile))
172 
173  elconn = ECLConnection(url=ECL_URL, username=args.ecl_user,
174  password=args.ecl_password)
175 
176  logging.info("Posting the entry to dunepro ECL now.")
177  response = elconn.post(ecl_entry)
178  logging.info("Response from ECL: {}".format(response))
179  elconn.close()
180 
181  print(log_stream.getvalue())
int open(const char *, int)
Opens a file descriptor.
def call_project_py(xml, stg, act, test=False)
Definition: submit_mcc.py:26
def handle_stage(workflow_def, xml, workflow, stage, nocheck=False, test=False)
Definition: submit_mcc.py:54