EventDecoder_service.cc
Go to the documentation of this file.
1 //////////////////////////////////////////////////////////////////////////////////////
2 //
3 // Decoder for the DAQ events for charge readout
4 // The general idea is to try to make this object thread-safe when possible
5 //
6 // TODO: test compression
7 //
8 //////////////////////////////////////////////////////////////////////////////////////
9 
10 //#include <iostream>
11 #include <iomanip>
12 #include <sstream>
13 #include <cstdlib>
14 
15 #include "EventDecoder.h"
16 #include "LogMsg.h"
17 #include "Timer.h"
18 
20 
21 using namespace std;
22 using namespace dlardaq;
23 
24 //
25 // ctor
26 //
27 EventDecoder::EventDecoder(size_t nch, size_t nsample)
28 {
29  // this is needed to read compressed data
30  m_nadc = dlardaq::BitsADC; // bits for ADC resolution
31  m_nch = nch; // total number of channels
32  m_nsample = nsample; // number of samples in each channel
33 
34  m_RunHeadBuf.resize( dlardaq::RunHeadSz );
35  m_EveHeadBuf.resize( dlardaq::EveHeadSz );
36  m_EndFootBuf.resize( dlardaq::FileFootSz );
37 
38  //
39  m_bytes_left = 0;
40 
41  // data mutex initialization
42  pthread_mutex_init(&m_data_mutex, NULL);
43 }
44 
45 
46 //
47 // dtor
48 //
49 EventDecoder::~EventDecoder()
50 {
51  Close();
52  pthread_mutex_destroy(&m_data_mutex);
53 }
54 
55 
56 //
57 // close input file
58 //
60 {
61  // lock mutex
62  lock( m_data_mutex );
63 
64  if(m_file.is_open())
65  m_file.close();
66 
67  m_totev = 0;
68 
69  // unlock mutex
70  unlock( m_data_mutex );
71 }
72 
73 
74 
75 //
76 //
77 //
78 ssize_t EventDecoder::Open(std::string finname)
79 {
80  // attempt to close any previously opened files
81  Close();
82 
83  lock( m_data_mutex );
84 
85  m_EveData.clear(); // not used when reading from file
86 
87  // clear event bookmarks
88  m_events.clear();
89 
90  //
91  m_file.open(finname.c_str(), ios::in | ios::binary);
92 
93  if( !m_file.is_open() )
94  {
95  msg_err<<"Could not open "<<finname<<" for reading"<<endl;
96  unlock( m_data_mutex );
97  return -1;
98  }
99 
100  // read run header bytes
101  ReadBytes( m_RunHeadBuf );
102  dlardaq::decode_runhead(&m_RunHeadBuf[0], m_rnh);
103 
104 
105  m_pstart = m_file.tellg();
106 
107  // read footer
108  // fast forward to the end
109  m_file.seekg(-dlardaq::FileFootSz, m_file.end );
110  m_pend = m_file.tellg();
111 
112  ReadBytes( m_EndFootBuf );
113  dlardaq::decode_filefoot(&m_EndFootBuf[0], m_flf);
114 
115  // rewind back to the begining
116  m_file.seekg( m_pstart );
117 
118 
119  //
120 
121  m_totev = m_flf.num_events;
122 
123  if( m_totev == 0 )
124  {
125  msg_err<<"File "<<finname<<" is empty"<<endl;
126  unlock(m_data_mutex);
127  Close();
128  return 0;
129  }
130 
131  unlock( m_data_mutex );
132 
133  return m_totev;
134 }
135 
136 //
137 // read a given number of bytes from file
138 //
139 void EventDecoder::ReadBytes( std::vector<BYTE> &bytes )
140 {
141  m_file.read( &bytes[0], bytes.size() );
142 }
143 
144 
145 //
146 //
147 //
148 void EventDecoder::ReadEvent( std::vector<adc16_t> &adc, bool headonly )
149 {
150  // first read the header
151  ReadBytes( m_EveHeadBuf );
152  decode_evehead(&m_EveHeadBuf[0], m_evh);
153  //dlardaq::msg_info<<">>> Event number "<<m_evh.ev_num<<endl;
154  //dlardaq::msg_info<<">>> Event size "<<m_evh.ev_size<<endl;
155  //dlardaq::msg_info<<">>> Trig number "<<m_evh.trig_info.num<<endl;
156  //dlardaq::msg_info<<">>> Time stamp "<<m_evh.trig_info.ts.tv_sec<<" s "
157  //<<m_evh.trig_info.ts.tv_nsec<<" ns"<<endl;
158  //dlardaq::msg_info<<">>> Flags "<<bitset<8>(m_evh.dq_flag)<<endl;
159 
160 
161  if(headonly) // read only header and skip to next event
162  {
163  // get event size
164  size_t evsz = m_evh.ev_size;
165 
166  // move to next event
167  m_file.seekg(evsz, std::ios::cur);
168  return;
169  }
170 
171  //
172  // decode event data
173  if(!m_EveDataBuf.empty()) m_EveDataBuf.clear();
174 
175  m_EveDataBuf.resize(m_evh.ev_size, '\0');
176 
177  // read content of the file
178  ReadBytes(m_EveDataBuf);
179  Decode( &m_EveDataBuf[0], m_EveDataBuf.size(), GETDCFLAG(m_evh.dq_flag), adc );
180 }
181 
182 //
183 // get a specific event from file
184 //
185 ssize_t EventDecoder::GetEvent(size_t evnum, dlardaq::evheader_t &eh,
186  std::vector<adc16_t> &adc)
187 {
188  adc.clear();
189  if(!m_file.is_open()) return -1;
190  //if(evnum >= m_totev) return -1; //no-can-do
191  lock(m_data_mutex);
192 
193  if(evnum < m_events.size()) // already know where it is
194  {
195  m_file.seekg(m_events[evnum]);
196  ReadEvent( adc );
197  eh = m_evh;
198  }
199  else
200  {
201  size_t curev = m_events.size();
202  while( curev <= evnum )
203  {
204  // our event begins at this position
205  streampos pos = m_file.tellg();
206  m_events.push_back( pos );
207 
208  // readonly header to get event data size
209  if( curev < evnum )
210  ReadEvent(adc, true);
211  else // event we want
212  ReadEvent(adc, false);
213 
214  curev++;
215  }
216 
217  // last event is the one with want ...
218  eh = m_evh;
219  }
220 
221  unlock(m_data_mutex);
222 
223  return evnum;
224 }
225 
226 //
227 //
228 //
229 ssize_t EventDecoder::GetEvent( dlardaq::evheader_t &eh,
230  std::vector<adc16_t> &adc )
231 {
232  if(m_file.is_open())
233  {
234  // return first event
235  return GetEvent( 0, eh, adc);
236  }
237 
238  ssize_t rval = -1;
239  //else // we have an event buffer
240  if( m_EveData.empty() ) return -1;
241 
242  // lock mutex
243  lock(m_data_mutex);
244 
245  eh = m_evh;
246  adc = m_EveData;
247 
248  // clear the internal data buffer
249  m_EveData.clear();
250 
251  rval = eh.ev_num;
252 
253  // unlock mutex
254  unlock(m_data_mutex);
255  return rval;
256 }
257 
258 //
259 //
260 //
261 ssize_t EventDecoder::Decode( const char *buf, size_t nb, bool cflag,
262  std::vector<adc16_t> &adc )
263 {
264  adc.clear();
265 
266  if( !cflag ) //not compressed
267  {
268  adc.resize( (nb*8)/m_nadc, 0 );
269  unpack12into16( buf, &adc[0], nb );
270  }
271  else
272  {
273  size_t byteidx = 0;
274  HuffDataCompressor::Instance().DecompressEventData(m_nadc, m_nch, m_nsample, buf, nb, byteidx, adc);
275 
276  }
277 
278  return adc.size();
279 }
280 
281 
282 //
283 // we assume that the buffer is
284 // run header + event header + event data but it can come in different packets
285 //
286 void EventDecoder::ReadBuffer(const char *buf, size_t nb)
287 {
288  if(m_file.is_open())
289  {
290  msg_err<<"Cannot use this function while reading data from a file"<<endl;
291  return;
292  }
293 
294  // lock mutex
295  lock(m_data_mutex);
296  m_totev = 0;
297  if(!m_EveData.empty()) m_EveData.clear();
298 
299  // we have finished reading previous event
300  if( m_bytes_left <= 0 ||
301  Timer::GetTimer().splittime(false, false) > TMAXWAIT)
302  {
303  m_EveDataBuf.clear();
304  if(IsFirstPacket(buf, nb)) //check we have a correct packet
305  {
306  // decode header
307  size_t nb_read = 0;
308  const char *pdata = buf;
309 
310  // decode run header
311  ssize_t ret;
312  ret = dlardaq::decode_runhead(pdata, m_rnh);
313  if( ret <= 0)
314  {
315  // unlock mutex
316  unlock(m_data_mutex);
317  return;
318  }
319  pdata += ret;
320  nb_read += ret;
321 
322  // decode event header
323  ret = dlardaq::decode_evehead(pdata, m_evh);
324  if( ret <= 0)
325  {
326  // unlock mutex
327  unlock(m_data_mutex);
328  return;
329  }
330  pdata += ret;
331  nb_read += ret;
332 
333  m_bytes_left = m_evh.ev_size;
334  if(m_bytes_left <= 0)
335  {
336  // unlock mutex
337  unlock(m_data_mutex);
338  return;
339  }
340 
341  //msg_info<<"Start recieving event "<<m_evh.ev_num<<endl;
342 
343  // copy the data
344  m_EveDataBuf.insert(m_EveDataBuf.end(), pdata, pdata + nb-nb_read);
345  m_bytes_left -= (nb - nb_read);
346 
347  // start timer
349 
350  //
351  unlock(m_data_mutex);
352  return;
353  }
354  else
355  {
356  unlock(m_data_mutex);
357  return;
358  }
359  }
360 
361  // append to buffer
362  m_EveDataBuf.insert(m_EveDataBuf.end(), buf, buf+nb);
363  m_bytes_left -= nb;
364  if( m_bytes_left < 0 )
365  {
366  msg_err<<"Byte packet mismatch"<<endl;
367 
368  // unlock mutex
369  unlock(m_data_mutex);
370  return;
371  }
372 
373  // we have full event
374  if( m_bytes_left == 0 )
375  {
376  // decode data
377  bool cflag = GETDCFLAG(m_evh.dq_flag);
378  Decode( &m_EveDataBuf[0], m_EveDataBuf.size(), cflag, m_EveData );
379  m_totev = 1;
380 
381  msg_info<<"Recieved: run "<<m_rnh.run_num<<" event "<<m_evh.ev_num<<endl;
382  }
383 
384  // unlock mutex
385  unlock(m_data_mutex);
386 }
387 
388 
389 //
390 // detect new event key after run header
391 //
392 bool EventDecoder::IsFirstPacket(const char *buf, size_t nb)
393 {
394  BYTE k0 = buf[dlardaq::RunHeadSz];
395  BYTE k1 = buf[dlardaq::RunHeadSz+1];
396  bool rval = ( ((k0 & 0xFF) == EVSKEY) && ((k1 & 0xFF) == EVSKEY) );
397 
398  return rval;
399 }
static constexpr double nb
Definition: Units.h:81
static const size_t FileFootSz
Definition: dlardaq.h:60
uint32_t ev_num
Definition: dlardaq.h:85
std::string string
Definition: nybbler.cc:12
gOutFile Close()
int16_t adc
Definition: CRTFragment.hh:202
STL namespace.
ssize_t decode_evehead(const char *buf, evheader_t &eh)
static Timer & GetTimer()
Definition: Timer.h:35
ssize_t decode_runhead(const char *buf, runheader_t &rh)
static const size_t RunHeadSz
Definition: dlardaq.h:56
static const size_t EveHeadSz
Definition: dlardaq.h:58
#define GETDCFLAG(info)
Definition: dlardaq.h:30
#define TMAXWAIT
Definition: EventDecoder.h:21
char BYTE
Definition: dlardaq.h:39
ssize_t decode_filefoot(const char *buf, footer_t &rf)
void start()
Definition: Timer.h:41
static const short BitsADC
Definition: dlardaq.h:63
#define EVSKEY
Definition: dlardaq.h:20
static LogStream msg_info(std::cout, _strinfo)
void unpack12into16(const void *in, void *out, size_t n)
static LogStream msg_err(std::cerr, _strerror)
byte bytes
Alias for common language habits.
Definition: datasize.h:101
QTextStream & endl(QTextStream &s)