ThreadUtils.cpp 9.01 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//###########################################################################
// This file is part of LImA, a Library for Image Acquisition
//
// Copyright (C) : 2009-2011
// European Synchrotron Radiation Facility
// BP 220, Grenoble 38043
// FRANCE
//
// This is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//###########################################################################
22
23
#include "lima/ThreadUtils.h"
#include "lima/Exceptions.h"
ahoms's avatar
ahoms committed
24
#include <errno.h>
25
#include <iomanip>
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
26
#ifdef __unix
27
#include <sys/time.h>
Sebastien Petitdemange's avatar
Sebastien Petitdemange committed
28
29
30
#else
#include <time_compat.h>
#endif
31
#include <unistd.h>
32
33

#if !defined(_WIN32)
34
#include <sys/syscall.h>
35
#endif
ahoms's avatar
ahoms committed
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

using namespace lima;

/** @brief Build a mutex attribute object of the requested type.
 *
 *  The pthread attribute is initialized first; if applying the type
 *  fails, the attribute is destroyed again before the exception is
 *  propagated to the caller.
 */
MutexAttr::MutexAttr(Type type)
{
	const int init_err = pthread_mutexattr_init(&m_mutex_attr);
	if (init_err != 0)
		throw LIMA_COM_EXC(Error, "Error initializing mutex attr");

	try {
		setType(type);
	} catch (...) {
		// The destructor does not run when the constructor throws
		destroy();
		throw;
	}
}

/** @brief Copy-construct a mutex attribute object.
 *
 *  A fresh pthread attribute is initialized and the mutex type of the
 *  source is applied to it, instead of bit-copying the opaque
 *  pthread_mutexattr_t. Every instance calls pthread_mutexattr_destroy()
 *  in its destructor, so each one must own an attribute that it
 *  initialized itself; sharing a raw copy leads to a double destroy on
 *  implementations where the attribute is a handle to allocated data.
 *
 *  NOTE(review): only the mutex type is carried over, which is the only
 *  property accessed through this class in this file - confirm no other
 *  attribute is set elsewhere.
 */
MutexAttr::MutexAttr(const MutexAttr& mutex_attr)
{
	if (pthread_mutexattr_init(&m_mutex_attr) != 0)
		throw LIMA_COM_EXC(Error, "Error initializing mutex attr");

	try {
		setType(mutex_attr.getType());
	} catch (...) {
		// The destructor does not run when the constructor throws
		destroy();
		throw;
	}
}

/** @brief Release the underlying pthread mutex attribute. */
MutexAttr::~MutexAttr()
{
	this->destroy();
}

/** @brief Set the mutex type stored in this attribute object.
 *  @param type one of Normal, Recursive or ErrorCheck
 *
 *  Throws an InvalidValue exception for any other value, and an Error
 *  exception if pthread_mutexattr_settype() fails.
 */
void MutexAttr::setType(Type type)
{
	int pthread_kind;
	if (type == Normal)
		pthread_kind = PTHREAD_MUTEX_NORMAL;
	else if (type == Recursive)
		pthread_kind = PTHREAD_MUTEX_RECURSIVE;
	else if (type == ErrorCheck)
		pthread_kind = PTHREAD_MUTEX_ERRORCHECK;
	else
		throw LIMA_COM_EXC(InvalidValue, "Invalid MutexAttr type");

	const int err = pthread_mutexattr_settype(&m_mutex_attr, pthread_kind);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error setting mutex attr");
}

/** @brief Read back the mutex type stored in this attribute object.
 *  @return Normal, Recursive or ErrorCheck
 *
 *  Throws an Error exception if the pthread call fails or if it reports
 *  a kind that has no matching Type value.
 */
MutexAttr::Type MutexAttr::getType() const
{
	int pthread_kind;
	const int err = pthread_mutexattr_gettype(&m_mutex_attr, &pthread_kind);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error getting mutex attr");

	if (pthread_kind == PTHREAD_MUTEX_NORMAL)
		return Normal;
	if (pthread_kind == PTHREAD_MUTEX_RECURSIVE)
		return Recursive;
	if (pthread_kind == PTHREAD_MUTEX_ERRORCHECK)
		return ErrorCheck;

	throw LIMA_COM_EXC(Error, "Invalid mutex attr kind");
}

/** @brief Assign a new mutex type; equivalent to calling setType().
 *  @return a reference to this object
 */
MutexAttr& MutexAttr::operator =(Type type)
{
	this->setType(type);
	return *this;
}

/** @brief Copy the mutex type from another attribute object.
 *  @return a reference to this object
 *
 *  The type is transferred through getType()/setType() rather than by
 *  overwriting the opaque pthread_mutexattr_t: this object already owns
 *  an initialized attribute, and replacing it with a raw copy would lose
 *  that attribute and make both objects destroy the same one.
 *
 *  NOTE(review): only the mutex type is carried over - confirm no other
 *  attribute is set elsewhere.
 */
MutexAttr& MutexAttr::operator =(const MutexAttr& mutex_attr)
{
	setType(mutex_attr.getType());
	return *this;
}

/** @brief Destroy the pthread mutex attribute.
 *
 *  The return code of pthread_mutexattr_destroy() is not checked.
 */
void MutexAttr::destroy()
{
	pthread_mutexattr_t *attr = &m_mutex_attr;
	pthread_mutexattr_destroy(attr);
}


/** @brief Create a mutex configured by the given attributes.
 *
 *  A copy of the attributes is kept in the object (see getAttr()) and
 *  is the one handed to pthread_mutex_init(). Throws an Error exception
 *  if the initialization fails.
 */
Mutex::Mutex(MutexAttr mutex_attr)
	: m_mutex_attr(mutex_attr)
{
	const int err = pthread_mutex_init(&m_mutex,
					   &m_mutex_attr.m_mutex_attr);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error initializing mutex");
}

/** @brief Destroy the pthread mutex; the return code is not checked. */
Mutex::~Mutex()
{
	pthread_mutex_t *mutex = &m_mutex;
	pthread_mutex_destroy(mutex);
}

/** @brief Lock the mutex; throws an Error exception on failure. */
void Mutex::lock()
{
	const int err = pthread_mutex_lock(&m_mutex);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error locking mutex");
}

/** @brief Unlock the mutex; throws an Error exception on failure. */
void Mutex::unlock()
{
	const int err = pthread_mutex_unlock(&m_mutex);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error unlocking mutex");
}

bool Mutex::tryLock()
{
	
	switch (pthread_mutex_trylock(&m_mutex)) {
	case EBUSY:
		return false;
	case 0:
		return true;
	default:
		throw LIMA_COM_EXC(Error, "Error trying to lock mutex");
	}
}

/** @brief Return a copy of the attributes this mutex was created with. */
MutexAttr Mutex::getAttr()
{
	return this->m_mutex_attr;
}


163
/** @brief Create a condition variable together with its Normal mutex.
 *
 *  The condition uses the default pthread attributes. Throws an Error
 *  exception if pthread_cond_init() fails.
 */
Cond::Cond()
	: m_mutex(MutexAttr::Normal)
{
	const int err = pthread_cond_init(&m_cond, NULL);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error initializing condition");
}

/** @brief Destroy the pthread condition; the return code is not checked. */
Cond::~Cond()
{
	pthread_cond_t *cond = &m_cond;
	pthread_cond_destroy(cond);
}

174
175
176
177
178
179
180
181
182
183
184
185
186
/** @brief wait on cond variable
 *  @return 
 *  - true if ok 
 *  - false if timeout
 */
bool Cond::wait(double timeout)
{
  int retcode = 0;
  if(timeout >= 0.)
    {
      struct timeval now;
      struct timespec waitTimeout;
      gettimeofday(&now,NULL);
187
      waitTimeout.tv_sec = now.tv_sec + time_t(timeout);
188
189
      waitTimeout.tv_nsec = (now.tv_usec * 1000) + 
	long((timeout - long(timeout)) * 1e9);
190
191
      if(waitTimeout.tv_nsec >= 1000000000L) // Carry
	++waitTimeout.tv_sec,waitTimeout.tv_nsec -= 1000000000L;
192
193
194
195
196
197
198
199
      retcode = pthread_cond_timedwait(&m_cond,&m_mutex.m_mutex,&waitTimeout);
    }
  else
      retcode = pthread_cond_wait(&m_cond, &m_mutex.m_mutex);
	
  if(retcode && retcode != ETIMEDOUT)
    throw LIMA_COM_EXC(Error, "Error waiting for condition");
  return !retcode;
ahoms's avatar
ahoms committed
200
201
202
203
204
205
206
207
}

/** @brief Signal the condition through pthread_cond_signal().
 *
 *  Throws an Error exception if the call fails.
 */
void Cond::signal()
{
	const int err = pthread_cond_signal(&m_cond);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error signaling condition");
}

208
209
210
211
212
213
214
215
216
217
void Cond::acquire()
{
	m_mutex.lock();
}

void Cond::release()
{
	m_mutex.unlock();
}

218
219
220
221
222
/** @brief Signal the condition through pthread_cond_broadcast().
 *
 *  Throws an Error exception if the call fails.
 */
void Cond::broadcast()
{
	const int err = pthread_cond_broadcast(&m_cond);
	if (err != 0)
		throw LIMA_COM_EXC(Error, "Error broadcast condition");
}
ahoms's avatar
ahoms committed
223

224
/** @brief Return the system identifier of the calling thread.
 *
 *  Uses the gettid system call, or GetCurrentThreadId() when _WIN32 is
 *  defined.
 */
pid_t lima::GetThreadID()
{
#if !defined(_WIN32)
	return syscall(SYS_gettid);
#else
	return GetCurrentThreadId();
#endif
}

234
235
236
237
238
239
240
241
242
243
/** @brief Bind the clean-up helper to the thread it reports to. */
Thread::ExceptionCleanUp::ExceptionCleanUp(Thread& thread)
	: m_thread(thread)
{
	// Nothing else to do: the work happens in the destructor
}

/** @brief Flag the bound thread as having handled its exception.
 *
 *  The flag is read by Thread::staticThreadFunction() to decide whether
 *  the "without clean-up" warning must be printed.
 */
Thread::ExceptionCleanUp::~ExceptionCleanUp()
{
	Thread& owner = m_thread;
	owner.m_exception_handled = true;
}

244
/** @brief Create a thread object in the not-started state.
 *
 *  The pthread attributes later used by start() are initialized here;
 *  the return code of pthread_attr_init() is not checked.
 */
Thread::Thread()
	: m_thread(NULL),
	  m_started(false),
	  m_finished(false),
	  m_exception_handled(false),
	  m_tid(0)
{
	pthread_attr_init(&m_thread_attr);
}

/** @brief Join the thread if it is still marked as started, then
 *         release the pthread attributes.
 */
Thread::~Thread()
{
	// join() only throws when m_started is false, so it is safe here
	if (m_started) {
		join();
	}

	pthread_attr_destroy(&m_thread_attr);
}

void Thread::start()
{
	if (m_started)
		throw LIMA_COM_EXC(Error, "Thread already started");

261
	m_finished = false;
Florent Langlois's avatar
Florent Langlois committed
262
	if (pthread_create(&m_thread, &m_thread_attr, staticThreadFunction, this) != 0)
ahoms's avatar
ahoms committed
263
264
265
266
267
		throw LIMA_HW_EXC(Error, "Error creating thread");

	m_started = true;
}

Florent Langlois's avatar
Florent Langlois committed
268
void Thread::join()
ahoms's avatar
ahoms committed
269
270
{
	if (!m_started)
271
		throw LIMA_COM_EXC(Error, "Thread not started or joined");
ahoms's avatar
ahoms committed
272

273
274
	pthread_join(m_thread, NULL);
	m_started = false;
ahoms's avatar
ahoms committed
275
276
277
278
279
280
281
282
283
284
285
286
}

bool Thread::hasStarted()
{
	return m_started;
}

bool Thread::hasFinished()
{
	return m_finished;
}

287
288
289
290
291
pid_t Thread::getThreadID()
{
	return m_tid;
}

ahoms's avatar
ahoms committed
292
293
void *Thread::staticThreadFunction(void *data)
{
294
295
	using namespace std;

ahoms's avatar
ahoms committed
296
	Thread *thread = (Thread *) data;
297
	thread->m_tid = GetThreadID();
ahoms's avatar
ahoms committed
298
299
300
301

	try {
		thread->threadFunction();
	} catch (...) {
302
303
		if (!thread->m_exception_handled) {
			ostream& os = cerr;
304
305
			long long thread_id = (long long) pthread_self();
			std::streamsize w = os.width();
306
307
308
309
310
			os << "***** Thread " 
			   << setw(8) << hex << thread_id << setw(w) << dec
			   << " function exited due to an exception "
			   << "without clean-up! *****" << endl;
		}
ahoms's avatar
ahoms committed
311
312
313
314
315
316
317
	}
	
	thread->m_finished = true;
	return NULL;
}

/** @brief Remember the CmdThread whose command loop this thread runs. */
CmdThread::AuxThread::AuxThread(CmdThread& master)
	: m_master(&master)
{
	// The loop itself is started from threadFunction()
}

/** @brief Nothing to release here. */
CmdThread::AuxThread::~AuxThread()
{
	// Intentionally empty
}

void CmdThread::AuxThread::threadFunction()
{
328
	m_master->cmdLoop();
ahoms's avatar
ahoms committed
329
330
331
332
333
}

/** @brief Create the command thread object in the InInit status.
 *
 *  The auxiliary thread is bound to this object but is only launched by
 *  start().
 */
CmdThread::CmdThread()
	: m_thread(*this)
{
	const int initial_status = InInit;
	m_status = initial_status;
	m_status_history.set(initial_status);
}

/** @brief Check that a started thread reached the Finished status.
 *
 *  If the thread was started and is not Finished, an error is printed
 *  on cerr and the member function CmdThread::abort() is called (this
 *  is not std::abort: the process is not terminated by this call).
 */
CmdThread::~CmdThread()
{
	if (m_thread.hasStarted() && (getStatus() != Finished)) {
		std::cerr << "***** Error: CmdThread did not call abort "
			  << "in the derived class destructor! *****";
		this->abort();
	}
}

352
/** @brief Build an AutoMutex on the condition's mutex in Locked mode. */
AutoMutex CmdThread::lock() const
{
	const AutoMutex::Flags mode = AutoMutex::Locked;
	return AutoMutex(m_cond.mutex(), mode);
}

357
/** @brief Build an AutoMutex on the condition's mutex in TryLocked mode. */
AutoMutex CmdThread::tryLock() const
{
	const AutoMutex::Flags mode = AutoMutex::TryLocked;
	return AutoMutex(m_cond.mutex(), mode);
}

362
int CmdThread::getStatus() const
ahoms's avatar
ahoms committed
363
{
364
	AutoMutex l = lock();
365
	return m_status;
ahoms's avatar
ahoms committed
366
367
}

368
369
int CmdThread::getNextCmd() const
{
370
371
	AutoMutex l = lock();
	return m_cmd.empty() ? None : m_cmd.front();
372
373
}

ahoms's avatar
ahoms committed
374
375
376
void CmdThread::setStatus(int status)
{
	AutoMutex l = lock();
377
378
	m_status = status;
	m_status_history.set(status);
ahoms's avatar
ahoms committed
379
380
381
382
383
384
	m_cond.signal();
}

void CmdThread::waitStatus(int status)
{
	AutoMutex l = lock();
385
	while (!m_status_history.test(status))
386
		m_cond.wait();
ahoms's avatar
ahoms committed
387
388
389
390
}

int CmdThread::waitNotStatus(int status)
{
391
392
	std::bitset<16> mask(0xffff);
	mask.reset(status);
393

394
395
	AutoMutex l = lock();
	while ((m_status_history & mask).none())
396
397
		m_cond.wait();
	return m_status;
ahoms's avatar
ahoms committed
398
399
400
401
402
}

void CmdThread::sendCmd(int cmd)
{
	AutoMutex l = lock();
403
	doSendCmd(cmd);
ahoms's avatar
ahoms committed
404
}
405

406
407
408
409
/** @brief send a command only if the return of if_test is true.
 *  
 *  function if_test get as argument the command and status
 */
410
void CmdThread::sendCmdIf(int cmd, bool (*if_test)(int,int))
411
{
412
	AutoMutex l = lock();
413
414
415

	if (if_test && if_test(cmd, m_status))
		doSendCmd(cmd);
416
417
}

418
void CmdThread::doSendCmd(int cmd)
419
{
420
	if (m_status == Finished)
421
422
		throw LIMA_HW_EXC(Error, "Thread has Finished");

423
424
425
	// Assume that we will have a call to waitStatus somewhere after the new command
	m_status_history.reset();

426
427
	m_cmd.push(cmd);
	m_cond.signal();
428
}
429

ahoms's avatar
ahoms committed
430
431
432
433
434
void CmdThread::start()
{
	if (m_thread.hasStarted())
		throw LIMA_COM_EXC(InvalidValue, "Thread already started");

435
	m_cmd.push(Init);
ahoms's avatar
ahoms committed
436
437
438
439
440
	m_thread.start();
}

void CmdThread::abort()
{
441
442
443
444
445
	if (getStatus() == Finished)
		return;

	sendCmd(Abort);
	waitStatus(Finished);
ahoms's avatar
ahoms committed
446
447
448
449
450
451
}

int CmdThread::waitNextCmd()
{
	AutoMutex l = lock();

452
453
	while (m_cmd.empty())
		m_cond.wait();
ahoms's avatar
ahoms committed
454

455
456
	int cmd = m_cmd.front();
	m_cmd.pop();
ahoms's avatar
ahoms committed
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
	return cmd;
}

/** @brief Command loop run by the auxiliary thread.
 *
 *  Commands are taken one at a time until the status becomes Finished:
 *  Init calls init(), Abort sets the Finished status, and any other
 *  command is passed to execCmd(). Receiving None raises an
 *  InvalidValue exception.
 */
void CmdThread::cmdLoop()
{
	while (getStatus() != Finished) {
		const int cmd = waitNextCmd();

		if (cmd == None)
			throw LIMA_COM_EXC(InvalidValue, "Invalid None cmd");

		if (cmd == Init)
			init();
		else if (cmd == Abort)
			setStatus(Finished);
		else
			execCmd(cmd);
	}
}