GenPipeline.h
Go to the documentation of this file.
1 /**
2  Define a pipeline facade over WCT nodes.
3 
 FIXME: this needs to move into the mythical "interface utilities"
 library along with much other stuff.
6  */
7 
8 #ifndef WIRECELL_GEN_PIPELINE
9 #define WIRECELL_GEN_PIPELINE
10 
14 
#include <deque>
#include <memory>
#include <queue>
#include <stdexcept>
#include <vector>
17 
18 
19 namespace WireCell {
20 
21 
22 typedef std::queue<boost::any> Pipe;
23 
// Abstract unit of work: anything that can be executed as a thunk.
class Proc {
public:
    virtual ~Proc() = default;

    // Run a single operation of the process.  The meaning of the
    // returned bool is set by the implementation; the concrete procs
    // in this file return false when they could not do their work.
    virtual bool operator()() = 0;
};
31 
32 typedef std::vector<Proc*> Pipeline;
33 
34 // Base class for something that pushes elements to a Pipe.
35 class SourceProc : public virtual Proc {
36 public:
37  virtual ~SourceProc() {}
38  // Return output Pipe
39  virtual Pipe& output_pipe() = 0;
40 };
41 // Base class for something that pops elements from a Pipe.
42 class SinkProc : public virtual Proc {
43 public:
44  virtual ~SinkProc() {}
45  // Return output Pipe
46  virtual Pipe& input_pipe() = 0;
47 };
48 
49 
50 // Base class for something that intermediates between two pipes
51 class FilterProc : public virtual SourceProc, public virtual SinkProc {
52 public:
53  virtual ~FilterProc() {}
54 };
55 
56 // A filter pops one element from input pipe and pushes it to output
57 // pipe each execution.
58 class ShuntProc : public FilterProc
59 {
60 public:
61  ShuntProc(Pipe& iq, Pipe& oq) :iq(iq), oq(oq) {}
62  virtual ~ShuntProc() {}
63 
64  virtual bool operator()() {
65  if (iq.empty()) { return false; }
66  oq.push(iq.front());
67  iq.pop();
68  return true;
69  }
70  virtual Pipe& input_pipe() { return iq; }
71  virtual Pipe& output_pipe() { return oq; }
72 
73 private:
74  Pipe& iq;
75  Pipe& oq;
76 };
77 
78 // A proc which pops an input element, feeds it to a node and pushes the result.
79 class FunctionNodeProc : public FilterProc {
80 public:
82  typedef std::shared_ptr<node_t> node_pointer_t;
83 
84  FunctionNodeProc(node_pointer_t node) : node(node) {}
85  virtual ~FunctionNodeProc() {}
86 
87  virtual Pipe& input_pipe() {
88  return iq;
89  }
90  virtual Pipe& output_pipe() {
91  return oq;
92  }
93 
94  virtual bool operator()() {
95  if (iq.empty()) { return false; }
96  boost::any anyout;
97  bool ok = (*node)(iq.front(), anyout);
98  if (!ok) return false;
99  iq.pop();
100  oq.push(anyout);
101  return true;
102  }
103 
104 private:
105  Pipe iq, oq;
106  node_pointer_t node;
107 
108 };
109 
110 
111 // A sink proc that pops and drops
112 class DropSinkProc : public SinkProc
113 {
114 public:
116  virtual ~DropSinkProc() {}
117 
118  virtual bool operator()() {
119  if (iq.empty()) { return false; }
120  iq.pop();
121  return true;
122  }
123  virtual Pipe& input_pipe() { return iq; }
124 
125 private:
126  Pipe iq;
127 };
128 
129 // A proc which produces one element using a node
130 class SourceNodeProc : public SourceProc {
131 public:
133  typedef std::shared_ptr<node_t> node_pointer_t;
134 
135  SourceNodeProc(node_pointer_t node) : node(node) {}
136  virtual ~SourceNodeProc() {}
137 
138  virtual Pipe& output_pipe() {
139  return oq;
140  }
141 
142  virtual bool operator()() {
143  boost::any anyout;
144  bool ok = (*node)(anyout);
145  if (!ok) return false;
146  oq.push(anyout);
147  return true;
148  }
149 
150 private:
151  Pipe oq;
152  node_pointer_t node;
153 };
154 
155 // A proc which pops gives next element a node and pops it if node does puke.
156 class SinkNodeProc : public SinkProc {
157 public:
158  typedef typename WireCell::ISinkNodeBase node_t;
159  typedef std::shared_ptr<node_t> node_pointer_t;
160 
161  SinkNodeProc(node_pointer_t node) : node(node) {}
162  virtual ~SinkNodeProc() {}
163 
164  virtual Pipe& input_pipe() {
165  return iq;
166  }
167 
168  virtual bool operator()() {
169  if (iq.empty()) { return false; }
170  bool ok = (*node)(iq.front());
171  if (!ok) return false;
172  iq.pop();
173  return true;
174  }
175 
176 private:
177  Pipe iq;
178  node_pointer_t node;
179 };
180 
181 typedef std::deque<boost::any> queuedany;
182 
183 // A proc which pops an input element, feeds it to a node, assumes the
184 // node is a queuedany, iterates over it and feeds individual elements
185 // to the output queue.
186 class QueuedNodeProc : public FilterProc {
187 public:
189  typedef std::shared_ptr<node_t> node_pointer_t;
190 
191  QueuedNodeProc(node_pointer_t node) : node(node) {}
192  virtual ~QueuedNodeProc() {}
193 
194  virtual Pipe& input_pipe() {
195  return iq;
196  }
197  virtual Pipe& output_pipe() {
198  return oq;
199  }
200 
201  virtual bool operator()() {
202  if (iq.empty()) { return false; }
203  queuedany anyq;
204  bool ok = (*node)(iq.front(), anyq);
205  if (!ok) return false;
206  iq.pop();
207  for (auto anyo : anyq) {
208  oq.push(anyo);
209  }
210  return true;
211  }
212 
213 private:
214  Pipe iq, oq;
215  node_pointer_t node;
216 };
217 
218 Proc* join(Pipeline& pipeline, Proc* src, Proc* dst)
219 {
220  pipeline.push_back(src);
221  auto link = new ShuntProc(dynamic_cast<SourceProc*>(src)->output_pipe(),
222  dynamic_cast<SinkProc*>(dst)->input_pipe());
223  pipeline.push_back(link);
224 
225  // do not push dst. return just for syntactic sugar
226  return dst;
227 }
228 
229 
230 
231 }
232 #endif
std::vector< Proc * > Pipeline
Definition: GenPipeline.h:32
node_pointer_t node
Definition: GenPipeline.h:178
SinkNodeProc(node_pointer_t node)
Definition: GenPipeline.h:161
virtual bool operator()()
Definition: GenPipeline.h:168
virtual Pipe & output_pipe()
Definition: GenPipeline.h:71
virtual bool operator()()=0
virtual Pipe & input_pipe()
Definition: GenPipeline.h:70
WireCell::ISourceNodeBase node_t
Definition: GenPipeline.h:132
virtual bool operator()()
Definition: GenPipeline.h:64
virtual bool operator()()
Definition: GenPipeline.h:201
virtual ~SinkProc()
Definition: GenPipeline.h:44
WireCell::IQueuedoutNodeBase node_t
Definition: GenPipeline.h:188
virtual Pipe & input_pipe()
Definition: GenPipeline.h:164
virtual Pipe & input_pipe()
Definition: GenPipeline.h:87
virtual bool operator()()
Definition: GenPipeline.h:94
WireCell::ISinkNodeBase node_t
Definition: GenPipeline.h:158
std::deque< boost::any > queuedany
Definition: GenPipeline.h:181
QueuedNodeProc(node_pointer_t node)
Definition: GenPipeline.h:191
virtual ~Proc()
Definition: GenPipeline.h:27
virtual bool operator()()
Definition: GenPipeline.h:142
Proc * join(Pipeline &pipeline, Proc *src, Proc *dst)
Definition: GenPipeline.h:218
SourceNodeProc(node_pointer_t node)
Definition: GenPipeline.h:135
virtual ~FilterProc()
Definition: GenPipeline.h:53
virtual ~ShuntProc()
Definition: GenPipeline.h:62
std::shared_ptr< node_t > node_pointer_t
Definition: GenPipeline.h:189
virtual Pipe & input_pipe()
Definition: GenPipeline.h:194
WireCell::IFunctionNodeBase node_t
Definition: GenPipeline.h:81
Definition: Main.h:22
virtual Pipe & output_pipe()
Definition: GenPipeline.h:90
std::shared_ptr< node_t > node_pointer_t
Definition: GenPipeline.h:82
virtual bool operator()()
Definition: GenPipeline.h:118
ShuntProc(Pipe &iq, Pipe &oq)
Definition: GenPipeline.h:61
virtual ~SourceProc()
Definition: GenPipeline.h:37
std::shared_ptr< node_t > node_pointer_t
Definition: GenPipeline.h:133
FunctionNodeProc(node_pointer_t node)
Definition: GenPipeline.h:84
std::queue< boost::any > Pipe
Definition: GenPipeline.h:22
virtual Pipe & output_pipe()
Definition: GenPipeline.h:197
virtual Pipe & output_pipe()
Definition: GenPipeline.h:138
virtual Pipe & input_pipe()
Definition: GenPipeline.h:123
std::shared_ptr< node_t > node_pointer_t
Definition: GenPipeline.h:159