57 std::vector<std::string>
58 sorted_module_labels(std::vector<WorkerInPath::ConfigInfo>
const& wcis)
60 std::vector<std::string> sorted_modules;
62 wcis, back_inserter(sorted_modules), [](
auto const& wci) {
63 return wci.moduleConfigInfo->modDescription.moduleLabel();
65 std::sort(
begin(sorted_modules),
end(sorted_modules));
66 return sorted_modules;
76 : outputCallbacks_{outputCallbacks}
89 auto const& trigger_path_specs = enabled_modules.trigger_path_specs();
91 std::set<std::string> recorded_path_name;
92 for (
auto const& [
path_spec, entries] : trigger_path_specs) {
103 worker_config_infos.emplace_back(mci_p,
action);
119 auto const& end_paths = enabled_modules.end_paths();
121 for (
auto const& [
path_spec, entries] : end_paths) {
136 if (
size(end_paths) > 1u) {
138 <<
"Multiple end paths have been combined into one end path,\n" 139 <<
"\"end_path\" since order is irrelevant.";
143 std::vector<PathSpec>
146 std::vector<PathSpec>
result;
149 result.push_back(pr.first);
154 std::vector<std::string>
157 std::vector<std::string>
result;
160 result.push_back(pr.first.name);
165 std::vector<std::string>
168 std::vector<std::string>
result;
180 std::vector<std::string>
const& producing_services)
184 auto const nschedules =
207 for (
auto const& [
path_spec, worker_config_infos] :
211 sc,
path_spec, sorted_module_labels(worker_config_infos)};
240 auto const graph_info_collection =
243 auto const module_graph =
245 auto const graph_filename =
246 procPS_.
get<
string>(
"services.scheduler.dataDependencyGraph", {});
247 if (!graph_filename.empty()) {
250 cerr <<
"Generated data-dependency graph file: " << graph_filename
253 auto const&
err = module_graph.second;
288 std::map<std::string, detail::ModuleConfigInfo>
292 std::map<std::string, detail::ModuleConfigInfo>
result{};
294 for (
auto const& [module_label, key_and_type] : enabled_modules.
modules()) {
298 auto const lib_spec = module_pset.
get<
string>(
"module_type");
301 es <<
" ERROR: Module with label " << module_label <<
" of type " 303 <<
" but defined in code as a " <<
to_string(actual_mod_type)
319 es <<
" ERROR: Configuration of module with label " << module_label
320 <<
" encountered the following error:\n" 324 if (
auto err_msg = es.str(); not
empty(err_msg)) {
326 <<
"The following were encountered while processing the module " 337 vector<string> configErrMsgs;
338 for (
auto const& [module_label, mci] :
allModules_) {
339 auto const& modPS = mci.modPS;
340 auto const&
md = mci.modDescription;
342 auto const module_threading_type =
md.moduleThreadingType();
349 if (
auto err_msg = get_if<std::string>(&mod)) {
350 configErrMsgs.push_back(*err_msg);
354 assert(std::holds_alternative<ModuleBase*>(mod));
355 auto module = std::get<ModuleBase*>(mod);
359 modules.shared.emplace(module_label,
360 std::shared_ptr<ModuleBase>{
module});
364 replicated_modules[sid].reset(
module);
368 auto fill_replicated_module = [&,
this](
ScheduleID const sid) {
370 if (
auto mod_ptr = get_if<ModuleBase*>(&repl_mod)) {
371 replicated_modules[sid].reset(*mod_ptr);
374 schedule_iteration.for_each_schedule(fill_replicated_module);
375 modules.replicated.emplace(module_label, replicated_modules);
387 module->getConsumables());
390 if (!configErrMsgs.empty()) {
394 << rule(
'=') <<
"\n\n" 395 <<
"!! The following modules have been misconfigured: !!" 397 for (
auto const&
err : configErrMsgs) {
398 msg <<
"\n" << rule(
'-') <<
"\n" <<
err;
400 msg <<
"\n" << rule(
'=') <<
"\n\n";
421 if (module_factory_func ==
nullptr) {
424 <<
" has internal symbol definition problems: consult an " 428 mod->setModuleDescription(md);
444 vector<WorkerInPath::ConfigInfo>
const& wci_list,
446 map<
string, std::shared_ptr<Worker>>&
workers,
452 vector<WorkerInPath> wips;
453 for (
auto const& wci : wci_list) {
454 auto const& mci = *wci.moduleConfigInfo;
455 auto const filterAction = wci.filterAction;
456 auto const& module_label = mci.modDescription.moduleLabel();
458 auto const&
md = mci.modDescription;
459 std::shared_ptr<Worker> worker{
nullptr};
464 <<
"Reusing worker " <<
hex << it->second <<
dec 465 <<
" path: " <<
to_string(
pi) <<
" type: " <<
md.moduleName()
466 <<
" label: " << module_label;
476 worker =
makeWorker_(modules, mci.modDescription, wp);
477 TDEBUG(5) <<
"Made worker " <<
hex << worker <<
dec <<
" (" << sid
478 <<
") path: " <<
to_string(
pi) <<
" type: " <<
md.moduleName()
479 <<
" label: " << module_label <<
"\n";
483 workers.emplace(module_label, worker);
492 std::shared_ptr<Worker>
503 return modules.
shared.at(module_label);
505 return modules.
replicated.at(module_label)[sid];
511 "make_worker_from_module",
512 worker_from_module_factory_func);
518 if (worker_from_module_factory_func ==
nullptr) {
520 <<
"Module " << md.
moduleName() <<
" with version " 522 <<
" has internal symbol definition problems: consult an expert.";
527 return std::shared_ptr<Worker>{
528 worker_from_module_factory_func(
module, md, wp)};
542 if (mod_type_func ==
nullptr) {
545 <<
" has internal symbol definition problems: consult an expert.";
547 return mod_type_func();
556 lib_spec,
"moduleThreadingType", mod_threading_type_func);
562 if (mod_threading_type_func ==
nullptr) {
565 <<
" has internal symbol definition problems: consult an expert.";
567 return mod_threading_type_func();
575 std::vector<std::string>
const& producing_services)
578 auto& source_info =
result[
"input_source"];
580 set<string> path_names;
582 path_names.insert(pr.first.name);
584 source_info.paths = path_names;
587 source_info.paths = {
"end_path"};
591 std::map<std::string, std::set<ProductInfo>> produced_products_per_module;
592 std::map<std::string, std::set<std::string>> viewable_products_per_module;
594 auto const& module_name = pd.moduleLabel();
595 produced_products_per_module[module_name].emplace(
597 pd.friendlyClassName(),
599 pd.productInstanceName(),
600 ProcessTag{pd.processName(), pd.processName()});
601 if (pd.supportsView()) {
605 viewable_products_per_module[module_name].insert(
606 pd.productInstanceName());
611 for (
auto const& service_name : producing_services) {
612 auto& graph_info =
result[service_name];
615 auto found = produced_products_per_module.find(service_name);
616 if (
found ==
cend(produced_products_per_module)) {
620 graph_info.produced_products =
found->second;
626 produced_products_per_module,
627 viewable_products_per_module,
632 produced_products_per_module,
633 viewable_products_per_module,
641 string const& path_name,
643 std::map<
std::string, std::set<ProductInfo>>
const& produced_products,
644 std::map<
std::string, std::set<std::string>>
const& viewable_products,
647 auto const worker_config_begin =
cbegin(worker_configs);
649 for (
auto it = worker_config_begin,
end =
cend(worker_configs); it !=
end;
651 auto const& mci = *it->moduleConfigInfo;
652 auto const& module_name = mci.modDescription.moduleLabel();
653 auto& graph_info = info_collection[module_name];
654 graph_info.paths.insert(path_name);
655 graph_info.module_type = mci.moduleType;
657 auto found = produced_products.find(module_name);
659 graph_info.produced_products =
found->second;
662 auto const& consumables =
664 graph_info.consumed_products =
680 string const allowed_path_spec{R
"([\w\*\?]+)"}; 681 regex const regex{
"(\\w+:)?(!|exception@)?(" + allowed_path_spec +
689 for (
auto const& worker_config : worker_configs) {
690 auto const& mci = *worker_config.moduleConfigInfo;
691 auto const& module_name = mci.modDescription.moduleLabel();
692 auto const&
ps = mci.modPS;
693 auto& graph_info = info_collection[module_name];
695 auto path_specs =
ps.get<vector<string>>(
"SelectEvents", {});
709 assert(matches.size() == 5);
710 graph_info.select_events.insert(matches[3]);
consumables_t::mapped_type const & consumables(std::string const &module_label) const
std::map< module_label_t, PerScheduleContainer< std::shared_ptr< ModuleBase > > > replicated
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
void collectConsumes(std::string const &module_label, consumables_t::mapped_type const &consumables)
ProductDescriptions & productsToProduce_
void setProcessName(std::string const &)
static std::string end_path()
static ParameterSetID const & put(ParameterSet const &ps)
ModuleBase *(fhicl::ParameterSet const &, ProcessingFrame const &) ModuleMaker_t
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
keytype_for_name_t const & modules() const noexcept
std::shared_ptr< Worker > makeWorker_(ModulesByThreadingType const &modules, ModuleDescription const &md, WorkerParams const &wp)
static ConsumesInfo * instance()
void msg(const char *fmt,...)
void fillModuleOnlyDeps_(std::string const &path_name, detail::configs_t const &worker_configs, std::map< std::string, std::set< ProductInfo >> const &produced_products, std::map< std::string, std::set< std::string >> const &viewable_products, detail::collection_map_t &info_collection) const
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleConstruction
std::string const & moduleLabel() const
std::map< module_name_t, ModuleGraphInfo > collection_map_t
std::vector< std::string > triggerPathNames_() const
static constexpr ScheduleID first()
art::detail::paths_to_modules_t protoTrigPathLabels_
void setTriggerPSet(fhicl::ParameterSet const &)
std::vector< BranchDescription > ProductDescriptions
ModuleType module_type(std::string const &full_key)
QTextStream & hex(QTextStream &s)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleConstruction
maybe_module_t makeModule_(fhicl::ParameterSet const &module_pset, ModuleDescription const &md, ScheduleID) const
decltype(auto) constexpr size(T &&obj)
ADL-aware version of std::size.
T getSymbolByLibspec(std::string const &libspec, std::string const &sym_name) const
std::string const & moduleName() const
PerScheduleContainer< PathsInfo > triggerPathsInfo_
std::vector< WorkerInPath::ConfigInfo > configs_t
std::vector< PathSpec > path_specs(std::vector< std::string > const &path_spec_strs)
std::variant< ModuleBase *, std::string > maybe_module_t
ScheduleID::size_type nschedules() const
std::vector< std::string > prependedTriggerPathNames_() const
T get(std::string const &key) const
constexpr exempt_ptr< E > make_exempt_ptr(E *) noexcept
std::string const & getReleaseVersion()
PerScheduleContainer< PathsInfo > endPathInfo_
std::string bold_fontify(std::string const &s)
ModuleThreadingType loadModuleThreadingType_(std::string const &lib_spec) const
std::map< std::string, detail::ModuleConfigInfo > allModules_
#define TDEBUG_FUNC_SI(LEVEL, SI)
void print_module_graph(std::ostream &os, ModuleGraphInfoMap const &modInfos, ModuleGraph const &graph)
ModulesByThreadingType makeModules_(ScheduleID::size_type n)
ModuleType( ModuleTypeFunc_t)
static constexpr double ps
static auto end_path_spec()
void fillSelectEventsDeps_(detail::configs_t const &worker_configs, detail::collection_map_t &info_collection) const
auto transform_all(Container &, OutputIt, UnaryOp)
void err(const char *fmt,...)
QTextStream & dec(QTextStream &s)
ParameterSetID id() const
std::vector< PathSpec > triggerPathSpecs() const
ModuleThreadingType moduleThreadingType() const
std::map< std::string, detail::ModuleConfigInfo > moduleInformation_(detail::EnabledModules const &enabled_modules) const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
UpdateOutputCallbacks & outputCallbacks_
ActionTable const & exceptActions_
char const * what() const noexcept override
std::vector< WorkerInPath > fillWorkers_(PathContext const &pc, std::vector< WorkerInPath::ConfigInfo > const &wci_list, ModulesByThreadingType const &modules, std::map< std::string, std::shared_ptr< Worker >> &workers, GlobalTaskGroup &task_group, detail::SharedResources &resources)
PathID pathID() const noexcept
std::map< module_label_t, std::shared_ptr< ModuleBase > > shared
void remove_whitespace(std::string &str)
detail::collection_map_t getModuleGraphInfoCollection_(std::vector< std::string > const &producing_services)
art::detail::module_entries_for_ordered_path_t triggerPathSpecs_
void setTriggerPathNames(std::vector< std::string > const &)
tbb::task_group & native_group()
std::set< ProductInfo > consumed_products_for_module(std::string const ¤t_process, ConsumesInfo::consumables_t::mapped_type const &consumables, std::map< std::string, std::set< ProductInfo >> const &produced_products, std::map< std::string, std::set< std::string >> const &viewable_products, config_const_iterator const config_begin, config_const_iterator const config_it)
decltype(auto) constexpr cbegin(T &&obj)
ADL-aware version of std::cbegin.
void createModulesAndWorkers(GlobalTaskGroup &task_group, detail::SharedResources &resources, std::vector< std::string > const &producing_services)
ModuleThreadingType( ModuleThreadingTypeFunc_t)
PathSpec path_spec(std::string const &path_spec)
decltype(auto) constexpr begin(T &&obj)
ADL-aware version of std::begin.
bool is_observer(ModuleType const mt)
art::detail::configs_t protoEndPathLabels_
PerScheduleContainer< PathsInfo > & triggerPathsInfo()
static Globals * instance()
PerScheduleContainer< PathsInfo > & endPathInfo()
Worker *(std::shared_ptr< ModuleBase >, ModuleDescription const &, WorkerParams const &) WorkerFromModuleMaker_t
ActivityRegistry const & actReg_
ModuleType loadModuleType_(std::string const &lib_spec) const
std::string to_string(ModuleType const mt)
fhicl::ParameterSet procPS_
void put(std::string const &key)
void wrapLibraryManagerException(cet::exception const &e, std::string const &item_type, std::string const &libspec, std::string const &release)
cet::coded_exception< error, detail::translate > exception
decltype(auto) constexpr empty(T &&obj)
ADL-aware version of std::empty.
std::pair< ModuleGraph, std::string > make_module_graph(ModuleGraphInfoMap const &modInfos, paths_to_modules_t const &trigger_paths, configs_t const &end_path)