24 #include "hep_concurrency/WaitingTask.h" 28 #include <type_traits> 50 auto w =
val.second.get();
51 assert(sid ==
w->scheduleID());
58 outputCallbacks.registerCallback(
66 auto&
w = *label_and_worker.second;
70 w.beginJob(resources);
81 auto&
w = *label_and_worker.second;
89 error <<
"cet::exception caught in Schedule::endJob\n" 94 error <<
"Standard library exception caught in Schedule::endJob\n" 99 error <<
"Unknown exception caught in Schedule::endJob\n";
109 ow->selectProducts(tables);
117 auto&
w = *label_and_worker.second;
121 w.respondToOpenInputFile(fb);
129 auto&
w = *label_and_worker.second;
133 w.respondToCloseInputFile(fb);
141 auto&
w = *label_and_worker.second;
145 w.respondToOpenOutputFiles(fb);
153 auto&
w = *label_and_worker.second;
157 w.respondToCloseOutputFiles(fb);
165 return ow->fileIsOpen();
190 ow->setRunAuxiliaryRangeSetID(rangeSet);
217 ow->setSubRunAuxiliaryRangeSetID(rs);
225 ow->writeSubRun(srp);
240 auto&
w = *label_and_worker.second;
253 <<
"an exception occurred during current event processing\n" 258 <<
"an exception occurred during current event processing\n";
267 WaitingTaskPtr
const finalizeEventTask,
269 : endPathExec_{endPathExec}
270 , finalizeEventTask_{finalizeEventTask}
277 auto const scheduleID = endPathExec_->sc_.id();
284 rethrow_exception(ex);
288 tmp <<
"an exception occurred during current event processing\n" <<
e;
291 <<
"end path processing terminate because of EXCEPTION";
297 <<
"end path processing terminate because of EXCEPTION";
302 endPathExec_->endPathInfo_.incrementPassedEventCount();
320 auto const sid =
sc_.
id();
326 make_waiting_task<PathsDoneTask>(
this, finalizeEventTask,
taskGroup_);
349 ow->writeEvent(ep, mc);
352 auto const& eid = ep.
eventID();
355 <<
"eid: " << eid.run() <<
", " << eid.subRun() <<
", " << eid.event();
381 if (!ow->fileIsOpen()) {
402 if (!ow->openFile(fb)) {
408 outputWorkersToOpen_.clear();
425 ow->setFileStatus(ofs);
434 if (atBoundary < ow->fileGranularity()) {
439 auto wants_to_close = ow->requestsToCloseFile();
440 if (wants_to_close) {
450 ow->incrementInputFileNumber();
460 bool all_at_limit =
true;
462 if (!
w->limitReached()) {
463 all_at_limit =
false;
469 <<
"The job is terminating successfully because each output module\n" 470 <<
"has reached its configured limit.\n";
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
EndPathExecutor *const endPathExec_
EventID const & eventID() const
std::vector< Path > & paths()
bool outputsToClose() const
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenOutputFile
void seedRunRangeSet(RangeSetHandler const &)
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
#define TDEBUG_END_TASK_SI(LEVEL, SI)
bool skip_non_replicated(Worker const &)
PathsDoneTask(EndPathExecutor *const endPathExec, WaitingTaskPtr const finalizeEventTask, GlobalTaskGroup &taskGroup)
void writeSubRun(SubRunPrincipal &srp)
GlobalTaskGroup & taskGroup_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
std::map< std::string, std::shared_ptr< Worker > > & workers()
void selectProducts(ProductTables const &)
void operator()(exception_ptr const ex)
void closeAllOutputFiles()
bool isLastInSubRun() const
void recordOutputClosureRequests(Granularity)
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreWriteEvent
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
void setOutputFileStatus(OutputFileStatus)
std::atomic< OutputFileStatus > fileStatus_
std::set< OutputWorker * > outputWorkersToClose_
void seedSubRunRangeSet(RangeSetHandler const &)
bool someOutputsOpen() const
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
static auto end_path_spec()
void writeRun(RunPrincipal &rp)
void beginJob(detail::SharedResources const &resources)
bool outputsToOpen() const
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
void closeSomeOutputFiles()
std::set< OutputWorker * > outputWorkersToOpen_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
WaitingTaskPtr const finalizeEventTask_
void respondToCloseInputFile(FileBlock const &fb)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
void writeEvent(EventPrincipal &)
void incrementPassedEventCount()
ActionTable const & actionTable_
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
RangeSetHandler * clone() const
void process(Transition, Principal &)
GlobalTaskGroup & taskGroup_
ActivityRegistry const & actReg_
void incrementInputFileNumber()
void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void incrementTotalEventCount()
cet::coded_exception< error, detail::translate > exception
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostWriteEvent