SingleConsumerQ.h
Go to the documentation of this file.
1 #ifndef messagefacility_Utilities_SingleConsumerQ_h
2 #define messagefacility_Utilities_SingleConsumerQ_h
3 /*
4  A bounded queue for use in a multi-threaded producer/consumer application.
5  This is a simple design. It is only meant to be used where there is
6  one consumer and one or more producers using a given queue instance.
7 
8  The problem with multiple consumers is the separate front/pop
9  member functions. If they are pulled together into one function,
10  multiple consumers may be possible, but exception safety would then
11  be a problem - popping an item off the queue to be held as a local
12  variable, followed by its removal from the queue. Having fixed size
13  buffers within a fixed size pool and using a circular buffer as the
14  queue alleviates most of this problem because exceptions will not
15  occur during manipulation. The only problem left to be checked is
16  how (or if) the std mutex manipulation can throw and when.
17 
18  Note: the current implementation has no protection against unsigned int
19  overflows
20 
21  Missing:
22  - the ring buffer is really not used to its fullest extent
23  - the buffer sizes are fixed and cannot grow
24  - a simple Buffer object is returned that has the pointer and len
25  separate. The length should be stored as the first word of the
26  buffer itself
27  - timeouts for consumer
28  - good way to signal to consumer to end
29  - keeping the instance of this thing around until all using threads are
30  done with it
31 */
32 
33 #include <condition_variable>
34 #include <mutex>
35 #include <vector>
36 
37 namespace mf {
38 
40  public:
// Plain pointer-plus-length pair describing one buffer.
// A default-constructed Buffer holds a null pointer and a length of zero.
41  struct Buffer {
42  Buffer() = default;
// Wraps an existing pointer `p` together with its associated length `len`.
43  Buffer(void* p, int const len) : ptr_{p}, len_{len} {}
44 
45  void* ptr_{nullptr}; // start of the buffer memory; nullptr when unset
46  int len_{}; // length paired with ptr_ (presumably in bytes -- confirm against the implementation)
47  };
48 
49  SingleConsumerQ(int const max_event_size, int const max_queue_depth);
50 
51  struct ConsumerType {
54  {
55  return b.getConsumerBuffer();
56  }
57  static void
59  {
61  }
// Consumer-side commit hook: forwards `v` and `size` unchanged to
// b.commitConsumerBuffer().
62  static void
63  commit(SingleConsumerQ& b, void* v, int size)
64  {
65  b.commitConsumerBuffer(v, size);
66  }
67  };
68 
69  struct ProducerType {
72  {
73  return b.getProducerBuffer();
74  }
75  static void
77  {
79  }
// Producer-side commit hook: forwards `v` and `size` unchanged to
// b.commitProducerBuffer().
80  static void
81  commit(SingleConsumerQ& b, void* v, int size)
82  {
83  b.commitProducerBuffer(v, size);
84  }
85  };
86 
87  template <class T>
88  class OperateBuffer {
89  public:
91  : b_{b}, v_{T::get(b)}, committed_{false}
92  {}
94  {
95  if (!committed_)
96  T::release(b_, v_.ptr_);
97  }
98 
// Returns the raw pointer of the Buffer obtained via T::get() at construction.
99  void*
100  buffer() const
101  {
102  return v_.ptr_;
103  }
// Returns the length recorded in the Buffer obtained via T::get() at construction.
104  int
105  size() const
106  {
107  return v_.len_;
108  }
// Hands the held buffer to T::commit() with the given size (default 0) and
// records that a commit happened. The committed_ flag is tested elsewhere in
// this class so that T::release() is only called when no commit was made.
109  void
110  commit(int const theSize = 0)
111  {
112  T::commit(b_, v_.ptr_, theSize);
113  committed_ = true;
114  }
115 
116  private:
120  };
121 
124 
126  void releaseProducerBuffer(void*);
127  void commitProducerBuffer(void*, int);
128 
130  void releaseConsumerBuffer(void*);
131  void commitConsumerBuffer(void*, int);
132 
// Returns the stored max_event_size_ value (presumably the max_event_size
// passed to the constructor -- confirm against the implementation file).
133  int
134  maxEventSize() const
135  {
136  return max_event_size_;
137  }
138  int
140  {
141  return max_queue_depth_;
142  }
143 
// Copying is disabled for this class. The deleted copy assignment is declared
// with the canonical reference return type rather than returning by value.
144  SingleConsumerQ(SingleConsumerQ const&) = delete;
145  SingleConsumerQ& operator=(SingleConsumerQ const&) = delete;
146 
147  private:
148  // the memory for the buffers
149  using ByteArray = std::vector<char>;
150  // the pool of buffers
151  using Pool = std::vector<void*>;
152  // the queue
153  using Queue = std::vector<Buffer>;
154 
157  int pos_; // use pool as stack of available buffers
161  unsigned int fpos_{}, bpos_{}; // positions for queue - front and back
162 
165  std::condition_variable pool_cond_{};
166  std::condition_variable pop_cond_{};
167  std::condition_variable push_cond_{};
168  };
169 }
170 #endif /* messagefacility_Utilities_SingleConsumerQ_h */
171 
172 // Local variables:
173 // mode: c++
174 // End:
int maxEventSize() const
std::condition_variable push_cond_
void commitProducerBuffer(void *, int)
std::vector< void * > Pool
SingleConsumerQ operator=(SingleConsumerQ const &)=delete
std::condition_variable pop_cond_
int maxQueueDepth() const
SingleConsumerQ(int const max_event_size, int const max_queue_depth)
std::vector< Buffer > Queue
void releaseProducerBuffer(void *)
static void commit(SingleConsumerQ &b, void *v, int size)
void commit(int const theSize=0)
Buffer(void *p, int const len)
static void release(SingleConsumerQ &b, void *v)
std::condition_variable pool_cond_
static void release(SingleConsumerQ &b, void *v)
std::vector< char > ByteArray
void releaseConsumerBuffer(void *)
void commitConsumerBuffer(void *, int)
static void commit(SingleConsumerQ &b, void *v, int size)