InputFileCatalog.cc
Go to the documentation of this file.
1 //////////////////////////////////////////////////////////////////////
2 //
3 // InputFileCatalog
4 //
5 //////////////////////////////////////////////////////////////////////
6 
12 #include "boost/algorithm/string.hpp"
14 #include "cetlib_except/exception.h"
15 
16 #include <cassert>
17 
18 namespace art {
19 
// Constructor (the signature line is not present in this listing).
// Copies the configured source names into fileSources_, complains when that
// list is empty, passes the list to the catalog-interface service (ci_), and
// records whether that service reports itself as searchable.
22  : fileSources_{config().namesParameter()}
23  {
24 
25  if (fileSources_.empty()) {
    // NOTE(review): the statement opener for this error (presumably a
    // `throw art::Exception(...)`) is on a line missing from this listing;
    // the text below is the message streamed into it.
27  "InputFileCatalog::InputFileCatalog()\n")
28  << "Empty '" << config().namesParameter.name()
29  << "' parameter specified for input source.\n";
30  }
31 
32  // Configure FileDelivery service
33  ci_->configure(fileSources_);
34  searchable_ = ci_->isSearchable();
35 
    // Only a searchable catalog pre-sizes the item cache: one slot per
    // configured source.
36  if (searchable_)
37  fileCatalogItems_.resize(fileSources_.size());
38  }
39 
// Accessor yielding a const reference to a FileCatalogItem.
// NOTE(review): the line naming this function and the line holding its
// return statement are both missing from this listing — presumably this is
// InputFileCatalog::currentFile() const; confirm against the header.
//
// Throws art::Exception when fileIdx_ == indexEnd, i.e. no file is current.
40  FileCatalogItem const&
42  {
43  if (fileIdx_ == indexEnd) {
44  throw art::Exception(
    // (the exception's error-code argument is on a line missing from this listing)
46  "Cannot access the current file while the file catalog is empty!");
47  }
    // Invariant check: the current index never exceeds the highest index
    // recorded in maxIdx_.
48  assert(fileIdx_ <= maxIdx_);
50  }
51 
// Returns the current file index (fileIdx_) unchanged; no check against
// indexEnd is made here.
// NOTE(review): the line naming this function is missing from this listing —
// presumably InputFileCatalog::currentIndex() const.
52  size_t
54  {
55  return fileIdx_;
56  }
57 
// Advances the catalog to the next file.
//
// attempts - passed through to retrieveNextFile() when a fresh retrieval is
//            needed.
//
// Returns true when a next file became current, false otherwise.
58  bool
59  InputFileCatalog::getNextFile(int const attempts)
60  {
61  // Get the next file from the FileDelivery service and give it to the
62  // currentFile_ object; returns false if there are no more files.
63  //
64  // If hasNextFile() has been called prior to getNextFile(), it
65  // does not actually go fetch the next file from the FileDelivery
66  // service; instead, it advances the iterator by one and makes the
67  // "hidden" next file current.
68 
    // NOTE(review): the condition guarding this early return is on a line
    // missing from this listing.
70  return false;
71 
    // Either an earlier probe already found a next file, or we retrieve one
    // now into nextItem_.
72  if ((nextFileProbed_ && hasNextFile_) ||
73  retrieveNextFile(nextItem_, attempts)) {
74  nextFileProbed_ = false;
    // The very first file (fileIdx_ == indexEnd) and every file of a
    // non-searchable catalog use index 0; a searchable catalog moves on to
    // the next index.
75  fileIdx_ =
76  (fileIdx_ == indexEnd) ? 0 : (searchable_ ? (fileIdx_ + 1) : 0);
    // maxIdx_ tracks the highest index reached so far.
77  if (fileIdx_ > maxIdx_)
78  maxIdx_ = fileIdx_;
    // NOTE(review): one statement between here and the return is missing
    // from this listing.
80  return true;
81  }
82 
83  return false;
84  }
85 
// Reports whether a next file is available, without making it current.
//
// attempts - NOTE(review): not used on any line visible in this listing;
//            presumably consumed by the missing statement below.
//
// The answer is cached: once nextFileProbed_ is set, later calls return the
// stored hasNextFile_ until getNextFile() clears the flag.
86  bool
87  InputFileCatalog::hasNextFile(int const attempts)
88  {
89  // A probe. It tries to (and actually does) retrieve the next file
90  // from the FileDelivery service, but does not advance the current
91  // file pointer.
92  if (nextFileProbed_)
93  return hasNextFile_;
94 
    // NOTE(review): the statement that assigns hasNextFile_ is on a line
    // missing from this listing.
96  nextFileProbed_ = true;
97 
98  return hasNextFile_;
99  }
100 
// Obtains the next file into `item`, retrying on failure.
// NOTE(review): the line carrying the function name and first parameter is
// missing from this listing; the recursive calls below show the function is
// retrieveNextFile(item, attempts, transferOnly).
//
// attempts     - remaining tries; every retry recurses with attempts - 1.
// transferOnly - when true, only transferNextFile() is called; otherwise
//                retrieveNextFileFromCacheOrService() is used.
//
// Returns true or false depending on the status obtained.
// NOTE(review): the `case` labels that select each branch of the switch are
// on lines missing from this listing, so which status maps to which branch
// cannot be confirmed from here.
101  bool
103  int const attempts,
104  bool const transferOnly)
105  {
106  // Tell the service the current opened file (if there is one) is consumed
107  finish();
108 
109  // retrieve (deliver and transfer) next file from service
110  // or, do the transfer only
    // (the declaration of `status` is on a line missing from this listing)
112  if (transferOnly) {
113  status = transferNextFile(item);
114  } else {
115  status = retrieveNextFileFromCacheOrService(item);
116  }
117 
118  switch (status) {
120  // mark the file as transferred
121  ci_->updateStatus(item.uri(), FileDisposition::TRANSFERRED);
122  return true;
123  }
125  return false;
126  }
    // Out of attempts: report the delivery error; otherwise retry the full
    // deliver-and-transfer sequence.
128  if (attempts <= 1) {
    // NOTE(review): the statement opener (presumably a throw) is missing from
    // this listing. "attemtps" in the message below is a typo for "attempts";
    // it is a runtime string and is left untouched here.
130  "InputFileCatalog::retrieveNextFile()\n")
131  << "Delivery error encountered after reaching maximum number of "
132  "attemtps!";
133  } else {
134  return retrieveNextFile(item, attempts - 1, false);
135  }
136  }
138  if (attempts <= 1) {
139  // If we end up with a transfer error, the method returns
140  // with a true flag and empty filename. Weird enough, but
141  // the next file does exist; we just cannot retrieve it. Therefore
142  // we notify the service that the file has been skipped.
143  ci_->updateStatus(item.uri(), FileDisposition::SKIPPED);
144  return true;
145  } else {
    // Delivery already happened, so retry only the transfer step.
146  return retrieveNextFile(item, attempts - 1, true);
147  }
148  }
149  }
150  assert(false);
151  return false; // Unreachable, but keeps the compiler happy
152  }
153 
// Body of the cache-or-service lookup.
// NOTE(review): the return-type and signature lines are missing from this
// listing — presumably retrieveNextFileFromCacheOrService(FileCatalogItem&
// item), since it ends by calling transferNextFile(item).
//
// It first looks for an item already cached beyond the current index, then
// asks the catalog-interface service (ci_) for the next URI and hands the
// result to transferNextFile().
156  {
157  // Try to get it from cached files
158  if (fileIdx_ < maxIdx_) {
159  item = fileCatalogItems_[fileIdx_ + 1];
    // NOTE(review): the return for the cache-hit case is on a line missing
    // from this listing.
161  }
162 
163  // Try to get it from the service
164  std::string uri;
165  double wait = 0.0;
166 
167  // get file delivered
168  int const result = ci_->getNextFileURI(uri, wait);
169 
    // NOTE(review): the statements executed by the next two `if`s (presumably
    // early returns of a status value) are on lines missing from this
    // listing.
170  if (result == FileDeliveryStatus::NO_MORE_FILES)
172 
173  if (result != FileDeliveryStatus::SUCCESS)
175 
    // Fresh item: the first two constructor arguments are left empty and only
    // the URI is set; transferNextFile() fills in the file names.
176  item = FileCatalogItem("", "", uri);
177 
178  // get file transferred
179  return transferNextFile(item);
180  }
181 
// Resolves item.uri() to a local file name through the file-transfer
// service (ft_) and stores the resulting names on `item`.
// NOTE(review): the return-type and signature lines, and both return
// statements, are missing from this listing — presumably
// transferNextFile(FileCatalogItem& item).
184  {
185  std::string pfn;
186  int const result = ft_->translateToLocalFilename(item.uri(), pfn);
187 
    // On failure, blank both names and mark the item as skipped.
188  if (result != FileTransferStatus::SUCCESS) {
189  item.fileName("");
190  item.logicalFileName("");
191  item.skip();
    // (the failure return is on a line missing from this listing)
193  }
194 
195  // successfully retrieved the file
    // lfn keeps the name exactly as returned; only pfn is trimmed.
196  std::string lfn = pfn;
197  boost::trim(pfn);
198 
199  if (pfn.empty()) {
200  throw art::Exception(
    // (the exception's error-code argument is on a line missing from this
    // listing)
    // NOTE(review): the message below names
    // "retrieveNextFileFromCacheService()", which does not match the function
    // this body appears to belong to — confirm whether that is intentional.
202  "InputFileCatalog::retrieveNextFileFromCacheService()\n")
203  << "An empty string specified in parameter for input source.\n";
204  }
205 
    // When isPhysical() accepts the name, no logical file name is recorded.
206  if (isPhysical(pfn)) {
207  lfn.clear();
208  }
209 
210  item.fileName(pfn);
211  item.logicalFileName(lfn);
    // (the success return is on a line missing from this listing)
213  }
214 
// Resets the current file index to 0.
// The line naming this function is missing from this listing; the error
// message below identifies it as InputFileCatalog::rewind().
// A non-searchable catalog is rejected with an error first.
215  void
217  {
218  if (!searchable_) {
    // NOTE(review): the statement opener for this error (presumably a
    // `throw art::Exception(...)`) is on a line missing from this listing.
220  "InputFileCatalog::rewind()\n")
221  << "A non-searchable catalog is not allowed to rewind!";
222  }
223  fileIdx_ = 0;
224  }
225 
// Moves the current file index back to `index`.
// The line naming this function is missing from this listing; the error
// messages below identify it as InputFileCatalog::rewindTo().
//
// index - target position; must not exceed maxIdx_, the highest index
//         reached so far.
//
// Errors are raised for a non-searchable catalog and for an out-of-range
// index.
// NOTE(review): the statement openers for both errors (presumably
// `throw art::Exception(...)`) are on lines missing from this listing.
226  void
228  {
229  // Rewind to a previous file location in the catalog; the service itself
230  // is not rewound. Only usable when
231  // FileDeliveryService::areFilesPersistent() is true.
232  if (!searchable_) {
234  "InputFileCatalog::rewindTo()\n")
235  << "A non-searchable catalog is not allowed to rewind!";
236  }
237 
238  if (index > maxIdx_) {
240  "InputFileCatalog::rewindTo()\n")
241  << "Index " << index << " is out of range!";
242  }
243 
244  fileIdx_ = index;
245  }
246 
// Marks the current file as consumed, both with the catalog-interface
// service (ci_) and in the local item cache.
// NOTE(review): the line naming this function is missing from this listing —
// presumably InputFileCatalog::finish(), which retrieveNextFile() calls to
// report that the current file is consumed.
//
// Does nothing when there is no current file, or when the current file was
// already skipped or consumed.
247  void
249  {
250  if (fileIdx_ != indexEnd // there is a current file
251  && !currentFile().skipped() // not skipped
252  && !currentFile().consumed()) // not consumed
253  {
254  ci_->updateStatus(currentFile().uri(), FileDisposition::CONSUMED);
255  fileCatalogItems_[fileIdx_].consume();
256  }
257  }
258 
259 } // namespace art
std::vector< std::string > fileSources_
size_t currentIndex() const
std::string const & name() const
Definition: ParameterBase.h:43
bool hasNextFile(int attempts=5)
static std::string trim(const std::string &str, const std::string &whitespace=" \t")
Definition: doxyindexer.cpp:47
void wait(std::chrono::milliseconds dur)
static QCString result
static constexpr size_t indexEnd
std::string string
Definition: nybbler.cc:12
FileCatalogStatus retrieveNextFileFromCacheOrService(FileCatalogItem &item)
bool retrieveNextFile(FileCatalogItem &item, int attempts, bool transferOnly=false)
bool getNextFile(int attempts=5)
ServiceHandle< FileTransfer > ft_
std::string const & logicalFileName() const
Definition: FileCatalog.h:30
ServiceHandle< CatalogInterface > ci_
FileCatalogStatus
fhicl::Sequence< std::string > namesParameter
FileCatalogItem const & currentFile() const
std::vector< FileCatalogItem > fileCatalogItems_
static bool isPhysical(std::string const &name)
Definition: FileCatalog.h:93
static Config * config
Definition: config.cpp:1054
FileCatalogItem nextItem_
void rewindTo(size_t index)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
InputFileCatalog(fhicl::TableFragment< Config > const &config)
std::string const & fileName() const
Definition: FileCatalog.h:25
FileCatalogStatus transferNextFile(FileCatalogItem &item)
std::string const & uri() const
Definition: FileCatalog.h:35