Commit 2a0fdb93 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Move Receiver to SlsDetectorReceiver and BufferThread to Receiver::Port

parent 56330aed
......@@ -31,10 +31,14 @@ set(SLS_DETECTOR_RECV ${SLS_DETECTOR_DIR}/slsReceiverSoftware)
add_subdirectory(${SLS_DETECTOR_DIR})
set(${NAME}_srcs src/SlsDetectorDefs.cpp src/SlsDetectorArgs.cpp
src/SlsDetectorCPUAffinity.cpp src/SlsDetectorModel.cpp
src/SlsDetectorCamera.cpp src/SlsDetectorEiger.cpp
src/SlsDetectorInterface.cpp)
set(${NAME}_srcs src/SlsDetectorDefs.cpp
src/SlsDetectorArgs.cpp
src/SlsDetectorCPUAffinity.cpp
src/SlsDetectorModel.cpp
src/SlsDetectorReceiver.cpp
src/SlsDetectorCamera.cpp
src/SlsDetectorEiger.cpp
src/SlsDetectorInterface.cpp)
add_library(lima${NAME} SHARED ${${NAME}_srcs})
......
......@@ -24,11 +24,10 @@
#define __SLS_DETECTOR_CAMERA_H
#include "SlsDetectorArgs.h"
#include "SlsDetectorModel.h"
#include "SlsDetectorReceiver.h"
#include "SlsDetectorCPUAffinity.h"
#include "multiSlsDetector.h"
#include "slsReceiverUsers.h"
#include "lima/HwBufferMgr.h"
#include "lima/HwMaxImageSizeCallback.h"
......@@ -74,10 +73,14 @@ public:
{ return m_det->getNMods(); }
int getTotNbPorts()
{ return m_recv_list.size() * m_recv_ports; }
{ return m_recv_list.size() * m_recv_nb_ports; }
int getPortIndex(int recv_idx, int port)
{ return recv_idx * m_recv_ports + port; }
{ return recv_idx * m_recv_nb_ports + port; }
std::pair<int, int> splitPortIndex(int port_idx)
{ return std::pair<int, int>(port_idx / m_recv_nb_ports,
port_idx % m_recv_nb_ports);}
void setBufferCbMgr(StdBufferCbMgr *buffer_cb_mgr)
{ m_buffer_cb_mgr = buffer_cb_mgr; }
......@@ -172,11 +175,9 @@ public:
private:
typedef std::map<int, int> RecvPortMap;
typedef std::queue<int> FrameQueue;
typedef FrameMap::Item::FinishInfo FinishInfo;
typedef FrameMap::Item::FinishInfoList FinishInfoList;
typedef std::vector<AutoPtr<Receiver> > RecvList;
typedef std::vector<Receiver::Port *> RecvPortList;
struct AppInputData
{
......@@ -190,104 +191,6 @@ private:
void parseConfigFile();
};
class Receiver
{
DEB_CLASS_NAMESPC(DebModCamera, "Camera::Receiver",
"SlsDetector");
public:
Receiver(Camera *cam, int idx, int rx_port);
~Receiver();
void start();
private:
friend class Camera;
static int fileStartCallback(char *fpath, char *fname,
FrameType fidx, uint32_t dsize,
void *priv);
static void portCallback(FrameType frame,
uint32_t exp_len,
uint32_t recv_packets,
uint64_t bunch_id,
uint64_t timestamp,
uint16_t mod_id,
uint16_t x, uint16_t y, uint16_t z,
uint32_t debug,
uint16_t rr_nb,
uint8_t det_type,
uint8_t cb_version,
char *dptr,
uint32_t dsize,
void *priv);
int fileStartCallback(char *fpath, char *fname, uint64_t fidx,
uint32_t dsize);
void portCallback(FrameType frame, int port, char *dptr,
uint32_t dsize);
Camera *m_cam;
int m_idx;
int m_rx_port;
Args m_args;
AutoPtr<slsReceiverUsers> m_recv;
};
typedef std::vector<AutoPtr<Receiver> > RecvList;
class BufferThread : public Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Camera::BufferThread",
"SlsDetector");
public:
typedef std::vector<FinishInfo> FinishInfoArray;
BufferThread();
~BufferThread();
void init(Camera *cam, int port_idx);
void prepareAcq();
pid_t getTID()
{ return m_tid; }
bool isBadFrame(FrameType frame);
int getNbBadFrames()
{
AutoMutex l = lock();
return m_bad_frame_list.size();
}
void getBadFrameList(int first_idx, int last_idx, IntList& bfl)
{
AutoMutex l = lock();
IntList::const_iterator b = m_bad_frame_list.begin();
bfl.assign(b + first_idx, b + last_idx);
}
protected:
virtual void start();
virtual void threadFunction();
private:
friend class Camera;
AutoMutex lock()
{ return m_cond.mutex(); }
void processFinishInfo(const FinishInfo& finfo);
Camera *m_cam;
int m_port_idx;
FrameMap::Item *m_frame_map_item;
pid_t m_tid;
bool m_end;
Cond m_cond;
IntList m_bad_frame_list;
};
class AcqThread : public Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Camera::AcqThread",
......@@ -324,22 +227,15 @@ private:
FrameQueue m_frame_queue;
};
struct PortStats {
Stats stats;
Timestamp last_t0;
Timestamp last_t1;
void reset()
{
stats.reset();
last_t0 = last_t1 = Timestamp();
}
};
friend class Model;
friend class Receiver;
friend class GlobalCPUAffinityMgr;
void setModel(Model *model);
RecvPortList getRecvPortList();
Receiver::Port *getRecvPort(int port_idx);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
......@@ -357,10 +253,6 @@ private:
void removeSharedMem();
void createReceivers();
void processRecvFileStart(int recv_idx, uint32_t dsize);
void processRecvPort(int port_idx, FrameType frame, char *dptr,
uint32_t dsize);
bool checkLostPackets();
FrameType getLastReceivedFrame();
......@@ -393,8 +285,10 @@ private:
Model *m_model;
Cond m_cond;
AutoPtr<AppInputData> m_input_data;
RecvList m_recv_list;
AutoPtr<multiSlsDetector> m_det;
FrameMap m_frame_map;
int m_recv_nb_ports;
RecvList m_recv_list;
int m_recv_fifo_depth;
TrigMode m_trig_mode;
FrameType m_nb_frames;
......@@ -402,20 +296,16 @@ private:
double m_lat_time;
double m_frame_period;
Settings m_settings;
FrameMap m_frame_map;
int m_recv_ports;
StdBufferCbMgr *m_buffer_cb_mgr;
PixelDepth m_pixel_depth;
ImageType m_image_type;
bool m_raw_mode;
AutoPtr<BufferThread, true> m_buffer_thread;
AutoPtr<AcqThread> m_acq_thread;
State m_state;
double m_new_frame_timeout;
double m_abort_sleep_time;
bool m_tol_lost_packets;
FrameArray m_prev_ifa;
std::vector<PortStats> m_port_stats;
TimeRangesChangedCallback *m_time_ranges_cb;
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
......
......@@ -34,6 +34,7 @@ namespace SlsDetector
{
class Camera;
class Receiver;
class Model
{
......@@ -88,6 +89,8 @@ class Model
private:
friend class Camera;
friend class Receiver;
Camera *m_cam;
Type m_type;
};
......
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2011
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
#ifndef __SLS_DETECTOR_RECEIVER_H
#define __SLS_DETECTOR_RECEIVER_H
#include "SlsDetectorModel.h"
#include "slsReceiverUsers.h"
namespace lima
{
namespace SlsDetector
{
class Camera;
class Receiver
{
DEB_CLASS_NAMESPC(DebModCamera, "Receiver", "SlsDetector");
public:
Receiver(Camera *cam, int idx, int rx_port);
~Receiver();
void start();
void setNbPorts(int nb_ports);
void prepareAcq();
private:
friend class Camera;
class Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Receiver::Port",
"SlsDetector");
public:
struct Stats {
SlsDetector::Stats stats;
Timestamp last_t0;
Timestamp last_t1;
void reset()
{
stats.reset();
last_t0 = last_t1 = Timestamp();
}
};
Port(Receiver& recv, int port);
pid_t getTID()
{ return m_thread.getTID(); }
void prepareAcq();
void processFileStart(uint32_t dsize);
void processFrame(FrameType frame, char *dptr, uint32_t dsize);
bool isBadFrame(FrameType frame);
int getNbBadFrames()
{
AutoMutex l = lock();
return m_bad_frame_list.size();
}
void getBadFrameList(int first_idx, int last_idx, IntList& bfl)
{
AutoMutex l = lock();
IntList::const_iterator b = m_bad_frame_list.begin();
bfl.assign(b + first_idx, b + last_idx);
}
Stats& getStats()
{ return m_stats; }
private:
friend class Receiver;
typedef FrameMap::Item::FinishInfo FinishInfo;
typedef FrameMap::Item::FinishInfoList FinishInfoList;
typedef std::vector<FinishInfo> FinishInfoArray;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera,
"Receiver::Port::Thread",
"SlsDetector");
public:
Thread(Port& port);
virtual ~Thread();
pid_t getTID()
{ return m_tid; }
virtual void start();
protected:
virtual void threadFunction();
private:
Port& m_port;
pid_t m_tid;
volatile bool m_end;
};
AutoMutex lock()
{ return m_mutex; }
void pollFrameFinished();
void stopPollFrameFinished();
void processFinishInfo(const FinishInfo& finfo);
Camera *m_cam;
Model *m_model;
int m_port_idx;
Mutex m_mutex;
FrameMap::Item *m_frame_map_item;
IntList m_bad_frame_list;
Thread m_thread;
Stats m_stats;
};
typedef std::vector<AutoPtr<Port> > PortList;
static int fileStartCallback(char *fpath, char *fname,
FrameType fidx, uint32_t dsize,
void *priv);
static void portCallback(FrameType frame,
uint32_t exp_len,
uint32_t recv_packets,
uint64_t bunch_id,
uint64_t timestamp,
uint16_t mod_id,
uint16_t x, uint16_t y, uint16_t z,
uint32_t debug,
uint16_t rr_nb,
uint8_t det_type,
uint8_t cb_version,
char *dptr,
uint32_t dsize,
void *priv);
int fileStartCallback(char *fpath, char *fname, uint64_t fidx,
uint32_t dsize);
void portCallback(FrameType frame, int port, char *dptr,
uint32_t dsize);
Camera *m_cam;
int m_idx;
int m_rx_port;
Args m_args;
AutoPtr<slsReceiverUsers> m_recv;
PortList m_port_list;
};
} // namespace SlsDetector
} // namespace lima
#endif // __SLS_DETECTOR_RECEIVER_H
......@@ -84,238 +84,6 @@ void Camera::AppInputData::parseConfigFile()
}
}
Camera::Receiver::Receiver(Camera *cam, int idx, int rx_port)
: m_cam(cam), m_idx(idx), m_rx_port(rx_port)
{
DEB_CONSTRUCTOR();
ostringstream os;
os << "slsReceiver"
<< " --rx_tcpport " << m_rx_port;
m_args.set(os.str());
start();
m_recv->registerCallBackStartAcquisition(fileStartCallback, this);
m_recv->registerCallBackRawDataReady(portCallback, this);
}
Camera::Receiver::~Receiver()
{
DEB_DESTRUCTOR();
m_recv->stop();
}
void Camera::Receiver::start()
{
DEB_MEMBER_FUNCT();
int init_ret;
m_recv = new slsReceiverUsers(m_args.size(), m_args, init_ret);
if (init_ret == slsReceiverDefs::FAIL)
THROW_HW_ERROR(Error) << "Error creating slsReceiver";
if (m_recv->start() == slsReceiverDefs::FAIL)
THROW_HW_ERROR(Error) << "Error starting slsReceiver";
}
int Camera::Receiver::fileStartCallback(char *fpath, char *fname,
uint64_t fidx, uint32_t dsize,
void *priv)
{
DEB_STATIC_FUNCT();
Receiver *recv = static_cast<Receiver *>(priv);
return recv->fileStartCallback(fpath, fname, fidx, dsize);
}
void Camera::Receiver::portCallback(FrameType frame,
uint32_t exp_len,
uint32_t recv_packets,
uint64_t bunch_id,
uint64_t timestamp,
uint16_t mod_id,
uint16_t x, uint16_t y, uint16_t z,
uint32_t debug,
uint16_t rr_nb,
uint8_t det_type,
uint8_t cb_version,
char *dptr,
uint32_t dsize,
void *priv)
{
DEB_STATIC_FUNCT();
Receiver *recv = static_cast<Receiver *>(priv);
int port = (x % 2);
FrameType lima_frame = frame - 1;
DEB_PARAM() << DEB_VAR2(frame, lima_frame);
recv->portCallback(lima_frame, port, dptr, dsize);
}
int Camera::Receiver::fileStartCallback(char *fpath, char *fname,
uint64_t fidx, uint32_t dsize)
{
DEB_MEMBER_FUNCT();
if (m_cam->m_model)
m_cam->processRecvFileStart(m_idx, dsize);
return 0;
}
void Camera::Receiver::portCallback(FrameType frame, int port, char *dptr,
uint32_t dsize)
{
DEB_MEMBER_FUNCT();
if (!m_cam->m_model || (m_cam->getState() == Stopping))
return;
Timestamp t0 = Timestamp::now();
int port_idx = m_cam->getPortIndex(m_idx, port);
PortStats& port_stats = m_cam->m_port_stats[port_idx];
if (port_stats.last_t0.isSet())
port_stats.stats.cb_period.add(t0 - port_stats.last_t0);
if (port_stats.last_t1.isSet())
port_stats.stats.recv_exec.add(t0 - port_stats.last_t1);
port_stats.last_t0 = t0;
try {
if (frame >= m_cam->m_nb_frames)
THROW_HW_ERROR(Error) << "Invalid "
<< DEB_VAR2(frame, DebHex(frame));
m_cam->processRecvPort(port_idx, frame, dptr, dsize);
} catch (Exception& e) {
ostringstream err_msg;
err_msg << "Receiver::frameCallback: " << e << ": "
<< DEB_VAR3(m_idx, frame, port);
Event::Code err_code = Event::CamOverrun;
Event *event = new Event(Hardware, Event::Error, Event::Camera,
err_code, err_msg.str());
DEB_EVENT(*event) << DEB_VAR1(*event);
m_cam->reportEvent(event);
}
Timestamp t1 = Timestamp::now();
port_stats.stats.cb_exec.add(t1 - t0);
port_stats.last_t1 = t1;
}
Camera::BufferThread::BufferThread()
: m_cam(NULL), m_port_idx(-1), m_frame_map_item(NULL)
{
DEB_CONSTRUCTOR();
}
void Camera::BufferThread::init(Camera *cam, int port_idx)
{
DEB_MEMBER_FUNCT();
m_cam = cam;
m_port_idx = port_idx;
m_frame_map_item = &m_cam->m_frame_map.getItem(m_port_idx);
start();
}
void Camera::BufferThread::prepareAcq()
{
DEB_MEMBER_FUNCT();
m_bad_frame_list.clear();
m_bad_frame_list.reserve(16 * 1024);
}
Camera::BufferThread::~BufferThread()
{
DEB_DESTRUCTOR();
if (!hasStarted())
return;
{
AutoMutex l = lock();
m_end = true;
m_cond.broadcast();
}
m_frame_map_item->stopPollFrameFinished();
}
void Camera::BufferThread::start()
{
DEB_MEMBER_FUNCT();
m_end = true;
Thread::start();
struct sched_param param;
param.sched_priority = 50;
int ret = pthread_setschedparam(m_thread, SCHED_RR, &param);
if (ret != 0)
DEB_ERROR() << "Could not set real-time priority!!";
AutoMutex l = lock();
while (m_end)
m_cond.wait();
}
void Camera::BufferThread::threadFunction()
{
DEB_MEMBER_FUNCT();
m_tid = gettid();
AutoMutex l = lock();
m_end = false;
m_cond.broadcast();