TriggerPathsExecutor.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
15 #include "cetlib/trim.h"
16 #include "hep_concurrency/WaitingTask.h"
18 
19 #include <cassert>
20 #include <utility>
21 
22 using namespace hep::concurrency;
23 using namespace std;
24 
25 namespace art {
26 
// Constructor: binds this executor to one schedule.
//  - scheduleID: used to initialize sc_ and to select this schedule's
//    trigger-path information from the PathManager.
//  - pm: queried once, via pm.triggerPathsInfo(scheduleID).
//  - actions: not referenced in the lines visible here (see note below).
//  - triggerResultsInserter: ownership is moved into results_inserter_;
//    the rest of this file treats it as optional (null-checked before use).
//  - group: stored in taskGroup_ and used later to launch waiting tasks.
27  TriggerPathsExecutor::TriggerPathsExecutor(
28  ScheduleID const scheduleID,
29  PathManager& pm,
30  ActionTable const& actions,
31  std::unique_ptr<Worker> triggerResultsInserter,
32  GlobalTaskGroup& group)
33  : sc_{scheduleID}
// NOTE(review): the listing's own numbering skips line 34 here; presumably
// that line initializes actionTable_ from `actions` -- confirm against the
// real source before editing this initializer list.
35  , triggerPathsInfo_{pm.triggerPathsInfo(scheduleID)}
36  , results_inserter_{std::move(triggerResultsInserter)}
37  , taskGroup_{group}
38  {
// Debug trace that prints the address of this object in hexadecimal.
39  TDEBUG_FUNC_SI(5, scheduleID) << hex << this << dec;
40  }
41 
// Begin-job hook: forwards beginJob(resources) to every worker held by
// triggerPathsInfo_, then to results_inserter_ when one is present.
// NOTE(review): the signature line (listing line 43) is missing from this
// capture; presumably this is TriggerPathsExecutor::beginJob taking a
// `resources` argument -- confirm against the real source.
42  void
44  {
45  for (auto const& val : triggerPathsInfo_.workers()) {
46  auto& w = *val.second;
// NOTE(review): listing line 47 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
48  continue;
49  }
50  w.beginJob(resources);
51  }
// The results inserter may be null, hence the check.
52  if (results_inserter_) {
53  results_inserter_->beginJob(resources);
54  }
55  }
56 
// End-job hook: calls endJob() on every worker held by triggerPathsInfo_,
// then on results_inserter_ when one is present. Any exception thrown by an
// endJob() call is caught, a description is appended to `error`, and `error`
// is thrown in its place, so the first failure ends this function.
// NOTE(review): the signature line (listing line 58) and the declaration of
// `error` (listing line 60) are missing from this capture; the type of
// `error` cannot be determined from the visible lines -- confirm against the
// real source.
57  void
59  {
61  for (auto& val : triggerPathsInfo_.workers()) {
62  auto& w = *val.second;
// NOTE(review): listing line 63 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
64  continue;
65  }
66  // FIXME: The catch and rethrow here seems to have little value added.
67  try {
68  w.endJob();
69  }
// Handlers run from most to least specific: cet::exception, then the
// `exception` type visible under this file's using-directives, then
// anything else.
70  catch (cet::exception& e) {
71  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
72  << e.explain_self();
73  throw error;
74  }
75  catch (exception& e) {
76  error << "Standard library exception caught in "
77  "TriggerPathsExecutor::endJob\n"
78  << e.what();
79  throw error;
80  }
81  catch (...) {
82  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
83  throw error;
84  }
85  }
// Same translate-and-rethrow handling for the optional results inserter.
86  if (results_inserter_) {
87  // FIXME: The catch and rethrow here seems to have little value added.
88  try {
89  results_inserter_->endJob();
90  }
91  catch (cet::exception& e) {
92  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
93  << e.explain_self();
94  throw error;
95  }
96  catch (exception& e) {
97  error << "Standard library exception caught in "
98  "TriggerPathsExecutor::endJob\n"
99  << e.what();
100  throw error;
101  }
102  catch (...) {
103  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
104  throw error;
105  }
106  }
107  }
108 
// Forwards respondToOpenInputFile(fb) to every worker held by
// triggerPathsInfo_, then to results_inserter_ when one is present.
// NOTE(review): the signature line (listing line 110) is missing from this
// capture; presumably TriggerPathsExecutor::respondToOpenInputFile with a
// FileBlock const& parameter named `fb` -- confirm against the real source.
109  void
111  {
112  for (auto const& val : triggerPathsInfo_.workers()) {
113  auto& w = *val.second;
// NOTE(review): listing line 114 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
115  continue;
116  }
117  w.respondToOpenInputFile(fb);
118  }
// The results inserter may be null, hence the check.
119  if (results_inserter_) {
120  results_inserter_->respondToOpenInputFile(fb);
121  }
122  }
123 
// Forwards respondToCloseInputFile(fb) to every worker held by
// triggerPathsInfo_, then to results_inserter_ when one is present.
// NOTE(review): the signature line (listing line 125) is missing from this
// capture; presumably TriggerPathsExecutor::respondToCloseInputFile with a
// FileBlock const& parameter named `fb` -- confirm against the real source.
124  void
126  {
127  for (auto const& val : triggerPathsInfo_.workers()) {
128  auto& w = *val.second;
// NOTE(review): listing line 129 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
130  continue;
131  }
132  w.respondToCloseInputFile(fb);
133  }
// The results inserter may be null, hence the check.
134  if (results_inserter_) {
135  results_inserter_->respondToCloseInputFile(fb);
136  }
137  }
138 
// Forwards respondToOpenOutputFiles(fb) to every worker held by
// triggerPathsInfo_, then to results_inserter_ when one is present.
// NOTE(review): the signature line (listing line 140) is missing from this
// capture; presumably TriggerPathsExecutor::respondToOpenOutputFiles with a
// FileBlock const& parameter named `fb` -- confirm against the real source.
139  void
141  {
142  for (auto const& val : triggerPathsInfo_.workers()) {
143  auto& w = *val.second;
// NOTE(review): listing line 144 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
145  continue;
146  }
147  w.respondToOpenOutputFiles(fb);
148  }
// The results inserter may be null, hence the check.
149  if (results_inserter_) {
150  results_inserter_->respondToOpenOutputFiles(fb);
151  }
152  }
153 
// Forwards respondToCloseOutputFiles(fb) to every worker held by
// triggerPathsInfo_, then to results_inserter_ when one is present.
// NOTE(review): the signature line (listing line 155) is missing from this
// capture; presumably TriggerPathsExecutor::respondToCloseOutputFiles with a
// FileBlock const& parameter named `fb` -- confirm against the real source.
154  void
156  {
157  for (auto const& val : triggerPathsInfo_.workers()) {
158  auto& w = *val.second;
// NOTE(review): listing line 159 (the condition guarding this `continue`) is
// missing; presumably a skip_non_replicated(w) check -- confirm.
160  continue;
161  }
162  w.respondToCloseOutputFiles(fb);
163  }
// The results inserter may be null, hence the check.
164  if (results_inserter_) {
165  results_inserter_->respondToCloseOutputFiles(fb);
166  }
167  }
168 
// Runs path.process(trans, principal) on every path in
// triggerPathsInfo_.paths(), in container order.
// NOTE(review): the signature line (listing line 170) and listing line 172
// are missing from this capture. Presumably this is
// TriggerPathsExecutor::process(Transition trans, Principal& principal) and
// the missing body line does per-call setup -- confirm against the real
// source before relying on this description.
169  void
171  {
173  for (auto& path : triggerPathsInfo_.paths()) {
174  path.process(trans, principal);
175  }
176  }
177 
// Task functor invoked once trigger-path processing for an event is over.
// It either forwards a received exception to the end-path task, or calls
// process_event_paths_done() on the owning executor and then releases the
// end-path task.
// NOTE(review): the class header (listing line 178), the constructor's name
// line (listing line 180) and some member declarations (listing lines 216,
// 218, 219) are missing from this capture. Presumably the class is
// TriggerPathsExecutor::PathsDoneTask and the missing members are schedule_,
// principal_ and taskGroup_ -- confirm against the real source.
179  public:
// Constructor parameters are stored as-is in the matching members.
181  WaitingTaskPtr const endPathTask,
182  EventPrincipal& principal,
183  GlobalTaskGroup& group)
184  : schedule_{schedule}
185  , endPathTask_{endPathTask}
186  , principal_{principal}
187  , taskGroup_{group}
188  {}
189 
// Task body. `ex` is non-null when the work this task waited on failed.
190  void
191  operator()(exception_ptr const ex)
192  {
193  auto const scheduleID = schedule_->sc_.id();
194 
195  TDEBUG_BEGIN_TASK_SI(4, scheduleID);
// Failure upstream: hand the exception to the end-path task and stop;
// process_event_paths_done() is not called in this case.
196  if (ex) {
197  taskGroup_.may_run(endPathTask_, ex);
198  TDEBUG_END_TASK_SI(4, scheduleID)
199  << "trigger path processing terminate because of EXCEPTION";
200  return;
201  }
202 
// Normal completion: finish the event on the executor, then let the end-path
// task run. An exception from either call is passed to the end-path task
// through may_run() instead of propagating out of this functor.
203  try {
204  schedule_->process_event_paths_done(principal_);
205  taskGroup_.may_run(endPathTask_);
206  }
207  catch (...) {
208  taskGroup_.may_run(endPathTask_, current_exception());
209  };
210 
211  // Start the endPathTask going.
212  TDEBUG_END_TASK_SI(4, scheduleID);
213  }
214 
215  private:
217  WaitingTaskPtr const endPathTask_;
220  };
221 
// Starts trigger-path processing for one event.
//  - endPathTask: task to release once all trigger paths are done; it also
//    receives any exception thrown while setting the paths going.
//  - event_principal: the event handed to every path and to PathsDoneTask.
// Exceptions raised inside the try block do not leave this function; they
// are handed to endPathTask via taskGroup_.may_run().
222  void
223  TriggerPathsExecutor::process_event(WaitingTaskPtr endPathTask,
224  EventPrincipal& event_principal)
225  {
226  // We get here as part of the readAndProcessEventTask (schedule
227  // head task).
228  auto const scheduleID = sc_.id();
229  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
// The results inserter may be null; reset it for the new event when present.
230  if (results_inserter_) {
231  results_inserter_->reset();
232  }
// NOTE(review): listing lines 233-234 are missing from this capture;
// presumably they reset triggerPathsInfo_ for the event and bump its total
// event count -- confirm against the real source.
235  try {
// No trigger paths configured: run the PathsDoneTask straight away.
236  if (triggerPathsInfo_.paths().empty()) {
237  auto pathsDoneTask = make_waiting_task<PathsDoneTask>(
238  this, endPathTask, event_principal, taskGroup_);
239  taskGroup_.may_run(pathsDoneTask);
240  TDEBUG_END_FUNC_SI(4, scheduleID);
241  return;
242  }
// Otherwise build one WaitingTask, passing the number of paths as its second
// constructor argument, and hand the same task to every path.
243  auto pathsDoneTask = std::make_shared<WaitingTask>(
244  PathsDoneTask{this, endPathTask, event_principal, taskGroup_},
245  triggerPathsInfo_.paths().size());
246  for (auto& path : triggerPathsInfo_.paths()) {
247  // Start each path running. The path will start a spawn chain
248  // going to run each worker in the order specified on the
249  // path, and when they have all been run, it will call
250  // doneWaiting() on the pathsDoneTask, which decrements its
251  // reference count, which will eventually cause it to run when
252  // every path has finished.
253  path.process(pathsDoneTask, event_principal);
254  }
255  TDEBUG_END_FUNC_SI(4, scheduleID);
256  }
// Any failure while launching the paths is forwarded to the end-path task.
257  catch (...) {
258  taskGroup_.may_run(endPathTask, current_exception());
259  TDEBUG_END_FUNC_SI(4, scheduleID) << "because of EXCEPTION";
260  }
261  }
262 
// Final per-event step, run after all trigger paths have finished: when a
// results inserter is present, it is run on the event with a freshly built
// PathContext/ModuleContext. A cet::exception whose configured action is
// SkipEvent is logged as a warning and swallowed; any other cet::exception
// is rethrown to the caller.
// NOTE(review): the signature line (listing line 264) is missing from this
// capture; presumably
// TriggerPathsExecutor::process_event_paths_done(EventPrincipal& principal)
// -- confirm against the real source.
263  void
265  {
266  // We come here as part of the pathsDoneTask.
267  auto const scheduleID = sc_.id();
268  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
269  try {
// NOTE(review): listing lines 270-271 are missing; the closing brace below
// presumably ends an `if` that increments the passed-event count when the
// path results are accepted -- confirm.
272  }
273  if (results_inserter_) {
274  // FIXME: not sure what the trigger bit should be
275  auto const& resultsInserterDesc = results_inserter_->description();
// NOTE(review): listing line 277 (the middle PathContext argument) is
// missing from this capture -- confirm against the real source.
276  PathContext const pc{sc_,
278  {resultsInserterDesc.moduleLabel()}};
279  ModuleContext const mc{pc, resultsInserterDesc};
280  results_inserter_->doWork_event(principal, mc);
281  }
282  }
283  catch (cet::exception& e) {
// Look up the action configured for this exception's root-cause category.
284  auto action = actionTable_.find(e.root_cause());
// NOTE(review): listing line 285 is missing here; presumably one more assert
// on `action` -- confirm.
286  assert(action != actions::FailPath);
287  assert(action != actions::FailModule);
// Anything other than SkipEvent propagates to the caller.
288  if (action != actions::SkipEvent) {
289  TDEBUG_END_FUNC_SI(4, scheduleID);
290  throw;
291  }
// SkipEvent: report the problem and fall through to the normal exit.
292  mf::LogWarning(e.category())
293  << "An exception occurred inserting the TriggerResults object:\n"
294  << cet::trim_right_copy(e.what(), " \n");
295  }
296  TDEBUG_END_FUNC_SI(4, scheduleID);
297  }
298 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
ScheduleContext const sc_
std::unique_ptr< Worker > results_inserter_
HLTGlobalStatus & pathResults()
Definition: PathsInfo.cc:89
std::vector< Path > & paths()
Definition: PathsInfo.cc:51
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:71
#define TDEBUG_END_TASK_SI(LEVEL, SI)
PathsDoneTask(TriggerPathsExecutor *const schedule, WaitingTaskPtr const endPathTask, EventPrincipal &principal, GlobalTaskGroup &group)
void process(Transition, Principal &)
bool skip_non_replicated(Worker const &)
error
Definition: include.cc:26
STL namespace.
QTextStream & hex(QTextStream &s)
void respondToOpenOutputFiles(FileBlock const &)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:19
ActionTable const & actionTable_
std::string trim_right_copy(std::string source, std::string const &t=" ")
Definition: trim.h:54
const double e
Transition
Definition: Transition.h:7
void respondToOpenInputFile(FileBlock const &)
def move(depos, offset)
Definition: depos.py:107
#define TDEBUG_FUNC_SI(LEVEL, SI)
void process_event_paths_done(EventPrincipal &)
void reset_for_event()
Definition: PathsInfo.cc:82
QTextStream & dec(QTextStream &s)
void respondToCloseInputFile(FileBlock const &)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
void incrementPassedEventCount()
Definition: PathsInfo.cc:101
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
void process_event(hep::concurrency::WaitingTaskPtr endPathTask, EventPrincipal &)
Namespace containing all the test actions.
static auto art_path_spec()
Definition: PathContext.h:32
void reset()
Definition: PathsInfo.cc:74
void respondToCloseOutputFiles(FileBlock const &)
void beginJob(detail::SharedResources const &resources)
void incrementTotalEventCount()
Definition: PathsInfo.cc:95
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33