EventProcessor.h
Go to the documentation of this file.
1 #ifndef art_Framework_EventProcessor_EventProcessor_h
2 #define art_Framework_EventProcessor_EventProcessor_h
3 // vim: set sw=2 expandtab :
4 
5 // ===========================
6 // The art application object.
7 // ===========================
8 
34 #include "cetlib/cpu_timer.h"
35 #include "fhiclcpp/fwd.h"
36 #include "hep_concurrency/thread_sanitize.h"
37 
38 #include <atomic>
39 #include <memory>
40 
41 namespace art {
42 
44  public:
45  // Status codes:
46  // 0 successful completion
47  // 3 signal received
48  // The specific values are retained for historical reasons.
49  enum StatusCode { epSuccess = 0, epSignal = 3 };
50 
51  // Special Member Functions
52  explicit EventProcessor(fhicl::ParameterSet const& pset,
53  detail::EnabledModules const& enabled_modules);
54  EventProcessor(EventProcessor const&) = delete;
55  EventProcessor(EventProcessor&&) = delete;
56  EventProcessor& operator=(EventProcessor const&) = delete;
58 
59  // API to run_art
60  //
61  // Run the job until done, which means:
62  //
63  // - no more input data, or
64  // - input maxEvents parameter limit reached, or
65  // - output maxEvents parameter limit reached, or
66  // - input maxSubRuns parameter limit reached.
67  //
68  // Return values:
69  //
70  // epSignal: processing terminated early, SIGUSR2 encountered
71  // epSuccess: all other cases
72  //
74 
75  private:
76  class EndPathTask;
77  class EndPathRunnerTask;
78 
79  // Event-loop infrastructure
82  void processEventAsync(ScheduleID sid);
83  void finishEventAsync(ScheduleID sid);
84 
85  template <Level L>
86  bool levelsToProcess();
87  template <Level L>
88  std::enable_if_t<is_above_most_deeply_nested_level(L)> begin();
89  template <Level L>
90  void process();
91  template <Level L>
92  void finalize();
93  template <Level L>
94  void
96  {}
97  template <Level L>
98  void
100  {}
102 
103  // Level-specific member functions
104  void beginJob();
105  void endJob();
106  void endJobAllSchedules();
107  void openInputFile();
108  bool outputsToOpen();
109  void openSomeOutputFiles();
110  void closeInputFile();
111  void closeSomeOutputFiles();
112  void closeAllOutputFiles();
113  void closeAllFiles();
114  void respondToOpenInputFile();
118  void readRun();
119  void beginRun();
122  void endRun();
123  void writeRun();
124  void readSubRun();
125  void beginSubRun();
128  void endSubRun();
129  void writeSubRun();
130  void readEvent();
131  void processEvent();
132  void writeEvent();
135  void terminateAbnormally_();
136 
137  private:
138  template <typename T>
139  using tsan = hep::concurrency::thread_sanitize<T>;
140 
141  template <typename T>
142  using tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr<T>;
143 
144  Schedule&
146  {
147  return schedules_->at(id);
148  }
149 
150  Schedule&
152  {
153  return schedule(ScheduleID::first());
154  }
155 
158  {
159  return scheduler_->actionTable().find(e.root_cause());
160  }
161 
162  // Next containment level to move to.
163  std::atomic<Level> nextLevel_{Level::ReadyToAdvance};
164 
165  // Utility object to run a functor and collect any exceptions thrown.
167 
168  // Used for timing the job.
170 
171  // Used to keep track of whether or not we have already called beginRun.
172  std::atomic<bool> beginRunCalled_{false};
173 
174  // Used to keep track of whether or not we have already called beginSubRun.
175  std::atomic<bool> beginSubRunCalled_{false};
176 
177  // When set allows runs to end.
178  std::atomic<bool> finalizeRunEnabled_{false};
179 
180  // When set allows subruns to end.
181  std::atomic<bool> finalizeSubRunEnabled_{false};
182 
183  // A signal/slot system for registering a callback to be called
184  // when a specific action is taken by the framework.
186 
187  // Used to update various output fields in logged messages.
189 
190  // List of callbacks which, when invoked, can update the state of
191  // any output modules.
192  // FIXME: Used only in the ctor!
194 
195  // Product descriptions for the products that appear in
196  // produces<T>() clauses in modules. Note that this is the master
197  // copy and must be kept alive until producedProductLookupTables_
198  // is destroyed, because the lookup tables hold references into it.
200 
201  // Product lookup tables for the products that appear in
202  // produces<T>() clauses in modules. Note that this also serves as
203  // the master list of produced products and must be kept alive
204  // until no more principals that might use it exist. Also note
205  // that we keep references to the internals of
206  // producedProductDescriptions_.
208 
210 
211  // The entity that manages all configuration data from the
212  // services.scheduler block and (eventually) sets up the TBB task
213  // scheduler.
215 
216  std::unique_ptr<GlobalTaskGroup> taskGroup_{nullptr};
217 
219 
221 
222  // The service subsystem.
224 
225  // Despite the name, this is what parses the paths and modules in
226  // the FHiCL file and creates and owns them.
228 
229  // The source of input data.
231 
232  // The schedule runners.
234 
235  // The currently open primary input file.
237 
238  // The currently active RunPrincipal.
240 
241  // The currently active SubRunPrincipal.
243 
244  // The currently active EventPrincipals.
246 
247  // Are we configured to process empty runs?
248  bool const handleEmptyRuns_;
249 
250  // Are we configured to process empty subruns?
252 
253  // Used to communicate exceptions from worker threads to the main
254  // thread.
256 
257  // Set to true for the first event in a subRun to signal that we
258  // should not advance to the next entry. Note that this flag is
259  // shared by all the schedules. It is only needed because we
260  // cannot peek ahead to see that the next entry is an event; we
261  // must actually advance to it before we can know.
262  std::atomic<bool> firstEvent_{true};
263 
264  // Are we currently switching output files?
265  std::atomic<bool> fileSwitchInProgress_{false};
266  };
267 
268 } // namespace art
269 
270 #endif /* art_Framework_EventProcessor_EventProcessor_h */
271 
272 // Local Variables:
273 // mode: c++
274 // End:
void readAndProcessAsync(ScheduleID sid)
tsan_unique_ptr< InputSource > input_
tsan< ProductDescriptions > producedProductDescriptions_
tsan< UpdateOutputCallbacks > outputCallbacks_
tsan< Scheduler > scheduler_
tsan_unique_ptr< FileBlock > fb_
bool const handleEmptyRuns_
std::atomic< Level > nextLevel_
Schedule & main_schedule()
Level
Definition: Level.h:13
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
std::atomic< bool > firstEvent_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< detail::ExceptionCollector > ec_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
hep::concurrency::thread_sanitize_unique_ptr< T > tsan_unique_ptr
EventProcessor & operator=(EventProcessor const &)=delete
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
std::enable_if_t< is_above_most_deeply_nested_level(L)> begin()
void recordOutputModuleClosureRequests()
tsan< cet::cpu_timer > timer_
OutputFileStatus
const double e
void finalizeContainingLevels()
EventProcessor(fhicl::ParameterSet const &pset, detail::EnabledModules const &enabled_modules)
tsan< std::map< ScheduleID, Schedule > > schedules_
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
std::atomic< bool > finalizeSubRunEnabled_
PerScheduleContainer< std::unique_ptr< EventPrincipal > > eventPrincipals_
actions::ActionCodes error_action(cet::exception &e) const
std::atomic< bool > beginRunCalled_
std::atomic< bool > finalizeRunEnabled_
std::atomic< bool > fileSwitchInProgress_
hep::concurrency::thread_sanitize< T > tsan
tsan_unique_ptr< ServicesManager > servicesManager_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
void finishEventAsync(ScheduleID sid)
static ProductTables invalid()
StatusCode runToCompletion()
SharedException sharedException_
void processEventAsync(ScheduleID sid)
Schedule & schedule(ScheduleID const id)
ActivityRegistry actReg_
void beginSubRunIfNotDoneAlready()
tsan< MFStatusUpdater > mfStatusUpdater_
void setSubRunAuxiliaryRangeSetID()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33