
Sync in Nachos

Preparation

Before starting lab3, I need to read some of the existing code.

synch.cc in threads

Three kinds of synchronization routines are defined here: semaphores, locks, and condition variables (the implementation of the last two is left to the reader).

Semaphore

Semaphore::Semaphore(char* debugName, int initialValue) {
    name = debugName;
    value = initialValue;
    queue = new List;
}

void Semaphore::P() {
    IntStatus oldLevel = interrupt->SetLevel(IntOff); // disable interrupts
    while (value == 0) { // semaphore not available
        queue->Append((void *)currentThread); // so go to sleep
        currentThread->Sleep();
    }
    value--; // semaphore available, consume its value
    (void) interrupt->SetLevel(oldLevel); // re-enable interrupts
}

void Semaphore::V() {
    Thread *thread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    thread = (Thread *)queue->Remove();
    if (thread != NULL) // make thread ready, consuming the V immediately
        scheduler->ReadyToRun(thread);
    value++;
    (void) interrupt->SetLevel(oldLevel);
}

A semaphore keeps its waiting threads in a FIFO queue, so they are woken in order. In P(), while value == 0 (no resource available), the current thread appends itself to the queue and sleeps. V() removes the first waiting thread from the queue, if there is one, hands it to ReadyToRun(), and increments value. The woken thread does not run immediately: once it is scheduled again it re-checks the while condition, and only when value > 0 does it leave the loop and consume the resource (value--).
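The queue-and-recheck behaviour can be modelled without real threads. The sketch below is a hypothetical, single-threaded model, not Nachos code: ToySemaphore and the integer thread ids are names I made up for illustration. P() reports whether the caller could proceed, and V() reports which waiter was taken off the queue.

```cpp
#include <cassert>
#include <deque>

// Hypothetical single-threaded model of a Nachos-style semaphore.
// Thread ids are plain ints; nothing here actually blocks.
struct ToySemaphore {
    int value;
    std::deque<int> waiting;  // ids of threads that went to sleep in P()

    explicit ToySemaphore(int initial) : value(initial) {
        assert(initial >= 0);
    }

    // Returns true if the caller consumed a unit, false if it had to sleep.
    bool P(int tid) {
        if (value == 0) {
            waiting.push_back(tid);
            return false;
        }
        --value;
        return true;
    }

    // Wakes the longest-waiting thread (if any) and returns its id, or -1.
    // The woken thread still has to call P() again to consume the unit.
    int V() {
        int woken = -1;
        if (!waiting.empty()) {
            woken = waiting.front();
            waiting.pop_front();
        }
        ++value;
        return woken;
    }
};
```

In this model a thread that got false from P() calls P() again after V() returns its id, which plays the role of the while loop re-check in the real code.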

void Scheduler::ReadyToRun (Thread *thread) {
    DEBUG('t', "Putting thread %s on ready list.\n", thread->getName());
    thread->setStatus(READY);
    readyList->Append((void *)thread);
}

void Scheduler::Run (Thread *nextThread) {
    Thread *oldThread = currentThread;
    // ............................
    oldThread->CheckOverflow(); // check if the old thread had an undetected stack overflow
    currentThread = nextThread; // switch to the next thread
    currentThread->setStatus(RUNNING); // nextThread is now running
    //.............................
}

ReadyToRun() only marks the thread READY and appends it to readyList. The thread does not actually run until the scheduler selects it and Run() switches context to it.

Scheduler

In thread.cc, we see how it’s scheduled:

void Thread::Fork(VoidFunctionPtr func, _int arg) {
    // ...............................
    StackAllocate(func, arg);
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    scheduler->ReadyToRun(this); // ReadyToRun assumes that interrupts are disabled!
    (void) interrupt->SetLevel(oldLevel);
}

void Thread::Yield () {
    Thread *nextThread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    ASSERT(this == currentThread);
    DEBUG('t', "Yielding thread \"%s\"\n", getName());
    nextThread = scheduler->FindNextToRun();
    if (nextThread != NULL) {
        scheduler->ReadyToRun(this);
        scheduler->Run(nextThread);
    }
    (void) interrupt->SetLevel(oldLevel);
}

void Thread::Sleep () {
    Thread *nextThread;
    ASSERT(this == currentThread);
    ASSERT(interrupt->getLevel() == IntOff);
    DEBUG('t', "Sleeping thread \"%s\"\n", getName());
    status = BLOCKED;
    while ((nextThread = scheduler->FindNextToRun()) == NULL)
        interrupt->Idle(); // no one to run, wait for an interrupt
    scheduler->Run(nextThread); // returns when we've been signalled
}

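Fork() hands the new thread to ReadyToRun(). Yield() finds the next thread to run, puts the current thread back on the ready list, and calls Run() on the next one; if no other thread is ready, the current thread simply keeps running. Sleep() sets the current thread's status to BLOCKED and then runs the next ready thread, idling until an interrupt makes one ready if the list is empty.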
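To check my understanding of these transitions, here is a small model of them. It is only a sketch under my own assumptions: ToyKernel, Status, and the integer thread ids are hypothetical names, there is no real context switch, and sleep() asserts where the real code would idle.

```cpp
#include <cassert>
#include <deque>
#include <map>

enum class Status { Ready, Running, Blocked };

// Hypothetical model of the Fork/Yield/Sleep transitions; thread ids are ints.
struct ToyKernel {
    std::map<int, Status> status;
    std::deque<int> readyList;
    int current;

    explicit ToyKernel(int mainTid) : current(mainTid) {
        status[mainTid] = Status::Running;
    }

    void readyToRun(int tid) {
        status[tid] = Status::Ready;
        readyList.push_back(tid);
    }

    int findNextToRun() {  // returns -1 when the ready list is empty
        if (readyList.empty()) return -1;
        int tid = readyList.front();
        readyList.pop_front();
        return tid;
    }

    void run(int tid) {
        current = tid;
        status[tid] = Status::Running;
    }

    void fork(int tid) { readyToRun(tid); }  // a new thread only becomes READY

    void yield() {
        int next = findNextToRun();
        if (next != -1) {  // otherwise the caller keeps running
            readyToRun(current);
            run(next);
        }
    }

    void sleep() {
        status[current] = Status::Blocked;
        int next = findNextToRun();
        assert(next != -1);  // the real code idles here instead
        run(next);
    }
};
```

For example, starting from thread 0 with threads 1 and 2 forked, a yield() makes thread 1 the current thread and puts thread 0 at the back of the ready list.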

So the whole process is:

[Figure: OS_Picture1.png]

Locks

void Lock::Acquire() {
    IntStatus oldLevel = interrupt->SetLevel(IntOff); // disable interrupts
    lock->P(); // procure the semaphore
    owner = currentThread; // record the new owner of the lock
    (void) interrupt->SetLevel(oldLevel); // re-enable interrupts
}

void Lock::Release() {
    IntStatus oldLevel = interrupt->SetLevel(IntOff); // disable interrupts
    // Ensure: a) lock is BUSY b) this thread is the same one that acquired it.
    ASSERT(currentThread == owner);
    owner = NULL; // clear the owner
    lock->V(); // vanquish the semaphore
    (void) interrupt->SetLevel(oldLevel);
}

When a thread acquires a lock, the lock calls P() on its semaphore, which blocks if the lock is already held. Once P() succeeds, the thread is recorded as the owner. Release() first checks that the caller is the owner, clears the owner field, and then calls V(), which frees the lock and makes one waiting thread (if any) ready to run.
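The ownership rule is easy to see in a stripped-down model. This is a hypothetical, non-blocking sketch (ToyLock, tryAcquire, and the integer thread ids are my own names): where the real Acquire() would put the thread to sleep, the model just returns false.

```cpp
#include <cassert>

// Hypothetical non-blocking model of a lock built on a binary semaphore.
struct ToyLock {
    int value = 1;   // 1 = free, 0 = held (the underlying semaphore value)
    int owner = -1;  // id of the holding thread, -1 when the lock is free

    // Models Acquire(): returns false where the real thread would block.
    bool tryAcquire(int tid) {
        if (value == 0) return false;
        value = 0;  // corresponds to P()
        owner = tid;
        return true;
    }

    // Models Release(): only the owner may release the lock.
    void release(int tid) {
        assert(owner == tid);
        owner = -1;
        value = 1;  // corresponds to V()
    }

    bool isHeldBy(int tid) const { return owner == tid; }
};
```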

Condition Variables

void Condition::Wait(Lock* conditionLock) {
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    ASSERT(conditionLock->isHeldByCurrentThread()); // check pre-condition
    if (queue->IsEmpty()) {
        lock = conditionLock; // helps to enforce pre-condition
    }
    ASSERT(lock == conditionLock); // another pre-condition
    queue->Append(currentThread); // add this thread to the waiting list
    conditionLock->Release(); // release the lock
    currentThread->Sleep(); // goto sleep
    conditionLock->Acquire(); // awaken: re-acquire the lock
    (void) interrupt->SetLevel(oldLevel);
}

void Condition::Signal(Lock* conditionLock) {
    Thread *nextThread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    ASSERT(conditionLock->isHeldByCurrentThread());
    if (!queue->IsEmpty()) {
        ASSERT(lock == conditionLock);
        nextThread = (Thread *)queue->Remove();
        scheduler->ReadyToRun(nextThread); // wake up the thread
    }
    (void) interrupt->SetLevel(oldLevel);
}

void Condition::Broadcast(Lock* conditionLock) {
    Thread *nextThread;
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    ASSERT(conditionLock->isHeldByCurrentThread());
    if (!queue->IsEmpty()) {
        ASSERT(lock == conditionLock);
        // Remove() returns NULL once the queue is empty
        while ((nextThread = (Thread *)queue->Remove()) != NULL) {
            scheduler->ReadyToRun(nextThread); // wake up the thread
        }
    }
    (void) interrupt->SetLevel(oldLevel);
}

A condition variable is always used together with a lock. Wait() appends the current thread to the condition's queue, releases the lock so that other threads can acquire it, and goes to sleep. When the thread is woken up, it re-acquires the lock before returning from Wait(). Signal() removes one thread from the condition queue and makes it ready to run. Broadcast() empties the condition queue and makes every waiting thread ready to run.
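The difference between Signal() and Broadcast() comes down to how many waiters leave the queue. A minimal sketch, assuming integer thread ids and ignoring the lock entirely (ToyCondition is a hypothetical name, not part of Nachos):

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical model of a condition variable's wait queue only.
struct ToyCondition {
    std::deque<int> waiting;  // ids of threads sleeping in wait()

    void wait(int tid) {
        assert(tid >= 0);
        waiting.push_back(tid);
    }

    // Moves at most one waiter out of the queue and returns it.
    std::vector<int> signal() {
        std::vector<int> woken;
        if (!waiting.empty()) {
            woken.push_back(waiting.front());
            waiting.pop_front();
        }
        return woken;
    }

    // Moves every waiter out of the queue, in FIFO order.
    std::vector<int> broadcast() {
        std::vector<int> woken(waiting.begin(), waiting.end());
        waiting.clear();
        return woken;
    }
};
```

With three waiters queued, signal() returns only the first one and a following broadcast() returns the remaining two.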

ring.cc in lab3

void Ring::Put(slot *message) {
    buffer[in].thread_id = message->thread_id;
    buffer[in].value = message->value;
    in = (in + 1) % size;
    current++;
}

void Ring::Get(slot *message) {
    message->thread_id = buffer[out].thread_id;
    message->value = buffer[out].value;
    out = (out + 1) % size;
    current--;
}

Put() stores the message in the slot at index in, which always points to the next empty slot (the one just after the most recently filled slot when the ring is not empty), and then advances in. Get() fetches the message at index out, which points to the oldest non-empty slot, and then advances out for the next fetch. Both indices wrap around modulo size. Neither function checks whether the ring is empty or full.

So I added a variable, current, that tracks the number of messages in the ring, and used it to implement Empty() and Full().

int Ring::Empty() {
    return (current == 0);
}

int Ring::Full() {
    return (current == size);
}
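The wrap-around of in and out is easier to follow in a standalone version. The sketch below is not the lab's Ring class: ToyRing is a hypothetical name, it stores plain ints instead of slot objects, and put()/get() refuse to run when the ring is full or empty, which the lab code leaves to the semaphores.

```cpp
#include <cassert>
#include <vector>

// Hypothetical standalone ring buffer of ints with the same index scheme.
struct ToyRing {
    std::vector<int> buffer;
    int in = 0;       // index of the next empty slot
    int out = 0;      // index of the oldest filled slot
    int current = 0;  // number of filled slots

    explicit ToyRing(int size) : buffer(size) { assert(size > 0); }

    int size() const { return static_cast<int>(buffer.size()); }
    bool empty() const { return current == 0; }
    bool full() const { return current == size(); }

    // Returns false instead of overwriting when the ring is full.
    bool put(int value) {
        if (full()) return false;
        buffer[in] = value;
        in = (in + 1) % size();
        ++current;
        return true;
    }

    // Returns false instead of reading stale data when the ring is empty.
    bool get(int &value) {
        if (empty()) return false;
        value = buffer[out];
        out = (out + 1) % size();
        --current;
        return true;
    }
};
```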

prodcons.cc in lab3

The original file is only a skeleton, so below is my implementation.

Thread *producers[N_PROD]; // array of pointers to the producer
Thread *consumers[N_CONS]; // and consumer threads
char prod_names[N_PROD][MAX_NAME]; // array of character strings for producer names
char cons_names[N_CONS][MAX_NAME]; // array of character strings for consumer names
Semaphore *nempty, *nfull; // two semaphores for empty and full slots
Semaphore *mutex; // semaphore for mutual exclusion
Ring *ring;

void Producer(_int which) {
    int num;
    slot *message = new slot(0,0);
    for (num = 0; num < N_MESSG ; num++) {
        message->value = num;
        message->thread_id = which;
        nempty->P(); // wait for an empty slot
        mutex->P(); // enter the critical section
        ring->Put(message);
        mutex->V(); // leave the critical section
        nfull->V(); // one more full slot
    }
}

void Consumer(_int which) {
    char str[MAXLEN];
    char fname[LINELEN];
    int fd;
    slot *message = new slot(0,0);
    sprintf(fname, "tmp_%d", which);
    if ( (fd = creat(fname, 0600) ) == -1) {
        perror("creat: file create failed");
        exit(1);
    }
    for (; ; ) {
        nfull->P(); // wait for a full slot
        mutex->P(); // enter the critical section
        ring->Get(message);
        mutex->V(); // leave the critical section
        nempty->V(); // one more empty slot
        time_t my_time = time(NULL);
        sprintf(str,"### %s producer id --> %d; Message number --> %d;\n", ctime(&my_time), message->thread_id, message->value);
        if ( write(fd, str, strlen(str)) == -1 ) {
            perror("write: write failed");
            exit(1);
        }
    }
}

void ProdCons() {
    int i;
    DEBUG('t', "Entering ProdCons");
    nempty = new Semaphore("nempty", BUFF_SIZE);
    nfull = new Semaphore("nfull", 0);
    mutex = new Semaphore("mutex", 1);
    ring = new Ring(BUFF_SIZE);
    for (i = 0; i < N_PROD; i++) {
        sprintf(prod_names[i], "producer_%d", i);
        producers[i] = new Thread(prod_names[i]);
        producers[i]->Fork(Producer, i);
    }
    for (i = 0; i < N_CONS; i++) {
        sprintf(cons_names[i], "consumer_%d", i);
        consumers[i] = new Thread(cons_names[i]);
        consumers[i]->Fork(Consumer, i);
    }
}

Semaphore and Buffer

nempty = new Semaphore("nempty", BUFF_SIZE);
nfull = new Semaphore("nfull", 0);
mutex = new Semaphore("mutex", 1);
ring = new Ring(BUFF_SIZE);

nempty and nfull are counting semaphores that guard the buffer slots. nempty counts the empty slots: it starts at BUFF_SIZE (its maximum), and nempty = 0 means the buffer is full. nfull counts the filled slots: it starts at 0, and nfull = 0 means the buffer is empty. Producing a message does nempty-- and nfull++; consuming one does nfull-- and nempty++. mutex is a binary semaphore that keeps Put() and Get() mutually exclusive.
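One way to sanity-check the two counters is the invariant that empty slots plus full slots always equals the buffer size. A minimal single-threaded sketch, assuming plain int counters in place of real semaphores (SlotCounters, produce, and consume are hypothetical names; returning false stands for "the thread would block in P()"):

```cpp
#include <cassert>

// Hypothetical model of the nempty/nfull bookkeeping with plain counters.
struct SlotCounters {
    int size;
    int nempty;     // empty slots, starts at the buffer size
    int nfull = 0;  // filled slots, starts at zero

    explicit SlotCounters(int buffSize) : size(buffSize), nempty(buffSize) {
        assert(buffSize > 0);
    }

    // Producer side: P() on nempty, then V() on nfull.
    bool produce() {
        if (nempty == 0) return false;  // buffer full, producer would block
        --nempty;
        ++nfull;
        assert(nempty + nfull == size);
        return true;
    }

    // Consumer side: P() on nfull, then V() on nempty.
    bool consume() {
        if (nfull == 0) return false;  // buffer empty, consumer would block
        --nfull;
        ++nempty;
        assert(nempty + nfull == size);
        return true;
    }
};
```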

The Fork() of producer and consumer threads

for (i = 0; i < N_PROD; i++) {
    sprintf(prod_names[i], "producer_%d", i);
    producers[i] = new Thread(prod_names[i]);
    producers[i]->Fork(Producer, i);
}
for (i = 0; i < N_CONS; i++) {
    sprintf(cons_names[i], "consumer_%d", i);
    consumers[i] = new Thread(cons_names[i]);
    consumers[i]->Fork(Consumer, i);
}

N_PROD producers and N_CONS consumers are forked, each with a name generated from its index i. Each thread is started with Fork(func, arg).

void Thread::Fork(VoidFunctionPtr func, _int arg) {
#ifdef HOST_ALPHA
    DEBUG('t', "Forking thread \"%s\" with func = 0x%lx, arg = %ld\n", name, (long) func, arg);
#else
    DEBUG('t', "Forking thread \"%s\" with func = 0x%x, arg = %d\n", name, (int) func, arg);
#endif
    StackAllocate(func, arg);
    IntStatus oldLevel = interrupt->SetLevel(IntOff);
    scheduler->ReadyToRun(this); // ReadyToRun assumes that interrupts are disabled!
    (void) interrupt->SetLevel(oldLevel);
}

Unlike the fork() system call in Linux, Thread::Fork() does not copy the variables and state of the calling thread. It allocates a stack for a thread that will start by calling func(arg), and appends that thread to the ready list via ReadyToRun().
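The point that Fork() only queues work, and that nothing runs until the scheduler picks the thread, can be shown with a toy version. Everything here is a hypothetical sketch: ToyForker, ToyThread, addToTotal, and toyTotal are my own names, long stands in for _int, and "running a thread" is reduced to calling its function once.

```cpp
#include <cassert>
#include <deque>

using VoidFunctionPtr = void (*)(long);

// A toy "thread" is just the function and argument it will start with.
struct ToyThread {
    VoidFunctionPtr func;
    long arg;
};

struct ToyForker {
    std::deque<ToyThread> readyList;

    // Like Fork(): remember func and arg, but do not call func yet.
    void fork(VoidFunctionPtr func, long arg) {
        assert(func != nullptr);
        readyList.push_back({func, arg});
    }

    // Stand-in for the scheduler: run each queued thread to completion.
    int runAll() {
        int ran = 0;
        while (!readyList.empty()) {
            ToyThread t = readyList.front();
            readyList.pop_front();
            t.func(t.arg);
            ++ran;
        }
        return ran;
    }
};

// Sample thread body used to observe when the work actually happens.
long toyTotal = 0;
void addToTotal(long n) { toyTotal += n; }
```

After two fork() calls toyTotal is still 0; it only changes once runAll() is called.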

lab3

The changed code is listed above. After that, just make and run.

The run generates two files, tmp_0 and tmp_1, written by consumer 0 and consumer 1 respectively. But tmp_0 contains all 8 messages and tmp_1 contains none: only consumer 0 gets any messages.

But if I run ./nachos -rs 5 , things are different:

------------------- tmp_0 -------------------
### Sun Apr 11 10:23:53 2021
producer id --> 0; Message number --> 0;
### Sun Apr 11 10:23:53 2021
producer id --> 1; Message number --> 0;
### Sun Apr 11 10:23:53 2021
producer id --> 0; Message number --> 2;
### Sun Apr 11 10:23:53 2021
producer id --> 1; Message number --> 1;
### Sun Apr 11 10:23:53 2021
producer id --> 0; Message number --> 3;
### Sun Apr 11 10:23:53 2021
producer id --> 1; Message number --> 3;
------------------- tmp_1 -------------------
### Sun Apr 11 10:23:53 2021
producer id --> 0; Message number --> 1;
### Sun Apr 11 10:23:53 2021
producer id --> 1; Message number --> 2;

So what does -rs do?

In main.cc of lab3, we see a call to Initialize(), which is defined in system.cc in threads:

else if (!strcmp(*argv, "-rs")) {
    ASSERT(argc > 1);
    RandomInit(atoi(*(argv + 1))); // initialize pseudo-random number generator
    randomYield = TRUE;
    argCount = 2;
}
-----------------------------------------------
if (randomYield) // start the timer (if needed)
    timer = new Timer(TimerInterruptHandler, 0, randomYield);
-----------------------------------------------
static void TimerInterruptHandler(_int dummy) {
    if (interrupt->getStatus() != IdleMode)
        interrupt->YieldOnReturn();
}

With -rs, randomYield is set to TRUE, and in that case a new Timer is created with TimerInterruptHandler() as its handler. The code in machine does the rest:

// -------- in stats.h ------------
#define UserTick 1 // advance for each user-level instruction
#define SystemTick 10 // advance each time interrupts are enabled
#define TimerTicks 100 // (average) time between timer interrupts
// ------------ timer.cc -------------
Timer::Timer(VoidFunctionPtr timerHandler, _int callArg, bool doRandom) {
    randomize = doRandom;
    handler = timerHandler;
    arg = callArg;
    // schedule the first interrupt from the timer device
    interrupt->Schedule(TimerHandler, (_int) this, TimeOfNextInterrupt(), TimerInt);
}
// -------------------------------------------------
int Timer::TimeOfNextInterrupt() {
    if (randomize)
        return 1 + (Random() % (TimerTicks * 2));
    else
        return TimerTicks;
}
// ------------ interrupt.cc -------------
void Interrupt::Schedule(VoidFunctionPtr handler, _int arg, int fromNow, IntType type) {
    int when = stats->totalTicks + fromNow;
    PendingInterrupt *toOccur = new PendingInterrupt(handler, arg, when, type);
    DEBUG('i', "Scheduling interrupt handler the %s at time = %d\n", intTypeNames[type], when);
    ASSERT(fromNow > 0);
    pending->SortedInsert(toOccur, when);
}
// -------------- OneTick() -----------------
while (CheckIfDue(FALSE)) // check for pending interrupts
    ;
ChangeLevel(IntOff, IntOn); // re-enable interrupts
if (yieldOnReturn) { // if the timer device handler asked
                     // for a context switch, ok to do it now
    yieldOnReturn = FALSE;
    status = SystemMode; // yield is a kernel routine
    currentThread->Yield();
    status = old;
}

When randomize is set, TimeOfNextInterrupt() returns a random number in the range [1, 200], which is passed to Schedule() and inserted into the sorted pending queue, telling the system when the next timer interrupt fires. The handler requests a yield, and OneTick() performs it. That is why context switches occur at random points when -rs is used. Without it, no timer is created, consumer 0 is never forced to yield, and consumer 1 gets nothing.
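The [1, 200] range follows directly from the arithmetic in TimeOfNextInterrupt(). Here is a small standalone check, assuming TimerTicks is 100 as in stats.h; timeOfNextInterrupt and its raw parameter are hypothetical names, with raw standing in for the value returned by Random().

```cpp
#include <cassert>

const int TimerTicks = 100;  // same value as in stats.h

// Hypothetical copy of the formula, with the random value passed in
// so that the result is deterministic.
int timeOfNextInterrupt(bool randomize, int raw) {
    assert(raw >= 0);
    if (randomize)
        return 1 + (raw % (TimerTicks * 2));  // remainder 0..199, plus 1
    return TimerTicks;
}
```

The smallest remainder, 0, gives 1 tick and the largest, 199, gives 200 ticks, so the delay is never 0, which matches the ASSERT(fromNow > 0) in Schedule().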