20 #include "hep_concurrency/WaitingTask.h" 29 using namespace fhicl;
48 , taskGroup_{taskGroup}
54 Path::scheduleID()
const 56 return pc_.scheduleID();
60 Path::pathSpec()
const 62 return pc_.pathSpec();
74 return pc_.pathName();
78 Path::timesRun()
const 84 Path::timesPassed()
const 90 Path::timesFailed()
const 96 Path::timesExcept()
const 107 vector<WorkerInPath>
const&
108 Path::workersInPath()
const 117 if (pc_.scheduleID() == ScheduleID::first()) {
119 case Transition::BeginRun:
120 actReg_.sPrePathBeginRun.invoke(
name());
122 case Transition::EndRun:
123 actReg_.sPrePathEndRun.invoke(
name());
125 case Transition::BeginSubRun:
126 actReg_.sPrePathBeginSubRun.invoke(
name());
128 case Transition::EndSubRun:
129 actReg_.sPrePathEndSubRun.invoke(
name());
136 bool all_passed{
false};
144 all_passed = wip.run(trans, principal);
152 <<
"Exception going through path " <<
name() <<
"\n";
156 <<
"Exception passing through path " <<
name() <<
"\n";
171 case Transition::BeginRun:
172 actReg_.sPostPathBeginRun.invoke(
name(), status);
174 case Transition::EndRun:
175 actReg_.sPostPathEndRun.invoke(
name(), status);
177 case Transition::BeginSubRun:
178 actReg_.sPostPathBeginSubRun.invoke(
name(), status);
180 case Transition::EndSubRun:
181 actReg_.sPostPathEndSubRun.invoke(
name(), status);
193 auto const sid = pc_.scheduleID();
198 actReg_.sPreProcessPath.invoke(pc_);
202 auto max_idx = workers_.size();
206 process_event_idx_asynch(idx, max_idx, ep, pathsDoneTask);
211 Path::runWorkerTask(
size_t const idx,
212 size_t const max_idx,
214 WaitingTaskPtr pathsDone)
216 auto const sid = pc_.scheduleID();
219 process_event_idx(idx, max_idx, ep, pathsDone);
223 taskGroup_.may_run(pathsDone, current_exception());
233 Path::process_event_idx_asynch(
size_t const idx,
234 size_t const max_idx,
236 WaitingTaskPtr pathsDone)
238 auto const sid = pc_.scheduleID();
240 taskGroup_.run([
this, idx, max_idx, &ep, pathsDone] {
241 runWorkerTask(idx, max_idx, ep, pathsDone);
250 size_t const max_idx,
252 WaitingTaskPtr pathsDone,
258 , pathsDone_{pathsDone}
264 auto const sid = path_->pc_.scheduleID();
266 auto& workerInPath = path_->workers_[idx_];
268 bool new_should_continue = workerInPath.returnCode();
269 TDEBUG_TASK_SI(4, sid) <<
"new_should_continue: " << new_should_continue;
272 rethrow_exception(ex);
275 auto action = path_->actionTable_.find(e.root_cause());
279 ++path_->timesExcept_;
283 path_->trptr_->at(path_->pathPosition_) =
289 <<
"Exception going through path " << path_->name() <<
"\n";
290 auto ex_ptr = make_exception_ptr(art_ex);
291 group_.may_run(pathsDone_, ex_ptr);
295 new_should_continue =
false;
297 <<
", due to exception, message:\n" 303 <<
"Exception passing through path " << path_->name() <<
"\n";
304 ++path_->timesExcept_;
308 path_->trptr_->at(path_->pathPosition_) =
311 group_.may_run(pathsDone_, current_exception());
317 path_->process_event_workerFinished(
318 idx_, max_idx_, ep_, new_should_continue, pathsDone_);
333 Path::process_event_idx(
size_t const idx,
334 size_t const max_idx,
336 WaitingTaskPtr pathsDone)
338 auto const sid = pc_.scheduleID();
339 TDEBUG_FUNC_SI(4, sid) <<
"idx: " << idx <<
" max_idx: " << max_idx;
340 auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
341 this, idx, max_idx, ep, pathsDone, taskGroup_);
342 auto& workerInPath = workers_[idx];
343 workerInPath.run(workerDoneTask, ep);
344 TDEBUG_FUNC_SI(4, sid) <<
"idx: " << idx <<
" max_idx: " << max_idx;
348 Path::process_event_workerFinished(
size_t const idx,
349 size_t const max_idx,
351 bool const should_continue,
352 WaitingTaskPtr pathsDone)
354 auto const sid = pc_.scheduleID();
356 <<
" should_continue: " << should_continue;
357 auto new_idx = idx + 1;
359 if (should_continue && (new_idx < max_idx)) {
361 process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
364 <<
"new_idx: " << new_idx <<
" max_idx: " << max_idx;
369 process_event_pathFinished(new_idx, should_continue, pathsDone);
375 Path::process_event_pathFinished(
size_t const idx,
376 bool const should_continue,
377 WaitingTaskPtr pathsDone)
380 auto const sid = pc_.scheduleID();
382 <<
" should_continue: " << should_continue;
383 if (should_continue) {
391 auto ex_ptr = std::exception_ptr{};
396 trptr_->at(pathPosition_) =
status;
398 actReg_.sPostProcessPath.invoke(pc_,
status);
401 ex_ptr = std::current_exception();
403 taskGroup_.may_run(pathsDone, ex_ptr);
405 <<
" should_continue: " << should_continue
406 << (ex_ptr ?
" EXCEPTION" :
"");
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
void operator()(exception_ptr ex)
bool skip_non_replicated(Worker const &)
WorkerDoneTask(Path *path, size_t const idx, size_t const max_idx, EventPrincipal &ep, WaitingTaskPtr pathsDone, GlobalTaskGroup &group)
QTextStream & hex(QTextStream &s)
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
#define TDEBUG_TASK_SI(LEVEL, SI)
WaitingTaskPtr pathsDone_
ScheduleID::size_type nschedules() const
#define TDEBUG_FUNC_SI(LEVEL, SI)
QTextStream & dec(QTextStream &s)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Namespace containing all the test actions.
static Globals * instance()
cet::coded_exception< error, detail::translate > exception