Worker.h
Go to the documentation of this file.
1 #ifndef art_Framework_Principal_Worker_h
2 #define art_Framework_Principal_Worker_h
3 // vim: set sw=2 expandtab :
4 
5 // ======================================================================
6 // Worker: this is a basic scheduling unit - an abstract base class to
7 // something that is really a producer or filter.
8 //
9 // A worker will not actually call through to the module unless it is
10 // in a Ready state. After a module is actually run, the state will
11 // not be Ready. The Ready state can only be reestablished by doing a
12 // reset().
13 //
14 // Pre/post module signals are posted only in the Ready state.
15 //
16 // Execution statistics are kept here.
17 //
18 // If a module has thrown an exception during execution, that
19 // exception will be rethrown if the worker is entered again and the
20 // state is not Ready. In other words, execution results (status) are
21 // cached and reused until the worker is reset().
22 // ======================================================================
23 
28 #include "hep_concurrency/WaitingTaskList.h"
29 
30 #include <atomic>
31 #include <exception>
32 #include <string>
33 #include <vector>
34 
35 namespace hep::concurrency {
36  class SerialTaskQueueChain;
37 }
38 
39 namespace art {
40  class ActivityRegistry;
41  class ModuleContext;
42  class FileBlock;
43  class RunPrincipal;
44  class SubRunPrincipal;
45  class EventPrincipal;
46  namespace detail {
47  class SharedResources;
48  }
49 
50  class Worker {
51  friend class RunWorkerFunctor;
52 
53  public:
54  enum State { Ready, Pass, Fail, Working, ExceptionThrown };
55 
56  virtual ~Worker() = default;
57  Worker(ModuleDescription const&, WorkerParams const&);
58 
59  void beginJob(detail::SharedResources const&);
60  void endJob();
61  void respondToOpenInputFile(FileBlock const& fb);
62  void respondToCloseInputFile(FileBlock const& fb);
63  void respondToOpenOutputFiles(FileBlock const& fb);
64  void respondToCloseOutputFiles(FileBlock const& fb);
65  bool doWork(Transition, Principal&, ModuleContext const&);
66 
67  void doWork_event(hep::concurrency::WaitingTaskPtr workerInPathDoneTask,
69  ModuleContext const&);
70 
71  // This is used only to do trigger results insertion.
72  void doWork_event(EventPrincipal&, ModuleContext const&);
73 
75  scheduleID() const
76  {
77  return scheduleID_;
78  }
79  ModuleDescription const& description() const;
80  std::string const& label() const;
81 
82  // Used only by WorkerInPath.
83  bool returnCode() const;
84 
85  hep::concurrency::SerialTaskQueueChain* serialTaskQueueChain() const;
86 
87  // Used by EventProcessor
88  // Used by Schedule
89  // Used by EndPathExecutor
90  void reset();
91 
92  // Used only by writeSummary
93  std::size_t timesVisited() const;
94  std::size_t timesRun() const;
95  std::size_t timesPassed() const;
96  std::size_t timesFailed() const;
97  std::size_t timesExcept() const;
98 
99  void runWorker(EventPrincipal&, ModuleContext const&);
100 
101  protected:
102  virtual std::string workerType() const = 0;
103  virtual hep::concurrency::SerialTaskQueueChain* implSerialTaskQueueChain()
104  const = 0;
105  virtual void implBeginJob(detail::SharedResources const& resources) = 0;
106  virtual void implEndJob() = 0;
107  virtual bool implDoBegin(RunPrincipal& rp, ModuleContext const& mc) = 0;
108  virtual bool implDoEnd(RunPrincipal& rp, ModuleContext const& mc) = 0;
109  virtual bool implDoBegin(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
110  virtual bool implDoEnd(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
111  virtual bool implDoProcess(EventPrincipal&, ModuleContext const&) = 0;
112 
113  private:
114  // API implementation classes must use to provide their API to us
115  virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
116  virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
117  virtual void implRespondToOpenOutputFiles(FileBlock const& fb) = 0;
118  virtual void implRespondToCloseOutputFiles(FileBlock const& fb) = 0;
119 
124  std::atomic<int> state_{Ready};
125 
126  // if state is 'exception'
127  // Note: threading: There is no accessor for this data, the only
128  // way it is ever used is from the doWork* functions. Right now
129  // event processing only sets it, but run and subrun processing
130  // reads it. It is not clear that event processing needs this
131  // anymore, and if we go to multiple runs and subruns in flight,
132  // they may not need it anymore as well. For now, leave this, is
133  // not thread safe.
134  std::exception_ptr cached_exception_{};
135 
136  std::atomic<bool> workStarted_{false};
137  std::atomic<bool> returnCode_{false};
138 
139  // Holds the waiting workerInPathDone tasks. Note: For shared
140  // modules the workers are shared. For replicated modules each
141  // schedule has its own private worker copies (the whole reason
142  // schedules exist!).
143  hep::concurrency::WaitingTaskList waitingTasks_;
144 
145  protected:
146  std::atomic<std::size_t> counts_visited_{};
147  std::atomic<std::size_t> counts_run_{};
148  std::atomic<std::size_t> counts_passed_{};
149  std::atomic<std::size_t> counts_failed_{};
150  std::atomic<std::size_t> counts_thrown_{};
151  };
152 
153 } // namespace art
154 
155 #endif /* art_Framework_Principal_Worker_h */
156 
157 // Local Variables:
158 // mode: c++
159 // End:
std::string string
Definition: nybbler.cc:12
ActionTable const & actions_
Definition: Worker.h:122
QTextStream & reset(QTextStream &s)
Transition
Definition: Transition.h:7
void beginJob()
Definition: Breakpoints.cc:14
ScheduleID scheduleID() const
Definition: Worker.h:75
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
ScheduleID const scheduleID_
Definition: Worker.h:120
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143