Commit 7ae24c5a authored by Laurent Claustre's avatar Laurent Claustre
Browse files

Merge branch 'fix-ctsaving-stream-data-race' into 'master'

Fix ctsaving stream data race

See merge request !186
parents 939726da 32ee8b7e
Pipeline #40202 passed with stages
in 60 minutes and 19 seconds
......@@ -41,6 +41,8 @@ void LIMACORE_API ClearBuffer(void *ptr, int nb_concat_frames, const FrameDim& f
struct LIMACORE_API Allocator
{
DEB_STRUCT_NAMESPC(DebModCommon, "MemUtils", "Allocator");
struct Data {
virtual ~Data() = default;
};
......@@ -54,9 +56,10 @@ struct LIMACORE_API Allocator
Allocator(Allocator&& o) : m_ref_count(0)
{
DEB_MEMBER_FUNCT();
if (o.m_ref_count != 0)
throw LIMA_COM_EXC(InvalidValue,
"Moved-from Allocator is not empty");
DEB_ERROR() << "Moved-from Allocator is not empty";
}
// Allocate a buffer of a given size and eventually return
......@@ -102,9 +105,10 @@ struct LIMACORE_API Allocator
virtual ~Allocator()
{
DEB_MEMBER_FUNCT();
if (m_ref_count != 0)
std::cerr << "Error: destroying non-empty Allocator"
<< std::endl;
DEB_ERROR() << "Error: destroying non-empty Allocator";
}
// The real resource management counter, triggered by Ref
......
......@@ -5,6 +5,7 @@
#include <iostream>
#include "lima/Constants.h"
#include "lima/Exceptions.h"
#include "processlib/Data.h"
......@@ -35,7 +36,14 @@ namespace lima
inline void alloc(int size)
{
if(!buffer || double(size) > this->size())
buffer = (char*)realloc(buffer,size);
{
char* tmp = (char*)realloc(buffer,size);
if (tmp == NULL)
throw LIMA_COM_EXC(Error, "Error in realloc: ")
<< "NULL pointer returned";
else
buffer = tmp;
}
}
inline void setParams(int fNumber,int w,int h,VideoMode m)
{
......
......@@ -82,6 +82,9 @@ Exception::Exception(Layer layer, ErrorType err_type, const string& err_desc,
if (deb_proxy)
*deb_proxy << "Exception(" << getErrType() << "): "
<< getErrDesc();
else
std::cerr << "********* Exception(" << getErrType() << "): "
<< getErrDesc();
}
Layer Exception::getLayer() const
......@@ -146,4 +149,3 @@ ostream& lima::operator <<(ostream& os, const Exception& e)
{
return os << e.getErrMsg();
}
......@@ -103,7 +103,7 @@ int lima::GetDefMaxNbBuffers(const FrameDim& frame_dim)
void lima::ClearBuffer(void *ptr, int nb_concat_frames,
const FrameDim& frame_dim)
{
memset(ptr, 0, nb_concat_frames * frame_dim.getMemSize());
memset(ptr, 0, nb_concat_frames * size_t(frame_dim.getMemSize()));
}
Allocator *Allocator::defaultAllocator()
......
......@@ -184,7 +184,7 @@ bool Cond::wait(double timeout)
struct timeval now;
struct timespec waitTimeout;
gettimeofday(&now,NULL);
waitTimeout.tv_sec = now.tv_sec + long(timeout);
waitTimeout.tv_sec = now.tv_sec + time_t(timeout);
waitTimeout.tv_nsec = (now.tv_usec * 1000) +
long((timeout - long(timeout)) * 1e9);
if(waitTimeout.tv_nsec >= 1000000000L) // Carry
......@@ -241,10 +241,8 @@ Thread::ExceptionCleanUp::~ExceptionCleanUp()
m_thread.m_exception_handled = true;
}
Thread::Thread()
Thread::Thread() : m_thread(NULL), m_started(false), m_finished(false), m_exception_handled(false), m_tid(0)
{
m_started = m_finished = m_exception_handled = false;
m_tid = 0;
pthread_attr_init(&m_thread_attr);
}
......
......@@ -70,7 +70,7 @@ namespace lima
"Control");
friend class CtAccumulation;
public:
ThresholdCallback() {};
ThresholdCallback() : m_max(0) {};
virtual ~ThresholdCallback() {};
int m_max;
......
......@@ -103,7 +103,7 @@ namespace lima
private:
typedef std::map<std::string,ModuleTypeCallback*> ModuleMap;
CtConfig(const CtConfig &other): m_ctrl(other.m_ctrl) {}
CtConfig(const CtConfig &other): m_ctrl(other.m_ctrl), m_config(NULL) {}
CtControl& m_ctrl;
libconfig::Config* m_config;
......
......@@ -267,10 +267,10 @@ public:
typedef std::map<long, FrameParameters> Frame2Params;
struct Handler
{
Handler() : m_handler(NULL) {}
Handler() : m_handler(NULL), m_nb_frames(0) {}
void* m_handler;
int m_nb_frames;
int m_nb_frames;
};
struct cmpParameters
{
......@@ -518,12 +518,14 @@ public:
void _prepare();
enum ContainerStatus {
Init, Prepare, Open,
Init, Preparing, Prepared, Open,
};
CtSaving& m_saving;
int m_idx;
// protect m_cnt_status, ensure atomic Preparing
Cond m_cond;
ContainerStatus m_cnt_status;
SaveContainer* m_save_cnt;
_SaveCBK* m_saving_cbk;
......@@ -533,6 +535,7 @@ public:
bool m_pars_dirty_flag;
bool m_active;
_CompressionCBK* m_compression_cbk;
};
friend class Stream;
......
......@@ -92,7 +92,7 @@ static const LZ4F_preferences_t lz4_preferences = {
int framesPerFile,const CtSaving::HeaderMap &header);
~FileLz4Compression();
virtual void process(Data &aData);
void _compression(const char *src,int size,ZBufferList& return_buffers);
void _compression(const char *src,size_t size,ZBufferList& return_buffers);
};
#endif // WITH_LZ4_COMPRESSION
......
......@@ -864,7 +864,7 @@ void SoftOpPeakFinder::setComputingMode(ComputingMode aComputingMode)
void SoftOpPeakFinder::getComputingMode(ComputingMode &aComputingMode) const
{
Tasks::PeakFinderTask::ComputingMode aMode;
Tasks::PeakFinderTask::ComputingMode aMode = Tasks::PeakFinderTask::MAXIMUM;
for(NameMapConstIterator i = m_task_manager.begin(); i != m_task_manager.end();++i) {
i->second.second->getComputingMode(aMode);
}
......
......@@ -172,6 +172,8 @@ protected:
private:
struct ChangeEvent {
ChangeEvent() : force(false), finished(NULL) {}
ImageStatus status;
bool force;
bool *finished;
......@@ -388,6 +390,14 @@ CtControl::CtControl(HwInterface *hw) :
CtControl::~CtControl()
{
DEB_DESTRUCTOR();
Status aStatus;
getStatus(aStatus);
if(aStatus.AcquisitionStatus == AcqRunning)
{
DEB_WARNING() << "Acquisition is still running, stopping acquisition";
stopAcq();
}
DEB_TRACE() << "Waiting for all threads to finish their tasks";
PoolThreadMgr& pool_thread_mgr = PoolThreadMgr::get();
......
......@@ -26,7 +26,7 @@
#include <unistd.h>
#include <numeric>
#ifdef __linux__
#ifdef __linux__
#include <dirent.h>
#include <sys/statvfs.h>
#else
......@@ -283,10 +283,14 @@ void CtSaving::Stream::prepare()
{
DEB_MEMBER_FUNCT();
if (m_cnt_status == Open)
m_save_cnt->close();
m_cnt_status = Init;
{
AutoMutex lock(m_cond.mutex());
if (m_cnt_status == Open) {
m_cnt_status = Init;
AutoMutexUnlock u(lock);
m_save_cnt->close();
}
}
updateParameters();
......@@ -297,27 +301,43 @@ void CtSaving::Stream::prepare()
void CtSaving::Stream::_prepare()
{
DEB_MEMBER_FUNCT();
checkWriteAccess();
m_save_cnt->prepare(m_saving.m_ctrl);
m_cnt_status = Prepare;
AutoMutex lock(m_cond.mutex());
m_cnt_status = Prepared;
}
void CtSaving::Stream::close()
{
m_save_cnt->close();
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
void CtSaving::Stream::clear()
{
m_save_cnt->clear();
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
void CtSaving::Stream::prepareWrittingFrame(long frame_nr)
{
if (m_cnt_status == Init)
_prepare();
{
AutoMutex lock(m_cond.mutex());
if (m_cnt_status == Init) {
m_cnt_status = Preparing;
{
AutoMutexUnlock u(lock);
_prepare();
}
m_cond.broadcast();
} else while (m_cnt_status == Preparing)
m_cond.wait();
}
m_save_cnt->prepareWrittingFrame(frame_nr);
}
......@@ -356,14 +376,14 @@ void CtSaving::Stream::createSaveContainer()
#ifndef WITH_NXS_SAVING
THROW_CTL_ERROR(NotSupported) << "Lima is not compiled with the nxs "
"saving option, not managed";
#endif
#endif
goto common;
case FITS:
#ifndef WITH_FITS_SAVING
THROW_CTL_ERROR(NotSupported) << "Lima is not compiled with the fits "
"saving option, not managed";
#endif
#endif
goto common;
case EDFGZ:
#ifndef WITH_Z_COMPRESSION
......@@ -473,6 +493,7 @@ void CtSaving::Stream::createSaveContainer()
m_save_cnt->setMaxConcurrentWritingTask(nb_writing_thread);
m_save_cnt->setEnableLogStat(enable_log_stat);
AutoMutex lock(m_cond.mutex());
m_cnt_status = Init;
}
......@@ -481,6 +502,8 @@ void CtSaving::Stream::writeFile(Data& data, HeaderMap& header)
DEB_MEMBER_FUNCT();
m_save_cnt->writeFile(data, header);
AutoMutex lock(m_cond.mutex());
m_cnt_status = Open;
}
......@@ -1040,7 +1063,7 @@ void CtSaving::_ReadImage(Data& image, int frameNumber)
bool CtSaving::_allStreamReady(long frame_nr)
{
DEB_MEMBER_FUNCT();
bool ready_flag = true;
for (int s = 0; ready_flag && s < m_nb_stream; ++s)
{
......@@ -1048,8 +1071,8 @@ bool CtSaving::_allStreamReady(long frame_nr)
if (stream.isActive())
ready_flag = stream.isReady(frame_nr);
}
DEB_RETURN() << DEB_VAR1(ready_flag);
DEB_RETURN() << DEB_VAR1(ready_flag);
return ready_flag;
}
void CtSaving::_waitWritingThreads()
......@@ -1564,8 +1587,8 @@ void CtSaving::frameReady(Data& aData)
aLock.unlock();
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
TaskType task_type = m_need_compression ? Compression : Save;
TaskList task_list;
_getTaskList(task_type, frame_nr, task_header, task_list);
_postTaskList(aData, task_list,
......@@ -1760,7 +1783,7 @@ void CtSaving::writeFrame(int aFrameNumber, int aNbFrames, bool synchronous)
managed_mode = getManagedMode();
}
if (managed_mode == Hardware) {
int hw_cap = m_hwsaving->getCapabilities();
if (hw_cap & HwSavingCtrlObj::MANUAL_WRITE)
......@@ -1876,7 +1899,7 @@ void CtSaving::_compressionFinished(Data& aData, Stream& stream)
_takeHeader(header_it, header, false);
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, header, task_list);
......@@ -2045,7 +2068,7 @@ void CtSaving::_prepare()
void CtSaving::_stop()
{
DEB_MEMBER_FUNCT();
// Get the last image acquired counter
CtControl::ImageStatus img_status;
m_ctrl.getImageStatus(img_status);
......@@ -2058,7 +2081,7 @@ void CtSaving::_stop()
// Update the number of frames so that SaveContainer::writeFile() will properly
// call SaveContainer::close()
stream.updateNbFrames(img_status.LastImageAcquired + 1);
// Clean up the frame parameters so that _allStreamReady() return true
stream.cleanRemainingFrames(img_status.LastImageAcquired + 1);
}
......@@ -2070,7 +2093,7 @@ void CtSaving::_stop()
void CtSaving::_close()
{
DEB_MEMBER_FUNCT();
if (_allStreamReady(-1))
{
for (int s = 0; s < m_nb_stream; ++s)
......@@ -2132,7 +2155,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
catch (std::ios_base::failure & error)
{
DEB_ERROR() << "Write failed :" << error.what();
#ifdef __linux__
#ifdef __linux__
/** struct statvfs {
unsigned long f_bsize; // file system block size
unsigned long f_frsize; //fragment size
......@@ -2499,7 +2522,7 @@ void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_TRACE() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_nb_frames_to_write = last_acquired_frame_nr;
}
......@@ -2531,7 +2554,7 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
else if (it->second.m_threadable)
{
DEB_TRACE() << DEB_VAR2(m_running_writing_task, m_max_writing_task);
ready = (m_running_writing_task + 1 <= m_max_writing_task);
}
else
......@@ -2574,7 +2597,7 @@ void CtSaving::SaveContainer::setReady(long frame_nr)
void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
Frame2Params::iterator i = m_frame_params.find(frame_nr);
if (i == m_frame_params.end())
......
......@@ -158,24 +158,24 @@ void FileLz4Compression::process(Data &aData)
m_container._setBuffer(aData.frameNumber,std::move(aBufferListPt));
}
void FileLz4Compression::_compression(const char *src,int size,
void FileLz4Compression::_compression(const char *src, size_t size,
ZBufferList& return_buffers)
{
DEB_MEMBER_FUNCT();
int buffer_size = LZ4F_compressFrameBound(size,&lz4_preferences);
size_t buffer_size = LZ4F_compressFrameBound(size,&lz4_preferences);
buffer_size += LZ4_HEADER_SIZE + LZ4_FOOTER_SIZE;
return_buffers.emplace_back(buffer_size);
ZBuffer& newBuffer = return_buffers.back();
char* buffer = (char*)newBuffer.ptr();
int offset = LZ4F_compressBegin(m_ctx,buffer,
size_t offset = LZ4F_compressBegin(m_ctx,buffer,
buffer_size,&lz4_preferences);
if(LZ4F_isError(offset))
THROW_CTL_ERROR(Error) << "Failed to start compression: " << DEB_VAR1(offset);
int error_code = LZ4F_compressUpdate(m_ctx,buffer + offset,buffer_size - offset,
size_t error_code = LZ4F_compressUpdate(m_ctx,buffer + offset,buffer_size - offset,
src,size,NULL);
if(LZ4F_isError(error_code))
THROW_CTL_ERROR(Error) << "Compression Failed: "
......
......@@ -43,6 +43,7 @@ struct SaveContainerHdf5::_File
m_in_append(false),
m_dataset_extended(false),
m_entry_index(0),
m_nb_frames(0),
m_frame_cnt(0)
{}
......@@ -198,6 +199,9 @@ SaveContainerHdf5::~SaveContainerHdf5() {
void SaveContainerHdf5::_prepare(CtControl& control) {
DEB_MEMBER_FUNCT();
// HDF5 garbage collecor
H5garbage_collect();
m_ct_image = control.image();
m_ct_acq = control.acquisition();
m_hw_int = control.hwInterface();
......@@ -361,13 +365,13 @@ void* SaveContainerHdf5::_open(const std::string &filename, std::ios_base::openm
time_t now;
time(&now);
char buf[sizeof("2011-10-08T07:07:09Z")];
#ifdef WIN32
struct tm gmtime_now;
#ifdef WIN32
gmtime_s(&gmtime_now, &now);
strftime(buf, sizeof(buf), "%FT%TZ", &gmtime_now);
#else
strftime(buf, sizeof(buf), "%FT%TZ", gmtime(&now));
gmtime_r(&now, &gmtime_now);
#endif
strftime(buf, sizeof(buf), "%FT%TZ", &gmtime_now);
string stime = string(buf);
write_h5_attribute(file->m_file,"file_time",stime);
......@@ -526,13 +530,13 @@ void SaveContainerHdf5::_close(void* f) {
time_t now;
time(&now);
char buf[sizeof("2011-10-08T07:07:09Z")];
#ifdef WIN32
struct tm gmtime_now;
#ifdef WIN32
gmtime_s(&gmtime_now, &now);
strftime(buf, sizeof(buf), "%FT%TZ", &gmtime_now);
#else
strftime(buf, sizeof(buf), "%FT%TZ", gmtime(&now));
gmtime_r(&now, &gmtime_now);
#endif
strftime(buf, sizeof(buf), "%FT%TZ", &gmtime_now);
string etime = string(buf);
write_h5_dataset(file->m_entry,"end_time",etime);
}
......
......@@ -113,8 +113,8 @@ void SoftBufferAllocMgr::allocBuffers(int nb_buffers,
try {
BufferList& bl = m_buffer_list;
if (to_alloc > 0) {
bl.resize(nb_buffers, MemBuffer(m_allocator));
bl.resize(nb_buffers, MemBuffer(m_allocator));
if (to_alloc > 0) {
DEB_TRACE() << "Allocating " << to_alloc << " buffers";
for (int i = curr_nb_buffers; i < nb_buffers; i++)
bl[i].alloc(frame_size);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment