10 from datetime
import datetime
11 from ECLAPI
import ECLConnection, ECLEntry
# Module-level configuration: the ECL endpoint URL, a run timestamp, and an
# in-memory log buffer.
# NOTE(review): in this listing the original line numbers are fused into the
# text and some original lines are absent; comments describe only what is visible.
# `now` is a YYYYmmddHHMMSS string taken from datetime.now() (no tz argument, so
# naive local time); it is reused below for the scratch directory name and the
# log file name.
13 ECL_URL=
"http://dbweb6.fnal.gov:8080/ECL/dunepro" 14 now = datetime.now().strftime(
'%Y%m%d%H%M%S')
# StringIO is imported from both cStringIO and io. The lines that select between
# the two (presumably a try/except ImportError fallback — confirm) are not visible.
17 from cStringIO
import StringIO
19 from io
import StringIO
# Logging at INFO and above is written into log_stream instead of the console;
# the rest of the script prints or saves log_stream.getvalue() explicitly.
22 log_stream = StringIO()
23 FORMAT =
'%(asctime)s - submit_mcc.py - %(levelname)s - %(message)s' 24 logging.basicConfig(stream=log_stream, level=logging.INFO, format=FORMAT)
# Body of the routine that runs project.py for one stage.
# NOTE(review): the `def` line is not visible in this listing; `xml` and `stg`
# used below are presumably its parameters (the listing's trailer shows
# `call_project_py(xml, stg, act, test=False)`) — confirm against the full file.
#
# Test-job setup: a timestamped scratch directory is used as both outdir and workdir.
28 tmpdir =
"/pnfs/dune/scratch/dunepro/mcc10_test/test_{}".
format(now)
# NOTE(review): the directory is created by shelling out to `mkdir -p` with
# shell=True; the return code of subprocess.call is not checked.
29 subprocess.call(
"mkdir -p {}".
format(tmpdir), shell=
True)
# NOTE(review): .format(tmp=tmpdir) is attached only to the LAST string literal
# in this list, so "-Ostage.outdir={tmp}" is passed with the literal text
# "{tmp}" — confirm whether that is intended.
30 overrides = [
"wrap_proj",
"-Onumevents=10",
31 "-Ostage.numjobs=1",
"-Ostage.outdir={tmp}",
32 "-Ostage.workdir={tmp}".
format(tmp=tmpdir)]
33 logging.info(
"Submitting a test job with:")
34 logging.info(
" numberjobs = 1;")
# NOTE(review): this message reports numevents = 1 while the override above
# passes -Onumevents=10.
35 logging.info(
" numevents = 1;")
36 logging.info(
" outdir = {};".
format(tmpdir))
37 logging.info(
" workdir = {};".
format(tmpdir))
# Command line: the overrides list followed by the project.py arguments.
# NOTE(review): the remaining list elements and the closing bracket (original
# lines 41-42) are not visible, nor is how `overrides` is defined when the
# test setup above does not run.
40 prj_cmd = overrides + [
"project.py",
"--xml", xml,
"--stage", stg,
# check_output raises CalledProcessError on a non-zero exit status; stderr is
# merged into the captured output so both streams end up in the log.
43 prj_out = subprocess.check_output(prj_cmd, stderr=subprocess.STDOUT)
44 logging.info(
"project.py was run successfully.")
45 logging.info(
"project.py output: {}".
format(prj_out))
# Failure path: log the return code and the captured output, then dump the
# buffered log to stdout. Whatever follows the print (original line 51 onward,
# presumably an exit) is not visible here.
46 except subprocess.CalledProcessError
as e:
47 logging.error(
"project.py exit with {}.".
format(e.returncode))
48 logging.error(
"project.py output: {}.".
format(e.output))
49 logging.error(
"Exiting now!")
50 print(log_stream.getvalue())
# handle_stage: look up `stage` inside `workflow` in the JSON workflow
# definition file and log the action paired with it.
# JSON shape implied by the indexing below:
#   {"workflows": {<workflow name>: {"stages": [...], "actions": [...]}}}
# where actions[i] is the action for stages[i].
# NOTE(review): `nocheck` is not referenced on any visible line, and original
# lines 60, 64, 69-73, 75-76, 79-80 and 82-83 are absent from this listing.
54 def handle_stage(workflow_def, xml, workflow, stage, nocheck=False, test=False):
# NOTE(review): the file object returned by open() is never explicitly closed.
55 wf_dict = json.load(
open(workflow_def))
56 workflows = wf_dict[
"workflows"]
# Unknown workflow: log an error and print the buffered log. What follows
# (presumably an exit) is not visible.
57 if workflow
not in workflows:
58 logging.error(
"Workflow chosen is not defined in the json file.")
59 print(log_stream.getvalue())
# Unknown stage within the chosen workflow: same handling as above.
61 if stage
not in workflows[workflow][
"stages"]:
62 logging.error(
"Stage is not defined in the workflow.")
63 print(log_stream.getvalue())
65 stages = workflows[workflow][
"stages"]
66 actions = workflows[workflow][
"actions"]
# list.index returns the first match, so a stage name repeated in "stages"
# resolves to its first position; the action is taken from the same position.
67 stage_index = stages.index(stage)
68 action = actions[stage_index]
# The first stage (index 0) and test submissions take this branch; the branch
# body is not visible.
74 if stage_index == 0
or test:
# NOTE(review): if this line were reached with stage_index == 0, index -1 would
# select the LAST stage — presumably guarded by an `else` that is not visible.
77 prev_stage = stages[stage_index-1]
78 logging.info(
"Checking previous stage: {}".
format(prev_stage))
81 logging.info(
"Handle stage: {} with action: {}".
format(stage, action))
# Script entry point: parse command-line options, validate the input files, run
# the requested stage via handle_stage, and optionally post the captured log to ECL.
# NOTE(review): several original lines (conditions, exits, call openings) are
# absent from this listing; comments describe only what is visible.
85 if __name__ ==
'__main__':
86 parser = argparse.ArgumentParser(description=
'')
87 groupArg = parser.add_argument_group(
'require arguments')
# Required options: the project.py XML file, the workflow name, and the stage name.
89 groupArg.add_argument(
'--xml', metavar=
'xml_file', type=str,
90 help=
'path to xml file for project.py with path', required =
True)
91 groupArg.add_argument(
'--workflow', metavar=
'MCC_workflow_name', type=str,
92 help=
'MCC workflow name', required =
True)
93 groupArg.add_argument(
'--stage', metavar=
'stage_name', type=str,
94 help=
'stage name for project.py', required =
True)
# Optional switches. argparse exposes '--no-check' as args.no_check and
# '--workflow-def' as args.workflow_def.
96 parser.add_argument(
'--test', action=
'store_true',
97 help=
'submit test jobs only')
98 parser.add_argument(
'--no-check', action=
'store_true',
99 help=
'turn off checking on previous stage')
100 parser.add_argument(
'--workflow-def', type=str, default=
"workflows.json",
101 metavar=
'workflow_definition_json_file',
102 help=
'path to json file which defines the workflows')
# ECL posting options.
103 parser.add_argument(
'--post-ecl', action=
'store_true',
104 help=
'post the output to ECL')
# NOTE(review): this add_argument call is not closed on the visible lines
# (original line 106 is absent).
105 parser.add_argument(
'--ecl-user', type=str, default=
"dunepro",
# NOTE(review): the password is accepted as a command-line argument, where it
# may be visible in process listings and shell history.
107 parser.add_argument(
'--ecl-password', type=str, help=
'ECL user password')
108 parser.add_argument(
'--ecl-subject', type=str,
109 default=
"MCC Submission Entry Subject",
110 help=
'ECL entry subject name')
111 parser.add_argument(
'--ecl-category', type=str,
112 default=
"mcc10", help=
'ECL category name')
113 parser.add_argument(
'--ecl-comment', type=str,
114 default=
"submit_mcc.py entry comment", help=
'ECL entry comment')
116 args = parser.parse_args()
# Record the run parameters in the log. `cwd` is assigned on a line that is not
# visible here.
119 logging.info(
"Current dir : {}".
format(cwd))
120 logging.info(
"XML file used : {}".
format(args.xml))
121 logging.info(
"Workflow json : {}".
format(args.workflow_def))
122 logging.info(
"Workflow name : {}".
format(args.workflow))
123 logging.info(
"Stage name : {}".
format(args.stage))
# Input validation: both files must exist. After each error the buffered log is
# printed; the statement that follows (presumably an exit) is not visible.
125 if not os.path.isfile(args.workflow_def):
126 logging.error(
"Workflow definition file is not existed. Exit now!")
127 print(log_stream.getvalue())
129 if not os.path.isfile(args.xml):
130 logging.error(
"XML file for project py is not existed. Exit now!")
131 print(log_stream.getvalue())
# Mode notices. The conditions guarding these messages are not visible
# (presumably args.test, args.no_check and args.post_ecl respectively).
# NOTE(review): logging.warn is a deprecated alias of logging.warning.
135 logging.warn(
"Submitting testing jobs")
137 logging.warn(
"Will not check previous stage with project.py")
139 logging.info(
"Will make an ECL entry in dunepro ECL.")
140 if not args.ecl_password:
141 logging.error(
"ECL user password must be supplied. Exiting now!")
142 print(log_stream.getvalue())
# Run the requested stage; no_check and test are passed positionally as the
# nocheck and test parameters of handle_stage.
146 handle_stage(args.workflow_def, args.xml, args.workflow, args.stage,
147 args.no_check, args.test)
# Keyword arguments of a call whose opening line is not visible (presumably the
# ECLEntry constructor that creates `ecl_entry` — confirm).
151 category=args.ecl_category,
152 tags=[
'MCCSubmission'],
153 formname=
'MCC Submission',
156 logging.info(
"ECL subject : {}".
format(args.ecl_subject))
# Fill in the entry's subject and form fields from the run parameters.
158 ecl_entry.addSubject(args.ecl_subject)
159 ecl_entry.setValue(name=
"work_dir", value=cwd)
160 ecl_entry.setValue(name=
"xml_file", value=args.xml)
161 ecl_entry.setValue(name=
"workflow_file", value=args.workflow_def)
162 ecl_entry.setValue(name=
"workflow", value=args.workflow)
163 ecl_entry.setValue(name=
"stage", value=args.stage)
164 ecl_entry.setValue(name=
"comment", value=args.ecl_comment)
# Save everything logged so far to a timestamped text file (relative path, so it
# is created in the process's working directory), then attach it to the entry.
166 logfile = (
'log_mcc_submit_{}.txt'.
format(now))
167 with
open(logfile,
'w')
as lfile:
168 lfile.write(log_stream.getvalue())
# NOTE(review): the arguments to format( and the end of this call are not visible.
170 ecl_entry.addAttachment(name=logfile, filename=
"{}/{}".
format(
# Closing keyword of a call whose opening is not visible (presumably the
# ECLConnection constructor that creates `elconn` — confirm).
174 password=args.ecl_password)
176 logging.info(
"Posting the entry to dunepro ECL now.")
177 response = elconn.post(ecl_entry)
178 logging.info(
"Response from ECL: {}".
format(response))
# Final dump of the buffered log to stdout. The log file written above does not
# contain the messages logged after it was written.
181 print(log_stream.getvalue())
int open(const char *, int)
Opens a file descriptor.
def call_project_py(xml, stg, act, test=False)
def handle_stage(workflow_def, xml, workflow, stage, nocheck=False, test=False)