Principal.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
33 #include "cetlib/exempt_ptr.h"
34 #include "range/v3/view.hpp"
35 
36 #include <atomic>
37 #include <cassert>
38 #include <memory>
39 #include <string>
40 #include <utility>
41 #include <vector>
42 
43 using namespace cet;
44 using namespace std;
45 
46 namespace {
47  std::string const indent(2, ' ');
48 }
49 
50 namespace art {
51 
52  namespace {
53 
54  unique_ptr<Group>
55  create_group(DelayedReader* reader, BranchDescription const& bd)
56  {
57  auto const& class_name = bd.producedClassName();
58  auto gt = Group::grouptype::normal;
59  if (is_assns(class_name)) {
60  if (name_of_template_arg(class_name, 2) == "void"s) {
61  gt = Group::grouptype::assns;
62  } else {
63  gt = Group::grouptype::assnsWithData;
64  }
65  }
66  return make_unique<Group>(reader, bd, make_unique<RangeSet>(), gt);
67  }
68 
69  } // unnamed namespace
70 
71  void
72  Principal::ctor_create_groups(
74  {
75  if (!presentProducts) {
76  return;
77  }
78  // Note: Dropped products are a problem. We should not create
79  // groups for them now because later we may open a secondary
80  // file which actually contains them and we want the
81  // secondary principal to have those groups. However some
82  // code expects to be able to find a group for dropped
83  // products, so getGroupTryAllFiles ignores groups for
84  // dropped products instead.
85  for (auto const& pr : presentProducts->descriptions) {
86  auto const& pd = pr.second;
87  assert(pd.branchType() == branchType_);
88  fillGroup(pd);
89  }
90  }
91 
92  void
93  Principal::ctor_read_provenance()
94  {
95  auto ppv = delayedReader_->readProvenance();
96  for (auto iter = ppv.begin(), end = ppv.end(); iter != end; ++iter) {
97  auto g = getGroupLocal(iter->productID());
98  if (g.get() == nullptr) {
99  continue;
100  }
101  if (iter->productStatus() != productstatus::unknown()) {
102  g->setProductProvenance(make_unique<ProductProvenance>(*iter));
103  } else {
104  // We have an old format file, convert.
105  g->setProductProvenance(make_unique<ProductProvenance>(
106  iter->productID(),
108  iter->parentage().parents()));
109  }
110  }
111  }
112 
113  void
114  Principal::ctor_fetch_process_history(ProcessHistoryID const& phid)
115  {
116  if (!phid.isValid()) {
117  return;
118  }
119  ProcessHistory processHistory;
120  ProcessHistoryRegistry::get(phid, processHistory);
121  std::swap(processHistory_, processHistory);
122  }
123 
124  Principal::Principal(BranchType branchType,
125  ProcessConfiguration const& pc,
126  cet::exempt_ptr<ProductTable const> presentProducts,
127  ProcessHistoryID const& hist,
128  std::unique_ptr<DelayedReader>&& reader)
129  : branchType_{branchType}
131  , delayedReader_{std::move(reader)}
132  {
133  processHistoryModified_ = false;
134  presentProducts_ = presentProducts.get();
135  producedProducts_ = nullptr;
137  delayedReader_->setPrincipal(this);
138  eventAux_ = nullptr;
139  subRunPrincipal_ = nullptr;
140  ctor_create_groups(presentProducts);
143  }
144 
145  // Run
147  ProcessConfiguration const& pc,
148  cet::exempt_ptr<ProductTable const> presentProducts,
149  std::unique_ptr<DelayedReader>&&
150  reader /* = std::make_unique<NoDelayedReader>() */)
151  : branchType_{InRun}
153  , delayedReader_{std::move(reader)}
154  , runAux_{aux}
155  {
156  processHistoryModified_ = false;
157  presentProducts_ = presentProducts.get();
158  producedProducts_ = nullptr;
160  delayedReader_->setPrincipal(this);
161  eventAux_ = nullptr;
162  subRunPrincipal_ = nullptr;
163  ctor_create_groups(presentProducts);
166  }
167 
168  // SubRun
170  ProcessConfiguration const& pc,
171  cet::exempt_ptr<ProductTable const> presentProducts,
172  std::unique_ptr<DelayedReader>&&
173  reader /* = std::make_unique<NoDelayedReader>() */)
176  , delayedReader_{std::move(reader)}
177  , subRunAux_{aux}
178  {
179  processHistoryModified_ = false;
180  presentProducts_ = presentProducts.get();
181  producedProducts_ = nullptr;
183  delayedReader_->setPrincipal(this);
184  eventAux_ = nullptr;
185  subRunPrincipal_ = nullptr;
186  ctor_create_groups(presentProducts);
189  }
190 
191  // Event
193  EventAuxiliary const& aux,
194  ProcessConfiguration const& pc,
195  cet::exempt_ptr<ProductTable const> presentProducts,
196  std::unique_ptr<History>&& history /* = std::make_unique<History>() */,
197  std::unique_ptr<DelayedReader>&&
198  reader /* = std::make_unique<NoDelayedReader>() */,
199  bool const lastInSubRun /* = false */)
202  , delayedReader_{std::move(reader)}
203  , history_{move(history)}
204  , lastInSubRun_{lastInSubRun}
205  {
206  processHistoryModified_ = false;
207  presentProducts_ = presentProducts.get();
208  producedProducts_ = nullptr;
210  delayedReader_->setPrincipal(this);
211  eventAux_ = new EventAuxiliary(aux);
212  subRunPrincipal_ = nullptr;
213  ctor_create_groups(presentProducts);
215  ctor_fetch_process_history(history_->processHistoryID());
216  }
217 
218  // Results
220  ProcessConfiguration const& pc,
221  cet::exempt_ptr<ProductTable const> presentProducts,
222  std::unique_ptr<DelayedReader>&&
223  reader /* = std::make_unique<NoDelayedReader>() */)
226  , delayedReader_{std::move(reader)}
227  , resultsAux_{aux}
228  {
229  processHistoryModified_ = false;
230  presentProducts_ = presentProducts.get();
231  producedProducts_ = nullptr;
233  delayedReader_->setPrincipal(this);
234  eventAux_ = nullptr;
235  subRunPrincipal_ = nullptr;
236  ctor_create_groups(presentProducts);
239  }
240 
241  void
243  {
244  std::lock_guard sentry{groupMutex_};
245  auto it = groups_.find(pd.productID());
246  if (it != std::cend(groups_)) {
247  // The 'combinable' call does not require that the processing
248  // history be the same, which is not what we are checking for here.
249  auto const& found_pd = it->second->productDescription();
250  if (combinable(found_pd, pd)) {
252  << "The process name " << pd.processName()
253  << " was previously used on these products.\n"
254  << "Please modify the configuration file to use a "
255  << "distinct process name.\n";
256  }
258  << "The product ID " << pd.productID() << " of the new product:\n"
259  << pd
260  << " collides with the product ID of the already-existing product:\n"
261  << found_pd
262  << "Please modify the instance name of the new product so as to avoid "
263  "the product ID collision.\n"
264  << "In addition, please notify artists@fnal.gov of this error.\n";
265  }
266 
267  unique_ptr<Group> group = create_group(delayedReader_.get(), pd);
268  groups_[pd.productID()] = move(group);
269  }
270 
271  void
273  {
274  if (branchType_ == InRun) {
276  } else if (branchType_ == InSubRun) {
278  } else if (branchType_ == InEvent) {
279  history_->setProcessHistoryID(phid);
280  } else {
282  }
283  }
284 
285  // FIXME: This breaks the purpose of the
286  // Principal::addToProcessHistory() compare_exchange_strong
287  // because of the temporal hole between when the history is
288  // changed and when the flag is set, this must be fixed!
289  void
291  {
293  }
294 
295  // Note: LArSoft uses this extensively to create a Ptr by hand.
296  EDProductGetter const*
298  {
299  auto g = getGroupTryAllFiles(pid);
300  if (g.get() != nullptr) {
301  // Note: All produced products should be found.
302  return g.get();
303  }
304  return nullptr;
305  }
306 
307  EDProductGetter const*
309  {
310  return productGetter(pid);
311  }
312 
313  void
315  ProductTables const& producedProducts)
316  {
317  auto const& produced = producedProducts.get(branchType_);
318  producedProducts_ = &produced;
319  if (produced.descriptions.empty()) {
320  return;
321  }
322  // The process history is expanded if there is a product that is
323  // produced in this process.
325  for (auto const& pr : produced.descriptions) {
326  auto const& pd = pr.second;
327  assert(pd.branchType() == branchType_);
328  // Create a group for the produced product.
329  fillGroup(pd);
330  }
331  }
332 
333  void
335  ProductTables const& /*producedProducts*/)
336  {
338  }
339 
340  void
342  {
343  // Read all data products and provenance immediately, if
344  // available. Used only by RootInputFile to implement the
345  // delayedRead*Products config options.
346  //
347  // Note: The input source lock will be held when this routine is called.
348  //
349  // MT-TODO: For right now ignore the delay reading option for
350  // product provenance. If we do the delay reading then we
351  // must use a lock to interlock all fetches of provenance
352  // because the delay read fills the pp_by_pid_ one entry
353  // at a time, and we do not want other threads to find
354  // the info only partly there.
355  std::lock_guard sentry{groupMutex_};
356  for (auto const& pid_and_group : groups_) {
357  auto group = pid_and_group.second.get();
358  group->resolveProductIfAvailable();
359  }
360  }
361 
362  ProcessHistory const&
364  {
365  // MT note: We make no attempt to protect callers who use this
366  // call to get access to the iteration interface of the
367  // process history. See the threading notes there and
368  // here for the reasons why.
369  return processHistory_;
370  }
371 
372  ProcessConfiguration const&
374  {
375  return processConfiguration_;
376  }
377 
378  size_t
380  {
381  std::lock_guard sentry{groupMutex_};
382  return groups_.size();
383  }
384 
387  {
388  std::lock_guard sentry{groupMutex_};
389  return groups_.begin();
390  }
391 
394  {
395  std::lock_guard sentry{groupMutex_};
396  return groups_.cbegin();
397  }
398 
401  {
402  std::lock_guard sentry{groupMutex_};
403  return groups_.end();
404  }
405 
408  {
409  std::lock_guard sentry{groupMutex_};
410  return groups_.cend();
411  }
412 
413  // This is intended to be used by a module that fetches a very large
414  // data product, makes a copy, and would like to release the memory
415  // held by the original immediately.
416  void
418  {
419  // MT-FIXME: May be called by a module task, need to protect the
420  // group with a lock.
421  if (auto g = getGroupLocal(pid)) {
422  g->removeCachedProduct();
423  return;
424  }
425  for (auto const& sp : secondaryPrincipals_) {
426  if (auto g = sp->getGroupLocal(pid)) {
427  g->removeCachedProduct();
428  return;
429  }
430  }
431  throw Exception(errors::ProductNotFound, "removeCachedProduct")
432  << "Attempt to remove unknown product corresponding to ProductID: " << pid
433  << '\n'
434  << "Please contact artists@fnal.gov\n";
435  }
436 
439  {
440  // Note: The input source lock will be held when this routine is called.
441  //
442  // MT-TODO: For right now ignore the delay reading option for
443  // product provenance. If we do the delay reading then we
444  // must use a lock to interlock all fetches of provenance
445  // because the delay read fills the pp_by_pid_ one entry
446  // at a time, and we do not want other threads to find
447  // the info only partly there.
449  auto g = getGroupLocal(pid);
450  if (g.get() != nullptr) {
451  ret = g->productProvenance();
452  }
453  return ret;
454  }
455 
456  // Note: threading: The solution chosen for the following
457  // problems is to convert groups_ from type:
458  //
459  // std::map<ProductID, std::unique_ptr<Group>>
460  //
461  // to type:
462  //
463  // tbb::concurrent_unordered_map<
464  // ProductID, std::unique_ptr<Group>>
465  //
466  // We get concurrent insertion and iteration, but not
467  // concurrent erasure (which is not a problem because we never
468  // remove groups). Note that tbb uses a value for the end()
469  // iterator for this class which is always valid for comparing
470  // against the result of an interation or find (it is
471  // implemented as (<element-ptr>(nullptr), <internal-table>)).
472  //
473  // Note: threading: May be called from producer and filter
474  // module processing tasks! This requires us to protect
475  // groups_ against multiple threads attempting an insert at
476  // the same time.
477  //
478  // Note: threading: Also anyone using the iterators over
479  // groups_ (which are Principal::begin() and Principal::end())
480  // need protection against having their interators invalidated.
481  // Right now the only code doing this is:
482  //
483  // Principal::readImmediate() const
484  // Principal::getGroupTryAllFiles(ProductID const& pid) const
485  // Principal::removeCachedProduct(ProductID const pid) const
486  // Principal::findGroupsForProcess(...) const
487  // OutputModule::updateBranchChildren()
488  //
489  // Principal::readImmediate() is called just after principal
490  // creation with the input lock held. Module tasks on other
491  // streams could be doing puts which would invalid the iterator
492  // if the data product being put does not exist in the input
493  // file and is being put for the first time so this is a
494  // group insertion.
495  //
496  // Principal::getGroupTryAllFiles(ProductID const& pid) const
497  // Used by Principal::getByProductID(ProductID const& pid) const
498  // Used by art::DataViewImpl<T>::get(ProductID const pid, Handle<T>&
499  // result) const. (easy user-facing api)
500  // Used by Principal::productGetter(ProductID const pid) const
501  // Used by (Run,SubRun,Event,Results)::productGetter (advanced
502  // user-facing api)
503  // Used by Principal::getForOutput(ProductID const pid, bool
504  // resolveProd) const
505  // Used by RootOutputFile to fetch products being written to disk.
506  // Used by FileDumperOutput_module.
507  // Used by ProvenanceCheckerOutput_module.
508  //
509  // These uses are find() and compare against end(). Problem is that
510  // end() may have moved by the time we do the compare with the find
511  // result. There is also use of the mpr and secondary files.
512  //
513  // Principal::removeCachedProduct(ProductID const pid) const
514  // Principal::findGroupsForProcess(...) const
515  //
516  // These uses are find() and compare against end(). Problem is that
517  // end() may have moved by the time we do the compare with the find
518  // result. There is also use of secondary principals.
519  //
520  // OutputModule::updateBranchChildren is called only for events
521  // and only after all module processing tasks which can put
522  // products into the event have finished running, so it does
523  // not need the protection.
524  //
525  // Used by the Run, SubRun, and EventPrincipal constructors
526  // if a product was produced.
527  //
528  // Used by RootOutput_module from write, writeSubRun, and
529  // writeRun if a branch was dropped by selectEvents processing
530  // for that kind of principal.
531  //
532  // Used by RootOutput_module from startEndFile if a branch was
533  // dropped by selectEvents processing for a results principal,
534  // or if a product was produced by a results principal (note
535  // that it has not actually gotten the chance to make the
536  // product, that happens right after this call is made).
537  //
538  // Note: threading: If the only uses were from the constructors
539  // we would have no problems, but the use from the root output
540  // module is bad because it could be running concurrently with
541  // other output modules and analyzers for this same principal.
542  // So we have to use a compare_exchange_strong on
543  // processHistoryModified_ so that only one task tries to do
544  // this. We also need to stall the other output and analyzer
545  // modules that call processHistory() while we are doing the
546  // update so that they get the updated result. For output and
547  // analyzer modules that have already fetched the process
548  // history pointer, we have to stall any attempt to access its
549  // internal process configuration list while we are updating it.
550  //
551  // FIXME: threading: We hand out processHistory_ through the
552  // processHistory() interface, which is in turn handed out by
553  // the DataViewImpl::processHistory() interface to any module
554  // task that wants it. This is a problem for output modules
555  // and analyzers if an output module decides to update the
556  // process history from startEndFile. We must stall users of
557  // the process history if we updating it, both by stalling a
558  // fetch of processHistory_ once we start to update, and for
559  // those modules that have already fetched the processHistory_
560  // we must stall any attempt by them to access its internal
561  // process configuration list while we are changing it.
562  void
564  {
565  bool expected = false;
566  if (processHistoryModified_.compare_exchange_strong(expected, true)) {
567  // MT note: We have now locked out any other task trying to
568  // modify the process history. Now we have to block
569  // tasks that already have a pointer to the process
570  // history from accessing its internals while we update
571  // it. We do not protect the iteration interface, the
572  // begin(), end(), and size() are all separate calls
573  // and we cannot lock in each one because there is no
574  // way to automatically unlock.
575  std::lock_guard sentry{processHistory_.get_mutex()};
576  string const& processName = processConfiguration_.processName();
577  for (auto const& val : processHistory_) {
578  if (processName == val.processName()) {
580  << "The process name " << processName
581  << " was previously used on these products.\n"
582  << "Please modify the configuration file to use a "
583  << "distinct process name.\n";
584  }
585  }
586  processHistory_.push_back(processConfiguration_);
587  // Optimization note: As of 0_9_0_pre3 For very simple Sources
588  // (e.g. EmptyEvent) this routine takes up nearly 50% of the
589  // time per event, and 96% of the time for this routine is spent
590  // in computing the ProcessHistory id which happens because we
591  // are reconstructing the ProcessHistory for each event. It
592  // would probably be better to move the ProcessHistory
593  // construction out to somewhere which persists for longer than
594  // one Event.
595  auto const phid = processHistory_.id();
596  ProcessHistoryRegistry::emplace(phid, processHistory_);
597  // MT note: We must protect processHistory_! The id() call can
598  // modify it! Note: threading: We are modifying Run,
599  // SubRun, Event, and Results principals here, and
600  // their *Auxiliary Note: threading: and the event
601  // principal art::History.
602  setProcessHistoryIDcombined(processHistory_.id());
603  }
604  }
605 
606  std::size_t
608  ModuleContext const& mc,
609  SelectorBase const& sel,
610  std::vector<cet::exempt_ptr<Group>>& groups) const
611  {
612  // Loop over processes in reverse time order. Sometimes we want
613  // to stop after we find a process with matches so check for that
614  // at each step.
615  std::size_t found{};
616  // MT note: We must protect the process history iterators here
617  // against possible invalidation by output modules
618  // inserting a process history entry while we are
619  // iterating.
620  std::lock_guard sentry{processHistory_.get_mutex()};
621  // We must skip over duplicate entries of the same process
622  // configuration in the process history. This unfortunately
623  // happened with the SamplingInput source.
624  for (auto const& h :
625  ranges::views::reverse(processHistory_) | ranges::views::unique) {
626  if (auto it = pl.find(h.processName()); it != pl.end()) {
627  found += findGroupsForProcess(it->second, mc, sel, groups);
628  }
629  }
630  return found;
631  }
632 
635  WrappedTypeID const& wrapped,
636  SelectorBase const& sel,
637  ProcessTag const& processTag) const
638  {
639  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
640  auto const result = resolve_unique_product(groups, wrapped);
641  if (!result.has_value()) {
642  auto whyFailed = std::make_shared<Exception>(errors::ProductNotFound);
643  *whyFailed << "Found zero products matching all selection criteria\n"
644  << indent << "C++ type: " << wrapped.product_type << "\n"
645  << sel.print(indent);
646  return GroupQueryResult{whyFailed};
647  }
648  return *result;
649  }
650 
653  WrappedTypeID const& wrapped,
654  string const& label,
655  string const& productInstanceName,
656  ProcessTag const& processTag) const
657  {
658  auto const& processName = processTag.name();
659  Selector const sel{ModuleLabelSelector{label} &&
660  ProductInstanceNameSelector{productInstanceName} &&
661  ProcessNameSelector{processName}};
662  return getBySelector(mc, wrapped, sel, processTag);
663  }
664 
665  std::vector<InputTag>
667  WrappedTypeID const& wrapped,
668  SelectorBase const& sel,
669  ProcessTag const& processTag) const
670  {
671  std::vector<InputTag> tags;
672  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
673  cet::transform_all(groups, back_inserter(tags), [](auto const g) {
674  return g->productDescription().inputTag();
675  });
676  return tags;
677  }
678 
679  std::vector<GroupQueryResult>
681  WrappedTypeID const& wrapped,
682  SelectorBase const& sel,
683  ProcessTag const& processTag) const
684  {
685  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
686  return resolve_products(groups, wrapped.wrapped_product_type);
687  }
688 
689  auto
691  {
692  return delayedReader_->readFromSecondaryFile(nextSecondaryFileIdx_);
693  }
694 
695  std::vector<cet::exempt_ptr<Group>>
697  SelectorBase const& selector,
698  ProcessTag const& processTag) const
699  {
700  std::vector<cet::exempt_ptr<Group>> groups;
701  // Find groups from current process
702  if (processTag.current_process_search_allowed() &&
704  if (findGroups(
705  producedProducts_.load()->viewLookup, mc, selector, groups) != 0) {
706  return groups;
707  }
708  }
709 
710  if (!processTag.input_source_search_allowed()) {
711  return groups;
712  }
713 
714  // Look through currently opened input files
715  if (groups.empty()) {
716  groups = matchingSequenceFromInputFile(mc, selector);
717  if (!groups.empty()) {
718  return groups;
719  }
720  for (auto const& sp : secondaryPrincipals_) {
721  groups = sp->matchingSequenceFromInputFile(mc, selector);
722  if (!groups.empty()) {
723  return groups;
724  }
725  }
726  }
727  // Open more secondary files if necessary
728  if (groups.empty()) {
729  while (auto sp = tryNextSecondaryFile()) {
730  auto& new_sp = secondaryPrincipals_.emplace_back(move(sp));
731  groups = new_sp->matchingSequenceFromInputFile(mc, selector);
732  if (!groups.empty()) {
733  return groups;
734  }
735  }
736  }
737  return groups;
738  }
739 
740  std::vector<cet::exempt_ptr<Group>>
742  SelectorBase const& selector) const
743  {
744  std::vector<cet::exempt_ptr<Group>> groups;
745  if (!presentProducts_.load()) {
746  return groups;
747  }
748  findGroups(presentProducts_.load()->viewLookup, mc, selector, groups);
749  return groups;
750  }
751 
752  std::size_t
754  ModuleContext const& mc,
755  WrappedTypeID const& wrapped,
756  SelectorBase const& selector,
757  std::vector<cet::exempt_ptr<Group>>& groups) const
758  {
759  if (!presentProducts_.load()) {
760  return 0;
761  }
762  auto const& lookup = presentProducts_.load()->productLookup;
763  auto it = lookup.find(wrapped.product_type.friendlyClassName());
764  if (it == lookup.end()) {
765  return 0;
766  }
767  return findGroups(it->second, mc, selector, groups);
768  }
769 
770  std::vector<cet::exempt_ptr<Group>>
772  WrappedTypeID const& wrapped,
773  SelectorBase const& selector,
774  ProcessTag const& processTag) const
775  {
776  std::vector<cet::exempt_ptr<Group>> results;
777  unsigned ret{};
778  // Find groups from current process
779  if (processTag.current_process_search_allowed() &&
781  auto const& lookup = producedProducts_.load()->productLookup;
782  auto it = lookup.find(wrapped.product_type.friendlyClassName());
783  if (it != lookup.end()) {
784  ret += findGroups(it->second, mc, selector, results);
785  }
786  }
787 
788  if (!processTag.input_source_search_allowed()) {
789  return results;
790  }
791 
792  // Look through currently opened input files
793  ret += findGroupsFromInputFile(mc, wrapped, selector, results);
794  if (ret) {
795  return results;
796  }
797  for (auto const& sp : secondaryPrincipals_) {
798  if (sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
799  return results;
800  }
801  }
802  // Open more secondary files if necessary
803  while (auto sp = tryNextSecondaryFile()) {
804  auto& new_sp = secondaryPrincipals_.emplace_back(move(sp));
805  if (new_sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
806  return results;
807  }
808  }
809  return results;
810  }
811 
812  std::size_t
814  std::vector<ProductID> const& vpid,
815  ModuleContext const& mc,
816  SelectorBase const& sel,
818  {
819  std::size_t found{}; // Horrible hack that should go away
820  for (auto const pid : vpid) {
821  auto group = getGroupLocal(pid);
822  if (!group) {
823  continue;
824  }
825  auto const& pd = group->productDescription();
826  // If we are processing a trigger path, the only visible
827  // produced products are those that originate from modules on
828  // the same path we're currently processing.
829  if (mc.onTriggerPath() && pd.produced() &&
830  !mc.onSamePathAs(pd.moduleLabel())) {
831  continue;
832  }
833  if (!sel.match(pd)) {
834  continue;
835  }
836  // Found a good match, save it.
837  res.emplace_back(group);
838  ++found;
839  }
840  return found;
841  }
842 
843  SubRunPrincipal const&
845  {
846  if (subRunPrincipal_.load() == nullptr) {
848  << "Tried to obtain a NULL subRunPrincipal.\n";
849  }
850  return *subRunPrincipal_.load();
851  }
852 
855  {
856  return runPrincipal_;
857  }
858 
859  SubRunPrincipal const*
861  {
862  return subRunPrincipal_.load();
863  }
864 
865  void
867  {
868  runPrincipal_ = rp;
869  }
870 
871  void
873  {
874  subRunPrincipal_ = srp.get();
875  }
876 
877  RangeSet
879  {
880  return rangeSet_;
881  }
882 
883  void
885  {
886  rangeSet_ = rs;
887  }
888 
889  bool
891  {
892  return eventAux_.load()->isRealData();
893  }
894 
897  {
898  return eventAux_.load()->experimentType();
899  }
900 
901  History const&
903  {
904  return *history_;
905  }
906 
909  {
910  return history_->eventSelectionIDs();
911  }
912 
913  RunPrincipal const&
915  {
916  if (!runPrincipal_) {
918  << "Tried to obtain a NULL runPrincipal.\n";
919  }
920  return *runPrincipal_;
921  }
922 
923  bool
925  {
926  return lastInSubRun_;
927  }
928 
929  void
931  unique_ptr<ProductProvenance const>&& pp,
932  unique_ptr<EDProduct>&& edp,
933  unique_ptr<RangeSet>&& rs)
934  {
935  assert(edp);
937  // Note: We intentionally allow group and provenance replacement
938  // for run and subrun products.
939  auto group = getGroupLocal(bd.productID());
940  assert(group);
941  group->setProductAndProvenance(move(pp), move(edp), move(rs));
942  } else {
943  auto group = getGroupLocal(bd.productID());
944  assert(group);
945  if (group->anyProduct() != nullptr) {
947  "Principal::put:")
948  << "Problem found during put of " << branchType_
949  << " product: product already put for " << bd.branchName() << '\n';
950  }
951  group->setProductAndProvenance(
952  move(pp), move(edp), make_unique<RangeSet>());
953  }
954  }
955 
956  // We invoke the delay reader now if no user module has ever fetched them
957  // for this principal if resolvedProd is true.
958  //
959  // Note: This attempts to resolve the product and converts the
960  // resulting group into an OutputHandle.
961  //
962  // MT note: Right now this is single-threaded. Be careful if this
963  // changes!!!
965  Principal::getForOutput(ProductID const& pid, bool const resolveProd) const
966  {
967  // MT-FIXME: Uses of group!
968  auto g = getGroupTryAllFiles(pid);
969  if (g.get() == nullptr) {
970  return OutputHandle::invalid();
971  }
972  if (resolveProd) {
973  if (!g->resolveProductIfAvailable()) {
974  // Behavior is the same as if the group wasn't there.
975  return OutputHandle::invalid();
976  }
977  if (g->anyProduct() == nullptr) {
978  return OutputHandle::invalid();
979  }
980  if (!g->anyProduct()->isPresent()) {
981  return OutputHandle::invalid();
982  }
983  }
984  if (!g->anyProduct() && !g->productProvenance()) {
985  return OutputHandle{g->rangeOfValidity()};
986  }
987  return OutputHandle{g->anyProduct(),
988  &g->productDescription(),
989  g->productProvenance(),
990  g->rangeOfValidity()};
991  }
992 
995  ProductID const pid,
996  bool const alwaysEnableLookupOfProducedProducts /*=false*/) const
997  {
998  // Find groups from current process
999  if (alwaysEnableLookupOfProducedProducts ||
1001  if (producedProducts_.load() != nullptr) {
1002  if (auto result = producedProducts_.load()->description(pid)) {
1003  return result;
1004  }
1005  }
1006  }
1007  if (presentProducts_.load()) {
1008  // Look through currently opened input files
1009  if (auto result = presentProducts_.load()->description(pid)) {
1010  return result;
1011  }
1012  }
1013  for (auto const& sp : secondaryPrincipals_) {
1014  if (auto result = sp->getProductDescription(pid)) {
1015  return result;
1016  }
1017  }
1018  return nullptr;
1019  }
1020 
1021  BranchType
1023  {
1024  return branchType_;
1025  }
1026 
1027  RunAuxiliary const&
1029  {
1030  return runAux_;
1031  }
1032 
1033  SubRunAuxiliary const&
1035  {
1036  return subRunAux_;
1037  }
1038 
1039  EventAuxiliary const&
1041  {
1042  return *eventAux_.load();
1043  }
1044 
1045  ResultsAuxiliary const&
1047  {
1048  return resultsAux_;
1049  }
1050 
1051  RunID const&
1053  {
1054  return runAux_.id();
1055  }
1056 
1057  SubRunID
1059  {
1060  return subRunAux_.id();
1061  }
1062 
1063  EventID const&
1065  {
1066  return eventAux_.load()->id();
1067  }
1068 
1069  RunNumber_t
1071  {
1072  if (branchType_ == InRun) {
1073  return runAux_.run();
1074  }
1075  if (branchType_ == InSubRun) {
1076  return subRunAux_.run();
1077  }
1078  return eventAux_.load()->id().run();
1079  }
1080 
1083  {
1084  if (branchType_ == InSubRun) {
1085  return subRunAux_.subRun();
1086  }
1087  return eventAux_.load()->id().subRun();
1088  }
1089 
1092  {
1093  return eventAux_.load()->id().event();
1094  }
1095 
1096  Timestamp const&
1098  {
1099  if (branchType_ == InRun) {
1100  return runAux_.beginTime();
1101  }
1102  return subRunAux_.beginTime();
1103  }
1104 
1105  Timestamp const&
1107  {
1108  if (branchType_ == InRun) {
1109  return runAux_.endTime();
1110  }
1111  return subRunAux_.endTime();
1112  }
1113 
1114  void
1116  {
1117  if (branchType_ == InRun) {
1118  runAux_.endTime(time);
1119  return;
1120  }
1121  subRunAux_.setEndTime(time);
1122  }
1123 
1124  Timestamp const&
1126  {
1127  return eventAux_.load()->time();
1128  }
1129 
1130  bool
1132  {
1133  if (!enableLookupOfProducedProducts_.load()) {
1134  return false;
1135  }
1136  auto pd = producedProducts_.load()->description(pid);
1137  return pd == nullptr ? false : pd->produced();
1138  }
1139 
1140  bool
1142  {
1143  if (!presentProducts_.load()) {
1144  return false;
1145  }
1146  auto pd = presentProducts_.load()->description(pid);
1147  return pd == nullptr ? false : pd->present();
1148  }
1149 
1152  {
1153  if (auto const g = getGroupTryAllFiles(pid)) {
1154  return GroupQueryResult{g};
1155  }
1156  auto whyFailed =
1157  make_shared<Exception>(errors::ProductNotFound, "InvalidID");
1158  *whyFailed << "Principal::getByProductID: no product with branch type: "
1159  << branchType_ << " product id: " << pid << '\n';
1160  return GroupQueryResult{whyFailed};
1161  }
1162 
1165  {
1166  std::lock_guard sentry{groupMutex_};
1167  auto it = groups_.find(pid);
1168  return it != groups_.cend() ? it->second.get() : nullptr;
1169  }
1170 
1173  {
1174  // Look through current process and currently opened primary input file.
1175  if (producedInProcess(pid) || presentFromSource(pid)) {
1176  return getGroupLocal(pid);
1177  }
1178  // Look through secondary files
1179  for (auto const& sp : secondaryPrincipals_) {
1180  if (sp->presentFromSource(pid)) {
1181  return sp->getGroupLocal(pid);
1182  }
1183  }
1184  // Try new secondary files
1185  while (auto sp = tryNextSecondaryFile()) {
1186  auto& new_sp = secondaryPrincipals_.emplace_back(move(sp));
1187  if (new_sp->presentFromSource(pid)) {
1188  return new_sp->getGroupLocal(pid);
1189  }
1190  }
1191  return nullptr;
1192  }
1193 
1194 } // namespace art
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
const_iterator cend() const
Definition: Principal.cc:407
cet::exempt_ptr< RunPrincipal const > runPrincipalExemptPtr() const
Definition: Principal.cc:854
std::atomic< bool > processHistoryModified_
Definition: Principal.h:298
size_t findGroups(ProcessLookup const &, ModuleContext const &, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &groups) const
Definition: Principal.cc:607
RunNumber_t run() const
Definition: Principal.cc:1070
EventID const & eventID() const
Definition: Principal.cc:1064
void setProcessHistoryID(ProcessHistoryID const &phid) const
EDProductGetter const * productGetter(ProductID const &pid) const
Definition: Principal.cc:297
std::map< std::string, std::vector< ProductID >> ProcessLookup
Definition: type_aliases.h:17
std::atomic< bool > enableLookupOfProducedProducts_
Definition: Principal.h:304
Principal(BranchType, ProcessConfiguration const &, cet::exempt_ptr< ProductTable const > presentProducts, ProcessHistoryID const &, std::unique_ptr< DelayedReader > &&)
Definition: Principal.cc:124
RunID const & runID() const
Definition: Principal.cc:1052
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
Definition: StdUtils.h:87
auto tryNextSecondaryFile() const
Definition: Principal.cc:690
static constexpr double g
Definition: Units.h:144
static QCString result
GroupCollection groups_
Definition: Principal.h:312
const_iterator end() const
Definition: Principal.cc:400
std::vector< InputTag > getInputTags(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:666
std::string friendlyClassName() const
Definition: TypeID.cc:61
const char expected[]
Definition: Exception_t.cc:22
Timestamp const & time() const
Definition: Principal.cc:1125
std::string string
Definition: nybbler.cc:12
ProcessHistory processHistory_
Definition: Principal.h:297
bool input_source_search_allowed() const
Definition: ProcessTag.cc:60
Timestamp const & beginTime() const
Definition: Principal.cc:1097
SubRunPrincipal const * subRunPrincipalPtr() const
Definition: Principal.cc:860
EventAuxiliary::ExperimentType ExperimentType() const
Definition: Principal.cc:896
struct vector vector
STL namespace.
std::recursive_mutex & get_mutex() const
auto & get(BranchType const bt)
Definition: ProductTables.h:49
History const & history() const
Definition: Principal.cc:902
constexpr pointer get() const noexcept
Definition: exempt_ptr.h:148
constexpr ProductStatus dummyToPreventDoubleCount() noexcept
Definition: ProductStatus.h:25
void addToProcessHistory()
Definition: Principal.cc:563
std::optional< GroupQueryResult > resolve_unique_product(std::vector< cet::exempt_ptr< art::Group >> const &product_groups, art::WrappedTypeID const &wrapped)
Definition: Group.cc:387
Timestamp const & beginTime() const noexcept
Definition: RunAuxiliary.cc:85
success_t id() const
ProcessHistoryID & processHistoryID() const
size_t size() const
Definition: Principal.cc:379
void setProcessHistoryID(ProcessHistoryID const &phid) const
std::vector< cet::exempt_ptr< Group > > matchingSequenceFromInputFile(ModuleContext const &, SelectorBase const &) const
Definition: Principal.cc:741
SubRunAuxiliary const & subRunAux() const
Definition: Principal.cc:1034
std::atomic< SubRunPrincipal const * > subRunPrincipal_
Definition: Principal.h:341
size_t findGroupsFromInputFile(ModuleContext const &, WrappedTypeID const &wrapped, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &results) const
Definition: Principal.cc:753
void updateSeenRanges(RangeSet const &rs)
Definition: Principal.cc:884
std::recursive_mutex groupMutex_
Definition: Principal.h:307
void readImmediate() const
Definition: Principal.cc:341
std::string const & processName() const noexcept
BranchType branchType_
Definition: Principal.h:296
bool isLastInSubRun() const
Definition: Principal.cc:924
std::vector< EventSelectionID > EventSelectionIDVector
EventNumber_t event() const
Definition: Principal.cc:1091
GroupQueryResult getBySelector(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:634
TypeID wrapped_product_type
Definition: WrappedTypeID.h:18
SubRunNumber_t subRun() const noexcept
void createGroupsForProducedProducts(ProductTables const &producedProducts)
Definition: Principal.cc:314
RangeSet rangeSet_
Definition: Principal.h:332
std::string print(std::string const &indent) const
Definition: SelectorBase.h:47
bool presentFromSource(ProductID) const
Definition: Principal.cc:1141
bool current_process_search_allowed() const
Definition: ProcessTag.cc:68
RangeSet seenRanges() const
Definition: Principal.cc:878
void removeCachedProduct(ProductID) const
Definition: Principal.cc:417
std::atomic< ProductTable const * > producedProducts_
Definition: Principal.h:303
std::unique_ptr< History > history_
Definition: Principal.h:342
OutputHandle getForOutput(ProductID const &, bool resolveProd) const
Definition: Principal.cc:965
cet::exempt_ptr< BranchDescription const > getProductDescription(ProductID const pid, bool const alwaysEnableLookupOfProducedProducts=false) const
Definition: Principal.cc:994
constexpr ProductStatus unknown() noexcept
Definition: ProductStatus.h:31
void swap(Handle< T > &a, Handle< T > &b)
def move(depos, offset)
Definition: depos.py:107
EDProductGetter const * getEDProductGetter_(ProductID const &) const override
Definition: Principal.cc:308
void setProcessHistoryIDcombined(ProcessHistoryID const &)
Definition: Principal.cc:272
SubRunID subRunID() const
Definition: Principal.cc:1058
void put(BranchDescription const &, std::unique_ptr< ProductProvenance const > &&, std::unique_ptr< EDProduct > &&, std::unique_ptr< RangeSet > &&)
Definition: Principal.cc:930
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
Timestamp const & endTime() const noexcept
bool lastInSubRun_
Definition: Principal.h:343
string name_of_template_arg(string const &template_instance, size_t desired_arg)
Definition: TypeID.cc:118
void setRunPrincipal(cet::exempt_ptr< RunPrincipal const > rp)
Definition: Principal.cc:866
RunNumber_t run() const noexcept
bool isValid() const
Definition: Hash.h:123
void enableLookupOfProducedProducts(ProductTables const &producedProducts)
Definition: Principal.cc:334
std::vector< cet::exempt_ptr< Group > > getMatchingSequence(ModuleContext const &, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:696
auto transform_all(Container &, OutputIt, UnaryOp)
void setProcessHistoryID(ProcessHistoryID const &) const
Definition: RunAuxiliary.cc:97
std::atomic< ProductTable const * > presentProducts_
Definition: Principal.h:302
void setSubRunPrincipal(cet::exempt_ptr< SubRunPrincipal const > srp)
Definition: Principal.cc:872
void ctor_fetch_process_history(ProcessHistoryID const &)
Definition: Principal.cc:114
std::vector< cet::exempt_ptr< Group > > findGroupsForProduct(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:771
RunAuxiliary const & runAux() const
Definition: Principal.cc:1028
void ctor_create_groups(cet::exempt_ptr< ProductTable const >)
Definition: Principal.cc:72
bool combinable(BranchDescription const &a, BranchDescription const &b)
ProcessHistory const & processHistory() const
Definition: Principal.cc:363
cet::exempt_ptr< ProductProvenance const > branchToProductProvenance(ProductID const &) const
Definition: Principal.cc:438
ProcessConfiguration const & processConfiguration() const
Definition: Principal.cc:373
RunNumber_t run() const noexcept
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
bool onSamePathAs(std::string const &module_label) const
Definition: ModuleContext.h:64
SubRunID const & id() const noexcept
auto const & name() const
Definition: ProcessTag.h:23
RunPrincipal const & runPrincipal() const
Definition: Principal.cc:914
static unsigned int reverse(QString &chars, unsigned char *level, unsigned int a, unsigned int b)
Definition: qstring.cpp:11649
int nextSecondaryFileIdx_
Definition: Principal.h:330
ResultsAuxiliary const & resultsAux() const
Definition: Principal.cc:1046
bool isReal() const
Definition: Principal.cc:890
Timestamp const & endTime() const noexcept
Definition: RunAuxiliary.cc:91
ProcessConfiguration const & processConfiguration_
Definition: Principal.h:299
std::atomic< EventAuxiliary * > eventAux_
Definition: Principal.h:337
void ctor_read_provenance()
Definition: Principal.cc:93
ProcessHistoryID & processHistoryID() const noexcept
Definition: RunAuxiliary.cc:67
EventAuxiliary const & eventAux() const
Definition: Principal.cc:1040
cet::exempt_ptr< Group > getGroupLocal(ProductID const) const
Definition: Principal.cc:1164
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
BranchType
Definition: BranchType.h:20
std::vector< GroupQueryResult > getMany(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:680
ProcessHistoryID const & processHistoryID() const noexcept
bool onTriggerPath() const
Definition: ModuleContext.h:58
static OutputHandle invalid()
Definition: OutputHandle.h:44
void fillGroup(BranchDescription const &)
Definition: Principal.cc:242
RunAuxiliary runAux_
Definition: Principal.h:333
void setEndTime(Timestamp const &time)
bool match(BranchDescription const &p) const
Definition: SelectorBase.h:41
SubRunPrincipal const & subRunPrincipal() const
Definition: Principal.cc:844
GroupQueryResult getByLabel(ModuleContext const &mc, WrappedTypeID const &wrapped, std::string const &label, std::string const &productInstanceName, ProcessTag const &processTag) const
Definition: Principal.cc:652
std::unique_ptr< DelayedReader > delayedReader_
Definition: Principal.h:316
void markProcessHistoryAsModified()
Definition: Principal.cc:290
ResultsAuxiliary resultsAux_
Definition: Principal.h:339
std::string const & processName() const noexcept
GroupQueryResult getByProductID(ProductID const pid) const
Definition: Principal.cc:1151
auto const & get(AssnsNode< L, R, D > const &r)
Definition: AssnsNode.h:115
std::vector< std::unique_ptr< Principal > > secondaryPrincipals_
Definition: Principal.h:326
static auto emplace(value_type const &value)
GroupCollection::const_iterator const_iterator
Definition: Principal.h:57
std::string const & branchName() const noexcept
constexpr bool range_sets_supported(BranchType const bt)
SubRunAuxiliary subRunAux_
Definition: Principal.h:334
Timestamp const & endTime() const
Definition: Principal.cc:1106
static QCString * s
Definition: config.cpp:1042
bool producedInProcess(ProductID) const
Definition: Principal.cc:1131
ProductID productID() const noexcept
const_iterator cbegin() const
Definition: Principal.cc:393
cet::exempt_ptr< Group const > getGroupTryAllFiles(ProductID const) const
Definition: Principal.cc:1172
size_t findGroupsForProcess(std::vector< ProductID > const &vpid, ModuleContext const &mc, SelectorBase const &selector, std::vector< cet::exempt_ptr< Group >> &groups) const
Definition: Principal.cc:813
bool is_assns(std::string const &type_name)
Definition: TypeID.h:66
Timestamp const & beginTime() const noexcept
RunID const & id() const noexcept
Definition: RunAuxiliary.cc:79
SubRunNumber_t subRun() const
Definition: Principal.cc:1082
const_iterator begin() const
Definition: Principal.cc:386
BranchType branchType() const
Definition: Principal.cc:1022
cet::exempt_ptr< RunPrincipal const > runPrincipal_
Definition: Principal.h:340
EventSelectionIDVector const & eventSelectionIDs() const
Definition: Principal.cc:908
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:120
std::vector< GroupQueryResult > resolve_products(std::vector< cet::exempt_ptr< art::Group >> const &groups, art::TypeID const &wrapped_type)
Definition: Group.cc:428