34 #include "range/v3/view.hpp" 55 create_group(DelayedReader* reader, BranchDescription
const& bd)
57 auto const& class_name = bd.producedClassName();
58 auto gt = Group::grouptype::normal;
61 gt = Group::grouptype::assns;
63 gt = Group::grouptype::assnsWithData;
66 return make_unique<Group>(reader, bd, make_unique<RangeSet>(),
gt);
72 Principal::ctor_create_groups(
75 if (!presentProducts) {
85 for (
auto const& pr : presentProducts->descriptions) {
86 auto const& pd = pr.second;
87 assert(pd.branchType() == branchType_);
93 Principal::ctor_read_provenance()
95 auto ppv = delayedReader_->readProvenance();
96 for (
auto iter = ppv.begin(),
end = ppv.end(); iter !=
end; ++iter) {
97 auto g = getGroupLocal(iter->productID());
98 if (
g.get() ==
nullptr) {
102 g->setProductProvenance(make_unique<ProductProvenance>(*iter));
105 g->setProductProvenance(make_unique<ProductProvenance>(
108 iter->parentage().parents()));
121 std::swap(processHistory_, processHistory);
128 std::unique_ptr<DelayedReader>&& reader)
129 : branchType_{branchType}
149 std::unique_ptr<DelayedReader>&&
172 std::unique_ptr<DelayedReader>&&
196 std::unique_ptr<History>&&
history ,
197 std::unique_ptr<DelayedReader>&&
199 bool const lastInSubRun )
222 std::unique_ptr<DelayedReader>&&
249 auto const& found_pd = it->second->productDescription();
253 <<
" was previously used on these products.\n" 254 <<
"Please modify the configuration file to use a " 255 <<
"distinct process name.\n";
258 <<
"The product ID " << pd.
productID() <<
" of the new product:\n" 260 <<
" collides with the product ID of the already-existing product:\n" 262 <<
"Please modify the instance name of the new product so as to avoid " 263 "the product ID collision.\n" 264 <<
"In addition, please notify artists@fnal.gov of this error.\n";
279 history_->setProcessHistoryID(phid);
300 if (
g.get() !=
nullptr) {
319 if (produced.descriptions.empty()) {
325 for (
auto const& pr : produced.descriptions) {
326 auto const& pd = pr.second;
356 for (
auto const& pid_and_group :
groups_) {
357 auto group = pid_and_group.second.get();
358 group->resolveProductIfAvailable();
422 g->removeCachedProduct();
426 if (
auto g = sp->getGroupLocal(pid)) {
427 g->removeCachedProduct();
432 <<
"Attempt to remove unknown product corresponding to ProductID: " << pid
434 <<
"Please contact artists@fnal.gov\n";
450 if (
g.get() !=
nullptr) {
451 ret =
g->productProvenance();
578 if (processName ==
val.processName()) {
580 <<
"The process name " << processName
581 <<
" was previously used on these products.\n" 582 <<
"Please modify the configuration file to use a " 583 <<
"distinct process name.\n";
595 auto const phid = processHistory_.
id();
626 if (
auto it = pl.find(
h.processName()); it != pl.end()) {
641 if (!
result.has_value()) {
643 *whyFailed <<
"Found zero products matching all selection criteria\n" 655 string const& productInstanceName,
658 auto const& processName = processTag.
name();
665 std::vector<InputTag>
671 std::vector<InputTag> tags;
674 return g->productDescription().inputTag();
679 std::vector<GroupQueryResult>
695 std::vector<cet::exempt_ptr<Group>>
700 std::vector<cet::exempt_ptr<Group>> groups;
715 if (groups.empty()) {
717 if (!groups.empty()) {
721 groups = sp->matchingSequenceFromInputFile(mc, selector);
722 if (!groups.empty()) {
728 if (groups.empty()) {
731 groups = new_sp->matchingSequenceFromInputFile(mc, selector);
732 if (!groups.empty()) {
740 std::vector<cet::exempt_ptr<Group>>
744 std::vector<cet::exempt_ptr<Group>> groups;
764 if (it == lookup.end()) {
767 return findGroups(it->second, mc, selector, groups);
770 std::vector<cet::exempt_ptr<Group>>
776 std::vector<cet::exempt_ptr<Group>> results;
783 if (it != lookup.end()) {
784 ret +=
findGroups(it->second, mc, selector, results);
798 if (sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
804 auto& new_sp = secondaryPrincipals_.emplace_back(
move(sp));
805 if (new_sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
814 std::vector<ProductID>
const& vpid,
820 for (
auto const pid : vpid) {
825 auto const& pd = group->productDescription();
833 if (!sel.
match(pd)) {
837 res.emplace_back(group);
848 <<
"Tried to obtain a NULL subRunPrincipal.\n";
898 return eventAux_.load()->experimentType();
910 return history_->eventSelectionIDs();
918 <<
"Tried to obtain a NULL runPrincipal.\n";
931 unique_ptr<ProductProvenance const>&& pp,
932 unique_ptr<EDProduct>&& edp,
933 unique_ptr<RangeSet>&&
rs)
945 if (group->anyProduct() !=
nullptr) {
949 <<
" product: product already put for " << bd.
branchName() <<
'\n';
951 group->setProductAndProvenance(
952 move(pp),
move(edp), make_unique<RangeSet>());
969 if (
g.get() ==
nullptr) {
973 if (!
g->resolveProductIfAvailable()) {
977 if (
g->anyProduct() ==
nullptr) {
980 if (!
g->anyProduct()->isPresent()) {
984 if (!
g->anyProduct() && !
g->productProvenance()) {
988 &
g->productDescription(),
989 g->productProvenance(),
990 g->rangeOfValidity()};
996 bool const alwaysEnableLookupOfProducedProducts )
const 999 if (alwaysEnableLookupOfProducedProducts ||
1014 if (
auto result = sp->getProductDescription(pid)) {
1137 return pd ==
nullptr ?
false : pd->produced();
1147 return pd ==
nullptr ?
false : pd->present();
1158 *whyFailed <<
"Principal::getByProductID: no product with branch type: " 1168 return it !=
groups_.cend() ? it->second.get() :
nullptr;
1180 if (sp->presentFromSource(pid)) {
1181 return sp->getGroupLocal(pid);
1186 auto& new_sp = secondaryPrincipals_.emplace_back(
move(sp));
1187 if (new_sp->presentFromSource(pid)) {
1188 return new_sp->getGroupLocal(pid);
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
const_iterator cend() const
cet::exempt_ptr< RunPrincipal const > runPrincipalExemptPtr() const
std::atomic< bool > processHistoryModified_
size_t findGroups(ProcessLookup const &, ModuleContext const &, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &groups) const
EventID const & eventID() const
void setProcessHistoryID(ProcessHistoryID const &phid) const
EDProductGetter const * productGetter(ProductID const &pid) const
std::map< std::string, std::vector< ProductID >> ProcessLookup
std::atomic< bool > enableLookupOfProducedProducts_
Principal(BranchType, ProcessConfiguration const &, cet::exempt_ptr< ProductTable const > presentProducts, ProcessHistoryID const &, std::unique_ptr< DelayedReader > &&)
RunID const & runID() const
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
auto tryNextSecondaryFile() const
static constexpr double g
const_iterator end() const
std::vector< InputTag > getInputTags(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
std::string friendlyClassName() const
Timestamp const & time() const
ProcessHistory processHistory_
bool input_source_search_allowed() const
Timestamp const & beginTime() const
SubRunPrincipal const * subRunPrincipalPtr() const
EventAuxiliary::ExperimentType ExperimentType() const
std::recursive_mutex & get_mutex() const
auto & get(BranchType const bt)
History const & history() const
constexpr pointer get() const noexcept
constexpr ProductStatus dummyToPreventDoubleCount() noexcept
void addToProcessHistory()
std::optional< GroupQueryResult > resolve_unique_product(std::vector< cet::exempt_ptr< art::Group >> const &product_groups, art::WrappedTypeID const &wrapped)
Timestamp const & beginTime() const noexcept
ProcessHistoryID & processHistoryID() const
void setProcessHistoryID(ProcessHistoryID const &phid) const
std::vector< cet::exempt_ptr< Group > > matchingSequenceFromInputFile(ModuleContext const &, SelectorBase const &) const
SubRunAuxiliary const & subRunAux() const
std::atomic< SubRunPrincipal const * > subRunPrincipal_
size_t findGroupsFromInputFile(ModuleContext const &, WrappedTypeID const &wrapped, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &results) const
void updateSeenRanges(RangeSet const &rs)
std::recursive_mutex groupMutex_
void readImmediate() const
std::string const & processName() const noexcept
bool isLastInSubRun() const
std::vector< EventSelectionID > EventSelectionIDVector
EventNumber_t event() const
GroupQueryResult getBySelector(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
TypeID wrapped_product_type
SubRunNumber_t subRun() const noexcept
void createGroupsForProducedProducts(ProductTables const &producedProducts)
std::string print(std::string const &indent) const
bool presentFromSource(ProductID) const
bool current_process_search_allowed() const
RangeSet seenRanges() const
void removeCachedProduct(ProductID) const
std::atomic< ProductTable const * > producedProducts_
std::unique_ptr< History > history_
OutputHandle getForOutput(ProductID const &, bool resolveProd) const
cet::exempt_ptr< BranchDescription const > getProductDescription(ProductID const pid, bool const alwaysEnableLookupOfProducedProducts=false) const
constexpr ProductStatus unknown() noexcept
void swap(Handle< T > &a, Handle< T > &b)
EDProductGetter const * getEDProductGetter_(ProductID const &) const override
void setProcessHistoryIDcombined(ProcessHistoryID const &)
SubRunID subRunID() const
void put(BranchDescription const &, std::unique_ptr< ProductProvenance const > &&, std::unique_ptr< EDProduct > &&, std::unique_ptr< RangeSet > &&)
IDNumber_t< Level::SubRun > SubRunNumber_t
Timestamp const & endTime() const noexcept
string name_of_template_arg(string const &template_instance, size_t desired_arg)
void setRunPrincipal(cet::exempt_ptr< RunPrincipal const > rp)
RunNumber_t run() const noexcept
void enableLookupOfProducedProducts(ProductTables const &producedProducts)
std::vector< cet::exempt_ptr< Group > > getMatchingSequence(ModuleContext const &, SelectorBase const &, ProcessTag const &) const
auto transform_all(Container &, OutputIt, UnaryOp)
void setProcessHistoryID(ProcessHistoryID const &) const
std::atomic< ProductTable const * > presentProducts_
void setSubRunPrincipal(cet::exempt_ptr< SubRunPrincipal const > srp)
void ctor_fetch_process_history(ProcessHistoryID const &)
std::vector< cet::exempt_ptr< Group > > findGroupsForProduct(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
RunAuxiliary const & runAux() const
void ctor_create_groups(cet::exempt_ptr< ProductTable const >)
bool combinable(BranchDescription const &a, BranchDescription const &b)
ProcessHistory const & processHistory() const
cet::exempt_ptr< ProductProvenance const > branchToProductProvenance(ProductID const &) const
ProcessConfiguration const & processConfiguration() const
RunNumber_t run() const noexcept
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
bool onSamePathAs(std::string const &module_label) const
SubRunID const & id() const noexcept
auto const & name() const
RunPrincipal const & runPrincipal() const
static unsigned int reverse(QString &chars, unsigned char *level, unsigned int a, unsigned int b)
int nextSecondaryFileIdx_
ResultsAuxiliary const & resultsAux() const
Timestamp const & endTime() const noexcept
ProcessConfiguration const & processConfiguration_
std::atomic< EventAuxiliary * > eventAux_
void ctor_read_provenance()
ProcessHistoryID & processHistoryID() const noexcept
EventAuxiliary const & eventAux() const
cet::exempt_ptr< Group > getGroupLocal(ProductID const) const
IDNumber_t< Level::Event > EventNumber_t
std::vector< GroupQueryResult > getMany(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
ProcessHistoryID const & processHistoryID() const noexcept
bool onTriggerPath() const
static OutputHandle invalid()
void fillGroup(BranchDescription const &)
void setEndTime(Timestamp const &time)
bool match(BranchDescription const &p) const
SubRunPrincipal const & subRunPrincipal() const
GroupQueryResult getByLabel(ModuleContext const &mc, WrappedTypeID const &wrapped, std::string const &label, std::string const &productInstanceName, ProcessTag const &processTag) const
std::unique_ptr< DelayedReader > delayedReader_
void markProcessHistoryAsModified()
ResultsAuxiliary resultsAux_
std::string const & processName() const noexcept
GroupQueryResult getByProductID(ProductID const pid) const
auto const & get(AssnsNode< L, R, D > const &r)
std::vector< std::unique_ptr< Principal > > secondaryPrincipals_
static auto emplace(value_type const &value)
GroupCollection::const_iterator const_iterator
std::string const & branchName() const noexcept
constexpr bool range_sets_supported(BranchType const bt)
SubRunAuxiliary subRunAux_
Timestamp const & endTime() const
bool producedInProcess(ProductID) const
ProductID productID() const noexcept
const_iterator cbegin() const
cet::exempt_ptr< Group const > getGroupTryAllFiles(ProductID const) const
size_t findGroupsForProcess(std::vector< ProductID > const &vpid, ModuleContext const &mc, SelectorBase const &selector, std::vector< cet::exempt_ptr< Group >> &groups) const
bool is_assns(std::string const &type_name)
Timestamp const & beginTime() const noexcept
RunID const & id() const noexcept
SubRunNumber_t subRun() const
const_iterator begin() const
BranchType branchType() const
cet::exempt_ptr< RunPrincipal const > runPrincipal_
EventSelectionIDVector const & eventSelectionIDs() const
IDNumber_t< Level::Run > RunNumber_t
std::vector< GroupQueryResult > resolve_products(std::vector< cet::exempt_ptr< art::Group >> const &groups, art::TypeID const &wrapped_type)