SharedResourcesRegistry.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
6 #include "hep_concurrency/RecursiveMutex.h"
7 #include "hep_concurrency/SerialTaskQueue.h"
8 #include "hep_concurrency/tsan.h"
9 
10 #include <algorithm>
11 #include <atomic>
12 #include <cassert>
13 #include <map>
14 #include <memory>
15 #include <mutex>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
20 using namespace hep::concurrency;
21 using namespace std;
22 using namespace std::string_literals;
23 
24 namespace art {
25 
// Definition of the static 'Legacy' resource descriptor. Its name is the
// reserved string "__legacy__"; other code in this file recognizes legacy
// registrations by comparing against Legacy.name.
// NOTE(review): the meaning of the second (boolean) initializer is defined
// by detail::SharedResource_t, which is not visible here -- confirm.
26  detail::SharedResource_t const SharedResourcesRegistry::Legacy{"__legacy__",
27  false};
28 
// Default constructor: each QueueAndCounter starts out owning its own newly
// allocated SerialTaskQueue, held through a shared_ptr in queue_.
29  SharedResourcesRegistry::QueueAndCounter::QueueAndCounter()
30  : queue_{std::make_shared<hep::concurrency::SerialTaskQueue>()}
31  {}
32 
// Returns the registry object stored in a function-local static pointer.
//
//   shutdown == false : creates the registry on first use and returns the
//                       same pointer on later calls.
//   shutdown == true  : deletes the current registry (if any), resets the
//                       pointer, and returns nullptr. A later call with
//                       shutdown == false creates a fresh registry.
//
// NOTE(review): nothing in this function guards the check-then-create or the
// delete of 'me'; presumably callers serialize first use and shutdown --
// confirm against the call sites.
34  SharedResourcesRegistry::instance(bool shutdown /*= false*/)
35  {
36  static SharedResourcesRegistry* me{nullptr};
37  if (shutdown) {
    // Tear-down request: deleting a null pointer is a no-op, so this is
    // safe even if the registry was never created.
38  delete me;
39  me = nullptr;
40  return me;
41  }
42  if (me == nullptr) {
    // Lazy creation on first (or first-after-shutdown) access.
43  me = new SharedResourcesRegistry{};
44  }
45  return me;
46  }
47 
49  public:
51  {
53  }
55  };
56 
57  namespace {
// Builds the message prefix used when throwing from the registration code
// in this file: a fixed sentence with the resource 'name' in single quotes,
// terminated by a newline. The 's' suffix turns the leading literal into a
// std::string so that operator+ concatenates strings.
60  error_context(std::string const& name)
61  {
62  return "An error occurred while attempting to register the shared resource '"s +
63  name + "'.\n";
64  }
65  } // unnamed namespace
66 
68  {
69  frozen_ = false;
70  nLegacy_ = 0U;
71  // Populate queues for known shared resources. Creating these
72  // slots does *not* automatically introduce synchronization.
73  // Synchronization is enabled based on the resource-names argument
74  // presented to the 'createQueues' member function.
76  }
77 
// Records one 'serialize' request for the already-registered resource
// 'name' by bumping its registration counter.
//
// Throws art::Exception (LogicError) when:
//   - the registry has been frozen, or
//   - 'name' has no entry in resourceMap_.
//
// A request for the legacy resource is handled specially: nLegacy_ is
// incremented and the counter of *every* entry in resourceMap_ is bumped.
//
// NOTE(review): the line carrying the qualified function name is missing
// from this listing; the stray 'false)' below is presumably the tail of a
// 'noexcept(false)' specification -- confirm against the header.
78  void
80  false)
81  {
82  RecursiveMutexSentry sentry{mutex_, __func__};
83  if (frozen_) {
84  throw art::Exception{art::errors::LogicError, error_context(name)}
85  << "The shared-resources registry has been frozen. All 'serialize' "
86  "calls\n"
87  << "must be made in the constructor of a shared module and no later.\n";
88  }
    // Look the resource up without inserting: an unknown name is an error
    // here, not an implicit registration.
89  auto it = resourceMap_.find(name);
90  if (it == cend(resourceMap_)) {
91  throw art::Exception{art::errors::LogicError, error_context(name)}
92  << "A 'serialize' call was made for a resource that has not been "
93  "registered.\n"
94  << "If the resource is an art-based service, make sure that the "
95  "service\n"
96  << "has been configured for this job. Otherwise, use the "
97  "'serializeExternal'\n"
98  << "function call. If neither of these approaches is appropriate, "
99  "contact\n"
100  << "artists@fnal.gov.\n";
101  }
102  auto& queueAndCounter = it->second;
103  if (name == Legacy.name) {
104  ++nLegacy_;
105  // Make sure all non-legacy resources have a higher count, which
106  // makes legacy always the first queue.
    // The loop visits every entry of resourceMap_, so the legacy entry's
    // own counter is incremented here as well.
107  for (auto& keyAndVal : resourceMap_) {
108  // Note: keyAndVal.first is a string (name of resource)
109  // Note: keyAndVal.second is a QueueAndCounter
110  ++keyAndVal.second.counter_;
111  }
112  return;
113  }
114  // Count the number of times the resource was registered.
115  ++queueAndCounter.counter_;
116  if (queueAndCounter.counter_.load() == 1) {
117  // Make sure all non-legacy resources have a higher count, which
118  // makes legacy always the first queue. When first registering
119  // a non-legacy resource, we have to account for any legacy
120  // resource registrations already made.
    // NOTE(review): the '== 1' test assumes counter_ starts at zero; its
    // initial value is not visible in this file -- confirm.
121  queueAndCounter.counter_ += nLegacy_;
122  }
123  }
124 
// Ensures that resourceMap_ contains an entry keyed by 'resource.name',
// creating a default-constructed one if it is absent. An existing entry is
// left untouched.
// NOTE(review): the line carrying the qualified function name is missing
// from this listing.
125  void
127  detail::SharedResource_t const& resource)
128  {
129  RecursiveMutexSentry sentry{mutex_, __func__};
    // operator[] is used purely for its insert-if-missing side effect; the
    // returned reference is intentionally discarded.
130  resourceMap_[resource.name];
131  }
132 
133  void
135  false)
136  {
137  RecursiveMutexSentry sentry{mutex_, __func__};
138  // Note: This has the intended side-effect of creating the entry
139  // if it does not yet exist.
142  }
143 
144  void
146  {
147  frozen_ = true;
148  }
149 
// Single-name convenience overload: wraps 'resourceName' in a one-element
// vector and forwards to the vector overload, returning its result.
// The mutex is taken here and again inside the vector overload; mutex_ is a
// hep::concurrency::RecursiveMutex.
150  vector<shared_ptr<SerialTaskQueue>>
151  SharedResourcesRegistry::createQueues(string const& resourceName) const
152  {
153  RecursiveMutexSentry sentry{mutex_, __func__};
    // Element type is deduced from the initializer (vector<string>).
154  vector const names{resourceName};
155  return createQueues(names);
156  }
157 
// Collects the serial task queues for the given resource names and returns
// them ordered by (registration counter, resource name), ascending.
//
//   - If 'resourceNames' contains the legacy resource name, the queues of
//     *all* entries in resourceMap_ are returned.
//   - Otherwise only the queues of the named resources are returned.
//   - If nothing was collected, a single newly created queue is returned so
//     that the result is never empty.
//
// NOTE(review): the line carrying the qualified function name is missing
// from this listing.
158  vector<shared_ptr<SerialTaskQueue>>
160  vector<string> const& resourceNames) const
161  {
162  RecursiveMutexSentry sentry{mutex_, __func__};
    // Keyed by (counter, name): iterating the map later yields the queues
    // sorted by counter first and by resource name second.
163  map<pair<unsigned, string>, shared_ptr<SerialTaskQueue>> sortedResources;
164  if (cet::search_all(resourceNames, Legacy.name)) {
165  // This acquirer is for a legacy module, get the queues for all
166  // resources.
167  // Note: We do not trust legacy modules, they may be accessing
168  // one of the shared resources without our knowledge, so we
169  // isolate them from the other modules as well as each other.
170  for (auto const& [key, queueAndCounter] : resourceMap_) {
171  sortedResources.emplace(make_pair(queueAndCounter.counter_.load(), key),
172  atomic_load(&queueAndCounter.queue_));
173  }
174  } else {
175  // Not for a legacy module, get the queues for the named
176  // resources.
177  for (auto const& name : resourceNames) {
178  auto iter = resourceMap_.find(name);
    // NOTE(review): this assert is the only guard against an unregistered
    // name. When assertions are compiled out (NDEBUG), the dereference
    // below would act on end().
179  assert(iter != resourceMap_.end());
180  auto const& [key, queueAndCounter] = *iter;
181  sortedResources.emplace(make_pair(queueAndCounter.counter_.load(), key),
182  atomic_load(&queueAndCounter.queue_));
183  }
184  }
185  vector<shared_ptr<SerialTaskQueue>> queues;
186  if (sortedResources.empty()) {
187  // Error, none of the resource names were registered. Calling
188  // code is depending on there being at least one shared queue.
    // NOTE(review): given the assert above, this branch is reached only
    // when no lookup happened at all (empty 'resourceNames', or an empty
    // resourceMap_ in the legacy case).
189  queues.emplace_back(make_shared<SerialTaskQueue>());
190  } else {
191  // At least some of the named resources exist.
192  queues.reserve(sortedResources.size());
193  for (auto const& keyAndVal : sortedResources) {
194  // Note: keyAndVal.second is a shared_ptr<SerialTaskQueue>
195  queues.push_back(keyAndVal.second);
196  }
197  }
198  return queues;
199  }
200 
201 } // namespace art
static QCString name
Definition: declinfo.cpp:673
std::map< std::string, QueueAndCounter > resourceMap_
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
Definition: StdUtils.h:87
std::string string
Definition: nybbler.cc:12
void registerSharedResource(detail::SharedResource_t const &) noexcept( false)
struct vector vector
STL namespace.
std::vector< std::shared_ptr< hep::concurrency::SerialTaskQueue > > createQueues(std::string const &resourceName) const
hep::concurrency::RecursiveMutex mutex_
static SharedResourcesRegistry * instance(bool shutdown=false)
static detail::SharedResource_t const Legacy
bool search_all(FwdCont const &, Datum const &)
def key(type, name=None)
Definition: graph.py:13
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void updateSharedResource(std::string const &) noexcept(false)
static QCString * s
Definition: config.cpp:1042