DecrepitRelicInputSourceImplementation.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
10 
11 #include <cassert>
12 #include <memory>
13 #include <string>
14 
15 using namespace std;
16 
17 namespace art {
18 
19  DecrepitRelicInputSourceImplementation::Config::~Config() = default;
20 
22  : maxEvents{fhicl::Name("maxEvents"), -1}
23  , maxSubRuns{fhicl::Name("maxSubRuns"), -1}
24  , reportFrequency{fhicl::Name("reportFrequency"), 1}
25  , errorOnFailureToPut{fhicl::Name("errorOnFailureToPut"), false}
26  , processingMode{fhicl::Name("processingMode"), defaultMode()}
27  {}
28 
29  // Note: static.
30  char const*
32  {
33  return "RunsSubRunsAndEvents";
34  }
35 
37  ~DecrepitRelicInputSourceImplementation() noexcept = default;
38 
43  ModuleDescription const& desc)
44  : InputSource{desc}
50  {
51  if (reportFrequency_ < 0) {
53  << "reportFrequency has a negative value, which is not meaningful.";
54  }
55  std::string const runMode{"Runs"};
56  std::string const runSubRunMode{"RunsAndSubRuns"};
58  if (processingMode == runMode) {
60  } else if (processingMode == "RunsAndSubRuns") {
62  } else if (processingMode != Config::defaultMode()) {
64  << "DecrepitRelicInputSourceImplementation::"
65  "DecrepitRelicInputSourceImplementation()\n"
66  << "The 'processingMode' parameter for sources has an illegal value '"
67  << processingMode << "'\n"
68  << "Legal values are '" << Config::defaultMode() << "', '"
69  << runSubRunMode << "', or '" << runMode << "'.\n";
70  }
71  }
72 
75  {
76  auto const oldState = state_;
77  if (remainingEvents_ == 0) {
78  // If the maximum event limit has been reached, stop.
80  return state_;
81  }
82  if (remainingSubRuns_ == 0) {
83  // If the maximum subRun limit has been reached, stop
84  // when reaching a new file, run, or subRun.
85  if (oldState == input::IsInvalid || oldState == input::IsFile ||
88  return state_;
89  }
90  }
91  auto newState = getNextItemType();
92  while (true) {
93  if ((newState == input::IsEvent) &&
95  newState = getNextItemType();
96  continue;
97  }
98  if ((newState == input::IsSubRun) && (processingMode_ == Runs)) {
99  newState = getNextItemType();
100  continue;
101  }
102  break;
103  }
104  if (newState == input::IsStop) {
106  // FIXME: upon the advent of a catalog system which can do
107  // something intelligent with the difference between whole-file
108  // success, partial-file success, partial-file failure and
109  // whole-file failure (such as file-open failure), we will need to
110  // communicate that difference here. The file disposition options
111  // as they are now (and the mapping to any concrete implementation
112  // we are are aware of currently) are not sufficient to the task,
113  // so we deliberately do not distinguish here between partial-file
114  // and whole-file success in particular.
115  finish();
116  return state_;
117  } else if (newState == input::IsFile || oldState == input::IsInvalid) {
119  return state_;
120  } else if (newState == input::IsRun || oldState == input::IsFile) {
122  return state_;
123  } else if (newState == input::IsSubRun || oldState == input::IsRun) {
124  assert(processingMode_ != Runs);
126  return state_;
127  }
130  return state_;
131  }
132 
133  // Return a dummy file block.
134  std::unique_ptr<FileBlock>
136  {
137  assert(state_ == input::IsFile);
138  assert((remainingEvents_ != 0) && (remainingSubRuns_ != 0));
139  return readFile_();
140  }
141 
142  // Return a dummy file block.
143  // This function must be overridden for any input source that reads a file
144  // containing Products. Such a function should update the
145  // UpdateOutputCallbacks to reflect the products found in this new file.
146  unique_ptr<FileBlock>
148  {
149  return make_unique<FileBlock>();
150  }
151 
152  void
154  {
155  return closeFile_();
156  }
157 
158  void
160  {}
161 
162  unique_ptr<RunPrincipal>
164  {
165  // Note: For the moment, we do not support saving and restoring the state of
166  // the random number generator if random numbers are generated during
167  // processing of runs (e.g. beginRun(), endRun())
168  assert(state_ == input::IsRun);
169  assert((remainingEvents_ != 0) && (remainingSubRuns_ != 0));
170  return readRun_();
171  }
172 
173  unique_ptr<SubRunPrincipal>
176  {
177  // Note: For the moment, we do not support saving and restoring the state of
178  // the random number generator if random numbers are generated during
179  // processing of subRuns (e.g. beginSubRun(), endSubRun())
180  assert(state_ == input::IsSubRun);
181  assert((remainingEvents_ != 0) && (remainingSubRuns_ != 0));
183  auto srp = readSubRun_(rp);
184  srp->setRunPrincipal(rp);
185  return srp;
186  }
187 
188  unique_ptr<EventPrincipal>
191  {
192  assert(state_ == input::IsEvent);
193  assert(remainingEvents_ != 0);
194  auto ep = readEvent_();
195  assert(srp->run() == ep->run());
196  assert(srp->subRun() == ep->subRun());
197  ep->setSubRunPrincipal(srp);
198  if (ep.get() != nullptr) {
199  if (remainingEvents_ > 0) {
201  }
204  issueReports(ep->eventID());
205  }
206  }
207  return ep;
208  }
209 
210  void
212  {
213  beginJob();
214  }
215 
216  void
218  {}
219 
220  void
222  {
223  endJob();
224  }
225 
226  void
228  {}
229 
230  void
232  {
233  skip(offset);
234  }
235 
236  void
238  {
240  << "DecrepitRelicInputSourceImplementation::skip()\n"
241  << "Random access is not implemented for this type of Input Source\n"
242  << "Contact a Framework Developer\n";
243  }
244 
245  // Begin again at the first event
246  void
248  {
252  rewind_();
253  }
254 
255  void
257  {
259  << "DecrepitRelicInputSourceImplementation::rewind()\n"
260  << "Rewind is not implemented for this type of Input Source\n"
261  << "Contact a Framework Developer\n";
262  }
263 
264  // RunsSubRunsAndEvents (default), RunsAndSubRuns, or Runs.
267  {
268  return processingMode_;
269  }
270 
271  // Accessor for maximum number of events to be read.
272  // -1 is used for unlimited.
273  int
275  {
276  return maxEvents_;
277  }
278 
279  // Accessor for remaining number of events to be read.
280  // -1 is used for unlimited.
281  int
283  {
284  return remainingEvents_;
285  }
286 
287  // Accessor for maximum number of subRuns to be read.
288  // -1 is used for unlimited.
289  int
291  {
292  return maxSubRuns_;
293  }
294 
295  // Accessor for remaining number of subRuns to be read.
296  // -1 is used for unlimited.
297  int
299  {
300  return remainingSubRuns_;
301  }
302 
303  // Reset the remaining number of events/subRuns to the maximum number.
304  void
306  {
309  }
310 
311  void
313  {
315  }
316 
319  {
320  return state_;
321  }
322 
323  void
325  {
326  state_ = state;
327  }
328 
329  void
331  {
333  }
334 
335 } // namespace art
virtual std::unique_ptr< SubRunPrincipal > readSubRun_(cet::exempt_ptr< RunPrincipal const >)=0
std::string string
Definition: nybbler.cc:12
virtual std::unique_ptr< RunPrincipal > readRun() override
ChannelGroupService::Name Name
STL namespace.
virtual std::unique_ptr< EventPrincipal > readEvent_()=0
virtual input::ItemType getNextItemType()=0
virtual std::unique_ptr< RunPrincipal > readRun_()=0
typename config_impl< T >::type Config
Definition: ModuleMacros.h:52
void issue_reports(unsigned count, EventID const &id)
static Config * config
Definition: config.cpp:1054
DecrepitRelicInputSourceImplementation(fhicl::TableFragment< Config > const &, ModuleDescription const &)
virtual std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const >) override
virtual std::unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const >) override
virtual ~DecrepitRelicInputSourceImplementation() noexcept=0
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::unique_ptr< FileBlock > readFile() override
Read next file.