Commit 46a5e3a7 authored by operator for beamline's avatar operator for beamline

Merge branch 'ctsaving-issues-and-refactoring' into slsdetector-eiger

parents 779fc7a8 4234b9f3
Pipeline #41824 passed with stages
in 20 minutes and 4 seconds
......@@ -314,8 +314,7 @@ public:
std::list<double>& incoming_speed) const;
void getRawStatistic(StatisticsType&) const;
void updateNbFrames(long nb_frames);
void cleanRemainingFrames(long last_acquired_frame_nr);
void updateNbFrames(long nb_acquired_frames);
void getParameters(CtSaving::Parameters&) const;
void clear();
......@@ -348,7 +347,8 @@ public:
protected:
virtual void* _open(const std::string& filename,
std::ios_base::openmode flags) = 0;
std::ios_base::openmode flags,
CtSaving::Parameters& pars) = 0;
virtual void _close(void*) = 0;
virtual long _writeFile(void*, Data& data,
CtSaving::HeaderMap& aHeader,
......@@ -359,23 +359,28 @@ public:
virtual void _setBuffer(int frameNumber, ZBufferList&& buffer);
virtual ZBufferList _takeBuffers(int dataId);
int m_written_frames;
Stream& m_stream;
mutable Mutex m_lock;
Stream& m_stream;
long m_frames_to_write;
long m_files_to_write;
long m_written_frames;
private:
void close(const Params2Handler::iterator& it, AutoMutex& l);
StatisticsType m_statistic;
int m_statistic_size;
bool m_log_stat_enable;
std::string m_log_stat_directory;
std::string m_log_stat_filename;
FILE* m_log_stat_file;
mutable Cond m_cond;
long m_nb_frames_to_write;
Frame2Params m_frame_params;
Params2Handler m_params_handler;
int m_max_writing_task; ///< number of maximum parallel write
int m_running_writing_task; ///< number of concurrent write running
Mutex m_lock;
Mutex m_buffers_lock;
dataId2ZBufferList m_buffers;
};
......@@ -501,13 +506,9 @@ public:
m_save_cnt->createStatistic(data);
}
void updateNbFrames(long last_acquired_frame_nr)
{
m_save_cnt->updateNbFrames(last_acquired_frame_nr);
}
void cleanRemainingFrames(long last_acquired_frame_nr)
void updateNbFrames(long nb_acquired_frames)
{
m_save_cnt->cleanRemainingFrames(last_acquired_frame_nr);
m_save_cnt->updateNbFrames(nb_acquired_frames);
}
private:
......
......@@ -30,6 +30,8 @@
namespace lima {
class SaveContainerEdf;
#ifdef WITH_Z_COMPRESSION
#include <zlib.h>
......@@ -37,14 +39,13 @@ namespace lima {
{
DEB_CLASS_NAMESPC(DebModControl,"File Z Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
z_stream_s m_compression_struct;
public:
FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileZCompression();
virtual void process(Data &aData);
private:
......@@ -83,13 +84,12 @@ static const LZ4F_preferences_t lz4_preferences = {
{
DEB_CLASS_NAMESPC(DebModControl,"File Lz4 Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
LZ4F_compressionContext_t m_ctx;
public:
FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileLz4Compression();
virtual void process(Data &aData);
void _compression(const char *src,size_t size,ZBufferList& return_buffers);
......
......@@ -72,7 +72,7 @@ namespace lima
long long frameNumber() const;
private:
Image(const CtVideo*,VideoImage*);
Image(const CtVideo*,VideoImage*,AutoMutex&);
const CtVideo* m_video;
VideoImage* m_image;
......
......@@ -499,8 +499,9 @@ void CtControl::prepareAcq()
m_images_buffer.clear();
m_images_saved.clear();
//Clear common header
//Clear saving: common & frame headers, ZBuffers and statistics
m_ct_saving->resetInternalCommonHeader();
m_ct_saving->clear();
DEB_TRACE() << "Apply hardware bin/roi";
m_ct_image->applyHard();
......
This diff is collapsed.
......@@ -434,7 +434,8 @@ SinkTaskBase* SaveContainerCbf::getCompressionTask(const CtSaving::HeaderMap &he
}
void* SaveContainerCbf::_open(const std::string &filename,
std::ios_base::openmode stdOpenflags)
std::ios_base::openmode stdOpenflags,
CtSaving::Parameters& /*pars*/)
{
DEB_MEMBER_FUNCT();
char openFlags[8];
......
......@@ -53,7 +53,8 @@ namespace lima {
protected:
virtual void* _open(const std::string &filename,
std::ios_base::openmode flags);
std::ios_base::openmode flags,
CtSaving::Parameters& pars);
virtual void _close(void*);
virtual long _writeFile(void*,Data &data,
CtSaving::HeaderMap &aHeader,
......
......@@ -30,9 +30,9 @@ using namespace lima;
#ifdef WITH_Z_COMPRESSION
const int FileZCompression::BUFFER_HELPER_SIZE = 64 * 1024;
FileZCompression::FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileZCompression::FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -65,9 +65,7 @@ void FileZCompression::process(Data &aData)
ZBufferList aBufferListPt;
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header, buffer);
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
_compression((char*)aData.data(),aData.size(),aBufferListPt);
......@@ -126,9 +124,9 @@ void FileZCompression::_end_compression(ZBufferList& return_buffers)
#ifdef WITH_LZ4_COMPRESSION
FileLz4Compression::FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileLz4Compression::FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -148,9 +146,7 @@ void FileLz4Compression::process(Data &aData)
DEB_PARAM() << DEB_VAR1(aData);
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header,buffer);
ZBufferList aBufferListPt;
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
......
......@@ -36,6 +36,34 @@ static const long int WRITE_BUFFER_SIZE = 64*1024;
using namespace lima;
#ifdef __unix
SaveContainerEdf::MmapInfo::~MmapInfo()
{
if(mmap_addr)
munmap(mmap_addr, header_size);
}
inline void SaveContainerEdf::MmapInfo::map(const std::string& fname,
long long header_position)
{
header_position -= header_size;
long sz = sysconf(_SC_PAGESIZE);
long long mapping_offset = header_position / sz * sz;
header_size += header_position - mapping_offset;
height_offset -= mapping_offset;
size_offset -= mapping_offset;
int fd = ::open(fname.c_str(),O_RDWR);
if(fd < 0)
throw LIMA_CTL_EXC(Error, "Error opening ") << fname << ": " << strerror(errno);
mmap_addr = mmap(NULL,header_size,
PROT_WRITE,MAP_SHARED,fd,mapping_offset);
int mmap_err = errno;
::close(fd);
if(!mmap_addr)
throw LIMA_CTL_EXC(Error, "Error mapping ") << fname << ": " << strerror(mmap_err);
}
#endif
#ifdef WIN32
/** @brief this is a small wrapper class for ofstream class.
* All this is for overcome performance issue with window std::ofstream
......@@ -114,6 +142,33 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
}
#endif
/** @brief saving container handle
*
* This class manage the low-level handle
*/
SaveContainerEdf::File::File(SaveContainerEdf& cont,
const std::string& filename,
std::ios_base::openmode openFlags)
: m_cont(cont), m_filename(filename)
#ifdef __unix
, m_height(0), m_size(0)
#endif
{
m_fout.exceptions(std::ios_base::failbit | std::ios_base::badbit);
m_fout.open(filename.c_str(),openFlags);
#ifdef __unix
m_buffer = m_cont.getNewBuffer();
m_fout.rdbuf()->pubsetbuf((char*)m_buffer,WRITE_BUFFER_SIZE);
#endif
}
SaveContainerEdf::File::~File()
{
#ifdef __unix
m_cont.releaseBuffer(m_buffer);
#endif
}
/** @brief saving container
*
* This class manage file saving
......@@ -121,69 +176,73 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream,
CtSaving::FileFormat format) :
CtSaving::SaveContainer(stream),
m_format(format)
{
DEB_CONSTRUCTOR();
#ifdef __unix
if(posix_memalign(&m_fout_buffer,4*1024,WRITE_BUFFER_SIZE))
THROW_CTL_ERROR(Error) << "Can't allocated write buffer";
m_nb_buffers(0),
#endif
m_format(format), m_frames_per_file(0)
{
DEB_CONSTRUCTOR();
}
SaveContainerEdf::~SaveContainerEdf()
{
DEB_DESTRUCTOR();
#ifdef __unix
free(m_fout_buffer);
if(int(m_free_buffers.size()) != m_nb_buffers)
DEB_WARNING() << "Missing free buffers: "
<< "got " << m_free_buffers.size() << ", "
<< "expected " << m_nb_buffers;
while(!m_free_buffers.empty())
free(getNewBuffer());
#endif
}
void* SaveContainerEdf::_open(const std::string &filename,
std::ios_base::openmode openFlags)
#ifdef __unix
void *SaveContainerEdf::getNewBuffer()
{
DEB_MEMBER_FUNCT();
#ifdef WIN32
_OfStream* fout = new _OfStream();
#else
std::ofstream* fout = new std::ofstream();
#endif
try
void *buffer;
if(!m_free_buffers.empty())
{
fout->exceptions(std::ios_base::failbit | std::ios_base::badbit);
fout->open(filename.c_str(),openFlags);
#ifdef __unix
fout->rdbuf()->pubsetbuf((char*)m_fout_buffer,WRITE_BUFFER_SIZE);
#endif
buffer = m_free_buffers.top();
m_free_buffers.pop();
}
catch(...)
else
{
delete fout;
throw;
if(posix_memalign(&buffer,4*1024,WRITE_BUFFER_SIZE))
THROW_CTL_ERROR(Error) << "Can't allocate write buffer";
++m_nb_buffers;
}
m_current_filename = filename;
return fout;
return buffer;
}
void SaveContainerEdf::_close(void* f)
void SaveContainerEdf::releaseBuffer(void *buffer)
{
DEB_MEMBER_FUNCT();
#ifdef WIN32
_OfStream* fout = (_OfStream*)f;
#else
std::ofstream* fout = (std::ofstream*)f;
m_free_buffers.push(buffer);
}
#endif
delete fout; // close file
void SaveContainerEdf::_prepare(CtControl&)
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
m_frames_per_file = pars.framesPerFile;
}
#ifdef __unix
if(m_mmap_info.mmap_addr)
{
munmap(m_mmap_info.mmap_addr,m_mmap_info.header_size);
m_mmap_info.mmap_addr = NULL;
}
#endif
void* SaveContainerEdf::_open(const std::string &filename,
std::ios_base::openmode openFlags,
CtSaving::Parameters & /*pars*/)
{
DEB_MEMBER_FUNCT();
return new File(*this, filename, openFlags);
}
void SaveContainerEdf::_close(void* f)
{
DEB_MEMBER_FUNCT();
File* file = (File*) f;
delete file;
}
long SaveContainerEdf::_writeFile(void* f,Data &aData,
......@@ -192,11 +251,8 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
{
DEB_MEMBER_FUNCT();
long write_size = 0;
#ifdef WIN32
_OfStream* fout = (_OfStream*)f;
#else
std::ofstream* fout = (std::ofstream*)f;
#endif
File* file = (File*) f;
File::Stream* fout = &file->m_fout;
#if defined(WITH_Z_COMPRESSION) || defined(WITH_LZ4_COMPRESSION)
if(aFormat == CtSaving::EDFGZ || aFormat == CtSaving::EDFLZ4)
......@@ -215,47 +271,31 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
if(aFormat == CtSaving::EDF)
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
MmapInfo info = _writeEdfHeader(aData,aHeader,
pars.framesPerFile,*fout);
MmapInfo info = _writeEdfHeader(aData,aHeader,*fout);
write_size += info.header_size;
}
#ifdef __unix
else if(aFormat == CtSaving::EDFConcat)
{
m_mmap_info.height += aData.dimensions[1];
m_mmap_info.size += aData.size();
if(!m_mmap_info.mmap_addr) // Create header and mmap
file->m_height += aData.dimensions[1];
file->m_size += aData.size();
MmapInfo& mmap_info = file->m_mmap_info;
if(!mmap_info) // Create header and mmap
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
m_mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8);
write_size += m_mmap_info.header_size;
mmap_info = _writeEdfHeader(aData,aHeader,*fout,8);
write_size += mmap_info.header_size;
fout->flush();
long long header_position = fout->tellp();
header_position -= m_mmap_info.header_size;
long sz = sysconf(_SC_PAGESIZE);
long long mapping_offset = header_position / sz * sz;
m_mmap_info.header_size += header_position - mapping_offset;
m_mmap_info.height_offset -= mapping_offset;
m_mmap_info.size_offset -= mapping_offset;
int fd = ::open(m_current_filename.c_str(),O_RDWR);
if(fd > -1)
{
m_mmap_info.mmap_addr = mmap(NULL,m_mmap_info.header_size,
PROT_WRITE,MAP_SHARED,fd,mapping_offset);
::close(fd);
}
m_mmap_info.height = aData.dimensions[1];
m_mmap_info.size = aData.size();
mmap_info.map(file->m_filename, header_position);
}
else
{
char* start_size_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.size_offset;
int nbchar = sprintf(start_size_string,"%lld",m_mmap_info.size);
char* start_size_string = mmap_info.sizeLocation();
int nbchar = sprintf(start_size_string,"%lld",file->m_size);
start_size_string[nbchar] = ' ';
char* start_height_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.height_offset;
nbchar = sprintf(start_height_string,"%lld",m_mmap_info.height);
char* start_height_string = mmap_info.heightLocation();
nbchar = sprintf(start_height_string,"%lld",file->m_height);
start_height_string[nbchar] = ' ';
}
}
......@@ -272,18 +312,15 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
SinkTaskBase* SaveContainerEdf::getCompressionTask(const CtSaving::HeaderMap& header)
{
#if defined(WITH_Z_COMPRESSION) || defined(WITH_LZ4_COMPRESSION)
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
#endif
#ifdef WITH_Z_COMPRESSION
if(m_format == CtSaving::EDFGZ)
return new FileZCompression(*this,pars.framesPerFile,header);
return new FileZCompression(*this,header);
else
#endif
#ifdef WITH_LZ4_COMPRESSION
if(m_format == CtSaving::EDFLZ4)
return new FileLz4Compression(*this,pars.framesPerFile,header);
return new FileLz4Compression(*this,header);
else
#endif
return NULL;
......
......@@ -25,6 +25,8 @@
#include "lima/CtSaving.h"
#include "lima/CtSaving_Compression.h"
#include <stack>
namespace lima {
class SaveContainerEdf : public CtSaving::SaveContainer
......@@ -44,32 +46,48 @@ namespace lima {
protected:
virtual void* _open(const std::string &filename,
std::ios_base::openmode flags);
std::ios_base::openmode flags,
CtSaving::Parameters &pars);
virtual void _close(void*);
virtual long _writeFile(void*,Data &data,
CtSaving::HeaderMap &aHeader,
CtSaving::FileFormat);
private:
struct MmapInfo
virtual void _prepare(CtControl&);
private:
struct MmapInfo
{
MmapInfo() :
header_size(0),
height_offset(0),
size_offset(0),
height(0),
size(0),
mmap_addr(NULL) {}
#ifdef __unix
~MmapInfo();
void map(const std::string& fname,
long long header_position);
operator bool()
{ return mmap_addr; }
char *sizeLocation()
{ return (char*) mmap_addr + size_offset; }
char *heightLocation()
{ return (char*) mmap_addr + height_offset; }
#endif
long long header_size;
long long height_offset;
long long size_offset;
long long height;
long long size;
void* mmap_addr;
};
template<class Stream>
static MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&,
int framesPerFile,Stream&,
int nbCharReserved = 0);
MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&,
Stream&,int nbCharReserved = 0);
#ifdef WIN32
class _OfStream
{
......@@ -94,19 +112,48 @@ namespace lima {
FILE* m_fout;
int m_exc_flag;
};
#endif
struct File
{
#ifdef WIN32
typedef _OfStream Stream;
#else
void* m_fout_buffer;
typedef std::ofstream Stream;
#endif
File(SaveContainerEdf& cont, const std::string& filename,
std::ios_base::openmode openFlags);
virtual ~File();
SaveContainerEdf& m_cont;
std::string m_filename;
Stream m_fout;
#ifdef __unix
void* m_buffer;
MmapInfo m_mmap_info;
long long m_height;
long long m_size;
#endif
};
#ifdef __unix
void *getNewBuffer();
void releaseBuffer(void *buffer);
std::stack<void*> m_free_buffers;
int m_nb_buffers;
#endif
CtSaving::FileFormat m_format;
MmapInfo m_mmap_info;
std::string m_current_filename;
long m_frames_per_file;
};
template<class Stream>
SaveContainerEdf::MmapInfo
SaveContainerEdf::_writeEdfHeader(Data &aData,
CtSaving::HeaderMap &aHeader,
int framesPerFile,
Stream &sout,
int nbCharReserved)
{
......@@ -120,7 +167,7 @@ namespace lima {
ctime_r(&ctime_now, time_str);
time_str[strlen(time_str) - 1] = '\0';
int image_nb = aData.frameNumber % framesPerFile;
int image_nb = aData.frameNumber % m_frames_per_file;
char aBuffer[2048];
long long aStartPosition = sout.tellp();
......@@ -148,14 +195,14 @@ namespace lima {
}
sout << "DataType = " << aStringType << " ;\n";
SaveContainerEdf::MmapInfo offset;
sout << "Size = "; offset.size_offset = sout.tellp();
SaveContainerEdf::MmapInfo mmap_info;
sout << "Size = "; mmap_info.size_offset = sout.tellp();
snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,"");
sout << aData.size() << aBuffer;
sout << "Dim_1 = " << aData.dimensions[0] << " ;\n";
sout << "Dim_2 = "; offset.height_offset = sout.tellp();
sout << "Dim_2 = "; mmap_info.height_offset = sout.tellp();
snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,"");
sout << aData.dimensions[1] << aBuffer;
......@@ -199,8 +246,8 @@ namespace lima {
long long finalHeaderLenght = (lenght + 1023) & ~1023; // 1024 alignment
snprintf(aBuffer,sizeof(aBuffer),"%*s}\n",int(finalHeaderLenght - lenght),"");
sout << aBuffer;
offset.header_size = finalHeaderLenght;
return offset;
mmap_info.header_size = finalHeaderLenght;
return mmap_info;
}
}
......