RootInputFileSequence.cc
Go to the documentation of this file.
2 // vim: set sw=2:
3 
4 #include "TFile.h"
13 #include "art/Utilities/Globals.h"
15 #include "fhiclcpp/ParameterSet.h"
17 
18 #include <ctime>
19 #include <map>
20 #include <set>
21 #include <stack>
22 #include <string>
23 #include <utility>
24 
25 using namespace cet;
26 using namespace std;
27 
28 namespace art {
29 
// RootInputFileSequence constructor. From the code below it:
// - copies configuration values into data members;
// - builds secondaryFileNames_ (one entry per primary file name, in catalog
//   order);
// - computes origEventID_ from the optional firstRun / firstSubRun /
//   firstEvent parameters;
// - creates duplicateChecker_ when primary() is true;
// - tries catalog files until one yields a rootFile_;
// - applies 'setRunNumber' to that file.
// NOTE(review): the original-line numbers in this listing skip (e.g. 31, 34,
// 87, 167), so some parameter lines and 'throw' statements are not shown.
// The comments here describe only the visible code.
30  RootInputFileSequence::RootInputFileSequence(
32  InputFileCatalog& catalog,
33  FastCloningInfoProvider const& fcip,
35  UpdateOutputCallbacks& outputCallbacks,
36  ProcessConfiguration const& processConfig)
37  : catalog_{catalog}
38  , firstFile_{true}
39  , seekingFile_{false}
40  , fileIndexes_(fileCatalogItems().size())
41  , eventsToSkip_{config().skipEvents()}
42  , compactSubRunRanges_{config().compactSubRunRanges()}
43  , noEventSort_{config().noEventSort()}
44  , skipBadFiles_{config().skipBadFiles()}
45  , treeCacheSize_{config().cacheSize()}
46  , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
47  , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
48  , delayedReadEventProducts_{config().delayedReadEventProducts()}
49  , delayedReadSubRunProducts_{config().delayedReadSubRunProducts()}
50  , delayedReadRunProducts_{config().delayedReadRunProducts()}
51  , groupSelectorRules_{config().inputCommands(),
52  "inputCommands",
53  "InputSource"}
54  , dropDescendants_{config().dropDescendantsOfDroppedBranches()}
55  , readParameterSets_{config().readParameterSets()}
56  , fastCloningInfo_{fcip}
57  , processingMode_{pMode}
58  , processConfiguration_{processConfig}
59  , outputCallbacks_{outputCallbacks}
60  {
61  auto const& primaryFileNames = catalog_.fileSources();
62 
// Maps a file name (an "a" value) to its configured secondary file names
// (the "b" values).
63  map<string const, vector<string> const> secondaryFilesMap;
64 
65  std::vector<Config::SecondaryFile> secondaryFiles;
66  if (config().secondaryFileNames(secondaryFiles)) {
67  // Until we can find a way to atomically update the
68  // 'selectedProducts' list for output modules, secondary input
69  // files can be used only in single-threaded, single-schedule
70  // execution.
71  auto const& globals = *Globals::instance();
// NOTE(review): with '&&' this only throws when BOTH nthreads() and
// nschedules() differ from 1, so e.g. 4 threads with 1 schedule is
// accepted. The comment above and the message below both require
// 1 thread AND 1 schedule, so '||' looks intended. TODO confirm.
72  if (globals.nthreads() != 1 && globals.nschedules() != 1) {
73  throw Exception{
75  "An error occurred while creating the RootInput source.\n"}
76  << "This art process is using " << globals.nthreads()
77  << " thread(s) and " << globals.nschedules() << " schedule(s).\n"
78  << "Secondary file names can be used only when 1 thread and 1 "
79  "schedule are specified.\n"
80  << "This is done by specifying '-j=1' at the command line.\n";
81  }
82 
// Validate each (a, b) entry and record it. Empty names produce an error
// (the throw lines are not shown). Because map::emplace does not overwrite,
// a repeated "a" name keeps its first list.
83  for (auto const& val : secondaryFiles) {
84  auto const a = val.a();
85  auto const b = val.b();
86  if (a.empty()) {
88  << "Empty filename found as value of an \"a\" parameter!\n";
89  }
90  for (auto const& name : b) {
91  if (name.empty()) {
93  << "Empty secondary filename found as value of an \"b\" "
94  "parameter!\n";
95  }
96  }
97  secondaryFilesMap.emplace(a, b);
98  }
99  }
// For each primary file, flatten its secondary list depth-first. When a
// secondary name itself has a non-empty entry in secondaryFilesMap, that
// nested list is expanded immediately after the name. 'stk' holds
// [current, end) iterator ranges still to be visited; its declaration is
// only partly shown in this listing.
// NOTE(review): there is no cycle detection. A file that (directly or
// indirectly) lists itself as a secondary makes the while loop run forever.
100 
102  stk;
103  for (auto const& primaryFileName : primaryFileNames) {
104  vector<string> secondaries;
105  auto SFMI = secondaryFilesMap.find(primaryFileName);
106  if (SFMI == secondaryFilesMap.end()) {
107  // This primary has no secondaries.
108  secondaryFileNames_.push_back(std::move(secondaries));
109  continue;
110  }
111  if (!SFMI->second.size()) {
112  // Has an empty secondary list.
113  secondaryFileNames_.push_back(std::move(secondaries));
114  continue;
115  }
116  stk.emplace_back(SFMI->second.cbegin(), SFMI->second.cend());
117  while (stk.size()) {
118  auto val = stk.back();
119  stk.pop_back();
120  if (val.first == val.second) {
121  // Reached end of this filename list.
122  continue;
123  }
124  auto const& fn = *val.first;
125  ++val.first;
126  secondaries.push_back(fn);
127  auto SI = secondaryFilesMap.find(fn);
128  if (SI == secondaryFilesMap.end()) {
129  // Has no secondary list.
130  if (val.first == val.second) {
131  // Reached end of this filename list.
132  continue;
133  }
134  stk.emplace_back(val.first, val.second);
135  continue;
136  }
137  if (!SI->second.size()) {
138  // Has an empty secondary list.
139  if (val.first == val.second) {
140  // Reached end of this filename list.
141  continue;
142  }
143  stk.emplace_back(val.first, val.second);
144  continue;
145  }
// Push the remainder of the current list first and the nested list last,
// so the nested list is expanded next (depth-first order).
146  stk.emplace_back(val.first, val.second);
147  stk.emplace_back(SI->second.cbegin(), SI->second.cend());
148  }
149  secondaryFileNames_.push_back(std::move(secondaries));
150  }
// Starting EventID. Each level that is not configured falls back to the
// first valid ID under its parent.
151  RunNumber_t firstRun{};
152  bool const haveFirstRun{config().hasFirstRun(firstRun)};
153  SubRunNumber_t firstSubRun{};
154  bool const haveFirstSubRun{config().hasFirstSubRun(firstSubRun)};
155  EventNumber_t firstEvent{};
156  bool const haveFirstEvent{config().hasFirstEvent(firstEvent)};
157 
158  RunID const firstRunID{haveFirstRun ? RunID{firstRun} : RunID::firstRun()};
159  SubRunID const firstSubRunID{haveFirstSubRun ?
160  SubRunID{firstRunID.run(), firstSubRun} :
161  SubRunID::firstSubRun(firstRunID)};
162 
163  origEventID_ = haveFirstEvent ? EventID{firstSubRunID, firstEvent} :
164  EventID::firstEvent(firstSubRunID);
165 
// 'noEventSort' combined with 'firstEvent' is a configuration error; only the
// message is shown here, not the throw line.
166  if (noEventSort_ && haveFirstEvent) {
168  << "Illegal configuration options passed to RootInput\n"
169  << "You cannot request \"noEventSort\" and also set \"firstEvent\".\n";
170  }
171  if (primary()) {
172  duplicateChecker_ = std::make_shared<DuplicateChecker>(config().dc);
173  }
174  if (pendingClose_) {
176  << "RootInputFileSequence looking for next file with a pending close!";
177  }
178 
// Walk the catalog until a file yields a rootFile_. The statement that opens
// each file is not shown in this listing; presumably it calls initFile().
// TODO confirm.
179  while (catalog_.getNextFile()) {
181  if (rootFile_) {
182  // We found one, good, stop now.
183  break;
184  }
185  }
186 
// NOTE(review): returning here also skips loading 'setRunNumber' into
// setRun_ and skips the provenance warnings further below.
187  if (!rootFile_) {
188  // We could not open any input files, stop.
189  return;
190  }
// 'setRunNumber': compute the run offset relative to this first file.
// - An InvalidNumber error from setForcedRunOffset is reported as an
//   out-of-range setRunNumber value; other exceptions are rethrown.
// - A negative offset is rejected.
191  if (config().setRunNumber(setRun_)) {
192  try {
193  forcedRunOffset_ = rootFile_->setForcedRunOffset(setRun_);
194  }
195  catch (art::Exception& e) {
196  if (e.categoryCode() == errors::InvalidNumber) {
198  << "setRunNumber " << setRun_
199  << " does not correspond to a valid run number in ["
200  << RunID::firstRun().run() << ", " << RunID::maxRun().run()
201  << "]\n";
202  } else {
203  throw; // Rethrow.
204  }
205  }
206  if (forcedRunOffset_ < 0) {
208  << "The value of the 'setRunNumber' parameter must not be\n"
209  << "less than the first run number in the first input file.\n"
210  << "'setRunNumber' was " << setRun_ << ", while the first run was "
211  << setRun_ - forcedRunOffset_ << ".\n";
212  }
213  }
214  if (!readParameterSets_) {
215  mf::LogWarning("PROVENANCE")
216  << "Source parameter readParameterSets was set to false: parameter set "
217  "provenance\n"
218  << "will NOT be available in this or subsequent jobs using output from "
219  "this job.\n"
220  << "Check your experiment's policy on this issue to avoid future "
221  "problems\n"
222  << "with analysis reproducibility.\n";
223  }
// NOTE(review): the warning names 'compactEventRanges', but the flag comes
// from config().compactSubRunRanges(). Check which parameter name users
// actually set.
224  if (compactSubRunRanges_) {
225  mf::LogWarning("PROVENANCE")
226  << "Source parameter compactEventRanges was set to true: enabling "
227  "compact event ranges\n"
228  << "creates a history that can cause file concatenation problems if a "
229  "given SubRun spans\n"
230  << "multiple input files. Use with care.\n";
231  }
232  }
233 
// Positions the sequence on event eID. Returns the EventID at the resulting
// file-index position, or a default-constructed EventID if the event is not
// found. The parameters eID and exact are declared on a signature line that
// is not shown in this listing.
// Search order:
// 1. the current file;
// 2. only if the catalog is searchable, previously opened files whose
//    cached FileIndex contains eID;
// 3. files not yet opened.
// When the event is located in a different file, seekingFile_ is set.
234  EventID
236  {
237  // Attempt to find event in currently open input file.
// NOTE(review): this passes 'true' rather than 'exact', so the current file
// is always searched exactly; readIt(EventID, exact) passes 'exact' at the
// same point. TODO confirm which is intended.
238  bool found = rootFile_->setEntry_Event(eID, true);
239  // found in the current file
240  if (found) {
241  return rootFile_->eventIDForFileIndexPosition();
242  }
243  // fail if not searchable
244  if (!catalog_.isSearchable()) {
245  return EventID();
246  }
247  // Look for event in files previously opened without reopening unnecessary
248  // files.
249  for (auto itBegin = fileIndexes_.cbegin(),
250  itEnd = fileIndexes_.cend(),
251  it = itBegin;
252  (!found) && it != itEnd;
253  ++it) {
254  if (*it && (*it)->contains(eID, exact)) {
255  // We found it. Close the currently open file, and open the correct one.
256  catalog_.rewindTo(std::distance(itBegin, it));
257  initFile(/*skipBadFiles=*/false);
258  // Now get the event from the correct file.
259  found = rootFile_->setEntry_Event(eID, exact);
260  assert(found);
261  seekingFile_ = true;
262  }
263  }
// NOTE(review): if the loop above found the event, there is no return here.
// Control falls through to the loop below, which opens the next catalog
// file(s) and replaces rootFile_, discarding the position just found. If no
// further file exists, EventID() is returned even though the event was found.
// readIt(EventID, exact) returns immediately in the same situation.
// Something like 'if (found) return rootFile_->eventIDForFileIndexPosition();'
// looks intended. TODO confirm.
264  // Look for event in files not yet opened.
265  while (catalog_.getNextFile()) {
266  initFile(/*skipBadFiles=*/false);
267  found = rootFile_->setEntry_Event(eID, exact);
268  if (found) {
269  seekingFile_ = true;
270  return rootFile_->eventIDForFileIndexPosition();
271  }
272  }
273  return EventID();
274  }
275 
// Offset-based seek; the signature line is not shown in this listing. It
// advances 'offset' events via skip() and returns the EventID at rootFile_'s
// resulting file-index position.
276  EventID
278  {
279  skip(offset);
280  return rootFile_->eventIDForFileIndexPosition();
281  }
282 
// Returns the catalog's list of FileCatalogItems by forwarding to catalog_.
283  vector<FileCatalogItem> const&
285  {
286  return catalog_.fileCatalogItems();
287  }
288 
// Closes the currently open input file, if any, by delegating to
// closeFile_(). The signature line is not shown in this listing.
289  void
291  {
292  closeFile_();
293  }
294 
// Returns a FileBlock for the next input file, or an empty pointer when no
// file is open afterwards.
// - First call with a file already open (e.g. opened by the constructor):
//   return that file's FileBlock.
// - First call without an open file: try catalog files until one yields a
//   rootFile_, then apply 'setRunNumber' (same checks as the constructor).
// - After a seek moved to another file (seekingFile_): re-initialize the
//   catalog's current file.
// - Otherwise: advance with nextFile().
295  std::unique_ptr<FileBlock>
297  {
298  std::unique_ptr<FileBlock> result;
299  if (firstFile_ && rootFile_) {
300  firstFile_ = false;
301  result = rootFile_->createFileBlock();
302  return result;
303  }
304  if (firstFile_) {
305  // We are at the first file in the sequence of files.
306  firstFile_ = false;
307  while (catalog_.getNextFile()) {
309  if (rootFile_) {
310  // We found one, good, stop now.
311  break;
312  }
313  }
314  if (!rootFile_) {
315  // We could not open any input files, stop.
316  return result;
317  }
// NOTE(review): the constructor gates this on config().setRunNumber(setRun_),
// but here it is gated on setRun_ converting to true. Also, the constructor
// only loads setRun_ from the configuration after a file has opened (it
// returns early otherwise), so on this path setRun_ may still hold its
// initial value. TODO confirm.
318  if (setRun_) {
319  try {
320  forcedRunOffset_ = rootFile_->setForcedRunOffset(setRun_);
321  }
322  catch (art::Exception& e) {
323  if (e.categoryCode() == errors::InvalidNumber) {
325  << "setRunNumber " << setRun_
326  << " does not correspond to a valid run number in ["
327  << RunID::firstRun().run() << ", " << RunID::maxRun().run()
328  << "]\n";
329  } else {
330  throw; // Rethrow.
331  }
332  }
333  if (forcedRunOffset_ < 0) {
335  << "The value of the 'setRunNumber' parameter must not be\n"
336  << "less than the first run number in the first input file.\n"
337  << "'setRunNumber' was " << setRun_ << ", while the first run was "
338  << setRun_ - forcedRunOffset_ << ".\n";
339  }
340  }
341  } else if (seekingFile_) {
342  seekingFile_ = false;
343  initFile(/*skipBadFiles=*/false);
344  } else if (!nextFile()) {
345  // FIXME: Turn this into a throw!
// NOTE(review): assert() is compiled out under NDEBUG. Execution then
// continues below with whatever rootFile_ currently holds.
346  assert(false);
347  }
348  if (!rootFile_) {
349  return result;
350  }
351  result = rootFile_->createFileBlock();
352  return result;
353  }
354 
// Closes the currently open input file; does nothing if none is open.
// It records the file's remaining eventsToSkip() in eventsToSkip_, closes the
// file (passing primary()), logs the action, releases rootFile_, and notifies
// the duplicate checker if one exists.
355  void
357  {
358  if (!rootFile_) {
359  return;
360  }
361  // Account for events skipped in the file.
362  eventsToSkip_ = rootFile_->eventsToSkip();
363  rootFile_->close(primary());
364  detail::logFileAction("Closed input file ", rootFile_->fileName());
365  rootFile_.reset();
366  if (duplicateChecker_.get() != nullptr) {
367  duplicateChecker_->inputFileClosed();
368  }
369  }
370 
// Marks a file close as pending by setting pendingClose_. The signature line
// is not shown in this listing.
371  void
373  {
374  pendingClose_ = true;
375  }
376 
// Closes any open file, then opens the catalog's current file and makes it
// rootFile_. Its FileIndex is recorded in
// fileIndexes_[catalog_.currentIndex()].
// If the file cannot be opened (TFile::Open raised a cet::exception, returned
// null, or gave a zombie file):
// - skipBadFiles == false: an error is raised. Only the messages are shown in
//   this listing, not the throw lines.
// - skipBadFiles == true: a warning is logged and rootFile_ is left empty
//   (closeFile_ reset it).
377  void
378  RootInputFileSequence::initFile(bool const skipBadFiles)
379  {
380  // close the currently open file, if any, and release the RootInputFile object.
381  closeFile_();
382  std::unique_ptr<TFile> filePtr;
383  try {
384  detail::logFileAction("Initiating request to open input file ",
386  filePtr.reset(TFile::Open(catalog_.currentFile().fileName().c_str()));
387  }
// NOTE(review): catching cet::exception by value copies the exception and
// would slice a derived type; 'catch (cet::exception const& e)' is the usual
// form. When skipBadFiles is true, the exception's text is not included in
// the warning logged below.
388  catch (cet::exception e) {
389  if (!skipBadFiles) {
391  << e.explain_self()
392  << "\nRootInputFileSequence::initFile(): Input file "
394  << " was not found or could not be opened.\n";
395  }
396  }
397  if (!filePtr || filePtr->IsZombie()) {
398  if (!skipBadFiles) {
400  << "RootInputFileSequence::initFile(): Input file "
402  << " was not found or could not be opened.\n";
403  }
404  mf::LogWarning("")
405  << "Input file: " << catalog_.currentFile().fileName()
406  << " was not found or could not be opened, and will be skipped.\n";
407  return;
408  }
409  detail::logFileAction("Opened input file ",
411  vector<string> empty_vs;
412  rootFile_ = make_shared<RootInputFile>(
414  catalog_.url(),
417  std::move(filePtr),
418  origEventID_,
430  noEventSort_,
435  /*primaryFile*/ exempt_ptr<RootInputFile>{nullptr},
436  secondaryFileNames_.empty() ?
437  empty_vs :
439  this,
441 
// Grow fileIndexes_ so it covers the current catalog index. The seek/readIt
// functions search this vector for files opened earlier.
443  if (catalog_.currentIndex() + 1 > fileIndexes_.size()) {
444  fileIndexes_.resize(catalog_.currentIndex() + 1);
445  }
446  fileIndexes_[catalog_.currentIndex()] = rootFile_->fileIndexSharedPtr();
447  }
448 
// Opens the secondary input file 'name' and returns a new RootInputFile for
// it, linked to 'primaryFile'. The secondary file gets an empty url and
// logical file name, no duplicate checker, and no secondary files of its own.
// Failure to open is reported as an error; the throw lines are not shown in
// this listing, only the messages. There is no skip-bad-files path here.
449  std::unique_ptr<RootInputFile>
451  string const& name,
452  exempt_ptr<RootInputFile> primaryFile)
453  {
454  std::unique_ptr<TFile> filePtr;
455  try {
456  detail::logFileAction("Attempting to open secondary input file ", name);
457  filePtr.reset(TFile::Open(name.c_str()));
458  }
// NOTE(review): catching cet::exception by value copies it and would slice a
// derived type; 'catch (cet::exception const& e)' is the usual form.
459  catch (cet::exception e) {
461  << e.explain_self()
462  << "\nRootInputFileSequence::openSecondaryFile(): Input file " << name
463  << " was not found or could not be opened.\n";
464  }
465  if (!filePtr || filePtr->IsZombie()) {
467  << "RootInputFileSequence::openSecondaryFile(): Input file " << name
468  << " was not found or could not be opened.\n";
469  }
470  detail::logFileAction("Opened secondary input file ", name);
471  vector<string> empty_secondary_filenames;
472  return std::make_unique<RootInputFile>(name,
473  /*url*/ "",
475  /*logicalFileName*/ "",
476  std::move(filePtr),
477  origEventID_,
489  noEventSort_,
491  /*duplicateChecker_*/ nullptr,
494  primaryFile,
495  empty_secondary_filenames,
496  this,
498  }
499 
// Advances the catalog to its next file. Returns false when there is none.
// Otherwise it returns true after a statement not shown in this listing
// (presumably one that opens the file, e.g. initFile). TODO confirm.
500  bool
502  {
503  if (!catalog_.getNextFile()) {
504  // no more files
505  return false;
506  }
508  return true;
509  }
510 
// Moves back one file and positions rootFile_ on that file's last entry.
// Returns false without changing files when:
// - the catalog is not searchable;
// - the "no file in the catalog" condition holds (its test is not shown);
// - the current file is already the first one (index 0).
// The else-branch statement is not shown either; presumably it moves the
// catalog to the preceding file. TODO confirm.
// Returns true after initFile(), even if no rootFile_ resulted.
511  bool
513  {
514  // no going back for non-persistent files
515  if (!catalog_.isSearchable()) {
516  return false;
517  }
518  // no file in the catalog
520  return false;
521  }
522  // already at the first file in the catalog: there is no previous file
523  if (catalog_.currentIndex() == 0) {
524  return false;
525  } else {
527  }
528  initFile(/*skipBadFiles=*/false);
529  if (rootFile_) {
530  rootFile_->setToLastEntry();
531  }
532  return true;
533  }
534 
// Positions rootFile_ on event 'id'; 'exact' is forwarded to
// setEntry_Event()/contains().
// Search order:
// 1. the current file;
// 2. only if the catalog is searchable, previously opened files whose cached
//    FileIndex contains 'id';
// 3. files not yet opened.
// Returns silently if the event is not found. Each successful return is
// preceded by a statement that is not shown in this listing.
535  void
536  RootInputFileSequence::readIt(EventID const& id, bool exact)
537  {
538  // Attempt to find event in currently open input file.
539  bool found = rootFile_->setEntry_Event(id, exact);
540  if (found) {
542  return;
543  }
544  if (!catalog_.isSearchable()) {
545  return;
546  }
547  // Look for event in previously opened files (FileIndex objects cached in fileIndexes_)
548  for (auto IB = fileIndexes_.cbegin(), IE = fileIndexes_.cend(), I = IB;
549  I != IE;
550  ++I) {
551  if (*I && (*I)->contains(id, exact)) {
552  // We found it. Close the currently open file, and open the correct one.
553  catalog_.rewindTo(std::distance(IB, I));
554  initFile(/*skipBadFiles=*/false);
555  found = rootFile_->setEntry_Event(id, exact);
556  assert(found);
558  return;
559  }
560  }
561  // Look for event in files not yet opened.
562  while (catalog_.getNextFile()) {
563  initFile(/*skipBadFiles=*/false);
564  found = rootFile_->setEntry_Event(id, exact);
565  if (found) {
567  return;
568  }
569  }
570  // Not found
571  return;
572  }
573 
// Returns the EventPrincipal for the current event, obtained from
// rootFile_->readEvent(). One statement before the return is not shown in
// this listing.
574  unique_ptr<EventPrincipal>
576  {
577  // Create and setup the EventPrincipal.
578  //
579  // 1. create an EventPrincipal with a unique EventID
580  // 2. For each entry in the provenance, put in one Group,
581  // holding the Provenance for the corresponding EDProduct.
582  // 3. set up the caches in the EventPrincipal to know about this
583  // Group.
584  //
585  // We do *not* create the EDProduct instance (the equivalent of reading
586  // the branch containing this EDProduct). That will be done by the
587  // Delayed Reader when it is asked to do so.
588  //
590  return rootFile_->readEvent();
591  }
592 
// Returns the run RangeSetHandler by forwarding to
// rootFile_->runRangeSetHandler(). Assumes a file is open.
593  std::unique_ptr<RangeSetHandler>
595  {
596  return rootFile_->runRangeSetHandler();
597  }
598 
// Returns the subRun RangeSetHandler by forwarding to
// rootFile_->subRunRangeSetHandler(). Assumes a file is open.
599  std::unique_ptr<RangeSetHandler>
601  {
602  return rootFile_->subRunRangeSetHandler();
603  }
604 
// Positions rootFile_ on subRun 'id' (exact match: contains(id, true)). The
// signature line is not shown in this listing.
// Search order:
// 1. the current file;
// 2. only if the catalog is searchable, previously opened files;
// 3. files not yet opened.
// Returns silently if the subRun is not found.
605  void
607  {
608  // Attempt to find subRun in currently open input file.
609  bool found = rootFile_->setEntry_SubRun(id);
610  if (found) {
611  return;
612  }
613  if (!catalog_.isSearchable()) {
614  return;
615  }
616  // Look for subRun in previously opened files (FileIndex objects cached in fileIndexes_)
617  for (auto itBegin = fileIndexes_.begin(),
618  itEnd = fileIndexes_.end(),
619  it = itBegin;
620  it != itEnd;
621  ++it) {
622  if (*it && (*it)->contains(id, true)) {
623  // We found it. Close the currently open file, and open the correct one.
624  catalog_.rewindTo(std::distance(itBegin, it));
625  initFile(/*skipBadFiles=*/false);
626  found = rootFile_->setEntry_SubRun(id);
627  assert(found);
628  return;
629  }
630  }
631  // Look for subRun in files not yet opened.
632  while (catalog_.getNextFile()) {
633  initFile(/*skipBadFiles=*/false);
634  found = rootFile_->setEntry_SubRun(id);
635  if (found) {
636  return;
637  }
638  }
639  // not found
640  return;
641  }
642 
// Reads the current subRun from rootFile_, passing the parent run principal
// 'rp'. The signature line is not shown in this listing.
643  std::unique_ptr<SubRunPrincipal>
645  {
646  return rootFile_->readSubRun(rp);
647  }
648 
// Positions rootFile_ on run 'id' (exact match: contains(id, true)). The
// signature line is not shown in this listing.
// Search order:
// 1. the current file;
// 2. only if the catalog is searchable, previously opened files;
// 3. files not yet opened.
// Returns silently if the run is not found.
649  void
651  {
652  // Attempt to find run in current file.
653  bool found = rootFile_->setEntry_Run(id);
654  if (found) {
655  // Got it, done.
656  return;
657  }
658  if (!catalog_.isSearchable()) {
659  // Cannot random access files, give up.
660  return;
661  }
662  // Look for the run in the opened files.
663  for (auto B = fileIndexes_.cbegin(), E = fileIndexes_.cend(), I = B; I != E;
664  ++I) {
665  if (*I && (*I)->contains(id, true)) {
666  // We found it, open the file.
667  catalog_.rewindTo(std::distance(B, I));
668  initFile(/*skipBadFiles=*/false);
669  found = rootFile_->setEntry_Run(id);
670  assert(found);
671  return;
672  }
673  }
674  // Look for run in files not yet opened.
675  while (catalog_.getNextFile()) {
676  initFile(/*skipBadFiles=*/false);
677  found = rootFile_->setEntry_Run(id);
678  if (found) {
679  return;
680  }
681  }
682  // Not found.
683  return;
684  }
685 
// Reads the current run from rootFile_ via rootFile_->readRun().
686  std::unique_ptr<RunPrincipal>
688  {
689  return rootFile_->readRun();
690  }
691 
694  {
// Reports what should be processed next (signature lines not shown in this
// listing):
// - IsFile before the first file has been handed out;
// - otherwise the entry type rootFile_ wants next (event, subRun or run).
// When no file is open, or the open file is exhausted (kEnd), the result is
// IsStop if the catalog has no further file, else IsFile.
695  if (firstFile_) {
696  return input::IsFile;
697  }
698  if (rootFile_) {
699  auto const entryType = rootFile_->getNextEntryTypeWanted();
700  if (entryType == FileIndex::kEvent) {
701  return input::IsEvent;
702  } else if (entryType == FileIndex::kSubRun) {
703  return input::IsSubRun;
704  } else if (entryType == FileIndex::kRun) {
705  return input::IsRun;
706  }
707  assert(entryType == FileIndex::kEnd);
708  }
709  // now we are either at the end of a root file
710  // or no root file is currently open (rootFile_ is empty)
711  if (!catalog_.hasNextFile()) {
712  return input::IsStop;
713  }
714  return input::IsFile;
715  }
716 
717  // Rewind to before the first event that was read.
// Only allowed for searchable catalogs; otherwise an error is raised (the
// throw line is not shown). Resets firstFile_, rewinds the catalog, and
// rewinds the duplicate checker if one exists.
// NOTE(review): rootFile_ is not closed here. If it is still set, readFile_()
// takes its 'firstFile_ && rootFile_' branch and returns a FileBlock for the
// already-open file instead of reopening the catalog's first file. Confirm a
// caller closes the file first.
718  void
720  {
721  if (!catalog_.isSearchable()) {
723  << "RootInputFileSequence::rewind_() "
724  << "cannot rollback on non-searchable file catalogs.";
725  }
726  firstFile_ = true;
727  catalog_.rewind();
728  if (duplicateChecker_.get() != nullptr) {
729  duplicateChecker_->rewind();
730  }
731  }
732 
733  // Rewind to the beginning of the current file
// Delegates to rootFile_->rewind(); assumes a file is open.
734  void
736  {
737  rootFile_->rewind();
738  }
739 
740  // Advance "offset" events. Offset can be positive or negative (or zero).
// The loop treats skipEvents()'s return value as the part of the offset still
// to be skipped:
// - a positive remainder moves to the next file;
// - a negative remainder moves to the previous file;
// - the function stops early if no such file is available.
// Assumes rootFile_ is set whenever skipEvents() is called.
741  void
743  {
744  while (offset != 0) {
745  offset = rootFile_->skipEvents(offset);
746  if (offset > 0 && !nextFile()) {
747  return;
748  }
749  if (offset < 0 && !previousFile()) {
750  return;
751  }
752  }
753  rootFile_->skipEvents(0);
754  }
755 
// Always returns true. The signature line is not shown in this listing.
756  bool
758  {
759  return true;
760  }
761 
// Returns the ProcessConfiguration supplied to the constructor.
762  ProcessConfiguration const&
764  {
765  return processConfiguration_;
766  }
767 
768 } // namespace art
size_t currentIndex() const
std::unique_ptr< RangeSetHandler > runRangeSetHandler()
bool hasNextFile(int attempts=5)
std::vector< std::shared_ptr< FileIndex > > fileIndexes_
ProcessConfiguration const & processConfiguration_
static constexpr size_t indexEnd
std::vector< FileCatalogItem > const & fileCatalogItems() const
bool getNextFile(int attempts=5)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler()
std::unique_ptr< RunPrincipal > readRun_()
std::unique_ptr< RootInputFile > openSecondaryFile(std::string const &name, cet::exempt_ptr< RootInputFile > primaryFile)
void logFileAction(const char *msg, std::string const &file)
Definition: logFileAction.cc:9
STL namespace.
std::string & url()
Definition: FileCatalog.h:98
std::vector< FileCatalogItem > const & fileCatalogItems() const
std::string const & logicalFileName() const
Definition: FileCatalog.h:30
ProcessConfiguration const & processConfiguration() const
std::shared_ptr< DuplicateChecker > duplicateChecker_
RunNumber_t run() const
Definition: RunID.h:65
EventID seekToEvent(EventID const &, bool exact=false)
std::vector< std::vector< std::string > > const & secondaryFileNames() const
FileCatalogItem const & currentFile() const
const double e
intermediate_table::const_iterator const_iterator
GroupSelectorRules groupSelectorRules_
static RunID maxRun()
Definition: RunID.h:111
const double a
RunNumber_t run() const
Definition: SubRunID.h:84
FastCloningInfoProvider fastCloningInfo_
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
std::unique_ptr< SubRunPrincipal > readSubRun_(cet::exempt_ptr< RunPrincipal const >)
RootInputFileSharedPtr rootFile_
static SubRunID firstSubRun()
Definition: SubRunID.h:152
void rewindTo(size_t index)
std::unique_ptr< FileBlock > readFile_()
std::vector< std::vector< std::string > > secondaryFileNames_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
InputSource::ProcessingMode processingMode_
std::unique_ptr< EventPrincipal > readEvent_()
void initFile(bool skipBadFiles)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
static RunID firstRun()
Definition: RunID.h:117
static Globals * instance()
Definition: Globals.cc:26
std::vector< std::string > const & fileSources() const
static EventID firstEvent()
Definition: EventID.h:191
RootInputFileSharedPtr rootFileForLastReadEvent_
std::string const & fileName() const
Definition: FileCatalog.h:25
UpdateOutputCallbacks & outputCallbacks_
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:120