//------------------------------------------------------------------------
// threads.h - C++ Interface to modified LWP package
// Copyright (C) 1997 Paolo De Marino
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version,
// with the only exception that all the people in the THANKS file
// must receive credit.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; see the file COPYING.LIB.
// If not, write to the Free Software Foundation, Inc., 675 Mass Ave,
// Cambridge, MA 02139, USA.
// // For contacting the author send electronic mail to // paolodemarino@usa.net // // Or paper mail to // // Paolo De Marino // Via Donizetti 1/E // 80127 Naples // Italy // // History: see history.txt // -------------------------------------------------------------------------- #ifndef __THREADS_H #define __THREADS_H #include "lwp.h" #include "queue.h" #include // Forward declarations of (almost) all the classes: class CritSect; class MutexSemaphore; class MutexSemaphoreLock; class CountSemaphore; class Gate; class Event; class ReadWriteSemaphore; class WriteLock; class ReadLock; class Thread; class Message; class CritSect { private: static unsigned int numLocks; // Lets CritSect be used recursively public: CritSect() { if(numLocks++ == 0) lwp_thread_disable(); }; ~CritSect() { if(--numLocks == 0) lwp_thread_enable(); }; }; class MutexSemaphore { lwp_semaphore sema; int numLocks; public: MutexSemaphore(void) : numLocks(0) { lwp_init_semaphore(&sema); }; ~MutexSemaphore() { }; // Don't know how to handle this exactly! Semaphores and sync objects are // essentially static! friend class MutexSemaphoreLock; int volatile isLocked(void) { return sema.owned; }; int volatile owner(void) { return sema.owner_id; }; volatile void getLock(void) { // Waiting for a semaphore you own doesn't block the machine! lwp_wait_semaphore(&sema); numLocks++; // Now we can increment the variable without risks. }; volatile void unLock(void) { if(--numLocks == 0) lwp_release_semaphore(&sema); }; }; class MutexSemaphoreLock { MutexSemaphore & _res; public: MutexSemaphoreLock(MutexSemaphore & res) : _res(res) { _res.getLock(); } ~MutexSemaphoreLock() { _res.unLock(); } }; // This class needs a few explanations. When a thread is waiting for the // semaphore, it does just wait for the counter to become != 0, then // decrement it. 
As there is a REMOTE chance that the thread will be // pre-empted AFTER waiting and BEFORE decrementing the variable, and thus // that TWO or more threads will pass, we lock the waiting threads // using the waitQueue mutex. In this way, the passCounter variable is // locked over the waiting/modification period. // The Clear() function shouldn't lock the semaphore, as it would bring to // deadlock. class CountSemaphore { volatile int passCounter; MutexSemaphore waitQueue; public: CountSemaphore(int howMany = 0) : passCounter(howMany) { }; ~CountSemaphore() { }; volatile void Clear(unsigned int howMany = 1) { passCounter += howMany; } volatile void Wait() { MutexSemaphoreLock lock(waitQueue); lwp_wait_true(&passCounter); passCounter--; } }; template class SerializedQueue { private: MutexSemaphore QueueLock; CountSemaphore waiters; Queue theQueue; public: SerializedQueue() : QueueLock(),waiters(),theQueue() { }; ~SerializedQueue() { }; void push(T what) { do { MutexSemaphoreLock lock(QueueLock); theQueue.push(what); } while(0); waiters.Clear(); } T pop() { waiters.Wait(); // Wait for elements do { MutexSemaphoreLock lock(QueueLock); T retval = theQueue.pop(); return retval; } while(0); } int isEmpty() const { return theQueue.isEmpty(); // No need to lock it! } int numElems() const { return theQueue.numElems(); // = = = = =! } }; class Gate // Used to free a whole set of threads at the { // same time! private: volatile int theGate; public: Gate(int open = 0) : theGate(open) { }; ~Gate() { }; void Open() { theGate = 1; } void Close() { theGate = 0; } void Wait() { lwp_wait_true(&theGate); } }; class Event // Mainly, a complication of the precedent classes. { // An "Event" can be fired as many times as you wish! 
private: // Gate firstGate,secondGate; volatile int event_flag; public: // Event() : firstGate(1),secondGate(0) { }; Event() : event_flag(0) { }; ~Event() { }; // The two lwp_yield, due to the internals of the LWP package, // will warranty all the waiting threads to have been cleared to pass. void Raise() { // firstGate.Close(); // Lock the first gate! // secondGate.Open(); // Open the second gate! // lwp_yield(); // Release control for the 1st time... // secondGate.Close(); // Close the second gate... // firstGate.Open(); // Let all the others out! // lwp_yield(); // Release control for the 2nd time! lwp_pulse_true(&event_flag); }; void Wait() { // firstGate.Wait(); // VERY simple! // secondGate.Wait(); lwp_wait_true(&event_flag); }; }; class Message { private: Thread* theSource; void * theMessage; public: Message() { theSource = 0; theMessage = 0; } Message(Thread* src,void *msg) { theSource = src; theMessage = msg; } Message(const Message& org) { theSource = org.theSource; theMessage= org.theMessage; } Message& operator=(const Message& org) { theSource = org.theSource; theMessage= org.theMessage; return *this; } ~Message() { }; // Doesn't delete the contents! void *Contents(void) const { return theMessage; }; Thread *Source(void) const { return theSource; }; }; // Writer-priority read/write lock. // This class implements a very common kind of lock, used every time there // is an object that is harmless to be read concurrently by many process, // but that can be updated by just one process at a time. // It is a writer-priority lock because the writer: // 1st: locks all the other writers; // 2nd: locks all the reader processes, than can even starve if there is a // big number of writing processes, or if the same process writes // to the object frequently. This can be avoided simply by calling // lwp_yield after releasing the writing lock. 
class ReadWriteSemaphore { private: Gate readersQueue; volatile int readers; MutexSemaphore writersLock; public: ReadWriteSemaphore() : readersQueue(1),readers(0),writersLock() { }; ~ReadWriteSemaphore() { }; void ReadLock() { readersQueue.Wait(); // Wait for gate to be opened. do { CritSect crit; readers++; // Signal we are reading! } while(0); } void ReadUnLock() { CritSect crit; readers--; // Signal we are reading no more! } void WriteLock() { writersLock.getLock(); readersQueue.Close(); // Close the readers queue. lwp_wait_false(&readers); // Wait for all the readers // to have finished } void WriteUnLock() { readersQueue.Open(); // Readers can go on now! writersLock.unLock(); } }; class WriteLock { private: ReadWriteSemaphore & theSemaphore; public: WriteLock(ReadWriteSemaphore & sema) : theSemaphore(sema) { theSemaphore.WriteLock(); } ~WriteLock() { theSemaphore.WriteUnLock(); } }; class ReadLock { private: ReadWriteSemaphore & theSemaphore; public: ReadLock(ReadWriteSemaphore & sema) : theSemaphore(sema) { theSemaphore.ReadLock(); } ~ReadLock() { theSemaphore.ReadUnLock(); } }; typedef int ThreadId; // ------------------------------------------------------------Thread class Thread { protected: ThreadId pid; volatile int run, // Set to 0 by constructor, locks the process // when a communication has already been estabilished. dead; // Checks whether the thread is dead. SerializedQueue messagesQueue; // Message handler data // ------------------------------------------------------------Static static void spawnedProcess(void* instance); Thread() : messagesQueue() { }; // Protected no args constructor public: // -----------------------------------------------------------Publics Thread(unsigned stack,unsigned priority = 1,int startImmediately = 0); void start(void) { run = 1; // Start the thread now! 
} static Thread ¤tThread(void) { return * ( (Thread *) lwp_getuserptr() ); } static ThreadId currentPid(void) { return lwp_getpid(); } static unsigned getPriority(void) { return lwp_get_priority(); } static void setPriority(unsigned priority) { lwp_set_priority(priority); } int isDead(void) const { return dead; } int isLiving(void) const { return !dead; } void waitCompletion(void) { assert(getPid() != currentPid()); // This is, not ourselves lwp_wait_true(&dead); } // This function needs a few explanations. 1st: the lwp_kill function // removes a task from the execution list. IT DOESN'T KILL IT INSTANTLY! // Thus we set the "dead" flag ONLY when we're sure it won't harm. // 2nd: if we're committing suicide, we won't be able to unlock the CPU.Thus, // we set the "dead" flag BEFORE calling the lwp_kill, so that it won't // harm. void kill(void) { if( pid != currentPid() ) { if(!dead) // If it isn't already dead, { lwp_kill(pid); // Kill the other task dead = 1; // Now, it is surely dead! }; } else { dead = 1; // No risk in doing so. lwp_kill(pid); // Suicide! (Remove from tasks list). } }; ThreadId const getPid(void) const { return pid; }; // --------------------------------------------------Message Handling void postMessage(void *Mess) { Message theMessage(¤tThread(),Mess); messagesQueue.push(theMessage); }; static void postMessage(void *Mess,Thread& dest) { Message theMessage(&Thread::currentThread(),Mess); dest.messagesQueue.push(theMessage); }; Message getMessage(void) { return messagesQueue.pop(); // Waits for the message to come! 
}; int waitingMessages(void) const { return messagesQueue.numElems(); }; int noMessages(void) const { return messagesQueue.isEmpty(); }; // -----------------------------------------------------------Virtual virtual void prepare(void); // Starts as soon as the thread is created virtual void execute(void); // Starts when the thread is "start()"ed virtual void cleanup(void); // Starts when execute() has finished }; void InitLwp(int speed); void DoneLwp(void); #endif