Commit 71872d46 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Remove intermediate buffer for 4-to-8 bit expansion:

* Move Receiver::Thread to Model::Recv::Thread
* Remove FrameMap::Item::Group/Follower/FrameQueue
* Block Receiver::Port threads while Model::Recv::Thread work
* Support arbitrary number of threads, distribute with ThreadBalance
* Remove ExpandInThreads
parent 967144d7
......@@ -100,8 +100,8 @@ public:
void getFrameDim(FrameDim& frame_dim, bool raw = false)
{ m_model->getFrameDim(frame_dim, raw); }
const FrameMap& getFrameMap()
{ return m_frame_map; }
FrameMap *getFrameMap()
{ return &m_frame_map; }
void putCmd(const std::string& s, int idx = -1);
std::string getCmd(const std::string& s, int idx = -1);
......@@ -143,10 +143,10 @@ public:
void setTolerateLostPackets(bool tol_lost_packets);
void getTolerateLostPackets(bool& tol_lost_packets);
int getNbBadFrames(int recv_idx);
void getBadFrameList(int recv_idx, int first_idx, int last_idx,
int getNbBadFrames(int item_idx);
void getBadFrameList(int item_idx, int first_idx, int last_idx,
IntList& bad_frame_list);
void getBadFrameList(int recv_idx, IntList& bad_frame_list);
void getBadFrameList(int item_idx, IntList& bad_frame_list);
void prepareAcq();
void startAcq();
......@@ -163,6 +163,8 @@ public:
GlobalCPUAffinityMgr::ProcessingFinishedEvent *
getProcessingFinishedEvent();
void reportException(Exception& e, string name);
private:
typedef std::map<int, int> RecvPortMap;
typedef std::queue<int> FrameQueue;
......@@ -308,7 +310,7 @@ private:
double m_new_frame_timeout;
double m_abort_sleep_time;
bool m_tol_lost_packets;
FrameArray m_prev_gfa;
FrameArray m_prev_ifa;
TimeRangesChangedCallback *m_time_ranges_cb;
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
......
......@@ -36,7 +36,8 @@ namespace SlsDetector
#define EIGER_PACKET_DATA_LEN (4 * 1024)
#define MaxEigerNbPorts MaxFrameMapItemGroupSize
#define MaxEigerNbPorts 32
#define MaxEigerNbThreads 32
class Eiger : public Model
{
......@@ -49,6 +50,8 @@ class Eiger : public Model
typedef Defs::ClockDiv ClockDiv;
typedef std::vector<int> ThreadBalance;
enum ParallelMode {
NonParallel, Parallel, Safe,
};
......@@ -70,7 +73,7 @@ class Eiger : public Model
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Geometry",
"SlsDetector");
public:
typedef FrameMap::Mask Mask;
typedef std::bitset<MaxEigerNbPorts> Mask;
class Recv
{
......@@ -82,7 +85,6 @@ class Eiger : public Model
struct Expand4Data {
char *src[MaxEigerNbPorts];
char *dst;
int len4;
Mask valid;
};
......@@ -107,7 +109,8 @@ class Eiger : public Model
int cw; // chip width
int lw; // line width
int po; // port offset
int to; // thread offset
// thread offset
int to[MaxEigerNbThreads];
};
Recv *m_recv;
......@@ -121,6 +124,10 @@ class Eiger : public Model
CalcData m_dst;
int m_pchips;
int m_copy_lines;
int m_raw_port_blocks;
int m_port_blocks[MaxEigerNbThreads];
int m_first_port[MaxEigerNbThreads];
int m_len4[MaxEigerNbThreads];
};
typedef std::vector<AutoPtr<Port> > PortList;
......@@ -132,11 +139,17 @@ class Eiger : public Model
void setThreadProcessing(bool thread_proc)
{ m_thread_proc = thread_proc; }
void setNbProcessingThreads(int nb_proc_threads)
{ m_nb_proc_threads = nb_proc_threads; }
{
if (nb_proc_threads != m_nb_proc_threads)
m_thread_bal.clear();
m_nb_proc_threads = nb_proc_threads;
}
void setThreadBalance(const ThreadBalance& thread_bal)
{ m_thread_bal = thread_bal; }
void prepareAcq();
void expandPixelDepth4(Expand4Data& data,
void expandPixelDepth4(const Expand4Data& data,
int thread_idx);
private:
friend class Port;
......@@ -147,6 +160,7 @@ class Eiger : public Model
PortList m_port_list;
int m_nb_proc_threads;
bool m_thread_proc;
ThreadBalance m_thread_bal;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
......@@ -241,10 +255,12 @@ class Eiger : public Model
void setThresholdEnergy(int thres);
void getThresholdEnergy(int& thres);
void setExpand4InThreads(bool expand_4_in_threads);
void getExpand4InThreads(bool& expand_4_in_threads);
protected:
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr);
virtual void updateImageSize();
virtual bool checkSettings(Settings settings);
......@@ -253,6 +269,8 @@ class Eiger : public Model
virtual Model::Recv *getRecv(int recv_idx);
virtual void prepareAcq();
virtual void startAcq();
virtual void stopAcq();
private:
friend class Correction;
......@@ -262,6 +280,8 @@ class Eiger : public Model
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
typedef Geometry::Recv::Expand4Data Expand4Data;
class Port : public Model::Recv::Port
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Port",
......@@ -269,7 +289,45 @@ class Eiger : public Model
public:
Port(Recv *recv, int port);
struct Sync {
Recv *recv;
int port;
FrameType frame;
char *src;
char *dst;
bool waiting, stopped;
Sync(Recv *r, int p) : recv(r), port(p)
{}
void prepareAcq()
{
waiting = stopped = false;
frame = -1;
}
void processFrame(FrameType f, char *s, char *d)
{
frame = f;
src = s;
dst = d;
waiting = true;
AutoMutex l = recv->lockPort(false);
recv->updatePortFrame(this, l);
while (waiting && !stopped)
recv->waitPort();
}
void stopAcq()
{
AutoMutex l = recv->lockPort();
stopped = true;
recv->broadcastPort();
}
};
void prepareAcq();
void stopAcq();
virtual void processFrame(FrameType frame, char *dptr,
uint32_t dsize, char *bptr);
......@@ -278,10 +336,8 @@ class Eiger : public Model
Recv *m_recv;
int m_port;
Sync m_sync;
Geometry::Recv::Port *m_geom;
int m_nb_buffers;
NumaSoftBufferAllocMgr m_buffer_alloc_mgr;
StdBufferCbMgr m_buffer_cb_mgr;
};
typedef std::vector<AutoPtr<Port> > PortList;
......@@ -291,22 +347,80 @@ class Eiger : public Model
virtual Port *getPort(int port_idx);
void prepareAcq();
void startAcq();
void stopAcq();
virtual void processFileStart(uint32_t dsize);
virtual int getNbProcessingThreads();
virtual void processThread(const FrameData& frame_data,
char *bptr, int thread_idx);
virtual void setNbProcessingThreads(int nb_proc_threads);
virtual pid_t getThreadID(int thread_idx)
{ return m_thread_list[thread_idx]->getThreadID(); }
void updateFrameMapItem(FrameMap::Item *item)
{ m_frame_map_item = item; }
private:
friend class Port::Sync;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv::Thread",
"SlsDetector");
public:
virtual ~Thread();
void init(Recv *recv, int idx);
void addNewFrame(const Expand4Data& data)
{
AutoMutex l = lockThread();
m_data = &data;
signalThread();
}
protected:
virtual void start();
virtual void threadFunction();
private:
AutoMutex lockThread()
{ return AutoMutex(m_cond.mutex()); }
void waitThread()
{ m_cond.wait(); }
void signalThread()
{ m_cond.signal(); }
Recv *m_recv;
int m_idx;
bool m_end;
Cond m_cond;
const Expand4Data *m_data;
};
typedef std::vector<AutoPtr<Thread> > ThreadList;
AutoMutex lockPort(bool locked=true)
{ return AutoMutex(m_cond.mutex(), (locked ? AutoMutex::Locked :
AutoMutex::UnLocked)); }
void broadcastPort()
{ m_cond.broadcast(); }
void waitPort()
{ m_cond.wait(); }
void updatePortFrame(Port::Sync *sync, AutoMutex& pl);
void updateProcessingFrame(AutoMutex& tl);
Eiger *m_eiger;
int m_idx;
Cond m_cond;
Geometry::Recv *m_geom;
int m_nb_proc_threads;
FrameType m_frame;
int m_nb_ready_threads;
Geometry::Recv::Expand4Data m_expand_data;
bool m_thread_proc;
volatile FrameType m_last_recv_frame;
volatile FrameType m_last_proc_frame;
bool m_overrun;
ThreadList m_thread_list;
PortList m_port_list;
FrameMap::Item *m_frame_map_item;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
......@@ -526,7 +640,6 @@ class Eiger : public Model
RecvList m_recv_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
bool m_expand_4_in_threads;
};
std::ostream& operator <<(std::ostream& os, Eiger::ParallelMode mode);
......
......@@ -25,7 +25,7 @@
#include "SlsDetectorDefs.h"
#include <queue>
#include <algorithm>
namespace lima
{
......@@ -33,79 +33,23 @@ namespace lima
namespace SlsDetector
{
#define MaxFrameMapItemGroupSize 32
class FrameMap
{
DEB_CLASS_NAMESPC(DebModCamera, "FrameMap", "SlsDetector");
public:
typedef std::bitset<MaxFrameMapItemGroupSize> Mask;
struct FrameData {
int frame;
Mask valid;
FrameData()
{}
FrameData(int f, Mask v) : frame(f), valid(v)
{}
};
typedef std::vector<FrameData> FrameDataList;
struct FinishInfo {
FrameType first_lost;
int nb_lost;
SortedIntList finished;
};
typedef std::vector<FinishInfo> FinishInfoList;
class Item
{
DEB_CLASS_NAMESPC(DebModCamera, "Item", "SlsDetector");
private:
class Follower;
public:
class Group
{
DEB_CLASS_NAMESPC(DebModCamera, "Item::Group",
"SlsDetector");
public:
typedef std::vector<Item *> ItemList;
Group();
Group(Group&& o);
~Group();
void setItemList(const ItemList& item_list);
void clear();
FrameDataList pollFrameFinished();
FinishInfoList
getFrameFinishInfo(const FrameDataList& data_list);
void stopPollFrameFinished();
FrameType getLastFrame() const
{ return m_last_frame; }
private:
typedef std::vector<AutoPtr<Follower> > FollowerList;
void setFrameMap(FrameMap *map);
FrameMap *m_map;
volatile bool m_stopped;
FollowerList m_follower_list;
FrameType m_last_frame;
};
Item();
Item(Item&& o);
~Item();
void checkFinishedFrame(FrameType frame);
void frameFinished(FrameType frame, bool no_check, bool valid);
Item(FrameMap *map, int idx);
FrameMap *getFrameMap()
{ return m_map; }
......@@ -113,108 +57,53 @@ class FrameMap
int getIndex()
{ return m_idx; }
private:
friend class Group;
friend class FrameMap;
class FrameQueue;
class Follower
{
DEB_CLASS_NAMESPC(DebModCamera, "Item::Follower",
"SlsDetector");
public:
Follower();
Follower(Follower&& o);
~Follower();
void setGroup(Group *group, int idx);
void setItem(Item *item);
int getIndex()
{ return m_idx; }
bool empty()
{ return m_data_queue.empty(); }
void update();
void pop();
void clear();
FrameData& front()
{ return *m_next_data; }
private:
friend class FrameQueue;
typedef std::queue<FrameDataList> FrameDataQueue;
Group *m_group;
int m_idx;
Mask m_mask;
Item *m_item;
volatile int m_read_idx;
FrameDataQueue m_data_queue;
FrameDataList::iterator m_next_data;
};
typedef std::vector<Follower *> FollowerList;
class FrameQueue
{
public:
FrameQueue(Item *item, int size = 1000);
void clear();
void push(FrameData data);
FrameDataList popAll(Follower *f);
private:
int index(int i)
{ return i % m_size; }
void checkFinishedFrame(FrameType frame);
FinishInfo frameFinished(FrameType frame, bool no_check,
bool valid);
FrameDataList m_array;
int m_size;
volatile int m_write_idx;
FollowerList *m_follower_list;
};
bool isBadFrame(FrameType frame);
int getNbBadFrames();
void getBadFrameList(int first_idx, int last_idx, IntList& bfl);
friend bool SlsDetector::operator <(FrameData a, FrameData b);
private:
friend class FrameMap;
void init(FrameMap *map, int idx);
void clear();
AutoMutex lock()
{ return m_mutex; }
Mutex m_mutex;
FrameMap *m_map;
int m_idx;
FollowerList m_follower_list;
FrameQueue m_frame_queue;
FrameType m_last_pushed_frame;
FrameType m_last_frame;
IntList m_bad_frame_list;
};
typedef std::vector<Item> ItemList;
typedef std::vector<AutoPtr<Item> > ItemList;
FrameMap();
FrameMap(Camera *cam);
void setNbItems(int nb_items);
int getNbItems()
{ return m_nb_items; }
void setBufferSize(int buffer_size);
void clear();
Item& getItem(int item)
Item *getItem(int item)
{ return m_item_list[item]; }
FrameArray getGroupFrameArray() const;
FrameArray getItemFrameArray() const;
FrameType getLastGroupFrame() const
{ return getLatestFrame(getGroupFrameArray()); }
FrameType getLastItemFrame() const
{ return getLatestFrame(getItemFrameArray()); }
FrameType getLastFinishedFrame() const
{ return getOldestFrame(getGroupFrameArray()); }
int getNbItemGroups()
{ return m_group_list.size(); }
{ return getOldestFrame(getItemFrameArray()); }
private:
friend class Item;
typedef std::vector<Item::Group *> GroupList;
struct AtomicCounter {
int count;
Mutex mutex;
......@@ -234,13 +123,35 @@ class FrameMap
};
typedef std::vector<AtomicCounter> CounterList;
Camera *m_cam;
int m_nb_items;
int m_buffer_size;
CounterList m_frame_item_count_list;
ItemList m_item_list;
GroupList m_group_list;
};
inline bool FrameMap::Item::isBadFrame(FrameType frame)
{
AutoMutex l = lock();
IntList::iterator end = m_bad_frame_list.end();
return (std::find(m_bad_frame_list.begin(), end, frame) != end);
}
inline int FrameMap::Item::getNbBadFrames()
{
AutoMutex l = lock();
return m_bad_frame_list.size();
}
inline void FrameMap::Item::getBadFrameList(int first_idx, int last_idx,
IntList& bfl)
{
AutoMutex l = lock();
IntList::const_iterator b = m_bad_frame_list.begin();
bfl.assign(b + first_idx, b + last_idx);
}
std::ostream& operator <<(std::ostream& os, const FrameMap& m);
......
......@@ -44,7 +44,7 @@ class Model
public:
typedef Defs::Settings Settings;
typedef FrameMap::FrameData FrameData;
typedef FrameMap::FinishInfo FinishInfo;
class Recv
{
......@@ -70,8 +70,8 @@ class Model
virtual void processFileStart(uint32_t dsize) = 0;
virtual int getNbProcessingThreads() = 0;
virtual void processThread(const FrameData &frame, char *bptr,
int thread_idx) = 0;
virtual void setNbProcessingThreads(int nb_proc_threads) = 0;
virtual pid_t getThreadID(int thread_idx) = 0;
};
Model(Camera *cam, Type type);
......@@ -102,6 +102,11 @@ class Model
virtual void getTimeRanges(TimeRanges& time_ranges) = 0;
virtual int getNbFrameMapItems() = 0;
virtual void updateFrameMapItems(FrameMap *map) = 0;
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr) = 0;
protected:
void updateCameraModel();
void updateTimeRanges();
......@@ -119,6 +124,10 @@ class Model
int getNbRecvPorts();
virtual void prepareAcq() = 0;
virtual void startAcq() = 0;
virtual void stopAcq() = 0;
void processFinishInfo(const FinishInfo& finfo);
private:
friend class Camera;
......
......@@ -52,33 +52,9 @@ public:
void setCPUAffinity(const RecvCPUAffinity& recv_affinity);
void setNbThreads(int nb_threads);
int getNbThreads();
pid_t getThreadID(int thread_idx)
{ return m_thread_list[thread_idx]->getThreadID(); }
bool isBadFrame(FrameType frame)
{ return m_thread_list[0]->isBadFrame(frame); }