45 #include "hep_concurrency/WaitingTask.h" 59 using namespace string_literals;
67 ActivityRegistry& actReg,
68 detail::SharedResources& resources)
72 services_pset.
erase(
"FloatingPointControl");
73 services_pset.
erase(
"message");
74 services_pset.
erase(
"scheduler");
75 auto mgr =
new ServicesManager{
move(services_pset), actReg, resources};
76 mgr->addSystemService<FloatingPointControl>(fpcPSet, actReg);
80 std::unique_ptr<Worker>
81 maybe_trigger_results_inserter(ScheduleID
const scheduleID,
82 string const& processName,
85 UpdateOutputCallbacks& outputCallbacks,
88 ActivityRegistry
const& actReg,
90 GlobalTaskGroup& task_group,
91 detail::SharedResources& resources)
93 if (pathsInfo.paths().empty()) {
94 return std::unique_ptr<Worker>{
nullptr};
98 WorkerParams
const wp{outputCallbacks,
103 task_group.native_group(),
105 ModuleDescription
md{
107 "TriggerResultInserter",
109 ModuleThreadingType::replicated,
111 actReg.sPreModuleConstruction.invoke(
md);
112 auto producer = std::make_shared<TriggerResultInserter>(
113 trig_pset, scheduleID, pathsInfo.pathResults());
116 std::make_unique<WorkerT<ReplicatedProducer>>(
producer,
md, wp);
117 actReg.sPostModuleConstruction.invoke(md);
142 auto const scheduler_pset = services_pset.
get<
ParameterSet>(
"scheduler");
170 <<
" nthreads: " <<
scheduler_->num_threads();
172 auto const errorOnMissingConsumes =
scheduler_->errorOnMissingConsumes();
205 auto& trigger_paths_info =
pathManager_->triggerPathsInfo(sid);
206 assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());
208 auto results_inserter =
209 maybe_trigger_results_inserter(sid,
221 std::forward_as_tuple(sid),
222 std::forward_as_tuple(sid,
227 move(results_inserter),
238 main_input.
put(
"module_type",
"EmptyEvent");
239 main_input.
put(
"module_label",
"source");
240 main_input.
put(
"maxEvents", -1);
241 if (!pset.get_if_present(
"source", main_input)) {
243 <<
"Could not find a source configuration: using default.";
247 main_input.
get<
string>(
"module_type"),
248 main_input.
get<
string>(
"module_label"),
264 <<
"Configuration of main input source has failed\n" 271 <<
"Configuration of main input source has failed\n" 290 vector<Worker*> allWorkers;
292 back_inserter(allWorkers),
293 [](
auto const& pr) {
return pr.second.get(); });
295 back_inserter(allWorkers),
296 [](
auto const& pr) {
return pr.second.get(); });
315 finalizeContainingLevels<L>();
325 <<
" Current level: " << L
333 EventProcessor::begin<Level::Job>()
341 EventProcessor::begin<Level::InputFile>()
348 EventProcessor::begin<Level::Run>()
362 EventProcessor::begin<Level::SubRun>()
379 EventProcessor::finalize<Level::SubRun>()
401 EventProcessor::finalize<Level::Run>()
423 EventProcessor::finalize<Level::InputFile>()
434 EventProcessor::finalize<Level::Job>()
442 EventProcessor::finalizeContainingLevels<Level::SubRun>()
444 finalize<Level::Run>();
449 EventProcessor::finalizeContainingLevels<Level::Event>()
451 finalize<Level::SubRun>();
452 finalize<Level::Run>();
457 EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
464 EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
471 EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
494 mf::LogError(
"BeginJob") <<
"A cet::exception happened while processing" 495 " the beginJob of the 'source'\n";
496 e <<
"A cet::exception happened while processing" 497 " the beginJob of the 'source'\n";
501 mf::LogError(
"BeginJob") <<
"A exception happened while processing" 502 " the beginJob of the 'source'\n";
506 mf::LogError(
"BeginJob") <<
"An unknown exception happened while" 507 " processing the beginJob of the 'source'\n";
523 ec_->call([
this] {
input_->doEndJob(); });
547 if (
fb_ ==
nullptr) {
549 <<
"Source readFile() did not return a valid FileBlock: FileBlock " 550 <<
"should be valid or readFile() should throw.\n";
597 bool outputs_to_open{
false};
598 auto check_outputs_to_open = [
this,
601 outputs_to_open =
true;
605 return outputs_to_open;
615 auto open_some_outputs = [
this](
ScheduleID const sid) {
641 FDEBUG(1) <<
string(8,
' ') <<
"closeSomeOutputFiles\n";
650 FDEBUG(1) <<
string(8,
' ') <<
"respondToOpenInputFile\n";
659 FDEBUG(1) <<
string(8,
' ') <<
"respondToCloseInputFile\n";
668 FDEBUG(1) <<
string(8,
' ') <<
"respondToOpenOutputFiles\n";
677 FDEBUG(1) <<
string(8,
' ') <<
"respondToCloseOutputFiles\n";
689 auto rsh =
input_->runRangeSetHandler();
691 auto seed_range_set = [
this, &rsh](
ScheduleID const sid) {
708 FDEBUG(1) <<
string(8,
' ') <<
"readRun.....................(" 737 "EventProcessor: an exception occurred during current event processing",
742 <<
"an exception occurred during current event processing\n";
745 FDEBUG(1) <<
string(8,
' ') <<
"beginRun....................(" <<
r 762 FDEBUG(1) <<
string(8,
' ') <<
"setRunAuxiliaryRangeSetID...(" 769 auto merge_range_sets = [
this, &mergedRS](
ScheduleID const sid) {
778 auto update_executors = [
this, &mergedRS](
ScheduleID const sid) {
791 unique_ptr<RangeSetHandler> rshAtSwitch{
810 assert(!
run.isFlush());
823 "EventProcessor: an exception occurred during current event processing",
828 <<
"an exception occurred during current event processing\n";
842 assert(!
r.isFlush());
844 FDEBUG(1) <<
string(8,
' ') <<
"writeRun....................(" <<
r 857 auto rsh =
input_->subRunRangeSetHandler();
859 auto seed_range_set = [
this, &rsh](
ScheduleID const sid) {
877 FDEBUG(1) <<
string(8,
' ') <<
"readSubRun..................(" 906 "EventProcessor: an exception occurred during current event processing",
911 <<
"an exception occurred during current event processing\n";
914 FDEBUG(1) <<
string(8,
' ') <<
"beginSubRun.................(" <<
sr 931 FDEBUG(1) <<
string(8,
' ') <<
"setSubRunAuxiliaryRangeSetID(" 938 auto merge_range_sets = [
this, &mergedRS](
ScheduleID const sid) {
947 auto update_executors = [
this, &mergedRS](
ScheduleID const sid) {
993 unsigned largestEvent = 1U;
1002 if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
1003 if (rsh.eventInfo().id().event() > largestEvent) {
1004 largestEvent = rsh.eventInfo().id().event();
1010 unique_ptr<RangeSetHandler> rshAtSwitch{
1014 unique_ptr<RangeSetHandler> runRSHAtSwitch{
1020 rshAtSwitch->flushRanges();
1036 assert(!
sr.isFlush());
1049 "EventProcessor: an exception occurred during current event processing",
1054 <<
"an exception occurred during current event processing\n";
1057 FDEBUG(1) <<
string(8,
' ') <<
"endSubRun...................(" <<
sr 1068 assert(!
sr.isFlush());
1070 FDEBUG(1) <<
string(8,
' ') <<
"writeSubRun.................(" <<
sr 1079 EventProcessor::process<most_deeply_nested_level()>()
1092 auto const last_schedule_index =
scheduler_->num_schedules() - 1;
1096 taskGroup_->native_group().run_and_wait([
this, last_schedule_index] {
1108 finalizeContainingLevels<most_deeply_nested_level()>();
1111 FDEBUG(1) <<
string(8,
' ') <<
"closeSomeOutputFiles\n";
1215 if (
schedule(sid).outputsToClose()) {
1227 TDEBUG_FUNC_SI(5, sid) <<
"Calling input_->readEvent(subRunPrincipal_)";
1239 FDEBUG(1) <<
string(8,
' ') <<
"readEvent...................(" 1240 << ep->eventID() <<
")\n";
1245 if (
schedule(sid).event_principal().eventID().isFlush()) {
1261 : evp_{evp}, sid_{sid}
1270 rethrow_exception(ex);
1274 evp_->sharedException_.store<
Exception>(
1276 "EventProcessor: an exception occurred during current " 1283 <<
"exception being ignored for current event:\n" 1289 <<
"an exception occurred during current event processing\n";
1290 evp_->sharedException_.store_current();
1296 evp_->finishEventAsync(sid_);
1310 : evp_{evp}, sid_{sid}
1320 rethrow_exception(ex);
1324 evp_->sharedException_.store<
Exception>(
1326 "EventProcessor: an exception occurred during current " 1330 <<
"terminate event loop because of EXCEPTION";
1334 <<
"exception being ignored for current event:\n" 1340 <<
"an exception occurred during current event processing\n";
1341 evp_->sharedException_.store_current();
1343 <<
"terminate event loop because of EXCEPTION";
1348 auto finalize_event_task =
1349 make_waiting_task<EndPathRunnerTask>(evp_, sid_);
1351 evp_->schedule(sid_).process_event_observers(finalize_event_task);
1355 evp_->sharedException_.store<
Exception>(
1357 "EventProcessor: an exception occurred during current event " 1361 <<
"terminate event loop because of EXCEPTION";
1365 <<
"exception being ignored for current event:\n" 1371 <<
"an exception occurred during current event processing\n";
1372 evp_->sharedException_.store_current();
1374 <<
"terminate event loop because of EXCEPTION";
1403 assert(!
schedule(sid).event_principal().eventID().isFlush());
1405 auto endPathTask = make_waiting_task<EndPathTask>(
this, sid);
1418 "EventProcessor: an exception occurred during current event processing",
1424 <<
"exception being ignored for current event:\n" 1430 <<
"an exception occurred during current event processing\n";
1444 FDEBUG(1) <<
string(8,
' ') <<
"processEvent................(" 1445 << ep.eventID() <<
")\n";
1453 std::lock_guard sentry{m};
1460 if (!ep.eventID().isFlush()) {
1468 auto const id = ep.eventID();
1470 FDEBUG(1) <<
string(8,
' ') <<
"writeEvent..................(" <<
id 1474 <<
"Calling schedules_->" 1475 "recordOutputClosureRequests(Granularity::Event)";
1482 "EventProcessor: an exception occurred " 1483 "during current event processing",
1490 <<
"exception being ignored for current event:\n" 1496 <<
"an exception occurred during current event processing\n";
1515 ec_->call([
this] { begin<L>(); });
1517 levelsToProcess<level_down(L)>()) {
1518 ec_->call([
this] { process<level_down(L)>(); });
1522 recordOutputModuleClosureRequests<L>();
1530 ec_->call([
this, &returnCode] {
1531 process<highest_level()>();
1536 if (!
ec_->empty()) {
1546 auto const itemType =
input_->nextItemType();
1547 FDEBUG(1) <<
string(4,
' ') <<
"*** nextItemType: " << itemType <<
" ***\n";
1561 <<
"Invalid next item type presented to the event processor.\n" 1562 <<
"Please contact artists@fnal.gov.";
1565 <<
"Unrecognized next item type presented to the event processor.\n" 1566 <<
"Please contact artists@fnal.gov.";
1573 if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
void readAndProcessAsync(ScheduleID sid)
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
void closeAllOutputFiles()
void respondToCloseOutputFiles(FileBlock const &)
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreOpenFile
tsan_unique_ptr< InputSource > input_
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
void respondToCloseInputFile(FileBlock const &)
main_input
CNN definition ############################.
tsan< ProductDescriptions > producedProductDescriptions_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPostBeginJob
void throw_if_stored_exception()
void respondToOpenOutputFiles()
static ConsumesInfo * instance()
void respondToCloseInputFile()
RangeSetHandler const & runRangeSetHandler()
tsan< UpdateOutputCallbacks > outputCallbacks_
tsan< Scheduler > scheduler_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreCloseFile
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostSourceEvent
tsan_unique_ptr< FileBlock > fb_
void openSomeOutputFiles()
#define TDEBUG_END_TASK_SI(LEVEL, SI)
constexpr auto most_deeply_nested_level() noexcept
bool const handleEmptyRuns_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostSourceRun
std::atomic< Level > nextLevel_
bool outputsToClose() const
void invokePostBeginJobWorkers_()
void seedSubRunRangeSet(RangeSetHandler const &rsh)
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceSubRun
Schedule & main_schedule()
void process(Transition, Principal &)
void beginJob(detail::SharedResources const &resources)
static constexpr ScheduleID first()
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostBeginRun
std::atomic< bool > firstEvent_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostBeginSubRun
std::vector< BranchDescription > ProductDescriptions
fhicl::ParameterSet const & triggerPSet() const
std::unique_ptr< GlobalTaskGroup > taskGroup_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan< detail::ExceptionCollector > ec_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
std::atomic< int > shutdown_flag
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
void store(std::exception_ptr ex_ptr)
ProcessConfigurationID id() const
GlobalSignal< detail::SignalResponseType::LIFO, void(InputSource *, std::vector< Worker * > const &)> sPostBeginJobWorkers
EndPathRunnerTask(EventProcessor *evp, ScheduleID const sid)
std::string trim_right_copy(std::string source, std::string const &t=" ")
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void freeze(tbb::task_group &group)
tsan< cet::cpu_timer > timer_
EventPrincipal & event_principal()
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostSourceConstruction
static auto instance(bool cleanup=false)
tsan< std::map< ScheduleID, Schedule > > schedules_
ScheduleID::size_type nschedules() const
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
#define TDEBUG_FUNC(LEVEL)
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
std::atomic< bool > finalizeSubRunEnabled_
T get(std::string const &key) const
std::string const & getReleaseVersion()
actions::ActionCodes error_action(cet::exception &e) const
std::atomic< bool > beginRunCalled_
EndPathTask(EventProcessor *evp, ScheduleID const sid)
std::atomic< bool > finalizeRunEnabled_
std::string bold_fontify(std::string const &s)
std::atomic< bool > fileSwitchInProgress_
RangeSetHandler const & subRunRangeSetHandler()
#define TDEBUG_FUNC_SI(LEVEL, SI)
void recordOutputClosureRequests(Granularity const granularity)
tsan_unique_ptr< ServicesManager > servicesManager_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
void closeSomeOutputFiles()
auto transform_all(Container &, OutputIt, UnaryOp)
std::atomic< bool > beginSubRunCalled_
void setupSignals(bool want_sigint_enabled)
void finishEventAsync(ScheduleID sid)
ParameterSetID id() const
void respondToOpenOutputFiles(FileBlock const &)
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostProcessEvent
void incrementInputFileNumber()
RangeSet seenRanges() const
StatusCode runToCompletion()
bool outputsToOpen() const
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostEndRun
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
GlobalSignal< detail::SignalResponseType::FIFO, void(Run const &)> sPreBeginRun
char const * what() const noexcept override
void showMissingConsumes() const
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRunID const &, Timestamp const &)> sPreEndSubRun
std::string const & processName() const
void setRunAuxiliaryRangeSetID()
void processEventAsync(ScheduleID sid)
void beginRunIfNotDoneAlready()
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenFile
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void endJobAllSchedules()
void closeSomeOutputFiles()
void setRequireConsumes(bool const)
void accept_principal(std::unique_ptr< EventPrincipal > principal)
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::FIFO, void(RunID const &, Timestamp const &)> sPreEndRun
RangeSetHandler * clone() const
bool erase(std::string const &key)
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRun const &)> sPreBeginSubRun
RangeSet & merge(RangeSet const &other)
Schedule & schedule(ScheduleID const id)
constexpr auto highest_level() noexcept
void for_each_schedule(F f) const
Namespace containing all the test actions.
static Globals * instance()
void process_event_modifiers(hep::concurrency::WaitingTaskPtr endPathTask)
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostCloseFile
void closeAllOutputFiles()
void beginSubRunIfNotDoneAlready()
void seedRunRangeSet(RangeSetHandler const &rsh)
void writeRun(RunPrincipal &rp)
GlobalSignal< detail::SignalResponseType::FIFO, void(ScheduleContext)> sPreSourceEvent
static constexpr double sr
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostSourceSubRun
void respondToOpenInputFile(FileBlock const &)
void openSomeOutputFiles(FileBlock const &fb)
void operator()(exception_ptr const ex)
void respondToOpenInputFile()
void put(std::string const &key)
void setSubRunAuxiliaryRangeSetID()
void writeSubRun(SubRunPrincipal &srp)
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostEndSubRun
cet::coded_exception< error, detail::translate > exception
QTextStream & endl(QTextStream &s)
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceRun
void terminateAbnormally_()
auto const invalid_module_context
void operator()(std::exception_ptr ex) const
void respondToCloseOutputFiles()
void setOutputFileStatus(OutputFileStatus const ofs)