OutputModule.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
22 #include "boost/json.hpp"
33 #include "fhiclcpp/ParameterSet.h"
34 
35 #include <array>
36 #include <atomic>
37 #include <cassert>
38 #include <cstddef>
39 #include <iostream>
40 #include <memory>
41 #include <set>
42 #include <string>
43 #include <utility>
44 #include <vector>
45 
46 using namespace std;
47 
49 
50 namespace art {
51 
// Destructor: explicitly defaulted and declared noexcept, defined here in the
// implementation file rather than inside the class body.
52  OutputModule::~OutputModule() noexcept = default;
53 
55  ParameterSet const& containing_pset)
56  : Observer{config().eoFragment().selectEvents(),
57  config().eoFragment().rejectEvents(),
58  containing_pset}
59  , groupSelectorRules_{config().outputCommands(),
60  "outputCommands",
61  "OutputModule"}
62  , configuredFileName_{config().fileName()}
63  , dataTier_{config().dataTier()}
64  , streamName_{config().streamName()}
65  {
66  std::vector<ParameterSet> fcmdPluginPSets;
67  if (config().fcmdPlugins.get_if_present(fcmdPluginPSets)) {
68  plugins_ = makePlugins_(fcmdPluginPSets);
69  }
70  serialize(detail::LegacyResource);
71  }
72 
// Constructs the module directly from a raw ParameterSet.
//
// The Observer base is initialized from the same parameter set. Every
// setting is read with an explicit second argument to pset.get<>():
//   "outputCommands" -> {"keep *"}
//   "fileName", "dataTier", "streamName" -> ""
//   "FCMDPlugins" -> {} (the resulting list is handed to makePlugins_)
// (presumably that second argument is the value used when the key is
// absent -- confirm against fhiclcpp's ParameterSet::get).
//
// NOTE(review): listing line 84 (the only statement of the constructor body)
// is absent from this extract, so the body appears empty here.
73  OutputModule::OutputModule(ParameterSet const& pset)
74  : Observer{pset}
75  , groupSelectorRules_{pset.get<vector<string>>("outputCommands",
76  {"keep *"}),
77  "outputCommands",
78  "OutputModule"}
79  , configuredFileName_{pset.get<string>("fileName", "")}
80  , dataTier_{pset.get<string>("dataTier", "")}
81  , streamName_{pset.get<string>("streamName", "")}
82  , plugins_{makePlugins_(pset.get<vector<ParameterSet>>("FCMDPlugins", {}))}
83  {
85  }
86 
87  bool
89  {
90  return isFileOpen();
91  }
92 
93  string
95  {
96  return "OutputWorker";
97  }
98 
99  void
101  {}
102 
103  bool
105  {
106  return false;
107  }
108 
111  {
112  return Granularity::Unset;
113  }
114 
115  string const&
117  {
118  return configuredFileName_;
119  }
120 
121  void
123  {
125  }
126 
127  void
129  {
130  // Note: The keptProducts_ data member records all of the
131  // BranchDescription objects that may be persisted to disk. Since
132  // we do not reset it, the list never shrinks. This behavior should
133  // be reconsidered for future use cases of art.
134  auto selectProductForBranchType = [this, &tables](BranchType const bt) {
135  auto const& productList = tables.descriptions(bt);
136  groupSelector_[bt] =
137  std::make_unique<GroupSelector const>(groupSelectorRules_, productList);
138  // TODO: See if we can collapse keptProducts_ and groupSelector into
139  // a single object. See the notes in the header for GroupSelector
140  // for more information.
141  for (auto const& val : productList) {
142  BranchDescription const& pd = val.second;
143  if (pd.transient() || pd.dropped()) {
144  continue;
145  }
146  if (selected(pd)) {
147  // Here, we take care to merge the BranchDescription objects
148  // if one was already present in the keptProducts list.
149  auto& keptProducts = keptProducts_[bt];
150  if (auto it = keptProducts.find(pd.productID());
151  it != end(keptProducts)) {
152  auto& found_pd = it->second;
153  assert(combinable(found_pd, pd));
154  found_pd.merge(pd);
155  } else {
156  // New product
157  keptProducts.emplace(pd.productID(), pd);
158  }
159  continue;
160  }
161  hasNewlyDroppedBranch_[bt] = true;
162  }
163  };
164  for_each_branch_type(selectProductForBranchType);
165  }
166 
167  void
169  {
170  doSelectProducts(tables);
172  }
173 
174  void
176  {}
177 
178  void
180  ModuleDescription const& md)
181  {
182  doRegisterProducts(producedProducts, md);
183  }
184 
185  void
187  ModuleDescription const&)
188  {}
189 
190  void
192  {
193  createQueues(resources);
194  beginJob();
195  cet::for_all(plugins_, [](auto& p) { p->doBeginJob(); });
196  }
197 
198  bool
200  {
201  FDEBUG(2) << "beginRun called\n";
202  beginRun(rp);
203  Run const r{rp, mc};
204  cet::for_all(plugins_, [&r](auto& p) { p->doBeginRun(r); });
205  return true;
206  }
207 
208  bool
210  ModuleContext const& mc)
211  {
212  FDEBUG(2) << "beginSubRun called\n";
213  beginSubRun(srp);
214  SubRun const sr{srp, mc};
215  cet::for_all(plugins_, [&sr](auto& p) { p->doBeginSubRun(sr); });
216  return true;
217  }
218 
219  bool
221  ModuleContext const& mc,
222  std::atomic<std::size_t>& counts_run,
223  std::atomic<std::size_t>& counts_passed,
224  std::atomic<std::size_t>& /*counts_failed*/)
225  {
226  FDEBUG(2) << "doEvent called\n";
227  Event const e{ep, mc};
228  if (wantEvent(mc.scheduleID(), e)) {
229  ++counts_run;
230  event(ep);
231  ++counts_passed;
232  }
233  return true;
234  }
235 
236  void
238  {
239  FDEBUG(2) << "writeEvent called\n";
240  Event const e{ep, mc};
241  if (wantEvent(mc.scheduleID(), e)) {
242  write(ep);
243  // Declare that the event was selected for write to the catalog interface.
245  auto const& trRef(trHandle.isValid() ?
246  static_cast<HLTGlobalStatus>(*trHandle) :
247  HLTGlobalStatus{});
248  ci_->eventSelected(
249  moduleDescription().moduleLabel(), ep.eventID(), trRef);
250  // ... and invoke the plugins:
251  cet::for_all(plugins_, [&e](auto& p) { p->doCollectMetadata(e); });
253  if (remainingEvents_ > 0) {
255  }
256  }
257  }
258 
259  void
261  {
263  }
264 
265  bool
267  {
268  FDEBUG(2) << "endSubRun called\n";
269  endSubRun(srp);
270  SubRun const sr{srp, mc};
271  cet::for_all(plugins_, [&sr](auto& p) { p->doEndSubRun(sr); });
272  return true;
273  }
274 
275  void
277  {
278  FDEBUG(2) << "writeSubRun called\n";
279  writeSubRun(srp);
280  }
281 
282  void
284  {
285  FDEBUG(2) << "writeAuxiliaryRangeSets(rp) called\n";
287  }
288 
289  bool
291  {
292  FDEBUG(2) << "endRun called\n";
293  endRun(rp);
294  Run const r{rp, mc};
295  cet::for_all(plugins_, [&r](auto& p) { p->doEndRun(r); });
296  return true;
297  }
298 
299  void
301  {
302  FDEBUG(2) << "writeRun called\n";
303  writeRun(rp);
304  }
305 
306  void
308  {
309  endJob();
310  cet::for_all(plugins_, [](auto& p) { p->doEndJob(); });
311  }
312 
313  bool
315  {
316  if (isFileOpen()) {
317  return false;
318  }
319  openFile(fb);
320  return true;
321  }
322 
323  void
325  {
327  unique_ptr<ResultsPrincipal> respHolder;
328  ResultsPrincipal const* respPtr = fb.resultsPrincipal();
329  if (respPtr == nullptr) {
330  respHolder = make_unique<ResultsPrincipal>(
333  nullptr);
334  respPtr = respHolder.get();
335  }
336  readResults(*respPtr);
337  }
338 
339  void
341  {
343  }
344 
345  void
347  {
349  }
350 
351  void
353  {
355  }
356 
357  bool
359  {
360  if (isFileOpen()) {
361  reallyCloseFile();
362  return true;
363  }
364  return false;
365  }
366 
367  void
369  {
371  startEndFile();
374  writeFileIndex();
383  finishEndFile();
384  branchParents_.clear();
386  }
387 
388  // Called every event (by doWriteEvent) to update branchParents_
389  // and branchChildren_.
390  void
392  {
393  // Note: threading: We are implicitly using the Principal
394  // iterators here which iterate over the groups held
395  // by the principal, which may be updated by a producer
396  // task in another stream while we are iterating! But
397  // only for Run, SubRun, and Results principals, in the
398  // case of Event principals we arrange that no producer
399  // or filter tasks are running when we run. So since we
400  // are only called for event principals we are safe.
401  //
402  // Note: threading: We update branchParents_ and
403  // branchChildren_ here which must be protected if we
404  // become a stream or global module.
405  //
406  for (auto const& pid_and_uptr_to_grp : ep) {
407  auto const& group = *pid_and_uptr_to_grp.second;
408  if (group.productProvenance()) {
409  ProductID const pid = pid_and_uptr_to_grp.first;
410  auto iter = branchParents_.find(pid);
411  if (iter == branchParents_.end()) {
412  iter = branchParents_.emplace(pid, set<ParentageID>{}).first;
413  }
414  iter->second.insert(group.productProvenance()->parentageID());
416  }
417  }
418  }
419 
420  // Called at file close to update branchChildren_ from the accumulated
421  // branchParents_.
422  void
424  {
425  for (auto const& bp : branchParents_) {
426  ProductID const child = bp.first;
427  set<ParentageID> const& eIds = bp.second;
428  for (auto const& eId : eIds) {
429  Parentage par;
430  if (!ParentageRegistry::get(eId, par)) {
431  continue;
432  }
433  for (auto const& p : par.parents()) {
434  branchChildren_.insertChild(p, child);
435  }
436  }
437  }
438  }
439 
440  void
442  {}
443 
444  void
446  {}
447 
448  void
450  {}
451 
452  void
454  {}
455 
456  void
458  {}
459 
460  void
462  {}
463 
464  void
466  {}
467 
468  void
470  {}
471 
472  void
474  {}
475 
476  void
478  {}
479 
480  void
482  {}
483 
484  void
486  {}
487 
488  void
490  {}
491 
492  void
494  {}
495 
496  void
498  {}
499 
500  bool
502  {
503  return true;
504  }
505 
506  void
508  {}
509 
510  void
512  {}
513 
514  void
516  {}
517 
518  void
520  {}
521 
522  void
524  {}
525 
526  void
528  {}
529 
530  void
532  {}
533 
534  void
536  {}
537 
538  void
540  {}
541 
542  void
544  {}
545 
546  void
548  {}
549 
550  void
552  {}
553 
554  namespace {
     // Gathers the metadata entries produced by every plugin and appends them
     // to ssmd.
     //
     // For each plugin, doProduceMetadata() is called and each returned entry
     // is moved onto the end of ssmd. When the FileCatalogMetadata service
     // reports wantCheckSyntax(), the entry is first wrapped as
     // "{ <canonical key> : <value> }" and passed to p.write_some(); a set
     // error code adds a diagnostic to a local stream instead of throwing
     // right away. The entry is appended to ssmd whether or not it passed
     // the check.
     //
     // Once all plugins have been visited, a non-empty diagnostic text is
     // thrown as Exception(errors::DataCorruption).
     //
     // pluginNames is indexed with the running plugin position, so it is
     // assumed to be parallel to plugins -- TODO confirm at the call site.
     //
     // NOTE(review): listing lines 559 (the parameter that declares ssmd) and
     // 572 (the declaration of the parser p) are absent from this extract.
555  void
556  collectStreamSpecificMetadata(
557  vector<unique_ptr<FileCatalogMetadataPlugin>> const& plugins,
558  vector<string> const& pluginNames,
560  {
561  size_t pluginCounter = 0;
562  ostringstream errors;
563  for (auto& plugin : plugins) {
564  FileCatalogMetadata::collection_type tmp = plugin->doProduceMetadata();
565  ssmd.reserve(tmp.size() + ssmd.size());
566  for (auto&& entry : tmp) {
567  if (ServiceHandle<FileCatalogMetadata const> {}->wantCheckSyntax()) {
568  string checkString("{ ");
569  checkString +=
570  cet::canonical_string(entry.first) + " : " + entry.second + " }";
571  boost::json::error_code ec;
573  auto const n_parsed_chars = p.write_some(checkString, ec);
574  if (ec) {
     // The row of '-' characters followed by '^' is as long as the number
     // of characters consumed before the error, so the caret lands under
     // the position in checkString where parsing stopped.
575  errors << "OutputModule::writeCatalogMetadata():" << ec.message()
576  << " in metadata produced by plugin "
577  << pluginNames[pluginCounter] << ":\n"
578  << " Faulty key/value clause:\n"
579  << checkString << "\n"
580  << (n_parsed_chars ? string(n_parsed_chars, '-') : "")
581  << "^\n";
582  }
583  }
584  ssmd.emplace_back(move(entry));
585  }
586  ++pluginCounter;
587  }
     // Report every syntax problem found across all plugins in one exception.
588  auto const errMsg = errors.str();
589  if (!errMsg.empty()) {
590  throw Exception(errors::DataCorruption) << errMsg;
591  }
592  }
593  } // namespace
594 
595  void
597  {
598  // Obtain metadata from service for output.
601  ->getMetadata(md);
602  if (!dataTier_.empty()) {
603  md.emplace_back("data_tier", cet::canonical_string(dataTier_));
604  }
605  if (!streamName_.empty()) {
606  md.emplace_back("data_stream", cet::canonical_string(streamName_));
607  }
608  // Ask any plugins for their list of metadata, and put it in a
609  // separate list for the output module. The user stream-specific
610  // metadata should override stream-specific metadata generated by the
611  // output module itself.
613  collectStreamSpecificMetadata(plugins_, pluginNames_, ssmd);
614  doWriteFileCatalogMetadata(md, ssmd);
615  }
616 
617  void
621  {}
622 
623  void
625  {}
626 
627  void
629  {}
630 
// Builds the collection of FileCatalogMetadataPlugin objects described by
// psets and returns it.
//
// For each parameter set, in order:
//   1. the "plugin_type" string is read and appended to pluginNames_
//      (libspec refers to that newly stored element);
//   2. pluginFactory_ is asked for the plugin type of that libspec, and an
//      unexpected type throws Exception(errors::Configuration);
//   3. the plugin is created through pluginFactory_.makePlugin and appended
//      to the result.
//
// Any cet::exception raised inside the loop is rethrown as
// Exception(errors::Configuration) with the original exception passed along;
// count supplies the index of the FCMDPlugins entry being processed and the
// message also names this module's label.
//
// NOTE(review): listing lines 631 (return type), 634 (declaration of result)
// and 643 (right-hand side of the plugin-type comparison) are absent from
// this extract.
632  OutputModule::makePlugins_(vector<ParameterSet> const& psets)
633  {
635  result.reserve(psets.size());
     // Counts successfully created plugins; used only for error reporting.
636  size_t count{0};
637  try {
638  for (auto const& pset : psets) {
639  auto const& libspec =
640  pluginNames_.emplace_back(pset.get<string>("plugin_type"));
641  auto const pluginType = pluginFactory_.pluginType(libspec);
642  if (pluginType !=
644  throw Exception(errors::Configuration, "OutputModule: ")
645  << "unrecognized plugin type " << pluginType << ".\n";
646  }
647  result.emplace_back(
648  pluginFactory_.makePlugin<unique_ptr<FileCatalogMetadataPlugin>>(
649  libspec, pset));
650  ++count;
651  }
652  }
653  catch (cet::exception& e) {
654  throw Exception(errors::Configuration, "OutputModule: ", e)
655  << "Exception caught while processing FCMDPlugins[" << count
656  << "] in module " << moduleDescription().moduleLabel() << ".\n";
657  }
658  return result;
659  }
660 
661  int
663  {
664  return maxEvents_;
665  }
666 
667  int
669  {
670  return remainingEvents_;
671  }
672 
673  SelectionsArray const&
675  {
676  return keptProducts_;
677  }
678 
679  bool
681  {
682  auto const bt = pd.branchType();
683  assert(groupSelector_[bt]);
684  return groupSelector_[bt]->selected(pd);
685  }
686 
687  std::array<bool, NumBranchTypes> const&
689  {
690  return hasNewlyDroppedBranch_;
691  }
692 
693  BranchChildren const&
695  {
696  return branchChildren_;
697  }
698 
699  bool
701  {
702  return remainingEvents_ == 0;
703  }
704 } // namespace art
virtual void writeFileIdentifier()
bool doOpenFile(FileBlock const &fb)
end
while True: pbar.update(maxval-len(onlies[E][S])) #print iS, "/", len(onlies[E][S]) found = False for...
EventID const & eventID() const
Definition: Principal.cc:1064
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
void doWriteEvent(EventPrincipal &ep, ModuleContext const &mc)
std::vector< std::pair< std::string, std::string >> collection_type
virtual void writeProcessConfigurationRegistry()
void updateBranchParents(EventPrincipal &ep)
QList< Entry > entry
virtual void readResults(ResultsPrincipal const &resp)
virtual void doBeginJob(detail::SharedResources const &resources)
virtual void finishEndFile()
static QCString result
Handle< TriggerResults > getTriggerResults(Event const &e) const
Definition: Observer.cc:92
virtual void incrementInputFileNumber()
std::string streamName_
Definition: OutputModule.h:245
virtual void event(EventPrincipal const &)
std::string string
Definition: nybbler.cc:12
virtual void respondToCloseOutputFiles(FileBlock const &)
std::string const & moduleLabel() const
virtual void writeRun(RunPrincipal &r)=0
virtual void writeSubRun(SubRunPrincipal &sr)=0
virtual void doRegisterProducts(ProductDescriptions &, ModuleDescription const &)
std::vector< std::string > pluginNames_
Definition: OutputModule.h:250
struct vector vector
bool doEndSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
std::vector< ProductID > const & parents() const
Definition: Parentage.cc:36
virtual void respondToOpenInputFile(FileBlock const &)
void writeFileCatalogMetadata()
std::enable_if_t<!std::is_function_v< RESULT_TYPE >, RESULT_TYPE > makePlugin(std::string const &libspec, ARGS &&...args) const
STL namespace.
void doWriteSubRun(SubRunPrincipal &srp)
std::vector< BranchDescription > ProductDescriptions
void doSelectProducts(ProductTables const &)
int maxEvents() const
bool doBeginSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
virtual void writeFileFormatVersion()
virtual void beginRun(RunPrincipal const &)
bool doEvent(EventPrincipal const &ep, ModuleContext const &mc, std::atomic< std::size_t > &counts_run, std::atomic< std::size_t > &counts_passed, std::atomic< std::size_t > &counts_failed)
auto const & descriptions(BranchType const bt) const
Definition: ProductTables.h:43
BranchChildren branchChildren_
Definition: OutputModule.h:242
virtual void endRun(RunPrincipal const &)
virtual bool isFileOpen() const
Definition: Run.h:17
virtual std::string const & lastClosedFileName() const
std::vector< std::unique_ptr< FileCatalogMetadataPlugin >> PluginCollection_t
Definition: OutputModule.h:57
void insertEmpty(ProductID parent)
QCollection::Item first()
Definition: qglist.cpp:807
void doRespondToCloseInputFile(FileBlock const &fb)
void selectProducts(ProductTables const &)
SharedResource_t const LegacyResource
virtual void writeFileIndex()
virtual void respondToOpenOutputFiles(FileBlock const &)
BranchType branchType() const noexcept
GroupSelectorRules groupSelectorRules_
Definition: OutputModule.h:237
bool limitReached() const
PluginCollection_t plugins_
Definition: OutputModule.h:251
void registerProducts(ProductDescriptions &, ModuleDescription const &)
std::string workerType() const
Definition: OutputModule.cc:94
virtual void setSubRunAuxiliaryRangeSetID(RangeSet const &)
OutputFileStatus
const double e
static collection_type const & get()
bt
Definition: tracks.py:83
virtual void writeParameterSetRegistry()
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
Definition: OutputModule.h:236
virtual void endSubRun(SubRunPrincipal const &)
static Config * config
Definition: config.cpp:1054
bool doEndRun(RunPrincipal const &rp, ModuleContext const &mc)
std::array< std::unique_ptr< GroupSelector const >, NumBranchTypes > groupSelector_
Definition: OutputModule.h:235
virtual void setRunAuxiliaryRangeSetID(RangeSet const &)
def move(depos, offset)
Definition: depos.py:107
virtual void writeParentageRegistry()
virtual void endJob()
bool wantEvent(ScheduleID id, Event const &e) const
Definition: Observer.cc:74
virtual void write(EventPrincipal &e)=0
bool dropped() const noexcept
void doSetSubRunAuxiliaryRangeSetID(RangeSet const &)
p
Definition: test.py:223
SelectionsArray keptProducts_
Definition: OutputModule.h:233
ServiceHandle< CatalogInterface > ci_
Definition: OutputModule.h:246
virtual void writeProductDescriptionRegistry()
std::array< Selections, NumBranchTypes > SelectionsArray
Definition: Selections.h:12
string tmp
Definition: languages.py:63
virtual Granularity fileGranularity() const
std::map< ProductID, std::set< ParentageID > > branchParents_
Definition: OutputModule.h:241
bool selected(BranchDescription const &) const
bool fileIsOpen() const
Definition: OutputModule.cc:88
ProcessConfiguration const & processConfiguration() const
std::string configuredFileName_
Definition: OutputModule.h:243
ResultsPrincipal const * resultsPrincipal() const
Definition: FileBlock.cc:35
bool combinable(BranchDescription const &a, BranchDescription const &b)
virtual bool requestsToCloseFile() const
int remainingEvents() const
std::string dataTier_
Definition: OutputModule.h:244
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void fillDependencyGraph()
void insertChild(ProductID parent, ProductID child)
cet::BasicPluginFactory pluginFactory_
Definition: OutputModule.h:247
void createQueues(SharedResources const &resources)
Definition: SharedModule.cc:34
void doRespondToOpenOutputFiles(FileBlock const &fb)
virtual void postSelectProducts()
bool canonical_string(std::string const &str, std::string &result)
PluginCollection_t makePlugins_(std::vector< fhicl::ParameterSet > const &psets)
virtual void openFile(FileBlock const &)
std::string pluginType(std::string const &libspec) const
void doWriteRun(RunPrincipal &rp)
virtual void writeEventHistory()
SelectionsArray const & keptProducts() const
auto for_all(FwdCont &, Func)
BranchType
Definition: BranchType.h:20
virtual void writeProcessHistoryRegistry()
void serialize(T const &...)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
virtual void writeProductDependencies()
virtual void setFileStatus(OutputFileStatus)
void for_each_branch_type(F f)
Definition: BranchType.h:38
bool transient() const noexcept
virtual void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd)
virtual void respondToCloseInputFile(FileBlock const &)
void doRespondToCloseOutputFiles(FileBlock const &fb)
static constexpr double sr
Definition: Units.h:166
BranchChildren const & branchChildren() const
virtual void beginSubRun(SubRunPrincipal const &)
virtual void beginJob()
void doRespondToOpenInputFile(FileBlock const &fb)
virtual void writeBranchIDListRegistry()
ProductID productID() const noexcept
virtual void startEndFile()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void doSetRunAuxiliaryRangeSetID(RangeSet const &)
void configure(OutputModuleDescription const &desc)
ModuleDescription const & moduleDescription() const
Definition: ModuleBase.cc:15
bool doBeginRun(RunPrincipal const &rp, ModuleContext const &)