Commit 1cee9710 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Move Port Threads to the Receiver level

parent e9b98a7e
......@@ -485,7 +485,7 @@ class SystemCPUAffinityMgr
struct RecvCPUAffinity {
CPUAffinityList listeners;
CPUAffinityList writers;
CPUAffinityList port_threads;
CPUAffinityList recv_threads;
RecvCPUAffinity();
CPUAffinity all() const;
......@@ -495,8 +495,8 @@ struct RecvCPUAffinity {
{ return listeners; }
const CPUAffinityList& Writers() const
{ return writers; }
const CPUAffinityList& PortThreads() const
{ return port_threads; }
const CPUAffinityList& RecvThreads() const
{ return recv_threads; }
typedef const CPUAffinityList& (RecvCPUAffinity::*Selector)() const;
};
......@@ -505,7 +505,7 @@ struct RecvCPUAffinity {
inline CPUAffinity RecvCPUAffinity::all() const
{
return (CPUAffinityList_all(listeners) | CPUAffinityList_all(writers) |
CPUAffinityList_all(port_threads));
CPUAffinityList_all(recv_threads));
}
......
......@@ -143,10 +143,10 @@ public:
void setTolerateLostPackets(bool tol_lost_packets);
void getTolerateLostPackets(bool& tol_lost_packets);
int getNbBadFrames(int port_idx);
void getBadFrameList(int port_idx, int first_idx, int last_idx,
int getNbBadFrames(int recv_idx);
void getBadFrameList(int recv_idx, int first_idx, int last_idx,
IntList& bad_frame_list);
void getBadFrameList(int port_idx, IntList& bad_frame_list);
void getBadFrameList(int recv_idx, IntList& bad_frame_list);
void prepareAcq();
void startAcq();
......@@ -308,7 +308,7 @@ private:
double m_new_frame_timeout;
double m_abort_sleep_time;
bool m_tol_lost_packets;
FrameArray m_prev_ifa;
FrameArray m_prev_gfa;
TimeRangesChangedCallback *m_time_ranges_cb;
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
......
......@@ -65,7 +65,7 @@ class Eiger : public Model
Eiger(Camera *cam);
~Eiger();
virtual void getFrameDim(FrameDim& frame_dim, bool raw = false);
virtual std::string getName();
......@@ -114,8 +114,8 @@ class Eiger : public Model
virtual bool checkSettings(Settings settings);
virtual int getNbRecvPorts();
virtual Model::RecvPort *getRecvPort(int port_idx);
virtual int getNbRecvs();
virtual Model::Recv *getRecv(int recv_idx);
virtual void prepareAcq();
......@@ -123,54 +123,86 @@ class Eiger : public Model
friend class Correction;
friend class CorrBase;
class RecvPort : public Model::RecvPort
typedef FrameMap::Mask Mask;
class Recv : public Model::Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::RecvPort",
"SlsDetector");
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
RecvPort(Eiger *eiger, int recv_idx, int port);
class Port : public Model::Recv::Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Port",
"SlsDetector");
public:
Port(Recv *recv, int port);
void prepareAcq();
virtual void processFrame(FrameType frame, char *dptr,
uint32_t dsize, char *bptr);
private:
friend class Recv;
void copy2LimaBuffer(char *dst, char *src,
int thread_idx);
Recv *m_recv;
int m_recv_idx;
int m_port;
bool m_top_half_recv;
bool m_port_idx;
bool m_raw;
bool m_pixel_depth_4;
int m_slw; // source line width
int m_dlw; // dest line width
int m_scw; // source chip width
int m_dcw; // dest chip width
int m_pchips;
int m_copy_lines;
int m_spo; // source port offset
int m_dpo; // dest port offset
int m_sto; // source thread offset
int m_dto; // dest thread offset
int m_nb_buffers;
NumaSoftBufferAllocMgr m_buffer_alloc_mgr;
StdBufferCbMgr m_buffer_cb_mgr;
};
typedef std::vector<AutoPtr<Port> > PortList;
Recv(Eiger *eiger, int idx);
virtual int getNbPorts();
virtual Port *getPort(int port_idx);
void prepareAcq();
virtual void processRecvFileStart(uint32_t dsize);
virtual void processRecvPort(FrameType frame, char *dptr,
uint32_t dsize, char *bptr);
virtual int getNbPortProcessingThreads();
virtual void processPortThread(FrameType frame, char *bptr,
int thread_idx);
virtual void processFileStart(uint32_t dsize);
virtual int getNbProcessingThreads();
virtual void processThread(const FrameData& frame_data,
char *bptr, int thread_idx);
private:
void copy2LimaBuffer(char *dst, char *src, int thread_idx);
void expandPixelDepth4(char *dst, char *src, int len4,
int thread_idx);
struct ExpandData {
int nb_ports;
char *src[MaxFrameMapItemGroupSize];
char *dst;
int len4;
Mask valid;
};
void expandPixelDepth4(ExpandData& data, int thread_idx);
Eiger *m_eiger;
int m_port;
bool m_top_half_recv;
bool m_port_idx;
bool m_raw;
bool m_pixel_depth_4;
bool m_thread_proc;
int m_recv_idx;
int m_idx;
int m_nb_threads;
int m_slw; // source line width
int m_dlw; // dest line width
int m_scw; // source chip width
int m_dcw; // dest chip width
int m_pchips;
int m_copy_lines;
int m_spo; // source port offset
int m_dpo; // dest port offset
int m_sto; // source thread offset
int m_dto; // dest thread offset
int m_nb_buffers;
NumaSoftBufferAllocMgr m_buffer_alloc_mgr;
StdBufferCbMgr m_buffer_cb_mgr;
FrameType m_last_recv_frame;
FrameType m_last_proc_frame;
bool m_thread_proc;
volatile FrameType m_last_recv_frame;
volatile FrameType m_last_proc_frame;
bool m_overrun;
PortList m_port_list;
};
typedef std::vector<AutoPtr<RecvPort> > RecvPortList;
typedef std::vector<AutoPtr<Recv> > RecvList;
class CorrBase
{
......@@ -216,7 +248,7 @@ class Eiger : public Model
};
Camera *m_cam;
int m_nb_ports;
int m_nb_recvs;
std::vector<BadFrameData> m_bfd_list;
};
......@@ -243,7 +275,7 @@ class Eiger : public Model
ChipBorderCorr(Eiger *eiger)
: CorrBase(eiger)
{}
virtual void prepareAcq()
{
CorrBase::prepareAcq();
......@@ -388,7 +420,7 @@ class Eiger : public Model
FrameDim m_recv_frame_dim;
CorrList m_corr_list;
RecvPortList m_recv_port_list;
RecvList m_recv_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
bool m_expand_4_in_threads;
......
......@@ -25,80 +25,173 @@
#include "SlsDetectorDefs.h"
namespace lima
#include <queue>
namespace lima
{
namespace SlsDetector
{
#define MaxFrameMapItemGroupSize 32
class FrameMap
{
DEB_CLASS_NAMESPC(DebModCamera, "FrameMap", "SlsDetector");
public:
typedef std::bitset<MaxFrameMapItemGroupSize> Mask;
struct FrameData {
int frame;
Mask valid;
FrameData()
{}
FrameData(int f, Mask v) : frame(f), valid(v)
{}
};
typedef std::vector<FrameData> FrameDataList;
struct FinishInfo {
FrameType first_lost;
int nb_lost;
SortedIntList finished;
};
typedef std::vector<FinishInfo> FinishInfoList;
class Item
{
DEB_CLASS_NAMESPC(DebModCamera, "Item", "SlsDetector");
private:
class Follower;
public:
typedef std::pair<int, bool> FrameData;
typedef std::vector<FrameData> FrameDataList;
class Group
{
DEB_CLASS_NAMESPC(DebModCamera, "Item::Group",
"SlsDetector");
public:
typedef std::vector<Item *> ItemList;
Group();
Group(Group&& o);
~Group();
void setItemList(const ItemList& item_list);
void clear();
struct FinishInfo {
FrameType first_lost;
int nb_lost;
SortedIntList finished;
FrameDataList pollFrameFinished();
FinishInfoList
getFrameFinishInfo(const FrameDataList& data_list);
void stopPollFrameFinished();
FrameType getLastFrame() const
{ return m_last_frame; }
private:
typedef std::vector<AutoPtr<Follower> > FollowerList;
void setFrameMap(FrameMap *map);
FrameMap *m_map;
volatile bool m_stopped;
FollowerList m_follower_list;
FrameType m_last_frame;
};
typedef std::vector<FinishInfo> FinishInfoList;
Item();
Item(Item&& o);
~Item();
void checkFinishedFrame(FrameType frame);
void frameFinished(FrameType frame, bool no_check, bool valid);
FrameDataList pollFrameFinished();
FinishInfoList getFrameFinishInfo(const FrameDataList&
data_list);
void stopPollFrameFinished();
FrameMap *getFrameMap()
{ return m_map; }
int getIndex()
{ return m_idx; }
private:
friend class Group;
friend class FrameMap;
class FrameQueue
class FrameQueue;
class Follower
{
DEB_CLASS_NAMESPC(DebModCamera, "Item::Follower",
"SlsDetector");
public:
FrameQueue(int size = 1000);
Follower();
Follower(Follower&& o);
~Follower();
void setGroup(Group *group, int idx);
void setItem(Item *item);
int getIndex()
{ return m_idx; }
bool empty()
{ return m_data_queue.empty(); }
void update();
void pop();
void clear();
FrameData& front()
{ return *m_next_data; }
private:
friend class FrameQueue;
typedef std::queue<FrameDataList> FrameDataQueue;
Group *m_group;
int m_idx;
Mask m_mask;
Item *m_item;
volatile int m_read_idx;
FrameDataQueue m_data_queue;
FrameDataList::iterator m_next_data;
};
typedef std::vector<Follower *> FollowerList;
class FrameQueue
{
public:
FrameQueue(Item *item, int size = 1000);
void clear();
void push(FrameData data);
FrameDataList pop_all();
void stop();
FrameDataList popAll(Follower *f);
private:
int index(int i)
{ return i % m_size; }
FrameDataList m_array;
int m_size;
volatile int m_write_idx;
volatile int m_read_idx;
volatile bool m_stopped;
FollowerList *m_follower_list;
};
friend bool SlsDetector::operator <(FrameData a, FrameData b);
void setFrameMap(FrameMap *map);
void init(FrameMap *map, int idx);
void clear();
FrameMap *m_map;
int m_idx;
FollowerList m_follower_list;
FrameQueue m_frame_queue;
FrameType m_last_pushed_frame;
FrameType m_last_frame;
};
typedef std::vector<Item> ItemList;
FrameMap();
void setNbItems(int nb_items);
void setBufferSize(int buffer_size);
void clear();
......@@ -106,24 +199,29 @@ class FrameMap
Item& getItem(int item)
{ return m_item_list[item]; }
FrameArray getItemFrameArray() const;
FrameArray getGroupFrameArray() const;
FrameType getLastItemFrame() const
{ return getLatestFrame(getItemFrameArray()); }
FrameType getLastGroupFrame() const
{ return getLatestFrame(getGroupFrameArray()); }
FrameType getLastFinishedFrame() const
{ return getOldestFrame(getItemFrameArray()); }
{ return getOldestFrame(getGroupFrameArray()); }
int getNbItemGroups()
{ return m_group_list.size(); }
private:
friend class Item;
typedef std::vector<Item::Group *> GroupList;
struct AtomicCounter {
int count;
Mutex mutex;
void set(int reset)
{ count = reset; }
bool dec_test_and_reset(int reset)
{
mutex.lock();
......@@ -135,15 +233,17 @@ class FrameMap
}
};
typedef std::vector<AtomicCounter> CounterList;
int m_nb_items;
int m_buffer_size;
CounterList m_frame_item_count_list;
ItemList m_item_list;
GroupList m_group_list;
};
std::ostream& operator <<(std::ostream& os, const FrameMap& m);
} // namespace SlsDetector
} // namespace lima
......
......@@ -42,23 +42,37 @@ class Model
{
DEB_CLASS_NAMESPC(DebModCamera, "Model", "SlsDetector");
public:
class RecvPort
typedef Defs::Settings Settings;
typedef FrameMap::FrameData FrameData;
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::RecvPort",
"SlsDetector");
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv", "SlsDetector");
public:
virtual ~RecvPort();
virtual void processRecvFileStart(uint32_t dsize) = 0;
// TODO: add file finished callback
virtual void processRecvPort(FrameType frame, char *dptr,
uint32_t dsize, char *bptr) = 0;
virtual int getNbPortProcessingThreads() = 0;
virtual void processPortThread(FrameType frame, char *bptr,
int thread_idx) = 0;
};
class Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Model::Recv::Port",
"SlsDetector");
public:
virtual ~Port();
typedef Defs::Settings Settings;
// TODO: add file finished callback
virtual void processFrame(FrameType frame, char *dptr,
uint32_t dsize, char *bptr) = 0;
};
virtual ~Recv();
virtual int getNbPorts() = 0;
virtual Port *getPort(int port_idx) = 0;
virtual void processFileStart(uint32_t dsize) = 0;
virtual int getNbProcessingThreads() = 0;
virtual void processThread(const FrameData &frame, char *bptr,
int thread_idx) = 0;
};
Model(Camera *cam, Type type);
virtual ~Model();
......@@ -99,8 +113,10 @@ class Model
virtual bool checkSettings(Settings settings) = 0;
virtual int getNbRecvPorts() = 0;
virtual RecvPort *getRecvPort(int port_idx) = 0;
virtual int getNbRecvs() = 0;
virtual Recv *getRecv(int recv_idx) = 0;
int getNbRecvPorts();
virtual void prepareAcq() = 0;
......
......@@ -45,15 +45,40 @@ public:
~Receiver();
void start();
void setNbPorts(int nb_ports);
void setModelRecv(Model::Recv *model_recv);
void prepareAcq();
void setCPUAffinity(const RecvCPUAffinity& recv_affinity);
void setNbThreads(int nb_threads);
int getNbThreads();
pid_t getThreadID(int thread_idx)
{ return m_thread_list[thread_idx]->getThreadID(); }
bool isBadFrame(FrameType frame)
{ return m_thread_list[0]->isBadFrame(frame); }
int getNbBadFrames()
{ return m_thread_list[0]->getNbBadFrames(); }
void getBadFrameList(int first_idx, int last_idx, IntList& bfl)
{ return m_thread_list[0]->getBadFrameList(first_idx, last_idx, bfl); }
private:
friend class Camera;
typedef FrameMap::Item::Group ItemGroup;
typedef ItemGroup::ItemList ItemList;
typedef FrameMap::FrameData FrameData;
typedef FrameMap::FrameDataList FrameDataList;
typedef FrameMap::FinishInfo FinishInfo;
typedef FrameMap::FinishInfoList FinishInfoList;
typedef std::vector<FinishInfo> FinishInfoArray;
class Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Receiver::Port",
......@@ -72,85 +97,61 @@ private:
Port(Receiver& recv, int port);
pid_t getThreadID(int idx)
{ return m_thread_list[idx].getThreadID(); }
void prepareAcq();
void processFileStart(uint32_t dsize);
void processFrame(FrameType frame, char *dptr, uint32_t dsize);
bool isBadFrame(FrameType frame);
int getNbBadFrames()
{
AutoMutex l = lock();
return m_bad_frame_list.size();
}
void getBadFrameList(int first_idx, int last_idx, IntList& bfl)
{
AutoMutex l = lock();
IntList::const_iterator b = m_bad_frame_list.begin();
bfl.assign(b + first_idx, b + last_idx);
}
Stats& getStats()
{ return m_stats; }
void setNbThreads(int nb_threads);
int getNbThreads();
private: