22 #include "boost/json.hpp" 52 OutputModule::~OutputModule() noexcept =
default;
57 config().eoFragment().rejectEvents(),
59 , groupSelectorRules_{
config().outputCommands(),
62 , configuredFileName_{
config().fileName()}
63 , dataTier_{
config().dataTier()}
64 , streamName_{
config().streamName()}
66 std::vector<ParameterSet> fcmdPluginPSets;
67 if (
config().fcmdPlugins.get_if_present(fcmdPluginPSets)) {
68 plugins_ = makePlugins_(fcmdPluginPSets);
80 ,
dataTier_{pset.get<
string>(
"dataTier",
"")}
96 return "OutputWorker";
134 auto selectProductForBranchType = [
this, &tables](
BranchType const bt) {
141 for (
auto const&
val : productList) {
152 auto& found_pd = it->second;
201 FDEBUG(2) <<
"beginRun called\n";
212 FDEBUG(2) <<
"beginSubRun called\n";
222 std::atomic<std::size_t>& counts_run,
223 std::atomic<std::size_t>& counts_passed,
224 std::atomic<std::size_t>& )
226 FDEBUG(2) <<
"doEvent called\n";
239 FDEBUG(2) <<
"writeEvent called\n";
245 auto const& trRef(trHandle.isValid() ?
268 FDEBUG(2) <<
"endSubRun called\n";
278 FDEBUG(2) <<
"writeSubRun called\n";
285 FDEBUG(2) <<
"writeAuxiliaryRangeSets(rp) called\n";
292 FDEBUG(2) <<
"endRun called\n";
302 FDEBUG(2) <<
"writeRun called\n";
327 unique_ptr<ResultsPrincipal> respHolder;
329 if (respPtr ==
nullptr) {
330 respHolder = make_unique<ResultsPrincipal>(
334 respPtr = respHolder.get();
406 for (
auto const& pid_and_uptr_to_grp : ep) {
407 auto const& group = *pid_and_uptr_to_grp.second;
408 if (group.productProvenance()) {
414 iter->second.insert(group.productProvenance()->parentageID());
427 set<ParentageID>
const& eIds = bp.second;
428 for (
auto const& eId : eIds) {
433 for (
auto const&
p : par.
parents()) {
556 collectStreamSpecificMetadata(
557 vector<unique_ptr<FileCatalogMetadataPlugin>>
const& plugins,
558 vector<string>
const& pluginNames,
561 size_t pluginCounter = 0;
562 ostringstream errors;
563 for (
auto&
plugin : plugins) {
565 ssmd.reserve(tmp.size() + ssmd.size());
566 for (
auto&&
entry : tmp) {
568 string checkString(
"{ ");
571 boost::json::error_code ec;
573 auto const n_parsed_chars = p.write_some(checkString, ec);
575 errors <<
"OutputModule::writeCatalogMetadata():" << ec.message()
576 <<
" in metadata produced by plugin " 577 << pluginNames[pluginCounter] <<
":\n" 578 <<
" Faulty key/value clause:\n" 579 << checkString <<
"\n" 580 << (n_parsed_chars ?
string(n_parsed_chars,
'-') :
"")
588 auto const errMsg = errors.str();
589 if (!errMsg.empty()) {
635 result.reserve(psets.size());
638 for (
auto const& pset : psets) {
639 auto const& libspec =
640 pluginNames_.emplace_back(pset.get<
string>(
"plugin_type"));
645 <<
"unrecognized plugin type " << pluginType <<
".\n";
655 <<
"Exception caught while processing FCMDPlugins[" <<
count 687 std::array<bool, NumBranchTypes>
const&
virtual void writeFileIdentifier()
bool doOpenFile(FileBlock const &fb)
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
EventID const & eventID() const
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
void doWriteEvent(EventPrincipal &ep, ModuleContext const &mc)
virtual void writeProcessConfigurationRegistry()
void updateBranchParents(EventPrincipal &ep)
virtual void readResults(ResultsPrincipal const &resp)
virtual void doBeginJob(detail::SharedResources const &resources)
virtual void finishEndFile()
Handle< TriggerResults > getTriggerResults(Event const &e) const
virtual void incrementInputFileNumber()
virtual void event(EventPrincipal const &)
virtual void respondToCloseOutputFiles(FileBlock const &)
std::string const & moduleLabel() const
virtual void writeRun(RunPrincipal &r)=0
virtual void writeSubRun(SubRunPrincipal &sr)=0
virtual void doRegisterProducts(ProductDescriptions &, ModuleDescription const &)
std::vector< std::string > pluginNames_
bool doEndSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
std::vector< ProductID > const & parents() const
virtual void respondToOpenInputFile(FileBlock const &)
void writeFileCatalogMetadata()
std::enable_if_t<!std::is_function_v< RESULT_TYPE >, RESULT_TYPE > makePlugin(std::string const &libspec, ARGS &&...args) const
void doWriteSubRun(SubRunPrincipal &srp)
std::vector< BranchDescription > ProductDescriptions
void doSelectProducts(ProductTables const &)
bool doBeginSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
virtual void writeFileFormatVersion()
virtual void beginRun(RunPrincipal const &)
bool doEvent(EventPrincipal const &ep, ModuleContext const &mc, std::atomic< std::size_t > &counts_run, std::atomic< std::size_t > &counts_passed, std::atomic< std::size_t > &counts_failed)
auto const & descriptions(BranchType const bt) const
BranchChildren branchChildren_
virtual void endRun(RunPrincipal const &)
virtual bool isFileOpen() const
virtual std::string const & lastClosedFileName() const
std::vector< std::unique_ptr< FileCatalogMetadataPlugin >> PluginCollection_t
void insertEmpty(ProductID parent)
QCollection::Item first()
void doRespondToCloseInputFile(FileBlock const &fb)
void selectProducts(ProductTables const &)
SharedResource_t const LegacyResource
virtual void writeFileIndex()
virtual void respondToOpenOutputFiles(FileBlock const &)
BranchType branchType() const noexcept
GroupSelectorRules groupSelectorRules_
bool limitReached() const
PluginCollection_t plugins_
void registerProducts(ProductDescriptions &, ModuleDescription const &)
std::string workerType() const
virtual void setSubRunAuxiliaryRangeSetID(RangeSet const &)
static collection_type const & get()
virtual void writeParameterSetRegistry()
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
virtual void endSubRun(SubRunPrincipal const &)
bool doEndRun(RunPrincipal const &rp, ModuleContext const &mc)
std::array< std::unique_ptr< GroupSelector const >, NumBranchTypes > groupSelector_
virtual void setRunAuxiliaryRangeSetID(RangeSet const &)
virtual void writeParentageRegistry()
bool wantEvent(ScheduleID id, Event const &e) const
virtual void write(EventPrincipal &e)=0
bool dropped() const noexcept
void doSetSubRunAuxiliaryRangeSetID(RangeSet const &)
SelectionsArray keptProducts_
ServiceHandle< CatalogInterface > ci_
virtual void writeProductDescriptionRegistry()
std::array< Selections, NumBranchTypes > SelectionsArray
virtual Granularity fileGranularity() const
std::map< ProductID, std::set< ParentageID > > branchParents_
bool selected(BranchDescription const &) const
ProcessConfiguration const & processConfiguration() const
std::string configuredFileName_
ResultsPrincipal const * resultsPrincipal() const
bool combinable(BranchDescription const &a, BranchDescription const &b)
virtual bool requestsToCloseFile() const
int remainingEvents() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
void fillDependencyGraph()
void insertChild(ProductID parent, ProductID child)
cet::BasicPluginFactory pluginFactory_
void createQueues(SharedResources const &resources)
void doRespondToOpenOutputFiles(FileBlock const &fb)
virtual void postSelectProducts()
bool canonical_string(std::string const &str, std::string &result)
PluginCollection_t makePlugins_(std::vector< fhicl::ParameterSet > const &psets)
virtual void openFile(FileBlock const &)
std::string pluginType(std::string const &libspec) const
void doWriteRun(RunPrincipal &rp)
virtual void writeEventHistory()
SelectionsArray const & keptProducts() const
auto for_all(FwdCont &, Func)
virtual void writeProcessHistoryRegistry()
void serialize(T const &...)
virtual void writeProductDependencies()
virtual void setFileStatus(OutputFileStatus)
void for_each_branch_type(F f)
bool transient() const noexcept
virtual void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd)
virtual void respondToCloseInputFile(FileBlock const &)
void doRespondToCloseOutputFiles(FileBlock const &fb)
static constexpr double sr
BranchChildren const & branchChildren() const
virtual void beginSubRun(SubRunPrincipal const &)
void doRespondToOpenInputFile(FileBlock const &fb)
virtual void writeBranchIDListRegistry()
ProductID productID() const noexcept
virtual void startEndFile()
cet::coded_exception< error, detail::translate > exception
void doSetRunAuxiliaryRangeSetID(RangeSet const &)
void configure(OutputModuleDescription const &desc)
ModuleDescription const & moduleDescription() const
bool doBeginRun(RunPrincipal const &rp, ModuleContext const &)