Commit f778c2a2 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Improve SaveContainerEdf buffer management:

* Allocate different buffers for parallel Files
* Recycle free buffers
* Refactor MmapInfo, used in EDFConcat format
parent be3c1fe5
......@@ -36,6 +36,34 @@ static const long int WRITE_BUFFER_SIZE = 64*1024;
using namespace lima;
#ifdef __unix
SaveContainerEdf::MmapInfo::~MmapInfo()
{
if(mmap_addr)
munmap(mmap_addr, header_size);
}
inline void SaveContainerEdf::MmapInfo::map(const std::string& fname,
long long header_position)
{
header_position -= header_size;
long sz = sysconf(_SC_PAGESIZE);
long long mapping_offset = header_position / sz * sz;
header_size += header_position - mapping_offset;
height_offset -= mapping_offset;
size_offset -= mapping_offset;
int fd = ::open(fname.c_str(),O_RDWR);
if(fd < 0)
throw LIMA_CTL_EXC(Error, "Error opening ") << fname << ": " << strerror(errno);
mmap_addr = mmap(NULL,header_size,
PROT_WRITE,MAP_SHARED,fd,mapping_offset);
int mmap_err = errno;
::close(fd);
if(!mmap_addr)
throw LIMA_CTL_EXC(Error, "Error mapping ") << fname << ": " << strerror(mmap_err);
}
#endif
#ifdef WIN32
/** @brief this is a small wrapper class for ofstream class.
* All this is for overcome performance issue with window std::ofstream
......@@ -114,6 +142,33 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
}
#endif
/** @brief saving container handle
*
* This class manage the low-level handle
*/
SaveContainerEdf::File::File(SaveContainerEdf& cont,
const std::string& filename,
std::ios_base::openmode openFlags)
: m_cont(cont), m_filename(filename)
#ifdef __unix
, m_height(0), m_size(0)
#endif
{
m_fout.exceptions(std::ios_base::failbit | std::ios_base::badbit);
m_fout.open(filename.c_str(),openFlags);
#ifdef __unix
m_buffer = m_cont.getNewBuffer();
m_fout.rdbuf()->pubsetbuf((char*)m_buffer,WRITE_BUFFER_SIZE);
#endif
}
SaveContainerEdf::File::~File()
{
#ifdef __unix
m_cont.releaseBuffer(m_buffer);
#endif
}
/** @brief saving container
*
* This class manage file saving
......@@ -121,69 +176,66 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream,
CtSaving::FileFormat format) :
CtSaving::SaveContainer(stream),
#ifdef __unix
m_nb_buffers(0),
#endif
m_format(format)
{
DEB_CONSTRUCTOR();
#ifdef __unix
if(posix_memalign(&m_fout_buffer,4*1024,WRITE_BUFFER_SIZE))
THROW_CTL_ERROR(Error) << "Can't allocated write buffer";
#endif
}
SaveContainerEdf::~SaveContainerEdf()
{
DEB_DESTRUCTOR();
#ifdef __unix
free(m_fout_buffer);
if(m_free_buffers.size() != m_nb_buffers)
DEB_WARNING() << "Missing free buffers: "
<< "got " << m_free_buffers.size() << ", "
<< "expected " << m_nb_buffers;
while(!m_free_buffers.empty())
free(getNewBuffer());
#endif
}
void* SaveContainerEdf::_open(const std::string &filename,
std::ios_base::openmode openFlags)
#ifdef __unix
void *SaveContainerEdf::getNewBuffer()
{
DEB_MEMBER_FUNCT();
#ifdef WIN32
_OfStream* fout = new _OfStream();
#else
std::ofstream* fout = new std::ofstream();
#endif
try
void *buffer;
if(!m_free_buffers.empty())
{
fout->exceptions(std::ios_base::failbit | std::ios_base::badbit);
fout->open(filename.c_str(),openFlags);
#ifdef __unix
fout->rdbuf()->pubsetbuf((char*)m_fout_buffer,WRITE_BUFFER_SIZE);
#endif
buffer = m_free_buffers.top();
m_free_buffers.pop();
}
catch(...)
else
{
delete fout;
throw;
if(posix_memalign(&buffer,4*1024,WRITE_BUFFER_SIZE))
THROW_CTL_ERROR(Error) << "Can't allocate write buffer";
++m_nb_buffers;
}
m_current_filename = filename;
return fout;
return buffer;
}
void SaveContainerEdf::_close(void* f)
void SaveContainerEdf::releaseBuffer(void *buffer)
{
DEB_MEMBER_FUNCT();
#ifdef WIN32
_OfStream* fout = (_OfStream*)f;
#else
std::ofstream* fout = (std::ofstream*)f;
m_free_buffers.push(buffer);
}
#endif
delete fout; // close file
void* SaveContainerEdf::_open(const std::string &filename,
std::ios_base::openmode openFlags)
{
DEB_MEMBER_FUNCT();
return new File(*this, filename, openFlags);
}
#ifdef __unix
if(m_mmap_info.mmap_addr)
{
munmap(m_mmap_info.mmap_addr,m_mmap_info.header_size);
m_mmap_info.mmap_addr = NULL;
}
#endif
void SaveContainerEdf::_close(void* f)
{
DEB_MEMBER_FUNCT();
File* file = (File*) f;
delete file;
}
long SaveContainerEdf::_writeFile(void* f,Data &aData,
......@@ -192,11 +244,8 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
{
DEB_MEMBER_FUNCT();
long write_size = 0;
#ifdef WIN32
_OfStream* fout = (_OfStream*)f;
#else
std::ofstream* fout = (std::ofstream*)f;
#endif
File* file = (File*) f;
File::Stream* fout = &file->m_fout;
#if defined(WITH_Z_COMPRESSION) || defined(WITH_LZ4_COMPRESSION)
if(aFormat == CtSaving::EDFGZ || aFormat == CtSaving::EDFLZ4)
......@@ -223,39 +272,26 @@ long SaveContainerEdf::_writeFile(void* f,Data &aData,
#ifdef __unix
else if(aFormat == CtSaving::EDFConcat)
{
m_mmap_info.height += aData.dimensions[1];
m_mmap_info.size += aData.size();
if(!m_mmap_info.mmap_addr) // Create header and mmap
file->m_height += aData.dimensions[1];
file->m_size += aData.size();
MmapInfo& mmap_info = file->m_mmap_info;
if(!mmap_info) // Create header and mmap
{
const CtSaving::Parameters& pars = m_stream.getParameters(CtSaving::Acq);
m_mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8);
write_size += m_mmap_info.header_size;
mmap_info = _writeEdfHeader(aData,aHeader,pars.framesPerFile,*fout,8);
write_size += mmap_info.header_size;
fout->flush();
long long header_position = fout->tellp();
header_position -= m_mmap_info.header_size;
long sz = sysconf(_SC_PAGESIZE);
long long mapping_offset = header_position / sz * sz;
m_mmap_info.header_size += header_position - mapping_offset;
m_mmap_info.height_offset -= mapping_offset;
m_mmap_info.size_offset -= mapping_offset;
int fd = ::open(m_current_filename.c_str(),O_RDWR);
if(fd > -1)
{
m_mmap_info.mmap_addr = mmap(NULL,m_mmap_info.header_size,
PROT_WRITE,MAP_SHARED,fd,mapping_offset);
::close(fd);
}
m_mmap_info.height = aData.dimensions[1];
m_mmap_info.size = aData.size();
mmap_info.map(file->m_filename, header_position);
}
else
{
char* start_size_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.size_offset;
int nbchar = sprintf(start_size_string,"%lld",m_mmap_info.size);
char* start_size_string = mmap_info.sizeLocation();
int nbchar = sprintf(start_size_string,"%lld",file->m_size);
start_size_string[nbchar] = ' ';
char* start_height_string = (char*)m_mmap_info.mmap_addr + m_mmap_info.height_offset;
nbchar = sprintf(start_height_string,"%lld",m_mmap_info.height);
char* start_height_string = mmap_info.heightLocation();
nbchar = sprintf(start_height_string,"%lld",file->m_height);
start_height_string[nbchar] = ' ';
}
}
......
......@@ -25,6 +25,8 @@
#include "lima/CtSaving.h"
#include "lima/CtSaving_Compression.h"
#include <stack>
namespace lima {
class SaveContainerEdf : public CtSaving::SaveContainer
......@@ -49,27 +51,41 @@ namespace lima {
virtual long _writeFile(void*,Data &data,
CtSaving::HeaderMap &aHeader,
CtSaving::FileFormat);
private:
struct MmapInfo
private:
struct MmapInfo
{
MmapInfo() :
header_size(0),
height_offset(0),
size_offset(0),
height(0),
size(0),
mmap_addr(NULL) {}
#ifdef __unix
~MmapInfo();
void map(const std::string& fname,
long long header_position);
operator bool()
{ return mmap_addr; }
char *sizeLocation()
{ return (char*) mmap_addr + size_offset; }
char *heightLocation()
{ return (char*) mmap_addr + height_offset; }
#endif
long long header_size;
long long height_offset;
long long size_offset;
long long height;
long long size;
void* mmap_addr;
};
template<class Stream>
static MmapInfo _writeEdfHeader(Data&,CtSaving::HeaderMap&,
int framesPerFile,Stream&,
int nbCharReserved = 0);
#ifdef WIN32
class _OfStream
{
......@@ -94,12 +110,41 @@ namespace lima {
FILE* m_fout;
int m_exc_flag;
};
#endif
struct File
{
#ifdef WIN32
typedef _OfStream Stream;
#else
void* m_fout_buffer;
typedef std::ofstream Stream;
#endif
File(SaveContainerEdf& cont, const std::string& filename,
std::ios_base::openmode openFlags);
virtual ~File();
SaveContainerEdf& m_cont;
std::string m_filename;
Stream m_fout;
#ifdef __unix
void* m_buffer;
MmapInfo m_mmap_info;
long long m_height;
long long m_size;
#endif
};
#ifdef __unix
void *getNewBuffer();
void releaseBuffer(void *buffer);
std::stack<void*> m_free_buffers;
int m_nb_buffers;
#endif
CtSaving::FileFormat m_format;
MmapInfo m_mmap_info;
std::string m_current_filename;
};
template<class Stream>
......@@ -148,14 +193,14 @@ namespace lima {
}
sout << "DataType = " << aStringType << " ;\n";
SaveContainerEdf::MmapInfo offset;
sout << "Size = "; offset.size_offset = sout.tellp();
SaveContainerEdf::MmapInfo mmap_info;
sout << "Size = "; mmap_info.size_offset = sout.tellp();
snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,"");
sout << aData.size() << aBuffer;
sout << "Dim_1 = " << aData.dimensions[0] << " ;\n";
sout << "Dim_2 = "; offset.height_offset = sout.tellp();
sout << "Dim_2 = "; mmap_info.height_offset = sout.tellp();
snprintf(aBuffer,sizeof(aBuffer),"%*s ;\n",nbCharReserved,"");
sout << aData.dimensions[1] << aBuffer;
......@@ -199,8 +244,8 @@ namespace lima {
long long finalHeaderLenght = (lenght + 1023) & ~1023; // 1024 alignment
snprintf(aBuffer,sizeof(aBuffer),"%*s}\n",int(finalHeaderLenght - lenght),"");
sout << aBuffer;
offset.header_size = finalHeaderLenght;
return offset;
mmap_info.header_size = finalHeaderLenght;
return mmap_info;
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment