Fourdee.cxx
Go to the documentation of this file.
1 #include "WireCellGen/Fourdee.h"
5 #include "WireCellUtil/ExecMon.h"
6 
8 
11 
12 using namespace std;
13 using namespace WireCell;
14 
15 
17  : m_depos(nullptr)
18  , m_drifter(nullptr)
19  , m_ductor(nullptr)
20  , m_dissonance(nullptr)
21  , m_digitizer(nullptr)
22  , m_output(nullptr)
23 {
24 }
25 
26 Gen::Fourdee::~Fourdee()
27 {
28 }
29 
31 {
33 
34  // the 4 d's and proof the developer can not count:
35  put(cfg, "DepoSource", "TrackDepos");
36  put(cfg, "DepoFilter", "");
37  put(cfg, "Drifter", "Drifter");
38  put(cfg, "Ductor", "Ductor");
39  put(cfg, "Dissonance", "SilentNoise");
40  put(cfg, "Digitizer", "Digitizer");
41  put(cfg, "Filter", "");
42  put(cfg, "FrameSink", "DumpFrames");
43 
44  return cfg;
45 }
46 
47 
49 {
50  std::string tn="";
51  Configuration cfg = thecfg;
52 
53  cerr << "Gen::Fourdee:configure:\n";
54 
55  tn = get<string>(cfg, "DepoSource");
56  cerr << "\tDepoSource: " << tn << endl;
57  m_depos = Factory::find_maybe_tn<IDepoSource>(tn);
58 
59  tn = get<string>(cfg, "DepoFilter");
60  if (tn.empty()) {
61  m_depofilter = nullptr;
62  cerr << "\tDepoFilter: none\n";
63  }
64  else {
65  cerr << "\tDepoFilter: " << tn << endl;
66  m_depofilter = Factory::find_maybe_tn<IDepoFilter>(tn);
67  }
68 
69  tn = get<string>(cfg, "Drifter");
70  cerr << "\tDrifter: " << tn << endl;
71  m_drifter = Factory::find_maybe_tn<IDrifter>(tn);
72 
73  tn = get<string>(cfg, "Ductor");
74  cerr << "\tDuctor: " << tn << endl;
75  m_ductor = Factory::find_maybe_tn<IDuctor>(tn);
76 
77 
78  tn = get<string>(cfg, "Dissonance","");
79  if (tn.empty()) { // noise is optional
80  m_dissonance = nullptr;
81  cerr << "\tDissonance: none\n";
82  }
83  else {
84  m_dissonance = Factory::find_maybe_tn<IFrameSource>(tn);
85  cerr << "\tDissonance: " << tn << endl;
86  }
87 
88  tn = get<string>(cfg, "Digitizer","");
89  if (tn.empty()) { // digitizer is optional, voltage saved w.o. it.
90  m_digitizer = nullptr;
91  cerr << "\tDigitizer: none\n";
92  }
93  else {
94  m_digitizer = Factory::find_maybe_tn<IFrameFilter>(tn);
95  cerr << "\tDigitizer: " << tn << endl;
96  }
97 
98  tn = get<string>(cfg, "Filter","");
99  if (tn.empty()) { // filter is optional
100  m_filter = nullptr;
101  cerr << "\tFilter: none\n";
102  }
103  else {
104  m_filter = Factory::find_maybe_tn<IFrameFilter>(tn);
105  cerr << "\tFilter: " << tn << endl;
106  }
107 
108  tn = get<string>(cfg, "FrameSink","");
109  if (tn.empty()) { // sink is optional
110  m_output = nullptr;
111  cerr << "\tSink: none\n";
112  }
113  else {
114  m_output = Factory::find_maybe_tn<IFrameSink>(tn);
115  cerr << "\tSink: " << tn << endl;
116  }
117 }
118 
119 
120 void dump(const IFrame::pointer frame)
121 {
122  if (!frame) {
123  cerr << "Fourdee: dump: empty frame\n";
124  return;
125  }
126 
127  for (auto tag: frame->frame_tags()) {
128  const auto& tlist = frame->tagged_traces(tag);
129  cerr << "Fourdee: frame tag: " << tag << " with " << tlist.size() << " traces\n";
130  }
131  for (auto tag: frame->trace_tags()) {
132  const auto& tlist = frame->tagged_traces(tag);
133  cerr << "Fourdee: trace tag: " << tag << " with " << tlist.size() << " traces\n";
134  }
135 
136  auto traces = frame->traces();
137  const int ntraces = traces->size();
138 
139  if (ntraces <= 0) {
140  cerr << "Fourdee: dump: no traces\n";
141  return;
142  }
143 
144  std::vector<int> tbins, tlens;
145  for (auto trace : *traces) {
146  const int tbin = trace->tbin();
147  tbins.push_back(tbin);
148  tlens.push_back(tbin+trace->charge().size());
149  }
150 
151  int tmin = *(std::minmax_element(tbins.begin(), tbins.end()).first);
152  int tmax = *(std::minmax_element(tlens.begin(), tlens.end()).second);
153 
154  cerr << "frame: #" << frame->ident()
155  << " @" << frame->time()/units::ms
156  << "ms with " << ntraces << " traces, tbins in: "
157  << "[" << tmin << "," << tmax << "]"
158  << endl;
159 }
160 
161 template<typename DEPOS>
162 void dump(DEPOS& depos)
163 {
164  if (depos.empty() or (depos.size() == 1 and !depos[0])) {
165  std::cerr << "Fourdee::dump: empty depos set\n";
166  return;
167  }
168 
169  std::vector<double> t,x,y,z;
170  double qtot = 0.0;
171  double qorig = 0.0;
172 
173  for (auto depo : depos) {
174  if (!depo) {
175  cerr << "Gen::Fourdee: null depo" << endl;
176  break;
177  }
178  auto prior = depo->prior();
179  if (!prior) {
180  cerr << "Gen::Fourdee: null prior depo" << endl;
181  }
182  else {
183  qorig += prior->charge();
184  }
185  t.push_back(depo->time());
186  x.push_back(depo->pos().x());
187  y.push_back(depo->pos().y());
188  z.push_back(depo->pos().z());
189  qtot += depo->charge();
190  }
191 
192  auto tmm = std::minmax_element(t.begin(), t.end());
193  auto xmm = std::minmax_element(x.begin(), x.end());
194  auto ymm = std::minmax_element(y.begin(), y.end());
195  auto zmm = std::minmax_element(z.begin(), z.end());
196  const int ndepos = depos.size();
197 
198  std::cerr << "Gen::FourDee: drifted " << ndepos << ", extent:\n"
199  << "\tt in [ " << (*tmm.first)/units::us << "," << (*tmm.second)/units::us << "]us,\n"
200  << "\tx in [" << (*xmm.first)/units::mm << ","<<(*xmm.second)/units::mm<<"]mm,\n"
201  << "\ty in [" << (*ymm.first)/units::mm << ","<<(*ymm.second)/units::mm<<"]mm,\n"
202  << "\tz in [" << (*zmm.first)/units::mm << ","<<(*zmm.second)/units::mm<<"]mm,\n"
203  << "\tQtot=" << qtot/units::eplus << ", <Qtot>=" << qtot/ndepos/units::eplus << " "
204  << " Qorig=" << qorig/units::eplus
205  << ", <Qorig>=" << qorig/ndepos/units::eplus << " electrons" << std::endl;
206 
207 
208 }
209 
210 
211 // Implementation warning: this violates node and proc generality in
212 // order to coerce a join into a linear pipeline.
213 class NoiseAdderProc : public FilterProc {
214 public:
215  NoiseAdderProc(IFrameSource::pointer nn) : noise_node(nn) {}
216  virtual ~NoiseAdderProc() {}
217 
218  virtual Pipe& input_pipe() {
219  return iq;
220  }
221  virtual Pipe& output_pipe() {
222  return oq;
223  }
224 
225  virtual bool operator()() {
226  if (iq.empty()) { return false; }
227 
228  const IFrame::pointer iframe = boost::any_cast<const IFrame::pointer>(iq.front());
229  iq.pop();
230 
231  if (!iframe) { // eos
232  std::cerr << "NoiseAdderProc eos\n";
233  boost::any out = iframe;
234  oq.push(out);
235  return true;
236  }
237 
238  IFrame::pointer nframe;
239  bool ok = (*noise_node)(nframe);
240  if (!ok) return false;
241  nframe = Gen::sum(IFrame::vector{iframe,nframe}, iframe->ident());
242  boost::any anyout = nframe;
243  oq.push(anyout);
244  return true;
245  }
246 
247 private:
248  Pipe iq, oq;
250 
251 };
252 
253 
255 {
256  execute_new();
257  //execute_old();
258 }
259 
261 {
262  if (!m_depos) {
263  cerr << "Fourdee: no depos, can't do much" << endl;
264  return;
265  }
266 
267  if (!m_ductor and (m_digitizer or m_dissonance or m_digitizer or m_filter or m_output) ) {
268  std::cerr <<"Fourdee: a Ductor is required for subsequent pipeline stages\n";
269  return;
270  }
271 
272  Pipeline pipeline;
273  auto source = new SourceNodeProc(m_depos);
274  Proc* last_proc = source;
275 
276  if (m_drifter) { // depo in, depo out
277  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_drifter) << endl;
278  auto proc = new QueuedNodeProc(m_drifter);
279  last_proc = join(pipeline, last_proc, proc);
280  }
281  if (m_depofilter) { // depo in, depo out
282  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_depofilter) << endl;
283  auto proc = new FunctionNodeProc(m_depofilter);
284  last_proc = join(pipeline, last_proc, proc);
285  }
286 
287  if (m_ductor) { // depo in, zero or more frames out
288  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_ductor) << endl;
289  auto proc = new QueuedNodeProc(m_ductor);
290  last_proc = join(pipeline, last_proc, proc);
291  }
292 
293  if (m_dissonance) { // frame in, frame out
294  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_dissonance) << endl;
295  auto proc = new NoiseAdderProc(m_dissonance);
296  last_proc = join(pipeline, last_proc, proc);
297  }
298 
299  if (m_digitizer) { // frame in, frame out
300  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_digitizer) << endl;
301  auto proc = new FunctionNodeProc(m_digitizer);
302  last_proc = join(pipeline, last_proc, proc);
303  }
304 
305  if (m_filter) { // frame in, frame out
306  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_filter) << endl;
307  auto proc = new FunctionNodeProc(m_filter);
308  last_proc = join(pipeline, last_proc, proc);
309  }
310 
311  if (m_output) { // frame in, full stop.
312  cerr << "Pipeline adding #"<<pipeline.size()<<": " << type(*m_output) << endl;
313  auto proc = new SinkNodeProc(m_output);
314  last_proc = join(pipeline, last_proc, proc);
315  }
316  else {
317  cerr << "Pipeline adding #"<<pipeline.size()<<": sink\n";
318  auto sink = new DropSinkProc;
319  last_proc = join(pipeline, last_proc, sink);
320  }
321 
322  // truly last proc needs to be added by hand.
323  pipeline.push_back(last_proc);
324 
325 
326 
327  // A "drain end first" strategy gives attention to draining
328  // queues the more toward the the end of the pipeline they are.
329  // This keeps the overall pipeline empty and leads to "pulse" type
330  // data movement. It's stupid for multiprocessing but in a single
331  // thread will keep memory usage low.
332  size_t pipelen = pipeline.size();
333  while (true) {
334  bool did_something = false;
335  for (size_t ind = pipelen-1; ind > 0; --ind) { // source has no input
336  SinkProc* proc = dynamic_cast<SinkProc*>(pipeline[ind]);
337  if (proc->input_pipe().empty()) {
338  continue;
339  }
340  bool ok = (*proc)();
341  if (!ok) {
342  std::cerr << "Pipeline failed\n";
343  return;
344  }
345  //std::cerr << "Executed process " << ind << std::endl;
346  did_something = true;
347  break;
348  }
349  if (!did_something) {
350  bool ok = (*pipeline[0])();
351  if (!ok) {
352  std::cerr << "Source empty\n";
353  return;
354  }
355  //std::cerr << "Executed source\n";
356  }
357  // otherwise, go through pipeline again
358  }
359 
360 }
362 {
363  if (!m_depos) cerr << "Fourdee: no depos" << endl;
364  if (!m_drifter) cerr << "Fourdee: no drifter" << endl;
365  if (!m_ductor) cerr << "Fourdee: no ductor" << endl;
366  if (!m_dissonance) cerr << "Fourdee: no dissonance" << endl;
367  if (!m_digitizer) cerr << "Fourdee: no digitizer" << endl;
368  if (!m_filter) cerr << "Fourdee: no filter" << endl;
369  if (!m_output) cerr << "Fourdee: no sink" << endl;
370 
371  // here we make a manual pipeline. In a "real" app this might be
372  // a DFP executed by TBB.
373  int count=0;
374  int ndepos = 0;
375  int ndrifted = 0;
376  ExecMon em;
377  cerr << "Gen::Fourdee: starting\n";
378  while (true) {
379  ++count;
380 
381  IDepo::pointer depo;
382  if (!(*m_depos)(depo)) {
383  cerr << "DepoSource is empty\n";
384  }
385  if (!depo) {
386  cerr << "Got null depo from source at loop iteration " << count << endl;
387  }
388  else {
389  ++ndepos;
390  }
391  //cerr << "Gen::FourDee: seen " << ndepos << " depos\n";
392 
393  IDrifter::output_queue drifted;
394  if (!(*m_drifter)(depo, drifted)) {
395  cerr << "Stopping on " << type(*m_drifter) << endl;
396  goto bail;
397  }
398  if (drifted.empty()) {
399  continue;
400  }
401  if (m_depofilter) {
402  IDrifter::output_queue fdrifted;
403  for (auto drifted_depo : drifted) {
404  IDepo::pointer fdepo;
405  if ((*m_depofilter)(drifted_depo, fdepo)) {
406  fdrifted.push_back(fdepo);
407  }
408  }
409  drifted = fdrifted;
410  }
411 
412  ndrifted += drifted.size();
413  cerr << "Gen::FourDee: seen " << ndrifted << " drifted\n";
414  dump(drifted);
415 
416  for (auto drifted_depo : drifted) {
417  IDuctor::output_queue frames;
418  if (!(*m_ductor)(drifted_depo, frames)) {
419  cerr << "Stopping on " << type(*m_ductor) << endl;
420  goto bail;
421  }
422  if (frames.empty()) {
423  continue;
424  }
425  cerr << "Gen::FourDee: got " << frames.size() << " frames\n";
426 
427  for (IFrameFilter::input_pointer voltframe : frames) {
428  em("got frame");
429  cerr << "voltframe: ";
430  dump(voltframe);
431 
432  if (m_dissonance) {
433  cerr << "Gen::FourDee: including noise\n";
434  IFrame::pointer noise;
435  if (!(*m_dissonance)(noise)) {
436  cerr << "Stopping on " << type(*m_dissonance) << endl;
437  goto bail;
438  }
439  if (noise) {
440  cerr << "noiseframe: ";
441  dump(noise);
442  voltframe = Gen::sum(IFrame::vector{voltframe,noise}, voltframe->ident());
443  em("got noise");
444  }
445  else {
446  cerr << "Noise source is empty\n";
447  }
448  }
449 
451  if (m_digitizer) {
452  if (!(*m_digitizer)(voltframe, adcframe)) {
453  cerr << "Stopping on " << type(*m_digitizer) << endl;
454  goto bail;
455  }
456  em("digitized");
457  cerr << "digiframe: ";
458  dump(adcframe);
459  }
460  else {
461  adcframe = voltframe;
462  }
463 
464  if (m_filter) {
466  if (!(*m_filter)(adcframe, filtframe)) {
467  cerr << "Stopping on " << type(*m_filter) << endl;
468  goto bail;
469  }
470  adcframe = filtframe;
471  em("filtered");
472  }
473 
474  if (m_output) {
475  if (!(*m_output)(adcframe)) {
476  cerr << "Stopping on " << type(*m_output) << endl;
477  goto bail;
478  }
479  em("output");
480  }
481  cerr << "Gen::Fourdee: frame with " << adcframe->traces()->size() << " traces\n";
482  }
483  }
484  }
485  bail: // what's this weird syntax? What is this, BASIC?
486  cerr << em.summary() << endl;
487 }
488 
std::vector< Proc * > Pipeline
Definition: GenPipeline.h:32
WireCell::IDepoSource::pointer m_depos
Definition: Fourdee.h:49
std::shared_ptr< const IFrame > pointer
Definition: IData.h:19
const CharType(& source)[N]
Definition: pointer.h:1147
static const double eplus
Definition: Units.h:110
virtual Pipe & input_pipe()
Definition: Fourdee.cxx:218
virtual Pipe & input_pipe()=0
std::string string
Definition: nybbler.cc:12
STL namespace.
void put(Configuration &cfg, const std::string &dotpath, const T &val)
Put value in configuration at the dotted path.
cfg
Definition: dbjson.py:29
std::vector< pointer > vector
Definition: IData.h:21
virtual ~NoiseAdderProc()
Definition: Fourdee.cxx:216
static const double ms
Definition: Units.h:104
WireCell::IDepoFilter::pointer m_depofilter
Definition: Fourdee.h:50
std::deque< output_pointer > output_queue
virtual bool operator()()
Definition: Fourdee.cxx:225
Binning tbins(nticks, t0, t0+readout_time)
double y
std::shared_ptr< const IFrame > input_pointer
Definition: IFunctionNode.h:39
virtual void execute_new()
Definition: Fourdee.cxx:260
WireCell::IDuctor::pointer m_ductor
Definition: Fourdee.h:52
virtual void execute()
Implement to run something.
Definition: Fourdee.cxx:254
const int ndepos
Proc * join(Pipeline &pipeline, Proc *src, Proc *dst)
Definition: GenPipeline.h:218
double z
virtual void execute_old()
Definition: Fourdee.cxx:361
WireCell::IFrameSource::pointer m_dissonance
Definition: Fourdee.h:53
virtual Pipe & output_pipe()
Definition: Fourdee.cxx:221
std::shared_ptr< IFrameSource > pointer
Definition: IFrameSource.h:15
virtual void configure(const WireCell::Configuration &config)
Accept a configuration.
Definition: Fourdee.cxx:48
NoiseAdderProc(IFrameSource::pointer nn)
Definition: Fourdee.cxx:215
std::shared_ptr< const IFrame > output_pointer
Definition: IFunctionNode.h:40
static const double mm
Definition: Units.h:55
IFrame::pointer sum(std::vector< IFrame::pointer > frames, int ident)
Definition: FrameUtil.cxx:15
Definition: Main.h:22
IFrameSource::pointer noise_node
Definition: Fourdee.cxx:249
WireCell::IFrameSink::pointer m_output
Definition: Fourdee.h:56
virtual WireCell::Configuration default_configuration() const
Optional, override to return a hard-coded default configuration.
Definition: Fourdee.cxx:30
std::string type(const T &t)
Definition: Type.h:20
Json::Value Configuration
Definition: Configuration.h:50
WireCell::IDrifter::pointer m_drifter
Definition: Fourdee.h:51
static const double us
Definition: Units.h:105
std::queue< boost::any > Pipe
Definition: GenPipeline.h:22
list x
Definition: train.py:276
WireCell::IFrameFilter::pointer m_filter
Definition: Fourdee.h:55
WIRECELL_FACTORY(FourDee, WireCell::Gen::Fourdee, WireCell::IApplication, WireCell::IConfigurable) using namespace std
second_as<> second
Type of time stored in seconds, in double precision.
Definition: spacetime.h:80
void dump(const IFrame::pointer frame)
Definition: Fourdee.cxx:120
std::string summary() const
Return summary up to now.
Definition: ExecMon.cxx:21
WireCell::IFrameFilter::pointer m_digitizer
Definition: Fourdee.h:54
QTextStream & endl(QTextStream &s)