EigerStream.cpp 18.3 KB
Newer Older
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2015
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#include <fcntl.h>
#include <unistd.h>

#include <map>
#include <set>

#include <zmq.h>

30
#include "EigerCameraRequests.h"
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
31
32
33
34
35
#include <eigerapi/EigerDefines.h>

#include "lima/Exceptions.h"
#include "EigerStream.h"

36
37
38
#define _BSD_SOURCE
#include <endian.h>

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
39
40
41
using namespace lima;
using namespace lima::Eiger;
using namespace eigerapi;
42

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
43
44
45
46
47
48
49
50
51
52
53
54
55
//			--- Message struct ---
struct Stream::Message
{
  Message()
  {
    zmq_msg_init(&msg);
  }
  ~Message()
  {
    zmq_msg_close(&msg);
  }
  zmq_msg_t* get_msg() {return &msg;}

56
57
58
59
60
61
  void get_msg_data_n_size(void*& data, size_t& size)
  {
    data = zmq_msg_data(&msg);
    size = zmq_msg_size(&msg);
  }

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
  zmq_msg_t msg;
};
//		      --- buffer management ---
class Stream::_BufferCtrlObj : public SoftBufferCtrlObj
{
  DEB_CLASS_NAMESPC(DebModCamera,"Stream","_BufferCtrlObj");
public:
  _BufferCtrlObj(Stream& stream) : 
    m_stream(stream)
  {
  }
private:
  Stream&	m_stream;
};

77
78
79
//		      --- Stream::ImageData ---
void Stream::ImageData::getMsgDataNSize(void*& data, size_t& size) const
{
80
81
82
83
84
85
86
87
88
  msg->get_msg_data_n_size(data, size);
}

std::ostream& lima::Eiger::operator <<(std::ostream& os, Stream::State state)
{
  const char *name;
  switch (state) {
  case Stream::State::Init: name = "Init"; break;
  case Stream::State::Idle: name = "Idle"; break;
89
  case Stream::State::Starting: name = "Starting"; break;
90
  case Stream::State::Connected: name = "Connected"; break;
91
  case Stream::State::Failed: name = "Failed"; break;
92
93
94
  case Stream::State::Armed: name = "Armed"; break;
  case Stream::State::Running: name = "Running"; break;
  case Stream::State::Stopped: name = "Stopped"; break;
95
96
  case Stream::State::Aborting: name = "Aborting"; break;
  case Stream::State::Quitting: name = "Quitting"; break;
97
98
99
  default: name = "Unknown";
  }
  return os << name;
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
}

std::ostream& lima::Eiger::operator <<(std::ostream& os,
				       const Stream::ImageData& img_data)
{
  void *msg_data;
  size_t msg_size;
  img_data.getMsgDataNSize(msg_data, msg_size);
  return os << "<"
	    << "data=" << msg_data << ", "
	    << "size=" << msg_size << ", "
	    << "depth=" << img_data.depth << ", "
	    << "comp_type=" << img_data.comp_type
	    << ">";
}

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
116
//			 --- Stream class ---
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
inline bool Stream::_isRunning() const
{
  return ((m_state == Connected) || (m_state == Armed) || (m_state == Running));
}

inline Json::Value Stream::_get_json_header(MessagePtr &msg)
{
  DEB_MEMBER_FUNCT();
  void* data;
  size_t data_size;
  msg->get_msg_data_n_size(data, data_size);
  DEB_TRACE() << "json_header=" << std::string((char *) data, data_size);
  
  const char* begin = (const char*)data;
  const char* end = begin + data_size;
  Json::Value header;
  Json::Reader reader;
  if (!reader.parse(begin,end,header))
    THROW_HW_ERROR(Error) << "Error parsing header: " << std::string(begin, end);
  return header;
}

inline Json::Value Stream::_get_global_header(const Json::Value& stream_header,
					      MessageList& pending_messages)
{
  DEB_MEMBER_FUNCT();
  std::string header_detail = stream_header.get("header_detail","").asString();
  if (header_detail != m_header_detail_str)
    THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(header_detail) << ", "
			  << "expected " << m_header_detail_str;
  int nb_parts;
  int header_message_id;
  switch (m_header_detail) {
  case OFF:
    nb_parts = 1;
    header_message_id = 0;
    break;
  case BASIC:
    nb_parts = 2;
    header_message_id = 1;
    break;
  case ALL:
    nb_parts = 8;
    header_message_id = 1;
    break;
  }
  int nb_messages = pending_messages.size();
  if (nb_messages < nb_parts)
    THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(nb_messages)
			  << " for " << DEB_VAR1(m_header_detail_str);
    
  return _get_json_header(pending_messages[header_message_id]);
}

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
171
172
173
174
Stream::Stream(Camera& cam) : 
  m_cam(cam),
  m_header_detail(OFF),
  m_dirty_flag(true),
175
  m_state(Init),
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
176
177
178
179
  m_buffer_ctrl_obj(new Stream::_BufferCtrlObj(*this))
{
  DEB_CONSTRUCTOR();

180
181
182
  bool is_le = (htole16(0x1234) == 0x1234);
  m_endianess = (is_le ? '<' : '>');

183
  m_buffer_mgr = &m_buffer_ctrl_obj->getBuffer();
184
  m_active = _getStreamMode();
185

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
186
187
188
189
190
  m_zmq_context = zmq_ctx_new();
  if(pipe(m_pipes))
    THROW_HW_ERROR(Error) << "Can't open pipe";

  pthread_create(&m_thread_id,NULL,_runFunc,this);
191
192
193
194

  AutoMutex lock(m_cond.mutex());
  while (m_state != Idle)
    m_cond.wait();
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
195
196
197
198
}

Stream::~Stream()
{
199
200
201
202
203
204
205
206
207
  DEB_DESTRUCTOR();

  {
    AutoMutex aLock(m_cond.mutex());
    DEB_TRACE() << "Quitting";
    m_state = Quitting;
    m_cond.broadcast();
    _send_synchro();
  }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
208
209
210
211
212
213
214
215

  if(m_thread_id > 0)
    pthread_join(m_thread_id,NULL);

  close(m_pipes[0]),close(m_pipes[1]);
  zmq_ctx_destroy(m_zmq_context);
}

216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
void Stream::_setStreamMode(bool enabled)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(enabled);
  std::string enabled_str = enabled ? "enabled" : "disabled";
  DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
  setEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
}

bool Stream::_getStreamMode()
{
  DEB_MEMBER_FUNCT();
  std::string enabled_str;
  getEigerParam(m_cam,Requests::STREAM_MODE,enabled_str);
  DEB_TRACE() << "STREAM_MODE:" << DEB_VAR1(enabled_str);
  bool enabled = (enabled_str == "enabled");
  DEB_RETURN() << DEB_VAR1(enabled);
  return enabled;
}

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
236
237
void Stream::start()
{
238
  DEB_MEMBER_FUNCT();
239
  AutoMutex aLock(m_cond.mutex());
240
241
242
243
  if (m_state != Armed)
    THROW_HW_ERROR(Error) << "Stream is not Armed (no global header)";
  DEB_TRACE() << "Running";
  m_state = Running;
244
  m_buffer_mgr->setStartTimestamp(Timestamp::now());
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
245
246
247
248
}

void Stream::stop()
{
249
250
251
252
253
254
  DEB_MEMBER_FUNCT();
  AutoMutex aLock(m_cond.mutex());
  if (!_isRunning())
    return;
  DEB_TRACE() << "Stopped";
  m_state = Stopped;
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
255
256
}

257
258
259
260
261
262
263
void Stream::abort()
{
  DEB_MEMBER_FUNCT();
  AutoMutex aLock(m_cond.mutex());
  _abort();
}

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
264
265
266
267
268
269
270
271
void Stream::_send_synchro()
{
  DEB_MEMBER_FUNCT();

  if(write(m_pipes[1],"|",1) == -1)
    DEB_ERROR() << "Something wrong happened!";
}

272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
void Stream::_abort()
{
  DEB_MEMBER_FUNCT();

  if((m_state == Running) || (m_state == Stopped)) {
    DEB_TRACE() << "Aborting";
    m_state = Aborting;
    _send_synchro();
    m_cond.broadcast();
    while((m_state != Idle) && (m_state != Failed))
      m_cond.wait();
  }
  if (m_state == Failed) {
    m_state = Idle;
    THROW_HW_ERROR(Error) << "Stream failed";
  }
}

Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
290
291
bool Stream::isRunning() const
{
292
  DEB_MEMBER_FUNCT();
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
293
  AutoMutex aLock(m_cond.mutex());
294
295
296
297
  DEB_TRACE() << DEB_VAR1(m_state);
  bool running = _isRunning();
  DEB_RETURN() << DEB_VAR1(running);
  return running;
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
}

void Stream::getHeaderDetail(Stream::HeaderDetail& detail) const
{
  AutoMutex lock(m_cond.mutex());
  detail = m_header_detail;
}

void Stream::setHeaderDetail(Stream::HeaderDetail detail)
{
  AutoMutex lock(m_cond.mutex());
  m_header_detail = detail,m_dirty_flag = true;
}

void Stream::setActive(bool active)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(active);

  AutoMutex lock(m_cond.mutex());
318
319
  DEB_TRACE() << DEB_VAR2(m_active, m_state);

320
321
322
323
324
325
  // wait for previous sequence to finish
  while((m_state == Running) || (m_state == Stopped))
    m_cond.wait();

  if(!active) {
    _abort();
326
327
328
329
330
331
332
333
334
  } else if(m_dirty_flag) { //Send parameters only if changed
    switch(m_header_detail) {
    case ALL:
      m_header_detail_str = "all";break;
    case BASIC:
      m_header_detail_str = "basic";break;
    default:
      m_header_detail_str = "none";break;
    }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
335

336
    DEB_TRACE() << "STREAM_HEADER_DETAIL:" << DEB_VAR1(m_header_detail_str);
337
    setEigerParam(m_cam,Requests::STREAM_HEADER_DETAIL,m_header_detail_str);
338
339
340
    m_dirty_flag = false;
  }

341
342
343
344
345
346
347
  if(active != m_active) {
    _setStreamMode(active);
    m_active = active;
  }

  if(!m_active || (m_state == Connected) || (m_state == Armed))
    return;
348
  
349
  m_state = Starting;
350
  m_cond.broadcast();
351
  while(m_state == Starting)
352
    m_cond.wait();
353
354
355
356
  
  if (m_state == Failed) {
    m_state = Idle;
    THROW_HW_ERROR(Error) << "Error starting stream";
357
358
  } else if (m_state != Connected) {
    THROW_HW_ERROR(Error) << "Internal error: " << DEB_VAR1(m_state);
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
  }
}

void Stream::waitArmed(double timeout)
{
  DEB_MEMBER_FUNCT();

  AutoMutex lock(m_cond.mutex());
  Timestamp t0 = Timestamp::now();
  DEB_TRACE() << DEB_VAR1(m_state);
  while(m_state == Connected) {
    double elapsed = Timestamp::now() - t0;
    if (elapsed >= timeout)
      break;
    m_cond.wait(timeout - elapsed);
  }
  if (m_state == Failed)
    m_state = Idle;
  if (m_state != Armed)
    THROW_HW_ERROR(Error) << "Global header not received";
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
379
380
381
382
383
}

HwBufferCtrlObj* Stream::getBufferCtrlObj()
{
  DEB_MEMBER_FUNCT();
384
  return m_buffer_ctrl_obj.get();
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
385
386
387
388
389
390
391
392
}

void* Stream::_runFunc(void *streamPt)
{
  ((Stream*)streamPt)->_run();
  return NULL;
}

393
void Stream::_run()
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
394
{
395
  DEB_MEMBER_FUNCT();
396
  
397
398
399
400
401
402
  AutoMutex lock(m_cond.mutex());
  m_state = Idle;
  while (1) {
    DEB_TRACE() << "Wait";
    m_cond.broadcast();

403
    while((m_state == Idle) || (m_state == Failed))
404
      m_cond.wait();
405
    if(m_state == Quitting)
406
      break;
407
408
409
410
411
    else if(m_state == Starting)
      DEB_TRACE() << "Running: " << DEB_VAR1(m_state);
    else
      DEB_ERROR() << "Invalid " << DEB_VAR1(m_state);

412
    try {
413
414
415
416
      {
	AutoMutexUnlock u(lock);
	_run_sequence();
      }
417
418
419
420
421
      m_state = Idle;
    } catch (Exception& e) {
      m_state = Failed;
    }
  }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
422
423
}

424
void Stream::_run_sequence()
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
425
{
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
  DEB_MEMBER_FUNCT();

  //create stream socket
  void *stream_socket = zmq_socket(m_zmq_context,ZMQ_PULL);
  if (!stream_socket)
    THROW_HW_ERROR(Error) << "Could not create zmq_socket";

  struct zmq_socket_deleter {
    void operator()(void *s) { zmq_close(s); }
  };
  std::unique_ptr<void, zmq_socket_deleter> socket_ptr(stream_socket);

  char stream_endpoint[256];
  snprintf(stream_endpoint,sizeof(stream_endpoint),
	   "tcp://%s:9999",m_cam.getDetectorIp().c_str());
  if(zmq_connect(stream_socket,stream_endpoint) != 0) {
    char error_buffer[256];
    const char *error_msg = strerror_r(errno,error_buffer,sizeof(error_buffer));
    THROW_HW_ERROR(Error) << "Connection error to " << stream_endpoint << ": "
			  << DEB_VAR2(errno, error_msg);
  }

448
449
450
451
452
453
454
455
456
457
458
459
  {
    AutoMutex lock(m_cond.mutex());
    TrigMode trigger_mode;
    m_cam.getTrigMode(trigger_mode);
    m_ext_trigger = ((trigger_mode != IntTrig) &&
		     (trigger_mode != IntTrigMult));
    m_cam.getCompressionType(m_comp_type);

    DEB_TRACE() << "Connected to " << stream_endpoint;
    m_state = Connected;
    m_cond.broadcast();
  }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477

  //  Initialize poll set
  zmq_pollitem_t items [] = {
    { NULL, m_pipes[0], ZMQ_POLLIN, 0 },
    { stream_socket, 0, ZMQ_POLLIN, 0 }
  };

  bool continue_flag = true;
  while(continue_flag) {	// reading loop
    DEB_TRACE() << "Enter poll";
    zmq_poll(items,2,-1);
    DEB_TRACE() << "Exit poll";

    if(items[0].revents & ZMQ_POLLIN) { // reading synchro pipe
      char buffer[1024];
      if(read(m_pipes[0],buffer,sizeof(buffer)) == -1)
	DEB_WARNING() << "Something strange happened!";

478
479
480
481
      {
	AutoMutex lock(m_cond.mutex());
	continue_flag = !((m_state == Aborting) || (m_state == Quitting));
      }
482
483
484
      if (!continue_flag)
	break;
    }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
485

486
487
488
489
490
491
492
493
494
495
496
497
498
499
    if(items[1].revents & ZMQ_POLLIN) { // reading stream
      try {
	continue_flag = _read_zmq_messages(stream_socket);
      } catch (Exception& e) {
	std::ostringstream err_msg;
	err_msg << "Stream error: " << e.getErrMsg();
	Event::Code err_code = Event::CamFault;
	Event *event = new Event(Hardware, Event::Error, Event::Camera,
				 err_code, err_msg.str());
	DEB_EVENT(*event) << DEB_VAR1(*event);
 	m_cam.reportEvent(event);
      }
    }
  }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
500
501
}

502
bool Stream::_read_zmq_messages(void *stream_socket)
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
503
504
{
  DEB_MEMBER_FUNCT();
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536

  MessageList pending_messages;
  pending_messages.reserve(9);
  int more;
  do {
    MessagePtr msg(new Stream::Message());
    zmq_msg_t *zmq_msg = msg->get_msg();
    int ret = zmq_msg_recv(zmq_msg,stream_socket,0);
    if (ret == -1) {
      if (errno == EAGAIN) {
	  DEB_TRACE() << "zmq EAGAIN";
	  break;
      }
      char errno_buffer[256];
      char *errno_msg = strerror_r(errno,errno_buffer,sizeof(errno_buffer));
      THROW_HW_ERROR(Error)
	<< "Error receiving zmq message: "
	<< DEB_VAR3(errno, errno_msg, pending_messages.size());
    }
    more = zmq_msg_more(zmq_msg);
    pending_messages.emplace_back(msg);
  } while(more);

  int nb_messages = pending_messages.size();
  DEB_TRACE() << DEB_VAR1(nb_messages);
  if (nb_messages == 0)
    return true;

  AutoMutex lock(m_cond.mutex());
  bool waiting_global_header = (m_state == Connected);
  bool stopped = (m_state == Stopped);
  lock.unlock();
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
537
  
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
  Json::Value stream_header = _get_json_header(pending_messages[0]);
  std::string htype = stream_header.get("htype","").asString();
  DEB_TRACE() << DEB_VAR1(htype);

  bool is_global_header = (htype.find("dheader-") != std::string::npos);
  if (is_global_header != waiting_global_header) {
    DEB_WARNING() << "Global header mismatch: "
		  << DEB_VAR2(is_global_header, waiting_global_header);
    return true;
  } else if (is_global_header) {
    lock.lock();
    m_state = Armed;
    DEB_TRACE() << "Global header received: " << DEB_VAR1(m_state);
    Json::Value header = _get_global_header(stream_header,pending_messages);
    m_cond.broadcast();
    return true;
  } else if(htype.find("dimage-") != std::string::npos) {
    int frameid = stream_header.get("frame",-1).asInt();
    DEB_TRACE() << DEB_VAR1(frameid);
    //stream_header.get("hash","md5sum")
    if (nb_messages < 3)
      THROW_HW_ERROR(Error) << "Should receive at least 3 messages part, "
			    << "only received " << nb_messages;

    Json::Value data_header = _get_json_header(pending_messages[1]);
    //Data size (width,height)
    Json::Value shape = data_header.get("shape","");
    if (!shape.isArray() || shape.size() != 2)
      THROW_HW_ERROR(Error) << "Invalid data shape: " << shape.asString();
    FrameDim anImageDim;
    anImageDim.setSize(Size(shape[0u].asInt(),shape[1u].asInt()));
    //data type
    ImageType image_type;
    std::string dtype = data_header.get("type","none").asString();
    if(dtype == "int32")
      image_type = Bpp32S;
    else if(dtype == "uint32")
      image_type = Bpp32;
    else if(dtype == "int16")
      image_type = Bpp16S;
    else if(dtype == "uint16")
      image_type = Bpp16;
    else
      THROW_HW_ERROR(Error) << "Invalid " << DEB_VAR1(dtype);
    anImageDim.setImageType(image_type);
    DEB_TRACE() << DEB_VAR1(anImageDim);

    if (frameid == 0) {
      lock.lock();
      m_last_info.encoding = data_header.get("encoding", "").asString();
      m_last_info.frame_dim = anImageDim;
      m_last_info.packed_size = data_header.get("size", "-1").asInt();
      _checkCompression(m_last_info);
591
      lock.unlock();
592
593
594
595
596
597
598
599
600
601
    }

    Json::Value config_header = _get_json_header(pending_messages[3]);
    if (frameid == 0)
      DEB_TRACE() << DEB_VAR1(config_header["start_time"].asString());

    if (stopped) {
      DEB_TRACE() << "Stopped: ignoring data";
      return true;
    }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
602

603
604
605
    HwFrameInfoType frame_info;
    frame_info.acq_frame_nb = frameid;
    void* buffer_ptr = m_buffer_mgr->getFrameBufferPtr(frameid);
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
606
    {
607
608
609
610
611
      lock.lock();
      m_data_2_msg[buffer_ptr] = ImageData{pending_messages[2],
					   anImageDim.getDepth(),
					   m_comp_type};
      lock.unlock();
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
612
    }
613
614

    bool continue_flag = m_buffer_mgr->newFrameReady(frame_info);
615
    m_cam.newFrameAcquired();
616
617
618
619
620
621
622
623
624
625
    bool do_disarm = (m_ext_trigger && m_cam.allFramesAcquired());
    if (!continue_flag && !do_disarm) {
      DEB_WARNING() << "Unexpected " << DEB_VAR1(continue_flag) << ": "
		    << "Disarming camera";
      do_disarm = true;
    }
    if (do_disarm)
      m_cam.disarm();
    return true;
  } else if (htype.find("dseries_end-") != std::string::npos) {
626
    DEB_TRACE() << "Finishing";
627
628
629
630
631
    return false;
  } else {
    DEB_WARNING() << "Unknown header: " << htype;
    return true;
  }
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
632
}
633

634
void Stream::_checkCompression(const StreamInfo& info)
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR2(info, m_endianess);

  const std::string& encoding = info.encoding;

  char endianess = *encoding.rbegin();
  if (endianess != m_endianess)
    THROW_HW_ERROR(Error) << "Endianess mismatch: "
			  << "got " << endianess << ", "
			  << "expected " << m_endianess;

  CompressionType comp_type;
  if (encoding == std::string(1, m_endianess)) {
    comp_type = Camera::NoCompression;
  } else {
    const std::string lz4 = std::string("lz4") + m_endianess;
    if (encoding == lz4) {
      comp_type = Camera::LZ4;
    } else {
      const char *bs;
      switch (info.frame_dim.getImageType()) {
      case Bpp32S: case Bpp32: bs = "bs32-"; break;
      case Bpp16S: case Bpp16: bs = "bs16-"; break;
      case Bpp8S:  case Bpp8:  bs = "bs8-";  break;
      }
      if (encoding == std::string(bs) + lz4)
	comp_type = Camera::BSLZ4;
      else
	THROW_HW_ERROR(Error) << "Unexpected encoding: " << encoding;
    }
  }
  DEB_TRACE() << DEB_VAR1(comp_type);

  if (comp_type != m_comp_type)
    THROW_HW_ERROR(Error) << "Unexpected compression type: " << comp_type;
}

673
674
675
676
677
678
679
void Stream::getLastStreamInfo(StreamInfo& last_info)
{
  DEB_MEMBER_FUNCT();
  last_info = m_last_info;
  DEB_RETURN() << DEB_VAR1(last_info);
}

680
Stream::ImageData Stream::get_msg(void* aDataBuffer)
681
682
683
684
685
686
687
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(aDataBuffer);

  AutoMutex lock(m_cond.mutex());
  Data2Message::iterator it = m_data_2_msg.find(aDataBuffer);
  if(it == m_data_2_msg.end())
688
689
690
    THROW_HW_ERROR(Error) << "Can't find image_data message";
  ImageData img_data = it->second;
  m_data_2_msg.erase(it);
691
692
693
  lock.unlock();
  if (DEB_CHECK_ANY(DebTypeReturn))
    DEB_RETURN() << DEB_VAR1(img_data);
694
  return img_data;
695
696
697
698
699
700
701
702
703
}

void Stream::release_all_msgs()
{
  DEB_MEMBER_FUNCT();

  AutoMutex lock(m_cond.mutex());
  m_data_2_msg.clear();
}