From be3c1fe50968292d3e4796bccaf7d83234ed97c0 Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Tue, 23 Jun 2020 15:08:23 +0200 Subject: [PATCH 01/10] CtSaving: refactoring: * Better identification of critical sections * Do not mark frame as saved until container is closed * Include saving_mode when reporting saving errors * Remove frame from m_frame_datas when scheduling compression in AutoHeader * Improve container opening and closing --- control/include/lima/CtSaving.h | 3 + control/src/CtSaving.cpp | 293 ++++++++++++++++---------------- 2 files changed, 150 insertions(+), 146 deletions(-) diff --git a/control/include/lima/CtSaving.h b/control/include/lima/CtSaving.h index ee373f61..aaa742e5 100644 --- a/control/include/lima/CtSaving.h +++ b/control/include/lima/CtSaving.h @@ -362,6 +362,9 @@ public: int m_written_frames; Stream& m_stream; private: + + void close(const Params2Handler::iterator& it, AutoMutex& l); + StatisticsType m_statistic; int m_statistic_size; bool m_log_stat_enable; diff --git a/control/src/CtSaving.cpp b/control/src/CtSaving.cpp index 8bd4bed1..01c00469 100755 --- a/control/src/CtSaving.cpp +++ b/control/src/CtSaving.cpp @@ -1397,8 +1397,7 @@ void CtSaving::_validateFrameHeader(long frame_nr, bool keep_header = m_need_compression; _takeHeader(aHeaderIter, task_header, keep_header); - if (!m_need_compression) - m_frame_datas.erase(frame_iter); + m_frame_datas.erase(frame_iter); aLock.unlock(); @@ -1921,9 +1920,8 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream) //@todo check if the frame is still available if (m_end_cbk) { - aLock.unlock(); + AutoMutexUnlock u(aLock); m_end_cbk->finished(aData); - aLock.lock(); } SavingMode saving_mode = getAcqSavingMode(); @@ -1935,31 +1933,32 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream) return; } - for (FrameMap::iterator nextDataIter = m_frame_datas.begin(); - nextDataIter != m_frame_datas.end(); ++nextDataIter) + FrameHeaderMap::iterator aHeaderIter; + FrameMap::iterator nextDataIter, end = m_frame_datas.end(); + for (nextDataIter = m_frame_datas.begin(); nextDataIter != end; ++nextDataIter) { frame_nr = nextDataIter->first; - FrameHeaderMap::iterator aHeaderIter = m_frame_headers.find(frame_nr); + aHeaderIter = m_frame_headers.find(frame_nr); bool header_available = (aHeaderIter != m_frame_headers.end()); bool can_save = _allStreamReady(frame_nr); - if (!can_save || - ((saving_mode == AutoHeader) && !header_available)) - continue; + if (can_save && ((saving_mode == AutoFrame) || header_available)) + break; + } + if (nextDataIter == end) + return; - Data aNewData = nextDataIter->second; - m_frame_datas.erase(nextDataIter); + Data aNewData = nextDataIter->second; + m_frame_datas.erase(nextDataIter); - HeaderMap task_header; - _takeHeader(aHeaderIter, task_header, false); + HeaderMap task_header; + _takeHeader(aHeaderIter, task_header, false); - aLock.unlock(); + aLock.unlock(); - TaskList task_list; - _getTaskList(Save, frame_nr, task_header, task_list); + TaskList task_list; + _getTaskList(Save, frame_nr, task_header, task_list); - _postTaskList(aNewData, task_list, SAVING_PRIORITY); - break; - } + _postTaskList(aNewData, task_list, SAVING_PRIORITY); } /** @brief this methode set the error saving status in CtControl @@ -1972,17 +1971,15 @@ void CtSaving::_setSavingError(CtControl::ErrorCode anErrorCode) if (saving_mode == Manual) return; - AutoMutex aLock(m_ctrl.m_cond.mutex()); - if (m_ctrl.m_status.AcquisitionStatus != AcqFault) { - m_ctrl.m_status.AcquisitionStatus = AcqFault; - m_ctrl.m_status.Error = anErrorCode; - - DEB_ERROR() << DEB_VAR1(m_ctrl.m_status); + AutoMutex aLock(m_ctrl.m_cond.mutex()); + if (m_ctrl.m_status.AcquisitionStatus != AcqFault) { + m_ctrl.m_status.AcquisitionStatus = AcqFault; + m_ctrl.m_status.Error = anErrorCode; + DEB_ERROR() << DEB_VAR2(m_ctrl.m_status, saving_mode); + } } - aLock.unlock(); - m_ctrl.stopAcq(); } /** @brief preparing new acquisition @@ -2140,11 +2137,11 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) Frame2Params::iterator fpars = m_frame_params.find(frameId); if (fpars == m_frame_params.end()) THROW_CTL_ERROR(Error) << "Can't find saving parameters for frame" - << DEB_VAR1(frameId); + << DEB_VAR1(frameId); + lock.unlock(); FrameParameters& frame_par = fpars->second; - const CtSaving::Parameters pars = frame_par.m_pars; - lock.unlock(); + const CtSaving::Parameters& pars = frame_par.m_pars; long write_size; Params2Handler::value_type par_handler = open(frame_par); @@ -2208,13 +2205,10 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) } lock.lock(); - ++m_written_frames; - bool acq_end = (m_written_frames == m_nb_frames_to_write); - - m_frame_params.erase(frameId); - --m_running_writing_task; + bool acq_end = (++m_written_frames == m_nb_frames_to_write); lock.unlock(); + // close before marking that we have finished the frame if (pars.overwritePolicy != MultiSet || acq_end) // Close at the end { try { @@ -2226,6 +2220,11 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) } } + lock.lock(); + m_frame_params.erase(fpars); + --m_running_writing_task; + lock.unlock(); + Timestamp end_write = Timestamp::now(); Timestamp diff = end_write - start_write; @@ -2377,10 +2376,11 @@ void CtSaving::SaveContainer::getStatistic(std::list& writing_speed, { - - AutoMutex aLock = AutoMutex(m_cond.mutex()); - StatisticsType copy = m_statistic; - aLock.unlock(); + StatisticsType copy; + { + AutoMutex aLock = AutoMutex(m_cond.mutex()); + copy = m_statistic; + } StatisticsType::const_iterator next = copy.begin(); if (next != copy.end()) @@ -2539,9 +2539,9 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const if (frame_nr < 0) { ready = m_frame_params.empty(); - for (Frame2Params::const_iterator i = m_frame_params.begin(); - ready && i != m_frame_params.end(); ++i) - ready = !i->second.m_running; + for (Frame2Params::const_iterator it = m_frame_params.begin(); + ready && it != m_frame_params.end(); ++it) + ready = !it->second.m_running; } else { @@ -2549,7 +2549,7 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const if (it == m_frame_params.end()) { // if no task is running then ready - ready = m_frame_params.empty(); + ready = m_frame_params.empty(); } else if (it->second.m_threadable) { @@ -2657,101 +2657,116 @@ void CtSaving::SaveContainer::setMaxConcurrentWritingTask(int nb_thread) << DEB_VAR1(nb_thread); m_max_writing_task = nb_thread; } + CtSaving::SaveContainer::Params2Handler::value_type CtSaving::SaveContainer::open(FrameParameters& fpars) { DEB_MEMBER_FUNCT(); - AutoMutex lock(m_cond.mutex()); - CtSaving::Parameters& pars = fpars.m_pars; - Params2Handler::iterator handler = m_params_handler.find(pars); - - if (handler != m_params_handler.end()) - return Params2Handler::value_type(handler->first, handler->second); - else { - lock.unlock(); - - std::string aFileName = pars.directory + DIR_SEPARATOR + pars.prefix; - long index = pars.nextNumber; - char idx[64]; - if (index < 0) index = 0; - snprintf(idx, sizeof(idx), pars.indexFormat.c_str(), index); - aFileName += idx; - aFileName += pars.suffix; + AutoMutex lock(m_cond.mutex()); + Params2Handler::iterator handler = m_params_handler.find(pars); + if (handler != m_params_handler.end()) + return *handler; + } - DEB_TRACE() << DEB_VAR1(aFileName); + std::string aFileName = pars.directory + DIR_SEPARATOR + pars.prefix; + long index = pars.nextNumber; + char idx[64]; + if (index < 0) index = 0; + snprintf(idx, sizeof(idx), pars.indexFormat.c_str(), index); + aFileName += idx; + aFileName += pars.suffix; - if (pars.overwritePolicy == Abort && - !access(aFileName.c_str(), R_OK)) - { - m_stream.setSavingError(CtControl::SaveOverwriteError); - std::string output; - output = "Try to over write file: " + aFileName; - THROW_CTL_ERROR(Error) << output; - } - std::ios_base::openmode openFlags = std::ios_base::out | std::ios_base::binary; - if (pars.overwritePolicy == Append || - pars.overwritePolicy == MultiSet) - openFlags |= std::ios_base::app; - else if (pars.overwritePolicy == Overwrite) - openFlags |= std::ios_base::trunc; - - std::string error_desc; - Handler handler; - for (int nbTry = 0; nbTry < 5; ++nbTry) - { - try { - handler.m_handler = _open(aFileName, openFlags); - } - catch (std::ios_base::failure & error) { - error_desc = error.what(); - DEB_WARNING() << "Could not open " << aFileName << ": " - << error_desc; - } - catch (...) { - error_desc = "Unknown error"; - DEB_WARNING() << "Could not open " << aFileName << ": " - << error_desc; - } + DEB_TRACE() << DEB_VAR1(aFileName); - if (!handler.m_handler) - { - std::string output; + if (pars.overwritePolicy == Abort && + !access(aFileName.c_str(), R_OK)) + { + m_stream.setSavingError(CtControl::SaveOverwriteError); + std::string output; + output = "Try to over write file: " + aFileName; + THROW_CTL_ERROR(Error) << output; + } + std::ios_base::openmode openFlags = std::ios_base::out | std::ios_base::binary; + if (pars.overwritePolicy == Append || + pars.overwritePolicy == MultiSet) + openFlags |= std::ios_base::app; + else if (pars.overwritePolicy == Overwrite) + openFlags |= std::ios_base::trunc; - if (access(pars.directory.c_str(), W_OK)) - { - m_stream.setSavingError(CtControl::SaveAccessError); - output = "Can not write in directory: " + pars.directory; - THROW_CTL_ERROR(Error) << output; - } - } - else - { - DEB_TRACE() << "Open file: " << aFileName; - handler.m_nb_frames = pars.framesPerFile; - lock.lock(); - Params2Handler::value_type map_pair(pars, handler); - std::pair result = - m_params_handler.insert(map_pair); - return Params2Handler::value_type(result.first->first, - result.first->second); - } + std::string error_desc; + Handler handler; + for (int nbTry = 0; !handler.m_handler && (nbTry < 5); ++nbTry) + { + try { + handler.m_handler = _open(aFileName, openFlags); + } + catch (std::ios_base::failure & error) { + error_desc = error.what(); + DEB_WARNING() << "Could not open " << aFileName << ": " + << error_desc; + } + catch (...) { + error_desc = "Unknown error"; + DEB_WARNING() << "Could not open " << aFileName << ": " + << error_desc; } - if (!handler.m_handler) + if (!handler.m_handler && access(pars.directory.c_str(), W_OK)) { - m_stream.setSavingError(CtControl::SaveOpenError); - std::string output; - output = "Failure opening " + aFileName; - if (!error_desc.empty()) - output += ": " + error_desc; + m_stream.setSavingError(CtControl::SaveAccessError); + std::string output = "Can not write in directory: " + pars.directory; THROW_CTL_ERROR(Error) << output; } } - // we can't reach this line (normally) just for compiler - return Params2Handler::value_type(CtSaving::Parameters(), Handler()); + + if (!handler.m_handler) + { + m_stream.setSavingError(CtControl::SaveOpenError); + std::string output; + output = "Failure opening " + aFileName; + if (!error_desc.empty()) + output += ": " + error_desc; + THROW_CTL_ERROR(Error) << output; + } + + DEB_TRACE() << "Open file: " << aFileName; + handler.m_nb_frames = pars.framesPerFile; + Params2Handler::value_type map_pair(pars, handler); + bool ok; + { + AutoMutex lock(m_cond.mutex()); + ok = m_params_handler.insert(map_pair).second; + } + if (!ok) { + _close(handler.m_handler); + THROW_CTL_ERROR(Error) << "Error inserting handle"; + } + return map_pair; +} + +inline void CtSaving::SaveContainer::close(const Params2Handler::iterator& it, AutoMutex& l) +{ + void* raw_handler = it->second.m_handler; + if (raw_handler == NULL) + return; + + it->second.m_handler = NULL; + + { + AutoMutexUnlock u(l); + _close(raw_handler); + } + + Parameters& pars = m_stream.getParameters(Acq); + if ((pars.overwritePolicy != MultiSet) && + (pars.overwritePolicy != Append)) { + int nextNumber = it->first.nextNumber + 1; + if (pars.nextNumber < nextNumber) + pars.nextNumber = nextNumber; + } } void CtSaving::SaveContainer::close(const CtSaving::Parameters* params, @@ -2762,34 +2777,20 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params, AutoMutex aLock(m_cond.mutex()); if (!params) // close all { - for (Params2Handler::iterator i = m_params_handler.begin(); - i != m_params_handler.end(); ++i) - { - if (i->second.m_handler) - _close(i->second.m_handler); - } + for (Params2Handler::iterator it = m_params_handler.begin(); + it != m_params_handler.end(); ++it) + close(it, aLock); m_params_handler.clear(); } else { - Params2Handler::iterator handler = m_params_handler.find(*params); - if (force_close || !--handler->second.m_nb_frames) - { - void* raw_handler = handler->second.m_handler; - const Parameters frame_pars = handler->first; - m_params_handler.erase(handler); - aLock.unlock(); - _close(raw_handler); - - Parameters& pars = m_stream.getParameters(Acq); - if (pars.overwritePolicy != MultiSet && - pars.overwritePolicy != Append) - { - int nextNumber = frame_pars.nextNumber + 1; - aLock.lock(); - if (pars.nextNumber < nextNumber) - pars.nextNumber = nextNumber; - } + Params2Handler::iterator it = m_params_handler.find(*params); + if (it == m_params_handler.end()) + THROW_CTL_ERROR(Error) << "Could not find handle for " + << DEB_VAR1(params); + if (force_close || !--it->second.m_nb_frames) { + close(it, aLock); + m_params_handler.erase(it); } } -- GitLab From f778c2a2de3be2ee6e78fb0e2953f57071aed6bc Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Thu, 1 Oct 2020 11:50:15 +0200 Subject: [PATCH 02/10] Improve SaveContainerEdf buffer management: * Allocate different buffers for parallel Files * Recycle free buffers * Refactor MmapInfo, used in EDFConcat format --- control/src/CtSaving_Edf.cpp | 170 +++++++++++++++++++++-------------- control/src/CtSaving_Edf.h | 73 ++++++++++++--- 2 files changed, 162 insertions(+), 81 deletions(-) diff --git a/control/src/CtSaving_Edf.cpp b/control/src/CtSaving_Edf.cpp index 416d90ae..f658578d 100644 --- a/control/src/CtSaving_Edf.cpp +++ b/control/src/CtSaving_Edf.cpp @@ -36,6 +36,34 @@ static const long int WRITE_BUFFER_SIZE = 64*1024; using namespace lima; +#ifdef __unix +SaveContainerEdf::MmapInfo::~MmapInfo() +{ + if(mmap_addr) + munmap(mmap_addr, header_size); +} + +inline void SaveContainerEdf::MmapInfo::map(const std::string& fname, + long long header_position) +{ + header_position -= header_size; + long sz = sysconf(_SC_PAGESIZE); + long long mapping_offset = header_position / sz * sz; + header_size += header_position - mapping_offset; + height_offset -= mapping_offset; + size_offset -= mapping_offset; + int fd = ::open(fname.c_str(),O_RDWR); + if(fd < 0) + throw LIMA_CTL_EXC(Error, "Error opening ") << fname << ": " << strerror(errno); + mmap_addr = mmap(NULL,header_size, + PROT_WRITE,MAP_SHARED,fd,mapping_offset); + int mmap_err = errno; + ::close(fd); + if(!mmap_addr) + throw LIMA_CTL_EXC(Error, "Error mapping ") << fname << ": " << strerror(mmap_err); +} +#endif + #ifdef WIN32 /** @brief this is a small wrapper class for ofstream class. * All this is for overcome performance issue with window std::ofstream @@ -114,6 +142,33 @@ SaveContainerEdf::_OfStream::operator<< (const long data) } #endif +/** @brief saving container handle + * + * This class manage the low-level handle + */ +SaveContainerEdf::File::File(SaveContainerEdf& cont, + const std::string& filename, + std::ios_base::openmode openFlags) + : m_cont(cont), m_filename(filename) +#ifdef __unix + , m_height(0), m_size(0) +#endif +{ + m_fout.exceptions(std::ios_base::failbit | std::ios_base::badbit); + m_fout.open(filename.c_str(),openFlags); +#ifdef __unix + m_buffer = m_cont.getNewBuffer(); + m_fout.rdbuf()->pubsetbuf((char*)m_buffer,WRITE_BUFFER_SIZE); +#endif +} + +SaveContainerEdf::File::~File() +{ +#ifdef __unix + m_cont.releaseBuffer(m_buffer); +#endif +} + /** @brief saving container * * This class manage file saving @@ -121,69 +176,66 @@ SaveContainerEdf::_OfStream::operator<< (const long data) SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream, CtSaving::FileFormat format) : CtSaving::SaveContainer(stream), +#ifdef __unix + m_nb_buffers(0), +#endif m_format(format) { DEB_CONSTRUCTOR(); -#ifdef __unix - if(posix_memalign(&m_fout_buffer,4*1024,WRITE_BUFFER_SIZE)) - THROW_CTL_ERROR(Error) << "Can't allocated write buffer"; -#endif } SaveContainerEdf::~SaveContainerEdf() { DEB_DESTRUCTOR(); #ifdef __unix - free(m_fout_buffer); + if(m_free_buffers.size() != m_nb_buffers) + DEB_WARNING() << "Missing free buffers: " + << "got " << m_free_buffers.size() << ", " + << "expected " << m_nb_buffers; + while(!m_free_buffers.empty()) + free(getNewBuffer()); #endif } -void* SaveContainerEdf::_open(const std::string &filename, - std::ios_base::openmode openFlags) +#ifdef __unix +void *SaveContainerEdf::getNewBuffer() { DEB_MEMBER_FUNCT(); -#ifdef WIN32 - _OfStream* fout = new _OfStream(); -#else - std::ofstream* fout = new std::ofstream(); -#endif - try + + void *buffer; + if(!m_free_buffers.empty()) { - fout->exceptions(std::ios_base::failbit | std::ios_base::badbit); - fout->open(filename.c_str(),openFlags); -#ifdef __unix - fout->rdbuf()->pubsetbuf((char*)m_fout_buffer,WRITE_BUFFER_SIZE); -#endif + buffer = m_free_buffers.top(); + m_free_buffers.pop(); } - catch(...) + else { - delete fout; - throw; + if(posix_memalign(&buffer,4*1024,WRITE_BUFFER_SIZE)) + THROW_CTL_ERROR(Error) << "Can't allocate write buffer"; + ++m_nb_buffers; } - - m_current_filename = filename; - - return fout; + return buffer; } -void SaveContainerEdf::_close(void* f) +void SaveContainerEdf::releaseBuffer(void *buffer) { DEB_MEMBER_FUNCT(); -#ifdef WIN32 - _OfStream* fout = (_OfStream*)f; -#else - std::ofstream* fout = (std::ofstream*)f; + m_free_buffers.push(buffer); +} #endif - delete fout; // close file +void* SaveContainerEdf::_open(const std::string &filename, + std::ios_base::openmode openFlags) +{ + DEB_MEMBER_FUNCT(); + return new File(*this, filename, openFlags); +} -#ifdef __unix - if(m_mmap_info.mmap_addr) - { - munmap(m_mmap_info.mmap_addr,m_mmap_info.header_size); - m_mmap_info.mmap_addr = NULL; - } -#endif +void SaveContainerEdf::_close(void* f) +{ + DEB_MEMBER_FUNCT(); + File* file = (File*) f; + delete file; } long SaveContainerEdf::_writeFile(void* f,Data &aData, @@ -192,11 +244,8 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData, { DEB_MEMBER_FUNCT(); long write_size = 0; -#ifdef WIN32 - _OfStream* fout = (_OfStream*)f; -#else - std::ofstream* fout = (std::ofstream*)f; -#endif + File* file = (File*) f; + File::Stream* fout = &file->m_fout; #if defined(WITH_Z_COMPRESSION) || defined(WITH_LZ4_COMPRESSION) if(aFormat == CtSaving::EDFGZ || aFormat == CtSaving::EDFLZ4) @@ -223,39 +272,26 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData, #ifdef __unix else if(aFormat == CtSaving::EDFConcat) { - m_mmap_info.height += aData.dimensions[1]; - m_mmap_info.size += aData.size(); - if(!m_mmap_info.mmap_addr) // Create header and mmap + file->m_height += aData.dimensions[1]; + file->m_size += aData.size(); + MmapInfo& mmap_info = file->m_mmap_info; + if(!mmap_info) // Create header and mmap { const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq); - m_mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8); - write_size += m_mmap_info.header_size; + mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8); + write_size += mmap_info.header_size; fout->flush(); long long header_position = fout->tellp(); - header_position -= m_mmap_info.header_size; - long sz = sysconf(_SC_PAGESIZE); - long long mapping_offset = header_position / sz * sz; - m_mmap_info.header_size += header_position - mapping_offset; - m_mmap_info.height_offset -= mapping_offset; - m_mmap_info.size_offset -= mapping_offset; - int fd = ::open(m_current_filename.c_str(),O_RDWR); - if(fd > -1) - { - m_mmap_info.mmap_addr = mmap(NULL,m_mmap_info.header_size, - PROT_WRITE,MAP_SHARED,fd,mapping_offset); - ::close(fd); - } - m_mmap_info.height = aData.dimensions[1]; - m_mmap_info.size = aData.size(); + mmap_info.map(file->m_filename, header_position); } else { - char* start_size_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.size_offset; - int nbchar = sprintf(start_size_string,"%lld",m_mmap_info.size); + char* start_size_string = mmap_info.sizeLocation(); + int nbchar = sprintf(start_size_string,"%lld",file->m_size); start_size_string[nbchar] = ' '; - char* start_height_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.height_offset; - nbchar = sprintf(start_height_string,"%lld",m_mmap_info.height); + char* start_height_string = mmap_info.heightLocation(); + nbchar = sprintf(start_height_string,"%lld",file->m_height); start_height_string[nbchar] = ' '; } } diff --git a/control/src/CtSaving_Edf.h b/control/src/CtSaving_Edf.h index 3c370db1..4a00a17b 100644 --- a/control/src/CtSaving_Edf.h +++ b/control/src/CtSaving_Edf.h @@ -25,6 +25,8 @@ #include "lima/CtSaving.h" #include "lima/CtSaving_Compression.h" +#include + namespace lima { class SaveContainerEdf : public CtSaving::SaveContainer @@ -49,27 +51,41 @@ namespace lima { virtual long _writeFile(void*,Data &data, CtSaving::HeaderMap &aHeader, CtSaving::FileFormat); - private: - struct MmapInfo + private: + struct MmapInfo { MmapInfo() : header_size(0), height_offset(0), size_offset(0), - height(0), - size(0), mmap_addr(NULL) {} + +#ifdef __unix + ~MmapInfo(); + + void map(const std::string& fname, + long long header_position); + + operator bool() + { return mmap_addr; } + + char *sizeLocation() + { return (char*) mmap_addr + size_offset; } + + char *heightLocation() + { return (char*) mmap_addr + height_offset; } +#endif + long long header_size; long long height_offset; long long size_offset; - long long height; - long long size; void* mmap_addr; }; template static MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&, int framesPerFile,Stream&, int nbCharReserved = 0); + #ifdef WIN32 class _OfStream { @@ -94,12 +110,41 @@ namespace lima { FILE* m_fout; int m_exc_flag; }; +#endif + + struct File + { +#ifdef WIN32 + typedef _OfStream Stream; #else - void* m_fout_buffer; + typedef std::ofstream Stream; +#endif + + File(SaveContainerEdf& cont, const std::string& filename, + std::ios_base::openmode openFlags); + virtual ~File(); + + SaveContainerEdf& m_cont; + std::string m_filename; + Stream m_fout; + +#ifdef __unix + void* m_buffer; + MmapInfo m_mmap_info; + long long m_height; + long long m_size; #endif + }; + +#ifdef __unix + void *getNewBuffer(); + void releaseBuffer(void *buffer); + + std::stack m_free_buffers; + int m_nb_buffers; +#endif + CtSaving::FileFormat m_format; - MmapInfo m_mmap_info; - std::string m_current_filename; }; template @@ -148,14 +193,14 @@ namespace lima { } sout << "DataType = " << aStringType << " ;\n"; - SaveContainerEdf::MmapInfo offset; - sout << "Size = "; offset.size_offset = sout.tellp(); + SaveContainerEdf::MmapInfo mmap_info; + sout << "Size = "; mmap_info.size_offset = sout.tellp(); snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,""); sout << aData.size() << aBuffer; sout << "Dim_1 = " << aData.dimensions[0] << " ;\n"; - sout << "Dim_2 = "; offset.height_offset = sout.tellp(); + sout << "Dim_2 = "; mmap_info.height_offset = sout.tellp(); snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,""); sout << aData.dimensions[1] << aBuffer; @@ -199,8 +244,8 @@ namespace lima { long long finalHeaderLenght = (lenght + 1023) & ~1023; // 1024 alignment snprintf(aBuffer,sizeof(aBuffer),"%*s}\n",int(finalHeaderLenght - lenght),""); sout << aBuffer; - offset.header_size = finalHeaderLenght; - return offset; + mmap_info.header_size = finalHeaderLenght; + return mmap_info; } } -- GitLab From b7d61b44594171f28335bb9a8164f4967c7831fa Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Wed, 7 Oct 2020 14:14:33 +0200 Subject: [PATCH 03/10] CtSaving: refactoring: * Make frame & file arithmetics common to all SaveContainers: pass pre-calculated information to _open to avoide race conditions like that in HDF5 between the last two files --- control/include/lima/CtSaving.h | 28 ++-- control/include/lima/CtSaving_Compression.h | 16 +-- control/src/CtSaving.cpp | 146 ++++++++++---------- control/src/CtSaving_Cbf.cpp | 3 +- control/src/CtSaving_Cbf.h | 3 +- control/src/CtSaving_Compression.cpp | 20 ++- control/src/CtSaving_Edf.cpp | 27 ++-- control/src/CtSaving_Edf.h | 14 +- control/src/CtSaving_Hdf5.cpp | 39 ++---- control/src/CtSaving_Hdf5.h | 6 +- control/src/CtSaving_Tiff.cpp | 3 +- control/src/CtSaving_Tiff.h | 3 +- 12 files changed, 141 insertions(+), 167 deletions(-) diff --git a/control/include/lima/CtSaving.h b/control/include/lima/CtSaving.h index aaa742e5..0dae751e 100644 --- a/control/include/lima/CtSaving.h +++ b/control/include/lima/CtSaving.h @@ -314,8 +314,7 @@ public: std::list& incoming_speed) const; void getRawStatistic(StatisticsType&) const; - void updateNbFrames(long nb_frames); - void cleanRemainingFrames(long last_acquired_frame_nr); + void updateNbFrames(long nb_acquired_frames); void getParameters(CtSaving::Parameters&) const; void clear(); @@ -348,7 +347,8 @@ public: protected: virtual void* _open(const std::string& filename, - std::ios_base::openmode flags) = 0; + std::ios_base::openmode flags, + CtSaving::Parameters& pars) = 0; virtual void _close(void*) = 0; virtual long _writeFile(void*, Data& data, CtSaving::HeaderMap& aHeader, @@ -359,10 +359,14 @@ public: virtual void _setBuffer(int frameNumber, ZBufferList&& buffer); virtual ZBufferList _takeBuffers(int dataId); - int m_written_frames; - Stream& m_stream; - private: + mutable Mutex m_lock; + Stream& m_stream; + + long m_frames_to_write; + long m_files_to_write; + long m_written_frames; + private: void close(const Params2Handler::iterator& it, AutoMutex& l); StatisticsType m_statistic; @@ -371,14 +375,12 @@ public: std::string m_log_stat_directory; std::string m_log_stat_filename; FILE* m_log_stat_file; - mutable Cond m_cond; - long m_nb_frames_to_write; Frame2Params m_frame_params; Params2Handler m_params_handler; int m_max_writing_task; ///< number of maximum parallel write int m_running_writing_task; ///< number of concurrent write running - Mutex m_lock; + Mutex m_buffers_lock; dataId2ZBufferList m_buffers; }; @@ -504,13 +506,9 @@ public: m_save_cnt->createStatistic(data); } - void updateNbFrames(long last_acquired_frame_nr) - { - m_save_cnt->updateNbFrames(last_acquired_frame_nr); - } - void cleanRemainingFrames(long last_acquired_frame_nr) + void updateNbFrames(long nb_acquired_frames) { - m_save_cnt->cleanRemainingFrames(last_acquired_frame_nr); + m_save_cnt->updateNbFrames(nb_acquired_frames); } private: diff --git a/control/include/lima/CtSaving_Compression.h b/control/include/lima/CtSaving_Compression.h index 8893e317..22a5781c 100644 --- a/control/include/lima/CtSaving_Compression.h +++ b/control/include/lima/CtSaving_Compression.h @@ -30,6 +30,8 @@ namespace lima { + class SaveContainerEdf; + #ifdef WITH_Z_COMPRESSION #include @@ -37,14 +39,13 @@ namespace lima { { DEB_CLASS_NAMESPC(DebModControl,"File Z Compression Task","Control"); - CtSaving::SaveContainer& m_container; - int m_frame_per_file; + SaveContainerEdf& m_container; CtSaving::HeaderMap m_header; z_stream_s m_compression_struct; public: - FileZCompression(CtSaving::SaveContainer &save_cnt, - int framesPerFile,const CtSaving::HeaderMap &header); + FileZCompression(SaveContainerEdf &save_cnt, + const CtSaving::HeaderMap &header); ~FileZCompression(); virtual void process(Data &aData); private: @@ -83,13 +84,12 @@ static const LZ4F_preferences_t lz4_preferences = { { DEB_CLASS_NAMESPC(DebModControl,"File Lz4 Compression Task","Control"); - CtSaving::SaveContainer& m_container; - int m_frame_per_file; + SaveContainerEdf& m_container; CtSaving::HeaderMap m_header; LZ4F_compressionContext_t m_ctx; public: - FileLz4Compression(CtSaving::SaveContainer &save_cnt, - int framesPerFile,const CtSaving::HeaderMap &header); + FileLz4Compression(SaveContainerEdf &save_cnt, + const CtSaving::HeaderMap &header); ~FileLz4Compression(); virtual void process(Data &aData); void _compression(const char *src,size_t size,ZBufferList& return_buffers); diff --git a/control/src/CtSaving.cpp b/control/src/CtSaving.cpp index 01c00469..ebda7e84 100755 --- a/control/src/CtSaving.cpp +++ b/control/src/CtSaving.cpp @@ -1885,9 +1885,7 @@ void CtSaving::_compressionFinished(Data& aData, Stream& stream) return; m_nb_cbk.erase(count_it); - bool ready_flag = _allStreamReady(frame_nr); - - if (!ready_flag) { + if (!_allStreamReady(frame_nr)) { FrameMap::value_type map_pair(frame_nr, aData); m_frame_datas.insert(map_pair); return; @@ -1932,33 +1930,39 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream) m_cond.signal(); return; } + aData.releaseBuffer(); // release finished data - FrameHeaderMap::iterator aHeaderIter; - FrameMap::iterator nextDataIter, end = m_frame_datas.end(); - for (nextDataIter = m_frame_datas.begin(); nextDataIter != end; ++nextDataIter) + // limit search depth to the number of concurrent writing tasks + int tasks; + stream.getMaxConcurrentWritingTask(tasks); + FrameHeaderMap::iterator header_it; + FrameMap::iterator data_it, data_end = m_frame_datas.end(); + for (data_it = m_frame_datas.begin(); tasks && (data_it != data_end); ++data_it, --tasks) { - frame_nr = nextDataIter->first; - aHeaderIter = m_frame_headers.find(frame_nr); - bool header_available = (aHeaderIter != m_frame_headers.end()); + frame_nr = data_it->first; + header_it = m_frame_headers.find(frame_nr); + // sorted frames: if header is not available yet, abort search + if ((saving_mode == AutoHeader) && (header_it == m_frame_headers.end())) + return; bool can_save = _allStreamReady(frame_nr); - if (can_save && ((saving_mode == AutoFrame) || header_available)) + if (can_save) break; } - if (nextDataIter == end) + if ((data_it == data_end) || !tasks) return; - Data aNewData = nextDataIter->second; - m_frame_datas.erase(nextDataIter); + aData = data_it->second; + m_frame_datas.erase(data_it); HeaderMap task_header; - _takeHeader(aHeaderIter, task_header, false); + _takeHeader(header_it, task_header, false); aLock.unlock(); TaskList task_list; _getTaskList(Save, frame_nr, task_header, task_list); - _postTaskList(aNewData, task_list, SAVING_PRIORITY); + _postTaskList(aData, task_list, SAVING_PRIORITY); } /** @brief this methode set the error saving status in CtControl @@ -2078,9 +2082,6 @@ void CtSaving::_stop() // Update the number of frames so that SaveContainer::writeFile() will properly // call SaveContainer::close() stream.updateNbFrames(img_status.LastImageAcquired + 1); - - // Clean up the frame parameters so that _allStreamReady() return true - stream.cleanRemainingFrames(img_status.LastImageAcquired + 1); } } @@ -2133,7 +2134,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) long frameId = aData.frameNumber; - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); Frame2Params::iterator fpars = m_frame_params.find(frameId); if (fpars == m_frame_params.end()) THROW_CTL_ERROR(Error) << "Can't find saving parameters for frame" @@ -2205,7 +2206,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) } lock.lock(); - bool acq_end = (++m_written_frames == m_nb_frames_to_write); + bool acq_end = (++m_written_frames == m_frames_to_write); lock.unlock(); // close before marking that we have finished the frame @@ -2220,17 +2221,15 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader) } } - lock.lock(); - m_frame_params.erase(fpars); - --m_running_writing_task; - lock.unlock(); - Timestamp end_write = Timestamp::now(); Timestamp diff = end_write - start_write; DEB_TRACE() << "Write took : " << diff << "s"; lock.lock(); + m_frame_params.erase(fpars); + --m_running_writing_task; + writeFileStat(aData, start_write, end_write, write_size); } @@ -2272,7 +2271,7 @@ void CtSaving::SaveContainer::writeFileStat(Data& aData, Timestamp start, Timest void CtSaving::SaveContainer::setEnableLogStat(bool enable) { // TODO: check that no current saving is active - AutoMutex aLock = AutoMutex(m_cond.mutex()); + AutoMutex aLock = AutoMutex(m_lock); if (m_log_stat_enable && !enable) { fclose(m_log_stat_file); m_log_stat_file = NULL; @@ -2344,7 +2343,7 @@ void CtSaving::SaveContainer::prepareLogStat(const CtSaving::Parameters& pars) void CtSaving::SaveContainer::setStatisticSize(int aSize) { - AutoMutex aLock = AutoMutex(m_cond.mutex()); + AutoMutex aLock = AutoMutex(m_lock); if (long(m_statistic.size()) > aSize) { size_t aDiffSize = m_statistic.size() - aSize; @@ -2357,7 +2356,7 @@ void CtSaving::SaveContainer::setStatisticSize(int aSize) int CtSaving::SaveContainer::getStatisticSize() const { - AutoMutex aLock(m_cond.mutex()); + AutoMutex aLock(m_lock); return m_statistic_size; } @@ -2378,7 +2377,7 @@ void CtSaving::SaveContainer::getStatistic(std::list& writing_speed, StatisticsType copy; { - AutoMutex aLock = AutoMutex(m_cond.mutex()); + AutoMutex aLock = AutoMutex(m_lock); copy = m_statistic; } @@ -2467,14 +2466,13 @@ void CtSaving::SaveContainer::getParameters(CtSaving::Parameters& pars) const pars = m_stream.getParameters(Acq); } - void CtSaving::SaveContainer::clear() { DEB_MEMBER_FUNCT(); this->close(); - AutoMutex aLock(m_cond.mutex()); + AutoMutex aLock(m_lock); m_statistic.clear(); _clear(); // call inheritance if needed } @@ -2485,32 +2483,40 @@ void CtSaving::SaveContainer::prepare(CtControl& ct) int nb_frames; ct.acquisition()->getAcqNbFrames(nb_frames); CtSaving::Parameters pars = m_stream.getParameters(Auto); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); m_statistic.clear(); - m_nb_frames_to_write = nb_frames; + m_frames_to_write = nb_frames; + m_files_to_write = 0; m_written_frames = 0; - if (m_nb_frames_to_write && // if not live + if (m_frames_to_write && // if not live pars.savingMode != CtSaving::Manual) { long nextNumber = pars.nextNumber - 1; - for (long i = 0; i < nb_frames; ++i) + bool multi_set = (pars.overwritePolicy == MultiSet); + for (long i = 0; i < m_frames_to_write; ++i) { FrameParameters frame_par(pars); + CtSaving::Parameters& file_pars = frame_par.m_pars; - if (pars.overwritePolicy == MultiSet) - frame_par.m_pars.framesPerFile = 1; // force to 1 + if (multi_set) + file_pars.framesPerFile = 1; // force to 1 else { - bool new_file = !(i % pars.framesPerFile); + int idx = i % file_pars.framesPerFile; + bool new_file = (idx == 0); if (new_file) ++nextNumber; - frame_par.m_pars.nextNumber = nextNumber; + file_pars.nextNumber = nextNumber; frame_par.m_threadable = new_file; + long first_frame = i - idx; + if (first_frame + file_pars.framesPerFile > m_frames_to_write) + file_pars.framesPerFile = m_frames_to_write - first_frame; } std::pair result = m_frame_params.insert(Frame2Params::value_type(i, frame_par)); if (!result.second) result.first->second = frame_par; } + m_files_to_write = multi_set ? 1 : (nextNumber - pars.nextNumber + 1); } m_running_writing_task = 0; prepareLogStat(pars); @@ -2518,13 +2524,16 @@ void CtSaving::SaveContainer::prepare(CtControl& ct) _prepare(ct); // call inheritance if needed } -void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr) +void CtSaving::SaveContainer::updateNbFrames(long nb_acquired_frames) { DEB_MEMBER_FUNCT(); - DEB_TRACE() << DEB_VAR1(last_acquired_frame_nr); - - AutoMutex lock(m_cond.mutex()); - m_nb_frames_to_write = last_acquired_frame_nr; + DEB_TRACE() << DEB_VAR1(nb_acquired_frames); + + AutoMutex lock(m_lock); + m_frames_to_write = nb_acquired_frames; + m_frame_params.erase( + m_frame_params.find(nb_acquired_frames), + m_frame_params.end()); } bool CtSaving::SaveContainer::isReady(long frame_nr) const @@ -2532,16 +2541,13 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const DEB_MEMBER_FUNCT(); DEB_PARAM() << DEB_VAR1(frame_nr); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); bool ready; // mean all writing tasks if (frame_nr < 0) { ready = m_frame_params.empty(); - for (Frame2Params::const_iterator it = m_frame_params.begin(); - ready && it != m_frame_params.end(); ++it) - ready = !it->second.m_running; } else { @@ -2566,23 +2572,12 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const return ready; } -void CtSaving::SaveContainer::cleanRemainingFrames(long last_acquired_frame_nr) -{ - DEB_MEMBER_FUNCT(); - DEB_PARAM() << DEB_VAR1(last_acquired_frame_nr); - - AutoMutex lock(m_cond.mutex()); - m_frame_params.erase( - m_frame_params.find(last_acquired_frame_nr), - m_frame_params.end()); -} - void CtSaving::SaveContainer::setReady(long frame_nr) { DEB_MEMBER_FUNCT(); DEB_PARAM() << DEB_VAR1(frame_nr); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); // mean all frames if (frame_nr < 0) m_frame_params.clear(); @@ -2598,7 +2593,7 @@ void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr) { DEB_MEMBER_FUNCT(); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); Frame2Params::iterator i = m_frame_params.find(frame_nr); if (i == m_frame_params.end()) { @@ -2614,7 +2609,7 @@ void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr) void CtSaving::SaveContainer::createStatistic(Data& data) { - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); //Insert statistic StatisticsType::value_type stat_pair(data.frameNumber, Stat()); stat_pair.second.incoming_size = data.size(); @@ -2624,7 +2619,7 @@ void CtSaving::SaveContainer::createStatistic(Data& data) void CtSaving::SaveContainer::compressionStart(Data& data) { Timestamp start = Timestamp::now(); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); StatisticsType::iterator i = m_statistic.find(data.frameNumber); if (i != m_statistic.end()) i->second.compression_start = start; @@ -2633,7 +2628,7 @@ void CtSaving::SaveContainer::compressionStart(Data& data) void CtSaving::SaveContainer::compressionFinished(Data& data) { Timestamp end = Timestamp::now(); - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); StatisticsType::iterator i = m_statistic.find(data.frameNumber); if (i != m_statistic.end()) i->second.compression_end = end; @@ -2665,7 +2660,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars) CtSaving::Parameters& pars = fpars.m_pars; { - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); Params2Handler::iterator handler = m_params_handler.find(pars); if (handler != m_params_handler.end()) return *handler; @@ -2701,7 +2696,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars) for (int nbTry = 0; !handler.m_handler && (nbTry < 5); ++nbTry) { try { - handler.m_handler = _open(aFileName, openFlags); + handler.m_handler = _open(aFileName, openFlags, pars); } catch (std::ios_base::failure & error) { error_desc = error.what(); @@ -2737,7 +2732,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars) Params2Handler::value_type map_pair(pars, handler); bool ok; { - AutoMutex lock(m_cond.mutex()); + AutoMutex lock(m_lock); ok = m_params_handler.insert(map_pair).second; } if (!ok) { @@ -2774,7 +2769,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params, { DEB_MEMBER_FUNCT(); - AutoMutex aLock(m_cond.mutex()); + AutoMutex aLock(m_lock); if (!params) // close all { for (Params2Handler::iterator it = m_params_handler.begin(); @@ -2802,7 +2797,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params, void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers) { - AutoMutex aLock(m_lock); + AutoMutex aLock(m_buffers_lock); std::pair result; result = m_buffers.emplace(std::move(frameNumber), std::move(buffers)); if (!result.second) @@ -2811,7 +2806,7 @@ void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers) ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId) { - AutoMutex aLock(m_lock); + AutoMutex aLock(m_buffers_lock); dataId2ZBufferList::iterator i = m_buffers.find(dataId); ZBufferList aReturnBufferPt(std::move(i->second)); m_buffers.erase(i); @@ -2820,7 +2815,7 @@ ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId) void CtSaving::SaveContainer::_clear() { - AutoMutex aLock(m_lock); + AutoMutex aLock(m_buffers_lock); m_buffers.clear(); } @@ -2872,12 +2867,13 @@ void CtSaving::Stream::checkWriteAccess() // test all file is mode == Abort if (m_pars.overwritePolicy == Abort) { - CtAcquisition* anAcq = m_saving.m_ctrl.acquisition(); - int nbAcqFrames; - anAcq->getAcqNbFrames(nbAcqFrames); + int framesToWrite; + anAcq->getAcqNbFrames(framesToWrite); + if (framesToWrite == 0) + framesToWrite = 1; int framesPerFile = m_pars.framesPerFile; - int nbFiles = (nbAcqFrames + framesPerFile - 1) / framesPerFile; + int nbFiles = (framesToWrite + framesPerFile - 1) / framesPerFile; int firstFileNumber = m_acquisition_pars.nextNumber; int lastFileNumber = m_acquisition_pars.nextNumber + nbFiles - 1; diff --git a/control/src/CtSaving_Cbf.cpp b/control/src/CtSaving_Cbf.cpp index 88728a8f..d7b6af3f 100644 --- a/control/src/CtSaving_Cbf.cpp +++ b/control/src/CtSaving_Cbf.cpp @@ -434,7 +434,8 @@ SinkTaskBase* SaveContainerCbf::getCompressionTask(const CtSaving::HeaderMap &he } void* SaveContainerCbf::_open(const std::string &filename, - std::ios_base::openmode stdOpenflags) + std::ios_base::openmode stdOpenflags, + CtSaving::Parameters& /*pars*/) { DEB_MEMBER_FUNCT(); char openFlags[8]; diff --git a/control/src/CtSaving_Cbf.h b/control/src/CtSaving_Cbf.h index c51aeb1a..28d12f2a 100644 --- a/control/src/CtSaving_Cbf.h +++ b/control/src/CtSaving_Cbf.h @@ -53,7 +53,8 @@ namespace lima { protected: virtual void* _open(const std::string &filename, - std::ios_base::openmode flags); + std::ios_base::openmode flags, + CtSaving::Parameters& pars); virtual void _close(void*); virtual long _writeFile(void*,Data &data, CtSaving::HeaderMap &aHeader, diff --git a/control/src/CtSaving_Compression.cpp b/control/src/CtSaving_Compression.cpp index 0d98509d..9ff5cb9b 100644 --- a/control/src/CtSaving_Compression.cpp +++ b/control/src/CtSaving_Compression.cpp @@ -30,9 +30,9 @@ using namespace lima; #ifdef WITH_Z_COMPRESSION const int FileZCompression::BUFFER_HELPER_SIZE = 64 * 1024; -FileZCompression::FileZCompression(CtSaving::SaveContainer &save_cnt, - int framesPerFile,const CtSaving::HeaderMap &header) : - m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header) +FileZCompression::FileZCompression(SaveContainerEdf &save_cnt, + const CtSaving::HeaderMap &header) : + m_container(save_cnt),m_header(header) { DEB_CONSTRUCTOR(); @@ -65,9 +65,7 @@ void FileZCompression::process(Data &aData) ZBufferList aBufferListPt; std::ostringstream buffer; - SaveContainerEdf::_writeEdfHeader(aData,m_header, - m_frame_per_file, - buffer); + m_container._writeEdfHeader(aData,m_header, buffer); const std::string& tmpBuffer = buffer.str(); _compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt); _compression((char*)aData.data(),aData.size(),aBufferListPt); @@ -126,9 +124,9 @@ void FileZCompression::_end_compression(ZBufferList& return_buffers) #ifdef WITH_LZ4_COMPRESSION -FileLz4Compression::FileLz4Compression(CtSaving::SaveContainer &save_cnt, - int framesPerFile,const CtSaving::HeaderMap &header) : - m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header) +FileLz4Compression::FileLz4Compression(SaveContainerEdf &save_cnt, + const CtSaving::HeaderMap &header) : + m_container(save_cnt),m_header(header) { DEB_CONSTRUCTOR(); @@ -148,9 +146,7 @@ void FileLz4Compression::process(Data &aData) DEB_PARAM() << DEB_VAR1(aData); std::ostringstream buffer; - SaveContainerEdf::_writeEdfHeader(aData,m_header, - m_frame_per_file, - buffer); + m_container._writeEdfHeader(aData,m_header,buffer); ZBufferList aBufferListPt; const std::string& tmpBuffer = buffer.str(); _compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt); diff --git a/control/src/CtSaving_Edf.cpp b/control/src/CtSaving_Edf.cpp index f658578d..3b70dc2f 100644 --- a/control/src/CtSaving_Edf.cpp +++ b/control/src/CtSaving_Edf.cpp @@ -179,7 +179,7 @@ SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream, #ifdef __unix m_nb_buffers(0), #endif - m_format(format) + m_format(format), m_frames_per_file(0) { DEB_CONSTRUCTOR(); } @@ -188,7 +188,7 @@ SaveContainerEdf::~SaveContainerEdf() { DEB_DESTRUCTOR(); #ifdef __unix - if(m_free_buffers.size() != m_nb_buffers) + if(int(m_free_buffers.size()) != m_nb_buffers) DEB_WARNING() << "Missing free buffers: " << "got " << m_free_buffers.size() << ", " << "expected " << m_nb_buffers; @@ -224,8 +224,15 @@ void SaveContainerEdf::releaseBuffer(void *buffer) } #endif +void SaveContainerEdf::_prepare(CtControl&) +{ + const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq); + m_frames_per_file = pars.framesPerFile; +} + void* SaveContainerEdf::_open(const std::string &filename, - std::ios_base::openmode openFlags) + std::ios_base::openmode openFlags, + CtSaving::Parameters & /*pars*/) { DEB_MEMBER_FUNCT(); return new File(*this, filename, openFlags); @@ -264,9 +271,7 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData, if(aFormat == CtSaving::EDF) { - const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq); - MmapInfo info = _writeEdfHeader(aData,aHeader, - pars.framesPerFile,*fout); + MmapInfo info = _writeEdfHeader(aData,aHeader,*fout); write_size += info.header_size; } #ifdef __unix @@ -277,8 +282,7 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData, MmapInfo& mmap_info = file->m_mmap_info; if(!mmap_info) // Create header and mmap { - const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq); - mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8); + mmap_info = _writeEdfHeader(aData,aHeader,*fout,8); write_size += mmap_info.header_size; fout->flush(); long long header_position = fout->tellp(); @@ -308,18 +312,15 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData, SinkTaskBase* SaveContainerEdf::getCompressionTask(const CtSaving::HeaderMap& header) { -#if defined(WITH_Z_COMPRESSION) || defined(WITH_LZ4_COMPRESSION) - const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq); -#endif #ifdef WITH_Z_COMPRESSION if(m_format == CtSaving::EDFGZ) - return new FileZCompression(*this,pars.framesPerFile,header); + return new FileZCompression(*this,header); else #endif #ifdef WITH_LZ4_COMPRESSION if(m_format == CtSaving::EDFLZ4) - return new FileLz4Compression(*this,pars.framesPerFile,header); + return new FileLz4Compression(*this,header); else #endif return NULL; diff --git a/control/src/CtSaving_Edf.h b/control/src/CtSaving_Edf.h index 4a00a17b..91caeddf 100644 --- a/control/src/CtSaving_Edf.h +++ b/control/src/CtSaving_Edf.h @@ -46,11 +46,14 @@ namespace lima { protected: virtual void* _open(const std::string &filename, - std::ios_base::openmode flags); + std::ios_base::openmode flags, + CtSaving::Parameters &pars); virtual void _close(void*); virtual long _writeFile(void*,Data &data, CtSaving::HeaderMap &aHeader, CtSaving::FileFormat); + virtual void _prepare(CtControl&); + private: struct MmapInfo { @@ -82,9 +85,8 @@ namespace lima { void* mmap_addr; }; template - static MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&, - int framesPerFile,Stream&, - int nbCharReserved = 0); + MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&, + Stream&,int nbCharReserved = 0); #ifdef WIN32 class _OfStream @@ -145,13 +147,13 @@ namespace lima { #endif CtSaving::FileFormat m_format; + long m_frames_per_file; }; template SaveContainerEdf::MmapInfo SaveContainerEdf::_writeEdfHeader(Data &aData, CtSaving::HeaderMap &aHeader, - int framesPerFile, Stream &sout, int nbCharReserved) { @@ -165,7 +167,7 @@ namespace lima { ctime_r(&ctime_now, time_str); time_str[strlen(time_str) - 1] = '\0'; - int image_nb = aData.frameNumber % framesPerFile; + int image_nb = aData.frameNumber % m_frames_per_file; char aBuffer[2048]; long long aStartPosition = sout.tellp(); diff --git a/control/src/CtSaving_Hdf5.cpp b/control/src/CtSaving_Hdf5.cpp index 1ba0de9b..8f79953c 100644 --- a/control/src/CtSaving_Hdf5.cpp +++ b/control/src/CtSaving_Hdf5.cpp @@ -247,30 +247,13 @@ void SaveContainerHdf5::_prepare(CtControl& control) { CtSaving::OverwritePolicy overwrite_policy; control.saving()->getOverwritePolicy(overwrite_policy); m_is_multiset = (overwrite_policy == CtSaving::MultiSet); - CtSaving::Parameters pars; - control.saving()->getParameters(pars); - if (m_is_multiset) - m_frames_per_file = m_ct_parameters.acq_nbframes; - else - m_frames_per_file = pars.framesPerFile; - - m_acq_nbframes = m_ct_parameters.acq_nbframes; - - // If not a continuous acquisition - if (m_acq_nbframes > 0) { - // If the acquisition requests less frame than the max per file - // we will only create a dataset with size equal to the nb. of acquired frames - if (m_frames_per_file > m_acq_nbframes) - m_frames_per_file = m_acq_nbframes; - m_max_nb_files = (m_acq_nbframes + m_frames_per_file - 1) / m_frames_per_file; - } - else - m_max_nb_files = 0; + AutoMutex lock(m_lock); m_file_cnt = 0; } -void* SaveContainerHdf5::_open(const std::string &filename, std::ios_base::openmode openFlags) { +void* SaveContainerHdf5::_open(const std::string &filename, std::ios_base::openmode openFlags, + CtSaving::Parameters& pars) { DEB_MEMBER_FUNCT(); AutoPtr<_File> file = new _File(); @@ -500,19 +483,15 @@ void* SaveContainerHdf5::_open(const std::string &filename, std::ios_base::openm THROW_CTL_ERROR(Error) << "File " << filename << " not opened successfully"; } - // increase file counter to manage multi-files and last file with less than frames_per_file frames - file->m_file_index = m_file_cnt++; + { + AutoMutex lock(m_lock); + file->m_file_index = m_file_cnt++; + } - // check if this is the last file with less frames than m_frames_per_file - file->m_nb_frames = m_frames_per_file; - if ((m_file_cnt == m_max_nb_files) && (m_acq_nbframes % m_frames_per_file) != 0) - file->m_nb_frames = m_acq_nbframes % m_frames_per_file; + file->m_nb_frames = pars.framesPerFile; - DEB_TRACE() << "m_file_cnt = "<< m_file_cnt; - DEB_TRACE() << "m_max_nb_files = "<< m_max_nb_files; + DEB_TRACE() << "m_file_cnt = " << m_file_cnt; DEB_TRACE() << "m_nb_frames = "<< file->m_nb_frames; - DEB_TRACE() << "m_acq_nbframes = "<< m_acq_nbframes; - DEB_TRACE() << "m_frames_per_file = "<< m_frames_per_file; return file.forget(); } diff --git a/control/src/CtSaving_Hdf5.h b/control/src/CtSaving_Hdf5.h index ec785aba..a85321bf 100644 --- a/control/src/CtSaving_Hdf5.h +++ b/control/src/CtSaving_Hdf5.h @@ -59,7 +59,8 @@ public: virtual SinkTaskBase* getCompressionTask(const CtSaving::HeaderMap&); protected: virtual void _prepare(CtControl &control); - virtual void* _open(const std::string &filename, std::ios_base::openmode flags); + virtual void* _open(const std::string &filename, std::ios_base::openmode flags, + CtSaving::Parameters& pars); virtual void _close(void*); virtual long _writeFile(void*,Data &data, CtSaving::HeaderMap &aHeader, CtSaving::FileFormat); @@ -101,9 +102,6 @@ private: HwInterface *m_hw_int; bool m_is_multiset; int m_compression_level; - int m_frames_per_file; - int m_acq_nbframes; - int m_max_nb_files; int m_file_cnt; }; diff --git a/control/src/CtSaving_Tiff.cpp b/control/src/CtSaving_Tiff.cpp index 769b880a..4b340886 100644 --- a/control/src/CtSaving_Tiff.cpp +++ b/control/src/CtSaving_Tiff.cpp @@ -57,7 +57,8 @@ SaveContainerTiff::~SaveContainerTiff() } void* SaveContainerTiff::_open(const std::string &filename, - std::ios_base::openmode /*flags*/) + std::ios_base::openmode /*flags*/, + CtSaving::Parameters& /*pars*/) { DEB_MEMBER_FUNCT(); return new std::string(filename); diff --git a/control/src/CtSaving_Tiff.h b/control/src/CtSaving_Tiff.h index 6ea1fbb3..45501feb 100644 --- a/control/src/CtSaving_Tiff.h +++ b/control/src/CtSaving_Tiff.h @@ -32,7 +32,8 @@ namespace lima { virtual ~SaveContainerTiff(); protected: virtual void* _open(const std::string &filename, - std::ios_base::openmode flags); + std::ios_base::openmode flags, + CtSaving::Parameters& pars); virtual void _close(void*); virtual long _writeFile(void*,Data &data, CtSaving::HeaderMap &aHeader, -- GitLab From d071e90f3efcb1825d11dc508fd67c785652236b Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Fri, 9 Oct 2020 10:24:49 +0200 Subject: [PATCH 04/10] CtVideo: refactoring and fix potential issues --- control/include/lima/CtVideo.h | 2 +- control/src/CtVideo.cpp | 38 +++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/control/include/lima/CtVideo.h b/control/include/lima/CtVideo.h index 8f0f0209..c35b72a3 100644 --- a/control/include/lima/CtVideo.h +++ b/control/include/lima/CtVideo.h @@ -72,7 +72,7 @@ namespace lima long long frameNumber() const; private: - Image(const CtVideo*,VideoImage*); + Image(const CtVideo*,VideoImage*,AutoMutex&); const CtVideo* m_video; VideoImage* m_image; diff --git a/control/src/CtVideo.cpp b/control/src/CtVideo.cpp index b9acc361..a13113de 100644 --- a/control/src/CtVideo.cpp +++ b/control/src/CtVideo.cpp @@ -36,8 +36,14 @@ public: AutoMutex aLock(m_cnt.m_cond.mutex()); VideoImage *anImage = m_cnt.m_write_image; DEB_TRACE() << DEB_VAR1(*anImage); - while(anImage->inused) + while(anImage->inused) { m_cnt.m_cond.wait(); + if (anImage != m_cnt.m_write_image) { + DEB_TRACE() << "Read/write swapped"; + anImage = m_cnt.m_write_image; + } + } + DEB_TRACE() << DEB_VAR1(*anImage); anImage->inused = -1; // Write Mode aLock.unlock(); @@ -60,13 +66,14 @@ public: // if read Image is not use, swap if(!m_cnt.m_read_image->inused) { + DEB_TRACE() << "swapping read and write"; m_cnt.m_write_image = m_cnt.m_read_image; m_cnt.m_write_image->frameNumber = -1; m_cnt.m_read_image = anImage; if(m_cnt.m_image_callback) { - CtVideo::Image anImageWrapper(&m_cnt,anImage); + CtVideo::Image anImageWrapper(&m_cnt,anImage,aLock); ImageCallback *cb = m_cnt.m_image_callback; aLock.unlock(); cb->newImage(anImageWrapper); @@ -194,7 +201,7 @@ bool CtVideo::_InternalImageCBK::newImage(char * data,int width,int height,Video if(m_video.m_image_callback) { - CtVideo::Image anImageWrapper(&m_video,anImage); + CtVideo::Image anImageWrapper(&m_video,anImage,aLock); aLock.unlock(); SinkTaskBase* cbk_task = new SinkTaskBase(); @@ -309,8 +316,8 @@ CtVideo::Image::~Image() if(m_video) { AutoMutex aLock(m_video->m_cond.mutex()); - --(m_image->inused); - m_video->m_cond.broadcast(); + if (--(m_image->inused) == 0) + m_video->m_cond.broadcast(); } } @@ -338,8 +345,8 @@ CtVideo::Image& CtVideo::Image::operator=(const CtVideo::Image &other) if(m_video) { AutoMutex aLock(m_video->m_cond.mutex()); - --(m_image->inused); - m_video->m_cond.broadcast(); + if (--(m_image->inused) == 0) + m_video->m_cond.broadcast(); } m_video = other.m_video; @@ -350,10 +357,12 @@ CtVideo::Image& CtVideo::Image::operator=(const CtVideo::Image &other) /** @brief an other contructor * This methode should be call under Lock */ -CtVideo::Image::Image(const CtVideo *video,VideoImage *image) : +CtVideo::Image::Image(const CtVideo *video,VideoImage *image, AutoMutex&l) : m_video(video), m_image(image) { + if (!m_video || (&l.mutex() != &m_video->m_cond.mutex()) || !l.locked()) + throw LIMA_CTL_EXC(InvalidValue, "Invalid video/mutex"); ++(m_image->inused); } @@ -428,6 +437,8 @@ CtVideo::~CtVideo() void CtVideo::setActive(bool aFlag) { + DEB_MEMBER_FUNCT(); + DEB_PARAM() << DEB_VAR1(aFlag); AutoMutex aLock(m_cond.mutex()); m_active_flag = aFlag; } @@ -729,19 +740,26 @@ void CtVideo::getBin(Bin &aBin) const // --- images void CtVideo::getLastImage(CtVideo::Image &anImage) const { + DEB_MEMBER_FUNCT(); AutoMutex aLock(m_cond.mutex()); + DEB_TRACE() << DEB_VAR3(m_write_image->inused, + m_write_image->frameNumber, + m_read_image->frameNumber); if(m_write_image->inused >= 0 && // No writter m_write_image->frameNumber >= m_read_image->frameNumber) { + DEB_TRACE() << "swapping read and write"; VideoImage *tmp = m_read_image; m_read_image = m_write_image; m_write_image = tmp; - m_write_image->frameNumber = -1; + if (!m_read_image->inused) + m_write_image->frameNumber = -1; } - CtVideo::Image tmpImage(this,m_read_image); + CtVideo::Image tmpImage(this,m_read_image,aLock); aLock.unlock(); anImage = tmpImage; + DEB_RETURN() << DEB_VAR1(anImage.m_image->frameNumber); } void CtVideo::getLastImageCounter(long long &anImageCounter) const -- GitLab From 4234b9f3e1e90228880027d500b91708ede8c748 Mon Sep 17 00:00:00 2001 From: operator for beamline Date: Fri, 9 Oct 2020 10:24:49 +0200 Subject: [PATCH 05/10] CtSaving: check and clear frame containers before every acquisition --- control/src/CtControl.cpp | 3 ++- control/src/CtSaving.cpp | 11 +++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/control/src/CtControl.cpp b/control/src/CtControl.cpp index 6b9296c0..9ee7a7b8 100644 --- a/control/src/CtControl.cpp +++ b/control/src/CtControl.cpp @@ -499,8 +499,9 @@ void CtControl::prepareAcq() m_images_buffer.clear(); m_images_saved.clear(); - //Clear common header + //Clear saving: common & frame headers, ZBuffers and statistics m_ct_saving->resetInternalCommonHeader(); + m_ct_saving->clear(); DEB_TRACE() << "Apply hardware bin/roi"; m_ct_image->applyHard(); diff --git a/control/src/CtSaving.cpp b/control/src/CtSaving.cpp index ebda7e84..a0fb53d9 100755 --- a/control/src/CtSaving.cpp +++ b/control/src/CtSaving.cpp @@ -1743,8 +1743,14 @@ void CtSaving::clear() } AutoMutex aLock(m_cond.mutex()); + if (m_frame_headers.size()) + DEB_WARNING() << DEB_VAR1(m_frame_headers.size()); m_frame_headers.clear(); + if (m_common_header.size()) + DEB_WARNING() << DEB_VAR1(m_common_header.size()); m_common_header.clear(); // @fix Should we clear common header??? + if (m_frame_datas.size()) + DEB_WARNING() << DEB_VAR1(m_frame_datas.size()); m_frame_datas.clear(); } @@ -2797,6 +2803,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params, void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers) { + DEB_MEMBER_FUNCT(); AutoMutex aLock(m_buffers_lock); std::pair result; result = m_buffers.emplace(std::move(frameNumber), std::move(buffers)); @@ -2806,6 +2813,7 @@ void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers) ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId) { + DEB_MEMBER_FUNCT(); AutoMutex aLock(m_buffers_lock); dataId2ZBufferList::iterator i = m_buffers.find(dataId); ZBufferList aReturnBufferPt(std::move(i->second)); @@ -2815,7 +2823,10 @@ ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId) void CtSaving::SaveContainer::_clear() { + DEB_MEMBER_FUNCT(); AutoMutex aLock(m_buffers_lock); + if (m_buffers.size()) + DEB_WARNING() << DEB_VAR1(m_buffers.size()); m_buffers.clear(); } -- GitLab From 7b417c1ff97aba424300e0240534bc9a92011012 Mon Sep 17 00:00:00 2001 From: Laurent Claustre Date: Wed, 3 Mar 2021 13:04:20 +0100 Subject: [PATCH 06/10] pin processlib to 1.7* version 1.8* is sip incompatible and will be used from lima-core 1.10 --- conda/debug/meta.yaml | 2 +- conda/release/meta.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conda/debug/meta.yaml b/conda/debug/meta.yaml index 8f00f0a5..52a3dcdb 100644 --- a/conda/debug/meta.yaml +++ b/conda/debug/meta.yaml @@ -21,7 +21,7 @@ requirements: host: - python {{ python }} - sip 4.19.8 #For the SIP module - - processlib + - processlib 1.7* - libconfig 1.7* # I/O formats - zlib diff --git a/conda/release/meta.yaml b/conda/release/meta.yaml index 5e4ab359..6ff42b97 100644 --- a/conda/release/meta.yaml +++ b/conda/release/meta.yaml @@ -19,7 +19,7 @@ requirements: host: - python {{ python }} - sip 4.19.8 #For the SIP module - - processlib + - processlib 1.7* - libconfig 1.7* # I/O formats - zlib -- GitLab From b2dd9e87ad1ffa88ba3f976f065111c6a2107c9b Mon Sep 17 00:00:00 2001 From: Laurent Claustre Date: Wed, 3 Mar 2021 15:40:20 +0100 Subject: [PATCH 07/10] libconfig: FindLibconfig for libconfig++.so and no more liblibconfig++.so, conda package from conda-forge and no more esrf-bcu neither bcu-ci/stable --- cmake/FindLibconfig.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/FindLibconfig.cmake b/cmake/FindLibconfig.cmake index f204c7fc..0d713cd4 100644 --- a/cmake/FindLibconfig.cmake +++ b/cmake/FindLibconfig.cmake @@ -1,4 +1,4 @@ -find_library(LIBCONFIG_LIBRARIES libconfig++) +find_library(LIBCONFIG_LIBRARIES config++) find_path(LIBCONFIG_INCLUDE_DIRS libconfig.h++) include(FindPackageHandleStandardArgs) -- GitLab From 091b2201dbaa917f78b817ac833dac0e973a6638 Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Fri, 5 Mar 2021 13:07:33 +0100 Subject: [PATCH 08/10] ThreadUtils: add OS error code information when throwing exceptions --- common/src/Exceptions.cpp | 2 +- common/src/ThreadUtils.cpp | 61 ++++++++++++++---------- common/test/CMakeLists.txt | 2 +- common/test/test_mutex.cpp | 98 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 26 deletions(-) create mode 100644 common/test/test_mutex.cpp diff --git a/common/src/Exceptions.cpp b/common/src/Exceptions.cpp index baaa7503..61ad4672 100644 --- a/common/src/Exceptions.cpp +++ b/common/src/Exceptions.cpp @@ -84,7 +84,7 @@ Exception::Exception(Layer layer, ErrorType err_type, const string& err_desc, << getErrDesc(); else std::cerr << "********* Exception(" << getErrType() << "): " - << getErrDesc(); + << getErrDesc() << " *********" << std::endl; } Layer Exception::getLayer() const diff --git a/common/src/ThreadUtils.cpp b/common/src/ThreadUtils.cpp index 2942950c..d209ffab 100644 --- a/common/src/ThreadUtils.cpp +++ b/common/src/ThreadUtils.cpp @@ -36,10 +36,20 @@ using namespace lima; +inline void check_error(int ret, const char *desc) +{ + if (ret == 0) + return; + std::ostringstream os; + os << desc << ": " << strerror(ret) << " (" << ret << ")"; + throw LIMA_COM_EXC(Error, os.str()); +} + + MutexAttr::MutexAttr(Type type) { - if (pthread_mutexattr_init(&m_mutex_attr) != 0) - throw LIMA_COM_EXC(Error, "Error initializing mutex attr"); + int ret = pthread_mutexattr_init(&m_mutex_attr); + check_error(ret, "Error initializing mutex attr"); try { setType(type); @@ -76,15 +86,15 @@ void MutexAttr::setType(Type type) throw LIMA_COM_EXC(InvalidValue, "Invalid MutexAttr type"); } - if (pthread_mutexattr_settype(&m_mutex_attr, kind) != 0) - throw LIMA_COM_EXC(Error, "Error setting mutex attr"); + int ret = pthread_mutexattr_settype(&m_mutex_attr, kind); + check_error(ret, "Error setting mutex attr"); } MutexAttr::Type MutexAttr::getType() const { int kind; - if (pthread_mutexattr_gettype(&m_mutex_attr, &kind) != 0) - throw LIMA_COM_EXC(Error, "Error getting mutex attr"); + int ret = pthread_mutexattr_gettype(&m_mutex_attr, &kind); + check_error(ret, "Error getting mutex attr"); switch (kind) { case PTHREAD_MUTEX_NORMAL: @@ -120,8 +130,8 @@ Mutex::Mutex(MutexAttr mutex_attr) : m_mutex_attr(mutex_attr) { pthread_mutexattr_t& attr = m_mutex_attr.m_mutex_attr; - if (pthread_mutex_init(&m_mutex, &attr) != 0) - throw LIMA_COM_EXC(Error, "Error initializing mutex"); + int ret = pthread_mutex_init(&m_mutex, &attr); + check_error(ret, "Error initializing mutex"); } Mutex::~Mutex() @@ -131,14 +141,14 @@ Mutex::~Mutex() void Mutex::lock() { - if (pthread_mutex_lock(&m_mutex) != 0) - throw LIMA_COM_EXC(Error, "Error locking mutex"); + int ret = pthread_mutex_lock(&m_mutex); + check_error(ret, "Error locking mutex"); } void Mutex::unlock() { - if (pthread_mutex_unlock(&m_mutex) != 0) - throw LIMA_COM_EXC(Error, "Error unlocking mutex"); + int ret = pthread_mutex_unlock(&m_mutex); + check_error(ret, "Error unlocking mutex"); } bool Mutex::tryLock() @@ -162,8 +172,8 @@ MutexAttr Mutex::getAttr() Cond::Cond() : m_mutex(MutexAttr::Normal) { - if (pthread_cond_init(&m_cond, NULL) != 0) - throw LIMA_COM_EXC(Error, "Error initializing condition"); + int ret = pthread_cond_init(&m_cond, NULL); + check_error(ret, "Error initializing condition"); } Cond::~Cond() @@ -194,15 +204,15 @@ bool Cond::wait(double timeout) else retcode = pthread_cond_wait(&m_cond, &m_mutex.m_mutex); - if(retcode && retcode != ETIMEDOUT) - throw LIMA_COM_EXC(Error, "Error waiting for condition"); + if(retcode != ETIMEDOUT) + check_error(retcode, "Error waiting for condition"); return !retcode; } void Cond::signal() { - if (pthread_cond_signal(&m_cond) != 0) - throw LIMA_COM_EXC(Error, "Error signaling condition"); + int ret = pthread_cond_signal(&m_cond); + check_error(ret, "Error signaling condition"); } void Cond::acquire() @@ -217,8 +227,8 @@ void Cond::release() void Cond::broadcast() { - if (pthread_cond_broadcast(&m_cond) != 0) - throw LIMA_COM_EXC(Error, "Error broadcast condition"); + int ret = pthread_cond_broadcast(&m_cond); + check_error(ret, "Error broadcast condition"); } pid_t lima::GetThreadID() { @@ -241,7 +251,9 @@ Thread::ExceptionCleanUp::~ExceptionCleanUp() m_thread.m_exception_handled = true; } -Thread::Thread() : m_thread(NULL), m_started(false), m_finished(false), m_exception_handled(false), m_tid(0) +Thread::Thread() + : m_thread(0), m_started(false), m_finished(false), + m_exception_handled(false), m_tid(0) { pthread_attr_init(&m_thread_attr); } @@ -259,8 +271,9 @@ void Thread::start() throw LIMA_COM_EXC(Error, "Thread already started"); m_finished = false; - if (pthread_create(&m_thread, &m_thread_attr, staticThreadFunction, this) != 0) - throw LIMA_HW_EXC(Error, "Error creating thread"); + int ret = pthread_create(&m_thread, &m_thread_attr, + staticThreadFunction, this); + check_error(ret, "Error creating thread"); m_started = true; } @@ -418,7 +431,7 @@ void CmdThread::sendCmdIf(int cmd, bool (*if_test)(int,int)) void CmdThread::doSendCmd(int cmd) { if (m_status == Finished) - throw LIMA_HW_EXC(Error, "Thread has Finished"); + throw LIMA_COM_EXC(Error, "Thread has Finished"); // Assume that we will have a call to waitStatus somewhere after the new command m_status_history.reset(); diff --git a/common/test/CMakeLists.txt b/common/test/CMakeLists.txt index 8bdace72..8c04c33c 100644 --- a/common/test/CMakeLists.txt +++ b/common/test/CMakeLists.txt @@ -22,7 +22,7 @@ set(test_src test_membuffer test_ordered_map) if (NOT WIN32) - list(APPEND test_src test_regex) + list(APPEND test_src test_regex test_mutex) endif() limatools_run_camera_tests("${test_src}" ${NAME}) diff --git a/common/test/test_mutex.cpp b/common/test/test_mutex.cpp new file mode 100644 index 00000000..c654fc58 --- /dev/null +++ b/common/test/test_mutex.cpp @@ -0,0 +1,98 @@ +#include "lima/ThreadUtils.h" +#include "lima/Debug.h" +#include "lima/Exceptions.h" + +using namespace lima; + +DEB_GLOBAL(DebModTest); + +class TestThread : public Thread +{ + DEB_CLASS(DebModTest, "TestThread"); +public: + enum Type { Tester, Locker, Releaser }; + + TestThread(Type type) : m_type(type), m_started(false) + { + DEB_CONSTRUCTOR(); + DEB_PARAM() << DEB_VAR1(type); + } + + void start() + { + DEB_MEMBER_FUNCT(); + Thread::start(); + AutoMutex l(m_cond.mutex()); + while (!m_started) + m_cond.wait(); + DEB_TRACE() << "Main: started"; + } + +protected: + void threadFunction() + { + DEB_MEMBER_FUNCT(); + + CleanUp cleanup(*this); + + if (m_type == Tester) + THROW_COM_ERROR(Error) << "Testing cleanup"; + + if (m_type == Locker) { + AutoMutex l(m_mutex); + l.leaveLocked(); + DEB_TRACE() << "Leave locked"; + } else { + AutoMutex l(m_mutex, AutoMutex::PrevLocked); + DEB_TRACE() << "Previously unlocked"; + } + } + +private: + class CleanUp : public ExceptionCleanUp + { + public: + using ExceptionCleanUp::ExceptionCleanUp; + virtual ~CleanUp() { + static_cast(m_thread).signalStarted(); + } + }; + + void signalStarted() + { + DEB_MEMBER_FUNCT(); + AutoMutex l(m_cond.mutex()); + m_started = true; + m_cond.signal(); + DEB_TRACE() << "Thread: started"; + } + + static Mutex m_mutex; + Type m_type; + Cond m_cond; + bool m_started; +}; + +Mutex TestThread::m_mutex; + +int main(int argc, char *argv[]) +{ + DEB_GLOBAL_FUNCT(); + + DebParams::setTypeFlags(DebParams::AllFlags); + + DEB_TRACE() << "Starting test"; + + TestThread tester(TestThread::Tester); + tester.start(); + tester.join(); + + TestThread locker(TestThread::Locker); + TestThread releaser(TestThread::Releaser); + locker.start(); + releaser.start(); + locker.join(); + releaser.join(); + + return 0; +} -- GitLab From 95aeeae84440e3d2127dd65f712d35c8d96f7ee7 Mon Sep 17 00:00:00 2001 From: Alejandro Homs Puron Date: Fri, 5 Mar 2021 13:27:50 +0100 Subject: [PATCH 09/10] Buffer & Video: fix issues with SoftOpExt: * Take into account SoftOpExt in image buffer overrun & availability --- control/src/CtControl.cpp | 17 ++++++++++++----- control/src/CtVideo.cpp | 21 +++++++++++++++++++-- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/control/src/CtControl.cpp b/control/src/CtControl.cpp index 9ee7a7b8..a092140e 100644 --- a/control/src/CtControl.cpp +++ b/control/src/CtControl.cpp @@ -1296,11 +1296,18 @@ bool CtControl::_checkOverrun(Data& aData, AutoMutex& l) const ImageStatus &imageStatus = m_status.ImageCounters; - long imageToProcess = imageStatus.LastImageAcquired - - imageStatus.LastBaseImageReady; + // ext ops are not in-place, relaxing hw buffer limit is LastImageReady + // if no ext ops, full processing chain needs orig hw buffer + bool full_chain = !m_op_ext_link_task_active; - long imageToSave = imageStatus.LastImageAcquired - - imageStatus.LastImageSaved; + bool lastProcIsCounter = m_op_ext_sink_task_active && full_chain; + long lastProcessed = lastProcIsCounter ? imageStatus.LastCounterReady : + imageStatus.LastImageReady; + long imageToProcess = imageStatus.LastImageAcquired - lastProcessed; + + long lastUsedForSave = full_chain ? imageStatus.LastImageSaved : + imageStatus.LastImageReady; + long imageToSave = imageStatus.LastImageAcquired - lastUsedForSave; long nb_buffers; m_ct_buffer->getNumber(nb_buffers); @@ -1322,7 +1329,7 @@ bool CtControl::_checkOverrun(Data& aData, AutoMutex& l) m_ct_saving->getSaveCounters(first_to_save, last_to_save); DEB_ERROR() << DEB_VAR2(first_to_save, last_to_save); int frames_to_save = last_to_save - first_to_save + 1; - int frames_to_compress = imageStatus.LastBaseImageReady - last_to_save; + int frames_to_compress = imageStatus.LastImageReady - last_to_save; bool slow_processing = frames_to_compress > frames_to_save; DEB_ERROR() << DEB_VAR2(frames_to_compress, frames_to_save); error_code = slow_processing ? ProcessingOverun : SaveOverun; diff --git a/control/src/CtVideo.cpp b/control/src/CtVideo.cpp index a13113de..5ccc8f6c 100644 --- a/control/src/CtVideo.cpp +++ b/control/src/CtVideo.cpp @@ -4,6 +4,7 @@ #include "lima/CtAcquisition.h" #include "lima/CtImage.h" #include "lima/CtBuffer.h" +#include "lima/SoftOpExternalMgr.h" #include "processlib/PoolThreadMgr.h" #include "processlib/SinkTask.h" @@ -26,7 +27,8 @@ class CtVideo::_Data2ImageTask : public SinkTaskBase { DEB_CLASS_NAMESPC(DebModControl,"Data to image","Control"); public: - _Data2ImageTask(CtVideo &cnt) : SinkTaskBase(),m_nb_buffer(0),m_cnt(cnt) {} + _Data2ImageTask(CtVideo &cnt) : SinkTaskBase(),m_nb_buffer(0), + m_data_always_available(false),m_cnt(cnt) {} virtual void process(Data &aData) { @@ -82,13 +84,21 @@ public: } long m_nb_buffer; + bool m_data_always_available; + private: inline bool _check_available(Data& aData) { + DEB_MEMBER_FUNCT(); CtControl::ImageStatus status; m_cnt.m_ct.getImageStatus(status); - + + if (m_data_always_available) + return true; + long offset = status.LastImageAcquired - aData.frameNumber; + DEB_TRACE() << DEB_VAR4(status.LastImageAcquired, aData.frameNumber, + offset, m_nb_buffer); return offset < m_nb_buffer; } @@ -1054,6 +1064,13 @@ void CtVideo::_prepareAcq() m_read_image->frameNumber = -1; m_write_image->frameNumber = -1; + m_data_2_image_task->m_data_always_available = false; + SoftOpExternalMgr* op_ext = m_ct.externalOperation(); + bool op_ext_link_task_active, op_ext_sink_task_active; + op_ext->isTaskActive(op_ext_link_task_active, op_ext_sink_task_active); + if ((m_pars.video_source == LAST_IMAGE) && op_ext_link_task_active) + m_data_2_image_task->m_data_always_available = true; + CtBuffer* buffer = m_ct.buffer(); buffer->getNumber(m_data_2_image_task->m_nb_buffer); } -- GitLab From 083dbeff058605a49989918749d3089884cb91f6 Mon Sep 17 00:00:00 2001 From: Laurent Claustre Date: Tue, 9 Mar 2021 15:26:12 +0100 Subject: [PATCH 10/10] fix libconfig find for both windows and linux --- cmake/FindLibconfig.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/FindLibconfig.cmake b/cmake/FindLibconfig.cmake index 0d713cd4..5c0821f3 100644 --- a/cmake/FindLibconfig.cmake +++ b/cmake/FindLibconfig.cmake @@ -1,4 +1,4 @@ -find_library(LIBCONFIG_LIBRARIES config++) +find_library(LIBCONFIG_LIBRARIES NAMES config++ libconfig++) find_path(LIBCONFIG_INCLUDE_DIRS libconfig.h++) include(FindPackageHandleStandardArgs) -- GitLab