EndPathExecutor.h
Go to the documentation of this file.
1 #ifndef art_Framework_Core_EndPathExecutor_h
2 #define art_Framework_Core_EndPathExecutor_h
3 // vim: set sw=2 expandtab :
4 
5 // =========================================================================
6 // Class to handle the execution of the end path. Invoked in all the
7 // right places by the event processor.
8 //
9 // The RangeSetHandlers manage the RangeSets that are to be assigned
10 // to (a) the (Sub)RunAuxiliaries and (b) the (Sub)Run products
11 // produced in the current process. Since all (Sub)Run
12 // products/auxiliaries produced in the current process are written to
13 // all output modules during write(Sub)Run, there is only one relevant
14 // RangeSet for the (Sub)Run at any given time. RangeSets
15 // corresponding to multiple (Sub)Run fragments are aggregated on
16 // input.
17 // =========================================================================
18 
28 #include "hep_concurrency/WaitingTask.h"
29 
30 #include <atomic>
31 #include <memory>
32 #include <set>
33 #include <vector>
34 
35 namespace art {
36  class ActivityRegistry;
37  class GlobalTaskGroup;
38  namespace detail {
39  class SharedResources;
40  }
41 
43  friend class Schedule;
44 
45  public:
47  PathManager& pm,
48  ActionTable const& actions,
49  ActivityRegistry const& areg,
51  GlobalTaskGroup& task_group);
52 
53  EndPathExecutor(EndPathExecutor&&) = delete;
54  EndPathExecutor& operator=(EndPathExecutor&&) = delete;
55  EndPathExecutor(EndPathExecutor const&) = delete;
56  EndPathExecutor& operator=(EndPathExecutor const&) = delete;
57 
58  void beginJob(detail::SharedResources const& resources);
59  void endJob();
60 
61  // Input File Open/Close.
62  void selectProducts(ProductTables const&);
63  void respondToOpenInputFile(FileBlock const& fb);
64  void respondToCloseInputFile(FileBlock const& fb);
65  void respondToOpenOutputFiles(FileBlock const& fb);
66  void respondToCloseOutputFiles(FileBlock const& fb);
67  bool someOutputsOpen() const;
68  void closeAllOutputFiles();
69 
70  void seedRunRangeSet(RangeSetHandler const&);
71  void setRunAuxiliaryRangeSetID(RangeSet const& rs);
72  void writeRun(RunPrincipal& rp);
73 
74  void seedSubRunRangeSet(RangeSetHandler const&);
75  void setSubRunAuxiliaryRangeSetID(RangeSet const& rs);
76  void writeSubRun(SubRunPrincipal& srp);
77 
78  // Process Run/SubRun
80 
81  // Process Event
82  //
83  // Used to make sure only one event is being processed at a time.
84  // The schedules take turns having their events processed on a
85  // first-come first-served basis (FIFO).
86  void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask,
88  void writeEvent(EventPrincipal&);
89 
90  // Output File Switching API
91  //
92  // Called by EventProcessor::closeSomeOutputFiles(), which is called when
93  // output file switching is happening. Note: This is really returns
94  // !outputWorkersToClose_.empty()
95  bool outputsToClose() const;
96  // MT note: This is where we need to get all the schedules
97  // synchronized, and then have all schedules do the file
98  // close, and then the file open, then the schedules can
99  // proceed. A nasty complication is that a great deal of
100  // time can go by between the file close and the file
101  // open because artdaq may pause the run in between, and
102  // wants to have all output files closed while the run is
103  // paused. They probably want the input file closed too.
104  void closeSomeOutputFiles();
105  // Note: This really just returns !outputWorkersToOpen_.empty()
106  bool outputsToOpen() const;
107  void openSomeOutputFiles(FileBlock const& fb);
108  // Note: When we are passed OutputFileStatus::Switching, we must close
109  // the file and call openSomeOutputFiles which changes it back
110  // to OutputFileStatus::Open.
111  // A side effect of switching status is the run/subrun/event writes
112  // are not counted in the overall counting by
113  // RootOutputClosingCriteria. However, they are still counted by the
114  // individual counters.
115  void setOutputFileStatus(OutputFileStatus);
116  // Note: What this is really used for is to push workers into
117  // the outputWorkersToClose_ data member.
118  void recordOutputClosureRequests(Granularity);
119  void incrementInputFileNumber();
120  // Return whether or not all of the output workers have
121  // reached their maximum limit of work to do.
122  bool allAtLimit() const;
123 
124  private:
126 
127  // Filled by ctor, const after that.
128  ScheduleContext const sc_;
133  // Filled by ctor, const after that.
134  std::vector<OutputWorker*> outputWorkers_{};
135  // Dynamic, updated by run processing.
136  std::unique_ptr<RangeSetHandler> runRangeSetHandler_{nullptr};
137  // Dynamic, updated by subrun processing.
138  std::unique_ptr<RangeSetHandler> subRunRangeSetHandler_{nullptr};
139 
140  // Output File Switching
141  std::atomic<OutputFileStatus> fileStatus_{OutputFileStatus::Closed};
142  std::set<OutputWorker*> outputWorkersToOpen_{};
143  // Note: During an output file switch, after the closes happen, the entire
144  // contents of this is moved to outputWorkersToOpen_.
145  // FIXME: The move to outputWorkersToOpen_ is not really necessary, a flag
146  // is all we need, something that says whether we should close or open what
147  // is in the list. Basically EventProcessor uses recordOutputClosureRequests
148  // to populate the list, then uses the list to do closes, then uses the same
149  // list to do opens, then clears the list.
150  std::set<OutputWorker*> outputWorkersToClose_{};
151  };
152 } // namespace art
153 
154 // Local Variables:
155 // mode: c++
156 // End:
157 
158 #endif /* art_Framework_Core_EndPathExecutor_h */
def process(f, kind)
Definition: search.py:254
OutputFileStatus
Transition
Definition: Transition.h:7
void beginJob()
Definition: Breakpoints.cc:14
callbacks
Definition: train.py:474
ScheduleContext const sc_
ActionTable const & actionTable_
GlobalTaskGroup & taskGroup_
Namespace containing all the test actions.
ActivityRegistry const & actReg_