/*
Editor's note:
COTD Entry: Flexible Synchronization Objects by Justin Wilder [justin@bigradio.com]
*/
#include <windows.h>
#include <process.h>
#include <sys/timeb.h>
#include <cassert>
#include <memory>
#include <stdexcept>
#include <vector>
#ifdef _MT
namespace utility {
// Defines an interface for synchronization devices that can be waited on.
class waitable
{
public:
virtual ~waitable() throw()
{
for (handle_vector::size_type n = 0; n < hvect.size(); ++n)
::CloseHandle(hvect[n]);
}
// Waits for this object to be signalled. Returns a positive integer context value, or less than
// zero for an error or timeout. The timeout value defaults to infinite (no timeout).
int wait(unsigned int timeout = INFINITE) const throw()
{
DWORD ret = ::WaitForMultipleObjects(hvect.size(), &hvect[0], wait_all, timeout);
if ((ret >= WAIT_OBJECT_0) && (ret < (WAIT_OBJECT_0 + hvect.size())))
return ret - WAIT_OBJECT_0;
else if ((ret >= WAIT_ABANDONED_0) && (ret < (WAIT_ABANDONED_0 + hvect.size())))
return ret - WAIT_ABANDONED_0;
else
return -1;
}
protected:
typedef std::vector<HANDLE> handle_vector;
handle_vector hvect;
bool wait_all;
waitable(handle_vector::size_type res = 0, bool wait_all = false) throw() : wait_all(wait_all) { hvect.reserve(res); }
waitable(const waitable& c) throw() : hvect(c.dup_handles()), wait_all(c.wait_all) {}
handle_vector dup_handles() const throw()
{
handle_vector ret(hvect.size());
for (handle_vector::size_type n = 0; n < hvect.size(); ++n) ret[n] = dup_handle(hvect[n]);
return ret;
}
HANDLE dup_handle(HANDLE h) const throw()
{
if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(), &h, 0, FALSE, DUPLICATE_SAME_ACCESS))
return NULL;
else return h;
}
void insert(const handle_vector& hvect) throw()
{
for (handle_vector::size_type n = 0; n < hvect.size(); ++n)
this->hvect.push_back(hvect[n]);
}
void insert(const waitable& c) throw()
{
insert(c.dup_handles());
}
void insert(HANDLE h) throw()
{
hvect.push_back(h);
}
HANDLE get_handle(handle_vector::size_type index = 0) const throw()
{
assert(index < hvect.size());
return hvect[index];
}
handle_vector::size_type size() const throw() { return hvect.size(); }
friend class waitable_group;
friend class timer;
};
// Groups multiple waitable objects into a single waitable object, that will be
// signalled when all component objects are signalled (wait_all == true), or when
// any one component object is signalled (wait_all == false, the default).
class waitable_group : public waitable
{
public:
waitable_group(const waitable_group& c) throw() : waitable(c) {}
explicit waitable_group(const waitable& w1, bool wait_all = false) throw()
: waitable(w1.size(), wait_all) { insert(w1); }
waitable_group(const waitable& w1, const waitable& w2, bool wait_all = false) throw()
: waitable(w1.size() + w2.size(), wait_all) { insert(w1); insert(w2); }
waitable_group(const waitable& w1, const waitable& w2, const waitable& w3, bool wait_all = false) throw()
: waitable(w1.size() + w2.size() + w3.size(), wait_all) { insert(w1); insert(w2); insert(w3); }
waitable_group(const waitable& w1, const waitable& w2, const waitable& w3, const waitable& w4, bool wait_all = false) throw()
: waitable(w1.size() + w2.size() + w3.size() + w4.size(), wait_all) { insert(w1); insert(w2); insert(w3); insert(w4); }
waitable_group(const waitable* wa, unsigned int len, bool wait_all = false) throw()
: waitable(0, wait_all) { assert((len > 0) && (len < 1000)); for (unsigned int n = 0; n < len; ++n) insert(wa[n]); }
};
// TODO: make this derive from waitable.
// A lock sentry. The object it's constructed with is lock()'ed when the sentry is created, and
// unlock()'ed when the sentry is destroyed.
//
// LockType must provide lock() and unlock() callable on a const object.
// A sentry cannot be copied: a copy would call unlock() a second time for a single lock().
template <typename LockType>
class lock_sentry
{
public:
    // Locks 'l'. The sentry keeps a reference to 'l', so 'l' must outlive the sentry.
    lock_sentry(const LockType& l) : lock_(l) { lock_.lock(); }

    // Unlocks the object that was locked by the constructor.
    ~lock_sentry() { lock_.unlock(); }

private:
    // Declared but not implemented: copying is disabled.
    lock_sentry(const lock_sentry&);
    lock_sentry& operator=(const lock_sentry&);

    const LockType& lock_;
};
// Implements mutual-exclusion object.
class mutex : public waitable
{
public:
typedef lock_sentry<mutex> sentry;
mutex() throw() { insert(::CreateMutex(NULL, FALSE, NULL)); assert(get_handle() != NULL); }
// Blocks until the mutex can be locked.
void lock() const throw() { wait(); }
// Unlocks the mutex. Every call to lock must be matched with a subsequent unlock.
void unlock() const throw() { ::ReleaseMutex(get_handle()); }
};
// A waitable event.
class event : public waitable
{
public:
event() throw() { insert(CreateEvent(NULL, FALSE, FALSE, NULL)); assert(get_handle() != NULL); }
// Creates a copy of this event. Copied events can be used interchangeably (i.e. a function
// call on one is the same as a function call on another). The underlying system object is only
// destroyed once all copies of an event have been destroyed.
event(const event& c) : waitable(c) {}
// Releases exactly one thread waiting on this event. If no threads are waiting at the time of the signal, the
// event will remain signalled until a thread waits on it (and is subsequently released).
void signal() const throw() { SetEvent(get_handle()); }
// Releases zero or one waiting thread. If no threads are waiting at the time of the pulse,
// no threads are released.
void pulse() const throw() { PulseEvent(get_handle()); }
// Resets the event. If the event is currently signalled, this function sets it to unsignalled.
void reset() const throw() { ResetEvent(get_handle()); }
// INCOMPLETE: it's hard to implement these two with win32 events. May need to create another
// manual-reset event for this one to work. Not implemented because it's currently unused.
// Releases every thread currently waiting on this event.
void broadcast() const throw() { assert2(false, "waitable::broadcast() not currently supported"); }
// Releases every thread currently waiting on this event, as well as every thread that will wait on it
// in the future, until reset is called.
void set() const throw() { assert2(false, "waitable::set() not currently supported"); }
friend inline unsigned int wait(const event* ea, unsigned int len) throw();
};
// Your typical semaphore. It can be waited on in conjunction with any other waitable object.
class semaphore : public waitable
{
public:
// Initializes the semaphore with the given number of locks, and the given number initially locked.
// 'size' must be greater than zero and 'locked' cannot be greater than 'size'.
semaphore(unsigned int size, unsigned int locked = 0) throw()
{
assert((size > 0) && (locked <= size));
insert(::CreateSemaphore(NULL, size - locked, size, NULL));
assert(get_handle() != NULL);
}
// Creates a copy of this semaphore. Copied semaphores can be used interchangeably (i.e. a function
// call on one is the same as a function call on another). The underlying system object is only
// destroyed once all copies of a semaphore have been destroyed.
semaphore(const semaphore& c) : waitable(c) {}
// Takes a lock on the semaphore. Blocks if none are currently available. This function has
// the same effect as waitable::wait(), and can be used interchangeably with it.
void lock() const throw() { wait(); }
// Releases a lock or a waiting thread (a thread blocking on waitable::wait()) on the semaphore.
// Every lock (or wait) must be followed by an accompanying call to unlock.
void unlock() const throw() { ::ReleaseSemaphore(get_handle(), 1, NULL); }
};
// A timer that calls a callback function (or function object) at a specified time.
template <typename _CALLBACK>
class callback_timer
{
public:
typedef _CALLBACK callback_type;
typedef unsigned int time_type;
explicit callback_timer(const callback_type& cb = callback_type()) throw() : cb(cb), started(false)
{
}
// Constructs the timer initially started.
explicit callback_timer(time_type period, bool periodic = false, callback_type& cb = callback_type()) throw(std::runtime_error)
: cb(cb), started(false)
{
if (!start(period, periodic)) throw std::runtime_error("Failed to start timer.");
}
callback_timer(const timer& c) throw() : cb(c.cb), started(false) {}
~callback_timer()
{
stop();
}
// Starts the timer to be signalled in period milliseconds. If periodic is false (default)
// the timer will only be signalled once, if periodic is true, it will be signalled every
// period milliseconds.
bool start(time_type period, bool periodic = false) throw()
{
if (started == true) return false;
thread_args* args = new thread_args(period, absolute_time(), periodic, cb, halt_event, going_mutex);
if (_beginthread(thread_entry, 0, args) == -1)
{
delete args;
return false;
}
started = true;
return true;
}
// Stops the timer.
void stop() throw()
{
halt_event.signal();
going_mutex.lock(); // Wait for timer thread to cease.
started = false;
going_mutex.unlock();
}
protected:
callback_type cb;
event halt_event;
mutex going_mutex;
bool started;
struct absolute_time
{
absolute_time() { _ftime(&t); }
absolute_time(const absolute_time& c) { t.time = c.t.time; t.millitm = c.t.millitm; }
absolute_time& operator=(const absolute_time& c) { t.time = c.t.time; t.millitm = c.t.millitm; return *this; }
absolute_time& operator+=(const absolute_time& c)
{
t.millitm += c.t.millitm;
if (t.millitm >= 1000) { t.time += 1; t.millitm -= 1000; }
t.time += c.t.time;
return *this;
}
absolute_time& operator-=(const absolute_time& c)
{
t.millitm -= c.t.millitm;
if ((signed)t.millitm < 0) { t.time -= 1; t.millitm += 1000; }
t.time -= c.t.time;
return *this;
}
absolute_time operator+(const absolute_time& c) const
{
absolute_time temp(*this);
return temp += c;
}
absolute_time operator-(const absolute_time& c) const
{
absolute_time temp(*this);
return temp -= c;
}
operator time_type () const
{
return t.time * 1000 + t.millitm;
}
protected:
_timeb t;
};
struct thread_args
{
thread_args() {}
thread_args(time_type period, const absolute_time& start, bool periodic,
const callback_type& cb, const event& halt_event, const mutex& going_mutex)
: period(period), start(start), periodic(periodic), cb(cb), halt_event(halt_event),
going_mutex(going_mutex) {}
thread_args(const thread_args& c) : period(c.period), start(c.start), periodic(c.periodic),
cb(c.cb), halt_event(c.halt_event), going_mutex(c.going_mutex) {}
time_type period;
absolute_time start;
bool periodic;
callback_type cb;
event halt_event;
mutex going_mutex;
};
static void thread_entry(void* context)
{
assert(context != NULL);
std::auto_ptr<thread_args> args((thread_args*)context);
args->going_mutex.lock();
while (true)
{
time_type time_left = (absolute_time() - args->start) % args->period;
if (args->halt_event.wait(time_left) < 0)
{
args->cb();
if (args->periodic) continue;
}
break;
}
args->going_mutex.unlock();
}
};
// A waitable timer. Win32 is supposed to have a waitable timer, but it only works for NT.
// So I had to implement one myself.
class timer : public waitable
{
public:
typedef unsigned int time_type;
// Constructs the timer initially stopped.
timer() throw() : t(callback(signal_event)) {}
// Constructs the timer initially started.
explicit timer(time_type period, bool periodic = false) throw(std::runtime_error)
: t(period, periodic, callback(signal_event)) {}
timer(const timer& c) : waitable(c), t(c.t), signal_event(c.signal_event) {}
// Starts the timer to be signalled in period milliseconds. If periodic is false (default)
// the timer will only be signalled once, if periodic is true, it will be signalled every
// period milliseconds.
bool start(time_type period, bool periodic = false) throw()
{
return t.start(period, periodic);
}
// Stops the timer. This function will not change the signal state of the timer (if it was signalled
// prior to the call, it remains so).
void stop() throw()
{
t.stop();
}
protected:
class callback
{
public:
explicit callback(const event& e) : e(e) {}
callback(const callback& c) : e(c.e) {}
void operator()() { e.signal(); }
protected:
event e;
};
typedef callback_timer<callback> timer_type;
timer_type t;
event signal_event;
};
}; //namespace utility {
#endif //_MT |