Classes | Public Member Functions | Private Attributes | Friends | List of all members
art::EndPathExecutor Class Reference

#include <EndPathExecutor.h>

Classes

class  PathsDoneTask
 

Public Member Functions

 EndPathExecutor (ScheduleID sid, PathManager &pm, ActionTable const &actions, ActivityRegistry const &areg, UpdateOutputCallbacks &callbacks, GlobalTaskGroup &task_group)
 
 EndPathExecutor (EndPathExecutor &&)=delete
 
EndPathExecutoroperator= (EndPathExecutor &&)=delete
 
 EndPathExecutor (EndPathExecutor const &)=delete
 
EndPathExecutoroperator= (EndPathExecutor const &)=delete
 
void beginJob (detail::SharedResources const &resources)
 
void endJob ()
 
void selectProducts (ProductTables const &)
 
void respondToOpenInputFile (FileBlock const &fb)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenOutputFiles (FileBlock const &fb)
 
void respondToCloseOutputFiles (FileBlock const &fb)
 
bool someOutputsOpen () const
 
void closeAllOutputFiles ()
 
void seedRunRangeSet (RangeSetHandler const &)
 
void setRunAuxiliaryRangeSetID (RangeSet const &rs)
 
void writeRun (RunPrincipal &rp)
 
void seedSubRunRangeSet (RangeSetHandler const &)
 
void setSubRunAuxiliaryRangeSetID (RangeSet const &rs)
 
void writeSubRun (SubRunPrincipal &srp)
 
void process (Transition, Principal &)
 
void process_event (hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
 
void writeEvent (EventPrincipal &)
 
bool outputsToClose () const
 
void closeSomeOutputFiles ()
 
bool outputsToOpen () const
 
void openSomeOutputFiles (FileBlock const &fb)
 
void setOutputFileStatus (OutputFileStatus)
 
void recordOutputClosureRequests (Granularity)
 
void incrementInputFileNumber ()
 
bool allAtLimit () const
 

Private Attributes

ScheduleContext const sc_
 
ActionTable const & actionTable_
 
ActivityRegistry const & actReg_
 
PathsInfoendPathInfo_
 
GlobalTaskGrouptaskGroup_
 
std::vector< OutputWorker * > outputWorkers_ {}
 
std::unique_ptr< RangeSetHandlerrunRangeSetHandler_ {nullptr}
 
std::unique_ptr< RangeSetHandlersubRunRangeSetHandler_ {nullptr}
 
std::atomic< OutputFileStatusfileStatus_ {OutputFileStatus::Closed}
 
std::set< OutputWorker * > outputWorkersToOpen_ {}
 
std::set< OutputWorker * > outputWorkersToClose_ {}
 

Friends

class Schedule
 

Detailed Description

Definition at line 42 of file EndPathExecutor.h.

Constructor & Destructor Documentation

art::EndPathExecutor::EndPathExecutor ( ScheduleID  sid,
PathManager pm,
ActionTable const &  actions,
ActivityRegistry const &  areg,
UpdateOutputCallbacks callbacks,
GlobalTaskGroup task_group 
)

Definition at line 37 of file EndPathExecutor.cc.

43  : sc_{sid}
44  , actionTable_{actionTable}
45  , actReg_{areg}
46  , endPathInfo_{pm.endPathInfo(sid)}
47  , taskGroup_{group}
48  {
49  for (auto const& val : endPathInfo_.workers()) {
50  auto w = val.second.get();
51  assert(sid == w->scheduleID());
52  auto owp = dynamic_cast<OutputWorker*>(w);
53  if (owp != nullptr) {
54  outputWorkers_.emplace_back(owp);
55  }
56  }
57  outputWorkersToOpen_.insert(outputWorkers_.cbegin(), outputWorkers_.cend());
58  outputCallbacks.registerCallback(
59  [this](auto const& tables) { this->selectProducts(tables); });
60  }
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void selectProducts(ProductTables const &)
ScheduleContext const sc_
std::set< OutputWorker * > outputWorkersToOpen_
ActionTable const & actionTable_
GlobalTaskGroup & taskGroup_
ActivityRegistry const & actReg_
std::vector< OutputWorker * > outputWorkers_
art::EndPathExecutor::EndPathExecutor ( EndPathExecutor &&  )
delete
art::EndPathExecutor::EndPathExecutor ( EndPathExecutor const &  )
delete

Member Function Documentation

bool art::EndPathExecutor::allAtLimit ( ) const

Definition at line 455 of file EndPathExecutor.cc.

456  {
457  if (outputWorkers_.empty()) {
458  return false;
459  }
460  bool all_at_limit = true;
461  for (auto w : outputWorkers_) {
462  if (!w->limitReached()) {
463  all_at_limit = false;
464  break;
465  }
466  }
467  if (all_at_limit) {
468  mf::LogInfo("SuccessfulTermination")
469  << "The job is terminating successfully because each output module\n"
470  << "has reached its configured limit.\n";
471  }
472  return all_at_limit;
473  }
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::beginJob ( detail::SharedResources const &  resources)

Definition at line 63 of file EndPathExecutor.cc.

64  {
65  for (auto& label_and_worker : endPathInfo_.workers()) {
66  auto& w = *label_and_worker.second;
68  continue;
69  }
70  w.beginJob(resources);
71  }
72  }
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::EndPathExecutor::closeAllOutputFiles ( )

Definition at line 170 of file EndPathExecutor.cc.

171  {
172  for (auto ow : outputWorkers_) {
173  actReg_.sPreCloseOutputFile.invoke(ow->label());
174  ow->closeFile();
176  OutputFileInfo(ow->label(), ow->lastClosedFileName()));
177  }
178  }
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
ActivityRegistry const & actReg_
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::closeSomeOutputFiles ( )

Definition at line 375 of file EndPathExecutor.cc.

376  {
378  for (auto ow : outputWorkersToClose_) {
379  // Skip files that are already closed due to other end-path
380  // executors already closing them.
381  if (!ow->fileIsOpen()) {
382  continue;
383  }
384  actReg_.sPreCloseOutputFile.invoke(ow->label());
385  ow->closeFile();
387  OutputFileInfo{ow->label(), ow->lastClosedFileName()});
388  }
389  outputWorkersToOpen_ = move(outputWorkersToClose_);
390  }
void setOutputFileStatus(OutputFileStatus)
def move(depos, offset)
Definition: depos.py:107
std::set< OutputWorker * > outputWorkersToClose_
std::set< OutputWorker * > outputWorkersToOpen_
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
ActivityRegistry const & actReg_
void art::EndPathExecutor::endJob ( )

Definition at line 75 of file EndPathExecutor.cc.

76  {
78  // FIXME: There seems to be little value-added by the catch and rethrow
79  // here.
80  for (auto& label_and_worker : endPathInfo_.workers()) {
81  auto& w = *label_and_worker.second;
83  continue;
84  }
85  try {
86  w.endJob();
87  }
88  catch (cet::exception& e) {
89  error << "cet::exception caught in Schedule::endJob\n"
90  << e.explain_self();
91  throw error;
92  }
93  catch (exception& e) {
94  error << "Standard library exception caught in Schedule::endJob\n"
95  << e.what();
96  throw error;
97  }
98  catch (...) {
99  error << "Unknown exception caught in Schedule::endJob\n";
100  throw error;
101  }
102  }
103  }
bool skip_non_replicated(Worker const &)
error
Definition: include.cc:26
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
const double e
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EndPathExecutor::incrementInputFileNumber ( )

Definition at line 447 of file EndPathExecutor.cc.

448  {
449  for (auto ow : outputWorkers_) {
450  ow->incrementInputFileNumber();
451  }
452  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::openSomeOutputFiles ( FileBlock const &  fb)

Definition at line 399 of file EndPathExecutor.cc.

400  {
401  for (auto ow : outputWorkersToOpen_) {
402  if (!ow->openFile(fb)) {
403  continue;
404  }
405  actReg_.sPostOpenOutputFile.invoke(ow->label());
406  }
408  outputWorkersToOpen_.clear();
409  }
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenOutputFile
void setOutputFileStatus(OutputFileStatus)
std::set< OutputWorker * > outputWorkersToOpen_
ActivityRegistry const & actReg_
EndPathExecutor& art::EndPathExecutor::operator= ( EndPathExecutor &&  )
delete
EndPathExecutor& art::EndPathExecutor::operator= ( EndPathExecutor const &  )
delete
bool art::EndPathExecutor::outputsToClose ( ) const

Definition at line 361 of file EndPathExecutor.cc.

362  {
363  return !outputWorkersToClose_.empty();
364  }
std::set< OutputWorker * > outputWorkersToClose_
bool art::EndPathExecutor::outputsToOpen ( ) const

Definition at line 393 of file EndPathExecutor.cc.

394  {
395  return !outputWorkersToOpen_.empty();
396  }
std::set< OutputWorker * > outputWorkersToOpen_
void art::EndPathExecutor::process ( Transition  trans,
Principal principal 
)

Definition at line 237 of file EndPathExecutor.cc.

238  {
239  for (auto& label_and_worker : endPathInfo_.workers()) {
240  auto& w = *label_and_worker.second;
242  continue;
243  }
244  w.reset();
245  }
246  try {
247  if (!endPathInfo_.paths().empty()) {
248  endPathInfo_.paths().front().process(trans, principal);
249  }
250  }
251  catch (cet::exception& ex) {
252  throw Exception(errors::EventProcessorFailure, "EndPathExecutor:")
253  << "an exception occurred during current event processing\n"
254  << ex;
255  }
256  catch (...) {
257  mf::LogError("PassingThrough")
258  << "an exception occurred during current event processing\n";
259  throw;
260  }
262  }
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
bool skip_non_replicated(Worker const &)
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void incrementPassedEventCount()
Definition: PathsInfo.cc:101
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EndPathExecutor::process_event ( hep::concurrency::WaitingTaskPtr  finalizeEventTask,
EventPrincipal  
)

Definition at line 317 of file EndPathExecutor.cc.

319  {
320  auto const sid = sc_.id();
321  TDEBUG_BEGIN_FUNC_SI(4, sid);
324  try {
325  auto pathsDoneTask =
326  make_waiting_task<PathsDoneTask>(this, finalizeEventTask, taskGroup_);
327  if (endPathInfo_.paths().empty()) {
328  taskGroup_.may_run(pathsDoneTask);
329  } else {
330  endPathInfo_.paths().front().process(pathsDoneTask, ep);
331  }
332  }
333  catch (...) {
334  taskGroup_.may_run(finalizeEventTask, current_exception());
335  }
336  TDEBUG_END_FUNC_SI(4, sid);
337  }
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
ScheduleContext const sc_
void reset_for_event()
Definition: PathsInfo.cc:82
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
GlobalTaskGroup & taskGroup_
void incrementTotalEventCount()
Definition: PathsInfo.cc:95
void art::EndPathExecutor::recordOutputClosureRequests ( Granularity  atBoundary)

Definition at line 431 of file EndPathExecutor.cc.

432  {
433  for (auto ow : outputWorkers_) {
434  if (atBoundary < ow->fileGranularity()) {
435  // The boundary we are checking at is finer than the checks
436  // the output worker needs, nothing to do.
437  continue;
438  }
439  auto wants_to_close = ow->requestsToCloseFile();
440  if (wants_to_close) {
441  outputWorkersToClose_.insert(ow);
442  }
443  }
444  }
std::set< OutputWorker * > outputWorkersToClose_
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::respondToCloseInputFile ( FileBlock const &  fb)

Definition at line 126 of file EndPathExecutor.cc.

127  {
128  for (auto& label_and_worker : endPathInfo_.workers()) {
129  auto& w = *label_and_worker.second;
131  continue;
132  }
133  w.respondToCloseInputFile(fb);
134  }
135  }
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::EndPathExecutor::respondToCloseOutputFiles ( FileBlock const &  fb)

Definition at line 150 of file EndPathExecutor.cc.

151  {
152  for (auto& label_and_worker : endPathInfo_.workers()) {
153  auto& w = *label_and_worker.second;
155  continue;
156  }
157  w.respondToCloseOutputFiles(fb);
158  }
159  }
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::EndPathExecutor::respondToOpenInputFile ( FileBlock const &  fb)

Definition at line 114 of file EndPathExecutor.cc.

115  {
116  for (auto& label_and_worker : endPathInfo_.workers()) {
117  auto& w = *label_and_worker.second;
119  continue;
120  }
121  w.respondToOpenInputFile(fb);
122  }
123  }
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::EndPathExecutor::respondToOpenOutputFiles ( FileBlock const &  fb)

Definition at line 138 of file EndPathExecutor.cc.

139  {
140  for (auto& label_and_worker : endPathInfo_.workers()) {
141  auto& w = *label_and_worker.second;
143  continue;
144  }
145  w.respondToOpenOutputFiles(fb);
146  }
147  }
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::EndPathExecutor::seedRunRangeSet ( RangeSetHandler const &  rsh)

Definition at line 181 of file EndPathExecutor.cc.

182  {
183  runRangeSetHandler_.reset(rsh.clone());
184  }
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void art::EndPathExecutor::seedSubRunRangeSet ( RangeSetHandler const &  rsh)

Definition at line 206 of file EndPathExecutor.cc.

207  {
208  subRunRangeSetHandler_.reset(rsh.clone());
209  }
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
void art::EndPathExecutor::selectProducts ( ProductTables const &  tables)

Definition at line 106 of file EndPathExecutor.cc.

107  {
108  for (auto ow : outputWorkers_) {
109  ow->selectProducts(tables);
110  }
111  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setOutputFileStatus ( OutputFileStatus  ofs)

Definition at line 422 of file EndPathExecutor.cc.

423  {
424  for (auto ow : outputWorkers_) {
425  ow->setFileStatus(ofs);
426  }
427  fileStatus_ = ofs;
428  }
std::atomic< OutputFileStatus > fileStatus_
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 187 of file EndPathExecutor.cc.

188  {
189  for (auto ow : outputWorkers_) {
190  ow->setRunAuxiliaryRangeSetID(rangeSet);
191  }
192  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setSubRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 212 of file EndPathExecutor.cc.

213  {
214  for (auto ow : outputWorkers_) {
215  // For RootOutput this enters the possibly split range set into
216  // the range set db.
217  ow->setSubRunAuxiliaryRangeSetID(rs);
218  }
219  }
std::vector< OutputWorker * > outputWorkers_
bool art::EndPathExecutor::someOutputsOpen ( ) const

Definition at line 162 of file EndPathExecutor.cc.

163  {
164  return any_of(outputWorkers_.cbegin(), outputWorkers_.cend(), [](auto ow) {
165  return ow->fileIsOpen();
166  });
167  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::writeEvent ( EventPrincipal ep)

Definition at line 340 of file EndPathExecutor.cc.

341  {
342  // We don't worry about providing the sorted list of module names
343  // for the end_path right now. If users decide it is necessary to
344  // know what they are, then we can provide them.
345  PathContext const pc{sc_, PathContext::end_path_spec(), {}};
346  for (auto ow : outputWorkers_) {
347  ModuleContext const mc{pc, ow->description()};
348  actReg_.sPreWriteEvent.invoke(mc);
349  ow->writeEvent(ep, mc);
350  actReg_.sPostWriteEvent.invoke(mc);
351  }
352  auto const& eid = ep.eventID();
353  bool const lastInSubRun{ep.isLastInSubRun()};
354  TDEBUG_FUNC_SI(5, sc_.id())
355  << "eid: " << eid.run() << ", " << eid.subRun() << ", " << eid.event();
356  runRangeSetHandler_->update(eid, lastInSubRun);
357  subRunRangeSetHandler_->update(eid, lastInSubRun);
358  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreWriteEvent
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
static auto end_path_spec()
Definition: PathContext.h:20
ActivityRegistry const & actReg_
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostWriteEvent
void art::EndPathExecutor::writeRun ( RunPrincipal rp)

Definition at line 195 of file EndPathExecutor.cc.

196  {
197  for (auto ow : outputWorkers_) {
198  ow->writeRun(rp);
199  }
201  runRangeSetHandler_->rebase();
202  }
203  }
std::atomic< OutputFileStatus > fileStatus_
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void art::EndPathExecutor::writeSubRun ( SubRunPrincipal srp)

Definition at line 222 of file EndPathExecutor.cc.

223  {
224  for (auto ow : outputWorkers_) {
225  ow->writeSubRun(srp);
226  }
228  subRunRangeSetHandler_->rebase();
229  }
230  }
std::atomic< OutputFileStatus > fileStatus_
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
std::vector< OutputWorker * > outputWorkers_

Friends And Related Function Documentation

friend class Schedule
friend

Definition at line 43 of file EndPathExecutor.h.

Member Data Documentation

ActionTable const& art::EndPathExecutor::actionTable_
private

Definition at line 129 of file EndPathExecutor.h.

ActivityRegistry const& art::EndPathExecutor::actReg_
private

Definition at line 130 of file EndPathExecutor.h.

PathsInfo& art::EndPathExecutor::endPathInfo_
private

Definition at line 131 of file EndPathExecutor.h.

std::atomic<OutputFileStatus> art::EndPathExecutor::fileStatus_ {OutputFileStatus::Closed}
private

Definition at line 141 of file EndPathExecutor.h.

std::vector<OutputWorker*> art::EndPathExecutor::outputWorkers_ {}
private

Definition at line 134 of file EndPathExecutor.h.

std::set<OutputWorker*> art::EndPathExecutor::outputWorkersToClose_ {}
private

Definition at line 150 of file EndPathExecutor.h.

std::set<OutputWorker*> art::EndPathExecutor::outputWorkersToOpen_ {}
private

Definition at line 142 of file EndPathExecutor.h.

std::unique_ptr<RangeSetHandler> art::EndPathExecutor::runRangeSetHandler_ {nullptr}
private

Definition at line 136 of file EndPathExecutor.h.

ScheduleContext const art::EndPathExecutor::sc_
private

Definition at line 125 of file EndPathExecutor.h.

std::unique_ptr<RangeSetHandler> art::EndPathExecutor::subRunRangeSetHandler_ {nullptr}
private

Definition at line 138 of file EndPathExecutor.h.

GlobalTaskGroup& art::EndPathExecutor::taskGroup_
private

Definition at line 132 of file EndPathExecutor.h.


The documentation for this class was generated from the following files: