Commit a7101e12 authored by Samuel Debionne's avatar Samuel Debionne Committed by Samuel Debionne
Browse files

Fix data race in CtSaving::Stream::m_cnt_status

parent 4df392e9
......@@ -518,12 +518,14 @@ public:
void _prepare();
enum ContainerStatus {
Init, Prepare, Open,
Init, Preparing, Prepared, Open,
};
CtSaving& m_saving;
int m_idx;
// protect m_cnt_status, ensure atomic Preparing
Cond m_cond;
ContainerStatus m_cnt_status;
SaveContainer* m_save_cnt;
_SaveCBK* m_saving_cbk;
......@@ -533,6 +535,7 @@ public:
bool m_pars_dirty_flag;
bool m_active;
_CompressionCBK* m_compression_cbk;
};
friend class Stream;
......
......@@ -26,7 +26,7 @@
#include <unistd.h>
#include <numeric>
#ifdef __linux__
#ifdef __linux__
#include <dirent.h>
#include <sys/statvfs.h>
#else
......@@ -283,10 +283,14 @@ void CtSaving::Stream::prepare()
{
DEB_MEMBER_FUNCT();
if (m_cnt_status == Open)
m_save_cnt->close();
m_cnt_status = Init;
{
AutoMutex lock(m_cond.mutex());
if (m_cnt_status == Open) {
m_cnt_status = Init;
AutoMutexUnlock u(lock);
m_save_cnt->close();
}
}
updateParameters();
......@@ -297,27 +301,43 @@ void CtSaving::Stream::prepare()
void CtSaving::Stream::_prepare()
{
DEB_MEMBER_FUNCT();
checkWriteAccess();
m_save_cnt->prepare(m_saving.m_ctrl);
m_cnt_status = Prepare;
AutoMutex lock(m_cond.mutex());
m_cnt_status = Prepared;
}
void CtSaving::Stream::close()
{
m_save_cnt->close();
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
void CtSaving::Stream::clear()
{
m_save_cnt->clear();
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
void CtSaving::Stream::prepareWrittingFrame(long frame_nr)
{
if (m_cnt_status == Init)
_prepare();
{
AutoMutex lock(m_cond.mutex());
if (m_cnt_status == Init) {
m_cnt_status = Preparing;
{
AutoMutexUnlock u(lock);
_prepare();
}
m_cond.broadcast();
} else while (m_cnt_status == Preparing)
m_cond.wait();
}
m_save_cnt->prepareWrittingFrame(frame_nr);
}
......@@ -356,14 +376,14 @@ void CtSaving::Stream::createSaveContainer()
#ifndef WITH_NXS_SAVING
THROW_CTL_ERROR(NotSupported) << "Lima is not compiled with the nxs "
"saving option, not managed";
#endif
#endif
goto common;
case FITS:
#ifndef WITH_FITS_SAVING
THROW_CTL_ERROR(NotSupported) << "Lima is not compiled with the fits "
"saving option, not managed";
#endif
#endif
goto common;
case EDFGZ:
#ifndef WITH_Z_COMPRESSION
......@@ -473,6 +493,7 @@ void CtSaving::Stream::createSaveContainer()
m_save_cnt->setMaxConcurrentWritingTask(nb_writing_thread);
m_save_cnt->setEnableLogStat(enable_log_stat);
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
......@@ -481,6 +502,8 @@ void CtSaving::Stream::writeFile(Data& data, HeaderMap& header)
DEB_MEMBER_FUNCT();
m_save_cnt->writeFile(data, header);
AutoMutex lock(m_cond.mutex());
m_cnt_status = Open;
}
......@@ -1040,7 +1063,7 @@ void CtSaving::_ReadImage(Data& image, int frameNumber)
bool CtSaving::_allStreamReady(long frame_nr)
{
DEB_MEMBER_FUNCT();
bool ready_flag = true;
for (int s = 0; ready_flag && s < m_nb_stream; ++s)
{
......@@ -1048,8 +1071,8 @@ bool CtSaving::_allStreamReady(long frame_nr)
if (stream.isActive())
ready_flag = stream.isReady(frame_nr);
}
DEB_RETURN() << DEB_VAR1(ready_flag);
DEB_RETURN() << DEB_VAR1(ready_flag);
return ready_flag;
}
void CtSaving::_waitWritingThreads()
......@@ -1564,8 +1587,8 @@ void CtSaving::frameReady(Data& aData)
aLock.unlock();
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
_getTaskList(task_type, frame_nr, task_header, task_list);
_postTaskList(aData, task_list,
......@@ -1760,7 +1783,7 @@ void CtSaving::writeFrame(int aFrameNumber, int aNbFrames, bool synchronous)
managed_mode = getManagedMode();
}
if (managed_mode == Hardware) {
int hw_cap = m_hwsaving->getCapabilities();
if (hw_cap & HwSavingCtrlObj::MANUAL_WRITE)
......@@ -1876,7 +1899,7 @@ void CtSaving::_compressionFinished(Data& aData, Stream& stream)
_takeHeader(header_it, header, false);
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, header, task_list);
......@@ -2045,7 +2068,7 @@ void CtSaving::_prepare()
void CtSaving::_stop()
{
DEB_MEMBER_FUNCT();
// Get the last image acquired counter
CtControl::ImageStatus img_status;
m_ctrl.getImageStatus(img_status);
......@@ -2058,7 +2081,7 @@ void CtSaving::_stop()
// Update the number of frames so that SaveContainer::writeFile() will properly
// call SaveContainer::close()
stream.updateNbFrames(img_status.LastImageAcquired + 1);
// Clean up the frame parameters so that _allStreamReady() return true
stream.cleanRemainingFrames(img_status.LastImageAcquired + 1);
}
......@@ -2070,7 +2093,7 @@ void CtSaving::_stop()
void CtSaving::_close()
{
DEB_MEMBER_FUNCT();
if (_allStreamReady(-1))
{
for (int s = 0; s < m_nb_stream; ++s)
......@@ -2132,7 +2155,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
catch (std::ios_base::failure & error)
{
DEB_ERROR() << "Write failed :" << error.what();
#ifdef __linux__
#ifdef __linux__
/** struct statvfs {
unsigned long f_bsize; // file system block size
unsigned long f_frsize; //fragment size
......@@ -2499,7 +2522,7 @@ void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_TRACE() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_nb_frames_to_write = last_acquired_frame_nr;
}
......@@ -2531,7 +2554,7 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
else if (it->second.m_threadable)
{
DEB_TRACE() << DEB_VAR2(m_running_writing_task, m_max_writing_task);
ready = (m_running_writing_task + 1 <= m_max_writing_task);
}
else
......@@ -2574,7 +2597,7 @@ void CtSaving::SaveContainer::setReady(long frame_nr)
void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
Frame2Params::iterator i = m_frame_params.find(frame_nr);
if (i == m_frame_params.end())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment