Omnibus.cxx
Go to the documentation of this file.
2 
5 // #include "WireCellUtil/ExecMon.h"
6 
7 #include "FrameUtils.h"
9 
10 #include <iostream>
11 
14 
15 using namespace WireCell;
16 
17 SigProc::Omnibus::Omnibus()
18 {
19 }
20 
21 SigProc::Omnibus::~Omnibus()
22 {
23 }
24 
25 
26 WireCell::Configuration SigProc::Omnibus::default_configuration() const
27 {
29  cfg["source"] = ""; // required
30  cfg["filters"] = Json::arrayValue;
31  cfg["sink"] = ""; // optional
32  return cfg;
33 }
34 
36 {
37  m_input_tn = cfg["source"].asString(); // keep for printouts later
38  m_input = Factory::find_tn<IFrameSource>(m_input_tn);
39 
40  m_output_tn = cfg["sink"].asString(); // keep for printouts later
41  if (m_output_tn.empty()) {
42  std::cerr << "Omnibus has no final frame sink component.\n";
43  m_output = nullptr;
44  }
45  else {
46  m_output = Factory::find_tn<IFrameSink>(m_output_tn);
47  }
48 
49  m_filters.clear();
50  auto jffl = cfg["filters"];
51  for (auto jff : jffl) {
52  std::string fftn = jff.asString();
53  std::cerr << "Omnibus adding frame filter: \"" << fftn << "\"\n";
54  auto ff = Factory::find_tn<IFrameFilter>(fftn);
55  m_filters.push_back(ff);
56  m_filters_tn.push_back(fftn);
57  }
58 }
59 
60 
61 void SigProc::Omnibus::execute()
62 {
63  // ExecMon em("omnibus starts");
64 
65  IFrame::pointer frame;
66  if (!(*m_input)(frame)) {
67  std::cerr << "Omnibus: failed to get input frame from " << m_input_tn << "\n";
68  THROW(RuntimeError() << errmsg{"Omnibus: failed to get input frame"});
69  }
70  if (!frame) {
71  std::cerr << "Omnibus: got null frame, forwarding, assuming we have reached EOS\n";
72  }
73  else {
74  if (!frame->traces()->size()) {
75  std::cerr << "Omnibus: got empty input frame, something is busted\n";
76  THROW(RuntimeError() << errmsg{"Omnibus: got empty input frame, something is busted"});
77  }
78  else {
79  std::cerr << "Omnibus: got input frame from "<<m_input_tn<<" with " << frame->traces()->size() << " traces\n";
80  dump_frame(frame);
81  }
82  }
83 
84  //em("sourced frame");
85 
86  int count = 0;
87  for (auto ff : m_filters) {
88  std::string tn = m_filters_tn[count++];
89  IFrame::pointer nextframe;
90  if (!(*ff)(frame, nextframe)) {
91  std::cerr << "Failed to filter frame from "<<tn<<"\n"; // fixme, give more info
92  THROW(RuntimeError() << errmsg{"failed to filter frame"});
93  }
94  if (!nextframe && !frame) {
95  continue; // processing EOS
96  }
97  if (!nextframe) {
98  std::cerr << "Omnibus: filter "<<tn<<" returned a null frame\n";
99  THROW(RuntimeError() << errmsg{"filter returned a null frame"});
100  }
101  //em("filtered frame from " + tn);
102 
103  frame = nextframe;
104  nextframe = nullptr;
105  //em("dropped input to " + tn);
106 
107  if (frame) {
108  std::cerr << "Omnibus: got frame from "<<tn<<" with " << frame->traces()->size() << " traces\n";
109  dump_frame(frame);
110  }
111  }
112 
113  if (m_output) {
114  if (!(*m_output)(frame)) {
115  std::cerr << "Omnibus: failed to send output frame to "<<m_output_tn<<"\n";
116  THROW(RuntimeError() << errmsg{"failed to send output frame"});
117  }
118  }
119  //em("sunk frame");
120 
121  frame = nullptr;
122  //em("dropped output frame");
123 
124  //std::cerr << em.summary() << std::endl;
125 }
126 
127 // Local Variables:
128 // mode: c++
129 // c-basic-offset: 4
130 // End:
std::shared_ptr< const IFrame > pointer
Definition: IData.h:19
std::string string
Definition: nybbler.cc:12
boost::error_info< struct tag_errmsg, std::string > errmsg
Definition: Exceptions.h:54
void dump_frame(WireCell::IFrame::pointer frame)
Definition: FrameUtils.cxx:10
cfg
Definition: dbjson.py:29
def configure(cfg)
Definition: cuda.py:34
#define THROW(e)
Definition: Exceptions.h:25
Definition: Main.h:22
WIRECELL_FACTORY(Omnibus, WireCell::SigProc::Omnibus, WireCell::IApplication, WireCell::IConfigurable) using namespace WireCell
Json::Value Configuration
Definition: Configuration.h:50
Thrown when an error occurs during the data processing.
Definition: Exceptions.h:49