RootDelayedReader.cc
Go to the documentation of this file.
2 // vim: sw=2 expandtab :
3 
14 #include "canvas_root_io/Streamers/ProductIDStreamer.h"
15 #include "canvas_root_io/Streamers/RefCoreStreamer.h"
16 #include "cetlib/crc32.h"
17 
18 #include "TBranch.h"
19 #include "TBranchElement.h"
20 #include "TClass.h"
21 
22 #include <atomic>
23 #include <cassert>
24 #include <cstring>
25 #include <utility>
26 #include <vector>
27 
28 using namespace std;
29 
30 namespace art {
31 
32  RootDelayedReader::~RootDelayedReader() = default;
33 
34  RootDelayedReader::RootDelayedReader(
36  sqlite3* db,
37  vector<input::EntryNumber> const& entrySet,
39  TBranch* provenanceBranch,
40  int64_t const saveMemoryObjectThreshold,
43  BranchType const branchType,
44  EventID const eID,
45  bool const compactSubRunRanges)
46  : DelayedReader()
47  , fileFormatVersion_{version}
48  , db_{db}
49  , entrySet_{entrySet}
50  , branches_{branches}
51  , provenanceBranch_{provenanceBranch}
52  , saveMemoryObjectThreshold_{saveMemoryObjectThreshold}
53  , primaryFile_{primaryFile}
54  , branchIDLists_{bidLists}
55  , branchType_{branchType}
56  , eventID_{eID}
57  , compactSubRunRanges_{compactSubRunRanges}
58  {}
59 
60  void
62  {
63  principal_ = principal;
64  }
65 
66  std::vector<ProductProvenance>
68  {
70  vector<ProductProvenance> ppv;
71  auto p_ppv = &ppv;
72  provenanceBranch_->SetAddress(&p_ppv);
73  // Note: This provenance may be replaced later for
74  // run and subrun products by the combination
75  // process (aggregation).
77  return ppv;
78  }
79 
80  // FIXME: The following function may need to go somewhere outside of
81  // RootDelayedReader. For run and subrun products check to see if attempting
82  // to read them will result in a combination with a valid range set. If not
83  // report to the caller that they can skip doing the read.
84  bool
86  {
87  if ((branchType_ != InSubRun) && (branchType_ != InRun)) {
88  // We only handle run and subrun products here, tell
89  // the caller he should proceed.
90  return true;
91  }
92  // Products from files that did not support range sets are
93  // worth attempting to read, tell the caller to proceed.
94  if (fileFormatVersion_.value_ < 9) {
95  return true;
96  }
97  {
99  for (auto I = entrySet_.cbegin(), E = entrySet_.cend(); I != E; ++I) {
100  vector<ProductProvenance> ppv;
101  ProductProvenance const* prov = nullptr;
102  {
103  auto p_ppv = &ppv;
104  provenanceBranch_->SetAddress(&p_ppv);
106  for (auto const& val : ppv) {
107  if (val.productID() == bid) {
108  prov = &val;
109  break;
110  }
111  }
112  }
113  // Note: If this is a produced product then it might not be in
114  // any of the fragments, this is not an error.
115  if (prov != nullptr) {
116  if (prov->productStatus() == productstatus::present()) {
117  // We found a usable product in the set of fragments,
118  // tell our caller to proceed.
119  return true;
120  }
121  }
122  }
123  // None of the run or subrun fragments have a usable product,
124  // tell our caller that he can give up now.
125  return false;
126  }
127  }
128 
129  unique_ptr<EDProduct>
131  ProductID const pid,
132  RangeSet& rs) const
133  {
134  auto iter = branches_->find(pid);
135  assert(iter != branches_->end());
136  input::BranchInfo const& branchInfo = iter->second;
137  TBranch* br = branchInfo.productBranch_;
138  assert(br != nullptr);
139  auto const& pd = branchInfo.branchDescription_;
140  // Note: It is not an error to attempt to delay read a produced
141  // run or subrun product because there might be many of them spread
142  // across multiple fragments of the same run or subrun which will
143  // be combined below.
144  if ((branchType_ != InSubRun) && (branchType_ != InRun)) {
145  if (pd.produced()) {
146  throw Exception{errors::LogicError, "RootDelayedReader::getProduct_"}
147  << "Attempt to delay read a produced product!\n";
148  }
149  }
150  // Note: threading: Configuring the ProductID and RefCore streamers and the
151  // related i/o operations must be done with the source lock held!
152  InputSourceMutexSentry sentry;
153  configureProductIDStreamer(branchIDLists_);
154  configureRefCoreStreamer(principal_.get());
155  TClass* cl = TClass::GetClass(pd.wrappedName().c_str());
156  auto get_product = [this, cl, br](auto entry) {
157  unique_ptr<EDProduct> p{static_cast<EDProduct*>(cl->New())};
158  EDProduct* pp = p.get();
159  br->SetAddress(&pp);
160  unsigned long long ticks = 0;
161  auto const bytesRead = input::getEntry(br, entry, ticks);
162  if ((saveMemoryObjectThreshold_ > -1) &&
163  (bytesRead > saveMemoryObjectThreshold_)) {
164  br->DropBaskets("all");
165  }
166  return p;
167  };
168  // Retrieve first product
169  auto result = get_product(entrySet_[0]);
170  if ((branchType_ != InSubRun) && (branchType_ != InRun)) {
171  // Not a run or subrun product, all done.
172  configureProductIDStreamer();
173  configureRefCoreStreamer();
174  return result;
175  }
176  //
177  // Retrieve and combine multiple Run/SubRun products as needed (this is
178  // aggregation!).
179  //
180  // Products from files that did not support RangeSets are
181  // assigned RangeSets that correspond to the entire run/subrun.
182  if (fileFormatVersion_.value_ < 9) {
183  if (branchType_ == InRun) {
185  } else {
187  }
188  configureProductIDStreamer();
189  configureRefCoreStreamer();
190  return result;
191  }
192  // Fetch the provenance for the first product.
193  vector<ProductProvenance> ppv;
194  unique_ptr<ProductProvenance const> prov;
195  {
196  auto p_ppv = &ppv;
197  provenanceBranch_->SetAddress(&p_ppv);
199  for (auto const& val : ppv) {
200  if (val.productID() == pid) {
201  prov.reset(new ProductProvenance(val));
202  break;
203  }
204  }
205  }
206  // Note: We cannot make this assertion here because the first
207  // product may be a dummy wrapper with the present flag false
208  // and no provenance written (How???), while the second product
209  // is present and has provenance. The disabled assertion is kept
210  // here for reference:
211  // assert((prov.get() != nullptr) && "Could not find provenance for this
212  // Run/SubRun product!");
213  // Unfortunately, we cannot use detail::resolveRangeSetInfo in
214  // this case because products that represent a full (Sub)Run are
215  // allowed to be duplicated in an input file. The behavior in
216  // such a case is a NOP.
217  RangeSet mergedRangeSet = detail::resolveRangeSet(db_,
218  "SomeInput"s,
219  branchType_,
220  result->getRangeSetID(),
222  // Note: If the mergedRangeSet is invalid here that means the first product
223  // was a dummy created
224  // by RootOutputFile to prevent double-counting when combining
225  // products. Or the user is playing games that are going to hurt them
226  // by intentionally creating a product with an invalid range set.
227  // Note: Also in that case we have a product status of unknown which we may
228  // have to replace later.
229  for (auto it = entrySet_.cbegin() + 1, e = entrySet_.cend(); it != e;
230  ++it) {
231  auto p = get_product(*it);
232  vector<ProductProvenance> new_ppv;
233  unique_ptr<ProductProvenance const> new_prov;
234  {
235  auto p_ppv = &new_ppv;
236  provenanceBranch_->SetAddress(&p_ppv);
238  for (auto const& val : new_ppv) {
239  if (val.productID() == pid) {
240  new_prov.reset(new ProductProvenance(val));
241  break;
242  }
243  }
244  }
245  // assert((new_prov.get() != nullptr) && "Could not find provenance for
246  // this Run/SubRun product!"); auto const id = p->getRangeSetID();
247  RangeSet const& newRS = detail::resolveRangeSet(db_,
248  "SomeInput"s,
249  branchType_,
250  p->getRangeSetID(),
252  if (!mergedRangeSet.is_valid() && !newRS.is_valid()) {
253  // Both range sets are invalid, do nothing.
254  // RootOutputFile creates this situation to prevent double-counting when
255  // combining products. Possibly can also happen when a produced product
256  // is not produced or when a product is dropped?
257  } else if (mergedRangeSet.is_valid() && !newRS.is_valid()) {
258  // FIXME: Can a neverCreated or dropped product have a valid range set?
259  // assert(prov->productStatus() == productstatus::present());
260  // Our current merged range set is valid and the next one is invalid, do
261  // nothing. RootOutputFile creates this situation to prevent
262  // double-counting when combining products. Possibly can also happen
263  // when a produced product is not produced or when a product is dropped?
264  } else if (!mergedRangeSet.is_valid() && newRS.is_valid()) {
265  // We finally have a valid range set to use.
266  // FIXME: Can a neverCreated or dropped product have a valid range set?
267  // assert(new_prov->productStatus() == productstatus::present());
268  mergedRangeSet = newRS;
269  std::swap(result, p);
270  // Replace the provenance.
271  // Note: We do not worry about productstatus::unknown() here because
272  // newRS is valid.
273  principal_->insert_pp(const_cast<Group*>(grp),
274  make_unique<ProductProvenance const>(*new_prov));
275  prov = move(new_prov);
276  } else if (art::disjoint_ranges(mergedRangeSet, newRS)) {
277  // Old and new range sets can be combined, do it.
278  // FIXME: Can a neverCreated or dropped product have a valid range set?
279  // assert(prov->productStatus() == productstatus::present());
280  // assert(new_prov->productStatus() == productstatus::present());
281  result->combine(p.get());
282  mergedRangeSet.merge(newRS);
283  // FIXME: What do we do about provenance??? What if one product has
284  // status present and the other neverCreated or dropped?
285  // FIXME: Possibly cannot happen because in the case that the new
286  // provenance is not present it would have
287  // FIXME: an invalid range set?
288  // Note: We do not worry about productstatus::unknown() here because
289  // newRS is valid. assert((prov->productStatus() ==
290  // new_prov->productStatus()) && "Unequal product status when merging
291  // range sets!");
292  } else if (art::same_ranges(mergedRangeSet, newRS)) {
293  // The ranges are the same, so the behavior is a NOP. If
294  // the stakeholders decide that products with the same
295  // ranges should be checked for equality, the condition
296  // will be added here.
297  // FIXME: Can a neverCreated or dropped product have a valid range set?
298  // assert(prov->productStatus() == productstatus::present());
299  // assert(new_prov->productStatus() == productstatus::present());
300  // FIXME: What do we do about provenance??? What if one product has
301  // status present and the other neverCreated or dropped?
302  // FIXME: Possibly cannot happen because in the case that the new
303  // provenance is not present it would have
304  // FIXME: an invalid range set?
305  // Note: We do not worry about productstatus::unknown() here because
306  // newRS is valid. assert((prov->productStatus() ==
307  // new_prov->productStatus()) && "Unequal product status when products
308  // have identical range sets!");
309  } else if (art::overlapping_ranges(mergedRangeSet, newRS)) {
311  "RootDelayedReader::getProduct_"}
312  << "\nThe following ranges corresponding to the product:\n"
313  << " '" << branchInfo.branchDescription_ << "'"
314  << "\ncannot be aggregated\n"
315  << mergedRangeSet << " and\n"
316  << newRS << "\nPlease contact artists@fnal.gov.\n";
317  }
318  }
319  // Now transfer the calculated mergedRangeSet to the output argument.
320  std::swap(rs, mergedRangeSet);
321  // And now we are done.
322  configureProductIDStreamer();
323  configureRefCoreStreamer();
324  return result;
325  }
326 
327  // FIXME: This should be a member of RootInputFileSequence.
328  int
330  {
331  // idx being a number we can actually use is a precondition of this
332  // function.
333  assert(!(idx < 0));
334  // Note:
335  //
336  // Return code of -2 means stop, -1 means event-not-found,
337  // otherwise 0 for success.
338  //
339  auto const& sfnm = primaryFile_->secondaryFileNames();
340  assert(!(static_cast<decltype(sfnm.size())>(idx) > sfnm.size()));
341  if (sfnm.empty()) { // No configured secondary files.
342  return -2;
343  }
344  auto const& sf = primaryFile_->secondaryFiles();
345  if (static_cast<decltype(sfnm.size())>(idx) == sfnm.size()) {
346  // We're done.
347  return -2;
348  }
349  if (!sf[idx]) {
350  primaryFile_->openSecondaryFile(idx);
351  }
352  switch (branchType_) {
353  case InEvent: {
354  if (!sf[idx]->readEventForSecondaryFile(eventID_)) {
355  return -1;
356  }
357  } break;
358  case InSubRun: {
359  if (!sf[idx]->readSubRunForSecondaryFile(eventID_.subRunID())) {
360  return -1;
361  }
362  } break;
363  case InRun: {
364  if (!sf[idx]->readRunForSecondaryFile(eventID_.runID())) {
365  return -1;
366  }
367  } break;
368  default: {
369  assert(false && "RootDelayedReader encountered an unknown BranchType!");
370  return -2;
371  }
372  }
373  return 0;
374  }
375 
376 } // namespace art
RunID const & runID() const
Definition: EventID.h:93
SubRunID const & subRunID() const
Definition: EventID.h:105
ProductStatus const & productStatus() const
bool isAvailableAfterCombine_(ProductID) const override
STL namespace.
RangeSet resolveRangeSet(RangeSetInfo const &rs)
bool is_valid() const
Definition: RangeSet.cc:118
int openNextSecondaryFile_(int idx) override
static RangeSet forSubRun(SubRunID)
Definition: RangeSet.cc:58
std::vector< ProductProvenance > readProvenance_() const override
const double e
void swap(Handle< T > &a, Handle< T > &b)
cet::exempt_ptr< input::BranchMap const > branches_
std::enable_if_t< detail::are_handles< T, U >::value, bool > disjoint_ranges(T const &a, U const &b)
std::vector< input::EntryNumber > const entrySet_
p
Definition: test.py:228
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
cet::exempt_ptr< RootInputFile > primaryFile_
void setPrincipal_(cet::exempt_ptr< Principal >) override
signed __int64 int64_t
Definition: stdint.h:131
std::enable_if_t< detail::are_handles< T, U >::value, bool > overlapping_ranges(T const &a, U const &b)
std::enable_if_t< detail::are_handles< T, U >::value, bool > same_ranges(T const &a, U const &b)
BranchType
Definition: BranchType.h:18
Int_t getEntry(TBranch *branch, EntryNumber entryNumber)
Definition: getEntry.cc:20
RangeSet & merge(RangeSet const &other)
Definition: RangeSet.cc:296
cet::exempt_ptr< Principal > principal_
cet::exempt_ptr< BranchIDLists const > branchIDLists_
static const double s
Definition: Units.h:99
TBranch * productBranch_
Definition: Inputfwd.h:37
ProductStatus present()
Definition: ProductStatus.h:16
static RangeSet forRun(RunID)
Definition: RangeSet.cc:52
std::unique_ptr< EDProduct > getProduct_(Group const *, ProductID, RangeSet &) const override
FileFormatVersion fileFormatVersion_