Commit b7d61b44 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

CtSaving: refactoring:

* Make frame & file arithmetics common to all SaveContainers:
  pass pre-calculated information to _open to avoide race conditions
  like that in HDF5 between the last two files
parent f778c2a2
......@@ -314,8 +314,7 @@ public:
std::list<double>& incoming_speed) const;
void getRawStatistic(StatisticsType&) const;
void updateNbFrames(long nb_frames);
void cleanRemainingFrames(long last_acquired_frame_nr);
void updateNbFrames(long nb_acquired_frames);
void getParameters(CtSaving::Parameters&) const;
void clear();
......@@ -348,7 +347,8 @@ public:
protected:
virtual void* _open(const std::string& filename,
std::ios_base::openmode flags) = 0;
std::ios_base::openmode flags,
CtSaving::Parameters& pars) = 0;
virtual void _close(void*) = 0;
virtual long _writeFile(void*, Data& data,
CtSaving::HeaderMap& aHeader,
......@@ -359,10 +359,14 @@ public:
virtual void _setBuffer(int frameNumber, ZBufferList&& buffer);
virtual ZBufferList _takeBuffers(int dataId);
int m_written_frames;
Stream& m_stream;
private:
mutable Mutex m_lock;
Stream& m_stream;
long m_frames_to_write;
long m_files_to_write;
long m_written_frames;
private:
void close(const Params2Handler::iterator& it, AutoMutex& l);
StatisticsType m_statistic;
......@@ -371,14 +375,12 @@ public:
std::string m_log_stat_directory;
std::string m_log_stat_filename;
FILE* m_log_stat_file;
mutable Cond m_cond;
long m_nb_frames_to_write;
Frame2Params m_frame_params;
Params2Handler m_params_handler;
int m_max_writing_task; ///< number of maximum parallel write
int m_running_writing_task; ///< number of concurrent write running
Mutex m_lock;
Mutex m_buffers_lock;
dataId2ZBufferList m_buffers;
};
......@@ -504,13 +506,9 @@ public:
m_save_cnt->createStatistic(data);
}
void updateNbFrames(long last_acquired_frame_nr)
{
m_save_cnt->updateNbFrames(last_acquired_frame_nr);
}
void cleanRemainingFrames(long last_acquired_frame_nr)
void updateNbFrames(long nb_acquired_frames)
{
m_save_cnt->cleanRemainingFrames(last_acquired_frame_nr);
m_save_cnt->updateNbFrames(nb_acquired_frames);
}
private:
......
......@@ -30,6 +30,8 @@
namespace lima {
class SaveContainerEdf;
#ifdef WITH_Z_COMPRESSION
#include <zlib.h>
......@@ -37,14 +39,13 @@ namespace lima {
{
DEB_CLASS_NAMESPC(DebModControl,"File Z Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
z_stream_s m_compression_struct;
public:
FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileZCompression();
virtual void process(Data &aData);
private:
......@@ -83,13 +84,12 @@ static const LZ4F_preferences_t lz4_preferences = {
{
DEB_CLASS_NAMESPC(DebModControl,"File Lz4 Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
LZ4F_compressionContext_t m_ctx;
public:
FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileLz4Compression();
virtual void process(Data &aData);
void _compression(const char *src,size_t size,ZBufferList& return_buffers);
......
......@@ -1885,9 +1885,7 @@ void CtSaving::_compressionFinished(Data& aData, Stream& stream)
return;
m_nb_cbk.erase(count_it);
bool ready_flag = _allStreamReady(frame_nr);
if (!ready_flag) {
if (!_allStreamReady(frame_nr)) {
FrameMap::value_type map_pair(frame_nr, aData);
m_frame_datas.insert(map_pair);
return;
......@@ -1932,33 +1930,39 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream)
m_cond.signal();
return;
}
aData.releaseBuffer(); // release finished data
FrameHeaderMap::iterator aHeaderIter;
FrameMap::iterator nextDataIter, end = m_frame_datas.end();
for (nextDataIter = m_frame_datas.begin(); nextDataIter != end; ++nextDataIter)
// limit search depth to the number of concurrent writing tasks
int tasks;
stream.getMaxConcurrentWritingTask(tasks);
FrameHeaderMap::iterator header_it;
FrameMap::iterator data_it, data_end = m_frame_datas.end();
for (data_it = m_frame_datas.begin(); tasks && (data_it != data_end); ++data_it, --tasks)
{
frame_nr = nextDataIter->first;
aHeaderIter = m_frame_headers.find(frame_nr);
bool header_available = (aHeaderIter != m_frame_headers.end());
frame_nr = data_it->first;
header_it = m_frame_headers.find(frame_nr);
// sorted frames: if header is not available yet, abort search
if ((saving_mode == AutoHeader) && (header_it == m_frame_headers.end()))
return;
bool can_save = _allStreamReady(frame_nr);
if (can_save && ((saving_mode == AutoFrame) || header_available))
if (can_save)
break;
}
if (nextDataIter == end)
if ((data_it == data_end) || !tasks)
return;
Data aNewData = nextDataIter->second;
m_frame_datas.erase(nextDataIter);
aData = data_it->second;
m_frame_datas.erase(data_it);
HeaderMap task_header;
_takeHeader(aHeaderIter, task_header, false);
_takeHeader(header_it, task_header, false);
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, task_header, task_list);
_postTaskList(aNewData, task_list, SAVING_PRIORITY);
_postTaskList(aData, task_list, SAVING_PRIORITY);
}
/** @brief this methode set the error saving status in CtControl
......@@ -2078,9 +2082,6 @@ void CtSaving::_stop()
// Update the number of frames so that SaveContainer::writeFile() will properly
// call SaveContainer::close()
stream.updateNbFrames(img_status.LastImageAcquired + 1);
// Clean up the frame parameters so that _allStreamReady() return true
stream.cleanRemainingFrames(img_status.LastImageAcquired + 1);
}
}
......@@ -2133,7 +2134,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
long frameId = aData.frameNumber;
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
Frame2Params::iterator fpars = m_frame_params.find(frameId);
if (fpars == m_frame_params.end())
THROW_CTL_ERROR(Error) << "Can't find saving parameters for frame"
......@@ -2205,7 +2206,7 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
}
lock.lock();
bool acq_end = (++m_written_frames == m_nb_frames_to_write);
bool acq_end = (++m_written_frames == m_frames_to_write);
lock.unlock();
// close before marking that we have finished the frame
......@@ -2220,17 +2221,15 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
}
}
lock.lock();
m_frame_params.erase(fpars);
--m_running_writing_task;
lock.unlock();
Timestamp end_write = Timestamp::now();
Timestamp diff = end_write - start_write;
DEB_TRACE() << "Write took : " << diff << "s";
lock.lock();
m_frame_params.erase(fpars);
--m_running_writing_task;
writeFileStat(aData, start_write, end_write, write_size);
}
......@@ -2272,7 +2271,7 @@ void CtSaving::SaveContainer::writeFileStat(Data& aData, Timestamp start, Timest
void CtSaving::SaveContainer::setEnableLogStat(bool enable)
{
// TODO: check that no current saving is active
AutoMutex aLock = AutoMutex(m_cond.mutex());
AutoMutex aLock = AutoMutex(m_lock);
if (m_log_stat_enable && !enable) {
fclose(m_log_stat_file);
m_log_stat_file = NULL;
......@@ -2344,7 +2343,7 @@ void CtSaving::SaveContainer::prepareLogStat(const CtSaving::Parameters& pars)
void CtSaving::SaveContainer::setStatisticSize(int aSize)
{
AutoMutex aLock = AutoMutex(m_cond.mutex());
AutoMutex aLock = AutoMutex(m_lock);
if (long(m_statistic.size()) > aSize)
{
size_t aDiffSize = m_statistic.size() - aSize;
......@@ -2357,7 +2356,7 @@ void CtSaving::SaveContainer::setStatisticSize(int aSize)
int CtSaving::SaveContainer::getStatisticSize() const
{
AutoMutex aLock(m_cond.mutex());
AutoMutex aLock(m_lock);
return m_statistic_size;
}
......@@ -2378,7 +2377,7 @@ void CtSaving::SaveContainer::getStatistic(std::list<double>& writing_speed,
StatisticsType copy;
{
AutoMutex aLock = AutoMutex(m_cond.mutex());
AutoMutex aLock = AutoMutex(m_lock);
copy = m_statistic;
}
......@@ -2467,14 +2466,13 @@ void CtSaving::SaveContainer::getParameters(CtSaving::Parameters& pars) const
pars = m_stream.getParameters(Acq);
}
void CtSaving::SaveContainer::clear()
{
DEB_MEMBER_FUNCT();
this->close();
AutoMutex aLock(m_cond.mutex());
AutoMutex aLock(m_lock);
m_statistic.clear();
_clear(); // call inheritance if needed
}
......@@ -2485,32 +2483,40 @@ void CtSaving::SaveContainer::prepare(CtControl& ct)
int nb_frames;
ct.acquisition()->getAcqNbFrames(nb_frames);
CtSaving::Parameters pars = m_stream.getParameters(Auto);
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
m_statistic.clear();
m_nb_frames_to_write = nb_frames;
m_frames_to_write = nb_frames;
m_files_to_write = 0;
m_written_frames = 0;
if (m_nb_frames_to_write && // if not live
if (m_frames_to_write && // if not live
pars.savingMode != CtSaving::Manual)
{
long nextNumber = pars.nextNumber - 1;
for (long i = 0; i < nb_frames; ++i)
bool multi_set = (pars.overwritePolicy == MultiSet);
for (long i = 0; i < m_frames_to_write; ++i)
{
FrameParameters frame_par(pars);
CtSaving::Parameters& file_pars = frame_par.m_pars;
if (pars.overwritePolicy == MultiSet)
frame_par.m_pars.framesPerFile = 1; // force to 1
if (multi_set)
file_pars.framesPerFile = 1; // force to 1
else
{
bool new_file = !(i % pars.framesPerFile);
int idx = i % file_pars.framesPerFile;
bool new_file = (idx == 0);
if (new_file) ++nextNumber;
frame_par.m_pars.nextNumber = nextNumber;
file_pars.nextNumber = nextNumber;
frame_par.m_threadable = new_file;
long first_frame = i - idx;
if (first_frame + file_pars.framesPerFile > m_frames_to_write)
file_pars.framesPerFile = m_frames_to_write - first_frame;
}
std::pair<Frame2Params::iterator, bool> result =
m_frame_params.insert(Frame2Params::value_type(i, frame_par));
if (!result.second)
result.first->second = frame_par;
}
m_files_to_write = multi_set ? 1 : (nextNumber - pars.nextNumber + 1);
}
m_running_writing_task = 0;
prepareLogStat(pars);
......@@ -2518,13 +2524,16 @@ void CtSaving::SaveContainer::prepare(CtControl& ct)
_prepare(ct); // call inheritance if needed
}
void CtSaving::SaveContainer::updateNbFrames(long last_acquired_frame_nr)
void CtSaving::SaveContainer::updateNbFrames(long nb_acquired_frames)
{
DEB_MEMBER_FUNCT();
DEB_TRACE() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_nb_frames_to_write = last_acquired_frame_nr;
DEB_TRACE() << DEB_VAR1(nb_acquired_frames);
AutoMutex lock(m_lock);
m_frames_to_write = nb_acquired_frames;
m_frame_params.erase(
m_frame_params.find(nb_acquired_frames),
m_frame_params.end());
}
bool CtSaving::SaveContainer::isReady(long frame_nr) const
......@@ -2532,16 +2541,13 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame_nr);
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
bool ready;
// mean all writing tasks
if (frame_nr < 0)
{
ready = m_frame_params.empty();
for (Frame2Params::const_iterator it = m_frame_params.begin();
ready && it != m_frame_params.end(); ++it)
ready = !it->second.m_running;
}
else
{
......@@ -2566,23 +2572,12 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
return ready;
}
void CtSaving::SaveContainer::cleanRemainingFrames(long last_acquired_frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(last_acquired_frame_nr);
AutoMutex lock(m_cond.mutex());
m_frame_params.erase(
m_frame_params.find(last_acquired_frame_nr),
m_frame_params.end());
}
void CtSaving::SaveContainer::setReady(long frame_nr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(frame_nr);
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
// mean all frames
if (frame_nr < 0)
m_frame_params.clear();
......@@ -2598,7 +2593,7 @@ void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
Frame2Params::iterator i = m_frame_params.find(frame_nr);
if (i == m_frame_params.end())
{
......@@ -2614,7 +2609,7 @@ void CtSaving::SaveContainer::prepareWrittingFrame(long frame_nr)
void CtSaving::SaveContainer::createStatistic(Data& data)
{
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
//Insert statistic
StatisticsType::value_type stat_pair(data.frameNumber, Stat());
stat_pair.second.incoming_size = data.size();
......@@ -2624,7 +2619,7 @@ void CtSaving::SaveContainer::createStatistic(Data& data)
void CtSaving::SaveContainer::compressionStart(Data& data)
{
Timestamp start = Timestamp::now();
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
StatisticsType::iterator i = m_statistic.find(data.frameNumber);
if (i != m_statistic.end())
i->second.compression_start = start;
......@@ -2633,7 +2628,7 @@ void CtSaving::SaveContainer::compressionStart(Data& data)
void CtSaving::SaveContainer::compressionFinished(Data& data)
{
Timestamp end = Timestamp::now();
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
StatisticsType::iterator i = m_statistic.find(data.frameNumber);
if (i != m_statistic.end())
i->second.compression_end = end;
......@@ -2665,7 +2660,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars)
CtSaving::Parameters& pars = fpars.m_pars;
{
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
Params2Handler::iterator handler = m_params_handler.find(pars);
if (handler != m_params_handler.end())
return *handler;
......@@ -2701,7 +2696,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars)
for (int nbTry = 0; !handler.m_handler && (nbTry < 5); ++nbTry)
{
try {
handler.m_handler = _open(aFileName, openFlags);
handler.m_handler = _open(aFileName, openFlags, pars);
}
catch (std::ios_base::failure & error) {
error_desc = error.what();
......@@ -2737,7 +2732,7 @@ CtSaving::SaveContainer::open(FrameParameters& fpars)
Params2Handler::value_type map_pair(pars, handler);
bool ok;
{
AutoMutex lock(m_cond.mutex());
AutoMutex lock(m_lock);
ok = m_params_handler.insert(map_pair).second;
}
if (!ok) {
......@@ -2774,7 +2769,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params,
{
DEB_MEMBER_FUNCT();
AutoMutex aLock(m_cond.mutex());
AutoMutex aLock(m_lock);
if (!params) // close all
{
for (Params2Handler::iterator it = m_params_handler.begin();
......@@ -2802,7 +2797,7 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params,
void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers)
{
AutoMutex aLock(m_lock);
AutoMutex aLock(m_buffers_lock);
std::pair<dataId2ZBufferList::iterator, bool> result;
result = m_buffers.emplace(std::move(frameNumber), std::move(buffers));
if (!result.second)
......@@ -2811,7 +2806,7 @@ void CtSaving::SaveContainer::_setBuffer(int frameNumber, ZBufferList&& buffers)
ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId)
{
AutoMutex aLock(m_lock);
AutoMutex aLock(m_buffers_lock);
dataId2ZBufferList::iterator i = m_buffers.find(dataId);
ZBufferList aReturnBufferPt(std::move(i->second));
m_buffers.erase(i);
......@@ -2820,7 +2815,7 @@ ZBufferList CtSaving::SaveContainer::_takeBuffers(int dataId)
void CtSaving::SaveContainer::_clear()
{
AutoMutex aLock(m_lock);
AutoMutex aLock(m_buffers_lock);
m_buffers.clear();
}
......@@ -2872,12 +2867,13 @@ void CtSaving::Stream::checkWriteAccess()
// test all file is mode == Abort
if (m_pars.overwritePolicy == Abort)
{
CtAcquisition* anAcq = m_saving.m_ctrl.acquisition();
int nbAcqFrames;
anAcq->getAcqNbFrames(nbAcqFrames);
int framesToWrite;
anAcq->getAcqNbFrames(framesToWrite);
if (framesToWrite == 0)
framesToWrite = 1;
int framesPerFile = m_pars.framesPerFile;
int nbFiles = (nbAcqFrames + framesPerFile - 1) / framesPerFile;
int nbFiles = (framesToWrite + framesPerFile - 1) / framesPerFile;
int firstFileNumber = m_acquisition_pars.nextNumber;
int lastFileNumber = m_acquisition_pars.nextNumber + nbFiles - 1;
......
......@@ -434,7 +434,8 @@ SinkTaskBase* SaveContainerCbf::getCompressionTask(const CtSaving::HeaderMap &he
}
void* SaveContainerCbf::_open(const std::string &filename,
std::ios_base::openmode stdOpenflags)
std::ios_base::openmode stdOpenflags,
CtSaving::Parameters& /*pars*/)
{
DEB_MEMBER_FUNCT();
char openFlags[8];
......
......@@ -53,7 +53,8 @@ namespace lima {
protected:
virtual void* _open(const std::string &filename,
std::ios_base::openmode flags);
std::ios_base::openmode flags,
CtSaving::Parameters& pars);
virtual void _close(void*);
virtual long _writeFile(void*,Data &data,
CtSaving::HeaderMap &aHeader,
......
......@@ -30,9 +30,9 @@ using namespace lima;
#ifdef WITH_Z_COMPRESSION
const int FileZCompression::BUFFER_HELPER_SIZE = 64 * 1024;
FileZCompression::FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileZCompression::FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -65,9 +65,7 @@ void FileZCompression::process(Data &aData)
ZBufferList aBufferListPt;
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header, buffer);
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
_compression((char*)aData.data(),aData.size(),aBufferListPt);
......@@ -126,9 +124,9 @@ void FileZCompression::_end_compression(ZBufferList& return_buffers)
#ifdef WITH_LZ4_COMPRESSION
FileLz4Compression::FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileLz4Compression::FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -148,9 +146,7 @@ void FileLz4Compression::process(Data &aData)
DEB_PARAM() << DEB_VAR1(aData);
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header,buffer);
ZBufferList aBufferListPt;
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
......
......@@ -179,7 +179,7 @@ SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream,
#ifdef __unix
m_nb_buffers(0),
#endif
m_format(format)
m_format(format), m_frames_per_file(0)
{
DEB_CONSTRUCTOR();
}
......@@ -188,7 +188,7 @@ SaveContainerEdf::~SaveContainerEdf()
{
DEB_DESTRUCTOR();
#ifdef __unix
if(m_free_buffers.size() != m_nb_buffers)
if(int(m_free_buffers.size()) != m_nb_buffers)
DEB_WARNING() << "Missing free buffers: "
<< "got " << m_free_buffers.size() << ", "
<< "expected " << m_nb_buffers;
......@@ -224,8 +224,15 @@ void SaveContainerEdf::releaseBuffer(void *buffer)
}
#endif
void SaveContainerEdf::_prepare(CtControl&)
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
m_frames_per_file = pars.framesPerFile;
}
void* SaveContainerEdf::_open(const std::string &filename,
std::ios_base::openmode openFlags)
std::ios_base::openmode openFlags,
CtSaving::Parameters & /*pars*/)
{
DEB_MEMBER_FUNCT();
return new File(*this, filename, openFlags);
......@@ -264,9 +271,7 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
if(aFormat == CtSaving::EDF)
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
MmapInfo info = _writeEdfHeader(aData,aHeader,
pars.framesPerFile,*fout);
MmapInfo info = _writeEdfHeader(aData,aHeader,*fout);
write_size += info.header_size;
}
#ifdef __unix
......@@ -277,8 +282,7 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
MmapInfo& mmap_info = file->m_mmap_info;
if(!mmap_info) // Create header and mmap
{