/* * Copyright (c) 2005, Eric Crahen * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is furnished * to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * */ #ifndef __ZTBARRIER_H__ #define __ZTBARRIER_H__ #include "zthread/Condition.h" #include "zthread/Guard.h" #include "zthread/Waitable.h" #include "zthread/Runnable.h" namespace ZThread { /** * @class Barrier * @author Eric Crahen * @date <2003-07-16T09:54:01-0400> * @version 2.2.1 * * A Barrier is a Waitable object that serves as synchronization points for * a set of threads. A Barrier is constructed for a fixed number (N) of threads. * Threads attempting to wait() on a Barrier ( 1 - N) will block until the Nth * thread arrives. The Nth thread will awaken all the the others. * * An optional Runnable command may be associated with the Barrier. This will be run() * when the Nth thread arrives and Barrier is not broken. * * Error Checking * * A Barrier uses an all-or-nothing. All threads involved must successfully * meet at Barrier. If any one of those threads leaves before all the threads * have (as the result of an error or exception) then all threads present at * the Barrier will throw BrokenBarrier_Exception. * * A broken Barrier will cause all threads attempting to wait() on it to * throw a BrokenBarrier_Exception. * * A Barrier will remain 'broken', until it is manually reset(). */ template class Barrier : public Waitable, private NonCopyable { //! Broken flag bool _broken; //! Task flag bool _haveTask; //! Thread count unsigned int _count; //! Wait generation unsigned int _generation; //! Serialize access LockType _lock; //! Signaled when all thread arrive Condition _arrived; //! Command to run when all the thread arrive Task _task; public: //! Create a Barrier Barrier() : _broken(false), _haveTask(false), _count(Count), _generation(0), _arrived(_lock), _task(0) { } /** * Create a Barrier that executes the given task when all threads arrive * without error * * @param task Task to associate with this Barrier */ Barrier(const Task& task) : _broken(false), _haveTask(true), _count(Count), _generation(0), _arrived(_lock), _task(task) { } //! Destroy this Barrier virtual ~Barrier() {} /** * Enter barrier and wait for the other threads to arrive. This can block for an indefinite * amount of time. * * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this * Barrier as a result of an error. * @exception Interrupted_Exception thrown when the calling thread is interrupted. * A thread may be interrupted at any time, prematurely ending a wait * for one thread and breaking the barrier for all threads * * @see Waitable::wait() * * @post If no exception was thrown, all threads have successfully arrived * @post If an exception was thrown, the barrier is broken */ virtual void wait() { Guard g(_lock); if(_broken) throw BrokenBarrier_Exception(); // Break the barrier if an arriving thread is interrupted if(Thread::interrupted()) { // Release the other waiter, propagate the exception _arrived.broadcast(); _broken = true; throw Interrupted_Exception(); } if(--_count == 0) { // Wake the other threads if this was the last // arriving thread _arrived.broadcast(); // Try to run the associated task, if it throws then // break the barrier and propagate the exception try { if(_task) _task->run(); _generation++; } catch(Synchronization_Exception&) { _broken = true; throw; } catch(...) { assert(0); } } else { int myGeneration = _generation; try { // Wait for the other threads to arrive _arrived.wait(); } catch(Interrupted_Exception&) { // Its possible for a thread to be interrupted before the // last thread arrives. If the interrupted thread hasn't // resumed, then just propagate the interruption if(myGeneration != _generation) Thread().interrupt(); else _broken = true; } catch(Synchronization_Exception&) { // Break the barrier and propagate the exception _broken = true; throw; } // If the thread woke because it was notified by the thread // that broke the barrier, throw. if(_broken) throw BrokenBarrier_Exception(); } } /** * Enter barrier and wait for the other threads to arrive. This can block up to the * amount of time specified with the timeout parameter. The barrier will not break * if a thread leaves this function due to a timeout. * * @param timeout maximum amount of time, in milliseconds, to wait before * * @return * - true if the set of tasks being wait for complete before * timeout milliseconds elapse. * - false otherwise. * * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this * Barrier as a result of an error. * @exception Interrupted_Exception thrown when the calling thread is interrupted. * A thread may be interrupted at any time, prematurely ending a wait * for one thread and breaking the barrier for all threads * * @see Waitable::wait(unsigned long timeout) * * @post If no exception was thrown, all threads have successfully arrived * @post If an exception was thrown, the barrier is broken */ virtual bool wait(unsigned long timeout) { Guard g(_lock); if(_broken) throw BrokenBarrier_Exception(); // Break the barrier if an arriving thread is interrupted if(Thread::interrupted()) { // Release the other waiter, propagate the exception _arrived.broadcast(); _broken = true; throw Interrupted_Exception(); } if(--_count == 0) { // Wake the other threads if this was the last // arriving thread _arrived.broadcast(); // Try to run the associated task, if it throws then // break the barrier and propagate the exception try { if(_task) _task->run(); _generation++; } catch(Synchronization_Exception&) { _broken = true; throw; } catch(...) { assert(0); } } else { int myGeneration = _generation; try { // Wait for the other threads to arrive if(!_arrived.wait(timeout)) _broken = true; } catch(Interrupted_Exception&) { // Its possible for a thread to be interrupted before the // last thread arrives. If the interrupted thread hasn't // resumed, then just propagate the interruption if(myGeneration != _generation) Thread().interrupt(); else _broken = true; } catch(Synchronization_Exception&) { // Break the barrier and propagate the exception _broken = true; throw; } // If the thread woke because it was notified by the thread // that broke the barrier, throw. if(_broken) throw BrokenBarrier_Exception(); } return true; } /** * Break the Barrier ending the wait for any threads that were waiting on * the barrier. * * @post the Barrier is broken, all waiting threads will throw the * BrokenBarrier_Exception */ void shatter() { Guard g(_lock); _broken = true; _arrived.broadcast(); } /** * Reset the Barrier. * * @post the Barrier is no longer Broken and can be used again. */ void reset() { Guard g(_lock); _broken = false; _generation++; _count = Count; } }; } // namespace ZThread #endif // __ZTBARRIER_H__