test_tbb_dfp_any.cxx
Go to the documentation of this file.
1 // A minimally complete example of a tbb dfp that only deals in boost::any.
2 
3 #include "WireCellUtil/Testing.h"
4 
5 #include <tbb/flow_graph.h>
6 #include <boost/any.hpp>
7 
8 #include <string>
9 #include <deque>
10 #include <iostream>
11 using namespace std;
12 
13 // mock INode categories
16 };
17 
18 class MockNode {
19 public:
20  virtual ~MockNode() {}
21  virtual NodeCategory category() = 0;
22  virtual int concurrency() { return 0; }
23 };
24 
25 // mock an INode::pointer
26 typedef std::shared_ptr<MockNode> mock_node_pointer;
27 
28 
29 
30 class MockSource : public MockNode {
31  int m_count;
32  const int m_maxcount;
33 public:
34  MockSource(int maxcount = 10) : m_count(0), m_maxcount(maxcount) {}
35  virtual ~MockSource() {}
36  virtual NodeCategory category() { return sourceCat; }
37  virtual bool extract(boost::any& out) {
38  cerr << "Source: " << m_count << endl;
39  if (m_count > m_maxcount) {
40  cerr << "ModeSource drained\n";
41  return false;
42  }
43  ++m_count;
44  out = m_count;
45  return true;
46  }
47 };
48 
49 class MockFunction : public MockNode {
50  std::deque<int> m_numbers;
51 public:
52  virtual ~MockFunction() {}
53  virtual NodeCategory category() { return functionCat; }
54  virtual bool insert(boost::any& anyin) {
55  int num = boost::any_cast<int>(anyin);
56  m_numbers.push_back(num);
57  return true;
58  }
59  virtual bool extract(boost::any& anyout) {
60  if (m_numbers.empty()) {
61  return false;
62  }
63  anyout = m_numbers.front();
64  m_numbers.pop_front();
65  return true;
66  }
67 };
68 
69 class MockSink : public MockNode {
70 public:
71  virtual ~MockSink() {}
72  virtual NodeCategory category() { return sinkCat; }
73  virtual bool insert(const boost::any& anyin) {
74  int num = boost::any_cast<int>(anyin);
75  cerr << "Sunk number " << num << endl;
76  return true;
77  }
78 };
79 
80 // fixme: this fakes the factory until we clean up nodes to allow
81 // empty c'tor and use configuration.
83 {
84 
85  if (node_desc == "source") { // note actual desc should be class or class:inst
86  return mock_node_pointer(new MockSource);
87  }
88  if (node_desc == "drift") { // note actual desc should be class or class:inst
89  return mock_node_pointer(new MockFunction);
90  }
91  if (node_desc == "sink") { // note actual desc should be class or class:inst
92  return mock_node_pointer(new MockSink);
93  }
94  return nullptr;
95 }
96 
97 
98 
99 
100 typedef tbb::flow::sender<boost::any> sender_type;
101 typedef tbb::flow::receiver<boost::any> receiver_type;
102 typedef std::shared_ptr<sender_type> sender_port_pointer;
103 typedef std::shared_ptr<receiver_type> receiver_port_pointer;
104 typedef std::vector<sender_port_pointer> sender_port_vector;
105 typedef std::vector<receiver_port_pointer> receiver_port_vector;
106 
107 typedef tbb::flow::source_node<boost::any> source_node;
108 typedef tbb::flow::function_node<boost::any> sink_node;
109 
110 // base facade, expose sender/receiver ports and provide initialize hook
112 public:
113  virtual ~TbbNodeWrapper() {}
114 
117 
118  // call before running graph
119  virtual void initialize() { }
120 
121 };
122 
123 // expose wrappers only as a shared pointer
124 typedef std::shared_ptr<TbbNodeWrapper> TbbNode;
125 
126 
127 //
128 // SOURCE
129 //
130 
131 // adapter to convert from WC source node to TBB source node body.
133 public:
135  m_wcnode = std::dynamic_pointer_cast<MockSource>(wcnode);
136  Assert(m_wcnode);
137  }
139  cerr << "TbbSourceBody copied\n";
140  m_wcnode = other.m_wcnode;
141  }
143 
144  // assignment - should this in general clone the underlying WC node to allow for proper concurrency?
145  void operator=( const TbbSourceBody& other) {
146  cerr << "TbbSourceBody assigned\n";
147  m_wcnode = other.m_wcnode;
148  }
149 
150  bool operator()(boost::any& out) {
151  cerr << "Extracting from " << m_wcnode << endl;
152  return m_wcnode->extract(out);
153  }
154 private:
155  std::shared_ptr<MockSource> m_wcnode;
156 
157 };
158 
159 // implement facade to access ports for source nodes
161 public:
163  : m_tbbnode(new source_node(graph, TbbSourceBody(wcnode), false))
164  { }
165 
166  virtual void initialize() {
167  cerr << "Activating source node\n";
168  m_tbbnode->activate();
169  }
170 
172  auto ptr = dynamic_pointer_cast< sender_type >(m_tbbnode);
173  Assert(ptr);
174  return sender_port_vector{ptr};
175  }
176 private:
177  std::shared_ptr<source_node> m_tbbnode;
178 };
179 
180 
181 
182 //
183 // SINK
184 //
185 
186 // adapter to convert from WC sink node to TBB sink node body.
187 class TbbSinkBody {
188 public:
189 
191  m_wcnode = std::dynamic_pointer_cast<MockSink>(wcnode);
192  Assert(m_wcnode);
193  }
195  cerr << "TbbSinkBody copied\n";
196  m_wcnode = other.m_wcnode;
197  }
199 
200  // assignment - should this in general clone the underlying WC node to allow for proper concurrency?
201  void operator=( const TbbSinkBody& other) {
202  cerr << "TbbSinkBody assigned\n";
203  m_wcnode = other.m_wcnode;
204  }
205 
206  boost::any operator()(const boost::any& in) {
207  cerr << "Inserting to " << m_wcnode << endl;
208  m_wcnode->insert(in);
209  return in;
210  }
211 private:
212  std::shared_ptr<MockSink> m_wcnode;
213 };
214 
215 
216 
217 // implement facade to access ports for sink nodes
219 public:
221  : m_tbbnode(new sink_node(graph, wcnode->concurrency(), TbbSinkBody(wcnode)))
222  { }
223 
225  auto ptr = dynamic_pointer_cast< receiver_type >(m_tbbnode);
226  Assert(ptr);
227  return receiver_port_vector{ptr};
228  }
229 private:
230  std::shared_ptr<sink_node> m_tbbnode;
231 
232 };
233 
234 
235 
237 {
238  mock_node_pointer wcnode = get_node(node_desc);
239  if (! wcnode) {
240  cerr << "Failed to get node for " << node_desc << endl;
241  return nullptr;
242  }
243 
244  cerr << "Getting node from category: " << wcnode->category() << endl;
245  switch (wcnode->category()) {
246  case sourceCat:
247  return TbbNode(new TbbSourceNodeWrapper(graph, wcnode));
248  case sinkCat:
249  return TbbNode(new TbbSinkNodeWrapper(graph, wcnode));
250  // case functionCat:
251  // return TbbNode(new TbbFunctionNodeWrapper(graph, wcnode));
252  default:
253  return nullptr;
254  }
255  return nullptr;
256 }
257 
258 bool connect(TbbNode sender, TbbNode receiver, size_t sport=0, size_t rport=0);
259 bool connect(TbbNode sender, TbbNode receiver, size_t sport, size_t rport)
260 {
261  Assert(sender);
262  Assert(receiver);
263  auto sports = sender->sender_ports();
264  auto rports = receiver->receiver_ports();
265 
266  Assert(sports.size() > sport);
267  Assert(rports.size() > rport);
268 
269  sender_type* s = sports[sport].get();
270  receiver_type* r = rports[rport].get();
271  Assert(s);
272  Assert(r);
273 
274  cerr << "Connecting " << s << " and " << r << endl;
275  make_edge(*s, *r);
276  return true;
277 }
278 
279 int main()
280 {
282  TbbNode source = make_node(graph, "source");
283  Assert(source);
284  TbbNode sink = make_node(graph, "sink");
285  Assert(sink);
286 
287  Assert (connect(source, sink));
288 
289  sink->initialize();
290  source->initialize();
291 
292 
293  graph.wait_for_all();
294 
295  return 0;
296 }
virtual bool extract(boost::any &anyout)
std::vector< receiver_port_pointer > receiver_port_vector
bool connect(TbbNode sender, TbbNode receiver, size_t sport=0, size_t rport=0)
mock_node_pointer get_node(const std::string &node_desc)
const CharType(& source)[N]
Definition: pointer.h:1147
std::shared_ptr< sink_node > m_tbbnode
void operator=(const TbbSinkBody &other)
virtual ~MockFunction()
TbbNode make_node(tbb::flow::graph &graph, const std::string &node_desc)
virtual receiver_port_vector receiver_ports()
std::string string
Definition: nybbler.cc:12
TbbSinkBody(const TbbSinkBody &other)
void operator=(const TbbSourceBody &other)
virtual ~MockNode()
boost::any operator()(const boost::any &in)
def graph(desc, maker=maker)
Definition: apa.py:294
STL namespace.
TbbSourceBody(const TbbSourceBody &other)
std::vector< sender_port_pointer > sender_port_vector
virtual bool insert(const boost::any &anyin)
virtual sender_port_vector sender_ports()
virtual sender_port_vector sender_ports()
virtual int concurrency()
std::shared_ptr< sender_type > sender_port_pointer
std::shared_ptr< source_node > m_tbbnode
NodeCategory
TbbSinkNodeWrapper(tbb::flow::graph &graph, mock_node_pointer wcnode)
#define Assert
Definition: Testing.h:7
std::shared_ptr< receiver_type > receiver_port_pointer
TbbSinkBody(mock_node_pointer wcnode)
const int m_maxcount
virtual NodeCategory category()
int main()
virtual bool extract(boost::any &out)
bool operator()(boost::any &out)
virtual ~TbbNodeWrapper()
virtual void initialize()
std::deque< int > m_numbers
virtual NodeCategory category()
MockSource(int maxcount=10)
const void * ptr(const T *p)
Definition: format.h:3138
std::shared_ptr< MockSink > m_wcnode
virtual NodeCategory category()
tbb::flow::function_node< boost::any > sink_node
std::shared_ptr< MockNode > mock_node_pointer
TbbSourceNodeWrapper(tbb::flow::graph &graph, mock_node_pointer wcnode)
tbb::flow::sender< boost::any > sender_type
virtual ~MockSource()
tbb::flow::receiver< boost::any > receiver_type
std::shared_ptr< TbbNodeWrapper > TbbNode
virtual bool insert(boost::any &anyin)
static QCString * s
Definition: config.cpp:1042
TbbSourceBody(mock_node_pointer wcnode)
tbb::flow::source_node< boost::any > source_node
QTextStream & endl(QTextStream &s)
virtual receiver_port_vector receiver_ports()
virtual ~MockSink()
std::shared_ptr< MockSource > m_wcnode