Commit 1fc2e6b6 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Synchronize after disarm command at end of acquisition

* Allow executing a CurlLoop::FutureRequest::Callback in separate thread
  and improve error diagnostics
parent fa98454a
Pipeline #20498 failed with stages
in 1 minute and 7 seconds
......@@ -48,10 +48,11 @@ namespace eigerapi
public:
Callback() {};
virtual ~Callback() {}
virtual void status_changed(FutureRequest::Status) = 0;
virtual void status_changed(FutureRequest::Status status,
std::string error) = 0;
};
void register_callback(std::shared_ptr<Callback>&);
typedef std::shared_ptr<Callback> CallbackPtr;
void register_callback(CallbackPtr& cbk, bool in_thread = false);
CURL* get_handle() {return m_handle;}
const std::string& get_url() {return m_url;}
......@@ -59,6 +60,9 @@ namespace eigerapi
FutureRequest(const std::string& url);
protected:
virtual void _request_finished() {};
void _status_changed();
static void *_callback_thread_runFunc(void *data);
CURL* m_handle;
Status m_status;
......@@ -66,7 +70,8 @@ namespace eigerapi
// Synchro
mutable pthread_mutex_t m_lock;
mutable pthread_cond_t m_cond;
std::shared_ptr<Callback>* m_cbk;
CallbackPtr* m_cbk;
bool m_cbk_in_thread;
std::string m_url;
};
......@@ -77,6 +82,7 @@ namespace eigerapi
void add_request(std::shared_ptr<FutureRequest>);
void cancel_request(std::shared_ptr<FutureRequest>);
private:
typedef std::map<CURL*,std::shared_ptr<FutureRequest> > MapRequests;
typedef std::list<std::shared_ptr<FutureRequest> > ListRequests;
......
......@@ -96,8 +96,8 @@ CurlLoop::~CurlLoop()
{
quit();
pthread_mutex_destroy(&m_lock);
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_lock);
}
void CurlLoop::quit()
......@@ -272,7 +272,7 @@ void CurlLoop::_run()
}
pthread_cond_broadcast(&req->m_cond);
if(req->m_cbk)
(*req->m_cbk)->status_changed(req->m_status);
req->_status_changed();
req->_request_finished();
lock.lock();
......@@ -324,6 +324,9 @@ CurlLoop::FutureRequest::~FutureRequest()
{
curl_easy_cleanup(m_handle);
delete m_cbk;
pthread_cond_destroy(&m_cond);
pthread_mutex_destroy(&m_lock);
}
void CurlLoop::FutureRequest::wait(double timeout,bool lock_flag) const
......@@ -366,13 +369,60 @@ CurlLoop::FutureRequest::get_status() const
return m_status;
}
void CurlLoop::FutureRequest::register_callback(std::shared_ptr<Callback>& cbk)
void CurlLoop::FutureRequest::register_callback(CallbackPtr& cbk,
bool in_thread)
{
if (!cbk)
THROW_EIGER_EXCEPTION("register_callback","invalid callback");
Lock lock(&m_lock);
m_cbk = new std::shared_ptr<Callback>(cbk);
Status status = m_status;
lock.unLock();
delete m_cbk;
m_cbk = new CallbackPtr(cbk);
m_cbk_in_thread = in_thread;
if(m_status != RUNNING)
_status_changed();
}
struct StatusChangedCallback {
CurlLoop::FutureRequest::CallbackPtr cbk;
CurlLoop::FutureRequest::Status status;
std::string error;
void fire() { cbk->status_changed(status, error); }
};
typedef std::auto_ptr<StatusChangedCallback> StatusChangedCallbackPtr;
if(status != RUNNING)
cbk->status_changed(status);
void CurlLoop::FutureRequest::_status_changed()
{
StatusChangedCallback cbk{*m_cbk, m_status, m_error_code};
if (!m_cbk_in_thread) {
cbk.fire();
return;
}
#define ERROR(x) \
do { \
std::cout << "CurlLoop::FutureRequest::_status_changed: " \
<<"Error: " << x << std::endl; \
} while (0)
StatusChangedCallbackPtr tcbk(new StatusChangedCallback(cbk));
pthread_t thread;
pthread_attr_t attr;
if (pthread_attr_init(&attr))
ERROR("could not init thread attr");
else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
ERROR("could not set thread attr detached state");
else if (pthread_create(&thread, &attr, _callback_thread_runFunc, tcbk.get()))
ERROR("could not create thread");
else
tcbk.release();
#undef ERROR
}
void *CurlLoop::FutureRequest::_callback_thread_runFunc(void *data)
{
StatusChangedCallbackPtr cbk((StatusChangedCallback *) data);
cbk->fire();
return NULL;
}
......@@ -91,7 +91,7 @@ typedef Requests::TransferReq TransferReq;
/*----------------------------------------------------------------------------
Callback MultiParamRequest
class MultiParamRequest
----------------------------------------------------------------------------*/
namespace lima
{
......@@ -174,12 +174,19 @@ private:
----------------------------------------------------------------------------*/
class Camera::TriggerCallback : public CurlLoop::FutureRequest::Callback
{
DEB_CLASS_NAMESPC(DebModCamera, "Camera::TriggerCallback", "Eiger");
public:
TriggerCallback(Camera& cam) : m_cam(cam) {}
void status_changed(CurlLoop::FutureRequest::Status status)
void status_changed(CurlLoop::FutureRequest::Status status,
std::string error)
{
m_cam._trigger_finished(status == CurlLoop::FutureRequest::OK);
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(status, error);
bool ok = (status == CurlLoop::FutureRequest::OK);
if (!ok)
DEB_ERROR() << DEB_VAR1(error);
m_cam._trigger_finished(ok);
}
private:
Camera& m_cam;
......@@ -191,45 +198,18 @@ class Camera::InitCallback : public CurlLoop::FutureRequest::Callback
public:
InitCallback(Camera& cam) : m_cam(cam) {}
void status_changed(CurlLoop::FutureRequest::Status status)
void status_changed(CurlLoop::FutureRequest::Status status,
std::string error)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(status);
DEB_PARAM() << DEB_VAR2(status, error);
bool ok = (status == CurlLoop::FutureRequest::OK);
DEB_TRACE() << DEB_VAR1(ok);
// run post-initialization code on detached thread
AutoPtr<Pars> pars = new Pars{m_cam, ok};
pthread_t thread;
pthread_attr_t attr;
if (pthread_attr_init(&attr)) {
DEB_ERROR() << "could not init thread attr";
} else if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) {
DEB_ERROR() << "could not set thread attr detached state";
} else if (pthread_create(&thread, &attr, &thread_func, pars)) {
DEB_ERROR() << "could not create thread";
} else {
DEB_TRACE() << "thread created OK";
pars.forget();
}
if (!ok)
DEB_ERROR() << DEB_VAR1(error);
m_cam._initialization_finished(ok);
}
private:
struct Pars {
Camera& cam;
bool ok;
};
static void *thread_func(void *data)
{
DEB_STATIC_FUNCT();
AutoPtr<Pars> pars = (Pars *) data;
DEB_PARAM() << DEB_VAR1(pars->ok);
pars->cam._initialization_finished(pars->ok);
return NULL;
}
Camera& m_cam;
};
......@@ -300,8 +280,8 @@ void Camera::initialize()
CommandReq async_initialize = m_requests->get_command(Requests::INITIALIZE);
lock.unlock();
std::shared_ptr<CurlLoop::FutureRequest::Callback> cbk(new InitCallback(*this));
async_initialize->register_callback(cbk);
CurlLoop::FutureRequest::CallbackPtr cbk(new InitCallback(*this));
async_initialize->register_callback(cbk, true);
}
void Camera::_initialization_finished(bool ok)
......@@ -335,7 +315,7 @@ void Camera::prepareAcq()
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
if(m_trigger_state != IDLE)
EIGER_SYNC_CMD(Requests::DISARM);
disarm();
unsigned nb_images, nb_triggers;
switch(m_trig_mode)
......@@ -402,13 +382,15 @@ void Camera::startAcq()
if(m_trig_mode == IntTrig ||
m_trig_mode == IntTrigMult)
{
DEB_TRACE() << "Trigger start";
bool disarm_at_end = ((m_trig_mode == IntTrig) ||
(m_image_number == m_nb_frames - 1));
DEB_TRACE() << "Trigger start: " << DEB_VAR1(disarm_at_end);
CommandReq trigger = m_requests->get_command(Requests::TRIGGER);
m_trigger_state = RUNNING;
lock.unlock();
std::shared_ptr<CurlLoop::FutureRequest::Callback> cbk(new TriggerCallback(*this));
trigger->register_callback(cbk);
CurlLoop::FutureRequest::CallbackPtr cbk(new TriggerCallback(*this));
trigger->register_callback(cbk, disarm_at_end);
}
}
......@@ -861,26 +843,24 @@ void Camera::_trigger_finished(bool ok)
DEB_PARAM() << DEB_VAR1(ok);
DEB_TRACE() << "Trigger end";
if (!ok)
if(!ok)
DEB_ERROR() << "Error in trigger command";
else if(isAcquisitionFinished())
try { disarm(); }
catch (...) { ok = false; }
AutoMutex lock(m_cond.mutex());
m_trigger_state = ok ? IDLE : ERROR;
lock.unlock();
// Disarm at acq end
if(isAcquisitionFinished())
CommandReq disarm = m_requests->get_command(Requests::DISARM);
}
bool Camera::isAcquisitionFinished()
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
DEB_PARAM() << DEB_VAR3(m_trigger_state, m_trigger_state, m_image_number);
bool finished = ((m_trigger_state == IDLE) &&
((m_trig_mode != IntTrigMult) ||
(m_image_number == m_nb_frames)));
DEB_PARAM() << DEB_VAR2(m_trigger_state, m_image_number);
bool finished = ((m_trig_mode != IntTrigMult) ||
(m_image_number == m_nb_frames));
DEB_RETURN() << DEB_VAR1(finished);
return finished;
}
......
......@@ -98,7 +98,8 @@ class SavingCtrlObj::_EndDownloadCallback : public CurlLoop::FutureRequest::Call
public:
_EndDownloadCallback(SavingCtrlObj&,const std::string &filename);
virtual void status_changed(CurlLoop::FutureRequest::Status);
virtual void status_changed(CurlLoop::FutureRequest::Status,
std::string error);
private:
SavingCtrlObj& m_saving;
std::string m_filename;
......@@ -452,7 +453,7 @@ SavingCtrlObj::_EndDownloadCallback::_EndDownloadCallback(SavingCtrlObj& saving,
}
void SavingCtrlObj::_EndDownloadCallback::
status_changed(CurlLoop::FutureRequest::Status status)
status_changed(CurlLoop::FutureRequest::Status status, std::string error)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_saving.m_cond.mutex());
......@@ -460,7 +461,7 @@ status_changed(CurlLoop::FutureRequest::Status status)
{
m_saving.m_error_msg = "Failed to download file: ";
m_saving.m_error_msg += m_filename;
DEB_ERROR() << m_saving.m_error_msg;
DEB_ERROR() << m_saving.m_error_msg << ": " << error;
//Stop the polling
m_saving.m_poll_master_file = false;
m_saving.m_nb_file_transfer_started = m_saving.m_nb_file_to_watch = 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment