23 #include "boost/date_time/posix_time/posix_time.hpp" 40 #include "canvas_root_io/Utilities/DictionaryChecker.h" 52 #include "hep_concurrency/RecursiveMutex.h" 55 #include "TBranchElement.h" 78 string const& suffix = {})
82 <<
"Number of sqlite columns specified for table: " << name <<
'\n' 84 string ddl =
"DROP TABLE IF EXISTS " + name +
87 name +
"(" + columns.front();
88 for_each(columns.begin() + 1, columns.end(), [&ddl](
auto const& col) {
98 insert_eventRanges_row(sqlite3_stmt* stmt,
103 sqlite3_bind_int64(stmt, 1, sr);
104 sqlite3_bind_int64(stmt, 2, b);
105 sqlite3_bind_int64(stmt, 3, e);
111 insert_rangeSets_eventSets_row(sqlite3_stmt* stmt,
115 sqlite3_bind_int64(stmt, 1, rsid);
116 sqlite3_bind_int64(stmt, 2, esid);
122 getNewRangeSetID(sqlite3* db,
128 return sqlite3_last_insert_rowid(db);
134 vector<unsigned> rangeSetIDs;
138 .from(db,
"EventRanges")
139 .where(
"SubRun=" +
to_string(range.subRun()) +
155 sqlite3_stmt* stmt{
nullptr};
156 string const ddl{
"INSERT INTO EventRanges(SubRun, begin, end) " 158 sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt,
nullptr);
159 for (
auto const& range : rs) {
160 insert_eventRanges_row(stmt, range.subRun(), range.begin(), range.end());
162 sqlite3_finalize(stmt);
167 insertIntoJoinTable(sqlite3* db,
170 vector<unsigned>
const& eventRangesIDs)
173 sqlite3_stmt* stmt{
nullptr};
176 "RangeSets_EventRanges(RangeSetsID, EventRangesID) Values(?,?);"};
177 sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt,
nullptr);
178 cet::for_all(eventRangesIDs, [stmt, rsID](
auto const eventRangeID) {
179 insert_rangeSets_eventSets_row(stmt, rsID, eventRangeID);
181 sqlite3_finalize(stmt);
201 if (productRS.
ranges().empty()) {
204 auto const r = productRS.
run();
205 auto const& productFront = productRS.
ranges().front();
206 if (!principalRS.
contains(r, productFront.subRun(), productFront.
begin())) {
252 template <BranchType BT>
256 bool const producedInThisProcess)
269 if (!producedInThisProcess) {
270 maybeInvalidateRangeSet(BT, principalRS, rs);
275 template <BranchType BT>
284 template <BranchType BT>
289 map<unsigned, unsigned>& )
292 template <BranchType BT>
297 map<unsigned, unsigned>& checksumToIndexLookup)
303 auto it = checksumToIndexLookup.find(rs.
checksum());
304 if (it != checksumToIndexLookup.cend()) {
307 unsigned const rsID = getNewRangeSetID(db, BT, rs.
run());
309 checksumToIndexLookup.emplace(rs.
checksum(), rsID);
310 insertIntoEventRanges(db, rs);
311 auto const& eventRangesIDs = getExistingRangeSetIDs(db, rs);
312 insertIntoJoinTable(db, BT, rsID, eventRangesIDs);
320 return (fp(cc).nEvents() !=
322 (fp(cc).nSubRuns() !=
326 (fp(cc).age().count() !=
334 RootOutputFile::OutputItem::~OutputItem() =
default;
337 : branchDescription_{bd},
product_{
nullptr}
355 bool const fastCloning,
356 bool const wantAllEvents,
359 bool result = fastCloning;
361 <<
"Initial fast cloning configuration " 362 << (fastCloningSet ?
"(user-set): " :
"(from default): ") << boolalpha
364 if (fastCloning && !wantAllEvents) {
367 <<
"Fast cloning deactivated due to presence of\n" 368 <<
"event selection configuration.";
370 if (fastCloning && maxCriterionSpecified(cc) &&
374 <<
"Fast cloning deactivated due to request to allow\n" 375 <<
"output file switching at an Event, SubRun, or Run boundary.";
383 string const& fileName,
385 int const compressionLevel,
386 int64_t const saveMemoryObjectThreshold,
387 int64_t const treeMaxVirtualSize,
388 int const splitLevel,
389 int const basketSize,
391 bool const dropMetaDataForDroppedData,
392 bool const fastCloningRequested)
393 :
mutex_{
"RootOutputFile::mutex_"}
422 if (!eventHistoryTree_) {
424 <<
"Failed to create the tree for History objects\n";
440 <<
"Failed to create a branch for History in the output file\n";
445 make_unique<RootOutputTree>(
filePtr_.get(),
452 saveMemoryObjectThreshold);
454 make_unique<RootOutputTree>(
filePtr_.get(),
461 saveMemoryObjectThreshold);
462 treePointers_[2] = make_unique<RootOutputTree>(
filePtr_.get(),
469 saveMemoryObjectThreshold);
471 make_unique<RootOutputTree>(
filePtr_.get(),
478 saveMemoryObjectThreshold);
484 SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE));
489 root::DictionaryChecker checker;
494 checker.reportMissingDictionaries();
501 "UNIQUE (SubRun,begin,end) ON CONFLICT IGNORE"});
506 "SubRunRangeSets_EventRanges",
507 {
"RangeSetsID INTEGER",
508 "EventRangesID INTEGER",
509 "PRIMARY KEY(RangeSetsID,EventRangesID)"},
514 "RunRangeSets_EventRanges",
515 {
"RangeSetsID INTEGER",
516 "EventRangesID INTEGER",
517 "PRIMARY KEY(RangeSetsID,EventRangesID)"},
524 RecursiveMutexSentry sentry{
mutex_, __func__};
531 RecursiveMutexSentry sentry{
mutex_, __func__};
538 RecursiveMutexSentry sentry{
mutex_, __func__};
539 auto selectProductsToWrite = [
this](
BranchType const bt) {
542 auto const& pd = pr.second;
553 if (pd.transient()) {
558 for (
auto const&
val : items) {
568 bool const fastCloneFromOutputModule)
570 RecursiveMutexSentry sentry{
mutex_, __func__};
573 fastCloneFromOutputModule && rfb};
580 <<
"Fast cloning deactivated for this input file due to " 581 <<
"splitting level and/or basket size.";
583 }
else if (rfb && rfb->tree() &&
584 rfb->tree()->GetCurrentFile()->GetVersion() < 60001) {
586 <<
"Fast cloning deactivated for this input file due to " 587 <<
"ROOT version used to write it (< 6.00/01)\n" 588 "having a different splitting policy.";
593 <<
"Fast cloning deactivated for this input file due to " 594 <<
"reading in file that has a different ProductID schema.";
599 <<
"Fast cloning reactivated for this input file.";
602 auto tree = (rfb && rfb->tree()) ? rfb->tree() :
nullptr;
609 RecursiveMutexSentry sentry{
mutex_, __func__};
616 RecursiveMutexSentry sentry{
mutex_, __func__};
623 RecursiveMutexSentry sentry{
mutex_, __func__};
624 using namespace chrono;
625 unsigned int constexpr oneK{1024u};
634 RecursiveMutexSentry sentry{
mutex_, __func__};
644 History historyForOutput{e.history()};
650 <<
"Failed to fill the History tree for event: " << e.eventID()
651 <<
"\nTTree::Fill() returned " << sz <<
" bytes written." << endl;
655 string dataType{
"MC"};
670 RecursiveMutexSentry sentry{
mutex_, __func__};
682 RecursiveMutexSentry sentry{
mutex_, __func__};
694 RecursiveMutexSentry sentry{
mutex_, __func__};
695 auto pid = root::getObjectRequireDict<ParentageID>();
700 <<
"Failed to create a branch for ParentageIDs in the output file";
703 auto par = root::getObjectRequireDict<Parentage>();
708 <<
"Failed to create a branch for Parentages in the output file";
725 RecursiveMutexSentry sentry{
mutex_, __func__};
727 auto const* pver = &ver;
729 metaBranchRootName<FileFormatVersion>(), &pver,
basketSize_, 0);
738 RecursiveMutexSentry sentry{
mutex_, __func__};
741 auto const* findexElemPtr = &elem;
743 metaBranchRootName<FileIndex::Element>(), &findexElemPtr,
basketSize_, 0);
747 findexElemPtr = &entry;
756 RecursiveMutexSentry sentry{
mutex_, __func__};
770 RecursiveMutexSentry sentry{
mutex_, __func__};
773 pHistMap.emplace(pr);
775 auto const*
p = &pHistMap;
777 metaBranchRootName<ProcessHistoryMap>(), &
p,
basketSize_, 0);
780 <<
"Unable to locate required " 781 "ProcessHistoryMap branch in output " 793 RecursiveMutexSentry sentry{
mutex_, __func__};
796 *
rootFileDB_,
"FileCatalog_metadata", {{
"Name",
"Value"}},
true};
798 for (
auto const& kv : md) {
799 fileCatalogMetadata.insert(kv.first, kv.second);
802 fileCatalogMetadata.insert(
"file_format",
"\"artroot\"");
803 fileCatalogMetadata.insert(
"file_format_era",
805 fileCatalogMetadata.insert(
"file_format_version",
808 namespace bpt = boost::posix_time;
809 auto formatted_time = [](
auto const&
t) {
812 fileCatalogMetadata.insert(
"start_time",
815 fileCatalogMetadata.insert(
817 formatted_time(boost::posix_time::second_clock::universal_time()));
820 auto I = find_if(md.crbegin(), md.crend(), [](
auto const&
p) {
821 return p.first ==
"run_type";
823 if (I != md.crend()) {
827 buf <<
"[ " << srid.run() <<
", " << srid.subRun() <<
", " 833 fileCatalogMetadata.insert(
"runs", buf.str());
837 fileCatalogMetadata.insert(
"event_count",
840 auto eidToTuple = [](
EventID const& eid) ->
string {
841 ostringstream eidStr;
842 eidStr <<
"[ " << eid.run() <<
", " << eid.subRun() <<
", " << eid.event()
846 fileCatalogMetadata.insert(
"first_event",
848 fileCatalogMetadata.insert(
"last_event",
851 if (!stats.
parents().empty()) {
852 ostringstream pstring;
854 for (
auto const& parent : stats.
parents()) {
860 fileCatalogMetadata.insert(
"parents", pstring.str());
863 for (
auto const& kv : ssmd) {
864 fileCatalogMetadata.insert(kv.first, kv.second);
872 RecursiveMutexSentry sentry{
mutex_, __func__};
879 RecursiveMutexSentry sentry{
mutex_, __func__};
883 auto productDescriptionsToWrite = [
this, &
reg](
BranchType const bt) {
885 auto const& desc = pr.second;
892 metaBranchRootName<ProductRegistry>(), ®p,
basketSize_, 0);
901 RecursiveMutexSentry sentry{
mutex_, __func__};
904 metaBranchRootName<BranchChildren>(), &ppDeps,
basketSize_, 0);
913 RecursiveMutexSentry sentry{
mutex_, __func__};
921 RecursiveMutexSentry sentry{
mutex_, __func__};
932 RecursiveMutexSentry sentry{
mutex_, __func__};
935 auto const& eventRangesIDs = getExistingRangeSetIDs(*
rootFileDB_, ranges);
942 RecursiveMutexSentry sentry{
mutex_, __func__};
945 auto const& eventRangesIDs = getExistingRangeSetIDs(*
rootFileDB_, ranges);
949 template <BranchType BT>
953 string const& wrappedName)
955 RecursiveMutexSentry sentry{
mutex_, __func__};
962 template <BranchType BT>
966 string const& wrappedName)
968 RecursiveMutexSentry sentry{
mutex_, __func__};
975 template <BranchType BT>
978 vector<ProductProvenance>* vpp)
980 RecursiveMutexSentry sentry{
mutex_, __func__};
982 map<unsigned, unsigned> checksumToIndex;
983 auto const& principalRS = principal.
seenRanges();
984 set<ProductProvenance> keptprv;
986 auto const& bd =
val.branchDescription_;
987 auto const pid = bd.productID();
989 bool const produced = bd.produced();
990 bool const resolveProd = (produced || !fastCloning ||
993 bool const keepProvenance =
997 auto prov = keptprv.begin();
998 if (keepProvenance) {
1004 vector<ProductProvenance const*> stacked_pp;
1007 if (stacked_pp.size() == 0) {
1010 auto current_pp = stacked_pp.back();
1011 stacked_pp.pop_back();
1012 for (
auto const parent_bid :
1013 current_pp->parentage().parents()) {
1031 if (!parent_bd->produced()) {
1040 if (!keptprv.insert(*parent_pp).second) {
1046 stacked_pp.push_back(parent_pp.get());
1059 prov = keptprv.emplace(
pid, status).first;
1068 auto const& rs = getRangeSet<BT>(oh, principalRS, produced);
1079 auto prov_bid = prov->productID();
1080 if (keptprv.erase(*prov) != 1ull) {
1082 <<
"Attempt to set product status for product whose provenance " 1083 "is not being recorded.\n";
1090 auto const* product = getProduct<BT>(oh, rs, bd.wrappedName());
1091 setProductRangeSetID<BT>(
1093 val.product_ = product;
1096 vpp->assign(keptprv.begin(), keptprv.end());
1097 for (
auto const&
val : *vpp) {
1100 "RootOutputFile::fillBranches(principal, vpp):")
1101 <<
"Attempt to write a product with uninitialized provenance!\n";
SubRunAuxiliary const * pSubRunAux_
EventAuxiliary const * pEventAux_
void setFileStatus(OutputFileStatus const ofs)
int const compressionLevel_
T unique_value(query_result< T > const &r)
bool fastCloningEnabledAtConstruction_
FileIndex::EntryNumber_t runEntryNumber() const
std::string const & currentFileName() const
RunID const & runID() const noexcept
RootOutputFile(OutputModule *, std::string const &fileName, ClosingCriteria const &fileSwitchCriteria, int const compressionLevel, int64_t const saveMemoryObjectThreshold, int64_t const treeMaxVirtualSize, int const splitLevel, int const basketSize, DropMetaData dropMetaData, bool dropMetaDataForDroppedData, bool fastCloningRequested)
Granularity granularity() const
ProductProvenances * pRunProductProvenanceVector_
int64_t const treeMaxVirtualSize_
EventID const & eventID() const noexcept
auto insert_into(sqlite3 *const db, std::string const &tablename)
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
ProductProvenances resultsProductProvenanceVector_
std::chrono::steady_clock::time_point beginTime_
std::unique_ptr< TFile > filePtr_
std::vector< EventRange > const & ranges() const
std::string const & parentageTreeName()
void beginInputFile(RootFileBlock const *, bool fastClone)
EDProduct const * wrapper() const
std::string const & branchName() const
std::array< std::set< OutputItem >, NumBranchTypes > selectedOutputItemList_
std::string const & eventHistoryTreeName()
DropMetaData dropMetaData_
ProductProvenances runProductProvenanceVector_
BranchDescription const branchDescription_
void writeOne(EventPrincipal const &)
hep::concurrency::RecursiveMutex mutex_
void writeProductDependencies()
FileIndex::EntryNumber_t eventEntryNumber() const
void setRunAuxiliaryRangeSetID(RangeSet const &)
SubRunAuxiliary const & subRunAux() const
ProductStatus uninitialized()
void writeFileFormatVersion()
void writeParentageRegistry()
FileProperties const & fileProperties() const
std::map< ProcessHistoryID const, ProcessHistory > ProcessHistoryMap
std::vector< std::string > parents(bool want_basename=true) const
static void writeTTree(TTree *) noexcept(false)
ProductProvenances eventProductProvenanceVector_
ProductProvenances * pResultsProductProvenanceVector_
void incrementInputFileNumber()
void writeFileCatalogMetadata(FileStatsCollector const &stats, FileCatalogMetadata::collection_type const &, FileCatalogMetadata::collection_type const &)
ProductProvenance const * productProvenance() const
bool dropMetaDataForDroppedData_
SubRunID const & subRunID() const noexcept
void setSubRunAuxiliaryRangeSetID(RangeSet const &)
void addEventSelectionEntry(EventSelectionID const &eventSelection)
ProductProvenances subRunProductProvenanceVector_
std::string const & metaDataTreeName()
bool should_close(FileProperties const &) const
RangeSet seenRanges() const
static collection_type const & get()
void updateAge(std::chrono::seconds const age)
OutputHandle getForOutput(ProductID const &, bool resolveProd) const
cet::exempt_ptr< BranchDescription const > getProductDescription(ProductID const pid, bool const alwaysEnableLookupOfProducedProducts=false) const
void update_subRun(OutputFileStatus const status)
void create_table(sqlite3 *const db, std::string const &tablename, Cols const &...cols)
char const * metaBranchRootName()
void writeSubRun(SubRunPrincipal const &)
std::string const & parentageBranchName()
void sortBy_Run_SubRun_Event()
static bool shouldFastClone(bool const fastCloningSet, bool const fastCloning, bool const wantAllEvents, ClosingCriteria const &cc)
detail::DummyProductCache dummyProductCache_
boost::posix_time::ptime outputFileOpenTime() const
int getFileFormatVersion()
void writeResults(ResultsPrincipal &resp)
IDNumber_t< Level::SubRun > SubRunNumber_t
RootOutputTreePtrArray treePointers_
std::unique_ptr< cet::sqlite::Connection > rootFileDB_
std::string const & parentageIDBranchName()
void respondToCloseInputFile(FileBlock const &)
EDProduct const * product(std::string const &wrappedName)
void addEntry(EventID const &eID, EntryNumber_t entry)
bool contains(RunNumber_t, SubRunNumber_t, EventNumber_t) const
ProductProvenances * pEventProductProvenanceVector_
std::enable_if_t<!detail::RangeSetsSupported< BT >::value, EDProduct const * > getProduct(OutputHandle const &, RangeSet const &productRS, std::string const &wrappedName)
auto transform_all(Container &, OutputIt, UnaryOp)
EventID const & highestEventID() const
void setRangeSetID(unsigned const id) const
auto select(T const &...t)
void writeProductDescriptionRegistry()
std::string const & getFileFormatEra()
RunAuxiliary const & runAux() const
std::string const & BranchTypeToString(BranchType const bt)
cet::exempt_ptr< ProductProvenance const > branchToProductProvenance(ProductID const &) const
void writeParameterSetRegistry()
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
TTree * eventHistoryTree_
static TTree * makeTTree(TFile *, std::string const &name, int splitLevel)
std::string const & fileIndexTreeName()
std::array< ProductDescriptionsByID, NumBranchTypes > descriptionsToPersist_
std::string const & eventHistoryBranchName()
void checkDictionaries(BranchDescription const &productDesc)
bool canonical_string(std::string const &str, std::string &result)
void writeProcessConfigurationRegistry()
ResultsAuxiliary const & resultsAux() const
int64_t const saveMemoryObjectThreshold_
SelectionsArray const & keptProducts() const
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void writeRun(RunPrincipal const &)
bool operator<(OutputItem const &rh) const
static RangeSet invalid()
EventAuxiliary const & eventAux() const
static constexpr EventID invalidEvent() noexcept
IDNumber_t< Level::Event > EventNumber_t
ProductStatus dummyToPreventDoubleCount()
auto for_all(FwdCont &, Func)
void writeProcessHistoryRegistry()
static constexpr unsigned unsigned_max()
bool is_full_subRun() const
ResultsAuxiliary const * pResultsAux_
void update_run(OutputFileStatus const status)
std::set< SubRunID > const & seenSubRuns() const
ProductProvenances * pSubRunProductProvenanceVector_
void for_each_branch_type(F f)
RunAuxiliary const * pRunAux_
static constexpr unsigned size_max()
cet::registry_via_id< success_t, val > reg
FileIndex::EntryNumber_t subRunEntryNumber() const
void setRangeSetID(unsigned const id)
History const * pHistory_
void exec(sqlite3 *db, std::string const &ddl)
RangeSet const & rangeOfValidity() const
bool isRealData() const noexcept
std::string const & branchName() const noexcept
BranchChildren const & branchChildren() const
std::size_t eventsThisFile() const
ClosingCriteria fileSwitchCriteria_
void updateSize(unsigned const size)
std::string to_string(ModuleType const mt)
unsigned checksum() const
ProductStatus neverCreated()
EventID const & lowestEventID() const
void fillBranches(Principal const &, std::vector< ProductProvenance > *)
static void exportTo(sqlite3 *db)
fhicl::ParameterSetID selectorConfig() const
void setRangeSetID(unsigned const id) const
bool requestsToCloseFile()
IDNumber_t< Level::Run > RunNumber_t
static constexpr unsigned seconds_max()