Worker.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
15 #include "cetlib_except/exception.h"
16 #include "hep_concurrency/SerialTaskQueueChain.h"
17 #include "hep_concurrency/WaitingTask.h"
18 #include "hep_concurrency/WaitingTaskList.h"
20 
21 #include <atomic>
22 #include <cassert>
23 #include <exception>
24 #include <memory>
25 #include <sstream>
26 
27 using namespace hep::concurrency;
28 using namespace std;
29 
30 using mf::LogError;
31 
32 namespace {
// Builds the short module identifier "<moduleName>/<moduleLabel>" for md.
// NOTE(review): the return-type line (listing line 33) is absent from this
// rendered listing; the body returns the result of a string concatenation.
34  brief_context(art::ModuleDescription const& md)
35  {
36  return md.moduleName() + "/" + md.moduleLabel();
37  }
38 
// Builds "<moduleName>/<moduleLabel> " followed by the ID of the data level
// the principal belongs to: the run ID, subrun ID or event ID, selected by
// principal.branchType(). Any other branch type appends no ID, so the result
// then ends with the separating space.
// NOTE(review): the return-type line (listing line 39) is absent from this
// rendered listing; the body returns result.str().
40  brief_context(art::ModuleDescription const& md,
41  art::Principal const& principal)
42  {
43  std::ostringstream result;
44  result << brief_context(md) << ' ';
45  auto const bt = principal.branchType();
46  switch (bt) {
47  case art::InRun:
48  result << principal.runID();
49  break;
50  case art::InSubRun:
51  result << principal.subRunID();
52  break;
53  case art::InEvent:
54  result << principal.eventID();
55  break;
// Remaining branch types contribute nothing beyond the module identifier.
56  default: {}
57  }
58  return result.str();
59  }
60 
61  [[noreturn]] void
62  rethrow_with_context(std::exception_ptr eptr,
63  art::ModuleDescription const& md,
64  std::string const& transition)
65  {
66  using namespace art;
67  assert(eptr);
68  std::string const brief_module_context =
69  brief_context(md) + " during " + transition;
70  try {
71  std::rethrow_exception(eptr);
72  }
73  catch (cet::exception& e) {
75  "An exception was thrown while processing module " +
76  brief_module_context,
77  e};
78  }
79  catch (exception& e) {
80  throw Exception{errors::StdException, brief_module_context} << e.what();
81  }
82  catch (string& s) {
84  "A string exception was thrown while processing module " +
85  brief_module_context}
86  << s << '\n';
87  }
88  catch (char const* c) {
89  throw Exception{
91  "A char const* exception was thrown while processing module " +
92  brief_module_context}
93  << c << '\n';
94  }
95  catch (...) {
97  << "An unknown exception was thrown while processing module " +
98  brief_module_context;
99  }
100  }
101 }
102 
103 namespace art {
104 
// Constructs a worker for the module described by md.
//
// md  - module description; stored in md_.
// wp  - construction parameters; supplies the schedule ID, the action table,
//       the activity registry, and the task group used to build the
//       waiting-task list.
//
// After initialising the members, emits a debug trace (TDEBUG_FUNC_SI, level
// 5) containing this worker's address and the module's name and label.
105  Worker::Worker(ModuleDescription const& md, WorkerParams const& wp)
106  : scheduleID_{wp.scheduleID_}
107  , md_{md}
108  , actions_{wp.actions_}
109  , actReg_{wp.actReg_}
110  , waitingTasks_{wp.taskGroup_}
111  {
// The address is streamed in hex; the stream is switched back to decimal
// before the remaining fields are written.
112  TDEBUG_FUNC_SI(5, wp.scheduleID_)
113  << hex << this << dec << " name: " << md.moduleName()
114  << " label: " << md.moduleLabel();
115  }
116 
// Returns a reference to the module description held by this worker (md_).
// NOTE(review): the signature line (listing line 118) is absent from this
// rendered listing; the page's cross-reference index names it
// `description() const`.
117  ModuleDescription const&
119  {
120  return md_;
121  }
122 
// Returns the module label taken from this worker's module description.
// NOTE(review): the signature line (listing line 124) is absent from this
// rendered listing; the page's cross-reference index names it
// `label() const`.
123  string const&
125  {
126  return md_.moduleLabel();
127  }
128 
129  // Used only by WorkerInPath.
// Returns the current value of the atomic returnCode_ flag.
// NOTE(review): the signature line (listing line 131) is absent from this
// rendered listing; the page's cross-reference index names it
// `returnCode() const`.
130  bool
132  {
133  return returnCode_.load();
134  }
135 
// Returns whatever implSerialTaskQueueChain() returns. doWork_event treats a
// null result as "no serialization needed" and runs the worker directly.
// NOTE(review): the signature line (listing line 137) is absent from this
// rendered listing; the page's cross-reference index names it
// `serialTaskQueueChain() const`.
136  SerialTaskQueueChain*
138  {
139  return implSerialTaskQueueChain();
140  }
142  // Used by EventProcessor
143  // Used by Schedule
144  // Used by EndPathExecutor
// Puts the worker back into its initial per-event condition: state_ becomes
// Ready, the cached exception is cleared, the waiting-task list is reset, and
// the workStarted_ / returnCode_ flags are cleared.
// NOTE(review): listing lines 146 and 150 are absent from this rendered
// listing. The cross-reference index names line 146 `reset()`; line 150 is
// presumably the start of the debug-trace statement that line 151 continues
// -- confirm against the real source.
145  void
147  {
148  state_ = Ready;
149  cached_exception_ = std::exception_ptr{};
151  << hex << this << dec << " Resetting waitingTasks_";
152  waitingTasks_.reset();
// Clearing workStarted_ lets the next doWork_event call start the work again.
153  workStarted_ = false;
154  returnCode_ = false;
155  }
156 
157  // Used only by writeSummary
// Returns the current value of the counts_visited_ counter.
// NOTE(review): the signature line (listing line 159) is absent from this
// rendered listing; the page's cross-reference index names it
// `timesVisited() const`.
158  size_t
160  {
161  return counts_visited_.load();
162  }
163 
164  // Used only by writeSummary
// Returns the current value of the counts_run_ counter.
// NOTE(review): the signature line (listing line 166) is absent from this
// rendered listing; the page's cross-reference index names it
// `timesRun() const`.
165  size_t
167  {
168  return counts_run_.load();
169  }
170 
171  // Used only by writeSummary
// Returns the current value of the counts_passed_ counter.
// NOTE(review): the signature line (listing line 173) is absent from this
// rendered listing; the page's cross-reference index names it
// `timesPassed() const`.
172  size_t
174  {
175  return counts_passed_.load();
176  }
177 
178  // Used only by writeSummary
// Returns the current value of the counts_failed_ counter.
// NOTE(review): the signature line (listing line 180) is absent from this
// rendered listing; the page's cross-reference index names it
// `timesFailed() const`.
179  size_t
181  {
182  return counts_failed_.load();
183  }
184 
185  // Used only by writeSummary
// Returns the current value of the counts_thrown_ counter.
// NOTE(review): the signature line (listing line 187) is absent from this
// rendered listing; the page's cross-reference index names it
// `timesExcept() const`.
186  size_t
188  {
189  return counts_thrown_.load();
190  }
191 
192  void
195  implBeginJob(resources);
197  }
198  catch (...) {
199  rethrow_with_context(std::current_exception(), md_, "beginJob");
200  }
201 
202  void
204  actReg_.sPreModuleEndJob.invoke(md_);
205  implEndJob();
206  actReg_.sPostModuleEndJob.invoke(md_);
207  }
208  catch (...) {
209  rethrow_with_context(std::current_exception(), md_, "endJob");
210  }
211 
212  void
214  {
218  }
219 
220  void
222  {
226  }
227 
228  void
230  {
234  }
235 
236  void
238  {
242  }
243 
244  bool
246  Principal& principal,
247  ModuleContext const& mc)
248  {
249  switch (state_.load()) {
250  case Ready:
251  break;
252  case Pass:
253  return true;
254  case Fail:
255  return false;
256  case ExceptionThrown: {
257  // Rethrow the cached exception again. It seems impossible to
258  // get here a second time unless a cet::exception has been
259  // thrown previously.
260  mf::LogWarning("repeat") << "A module has been invoked a second time "
261  "even though it caught an exception during "
262  "the previous invocation.\nThis may be an "
263  "indication of a configuration problem.\n";
264  rethrow_exception(cached_exception_);
265  }
266  case Working:
267  break; // See below.
268  }
269  bool rc = false;
270  try {
271  if (state_.load() == Working) {
272  // Not part of the switch statement above because we want the
273  // exception to be caught by our handling mechanism.
275  << "A Module has been invoked while it is still being executed.\n"
276  << "Product dependencies have invoked a module execution cycle.\n";
277  }
278  state_ = Working;
279  if (trans == Transition::BeginRun) {
280  actReg_.sPreModuleBeginRun.invoke(mc);
281  rc = implDoBegin(dynamic_cast<RunPrincipal&>(principal), mc);
282  actReg_.sPostModuleBeginRun.invoke(mc);
283  } else if (trans == Transition::EndRun) {
284  actReg_.sPreModuleEndRun.invoke(mc);
285  rc = implDoEnd(dynamic_cast<RunPrincipal&>(principal), mc);
286  actReg_.sPostModuleEndRun.invoke(mc);
287  } else if (trans == Transition::BeginSubRun) {
288  actReg_.sPreModuleBeginSubRun.invoke(mc);
289  rc = implDoBegin(dynamic_cast<SubRunPrincipal&>(principal), mc);
290  actReg_.sPostModuleBeginSubRun.invoke(mc);
291  } else if (trans == Transition::EndSubRun) {
292  actReg_.sPreModuleEndSubRun.invoke(mc);
293  rc = implDoEnd(dynamic_cast<SubRunPrincipal&>(principal), mc);
294  actReg_.sPostModuleEndSubRun.invoke(mc);
295  }
296  state_ = Pass;
297  }
298  catch (cet::exception& e) {
300  e << "The above exception was thrown while processing module "
301  << brief_context(md_, principal) << '\n';
302  if (auto edmEx = dynamic_cast<art::Exception*>(&e)) {
303  cached_exception_ = std::make_exception_ptr(*edmEx);
304  } else {
305  auto art_ex = art::Exception{errors::OtherArt, std::string(), e};
306  cached_exception_ = std::make_exception_ptr(art_ex);
307  }
308  throw;
309  }
310  catch (std::bad_alloc const& bda) {
312  auto art_ex =
314  << "A bad_alloc exception occurred during a call to the module "
315  << brief_context(md_, principal) << '\n'
316  << "The job has probably exhausted the virtual memory available to the "
317  "process.\n";
318  cached_exception_ = make_exception_ptr(art_ex);
319  rethrow_exception(cached_exception_);
320  }
321  catch (std::exception const& e) {
323  auto art_ex = Exception{errors::StdException}
324  << "An exception occurred during a call to the module "
325  << brief_context(md_, principal) << e.what();
326  cached_exception_ = make_exception_ptr(art_ex);
327  rethrow_exception(cached_exception_);
328  }
329  catch (std::string const& s) {
331  auto art_ex = Exception{errors::BadExceptionType, "string"}
332  << "A string thrown as an exception occurred during a call "
333  "to the module "
334  << brief_context(md_, principal) << '\n'
335  << s << '\n';
336  cached_exception_ = make_exception_ptr(art_ex);
337  rethrow_exception(cached_exception_);
338  }
339  catch (char const* c) {
341  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
342  << "A char const* thrown as an exception occurred during a "
343  "call to the module "
344  << brief_context(md_, principal) << '\n'
345  << c << '\n';
346  cached_exception_ = make_exception_ptr(art_ex);
347  rethrow_exception(cached_exception_);
348  }
349  catch (...) {
351  auto art_ex =
352  Exception{errors::Unknown, "repeated"}
353  << "An unknown occurred during a previous call to the module "
354  << brief_context(md_, principal) << '\n';
355  cached_exception_ = make_exception_ptr(art_ex);
356  rethrow_exception(cached_exception_);
357  }
358  return rc;
359  }
360 
361  // This is used only to do trigger results insertion.
362  void
364  ++counts_visited_;
365  returnCode_ = false;
366  // Transition from Ready state to Working state.
367  state_ = Working;
368  actReg_.sPreModule.invoke(mc);
369  // Note: Only the TriggerResults inserter--a producer--calls this
370  // function. The return code is thus always true.
371  returnCode_ = implDoProcess(p, mc);
372  actReg_.sPostModule.invoke(mc);
373  assert(returnCode_.load());
374  state_ = Pass;
375  }
376  catch (cet::exception& e) {
377  auto action = actions_.find(e.root_cause());
378  assert(not mc.onEndPath());
380  state_ = Pass;
381  returnCode_ = true;
382  ++counts_passed_;
383  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
384  << e.what() << "\n";
385  // WARNING: We will continue execution below!!!
386  } else if (action == actions::FailModule) {
387  state_ = Fail;
388  returnCode_ = true;
389  ++counts_failed_;
390  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
391  << e.what() << "\n";
392  // WARNING: We will continue execution below!!!
393  } else {
395  ++counts_thrown_;
396  e << "The above exception was thrown while processing module "
397  << brief_context(md_, p) << '\n';
398  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
399  cached_exception_ = make_exception_ptr(*edmEx);
400  } else {
402  make_exception_ptr(Exception{errors::OtherArt, string(), e});
403  }
404  rethrow_exception(cached_exception_);
405  }
406  }
407  catch (bad_alloc const& bda) {
409  ++counts_thrown_;
410  auto art_ex =
412  << "A bad_alloc exception occurred during a call to the module "
413  << brief_context(md_, p) << '\n'
414  << "The job has probably exhausted the virtual memory available to the "
415  "process.\n";
416  cached_exception_ = make_exception_ptr(art_ex);
417  rethrow_exception(cached_exception_);
418  }
419  catch (exception const& e) {
421  ++counts_thrown_;
422  auto art_ex = Exception{errors::StdException}
423  << "An exception occurred during a call to the module "
424  << brief_context(md_, p) << '\n'
425  << e.what();
426  cached_exception_ = make_exception_ptr(art_ex);
427  rethrow_exception(cached_exception_);
428  }
429  catch (string const& s) {
431  ++counts_thrown_;
432  auto art_ex = Exception{errors::BadExceptionType, "string"}
433  << "A string thrown as an exception occurred during a call "
434  "to the module "
435  << brief_context(md_, p) << '\n'
436  << s << '\n';
437  cached_exception_ = make_exception_ptr(art_ex);
438  rethrow_exception(cached_exception_);
439  }
440  catch (char const* c) {
442  ++counts_thrown_;
443  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
444  << "A char const* thrown as an exception occurred during a "
445  "call to the module "
446  << brief_context(md_, p) << '\n'
447  << c << "\n";
448  cached_exception_ = make_exception_ptr(art_ex);
449  rethrow_exception(cached_exception_);
450  }
451  catch (...) {
452  ++counts_thrown_;
454  auto art_ex = Exception{errors::Unknown, "repeated"}
455  << "An unknown occurred during a previous call to the module "
456  << brief_context(md_, p) << '\n';
457  cached_exception_ = make_exception_ptr(art_ex);
458  rethrow_exception(cached_exception_);
459  }
460 
461  void
463  {
464  auto const sid = mc.scheduleID();
465  TDEBUG_BEGIN_TASK_SI(4, sid);
466  returnCode_ = false;
467  try {
468  // Transition from Ready state to Working state.
469  state_ = Working;
470  actReg_.sPreModule.invoke(mc);
471  // Note: Only filters ever return false, and when they do it
472  // means they have rejected.
473  returnCode_ = implDoProcess(p, mc);
474  actReg_.sPostModule.invoke(mc);
475  state_ = Fail;
476  if (returnCode_.load()) {
477  state_ = Pass;
478  }
479  }
480  catch (cet::exception& e) {
481  auto action = actions_.find(e.root_cause());
482  // If we are processing an endPath, treat SkipEvent or FailPath
483  // as FailModule, so any subsequent OutputModules are still run.
484  if (mc.onEndPath()) {
487  }
488  }
490  state_ = Pass;
491  returnCode_ = true;
492  ++counts_passed_;
493  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
494  << e.what() << "\n";
495  // WARNING: We will continue execution below!!!
496  } else if (action == actions::FailModule) {
497  state_ = Fail;
498  returnCode_ = true;
499  ++counts_failed_;
500  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
501  << e.what() << "\n";
502  // WARNING: We will continue execution below!!!
503  } else {
505  ++counts_thrown_;
506  e << "The above exception was thrown while processing module "
507  << brief_context(md_, p);
508  if (auto art_ex = dynamic_cast<Exception*>(&e)) {
509  cached_exception_ = make_exception_ptr(*art_ex);
510  } else {
512  make_exception_ptr(Exception{errors::OtherArt, string(), e});
513  }
514  waitingTasks_.doneWaiting(cached_exception_);
515  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
516  return;
517  }
518  }
519  catch (bad_alloc const& bda) {
521  ++counts_thrown_;
522  auto art_ex =
524  << "A bad_alloc exception was thrown while processing module "
525  << brief_context(md_, p) << '\n'
526  << "The job has probably exhausted the virtual memory available to the "
527  "process.\n";
528  cached_exception_ = make_exception_ptr(art_ex);
529  waitingTasks_.doneWaiting(cached_exception_);
530  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
531  return;
532  }
533  catch (exception const& e) {
535  ++counts_thrown_;
536  auto art_ex = Exception{errors::StdException}
537  << "An exception was thrown while processing module "
538  << brief_context(md_, p) << '\n'
539  << e.what();
540  cached_exception_ = make_exception_ptr(art_ex);
541  waitingTasks_.doneWaiting(cached_exception_);
542  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
543  return;
544  }
545  catch (string const& s) {
547  ++counts_thrown_;
548  auto art_ex =
550  << "A string was thrown as an exception while processing module "
551  << brief_context(md_, p) << '\n'
552  << s << '\n';
553  cached_exception_ = make_exception_ptr(art_ex);
554  waitingTasks_.doneWaiting(cached_exception_);
555  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
556  return;
557  }
558  catch (char const* c) {
560  ++counts_thrown_;
561  auto art_ex =
562  Exception{errors::BadExceptionType, "char const*"}
563  << "A char const* was thrown as an exception while processing module "
564  << brief_context(md_, p) << '\n'
565  << c << "\n";
566  cached_exception_ = make_exception_ptr(art_ex);
567  waitingTasks_.doneWaiting(cached_exception_);
568  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
569  return;
570  }
571  catch (...) {
572  ++counts_thrown_;
574  auto art_ex =
575  Exception{errors::Unknown, "repeated"}
576  << "An unknown exception was thrown while processing module "
577  << brief_context(md_, p) << '\n';
578  cached_exception_ = make_exception_ptr(art_ex);
579  waitingTasks_.doneWaiting(cached_exception_);
580  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
581  return;
582  }
583  waitingTasks_.doneWaiting(exception_ptr{});
584  TDEBUG_END_TASK_SI(4, sid);
585  }
586 
// Requests that this worker process the event held by p, and registers
// workerInPathDoneTask to be notified when the worker is done.
//
// workerInPathDoneTask - added to waitingTasks_ on every call, whether or not
//                        this call is the one that starts the work.
// p                    - event principal forwarded to runWorker.
// mc                   - module context forwarded to runWorker; also supplies
//                        the schedule ID used in the debug traces.
//
// Only the caller that flips workStarted_ from false to true runs the worker.
// That caller either pushes the work onto the module's serial task queue
// chain (when serialTaskQueueChain() returns non-null) or calls runWorker
// directly. Every other caller returns after registering its task.
// counts_visited_ is incremented on every call.
587  void
588  Worker::doWork_event(WaitingTaskPtr workerInPathDoneTask,
589  EventPrincipal& p,
590  ModuleContext const& mc)
591  {
592  auto const sid = mc.scheduleID();
593  TDEBUG_BEGIN_FUNC_SI(4, sid);
594  // Note: We actually can have more than one entry in this list
595  // because a worker may be on more than one path, and if both
596  // paths are running in parallel, then it is possible that they
597  // both attempt to run the same worker at nearly the same time.
598  // We arrange so that the worker itself only runs once, but we do
599  // have to notify all the paths that the worker has finished,
600  // hence the waiting list of notification tasks.
601  //
602  // Note: threading: More than one task can enter here in the case
603  // that paths running in parallel share the same worker.
604  waitingTasks_.add(workerInPathDoneTask);
605  ++counts_visited_;
// Atomic test-and-set: succeeds only for the first caller since the flag was
// last cleared (reset() sets workStarted_ back to false).
606  bool expected = false;
607  if (workStarted_.compare_exchange_strong(expected, true)) {
608  if (auto chain = serialTaskQueueChain()) {
609  // Must be a serialized shared module (including legacy).
610  TDEBUG_FUNC_SI(4, sid) << "pushing onto chain " << hex << chain << dec;
// NOTE(review): the queued lambda captures p and mc by reference and this by
// pointer; all three must outlive the queued task -- confirm with callers.
611  chain->push([&p, &mc, this] { runWorker(p, mc); });
612  TDEBUG_END_FUNC_SI(4, sid);
613  return;
614  }
615  // Must be a replicated or shared module with no serialization.
616  TDEBUG_FUNC_SI(4, sid) << "calling worker functor";
617  runWorker(p, mc);
618  TDEBUG_END_FUNC_SI(4, sid);
619  return;
620  }
621  // Worker is running on another path, exit without running the waiting
622  // worker done tasks.
623  TDEBUG_END_FUNC_SI(4, sid) << "work already in progress on another path";
624  }
625 
626 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
void endJob()
Definition: Worker.cc:203
EventID const & eventID() const
Definition: Principal.cc:1064
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginRun
std::size_t timesExcept() const
Definition: Worker.cc:187
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndRun
virtual void implBeginJob(detail::SharedResources const &resources)=0
RunID const & runID() const
Definition: Principal.cc:1052
static QCString result
void respondToOpenOutputFiles(FileBlock const &fb)
Definition: Worker.cc:229
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleBeginJob
bool onEndPath() const
Definition: ModuleContext.h:53
const char expected[]
Definition: Exception_t.cc:22
void respondToCloseOutputFiles(FileBlock const &fb)
Definition: Worker.cc:237
std::string string
Definition: nybbler.cc:12
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:71
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseOutputFiles
#define TDEBUG_END_TASK_SI(LEVEL, SI)
std::size_t timesPassed() const
Definition: Worker.cc:173
bool doWork(Transition, Principal &, ModuleContext const &)
Definition: Worker.cc:245
std::string const & moduleLabel() const
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseInputFile
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
auto scheduleID() const
Definition: ModuleContext.h:28
std::string const & label() const
Definition: Worker.cc:124
virtual hep::concurrency::SerialTaskQueueChain * implSerialTaskQueueChain() const =0
STL namespace.
QTextStream & hex(QTextStream &s)
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void reset()
Definition: Worker.cc:146
ActionTable const & actions_
Definition: Worker.h:122
std::size_t timesRun() const
Definition: Worker.cc:166
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:146
virtual void implRespondToOpenInputFile(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndSubRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleBeginJob
std::atomic< int > state_
Definition: Worker.h:124
std::size_t timesVisited() const
Definition: Worker.cc:159
virtual void implRespondToCloseOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenInputFile
virtual bool implDoEnd(RunPrincipal &rp, ModuleContext const &mc)=0
std::string const & moduleName() const
const double e
bt
Definition: tracks.py:83
Transition
Definition: Transition.h:7
ActivityRegistry const & actReg_
Definition: Worker.h:123
void runWorker(EventPrincipal &, ModuleContext const &)
Definition: Worker.cc:462
SubRunID subRunID() const
Definition: Principal.cc:1058
ModuleDescription const md_
Definition: Worker.h:121
ScheduleID scheduleID_
Definition: WorkerParams.h:29
virtual bool implDoProcess(EventPrincipal &, ModuleContext const &)=0
p
Definition: test.py:223
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginRun
#define TDEBUG_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenInputFile
ScheduleID const scheduleID_
Definition: Worker.h:120
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleEndJob
QTextStream & dec(QTextStream &s)
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.cc:213
std::atomic< bool > returnCode_
Definition: Worker.h:137
std::atomic< std::size_t > counts_run_
Definition: Worker.h:147
ModuleDescription const & description() const
Definition: Worker.cc:118
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
virtual void implRespondToCloseInputFile(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:150
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:148
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenOutputFiles
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:149
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
virtual void implEndJob()=0
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleEndJob
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.cc:221
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndSubRun
void beginJob(detail::SharedResources const &)
Definition: Worker.cc:193
virtual bool implDoBegin(RunPrincipal &rp, ModuleContext const &mc)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginSubRun
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain() const
Definition: Worker.cc:137
std::exception_ptr cached_exception_
Definition: Worker.h:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginSubRun
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseOutputFiles
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenOutputFiles
void doWork_event(hep::concurrency::WaitingTaskPtr workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
bool returnCode() const
Definition: Worker.cc:131
virtual void implRespondToOpenOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseInputFile
static QCString * s
Definition: config.cpp:1042
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
BranchType branchType() const
Definition: Principal.cc:1022
std::atomic< bool > workStarted_
Definition: Worker.h:136
std::size_t timesFailed() const
Definition: Worker.cc:180