//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2015
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#include <algorithm>
#include "EigerSavingCtrlObj.h"
24
#include "EigerCameraRequests.h"
25
26
27
28
29
30
31
32
33

#include <eigerapi/Requests.h>
#include <eigerapi/EigerDefines.h>

using namespace lima;
using namespace lima::Eiger;
using namespace eigerapi;

// Upper bound on the number of file transfers in flight at the same time: the
// polling thread starts no new download while m_concurrent_download has
// reached this value.
const int MAX_SIMULTANEOUS_DOWNLOAD = 4;
/*----------------------------------------------------------------------------
			     HDF5 HEADER
----------------------------------------------------------------------------*/
// Pairs a common-header key name with the eigerapi parameter it is written to.
struct HeaderKey2Index
{
  // key as it appears in the HeaderMap handed to setCommonHeader()
  const char*		key_name;
  // parameter identifier passed to MultiParamRequest::addSet() for that key
  Requests::PARAM_NAME	param_name;
};

// Header keys accepted by setCommonHeader(), each paired with the matching
// eigerapi parameter. The SavingCtrlObj constructor copies this table into
// m_availables_header_keys.
static HeaderKey2Index available_header[] = {
  {"beam_center_x",Requests::HEADER_BEAM_CENTER_X},
  {"beam_center_y",Requests::HEADER_BEAM_CENTER_Y},
  {"chi_increment",Requests::HEADER_CHI_INCREMENT},
  {"chi_start",Requests::HEADER_CHI_START},
  {"detector_distance",Requests::HEADER_DETECTOR_DISTANCE},
  {"kappa_increment",Requests::HEADER_KAPPA_INCREMENT},
  {"kappa_start",Requests::HEADER_KAPPA_START},
  {"omega_increment",Requests::HEADER_OMEGA_INCREMENT},
  {"omega_start",Requests::HEADER_OMEGA_START},
  {"phi_increment",Requests::HEADER_PHI_INCREMENT},
  {"phi_start",Requests::HEADER_PHI_START},
  {"wavelength",Requests::HEADER_WAVELENGTH},
};
/*----------------------------------------------------------------------------
			    Polling thread
----------------------------------------------------------------------------*/
// Background thread owned by SavingCtrlObj. Its threadFunction() lists the
// files reported by the detector and starts the transfer of the master file
// and of the data files into the saving directory.
class SavingCtrlObj::_PollingThread : public Thread
{
  DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj::_PollingThread","Eiger");
public:
  _PollingThread(SavingCtrlObj&,eigerapi::Requests*);
  // Raises m_saving.m_quit, wakes the thread up and joins it.
  virtual ~_PollingThread();
protected:
  // Polling loop, runs until m_saving.m_quit is set.
  virtual void threadFunction();
private:
  // saving object whose state (protected by its m_cond mutex) drives the loop
  SavingCtrlObj&        m_saving;
  // request handle received from the camera at construction
  eigerapi::Requests*   m_requests;
};

// Build the saving control object for the given camera: reset the transfer
// bookkeeping, start the polling thread and register the header keys that
// setCommonHeader() accepts.
//
// NOTE(review): m_serie_id and m_waiting_time are not initialised here; they
// are only written by setSerieId() and _start(), while the polling thread
// reads them once _prepare() has armed the master file polling -- confirm
// they always get a value before that.
SavingCtrlObj::SavingCtrlObj(Camera& cam) :
  HwSavingCtrlObj(HwSavingCtrlObj::COMMON_HEADER,false),
  m_cam(cam),
  m_nb_file_to_watch(0),
  m_nb_file_transfer_started(0),
  m_concurrent_download(0),
  m_poll_master_file(false),
  m_quit(false)
{
  // The thread parks on m_cond straight away: nothing is to be polled until
  // _prepare()/_start() change the counters initialised above.
  m_polling_thread = new _PollingThread(*this,this->m_cam.m_requests);
  m_polling_thread->start();

  // Known keys for common header
  int nb_header_key = sizeof(available_header) / sizeof(HeaderKey2Index);
  for(int i = 0;i < nb_header_key;++i)
    {
      HeaderKey2Index& index = available_header[i];
      m_availables_header_keys[index.key_name] = index.param_name;
    }
}

/*----------------------------------------------------------------------------
			End download callback
----------------------------------------------------------------------------*/
// Callback registered on each file transfer request: it reports the status
// change of the request for one file back to the owning SavingCtrlObj through
// _download_finished().
class SavingCtrlObj::_EndDownloadCallback : public CurlLoop::FutureRequest::Callback
{
  DEB_CLASS_NAMESPC(DebModCamera,"SavingCtrlObj::_EndDownloadCallback","Eiger");
public:
  _EndDownloadCallback(SavingCtrlObj&,const std::string &filename);

  virtual void status_changed(CurlLoop::FutureRequest::Status,
                              std::string error);
private:
  // saving object to notify
  SavingCtrlObj&        m_saving;
  // name of the file whose transfer this callback follows
  std::string           m_filename;
};

/*----------------------------------------------------------------------------
			    SavingCtrlObj
----------------------------------------------------------------------------*/
// Destroy the saving object together with its polling thread.
SavingCtrlObj::~SavingCtrlObj()
{
  // ~_PollingThread() sets m_quit, wakes the thread up and joins it
  delete m_polling_thread;
}

// Append the saving formats handled by this object to format_list: HDF5 is
// the only entry added.
void SavingCtrlObj::getPossibleSaveFormat(std::list<std::string> &format_list) const
{
  format_list.insert(format_list.end(),HwSavingCtrlObj::HDF5_FORMAT_STR);
}

122
123
124
125
// Send every entry of the common header to the detector.
//
// Each key of header must be one of the names registered in
// m_availables_header_keys; its value is queued on a MultiParamRequest under
// the matching eigerapi parameter, and the call returns once the request has
// been waited for.
//
// Throws a hardware error (THROW_HW_ERROR) on the first key that is not
// registered.
void SavingCtrlObj::setCommonHeader(const HwSavingCtrlObj::HeaderMap& header)
{
  DEB_MEMBER_FUNCT();

  MultiParamRequest synchro(m_cam);

  for(HwSavingCtrlObj::HeaderMap::const_iterator i = header.begin();
      i != header.end();++i)
    {
      std::map<std::string,int>::iterator header_index = m_availables_header_keys.find(i->first);
      if(header_index == m_availables_header_keys.end())
        THROW_HW_ERROR(Error) << "Header key: " << i->first << " not yet managed ";
      // the map stores the parameter as a plain int: cast it back
      synchro.addSet(Requests::PARAM_NAME(header_index->second),i->second);
    }

  synchro.wait();
}

// Not implemented yet: currently does nothing.
void SavingCtrlObj::resetCommonHeader()
{
  // todo
}

145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Record the series id that the polling thread substitutes for "$id" in the
// file name prefix. m_serie_id is written with the m_cond mutex held.
void SavingCtrlObj::setSerieId(int value)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(value);

  AutoMutex lock(m_cond.mutex());
  m_serie_id = value;
}

// Report the saving state:
//  - ERROR   as soon as an error message has been recorded,
//  - RUNNING while the master file is still polled or some data file
//            transfers remain to be started,
//  - IDLE    otherwise.
SavingCtrlObj::Status SavingCtrlObj::getStatus()
{
  DEB_MEMBER_FUNCT();
  AutoMutex lock(m_cond.mutex());

  bool status = m_poll_master_file;
  if(!status)
    status = (m_nb_file_to_watch != m_nb_file_transfer_started);
  DEB_RETURN() << DEB_VAR2(status,m_error_msg);

  if(!m_error_msg.empty())
    return ERROR;
  if(status)
    return RUNNING;
  return IDLE;
}

// Cancel the file polling: forget the files still to be watched and give up
// on the master file. The state is changed with the m_cond mutex held.
void SavingCtrlObj::stop()
{
  DEB_MEMBER_FUNCT();
  AutoMutex lock(m_cond.mutex());
  m_nb_file_to_watch = 0;
  m_nb_file_transfer_started = 0;
  m_poll_master_file = false;
}

175
void SavingCtrlObj::_setActive(bool active, int)
176
177
178
179
180
{
  DEB_MEMBER_FUNCT();

  const char *active_str = active ? "enabled" : "disabled";
  DEB_TRACE() << "FILEWRITER_MODE:" << DEB_VAR1(active_str);
181
  setEigerParam(m_cam,Requests::FILEWRITER_MODE,active_str);
182
183
}

184
void SavingCtrlObj::_prepare(int)
185
186
187
{
  DEB_MEMBER_FUNCT();

188
189
  MultiParamRequest synchro(m_cam);

190
191
  int frames_per_file = int(m_frames_per_file);
  DEB_TRACE() << "NIMAGES_PER_FILE:" << DEB_VAR1(frames_per_file);
192
  synchro.addSet(Requests::NIMAGES_PER_FILE,frames_per_file);
193
  DEB_TRACE() << "FILEWRITER_NAME_PATTERN" << DEB_VAR1(m_prefix);
194
195
  synchro.addSet(Requests::FILEWRITER_NAME_PATTERN,m_prefix);
  synchro.wait();
196
197
198
199
200
201

  AutoMutex lock(m_cond.mutex());
  m_nb_file_transfer_started = m_nb_file_to_watch = 0;
  m_poll_master_file = true;
}

202
void SavingCtrlObj::_start(int)
203
204
205
206
207
208
209
210
211
212
213
214
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR1(m_active);

  int nb_frames;	m_cam.getNbFrames(nb_frames);
  double expo_time;	m_cam.getExpTime(expo_time);

  AutoMutex lock(m_cond.mutex());
  m_nb_file_transfer_started = 0;
  m_nb_file_to_watch = nb_frames / m_frames_per_file;
  if(nb_frames % m_frames_per_file) ++m_nb_file_to_watch;

215
  m_waiting_time = (expo_time * std::min(nb_frames,int(m_frames_per_file))) / 2.;
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
  
  m_cond.broadcast();
  
  DEB_TRACE() << DEB_VAR2(m_nb_file_to_watch,m_waiting_time);
}
//----------------------------------------------------------------------------
//			class _PollingThread
//----------------------------------------------------------------------------

// Keep a reference to the owning saving object and the request handle, and
// ask for the PTHREAD_SCOPE_PROCESS contention scope in the thread
// attributes. The thread itself is not started here.
SavingCtrlObj::_PollingThread::_PollingThread(SavingCtrlObj& saving,
					      eigerapi::Requests* requests) :
  m_saving(saving),
  m_requests(requests)
{
  pthread_attr_setscope(&m_thread_attr,PTHREAD_SCOPE_PROCESS);
}

// Ask the polling loop to quit, wake it up and wait for the thread to end.
SavingCtrlObj::_PollingThread::~_PollingThread()
{
  {
    // the flag is raised and signalled with the mutex held ...
    AutoMutex lock(m_saving.m_cond.mutex());
    m_saving.m_quit = true;
    m_saving.m_cond.broadcast();
  }
  // ... and the mutex is released before joining, so that the thread can
  // take it back to see the flag
  join();
}

void SavingCtrlObj::_PollingThread::threadFunction()
{
  DEB_MEMBER_FUNCT();
246
247
248
249
250
251

  Camera::ApiGeneration api;
  m_saving.m_cam.getApiGeneration(api);
  Requests::PARAM_NAME ls_name = ((api == Camera::Eiger1) ? Requests::FILEWRITER_LS :
							    Requests::FILEWRITER_LS2);

252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
  AutoMutex lock(m_saving.m_cond.mutex());
  
  while(!m_saving.m_quit)
    {
      while(!m_saving.m_quit &&
	    (m_saving.m_concurrent_download >= MAX_SIMULTANEOUS_DOWNLOAD ||
	     (!m_saving.m_poll_master_file &&
	      (m_saving.m_nb_file_to_watch == 
	       m_saving.m_nb_file_transfer_started))))
	{
	  m_saving.m_cond.broadcast();
	  m_saving.m_cond.wait();
	}

      if(m_saving.m_quit) break;
      std::string prefix = m_saving.m_prefix;
      std::string directory = m_saving.m_directory;

270
271
272
273
274
275
276
277
278
279
280
281
282
      char id_str[] = "$id";
      size_t id_pos = prefix.find(id_str);
      if (id_pos != std::string::npos) {
	char serie_id_str[32];
	snprintf(serie_id_str, sizeof(serie_id_str), "%d", m_saving.m_serie_id);
	std::string aux = prefix.substr(0, id_pos) + serie_id_str;
	size_t id_end = id_pos + sizeof(id_str);
	if (prefix.size() > id_end)
	  aux += prefix.substr(id_end);
	prefix = aux;
      }
      DEB_TRACE() << DEB_VAR2(directory, prefix);

283
284
285
286
287
      int total_nb_frames; m_saving.m_cam.getNbFrames(total_nb_frames);
      
      int frames_per_file = m_saving.m_frames_per_file;

      //Ls request
288
289
290
291
292
      std::vector<std::string> files;
      {
	AutoMutexUnlock u(lock);
	getEigerParam(m_saving.m_cam,ls_name,files);
      }
293
294
295
296
297
298
299

      // try to download master file
      if(m_saving.m_poll_master_file)
	{
	  std::ostringstream src_file_name;
	  src_file_name << prefix <<  "_master.h5";
	  bool master_file_found = false;
300
301
	  for(std::vector<std::string>::iterator i = files.begin();
	      !master_file_found && i != files.end();++i)
302
303
304
305
306
307
	    master_file_found = *i == src_file_name.str();

	  if(master_file_found)
	    {
	      std::string master_file_name = prefix + "_master.h5";
	      std::string dest_path = directory + "/" + master_file_name;
308
	      TransferReq master_file_req;
309
310
311
	      master_file_req = startEigerTransfer(m_saving.m_cam,
						   src_file_name.str(),
						   dest_path,lock);
312
313
314
315
316
	      if (!master_file_req) {
		// stop the loop
		m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		continue;
	      }
317
	      CallbackPtr end_cbk(new _EndDownloadCallback(m_saving,src_file_name.str()));
318
319
320
321
	      {
		AutoMutexUnlock u(lock);
		master_file_req->register_callback(end_cbk);
	      }
322
323
324
325
326
327
328
329
330
	      m_saving.m_poll_master_file = false;
	      ++m_saving.m_concurrent_download;
	    }
	}
      
      if(m_saving.m_nb_file_transfer_started < m_saving.m_nb_file_to_watch)
	{
	  int next_file_nb = m_saving.m_nb_file_transfer_started + 1;

331
	  std::sort(files.begin(),files.end());
332
333
334
335
336
337
338
	  char file_nb[32];
	  snprintf(file_nb,sizeof(file_nb),"%.6d",next_file_nb);

	  std::ostringstream src_file_name;
	  src_file_name << prefix << "_data_"  << file_nb << ".h5";
	  
	  //init find the first file_name of the list
339
340
	  std::vector<std::string>::iterator file_name = files.begin();
	  for(;file_name != files.end();++file_name)
341
342
	    if(*file_name == src_file_name.str()) break;

343
	  for(;file_name != files.end() &&
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
		m_saving.m_concurrent_download < MAX_SIMULTANEOUS_DOWNLOAD;
	      ++file_name,++next_file_nb)
	    {

	      snprintf(file_nb,sizeof(file_nb),"%.6d",next_file_nb);
	      src_file_name.clear();src_file_name.seekp(0);
	      src_file_name << prefix << "_data_"
			    << file_nb << ".h5";
	      if(*file_name == src_file_name.str()) // will start the transfer
		{
		  if(next_file_nb > m_saving.m_nb_file_to_watch)
		    {
		      DEB_WARNING() << "Something weird happened " 
				    << DEB_VAR2(next_file_nb,m_saving.m_nb_file_to_watch);
		      break;
		    }

		  DEB_TRACE() << "Start transfer file: " << DEB_VAR1(*file_name);
		  std::string dest_path = directory + "/" + src_file_name.str();
363
		  TransferReq file_req;
364
365
366
		  file_req = startEigerTransfer(m_saving.m_cam,
						src_file_name.str(),
						dest_path,lock);
367
368
369
370
371
		  if (!file_req) {
		    // stop the loop
		    m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		    break;
		  }
372
		  ++m_saving.m_nb_file_transfer_started,++m_saving.m_concurrent_download;
373
		  CallbackPtr end_cbk(new _EndDownloadCallback(m_saving,src_file_name.str()));
374
375
376
377
		  {
		    AutoMutexUnlock u(lock);
		    file_req->register_callback(end_cbk);
		  }
378
379
380
381
382
383
384
385
386

		  if(m_saving.m_callback)
		    {
		      int written_frame = m_saving.m_nb_file_transfer_started * frames_per_file;
		      if(written_frame > total_nb_frames)
			written_frame = total_nb_frames;
		      
		      //lima index start at 0
		      --written_frame;
387
388
389
390
391
		      bool continueFlag;
		      {
			AutoMutexUnlock u(lock);
			continueFlag = m_saving.m_callback->newFrameWritten(written_frame);
		      }
392
393
394
395
396
397
398
399
400
401
402
403
404
		      if(!continueFlag) // stop the loop
			m_saving.m_nb_file_to_watch = m_saving.m_nb_file_transfer_started = 0;
		    }
		}
	      else
		break;
	    }
	}

      m_saving.m_cond.wait(m_saving.m_waiting_time);
    }
}

405
406
407
408
409
410
void SavingCtrlObj::_download_finished(std::string filename, bool ok,
				       std::string error)
{
  DEB_MEMBER_FUNCT();
  DEB_PARAM() << DEB_VAR3(filename, ok, error);

411
  m_cam.newFrameAcquired();
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427

  AutoMutex lock(m_cond.mutex());
  if(!ok)
    {
      m_error_msg = "Failed to download file: ";
      m_error_msg += filename;
      DEB_ERROR() << m_error_msg << ": " << error;
      //Stop the polling
      m_poll_master_file = false;
      m_nb_file_transfer_started = m_nb_file_to_watch = 0;
    }

  --m_concurrent_download;
  m_cond.broadcast();
}

/*----------------------------------------------------------------------------
		      class _EndDownloadCallback
----------------------------------------------------------------------------*/
// Remember the saving object to notify and the name of the file whose
// transfer is followed by this callback.
SavingCtrlObj::_EndDownloadCallback::_EndDownloadCallback(SavingCtrlObj& saving,
							  const std::string& filename) :
  m_saving(saving),
  m_filename(filename)
{
}

void SavingCtrlObj::_EndDownloadCallback::
439
status_changed(CurlLoop::FutureRequest::Status status, std::string error)
440
441
{
  DEB_MEMBER_FUNCT();
442
443
444
  DEB_PARAM() << DEB_VAR2(status, error);
  bool ok = (status == CurlLoop::FutureRequest::OK);
  m_saving._download_finished(m_filename, ok, error);
445
}