Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
art::TriggerPathsExecutor Class Reference

#include <TriggerPathsExecutor.h>

Classes

class  PathsDoneTask
 

Public Member Functions

 TriggerPathsExecutor (ScheduleID, PathManager &, ActionTable const &, std::unique_ptr< Worker > triggerResultsInserter, GlobalTaskGroup &group)
 
 TriggerPathsExecutor (TriggerPathsExecutor const &)=delete
 
 TriggerPathsExecutor (TriggerPathsExecutor &&)=delete
 
TriggerPathsExecutoroperator= (TriggerPathsExecutor const &)=delete
 
TriggerPathsExecutoroperator= (TriggerPathsExecutor &&)=delete
 
void process (Transition, Principal &)
 
void process_event (hep::concurrency::WaitingTaskPtr endPathTask, EventPrincipal &)
 
void beginJob (detail::SharedResources const &resources)
 
void endJob ()
 
void respondToOpenInputFile (FileBlock const &)
 
void respondToCloseInputFile (FileBlock const &)
 
void respondToOpenOutputFiles (FileBlock const &)
 
void respondToCloseOutputFiles (FileBlock const &)
 
void process_event_paths_done (EventPrincipal &)
 

Private Member Functions

bool skipNonReplicated_ (Worker const &)
 

Private Attributes

ScheduleContext const sc_
 
ActionTable const & actionTable_
 
PathsInfotriggerPathsInfo_
 
std::unique_ptr< Workerresults_inserter_
 
GlobalTaskGrouptaskGroup_
 

Detailed Description

Definition at line 42 of file TriggerPathsExecutor.h.

Constructor & Destructor Documentation

art::TriggerPathsExecutor::TriggerPathsExecutor ( ScheduleID  scheduleID,
PathManager pm,
ActionTable const &  actions,
std::unique_ptr< Worker triggerResultsInserter,
GlobalTaskGroup group 
)

Definition at line 27 of file TriggerPathsExecutor.cc.

33  : sc_{scheduleID}
35  , triggerPathsInfo_{pm.triggerPathsInfo(scheduleID)}
36  , results_inserter_{std::move(triggerResultsInserter)}
37  , taskGroup_{group}
38  {
39  TDEBUG_FUNC_SI(5, scheduleID) << hex << this << dec;
40  }
ScheduleContext const sc_
std::unique_ptr< Worker > results_inserter_
QTextStream & hex(QTextStream &s)
ActionTable const & actionTable_
def move(depos, offset)
Definition: depos.py:107
#define TDEBUG_FUNC_SI(LEVEL, SI)
QTextStream & dec(QTextStream &s)
Namespace containing all the test actions.
art::TriggerPathsExecutor::TriggerPathsExecutor ( TriggerPathsExecutor const &  )
delete
art::TriggerPathsExecutor::TriggerPathsExecutor ( TriggerPathsExecutor &&  )
delete

Member Function Documentation

void art::TriggerPathsExecutor::beginJob ( detail::SharedResources const &  resources)

Definition at line 43 of file TriggerPathsExecutor.cc.

44  {
45  for (auto const& val : triggerPathsInfo_.workers()) {
46  auto& w = *val.second;
48  continue;
49  }
50  w.beginJob(resources);
51  }
52  if (results_inserter_) {
53  results_inserter_->beginJob(resources);
54  }
55  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::TriggerPathsExecutor::endJob ( )

Definition at line 58 of file TriggerPathsExecutor.cc.

59  {
61  for (auto& val : triggerPathsInfo_.workers()) {
62  auto& w = *val.second;
64  continue;
65  }
66  // FIXME: The catch and rethrow here seems to have little value added.
67  try {
68  w.endJob();
69  }
70  catch (cet::exception& e) {
71  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
72  << e.explain_self();
73  throw error;
74  }
75  catch (exception& e) {
76  error << "Standard library exception caught in "
77  "TriggerPathsExecutor::endJob\n"
78  << e.what();
79  throw error;
80  }
81  catch (...) {
82  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
83  throw error;
84  }
85  }
86  if (results_inserter_) {
87  // FIXME: The catch and rethrow here seems to have little value added.
88  try {
89  results_inserter_->endJob();
90  }
91  catch (cet::exception& e) {
92  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
93  << e.explain_self();
94  throw error;
95  }
96  catch (exception& e) {
97  error << "Standard library exception caught in "
98  "TriggerPathsExecutor::endJob\n"
99  << e.what();
100  throw error;
101  }
102  catch (...) {
103  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
104  throw error;
105  }
106  }
107  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
error
Definition: include.cc:26
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
const double e
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
TriggerPathsExecutor& art::TriggerPathsExecutor::operator= ( TriggerPathsExecutor const &  )
delete
TriggerPathsExecutor& art::TriggerPathsExecutor::operator= ( TriggerPathsExecutor &&  )
delete
void art::TriggerPathsExecutor::process ( Transition  trans,
Principal principal 
)

Definition at line 170 of file TriggerPathsExecutor.cc.

171  {
173  for (auto& path : triggerPathsInfo_.paths()) {
174  path.process(trans, principal);
175  }
176  }
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
void reset()
Definition: PathsInfo.cc:74
void art::TriggerPathsExecutor::process_event ( hep::concurrency::WaitingTaskPtr  endPathTask,
EventPrincipal  
)

Definition at line 223 of file TriggerPathsExecutor.cc.

225  {
226  // We get here as part of the readAndProcessEventTask (schedule
227  // head task).
228  auto const scheduleID = sc_.id();
229  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
230  if (results_inserter_) {
231  results_inserter_->reset();
232  }
235  try {
236  if (triggerPathsInfo_.paths().empty()) {
237  auto pathsDoneTask = make_waiting_task<PathsDoneTask>(
238  this, endPathTask, event_principal, taskGroup_);
239  taskGroup_.may_run(pathsDoneTask);
240  TDEBUG_END_FUNC_SI(4, scheduleID);
241  return;
242  }
243  auto pathsDoneTask = std::make_shared<WaitingTask>(
244  PathsDoneTask{this, endPathTask, event_principal, taskGroup_},
245  triggerPathsInfo_.paths().size());
246  for (auto& path : triggerPathsInfo_.paths()) {
247  // Start each path running. The path will start a spawn chain
248  // going to run each worker in the order specified on the
249  // path, and when they have all been run, it will call
250  // doneWaiting() on the pathsDoneTask, which decrements its
251  // reference count, which will eventually cause it to run when
252  // every path has finished.
253  path.process(pathsDoneTask, event_principal);
254  }
255  TDEBUG_END_FUNC_SI(4, scheduleID);
256  }
257  catch (...) {
258  taskGroup_.may_run(endPathTask, current_exception());
259  TDEBUG_END_FUNC_SI(4, scheduleID) << "because of EXCEPTION";
260  }
261  }
ScheduleContext const sc_
std::unique_ptr< Worker > results_inserter_
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
void reset_for_event()
Definition: PathsInfo.cc:82
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
void incrementTotalEventCount()
Definition: PathsInfo.cc:95
void art::TriggerPathsExecutor::process_event_paths_done ( EventPrincipal principal)

Definition at line 264 of file TriggerPathsExecutor.cc.

265  {
266  // We come here as part of the pathsDoneTask.
267  auto const scheduleID = sc_.id();
268  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
269  try {
272  }
273  if (results_inserter_) {
274  // FIXME: not sure what the trigger bit should be
275  auto const& resultsInserterDesc = results_inserter_->description();
276  PathContext const pc{sc_,
278  {resultsInserterDesc.moduleLabel()}};
279  ModuleContext const mc{pc, resultsInserterDesc};
280  results_inserter_->doWork_event(principal, mc);
281  }
282  }
283  catch (cet::exception& e) {
284  auto action = actionTable_.find(e.root_cause());
286  assert(action != actions::FailPath);
287  assert(action != actions::FailModule);
288  if (action != actions::SkipEvent) {
289  TDEBUG_END_FUNC_SI(4, scheduleID);
290  throw;
291  }
292  mf::LogWarning(e.category())
293  << "An exception occurred inserting the TriggerResults object:\n"
294  << cet::trim_right_copy(e.what(), " \n");
295  }
296  TDEBUG_END_FUNC_SI(4, scheduleID);
297  }
ScheduleContext const sc_
std::unique_ptr< Worker > results_inserter_
HLTGlobalStatus & pathResults()
Definition: PathsInfo.cc:89
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:71
ActionTable const & actionTable_
std::string trim_right_copy(std::string source, std::string const &t=" ")
Definition: trim.h:54
const double e
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void incrementPassedEventCount()
Definition: PathsInfo.cc:101
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
static auto art_path_spec()
Definition: PathContext.h:32
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::TriggerPathsExecutor::respondToCloseInputFile ( FileBlock const &  fb)

Definition at line 125 of file TriggerPathsExecutor.cc.

126  {
127  for (auto const& val : triggerPathsInfo_.workers()) {
128  auto& w = *val.second;
130  continue;
131  }
132  w.respondToCloseInputFile(fb);
133  }
134  if (results_inserter_) {
135  results_inserter_->respondToCloseInputFile(fb);
136  }
137  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::TriggerPathsExecutor::respondToCloseOutputFiles ( FileBlock const &  fb)

Definition at line 155 of file TriggerPathsExecutor.cc.

156  {
157  for (auto const& val : triggerPathsInfo_.workers()) {
158  auto& w = *val.second;
160  continue;
161  }
162  w.respondToCloseOutputFiles(fb);
163  }
164  if (results_inserter_) {
165  results_inserter_->respondToCloseOutputFiles(fb);
166  }
167  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::TriggerPathsExecutor::respondToOpenInputFile ( FileBlock const &  fb)

Definition at line 110 of file TriggerPathsExecutor.cc.

111  {
112  for (auto const& val : triggerPathsInfo_.workers()) {
113  auto& w = *val.second;
115  continue;
116  }
117  w.respondToOpenInputFile(fb);
118  }
119  if (results_inserter_) {
120  results_inserter_->respondToOpenInputFile(fb);
121  }
122  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
void art::TriggerPathsExecutor::respondToOpenOutputFiles ( FileBlock const &  fb)

Definition at line 140 of file TriggerPathsExecutor.cc.

141  {
142  for (auto const& val : triggerPathsInfo_.workers()) {
143  auto& w = *val.second;
145  continue;
146  }
147  w.respondToOpenOutputFiles(fb);
148  }
149  if (results_inserter_) {
150  results_inserter_->respondToOpenOutputFiles(fb);
151  }
152  }
std::unique_ptr< Worker > results_inserter_
bool skip_non_replicated(Worker const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
bool art::TriggerPathsExecutor::skipNonReplicated_ ( Worker const &  )
private

Member Data Documentation

ActionTable const& art::TriggerPathsExecutor::actionTable_
private

Definition at line 76 of file TriggerPathsExecutor.h.

std::unique_ptr<Worker> art::TriggerPathsExecutor::results_inserter_
private

Definition at line 78 of file TriggerPathsExecutor.h.

ScheduleContext const art::TriggerPathsExecutor::sc_
private

Definition at line 75 of file TriggerPathsExecutor.h.

GlobalTaskGroup& art::TriggerPathsExecutor::taskGroup_
private

Definition at line 79 of file TriggerPathsExecutor.h.

PathsInfo& art::TriggerPathsExecutor::triggerPathsInfo_
private

Definition at line 77 of file TriggerPathsExecutor.h.


The documentation for this class was generated from the following files: