DataFlowGraph.cxx
Go to the documentation of this file.
1 
2 
4 
5 #include "WireCellUtil/Type.h"
7 
8 #include <iostream>
9 
11 
12 using namespace std;
13 using namespace WireCell;
14 using namespace WireCellTbb;
15 
16 
17 
18 
// Builds the scheduler handle, the flow graph and the node-wrapper factory.
// max_threads: a positive value is passed to m_sched unchanged; zero or a
// negative value selects tbb::task_scheduler_init::automatic instead.
19 DataFlowGraph::DataFlowGraph(int max_threads)
20  : m_sched(max_threads > 0 ? max_threads : tbb::task_scheduler_init::automatic)
21  , m_graph()
22  , m_factory(m_graph)  // the factory is constructed from this object's graph
23 {
24 }
25 
27 {
// Empty body: no statements execute here.
28 }
29 
31 {
// Default configuration: "max_threads" is set to 1, then cfg is returned.
33  cfg["max_threads"] = 1;
34  return cfg;
35 }
36 
37 
39 {
// Currently a no-op: this body has no executable statements, so nothing is
// applied. The disabled line below would read "max_threads" from cfg.
40  // int maxthreads = get<int>(cfg,"max_threads");
41  // fixme: now what?
42 }
43 
44 
46  size_t sport, size_t rport)
47 {
// Connects sender port `sport` of the tail node to receiver port `rport` of
// the head node with make_edge(). Returns true once the edge is made. On any
// failure (no wrapper for a node, port index out of range, null port pointer)
// a "DFP: ..." diagnostic is written to cerr and false is returned.
48  using namespace WireCellTbb;
49 
50  Node mytail = m_factory(tail);
51  if (!mytail) {
52  cerr << "DFP: failed to get tail node wrapper for "
53  << demangle(tail->signature()) << endl;
54  return false;
55  }
56 
57  Node myhead = m_factory(head);
58  if (!myhead) {
59  cerr << "DFP: failed to get head node wrapper for "
60  << demangle(head->signature()) << endl;
61  return false;
62  }
63 
64  auto sports = mytail->sender_ports();
// sport is a size_t, so only the upper bound can be violated.
65  if (sports.size() <= sport) {
66  cerr << "DFP: bad sender port number: " << sport << endl;
67  return false;
68  }
69 
70  auto rports = myhead->receiver_ports();
// rport is a size_t as well; same upper-bound-only check.
71  if (rports.size() <= rport) {
72  cerr << "DFP: bad receiver port number: " << rport << endl;
73  return false;
74  }
75 
76 
77  sender_type* s = sports[sport];
78  if (!s) {
79  cerr << "DFP: failed to get sender port " << sport << endl;
80  return false;
81  }
82 
83  receiver_type* r = rports[rport];
// Test the receiver itself: a null r must be rejected before it is
// dereferenced in the make_edge() call below.
84  if (!r) {
85  cerr << "DFP: failed to get receiver port " << rport << endl;
86  return false;
87  }
88 
89 
90  //cerr << "Connecting " << s << " and " << r << endl;
91  make_edge(*s, *r);
92  return true;
93 }
94 
95 
97 {
// Calls initialize() on every (node, wrapper) entry the factory has seen,
// writing each node's demangled signature to cerr first, and then calls
// m_graph.wait_for_all(). Always returns true; there is no error path here.
// Entries are bound by reference so the pair is not copied on each pass.
98  for (auto& it : m_factory.seen()) {
99  cerr << "Initialize node of type: " << demangle(it.first->signature()) << endl;
100  it.second->initialize();
101  }
102  m_graph.wait_for_all();
103  return true;
104 }
105 
std::string demangle(T const *=nullptr)
Outputs a demangled name for type T.
Definition: DebugUtils.h:348
virtual bool run()
Run the graph, return false on error.
tbb::flow::graph m_graph
Definition: DataFlowGraph.h:35
virtual bool connect(WireCell::INode::pointer tail, WireCell::INode::pointer head, size_t tail_port=0, size_t head_port=0)
STL namespace.
cfg
Definition: dbjson.py:29
std::shared_ptr< NodeWrapper > Node
Definition: NodeWrapper.h:51
WIRECELL_FACTORY(TbbDataFlowGraph, WireCellTbb::DataFlowGraph, WireCell::IDataFlowGraph, WireCell::IConfigurable)
std::shared_ptr< Interface > pointer
Definition: Interface.h:16
virtual WireCell::Configuration default_configuration() const
Optional, override to return a hard-coded default configuration.
tbb::flow::sender< boost::any > sender_type
Definition: NodeWrapper.h:15
Definition: Main.h:22
virtual void configure(const WireCell::Configuration &config)
Accept a configuration.
WCNode2Wrapper & seen()
Json::Value Configuration
Definition: Configuration.h:50
static QCString * s
Definition: config.cpp:1042
QTextStream & endl(QTextStream &s)
tbb::flow::receiver< boost::any > receiver_type
Definition: NodeWrapper.h:16