#include "art/Framework/EventProcessor/EventProcessor.h"
// vim: set sw=2 expandtab :

#include "art/Utilities/Globals.h"
#include "cetlib/bold_fontify.h"
#include "cetlib/trim.h"
#include "fhiclcpp/ParameterSet.h"
#include "hep_concurrency/WaitingTask.h"

#include <cassert>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using namespace hep::concurrency;
using namespace std;
using namespace string_literals;

namespace art {

  namespace {
    ServicesManager*
    create_services_manager(ParameterSet services_pset,
                            ActivityRegistry& actReg,
                            detail::SharedResources& resources)
    {
      auto const fpcPSet =
        services_pset.get<ParameterSet>("FloatingPointControl", {});
      services_pset.erase("FloatingPointControl");
      services_pset.erase("message");
      services_pset.erase("scheduler");
      auto mgr = new ServicesManager{move(services_pset), actReg, resources};
      mgr->addSystemService<FloatingPointControl>(fpcPSet, actReg);
      return mgr;
    }

    std::unique_ptr<Worker>
    maybe_trigger_results_inserter(ScheduleID const scheduleID,
                                   string const& processName,
                                   ParameterSet const& proc_pset,
                                   ParameterSet const& trig_pset,
                                   UpdateOutputCallbacks& outputCallbacks,
                                   ProductDescriptions& productsToProduce,
                                   ActionTable const& actions,
                                   ActivityRegistry const& actReg,
                                   PathsInfo& pathsInfo,
                                   GlobalTaskGroup& task_group,
                                   detail::SharedResources& resources)
    {
      if (pathsInfo.paths().empty()) {
        return std::unique_ptr<Worker>{nullptr};
      }

      // Make the trigger results inserter.
      WorkerParams const wp{outputCallbacks,
                            productsToProduce,
                            actReg,
                            actions,
                            scheduleID,
                            task_group.native_group(),
                            resources};
      ModuleDescription md{
        trig_pset.id(),
        "TriggerResultInserter",
        "TriggerResults",
        ModuleThreadingType::replicated,
        ProcessConfiguration{processName, proc_pset.id(), getReleaseVersion()}};
      actReg.sPreModuleConstruction.invoke(md);
      auto producer = std::make_shared<TriggerResultInserter>(
        trig_pset, scheduleID, pathsInfo.pathResults());
      producer->setModuleDescription(md);
      auto result =
        std::make_unique<WorkerT<ReplicatedProducer>>(producer, md, wp);
      actReg.sPostModuleConstruction.invoke(md);
      return result;
    }

    auto const invalid_module_context = ModuleContext::invalid();
  }

  EventProcessor::EventProcessor(ParameterSet const& pset,
                                 detail::EnabledModules const& enabled_modules)
    : scheduler_{pset.get<ParameterSet>("services.scheduler")}
    , scheduleIteration_{scheduler_->num_schedules()}
    , servicesManager_{create_services_manager(
        pset.get<ParameterSet>("services"),
        actReg_,
        sharedResources_)}
    , pathManager_{pset,
                   outputCallbacks_,
                   producedProductDescriptions_,
                   scheduler_->actionTable(),
                   actReg_,
                   enabled_modules}
    , handleEmptyRuns_{scheduler_->handleEmptyRuns()}
    , handleEmptySubRuns_{scheduler_->handleEmptySubRuns()}
  {
    auto services_pset = pset.get<ParameterSet>("services");
    auto const scheduler_pset = services_pset.get<ParameterSet>("scheduler");
    {
      // FIXME: Signals and threads require more effort than this!  A
      //        signal is delivered to only one thread, and which
      //        thread is left up to the implementation to decide.  To
      //        get control we must block all signals in the main
      //        thread, create a new thread which will handle the
      //        signals we want to handle, unblock the signals in that
      //        thread only, and have it use sigwaitinfo() to suspend
      //        itself and wait for those signals.
      setupSignals(scheduler_pset.get<bool>("enableSigInt", true));
    }
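
    // A sketch of the scheme the FIXME above describes (assuming POSIX
    // signal and thread semantics, and <csignal>/<thread>; this is not
    // code from this file, and the names below are illustrative only):
#if 0
    sigset_t sigs;
    sigemptyset(&sigs);
    sigaddset(&sigs, SIGINT);
    // Block SIGINT in the main thread; threads created afterwards
    // inherit this mask, so no other thread receives the signal.
    pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
    std::thread signal_thread{[sigs] {
      siginfo_t info;
      // sigwaitinfo() suspends this thread until one of the blocked
      // signals is delivered, and returns the signal number.
      while (sigwaitinfo(&sigs, &info) != -1) {
        shutdown_flag = 1; // request a clean shutdown of the event loop
      }
    }};
    signal_thread.detach();
#endif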

    // We do this late because the floating point control word, signal
    // masks, etc., are per-thread and inherited from the master
    // thread, so we want to allow system services, user services, and
    // modules to configure these things in their constructors before
    // we let tbb create any threads.  This means they cannot use tbb
    // in their constructors; instead they must use the beginJob
    // callout.
    taskGroup_ = scheduler_->global_task_group();
    // Whenever we are ready to enable ROOT's implicit MT, which is
    // equivalent to its use of TBB, the call should be made after our
    // own TBB task manager has been initialized.
    // ROOT::EnableImplicitMT();
    TDEBUG_FUNC(5) << "nschedules: " << scheduler_->num_schedules()
                   << " nthreads: " << scheduler_->num_threads();

    auto const errorOnMissingConsumes = scheduler_->errorOnMissingConsumes();
    ConsumesInfo::instance()->setRequireConsumes(errorOnMissingConsumes);

    auto const& processName = Globals::instance()->processName();

    // Trigger-names
    servicesManager_->addSystemService<TriggerNamesService>(
      Globals::instance()->triggerPSet(),
      pset.get<ParameterSet>("physics", {}));

    // We have delayed creating the service instances; now actually
    // create them.
    servicesManager_->forceCreation();
    ServiceHandle<FileCatalogMetadata>()->addMetadataString("art.process_name",
                                                            processName);

    // Now that the service module instances have been created we can
    // set the callbacks, set the module description, and register the
    // products for each service module instance.
    ProcessConfiguration const pc{processName, pset.id(), getReleaseVersion()};
    auto const producing_services = servicesManager_->registerProducts(
      producedProductDescriptions_, psSignals_, pc);
    pathManager_->createModulesAndWorkers(
      *taskGroup_, sharedResources_, producing_services);

    ServiceHandle<TriggerNamesService> trigger_names [[maybe_unused]];
    auto const end = Globals::instance()->nschedules();
    for (ScheduleID::size_type i = 0; i != end; ++i) {
      ScheduleID const sid{i};

      // The ordering of the path results in the TriggerPathsInfo
      // (which is used for the TriggerResults object) must be the same
      // as that provided by the TriggerNamesService.
      auto& trigger_paths_info = pathManager_->triggerPathsInfo(sid);
      assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());

      auto results_inserter =
        maybe_trigger_results_inserter(sid,
                                       processName,
                                       pset,
                                       Globals::instance()->triggerPSet(),
                                       outputCallbacks_,
                                       producedProductDescriptions_,
                                       scheduler_->actionTable(),
                                       actReg_,
                                       trigger_paths_info,
                                       *taskGroup_,
                                       sharedResources_);
      schedules_->emplace(std::piecewise_construct,
                          std::forward_as_tuple(sid),
                          std::forward_as_tuple(sid,
                                                pathManager_,
                                                scheduler_->actionTable(),
                                                actReg_,
                                                outputCallbacks_,
                                                move(results_inserter),
                                                *taskGroup_));
    }
    sharedResources_.freeze(taskGroup_->native_group());

    FDEBUG(2) << pset.to_string() << endl;
    // The input source must be created after the end path executor
    // because the end path executor registers a callback that must
    // be invoked after the first input file is opened.
    {
      ParameterSet main_input;
      main_input.put("module_type", "EmptyEvent");
      main_input.put("module_label", "source");
      main_input.put("maxEvents", -1);
      if (!pset.get_if_present("source", main_input)) {
        mf::LogInfo("EventProcessorSourceConfig")
          << "Could not find a source configuration: using default.";
      }
      ModuleDescription const md{
        main_input.id(),
        main_input.get<string>("module_type"),
        main_input.get<string>("module_label"),
        ModuleThreadingType::legacy,
        ProcessConfiguration{processName, pset.id(), getReleaseVersion()}};
      InputSourceDescription isd{md, outputCallbacks_, actReg_};
      try {
        input_.reset(InputSourceFactory::make(main_input, isd).release());
      }
      catch (fhicl::detail::validationException const& e) {
        throw Exception(errors::Configuration)
          << "\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
          << "\nmodule_type : " << cet::bold_fontify(md.moduleName()) << "\n\n"
          << e.what();
      }
      catch (Exception const& x) {
        if (x.categoryCode() == errors::Configuration) {
          throw Exception(errors::Configuration, "FailedInputSource")
            << "Configuration of main input source has failed\n"
            << x;
        }
        throw;
      }
      catch (cet::exception const& x) {
        throw Exception(errors::Configuration, "FailedInputSource")
          << "Configuration of main input source has failed\n"
          << x;
      }
      catch (...) {
        throw;
      }
    }
    actReg_.sPostSourceConstruction.invoke(input_->moduleDescription());
    // Create product tables used for product retrieval within modules.
    producedProductLookupTables_ = ProductTables{producedProductDescriptions_};
    outputCallbacks_->invoke(producedProductLookupTables_);
  }

  void
  EventProcessor::invokePostBeginJobWorkers_()
  {
    using cet::transform_all;
    // Need to convert multiple lists of workers into a long list that
    // the postBeginJobWorkers callbacks can understand.
    vector<Worker*> allWorkers;
    transform_all(pathManager_->triggerPathsInfo(ScheduleID::first()).workers(),
                  back_inserter(allWorkers),
                  [](auto const& pr) { return pr.second.get(); });
    transform_all(pathManager_->endPathInfo(ScheduleID::first()).workers(),
                  back_inserter(allWorkers),
                  [](auto const& pr) { return pr.second.get(); });
    actReg_.sPostBeginJobWorkers.invoke(input_, allWorkers);
  }

  //================================================================
  // Event-loop infrastructure

  template <Level L>
  bool
  EventProcessor::levelsToProcess()
  {
    if (nextLevel_.load() == Level::ReadyToAdvance) {
      nextLevel_ = advanceItemType();
      // Consider reading right here?
    }
    if (nextLevel_.load() == L) {
      nextLevel_ = Level::ReadyToAdvance;
      if (main_schedule().outputsToClose()) {
        setOutputFileStatus(OutputFileStatus::Switching);
        finalizeContainingLevels<L>();
        closeSomeOutputFiles();
      }
      return true;
    } else if (nextLevel_.load() < L) {
      return false;
    } else if (nextLevel_.load() == highest_level()) {
      return false;
    }
    throw Exception{errors::LogicError} << "Incorrect level hierarchy.\n"
                                        << "  Current level: " << L
                                        << "  Next level:    " << nextLevel_;
  }

  // Specializations for process function template

  template <>
  inline void
  EventProcessor::begin<Level::Job>()
  {
    timer_->start();
    beginJob();
  }

  template <>
  inline void
  EventProcessor::begin<Level::InputFile>()
  {
    openInputFile();
  }

  template <>
  void
  EventProcessor::begin<Level::Run>()
  {
    readRun();

    // We only enable run finalization if reading was successful.
    // This appears to be a design weakness.
    finalizeRunEnabled_ = true;
    if (handleEmptyRuns_) {
      beginRun();
    }
  }

  template <>
  void
  EventProcessor::begin<Level::SubRun>()
  {
    assert(runPrincipal_);
    assert(runPrincipal_->runID().isValid());
    readSubRun();

    // We only enable subrun finalization if reading was successful.
    // This appears to be a design weakness.
    finalizeSubRunEnabled_ = true;
    if (handleEmptySubRuns_) {
      beginSubRun();
    }
  }

  template <>
  void
  EventProcessor::finalize<Level::SubRun>()
  {
    if (!finalizeSubRunEnabled_) {
      return;
    }

    assert(subRunPrincipal_);
    if (subRunPrincipal_->subRunID().isFlush()) {
      return;
    }

    openSomeOutputFiles();
    setSubRunAuxiliaryRangeSetID();
    if (beginSubRunCalled_) {
      endSubRun();
    }
    writeSubRun();
    finalizeSubRunEnabled_ = false;
  }

  template <>
  void
  EventProcessor::finalize<Level::Run>()
  {
    if (!finalizeRunEnabled_) {
      return;
    }

    assert(runPrincipal_);
    if (runPrincipal_->runID().isFlush()) {
      return;
    }

    openSomeOutputFiles();
    setRunAuxiliaryRangeSetID();
    if (beginRunCalled_) {
      endRun();
    }
    writeRun();
    finalizeRunEnabled_ = false;
  }

  template <>
  void
  EventProcessor::finalize<Level::InputFile>()
  {
    if (nextLevel_.load() == Level::Job) {
      closeAllFiles();
    } else {
      closeInputFile();
    }
  }

  template <>
  void
  EventProcessor::finalize<Level::Job>()
  {
    endJob();
    timer_->stop();
  }

  template <>
  void
  EventProcessor::finalizeContainingLevels<Level::SubRun>()
  {
    finalize<Level::Run>();
  }

  template <>
  void
  EventProcessor::finalizeContainingLevels<Level::Event>()
  {
    finalize<Level::SubRun>();
    finalize<Level::Run>();
  }

  template <>
  void
  EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
  {
    main_schedule().recordOutputClosureRequests(Granularity::Run);
  }

  template <>
  void
  EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
  {
    main_schedule().recordOutputClosureRequests(Granularity::SubRun);
  }

  template <>
  void
  EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
  {
    main_schedule().recordOutputClosureRequests(Granularity::Event);
  }

  //=============================================
  // Job level

  void
  EventProcessor::beginJob()
  {
    FDEBUG(1) << string(8, ' ') << "beginJob\n";
    breakpoints::beginJob();
    // NOTE: This implementation assumes 'Job' means one call to
    // EventProcessor::run.  If it really means once per 'application'
    // then this code will have to be changed.  We also have to deal
    // with the case where 'run' is called, a new Module is added, and
    // 'run' is called again; the newly added Module would need its
    // 'beginJob' to be called.
    try {
      input_->doBeginJob();
    }
    catch (cet::exception& e) {
      mf::LogError("BeginJob") << "A cet::exception happened while processing"
                                  " the beginJob of the 'source'\n";
      e << "A cet::exception happened while processing"
           " the beginJob of the 'source'\n";
      throw;
    }
    catch (exception const&) {
      mf::LogError("BeginJob") << "An exception happened while processing"
                                  " the beginJob of the 'source'\n";
      throw;
    }
    catch (...) {
      mf::LogError("BeginJob") << "An unknown exception happened while"
                                  " processing the beginJob of the 'source'\n";
      throw;
    }
    scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
      schedule(sid).beginJob(sharedResources_);
    });
    actReg_.sPostBeginJob.invoke();
    invokePostBeginJobWorkers_();
  }

  void
  EventProcessor::endJob()
  {
    FDEBUG(1) << string(8, ' ') << "endJob\n";
    ec_->call([this] { endJobAllSchedules(); });
    ec_->call([] { ConsumesInfo::instance()->showMissingConsumes(); });
    ec_->call([this] { input_->doEndJob(); });
    ec_->call([this] { actReg_.sPostEndJob.invoke(); });
    ec_->call([] { mf::LogStatistics(); });
    ec_->call([this] {
      detail::writeSummary(*pathManager_, scheduler_->wantSummary(), *timer_);
    });
  }

  void
  EventProcessor::endJobAllSchedules()
  {
    scheduleIteration_.for_each_schedule(
      [this](ScheduleID const sid) { schedule(sid).endJob(); });
  }

  //====================================================
  // File level

  void
  EventProcessor::openInputFile()
  {
    actReg_.sPreOpenFile.invoke();
    FDEBUG(1) << string(8, ' ') << "openInputFile\n";
    fb_.reset(input_->readFile().release());
    if (fb_ == nullptr) {
      throw Exception(errors::LogicError)
        << "Source readFile() did not return a valid FileBlock: FileBlock "
        << "should be valid or readFile() should throw.\n";
    }
    actReg_.sPostOpenFile.invoke(fb_->fileName());
    respondToOpenInputFile();
  }

  void
  EventProcessor::closeAllFiles()
  {
    closeAllOutputFiles();
    closeInputFile();
  }

  void
  EventProcessor::closeInputFile()
  {
    main_schedule().incrementInputFileNumber();
    // Output-file closing on input-file boundaries is tricky since
    // input files must outlive the output files, which often have
    // data copied forward from the input files.  That's why the
    // recordOutputClosureRequests call is made here instead of in a
    // specialization of recordOutputModuleClosureRequests<>.
    main_schedule().recordOutputClosureRequests(Granularity::InputFile);
    if (main_schedule().outputsToClose()) {
      closeSomeOutputFiles();
    }
    respondToCloseInputFile();
    actReg_.sPreCloseFile.invoke();
    input_->closeFile();
    actReg_.sPostCloseFile.invoke();
    FDEBUG(1) << string(8, ' ') << "closeInputFile\n";
  }

  void
  EventProcessor::closeAllOutputFiles()
  {
    if (!main_schedule().someOutputsOpen()) {
      return;
    }
    respondToCloseOutputFiles();
    main_schedule().closeAllOutputFiles();
    FDEBUG(1) << string(8, ' ') << "closeAllOutputFiles\n";
  }

  bool
  EventProcessor::outputsToOpen()
  {
    bool outputs_to_open{false};
    auto check_outputs_to_open = [this,
                                  &outputs_to_open](ScheduleID const sid) {
      if (schedule(sid).outputsToOpen()) {
        outputs_to_open = true;
      }
    };
    scheduleIteration_.for_each_schedule(check_outputs_to_open);
    return outputs_to_open;
  }

  void
  EventProcessor::openSomeOutputFiles()
  {
    if (!outputsToOpen()) {
      return;
    }

    auto open_some_outputs = [this](ScheduleID const sid) {
      schedule(sid).openSomeOutputFiles(*fb_);
    };
    scheduleIteration_.for_each_schedule(open_some_outputs);

    FDEBUG(1) << string(8, ' ') << "openSomeOutputFiles\n";
    respondToOpenOutputFiles();
  }

  void
  EventProcessor::setOutputFileStatus(OutputFileStatus const ofs)
  {
    main_schedule().setOutputFileStatus(ofs);
    FDEBUG(1) << string(8, ' ') << "setOutputFileStatus\n";
  }

  void
  EventProcessor::closeSomeOutputFiles()
  {
    // Precondition: there are SOME output files that have been
    //               flagged as needing to close.  Otherwise,
    //               'respondToCloseOutputFiles' will be needlessly
    //               called.
    assert(main_schedule().outputsToClose());
    respondToCloseOutputFiles();
    main_schedule().closeSomeOutputFiles();
    FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
  }

  void
  EventProcessor::respondToOpenInputFile()
  {
    scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
      schedule(sid).respondToOpenInputFile(*fb_);
    });
    FDEBUG(1) << string(8, ' ') << "respondToOpenInputFile\n";
  }

  void
  EventProcessor::respondToCloseInputFile()
  {
    scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
      schedule(sid).respondToCloseInputFile(*fb_);
    });
    FDEBUG(1) << string(8, ' ') << "respondToCloseInputFile\n";
  }

  void
  EventProcessor::respondToOpenOutputFiles()
  {
    scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
      schedule(sid).respondToOpenOutputFiles(*fb_);
    });
    FDEBUG(1) << string(8, ' ') << "respondToOpenOutputFiles\n";
  }

  void
  EventProcessor::respondToCloseOutputFiles()
  {
    scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
      schedule(sid).respondToCloseOutputFiles(*fb_);
    });
    FDEBUG(1) << string(8, ' ') << "respondToCloseOutputFiles\n";
  }

  //=============================================
  // Run level

  void
  EventProcessor::readRun()
  {
    actReg_.sPreSourceRun.invoke();
    runPrincipal_.reset(input_->readRun().release());
    assert(runPrincipal_);
    auto rsh = input_->runRangeSetHandler();
    assert(rsh);
    auto seed_range_set = [this, &rsh](ScheduleID const sid) {
      schedule(sid).seedRunRangeSet(*rsh);
    };
    scheduleIteration_.for_each_schedule(seed_range_set);
    // The intended behavior here is that the producing services which
    // are called during the sPostReadRun cannot see each other's put
    // products.  We enforce this by creating the groups for the
    // produced products, but do not allow the lookups to find them
    // until after the callbacks have run.
    runPrincipal_->createGroupsForProducedProducts(
      producedProductLookupTables_);
    psSignals_->sPostReadRun.invoke(*runPrincipal_);
    runPrincipal_->enableLookupOfProducedProducts(producedProductLookupTables_);
    {
      Run const r{*runPrincipal_, invalid_module_context};
      actReg_.sPostSourceRun.invoke(r);
    }
    FDEBUG(1) << string(8, ' ') << "readRun.....................("
              << runPrincipal_->runID() << ")\n";
  }

  void
  EventProcessor::beginRun()
  {
    assert(runPrincipal_);
    RunID const r{runPrincipal_->runID()};
    if (r.isFlush()) {
      return;
    }
    finalizeRunEnabled_ = true;
    try {
      {
        Run const run{*runPrincipal_, invalid_module_context};
        actReg_.sPreBeginRun.invoke(run);
      }
      scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
        schedule(sid).process(Transition::BeginRun, *runPrincipal_);
      });
      {
        Run const run{*runPrincipal_, invalid_module_context};
        actReg_.sPostBeginRun.invoke(run);
      }
    }
    catch (cet::exception& ex) {
      throw Exception{
        errors::EventProcessorFailure,
        "EventProcessor: an exception occurred during current event processing",
        ex};
    }
    catch (...) {
      mf::LogError("PassingThrough")
        << "an exception occurred during current event processing\n";
      throw;
    }
    FDEBUG(1) << string(8, ' ') << "beginRun....................(" << r
              << ")\n";
    beginRunCalled_ = true;
  }

  void
  EventProcessor::beginRunIfNotDoneAlready()
  {
    if (!beginRunCalled_) {
      beginRun();
    }
  }

  void
  EventProcessor::setRunAuxiliaryRangeSetID()
  {
    assert(runPrincipal_);
    FDEBUG(1) << string(8, ' ') << "setRunAuxiliaryRangeSetID...("
              << runPrincipal_->runID() << ")\n";
    if (main_schedule().runRangeSetHandler().type() ==
        RangeSetHandler::HandlerType::Open) {
      // We are using EmptyEvent source, need to merge what the
      // schedules have seen.
      RangeSet mergedRS;
      auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
        auto const& rs = schedule(sid).runRangeSetHandler().seenRanges();
        // The following constructor ensures that the range is sorted
        // before 'merge' is called.
        RangeSet const tmp{rs.run(), rs.ranges()};
        mergedRS.merge(tmp);
      };
      scheduleIteration_.for_each_schedule(merge_range_sets);
      runPrincipal_->updateSeenRanges(mergedRS);
      auto update_executors = [this, &mergedRS](ScheduleID const sid) {
        schedule(sid).setRunAuxiliaryRangeSetID(mergedRS);
      };
      scheduleIteration_.for_each_schedule(update_executors);
      return;
    }

    // Since we are using already existing ranges, all the range-set
    // handlers have the same ranges.  Find the handler with the
    // largest event number; that will be the one which we use as the
    // file switch boundary.  Note that it may not exactly match the
    // schedule that triggered the switch.  Do we need to fix this?
    unique_ptr<RangeSetHandler> rshAtSwitch{
      main_schedule().runRangeSetHandler().clone()};
    if (main_schedule().fileStatus() != OutputFileStatus::Switching) {
      // We are at the end of the job.
      rshAtSwitch->flushRanges();
    }
    runPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
    main_schedule().setRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
  }

  void
  EventProcessor::endRun()
  {
    assert(runPrincipal_);
    // Precondition: The RunID does not correspond to a flush ID. --
    // N.B. The flush flag is not explicitly checked here since endRun
    //      is only called from finalizeRun, which is where the check
    //      happens.
    RunID const run{runPrincipal_->runID()};
    assert(!run.isFlush());
    try {
      actReg_.sPreEndRun.invoke(runPrincipal_->runID(),
                                runPrincipal_->endTime());
      scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
        schedule(sid).process(Transition::EndRun, *runPrincipal_);
      });
      Run const r{*runPrincipal_, invalid_module_context};
      actReg_.sPostEndRun.invoke(r);
    }
    catch (cet::exception& ex) {
      throw Exception{
        errors::EventProcessorFailure,
        "EventProcessor: an exception occurred during current event processing",
        ex};
    }
    catch (...) {
      mf::LogError("PassingThrough")
        << "an exception occurred during current event processing\n";
      throw;
    }
    FDEBUG(1) << string(8, ' ') << "endRun......................(" << run
              << ")\n";
    beginRunCalled_ = false;
  }

  void
  EventProcessor::writeRun()
  {
    assert(runPrincipal_);
    // Precondition: The RunID does not correspond to a flush ID.
    RunID const r{runPrincipal_->runID()};
    assert(!r.isFlush());
    main_schedule().writeRun(*runPrincipal_);
    FDEBUG(1) << string(8, ' ') << "writeRun....................(" << r
              << ")\n";
  }

  //=============================================
  // SubRun level

  void
  EventProcessor::readSubRun()
  {
    actReg_.sPreSourceSubRun.invoke();
    subRunPrincipal_.reset(input_->readSubRun(runPrincipal_.get()).release());
    assert(subRunPrincipal_);
    auto rsh = input_->subRunRangeSetHandler();
    assert(rsh);
    auto seed_range_set = [this, &rsh](ScheduleID const sid) {
      schedule(sid).seedSubRunRangeSet(*rsh);
    };
    scheduleIteration_.for_each_schedule(seed_range_set);
    // The intended behavior here is that the producing services which
    // are called during the sPostReadSubRun cannot see each other's
    // put products.  We enforce this by creating the groups for the
    // produced products, but do not allow the lookups to find them
    // until after the callbacks have run.
    subRunPrincipal_->createGroupsForProducedProducts(
      producedProductLookupTables_);
    psSignals_->sPostReadSubRun.invoke(*subRunPrincipal_);
    subRunPrincipal_->enableLookupOfProducedProducts(
      producedProductLookupTables_);
    {
      SubRun const sr{*subRunPrincipal_, invalid_module_context};
      actReg_.sPostSourceSubRun.invoke(sr);
    }
    FDEBUG(1) << string(8, ' ') << "readSubRun..................("
              << subRunPrincipal_->subRunID() << ")\n";
  }

  void
  EventProcessor::beginSubRun()
  {
    assert(subRunPrincipal_);
    SubRunID const sr{subRunPrincipal_->subRunID()};
    if (sr.isFlush()) {
      return;
    }
    finalizeSubRunEnabled_ = true;
    try {
      {
        SubRun const srun{*subRunPrincipal_, invalid_module_context};
        actReg_.sPreBeginSubRun.invoke(srun);
      }
      scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
        schedule(sid).process(Transition::BeginSubRun, *subRunPrincipal_);
      });
      {
        SubRun const srun{*subRunPrincipal_, invalid_module_context};
        actReg_.sPostBeginSubRun.invoke(srun);
      }
    }
    catch (cet::exception& ex) {
      throw Exception{
        errors::EventProcessorFailure,
        "EventProcessor: an exception occurred during current event processing",
        ex};
    }
    catch (...) {
      mf::LogError("PassingThrough")
        << "an exception occurred during current event processing\n";
      throw;
    }
    FDEBUG(1) << string(8, ' ') << "beginSubRun.................(" << sr
              << ")\n";
    beginSubRunCalled_ = true;
  }

  void
  EventProcessor::beginSubRunIfNotDoneAlready()
  {
    if (!beginSubRunCalled_) {
      beginSubRun();
    }
  }

  void
  EventProcessor::setSubRunAuxiliaryRangeSetID()
  {
    assert(subRunPrincipal_);
    FDEBUG(1) << string(8, ' ') << "setSubRunAuxiliaryRangeSetID("
              << subRunPrincipal_->subRunID() << ")\n";
    if (main_schedule().subRunRangeSetHandler().type() ==
        RangeSetHandler::HandlerType::Open) {
      // We are using EmptyEvent source, need to merge what the
      // schedules have seen.
      RangeSet mergedRS;
      auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
        auto const& rs = schedule(sid).subRunRangeSetHandler().seenRanges();
        // The following constructor ensures that the range is sorted
        // before 'merge' is called.
        RangeSet const tmp{rs.run(), rs.ranges()};
        mergedRS.merge(tmp);
      };
      scheduleIteration_.for_each_schedule(merge_range_sets);
      subRunPrincipal_->updateSeenRanges(mergedRS);
      auto update_executors = [this, &mergedRS](ScheduleID const sid) {
        schedule(sid).setSubRunAuxiliaryRangeSetID(mergedRS);
      };
      scheduleIteration_.for_each_schedule(update_executors);
      return;
    }
    // Ranges are split/flushed only for a RangeSetHandler whose
    // dynamic type is 'ClosedRangeSetHandler'.
    //
    // Consider the following range-sets:
    //
    //   SubRun RangeSet:
    //
    //     { Run 1 : SubRun 1 : Events [1,7) }  <-- Current
    //
    //   Run RangeSet:
    //
    //     { Run 1 : SubRun 0 : Events [5,11)
    //               SubRun 1 : Events [1,7)    <-- Current
    //               SubRun 1 : Events [9,15) }
    //
    // For a range split just before SubRun 1, Event 6, the range
    // sets should become:
    //
    //   SubRun RangeSet:
    //
    //     { Run 1 : SubRun 1 : Events [1,6)
    //               SubRun 1 : Events [6,7) }  <-- Updated
    //
    //   Run RangeSet:
    //
    //     { Run 1 : SubRun 0 : Events [5,11)
    //               SubRun 1 : Events [1,6)
    //               SubRun 1 : Events [6,7)    <-- Updated
    //               SubRun 1 : Events [9,15) }
    //
    // Since we are using already existing ranges, all the range set
    // handlers have the same ranges.  Find the closed range set
    // handler with the largest event number; that will be the one
    // which we will use as the file switch boundary.  Note that it
    // may not exactly match the schedule that triggered the switch.
    // Do we need to fix this?
    //
    // If we do not find any handlers with valid event info then we
    // use the first one, which is just fine.  This happens for
    // example when we are dropping all events.
    unsigned largestEvent = 1U;
    ScheduleID idxOfMax{ScheduleID::first()};
    ScheduleID idx{ScheduleID::first()};
    auto find_max_event = [this, &largestEvent, &idxOfMax, &idx](
                            ScheduleID const sid) {
      auto const& val = schedule(sid).subRunRangeSetHandler();
      auto& rsh = dynamic_cast<ClosedRangeSetHandler const&>(val);
      // Make sure the event number is a valid event number before using
      // it.  It can be invalid in the handler if we have not yet read an
      // event, which happens with empty subruns and when we are
      // dropping all events.
      if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
        if (rsh.eventInfo().id().event() > largestEvent) {
          largestEvent = rsh.eventInfo().id().event();
          idxOfMax = idx;
        }
      }
      idx = idx.next();
    };
    scheduleIteration_.for_each_schedule(find_max_event);

    unique_ptr<RangeSetHandler> rshAtSwitch{
      schedule(idxOfMax).subRunRangeSetHandler().clone()};
    if (main_schedule().fileStatus() == OutputFileStatus::Switching) {
      rshAtSwitch->maybeSplitRange();
      unique_ptr<RangeSetHandler> runRSHAtSwitch{
        schedule(idxOfMax).runRangeSetHandler().clone()};
      runRSHAtSwitch->maybeSplitRange();
      main_schedule().seedRunRangeSet(*runRSHAtSwitch);
    } else {
      // We are at the end of the job.
      rshAtSwitch->flushRanges();
    }
    main_schedule().seedSubRunRangeSet(*rshAtSwitch);
    subRunPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
    main_schedule().setSubRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
  }

  void
  EventProcessor::endSubRun()
  {
    assert(subRunPrincipal_);
    // Precondition: The SubRunID does not correspond to a flush ID.
    // Note: the flush flag is not explicitly checked here since
    //       endSubRun is only called from finalizeSubRun, which is
    //       where the check happens.
    SubRunID const sr{subRunPrincipal_->subRunID()};
    assert(!sr.isFlush());
    try {
      actReg_.sPreEndSubRun.invoke(subRunPrincipal_->subRunID(),
                                   subRunPrincipal_->endTime());
      scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
        schedule(sid).process(Transition::EndSubRun, *subRunPrincipal_);
      });
      SubRun const srun{*subRunPrincipal_, invalid_module_context};
      actReg_.sPostEndSubRun.invoke(srun);
    }
    catch (cet::exception& ex) {
      throw Exception{
        errors::EventProcessorFailure,
        "EventProcessor: an exception occurred during current event processing",
        ex};
    }
    catch (...) {
      mf::LogError("PassingThrough")
        << "an exception occurred during current event processing\n";
      throw;
    }
    FDEBUG(1) << string(8, ' ') << "endSubRun...................(" << sr
              << ")\n";
    beginSubRunCalled_ = false;
  }

  void
  EventProcessor::writeSubRun()
  {
    assert(subRunPrincipal_);
    // Precondition: The SubRunID does not correspond to a flush ID.
    SubRunID const& sr{subRunPrincipal_->subRunID()};
    assert(!sr.isFlush());
    main_schedule().writeSubRun(*subRunPrincipal_);
    FDEBUG(1) << string(8, ' ') << "writeSubRun.................(" << sr
              << ")\n";
  }

  // ==========================================================================
  // Event level

  template <>
  void
  EventProcessor::process<most_deeply_nested_level()>()
  {
    if ((shutdown_flag > 0) || !ec_->empty()) {
      return;
    }
    // Note: This loop is to allow output file switching to happen in
    // the main thread.
    firstEvent_ = true;
    bool done = false;
    while (!done) {
      beginRunIfNotDoneAlready();
      beginSubRunIfNotDoneAlready();

      auto const last_schedule_index = scheduler_->num_schedules() - 1;
      for (ScheduleID::size_type i = 0; i != last_schedule_index; ++i) {
        taskGroup_->run([this, i] { processAllEventsAsync(ScheduleID(i)); });
      }
      taskGroup_->native_group().run_and_wait([this, last_schedule_index] {
        processAllEventsAsync(ScheduleID(last_schedule_index));
      });

      // If anything bad happened during event processing, let the
      // user know.
      sharedException_.throw_if_stored_exception();
      if (!fileSwitchInProgress_.load()) {
        done = true;
        continue;
      }
      setOutputFileStatus(OutputFileStatus::Switching);
      finalizeContainingLevels<most_deeply_nested_level()>();
      respondToCloseOutputFiles();
      main_schedule().closeSomeOutputFiles();
      FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
      // We started the switch after advancing to the next item type;
      // we must make sure that we read that event before advancing
      // the item type again.
      firstEvent_ = true;
      fileSwitchInProgress_ = false;
    }
  }

  // This is the event loop (also known as the schedule head).  It
  // calls readAndProcessAsync, which reads and processes a single
  // event, creates itself again as a continuation task, and then
  // exits.  A sketch of this pattern appears after this function.
  void
  EventProcessor::processAllEventsAsync(ScheduleID const sid)
  {
    // Note: We are part of the processAllEventsTask (schedule head
    // task), and our parent is the eventLoopTask.
    TDEBUG_BEGIN_FUNC_SI(4, sid);
    try {
      readAndProcessAsync(sid);
    }
    catch (...) {
      sharedException_.store_current();
      TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
      return;
    }
    // If no exception, then end this task, which does not terminate
    // event processing because readAndProcessAsync creates a
    // continuation task.
    TDEBUG_END_FUNC_SI(4, sid);
  }
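
  // A minimal sketch of the self-reposting continuation pattern used
  // by the event loop above (assuming plain TBB; the helper names are
  // illustrative, not this file's API):
#if 0
  void process_all_events(tbb::task_group& group, ScheduleID const sid)
  {
    if (!read_one_event(sid)) {
      return; // nothing left to read: do not repost, the loop ends
    }
    process_one_event(sid);
    // Repost ourselves as a new task instead of looping, so this task's
    // stack unwinds and tasks from other schedules can interleave.
    group.run([&group, sid] { process_all_events(group, sid); });
  }
#endif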

  // This function is executed as part of the readAndProcessEvent
  // task; our parent task is the EventLoopTask.  Here we advance to
  // the next item in the file index, end event processing if it is
  // not an event or if the user has requested a shutdown, read the
  // event, and then call another function to do the processing.
  void
  EventProcessor::readAndProcessAsync(ScheduleID const sid)
  {
    // Note: We are part of the readAndProcessEventTask (schedule head
    // task), and our parent task is the eventLoopTask.
    TDEBUG_BEGIN_FUNC_SI(4, sid);
    // Note: shutdown_flag is an extern global atomic int in
    // art/art/Utilities/UnixSignalHandlers.cc.
    if (shutdown_flag) {
      // User called for a clean shutdown using a signal or ctrl-c,
      // end event processing and this task.
      TDEBUG_END_FUNC_SI(4, sid) << "CLEAN SHUTDOWN";
      return;
    }

    // The item-type advance and the event read must be done with the
    // input source lock held; however, event processing must not be
    // serialized.
    {
      InputSourceMutexSentry lock_input;
      if (fileSwitchInProgress_.load()) {
        // We must avoid advancing the iterator after a schedule has
        // noticed it is time to switch files.  After the switch, we
        // will need to set firstEvent_ true so that the first
        // schedule that resumes after the switch actually reads the
        // event that the iterator was advanced to by the schedule
        // that noticed the need for a switch.

        // Note: We still have the problem that because the schedules
        // do not read events at the same time the file switch point
        // can be up to nschedules-1 ahead of where it would have been
        // if there was only one schedule.  If we are switching output
        // files every event in an attempt to create single-event
        // files, this really does not work out too well.
        TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH";
        return;
      }
      // Check the next item type and exit this task if it is not an
      // event, or if the user has asynchronously requested a
      // shutdown.
      auto expected = true;
      if (firstEvent_.compare_exchange_strong(expected, false)) {
        // Do not advance the item type on the first event.
      } else {
        // Do the advance item type.
        if (nextLevel_.load() == Level::ReadyToAdvance) {
          // See what the next item is.
          TDEBUG_FUNC_SI(5, sid) << "Calling advanceItemType()";
          nextLevel_ = advanceItemType();
        }
        if ((nextLevel_.load() < most_deeply_nested_level()) ||
            (nextLevel_.load() == highest_level())) {
          // We are popping up, end event processing and this task.
          TDEBUG_END_FUNC_SI(4, sid) << "END OF SUBRUN";
          return;
        }
        if (nextLevel_.load() != most_deeply_nested_level()) {
          // Error: incorrect level hierarchy
          TDEBUG_END_FUNC_SI(4, sid) << "BAD HIERARCHY";
          throw Exception{errors::LogicError} << "Incorrect level hierarchy.";
        }

        // At this point we have determined that we are going to read
        // an event and we must do that before dropping the lock on
        // the input source, which is what is protecting us against a
        // double-advance caused by a different schedule.
        if (schedule(sid).outputsToClose()) {
          fileSwitchInProgress_ = true;
          TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH INITIATED";
          return;
        }
      }

      // Now we can read the event from the source.
      ScheduleContext const sc{sid};
      assert(subRunPrincipal_);
      assert(subRunPrincipal_->subRunID().isValid());
      actReg_.sPreSourceEvent.invoke(sc);
      TDEBUG_FUNC_SI(5, sid) << "Calling input_->readEvent(subRunPrincipal_)";
      auto ep = input_->readEvent(subRunPrincipal_.get());
      assert(ep);
      // The intended behavior here is that the producing services
      // which are called during the sPostReadEvent cannot see each
      // other's put products.  We enforce this by creating the groups
      // for the produced products, but do not allow the lookups to
      // find them until after the callbacks have run.
      ep->createGroupsForProducedProducts(producedProductLookupTables_);
      psSignals_->sPostReadEvent.invoke(*ep);
      ep->enableLookupOfProducedProducts(producedProductLookupTables_);
      actReg_.sPostSourceEvent.invoke(Event{*ep, invalid_module_context}, sc);
      FDEBUG(1) << string(8, ' ') << "readEvent...................("
                << ep->eventID() << ")\n";
      schedule(sid).accept_principal(move(ep));
      // Now we drop the input source lock by exiting the guarded
      // scope.
    }
    if (schedule(sid).event_principal().eventID().isFlush()) {
      // No processing to do, start next event handling task.
      processAllEventsAsync(sid);
      TDEBUG_END_FUNC_SI(4, sid) << "FLUSH EVENT";
      return;
    }

    // Now process the event.
    processEventAsync(sid);
    TDEBUG_END_FUNC_SI(4, sid);
  }

  // --------------------------------------------------------------------------
  class EventProcessor::EndPathRunnerTask {
  public:
    EndPathRunnerTask(EventProcessor* evp, ScheduleID const sid)
      : evp_{evp}, sid_{sid}
    {}

    void
    operator()(std::exception_ptr ex) const
    {
      TDEBUG_BEGIN_TASK_SI(4, sid_);
      if (ex) {
        try {
          rethrow_exception(ex);
        }
        catch (cet::exception& e) {
          if (evp_->error_action(e) != actions::IgnoreCompletely) {
            evp_->sharedException_.store<Exception>(
              errors::EventProcessorFailure,
              "EventProcessor: an exception occurred during current "
              "event processing",
              e);
            TDEBUG_END_TASK_SI(4, sid_);
            return;
          }
          mf::LogWarning(e.category())
            << "exception being ignored for current event:\n"
            << cet::trim_right_copy(e.what(), " \n");
          // WARNING: We continue processing after the catch blocks!!!
        }
        catch (...) {
          mf::LogError("PassingThrough")
            << "an exception occurred during current event processing\n";
          evp_->sharedException_.store_current();
          TDEBUG_END_TASK_SI(4, sid_);
          return;
        }
      }

      evp_->finishEventAsync(sid_);

      TDEBUG_END_TASK_SI(4, sid_);
    }

  private:
    EventProcessor* evp_;
    ScheduleID const sid_;
  };

  // --------------------------------------------------------------------------
  class EventProcessor::EndPathTask {
  public:
    EndPathTask(EventProcessor* evp, ScheduleID const sid)
      : evp_{evp}, sid_{sid}
    {}

    void
    operator()(exception_ptr const ex)
    {
      // Note: When we start our parent is the eventLoopTask.
      TDEBUG_BEGIN_TASK_SI(4, sid_);
      if (ex) {
        try {
          rethrow_exception(ex);
        }
        catch (cet::exception& e) {
          if (evp_->error_action(e) != actions::IgnoreCompletely) {
            evp_->sharedException_.store<Exception>(
              errors::EventProcessorFailure,
              "EventProcessor: an exception occurred during current "
              "event processing",
              e);
            TDEBUG_END_TASK_SI(4, sid_)
              << "terminate event loop because of EXCEPTION";
            return;
          }
          mf::LogWarning(e.category())
            << "exception being ignored for current event:\n"
            << cet::trim_right_copy(e.what(), " \n");
          // WARNING: We continue processing after the catch blocks!!!
        }
        catch (...) {
          mf::LogError("PassingThrough")
            << "an exception occurred during current event processing\n";
          evp_->sharedException_.store_current();
          TDEBUG_END_TASK_SI(4, sid_)
            << "terminate event loop because of EXCEPTION";
          return;
        }
      }

      auto finalize_event_task =
        make_waiting_task<EndPathRunnerTask>(evp_, sid_);
      try {
        evp_->schedule(sid_).process_event_observers(finalize_event_task);
      }
      catch (cet::exception& e) {
        if (evp_->error_action(e) != actions::IgnoreCompletely) {
          evp_->sharedException_.store<Exception>(
            errors::EventProcessorFailure,
            "EventProcessor: an exception occurred during current event "
            "processing",
            e);
          TDEBUG_END_TASK_SI(4, sid_)
            << "terminate event loop because of EXCEPTION";
          return;
        }
        mf::LogWarning(e.category())
          << "exception being ignored for current event:\n"
          << cet::trim_right_copy(e.what(), " \n");
        // WARNING: We continue processing after the catch blocks!!!
      }
      catch (...) {
        mf::LogError("PassingThrough")
          << "an exception occurred during current event processing\n";
        evp_->sharedException_.store_current();
        TDEBUG_END_TASK_SI(4, sid_)
          << "terminate event loop because of EXCEPTION";
        return;
      }

      // Once the end path processing is done, exit this task, which
      // does not end event-processing because of the continuation
      // task.
      TDEBUG_END_TASK_SI(4, sid_);
    }

  private:
    EventProcessor* evp_;
    ScheduleID const sid_;
  };

  // This function is a continuation of the body of the
  // readAndProcessEvent task.  Here we call down to Schedule to do
  // the trigger path processing, passing it a waiting task which will
  // do the end path processing, finalize the event, and start the
  // next read-and-process-event task.  Note that Schedule will spawn
  // a task to process each of the trigger paths, and then when they
  // are finished, insert the trigger results, and then spawn the
  // waiting task we gave it to do the end path processing, write the
  // event, and start the next event processing task.
  void
  EventProcessor::processEventAsync(ScheduleID const sid) try {
    // Note: We are part of the readAndProcessEventTask (schedule head
    // task), and our parent task is the eventLoopTask.
    TDEBUG_BEGIN_FUNC_SI(4, sid);
    assert(!schedule(sid).event_principal().eventID().isFlush());
    // Continue processing via the creation of a continuation.
    auto endPathTask = make_waiting_task<EndPathTask>(this, sid);
    // Start the trigger paths running.  When they finish they will
    // spawn the endPathTask which will run the end path, write the
    // event, and start the next event processing task.
    schedule(sid).process_event_modifiers(endPathTask);
    TDEBUG_END_FUNC_SI(4, sid);
  }
  catch (cet::exception& e) {
    // Upon exiting this scope, end this task, terminating event
    // processing.
    if (error_action(e) != actions::IgnoreCompletely) {
      sharedException_.store<Exception>(
        errors::EventProcessorFailure,
        "EventProcessor: an exception occurred during current event processing",
        e);
      TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
      return;
    }
    mf::LogWarning(e.category())
      << "exception being ignored for current event:\n"
      << cet::trim_right_copy(e.what(), " \n");
    TDEBUG_END_FUNC_SI(4, sid) << "Ignoring exception.";
  }
  catch (...) {
    mf::LogError("PassingThrough")
      << "an exception occurred during current event processing\n";
    sharedException_.store_current();
    TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
  }

  void
  EventProcessor::finishEventAsync(ScheduleID const sid)
  {
    auto& ep = schedule(sid).event_principal();
    actReg_.sPostProcessEvent.invoke(Event{ep, invalid_module_context},
                                     ScheduleContext{sid});

    // Note: We are part of the endPathTask.
    TDEBUG_BEGIN_FUNC_SI(4, sid);
    FDEBUG(1) << string(8, ' ') << "processEvent................("
              << ep.eventID() << ")\n";
    try {
      // Ask the output workers if they have reached their limits, and
      // if so setup to end the job the next time around the event
      // loop.
      FDEBUG(1) << string(8, ' ') << "shouldWeStop\n";
      TDEBUG_FUNC_SI(5, sid) << "Calling schedules_->allAtLimit()";
      static std::mutex m;
      std::lock_guard sentry{m};
      if (schedule(sid).allAtLimit()) {
        // Set to return to the File level.
        nextLevel_ = highest_level();
      }
      // Now we can write the results of processing to the outputs,
      // and delete the event principal.
      if (!ep.eventID().isFlush()) {
        // Possibly open new output files.  This is safe to do because
        // EndPathExecutor functions are called in a serialized
        // context.
        TDEBUG_FUNC_SI(5, sid) << "Calling openSomeOutputFiles()";
        openSomeOutputFiles();
        TDEBUG_FUNC_SI(5, sid) << "Calling schedule(sid).writeEvent()";

        auto const id = ep.eventID();
        schedule(sid).writeEvent();
        FDEBUG(1) << string(8, ' ') << "writeEvent..................(" << id
                  << ")\n";
      }
      TDEBUG_FUNC_SI(5, sid)
        << "Calling schedules_->"
           "recordOutputClosureRequests(Granularity::Event)";
      schedule(sid).recordOutputClosureRequests(Granularity::Event);
    }
    catch (cet::exception& e) {
      if (error_action(e) != actions::IgnoreCompletely) {
        sharedException_.store<Exception>(
          errors::EventProcessorFailure,
          "EventProcessor: an exception occurred "
          "during current event processing",
          e);
        // And then end this task, terminating event processing.
        TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
        return;
      }
      mf::LogWarning(e.category())
        << "exception being ignored for current event:\n"
        << cet::trim_right_copy(e.what(), " \n");
      // WARNING: We continue processing after the catch blocks!!!
    }
    catch (...) {
      mf::LogError("PassingThrough")
        << "an exception occurred during current event processing\n";
      sharedException_.store_current();
      // And then end this task, terminating event processing.
      TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
      return;
    }

    // The next event processing task is a continuation of this task.
    processAllEventsAsync(sid);
    TDEBUG_END_FUNC_SI(4, sid);
  }

  template <Level L>
  void
  EventProcessor::process()
  {
    if ((shutdown_flag > 0) || !ec_->empty()) {
      return;
    }
    ec_->call([this] { begin<L>(); });
    while ((shutdown_flag == 0) && ec_->empty() &&
           levelsToProcess<level_down(L)>()) {
      ec_->call([this] { process<level_down(L)>(); });
    }
    ec_->call([this] {
      finalize<L>();
      recordOutputModuleClosureRequests<L>();
    });
  }
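
  // How the recursion above unrolls (a sketch, assuming the level
  // ordering Job > InputFile > Run > SubRun > Event with level_down()
  // stepping one level deeper, and ignoring the ec_->call wrappers):
  // runToCompletion() calls process<highest_level()>(), which expands
  // roughly to
#if 0
  begin<Level::Job>();                  // timer_->start(); beginJob();
  while (levelsToProcess<Level::InputFile>()) {
    begin<Level::InputFile>();          // openInputFile();
    while (levelsToProcess<Level::Run>()) {
      process<Level::Run>();            // loops over subruns, then events
    }
    finalize<Level::InputFile>();
    recordOutputModuleClosureRequests<Level::InputFile>();
  }
  finalize<Level::Job>();               // endJob(); timer_->stop();
  recordOutputModuleClosureRequests<Level::Job>();
#endif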

  EventProcessor::StatusCode
  EventProcessor::runToCompletion()
  {
    StatusCode returnCode{epSuccess};
    ec_->call([this, &returnCode] {
      process<highest_level()>();
      if (shutdown_flag > 0) {
        returnCode = epSignal;
      }
    });
    if (!ec_->empty()) {
      terminateAbnormally_();
      ec_->rethrow();
    }
    return returnCode;
  }

  Level
  EventProcessor::advanceItemType()
  {
    auto const itemType = input_->nextItemType();
    FDEBUG(1) << string(4, ' ') << "*** nextItemType: " << itemType << " ***\n";
    switch (itemType) {
    case input::IsStop:
      return highest_level();
    case input::IsFile:
      return Level::InputFile;
    case input::IsRun:
      return Level::Run;
    case input::IsSubRun:
      return Level::SubRun;
    case input::IsEvent:
      return Level::Event;
    case input::IsInvalid:
      throw Exception{errors::LogicError}
        << "Invalid next item type presented to the event processor.\n"
        << "Please contact artists@fnal.gov.";
    }
    throw Exception{errors::LogicError}
      << "Unrecognized next item type presented to the event processor.\n"
      << "Please contact artists@fnal.gov.";
  }

  // ===========================================================================

  void
  EventProcessor::terminateAbnormally_() try {
    if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
      ServiceHandle<RandomNumberGenerator>()->saveToFile_();
    }
  }
  catch (...) {
  }

} // namespace art