RootOutputFile.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
#include "boost/date_time/posix_time/posix_time.hpp"
#include "canvas_root_io/Utilities/DictionaryChecker.h"
#include "cetlib/exempt_ptr.h"
#include "cetlib/sqlite/Ntuple.h"
#include "cetlib/sqlite/exec.h"
#include "cetlib/sqlite/insert.h"
#include "fhiclcpp/ParameterSet.h"
#include "hep_concurrency/RecursiveMutex.h"

#include "Rtypes.h"
#include "TBranchElement.h"
#include "TClass.h"
#include "TFile.h"
#include "TTree.h"

#include <algorithm>
#include <map>
#include <type_traits>
#include <utility>
#include <vector>
63 
64 using namespace cet;
65 using namespace std;
66 using namespace hep::concurrency;
67 
68 using art::BranchType;
71 
72 namespace {
73 
74  void
75  create_table(sqlite3* const db,
76  string const& name,
77  vector<string> const& columns,
78  string const& suffix = {})
79  {
80  if (columns.empty())
82  << "Number of sqlite columns specified for table: " << name << '\n'
83  << "is zero.\n";
84  string ddl = "DROP TABLE IF EXISTS " + name +
85  "; "
86  "CREATE TABLE " +
87  name + "(" + columns.front();
88  for_each(columns.begin() + 1, columns.end(), [&ddl](auto const& col) {
89  ddl += "," + col;
90  });
91  ddl += ") ";
92  ddl += suffix;
93  ddl += ";";
94  sqlite::exec(db, ddl);
95  }
96 
97  void
98  insert_eventRanges_row(sqlite3_stmt* stmt,
99  art::SubRunNumber_t const sr,
100  art::EventNumber_t const b,
101  art::EventNumber_t const e)
102  {
103  sqlite3_bind_int64(stmt, 1, sr);
104  sqlite3_bind_int64(stmt, 2, b);
105  sqlite3_bind_int64(stmt, 3, e);
106  sqlite3_step(stmt);
107  sqlite3_reset(stmt);
108  }
109 
110  void
111  insert_rangeSets_eventSets_row(sqlite3_stmt* stmt,
112  unsigned const rsid,
113  unsigned const esid)
114  {
115  sqlite3_bind_int64(stmt, 1, rsid);
116  sqlite3_bind_int64(stmt, 2, esid);
117  sqlite3_step(stmt);
118  sqlite3_reset(stmt);
119  }
120 
121  unsigned
122  getNewRangeSetID(sqlite3* db,
123  art::BranchType const bt,
124  art::RunNumber_t const r)
125  {
126  sqlite::insert_into(db, art::BranchTypeToString(bt) + "RangeSets")
127  .values(r);
128  return sqlite3_last_insert_rowid(db);
129  }
130 
131  vector<unsigned>
132  getExistingRangeSetIDs(sqlite3* db, art::RangeSet const& rs)
133  {
134  vector<unsigned> rangeSetIDs;
135  cet::transform_all(rs, back_inserter(rangeSetIDs), [db](auto const& range) {
137  r << sqlite::select("ROWID")
138  .from(db, "EventRanges")
139  .where("SubRun=" + to_string(range.subRun()) +
140  " AND "
141  "begin=" +
142  to_string(range.begin()) +
143  " AND "
144  "end=" +
145  to_string(range.end()));
146  return unique_value(r);
147  });
148  return rangeSetIDs;
149  }
150 
151  void
152  insertIntoEventRanges(sqlite3* db, art::RangeSet const& rs)
153  {
154  sqlite::Transaction txn{db};
155  sqlite3_stmt* stmt{nullptr};
156  string const ddl{"INSERT INTO EventRanges(SubRun, begin, end) "
157  "VALUES(?, ?, ?);"};
158  sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
159  for (auto const& range : rs) {
160  insert_eventRanges_row(stmt, range.subRun(), range.begin(), range.end());
161  }
162  sqlite3_finalize(stmt);
163  txn.commit();
164  }
165 
166  void
167  insertIntoJoinTable(sqlite3* db,
168  art::BranchType const bt,
169  unsigned const rsID,
170  vector<unsigned> const& eventRangesIDs)
171  {
172  sqlite::Transaction txn{db};
173  sqlite3_stmt* stmt{nullptr};
174  string const ddl{
175  "INSERT INTO " + art::BranchTypeToString(bt) +
176  "RangeSets_EventRanges(RangeSetsID, EventRangesID) Values(?,?);"};
177  sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
178  cet::for_all(eventRangesIDs, [stmt, rsID](auto const eventRangeID) {
179  insert_rangeSets_eventSets_row(stmt, rsID, eventRangeID);
180  });
181  sqlite3_finalize(stmt);
182  txn.commit();
183  }
184 
185  void
186  maybeInvalidateRangeSet(BranchType const bt,
187  art::RangeSet const& principalRS,
188  art::RangeSet& productRS)
189  {
190  assert(principalRS.is_sorted());
191  assert(productRS.is_sorted());
192  if (!productRS.is_valid()) {
193  return;
194  }
195  if (bt == art::InRun && productRS.is_full_run()) {
196  return;
197  }
198  if (bt == art::InSubRun && productRS.is_full_subRun()) {
199  return;
200  }
201  if (productRS.ranges().empty()) {
202  return;
203  }
204  auto const r = productRS.run();
205  auto const& productFront = productRS.ranges().front();
206  if (!principalRS.contains(r, productFront.subRun(), productFront.begin())) {
207  productRS = art::RangeSet::invalid();
208  }
209  }
210 

  // The purpose of 'maybeInvalidateRangeSet' is to support the
  // following situation.  Suppose process 1 creates three files with
  // one Run product each, all corresponding to the same Run.  Let's
  // call the individual Run product instances in the three separate
  // files as A, B, and C.  Now suppose that the three files serve as
  // inputs to process 2, where a concatenation is being performed AND
  // ALSO an output file switch.  Process 2 results in two output
  // files, and now, in process 3, we concatenate the outputs from
  // process 2.  The situation would look like this:
  //
  //   Process 1:     [A]     [B]     [C]
  //                     \    / \    /
  //   Process 2:       [A + B] [B + C]
  //        D=agg(A,B)     |       |     E=agg(B,C)
  //                        \     /
  //   Process 3:           [D + E]
  //
  // Notice the complication in process 3: product 'B' will be
  // aggregated twice: once with A, and once with C.  Whenever the
  // output from process 3 is read as input to another process, the
  // fetched product will be equivalent to A+2B+C.
  //
  // To avoid this situation, we compare the RangeSet of the product
  // with the RangeSet of the in-memory RunAuxiliary.  If the
  // beginning of B's RangeSet is not contained within the auxiliary's
  // RangeSet, then a dummy product with an invalid RangeSet is
  // written to disk.  Instead of the diagram above, we have:
  //
  //   Process 1:     [A]     [B]     [C]
  //                     \    / \    /
  //   Process 2:       [A + B] [x + C]
  //        D=agg(A,B)     |       |     E=agg(x,C)=C
  //                        \     /
  //   Process 3:           [D + E]
  //
  // where 'x' represents a dummy product.  Upon aggregating D and E,
  // we obtain the correctly formed A+B+C product.
252  template <BranchType BT>
254  getRangeSet(art::OutputHandle const& oh,
255  art::RangeSet const& principalRS,
256  bool const producedInThisProcess)
257  {
258  auto rs = oh.isValid() ? oh.rangeOfValidity() : art::RangeSet::invalid();
259  // Because a user can specify (e.g.):
260  // r.put(move(myProd), art::runFragment(myRangeSet));
261  // products that are produced in this process can have valid, yet
262  // arbitrary RangeSets. We therefore never invalidate a RangeSet
263  // that corresponds to a product produced in this process.
264  //
265  // It is possible for a user to specify a RangeSet which does not
266  // correspond AT ALL to the in-memory auxiliary RangeSet. In that
267  // case, users should not expect to be able to retrieve products
268  // for which no corresponding events or sub-runs were processed.
269  if (!producedInThisProcess) {
270  maybeInvalidateRangeSet(BT, principalRS, rs);
271  }
272  return rs;
273  }
274 
275  template <BranchType BT>
277  getRangeSet(art::OutputHandle const&,
278  art::RangeSet const& /*principalRS*/,
279  bool const /*producedInThisProcess*/)
280  {
281  return art::RangeSet::invalid();
282  }
283 
284  template <BranchType BT>
286  setProductRangeSetID(art::RangeSet const& /*rs*/,
287  sqlite3*,
289  map<unsigned, unsigned>& /*checksumToIndexLookup*/)
290  {}
291 
292  template <BranchType BT>
294  setProductRangeSetID(art::RangeSet const& rs,
295  sqlite3* db,
296  art::EDProduct* product,
297  map<unsigned, unsigned>& checksumToIndexLookup)
298  {
299  if (!rs.is_valid()) { // Invalid range-sets not written to DB
300  return;
301  }
302  // Set range sets for SubRun and Run products
303  auto it = checksumToIndexLookup.find(rs.checksum());
304  if (it != checksumToIndexLookup.cend()) {
305  product->setRangeSetID(it->second);
306  } else {
307  unsigned const rsID = getNewRangeSetID(db, BT, rs.run());
308  product->setRangeSetID(rsID);
309  checksumToIndexLookup.emplace(rs.checksum(), rsID);
310  insertIntoEventRanges(db, rs);
311  auto const& eventRangesIDs = getExistingRangeSetIDs(db, rs);
312  insertIntoJoinTable(db, BT, rsID, eventRangesIDs);
313  }
314  }
315 
316  bool
317  maxCriterionSpecified(art::ClosingCriteria const& cc)
318  {
319  auto fp = mem_fn(&art::ClosingCriteria::fileProperties);
320  return (fp(cc).nEvents() !=
322  (fp(cc).nSubRuns() !=
324  (fp(cc).nRuns() != art::ClosingCriteria::Defaults::unsigned_max()) ||
325  (fp(cc).size() != art::ClosingCriteria::Defaults::size_max()) ||
326  (fp(cc).age().count() !=
328  }
329 
330 } // unnamed namespace
331 
332 namespace art {
333 
334  RootOutputFile::OutputItem::~OutputItem() = default;
335 
336  RootOutputFile::OutputItem::OutputItem(BranchDescription const& bd)
337  : branchDescription_{bd}, product_{nullptr}
338  {}
339 
340  string const&
342  {
344  }
345 
346  bool
348  {
350  }
351 
352  // Part of static interface.
353  bool
354  RootOutputFile::shouldFastClone(bool const fastCloningSet,
355  bool const fastCloning,
356  bool const wantAllEvents,
357  ClosingCriteria const& cc)
358  {
359  bool result = fastCloning;
360  mf::LogInfo("FastCloning")
361  << "Initial fast cloning configuration "
362  << (fastCloningSet ? "(user-set): " : "(from default): ") << boolalpha
363  << fastCloning;
364  if (fastCloning && !wantAllEvents) {
365  result = false;
366  mf::LogWarning("FastCloning")
367  << "Fast cloning deactivated due to presence of\n"
368  << "event selection configuration.";
369  }
370  if (fastCloning && maxCriterionSpecified(cc) &&
372  result = false;
373  mf::LogWarning("FastCloning")
374  << "Fast cloning deactivated due to request to allow\n"
375  << "output file switching at an Event, SubRun, or Run boundary.";
376  }
377  return result;
378  }
379 
381 
383  string const& fileName,
384  ClosingCriteria const& fileSwitchCriteria,
385  int const compressionLevel,
386  int64_t const saveMemoryObjectThreshold,
387  int64_t const treeMaxVirtualSize,
388  int const splitLevel,
389  int const basketSize,
390  DropMetaData dropMetaData,
391  bool const dropMetaDataForDroppedData,
392  bool const fastCloningRequested)
393  : mutex_{"RootOutputFile::mutex_"}
394  , compressionLevel_{compressionLevel}
395  , saveMemoryObjectThreshold_{saveMemoryObjectThreshold}
396  , treeMaxVirtualSize_{treeMaxVirtualSize}
397  , splitLevel_{splitLevel}
398  , basketSize_{basketSize}
399  , dropMetaData_{dropMetaData}
402  {
403  om_ = om;
404  file_ = fileName;
405  fileSwitchCriteria_ = fileSwitchCriteria;
407  dropMetaDataForDroppedData_ = dropMetaDataForDroppedData;
408  fastCloningEnabledAtConstruction_ = fastCloningRequested;
409  wasFastCloned_ = false;
410  filePtr_.reset(
411  TFile::Open(file_.c_str(), "recreate", "", compressionLevel));
412  // Don't split metadata tree or event description tree
419  // Create the tree that will carry (event) History objects.
421  filePtr_.get(), rootNames::eventHistoryTreeName(), splitLevel);
422  if (!eventHistoryTree_) {
424  << "Failed to create the tree for History objects\n";
425  }
426  pEventAux_ = nullptr;
427  pSubRunAux_ = nullptr;
428  pRunAux_ = nullptr;
429  pResultsAux_ = nullptr;
434  pHistory_ = new History;
435  if (!eventHistoryTree_->Branch(rootNames::eventHistoryBranchName().c_str(),
436  &pHistory_,
437  basketSize,
438  0)) {
440  << "Failed to create a branch for History in the output file\n";
441  }
442  delete pHistory_;
443  pHistory_ = nullptr;
444  treePointers_[0] =
445  make_unique<RootOutputTree>(filePtr_.get(),
446  InEvent,
447  pEventAux_,
449  basketSize,
450  splitLevel,
451  treeMaxVirtualSize,
452  saveMemoryObjectThreshold);
453  treePointers_[1] =
454  make_unique<RootOutputTree>(filePtr_.get(),
455  InSubRun,
456  pSubRunAux_,
458  basketSize,
459  splitLevel,
460  treeMaxVirtualSize,
461  saveMemoryObjectThreshold);
462  treePointers_[2] = make_unique<RootOutputTree>(filePtr_.get(),
463  InRun,
464  pRunAux_,
466  basketSize,
467  splitLevel,
468  treeMaxVirtualSize,
469  saveMemoryObjectThreshold);
470  treePointers_[3] =
471  make_unique<RootOutputTree>(filePtr_.get(),
472  InResults,
473  pResultsAux_,
475  basketSize,
476  splitLevel,
477  treeMaxVirtualSize,
478  saveMemoryObjectThreshold);
479  dataTypeReported_ = false;
480  rootFileDB_.reset(
482  "RootFileDB",
483  filePtr_.get(),
484  SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE));
485  subRunRSID_ = -1U;
486  runRSID_ = -1U;
488  // Check that dictionaries for the auxiliaries exist
489  root::DictionaryChecker checker;
490  checker.checkDictionaries<EventAuxiliary>();
491  checker.checkDictionaries<SubRunAuxiliary>();
492  checker.checkDictionaries<RunAuxiliary>();
493  checker.checkDictionaries<ResultsAuxiliary>();
494  checker.reportMissingDictionaries();
495  // Event ranges
497  "EventRanges",
498  {"SubRun INTEGER",
499  "begin INTEGER",
500  "end INTEGER",
501  "UNIQUE (SubRun,begin,end) ON CONFLICT IGNORE"});
502  // SubRun range sets
503  using namespace cet::sqlite;
504  create_table(*rootFileDB_, "SubRunRangeSets", column<int>{"Run"});
506  "SubRunRangeSets_EventRanges",
507  {"RangeSetsID INTEGER",
508  "EventRangesID INTEGER",
509  "PRIMARY KEY(RangeSetsID,EventRangesID)"},
510  "WITHOUT ROWID");
511  // Run range sets
512  create_table(*rootFileDB_, "RunRangeSets", column<int>{"Run"});
514  "RunRangeSets_EventRanges",
515  {"RangeSetsID INTEGER",
516  "EventRangesID INTEGER",
517  "PRIMARY KEY(RangeSetsID,EventRangesID)"},
518  "WITHOUT ROWID");
519  }
520 
521  void
523  {
524  RecursiveMutexSentry sentry{mutex_, __func__};
525  status_ = ofs;
526  }
527 
528  string const&
530  {
531  RecursiveMutexSentry sentry{mutex_, __func__};
532  return file_;
533  }
534 
535  void
537  {
538  RecursiveMutexSentry sentry{mutex_, __func__};
539  auto selectProductsToWrite = [this](BranchType const bt) {
540  auto& items = selectedOutputItemList_[bt];
541  for (auto const& pr : om_->keptProducts()[bt]) {
542  auto const& pd = pr.second;
543  // Persist Results products only if they have been produced by
544  // the current process.
545  if (bt == InResults && !pd.produced()) {
546  continue;
547  }
548  checkDictionaries(pd);
549  // Although the transient flag is already checked when
550  // OutputModule::doSelectProducts is called, it can be flipped
551  // to 'true' after the BranchDescription transients have been
552  // fluffed, which happens during the checkDictionaries call.
553  if (pd.transient()) {
554  continue;
555  }
556  items.emplace(pd);
557  }
558  for (auto const& val : items) {
559  treePointers_[bt]->addOutputBranch(val.branchDescription_,
560  val.product_);
561  }
562  };
563  for_each_branch_type(selectProductsToWrite);
564  }
565 
566  void
568  bool const fastCloneFromOutputModule)
569  {
570  RecursiveMutexSentry sentry{mutex_, __func__};
571  // FIXME: the logic here is nasty.
573  fastCloneFromOutputModule && rfb};
574  // Create output branches, and then redo calculation to determine if
575  // fast cloning should be done.
576  selectProducts();
577  if (shouldFastClone &&
578  !treePointers_[InEvent]->checkSplitLevelAndBasketSize(rfb->tree())) {
579  mf::LogWarning("FastCloning")
580  << "Fast cloning deactivated for this input file due to "
581  << "splitting level and/or basket size.";
582  shouldFastClone = false;
583  } else if (rfb && rfb->tree() &&
584  rfb->tree()->GetCurrentFile()->GetVersion() < 60001) {
585  mf::LogWarning("FastCloning")
586  << "Fast cloning deactivated for this input file due to "
587  << "ROOT version used to write it (< 6.00/01)\n"
588  "having a different splitting policy.";
589  shouldFastClone = false;
590  }
591  if (shouldFastClone && rfb->fileFormatVersion().value_ < 10) {
592  mf::LogWarning("FastCloning")
593  << "Fast cloning deactivated for this input file due to "
594  << "reading in file that has a different ProductID schema.";
595  shouldFastClone = false;
596  }
598  mf::LogWarning("FastCloning")
599  << "Fast cloning reactivated for this input file.";
600  }
601  treePointers_[InEvent]->beginInputFile(shouldFastClone);
602  auto tree = (rfb && rfb->tree()) ? rfb->tree() : nullptr;
603  wasFastCloned_ = treePointers_[InEvent]->fastCloneTree(tree);
604  }
605 
606  void
608  {
609  RecursiveMutexSentry sentry{mutex_, __func__};
611  }
612 
613  void
615  {
616  RecursiveMutexSentry sentry{mutex_, __func__};
617  cet::for_all(treePointers_, [](auto const& p) { p->setEntries(); });
618  }
619 
620  bool
622  {
623  RecursiveMutexSentry sentry{mutex_, __func__};
624  using namespace chrono;
625  unsigned int constexpr oneK{1024u};
626  fp_.updateSize(filePtr_->GetSize() / oneK);
627  fp_.updateAge(duration_cast<seconds>(steady_clock::now() - beginTime_));
629  }
630 
631  void
633  {
634  RecursiveMutexSentry sentry{mutex_, __func__};
635  // Auxiliary branch.
636  // Note: pEventAux_ must be set before calling fillBranches
637  // since it gets written out in that routine.
638  pEventAux_ = &e.eventAux();
639  // Because getting the data may cause an exception to be
640  // thrown we want to do that first before writing anything
641  // to the file about this event.
642  fillBranches<InEvent>(e, pEventProductProvenanceVector_);
643  // History branch.
644  History historyForOutput{e.history()};
645  historyForOutput.addEventSelectionEntry(om_->selectorConfig());
646  pHistory_ = &historyForOutput;
647  int sz = eventHistoryTree_->Fill();
648  if (sz <= 0) {
650  << "Failed to fill the History tree for event: " << e.eventID()
651  << "\nTTree::Fill() returned " << sz << " bytes written." << endl;
652  }
653  // Add the dataType to the job report if it hasn't already been done
654  if (!dataTypeReported_) {
655  string dataType{"MC"};
656  if (pEventAux_->isRealData()) {
657  dataType = "Data";
658  }
659  dataTypeReported_ = true;
660  }
661  pHistory_ = &e.history();
662  // Add event to index
664  fp_.update_event();
665  }
666 
667  void
669  {
670  RecursiveMutexSentry sentry{mutex_, __func__};
671  pSubRunAux_ = &sr.subRunAux();
673  fillBranches<InSubRun>(sr, pSubRunProductProvenanceVector_);
677  }
678 
679  void
681  {
682  RecursiveMutexSentry sentry{mutex_, __func__};
683  pRunAux_ = &r.runAux();
685  fillBranches<InRun>(r, pRunProductProvenanceVector_);
687  fp_.runEntryNumber());
689  }
690 
691  void
693  {
694  RecursiveMutexSentry sentry{mutex_, __func__};
695  auto pid = root::getObjectRequireDict<ParentageID>();
696  ParentageID const* hash = &pid;
697  if (!parentageTree_->Branch(
698  rootNames::parentageIDBranchName().c_str(), &hash, basketSize_, 0)) {
700  << "Failed to create a branch for ParentageIDs in the output file";
701  }
702  hash = nullptr;
703  auto par = root::getObjectRequireDict<Parentage>();
704  Parentage const* desc = &par;
705  if (!parentageTree_->Branch(
706  rootNames::parentageBranchName().c_str(), &desc, basketSize_, 0)) {
708  << "Failed to create a branch for Parentages in the output file";
709  }
710  desc = nullptr;
711  for (auto const& pr : ParentageRegistry::get()) {
712  hash = &pr.first;
713  desc = &pr.second;
714  parentageTree_->Fill();
715  }
716  parentageTree_->SetBranchAddress(rootNames::parentageIDBranchName().c_str(),
717  nullptr);
718  parentageTree_->SetBranchAddress(rootNames::parentageBranchName().c_str(),
719  nullptr);
720  }
721 
722  void
724  {
725  RecursiveMutexSentry sentry{mutex_, __func__};
727  auto const* pver = &ver;
728  TBranch* b = metaDataTree_->Branch(
729  metaBranchRootName<FileFormatVersion>(), &pver, basketSize_, 0);
730  // FIXME: Turn this into a throw!
731  assert(b);
732  b->Fill();
733  }
734 
735  void
737  {
738  RecursiveMutexSentry sentry{mutex_, __func__};
740  FileIndex::Element elem{};
741  auto const* findexElemPtr = &elem;
742  TBranch* b = fileIndexTree_->Branch(
743  metaBranchRootName<FileIndex::Element>(), &findexElemPtr, basketSize_, 0);
744  // FIXME: Turn this into a throw!
745  assert(b);
746  for (auto& entry : fileIndex_) {
747  findexElemPtr = &entry;
748  b->Fill();
749  }
750  b->SetAddress(0);
751  }
752 
753  void
755  {
756  RecursiveMutexSentry sentry{mutex_, __func__};
758  }
759 
760  void
762  {
763  // We don't do this yet; currently we're storing a slightly
764  // bloated ProcessHistoryRegistry.
765  }
766 
767  void
769  {
770  RecursiveMutexSentry sentry{mutex_, __func__};
771  ProcessHistoryMap pHistMap;
772  for (auto const& pr : ProcessHistoryRegistry::get()) {
773  pHistMap.emplace(pr);
774  }
775  auto const* p = &pHistMap;
776  TBranch* b = metaDataTree_->Branch(
777  metaBranchRootName<ProcessHistoryMap>(), &p, basketSize_, 0);
778  if (b == nullptr) {
780  << "Unable to locate required "
781  "ProcessHistoryMap branch in output "
782  "metadata tree.\n";
783  }
784  b->Fill();
785  }
786 
787  void
789  FileStatsCollector const& stats,
792  {
793  RecursiveMutexSentry sentry{mutex_, __func__};
794  using namespace cet::sqlite;
795  Ntuple<string, string> fileCatalogMetadata{
796  *rootFileDB_, "FileCatalog_metadata", {{"Name", "Value"}}, true};
797  Transaction txn{*rootFileDB_};
798  for (auto const& kv : md) {
799  fileCatalogMetadata.insert(kv.first, kv.second);
800  }
801  // Add our own specific information: File format and friends.
802  fileCatalogMetadata.insert("file_format", "\"artroot\"");
803  fileCatalogMetadata.insert("file_format_era",
805  fileCatalogMetadata.insert("file_format_version",
807  // File start time.
808  namespace bpt = boost::posix_time;
809  auto formatted_time = [](auto const& t) {
810  return cet::canonical_string(bpt::to_iso_extended_string(t));
811  };
812  fileCatalogMetadata.insert("start_time",
813  formatted_time(stats.outputFileOpenTime()));
814  // File "end" time: now, since file is not actually closed yet.
815  fileCatalogMetadata.insert(
816  "end_time",
817  formatted_time(boost::posix_time::second_clock::universal_time()));
818  // Run/subRun information.
819  if (!stats.seenSubRuns().empty()) {
820  auto I = find_if(md.crbegin(), md.crend(), [](auto const& p) {
821  return p.first == "run_type";
822  });
823  if (I != md.crend()) {
824  ostringstream buf;
825  buf << "[ ";
826  for (auto const& srid : stats.seenSubRuns()) {
827  buf << "[ " << srid.run() << ", " << srid.subRun() << ", "
828  << cet::canonical_string(I->second) << " ], ";
829  }
830  // Rewind over last delimiter.
831  buf.seekp(-2, ios_base::cur);
832  buf << " ]";
833  fileCatalogMetadata.insert("runs", buf.str());
834  }
835  }
836  // Number of events.
837  fileCatalogMetadata.insert("event_count",
838  std::to_string(stats.eventsThisFile()));
839  // first_event and last_event.
840  auto eidToTuple = [](EventID const& eid) -> string {
841  ostringstream eidStr;
842  eidStr << "[ " << eid.run() << ", " << eid.subRun() << ", " << eid.event()
843  << " ]";
844  return eidStr.str();
845  };
846  fileCatalogMetadata.insert("first_event",
847  eidToTuple(stats.lowestEventID()));
848  fileCatalogMetadata.insert("last_event",
849  eidToTuple(stats.highestEventID()));
850  // File parents.
851  if (!stats.parents().empty()) {
852  ostringstream pstring;
853  pstring << "[ ";
854  for (auto const& parent : stats.parents()) {
855  pstring << cet::canonical_string(parent) << ", ";
856  }
857  // Rewind over last delimiter.
858  pstring.seekp(-2, ios_base::cur);
859  pstring << " ]";
860  fileCatalogMetadata.insert("parents", pstring.str());
861  }
862  // Incoming stream-specific metadata overrides.
863  for (auto const& kv : ssmd) {
864  fileCatalogMetadata.insert(kv.first, kv.second);
865  }
866  txn.commit();
867  }
868 
869  void
871  {
872  RecursiveMutexSentry sentry{mutex_, __func__};
874  }
875 
876  void
878  {
879  RecursiveMutexSentry sentry{mutex_, __func__};
880  // Make a local copy of the UpdateOutputCallbacks's ProductList,
881  // removing any transient or pruned products.
883  auto productDescriptionsToWrite = [this, &reg](BranchType const bt) {
884  for (auto const& pr : descriptionsToPersist_[bt]) {
885  auto const& desc = pr.second;
886  reg.productList_.emplace(BranchKey{desc}, desc);
887  }
888  };
889  for_each_branch_type(productDescriptionsToWrite);
890  ProductRegistry const* regp = &reg;
891  TBranch* b = metaDataTree_->Branch(
892  metaBranchRootName<ProductRegistry>(), &regp, basketSize_, 0);
893  // FIXME: Turn this into a throw!
894  assert(b);
895  b->Fill();
896  }
897 
898  void
900  {
901  RecursiveMutexSentry sentry{mutex_, __func__};
902  BranchChildren const* ppDeps = &om_->branchChildren();
903  TBranch* b = metaDataTree_->Branch(
904  metaBranchRootName<BranchChildren>(), &ppDeps, basketSize_, 0);
905  // FIXME: Turn this into a throw!
906  assert(b);
907  b->Fill();
908  }
909 
910  void
912  {
913  RecursiveMutexSentry sentry{mutex_, __func__};
914  pResultsAux_ = &resp.resultsAux();
915  fillBranches<InResults>(resp, pResultsProductProvenanceVector_);
916  }
917 
918  void
920  {
921  RecursiveMutexSentry sentry{mutex_, __func__};
926  [this](BranchType const bt) { treePointers_[bt]->writeTree(); });
927  }
928 
929  void
931  {
932  RecursiveMutexSentry sentry{mutex_, __func__};
933  subRunRSID_ = getNewRangeSetID(*rootFileDB_, InSubRun, ranges.run());
934  insertIntoEventRanges(*rootFileDB_, ranges);
935  auto const& eventRangesIDs = getExistingRangeSetIDs(*rootFileDB_, ranges);
936  insertIntoJoinTable(*rootFileDB_, InSubRun, subRunRSID_, eventRangesIDs);
937  }
938 
939  void
941  {
942  RecursiveMutexSentry sentry{mutex_, __func__};
943  runRSID_ = getNewRangeSetID(*rootFileDB_, InRun, ranges.run());
944  insertIntoEventRanges(*rootFileDB_, ranges);
945  auto const& eventRangesIDs = getExistingRangeSetIDs(*rootFileDB_, ranges);
946  insertIntoJoinTable(*rootFileDB_, InRun, runRSID_, eventRangesIDs);
947  }
948 
949  template <BranchType BT>
952  RangeSet const&,
953  string const& wrappedName)
954  {
955  RecursiveMutexSentry sentry{mutex_, __func__};
956  if (oh.isValid()) {
957  return oh.wrapper();
958  }
959  return dummyProductCache_.product(wrappedName);
960  }
961 
962  template <BranchType BT>
965  RangeSet const& prunedProductRS,
966  string const& wrappedName)
967  {
968  RecursiveMutexSentry sentry{mutex_, __func__};
969  if (oh.isValid() && prunedProductRS.is_valid()) {
970  return oh.wrapper();
971  }
972  return dummyProductCache_.product(wrappedName);
973  }
974 
975  template <BranchType BT>
976  void
978  vector<ProductProvenance>* vpp)
979  {
980  RecursiveMutexSentry sentry{mutex_, __func__};
981  bool const fastCloning = ((BT == InEvent) && wasFastCloned_);
982  map<unsigned, unsigned> checksumToIndex;
983  auto const& principalRS = principal.seenRanges();
984  set<ProductProvenance> keptprv;
985  for (auto const& val : selectedOutputItemList_[BT]) {
986  auto const& bd = val.branchDescription_;
987  auto const pid = bd.productID();
988  descriptionsToPersist_[BT].emplace(pid, bd);
989  bool const produced = bd.produced();
990  bool const resolveProd = (produced || !fastCloning ||
991  treePointers_[BT]->uncloned(bd.branchName()));
992  // Update the kept provenance
993  bool const keepProvenance =
995  (produced && (dropMetaData_ == DropMetaData::DropPrior)));
996  auto const& oh = principal.getForOutput(pid, resolveProd);
997  auto prov = keptprv.begin();
998  if (keepProvenance) {
999  if (oh.productProvenance()) {
1000  prov = keptprv.insert(*oh.productProvenance()).first;
1003  {
1004  vector<ProductProvenance const*> stacked_pp;
1005  stacked_pp.push_back(&*oh.productProvenance());
1006  while (true) {
1007  if (stacked_pp.size() == 0) {
1008  break;
1009  }
1010  auto current_pp = stacked_pp.back();
1011  stacked_pp.pop_back();
1012  for (auto const parent_bid :
1013  current_pp->parentage().parents()) {
1014  // Note: Suppose the parent ProductID corresponds to
1015  // product that has been requested to be
1016  // "dropped"--i.e. someone has specified "drop
1017  // *_m1a_*_*" in their configuration, and
1018  // although a given product matching this
1019  // pattern will not be included in the
1020  // selectedProducts_ list, one of the parents of
1021  // a selected product can match the "dropping"
1022  // pattern and its BranchDescription will still
1023  // be written to disk since it is inserted into
1024  // the descriptionsToPersist_ data member.
1025  auto parent_bd = principal.getProductDescription(parent_bid);
1026  if (!parent_bd) {
1027  // FIXME: Is this an error condition?
1028  continue;
1029  }
1030  descriptionsToPersist_[BT].emplace(parent_bid, *parent_bd);
1031  if (!parent_bd->produced()) {
1032  // We got it from the input, nothing to do.
1033  continue;
1034  }
1035  auto parent_pp =
1036  principal.branchToProductProvenance(parent_bid);
1037  if (!parent_pp || (dropMetaData_ != DropMetaData::DropNone)) {
1038  continue;
1039  }
1040  if (!keptprv.insert(*parent_pp).second) {
1041  // Already there, done.
1042  continue;
1043  }
1046  stacked_pp.push_back(parent_pp.get());
1047  }
1048  }
1049  }
1050  }
1051  }
1052  } else {
1053  // No provenance: product was either not produced, or was
1054  // dropped; create provenance to remember that.
1055  auto status = productstatus::dropped();
1056  if (produced) {
1057  status = productstatus::neverCreated();
1058  }
1059  prov = keptprv.emplace(pid, status).first;
1060  }
1061  }
1062  // Resolve the product if we are going to attempt to write it out.
1063  if (resolveProd) {
1064  // Product was either produced, or we are not cloning the whole
1065  // file and the product branch was not cloned so we should be
1066  // able to get a pointer to it from the passed principal and
1067  // write it out.
1068  auto const& rs = getRangeSet<BT>(oh, principalRS, produced);
1069  if (RangeSetsSupported<BT>::value && !rs.is_valid()) {
1070  // At this point we are now going to write out a dummy product
1071  // whose Wrapper present flag is false because the range set
1072  // got invalidated to present double counting when combining
1073  // run or subrun products from multiple fragments. We change
1074  // the provenance status that we are going to write out to
1075  // dummyToPreventDoubleCount to flag this case. Note that the
1076  // requirement is only that the status not be
1077  // productstatus::present(). We use a special code to make it
1078  // easier for humans to tell what is going on.
1079  auto prov_bid = prov->productID();
1080  if (keptprv.erase(*prov) != 1ull) {
1081  throw Exception(errors::LogicError, "KeptProvenance::setStatus")
1082  << "Attempt to set product status for product whose provenance "
1083  "is not being recorded.\n";
1084  }
1085  prov =
1086  keptprv
1087  .emplace(prov_bid, productstatus::dummyToPreventDoubleCount())
1088  .first;
1089  }
1090  auto const* product = getProduct<BT>(oh, rs, bd.wrappedName());
1091  setProductRangeSetID<BT>(
1092  rs, *rootFileDB_, const_cast<EDProduct*>(product), checksumToIndex);
1093  val.product_ = product;
1094  }
1095  }
1096  vpp->assign(keptprv.begin(), keptprv.end());
1097  for (auto const& val : *vpp) {
1098  if (val.productStatus() == productstatus::uninitialized()) {
1100  "RootOutputFile::fillBranches(principal, vpp):")
1101  << "Attempt to write a product with uninitialized provenance!\n";
1102  }
1103  }
1104  treePointers_[BT]->fillTree();
1105  vpp->clear();
1106  }
1107 
1108 } // namespace art
SubRunAuxiliary const * pSubRunAux_
EventAuxiliary const * pEventAux_
std::vector< std::pair< std::string, std::string >> collection_type
void setFileStatus(OutputFileStatus const ofs)
int const compressionLevel_
T unique_value(query_result< T > const &r)
Definition: query_result.h:101
bool fastCloningEnabledAtConstruction_
FileIndex::EntryNumber_t runEntryNumber() const
std::string const & currentFileName() const
RunID const & runID() const noexcept
RootOutputFile(OutputModule *, std::string const &fileName, ClosingCriteria const &fileSwitchCriteria, int const compressionLevel, int64_t const saveMemoryObjectThreshold, int64_t const treeMaxVirtualSize, int const splitLevel, int const basketSize, DropMetaData dropMetaData, bool dropMetaDataForDroppedData, bool fastCloningRequested)
Granularity granularity() const
ProductProvenances * pRunProductProvenanceVector_
int64_t const treeMaxVirtualSize_
EventID const & eventID() const noexcept
T * get() const
Definition: ServiceHandle.h:64
auto insert_into(sqlite3 *const db, std::string const &tablename)
Definition: insert.h:112
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
ProductProvenances resultsProductProvenanceVector_
std::chrono::steady_clock::time_point beginTime_
std::unique_ptr< TFile > filePtr_
cur
Definition: dbjson.py:21
std::vector< EventRange > const & ranges() const
Definition: RangeSet.cc:96
std::string const & parentageTreeName()
Definition: rootNames.cc:21
void beginInputFile(RootFileBlock const *, bool fastClone)
EDProduct const * wrapper() const
Definition: OutputHandle.cc:56
std::string const & branchName() const
std::array< std::set< OutputItem >, NumBranchTypes > selectedOutputItemList_
std::string const & eventHistoryTreeName()
Definition: rootNames.cc:54
DropMetaData dropMetaData_
STL namespace.
ProductProvenances runProductProvenanceVector_
BranchDescription const branchDescription_
void writeOne(EventPrincipal const &)
hep::concurrency::RecursiveMutex mutex_
bool is_valid() const
Definition: RangeSet.cc:118
FileIndex::EntryNumber_t eventEntryNumber() const
void setRunAuxiliaryRangeSetID(RangeSet const &)
bool is_sorted() const
Definition: RangeSet.cc:137
SubRunAuxiliary const & subRunAux() const
Definition: Principal.cc:1156
std::string columns()
Definition: create_table.h:113
ProductStatus uninitialized()
Definition: ProductStatus.h:42
RunNumber_t run() const
Definition: RangeSet.cc:90
FileProperties const & fileProperties() const
std::map< ProcessHistoryID const, ProcessHistory > ProcessHistoryMap
std::vector< std::string > parents(bool want_basename=true) const
static void writeTTree(TTree *) noexcept(false)
ProductProvenances eventProductProvenanceVector_
ProductProvenances * pResultsProductProvenanceVector_
void writeFileCatalogMetadata(FileStatsCollector const &stats, FileCatalogMetadata::collection_type const &, FileCatalogMetadata::collection_type const &)
ProductProvenance const * productProvenance() const
Definition: OutputHandle.cc:50
OutputFileStatus status_
SubRunID const & subRunID() const noexcept
void setSubRunAuxiliaryRangeSetID(RangeSet const &)
ProductList productList_
void addEventSelectionEntry(EventSelectionID const &eventSelection)
Definition: History.cc:19
ProductProvenances subRunProductProvenanceVector_
std::string const & metaDataTreeName()
Definition: rootNames.cc:40
OutputFileStatus
const double e
bool should_close(FileProperties const &) const
RangeSet seenRanges() const
Definition: Principal.cc:1000
static collection_type const & get()
bool is_full_run() const
Definition: RangeSet.cc:124
void updateAge(std::chrono::seconds const age)
FileProperties fp_
OutputHandle getForOutput(ProductID const &, bool resolveProd) const
Definition: Principal.cc:1087
cet::exempt_ptr< BranchDescription const > getProductDescription(ProductID const pid, bool const alwaysEnableLookupOfProducedProducts=false) const
Definition: Principal.cc:1116
void update_subRun(OutputFileStatus const status)
void create_table(sqlite3 *const db, std::string const &tablename, Cols const &...cols)
Definition: create_table.h:156
char const * metaBranchRootName()
Definition: rootNames.h:61
void writeSubRun(SubRunPrincipal const &)
std::string const & parentageBranchName()
Definition: rootNames.cc:33
void sortBy_Run_SubRun_Event()
Definition: FileIndex.cc:161
static bool shouldFastClone(bool const fastCloningSet, bool const fastCloning, bool const wantAllEvents, ClosingCriteria const &cc)
detail::DummyProductCache dummyProductCache_
boost::posix_time::ptime outputFileOpenTime() const
int getFileFormatVersion()
void writeResults(ResultsPrincipal &resp)
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
bool isValid() const
Definition: OutputHandle.cc:38
RootOutputTreePtrArray treePointers_
std::unique_ptr< cet::sqlite::Connection > rootFileDB_
std::string const & parentageIDBranchName()
Definition: rootNames.cc:27
void respondToCloseInputFile(FileBlock const &)
EDProduct const * product(std::string const &wrappedName)
void addEntry(EventID const &eID, EntryNumber_t entry)
Definition: FileIndex.cc:146
bool contains(RunNumber_t, SubRunNumber_t, EventNumber_t) const
Definition: RangeSet.cc:102
ProductProvenances * pEventProductProvenanceVector_
std::enable_if_t<!detail::RangeSetsSupported< BT >::value, EDProduct const * > getProduct(OutputHandle const &, RangeSet const &productRS, std::string const &wrappedName)
auto transform_all(Container &, OutputIt, UnaryOp)
EventID const & highestEventID() const
ProductStatus dropped()
Definition: ProductStatus.h:26
void setRangeSetID(unsigned const id) const
auto select(T const &...t)
Definition: select.h:155
void writeProductDescriptionRegistry()
std::string const & getFileFormatEra()
RunAuxiliary const & runAux() const
Definition: Principal.cc:1150
std::string const & BranchTypeToString(BranchType const bt)
Definition: BranchType.cc:65
cet::exempt_ptr< ProductProvenance const > branchToProductProvenance(ProductID const &) const
Definition: Principal.cc:505
p
Definition: test.py:228
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
static TTree * makeTTree(TFile *, std::string const &name, int splitLevel)
std::string const & fileIndexTreeName()
Definition: rootNames.cc:47
std::array< ProductDescriptionsByID, NumBranchTypes > descriptionsToPersist_
std::string const & eventHistoryBranchName()
Definition: rootNames.cc:61
void checkDictionaries(BranchDescription const &productDesc)
bool canonical_string(std::string const &str, std::string &result)
void writeProcessConfigurationRegistry()
ResultsAuxiliary const & resultsAux() const
Definition: Principal.cc:1168
OutputModule const * om_
signed __int64 int64_t
Definition: stdint.h:131
int64_t const saveMemoryObjectThreshold_
SelectionsArray const & keptProducts() const
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void writeRun(RunPrincipal const &)
bool operator<(OutputItem const &rh) const
static RangeSet invalid()
Definition: RangeSet.cc:46
EventAuxiliary const & eventAux() const
Definition: Principal.cc:1162
static constexpr EventID invalidEvent() noexcept
Definition: EventID.h:203
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
ProductStatus dummyToPreventDoubleCount()
Definition: ProductStatus.h:31
auto for_all(FwdCont &, Func)
BranchType
Definition: BranchType.h:18
void writeProcessHistoryRegistry()
static constexpr unsigned unsigned_max()
bool is_full_subRun() const
Definition: RangeSet.cc:131
ResultsAuxiliary const * pResultsAux_
void update_run(OutputFileStatus const status)
std::set< SubRunID > const & seenSubRuns() const
ProductProvenances * pSubRunProductProvenanceVector_
void for_each_branch_type(F f)
Definition: BranchType.h:36
RunAuxiliary const * pRunAux_
static constexpr unsigned size_max()
cet::registry_via_id< success_t, val > reg
FileIndex::EntryNumber_t subRunEntryNumber() const
void setRangeSetID(unsigned const id)
Definition: EDProduct.h:79
History const * pHistory_
void exec(sqlite3 *db, std::string const &ddl)
Definition: exec.cc:5
RangeSet const & rangeOfValidity() const
Definition: OutputHandle.cc:62
bool isRealData() const noexcept
std::string const & branchName() const noexcept
BranchChildren const & branchChildren() const
static const double sr
Definition: Units.h:167
std::size_t eventsThisFile() const
ClosingCriteria fileSwitchCriteria_
void updateSize(unsigned const size)
std::string to_string(ModuleType const mt)
Definition: ModuleType.h:34
unsigned checksum() const
Definition: RangeSet.cc:214
ProductStatus neverCreated()
Definition: ProductStatus.h:21
EventID const & lowestEventID() const
void fillBranches(Principal const &, std::vector< ProductProvenance > *)
static void exportTo(sqlite3 *db)
fhicl::ParameterSetID selectorConfig() const
Definition: Observer.cc:84
void setRangeSetID(unsigned const id) const
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:120
static constexpr unsigned seconds_max()