EndPathExecutor.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
24 #include "hep_concurrency/WaitingTask.h"
26 
27 #include <memory>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31 
32 using namespace hep::concurrency;
33 using namespace std;
34 
35 namespace art {
36 
37  EndPathExecutor::EndPathExecutor(ScheduleID const sid,
38  PathManager& pm,
39  ActionTable const& actionTable,
40  ActivityRegistry const& areg,
41  UpdateOutputCallbacks& outputCallbacks,
42  GlobalTaskGroup& group)
43  : sc_{sid}
44  , actionTable_{actionTable}
45  , actReg_{areg}
46  , endPathInfo_{pm.endPathInfo(sid)}
47  , taskGroup_{group}
48  {
49  for (auto const& val : endPathInfo_.workers()) {
50  auto w = val.second.get();
51  assert(sid == w->scheduleID());
52  auto owp = dynamic_cast<OutputWorker*>(w);
53  if (owp != nullptr) {
54  outputWorkers_.emplace_back(owp);
55  }
56  }
57  outputWorkersToOpen_.insert(outputWorkers_.cbegin(), outputWorkers_.cend());
58  outputCallbacks.registerCallback(
59  [this](auto const& tables) { this->selectProducts(tables); });
60  }
61 
62  void
64  {
65  for (auto& label_and_worker : endPathInfo_.workers()) {
66  auto& w = *label_and_worker.second;
68  continue;
69  }
70  w.beginJob(resources);
71  }
72  }
73 
74  void
76  {
78  // FIXME: There seems to be little value-added by the catch and rethrow
79  // here.
80  for (auto& label_and_worker : endPathInfo_.workers()) {
81  auto& w = *label_and_worker.second;
83  continue;
84  }
85  try {
86  w.endJob();
87  }
88  catch (cet::exception& e) {
89  error << "cet::exception caught in Schedule::endJob\n"
90  << e.explain_self();
91  throw error;
92  }
93  catch (exception& e) {
94  error << "Standard library exception caught in Schedule::endJob\n"
95  << e.what();
96  throw error;
97  }
98  catch (...) {
99  error << "Unknown exception caught in Schedule::endJob\n";
100  throw error;
101  }
102  }
103  }
104 
105  void
107  {
108  for (auto ow : outputWorkers_) {
109  ow->selectProducts(tables);
110  }
111  }
112 
113  void
115  {
116  for (auto& label_and_worker : endPathInfo_.workers()) {
117  auto& w = *label_and_worker.second;
119  continue;
120  }
121  w.respondToOpenInputFile(fb);
122  }
123  }
124 
125  void
127  {
128  for (auto& label_and_worker : endPathInfo_.workers()) {
129  auto& w = *label_and_worker.second;
131  continue;
132  }
133  w.respondToCloseInputFile(fb);
134  }
135  }
136 
137  void
139  {
140  for (auto& label_and_worker : endPathInfo_.workers()) {
141  auto& w = *label_and_worker.second;
143  continue;
144  }
145  w.respondToOpenOutputFiles(fb);
146  }
147  }
148 
149  void
151  {
152  for (auto& label_and_worker : endPathInfo_.workers()) {
153  auto& w = *label_and_worker.second;
155  continue;
156  }
157  w.respondToCloseOutputFiles(fb);
158  }
159  }
160 
161  bool
163  {
164  return any_of(outputWorkers_.cbegin(), outputWorkers_.cend(), [](auto ow) {
165  return ow->fileIsOpen();
166  });
167  }
168 
169  void
171  {
172  for (auto ow : outputWorkers_) {
173  actReg_.sPreCloseOutputFile.invoke(ow->label());
174  ow->closeFile();
176  OutputFileInfo(ow->label(), ow->lastClosedFileName()));
177  }
178  }
179 
180  void
182  {
183  runRangeSetHandler_.reset(rsh.clone());
184  }
185 
186  void
188  {
189  for (auto ow : outputWorkers_) {
190  ow->setRunAuxiliaryRangeSetID(rangeSet);
191  }
192  }
193 
194  void
196  {
197  for (auto ow : outputWorkers_) {
198  ow->writeRun(rp);
199  }
201  runRangeSetHandler_->rebase();
202  }
203  }
204 
205  void
207  {
208  subRunRangeSetHandler_.reset(rsh.clone());
209  }
210 
211  void
213  {
214  for (auto ow : outputWorkers_) {
215  // For RootOutput this enters the possibly split range set into
216  // the range set db.
217  ow->setSubRunAuxiliaryRangeSetID(rs);
218  }
219  }
220 
221  void
223  {
224  for (auto ow : outputWorkers_) {
225  ow->writeSubRun(srp);
226  }
228  subRunRangeSetHandler_->rebase();
229  }
230  }
231 
232  //
233  // MEMBER FUNCTIONS -- Process Non-Event
234  //
235 
236  void
238  {
239  for (auto& label_and_worker : endPathInfo_.workers()) {
240  auto& w = *label_and_worker.second;
242  continue;
243  }
244  w.reset();
245  }
246  try {
247  if (!endPathInfo_.paths().empty()) {
248  endPathInfo_.paths().front().process(trans, principal);
249  }
250  }
251  catch (cet::exception& ex) {
252  throw Exception(errors::EventProcessorFailure, "EndPathExecutor:")
253  << "an exception occurred during current event processing\n"
254  << ex;
255  }
256  catch (...) {
257  mf::LogError("PassingThrough")
258  << "an exception occurred during current event processing\n";
259  throw;
260  }
262  }
263 
265  public:
266  PathsDoneTask(EndPathExecutor* const endPathExec,
267  WaitingTaskPtr const finalizeEventTask,
268  GlobalTaskGroup& taskGroup)
269  : endPathExec_{endPathExec}
270  , finalizeEventTask_{finalizeEventTask}
271  , taskGroup_{taskGroup}
272  {}
273 
274  void
275  operator()(exception_ptr const ex)
276  {
277  auto const scheduleID = endPathExec_->sc_.id();
278 
279  // Note: When we start our parent task is the eventLoop task.
280  TDEBUG_BEGIN_TASK_SI(4, scheduleID);
281 
282  if (ex) {
283  try {
284  rethrow_exception(ex);
285  }
286  catch (cet::exception& e) {
287  Exception tmp(errors::EventProcessorFailure, "EndPathExecutor:");
288  tmp << "an exception occurred during current event processing\n" << e;
289  taskGroup_.may_run(finalizeEventTask_, make_exception_ptr(tmp));
290  TDEBUG_END_TASK_SI(4, scheduleID)
291  << "end path processing terminate because of EXCEPTION";
292  return;
293  }
294  catch (...) {
295  taskGroup_.may_run(finalizeEventTask_, current_exception());
296  TDEBUG_END_TASK_SI(4, scheduleID)
297  << "end path processing terminate because of EXCEPTION";
298  return;
299  }
300  }
301 
302  endPathExec_->endPathInfo_.incrementPassedEventCount();
303 
304  taskGroup_.may_run(finalizeEventTask_);
305  TDEBUG_END_TASK_SI(4, scheduleID);
306  }
307 
308  private:
310  WaitingTaskPtr const finalizeEventTask_;
312  };
313 
314  // Note: We come here as part of the endPath task; our
315  // parent task is the eventLoop task.
316  void
317  EndPathExecutor::process_event(WaitingTaskPtr finalizeEventTask,
318  EventPrincipal& ep)
319  {
320  auto const sid = sc_.id();
321  TDEBUG_BEGIN_FUNC_SI(4, sid);
324  try {
325  auto pathsDoneTask =
326  make_waiting_task<PathsDoneTask>(this, finalizeEventTask, taskGroup_);
327  if (endPathInfo_.paths().empty()) {
328  taskGroup_.may_run(pathsDoneTask);
329  } else {
330  endPathInfo_.paths().front().process(pathsDoneTask, ep);
331  }
332  }
333  catch (...) {
334  taskGroup_.may_run(finalizeEventTask, current_exception());
335  }
336  TDEBUG_END_FUNC_SI(4, sid);
337  }
338 
339  void
341  {
342  // We don't worry about providing the sorted list of module names
343  // for the end_path right now. If users decide it is necessary to
344  // know what they are, then we can provide them.
346  for (auto ow : outputWorkers_) {
347  ModuleContext const mc{pc, ow->description()};
348  actReg_.sPreWriteEvent.invoke(mc);
349  ow->writeEvent(ep, mc);
350  actReg_.sPostWriteEvent.invoke(mc);
351  }
352  auto const& eid = ep.eventID();
353  bool const lastInSubRun{ep.isLastInSubRun()};
354  TDEBUG_FUNC_SI(5, sc_.id())
355  << "eid: " << eid.run() << ", " << eid.subRun() << ", " << eid.event();
356  runRangeSetHandler_->update(eid, lastInSubRun);
357  subRunRangeSetHandler_->update(eid, lastInSubRun);
358  }
359 
360  bool
362  {
363  return !outputWorkersToClose_.empty();
364  }
365 
366  // MT note: This is where we need to get all the schedules
367  // synchronized, and then have all schedules do the file
368  // close, and then the file open, then the schedules can
369  // proceed. A nasty complication is that a great deal of
370  // time can go by between the file close and the file open
371  // because artdaq may pause the run inbetween, and wants to
372  // have all output files closed while the run is paused.
373  // They probably want the input file closed too.
374  void
376  {
378  for (auto ow : outputWorkersToClose_) {
379  // Skip files that are already closed due to other end-path
380  // executors already closing them.
381  if (!ow->fileIsOpen()) {
382  continue;
383  }
384  actReg_.sPreCloseOutputFile.invoke(ow->label());
385  ow->closeFile();
387  OutputFileInfo{ow->label(), ow->lastClosedFileName()});
388  }
389  outputWorkersToOpen_ = move(outputWorkersToClose_);
390  }
391 
392  bool
394  {
395  return !outputWorkersToOpen_.empty();
396  }
397 
398  void
400  {
401  for (auto ow : outputWorkersToOpen_) {
402  if (!ow->openFile(fb)) {
403  continue;
404  }
405  actReg_.sPostOpenOutputFile.invoke(ow->label());
406  }
408  outputWorkersToOpen_.clear();
409  }
410 
411  // Note: When we are passed OutputFileStatus::Switching, we must close
412  // the file and call openSomeOutputFiles which changes it back
413  // to OutputFileStatus::Open.
414  // A side effect of switching status is the run/subrun writes
415  // are not counted in the overall counting by RootOutputClosingCriteria
416  // while the switch is active (this avoids counting the extra subRun and
417  // Run that we are forced to write to finish out the file we are
418  // closing, which keeps the ongoing count for closing based on SubRun
419  // and Run counts meaningful). However, the extra ones are still
420  // counted by the tree entry counters.
421  void
423  {
424  for (auto ow : outputWorkers_) {
425  ow->setFileStatus(ofs);
426  }
427  fileStatus_ = ofs;
428  }
429 
430  void
432  {
433  for (auto ow : outputWorkers_) {
434  if (atBoundary < ow->fileGranularity()) {
435  // The boundary we are checking at is finer than the checks
436  // the output worker needs, nothing to do.
437  continue;
438  }
439  auto wants_to_close = ow->requestsToCloseFile();
440  if (wants_to_close) {
441  outputWorkersToClose_.insert(ow);
442  }
443  }
444  }
445 
446  void
448  {
449  for (auto ow : outputWorkers_) {
450  ow->incrementInputFileNumber();
451  }
452  }
453 
454  bool
456  {
457  if (outputWorkers_.empty()) {
458  return false;
459  }
460  bool all_at_limit = true;
461  for (auto w : outputWorkers_) {
462  if (!w->limitReached()) {
463  all_at_limit = false;
464  break;
465  }
466  }
467  if (all_at_limit) {
468  mf::LogInfo("SuccessfulTermination")
469  << "The job is terminating successfully because each output module\n"
470  << "has reached its configured limit.\n";
471  }
472  return all_at_limit;
473  }
474 
475 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
EventID const & eventID() const
Definition: Principal.cc:1064
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
bool outputsToClose() const
bool allAtLimit() const
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenOutputFile
void seedRunRangeSet(RangeSetHandler const &)
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
#define TDEBUG_END_TASK_SI(LEVEL, SI)
bool skip_non_replicated(Worker const &)
PathsDoneTask(EndPathExecutor *const endPathExec, WaitingTaskPtr const finalizeEventTask, GlobalTaskGroup &taskGroup)
error
Definition: include.cc:26
STL namespace.
void writeSubRun(SubRunPrincipal &srp)
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void selectProducts(ProductTables const &)
void operator()(exception_ptr const ex)
bool isLastInSubRun() const
Definition: Principal.cc:924
void recordOutputClosureRequests(Granularity)
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreWriteEvent
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
OutputFileStatus
const double e
Transition
Definition: Transition.h:7
void setOutputFileStatus(OutputFileStatus)
def move(depos, offset)
Definition: depos.py:107
std::atomic< OutputFileStatus > fileStatus_
std::set< OutputWorker * > outputWorkersToClose_
void seedSubRunRangeSet(RangeSetHandler const &)
bool someOutputsOpen() const
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
string tmp
Definition: languages.py:63
static auto end_path_spec()
Definition: PathContext.h:20
void writeRun(RunPrincipal &rp)
void beginJob(detail::SharedResources const &resources)
void reset_for_event()
Definition: PathsInfo.cc:82
bool outputsToOpen() const
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
std::set< OutputWorker * > outputWorkersToOpen_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void respondToCloseInputFile(FileBlock const &fb)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
void writeEvent(EventPrincipal &)
void incrementPassedEventCount()
Definition: PathsInfo.cc:101
ActionTable const & actionTable_
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
RangeSetHandler * clone() const
void process(Transition, Principal &)
GlobalTaskGroup & taskGroup_
ActivityRegistry const & actReg_
void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void incrementTotalEventCount()
Definition: PathsInfo.cc:95
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostWriteEvent