Group.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
9 #include "cetlib_except/demangle.h"
10 #include "range/v3/view.hpp"
11 
12 #include <string>
13 
14 using namespace std;
15 
16 namespace art {
17 
18  using namespace detail;
19 
20  Group::~Group()
21  {
22  delete productProvenance_.load();
23  delete product_.load();
24  delete rangeSet_.load();
25  delete partnerProduct_.load();
26  delete baseProduct_.load();
27  delete partnerBaseProduct_.load();
28  }
29 
30  Group::Group(DelayedReader* reader,
31  BranchDescription const& bd,
32  unique_ptr<RangeSet>&& rs,
33  grouptype const gt,
34  unique_ptr<EDProduct>&& edp /*= nullptr*/)
35  : branchDescription_{bd}
36  , delayedReader_{reader}
37  , product_{edp.release()}
38  , rangeSet_{rs.release()}
39  , grpType_{gt}
40  {}
41 
// Returns the held product as an EDProduct const*.
// For a group of type grouptype::normal the pointer stored in product_ is
// returned; for every other group type the call is forwarded to
// uniqueProduct(). The group mutex is held for the duration of the call.
// NOTE(review): listing line 47 is absent from this extract, so a statement
// between the group-type check and the return may be missing here -- confirm
// against the original Group.cc.
42  EDProduct const*
43  Group::getIt_() const
44  {
45  std::lock_guard sentry{mutex_};
46  if (grpType_ == grouptype::normal) {
48  return product_.load();
49  }
50  return uniqueProduct();
51  }
52 
53  EDProduct const*
55  {
56  std::lock_guard sentry{mutex_};
57  if (grpType_ == grouptype::normal) {
58  return product_.load();
59  }
60  EDProduct* result = product_.load();
61  if (result == nullptr) {
62  result = partnerProduct_.load();
63  }
64  if (grpType_ == grouptype::assns) {
65  return result;
66  }
67  if (result != nullptr) {
68  return result;
69  }
70  result = baseProduct_.load();
71  if (result == nullptr) {
72  result = partnerBaseProduct_.load();
73  }
74  return result;
75  }
76 
77  EDProduct const*
79  {
80  std::lock_guard sentry{mutex_};
81  if (grpType_ == grouptype::normal) {
82  return product_.load();
83  }
84  throw Exception(errors::LogicError, "AmbiguousProduct")
85  << cet::demangle_symbol(typeid(*this).name())
86  << " was asked for a held product (uniqueProduct()) "
87  << "without specifying which one was wanted.\n";
88  }
89 
90  EDProduct const*
91  Group::uniqueProduct(TypeID const& wanted_wrapper_type) const
92  {
93  std::lock_guard sentry{mutex_};
94  if (product_.load() == nullptr) {
95  return nullptr;
96  }
97  if (grpType_ == grouptype::normal) {
98  return product_.load();
99  }
100 
101  auto assns_type_ids = product_.load()->getTypeIDs();
102  if (grpType_ == grouptype::assns) {
103  assert(assns_type_ids.size() == 2ull);
104  if (wanted_wrapper_type ==
105  assns_type_ids.at(product_metatype::RightLeft)) {
106  return partnerProduct_.load();
107  }
108  return product_.load();
109  }
110 
111  assert(assns_type_ids.size() == 4ull);
112  if (wanted_wrapper_type == assns_type_ids.at(product_metatype::RightLeft)) {
113  return partnerBaseProduct_.load();
114  }
115  if (wanted_wrapper_type == assns_type_ids.at(product_metatype::LeftRight)) {
116  return baseProduct_.load();
117  }
118  if (wanted_wrapper_type ==
119  assns_type_ids.at(product_metatype::RightLeftData)) {
120  return partnerProduct_.load();
121  }
122  return product_.load();
123  }
124 
125  BranchDescription const&
127  {
128  return branchDescription_;
129  }
130 
131  ProductID
133  {
134  return branchDescription_.productID();
135  }
136 
137  RangeSet const&
139  {
140  std::lock_guard sentry{mutex_};
141  return *rangeSet_.load();
142  }
143 
146  {
147  std::lock_guard sentry{mutex_};
148  return productProvenance_.load();
149  }
150 
151  // Called by Principal::ctor_read_provenance()
152  // Called by Principal::insert_pp
153  // Called by RootDelayedReader::getProduct_
154  void
155  Group::setProductProvenance(unique_ptr<ProductProvenance const>&& pp)
156  {
157  std::lock_guard sentry{mutex_};
158  delete productProvenance_.load();
159  productProvenance_ = pp.release();
160  }
161 
162  // Called by Principal::put
163  void
164  Group::setProductAndProvenance(unique_ptr<ProductProvenance const>&& pp,
165  unique_ptr<EDProduct>&& edp,
166  unique_ptr<RangeSet>&& rs)
167  {
168  std::lock_guard sentry{mutex_};
169  delete productProvenance_.load();
170  productProvenance_ = pp.release();
171  delete product_.load();
172  product_ = edp.release();
173  delete rangeSet_.load();
174  rangeSet_ = rs.release();
175  }
176 
// Drops the cached product(s) held by this group.
// (The cross-reference index of this listing gives the declarator on the
// absent listing line 178 as Group::removeCachedProduct().)
// NOTE(review): listing line 181 -- the condition guarding the throw below --
// is also absent from this extract; judging by the error text it presumably
// tests whether the branch is a produced one. Confirm against the original
// Group.cc.
177  void
179  {
180  std::lock_guard sentry{mutex_};
182  throw Exception(errors::LogicError, "Group::removeCachedProduct():")
183  << "Attempt to remove a produced product!\n"
184  << "This routine should only be used to remove large data products "
185  << "read from disk (like raw digits).\n";
186  }
// The master product is always released.
187  delete product_.load();
188  product_ = nullptr;
// A normal group holds nothing else, so it is done here.
189  if (grpType_ == grouptype::normal) {
190  return;
191  }
192  delete partnerProduct_.load();
193  partnerProduct_ = nullptr;
// A plain assns group has no base products, so it is done here.
194  if (grpType_ == grouptype::assns) {
195  return;
196  }
197  delete baseProduct_.load();
198  baseProduct_ = nullptr;
199  delete partnerBaseProduct_.load();
200  partnerBaseProduct_ = nullptr;
// Only this last path (neither normal nor assns) replaces the range set with
// a freshly default-constructed one.
201  delete rangeSet_.load();
202  rangeSet_ = new RangeSet{};
203  }
204 
// Reports whether the product of this group can be obtained.
// (The cross-reference index of this listing gives the declarator on the
// absent listing line 206 as Group::productAvailable() const.)
// Returns false immediately when the branch description is marked dropped;
// otherwise the answer is derived from the stored provenance (if any) and
// from what the delayed reader reports via isAvailableAfterCombine().
// NOTE(review): listing lines 206, 213, 216-217, 221, 247, 253-254 and 264
// are absent from this extract. That includes the declaration of `status`
// and several `if` conditions, so the brace structure below is incomplete --
// confirm against the original Group.cc before relying on the control flow.
205  bool
207  {
208  if (branchDescription_.dropped()) {
209  // Not a product we are producing this time around, and it is not
210  // present in any of the input files we have opened so far.
211  return false;
212  }
214  std::lock_guard sentry{mutex_};
215  bool availableAfterCombine{false};
218  availableAfterCombine =
219  delayedReader_->isAvailableAfterCombine(branchDescription_.productID());
220  }
222  if (productProvenance_.load() == nullptr) {
223  // No provenance, must be a produced product which has not been
224  // put yet, or a non-produced product that is available after
225  // combine (aggregation) and not yet read, or a non-produced
226  // product in a secondary file that has not yet been opened.
227  if (!branchDescription_.produced()) {
228  if (availableAfterCombine) {
229  // No provenance, not produced, but we can get it from the
230  // input, we just have not done so yet. Claim the product is
231  // available.
232  return true;
233  }
234  // Not produced, not availableAfterCombine, must be in a
235  // secondary file that has not yet been opened. We report it as
236  // not available so that the Principal getBy* routines will try
237  // the next secondary file.
238  return false;
239  }
240  if (product_.load()) {
241  throw Exception(errors::LogicError, "Group::status():")
242  << "We have a produced product, the product has been put(), but "
243  "there is no provenance!\n";
244  }
245  // We have a produced product which has not been put(), and has no
246  // provenance (as it should be).
248  } else {
249  // Not a produced product, and not yet delay read, use the status
250  // from the on-file provenance.
251  status = productProvenance_.load()->productStatus();
252  }
255  if (!availableAfterCombine) {
256  // We know this is a produced run or subrun product which is not
257  // present in any fragments.
258  return status == productstatus::present();
259  }
260  }
261  // We now know we are either an event or results product, or we are
262  // a run or subrun product that can become valid through product
263  // combination (aggregation).
265  // We now know that we are a run or subrun product that can become
266  // valid through product combination (aggregation). Special case
267  // this and report the product as available even though the
268  // provenance product status is a special flag that is not the
269  // present status.
270  // This is here to allow fetching of a product specially marked by
271  // RootOutputFile as a dummy with an invalid range set created to
272  // prevent double-counting when combining run/subrun products. We
273  // allow the fetch to happen because the call to
274  // isPossiblyAvailable above determined that the fetch will result
275  // in a valid and present product, even though this particular
276  // dummy one is not.
277  return true;
278  }
279  // Note: Technically this is not necessary since the Wrapper present
280  // flag covers this case, but this way we never do the I/O on
281  // the product if we already have the provenance.
282  return status == productstatus::present();
283  }
284 
// Tries to make the product of the wanted wrapper type available in this
// group, reading the master product and creating partner/base products on
// demand. Returns true when the requested product is held afterwards and
// false when it cannot be obtained.
// (The cross-reference index of this listing gives the declarator on the
// absent listing line 286 as
// Group::resolveProductIfAvailable(TypeID wanted_wrapper = TypeID{}) const.)
// NOTE(review): listing lines 286, 293, 306, 328-329, 339-340, 348, 353 and
// 371 are absent from this extract. Several statements below are therefore
// truncated (an `if` condition, the object getProduct() is called on, both
// arms of two conditional expressions, and the left-hand sides of two
// assignments) -- confirm against the original Group.cc.
285  bool
287  TypeID wanted_wrapper_type /*= TypeID{}*/) const
288  {
289  std::lock_guard sentry{mutex_};
290  // Now try to get the master product.
291  if (product_.load() == nullptr) {
292  // Not already resolved.
294  // Never produced, hopeless.
295  return false;
296  }
297  if (!productAvailable()) {
298  // Not possible to get it, hopeless.
299  return false;
300  }
301  // Now try to read it.
302  // Note: This may call back to us to update the product
303  // provenance if run or subRun data product merging creates a
304  // new provenance.
305  product_ =
307  ->getProduct(this, branchDescription_.productID(), *rangeSet_.load())
308  .release();
309  if (product_.load() == nullptr) {
310  // We failed to get the master product, hopeless.
311  return false;
312  }
313  }
314 
315  if (!wanted_wrapper_type) {
316  // The type of the product is not known, therefore the on-disk
317  // representation is sufficient.
318  return true;
319  }
320 
321  if (grpType_ == grouptype::normal) {
322  // If we get here, we have successfully read a normal product.
323  return true;
324  }
325 
// From here on the group is not a normal one, so the wanted type has to be
// matched against the type ids reported by the master product.
326  assert(grpType_ != grouptype::normal);
327  auto normal_metatype = (grpType_ == grouptype::assns) ?
330 
331  auto assns_type_ids = product_.load()->getTypeIDs();
332  assert(!assns_type_ids.empty());
333 
// The master product itself is what was asked for.
334  if (wanted_wrapper_type == assns_type_ids.at(normal_metatype)) {
335  return true;
336  }
337 
338  auto partner_metatype = (grpType_ == grouptype::assns) ?
341  if (wanted_wrapper_type == assns_type_ids.at(partner_metatype)) {
342  if (partnerProduct_.load() != nullptr) {
343  // They wanted the partner product, and we have already made it, done.
344  return true;
345  }
346  // They want the partner product, ask the wrapper to make it for us,
347  // who ends up asking the assns to do it.
349  product_.load()->makePartner(wanted_wrapper_type.typeInfo()).release();
350  return partnerProduct_.load() != nullptr;
351  }
352 
354 
355  if (wanted_wrapper_type == assns_type_ids.at(product_metatype::LeftRight)) {
356  if (baseProduct_.load() != nullptr) {
357  // They wanted the base product, and we have already made it, done.
358  return true;
359  }
360  // They want the base, ask the wrapper to make it for us,
361  // who ends up asking the assns to do it.
362  baseProduct_ =
363  product_.load()->makePartner(wanted_wrapper_type.typeInfo()).release();
364  return baseProduct_.load() != nullptr;
365  }
// Last remaining case: the partner base product.
366  if (partnerBaseProduct_.load() != nullptr) {
367  // They wanted the partner base product, and we have already made it,
368  // done.
369  return true;
370  }
372  product_.load()->makePartner(wanted_wrapper_type.typeInfo()).release();
373  return partnerBaseProduct_.load() != nullptr;
374  }
375 
376  bool
377  Group::tryToResolveProduct(TypeID const& wanted_wrapper)
378  {
379  std::lock_guard sentry{mutex_};
380  resolveProductIfAvailable(wanted_wrapper);
381 
382  // If the product is a dummy filler, it will now be marked unavailable.
383  return productAvailable();
384  }
385 
// Picks the single product group that resolves to the wanted wrapped type.
// Groups are examined one process at a time; the first process with exactly
// one resolvable group yields the result, more than one match within a
// process raises the exception built below, and std::nullopt is returned when
// no process has a match.
// (The cross-reference index of this listing gives the declarator on the
// absent listing line 387 as resolve_unique_product(.)
// NOTE(review): listing line 414, which must declare the exception object
// `e` used in the multiple-match branch, is also absent from this extract --
// confirm its type and error code against the original Group.cc.
386  std::optional<GroupQueryResult>
388  std::vector<cet::exempt_ptr<art::Group>> const& product_groups,
389  art::WrappedTypeID const& wrapped)
390  {
// Two groups belong together when their process names are equal.
391  auto by_process_name = [](auto const ga, auto const gb) {
392  return ga->productDescription().processName() ==
393  gb->productDescription().processName();
394  };
395 
396  // We group product groups according to their process names. The
397  // product groups have already been assembled in reverse-process
398  // history. The first process with a match wins. Note that it is
399  // an error for there to be multiple matches per process.
400  for (auto const groups_per_process :
401  ranges::views::group_by(product_groups, by_process_name)) {
402  // Keep track of all matched groups so that a helpful error
403  // message can be reported.
404  std::vector<cet::exempt_ptr<art::Group>> matched_groups;
405  for (auto const group : groups_per_process) {
406  if (group->tryToResolveProduct(wrapped.wrapped_product_type)) {
407  matched_groups.emplace_back(group.get());
408  }
409  }
410 
411  if (auto const num_matches = matched_groups.size(); num_matches == 1) {
412  return std::make_optional(GroupQueryResult{matched_groups[0]});
413  } else if (num_matches > 1) {
415  e << "Found " << num_matches
416  << " products rather than one that match all criteria\n"
417  << " C++ type: " << wrapped.product_type << "\n";
418  for (auto group : matched_groups) {
419  e << " " << group->productDescription().inputTag() << '\n';
420  }
421  throw e;
422  }
// Zero matches in this process: fall through to the next process.
423  }
424  return std::nullopt;
425  }
426 
427  std::vector<GroupQueryResult>
429  art::TypeID const& wrapped_type)
430  {
431  std::vector<GroupQueryResult> results;
432  for (auto group : groups) {
433  if (group->tryToResolveProduct(wrapped_type)) {
434  results.emplace_back(group.get());
435  }
436  }
437  return results;
438  }
439 
440 } // namespace art
bool produced() const noexcept
static QCString result
bool tryToResolveProduct(TypeID const &)
Definition: Group.cc:377
ProductID productID() const
Definition: Group.cc:132
bool productAvailable() const
Definition: Group.cc:206
struct vector vector
STL namespace.
constexpr ProductStatus dummyToPreventDoubleCount() noexcept
Definition: ProductStatus.h:25
std::optional< GroupQueryResult > resolve_unique_product(std::vector< cet::exempt_ptr< art::Group >> const &product_groups, art::WrappedTypeID const &wrapped)
Definition: Group.cc:387
bool present() const noexcept
std::atomic< ProductProvenance const * > productProvenance_
Definition: Group.h:91
std::atomic< RangeSet * > rangeSet_
Definition: Group.h:100
std::atomic< EDProduct * > baseProduct_
Definition: Group.h:116
void setProductAndProvenance(std::unique_ptr< ProductProvenance const > &&, std::unique_ptr< EDProduct > &&, std::unique_ptr< RangeSet > &&)
Definition: Group.cc:164
grouptype
Definition: Group.h:34
TypeID wrapped_product_type
Definition: WrappedTypeID.h:18
BranchType branchType() const noexcept
cet::exempt_ptr< ProductProvenance const > productProvenance() const
Definition: Group.cc:145
const double e
constexpr ProductStatus uninitialized() noexcept
Definition: ProductStatus.h:36
BranchDescription const & productDescription() const noexcept
Definition: Group.cc:126
bool dropped() const noexcept
std::atomic< EDProduct * > product_
Definition: Group.h:96
EDProduct const * getIt_() const override
Definition: Group.cc:43
std::recursive_mutex mutex_
Definition: Group.h:87
EDProduct const * anyProduct() const
Definition: Group.cc:54
cet::exempt_ptr< DelayedReader const > const delayedReader_
Definition: Group.h:78
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
constexpr ProductStatus neverCreated() noexcept
Definition: ProductStatus.h:15
string release
Definition: conf.py:24
std::atomic< EDProduct * > partnerProduct_
Definition: Group.h:109
BranchDescription const & branchDescription_
Definition: Group.h:74
bool resolveProductIfAvailable(TypeID wanted_wrapper=TypeID{}) const
Definition: Group.cc:286
def group_by(rflist, field)
Definition: __init__.py:162
void removeCachedProduct()
Definition: Group.cc:178
std::atomic< EDProduct * > partnerBaseProduct_
Definition: Group.h:120
grouptype const grpType_
Definition: Group.h:102
EDProduct const * uniqueProduct() const
Definition: Group.cc:78
void setProductProvenance(std::unique_ptr< ProductProvenance const > &&)
Definition: Group.cc:155
constexpr ProductStatus present() noexcept
Definition: ProductStatus.h:10
ProductID productID() const noexcept
RangeSet const & rangeOfValidity() const
Definition: Group.cc:138
std::type_info const & typeInfo() const
Definition: TypeID.cc:36
std::vector< GroupQueryResult > resolve_products(std::vector< cet::exempt_ptr< art::Group >> const &groups, art::TypeID const &wrapped_type)
Definition: Group.cc:428