EigerSavingCtrlObj.cpp 13.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2015
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#include <algorithm>
#include "EigerSavingCtrlObj.h"
24
#include "EigerCameraRequests.h"
25
26
27
28
29
30
31
32
33

#include <eigerapi/Requests.h>
#include <eigerapi/EigerDefines.h>

using namespace lima;
using namespace lima::Eiger;
using namespace eigerapi;

const int MAX_SIMULTANEOUS_DOWNLOAD = 4;
34
35
36
37
38
39
40
41
42
43
44
45
/*----------------------------------------------------------------------------
			     HDF5 HEADER
----------------------------------------------------------------------------*/
struct HeaderKey2Index
{
  const char*		key_name;
  Requests::PARAM_NAME	param_name;
};

static HeaderKey2Index available_header[] = {
  {"beam_center_x",Requests::HEADER_BEAM_CENTER_X},
  {"beam_center_y",Requests::HEADER_BEAM_CENTER_Y},
46
47
  {"chi_increment",Requests::HEADER_CHI_INCREMENT},
  {"chi_start",Requests::HEADER_CHI_START},
48
  {"detector_distance",Requests::HEADER_DETECTOR_DISTANCE},
49
50
51
52
53
54
  {"kappa_increment",Requests::HEADER_KAPPA_INCREMENT},
  {"kappa_start",Requests::HEADER_KAPPA_START},
  {"omega_increment",Requests::HEADER_OMEGA_INCREMENT},
  {"omega_start",Requests::HEADER_OMEGA_START},
  {"phi_increment",Requests::HEADER_PHI_INCREMENT},
  {"phi_start",Requests::HEADER_PHI_START},
55
56
  {"wavelength",Requests::HEADER_WAVELENGTH},
};
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*----------------------------------------------------------------------------
			    Polling thread
----------------------------------------------------------------------------*/
class SavingCtrlObj::_PollingThread : public Thread
{
  DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj","_PollingThread");
public:
  _PollingThread(SavingCtrlObj&,eigerapi::Requests*);
  virtual ~_PollingThread();
protected:
  virtual void threadFunction();
private:
  SavingCtrlObj&	m_saving;
  eigerapi::Requests*	m_requests;
};

SavingCtrlObj::SavingCtrlObj(Camera& cam) :
74
  HwSavingCtrlObj(HwSavingCtrlObj::COMMON_HEADER,false),
75
76
77
78
79
80
81
82
83
  m_cam(cam),
  m_nb_file_to_watch(0),
  m_nb_file_transfer_started(0),
  m_concurrent_download(0),
  m_poll_master_file(false),
  m_quit(false)
{
  m_polling_thread = new _PollingThread(*this,this->m_cam.m_requests);
  m_polling_thread->start();
84
85
86
87
88
89
90
  // Known keys for common header
  int nb_header_key = sizeof(available_header) / sizeof(HeaderKey2Index);
  for(int i = 0;i < nb_header_key;++i)
    {
      HeaderKey2Index& index = available_header[i];
      m_availables_header_keys[index.key_name] = index.param_name;
    }
91
92
93
94
95
96
97
98
99
100
101
}

/*----------------------------------------------------------------------------
			End download callback
----------------------------------------------------------------------------*/
class SavingCtrlObj::_EndDownloadCallback : public CurlLoop::FutureRequest::Callback
{
  DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj","_EndDownloadCallback");
public:
  _EndDownloadCallback(SavingCtrlObj&,const std::string &filename);

102
103
  virtual void status_changed(CurlLoop::FutureRequest::Status,
			      std::string error);
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
private:
  SavingCtrlObj&	m_saving;
  std::string		m_filename;
};

/*----------------------------------------------------------------------------
			    SavingCtrlObj
----------------------------------------------------------------------------*/
SavingCtrlObj::~SavingCtrlObj()
{
  delete m_polling_thread;
}

void SavingCtrlObj::getPossibleSaveFormat(std::list<std::string> &format_list) const
{
  format_list.push_back(HwSavingCtrlObj::HDF5_FORMAT_STR);
}

122
123
124
125
void SavingCtrlObj::setCommonHeader(const HwSavingCtrlObj::HeaderMap& header)
{
  DEB_MEMBER_FUNCT();

126
127
  MultiParamRequest synchro(m_cam);

128
129
130
131
132
133
  for(HwSavingCtrlObj::HeaderMap::const_iterator i = header.begin();
      i != header.end();++i)
    {
      std::map<std::string,int>::iterator header_index = m_availables_header_keys.find(i->first);
      if(header_index == m_availables_header_keys.end())
	THROW_HW_ERROR(Error) << "Header key: " << i->first << " not yet managed ";
134
      synchro.addSet(Requests::PARAM_NAME(header_index->second),i->second);
135
136
    }

137
  synchro.wait();
138
139
140
141
142
143
144
}

void SavingCtrlObj::resetCommonHeader()
{
  // todo
}

145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
void SavingCtrlObj::setSerieId(int value)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(value);

  AutoMutex lock(m_cond.mutex());
  m_serie_id = value;
}

SavingCtrlObj::Status SavingCtrlObj::getStatus()
{
  DEB_MEMBER_FUNCT();
  AutoMutex lock(m_cond.mutex());
  bool status = m_poll_master_file ||
    (m_nb_file_to_watch != m_nb_file_transfer_started);
  DEB_RETURN() << DEB_VAR2(status,m_error_msg);
  if(m_error_msg.empty())
    return status ? RUNNING : IDLE;
  else
    return ERROR;
}

void SavingCtrlObj::stop()
{
  DEB_MEMBER_FUNCT();
  AutoMutex lock(m_cond.mutex());
  m_nb_file_transfer_started = m_nb_file_to_watch = 0;
  m_poll_master_file = false;
}

175
void SavingCtrlObj::_setActive(bool active, int)
176
177
178
179
180
{
  DEB_MEMBER_FUNCT();

  const char *active_str = active ? "enabled" : "disabled";
  DEB_TRACE() << "FILEWRITER_MODE:" << DEB_VAR1(active_str);
181
  setEigerParam(m_cam,Requests::FILEWRITER_MODE,active_str);
182
183
}

184
void SavingCtrlObj::_prepare(int)
185
186
187
{
  DEB_MEMBER_FUNCT();

188
189
  MultiParamRequest synchro(m_cam);

190
191
  int frames_per_file = int(m_frames_per_file);
  DEB_TRACE() << "NIMAGES_PER_FILE:" << DEB_VAR1(frames_per_file);
192
  synchro.addSet(Requests::NIMAGES_PER_FILE,frames_per_file);
193
  DEB_TRACE() << "FILEWRITER_NAME_PATTERN" << DEB_VAR1(m_prefix);
194
195
  synchro.addSet(Requests::FILEWRITER_NAME_PATTERN,m_prefix);
  synchro.wait();
196
197
198
199
200
201

  AutoMutex lock(m_cond.mutex());
  m_nb_file_transfer_started = m_nb_file_to_watch = 0;
  m_poll_master_file = true;
}

202
void SavingCtrlObj::_start(int)
203
204
205
206
207
208
209
210
211
212
213
214
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(m_active);

  int nb_frames;	m_cam.getNbFrames(nb_frames);
  double expo_time;	m_cam.getExpTime(expo_time);

  AutoMutex lock(m_cond.mutex());
  m_nb_file_transfer_started = 0;
  m_nb_file_to_watch = nb_frames / m_frames_per_file;
  if(nb_frames % m_frames_per_file) ++m_nb_file_to_watch;

215
  m_waiting_time = (expo_time * std::min(nb_frames,int(m_frames_per_file))) / 2.;
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
  
  m_cond.broadcast();
  
  DEB_TRACE() << DEB_VAR2(m_nb_file_to_watch,m_waiting_time);
}
//----------------------------------------------------------------------------
//
//----------------------------------------------------------------------------

SavingCtrlObj::_PollingThread::_PollingThread(SavingCtrlObj& saving,
					      eigerapi::Requests* requests) :
  m_saving(saving),
  m_requests(requests)
{
  pthread_attr_setscope(&m_thread_attr,PTHREAD_SCOPE_PROCESS);
}

SavingCtrlObj::_PollingThread::~_PollingThread()
{
  AutoMutex lock(m_saving.m_cond.mutex());
  m_saving.m_quit = true;
  m_saving.m_cond.broadcast();
  lock.unlock();

  join();
}

void SavingCtrlObj::_PollingThread::threadFunction()
{
  DEB_MEMBER_FUNCT();
246
247
248
249
250
251

  Camera::ApiGeneration api;
  m_saving.m_cam.getApiGeneration(api);
  Requests::PARAM_NAME ls_name = ((api == Camera::Eiger1) ? Requests::FILEWRITER_LS :
							    Requests::FILEWRITER_LS2);

252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
  AutoMutex lock(m_saving.m_cond.mutex());
  
  while(!m_saving.m_quit)
    {
      while(!m_saving.m_quit &&
	    (m_saving.m_concurrent_download >= MAX_SIMULTANEOUS_DOWNLOAD ||
	     (!m_saving.m_poll_master_file &&
	      (m_saving.m_nb_file_to_watch == 
	       m_saving.m_nb_file_transfer_started))))
	{
	  m_saving.m_cond.broadcast();
	  m_saving.m_cond.wait();
	}

      if(m_saving.m_quit) break;
      std::string prefix = m_saving.m_prefix;
      std::string directory = m_saving.m_directory;

270
271
272
273
274
275
276
277
278
279
280
281
282
      char id_str[] = "$id";
      size_t id_pos = prefix.find(id_str);
      if (id_pos != std::string::npos) {
	char serie_id_str[32];
	snprintf(serie_id_str, sizeof(serie_id_str), "%d", m_saving.m_serie_id);
	std::string aux = prefix.substr(0, id_pos) + serie_id_str;
	size_t id_end = id_pos + sizeof(id_str);
	if (prefix.size() > id_end)
	  aux += prefix.substr(id_end);
	prefix = aux;
      }
      DEB_TRACE() << DEB_VAR2(directory, prefix);

283
284
285
286
287
      int total_nb_frames; m_saving.m_cam.getNbFrames(total_nb_frames);
      
      int frames_per_file = m_saving.m_frames_per_file;

      //Ls request
288
289
290
291
292
      std::vector<std::string> files;
      {
	AutoMutexUnlock u(lock);
	getEigerParam(m_saving.m_cam,ls_name,files);
      }
293
294
295
296
297
298
299

      // try to download master file
      if(m_saving.m_poll_master_file)
	{
	  std::ostringstream src_file_name;
	  src_file_name << prefix <<  "_master.h5";
	  bool master_file_found = false;
300
301
	  for(std::vector<std::string>::iterator i = files.begin();
	      !master_file_found && i != files.end();++i)
302
303
304
305
306
307
	    master_file_found = *i == src_file_name.str();

	  if(master_file_found)
	    {
	      std::string master_file_name = prefix + "_master.h5";
	      std::string dest_path = directory + "/" + master_file_name;
308
	      TransferReq master_file_req;
309
310
311
312
313
314
315
	      startEigerTransfer(m_saving.m_cam,src_file_name.str(),
				 dest_path,lock,master_file_req);
	      if (!master_file_req) {
		// stop the loop
		m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		continue;
	      }
316
	      CallbackPtr end_cbk(new _EndDownloadCallback(m_saving,src_file_name.str()));
317
318
319
320
	      {
		AutoMutexUnlock u(lock);
		master_file_req->register_callback(end_cbk);
	      }
321
322
323
324
325
326
327
328
329
	      m_saving.m_poll_master_file = false;
	      ++m_saving.m_concurrent_download;
	    }
	}
      
      if(m_saving.m_nb_file_transfer_started < m_saving.m_nb_file_to_watch)
	{
	  int next_file_nb = m_saving.m_nb_file_transfer_started + 1;

330
	  std::sort(files.begin(),files.end());
331
332
333
334
335
336
337
	  char file_nb[32];
	  snprintf(file_nb,sizeof(file_nb),"%.6d",next_file_nb);

	  std::ostringstream src_file_name;
	  src_file_name << prefix << "_data_"  << file_nb << ".h5";
	  
	  //init find the first file_name of the list
338
339
	  std::vector<std::string>::iterator file_name = files.begin();
	  for(;file_name != files.end();++file_name)
340
341
	    if(*file_name == src_file_name.str()) break;

342
	  for(;file_name != files.end() &&
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
		m_saving.m_concurrent_download < MAX_SIMULTANEOUS_DOWNLOAD;
	      ++file_name,++next_file_nb)
	    {

	      snprintf(file_nb,sizeof(file_nb),"%.6d",next_file_nb);
	      src_file_name.clear();src_file_name.seekp(0);
	      src_file_name << prefix << "_data_"
			    << file_nb << ".h5";
	      if(*file_name == src_file_name.str()) // will start the transfer
		{
		  if(next_file_nb > m_saving.m_nb_file_to_watch)
		    {
		      DEB_WARNING() << "Something weird happened " 
				    << DEB_VAR2(next_file_nb,m_saving.m_nb_file_to_watch);
		      break;
		    }

		  DEB_TRACE() << "Start transfer file: " << DEB_VAR1(*file_name);
		  std::string dest_path = directory + "/" + src_file_name.str();
362
		  TransferReq file_req;
363
364
365
366
367
368
369
		  startEigerTransfer(m_saving.m_cam,src_file_name.str(),
				     dest_path, lock,file_req);
		  if (!file_req) {
		    // stop the loop
		    m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		    break;
		  }
370
		  ++m_saving.m_nb_file_transfer_started,++m_saving.m_concurrent_download;
371
		  CallbackPtr end_cbk(new _EndDownloadCallback(m_saving,src_file_name.str()));
372
373
374
375
		  {
		    AutoMutexUnlock u(lock);
		    file_req->register_callback(end_cbk);
		  }
376
377
378
379
380
381
382
383
384

		  if(m_saving.m_callback)
		    {
		      int written_frame = m_saving.m_nb_file_transfer_started * frames_per_file;
		      if(written_frame > total_nb_frames)
			written_frame = total_nb_frames;
		      
		      //lima index start at 0
		      --written_frame;
385
386
387
388
389
		      bool continueFlag;
		      {
			AutoMutexUnlock u(lock);
			continueFlag = m_saving.m_callback->newFrameWritten(written_frame);
		      }
390
391
392
393
394
395
396
397
398
399
400
401
402
		      if(!continueFlag) // stop the loop
			m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		    }
		}
	      else
		break;
	    }
	}

      m_saving.m_cond.wait(m_saving.m_waiting_time);
    }
}

403
404
405
406
407
408
void SavingCtrlObj::_download_finished(std::string filename, bool ok,
				       std::string error)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR3(filename, ok, error);

409
  m_cam.newFrameAcquired();
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425

  AutoMutex lock(m_cond.mutex());
  if(!ok)
    {
      m_error_msg = "Failed to download file: ";
      m_error_msg += filename;
      DEB_ERROR() << m_error_msg << ": " << error;
      //Stop the polling
      m_poll_master_file = false;
      m_nb_file_transfer_started = m_nb_file_to_watch = 0;
    }

  --m_concurrent_download;
  m_cond.broadcast();
}

426
427
428
429
430
431
432
433
434
435
436
/*----------------------------------------------------------------------------
		      class _EndDownloadCallback
----------------------------------------------------------------------------*/
SavingCtrlObj::_EndDownloadCallback::_EndDownloadCallback(SavingCtrlObj& saving,
							  const std::string& filename) :
  m_saving(saving),
  m_filename(filename)
{
}

void SavingCtrlObj::_EndDownloadCallback::
437
status_changed(CurlLoop::FutureRequest::Status status, std::string error)
438
439
{
  DEB_MEMBER_FUNCT();
440
441
442
  DEB_PARAM() << DEB_VAR2(status, error);
  bool ok = (status == CurlLoop::FutureRequest::OK);
  m_saving._download_finished(m_filename, ok, error);
443
}