SingleConsumerQ.cc
Go to the documentation of this file.
2 
3 namespace mf {
4 
5  SingleConsumerQ::SingleConsumerQ(int max_event_size, int max_queue_depth)
6  : max_event_size_(max_event_size)
7  , max_queue_depth_(max_queue_depth)
8  , pos_(max_queue_depth - 1)
9  , mem_(max_event_size * max_queue_depth)
10  , queue_(max_queue_depth)
11  {
12  // throw if event size 0 or queue depth 0
13 
14  for (char* i = &mem_[0]; i < &mem_[mem_.size()]; i += max_event_size)
15  buffer_pool_.push_back(i);
16  }
17 
20  {
21  // get lock
22  std::unique_lock<std::mutex> sl(pool_mutex_);
23  // wait for buffer to appear
24  while (pos_ < 0) {
25  pool_cond_.wait(sl);
26  }
27  void* v = buffer_pool_[pos_];
28  --pos_;
29  return Buffer(v, max_event_size_);
30  }
31 
32  void
34  {
35  // get lock
36  std::lock_guard<std::mutex> sl(pool_mutex_);
37  ++pos_;
38  buffer_pool_[pos_] = v;
39  pool_cond_.notify_all();
40  }
41 
42  void
44  {
45  // get lock
46  std::unique_lock<std::mutex> sl(queue_mutex_);
47  // if full, wait for item to be removed
48  while ((bpos_ + max_queue_depth_) == fpos_) {
49  push_cond_.wait(sl);
50  }
51 
52  // put buffer into queue
53  queue_[fpos_ % max_queue_depth_] = Buffer(v, len);
54  ++fpos_;
55  // signal consumer
56  pop_cond_.notify_all();
57  }
58 
61  {
62  // get lock
63  std::unique_lock<std::mutex> sl(queue_mutex_);
64  // if empty, wait for item to appear
65  while (bpos_ == fpos_) {
66  pop_cond_.wait(sl);
67  }
68  // get a buffer from the queue and return it
70  ++bpos_;
71  // note that these operations cannot throw
72  // signal producer
73  push_cond_.notify_all();
74  return v;
75  }
76 
77  void
79  {
80  // should the buffer be placed back onto the queue and not released?
81  // we got here because a commit did not occur in the consumer.
82  // we will allow consumers to call or not call commit for now, meaning
83  // that we cannot distinguish between exception conditions and normal
84  // return. The buffer will always be released
86  }
87 
88  void
90  {
92  }
93 }
std::condition_variable push_cond_
void commitProducerBuffer(void *, int)
std::condition_variable pop_cond_
SingleConsumerQ(int const max_event_size, int const max_queue_depth)
void releaseProducerBuffer(void *)
std::condition_variable pool_cond_
void releaseConsumerBuffer(void *)
void commitConsumerBuffer(void *, int)