RootOutput_module.cc
Go to the documentation of this file.
1 // vim: set sw=2 expandtab :
2 
30 #include "fhiclcpp/ParameterSet.h"
31 #include "fhiclcpp/types/Atom.h"
34 #include "fhiclcpp/types/Table.h"
35 #include "hep_concurrency/RecursiveMutex.h"
37 
38 #include <iomanip>
39 #include <iostream>
40 #include <memory>
41 #include <sstream>
42 #include <string>
43 #include <utility>
44 
45 using namespace std;
46 using namespace hep::concurrency;
47 
namespace {
  // Sentinel output path: when the configured file pattern equals this
  // value, no temporary file name is generated and no renaming is done.
  std::string const dev_null = "/dev/null";
} // namespace
51 
52 namespace art {
53 
54  class RootOutputFile;
55 
56  class RootOutput final : public OutputModule {
57 
58  // Constants.
59  public:
60  static constexpr char const* default_tmpDir{"<parent-path-of-filename>"};
61 
62  // Config.
63  public:
64  struct Config {
65  using Name = fhicl::Name;
67  template <typename T>
69  template <typename T>
72  Atom<string> catalog{Name("catalog"), ""};
73  OptionalAtom<bool> dropAllEvents{Name("dropAllEvents")};
74  Atom<bool> dropAllSubRuns{Name("dropAllSubRuns"), false};
75  OptionalAtom<bool> fastCloning{Name("fastCloning")};
76  Atom<string> tmpDir{Name("tmpDir"), default_tmpDir};
77  Atom<int> compressionLevel{Name("compressionLevel"), 7};
78  Atom<int64_t> saveMemoryObjectThreshold{Name("saveMemoryObjectThreshold"),
79  -1l};
80  Atom<int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
81  Atom<int> splitLevel{Name("splitLevel"), 99};
82  Atom<int> basketSize{Name("basketSize"), 16384};
83  Atom<bool> dropMetaDataForDroppedData{Name("dropMetaDataForDroppedData"),
84  false};
85  Atom<string> dropMetaData{Name("dropMetaData"), "NONE"};
86  Atom<bool> writeParameterSets{Name("writeParameterSets"), true};
88  Name("fileProperties")};
89 
91  {
92  // Both RootOutput module and OutputModule use the "fileName"
93  // FHiCL parameter. However, whereas in OutputModule the
94  // parameter has a default, for RootOutput the parameter should
95  // not. We therefore have to change the default flag setting
96  // for 'OutputModule::Config::fileName'.
97  using namespace fhicl::detail;
98  ParameterBase* adjustFilename{
99  const_cast<fhicl::Atom<string>*>(&omConfig().fileName)};
100  adjustFilename->set_par_style(fhicl::par_style::REQUIRED);
101  }
102 
103  struct KeysToIgnore {
104  set<string>
105  operator()() const
106  {
108  keys.insert("results");
109  return keys;
110  }
111  };
112  };
113 
115 
116  // Special Member Functions.
117  public:
118  ~RootOutput();
119  explicit RootOutput(Parameters const&);
120  RootOutput(RootOutput const&) = delete;
121  RootOutput(RootOutput&&) = delete;
122  RootOutput& operator=(RootOutput const&) = delete;
123  RootOutput& operator=(RootOutput&&) = delete;
124 
125  // Member Functions.
126  public:
127  void postSelectProducts() override;
128  void beginJob() override;
129  void endJob() override;
130  void beginRun(RunPrincipal const&) override;
131  void endRun(RunPrincipal const&) override;
132  void beginSubRun(SubRunPrincipal const&) override;
133  void endSubRun(SubRunPrincipal const&) override;
134  void event(EventPrincipal const&) override;
135 
136  // Member Functions -- Replace OutputModule Functions.
137  private:
138  string fileNameAtOpen() const;
139  string fileNameAtClose(string const& currentFileName);
140  string const& lastClosedFileName() const override;
141  Granularity fileGranularity() const override;
142  void openFile(FileBlock const&) override;
143  void respondToOpenInputFile(FileBlock const&) override;
144  void readResults(ResultsPrincipal const& resp) override;
145  void respondToCloseInputFile(FileBlock const&) override;
146  void incrementInputFileNumber() override;
147  void write(EventPrincipal&) override;
148  void writeSubRun(SubRunPrincipal&) override;
149  void writeRun(RunPrincipal&) override;
150  void setSubRunAuxiliaryRangeSetID(RangeSet const&) override;
151  void setRunAuxiliaryRangeSetID(RangeSet const&) override;
152  bool isFileOpen() const override;
153  void setFileStatus(OutputFileStatus) override;
154  bool requestsToCloseFile() const override;
155  void startEndFile() override;
156  void writeFileFormatVersion() override;
157  void writeFileIndex() override;
158  void writeEventHistory() override;
159  void writeProcessConfigurationRegistry() override;
160  void writeProcessHistoryRegistry() override;
161  void writeParameterSetRegistry() override;
162  void writeProductDescriptionRegistry() override;
163  void writeParentageRegistry() override;
164  void doWriteFileCatalogMetadata(
166  FileCatalogMetadata::collection_type const& ssmd) override;
167  void writeProductDependencies() override;
168  void finishEndFile() override;
169  void doRegisterProducts(ProductDescriptions& productsToProduce,
170  ModuleDescription const& md) override;
171 
172  // Member Functions -- Implementation Details.
173  private:
174  void doOpenFile();
175 
176  // Data Members.
177  private:
178  mutable RecursiveMutex mutex_;
179  string const catalog_;
182  string const moduleLabel_;
184  unique_ptr<RootOutputFile> rootOutputFile_;
187  string const filePattern_;
188  string tmpDir_;
189  string lastClosedFileName_{};
190  int const compressionLevel_;
193  int const splitLevel_;
194  int const basketSize_;
198  // Set false only for cases where we are guaranteed never to need historical
199  // ParameterSet information in the downstream file, such as when mixing.
205  };
206 
207  RootOutput::~RootOutput() = default;
208 
209  RootOutput::RootOutput(Parameters const& config)
210  : OutputModule{config().omConfig, config.get_PSet()}
211  , mutex_{"RootOutput::mutex_"}
212  , catalog_{config().catalog()}
213  , dropAllEvents_{false}
214  , dropAllSubRuns_{config().dropAllSubRuns()}
215  , moduleLabel_{config.get_PSet().get<string>("module_label")}
216  , inputFileCount_{0}
217  , rootOutputFile_{}
219  , fRenamer_{fstats_}
220  , filePattern_{config().omConfig().fileName()}
222  config().tmpDir()}
224  , compressionLevel_{config().compressionLevel()}
225  , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
226  , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
227  , splitLevel_{config().splitLevel()}
228  , basketSize_{config().basketSize()}
229  , dropMetaData_{config().dropMetaData()}
230  , dropMetaDataForDroppedData_{config().dropMetaDataForDroppedData()}
231  , fastCloningEnabled_{true}
232  , writeParameterSets_{config().writeParameterSets()}
233  , fileProperties_{(
235  config.get_PSet().has_key(config().fileProperties.name()),
236  filePattern_),
237  config().fileProperties())}
240  , rpm_{config.get_PSet()}
241  {
242  bool const dropAllEventsSet{config().dropAllEvents(dropAllEvents_)};
244  dropAllEventsSet, dropAllEvents_, dropAllSubRuns_);
245  // N.B. Any time file switching is enabled at a boundary other than
246  // InputFile, fastCloningEnabled_ ***MUST*** be deactivated. This is
247  // to ensure that the Event tree from the InputFile is not
248  // accidentally cloned to the output file before the output
249  // module has seen the events that are going to be processed.
250  bool const fastCloningSet{config().fastCloning(fastCloningEnabled_)};
253  if (!writeParameterSets_) {
254  mf::LogWarning("PROVENANCE")
255  << "Output module " << moduleLabel_
256  << " has parameter writeParameterSets set to false.\n"
257  << "Parameter set provenance will not be available in subsequent "
258  "jobs.\n"
259  << "Check your experiment's policy on this issue to avoid future "
260  "problems\n"
261  << "with analysis reproducibility.\n";
262  }
263  }
264 
265  void
267  {
268  RecursiveMutexSentry sentry{mutex_, __func__};
269  // Note: The file block here refers to the currently open
270  // input file, so we can find out about the available
271  // products by looping over the branches of the input
272  // file data trees.
273  if (!isFileOpen()) {
274  doOpenFile();
276  }
277  }
278 
279  void
281  {
282  RecursiveMutexSentry sentry{mutex_, __func__};
283  if (isFileOpen()) {
284  rootOutputFile_->selectProducts();
285  }
286  }
287 
288  void
290  {
291  RecursiveMutexSentry sentry{mutex_, __func__};
292  ++inputFileCount_;
293  if (!isFileOpen()) {
294  return;
295  }
296  auto const* rfb = dynamic_cast<RootFileBlock const*>(&fb);
297  bool fastCloneThisOne = fastCloningEnabled_ && rfb &&
298  (rfb->tree() != nullptr) &&
299  ((remainingEvents() < 0) ||
300  (remainingEvents() >= rfb->tree()->GetEntries()));
301  if (fastCloningEnabled_ && !fastCloneThisOne) {
302  mf::LogWarning("FastCloning")
303  << "Fast cloning deactivated for this input file due to "
304  << "empty event tree and/or event limits.";
305  }
306  if (fastCloneThisOne && !rfb->fastClonable()) {
307  mf::LogWarning("FastCloning")
308  << "Fast cloning deactivated for this input file due to "
309  << "information in FileBlock.";
310  fastCloneThisOne = false;
311  }
312  rootOutputFile_->beginInputFile(rfb, fastCloneThisOne);
314  }
315 
316  void
318  {
319  RecursiveMutexSentry sentry{mutex_, __func__};
321  [&resp](RPWorker& w) { w.rp().doReadResults(resp); });
322  }
323 
324  void
326  {
327  RecursiveMutexSentry sentry{mutex_, __func__};
328  if (isFileOpen()) {
329  rootOutputFile_->respondToCloseInputFile(fb);
330  }
331  }
332 
333  void
335  {
336  RecursiveMutexSentry sentry{mutex_, __func__};
337  if (dropAllEvents_) {
338  return;
339  }
341  ep.addToProcessHistory();
342  }
343  rootOutputFile_->writeOne(ep);
345  }
346 
347  void
349  {
350  RecursiveMutexSentry sentry{mutex_, __func__};
351  rootOutputFile_->setSubRunAuxiliaryRangeSetID(rs);
352  }
353 
354  void
356  {
357  RecursiveMutexSentry sentry{mutex_, __func__};
358  if (dropAllSubRuns_) {
359  return;
360  }
362  sr.addToProcessHistory();
363  }
364  rootOutputFile_->writeSubRun(sr);
366  }
367 
368  void
370  {
371  RecursiveMutexSentry sentry{mutex_, __func__};
372  rootOutputFile_->setRunAuxiliaryRangeSetID(rs);
373  }
374 
375  void
377  {
378  RecursiveMutexSentry sentry{mutex_, __func__};
379  if (hasNewlyDroppedBranch()[InRun]) {
380  rp.addToProcessHistory();
381  }
382  rootOutputFile_->writeRun(rp);
383  fstats_.recordRun(rp.runID());
384  }
385 
386  void
388  {
389  RecursiveMutexSentry sentry{mutex_, __func__};
390  auto resp = make_unique<ResultsPrincipal>(
392  resp->createGroupsForProducedProducts(producedResultsProducts_);
393  resp->enableLookupOfProducedProducts(producedResultsProducts_);
396  resp->addToProcessHistory();
397  }
399  [&resp](RPWorker& w) { w.rp().doWriteResults(*resp); });
400  rootOutputFile_->writeResults(*resp);
401  }
402 
403  void
405  {
406  RecursiveMutexSentry sentry{mutex_, __func__};
407  rootOutputFile_->writeFileFormatVersion();
408  }
409 
410  void
412  {
413  RecursiveMutexSentry sentry{mutex_, __func__};
414  rootOutputFile_->writeFileIndex();
415  }
416 
417  void
419  {
420  RecursiveMutexSentry sentry{mutex_, __func__};
421  rootOutputFile_->writeEventHistory();
422  }
423 
424  void
426  {
427  RecursiveMutexSentry sentry{mutex_, __func__};
428  rootOutputFile_->writeProcessConfigurationRegistry();
429  }
430 
431  void
433  {
434  RecursiveMutexSentry sentry{mutex_, __func__};
435  rootOutputFile_->writeProcessHistoryRegistry();
436  }
437 
438  void
440  {
441  RecursiveMutexSentry sentry{mutex_, __func__};
442  if (writeParameterSets_) {
443  rootOutputFile_->writeParameterSetRegistry();
444  }
445  }
446 
447  void
449  {
450  RecursiveMutexSentry sentry{mutex_, __func__};
451  rootOutputFile_->writeProductDescriptionRegistry();
452  }
453 
454  void
456  {
457  RecursiveMutexSentry sentry{mutex_, __func__};
458  rootOutputFile_->writeParentageRegistry();
459  }
460 
461  void
465  {
466  RecursiveMutexSentry sentry{mutex_, __func__};
467  rootOutputFile_->writeFileCatalogMetadata(fstats_, md, ssmd);
468  }
469 
470  void
472  {
473  RecursiveMutexSentry sentry{mutex_, __func__};
474  rootOutputFile_->writeProductDependencies();
475  }
476 
477  void
479  {
480  RecursiveMutexSentry sentry{mutex_, __func__};
481  string const currentFileName{rootOutputFile_->currentFileName()};
482  rootOutputFile_->writeTTrees();
483  rootOutputFile_.reset();
485  lastClosedFileName_ = fileNameAtClose(currentFileName);
486  detail::logFileAction("Closed output file ", lastClosedFileName_);
488  }
489 
490  void
492  ModuleDescription const& md)
493  {
494  RecursiveMutexSentry sentry{mutex_, __func__};
495  // Register Results products from ResultsProducers.
496  rpm_.for_each_RPWorker([&producedProducts, &md](RPWorker& w) {
497  auto const& params = w.params();
499  ModuleDescription{params.rpPSetID,
500  params.rpPluginType,
501  md.moduleLabel() + '#' + params.rpLabel,
503  md.processConfiguration()});
504  w.rp().registerProducts(producedProducts, w.moduleDescription());
505  });
506  // Form product table for Results products. We do this here so we
507  // can appropriately set the product tables for the
508  // ResultsPrincipal.
509  productsToProduce_ = producedProducts;
511  }
512 
513  void
515  {
516  RecursiveMutexSentry sentry{mutex_, __func__};
517  if (isFileOpen()) {
518  rootOutputFile_->setFileStatus(ofs);
519  }
520  }
521 
522  bool
524  {
525  RecursiveMutexSentry sentry{mutex_, __func__};
526  return rootOutputFile_.get() != nullptr;
527  }
528 
529  void
531  {
532  RecursiveMutexSentry sentry{mutex_, __func__};
533  if (isFileOpen()) {
534  rootOutputFile_->incrementInputFileNumber();
535  }
536  }
537 
538  bool
540  {
541  RecursiveMutexSentry sentry{mutex_, __func__};
542  return isFileOpen() ? rootOutputFile_->requestsToCloseFile() : false;
543  }
544 
547  {
548  RecursiveMutexSentry sentry{mutex_, __func__};
549  return fileProperties_.granularity();
550  }
551 
552  void
554  {
555  RecursiveMutexSentry sentry{mutex_, __func__};
556  if (inputFileCount_ == 0) {
558  << "Attempt to open output file before input file. "
559  << "Please report this to the core framework developers.\n";
560  }
561  rootOutputFile_ = make_unique<RootOutputFile>(this,
562  fileNameAtOpen(),
567  splitLevel_,
568  basketSize_,
573  detail::logFileAction("Opened output file with pattern ", filePattern_);
574  }
575 
576  string
578  {
579  return (filePattern_ == dev_null) ?
580  dev_null :
581  unique_filename(tmpDir_ + "/RootOutput");
582  }
583 
584  string
585  RootOutput::fileNameAtClose(std::string const& currentFileName)
586  {
587  return (filePattern_ == dev_null) ?
588  dev_null :
589  fRenamer_.maybeRenameFile(currentFileName, filePattern_);
590  }
591 
592  string const&
594  {
595  RecursiveMutexSentry sentry{mutex_, __func__};
596  if (lastClosedFileName_.empty()) {
597  throw Exception(errors::LogicError, "RootOutput::currentFileName(): ")
598  << "called before meaningful.\n";
599  }
600  return lastClosedFileName_;
601  }
602 
603  void
605  {
606  RecursiveMutexSentry sentry{mutex_, __func__};
608  }
609 
610  void
612  {
613  RecursiveMutexSentry sentry{mutex_, __func__};
615  }
616 
617  void
619  {
620  RecursiveMutexSentry sentry{mutex_, __func__};
621  rpm_.for_each_RPWorker([&ep](RPWorker& w) { w.rp().doEvent(ep); });
622  }
623 
624  void
626  {
627  RecursiveMutexSentry sentry{mutex_, __func__};
628  rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doBeginSubRun(srp); });
629  }
630 
631  void
633  {
634  RecursiveMutexSentry sentry{mutex_, __func__};
635  rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doEndSubRun(srp); });
636  }
637 
638  void
640  {
641  RecursiveMutexSentry sentry{mutex_, __func__};
642  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doBeginRun(rp); });
643  }
644 
645  void
647  {
648  RecursiveMutexSentry sentry{mutex_, __func__};
649  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doEndRun(rp); });
650  }
651 
652 } // namespace art
653 
std::string const & processName() const
Definition: Observer.cc:66
string const catalog_
Granularity fileGranularity() const override
EventID const & eventID() const
Definition: Principal.cc:1186
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
void registerProducts(ProductDescriptions &producedProducts, ModuleDescription const &md)
void writeParentageRegistry() override
std::vector< std::pair< std::string, std::string >> collection_type
void respondToCloseInputFile(FileBlock const &) override
void startEndFile() override
RunID const & runID() const
Definition: Principal.cc:1174
ModuleDescription const & moduleDescription() const
Definition: RPWorker.h:73
Granularity granularity() const
void writeProductDescriptionRegistry() override
void endSubRun(SubRunPrincipal const &) override
void doRegisterProducts(ProductDescriptions &productsToProduce, ModuleDescription const &md) override
void doEvent(EventPrincipal const &)
std::string string
Definition: nybbler.cc:12
auto const & get_PSet() const
void doBeginSubRun(SubRunPrincipal const &)
std::string const & moduleLabel() const
ResultsProducer & rp()
Definition: RPWorker.h:55
void recordRun(RunID const &id)
ClosingCriteria fileProperties_
string fileNameAtOpen() const
void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd) override
void doWriteResults(ResultsPrincipal &)
FileStatsCollector fstats_
void writeFileIndex() override
PostCloseFileRenamer fRenamer_
void doReadResults(ResultsPrincipal const &)
void validateFileNamePattern(bool do_check, std::string const &pattern)
void logFileAction(const char *msg, std::string const &file)
Definition: logFileAction.cc:9
void writeFileFormatVersion() override
STL namespace.
void setFileStatus(OutputFileStatus) override
void recordEvent(EventID const &id)
static constexpr char const * default_tmpDir
void addToProcessHistory()
Definition: Principal.cc:630
std::vector< BranchDescription > ProductDescriptions
bool isFileOpen() const override
string fileNameAtClose(string const &currentFileName)
void openFile(FileBlock const &) override
void writeProcessConfigurationRegistry() override
void doEndRun(RunPrincipal const &)
void incrementInputFileNumber() override
auto const & descriptions(BranchType const bt) const
Definition: ProductTables.h:41
void for_each_RPWorker(on_rpworker_t wfunc)
Definition: RPManager.cc:116
int64_t const treeMaxVirtualSize_
std::string unique_filename(std::string stem, std::string extension=".root")
void doBeginRun(RunPrincipal const &)
ProductDescriptions productsToProduce_
void writeParameterSetRegistry() override
OutputFileStatus
#define DEFINE_ART_MODULE(klass)
Definition: ModuleMacros.h:68
void beginJob()
Definition: Breakpoints.cc:14
DropMetaData dropMetaData_
void doEndSubRun(SubRunPrincipal const &)
bool requestsToCloseFile() const override
void write(EventPrincipal &) override
string const & lastClosedFileName() const override
std::string const & fileName() const
Definition: FileBlock.cc:34
void setModuleDescription(ModuleDescription const &)
Definition: RPWorker.h:79
static bool shouldFastClone(bool const fastCloningSet, bool const fastCloning, bool const wantAllEvents, ClosingCriteria const &cc)
RPParams const & params() const
Definition: RPWorker.h:67
void recordSubRun(SubRunID const &id)
SubRunID subRunID() const
Definition: Principal.cc:1180
int const compressionLevel_
RecursiveMutex mutex_
void recordInputFile(std::string const &inputFileName)
string const filePattern_
void writeProductDependencies() override
void writeSubRun(SubRunPrincipal &) override
ProcessConfiguration const & processConfiguration() const
void beginRun(RunPrincipal const &) override
int64_t const saveMemoryObjectThreshold_
void invoke(invoke_function_t< void, ARGS... > mfunc, ARGS &&...args)
Definition: RPManager.h:41
static ProductTables invalid()
int remainingEvents() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void beginJob() override
void setRunAuxiliaryRangeSetID(RangeSet const &) override
void respondToOpenInputFile(FileBlock const &) override
std::string parent_path(std::string const &path)
Definition: parent_path.cc:15
void readResults(ResultsPrincipal const &resp) override
signed __int64 int64_t
Definition: stdint.h:131
bool wantAllEvents() const
Definition: Observer.cc:72
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void setSubRunAuxiliaryRangeSetID(RangeSet const &) override
void endRun(RunPrincipal const &) override
ProductTables producedResultsProducts_
std::string maybeRenameFile(std::string const &inPath, std::string const &toPattern)
void endJob() override
fhicl::TableFragment< OutputModule::Config > omConfig
void beginSubRun(SubRunPrincipal const &) override
void writeProcessHistoryRegistry() override
void postSelectProducts() override
auto const & get(AssnsNode< L, R, D > const &r)
Definition: AssnsNode.h:115
void writeRun(RunPrincipal &) override
bool shouldDropEvents(bool const dropAllEventsSet, bool const dropAllEvents, bool const dropAllSubRuns)
void writeEventHistory() override
static const double sr
Definition: Units.h:167
string const moduleLabel_
unique_ptr< RootOutputFile > rootOutputFile_
void set_par_style(par_style const vt)
void event(EventPrincipal const &) override
AdcRoiViewer::Name Name
ModuleDescription const & moduleDescription() const
Definition: ModuleBase.cc:30
void finishEndFile() override