Classes | Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
art::EventProcessor Class Reference

#include <EventProcessor.h>

Classes

class  EndPathRunnerTask
 
class  EndPathTask
 

Public Types

enum  StatusCode { epSuccess = 0, epSignal = 3 }
 

Public Member Functions

 EventProcessor (fhicl::ParameterSet const &pset, detail::EnabledModules const &enabled_modules)
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (EventProcessor &&)=delete
 
EventProcessoroperator= (EventProcessor const &)=delete
 
EventProcessoroperator= (EventProcessor &&)=delete
 
StatusCode runToCompletion ()
 

Private Types

template<typename T >
using tsan = hep::concurrency::thread_sanitize< T >
 
template<typename T >
using tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr< T >
 

Private Member Functions

void processAllEventsAsync (ScheduleID sid)
 
void readAndProcessAsync (ScheduleID sid)
 
void processEventAsync (ScheduleID sid)
 
void finishEventAsync (ScheduleID sid)
 
template<Level L>
bool levelsToProcess ()
 
template<Level L>
std::enable_if_t< is_above_most_deeply_nested_level(L)> begin ()
 
template<Level L>
void process ()
 
template<Level L>
void finalize ()
 
template<Level L>
void finalizeContainingLevels ()
 
template<Level L>
void recordOutputModuleClosureRequests ()
 
Level advanceItemType ()
 
void beginJob ()
 
void endJob ()
 
void endJobAllSchedules ()
 
void openInputFile ()
 
bool outputsToOpen ()
 
void openSomeOutputFiles ()
 
void closeInputFile ()
 
void closeSomeOutputFiles ()
 
void closeAllOutputFiles ()
 
void closeAllFiles ()
 
void respondToOpenInputFile ()
 
void respondToCloseInputFile ()
 
void respondToOpenOutputFiles ()
 
void respondToCloseOutputFiles ()
 
void readRun ()
 
void beginRun ()
 
void beginRunIfNotDoneAlready ()
 
void setRunAuxiliaryRangeSetID ()
 
void endRun ()
 
void writeRun ()
 
void readSubRun ()
 
void beginSubRun ()
 
void beginSubRunIfNotDoneAlready ()
 
void setSubRunAuxiliaryRangeSetID ()
 
void endSubRun ()
 
void writeSubRun ()
 
void readEvent ()
 
void processEvent ()
 
void writeEvent ()
 
void setOutputFileStatus (OutputFileStatus)
 
void invokePostBeginJobWorkers_ ()
 
void terminateAbnormally_ ()
 
Scheduleschedule (ScheduleID const id)
 
Schedulemain_schedule ()
 
actions::ActionCodes error_action (cet::exception &e) const
 
template<>
void process ()
 

Private Attributes

std::atomic< LevelnextLevel_ {Level::ReadyToAdvance}
 
tsan< detail::ExceptionCollectorec_ {}
 
tsan< cet::cpu_timertimer_ {}
 
std::atomic< boolbeginRunCalled_ {false}
 
std::atomic< boolbeginSubRunCalled_ {false}
 
std::atomic< boolfinalizeRunEnabled_ {false}
 
std::atomic< boolfinalizeSubRunEnabled_ {false}
 
ActivityRegistry actReg_ {}
 
tsan< MFStatusUpdatermfStatusUpdater_ {actReg_}
 
tsan< UpdateOutputCallbacksoutputCallbacks_ {}
 
tsan< ProductDescriptionsproducedProductDescriptions_ {}
 
tsan< ProductTablesproducedProductLookupTables_ {ProductTables::invalid()}
 
tsan< ProducingServiceSignalspsSignals_ {}
 
tsan< Schedulerscheduler_
 
std::unique_ptr< GlobalTaskGrouptaskGroup_ {nullptr}
 
detail::SharedResources sharedResources_ {}
 
ScheduleIteration scheduleIteration_
 
tsan_unique_ptr< ServicesManagerservicesManager_
 
tsan< PathManagerpathManager_
 
tsan_unique_ptr< InputSourceinput_ {nullptr}
 
tsan< std::map< ScheduleID, Schedule > > schedules_ {}
 
tsan_unique_ptr< FileBlockfb_ {nullptr}
 
tsan_unique_ptr< RunPrincipalrunPrincipal_ {nullptr}
 
tsan_unique_ptr< SubRunPrincipalsubRunPrincipal_ {nullptr}
 
PerScheduleContainer< std::unique_ptr< EventPrincipal > > eventPrincipals_ {}
 
bool const handleEmptyRuns_
 
bool const handleEmptySubRuns_
 
SharedException sharedException_
 
std::atomic< boolfirstEvent_ {true}
 
std::atomic< boolfileSwitchInProgress_ {false}
 

Detailed Description

Definition at line 43 of file EventProcessor.h.

Member Typedef Documentation

template<typename T >
using art::EventProcessor::tsan = hep::concurrency::thread_sanitize<T>
private

Definition at line 139 of file EventProcessor.h.

template<typename T >
using art::EventProcessor::tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr<T>
private

Definition at line 142 of file EventProcessor.h.

Member Enumeration Documentation

Enumerator
epSuccess 
epSignal 

Definition at line 49 of file EventProcessor.h.

Constructor & Destructor Documentation

art::EventProcessor::EventProcessor ( fhicl::ParameterSet const &  pset,
detail::EnabledModules const &  enabled_modules 
)
explicit

Definition at line 124 of file EventProcessor.cc.

126  : scheduler_{pset.get<ParameterSet>("services.scheduler")}
127  , scheduleIteration_{scheduler_->num_schedules()}
128  , servicesManager_{create_services_manager(
129  pset.get<ParameterSet>("services"),
130  actReg_,
132  , pathManager_{pset,
135  scheduler_->actionTable(),
136  actReg_,
137  enabled_modules}
138  , handleEmptyRuns_{scheduler_->handleEmptyRuns()}
139  , handleEmptySubRuns_{scheduler_->handleEmptySubRuns()}
140  {
141  auto services_pset = pset.get<ParameterSet>("services");
142  auto const scheduler_pset = services_pset.get<ParameterSet>("scheduler");
143  {
144  // FIXME: Signals and threads require more effort than this! A
145  // signal is delivered to only one thread, and which
146  // thread is left up to the implementation to decide. To
147  // get control we must block all signals in the main
148  // thread, create a new thread which will handle the
149  // signals we want to handle, unblock the signals in that
150  // thread only, and have it use sigwaitinfo() to suspend
151  // itselt and wait for those signals.
152  setupSignals(scheduler_pset.get<bool>("enableSigInt", true));
153  }
157  // We do this late because the floating point control word, signal
158  // masks, etc., are per-thread and inherited from the master
159  // thread, so we want to allow system services, user services, and
160  // modules to configure these things in their constructors before
161  // we let tbb create any threads. This means they cannot use tbb
162  // in their constructors, instead they must use the beginJob
163  // callout.
164  taskGroup_ = scheduler_->global_task_group();
165  // Whenever we are ready to enable ROOT's implicit MT, which is
166  // equivalent to its use of TBB, the call should be made after our
167  // own TBB task manager has been initialized.
168  // ROOT::EnableImplicitMT();
169  TDEBUG_FUNC(5) << "nschedules: " << scheduler_->num_schedules()
170  << " nthreads: " << scheduler_->num_threads();
171 
172  auto const errorOnMissingConsumes = scheduler_->errorOnMissingConsumes();
173  ConsumesInfo::instance()->setRequireConsumes(errorOnMissingConsumes);
174 
175  auto const& processName = Globals::instance()->processName();
176 
177  // Trigger-names
178  servicesManager_->addSystemService<TriggerNamesService>(
180  pset.get<ParameterSet>("physics", {}));
181 
182  // We have delayed creating the service instances, now actually
183  // create them.
184  servicesManager_->forceCreation();
185  ServiceHandle<FileCatalogMetadata>()->addMetadataString("art.process_name",
186  processName);
187 
188  // Now that the service module instances have been created we can
189  // set the callbacks, set the module description, and register the
190  // products for each service module instance.
191  ProcessConfiguration const pc{processName, pset.id(), getReleaseVersion()};
192  auto const producing_services = servicesManager_->registerProducts(
193  producedProductDescriptions_, psSignals_, pc);
194  pathManager_->createModulesAndWorkers(
195  *taskGroup_, sharedResources_, producing_services);
196 
197  ServiceHandle<TriggerNamesService> trigger_names [[maybe_unused]];
198  auto const end = Globals::instance()->nschedules();
199  for (ScheduleID::size_type i = 0; i != end; ++i) {
200  ScheduleID const sid{i};
201 
202  // The ordering of the path results in the TriggerPathsInfo (which is used
203  // for the TriggerResults object), must be the same as that provided by
204  // the TriggerNamesService.
205  auto& trigger_paths_info = pathManager_->triggerPathsInfo(sid);
206  assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());
207 
208  auto results_inserter =
209  maybe_trigger_results_inserter(sid, //
210  processName,
211  pset,
212  Globals::instance()->triggerPSet(),
213  outputCallbacks_, //
214  producedProductDescriptions_,
215  scheduler_->actionTable(), //
216  actReg_, //
217  trigger_paths_info, //
218  *taskGroup_, //
220  schedules_->emplace(std::piecewise_construct,
221  std::forward_as_tuple(sid),
222  std::forward_as_tuple(sid,
223  pathManager_,
224  scheduler_->actionTable(),
225  actReg_,
227  move(results_inserter),
228  *taskGroup_));
229  }
230  sharedResources_.freeze(taskGroup_->native_group());
231 
232  FDEBUG(2) << pset.to_string() << endl;
233  // The input source must be created after the end path executor
234  // because the end path executor registers a callback that must
235  // be invoked after the first input file is opened.
236  {
238  main_input.put("module_type", "EmptyEvent");
239  main_input.put("module_label", "source");
240  main_input.put("maxEvents", -1);
241  if (!pset.get_if_present("source", main_input)) {
242  mf::LogInfo("EventProcessorSourceConfig")
243  << "Could not find a source configuration: using default.";
244  }
245  ModuleDescription const md{
246  main_input.id(),
247  main_input.get<string>("module_type"),
248  main_input.get<string>("module_label"),
250  ProcessConfiguration{processName, pset.id(), getReleaseVersion()}};
251  InputSourceDescription isd{md, outputCallbacks_, actReg_};
252  try {
253  input_.reset(InputSourceFactory::make(main_input, isd).release());
254  }
255  catch (fhicl::detail::validationException const& e) {
257  << "\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
258  << "\nmodule_type : " << cet::bold_fontify(md.moduleName()) << "\n\n"
259  << e.what();
260  }
261  catch (Exception const& x) {
262  if (x.categoryCode() == errors::Configuration) {
263  throw Exception(errors::Configuration, "FailedInputSource")
264  << "Configuration of main input source has failed\n"
265  << x;
266  }
267  throw;
268  }
269  catch (cet::exception const& x) {
270  throw Exception(errors::Configuration, "FailedInputSource")
271  << "Configuration of main input source has failed\n"
272  << x;
273  }
274  catch (...) {
275  throw;
276  }
277  }
278  actReg_.sPostSourceConstruction.invoke(input_->moduleDescription());
279  // Create product tables used for product retrieval within modules.
280  producedProductLookupTables_ = ProductTables{producedProductDescriptions_};
281  outputCallbacks_->invoke(producedProductLookupTables_);
282  }
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
tsan_unique_ptr< InputSource > input_
main_input
CNN definition ############################.
Definition: train_cnn.py:84
tsan< ProductDescriptions > producedProductDescriptions_
static ConsumesInfo * instance()
Definition: ConsumesInfo.cc:24
tsan< UpdateOutputCallbacks > outputCallbacks_
tsan< Scheduler > scheduler_
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
bool const handleEmptyRuns_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
fhicl::ParameterSet const & triggerPSet() const
Definition: Globals.cc:60
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< ProducingServiceSignals > psSignals_
void freeze(tbb::task_group &group)
const double e
static auto instance(bool cleanup=false)
tsan< std::map< ScheduleID, Schedule > > schedules_
ScheduleID::size_type nschedules() const
Definition: Globals.cc:24
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
#define TDEBUG_FUNC(LEVEL)
def move(depos, offset)
Definition: depos.py:107
T get(std::string const &key) const
Definition: ParameterSet.h:271
std::string const & getReleaseVersion()
std::string bold_fontify(std::string const &s)
Definition: bold_fontify.h:8
tsan_unique_ptr< ServicesManager > servicesManager_
void setupSignals(bool want_sigint_enabled)
ParameterSetID id() const
unique_ptr< InputSource > make(ParameterSet const &conf, InputSourceDescription &desc)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
string release
Definition: conf.py:24
char const * what() const noexcept override
std::string const & processName() const
Definition: Globals.cc:48
id_type size_type
Definition: ScheduleID.h:25
void setRequireConsumes(bool const)
Definition: ConsumesInfo.cc:93
list x
Definition: train.py:276
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
static Globals * instance()
Definition: Globals.cc:17
void put(std::string const &key)
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
QTextStream & endl(QTextStream &s)
art::EventProcessor::EventProcessor ( EventProcessor const &  )
delete
art::EventProcessor::EventProcessor ( EventProcessor &&  )
delete

Member Function Documentation

Level art::EventProcessor::advanceItemType ( )
private

Definition at line 1544 of file EventProcessor.cc.

1545  {
1546  auto const itemType = input_->nextItemType();
1547  FDEBUG(1) << string(4, ' ') << "*** nextItemType: " << itemType << " ***\n";
1548  switch (itemType) {
1549  case input::IsStop:
1550  return highest_level();
1551  case input::IsFile:
1552  return Level::InputFile;
1553  case input::IsRun:
1554  return Level::Run;
1555  case input::IsSubRun:
1556  return Level::SubRun;
1557  case input::IsEvent:
1558  return Level::Event;
1559  case input::IsInvalid:
1561  << "Invalid next item type presented to the event processor.\n"
1562  << "Please contact artists@fnal.gov.";
1563  }
1565  << "Unrecognized next item type presented to the event processor.\n"
1566  << "Please contact artists@fnal.gov.";
1567  }
tsan_unique_ptr< InputSource > input_
std::string string
Definition: nybbler.cc:12
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
constexpr auto highest_level() noexcept
Definition: Level.h:32
template<Level L>
std::enable_if_t<is_above_most_deeply_nested_level(L)> art::EventProcessor::begin ( )
private
void art::EventProcessor::beginJob ( )
private

Definition at line 480 of file EventProcessor.cc.

481  {
482  FDEBUG(1) << string(8, ' ') << "beginJob\n";
484  // NOTE: This implementation assumes 'Job' means one call the
485  // EventProcessor::run. If it really means once per 'application'
486  // then this code will have to be changed. Also have to deal with
487  // case where have 'run' then new Module added and do 'run' again.
488  // In that case the newly added Module needs its 'beginJob' to be
489  // called.
490  try {
491  input_->doBeginJob();
492  }
493  catch (cet::exception& e) {
494  mf::LogError("BeginJob") << "A cet::exception happened while processing"
495  " the beginJob of the 'source'\n";
496  e << "A cet::exception happened while processing"
497  " the beginJob of the 'source'\n";
498  throw;
499  }
500  catch (exception const&) {
501  mf::LogError("BeginJob") << "A exception happened while processing"
502  " the beginJob of the 'source'\n";
503  throw;
504  }
505  catch (...) {
506  mf::LogError("BeginJob") << "An unknown exception happened while"
507  " processing the beginJob of the 'source'\n";
508  throw;
509  }
510  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
512  });
513  actReg_.sPostBeginJob.invoke();
515  }
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPostBeginJob
std::string string
Definition: nybbler.cc:12
void beginJob(detail::SharedResources const &resources)
Definition: Schedule.cc:42
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
const double e
void beginJob()
Definition: Breakpoints.cc:14
detail::SharedResources sharedResources_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::beginRun ( )
private

Definition at line 713 of file EventProcessor.cc.

714  {
715  assert(runPrincipal_);
716  RunID const r{runPrincipal_->runID()};
717  if (r.isFlush()) {
718  return;
719  }
720  finalizeRunEnabled_ = true;
721  try {
722  {
724  actReg_.sPreBeginRun.invoke(run);
725  }
726  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
728  });
729  {
730  Run const run{*runPrincipal_, invalid_module_context};
731  actReg_.sPostBeginRun.invoke(run);
732  }
733  }
734  catch (cet::exception& ex) {
735  throw Exception{
737  "EventProcessor: an exception occurred during current event processing",
738  ex};
739  }
740  catch (...) {
741  mf::LogError("PassingThrough")
742  << "an exception occurred during current event processing\n";
743  throw;
744  }
745  FDEBUG(1) << string(8, ' ') << "beginRun....................(" << r
746  << ")\n";
747  beginRunCalled_ = true;
748  }
std::string string
Definition: nybbler.cc:12
void process(Transition, Principal &)
Definition: Schedule.cc:84
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostBeginRun
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< RunPrincipal > runPrincipal_
std::atomic< bool > beginRunCalled_
std::atomic< bool > finalizeRunEnabled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(Run const &)> sPreBeginRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
auto const invalid_module_context
void art::EventProcessor::beginRunIfNotDoneAlready ( )
private

Definition at line 751 of file EventProcessor.cc.

752  {
753  if (!beginRunCalled_) {
754  beginRun();
755  }
756  }
std::atomic< bool > beginRunCalled_
void art::EventProcessor::beginSubRun ( )
private

Definition at line 882 of file EventProcessor.cc.

883  {
884  assert(subRunPrincipal_);
885  SubRunID const sr{subRunPrincipal_->subRunID()};
886  if (sr.isFlush()) {
887  return;
888  }
889  finalizeSubRunEnabled_ = true;
890  try {
891  {
893  actReg_.sPreBeginSubRun.invoke(srun);
894  }
895  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
897  });
898  {
899  SubRun const srun{*subRunPrincipal_, invalid_module_context};
900  actReg_.sPostBeginSubRun.invoke(srun);
901  }
902  }
903  catch (cet::exception& ex) {
904  throw Exception{
906  "EventProcessor: an exception occurred during current event processing",
907  ex};
908  }
909  catch (...) {
910  mf::LogError("PassingThrough")
911  << "an exception occurred during current event processing\n";
912  throw;
913  }
914  FDEBUG(1) << string(8, ' ') << "beginSubRun.................(" << sr
915  << ")\n";
916  beginSubRunCalled_ = true;
917  }
std::string string
Definition: nybbler.cc:12
void process(Transition, Principal &)
Definition: Schedule.cc:84
ScheduleIteration scheduleIteration_
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostBeginSubRun
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
std::atomic< bool > finalizeSubRunEnabled_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRun const &)> sPreBeginSubRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
static constexpr double sr
Definition: Units.h:166
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
auto const invalid_module_context
void art::EventProcessor::beginSubRunIfNotDoneAlready ( )
private

Definition at line 920 of file EventProcessor.cc.

921  {
922  if (!beginSubRunCalled_) {
923  beginSubRun();
924  }
925  }
std::atomic< bool > beginSubRunCalled_
void art::EventProcessor::closeAllFiles ( )
private

Definition at line 557 of file EventProcessor.cc.

558  {
560  closeInputFile();
561  }
void art::EventProcessor::closeAllOutputFiles ( )
private

Definition at line 584 of file EventProcessor.cc.

585  {
586  if (!main_schedule().someOutputsOpen()) {
587  return;
588  }
591  FDEBUG(1) << string(8, ' ') << "closeAllOutputFiles\n";
592  }
void closeAllOutputFiles()
Definition: Schedule.h:86
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void art::EventProcessor::closeInputFile ( )
private

Definition at line 564 of file EventProcessor.cc.

565  {
567  // Output-file closing on input-file boundaries are tricky since
568  // input files must outlive the output files, which often have
569  // data copied forward from the input files. That's why the
570  // recordOutputClosureRequests call is made here instead of in a
571  // specialization of recordOutputModuleClosureRequests<>.
573  if (main_schedule().outputsToClose()) {
575  }
577  actReg_.sPreCloseFile.invoke();
578  input_->closeFile();
579  actReg_.sPostCloseFile.invoke();
580  FDEBUG(1) << string(8, ' ') << "closeInputFile\n";
581  }
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreCloseFile
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
void recordOutputClosureRequests(Granularity const granularity)
Definition: Schedule.h:74
void incrementInputFileNumber()
Definition: Schedule.h:113
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostCloseFile
void art::EventProcessor::closeSomeOutputFiles ( )
private

Definition at line 632 of file EventProcessor.cc.

633  {
634  // Precondition: there are SOME output files that have been
635  // flagged as needing to close. Otherwise,
636  // 'respondtoCloseOutputFiles' will be needlessly
637  // called.
638  assert(main_schedule().outputsToClose());
641  FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
642  }
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
void closeSomeOutputFiles()
Definition: Schedule.h:98
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void art::EventProcessor::endJob ( )
private

Definition at line 518 of file EventProcessor.cc.

519  {
520  FDEBUG(1) << string(8, ' ') << "endJob\n";
521  ec_->call([this] { endJobAllSchedules(); });
522  ec_->call([] { ConsumesInfo::instance()->showMissingConsumes(); });
523  ec_->call([this] { input_->doEndJob(); });
524  ec_->call([this] { actReg_.sPostEndJob.invoke(); });
525  ec_->call([] { mf::LogStatistics(); });
526  ec_->call([this] {
528  });
529  }
tsan_unique_ptr< InputSource > input_
static ConsumesInfo * instance()
Definition: ConsumesInfo.cc:24
tsan< Scheduler > scheduler_
std::string string
Definition: nybbler.cc:12
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostEndJob
tsan< PathManager > pathManager_
tsan< detail::ExceptionCollector > ec_
tsan< cet::cpu_timer > timer_
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
Definition: writeSummary.cc:89
void LogStatistics()
void showMissingConsumes() const
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void art::EventProcessor::endJobAllSchedules ( )
private

Definition at line 532 of file EventProcessor.cc.

533  {
535  [this](ScheduleID const sid) { schedule(sid).endJob(); });
536  }
void endJob()
Definition: Schedule.cc:49
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
void for_each_schedule(F f) const
void art::EventProcessor::endRun ( )
private

Definition at line 802 of file EventProcessor.cc.

803  {
804  assert(runPrincipal_);
805  // Precondition: The RunID does not correspond to a flush ID. --
806  // N.B. The flush flag is not explicitly checked here since endRun
807  // is only called from finalizeRun, which is where the check
808  // happens.
809  RunID const run{runPrincipal_->runID()};
810  assert(!run.isFlush());
811  try {
812  actReg_.sPreEndRun.invoke(runPrincipal_->runID(),
813  runPrincipal_->endTime());
814  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
816  });
818  actReg_.sPostEndRun.invoke(r);
819  }
820  catch (cet::exception& ex) {
821  throw Exception{
823  "EventProcessor: an exception occurred during current event processing",
824  ex};
825  }
826  catch (...) {
827  mf::LogError("PassingThrough")
828  << "an exception occurred during current event processing\n";
829  throw;
830  }
831  FDEBUG(1) << string(8, ' ') << "endRun......................(" << run
832  << ")\n";
833  beginRunCalled_ = false;
834  }
std::string string
Definition: nybbler.cc:12
void process(Transition, Principal &)
Definition: Schedule.cc:84
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< RunPrincipal > runPrincipal_
std::atomic< bool > beginRunCalled_
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostEndRun
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(RunID const &, Timestamp const &)> sPreEndRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
auto const invalid_module_context
void art::EventProcessor::endSubRun ( )
private

Definition at line 1028 of file EventProcessor.cc.

1029  {
1030  assert(subRunPrincipal_);
1031  // Precondition: The SubRunID does not correspond to a flush ID.
1032  // Note: the flush flag is not explicitly checked here since
1033  // endSubRun is only called from finalizeSubRun, which is where the
1034  // check happens.
1035  SubRunID const sr{subRunPrincipal_->subRunID()};
1036  assert(!sr.isFlush());
1037  try {
1038  actReg_.sPreEndSubRun.invoke(subRunPrincipal_->subRunID(),
1039  subRunPrincipal_->endTime());
1040  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
1042  });
1044  actReg_.sPostEndSubRun.invoke(srun);
1045  }
1046  catch (cet::exception& ex) {
1047  throw Exception{
1049  "EventProcessor: an exception occurred during current event processing",
1050  ex};
1051  }
1052  catch (...) {
1053  mf::LogError("PassingThrough")
1054  << "an exception occurred during current event processing\n";
1055  throw;
1056  }
1057  FDEBUG(1) << string(8, ' ') << "endSubRun...................(" << sr
1058  << ")\n";
1059  beginSubRunCalled_ = false;
1060  }
std::string string
Definition: nybbler.cc:12
void process(Transition, Principal &)
Definition: Schedule.cc:84
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRunID const &, Timestamp const &)> sPreEndSubRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
static constexpr double sr
Definition: Units.h:166
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostEndSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
auto const invalid_module_context
actions::ActionCodes art::EventProcessor::error_action ( cet::exception &  e) const
inlineprivate

Definition at line 157 of file EventProcessor.h.

158  {
159  return scheduler_->actionTable().find(e.root_cause());
160  }
tsan< Scheduler > scheduler_
const double e
template<Level L>
void art::EventProcessor::finalize ( )
private
template<Level L>
void art::EventProcessor::finalizeContainingLevels ( )
inlineprivate

Definition at line 95 of file EventProcessor.h.

96  {}
void art::EventProcessor::finishEventAsync ( ScheduleID  sid)
private

Definition at line 1436 of file EventProcessor.cc.

1437  {
1438  auto& ep = schedule(sid).event_principal();
1440  ScheduleContext{sid});
1441 
1442  // Note: We are part of the endPathTask.
1443  TDEBUG_BEGIN_FUNC_SI(4, sid);
1444  FDEBUG(1) << string(8, ' ') << "processEvent................("
1445  << ep.eventID() << ")\n";
1446  try {
1447  // Ask the output workers if they have reached their limits, and
1448  // if so setup to end the job the next time around the event
1449  // loop.
1450  FDEBUG(1) << string(8, ' ') << "shouldWeStop\n";
1451  TDEBUG_FUNC_SI(5, sid) << "Calling schedules_->allAtLimit()";
1452  static std::mutex m;
1453  std::lock_guard sentry{m};
1454  if (schedule(sid).allAtLimit()) {
1455  // Set to return to the File level.
1457  }
1458  // Now we can write the results of processing to the outputs,
1459  // and delete the event principal.
1460  if (!ep.eventID().isFlush()) {
1461  // Possibly open new output files. This is safe to do because
1462  // EndPathExecutor functions are called in a serialized
1463  // context.
1464  TDEBUG_FUNC_SI(5, sid) << "Calling openSomeOutputFiles()";
1466  TDEBUG_FUNC_SI(5, sid) << "Calling schedule(sid).writeEvent()";
1467 
1468  auto const id = ep.eventID();
1469  schedule(sid).writeEvent();
1470  FDEBUG(1) << string(8, ' ') << "writeEvent..................(" << id
1471  << ")\n";
1472  }
1473  TDEBUG_FUNC_SI(5, sid)
1474  << "Calling schedules_->"
1475  "recordOutputClosureRequests(Granularity::Event)";
1477  }
1478  catch (cet::exception& e) {
1482  "EventProcessor: an exception occurred "
1483  "during current event processing",
1484  e);
1485  // And then end this task, terminating event processing.
1486  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1487  return;
1488  }
1489  mf::LogWarning(e.category())
1490  << "exception being ignored for current event:\n"
1491  << cet::trim_right_copy(e.what(), " \n");
1492  // WARNING: We continue processing after the catch blocks!!!
1493  }
1494  catch (...) {
1495  mf::LogError("PassingThrough")
1496  << "an exception occurred during current event processing\n";
1498  // And then end this task, terminating event processing.
1499  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1500  return;
1501  }
1502 
1503  // The next event processing task is a continuation of this task.
1504  processAllEventsAsync(sid);
1505  TDEBUG_END_FUNC_SI(4, sid);
1506  }
std::string string
Definition: nybbler.cc:12
std::atomic< Level > nextLevel_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void processAllEventsAsync(ScheduleID sid)
void store(std::exception_ptr ex_ptr)
std::string trim_right_copy(std::string source, std::string const &t=" ")
Definition: trim.h:54
const double e
EventPrincipal & event_principal()
Definition: Schedule.h:192
actions::ActionCodes error_action(cet::exception &e) const
void writeEvent()
Definition: Schedule.h:104
#define TDEBUG_FUNC_SI(LEVEL, SI)
void recordOutputClosureRequests(Granularity const granularity)
Definition: Schedule.h:74
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostProcessEvent
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
constexpr auto highest_level() noexcept
Definition: Level.h:32
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
auto const invalid_module_context
void art::EventProcessor::invokePostBeginJobWorkers_ ( )
private

Definition at line 285 of file EventProcessor.cc.

286  {
287  using cet::transform_all;
288  // Need to convert multiple lists of workers into a long list that
289  // the postBeginJobWorkers callbacks can understand.
290  vector<Worker*> allWorkers;
291  transform_all(pathManager_->triggerPathsInfo(ScheduleID::first()).workers(),
292  back_inserter(allWorkers),
293  [](auto const& pr) { return pr.second.get(); });
294  transform_all(pathManager_->endPathInfo(ScheduleID::first()).workers(),
295  back_inserter(allWorkers),
296  [](auto const& pr) { return pr.second.get(); });
297  actReg_.sPostBeginJobWorkers.invoke(input_, allWorkers);
298  }
tsan_unique_ptr< InputSource > input_
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
tsan< PathManager > pathManager_
GlobalSignal< detail::SignalResponseType::LIFO, void(InputSource *, std::vector< Worker * > const &)> sPostBeginJobWorkers
auto transform_all(Container &, OutputIt, UnaryOp)
ActivityRegistry actReg_
template<Level L>
bool art::EventProcessor::levelsToProcess ( )
private

Definition at line 305 of file EventProcessor.cc.

306  {
307  if (nextLevel_.load() == Level::ReadyToAdvance) {
309  // Consider reading right here?
310  }
311  if (nextLevel_.load() == L) {
313  if (main_schedule().outputsToClose()) {
315  finalizeContainingLevels<L>();
317  }
318  return true;
319  } else if (nextLevel_.load() < L) {
320  return false;
321  } else if (nextLevel_.load() == highest_level()) {
322  return false;
323  }
324  throw Exception{errors::LogicError} << "Incorrect level hierarchy.\n"
325  << " Current level: " << L
326  << " Next level: " << nextLevel_;
327  }
std::atomic< Level > nextLevel_
bool outputsToClose() const
Definition: Schedule.h:68
Schedule & main_schedule()
void setOutputFileStatus(OutputFileStatus)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
constexpr auto highest_level() noexcept
Definition: Level.h:32
Schedule& art::EventProcessor::main_schedule ( )
inlineprivate

Definition at line 151 of file EventProcessor.h.

152  {
153  return schedule(ScheduleID::first());
154  }
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
Schedule & schedule(ScheduleID const id)
void art::EventProcessor::openInputFile ( )
private

Definition at line 542 of file EventProcessor.cc.

543  {
544  actReg_.sPreOpenFile.invoke();
545  FDEBUG(1) << string(8, ' ') << "openInputFile\n";
546  fb_.reset(input_->readFile().release());
547  if (fb_ == nullptr) {
549  << "Source readFile() did not return a valid FileBlock: FileBlock "
550  << "should be valid or readFile() should throw.\n";
551  }
552  actReg_.sPostOpenFile.invoke(fb_->fileName());
554  }
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreOpenFile
tsan_unique_ptr< InputSource > input_
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenFile
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void art::EventProcessor::openSomeOutputFiles ( )
private

Definition at line 609 of file EventProcessor.cc.

610  {
611  if (!outputsToOpen()) {
612  return;
613  }
614 
615  auto open_some_outputs = [this](ScheduleID const sid) {
617  };
618  scheduleIteration_.for_each_schedule(open_some_outputs);
619 
620  FDEBUG(1) << string(8, ' ') << "openSomeOutputFiles\n";
622  }
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void openSomeOutputFiles(FileBlock const &fb)
Definition: Schedule.h:92
EventProcessor& art::EventProcessor::operator= ( EventProcessor const &  )
delete
EventProcessor& art::EventProcessor::operator= ( EventProcessor &&  )
delete
bool art::EventProcessor::outputsToOpen ( )
private

Definition at line 595 of file EventProcessor.cc.

596  {
597  bool outputs_to_open{false};
598  auto check_outputs_to_open = [this,
599  &outputs_to_open](ScheduleID const sid) {
600  if (schedule(sid).outputsToOpen()) {
601  outputs_to_open = true;
602  }
603  };
604  scheduleIteration_.for_each_schedule(check_outputs_to_open);
605  return outputs_to_open;
606  }
ScheduleIteration scheduleIteration_
bool outputsToOpen() const
Definition: Schedule.h:62
Schedule & schedule(ScheduleID const id)
void for_each_schedule(F f) const
template<Level L>
void art::EventProcessor::process ( )
private

Definition at line 1510 of file EventProcessor.cc.

1511  {
1512  if ((shutdown_flag > 0) || !ec_->empty()) {
1513  return;
1514  }
1515  ec_->call([this] { begin<L>(); });
1516  while ((shutdown_flag == 0) && ec_->empty() &&
1517  levelsToProcess<level_down(L)>()) {
1518  ec_->call([this] { process<level_down(L)>(); });
1519  }
1520  ec_->call([this] {
1521  finalize<L>();
1522  recordOutputModuleClosureRequests<L>();
1523  });
1524  }
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
template<>
void art::EventProcessor::process ( )
private

Definition at line 1079 of file EventProcessor.cc.

1080  {
1081  if ((shutdown_flag > 0) || !ec_->empty()) {
1082  return;
1083  }
1084  // Note: This loop is to allow output file switching to happen in
1085  // the main thread.
1086  firstEvent_ = true;
1087  bool done = false;
1088  while (!done) {
1091 
1092  auto const last_schedule_index = scheduler_->num_schedules() - 1;
1093  for (ScheduleID::size_type i = 0; i != last_schedule_index; ++i) {
1094  taskGroup_->run([this, i] { processAllEventsAsync(ScheduleID(i)); });
1095  }
1096  taskGroup_->native_group().run_and_wait([this, last_schedule_index] {
1097  processAllEventsAsync(ScheduleID(last_schedule_index));
1098  });
1099 
1100  // If anything bad happened during event processing, let the
1101  // user know.
1103  if (!fileSwitchInProgress_.load()) {
1104  done = true;
1105  continue;
1106  }
1108  finalizeContainingLevels<most_deeply_nested_level()>();
1111  FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
1112  // We started the switch after advancing to the next item type;
1113  // we must make sure that we read that event before advancing
1114  // the item type again.
1115  firstEvent_ = true;
1116  fileSwitchInProgress_ = false;
1117  }
1118  }
tsan< Scheduler > scheduler_
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
std::atomic< bool > firstEvent_
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
std::atomic< bool > fileSwitchInProgress_
SharedException sharedException_
id_type size_type
Definition: ScheduleID.h:25
void closeSomeOutputFiles()
Definition: Schedule.h:98
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void beginSubRunIfNotDoneAlready()
void art::EventProcessor::processAllEventsAsync ( ScheduleID  sid)
private

Definition at line 1125 of file EventProcessor.cc.

1126  {
1127  // Note: We are part of the processAllEventsTask (schedule head
1128  // task), and our parent is the eventLoopTask.
1129  TDEBUG_BEGIN_FUNC_SI(4, sid);
1130  try {
1131  readAndProcessAsync(sid);
1132  }
1133  catch (...) {
1135  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1136  return;
1137  }
1138  // If no exception, then end this task, which does not terminate
1139  // event processing because readAndProcessAsync creates a
1140  // continuation task.
1141  TDEBUG_END_FUNC_SI(4, sid);
1142  }
void readAndProcessAsync(ScheduleID sid)
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
void art::EventProcessor::processEvent ( )
private
void art::EventProcessor::processEventAsync ( ScheduleID  sid)
private

Definition at line 1399 of file EventProcessor.cc.

1399  {
1400  // Note: We are part of the readAndProcessEventTask (schedule head
1401  // task), and our parent task is the eventLoopTask.
1402  TDEBUG_BEGIN_FUNC_SI(4, sid);
1403  assert(!schedule(sid).event_principal().eventID().isFlush());
1404  // Continue processing via the creation of a continuation.
1405  auto endPathTask = make_waiting_task<EndPathTask>(this, sid);
1406  // Start the trigger paths running. When they finish they will
1407  // spawn the endPathTask which will run the end path, write the
1408  // event, and start the next event processing task.
1409  schedule(sid).process_event_modifiers(endPathTask);
1410  TDEBUG_END_FUNC_SI(4, sid);
1411  }
1412  catch (cet::exception& e) {
1413  // Upon exiting this scope, end this task, terminating event
1414  // processing.
1418  "EventProcessor: an exception occurred during current event processing",
1419  e);
1420  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1421  return;
1422  }
1423  mf::LogWarning(e.category())
1424  << "exception being ignored for current event:\n"
1425  << cet::trim_right_copy(e.what(), " \n");
1426  TDEBUG_END_FUNC_SI(4, sid) << "Ignoring exception.";
1427  }
1428  catch (...) {
1429  mf::LogError("PassingThrough")
1430  << "an exception occurred during current event processing\n";
1432  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1433  }
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void store(std::exception_ptr ex_ptr)
std::string trim_right_copy(std::string source, std::string const &t=" ")
Definition: trim.h:54
const double e
actions::ActionCodes error_action(cet::exception &e) const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
void process_event_modifiers(hep::concurrency::WaitingTaskPtr endPathTask)
Definition: Schedule.cc:91
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::readAndProcessAsync ( ScheduleID  sid)
private

Definition at line 1150 of file EventProcessor.cc.

1151  {
1152  // Note: We are part of the readAndProcessEventTask (schedule head
1153  // task), and our parent task is the eventLoopTask.
1154  TDEBUG_BEGIN_FUNC_SI(4, sid);
1155  // Note: shutdown_flag is a extern global atomic int in
1156  // art/art/Utilities/UnixSignalHandlers.cc
1157  if (shutdown_flag) {
1158  // User called for a clean shutdown using a signal or ctrl-c,
1159  // end event processing and this task.
1160  TDEBUG_END_FUNC_SI(4, sid) << "CLEAN SHUTDOWN";
1161  return;
1162  }
1163 
1164  // The item type advance and the event read must be done with the
1165  // input source lock held; however event-processing must not
1166  // serialized.
1167  {
1168  InputSourceMutexSentry lock_input;
1169  if (fileSwitchInProgress_.load()) {
1170  // We must avoid advancing the iterator after a schedule has
1171  // noticed it is time to switch files. After the switch, we
1172  // will need to set firstEvent_ true so that the first
1173  // schedule that resumes after the switch actually reads the
1174  // event that the first schedule which noticed we needed a
1175  // switch had advanced the iterator to.
1176 
1177  // Note: We still have the problem that because the schedules
1178  // do not read events at the same time the file switch point
1179  // can be up to nschedules-1 ahead of where it would have been
1180  // if there was only one schedule. If we are switching output
1181  // files every event in an attempt to create single event
1182  // files, this really does not work out too well.
1183  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH";
1184  return;
1185  }
1186  // Check the next item type and exit this task if it is not an
1187  // event, or if the user has asynchronously requested a
1188  // shutdown.
1189  auto expected = true;
1190  if (firstEvent_.compare_exchange_strong(expected, false)) {
1191  // Do not advance the item type on the first event.
1192  } else {
1193  // Do the advance item type.
1194  if (nextLevel_.load() == Level::ReadyToAdvance) {
1195  // See what the next item is.
1196  TDEBUG_FUNC_SI(5, sid) << "Calling advanceItemType()";
1198  }
1199  if ((nextLevel_.load() < most_deeply_nested_level()) ||
1200  (nextLevel_.load() == highest_level())) {
1201  // We are popping up, end event processing and this task.
1202  TDEBUG_END_FUNC_SI(4, sid) << "END OF SUBRUN";
1203  return;
1204  }
1205  if (nextLevel_.load() != most_deeply_nested_level()) {
1206  // Error: incorrect level hierarchy
1207  TDEBUG_END_FUNC_SI(4, sid) << "BAD HIERARCHY";
1208  throw Exception{errors::LogicError} << "Incorrect level hierarchy.";
1209  }
1211  // At this point we have determined that we are going to read
1212  // an event and we must do that before dropping the lock on
1213  // the input source which is what is protecting us against a
1214  // double-advance caused by a different schedule.
1215  if (schedule(sid).outputsToClose()) {
1216  fileSwitchInProgress_ = true;
1217  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH INITIATED";
1218  return;
1219  }
1220  }
1221 
1222  // Now we can read the event from the source.
1223  ScheduleContext const sc{sid};
1224  assert(subRunPrincipal_);
1225  assert(subRunPrincipal_->subRunID().isValid());
1226  actReg_.sPreSourceEvent.invoke(sc);
1227  TDEBUG_FUNC_SI(5, sid) << "Calling input_->readEvent(subRunPrincipal_)";
1228  auto ep = input_->readEvent(subRunPrincipal_.get());
1229  assert(ep);
1230  // The intended behavior here is that the producing services
1231  // which are called during the sPostReadEvent cannot see each
1232  // others put products. We enforce this by creating the groups
1233  // for the produced products, but do not allow the lookups to
1234  // find them until after the callbacks have run.
1235  ep->createGroupsForProducedProducts(producedProductLookupTables_);
1236  psSignals_->sPostReadEvent.invoke(*ep);
1237  ep->enableLookupOfProducedProducts(producedProductLookupTables_);
1239  FDEBUG(1) << string(8, ' ') << "readEvent...................("
1240  << ep->eventID() << ")\n";
1241  schedule(sid).accept_principal(move(ep));
1242  // Now we drop the input source lock by exiting the guarded
1243  // scope.
1244  }
1245  if (schedule(sid).event_principal().eventID().isFlush()) {
1246  // No processing to do, start next event handling task.
1247  processAllEventsAsync(sid);
1248  TDEBUG_END_FUNC_SI(4, sid) << "FLUSH EVENT";
1249  return;
1250  }
1251 
1252  // Now process the event.
1253  processEventAsync(sid);
1254  TDEBUG_END_FUNC_SI(4, sid);
1255  }
tsan_unique_ptr< InputSource > input_
const char expected[]
Definition: Exception_t.cc:22
std::string string
Definition: nybbler.cc:12
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostSourceEvent
constexpr auto most_deeply_nested_level() noexcept
Definition: Level.h:44
std::atomic< Level > nextLevel_
std::atomic< bool > firstEvent_
tsan< ProducingServiceSignals > psSignals_
std::atomic< int > shutdown_flag
void processAllEventsAsync(ScheduleID sid)
tsan< ProductTables > producedProductLookupTables_
def move(depos, offset)
Definition: depos.py:107
std::atomic< bool > fileSwitchInProgress_
#define TDEBUG_FUNC_SI(LEVEL, SI)
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void processEventAsync(ScheduleID sid)
void accept_principal(std::unique_ptr< EventPrincipal > principal)
Definition: Schedule.h:185
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
constexpr auto highest_level() noexcept
Definition: Level.h:32
GlobalSignal< detail::SignalResponseType::FIFO, void(ScheduleContext)> sPreSourceEvent
auto const invalid_module_context
void art::EventProcessor::readEvent ( )
private
void art::EventProcessor::readRun ( )
private

Definition at line 684 of file EventProcessor.cc.

685  {
686  actReg_.sPreSourceRun.invoke();
687  runPrincipal_.reset(input_->readRun().release());
688  assert(runPrincipal_);
689  auto rsh = input_->runRangeSetHandler();
690  assert(rsh);
691  auto seed_range_set = [this, &rsh](ScheduleID const sid) {
692  schedule(sid).seedRunRangeSet(*rsh);
693  };
694  scheduleIteration_.for_each_schedule(seed_range_set);
695  // The intended behavior here is that the producing services which
696  // are called during the sPostReadRun cannot see each others put
697  // products. We enforce this by creating the groups for the
698  // produced products, but do not allow the lookups to find them
699  // until after the callbacks have run.
700  runPrincipal_->createGroupsForProducedProducts(
702  psSignals_->sPostReadRun.invoke(*runPrincipal_);
703  runPrincipal_->enableLookupOfProducedProducts(producedProductLookupTables_);
704  {
706  actReg_.sPostSourceRun.invoke(r);
707  }
708  FDEBUG(1) << string(8, ' ') << "readRun.....................("
709  << runPrincipal_->runID() << ")\n";
710  }
tsan_unique_ptr< InputSource > input_
std::string string
Definition: nybbler.cc:12
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostSourceRun
ScheduleIteration scheduleIteration_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
tsan< ProductTables > producedProductLookupTables_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
void seedRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:138
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceRun
auto const invalid_module_context
void art::EventProcessor::readSubRun ( )
private

Definition at line 852 of file EventProcessor.cc.

853  {
854  actReg_.sPreSourceSubRun.invoke();
855  subRunPrincipal_.reset(input_->readSubRun(runPrincipal_.get()).release());
856  assert(subRunPrincipal_);
857  auto rsh = input_->subRunRangeSetHandler();
858  assert(rsh);
859  auto seed_range_set = [this, &rsh](ScheduleID const sid) {
860  schedule(sid).seedSubRunRangeSet(*rsh);
861  };
862  scheduleIteration_.for_each_schedule(seed_range_set);
863  // The intended behavior here is that the producing services which
864  // are called during the sPostReadSubRun cannot see each others
865  // put products. We enforce this by creating the groups for the
866  // produced products, but do not allow the lookups to find them
867  // until after the callbacks have run.
868  subRunPrincipal_->createGroupsForProducedProducts(
870  psSignals_->sPostReadSubRun.invoke(*subRunPrincipal_);
871  subRunPrincipal_->enableLookupOfProducedProducts(
873  {
875  actReg_.sPostSourceSubRun.invoke(sr);
876  }
877  FDEBUG(1) << string(8, ' ') << "readSubRun..................("
878  << subRunPrincipal_->subRunID() << ")\n";
879  }
tsan_unique_ptr< InputSource > input_
std::string string
Definition: nybbler.cc:12
void seedSubRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:163
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceSubRun
ScheduleIteration scheduleIteration_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
tsan< ProductTables > producedProductLookupTables_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
string release
Definition: conf.py:24
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
static constexpr double sr
Definition: Units.h:166
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostSourceSubRun
auto const invalid_module_context
template<Level L>
void art::EventProcessor::recordOutputModuleClosureRequests ( )
inlineprivate

Definition at line 99 of file EventProcessor.h.

100  {}
void art::EventProcessor::respondToCloseInputFile ( )
private

Definition at line 654 of file EventProcessor.cc.

655  {
656  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
658  });
659  FDEBUG(1) << string(8, ' ') << "respondToCloseInputFile\n";
660  }
void respondToCloseInputFile(FileBlock const &)
Definition: Schedule.cc:63
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::respondToCloseOutputFiles ( )
private

Definition at line 672 of file EventProcessor.cc.

673  {
674  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
676  });
677  FDEBUG(1) << string(8, ' ') << "respondToCloseOutputFiles\n";
678  }
void respondToCloseOutputFiles(FileBlock const &)
Definition: Schedule.cc:77
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::respondToOpenInputFile ( )
private

Definition at line 645 of file EventProcessor.cc.

646  {
647  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
649  });
650  FDEBUG(1) << string(8, ' ') << "respondToOpenInputFile\n";
651  }
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void respondToOpenInputFile(FileBlock const &)
Definition: Schedule.cc:56
void art::EventProcessor::respondToOpenOutputFiles ( )
private

Definition at line 663 of file EventProcessor.cc.

664  {
665  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
667  });
668  FDEBUG(1) << string(8, ' ') << "respondToOpenOutputFiles\n";
669  }
std::string string
Definition: nybbler.cc:12
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
void respondToOpenOutputFiles(FileBlock const &)
Definition: Schedule.cc:70
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
EventProcessor::StatusCode art::EventProcessor::runToCompletion ( )

Definition at line 1527 of file EventProcessor.cc.

1528  {
1529  StatusCode returnCode{epSuccess};
1530  ec_->call([this, &returnCode] {
1531  process<highest_level()>();
1532  if (shutdown_flag > 0) {
1533  returnCode = epSignal;
1534  }
1535  });
1536  if (!ec_->empty()) {
1538  ec_->rethrow();
1539  }
1540  return returnCode;
1541  }
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
Schedule& art::EventProcessor::schedule ( ScheduleID const  id)
inlineprivate

Definition at line 145 of file EventProcessor.h.

146  {
147  return schedules_->at(id);
148  }
tsan< std::map< ScheduleID, Schedule > > schedules_
void art::EventProcessor::setOutputFileStatus ( OutputFileStatus  ofs)
private

Definition at line 625 of file EventProcessor.cc.

626  {
628  FDEBUG(1) << string(8, ' ') << "setOutputFileStatus\n";
629  }
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void setOutputFileStatus(OutputFileStatus const ofs)
Definition: Schedule.h:119
void art::EventProcessor::setRunAuxiliaryRangeSetID ( )
private

Definition at line 759 of file EventProcessor.cc.

760  {
761  assert(runPrincipal_);
762  FDEBUG(1) << string(8, ' ') << "setRunAuxiliaryRangeSetID...("
763  << runPrincipal_->runID() << ")\n";
764  if (main_schedule().runRangeSetHandler().type() ==
766  // We are using EmptyEvent source, need to merge what the
767  // schedules have seen.
768  RangeSet mergedRS;
769  auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
770  auto const& rs = schedule(sid).runRangeSetHandler().seenRanges();
771  // The following constructor ensures that the range is sorted
772  // before 'merge' is called.
773  RangeSet const tmp{rs.run(), rs.ranges()};
774  mergedRS.merge(tmp);
775  };
776  scheduleIteration_.for_each_schedule(merge_range_sets);
777  runPrincipal_->updateSeenRanges(mergedRS);
778  auto update_executors = [this, &mergedRS](ScheduleID const sid) {
779  schedule(sid).setRunAuxiliaryRangeSetID(mergedRS);
780  };
781  scheduleIteration_.for_each_schedule(update_executors);
782  return;
783  }
784 
785  // Since we are using already existing ranges, all the range set
786  // handlers have the same ranges, use the first one. handler with
787  // the largest event number, that will be the one which we will
788  // use as the file switch boundary. Note that is may not match
789  // the exactly the schedule that triggered the switch. Do we need
790  // to fix this?
791  unique_ptr<RangeSetHandler> rshAtSwitch{
793  if (main_schedule().fileStatus() != OutputFileStatus::Switching) {
794  // We are at the end of the job.
795  rshAtSwitch->flushRanges();
796  }
797  runPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
798  main_schedule().setRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
799  }
RangeSetHandler const & runRangeSetHandler()
Definition: Schedule.h:156
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
ScheduleIteration scheduleIteration_
tsan_unique_ptr< RunPrincipal > runPrincipal_
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:144
string tmp
Definition: languages.py:63
RangeSet seenRanges() const
static QCString type
Definition: declinfo.cpp:672
RangeSetHandler * clone() const
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::setSubRunAuxiliaryRangeSetID ( )
private

Definition at line 928 of file EventProcessor.cc.

929  {
930  assert(subRunPrincipal_);
931  FDEBUG(1) << string(8, ' ') << "setSubRunAuxiliaryRangeSetID("
932  << subRunPrincipal_->subRunID() << ")\n";
933  if (main_schedule().subRunRangeSetHandler().type() ==
935  // We are using EmptyEvent source, need to merge what the
936  // schedules have seen.
937  RangeSet mergedRS;
938  auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
939  auto const& rs = schedule(sid).subRunRangeSetHandler().seenRanges();
940  // The following constructor ensures that the range is sorted
941  // before 'merge' is called.
942  RangeSet const tmp{rs.run(), rs.ranges()};
943  mergedRS.merge(tmp);
944  };
945  scheduleIteration_.for_each_schedule(merge_range_sets);
946  subRunPrincipal_->updateSeenRanges(mergedRS);
947  auto update_executors = [this, &mergedRS](ScheduleID const sid) {
948  schedule(sid).setSubRunAuxiliaryRangeSetID(mergedRS);
949  };
950  scheduleIteration_.for_each_schedule(update_executors);
951  return;
952  }
953  // Ranges are split/flushed only for a RangeSetHandler whose
954  // dynamic type is 'ClosedRangeSetHandler'.
955  //
956  // Consider the following range-sets
957  //
958  // SubRun RangeSet:
959  //
960  // { Run 1 : SubRun 1 : Events [1,7) } <-- Current
961  //
962  // Run RangeSet:
963  //
964  // { Run 1 : SubRun 0 : Events [5,11)
965  // SubRun 1 : Events [1,7) <-- Current
966  // SubRun 1 : Events [9,15) }
967  //
968  // For a range split just before SubRun 1, Event 6, the
969  // range sets should become:
970  //
971  // SubRun RangeSet:
972  //
973  // { Run 1 : SubRun 1 : Events [1,6)
974  // SubRun 1 : Events [6,7) } <-- Updated
975  //
976  // Run RangeSet:
977  //
978  // { Run 1 : SubRun 0 : Events [5,11)
979  // SubRun 1 : Events [1,6)
980  // SubRun 1 : Events [6,7) <-- Updated
981  // SubRun 1 : Events [9,15) }
982  //
983  // Since we are using already existing ranges, all the range set
984  // handlers have the same ranges. Find the closed range set
985  // handler with the largest event number, that will be the one
986  // which we will use as the file switch boundary. Note that is
987  // may not match the exactly the schedule that triggered the
988  // switch. Do we need to fix this?
989  //
990  // If we do not find any handlers with valid event info then we
991  // use the first one, which is just fine. This happens for
992  // example when we are dropping all events.
993  unsigned largestEvent = 1U;
994  ScheduleID idxOfMax{ScheduleID::first()};
995  ScheduleID idx{ScheduleID::first()};
997  auto& rsh = dynamic_cast<ClosedRangeSetHandler const&>(val);
998  // Make sure the event number is a valid event number before using
999  // it. It can be invalid in the handler if we have not yet read an
1000  // event, which happens with empty subruns and when we are
1001  // dropping all events.
1002  if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
1003  if (rsh.eventInfo().id().event() > largestEvent) {
1004  largestEvent = rsh.eventInfo().id().event();
1005  idxOfMax = idx;
1006  }
1007  }
1008  idx = idx.next();
1009 
1010  unique_ptr<RangeSetHandler> rshAtSwitch{
1012  if (main_schedule().fileStatus() == OutputFileStatus::Switching) {
1013  rshAtSwitch->maybeSplitRange();
1014  unique_ptr<RangeSetHandler> runRSHAtSwitch{
1015  schedule(idxOfMax).runRangeSetHandler().clone()};
1016  runRSHAtSwitch->maybeSplitRange();
1017  main_schedule().seedRunRangeSet(*runRSHAtSwitch);
1018  } else {
1019  // We are at the end of the job.
1020  rshAtSwitch->flushRanges();
1021  }
1022  main_schedule().seedSubRunRangeSet(*rshAtSwitch);
1023  subRunPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
1024  main_schedule().setSubRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
1025  }
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:168
RangeSetHandler const & runRangeSetHandler()
Definition: Schedule.h:156
std::string string
Definition: nybbler.cc:12
void seedSubRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:163
Schedule & main_schedule()
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
ScheduleIteration scheduleIteration_
RangeSetHandler const & subRunRangeSetHandler()
Definition: Schedule.h:179
string tmp
Definition: languages.py:63
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
RangeSet seenRanges() const
static QCString type
Definition: declinfo.cpp:672
RangeSetHandler * clone() const
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void seedRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:138
void art::EventProcessor::terminateAbnormally_ ( )
private

Definition at line 1572 of file EventProcessor.cc.

1572  {
1573  if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
1574  ServiceHandle<RandomNumberGenerator>()->saveToFile_();
1575  }
1576  }
1577  catch (...) {
1578  }
void art::EventProcessor::writeEvent ( )
private
void art::EventProcessor::writeRun ( )
private

Definition at line 837 of file EventProcessor.cc.

838  {
839  assert(runPrincipal_);
840  // Precondition: The RunID does not correspond to a flush ID.
841  RunID const r{runPrincipal_->runID()};
842  assert(!r.isFlush());
844  FDEBUG(1) << string(8, ' ') << "writeRun....................(" << r
845  << ")\n";
846  }
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
tsan_unique_ptr< RunPrincipal > runPrincipal_
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void writeRun(RunPrincipal &rp)
Definition: Schedule.h:150
void art::EventProcessor::writeSubRun ( )
private

Definition at line 1063 of file EventProcessor.cc.

1064  {
1065  assert(subRunPrincipal_);
1066  // Precondition: The SubRunID does not correspond to a flush ID.
1067  SubRunID const& sr{subRunPrincipal_->subRunID()};
1068  assert(!sr.isFlush());
1070  FDEBUG(1) << string(8, ' ') << "writeSubRun.................(" << sr
1071  << ")\n";
1072  }
std::string string
Definition: nybbler.cc:12
Schedule & main_schedule()
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
static constexpr double sr
Definition: Units.h:166
void writeSubRun(SubRunPrincipal &srp)
Definition: Schedule.h:173

Member Data Documentation

ActivityRegistry art::EventProcessor::actReg_ {}
private

Definition at line 185 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::beginRunCalled_ {false}
private

Definition at line 172 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::beginSubRunCalled_ {false}
private

Definition at line 175 of file EventProcessor.h.

tsan<detail::ExceptionCollector> art::EventProcessor::ec_ {}
private

Definition at line 166 of file EventProcessor.h.

PerScheduleContainer<std::unique_ptr<EventPrincipal> > art::EventProcessor::eventPrincipals_ {}
private

Definition at line 245 of file EventProcessor.h.

tsan_unique_ptr<FileBlock> art::EventProcessor::fb_ {nullptr}
private

Definition at line 236 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::fileSwitchInProgress_ {false}
private

Definition at line 265 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::finalizeRunEnabled_ {false}
private

Definition at line 178 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::finalizeSubRunEnabled_ {false}
private

Definition at line 181 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::firstEvent_ {true}
private

Definition at line 262 of file EventProcessor.h.

bool const art::EventProcessor::handleEmptyRuns_
private

Definition at line 248 of file EventProcessor.h.

bool const art::EventProcessor::handleEmptySubRuns_
private

Definition at line 251 of file EventProcessor.h.

tsan_unique_ptr<InputSource> art::EventProcessor::input_ {nullptr}
private

Definition at line 230 of file EventProcessor.h.

tsan<MFStatusUpdater> art::EventProcessor::mfStatusUpdater_ {actReg_}
private

Definition at line 188 of file EventProcessor.h.

std::atomic<Level> art::EventProcessor::nextLevel_ {Level::ReadyToAdvance}
private

Definition at line 163 of file EventProcessor.h.

tsan<UpdateOutputCallbacks> art::EventProcessor::outputCallbacks_ {}
private

Definition at line 193 of file EventProcessor.h.

tsan<PathManager> art::EventProcessor::pathManager_
private

Definition at line 227 of file EventProcessor.h.

tsan<ProductDescriptions> art::EventProcessor::producedProductDescriptions_ {}
private

Definition at line 199 of file EventProcessor.h.

tsan<ProductTables> art::EventProcessor::producedProductLookupTables_ {ProductTables::invalid()}
private

Definition at line 207 of file EventProcessor.h.

tsan<ProducingServiceSignals> art::EventProcessor::psSignals_ {}
private

Definition at line 209 of file EventProcessor.h.

tsan_unique_ptr<RunPrincipal> art::EventProcessor::runPrincipal_ {nullptr}
private

Definition at line 239 of file EventProcessor.h.

ScheduleIteration art::EventProcessor::scheduleIteration_
private

Definition at line 220 of file EventProcessor.h.

tsan<Scheduler> art::EventProcessor::scheduler_
private

Definition at line 214 of file EventProcessor.h.

tsan<std::map<ScheduleID, Schedule> > art::EventProcessor::schedules_ {}
private

Definition at line 233 of file EventProcessor.h.

tsan_unique_ptr<ServicesManager> art::EventProcessor::servicesManager_
private

Definition at line 223 of file EventProcessor.h.

SharedException art::EventProcessor::sharedException_
private

Definition at line 255 of file EventProcessor.h.

detail::SharedResources art::EventProcessor::sharedResources_ {}
private

Definition at line 218 of file EventProcessor.h.

tsan_unique_ptr<SubRunPrincipal> art::EventProcessor::subRunPrincipal_ {nullptr}
private

Definition at line 242 of file EventProcessor.h.

std::unique_ptr<GlobalTaskGroup> art::EventProcessor::taskGroup_ {nullptr}
private

Definition at line 216 of file EventProcessor.h.

tsan<cet::cpu_timer> art::EventProcessor::timer_ {}
private

Definition at line 169 of file EventProcessor.h.


The documentation for this class was generated from the following files: