6 #include "hep_concurrency/RecursiveMutex.h" 7 #include "hep_concurrency/SerialTaskQueue.h" 8 #include "hep_concurrency/tsan.h" 26 detail::SharedResource_t
const SharedResourcesRegistry::Legacy{
"__legacy__",
29 SharedResourcesRegistry::QueueAndCounter::QueueAndCounter()
30 : queue_{std::make_shared<hep::concurrency::SerialTaskQueue>()}
62 return "An error occurred while attempting to register the shared resource '"s +
82 RecursiveMutexSentry sentry{
mutex_, __func__};
85 <<
"The shared-resources registry has been frozen. All 'serialize' " 87 <<
"must be made in the constructor of a shared module and no later.\n";
92 <<
"A 'serialize' call was made for a resource that has not been " 94 <<
"If the resource is an art-based service, make sure that the " 96 <<
"has been configured for this job. Otherwise, use the " 97 "'serializeExternal'\n" 98 <<
"function call. If neither of these approaches is appropriate, " 100 <<
"artists@fnal.gov.\n";
102 auto& queueAndCounter = it->second;
110 ++keyAndVal.second.counter_;
115 ++queueAndCounter.counter_;
116 if (queueAndCounter.counter_.load() == 1) {
121 queueAndCounter.counter_ +=
nLegacy_;
129 RecursiveMutexSentry sentry{
mutex_, __func__};
137 RecursiveMutexSentry sentry{
mutex_, __func__};
150 vector<shared_ptr<SerialTaskQueue>>
153 RecursiveMutexSentry sentry{
mutex_, __func__};
154 vector const names{resourceName};
158 vector<shared_ptr<SerialTaskQueue>>
160 vector<string>
const& resourceNames)
const 162 RecursiveMutexSentry sentry{
mutex_, __func__};
163 map<pair<unsigned, string>, shared_ptr<SerialTaskQueue>> sortedResources;
171 sortedResources.emplace(make_pair(queueAndCounter.counter_.load(),
key),
172 atomic_load(&queueAndCounter.queue_));
177 for (
auto const&
name : resourceNames) {
180 auto const& [
key, queueAndCounter] = *iter;
181 sortedResources.emplace(make_pair(queueAndCounter.counter_.load(),
key),
182 atomic_load(&queueAndCounter.queue_));
185 vector<shared_ptr<SerialTaskQueue>> queues;
186 if (sortedResources.empty()) {
189 queues.emplace_back(make_shared<SerialTaskQueue>());
192 queues.reserve(sortedResources.size());
193 for (
auto const& keyAndVal : sortedResources) {
195 queues.push_back(keyAndVal.second);
std::map< std::string, QueueAndCounter > resourceMap_
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
void registerSharedResource(detail::SharedResource_t const &) noexcept( false)
std::vector< std::shared_ptr< hep::concurrency::SerialTaskQueue > > createQueues(std::string const &resourceName) const
hep::concurrency::RecursiveMutex mutex_
static SharedResourcesRegistry * instance(bool shutdown=false)
static detail::SharedResource_t const Legacy
bool search_all(FwdCont const &, Datum const &)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
SharedResourcesRegistry()
~AutoShutdownSharedResourcesRegistry()
void updateSharedResource(std::string const &) noexcept(false)