Commit 2d296677 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Move to passive receiver

parent 759f6b65
......@@ -483,18 +483,12 @@ class SystemCPUAffinityMgr
};
struct RecvCPUAffinity {
CPUAffinityList listeners;
CPUAffinityList writers;
CPUAffinityList recv_threads;
RecvCPUAffinity();
CPUAffinity all() const;
RecvCPUAffinity& operator =(CPUAffinity a);
const CPUAffinityList& Listeners() const
{ return listeners; }
const CPUAffinityList& Writers() const
{ return writers; }
const CPUAffinityList& RecvThreads() const
{ return recv_threads; }
......@@ -504,15 +498,14 @@ struct RecvCPUAffinity {
inline CPUAffinity RecvCPUAffinity::all() const
{
return (CPUAffinityList_all(listeners) | CPUAffinityList_all(writers) |
CPUAffinityList_all(recv_threads));
return CPUAffinityList_all(recv_threads);
}
inline
bool operator ==(const RecvCPUAffinity& a, const RecvCPUAffinity& b)
{
return ((a.listeners == b.listeners) && (a.writers == b.writers));
return (a.recv_threads == b.recv_threads);
}
inline
......
......@@ -71,15 +71,11 @@ public:
int getNbDetSubModules()
{ return m_det->getNMods(); }
int getTotNbPorts()
{ return m_recv_list.size() * m_nb_recv_ports; }
int getNbRecvs()
{ return m_recv_list.size(); }
int getPortIndex(int recv_idx, int port)
{ return recv_idx * m_nb_recv_ports + port; }
std::pair<int, int> splitPortIndex(int port_idx)
{ return std::pair<int, int>(port_idx / m_nb_recv_ports,
port_idx % m_nb_recv_ports); }
Receiver* getRecv(int i)
{ return m_recv_list[i]; }
void setBufferCtrlObj(NumaSoftBufferCtrlObj *buffer_ctrl_obj)
{ m_buffer_ctrl_obj = buffer_ctrl_obj; }
......@@ -157,7 +153,7 @@ public:
void registerTimeRangesChangedCallback(TimeRangesChangedCallback& cb);
void unregisterTimeRangesChangedCallback(TimeRangesChangedCallback& cb);
void getStats(Stats& stats, int port_idx=-1);
void getStats(Stats& stats, int recv_idx=-1);
void setPixelDepthCPUAffinityMap(PixelDepthCPUAffinityMap aff_map);
void getPixelDepthCPUAffinityMap(PixelDepthCPUAffinityMap& aff_map);
......@@ -171,7 +167,6 @@ private:
typedef std::map<int, int> RecvPortMap;
typedef std::queue<int> FrameQueue;
typedef std::vector<AutoPtr<Receiver> > RecvList;
typedef std::vector<Receiver::Port *> RecvPortList;
struct AppInputData
{
......@@ -229,9 +224,6 @@ private:
void setModel(Model *model);
RecvPortList getRecvPortList();
Receiver::Port *getRecvPort(int port_idx);
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
......@@ -256,7 +248,7 @@ private:
FrameType getLastReceivedFrame();
void waitLastSkippedFrame();
void processLastSkippedFrame(int port_idx);
void processLastSkippedFrame(int recv_idx);
void getSortedBadFrameList(IntList first_idx, IntList last_idx,
IntList& bad_frame_list );
......@@ -292,9 +284,7 @@ private:
AutoPtr<AppInputData> m_input_data;
AutoPtr<slsDetectorUsers> m_det;
FrameMap m_frame_map;
int m_nb_recv_ports;
RecvList m_recv_list;
int m_recv_fifo_depth;
TrigMode m_trig_mode;
FrameType m_lima_nb_frames;
FrameType m_det_nb_frames;
......
......@@ -36,8 +36,7 @@ namespace SlsDetector
#define EIGER_PACKET_DATA_LEN (4 * 1024)
#define MaxEigerNbPorts 32
#define MaxEigerNbThreads 32
#define EigerNbRecvPorts 2
class Eiger : public Model
{
......@@ -50,8 +49,6 @@ class Eiger : public Model
typedef Defs::ClockDiv ClockDiv;
typedef std::vector<int> ThreadBalance;
enum ParallelMode {
NonParallel, Parallel, Safe,
};
......@@ -73,7 +70,7 @@ class Eiger : public Model
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Geometry",
"SlsDetector");
public:
typedef std::bitset<MaxEigerNbPorts> Mask;
typedef std::bitset<EigerNbRecvPorts> Mask;
class Recv
{
......@@ -83,9 +80,14 @@ class Eiger : public Model
class Port;
struct FrameData {
char *src[MaxEigerNbPorts];
typedef Receiver::ImageData RecvImageData;
char *src[EigerNbRecvPorts];
char *dst;
Mask valid;
FrameData();
FrameData(const RecvImageData& image, char *d);
};
class Port
......@@ -100,36 +102,27 @@ class Eiger : public Model
FrameDim getSrcFrameDim();
void copy(char *dst, char *src, int thread_idx);
void copy(char *dst, char *src);
private:
friend class Recv;
struct CalcData {
struct LocationData {
int len; // length
int cw; // chip width
int lw; // line width
};
struct ThreadData {
int src_offset;
int dst_offset;
int xfer_lines;
int port_blocks;
int first_port;
int src_len;
int off; // offset
};
Recv *m_recv;
int m_recv_idx;
int m_port;
bool m_top_half_recv;
bool m_port_idx;
bool m_raw;
CalcData m_src;
CalcData m_dst;
int m_pchips;
int m_raw_port_blocks;
ThreadData m_td[MaxEigerNbThreads];
LocationData m_src;
LocationData m_dst;
int m_port_blocks;
};
typedef std::vector<AutoPtr<Port> > PortList;
......@@ -138,32 +131,18 @@ class Eiger : public Model
int getNbPorts();
Port *getPort(int idx);
void setNbProcessingThreads(int nb_proc_threads)
{
if (nb_proc_threads != m_nb_proc_threads)
m_thread_bal.clear();
m_nb_proc_threads = nb_proc_threads;
}
void setThreadBalance(const ThreadBalance& thread_bal)
{ m_thread_bal = thread_bal; }
static
ThreadBalance getDefaultThreadBalance(int nb_threads);
void prepareAcq();
void processFrame(const FrameData& data, int thread_idx)
void processFrame(const FrameData& data)
{
if (m_pixel_depth_4)
expandPixelDepth4(data, thread_idx);
expandPixelDepth4(data);
else
copy(data, thread_idx);
copy(data);
}
void expandPixelDepth4(const FrameData& data,
int thread_idx);
void copy(const FrameData& data, int thread_idx);
void expandPixelDepth4(const FrameData& data);
void copy(const FrameData& data);
void fillBadFrame(FrameType frame, char *bptr);
......@@ -174,8 +153,6 @@ class Eiger : public Model
Geometry *m_eiger_geom;
int m_idx;
PortList m_port_list;
int m_nb_proc_threads;
ThreadBalance m_thread_bal;
bool m_pixel_depth_4;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
......@@ -300,104 +277,20 @@ class Eiger : public Model
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
typedef Geometry::Recv::FrameData FrameData;
class Port : public Model::Recv::Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Port",
"SlsDetector");
public:
struct Sync {
Recv *recv;
int port;
FrameType frame;
char *src;
char *dst;
bool waiting, stopped;
Sync(Recv *r, int p) : recv(r), port(p)
{}
void prepareAcq()
{
waiting = stopped = false;
frame = -1;
}
bool triggerProcess(FrameType f, char *s,
char *d)
{
AutoMutex l = recv->lockPort();
if (waiting)
return false;
frame = f;
src = s;
dst = d;
waiting = true;
recv->updatePortFrame(this);
return true;
}
void waitProcess()
{
AutoMutex l = recv->lockPort();
while (waiting && !stopped)
recv->waitPort();
}
void triggerProcessCleanUp()
{
AutoMutex l = recv->lockPort();
waiting = false;
}
void stopAcq()
{
AutoMutex l = recv->lockPort();
stopped = true;
recv->broadcastPort();
}
};
Port(Recv *recv, int port);
void prepareAcq();
void stopAcq();
virtual void processFrame(FrameType frame, char *dptr,
uint32_t dsize, char *bptr);
bool triggerProcess(FrameType frame, char *dptr,
uint32_t dsize, char *bptr)
{ return m_sync.triggerProcess(frame, dptr, bptr); }
void waitProcess()
{ m_sync.waitProcess(); }
void triggerProcessCleanUp()
{ m_sync.triggerProcessCleanUp(); }
private:
friend class Recv;
Recv *m_recv;
int m_port;
Sync m_sync;
Geometry::Recv::Port *m_geom;
};
typedef std::vector<AutoPtr<Port> > PortList;
typedef Receiver::ImageData RecvImageData;
Recv(Eiger *eiger, int idx);
virtual int getNbPorts();
virtual Port *getPort(int port_idx);
virtual ~Recv();
void prepareAcq();
void startAcq();
void stopAcq();
virtual void processFileStart(uint32_t dsize);
virtual int getNbProcessingThreads();
virtual void setNbProcessingThreads(int nb_proc_threads);
virtual pid_t getThreadID(int thread_idx)
{ return m_thread_list[thread_idx]->getThreadID(); }
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity);
void updateFrameMapItem(FrameMap::Item *item)
{ m_frame_map_item = item; }
......@@ -405,65 +298,89 @@ class Eiger : public Model
void processBadFrame(FrameType frame, char *bptr);
private:
friend class Port::Sync;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Thread",
"SlsDetector");
public:
enum State {
Init, Ready, Running, Stop, End,
};
Thread(Recv *recv, int idx);
virtual ~Thread();
void init(Recv *recv, int idx);
void setCPUAffinity(CPUAffinity aff);
void addNewFrame(const FrameData& data)
{
AutoMutex l = lockThread();
m_data = &data;
signalThread();
}
void prepareAcq();
void startAcq()
{ setState(Running); }
void stopAcq()
{ setState(Ready); }
protected:
virtual void start();
virtual void threadFunction();
private:
AutoMutex lockThread()
{ return AutoMutex(m_cond.mutex()); }
void waitThread()
{ m_cond.wait(); }
void signalThread()
{ m_cond.signal(); }
friend class Recv;
AutoMutex lock()
{ return m_recv->lock(); }
void wait()
{ m_recv->wait(); }
void broadcast()
{ m_recv->broadcast(); }
void setState(State state)
{
AutoMutex l = lock();
m_state = state;
broadcast();
}
Recv *m_recv;
int m_idx;
bool m_end;
Cond m_cond;
const FrameData *m_data;
State m_state;
NumaSoftBufferCtrlObj m_buffer;
RecvImageData m_image_data;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
AutoMutex lockPort()
{ return m_cond.mutex(); }
void broadcastPort()
{ m_cond.broadcast(); }
void waitPort()
AutoMutex lock()
{ return AutoMutex(m_cond.mutex()); }
void wait()
{ m_cond.wait(); }
void broadcast()
{ m_cond.broadcast(); }
void updatePortFrame(Port::Sync *sync);
void updateProcessingFrame();
bool allFramesAcquired()
{ return m_last_frame == m_nb_frames - 1; }
bool checkForRecvState(Thread& t)
{
while ((t.m_state == Thread::Ready) ||
((t.m_state == Thread::Running) &&
(m_busy || allFramesAcquired())))
wait();
return (t.m_state == Thread::Running);
}
void processOneFrame(Thread& t);
Eiger *m_eiger;
int m_idx;
Cond m_cond;
Geometry::Recv *m_geom;
FrameType m_frame;
FrameType m_candidate;
int m_nb_ready_threads;
Geometry::Recv::FrameData m_frame_data;
ThreadList m_thread_list;
PortList m_port_list;
Receiver *m_recv;
bool m_busy;
FrameType m_nb_frames;
FrameType m_last_frame;
SortedIntList m_in_process;
SortedIntList m_in_hold;
bool m_finishing;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
......
......@@ -24,6 +24,7 @@
#define __SLS_DETECTOR_MODEL_H
#include "SlsDetectorDefs.h"
#include "SlsDetectorCPUAffinity.h"
#include "lima/SizeUtils.h"
......@@ -50,28 +51,13 @@ class Model
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv", "SlsDetector");
public:
class Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv::Port",
"SlsDetector");
public:
virtual ~Port();
// TODO: add file finished callback
virtual void processFrame(FrameType frame, char *dptr,
uint32_t dsize, char *bptr) = 0;
};
virtual ~Recv();
virtual int getNbPorts() = 0;
virtual Port *getPort(int port_idx) = 0;
virtual void processFileStart(uint32_t dsize) = 0;
virtual int getNbProcessingThreads() = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual pid_t getThreadID(int thread_idx) = 0;
virtual void setCPUAffinity(const RecvCPUAffinity&
recv_affinity) = 0;
};
Model(Camera *cam, Type type);
......@@ -116,13 +102,13 @@ class Model
void putCmd(const std::string& s, int idx = -1);
std::string getCmd(const std::string& s, int idx = -1);
char *getFrameBufferPtr(FrameType frame_nb);
virtual bool checkSettings(Settings settings) = 0;
virtual int getNbRecvs() = 0;
virtual Recv *getRecv(int recv_idx) = 0;
int getNbRecvPorts();
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
......
......@@ -25,7 +25,6 @@
#include "SlsDetectorFrameMap.h"
#include "SlsDetectorModel.h"
#include "SlsDetectorCPUAffinity.h"
#include "slsReceiverUsers.h"
namespace lima
......@@ -43,78 +42,46 @@ class Receiver
public:
typedef slsReceiverDefs::sls_detector_header sls_detector_header;
typedef slsReceiverDefs::sls_receiver_header sls_receiver_header;
typedef slsReceiverDefs::thread_image_data thread_image_data;
typedef slsReceiverDefs::receiver_image_data receiver_image_data;
struct ImageData {
receiver_image_data recv_data;
FrameType frame;
};
Receiver(Camera *cam, int idx, int rx_port);
~Receiver();
void start();
void setModelRecv(Model::Recv *model_recv);
void prepareAcq();
void setCPUAffinity(const RecvCPUAffinity& recv_affinity);
bool getImage(ImageData& image_data);
SlsDetector::Stats& getStats()
{ return m_stats.stats; }
private:
friend class Camera;
class Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Receiver::Port",
"SlsDetector");
public:
struct Stats {
SlsDetector::Stats stats;
Timestamp last_t0;
Timestamp last_t1;
void reset()
{
stats.reset();
last_t0 = last_t1 = Timestamp();
}
};
Port(Receiver& recv, int port);
void prepareAcq();
Stats& getStats()
{ return m_stats; }
private:
friend class Receiver;
void portCallback(FrameType det_frame, char *dptr,
uint32_t dsize);
Camera *m_cam;
int m_port_idx;
Model::Recv::Port *m_model_port;
Stats m_stats;
struct Stats {
SlsDetector::Stats stats;
Timestamp last_t0;
Timestamp last_t1;
void reset()
{
stats.reset();
last_t0 = last_t1 = Timestamp();
}
};
typedef std::vector<AutoPtr<Port> > PortList;
static int fileStartCallback(char *fpath, char *fname,
FrameType fidx, uint32_t dsize,
void *priv);
static void portCallback(char *header, char *dptr, uint32_t dsize,
void *priv);
int fileStartCallback(char *fpath, char *fname, uint64_t fidx,
uint32_t dsize);
void getNodeMaskList(const CPUAffinityList& listener,
const CPUAffinityList& writer,
slsReceiverUsers::NodeMaskList& fifo_node_mask,
int& max_node);
Camera *m_cam;
int m_idx;