Commit 926911bf authored by Laurent Claustre's avatar Laurent Claustre
Browse files

Merge branch 'ctsaving-issues-and-refactoring' into 'master'

CtSaving & CtVideo issues and refactoring

See merge request !191
parents eb32f09e 083dbeff
Pipeline #44626 passed with stages
in 30 minutes and 19 seconds
find_library(LIBCONFIG_LIBRARIES libconfig++)
find_library(LIBCONFIG_LIBRARIES NAMES config++ libconfig++)
find_path(LIBCONFIG_INCLUDE_DIRS libconfig.h++)
include(FindPackageHandleStandardArgs)
......
......@@ -84,7 +84,7 @@ Exception::Exception(Layer layer, ErrorType err_type, const string& err_desc,
<< getErrDesc();
else
std::cerr << "********* Exception(" << getErrType() << "): "
<< getErrDesc();
<< getErrDesc() << " *********" << std::endl;
}
Layer Exception::getLayer() const
......
......@@ -36,10 +36,20 @@
using namespace lima;
inline void check_error(int ret, const char *desc)
{
if (ret == 0)
return;
std::ostringstream os;
os << desc << ": " << strerror(ret) << " (" << ret << ")";
throw LIMA_COM_EXC(Error, os.str());
}
MutexAttr::MutexAttr(Type type)
{
if (pthread_mutexattr_init(&m_mutex_attr) != 0)
throw LIMA_COM_EXC(Error, "Error initializing mutex attr");
int ret = pthread_mutexattr_init(&m_mutex_attr);
check_error(ret, "Error initializing mutex attr");
try {
setType(type);
......@@ -76,15 +86,15 @@ void MutexAttr::setType(Type type)
throw LIMA_COM_EXC(InvalidValue, "Invalid MutexAttr type");
}
if (pthread_mutexattr_settype(&m_mutex_attr, kind) != 0)
throw LIMA_COM_EXC(Error, "Error setting mutex attr");
int ret = pthread_mutexattr_settype(&m_mutex_attr, kind);
check_error(ret, "Error setting mutex attr");
}
MutexAttr::Type MutexAttr::getType() const
{
int kind;
if (pthread_mutexattr_gettype(&m_mutex_attr, &kind) != 0)
throw LIMA_COM_EXC(Error, "Error getting mutex attr");
int ret = pthread_mutexattr_gettype(&m_mutex_attr, &kind);
check_error(ret, "Error getting mutex attr");
switch (kind) {
case PTHREAD_MUTEX_NORMAL:
......@@ -120,8 +130,8 @@ Mutex::Mutex(MutexAttr mutex_attr)
: m_mutex_attr(mutex_attr)
{
pthread_mutexattr_t& attr = m_mutex_attr.m_mutex_attr;
if (pthread_mutex_init(&m_mutex, &attr) != 0)
throw LIMA_COM_EXC(Error, "Error initializing mutex");
int ret = pthread_mutex_init(&m_mutex, &attr);
check_error(ret, "Error initializing mutex");
}
Mutex::~Mutex()
......@@ -131,14 +141,14 @@ Mutex::~Mutex()
void Mutex::lock()
{
if (pthread_mutex_lock(&m_mutex) != 0)
throw LIMA_COM_EXC(Error, "Error locking mutex");
int ret = pthread_mutex_lock(&m_mutex);
check_error(ret, "Error locking mutex");
}
void Mutex::unlock()
{
if (pthread_mutex_unlock(&m_mutex) != 0)
throw LIMA_COM_EXC(Error, "Error unlocking mutex");
int ret = pthread_mutex_unlock(&m_mutex);
check_error(ret, "Error unlocking mutex");
}
bool Mutex::tryLock()
......@@ -162,8 +172,8 @@ MutexAttr Mutex::getAttr()
Cond::Cond() : m_mutex(MutexAttr::Normal)
{
if (pthread_cond_init(&m_cond, NULL) != 0)
throw LIMA_COM_EXC(Error, "Error initializing condition");
int ret = pthread_cond_init(&m_cond, NULL);
check_error(ret, "Error initializing condition");
}
Cond::~Cond()
......@@ -194,15 +204,15 @@ bool Cond::wait(double timeout)
else
retcode = pthread_cond_wait(&m_cond, &m_mutex.m_mutex);
if(retcode && retcode != ETIMEDOUT)
throw LIMA_COM_EXC(Error, "Error waiting for condition");
if(retcode != ETIMEDOUT)
check_error(retcode, "Error waiting for condition");
return !retcode;
}
void Cond::signal()
{
if (pthread_cond_signal(&m_cond) != 0)
throw LIMA_COM_EXC(Error, "Error signaling condition");
int ret = pthread_cond_signal(&m_cond);
check_error(ret, "Error signaling condition");
}
void Cond::acquire()
......@@ -217,8 +227,8 @@ void Cond::release()
void Cond::broadcast()
{
if (pthread_cond_broadcast(&m_cond) != 0)
throw LIMA_COM_EXC(Error, "Error broadcast condition");
int ret = pthread_cond_broadcast(&m_cond);
check_error(ret, "Error broadcast condition");
}
pid_t lima::GetThreadID() {
......@@ -241,7 +251,9 @@ Thread::ExceptionCleanUp::~ExceptionCleanUp()
m_thread.m_exception_handled = true;
}
Thread::Thread() : m_thread(NULL), m_started(false), m_finished(false), m_exception_handled(false), m_tid(0)
Thread::Thread()
: m_thread(0), m_started(false), m_finished(false),
m_exception_handled(false), m_tid(0)
{
pthread_attr_init(&m_thread_attr);
}
......@@ -259,8 +271,9 @@ void Thread::start()
throw LIMA_COM_EXC(Error, "Thread already started");
m_finished = false;
if (pthread_create(&m_thread, &m_thread_attr, staticThreadFunction, this) != 0)
throw LIMA_HW_EXC(Error, "Error creating thread");
int ret = pthread_create(&m_thread, &m_thread_attr,
staticThreadFunction, this);
check_error(ret, "Error creating thread");
m_started = true;
}
......@@ -418,7 +431,7 @@ void CmdThread::sendCmdIf(int cmd, bool (*if_test)(int,int))
void CmdThread::doSendCmd(int cmd)
{
if (m_status == Finished)
throw LIMA_HW_EXC(Error, "Thread has Finished");
throw LIMA_COM_EXC(Error, "Thread has Finished");
// Assume that we will have a call to waitStatus somewhere after the new command
m_status_history.reset();
......
......@@ -22,7 +22,7 @@
set(test_src test_membuffer test_ordered_map)
if (NOT WIN32)
list(APPEND test_src test_regex)
list(APPEND test_src test_regex test_mutex)
endif()
limatools_run_camera_tests("${test_src}" ${NAME})
#include "lima/ThreadUtils.h"
#include "lima/Debug.h"
#include "lima/Exceptions.h"
using namespace lima;
DEB_GLOBAL(DebModTest);
class TestThread : public Thread
{
DEB_CLASS(DebModTest, "TestThread");
public:
enum Type { Tester, Locker, Releaser };
TestThread(Type type) : m_type(type), m_started(false)
{
DEB_CONSTRUCTOR();
DEB_PARAM() << DEB_VAR1(type);
}
void start()
{
DEB_MEMBER_FUNCT();
Thread::start();
AutoMutex l(m_cond.mutex());
while (!m_started)
m_cond.wait();
DEB_TRACE() << "Main: started";
}
protected:
void threadFunction()
{
DEB_MEMBER_FUNCT();
CleanUp cleanup(*this);
if (m_type == Tester)
THROW_COM_ERROR(Error) << "Testing cleanup";
if (m_type == Locker) {
AutoMutex l(m_mutex);
l.leaveLocked();
DEB_TRACE() << "Leave locked";
} else {
AutoMutex l(m_mutex, AutoMutex::PrevLocked);
DEB_TRACE() << "Previously unlocked";
}
}
private:
class CleanUp : public ExceptionCleanUp
{
public:
using ExceptionCleanUp::ExceptionCleanUp;
virtual ~CleanUp() {
static_cast<TestThread&>(m_thread).signalStarted();
}
};
void signalStarted()
{
DEB_MEMBER_FUNCT();
AutoMutex l(m_cond.mutex());
m_started = true;
m_cond.signal();
DEB_TRACE() << "Thread: started";
}
static Mutex m_mutex;
Type m_type;
Cond m_cond;
bool m_started;
};
Mutex TestThread::m_mutex;
int main(int argc, char *argv[])
{
DEB_GLOBAL_FUNCT();
DebParams::setTypeFlags(DebParams::AllFlags);
DEB_TRACE() << "Starting test";
TestThread tester(TestThread::Tester);
tester.start();
tester.join();
TestThread locker(TestThread::Locker);
TestThread releaser(TestThread::Releaser);
locker.start();
releaser.start();
locker.join();
releaser.join();
return 0;
}
......@@ -21,7 +21,7 @@ requirements:
host:
- python {{ python }}
- sip 4.19.8 #For the SIP module
- processlib
- processlib 1.7*
- libconfig 1.7*
# I/O formats
- zlib
......
......@@ -19,7 +19,7 @@ requirements:
host:
- python {{ python }}
- sip 4.19.8 #For the SIP module
- processlib
- processlib 1.7*
- libconfig 1.7*
# I/O formats
- zlib
......
......@@ -314,8 +314,7 @@ public:
std::list<double>& incoming_speed) const;
void getRawStatistic(StatisticsType&) const;
void updateNbFrames(long nb_frames);
void cleanRemainingFrames(long last_acquired_frame_nr);
void updateNbFrames(long nb_acquired_frames);
void getParameters(CtSaving::Parameters&) const;
void clear();
......@@ -348,7 +347,8 @@ public:
protected:
virtual void* _open(const std::string& filename,
std::ios_base::openmode flags) = 0;
std::ios_base::openmode flags,
CtSaving::Parameters& pars) = 0;
virtual void _close(void*) = 0;
virtual long _writeFile(void*, Data& data,
CtSaving::HeaderMap& aHeader,
......@@ -359,23 +359,28 @@ public:
virtual void _setBuffer(int frameNumber, ZBufferList&& buffer);
virtual ZBufferList _takeBuffers(int dataId);
int m_written_frames;
Stream& m_stream;
mutable Mutex m_lock;
Stream& m_stream;
long m_frames_to_write;
long m_files_to_write;
long m_written_frames;
private:
void close(const Params2Handler::iterator& it, AutoMutex& l);
StatisticsType m_statistic;
int m_statistic_size;
bool m_log_stat_enable;
std::string m_log_stat_directory;
std::string m_log_stat_filename;
FILE* m_log_stat_file;
mutable Cond m_cond;
long m_nb_frames_to_write;
Frame2Params m_frame_params;
Params2Handler m_params_handler;
int m_max_writing_task; ///< number of maximum parallel write
int m_running_writing_task; ///< number of concurrent write running
Mutex m_lock;
Mutex m_buffers_lock;
dataId2ZBufferList m_buffers;
};
......@@ -501,13 +506,9 @@ public:
m_save_cnt->createStatistic(data);
}
void updateNbFrames(long last_acquired_frame_nr)
{
m_save_cnt->updateNbFrames(last_acquired_frame_nr);
}
void cleanRemainingFrames(long last_acquired_frame_nr)
void updateNbFrames(long nb_acquired_frames)
{
m_save_cnt->cleanRemainingFrames(last_acquired_frame_nr);
m_save_cnt->updateNbFrames(nb_acquired_frames);
}
private:
......
......@@ -30,6 +30,8 @@
namespace lima {
class SaveContainerEdf;
#ifdef WITH_Z_COMPRESSION
#include <zlib.h>
......@@ -37,14 +39,13 @@ namespace lima {
{
DEB_CLASS_NAMESPC(DebModControl,"File Z Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
z_stream_s m_compression_struct;
public:
FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileZCompression();
virtual void process(Data &aData);
private:
......@@ -83,13 +84,12 @@ static const LZ4F_preferences_t lz4_preferences = {
{
DEB_CLASS_NAMESPC(DebModControl,"File Lz4 Compression Task","Control");
CtSaving::SaveContainer& m_container;
int m_frame_per_file;
SaveContainerEdf& m_container;
CtSaving::HeaderMap m_header;
LZ4F_compressionContext_t m_ctx;
public:
FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header);
FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header);
~FileLz4Compression();
virtual void process(Data &aData);
void _compression(const char *src,size_t size,ZBufferList& return_buffers);
......
......@@ -72,7 +72,7 @@ namespace lima
long long frameNumber() const;
private:
Image(const CtVideo*,VideoImage*);
Image(const CtVideo*,VideoImage*,AutoMutex&);
const CtVideo* m_video;
VideoImage* m_image;
......
......@@ -499,8 +499,9 @@ void CtControl::prepareAcq()
m_images_buffer.clear();
m_images_saved.clear();
//Clear common header
//Clear saving: common & frame headers, ZBuffers and statistics
m_ct_saving->resetInternalCommonHeader();
m_ct_saving->clear();
DEB_TRACE() << "Apply hardware bin/roi";
m_ct_image->applyHard();
......@@ -1295,11 +1296,18 @@ bool CtControl::_checkOverrun(Data& aData, AutoMutex& l)
const ImageStatus &imageStatus = m_status.ImageCounters;
long imageToProcess = imageStatus.LastImageAcquired -
imageStatus.LastBaseImageReady;
// ext ops are not in-place, relaxing hw buffer limit is LastImageReady
// if no ext ops, full processing chain needs orig hw buffer
bool full_chain = !m_op_ext_link_task_active;
bool lastProcIsCounter = m_op_ext_sink_task_active && full_chain;
long lastProcessed = lastProcIsCounter ? imageStatus.LastCounterReady :
imageStatus.LastImageReady;
long imageToProcess = imageStatus.LastImageAcquired - lastProcessed;
long imageToSave = imageStatus.LastImageAcquired -
imageStatus.LastImageSaved;
long lastUsedForSave = full_chain ? imageStatus.LastImageSaved :
imageStatus.LastImageReady;
long imageToSave = imageStatus.LastImageAcquired - lastUsedForSave;
long nb_buffers;
m_ct_buffer->getNumber(nb_buffers);
......@@ -1321,7 +1329,7 @@ bool CtControl::_checkOverrun(Data& aData, AutoMutex& l)
m_ct_saving->getSaveCounters(first_to_save, last_to_save);
DEB_ERROR() << DEB_VAR2(first_to_save, last_to_save);
int frames_to_save = last_to_save - first_to_save + 1;
int frames_to_compress = imageStatus.LastBaseImageReady - last_to_save;
int frames_to_compress = imageStatus.LastImageReady - last_to_save;
bool slow_processing = frames_to_compress > frames_to_save;
DEB_ERROR() << DEB_VAR2(frames_to_compress, frames_to_save);
error_code = slow_processing ? ProcessingOverun : SaveOverun;
......
This diff is collapsed.
......@@ -434,7 +434,8 @@ SinkTaskBase* SaveContainerCbf::getCompressionTask(const CtSaving::HeaderMap &he
}
void* SaveContainerCbf::_open(const std::string &filename,
std::ios_base::openmode stdOpenflags)
std::ios_base::openmode stdOpenflags,
CtSaving::Parameters& /*pars*/)
{
DEB_MEMBER_FUNCT();
char openFlags[8];
......
......@@ -53,7 +53,8 @@ namespace lima {
protected:
virtual void* _open(const std::string &filename,
std::ios_base::openmode flags);
std::ios_base::openmode flags,
CtSaving::Parameters& pars);
virtual void _close(void*);
virtual long _writeFile(void*,Data &data,
CtSaving::HeaderMap &aHeader,
......
......@@ -30,9 +30,9 @@ using namespace lima;
#ifdef WITH_Z_COMPRESSION
const int FileZCompression::BUFFER_HELPER_SIZE = 64 * 1024;
FileZCompression::FileZCompression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileZCompression::FileZCompression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -65,9 +65,7 @@ void FileZCompression::process(Data &aData)
ZBufferList aBufferListPt;
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header, buffer);
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
_compression((char*)aData.data(),aData.size(),aBufferListPt);
......@@ -126,9 +124,9 @@ void FileZCompression::_end_compression(ZBufferList& return_buffers)
#ifdef WITH_LZ4_COMPRESSION
FileLz4Compression::FileLz4Compression(CtSaving::SaveContainer &save_cnt,
int framesPerFile,const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_frame_per_file(framesPerFile),m_header(header)
FileLz4Compression::FileLz4Compression(SaveContainerEdf &save_cnt,
const CtSaving::HeaderMap &header) :
m_container(save_cnt),m_header(header)
{
DEB_CONSTRUCTOR();
......@@ -148,9 +146,7 @@ void FileLz4Compression::process(Data &aData)
DEB_PARAM() << DEB_VAR1(aData);
std::ostringstream buffer;
SaveContainerEdf::_writeEdfHeader(aData,m_header,
m_frame_per_file,
buffer);
m_container._writeEdfHeader(aData,m_header,buffer);
ZBufferList aBufferListPt;
const std::string& tmpBuffer = buffer.str();
_compression(tmpBuffer.c_str(),tmpBuffer.size(),aBufferListPt);
......
......@@ -36,6 +36,34 @@ static const long int WRITE_BUFFER_SIZE = 64*1024;
using namespace lima;
#ifdef __unix
SaveContainerEdf::MmapInfo::~MmapInfo()
{
if(mmap_addr)
munmap(mmap_addr, header_size);
}
inline void SaveContainerEdf::MmapInfo::map(const std::string& fname,
long long header_position)
{
header_position -= header_size;
long sz = sysconf(_SC_PAGESIZE);
long long mapping_offset = header_position / sz * sz;
header_size += header_position - mapping_offset;
height_offset -= mapping_offset;
size_offset -= mapping_offset;
int fd = ::open(fname.c_str(),O_RDWR);
if(fd < 0)
throw LIMA_CTL_EXC(Error, "Error opening ") << fname << ": " << strerror(errno);
mmap_addr = mmap(NULL,header_size,
PROT_WRITE,MAP_SHARED,fd,mapping_offset);
int mmap_err = errno;
::close(fd);
if(!mmap_addr)
throw LIMA_CTL_EXC(Error, "Error mapping ") << fname << ": " << strerror(mmap_err);
}
#endif
#ifdef WIN32
/** @brief this is a small wrapper class for ofstream class.
* All this is for overcome performance issue with window std::ofstream
......@@ -114,6 +142,33 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
}
#endif
/** @brief saving container handle
*
* This class manage the low-level handle
*/
SaveContainerEdf::File::File(SaveContainerEdf& cont,
const std::string& filename,
std::ios_base::openmode openFlags)
: m_cont(cont), m_filename(filename)
#ifdef __unix
, m_height(0), m_size(0)
#endif
{
m_fout.exceptions(std::ios_base::failbit | std::ios_base::badbit);
m_fout.open(filename.c_str(),openFlags);
#ifdef __unix
m_buffer = m_cont.getNewBuffer();
m_fout.rdbuf()->pubsetbuf((char*)m_buffer,WRITE_BUFFER_SIZE);
#endif
}
SaveContainerEdf::File::~File()
{
#ifdef __unix
m_cont.releaseBuffer(m_buffer);
#endif
}
/** @brief saving container
*
* This class manage file saving
......@@ -121,69 +176,73 @@ SaveContainerEdf::_OfStream::operator<< (const long data)
SaveContainerEdf::SaveContainerEdf(CtSaving::Stream& stream,
CtSaving::FileFormat format) :
CtSaving::SaveContainer(stream),
m_format(format)
{
DEB_CONSTRUCTOR();
#ifdef __unix
if(posix_memalign(&m_fout_buffer,4*1024,WRITE_BUFFER_SIZE))
THROW_CTL_ERROR(Error) << "Can't allocated write buffer";
m_nb_buffers