Path.cc
#include "art/Framework/Core/Path.h"
// vim: set sw=2 expandtab :

#include "art/Framework/Core/detail/skip_non_replicated.h"
#include "art/Framework/Principal/Actions.h"
#include "art/Framework/Services/Registry/ServiceHandle.h"
#include "art/Framework/Services/System/TriggerNamesService.h"
#include "art/Utilities/Globals.h"
#include "art/Utilities/TaskDebugMacros.h"
#include "hep_concurrency/WaitingTask.h"
#include "messagefacility/MessageLogger/MessageLogger.h"

#include <cassert>
#include <cstddef>
#include <exception>
#include <string>
#include <vector>

using namespace cet;
using namespace fhicl;
using namespace hep::concurrency;
using namespace std;

namespace art {

  Path::Path(ActionTable const& actions,
             ActivityRegistry const& actReg,
             PathContext const& pc,
             vector<WorkerInPath>&& workers,
             HLTGlobalStatus* pathResults,
             GlobalTaskGroup& taskGroup) noexcept
    : actionTable_{actions}
    , actReg_{actReg}
    , pc_{pc}
    , pathPosition_{ServiceHandle<TriggerNamesService>()->index_for(
        pc_.pathID())}
    , workers_{move(workers)}
    , trptr_{pathResults}
    , taskGroup_{taskGroup}
  {
    TDEBUG_FUNC_SI(4, pc_.scheduleID()) << hex << this << dec;
  }

  ScheduleID
  Path::scheduleID() const
  {
    return pc_.scheduleID();
  }

  PathSpec const&
  Path::pathSpec() const
  {
    return pc_.pathSpec();
  }

  PathID
  Path::pathID() const
  {
    return pc_.pathID();
  }

  string const&
  Path::name() const
  {
    return pc_.pathName();
  }

  size_t
  Path::timesRun() const
  {
    return timesRun_;
  }

  size_t
  Path::timesPassed() const
  {
    return timesPassed_;
  }

  size_t
  Path::timesFailed() const
  {
    return timesFailed_;
  }

  size_t
  Path::timesExcept() const
  {
    return timesExcept_;
  }

  hlt::HLTState
  Path::state() const
  {
    return state_;
  }

  vector<WorkerInPath> const&
  Path::workersInPath() const
  {
    return workers_;
  }

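  // Synchronous processing of a Run or SubRun transition: run each
  // worker on the path in order on the calling thread, stopping at the
  // first worker that does not pass, and record the result in the
  // path's state.  Pre-path signals are emitted only for the first
  // schedule and post-path signals only for the last one.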
  void
  Path::process(Transition const trans, Principal& principal)
  {
    // Invoke pre-path signals only for the first schedule.
    if (pc_.scheduleID() == ScheduleID::first()) {
      switch (trans) {
        case Transition::BeginRun:
          actReg_.sPrePathBeginRun.invoke(name());
          break;
        case Transition::EndRun:
          actReg_.sPrePathEndRun.invoke(name());
          break;
        case Transition::BeginSubRun:
          actReg_.sPrePathBeginSubRun.invoke(name());
          break;
        case Transition::EndSubRun:
          actReg_.sPrePathEndSubRun.invoke(name());
          break;
        default: {} // No other pre-path signals supported.
      }
    }
    state_ = hlt::Ready;
    std::size_t idx = 0;
    bool all_passed{false};
    for (WorkerInPath& wip : workers_) {
      // We do not want to call (e.g.) beginRun once per schedule for
      // non-replicated modules.
      if (detail::skip_non_replicated(*wip.getWorker())) {
        continue;
      }
      try {
        all_passed = wip.run(trans, principal);
        if (!all_passed)
          break;
      }
      catch (cet::exception& e) {
        state_ = hlt::Exception;
        throw art::Exception{
          errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
          << "Exception going through path " << name() << "\n";
      }
      catch (...) {
        mf::LogError("PassingThrough")
          << "Exception passing through path " << name() << "\n";
        state_ = hlt::Exception;
        throw;
      }
      ++idx;
    }
    if (all_passed) {
      state_ = hlt::Pass;
    } else {
      state_ = hlt::Fail;
    }
    // Invoke post-path signals only for the last schedule.
    if (pc_.scheduleID().id() == art::Globals::instance()->nschedules() - 1) {
      HLTPathStatus const status(state_, idx);
      switch (trans) {
        case Transition::BeginRun:
          actReg_.sPostPathBeginRun.invoke(name(), status);
          break;
        case Transition::EndRun:
          actReg_.sPostPathEndRun.invoke(name(), status);
          break;
        case Transition::BeginSubRun:
          actReg_.sPostPathBeginSubRun.invoke(name(), status);
          break;
        case Transition::EndSubRun:
          actReg_.sPostPathEndSubRun.invoke(name(), status);
          break;
        default: {} // No other post-path signals supported.
      }
    }
  }

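  // Asynchronous, per-event processing: fire the pre-process-path
  // signal, then start the spawn chain that runs this path's workers
  // one at a time.  Completion (or failure) is reported by notifying
  // pathsDoneTask.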
  void
  Path::process(WaitingTaskPtr pathsDoneTask, EventPrincipal& ep)
  {
    // We come here as part of the readAndProcessEvent task (schedule
    // head task), or as part of the endPath task.
    auto const sid = pc_.scheduleID();
    TDEBUG_BEGIN_FUNC_SI(4, sid);
    TDEBUG_FUNC_SI(6, sid) << hex << this << dec << " Resetting waitingTasks_";

    // Make sure the list is not auto-spawning tasks.
    actReg_.sPreProcessPath.invoke(pc_);
    ++timesRun_;
    state_ = hlt::Ready;
    size_t idx = 0;
    auto max_idx = workers_.size();
    // Start the task spawn chain going with the first worker on the
    // path.  Each worker will spawn the next worker in order, until
    // all the workers have run.
    process_event_idx_asynch(idx, max_idx, ep, pathsDoneTask);
    TDEBUG_END_FUNC_SI(4, sid);
  }

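  // Body of the task spawned for each worker in the chain.  Any
  // exception escaping process_event_idx() is forwarded to the
  // pathsDone task rather than propagated to the task scheduler.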
  void
  Path::runWorkerTask(size_t const idx,
                      size_t const max_idx,
                      EventPrincipal& ep,
                      WaitingTaskPtr pathsDone)
  {
    auto const sid = pc_.scheduleID();
    TDEBUG_BEGIN_TASK_SI(4, sid);
    try {
      process_event_idx(idx, max_idx, ep, pathsDone);
      TDEBUG_END_TASK_SI(4, sid);
    }
    catch (...) {
      taskGroup_.may_run(pathsDone, current_exception());
      TDEBUG_END_TASK_SI(4, sid) << "path terminate because of EXCEPTION";
    }
  }

  // This function is a spawn-chain system used to run the workers one
  // at a time, in the order specified on the path, and then to notify
  // the pathsDone task when finished (which causes it to run if we are
  // the last path to finish running its workers).
  void
  Path::process_event_idx_asynch(size_t const idx,
                                 size_t const max_idx,
                                 EventPrincipal& ep,
                                 WaitingTaskPtr pathsDone)
  {
    auto const sid = pc_.scheduleID();
    TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
    taskGroup_.run([this, idx, max_idx, &ep, pathsDone] {
      runWorkerTask(idx, max_idx, ep, pathsDone);
    });
    TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
  }

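  // Continuation invoked (via the task group) when a worker on the
  // path has finished processing an event.  If the worker ended with
  // an exception, the action table decides whether the path is simply
  // marked as failing or the exception is forwarded to the pathsDone
  // task; otherwise the spawn chain advances to the next worker.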
  class Path::WorkerDoneTask {
  public:
    WorkerDoneTask(Path* path,
                   size_t const idx,
                   size_t const max_idx,
                   EventPrincipal& ep,
                   WaitingTaskPtr pathsDone,
                   GlobalTaskGroup& group)
      : path_{path}
      , idx_{idx}
      , max_idx_{max_idx}
      , ep_{ep}
      , pathsDone_{pathsDone}
      , group_{group}
    {}

    void
    operator()(exception_ptr ex)
    {
      auto const sid = path_->pc_.scheduleID();
      TDEBUG_BEGIN_TASK_SI(4, sid);
      auto& workerInPath = path_->workers_[idx_];
      // Note: This will only be set false by a filter which has rejected.
      bool new_should_continue = workerInPath.returnCode();
      TDEBUG_TASK_SI(4, sid) << "new_should_continue: " << new_should_continue;
      if (ex) {
        try {
          rethrow_exception(ex);
        }
        catch (cet::exception& e) {
          auto action = path_->actionTable_.find(e.root_cause());
          assert(action != actions::FailModule);
          if (action != actions::FailPath) {
            // Possible actions: IgnoreCompletely, Rethrow, SkipEvent
            ++path_->timesExcept_;
            path_->state_ = hlt::Exception;
            if (path_->trptr_) {
              // Not the end path.
              path_->trptr_->at(path_->pathPosition_) =
                HLTPathStatus(path_->state_, idx_);
            }
            auto art_ex =
              art::Exception{
                errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
              << "Exception going through path " << path_->name() << "\n";
            auto ex_ptr = make_exception_ptr(art_ex);
            group_.may_run(pathsDone_, ex_ptr);
            TDEBUG_END_TASK_SI(4, sid) << "terminate path because of EXCEPTION";
            return;
          }
          new_should_continue = false;
          mf::LogWarning(e.category()) << "Failing path " << path_->name()
                                       << ", due to exception, message:\n"
                                       << e.what() << "\n";
          // WARNING: We continue processing below!!!
        }
        catch (...) {
          mf::LogError("PassingThrough")
            << "Exception passing through path " << path_->name() << "\n";
          ++path_->timesExcept_;
          path_->state_ = hlt::Exception;
          if (path_->trptr_) {
            // Not the end path.
            path_->trptr_->at(path_->pathPosition_) =
              HLTPathStatus(path_->state_, idx_);
          }
          group_.may_run(pathsDone_, current_exception());
          TDEBUG_END_TASK_SI(4, sid) << "terminate path because of EXCEPTION";
          return;
        }
      }

      path_->process_event_workerFinished(
        idx_, max_idx_, ep_, new_should_continue, pathsDone_);
      TDEBUG_END_TASK_SI(4, sid);
    }

  private:
    Path* path_;
    size_t const idx_;
    size_t const max_idx_;
    EventPrincipal& ep_;
    WaitingTaskPtr pathsDone_;
    GlobalTaskGroup& group_;
  };

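  // Each step of the spawn chain creates a WorkerDoneTask continuation
  // for the worker about to run; the worker invokes that continuation
  // when its own (possibly asynchronous) processing completes.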
  // This function is the main body of the Run Worker task.
  void
  Path::process_event_idx(size_t const idx,
                          size_t const max_idx,
                          EventPrincipal& ep,
                          WaitingTaskPtr pathsDone)
  {
    auto const sid = pc_.scheduleID();
    TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
    auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
      this, idx, max_idx, ep, pathsDone, taskGroup_);
    auto& workerInPath = workers_[idx];
    workerInPath.run(workerDoneTask, ep);
    TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
  }

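  // Called from WorkerDoneTask once a worker has finished: either
  // spawn the task for the next worker on the path, or, if the path is
  // done (all workers ran, a filter rejected, or an error occurred),
  // record the path result.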
  void
  Path::process_event_workerFinished(size_t const idx,
                                     size_t const max_idx,
                                     EventPrincipal& ep,
                                     bool const should_continue,
                                     WaitingTaskPtr pathsDone)
  {
    auto const sid = pc_.scheduleID();
    TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx
                                 << " should_continue: " << should_continue;
    auto new_idx = idx + 1;
    // Move on to the next worker.
    if (should_continue && (new_idx < max_idx)) {
      // Spawn the next worker.
      process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
      // And end this one.
      TDEBUG_END_FUNC_SI(4, sid)
        << "new_idx: " << new_idx << " max_idx: " << max_idx;
      return;
    }

    // All done, or filter rejected, or error.
    process_event_pathFinished(new_idx, should_continue, pathsDone);
    // And end the path here.
    TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
  }

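  // Final step of the path for this event: update the pass/fail
  // counters and state, record the result in the trigger results (for
  // trigger paths), fire the post-process-path signal, and notify the
  // pathsDone task, forwarding any exception raised while doing so.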
  void
  Path::process_event_pathFinished(size_t const idx,
                                   bool const should_continue,
                                   WaitingTaskPtr pathsDone)
  {
    // We come here as part of a runWorker task.
    auto const sid = pc_.scheduleID();
    TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
                           << " should_continue: " << should_continue;
    if (should_continue) {
      ++timesPassed_;
      state_ = hlt::Pass;
    } else {
      ++timesFailed_;
      state_ = hlt::Fail;
    }

    auto ex_ptr = std::exception_ptr{};
    try {
      HLTPathStatus const status{state_, idx};
      if (trptr_) {
        // Not the end path.
        trptr_->at(pathPosition_) = status;
      }
      actReg_.sPostProcessPath.invoke(pc_, status);
    }
    catch (...) {
      ex_ptr = std::current_exception();
    }
    taskGroup_.may_run(pathsDone, ex_ptr);
    TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
                           << " should_continue: " << should_continue
                           << (ex_ptr ? " EXCEPTION" : "");
  }

} // namespace art