Wrappers.h
Go to the documentation of this file.
1 #ifndef WIRECELL_PGRAPH_WRAPPERS
2 #define WIRECELL_PGRAPH_WRAPPERS
3 
4 #include "WireCellPgraph/Graph.h"
5 
6 // fixme: this is a rather monolithic file that should be broken out
7 // into its own package. It needs to depend on util and iface but NOT
8 // gen.
18 
19 #include "WireCellUtil/Type.h"
20 
21 #include <map>
22 #include <iostream> // debug
23 #include <sstream>
24 
25 namespace WireCell { namespace Pgraph {
26 
27  // Node wrappers are constructed with just an INode::pointer
28  // and adapt it to Pgraph::Node. They operate at the
29  // boost::any level and the I*BaseNode INode level. They are
30  // not meant to be constructed directly but through the
31  // type-erasing Pgraph::Factory. Their operator() must return
32  // false if their underlying node can not be called or if that
33  // node returns false meaning no change of data.
34 
35  // Base class taking care of constructing ports and providing
36  // ident().
37  class PortedNode : public Pgraph::Node
38  {
39  public:
40  PortedNode(INode::pointer wcnode) : m_wcnode(wcnode) {
41  if (! m_wcnode.get()) {
42  THROW(ValueError() << errmsg{"Pgraph::PortedNode got null INode"});
43  }
44 
45  using Pgraph::Port;
46  for (auto sig : wcnode->input_types()) {
47  m_ports[Port::input].push_back(
48  Pgraph::Port(this, Pgraph::Port::input, sig));
49  }
50  for (auto sig : wcnode->output_types()) {
51  m_ports[Port::output].push_back(
52  Pgraph::Port(this, Pgraph::Port::output, sig));
53  }
54  }
55 
56  virtual std::string ident() {
57  std::stringstream ss;
58  ss << "<Node "
59  << " type:" << WireCell::type(*(m_wcnode.get()))
60  << " cat:" << m_wcnode->category()
61  << " sig:" << demangle(m_wcnode->signature());
62  ss << " inputs:[";
63  for (auto t : m_wcnode->input_types()) {
64  ss << " " << demangle(t);
65  }
66  ss << " ]";
67  ss << " outputs:[";
68  for (auto t : m_wcnode->output_types()) {
69  ss << " " << demangle(t);
70  }
71  ss << " ]";
72  return ss.str();
73  }
74 
75  private:
77  };
78 
79 
80  class Source : public PortedNode {
82  bool m_ok;
83  public:
84  Source(INode::pointer wcnode) : PortedNode(wcnode), m_ok(true) {
85  m_wcnode = std::dynamic_pointer_cast<ISourceNodeBase>(wcnode);
86  }
87  virtual ~Source() {}
88 
89  virtual bool operator()() {
90  Port& op = oport();
91  if (op.size()) {
92  return false; // don't call me if I've got existing output waiting
93  }
94 
95  boost::any obj;
96  m_ok = (*m_wcnode)(obj);
97  if (!m_ok) {
98  return false;
99  }
100  oport().put(obj);
101  return true;
102  }
103  };
104 
105  class Sink : public PortedNode {
107  public:
108  Sink(INode::pointer wcnode) : PortedNode(wcnode) {
109  m_wcnode = std::dynamic_pointer_cast<ISinkNodeBase>(wcnode);
110  }
111  virtual ~Sink() {}
112  virtual bool operator()() {
113  Port& ip = iport();
114  if (ip.empty()) {
115  return false; // don't call me if there is nothing to give me.
116  }
117  auto obj = ip.get();
118  bool ok = (*m_wcnode)(obj);
119  //std::cerr << "Sink returns: " << ok << std::endl;
120  return ok;
121  }
122  };
123 
124  class Function : public PortedNode {
126  public:
127  Function(INode::pointer wcnode) : PortedNode(wcnode) {
128  m_wcnode = std::dynamic_pointer_cast<IFunctionNodeBase>(wcnode);
129  }
130  virtual ~Function() {}
131  virtual bool operator()() {
132  Port& op = oport();
133  if (op.size()) {
134  return false; // don't call me if I've got existing output waiting
135  }
136  Port& ip = iport();
137  if (ip.empty()) {
138  return false; // don't call me if there is nothing to give me.
139  }
140  boost::any out;
141  auto in = ip.get();
142  bool ok = (*m_wcnode)(in, out);
143  if (!ok) {
144  return false;
145  }
146  op.put(out);
147  return true;
148  }
149  };
150 
151  class Queuedout : public PortedNode {
153  public:
154  Queuedout(INode::pointer wcnode) : PortedNode(wcnode) {
155  m_wcnode = std::dynamic_pointer_cast<IQueuedoutNodeBase>(wcnode);
156  }
157  virtual ~Queuedout() {}
158  virtual bool operator()() {
159  Port& ip = iport();
160  if (ip.empty()) { return false; }
162  auto in = ip.get();
163  bool ok = (*m_wcnode)(in, outv);
164  if (!ok) return false;
165  for (auto out : outv) {
166  oport().put(out);
167  }
168  return true;
169  }
170  };
171 
172  template<class INodeBaseType>
173  class JoinFanin : public PortedNode {
174  public:
175  typedef INodeBaseType inode_type;
177  typedef typename INodeBaseType::pointer pointer;
178 
179  JoinFanin(INode::pointer wcnode) : PortedNode(wcnode) {
180  m_wcnode = std::dynamic_pointer_cast<INodeBaseType>(wcnode);
181  }
182  virtual ~JoinFanin() {}
183  virtual bool operator()() {
184  Port& op = oport();
185  if (!op.empty()) {
186  return false; // don't call me if I've got existing output waiting
187  }
188 
189  auto& iports = input_ports();
190  size_t nin = iports.size();
191  for (size_t ind=0; ind<nin; ++ind) {
192  if (iports[ind].empty()) {
193  return false;
194  }
195  }
196  any_vector inv(nin);
197  for (size_t ind=0; ind<nin; ++ind) {
198  inv[ind] = iports[ind].get();
199  }
200  boost::any out;
201  bool ok = (*m_wcnode)(inv, out);
202  if (!ok) {
203  return false;
204  }
205  op.put(out);
206  return true;
207  }
208  private:
209  pointer m_wcnode;
210  };
213 
214 
215  template<class INodeBaseType>
216  class SplitFanout : public PortedNode {
217  public:
218  typedef INodeBaseType inode_type;
220  typedef typename INodeBaseType::pointer pointer;
221 
223  m_wcnode = std::dynamic_pointer_cast<inode_type>(wcnode);
224  }
225  virtual ~SplitFanout() {}
226  virtual bool operator()() {
227 
228  Port& ip = iport();
229  if (ip.empty()) {
230  return false; // don't call me if there is not any new input
231  }
232 
233  auto& oports = output_ports();
234  size_t nout = oports.size();
235 
236  bool full = true;
237  for (size_t ind=0; ind<nout; ++ind) {
238  if (oports[ind].empty()) {
239  full = false;
240  }
241  }
242  if (full) {
243  return false; // don't call me if all my output has something
244  }
245 
246  auto in = ip.get();
247 
248  any_vector outv(nout);
249  bool ok = (*m_wcnode)(in, outv);
250  if (!ok) {
251  return false;
252  }
253  //std::cerr << "SplitFanout: " << nout << " " << outv.size() << std::endl;
254  for (size_t ind=0; ind<nout; ++ind) {
255  oports[ind].put(outv[ind]);
256  }
257  return true;
258  }
259  private:
260  pointer m_wcnode;
261 
262  };
265 
266 
267  // N-to-1 of the same type with synchronization on input.
268  // class Fanin : public PortedNode {
269  // IFaninNodeBase::pointer m_wcnode;
270  // std::vector<bool> m_eos;
271  // public:
272  // Fanin(INode::pointer wcnode) : PortedNode(wcnode) {
273  // m_wcnode = std::dynamic_pointer_cast<IFaninNodeBase>(wcnode);
274  // }
275  // virtual ~Fanin() {}
276 
277  // };
278 
279  class Hydra : public PortedNode {
281  public:
282  Hydra(INode::pointer wcnode) : PortedNode(wcnode) {
283  m_wcnode = std::dynamic_pointer_cast<IHydraNodeBase>(wcnode);
284  }
285  virtual ~Hydra() { }
286 
287  virtual bool operator()() {
288  auto& iports = input_ports();
289  size_t nin = iports.size();
290 
291  // 0) Hydra needs all input ports full to be ready.
292  // For EOS, the concrete INode better retain the
293  // terminating nullptr in its input stream.
294  for (size_t ind=0; ind < nin; ++ind) {
295  if (iports[ind].empty()) {
296  return false;
297  }
298  }
299 
300  // 1) fill input any queue vector
302  for (size_t ind=0; ind < nin; ++ind) {
303  Edge edge = iports[ind].edge();
304  if (!edge) {
305  std::cerr << "Hydra: got broken edge\n";
306  continue;
307  }
308  if (edge->empty()) {
309  continue;
310  }
311  inqv[ind].insert(inqv[ind].begin(), edge->begin(), edge->end());
312  }
313 
314  auto& oports = output_ports();
315  size_t nout = oports.size();
316 
317  // 2) create output any queue vector
319 
320  // 3) call
321  bool ok = (*m_wcnode)(inqv, outqv);
322  if (!ok) { return false; } // fixme: this probably
323  // needs to reflect into
324  // ready().
325 
326  // 4) pop dfp input queues to match. BIG FAT
327  // WARNING: this trimming assumes calller only
328  // pop_front's. Really should hunt for which ones
329  // have been removed.
330  for (size_t ind=0; ind < nin; ++ind) {
331  size_t want = inqv[ind].size();
332  while (iports[ind].size() > want) {
333  iports[ind].get();
334  }
335  }
336 
337  // 5) send out output any queue vectors
338  for (size_t ind=0; ind < nout; ++ind) {
339  Edge edge = oports[ind].edge();
340  edge->insert(edge->end(), outqv[ind].begin(), outqv[ind].end());
341  }
342 
343  return true;
344  }
345  };
346 
347 
348  }}
349 
350 #endif
std::shared_ptr< IHydraNodeBase > pointer
Definition: IHydraNode.h:26
IFunctionNodeBase::pointer m_wcnode
Definition: Wrappers.h:125
PortList & input_ports()
Definition: Node.h:29
Queuedout(INode::pointer wcnode)
Definition: Wrappers.h:154
ISinkNodeBase::pointer m_wcnode
Definition: Wrappers.h:106
INodeBaseType inode_type
Definition: Wrappers.h:175
std::string string
Definition: nybbler.cc:12
boost::error_info< struct tag_errmsg, std::string > errmsg
Definition: Exceptions.h:54
JoinFanin< IFaninNodeBase > Fanin
Definition: Wrappers.h:212
std::shared_ptr< IFunctionNodeBase > pointer
Definition: IFunctionNode.h:16
Source(INode::pointer wcnode)
Definition: Wrappers.h:84
IHydraNodeBase::pointer m_wcnode
Definition: Wrappers.h:280
std::shared_ptr< Queue > Edge
Definition: Port.h:27
INodeBaseType::pointer pointer
Definition: Wrappers.h:177
Hydra(INode::pointer wcnode)
Definition: Wrappers.h:282
virtual bool operator()()
Definition: Wrappers.h:183
INodeBaseType::any_vector any_vector
Definition: Wrappers.h:176
virtual bool operator()()
Definition: Wrappers.h:226
virtual bool operator()()
Definition: Wrappers.h:158
std::pair< int, int > edge(const realseq_t &wave)
Definition: Waveform.cxx:121
INodeBaseType::any_vector any_vector
Definition: Wrappers.h:219
ISourceNodeBase::pointer m_wcnode
Definition: Wrappers.h:81
SplitFanout< ISplitNodeBase > Split
Definition: Wrappers.h:263
std::deque< boost::any > queuedany
decltype(auto) constexpr size(T &&obj)
ADL-aware version of std::size.
Definition: StdUtils.h:87
Data get(bool pop=true)
Definition: Port.cxx:43
std::shared_ptr< IQueuedoutNodeBase > pointer
virtual bool operator()()
Definition: Wrappers.h:131
SplitFanout< IFanoutNodeBase > Fanout
Definition: Wrappers.h:264
virtual bool operator()()
Definition: Wrappers.h:112
std::string demangle(const std::string &name)
Definition: Type.cxx:6
std::shared_ptr< Interface > pointer
Definition: Interface.h:16
Port & iport(size_t ind=0)
Definition: Node.h:22
IQueuedoutNodeBase::pointer m_wcnode
Definition: Wrappers.h:152
#define THROW(e)
Definition: Exceptions.h:25
virtual bool operator()()
Definition: Wrappers.h:89
SplitFanout(INode::pointer wcnode)
Definition: Wrappers.h:222
JoinFanin(INode::pointer wcnode)
Definition: Wrappers.h:179
Thrown when a wrong value has been encountered.
Definition: Exceptions.h:37
std::vector< boost::any > any_vector
Definition: NodeWrapper.h:21
std::vector< any_queue > any_queue_vector
Definition: IHydraNode.h:31
Definition: Main.h:22
Function(INode::pointer wcnode)
Definition: Wrappers.h:127
std::shared_ptr< ISinkNodeBase > pointer
Definition: ISinkNode.h:16
PortedNode(INode::pointer wcnode)
Definition: Wrappers.h:40
PortList m_ports[Port::ntypes]
Definition: Node.h:68
std::string type(const T &t)
Definition: Type.h:20
Sink(INode::pointer wcnode)
Definition: Wrappers.h:108
virtual bool operator()()
Definition: Wrappers.h:287
std::shared_ptr< ISourceNodeBase > pointer
Definition: ISourceNode.h:16
virtual std::string ident()
Definition: Wrappers.h:56
decltype(auto) constexpr begin(T &&obj)
ADL-aware version of std::begin.
Definition: StdUtils.h:67
INode::pointer m_wcnode
Definition: Wrappers.h:76
JoinFanin< IJoinNodeBase > Join
Definition: Wrappers.h:211
void put(Data &data)
Definition: Port.cxx:62
const GenericPointer< typename T::ValueType > & pointer
Definition: pointer.h:1124
INodeBaseType::pointer pointer
Definition: Wrappers.h:220
Port & oport(size_t ind=0)
Definition: Node.h:25
decltype(auto) constexpr empty(T &&obj)
ADL-aware version of std::empty.
Definition: StdUtils.h:92
PortList & output_ports()
Definition: Node.h:32