Three kinds of synchronization routines are defined here: semaphores, locks and condition variables (the implementation of the last two is left to the reader).
Semaphore
Semaphore::Semaphore(char* debugName, int initialValue)
{
    name = debugName;
    value = initialValue;
    queue = new List;
}

void Semaphore::P()
{
    IntStatus oldLevel = interrupt->SetLevel(IntOff);   // disable interrupts

    while (value == 0) {                        // semaphore not available
        queue->Append((void *)currentThread);   // so go to sleep
        currentThread->Sleep();
    }
    value--;                                    // semaphore available, consume its value

    (void) interrupt->SetLevel(oldLevel);       // re-enable interrupts
}

void Semaphore::V()
{
    Thread *thread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);

    thread = (Thread *)queue->Remove();
    if (thread != NULL)     // make thread ready, consuming the V immediately
        scheduler->ReadyToRun(thread);
    value++;

    (void) interrupt->SetLevel(oldLevel);
}
The semaphore keeps its waiting threads in a queue, so they are woken in FIFO order. In P(), while value == 0 (no resource available), the calling thread appends itself to the queue and sleeps. Once it is woken and finds value > 0, it leaves the while loop and consumes one unit (value--). V() removes the thread at the head of the queue, if there is one, hands it to ReadyToRun(), and then increments value. The woken thread does not run immediately; it re-checks value when the scheduler resumes it.
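The queueing behaviour described above can be traced with a small single-threaded model. This is only a sketch, not the Nachos code: `ToySemaphore`, `TryP` and `semaphoreWalkthrough` are hypothetical names, a thread is represented by an integer id, and `TryP` stands for one pass through the while loop in P().

```cpp
#include <cassert>
#include <deque>

// Hypothetical single-threaded model: a "thread" is just an integer id.
struct ToySemaphore {
    int value;
    std::deque<int> waiting;   // ids of sleeping threads, in arrival order

    explicit ToySemaphore(int initialValue) : value(initialValue) {
        assert(initialValue >= 0);
    }

    // One pass through the loop in P(): returns true if the caller consumed
    // a unit, false if it was queued and has to sleep.
    bool TryP(int threadId) {
        if (value == 0) {
            waiting.push_back(threadId);
            return false;
        }
        value--;
        return true;
    }

    // V(): take the longest-waiting thread (if any) off the queue and add
    // one unit. Returns the woken thread id, or -1 if nobody was waiting.
    int V() {
        int woken = -1;
        if (!waiting.empty()) {
            woken = waiting.front();
            waiting.pop_front();
        }
        value++;
        return woken;
    }
};

// Walks through the scenario from the text and returns the final value.
int semaphoreWalkthrough() {
    ToySemaphore sem(1);
    bool first = sem.TryP(1);     // thread 1 takes the only unit
    bool second = sem.TryP(2);    // thread 2 finds value == 0 and is queued
    int woken = sem.V();          // thread 1 releases; thread 2 is made ready
    bool retry = sem.TryP(2);     // thread 2 re-checks the loop and succeeds
    assert(first && !second && woken == 2 && retry);
    assert(sem.waiting.empty());
    return sem.value;
}
```

The retry step is the important part: being woken by V() does not hand over the unit directly, so the woken thread has to pass the value check again.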
void Scheduler::Run(Thread *nextThread)
{
    Thread *oldThread = currentThread;
    // ............................
    oldThread->CheckOverflow();             // check if the old thread had an undetected stack overflow
    currentThread = nextThread;             // switch to the next thread
    currentThread->setStatus(RUNNING);      // nextThread is now running
    // ............................
}
ReadyToRun() only puts the thread on readyList. The thread does not actually run until Run() switches context to it.
void Thread::Sleep()
{
    Thread *nextThread;

    ASSERT(this == currentThread);
    ASSERT(interrupt->getLevel() == IntOff);

    DEBUG('t', "Sleeping thread \"%s\"\n", getName());

    status = BLOCKED;
    while ((nextThread = scheduler->FindNextToRun()) == NULL)
        interrupt->Idle();          // no one to run, wait for an interrupt

    scheduler->Run(nextThread);     // returns when we've been signalled
}
Fork() passes the new thread to ReadyToRun(). Yield() finds the next thread to run, passes the current thread to ReadyToRun(), and calls Run() on the next one. Sleep() sets the current thread's status to BLOCKED and then finds another thread to Run().
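These transitions can be followed in a toy model of the scheduler's bookkeeping. It is a sketch under stated assumptions: `ToyScheduler`, `ToyStatus` and `schedulerWalkthrough` are hypothetical names, threads are integer ids, and no real context switch takes place.

```cpp
#include <cassert>
#include <deque>
#include <map>

enum ToyStatus { RUNNING, READY, BLOCKED };

// Hypothetical model of the scheduler's bookkeeping; thread ids are ints.
struct ToyScheduler {
    std::map<int, ToyStatus> status;
    std::deque<int> readyList;
    int current;

    explicit ToyScheduler(int mainThread) : current(mainThread) {
        status[mainThread] = RUNNING;
    }

    void ReadyToRun(int t) {
        status[t] = READY;
        readyList.push_back(t);
    }

    // Fork(): the new thread only becomes READY; it does not run yet.
    void Fork(int t) { ReadyToRun(t); }

    // Run(): the "context switch"; next becomes the RUNNING thread.
    void Run(int next) {
        current = next;
        status[next] = RUNNING;
    }

    // Yield(): if another thread is ready, requeue the caller and switch.
    void Yield() {
        if (readyList.empty()) return;
        int next = readyList.front();
        readyList.pop_front();
        ReadyToRun(current);
        Run(next);
    }

    // Sleep(): block the caller and switch. This model assumes some thread
    // is ready; Nachos idles and waits for an interrupt instead.
    void Sleep() {
        assert(!readyList.empty());
        status[current] = BLOCKED;
        int next = readyList.front();
        readyList.pop_front();
        Run(next);
    }
};

// Returns the id of the thread that ends up running.
int schedulerWalkthrough() {
    ToyScheduler s(0);      // thread 0 is running
    s.Fork(1);              // 1 is READY, 0 is still RUNNING
    s.Fork(2);
    s.Yield();              // 0 -> READY (back of the list), 1 -> RUNNING
    s.Sleep();              // 1 -> BLOCKED, 2 -> RUNNING
    assert(s.status[0] == READY);
    assert(s.status[1] == BLOCKED);
    return s.current;
}
```

Note that a BLOCKED thread is on no list in this model: something else, such as a semaphore's V(), has to pass it to ReadyToRun() again.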
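So the whole process is: a thread that calls P() on a semaphore whose value is 0 appends itself to the semaphore's queue and calls Sleep(), which marks it BLOCKED and runs the next ready thread. A later V() removes it from the queue and passes it to ReadyToRun(), and it resumes inside P() once the scheduler selects it through Run().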
Locks
void Lock::Acquire()
{
    IntStatus oldLevel = interrupt->SetLevel(IntOff);   // disable interrupts

    lock->P();                  // procure the semaphore
    owner = currentThread;      // record the new owner of the lock

    (void) interrupt->SetLevel(oldLevel);               // re-enable interrupts
}

void Lock::Release()
{
    IntStatus oldLevel = interrupt->SetLevel(IntOff);   // disable interrupts

    // Ensure: a) lock is BUSY  b) this thread is the same one that acquired it.
    ASSERT(currentThread == owner);
    owner = NULL;               // clear the owner
    lock->V();                  // vanquish the semaphore

    (void) interrupt->SetLevel(oldLevel);
}
When a thread acquires a lock, the lock calls P() on its semaphore, which blocks if the lock is already held. Once P() returns, the thread becomes the lock's owner. When it releases the lock, Release() first checks that the current thread is the owner, then clears owner and calls V(), which frees the lock and makes a waiting thread, if any, ready to run.
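The same structure, a semaphore initialised to 1 plus an owner record, can be sketched with standard C++ primitives. This is not the Nachos implementation: `MiniSemaphore`, `SketchLock` and `lockWalkthrough` are hypothetical names, a `std::mutex` with a `std::condition_variable` replaces disabling interrupts, and `std::thread::id` stands in for the `Thread *` owner.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Minimal counting semaphore (hypothetical helper, not the Nachos class).
class MiniSemaphore {
public:
    explicit MiniSemaphore(int initial) : value(initial) {}
    void P() {
        std::unique_lock<std::mutex> guard(m);
        while (value == 0) cv.wait(guard);
        value--;
    }
    void V() {
        std::lock_guard<std::mutex> guard(m);
        value++;
        cv.notify_one();
    }
private:
    std::mutex m;
    std::condition_variable cv;
    int value;
};

// A lock is a semaphore initialised to 1 plus a record of who holds it.
class SketchLock {
public:
    SketchLock() : sem(1), owner(std::thread::id()) {}

    void Acquire() {
        sem.P();                                  // blocks while the lock is held
        owner.store(std::this_thread::get_id());  // record the new owner
    }

    void Release() {
        assert(isHeldByCurrentThread());          // only the owner may release
        owner.store(std::thread::id());           // clear the owner
        sem.V();
    }

    bool isHeldByCurrentThread() const {
        return owner.load() == std::this_thread::get_id();
    }

private:
    MiniSemaphore sem;
    std::atomic<std::thread::id> owner;  // default id means "no owner"
};

// Acquires and releases once; returns true if ownership was tracked correctly.
bool lockWalkthrough() {
    SketchLock lock;
    bool before = lock.isHeldByCurrentThread();
    lock.Acquire();
    bool during = lock.isHeldByCurrentThread();
    lock.Release();
    bool after = lock.isHeldByCurrentThread();
    return !before && during && !after;
}
```

Clearing the owner before calling V() mirrors the order in Release() above, so the next holder never sees a stale owner.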
Condition Variables
void Condition::Wait(Lock* conditionLock)
{
    IntStatus oldLevel = interrupt->SetLevel(IntOff);

    ASSERT(conditionLock->isHeldByCurrentThread());     // check pre-condition
    if (queue->IsEmpty()) {
        lock = conditionLock;           // helps to enforce pre-condition
    }
    ASSERT(lock == conditionLock);      // another pre-condition
    queue->Append(currentThread);       // add this thread to the waiting list
    conditionLock->Release();           // release the lock
    currentThread->Sleep();             // go to sleep
    conditionLock->Acquire();           // awaken: re-acquire the lock

    (void) interrupt->SetLevel(oldLevel);
}

void Condition::Signal(Lock* conditionLock)
{
    Thread *nextThread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);

    ASSERT(conditionLock->isHeldByCurrentThread());
    if (!queue->IsEmpty()) {
        ASSERT(lock == conditionLock);
        nextThread = (Thread *)queue->Remove();
        scheduler->ReadyToRun(nextThread);      // wake up the thread
    }

    (void) interrupt->SetLevel(oldLevel);
}

void Condition::Broadcast(Lock* conditionLock)
{
    Thread *nextThread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);

    ASSERT(conditionLock->isHeldByCurrentThread());
    if (!queue->IsEmpty()) {
        ASSERT(lock == conditionLock);
        while ((nextThread = (Thread *)queue->Remove()) != NULL) {
            scheduler->ReadyToRun(nextThread);  // wake up the thread
        }
    }

    (void) interrupt->SetLevel(oldLevel);
}
A condition variable is always used together with a lock. Wait() appends the current thread to the condition's queue, releases the lock so that other threads can acquire it, and goes to sleep. When the thread is woken up, it re-acquires the lock before returning from Wait(). Signal() removes one thread from the condition's queue and makes it ready to run. Broadcast() empties the queue and makes every waiting thread ready to run.
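From the caller's side, the usual pattern is to hold the lock, test a condition, and wait in a loop. The sketch below shows that pattern with `std::condition_variable` rather than the Nachos `Condition` class; `Mailbox` and `mailboxDemo` are hypothetical names. The loop is needed because a signalled thread is only made ready to run, so the condition may no longer hold by the time it re-acquires the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical mailbox guarded by one lock and one condition variable.
struct Mailbox {
    std::mutex lock;
    std::condition_variable nonEmpty;
    std::queue<int> items;

    void Send(int v) {
        std::lock_guard<std::mutex> guard(lock);
        items.push(v);
        nonEmpty.notify_one();     // like Signal(): wake one waiter
    }

    int Receive() {
        std::unique_lock<std::mutex> guard(lock);
        // wait() releases the lock while sleeping and re-acquires it on
        // wake-up, like Condition::Wait(). The condition is re-checked in a
        // loop because waking up does not guarantee it still holds.
        while (items.empty()) {
            nonEmpty.wait(guard);
        }
        int v = items.front();
        items.pop();
        return v;
    }
};

// One receiver thread waits for a single message sent by the caller.
int mailboxDemo() {
    Mailbox box;
    int received = 0;
    std::thread receiver([&] { received = box.Receive(); });
    box.Send(42);
    receiver.join();
    return received;
}
```

The demo works whichever thread runs first: if Send() happens before Receive(), the loop condition is already false and the receiver never sleeps.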
ring.cc in lab3
void Ring::Put(slot *message)
{
    buffer[in].thread_id = message->thread_id;
    buffer[in].value = message->value;
    in = (in + 1) % size;
    current++;
}
Put() stores the message at index in, which points to the next empty slot (the one just after the last filled slot when the ring is not empty), and then advances in. Get() fetches the message at index out, which points to the oldest filled slot, and then advances out for the next fetch. Neither checks whether the ring is empty or full.
So I added a variable current that tracks the number of messages in the ring, and used it to implement Full() and Empty().
int Ring::Empty()
{
    return (current == 0);
}

int Ring::Full()
{
    return (current == size);
}
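For reference, a self-contained ring with the same in, out and current bookkeeping might look like the sketch below. `SketchRing` and `Slot` are hypothetical stand-ins for the lab's classes, and the assertions in Put() and Get() are an addition: the lab code relies on the semaphores to guarantee that they are never violated.

```cpp
#include <cassert>
#include <vector>

struct Slot {           // stands in for the lab's slot class
    int thread_id;
    int value;
};

class SketchRing {
public:
    explicit SketchRing(int sz)
        : buffer(sz), size(sz), in(0), out(0), current(0) {
        assert(sz > 0);
    }

    bool Empty() const { return current == 0; }
    bool Full() const { return current == size; }

    // Store at index in, then advance in (wrapping around).
    void Put(const Slot &message) {
        assert(!Full());        // the caller must guarantee free space
        buffer[in] = message;
        in = (in + 1) % size;
        current++;
    }

    // Fetch from index out, then advance out (wrapping around).
    Slot Get() {
        assert(!Empty());       // the caller must guarantee a message exists
        Slot message = buffer[out];
        out = (out + 1) % size;
        current--;
        return message;
    }

private:
    std::vector<Slot> buffer;
    int size, in, out, current;
};
```

Keeping a separate count avoids the ambiguity of comparing in and out directly, since in == out holds both when the ring is empty and when it is full.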
prodcons.cc in lab3
The original file leaves these parts unimplemented, so below is what I implemented.
Thread *producers[N_PROD];              // array of pointers to the producer
Thread *consumers[N_CONS];              // and consumer threads

char prod_names[N_PROD][MAX_NAME];      // array of character strings for producer names
char cons_names[N_CONS][MAX_NAME];      // array of character strings for consumer names

Semaphore *nempty, *nfull;              // two semaphores for empty and full slots
Semaphore *mutex;                       // semaphore for the mutual exclusion

Ring *ring;
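void Producer(_int which)
{
    int num;
    slot *message = new slot(0,0);

    for (num = 0; num < N_MESSG ; num++) {
        message->value = num;
        message->thread_id = which;

        nempty->P();            // wait for an empty slot (nempty starts at BUFF_SIZE)
        mutex->P();             // enter the critical section
        ring->Put(message);
        mutex->V();             // leave the critical section
        nfull->V();             // announce one more full slot
    }
}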
void ProdCons()
{
    int i;
    DEBUG('t', "Entering ProdCons");

    nempty = new Semaphore("nempty", BUFF_SIZE);
    nfull = new Semaphore("nfull", 0);
    mutex = new Semaphore("mutex", 1);
    ring = new Ring(BUFF_SIZE);

    for (i = 0; i < N_PROD; i++) {
        sprintf(prod_names[i], "producer_%d", i);
        producers[i] = new Thread(prod_names[i]);
        producers[i]->Fork(Producer, i);
    }

    for (i = 0; i < N_CONS; i++) {
        sprintf(cons_names[i], "consumer_%d", i);
        consumers[i] = new Thread(cons_names[i]);
        consumers[i]->Fork(Consumer, i);
    }
}
Semaphore and Buffer
nempty = new Semaphore("nempty", BUFF_SIZE);
nfull = new Semaphore("nfull", 0);
mutex = new Semaphore("mutex", 1);
ring = new Ring(BUFF_SIZE);
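nempty and nfull are the semaphores that guard the buffer slots. nempty counts the empty slots and starts at BUFF_SIZE, its maximum; nempty = 0 means the buffer is full, so producers block. nfull counts the filled slots and starts at 0; nfull = 0 means the buffer is empty, so consumers block. When a message is produced, nempty-- and nfull++; when one is consumed, nfull-- and nempty++. mutex makes sure only one thread touches the ring at a time.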
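The interplay of the three semaphores can be tried outside Nachos with standard C++ threads. The sketch below is an illustration only: `CountingSemaphore` and `runBoundedBuffer` are hypothetical names, the semaphore is built from a `std::mutex` and a `std::condition_variable`, and a single consumer sums the values it receives instead of writing them to a file.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical counting semaphore built from standard primitives.
class CountingSemaphore {
public:
    explicit CountingSemaphore(int initial) : value(initial) {}
    void P() {
        std::unique_lock<std::mutex> guard(m);
        while (value == 0) cv.wait(guard);
        value--;
    }
    void V() {
        std::lock_guard<std::mutex> guard(m);
        value++;
        cv.notify_one();
    }
private:
    std::mutex m;
    std::condition_variable cv;
    int value;
};

// Runs nProd producers (each sending the values 0..nMsg-1) and one consumer
// over a ring of bufSize slots; returns the sum of all received values.
long runBoundedBuffer(int nProd, int nMsg, int bufSize) {
    CountingSemaphore nempty(bufSize);  // free slots: producers wait on this
    CountingSemaphore nfull(0);         // filled slots: the consumer waits on this
    CountingSemaphore mutex(1);         // protects ring, in and out
    std::vector<int> ring(bufSize);
    int in = 0, out = 0;
    long sum = 0;

    std::vector<std::thread> producers;
    for (int p = 0; p < nProd; p++) {
        producers.emplace_back([&] {
            for (int num = 0; num < nMsg; num++) {
                nempty.P();             // wait for a free slot
                mutex.P();
                ring[in] = num;
                in = (in + 1) % bufSize;
                mutex.V();
                nfull.V();              // one more filled slot
            }
        });
    }

    std::thread consumer([&] {
        for (int i = 0; i < nProd * nMsg; i++) {
            nfull.P();                  // wait for a filled slot
            mutex.P();
            sum += ring[out];
            out = (out + 1) % bufSize;
            mutex.V();
            nempty.V();                 // one more free slot
        }
    });

    for (auto &t : producers) t.join();
    consumer.join();
    return sum;
}
```

With 2 producers sending 4 messages each, every producer contributes 0 + 1 + 2 + 3 = 6, so the call `runBoundedBuffer(2, 4, 3)` should return 12 regardless of how the threads interleave.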
The Fork() of producer and consumer threads
for (i = 0; i < N_PROD; i++) {
    sprintf(prod_names[i], "producer_%d", i);
    producers[i] = new Thread(prod_names[i]);
    producers[i]->Fork(Producer, i);
}

for (i = 0; i < N_CONS; i++) {
    sprintf(cons_names[i], "consumer_%d", i);
    consumers[i] = new Thread(cons_names[i]);
    consumers[i]->Fork(Consumer, i);
}
N_PROD producers and N_CONS consumers are forked, each with a name generated from i, using Fork(func, arg).
Unlike the fork() system call in Linux, Fork() does not copy the variables and state of the current thread. It only sets up a new thread that will run the given function with the given argument, and passes that thread to ReadyToRun().
lab3
The changed code is listed above. Then just make and run.
The run generates two files, tmp_0 and tmp_1, written by consumer 0 and consumer 1. However, tmp_0 holds all 8 messages and tmp_1 holds none: only consumer 0 ever gets a message.
But if I run ./nachos -rs 5, things are different:
------------------- tmp_0 -------------------
### Sun Apr 11 10:23:53 2021 producer id --> 0; Message number --> 0;
### Sun Apr 11 10:23:53 2021 producer id --> 1; Message number --> 0;
### Sun Apr 11 10:23:53 2021 producer id --> 0; Message number --> 2;
### Sun Apr 11 10:23:53 2021 producer id --> 1; Message number --> 1;
### Sun Apr 11 10:23:53 2021 producer id --> 0; Message number --> 3;
### Sun Apr 11 10:23:53 2021 producer id --> 1; Message number --> 3;
------------------- tmp_1 -------------------
### Sun Apr 11 10:23:53 2021 producer id --> 0; Message number --> 1;
### Sun Apr 11 10:23:53 2021 producer id --> 1; Message number --> 2;
So what does -rs do?
main.cc in lab3 calls Initialize(), which is defined in system.cc under threads:
else if (!strcmp(*argv, "-rs")) {
    ASSERT(argc > 1);
    RandomInit(atoi(*(argv + 1)));      // initialize pseudo-random number generator
    randomYield = TRUE;
    argCount = 2;
}
-----------------------------------------------
if (randomYield)                        // start the timer (if needed)
    timer = new Timer(TimerInterruptHandler, 0, randomYield);
-----------------------------------------------
static void TimerInterruptHandler(_int dummy)
{
    if (interrupt->getStatus() != IdleMode)
        interrupt->YieldOnReturn();
}
Passing -rs sets randomYield = TRUE, and when it is set, Initialize() creates a Timer whose handler is TimerInterruptHandler(). The relevant pieces under machine are:
// -------- in stats.h ------------
#define UserTick    1       // advance for each user-level instruction
#define SystemTick  10      // advance each time interrupts are enabled
#define TimerTicks  100     // (average) time between timer interrupts

// ------------ interrupt.cc -------------
void Interrupt::Schedule(VoidFunctionPtr handler, _int arg, int fromNow, IntType type)
{
    int when = stats->totalTicks + fromNow;
    PendingInterrupt *toOccur = new PendingInterrupt(handler, arg, when, type);

    DEBUG('i', "Scheduling interrupt handler the %s at time = %d\n",
          intTypeNames[type], when);
    ASSERT(fromNow > 0);

    pending->SortedInsert(toOccur, when);
}

// -------------- OneTick() -----------------
    while (CheckIfDue(FALSE))       // check for pending interrupts
        ;
    ChangeLevel(IntOff, IntOn);     // re-enable interrupts
    if (yieldOnReturn) {            // if the timer device handler asked
                                    // for a context switch, ok to do it now
        yieldOnReturn = FALSE;
        status = SystemMode;        // yield is a kernel routine
        currentThread->Yield();
        status = old;
    }
With -rs the timer is randomized: each delay until the next timer interrupt is a random number in the range [1, 200], which is passed to Schedule() and inserted into the pending queue, telling the system when to interrupt. Each timer interrupt sets yieldOnReturn, so OneTick() makes the current thread Yield(). That is why context switches occur when -rs is used; without it, consumer 0 never yields, so consumer 1 gets nothing.
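The range [1, 200] is consistent with a delay drawn as 1 plus a random value modulo 2 * TimerTicks, which keeps the average near TimerTicks. The sketch below assumes exactly that formula. It is not the Nachos timer code: `nextInterruptDelay` and `delaysStayInRange` are hypothetical names, and `std::mt19937` stands in for the generator seeded by RandomInit().

```cpp
#include <cassert>
#include <random>

const int TimerTicks = 100;   // value from stats.h

// Hypothetical stand-in for the timer's choice of its next delay.
int nextInterruptDelay(bool randomize, std::mt19937 &rng) {
    if (randomize) {
        // Assumed formula: a value in [1, 2 * TimerTicks], so the average
        // delay stays close to TimerTicks.
        return 1 + static_cast<int>(rng() % (TimerTicks * 2));
    }
    return TimerTicks;        // fixed interval when not randomized
}

// Draws many delays from a seeded generator (the seed plays the role of the
// number given after -rs) and checks that all of them fall in the range.
bool delaysStayInRange(unsigned seed, int draws) {
    std::mt19937 rng(seed);
    for (int i = 0; i < draws; i++) {
        int d = nextInterruptDelay(true, rng);
        if (d < 1 || d > 2 * TimerTicks) return false;
    }
    return true;
}
```

Because the generator is seeded, the same seed gives the same sequence of delays, which is why a run with a fixed -rs value is repeatable.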