Commit be3c1fe5 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

CtSaving: refactoring:

* Better identification of critical sections
* Do not mark frame as saved until container is closed
* Include saving_mode when reporting saving errors
* Remove frame from m_frame_datas when scheduling compression in AutoHeader
* Improve container opening and closing
parent 4c85e405
......@@ -362,6 +362,9 @@ public:
int m_written_frames;
Stream& m_stream;
private:
void close(const Params2Handler::iterator& it, AutoMutex& l);
StatisticsType m_statistic;
int m_statistic_size;
bool m_log_stat_enable;
......
......@@ -1397,8 +1397,7 @@ void CtSaving::_validateFrameHeader(long frame_nr,
bool keep_header = m_need_compression;
_takeHeader(aHeaderIter, task_header, keep_header);
if (!m_need_compression)
m_frame_datas.erase(frame_iter);
m_frame_datas.erase(frame_iter);
aLock.unlock();
......@@ -1921,9 +1920,8 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream)
//@todo check if the frame is still available
if (m_end_cbk) {
aLock.unlock();
AutoMutexUnlock u(aLock);
m_end_cbk->finished(aData);
aLock.lock();
}
SavingMode saving_mode = getAcqSavingMode();
......@@ -1935,31 +1933,32 @@ void CtSaving::_saveFinished(Data& aData, Stream& stream)
return;
}
for (FrameMap::iterator nextDataIter = m_frame_datas.begin();
nextDataIter != m_frame_datas.end(); ++nextDataIter)
FrameHeaderMap::iterator aHeaderIter;
FrameMap::iterator nextDataIter, end = m_frame_datas.end();
for (nextDataIter = m_frame_datas.begin(); nextDataIter != end; ++nextDataIter)
{
frame_nr = nextDataIter->first;
FrameHeaderMap::iterator aHeaderIter = m_frame_headers.find(frame_nr);
aHeaderIter = m_frame_headers.find(frame_nr);
bool header_available = (aHeaderIter != m_frame_headers.end());
bool can_save = _allStreamReady(frame_nr);
if (!can_save ||
((saving_mode == AutoHeader) && !header_available))
continue;
if (can_save && ((saving_mode == AutoFrame) || header_available))
break;
}
if (nextDataIter == end)
return;
Data aNewData = nextDataIter->second;
m_frame_datas.erase(nextDataIter);
Data aNewData = nextDataIter->second;
m_frame_datas.erase(nextDataIter);
HeaderMap task_header;
_takeHeader(aHeaderIter, task_header, false);
HeaderMap task_header;
_takeHeader(aHeaderIter, task_header, false);
aLock.unlock();
aLock.unlock();
TaskList task_list;
_getTaskList(Save, frame_nr, task_header, task_list);
TaskList task_list;
_getTaskList(Save, frame_nr, task_header, task_list);
_postTaskList(aNewData, task_list, SAVING_PRIORITY);
break;
}
_postTaskList(aNewData, task_list, SAVING_PRIORITY);
}
/** @brief this methode set the error saving status in CtControl
......@@ -1972,17 +1971,15 @@ void CtSaving::_setSavingError(CtControl::ErrorCode anErrorCode)
if (saving_mode == Manual)
return;
AutoMutex aLock(m_ctrl.m_cond.mutex());
if (m_ctrl.m_status.AcquisitionStatus != AcqFault)
{
m_ctrl.m_status.AcquisitionStatus = AcqFault;
m_ctrl.m_status.Error = anErrorCode;
DEB_ERROR() << DEB_VAR1(m_ctrl.m_status);
AutoMutex aLock(m_ctrl.m_cond.mutex());
if (m_ctrl.m_status.AcquisitionStatus != AcqFault) {
m_ctrl.m_status.AcquisitionStatus = AcqFault;
m_ctrl.m_status.Error = anErrorCode;
DEB_ERROR() << DEB_VAR2(m_ctrl.m_status, saving_mode);
}
}
aLock.unlock();
m_ctrl.stopAcq();
}
/** @brief preparing new acquisition
......@@ -2140,11 +2137,11 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
Frame2Params::iterator fpars = m_frame_params.find(frameId);
if (fpars == m_frame_params.end())
THROW_CTL_ERROR(Error) << "Can't find saving parameters for frame"
<< DEB_VAR1(frameId);
<< DEB_VAR1(frameId);
lock.unlock();
FrameParameters& frame_par = fpars->second;
const CtSaving::Parameters pars = frame_par.m_pars;
lock.unlock();
const CtSaving::Parameters& pars = frame_par.m_pars;
long write_size;
Params2Handler::value_type par_handler = open(frame_par);
......@@ -2208,13 +2205,10 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
}
lock.lock();
++m_written_frames;
bool acq_end = (m_written_frames == m_nb_frames_to_write);
m_frame_params.erase(frameId);
--m_running_writing_task;
bool acq_end = (++m_written_frames == m_nb_frames_to_write);
lock.unlock();
// close before marking that we have finished the frame
if (pars.overwritePolicy != MultiSet || acq_end) // Close at the end
{
try {
......@@ -2226,6 +2220,11 @@ void CtSaving::SaveContainer::writeFile(Data& aData, HeaderMap& aHeader)
}
}
lock.lock();
m_frame_params.erase(fpars);
--m_running_writing_task;
lock.unlock();
Timestamp end_write = Timestamp::now();
Timestamp diff = end_write - start_write;
......@@ -2377,10 +2376,11 @@ void CtSaving::SaveContainer::getStatistic(std::list<double>& writing_speed,
{
AutoMutex aLock = AutoMutex(m_cond.mutex());
StatisticsType copy = m_statistic;
aLock.unlock();
StatisticsType copy;
{
AutoMutex aLock = AutoMutex(m_cond.mutex());
copy = m_statistic;
}
StatisticsType::const_iterator next = copy.begin();
if (next != copy.end())
......@@ -2539,9 +2539,9 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
if (frame_nr < 0)
{
ready = m_frame_params.empty();
for (Frame2Params::const_iterator i = m_frame_params.begin();
ready && i != m_frame_params.end(); ++i)
ready = !i->second.m_running;
for (Frame2Params::const_iterator it = m_frame_params.begin();
ready && it != m_frame_params.end(); ++it)
ready = !it->second.m_running;
}
else
{
......@@ -2549,7 +2549,7 @@ bool CtSaving::SaveContainer::isReady(long frame_nr) const
if (it == m_frame_params.end())
{
// if no task is running then ready
ready = m_frame_params.empty();
ready = m_frame_params.empty();
}
else if (it->second.m_threadable)
{
......@@ -2657,101 +2657,116 @@ void CtSaving::SaveContainer::setMaxConcurrentWritingTask(int nb_thread)
<< DEB_VAR1(nb_thread);
m_max_writing_task = nb_thread;
}
CtSaving::SaveContainer::Params2Handler::value_type
CtSaving::SaveContainer::open(FrameParameters& fpars)
{
DEB_MEMBER_FUNCT();
AutoMutex lock(m_cond.mutex());
CtSaving::Parameters& pars = fpars.m_pars;
Params2Handler::iterator handler = m_params_handler.find(pars);
if (handler != m_params_handler.end())
return Params2Handler::value_type(handler->first, handler->second);
else
{
lock.unlock();
std::string aFileName = pars.directory + DIR_SEPARATOR + pars.prefix;
long index = pars.nextNumber;
char idx[64];
if (index < 0) index = 0;
snprintf(idx, sizeof(idx), pars.indexFormat.c_str(), index);
aFileName += idx;
aFileName += pars.suffix;
AutoMutex lock(m_cond.mutex());
Params2Handler::iterator handler = m_params_handler.find(pars);
if (handler != m_params_handler.end())
return *handler;
}
DEB_TRACE() << DEB_VAR1(aFileName);
std::string aFileName = pars.directory + DIR_SEPARATOR + pars.prefix;
long index = pars.nextNumber;
char idx[64];
if (index < 0) index = 0;
snprintf(idx, sizeof(idx), pars.indexFormat.c_str(), index);
aFileName += idx;
aFileName += pars.suffix;
if (pars.overwritePolicy == Abort &&
!access(aFileName.c_str(), R_OK))
{
m_stream.setSavingError(CtControl::SaveOverwriteError);
std::string output;
output = "Try to over write file: " + aFileName;
THROW_CTL_ERROR(Error) << output;
}
std::ios_base::openmode openFlags = std::ios_base::out | std::ios_base::binary;
if (pars.overwritePolicy == Append ||
pars.overwritePolicy == MultiSet)
openFlags |= std::ios_base::app;
else if (pars.overwritePolicy == Overwrite)
openFlags |= std::ios_base::trunc;
std::string error_desc;
Handler handler;
for (int nbTry = 0; nbTry < 5; ++nbTry)
{
try {
handler.m_handler = _open(aFileName, openFlags);
}
catch (std::ios_base::failure & error) {
error_desc = error.what();
DEB_WARNING() << "Could not open " << aFileName << ": "
<< error_desc;
}
catch (...) {
error_desc = "Unknown error";
DEB_WARNING() << "Could not open " << aFileName << ": "
<< error_desc;
}
DEB_TRACE() << DEB_VAR1(aFileName);
if (!handler.m_handler)
{
std::string output;
if (pars.overwritePolicy == Abort &&
!access(aFileName.c_str(), R_OK))
{
m_stream.setSavingError(CtControl::SaveOverwriteError);
std::string output;
output = "Try to over write file: " + aFileName;
THROW_CTL_ERROR(Error) << output;
}
std::ios_base::openmode openFlags = std::ios_base::out | std::ios_base::binary;
if (pars.overwritePolicy == Append ||
pars.overwritePolicy == MultiSet)
openFlags |= std::ios_base::app;
else if (pars.overwritePolicy == Overwrite)
openFlags |= std::ios_base::trunc;
if (access(pars.directory.c_str(), W_OK))
{
m_stream.setSavingError(CtControl::SaveAccessError);
output = "Can not write in directory: " + pars.directory;
THROW_CTL_ERROR(Error) << output;
}
}
else
{
DEB_TRACE() << "Open file: " << aFileName;
handler.m_nb_frames = pars.framesPerFile;
lock.lock();
Params2Handler::value_type map_pair(pars, handler);
std::pair<Params2Handler::iterator, bool> result =
m_params_handler.insert(map_pair);
return Params2Handler::value_type(result.first->first,
result.first->second);
}
std::string error_desc;
Handler handler;
for (int nbTry = 0; !handler.m_handler && (nbTry < 5); ++nbTry)
{
try {
handler.m_handler = _open(aFileName, openFlags);
}
catch (std::ios_base::failure & error) {
error_desc = error.what();
DEB_WARNING() << "Could not open " << aFileName << ": "
<< error_desc;
}
catch (...) {
error_desc = "Unknown error";
DEB_WARNING() << "Could not open " << aFileName << ": "
<< error_desc;
}
if (!handler.m_handler)
if (!handler.m_handler && access(pars.directory.c_str(), W_OK))
{
m_stream.setSavingError(CtControl::SaveOpenError);
std::string output;
output = "Failure opening " + aFileName;
if (!error_desc.empty())
output += ": " + error_desc;
m_stream.setSavingError(CtControl::SaveAccessError);
std::string output = "Can not write in directory: " + pars.directory;
THROW_CTL_ERROR(Error) << output;
}
}
// we can't reach this line (normally) just for compiler
return Params2Handler::value_type(CtSaving::Parameters(), Handler());
if (!handler.m_handler)
{
m_stream.setSavingError(CtControl::SaveOpenError);
std::string output;
output = "Failure opening " + aFileName;
if (!error_desc.empty())
output += ": " + error_desc;
THROW_CTL_ERROR(Error) << output;
}
DEB_TRACE() << "Open file: " << aFileName;
handler.m_nb_frames = pars.framesPerFile;
Params2Handler::value_type map_pair(pars, handler);
bool ok;
{
AutoMutex lock(m_cond.mutex());
ok = m_params_handler.insert(map_pair).second;
}
if (!ok) {
_close(handler.m_handler);
THROW_CTL_ERROR(Error) << "Error inserting handle";
}
return map_pair;
}
inline void CtSaving::SaveContainer::close(const Params2Handler::iterator& it, AutoMutex& l)
{
void* raw_handler = it->second.m_handler;
if (raw_handler == NULL)
return;
it->second.m_handler = NULL;
{
AutoMutexUnlock u(l);
_close(raw_handler);
}
Parameters& pars = m_stream.getParameters(Acq);
if ((pars.overwritePolicy != MultiSet) &&
(pars.overwritePolicy != Append)) {
int nextNumber = it->first.nextNumber + 1;
if (pars.nextNumber < nextNumber)
pars.nextNumber = nextNumber;
}
}
void CtSaving::SaveContainer::close(const CtSaving::Parameters* params,
......@@ -2762,34 +2777,20 @@ void CtSaving::SaveContainer::close(const CtSaving::Parameters* params,
AutoMutex aLock(m_cond.mutex());
if (!params) // close all
{
for (Params2Handler::iterator i = m_params_handler.begin();
i != m_params_handler.end(); ++i)
{
if (i->second.m_handler)
_close(i->second.m_handler);
}
for (Params2Handler::iterator it = m_params_handler.begin();
it != m_params_handler.end(); ++it)
close(it, aLock);
m_params_handler.clear();
}
else
{
Params2Handler::iterator handler = m_params_handler.find(*params);
if (force_close || !--handler->second.m_nb_frames)
{
void* raw_handler = handler->second.m_handler;
const Parameters frame_pars = handler->first;
m_params_handler.erase(handler);
aLock.unlock();
_close(raw_handler);
Parameters& pars = m_stream.getParameters(Acq);
if (pars.overwritePolicy != MultiSet &&
pars.overwritePolicy != Append)
{
int nextNumber = frame_pars.nextNumber + 1;
aLock.lock();
if (pars.nextNumber < nextNumber)
pars.nextNumber = nextNumber;
}
Params2Handler::iterator it = m_params_handler.find(*params);
if (it == m_params_handler.end())
THROW_CTL_ERROR(Error) << "Could not find handle for "
<< DEB_VAR1(params);
if (force_close || !--it->second.m_nb_frames) {
close(it, aLock);
m_params_handler.erase(it);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment