Commit 1f168a62 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline
Browse files

Formalize Receiver::ImagePackets, evolve Model API:

* Only one processing thread, move packet processing
  to Camera readRecvPackets and assemblePackets
* Remove Recv helper class in Jungfrau
parent f38dccf7
......@@ -246,6 +246,9 @@ private:
void removeSharedMem();
void createReceivers();
DetFrameImagePackets readRecvPackets();
void assemblePackets(DetFrameImagePackets&& det_packets);
bool checkLostPackets();
FrameType getLastReceivedFrame();
......@@ -311,6 +314,7 @@ private:
PixelDepthCPUAffinityMap m_cpu_affinity_map;
GlobalCPUAffinityMgr m_global_cpu_affinity_mgr;
AutoPtr<AcqThread> m_acq_thread;
FramePacketMap m_frame_packet_map;
};
} // namespace SlsDetector
......
......@@ -71,14 +71,12 @@ class Eiger : public Model
class Port;
struct FrameData {
typedef Receiver::ImageData RecvImageData;
char *src[EigerNbRecvPorts];
char *dst;
Mask valid;
FrameData();
FrameData(const RecvImageData& image, char *d);
FrameData(const Receiver::ImagePackets& image, char *d);
};
class Port
......@@ -269,8 +267,6 @@ class Eiger : public Model
protected:
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
......@@ -292,32 +288,7 @@ class Eiger : public Model
};
typedef std::vector<AutoPtr<Beb> > BebList;
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Eiger::Recv", "SlsDetector");
public:
typedef Geometry::Recv::FrameData FrameData;
typedef Receiver::ImageData RecvImageData;
Recv(Eiger *eiger, int idx);
virtual ~Recv();
void prepareAcq();
bool processOneFrame(FrameType frame, char *bptr);
void processBadFrame(FrameType frame, char *bptr);
private:
Eiger *m_eiger;
int m_idx;
Geometry::Recv *m_geom;
Receiver *m_recv;
int m_data_offset;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
......@@ -574,8 +545,6 @@ class Eiger : public Model
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
void processOneFrame(AutoMutex& l);
CorrBase *createBadRecvFrameCorr();
CorrBase *createChipBorderCorr(ImageType image_type);
CorrBase *createInterModGapCorr();
......@@ -622,10 +591,6 @@ class Eiger : public Model
RecvList m_recv_list;
ModelReconstruction *m_reconstruction;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
bool m_fixed_clock_div;
ClockDiv m_clock_div;
......
......@@ -198,8 +198,6 @@ class Jungfrau : public Model
protected:
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr);
virtual void setThreadCPUAffinity(const CPUAffinityList& aff_list);
......@@ -214,30 +212,8 @@ class Jungfrau : public Model
private:
friend class GainPed;
class Recv
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::Recv", "SlsDetector");
public:
typedef Receiver::ImageData RecvImageData;
Recv(Jungfrau *jungfrau, int idx);
void prepareAcq();
bool processOneFrame(FrameType frame, char *bptr);
void processBadFrame(FrameType frame, char *bptr);
private:
Jungfrau *m_jungfrau;
int m_idx;
bool m_raw;
Receiver *m_recv;
FrameDim m_frame_dim;
int m_data_offset;
};
typedef std::vector<AutoPtr<Recv> > RecvList;
typedef std::vector<Receiver *> RecvList;
class Thread : public lima::Thread
{
DEB_CLASS_NAMESPC(DebModCamera, "Jungfrau::Thread", "SlsDetector");
......@@ -630,8 +606,6 @@ class Jungfrau : public Model
int getNbProcessingThreads();
void setNbProcessingThreads(int nb_proc_threads);
void processOneFrame(AutoMutex& l);
Cond m_cond;
AutoPtr<GainPedImgProc> m_gain_ped_img_proc;
AutoPtr<GainADCMapImgProc> m_gain_adc_map_img_proc;
......@@ -642,10 +616,6 @@ class Jungfrau : public Model
ModelReconstruction *m_reconstruction;
RecvList m_recv_list;
FrameType m_nb_frames;
FrameType m_next_frame;
FrameType m_last_frame;
SortedIntList m_in_process;
FrameMap::Item *m_frame_map_item;
ThreadList m_thread_list;
};
......
......@@ -82,8 +82,6 @@ class Model
virtual int getNbFrameMapItems() = 0;
virtual void updateFrameMapItems(FrameMap *map) = 0;
virtual void processBadItemFrame(FrameType frame, int item,
char *bptr) = 0;
virtual bool isAcqActive();
virtual bool isXferActive() = 0;
......@@ -92,6 +90,8 @@ class Model
virtual Reconstruction *getReconstruction();
void processPackets();
protected:
void updateCameraModel();
void updateCameraImageSize();
......@@ -125,6 +125,8 @@ class Model
protected:
AutoPtr<sls::Detector> m_det;
FrameType m_next_frame;
FrameType m_last_frame;
};
......
......@@ -49,18 +49,27 @@ public:
typedef slsDetectorDefs::sls_detector_header sls_detector_header;
typedef slsDetectorDefs::sls_receiver_header sls_receiver_header;
struct ImageData {
class ImagePackets {
public:
FrameType frame;
sls_receiver_header header;
int numberOfPorts;
std::bitset<MAX_NUM_PORTS> validPortData;
ImageData() : frame(-1), numberOfPorts(0)
{ header.detHeader.frameNumber = -1; }
virtual ~ImageData() {}
virtual ~ImagePackets() {}
uint64_t detFrame() { return header.detHeader.frameNumber; }
bool assemble(char *buf)
{ return recv->asmImagePackets(this, buf); }
protected:
friend class Receiver;
ImagePackets(Receiver *r) : frame(-1), numberOfPorts(0), recv(r)
{ header.detHeader.frameNumber = -1; }
Receiver *recv;
};
Receiver(Camera *cam, int idx, int rx_port);
......@@ -75,9 +84,10 @@ public:
void setCPUAffinity(const RecvCPUAffinity& recv_affinity);
ImageData *readImagePackets();
bool asmImagePackets(ImageData *image_data, char *buffer);
ImagePackets *readImagePackets();
void fillBadFrame(char *buf);
SlsDetector::Stats& getStats()
{ return m_stats.stats; }
......@@ -85,6 +95,7 @@ public:
private:
friend class Camera;
friend class ImagePackets;
struct Stats {
SlsDetector::Stats stats;
......@@ -99,7 +110,8 @@ private:
struct AssemblerImpl;
AutoPtr<ImageData> readSkippableImagePackets();
bool asmImagePackets(ImagePackets *image_data, char *buffer);
AutoPtr<ImagePackets> readSkippableImagePackets();
Camera *m_cam;
int m_idx;
......@@ -112,6 +124,10 @@ private:
bool m_last_skipped;
};
typedef std::map<int, AutoPtr<Receiver::ImagePackets>> DetImagePackets;
typedef std::map<FrameType, DetImagePackets> FramePacketMap;
typedef FramePacketMap::value_type DetFrameImagePackets;
} // namespace SlsDetector
......
......@@ -98,8 +98,6 @@ class Eiger : public SlsDetector::Model
protected:
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void processBadItemFrame(unsigned long frame, int item,
char *bptr);
virtual void updateImageSize();
......
......@@ -196,8 +196,6 @@ class Jungfrau : public SlsDetector::Model
protected:
virtual int getNbFrameMapItems();
virtual void updateFrameMapItems(FrameMap *map);
virtual void processBadItemFrame(unsigned long frame, int item,
char *bptr);
virtual void updateImageSize();
......
......@@ -67,8 +67,6 @@ public:
virtual int getNbFrameMapItems() = 0;
virtual void updateFrameMapItems(FrameMap *map) = 0;
virtual void processBadItemFrame(unsigned long frame, int item,
char *bptr) = 0;
virtual bool isXferActive() = 0;
......
......@@ -944,6 +944,71 @@ Camera::DetStatus Camera::getDetTrigStatus()
return trig_status;
}
DetFrameImagePackets Camera::readRecvPackets()
{
DEB_MEMBER_FUNCT();
FrameType frame = m_model->m_next_frame++;
DetFrameImagePackets det_frame_packets{frame, {}};
DetImagePackets& det_packets = det_frame_packets.second;
FramePacketMap::iterator it = m_frame_packet_map.find(frame);
if (it != m_frame_packet_map.end()) {
det_packets = std::move(det_packets);
m_frame_packet_map.erase(it);
}
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
while (det_packets.find(i) == det_packets.end()) {
Receiver::ImagePackets *image_packets;
image_packets = m_recv_list[i]->readImagePackets();
if (!image_packets)
break;
typedef DetImagePackets::value_type MapEntry;
FrameType f = image_packets->frame;
if (f == frame) {
det_packets.emplace(MapEntry(i, image_packets));
break;
} else {
DetImagePackets& other = m_frame_packet_map[f];
other.emplace(MapEntry(i, image_packets));
}
}
}
FrameType& m_last_frame = m_model->m_last_frame;
if ((int(m_last_frame) == -1) || (frame > m_last_frame))
m_last_frame = frame;
return det_frame_packets;
}
void Camera::assemblePackets(DetFrameImagePackets&& det_frame_packets)
{
DEB_MEMBER_FUNCT();
const FrameType& frame = det_frame_packets.first;
DetImagePackets& det_packets = det_frame_packets.second;
char *bptr = m_buffer.getAcqFrameBufferPtr(frame);
int nb_recvs = getNbRecvs();
for (int i = 0; i < nb_recvs; ++i) {
bool ok = false;
if (det_packets[i]) {
ok = det_packets[i]->assemble(bptr);
det_packets[i].free();
}
if (!ok)
m_recv_list[i]->fillBadFrame(bptr);
}
FrameMap::Item *mi = m_frame_map.getItem(0);
Model::FinishInfo finfo = mi->frameFinished(frame, true, true);
m_model->processFinishInfo(finfo);
}
bool Camera::checkLostPackets()
{
DEB_MEMBER_FUNCT();
......@@ -980,12 +1045,6 @@ bool Camera::checkLostPackets()
first_bad[i] = item->getNbBadFrames();
}
}
for (int i = 0; i < nb_items; ++i) {
if (ifa[i] != last_frame) {
char *bptr = m_buffer.getAcqFrameBufferPtr(last_frame);
m_model->processBadItemFrame(last_frame, i, bptr);
}
}
if (DEB_CHECK_ANY(DebTypeWarning)) {
IntList last_bad(nb_items);
for (int i = 0; i < nb_items; ++i)
......
......@@ -191,7 +191,7 @@ Eiger::Geometry::Recv::FrameData::FrameData()
{
}
Eiger::Geometry::Recv::FrameData::FrameData(const RecvImageData& /*image*/,
Eiger::Geometry::Recv::FrameData::FrameData(const Receiver::ImagePackets& /*image*/,
char *d)
: dst(d)
{
......@@ -593,52 +593,6 @@ Eiger::Beb::Beb(const std::string& host_name)
{
}
Eiger::Recv::Recv(Eiger *eiger, int idx)
: m_eiger(eiger), m_idx(idx)
{
DEB_CONSTRUCTOR();
DEB_PARAM() << DEB_VAR1(m_idx);
m_geom = m_eiger->m_geom.getRecv(m_idx);
Camera *cam = m_eiger->getCamera();
m_recv = cam->getRecv(m_idx);
}
Eiger::Recv::~Recv()
{
DEB_DESTRUCTOR();
}
void Eiger::Recv::prepareAcq()
{
DEB_MEMBER_FUNCT();
m_data_offset = m_geom->getDstBufferOffset();
DEB_TRACE() << DEB_VAR2(m_idx, m_data_offset);
}
bool Eiger::Recv::processOneFrame(FrameType frame, char *bptr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(m_idx, frame);
AutoPtr<RecvImageData> data = m_recv->readImagePackets();
if (!data)
return false;
if (!m_recv->asmImagePackets(data, bptr))
return false;
FrameType recv_frame = data->frame;
if (recv_frame != frame)
DEB_ERROR() << "Unexpected frame: " << DEB_VAR2(recv_frame,
frame);
return true;
}
void Eiger::Recv::processBadFrame(FrameType frame, char *bptr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(m_idx, frame);
m_geom->fillBadFrame(frame, bptr);
}
Eiger::Thread::Thread(Eiger *eiger, int idx)
: m_eiger(eiger), m_idx(idx)
{
......@@ -689,8 +643,17 @@ void Eiger::Thread::threadFunction()
}
wait();
}
if (s == Running)
m_eiger->processOneFrame(l);
if (s == Running) {
try {
AutoMutexUnlock u(l);
m_eiger->processPackets();
} catch (Exception& e) {
Camera *cam = m_eiger->getCamera();
string name = ("Eiger::Thread::"
"threadFunction");
cam->reportException(e, name);
}
}
}
s = End;
......@@ -718,7 +681,7 @@ Eiger::Eiger(Camera *cam)
int nb_det_modules = getNbDetModules();
DEB_TRACE() << "Using Eiger detector, " << DEB_VAR1(nb_det_modules);
NameList host_name_list = getCamera()->getHostnameList();
NameList host_name_list = cam->getHostnameList();
NameList::const_iterator it, end = host_name_list.end();
for (it = host_name_list.begin(); it != end; ++it) {
Beb *beb = new Beb(*it);
......@@ -727,10 +690,8 @@ Eiger::Eiger(Camera *cam)
m_geom.setNbRecvs(nb_det_modules);
for (int i = 0; i < nb_det_modules; ++i) {
Recv *r = new Recv(this, i);
m_recv_list.push_back(r);
}
for (int i = 0; i < nb_det_modules; ++i)
m_recv_list.push_back(cam->getRecv(i));
setNbProcessingThreads(1);
......@@ -1007,14 +968,6 @@ int Eiger::getNbFrameMapItems()
void Eiger::updateFrameMapItems(FrameMap *map)
{
DEB_MEMBER_FUNCT();
m_frame_map_item = map->getItem(0);
}
void Eiger::processBadItemFrame(FrameType frame, int item, char *bptr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(frame, item);
m_recv_list[item]->processBadFrame(frame, bptr);
}
void Eiger::updateImageSize()
......@@ -1253,7 +1206,7 @@ void Eiger::setNbProcessingThreads(int nb_proc_threads)
int curr_nb_proc_threads = m_thread_list.size();
DEB_PARAM() << DEB_VAR2(curr_nb_proc_threads, nb_proc_threads);
if (nb_proc_threads < 1)
if (nb_proc_threads != 1)
THROW_HW_ERROR(InvalidValue) << DEB_VAR1(nb_proc_threads);
if (nb_proc_threads == curr_nb_proc_threads)
return;
......@@ -1310,8 +1263,6 @@ void Eiger::prepareAcq()
m_geom.prepareAcq();
m_in_process.clear();
Camera *cam = getCamera();
cam->getNbFrames(m_nb_frames);
m_next_frame = 0;
......@@ -1346,71 +1297,6 @@ void Eiger::stopAcq()
(*it)->stopAcq();
}
void Eiger::processOneFrame(AutoMutex& l)
{
DEB_MEMBER_FUNCT();
FrameType frame = m_next_frame++;
class Sync
{
public:
Sync(FrameType f, Eiger& e)
: frame(f), eiger(e), in_proc(e.m_in_process)
{
in_proc.insert(frame);
}
~Sync()
{
in_proc.erase(frame);
eiger.broadcast();
}
void startFinish()
{
while (!in_proc.empty()
&& (*in_proc.begin() < int(frame)))
eiger.wait();
}
private:
FrameType frame;
Eiger& eiger;
SortedIntList& in_proc;
} sync(frame, *this);
{
AutoMutexUnlock u(l);
std::bitset<64> ok;
int nb_recvs = getNbRecvs();
if (nb_recvs > 64)
THROW_HW_ERROR(Error) << "Too many receivers";
char *bptr = getAcqFrameBufferPtr(frame);
for (int i = 0; i < nb_recvs; ++i)
ok[i] = m_recv_list[i]->processOneFrame(frame, bptr);
if (ok.none())
return;
for (int i = 0; i < nb_recvs; ++i)
if (!ok[i])
m_recv_list[i]->processBadFrame(frame, bptr);
}
if ((int(m_last_frame) == -1) || (frame > m_last_frame))
m_last_frame = frame;
sync.startFinish();
{
AutoMutexUnlock u(l);
FrameMap::Item& mi = *m_frame_map_item;
FinishInfo finfo = mi.frameFinished(frame, true, true);
processFinishInfo(finfo);
}
}
Eiger::CorrBase *Eiger::createBadRecvFrameCorr()
{
DEB_MEMBER_FUNCT();
......
......@@ -226,55 +226,6 @@ std::ostream& lima::SlsDetector::operator <<(std::ostream& os,
}
/*
* Jungfrau::Recv class
*/
Jungfrau::Recv::Recv(Jungfrau *jungfrau, int idx)
: m_jungfrau(jungfrau), m_idx(idx)
{
DEB_CONSTRUCTOR();
DEB_PARAM() << DEB_VAR1(m_idx);
Camera *cam = m_jungfrau->getCamera();
m_recv = cam->getRecv(m_idx);
}
void Jungfrau::Recv::prepareAcq()
{
DEB_MEMBER_FUNCT();
m_raw = m_jungfrau->getRawMode();
m_frame_dim = m_jungfrau->getModuleFrameDim(m_idx, m_raw);
m_data_offset = m_jungfrau->getModuleDataOffset(m_idx, m_raw);
DEB_TRACE() << DEB_VAR3(m_idx, m_frame_dim, m_data_offset);
}
bool Jungfrau::Recv::processOneFrame(FrameType frame, char *bptr)
{
DEB_MEMBER_FUNCT();
DEB_PARAM() << DEB_VAR2(m_idx, frame);
AutoPtr<RecvImageData> data = m_recv->readImagePackets();
if (!data)
return false;
if (!m_recv->asmImagePackets(data, bptr))
return false;
FrameType recv_frame = data->frame;
if (recv_frame != frame)
DEB_ERROR() << "Unexpected frame: " << DEB_VAR2(recv_frame,