Commit 2875a914 authored by Samuel Debionne's avatar Samuel Debionne

Fix deadlock resulting in saving task stall and consequently AcqStatus being...

Fix deadlock resulting in saving task stall and consequently AcqStatus being AcqRunning indefinitely
Fix _calcAcqStatus when acq_nb_frames == 0
parent fd7933dc
......@@ -692,6 +692,8 @@ void CtControl::getStatus(Status& status) const
status.AcquisitionStatus = AcqFault;
else if(status.AcquisitionStatus == AcqReady)
status.AcquisitionStatus = aHwStatus.acq;
DEB_RETURN() << DEB_VAR1(status);
}
/** @brief This function is DEPRECATED. Use stopAcqAsync instead
......@@ -746,6 +748,8 @@ void CtControl::stopAcqAsync(AcqStatus acq_status, ErrorCode error_code,
*/
void CtControl::_updateImageStatusThreads(bool force)
{
DEB_MEMBER_FUNCT();
ReadWriteLock::ReadGuard guard(m_img_status_thread_list_lock);
for(ImageStatusThreadList::iterator i = m_img_status_thread_list.begin();
i != m_img_status_thread_list.end();++i)
......@@ -758,18 +762,23 @@ void CtControl::_updateImageStatusThreads(bool force)
void CtControl::_calcAcqStatus()
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
AcqStatus& acq_status = m_status.AcquisitionStatus;
DEB_TRACE() << DEB_VAR1(acq_status);
if((acq_status != AcqRunning) && (acq_status != AcqFault))
return;
const ImageStatus& img_cntrs = m_status.ImageCounters;
int acq_nb_frames;
m_ct_acq->getAcqNbFrames(acq_nb_frames);
long last_frame = (m_running ? (acq_nb_frames - 1) :
long last_frame = ((m_running && (acq_nb_frames > 0)) ? (acq_nb_frames - 1) :
img_cntrs.LastImageAcquired);
DEB_TRACE() << DEB_VAR2(last_frame, img_cntrs);
bool hw_acq_end = (img_cntrs.LastImageAcquired == last_frame);
bool img_op_end = (img_cntrs.LastImageReady == last_frame);
bool cnt_op_end = (!m_op_ext_sink_task_active ||
......@@ -777,8 +786,8 @@ void CtControl::_calcAcqStatus()
bool save_end = (!m_autosave || (img_cntrs.LastImageSaved) == last_frame);
bool acq_end = (hw_acq_end && img_op_end && cnt_op_end && save_end);
DEB_TRACE() << DEB_VAR5(hw_acq_end, img_op_end, cnt_op_end, save_end,
acq_end);
DEB_TRACE() << DEB_VAR5(hw_acq_end, img_op_end, cnt_op_end, save_end, acq_end);
if(!acq_end)
return;
......@@ -1320,10 +1329,14 @@ bool CtControl::_checkOverrun(Data& aData, AutoMutex& l)
// ----------------------------------------------------------------------------
// Struct ImageStatus
// ----------------------------------------------------------------------------
CtControl::ImageStatus::ImageStatus()
CtControl::ImageStatus::ImageStatus() :
LastImageAcquired(-1),
LastBaseImageReady(-1),
LastImageReady(-1),
LastImageSaved(-1),
LastCounterReady(-1)
{
DEB_CONSTRUCTOR();
reset();
}
CtControl::ImageStatus::ImageStatus(long lastImgAcq, long lastBaseImgReady,
......@@ -1333,7 +1346,7 @@ CtControl::ImageStatus::ImageStatus(long lastImgAcq, long lastBaseImgReady,
LastImageReady(lastImgReady), LastImageSaved(lastImgSaved),
LastCounterReady(lastCntReady)
{
DEB_CONSTRUCTOR();
DEB_CONSTRUCTOR();
}
void CtControl::ImageStatus::reset()
......
......@@ -1039,6 +1039,8 @@ void CtSaving::_ReadImage(Data& image, int frameNumber)
bool CtSaving::_allStreamReady(long frame_nr)
{
DEB_MEMBER_FUNCT();
bool ready_flag = true;
for (int s = 0; ready_flag && s < m_nb_stream; ++s)
{
......@@ -1046,6 +1048,8 @@ bool CtSaving::_allStreamReady(long frame_nr)
if (stream.isActive())
ready_flag = stream.isReady(frame_nr);
}
DEB_RETURN() << DEB_VAR1(ready_flag);
return ready_flag;
}
void CtSaving::_waitWritingThreads()
......@@ -1369,13 +1373,15 @@ void CtSaving::_validateFrameHeader(long frame_nr,
bool keep_header = m_need_compression;
_takeHeader(aHeaderIter, task_header, keep_header);
if (!m_need_compression)
m_frame_datas.erase(frame_iter);
aLock.unlock();
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
_getTaskList(task_type, frame_nr, task_header, task_list);
if (!m_need_compression) {
m_frame_datas.erase(frame_iter);
}
aLock.unlock();
_postTaskList(aData, task_list,
m_need_compression ? COMPRESSION_PRIORITY : SAVING_PRIORITY);
}
......@@ -1555,11 +1561,12 @@ void CtSaving::frameReady(Data& aData)
bool keep_header = m_need_compression;
_takeHeader(aHeaderIter, task_header, keep_header);
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
aLock.unlock();
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
_getTaskList(task_type, frame_nr, task_header, task_list);
aLock.unlock();
_postTaskList(aData, task_list,
m_need_compression ? COMPRESSION_PRIORITY : SAVING_PRIORITY);
}
......@@ -1867,10 +1874,11 @@ void CtSaving::_compressionFinished(Data& aData, Stream& stream)
FrameHeaderMap::iterator header_it = m_frame_headers.find(frame_nr);
_takeHeader(header_it, header, false);
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, header, task_list);
aLock.unlock();
_postTaskList(aData, task_list, SAVING_PRIORITY);
}
......@@ -1906,22 +1914,25 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream)
for (FrameMap::iterator nextDataIter = m_frame_datas.begin();
nextDataIter != m_frame_datas.end(); ++nextDataIter)
{
FrameHeaderMap::iterator aHeaderIter = m_frame_headers.find(nextDataIter->first);
frame_nr = nextDataIter->first;
FrameHeaderMap::iterator aHeaderIter = m_frame_headers.find(frame_nr);
bool header_available = (aHeaderIter != m_frame_headers.end());
bool can_save = _allStreamReady(nextDataIter->first);
bool can_save = _allStreamReady(frame_nr);
if (!can_save ||
((saving_mode == AutoHeader) && !header_available))
continue;
Data aNewData = nextDataIter->second;
m_frame_datas.erase(nextDataIter);
HeaderMap task_header;
_takeHeader(aHeaderIter, task_header, false);
TaskList task_list;
_getTaskList(Save, nextDataIter->first, task_header, task_list);
m_frame_datas.erase(nextDataIter);
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, task_header, task_list);
_postTaskList(aNewData, task_list, SAVING_PRIORITY);
break;
}
......@@ -2029,9 +2040,11 @@ void CtSaving::_prepare()
m_saving_stop = false;
}
// CtSaving::_stop is only called from CtControl::stopAcq()
// CtSaving::_stop is only called from CtControl::_stopAcq()
void CtSaving::_stop()
{
DEB_MEMBER_FUNCT();
// Get the last image acquired counter
CtControl::ImageStatus img_status;
m_ctrl.getImageStatus(img_status);
......@@ -2055,6 +2068,8 @@ void CtSaving::_stop()
void CtSaving::_close()
{
DEB_MEMBER_FUNCT();
if (_allStreamReady(-1))
{
for (int s = 0; s < m_nb_stream; ++s)
......@@ -2481,6 +2496,9 @@ void CtSaving::SaveContainer::prepare(CtControl& ct)
void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_TRACE() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_nb_frames_to_write = last_acquired_frame_nr;
}
......@@ -2488,34 +2506,46 @@ void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr)
bool CtSaving::SaveContainer::isReady(long frame_nr) const
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame_nr);
AutoMutex lock(m_cond.mutex());
// mean all writing task
bool ready;
// mean all writing tasks
if (frame_nr < 0)
{
bool ready = m_frame_params.empty();
ready = m_frame_params.empty();
for (Frame2Params::const_iterator i = m_frame_params.begin();
ready && i != m_frame_params.end(); ++i)
ready = !i->second.m_running;
return ready;
}
Frame2Params::const_iterator i = m_frame_params.find(frame_nr);
if (i == m_frame_params.end())
return m_frame_params.empty(); // if no task is running then ready
if (i->second.m_threadable)
else
{
DEB_TRACE() << DEB_VAR2(m_running_writing_task, m_max_writing_task);
return m_running_writing_task + 1 <= m_max_writing_task;
Frame2Params::const_iterator it = m_frame_params.find(frame_nr);
if (it == m_frame_params.end())
{
// if no task is running then ready
ready = m_frame_params.empty();
}
else if (it->second.m_threadable)
{
DEB_TRACE() << DEB_VAR2(m_running_writing_task, m_max_writing_task);
ready = (m_running_writing_task + 1 <= m_max_writing_task);
}
else
// ready if we are the next (the first)
ready = (it == m_frame_params.begin());
}
return i == m_frame_params.begin(); // ready if we are the next (the first)
DEB_RETURN() << DEB_VAR1(ready);
return ready;
}
void CtSaving::SaveContainer::cleanRemainingFrames(long last_acquired_frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_frame_params.erase(
......@@ -2526,6 +2556,7 @@ void CtSaving::SaveContainer::cleanRemainingFrames(long last_acquired_frame_nr)
void CtSaving::SaveContainer::setReady(long frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame_nr);
AutoMutex lock(m_cond.mutex());
// mean all frames
......@@ -2541,6 +2572,8 @@ void CtSaving::SaveContainer::setReady(long frame_nr)
void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
Frame2Params::iterator i = m_frame_params.find(frame_nr);
if (i == m_frame_params.end())
......@@ -2741,6 +2774,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params,
if (m_log_stat_enable)
fflush(m_log_stat_file);
}
void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers)
{
AutoMutex aLock(m_lock);
......
......@@ -252,11 +252,16 @@ void SaveContainerHdf5::_prepare(CtControl& control) {
m_acq_nbframes = m_ct_parameters.acq_nbframes;
// If the acquisition requests less frame than the max per file
// we will only create a dataset with size equal to the nb. of acquired frames
if (m_frames_per_file > m_acq_nbframes)
m_frames_per_file = m_acq_nbframes;
m_max_nb_files = (m_acq_nbframes + m_frames_per_file - 1)/ m_frames_per_file;
// If not a continuous acquisition
if (m_acq_nbframes > 0) {
// If the acquisition requests less frame than the max per file
// we will only create a dataset with size equal to the nb. of acquired frames
if (m_frames_per_file > m_acq_nbframes)
m_frames_per_file = m_acq_nbframes;
m_max_nb_files = (m_acq_nbframes + m_frames_per_file - 1) / m_frames_per_file;
}
else
m_max_nb_files = 0;
m_file_cnt = 0;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment