Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
art::Path Class Reference

#include <Path.h>

Classes

class  WorkerDoneTask
 

Public Member Functions

 Path (ActionTable const &, ActivityRegistry const &, PathContext const &, std::vector< WorkerInPath > &&, HLTGlobalStatus *, GlobalTaskGroup &) noexcept
 
ScheduleID scheduleID () const
 
PathSpec const & pathSpec () const
 
PathID pathID () const
 
std::string const & name () const
 
std::vector< WorkerInPath > const & workersInPath () const
 
hlt::HLTState state () const
 
std::size_t timesRun () const
 
std::size_t timesPassed () const
 
std::size_t timesFailed () const
 
std::size_t timesExcept () const
 
void clearCounters ()
 
void process (Transition, Principal &)
 
void process (hep::concurrency::WaitingTaskPtr pathsDoneTask, EventPrincipal &)
 

Private Member Functions

void runWorkerTask (size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_idx_asynch (size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_idx (size_t const idx, size_t const max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_workerFinished (size_t const idx, size_t const max_idx, EventPrincipal &ep, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_pathFinished (size_t const idx, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
 

Private Attributes

ActionTable const & actionTable_
 
ActivityRegistry const & actReg_
 
PathContext const pc_
 
size_t const pathPosition_
 
std::vector< WorkerInPathworkers_
 
cet::exempt_ptr< HLTGlobalStatustrptr_
 
GlobalTaskGrouptaskGroup_
 
hlt::HLTState state_ {hlt::Ready}
 
std::size_t timesRun_ {}
 
std::size_t timesPassed_ {}
 
std::size_t timesFailed_ {}
 
std::size_t timesExcept_ {}
 

Detailed Description

Definition at line 31 of file Path.h.

Constructor & Destructor Documentation

art::Path::Path ( ActionTable const &  actions,
ActivityRegistry const &  actReg,
PathContext const &  pc,
std::vector< WorkerInPath > &&  workers,
HLTGlobalStatus pathResults,
GlobalTaskGroup taskGroup 
)
noexcept

Definition at line 35 of file Path.cc.

42  , actReg_{actReg}
43  , pc_{pc}
44  , pathPosition_{ServiceHandle<TriggerNamesService>()->index_for(
45  pc_.pathID())}
46  , workers_{move(workers)}
47  , trptr_{pathResults}
48  , taskGroup_{taskGroup}
49  {
50  TDEBUG_FUNC_SI(4, pc_.scheduleID()) << hex << this << dec;
51  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
QTextStream & hex(QTextStream &s)
ActionTable const & actionTable_
Definition: Path.h:82
auto scheduleID() const
Definition: PathContext.h:52
def move(depos, offset)
Definition: depos.py:107
#define TDEBUG_FUNC_SI(LEVEL, SI)
size_t const pathPosition_
Definition: Path.h:85
QTextStream & dec(QTextStream &s)
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::exempt_ptr< HLTGlobalStatus > trptr_
Definition: Path.h:90
std::vector< WorkerInPath > workers_
Definition: Path.h:87
PathID pathID() const noexcept
Definition: PathContext.h:68
Namespace containing all the test actions.
PathContext const pc_
Definition: Path.h:84

Member Function Documentation

void art::Path::clearCounters ( )
string const & art::Path::name ( ) const

Definition at line 72 of file Path.cc.

73  {
74  return pc_.pathName();
75  }
auto const & pathName() const
Definition: PathContext.h:63
PathContext const pc_
Definition: Path.h:84
PathID art::Path::pathID ( ) const

Definition at line 66 of file Path.cc.

67  {
68  return pc_.pathID();
69  }
PathID pathID() const noexcept
Definition: PathContext.h:68
PathContext const pc_
Definition: Path.h:84
PathSpec const & art::Path::pathSpec ( ) const

Definition at line 60 of file Path.cc.

61  {
62  return pc_.pathSpec();
63  }
auto const & pathSpec() const
Definition: PathContext.h:57
PathContext const pc_
Definition: Path.h:84
void art::Path::process ( Transition  trans,
Principal principal 
)

Definition at line 114 of file Path.cc.

115  {
116  // Invoke pre-path signals only for the first schedule.
117  if (pc_.scheduleID() == ScheduleID::first()) {
118  switch (trans) {
120  actReg_.sPrePathBeginRun.invoke(name());
121  break;
122  case Transition::EndRun:
123  actReg_.sPrePathEndRun.invoke(name());
124  break;
126  actReg_.sPrePathBeginSubRun.invoke(name());
127  break;
129  actReg_.sPrePathEndSubRun.invoke(name());
130  break;
131  default: {} // No other pre-path signals supported.
132  }
133  }
134  state_ = hlt::Ready;
135  std::size_t idx = 0;
136  bool all_passed{false};
137  for (WorkerInPath& wip : workers_) {
138  // We do not want to call (e.g.) beginRun once per schedule for
139  // non-replicated modules.
140  if (detail::skip_non_replicated(*wip.getWorker())) {
141  continue;
142  }
143  try {
144  all_passed = wip.run(trans, principal);
145  if (!all_passed)
146  break;
147  }
148  catch (cet::exception& e) {
150  throw art::Exception{
151  errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
152  << "Exception going through path " << name() << "\n";
153  }
154  catch (...) {
155  mf::LogError("PassingThrough")
156  << "Exception passing through path " << name() << "\n";
158  throw;
159  }
160  ++idx;
161  }
162  if (all_passed) {
163  state_ = hlt::Pass;
164  } else {
165  state_ = hlt::Fail;
166  }
167  // Invoke post-path signals only for the last schedule.
168  if (pc_.scheduleID().id() == art::Globals::instance()->nschedules() - 1) {
169  HLTPathStatus const status(state_, idx);
170  switch (trans) {
173  break;
174  case Transition::EndRun:
175  actReg_.sPostPathEndRun.invoke(name(), status);
176  break;
179  break;
182  break;
183  default: {} // No other post-path signals supported.
184  }
185  }
186  }
hlt::HLTState state_
Definition: Path.h:95
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathEndSubRun
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathEndRun
bool skip_non_replicated(Worker const &)
std::string const & name() const
Definition: Path.cc:72
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathEndSubRun
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathBeginSubRun
auto scheduleID() const
Definition: PathContext.h:52
const double e
ScheduleID::size_type nschedules() const
Definition: Globals.cc:24
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathBeginRun
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::vector< WorkerInPath > workers_
Definition: Path.h:87
static Globals * instance()
Definition: Globals.cc:17
PathContext const pc_
Definition: Path.h:84
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathBeginRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathBeginSubRun
void art::Path::process ( hep::concurrency::WaitingTaskPtr  pathsDoneTask,
EventPrincipal  
)
void art::Path::process_event_idx ( size_t const  idx,
size_t const  max_idx,
EventPrincipal ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 333 of file Path.cc.

337  {
338  auto const sid = pc_.scheduleID();
339  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
340  auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
341  this, idx, max_idx, ep, pathsDone, taskGroup_);
342  auto& workerInPath = workers_[idx];
343  workerInPath.run(workerDoneTask, ep);
344  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
345  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
auto scheduleID() const
Definition: PathContext.h:52
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::vector< WorkerInPath > workers_
Definition: Path.h:87
PathContext const pc_
Definition: Path.h:84
void art::Path::process_event_idx_asynch ( size_t  idx,
size_t  max_idx,
EventPrincipal ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 233 of file Path.cc.

237  {
238  auto const sid = pc_.scheduleID();
239  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
240  taskGroup_.run([this, idx, max_idx, &ep, pathsDone] {
241  runWorkerTask(idx, max_idx, ep, pathsDone);
242  });
243  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
244  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
auto scheduleID() const
Definition: PathContext.h:52
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
PathContext const pc_
Definition: Path.h:84
void runWorkerTask(size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:211
void art::Path::process_event_pathFinished ( size_t const  idx,
bool  should_continue,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 375 of file Path.cc.

378  {
379  // We come here as as part of a runWorker task.
380  auto const sid = pc_.scheduleID();
381  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
382  << " should_continue: " << should_continue;
383  if (should_continue) {
384  ++timesPassed_;
385  state_ = hlt::Pass;
386  } else {
387  ++timesFailed_;
388  state_ = hlt::Fail;
389  }
390 
391  auto ex_ptr = std::exception_ptr{};
392  try {
393  HLTPathStatus const status{state_, idx};
394  if (trptr_) {
395  // Not the end path.
396  trptr_->at(pathPosition_) = status;
397  }
399  }
400  catch (...) {
401  ex_ptr = std::current_exception();
402  }
403  taskGroup_.may_run(pathsDone, ex_ptr);
404  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
405  << " should_continue: " << should_continue
406  << (ex_ptr ? " EXCEPTION" : "");
407  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
hlt::HLTState state_
Definition: Path.h:95
auto scheduleID() const
Definition: PathContext.h:52
GlobalSignal< detail::SignalResponseType::LIFO, void(PathContext const &, HLTPathStatus const &)> sPostProcessPath
std::size_t timesFailed_
Definition: Path.h:98
#define TDEBUG_FUNC_SI(LEVEL, SI)
size_t const pathPosition_
Definition: Path.h:85
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::exempt_ptr< HLTGlobalStatus > trptr_
Definition: Path.h:90
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
std::size_t timesPassed_
Definition: Path.h:97
PathContext const pc_
Definition: Path.h:84
void art::Path::process_event_workerFinished ( size_t const  idx,
size_t const  max_idx,
EventPrincipal ep,
bool  should_continue,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 348 of file Path.cc.

353  {
354  auto const sid = pc_.scheduleID();
355  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx
356  << " should_continue: " << should_continue;
357  auto new_idx = idx + 1;
358  // Move on to the next worker.
359  if (should_continue && (new_idx < max_idx)) {
360  // Spawn the next worker.
361  process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
362  // And end this one.
363  TDEBUG_END_FUNC_SI(4, sid)
364  << "new_idx: " << new_idx << " max_idx: " << max_idx;
365  return;
366  }
367 
368  // All done, or filter rejected, or error.
369  process_event_pathFinished(new_idx, should_continue, pathsDone);
370  // And end the path here.
371  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
372  }
auto scheduleID() const
Definition: PathContext.h:52
void process_event_idx_asynch(size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:233
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void process_event_pathFinished(size_t const idx, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:375
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
PathContext const pc_
Definition: Path.h:84
void art::Path::runWorkerTask ( size_t  idx,
size_t  max_idx,
EventPrincipal ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 211 of file Path.cc.

215  {
216  auto const sid = pc_.scheduleID();
217  TDEBUG_BEGIN_TASK_SI(4, sid);
218  try {
219  process_event_idx(idx, max_idx, ep, pathsDone);
220  TDEBUG_END_TASK_SI(4, sid);
221  }
222  catch (...) {
223  taskGroup_.may_run(pathsDone, current_exception());
224  TDEBUG_END_TASK_SI(4, sid) << "path terminate because of EXCEPTION";
225  }
226  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
auto scheduleID() const
Definition: PathContext.h:52
void process_event_idx(size_t const idx, size_t const max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:333
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
PathContext const pc_
Definition: Path.h:84
ScheduleID art::Path::scheduleID ( ) const

Definition at line 54 of file Path.cc.

55  {
56  return pc_.scheduleID();
57  }
auto scheduleID() const
Definition: PathContext.h:52
PathContext const pc_
Definition: Path.h:84
hlt::HLTState art::Path::state ( ) const

Definition at line 102 of file Path.cc.

103  {
104  return state_;
105  }
hlt::HLTState state_
Definition: Path.h:95
size_t art::Path::timesExcept ( ) const

Definition at line 96 of file Path.cc.

97  {
98  return timesExcept_;
99  }
std::size_t timesExcept_
Definition: Path.h:99
size_t art::Path::timesFailed ( ) const

Definition at line 90 of file Path.cc.

91  {
92  return timesFailed_;
93  }
std::size_t timesFailed_
Definition: Path.h:98
size_t art::Path::timesPassed ( ) const

Definition at line 84 of file Path.cc.

85  {
86  return timesPassed_;
87  }
std::size_t timesPassed_
Definition: Path.h:97
size_t art::Path::timesRun ( ) const

Definition at line 78 of file Path.cc.

79  {
80  return timesRun_;
81  }
std::size_t timesRun_
Definition: Path.h:96
vector< WorkerInPath > const & art::Path::workersInPath ( ) const

Definition at line 108 of file Path.cc.

109  {
110  return workers_;
111  }
std::vector< WorkerInPath > workers_
Definition: Path.h:87

Member Data Documentation

ActionTable const& art::Path::actionTable_
private

Definition at line 82 of file Path.h.

ActivityRegistry const& art::Path::actReg_
private

Definition at line 83 of file Path.h.

size_t const art::Path::pathPosition_
private

Definition at line 85 of file Path.h.

PathContext const art::Path::pc_
private

Definition at line 84 of file Path.h.

hlt::HLTState art::Path::state_ {hlt::Ready}
private

Definition at line 95 of file Path.h.

GlobalTaskGroup& art::Path::taskGroup_
private

Definition at line 92 of file Path.h.

std::size_t art::Path::timesExcept_ {}
private

Definition at line 99 of file Path.h.

std::size_t art::Path::timesFailed_ {}
private

Definition at line 98 of file Path.h.

std::size_t art::Path::timesPassed_ {}
private

Definition at line 97 of file Path.h.

std::size_t art::Path::timesRun_ {}
private

Definition at line 96 of file Path.h.

cet::exempt_ptr<HLTGlobalStatus> art::Path::trptr_
private

Definition at line 90 of file Path.h.

std::vector<WorkerInPath> art::Path::workers_
private

Definition at line 87 of file Path.h.


The documentation for this class was generated from the following files: