EmptyEvent_source.cc
Go to the documentation of this file.
1 // vim: set sw=2 expandtab :
2 
23 #include "fhiclcpp/ParameterSet.h"
24 #include "fhiclcpp/types/Atom.h"
29 
30 #include <cassert>
31 #include <chrono>
32 #include <cstdint>
33 #include <memory>
34 #include <thread>
35 
36 using namespace fhicl;
37 using namespace std;
38 using namespace std::chrono_literals;
39 using std::chrono::steady_clock;
41 
42 namespace art {
43 
// EmptyEvent: a final, DRISI-derived input source. According to its own
// parameter descriptions below, it creates new events, subruns and runs,
// optionally sleeping before each one and optionally stopping after a
// wall-clock time limit.
44  class EmptyEvent final : public DRISI {
45  public:
// FHiCL configuration accepted by this source.
46  struct Config {
47 
// Number of events per run and per subrun. The defaults are read from the
// DRISI configuration (maxEvents() and maxSubRuns() respectively).
50  Atom<int> numberEventsInRun{Name("numberEventsInRun"),
51  drisi_config().maxEvents()};
52  Atom<int> numberEventsInSubRun{Name("numberEventsInSubRun"),
53  drisi_config().maxSubRuns()};
// Wall-clock limit in seconds; see the Comment text for its exclusivity rule.
54  Atom<uint32_t> maxTime{
55  Name("maxTime"),
56  Comment(
57  "If specified, the 'maxTime' parameter indicates the maximum "
58  "allowed\n"
59  "wall-clock time (in seconds) for which new events may be created.\n"
60  "This option is mutually exclusive with the 'maxEvents' and "
61  "'maxSubRuns'\n"
62  "configuration parameters."),
// NOTE(review): source line 63 (the rest of the maxTime initializer) is not
// present in this listing.
// Delay in microseconds applied before each new item; defaults to 0 (none).
64  Atom<uint32_t> eventCreationDelay{
65  Name("eventCreationDelay"),
66  Comment("The 'eventCreationDelay' parameter is an integral value\n"
67  "in the range [0, 1000000), corresponding to microseconds.\n"
68  "If specified, the input source will sleep for the specified "
69  "duration\n"
70  "of time before each new event, subrun, or run is created.\n"),
71  0u};
// Defaults to true; consulted when a new subrun is started.
72  Atom<bool> resetEventOnSubRun{Name("resetEventOnSubRun"), true};
// Optional starting identifiers; when absent the constructor falls back to
// RunID::firstRun(), SubRunID::firstSubRun(...) and EventID::firstEvent(...).
73  OptionalAtom<RunNumber_t> firstRun{Name("firstRun")};
74  OptionalAtom<SubRunNumber_t> firstSubRun{Name("firstSubRun")};
75  OptionalAtom<EventNumber_t> firstEvent{Name("firstEvent")};
// Optional table describing a timestamp plugin; its "plugin_type" entry is
// what makePlugin_ uses to locate the plugin library.
76  OptionalDelegatedParameter timestampPlugin{
77  Name("timestampPlugin"),
78  Comment(
79  "The 'timestampPlugin' parameter must be a FHiCL table\n"
80  "of the form:\n\n"
81  " timestampPlugin: {\n"
82  " plugin_type: <plugin specification>\n"
83  " ...\n"
84  " }\n\n"
85  "See the notes in art/Framework/Core/EmptyEventTimestampPlugin.h\n"
86  "for more details.")};
87 
// Functor returning the set of keys to ignore: just "module_label".
// NOTE(review): presumably consumed by the FHiCL validation machinery --
// confirm against the Parameters alias, which is not shown in this listing.
88  struct KeysToIgnore {
89  std::set<std::string>
90  operator()() const
91  {
92  return {"module_label"};
93  }
94  };
95  };
96 
98 
100 
// NOTE(review): source lines 97 and 99 are not present in this listing; the
// constructor declaration presumably lives there.
// The source is neither copyable nor movable.
101  EmptyEvent(EmptyEvent const&) = delete;
102  EmptyEvent(EmptyEvent&&) = delete;
103  EmptyEvent& operator=(EmptyEvent const&) = delete;
104  EmptyEvent& operator=(EmptyEvent&&) = delete;
105 
106  private:
// Overrides of the base-class interface, plus the plugin factory helper.
107  unique_ptr<RangeSetHandler> runRangeSetHandler() override;
108  unique_ptr<RangeSetHandler> subRunRangeSetHandler() override;
109  input::ItemType getNextItemType() override;
110  unique_ptr<RunPrincipal> readRun_() override;
111  unique_ptr<SubRunPrincipal> readSubRun_(
// NOTE(review): source line 112 (the readSubRun_ parameter list) is not
// present in this listing.
113  unique_ptr<EventPrincipal> readEvent_() override;
114  unique_ptr<EmptyEventTimestampPlugin> makePlugin_(
115  OptionalDelegatedParameter const& maybeConfig);
116  void beginJob() override;
117  void endJob() override;
118  void skip(int offset) override;
119  void rewind_() override;
120 
// Configured limits, fixed at construction.
121  unsigned const numberEventsInRun_;
122  unsigned const numberEventsInSubRun_;
// Captured when the object is constructed, not when the event loop starts.
123  steady_clock::time_point const beginTime_{steady_clock::now()};
// NOTE(review): source lines 124-125 and 134 are not present in this listing;
// maxTime_, eventCreationDelay_ and resetEventOnSubRun_, which the member
// functions use, are presumably declared there.
// Running counters for the current run and subrun.
126  unsigned numberEventsInThisRun_{};
127  unsigned numberEventsInThisSubRun_{};
// origEventID_ is assigned in the constructor from the firstRun / firstSubRun /
// firstEvent settings; eventID_ is the identifier the source is currently at.
128  EventID origEventID_{};
129  EventID eventID_{};
// State flags driving getNextItemType(); rewind_() sets all of them to true.
130  bool firstTime_{true};
131  bool newFile_{true};
132  bool newRun_{true};
133  bool newSubRun_{true};
// Factory used by makePlugin_ and the (possibly null) plugin it produced.
135  cet::BasicPluginFactory pluginFactory_{};
136  unique_ptr<EmptyEventTimestampPlugin> plugin_;
137  };
138 
139 } // namespace art
140 
// Tail of the EmptyEvent constructor's initializer list, followed by its body.
// NOTE(review): the signature and source lines 147, 149-150, 159 and 181 are
// not present in this listing.
// The int-valued event-count parameters are stored as unsigned via
// static_cast, and the timestamp plugin is created from the optional
// 'timestampPlugin' table.
144  desc.moduleDescription}
145  , numberEventsInRun_{static_cast<uint32_t>(config().numberEventsInRun())}
146  , numberEventsInSubRun_{static_cast<uint32_t>(
148  , maxTime_{config().maxTime()}
151  , plugin_{makePlugin_(config().timestampPlugin)}
152 {
153  // Additional configuration checking which is cumbersome to do with
154  // the FHiCL validation system.
// 'maxTime' may not be combined with 'maxEvents' or 'maxSubRuns'; the check
// looks at which keys are actually present in the parameter set.
155  auto const& pset = config.get_PSet();
156  if (pset.has_key("maxTime") &&
157  (pset.has_key("maxEvents") || pset.has_key("maxSubRuns"))) {
158  throw Exception{
160  "An error occurred while configuring the EmptyEvent source.\n"}
161  << "The 'maxTime' parameter cannot be used with the 'maxEvents' or "
162  "'maxSubRuns' parameters.\n"
163  "Type 'art --print-description EmptyEvent' for the allowed "
164  "configuration.\n";
165  }
166 
// Each optional parameter reports through its return value whether it was
// supplied, filling the local passed to it.
167  RunNumber_t firstRun{};
168  bool haveFirstRun = config().firstRun(firstRun);
169  SubRunNumber_t firstSubRun{};
170  bool haveFirstSubRun = config().firstSubRun(firstSubRun);
171  EventNumber_t firstEvent{};
172  bool haveFirstEvent = config().firstEvent(firstEvent);
// Build the starting identifiers level by level: a supplied value is combined
// with the enclosing level's ID, otherwise the first*() default is used.
173  RunID firstRunID = haveFirstRun ? RunID(firstRun) : RunID::firstRun();
174  SubRunID firstSubRunID = haveFirstSubRun ?
175  SubRunID(firstRunID.run(), firstSubRun) :
176  SubRunID::firstSubRun(firstRunID);
177  origEventID_ =
178  haveFirstEvent ?
179  EventID(firstSubRunID.run(), firstSubRunID.subRun(), firstEvent) :
180  EventID::firstEvent(firstSubRunID);
182 }
183 
// Decides which kind of item the framework should process next and returns
// the matching input::ItemType value.
// NOTE(review): presumably the body of EmptyEvent::getNextItemType; the
// signature and source lines 191, 224, 228-229, 231, 239, 243, 245, 247 and
// 263-264 are not present in this listing.
//
// Order of checks, as far as the visible lines show: time limit, validity of
// eventID_, pending file / run / subrun flags, run switch, subrun switch, and
// finally the next event. eventCreationDelay_ is slept before every run,
// subrun and event that is returned, but only when it is greater than zero.
186 {
187  // Trigger framework stop if max allowed time is exceeded.
188  // N.B. Since the begin time corresponds to source construction and
189  // not the actual event loop, there will be minor differences wrt
190  // the time reported for executing a given job.
192  return input::IsStop;
193  }
194  // First check for sanity because skip(offset) can be abused and so can the
195  // ctor.
196  if (!eventID_.runID().isValid()) {
197  return input::IsStop;
198  }
199  if (!eventID_.subRunID().isValid()) {
200  return input::IsStop;
201  }
202  if (!eventID_.isValid()) {
203  return input::IsStop;
204  }
// Each pending flag is cleared as it is reported, so it fires once until
// something sets it again.
205  if (newFile_) {
206  newFile_ = false;
207  return input::IsFile;
208  }
209  if (newRun_) {
210  newRun_ = false;
211  if (eventCreationDelay_ > 0ms) {
212  std::this_thread::sleep_for(eventCreationDelay_);
213  }
214  return input::IsRun;
215  }
216  if (newSubRun_) {
217  newSubRun_ = false;
218  if (eventCreationDelay_ > 0ms) {
219  std::this_thread::sleep_for(eventCreationDelay_);
220  }
221  return input::IsSubRun;
222  }
// A run limit of 0 disables run switching (the test requires a limit > 0).
223  if ((numberEventsInRun_ > 0) &&
225  // Time to switch runs.
// The run is reported right here, so only the subrun is left pending.
226  newRun_ = false;
227  newSubRun_ = true;
230  eventID_ = EventID(
232  firstTime_ = true;
233  if (eventCreationDelay_ > 0ms) {
234  std::this_thread::sleep_for(eventCreationDelay_);
235  }
236  return input::IsRun;
237  }
// Likewise, a subrun limit of 0 disables subrun switching.
238  if ((numberEventsInSubRun_ > 0) &&
240  // Time to switch subruns.
241  newRun_ = false;
242  newSubRun_ = false;
// The two branch bodies are not present in this listing.
244  if (resetEventOnSubRun_) {
246  } else {
248  }
249  firstTime_ = true;
250  if (eventCreationDelay_ > 0ms) {
251  std::this_thread::sleep_for(eventCreationDelay_);
252  }
253  return input::IsSubRun;
254  }
255  // same run and subrun
// While firstTime_ is set, eventID_ is returned as it stands; afterwards it
// is advanced with next() before being returned.
256  if (!firstTime_) {
257  eventID_ = eventID_.next();
258  if (!eventID_.runID().isValid()) {
259  return input::IsStop;
260  }
261  }
262  firstTime_ = false;
265  if (eventCreationDelay_ > 0ms) {
266  std::this_thread::sleep_for(eventCreationDelay_);
267  }
268  return input::IsEvent;
269 }
270 
// Builds and returns the RunPrincipal for the run eventID_ currently points at.
// NOTE(review): presumably EmptyEvent::readRun_; the signature and source
// lines 276 and 278 (timestamp fallback, RunAuxiliary arguments) are not
// present in this listing.
271 unique_ptr<art::RunPrincipal>
273 {
274  unique_ptr<RunPrincipal> result;
// With a plugin configured, the begin-run timestamp is requested from it.
275  auto ts = plugin_ ? plugin_->doBeginRunTimestamp(eventID_.runID()) :
277  RunAuxiliary const runAux{
279  result = make_unique<RunPrincipal>(runAux, processConfiguration(), nullptr);
280  assert(result.get() != nullptr);
// The plugin is handed a Run built on the new principal.
281  if (plugin_) {
282  ModuleContext const mc{moduleDescription()};
283  Run const r{*result, mc};
284  plugin_->doBeginRun(r);
285  }
286  return result;
287 }
288 
// Builds and returns the SubRunPrincipal for the current subrun and attaches
// it to the run principal 'rp'.
// NOTE(review): presumably EmptyEvent::readSubRun_; the signature (which
// declares 'rp') and source lines 297 and 299 are not present in this listing.
289 unique_ptr<art::SubRunPrincipal>
291 {
292  unique_ptr<SubRunPrincipal> result;
// When only runs are being processed, a null pointer is returned.
293  if (processingMode() == Runs) {
294  return result;
295  }
// With a plugin configured, the begin-subrun timestamp is requested from it.
296  auto ts = plugin_ ? plugin_->doBeginSubRunTimestamp(eventID_.subRunID()) :
298  SubRunAuxiliary const subRunAux{
300  result =
301  make_unique<SubRunPrincipal>(subRunAux, processConfiguration(), nullptr);
302  assert(result.get() != nullptr);
303  result->setRunPrincipal(rp);
// The plugin is handed a SubRun built on the new principal.
304  if (plugin_) {
305  ModuleContext const mc{moduleDescription()};
306  SubRun const sr{*result, mc};
307  plugin_->doBeginSubRun(sr);
308  }
309  return result;
310 }
311 
// Builds and returns the EventPrincipal for eventID_.
// NOTE(review): presumably EmptyEvent::readEvent_; the signature and source
// lines 316, 320, 324 and 328-329 are not present in this listing, including
// the condition guarding the early return below.
312 unique_ptr<art::EventPrincipal>
314 {
315  unique_ptr<EventPrincipal> result;
// Early exit: hands back the still-null pointer.
317  return result;
318  }
// With a plugin configured, the event timestamp is requested from it.
319  auto timestamp = plugin_ ? plugin_->doEventTimestamp(eventID_) :
321  EventAuxiliary const eventAux{
322  eventID_, timestamp, false, EventAuxiliary::Any};
// The principal gets a fresh History and a NoDelayedReader.
323  result = make_unique<EventPrincipal>(eventAux,
325  nullptr,
326  make_unique<History>(),
327  make_unique<NoDelayedReader>(),
330  assert(result.get() != nullptr);
331  return result;
332 }
333 
// Returns a new OpenRangeSetHandler constructed from the current run number.
// NOTE(review): presumably EmptyEvent::runRangeSetHandler (first of two
// identical bodies); the signature line is not present in this listing.
334 unique_ptr<art::RangeSetHandler>
336 {
337  return make_unique<OpenRangeSetHandler>(eventID_.run());
338 }
339 
// Returns a new OpenRangeSetHandler constructed from the current run number.
// NOTE(review): presumably EmptyEvent::subRunRangeSetHandler (second of two
// identical bodies); the signature line is not present in this listing.
340 unique_ptr<art::RangeSetHandler>
342 {
343  return make_unique<OpenRangeSetHandler>(eventID_.run());
344 }
345 
// Forwards the begin-of-job notification to the timestamp plugin, if any.
// NOTE(review): presumably EmptyEvent::beginJob; the signature line is not
// present in this listing.
346 void
348 {
// plugin_ is null when no 'timestampPlugin' table was configured.
349  if (plugin_) {
350  plugin_->doBeginJob();
351  }
352 }
353 
// Forwards the end-of-job notification to the timestamp plugin, if any.
// NOTE(review): presumably EmptyEvent::endJob; the signature line is not
// present in this listing.
354 void
356 {
// plugin_ is null when no 'timestampPlugin' table was configured.
357  if (plugin_) {
358  plugin_->doEndJob();
359  }
360 }
361 
// Creates the optional timestamp plugin described by 'maybeConfig'.
// NOTE(review): presumably EmptyEvent::makePlugin_; the signature and source
// line 372 (right-hand side of the plugin-type comparison) are not present in
// this listing.
//
// Returns a null pointer when get_if_present() finds no configuration.
// Otherwise the "plugin_type" entry names the plugin library; the factory's
// reported plugin type is checked before the plugin is built from the whole
// parameter set. A cet::exception raised while doing so is rethrown wrapped
// in a Configuration-coded Exception.
362 std::unique_ptr<art::EmptyEventTimestampPlugin>
364 {
365  std::unique_ptr<EmptyEventTimestampPlugin> result;
366  try {
367  ParameterSet pset;
368  if (maybeConfig.get_if_present(pset)) {
369  auto const libspec = pset.get<std::string>("plugin_type");
370  auto const pluginType = pluginFactory_.pluginType(libspec);
371  if (pluginType ==
373  result = pluginFactory_.makePlugin<decltype(result)>(libspec, pset);
374  } else {
// The leading space in " for plugin " keeps the plugin type and the following
// words from running together in the message.
375  throw Exception(errors::Configuration, "EmptyEvent: ")
376  << "unrecognized plugin type " << pluginType << " for plugin "
377  << libspec << ".\n";
378  }
379  }
380  }
381  catch (cet::exception& e) {
382  throw Exception(errors::Configuration, "EmptyEvent: ", e)
383  << "Exception caught while processing plugin spec.\n";
384  }
385  return result;
386 }
387 
// Moves eventID_ by 'offset' positions, one step per loop iteration.
// NOTE(review): presumably EmptyEvent::skip(int offset); the signature and
// source line 392 (body of the backward loop) are not present in this listing.
388 void
390 {
// Negative offsets: one iteration per step until offset reaches zero.
391  for (; offset < 0; ++offset) {
393  }
// Positive offsets: advance with EventID::next() once per step.
394  for (; offset > 0; --offset) {
395  eventID_ = eventID_.next();
396  }
397 }
398 
// Rewinds the source: notifies the plugin (if any) and sets the first-time,
// new-file, new-run and new-subrun flags back to true.
// NOTE(review): presumably EmptyEvent::rewind_; the signature and source
// lines 409-411 are not present in this listing, so any further state reset
// there is not visible here.
399 void
401 {
402  if (plugin_) {
403  plugin_->doRewind();
404  }
405  firstTime_ = true;
406  newFile_ = true;
407  newRun_ = true;
408  newSubRun_ = true;
412 }
413 
EventID previous() const
Definition: EventID.h:159
bool isValid() const
Definition: EventID.h:122
RunID const & runID() const
Definition: EventID.h:92
OptionalAtom< EventNumber_t > firstEvent
SubRunID const & subRunID() const
Definition: EventID.h:104
std::chrono::microseconds const eventCreationDelay_
std::optional< T > get_if_present() const
EventID next() const
Definition: EventID.h:134
EmptyEvent(Parameters const &config, InputSourceDescription &desc)
unique_ptr< SubRunPrincipal > readSubRun_(cet::exempt_ptr< RunPrincipal const >) override
OptionalAtom< RunNumber_t > firstRun
TableFragment< DRISI::Config > drisi_config
static QCString result
unsigned numberEventsInThisRun_
ModuleDescription const & moduleDescription() const
Definition: InputSource.cc:12
std::string string
Definition: nybbler.cc:12
unique_ptr< EmptyEventTimestampPlugin > makePlugin_(OptionalDelegatedParameter const &maybeConfig)
unsigned const numberEventsInRun_
ChannelGroupService::Name Name
std::enable_if_t<!std::is_function_v< RESULT_TYPE >, RESULT_TYPE > makePlugin(std::string const &libspec, ARGS &&...args) const
STL namespace.
OptionalAtom< SubRunNumber_t > firstSubRun
ModuleType module_type(std::string const &full_key)
microsecond microseconds
Alias for common language habits.
Definition: spacetime.h:122
unique_ptr< RangeSetHandler > runRangeSetHandler() override
RunNumber_t run() const
Definition: RunID.h:64
RunNumber_t run() const
Definition: EventID.h:98
bool isValid() const
Definition: SubRunID.h:97
void rewind_() override
Definition: Run.h:17
steady_clock::time_point const beginTime_
static constexpr double ms
Definition: Units.h:96
const double e
void beginJob()
Definition: Breakpoints.cc:14
second seconds
Alias for common language habits.
Definition: spacetime.h:88
static Config * config
Definition: config.cpp:1054
unsigned const numberEventsInSubRun_
std::chrono::seconds const maxTime_
RunNumber_t run() const
Definition: SubRunID.h:85
T get(std::string const &key) const
Definition: ParameterSet.h:271
cet::BasicPluginFactory pluginFactory_
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
void skip(int offset) override
EventID nextSubRun(EventNumber_t first=IDNumber< Level::Event >::first()) const
Definition: EventID.h:147
unique_ptr< EmptyEventTimestampPlugin > plugin_
static int max(int a, int b)
static SubRunID firstSubRun()
Definition: SubRunID.h:153
void endJob() override
unique_ptr< EventPrincipal > readEvent_() override
static constexpr Timestamp invalidTimestamp()
Definition: Timestamp.h:82
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define Comment
std::string pluginType(std::string const &libspec) const
std::set< std::string > operator()() const
void beginJob() override
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
ProcessConfiguration const & processConfiguration() const
Definition: InputSource.cc:18
unique_ptr< RunPrincipal > readRun_() override
bool isValid() const
Definition: RunID.h:70
ModuleDescription const & moduleDescription
EventNumber_t event() const
Definition: EventID.h:116
SubRunNumber_t subRun() const
Definition: SubRunID.h:91
static RunID firstRun()
Definition: RunID.h:116
bool const resetEventOnSubRun_
EventID nextRun() const
Definition: EventID.h:153
static EventID firstEvent()
Definition: EventID.h:190
static constexpr double sr
Definition: Units.h:166
#define DEFINE_ART_INPUT_SOURCE(klass)
Atom< uint32_t > eventCreationDelay
SubRunNumber_t subRun() const
Definition: EventID.h:110
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
unsigned numberEventsInThisSubRun_
unique_ptr< RangeSetHandler > subRunRangeSetHandler() override
input::ItemType getNextItemType() override
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:120