Commit 7b947156 authored by Laurent Claustre's avatar Laurent Claustre

Hoops, fixed a bug introduced with last merge from branch hdf5_direct_chunk_write

parent d1e6ade1
Pipeline #1858 passed with stages
in 59 seconds
......@@ -36,258 +36,6 @@ static const long int WRITE_BUFFER_SIZE = 64*1024;
using namespace lima;
<<<<<<< HEAD
const int SaveContainerEdf::_BufferHelper::BUFFER_HELPER_SIZE = 64 * 1024;
SaveContainerEdf::_BufferHelper::_BufferHelper()
{
DEB_CONSTRUCTOR();
_init(BUFFER_HELPER_SIZE);
}
SaveContainerEdf::_BufferHelper::_BufferHelper(int buffer_size)
{
DEB_CONSTRUCTOR();
DEB_PARAM() << DEB_VAR1(buffer_size);
_init(buffer_size);
}
void SaveContainerEdf::_BufferHelper::_init(int buffer_size)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(buffer_size);
used_size = 0;
#ifdef __unix
if(posix_memalign(&buffer,4*1024,buffer_size))
#else
buffer = _aligned_malloc(buffer_size,4*1024);
if(!buffer)
#endif
THROW_CTL_ERROR(Error) << "Can't allocate buffer";
}
SaveContainerEdf::_BufferHelper::~_BufferHelper()
{
#ifdef __unix
free(buffer);
#else
_aligned_free(buffer);
#endif
}
#ifdef WITH_EDFGZ_SAVING
#include <zlib.h>
#include "processlib/SinkTask.h"
#define TEST_AVAIL_OUT if(!m_compression_struct.avail_out) \
{ \
_BufferHelper *newBuffer = new _BufferHelper(); \
m_compression_struct.next_out = (Bytef*)newBuffer->buffer; \
m_compression_struct.avail_out = newBuffer->BUFFER_HELPER_SIZE; \
return_buffers->push_back(newBuffer); \
}
class SaveContainerEdf::Compression : public SinkTaskBase
{
DEB_CLASS_NAMESPC(DebModControl,"Compression Task","Control");
SaveContainerEdf& m_container;
int m_frame_per_file;
CtSaving::HeaderMap m_header;
z_stream_s m_compression_struct;
public:
Compression(SaveContainerEdf &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
{
DEB_CONSTRUCTOR();
m_compression_struct.next_in = NULL;
m_compression_struct.avail_in = 0;
m_compression_struct.total_in = 0;
m_compression_struct.next_out = NULL;
m_compression_struct.avail_out = 0;
m_compression_struct.total_out = 0;
m_compression_struct.zalloc = NULL;
m_compression_struct.zfree = NULL;
if(deflateInit2(&m_compression_struct,Z_DEFAULT_COMPRESSION,
Z_DEFLATED,
31,
8,
Z_DEFAULT_STRATEGY) != Z_OK)
THROW_CTL_ERROR(Error) << "Can't init compression struct";
};
~Compression()
{
deflateEnd(&m_compression_struct);
}
virtual void process(Data &aData)
{
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
ZBufferType *aBufferListPt = new ZBufferType();
const std::string& tmpBuffer = buffer.str();
try
{
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
_compression((char*)aData.data(),aData.size(),aBufferListPt);
_end_compression(aBufferListPt);
}
catch(Exception&)
{
for(ZBufferType::iterator i = aBufferListPt->begin();
i != aBufferListPt->end();++i)
delete *i;
delete aBufferListPt;
throw;
}
m_container._setBuffer(aData.frameNumber,aBufferListPt);
}
void _compression(const char *buffer,int size,ZBufferType* return_buffers)
{
DEB_MEMBER_FUNCT();
m_compression_struct.next_in = (Bytef*)buffer;
m_compression_struct.avail_in = size;
while(m_compression_struct.avail_in)
{
TEST_AVAIL_OUT;
if(deflate(&m_compression_struct,Z_NO_FLUSH) != Z_OK)
THROW_CTL_ERROR(Error) << "deflate error";
return_buffers->back()->used_size = _BufferHelper::BUFFER_HELPER_SIZE -
m_compression_struct.avail_out;
}
}
void _end_compression(ZBufferType* return_buffers)
{
DEB_MEMBER_FUNCT();
int deflate_res = Z_OK;
while(deflate_res == Z_OK)
{
TEST_AVAIL_OUT;
deflate_res = deflate(&m_compression_struct,Z_FINISH);
return_buffers->back()->used_size = _BufferHelper::BUFFER_HELPER_SIZE -
m_compression_struct.avail_out;
}
if(deflate_res != Z_STREAM_END)
THROW_CTL_ERROR(Error) << "deflate error";
}
};
#endif
#ifdef WITH_EDFLZ4_SAVING
#include <lz4frame.h>
#include "processlib/SinkTask.h"
static const int LZ4_HEADER_SIZE = 19;
static const int LZ4_FOOTER_SIZE = 4;
static const LZ4F_preferences_t lz4_preferences = {
{ LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0, 0, LZ4F_noBlockChecksum },
0, /* compression level */
1, /* autoflush */
{ 0, 0, 0, 0 }, /* reserved, must be set to 0 */
};
class SaveContainerEdf::Lz4Compression : public SinkTaskBase
{
DEB_CLASS_NAMESPC(DebModControl,"Lz4 Compression Task","Control");
SaveContainerEdf& m_container;
int m_frame_per_file;
CtSaving::HeaderMap m_header;
LZ4F_compressionContext_t m_ctx;
public:
Lz4Compression(SaveContainerEdf &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
{
DEB_CONSTRUCTOR();
LZ4F_errorCode_t result = LZ4F_createCompressionContext(&m_ctx, LZ4F_VERSION);
if(LZ4F_isError(result))
THROW_CTL_ERROR(Error) << "LZ4 context init failed: " << DEB_VAR1(result);
};
~Lz4Compression()
{
LZ4F_freeCompressionContext(m_ctx);
}
virtual void process(Data &aData)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR1(aData);
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
ZBufferType *aBufferListPt = new ZBufferType();
const std::string& tmpBuffer = buffer.str();
try
{
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
_compression((char*)aData.data(),aData.size(),aBufferListPt);
}
catch(Exception&)
{
for(ZBufferType::iterator i = aBufferListPt->begin();
i != aBufferListPt->end();++i)
delete *i;
delete aBufferListPt;
throw;
}
m_container._setBuffer(aData.frameNumber,aBufferListPt);
}
void _compression(const char *src,int size,ZBufferType* return_buffers)
{
DEB_MEMBER_FUNCT();
int buffer_size = LZ4F_compressFrameBound(size,&lz4_preferences);
buffer_size += LZ4_HEADER_SIZE + LZ4_FOOTER_SIZE;
_BufferHelper *newBuffer = new _BufferHelper(buffer_size);
return_buffers->push_back(newBuffer);
char* buffer = (char*)newBuffer->buffer;
int offset = LZ4F_compressBegin(m_ctx,buffer,
buffer_size,&lz4_preferences);
if(LZ4F_isError(offset))
THROW_CTL_ERROR(Error) << "Failed to start compression: " << DEB_VAR1(offset);
int error_code = LZ4F_compressUpdate(m_ctx,buffer + offset,buffer_size - offset,
src,size,NULL);
if(LZ4F_isError(error_code))
THROW_CTL_ERROR(Error) << "Compression Failed: "
<< DEB_VAR2(error_code,LZ4F_getErrorName(error_code));
offset += error_code;
error_code = LZ4F_compressEnd(m_ctx, buffer + offset, size - offset, NULL);
if(LZ4F_isError(error_code))
THROW_CTL_ERROR(Error) << "Failed to end compression: " << DEB_VAR1(error_code);
offset += error_code;
newBuffer->used_size = offset;
}
};
#endif
=======
>>>>>>> hdf5_direct_chunk_write
#ifdef WIN32
/** @brief this is a small wrapper class for ofstream class.
* All this is for overcome performance issue with window std::ofstream
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment