Monday 10 September 2012

Classical IPC Problems

Now that we have learnt about semaphores and mutexes, let us use them to write programs for the classical IPC problems. I give a solution to one version (either using semaphores or using mutexes) per problem. The other version of each problem is left as an exercise for the reader ;).

I will be giving solutions to 4 different problems.

Producer Consumer Problem
Dining Philosophers problem
Sleeping Barber problem
Readers-Writers problem

Note: I will be using the functions which I have provided in my previous post, for semaphores, which is in the file "mysem.c".

Producer-Consumer Problem:Semaphore Version

#include<stdio.h>
#include<stdlib.h>
#include<sys/types.h>
#include<pthread.h>
#include "mysem.c"

#define MAX_BUF 10


int    *buffer;
int    head=0,tail=0,item_id=1;
int    count=0;
int    sem_ID; // 3 semaphores... { Mutex, Empty, Full}
int    fflag=0,eflag=0;

/*
 * Producer thread body: loops forever, placing a new item id into the
 * circular buffer and then pausing for a random 0-2 seconds.
 *
 * Semaphores of the set sem_ID as used here:
 *   0 - mutex around count/head/tail/flags
 *   1 - raised here when the consumer flagged (eflag) that it is waiting
 *   2 - waited on here when the buffer is full
 */
void producer()
{
 printf("Producer started\n");

 for(;;)
 {
  sem_change(sem_ID, 0, -1);            /* enter the critical section */

  if(count==MAX_BUF)
  {
   /* Buffer full: announce it, drop the mutex, and sleep on semaphore 2
      until the consumer frees a slot; then take the mutex back. */
   printf("Producer waiting\n");
   fflag=1;
   sem_change(sem_ID, 0, 1);
   sem_change(sem_ID, 2, -1);
   sem_change(sem_ID, 0, -1);
  }

  count++;
  buffer[head]=item_id++;

  /* First item in a previously empty buffer and the consumer is asleep:
     wake it up. */
  if(count==1 && eflag==1)
  {
   eflag=0;
   sem_change(sem_ID, 1, 1);
  }

  printf("produced item:%d \n", buffer[head]);
  head=(head+1)%MAX_BUF;

  sem_change(sem_ID, 0, 1);             /* leave the critical section */

  sleep(rand()%3);
 }
}

/*
 * Consumer thread body: loops forever, removing the oldest item from the
 * circular buffer and then pausing for a random 0-3 seconds.
 *
 * Semaphores of the set sem_ID as used here:
 *   0 - mutex around count/head/tail/flags
 *   1 - waited on here when the buffer is empty
 *   2 - raised here when the producer flagged (fflag) that it is waiting
 */
void consumer()
{
 printf("consumer started\n");
 while(1)
 {
  sem_change(sem_ID, 0, -1);            /* enter the critical section */
  if(count==0)
  {
   /* Buffer empty: announce it, drop the mutex, and sleep on semaphore 1
      until an item is produced; then take the mutex back. */
   printf("Consumer Waiting\n");
   eflag=1;
   sem_change(sem_ID, 0, 1);
   sem_change(sem_ID, 1, -1);
   sem_change(sem_ID, 0, -1);
  }
  count--;

  printf("consumed item:%d \n", buffer[tail]);
  tail=(tail+1)%MAX_BUF;

  /* The buffer just went from full to one free slot and the producer is
     asleep: signal the producer thread that the buffer is not full. */
  if(count==MAX_BUF-1 && fflag==1)
  {
   fflag=0;
   sem_change(sem_ID, 2, 1);
  }

  sem_change(sem_ID, 0, 1);             /* leave the critical section */
  sleep(rand()%4);
 }
}
/*
 * Entry point of the producer-consumer demo: allocates the shared buffer,
 * creates the three-semaphore set {mutex=1, empty=0, full=0} and starts one
 * producer and one consumer thread.
 *
 * Returns 1 if the buffer or a thread cannot be created. Otherwise it blocks
 * in pthread_join(), because the producer loops forever.
 */
int main()
{
 pthread_t pro,con;
 int values[]={1,0,0};

 buffer=calloc(MAX_BUF,sizeof *buffer);
 if(buffer==NULL)
 {
  printf("Buffer allocation failed\n");
  return 1;
 }

 sem_ID=sem_init_diff_val(3, values); //initialize with 1, 0, 0

 if(pthread_create(&pro, NULL, (void*)&producer, NULL)!=0 ||
    pthread_create(&con, NULL, (void*)&consumer, NULL)!=0)
 {
  printf("Thread creation failed\n");
  return 1;
 }

 /* Waiting on the producer keeps the process (and the consumer) alive. */
 pthread_join(pro,NULL);

 return 0;
}


Dining Philosophers Problem:Mutex Version

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#define PHILOSOPHERS 5

pthread_mutex_t   spoon[PHILOSOPHERS];

//Intializes the philosopher threads
/*
 * Computes the two spoon indices used by philosopher `a` and announces it.
 *
 * a: 0-based philosopher index
 * b: out - index of the "back" spoon, i.e. a-1, wrapping to the last spoon
 *    (PHILOSOPHERS-1) for philosopher 0
 * c: out - index of the "front" spoon, which equals a
 */
void phil_init(int a, int* b, int* c)
{
 /* spoon[] has PHILOSOPHERS entries, so the wrap-around index is
    PHILOSOPHERS-1; PHILOSOPHERS itself would be out of bounds. */
 *b=(a>0) ? a-1 : PHILOSOPHERS-1;
 *c=a;
 printf("Philosopher %d started\n", a+1);
 return;
}


/*
 * Tries, without blocking, to lock both spoons of philosopher `a`.
 *
 * a: philosopher index; odd indices try spoon[c] first, even ones spoon[b]
 * b, c: indices into spoon[]
 *
 * Returns 0 when both locks were obtained. Otherwise 1 is added when
 * spoon[b] could not be locked and 10 when spoon[c] could not be locked
 * (so 11 means neither). Any lock that was obtained stays held.
 */
int  check_If_Spoons_Are_Available(int a, int b, int c)
{
 int  odd=a&1;
 int  first=odd ? c : b;
 int  second=odd ? b : c;
 int  first_code=odd ? 10 : 1;
 int  second_code=odd ? 1 : 10;
 int  missing=0;

 if(pthread_mutex_trylock(&spoon[first])!=0)
  missing+=first_code;
 if(pthread_mutex_trylock(&spoon[second])!=0)
  missing+=second_code;

 return missing;
}

/*
 * Unlocks both spoons held by philosopher `a`.
 *
 * Odd philosophers release spoon[b] and then spoon[c]; even philosophers
 * release them in the opposite order.
 */
void Release_Spoons(int a, int b, int c)
{
 int  odd=a&1;
 int  first=odd ? b : c;
 int  second=odd ? c : b;

 pthread_mutex_unlock(&spoon[first]);
 pthread_mutex_unlock(&spoon[second]);
}

/*
 * Blocks until the spoon(s) that check_If_Spoons_Are_Available() could not
 * grab have been locked.
 *
 * a: failure code from check_If_Spoons_Are_Available()
 *    (1 = spoon[c] missing, 10 = spoon[d] missing, 11 = both missing)
 * b: 0-based philosopher index (used in messages and for lock ordering)
 * c: spoon index reported by code 1 (philo() passes its "back" spoon)
 * d: spoon index reported by code 10 (philo() passes its "front" spoon)
 */
void wait_for_others_to_finish(int a, int b ,int c, int d)
{
 switch(a)
 {
  case 1:
   printf("Philosopher %d waiting since right spoon is unavailable\n",b+1);
   pthread_mutex_lock(&spoon[c]);
   break;
  case 10:
   printf("Philosopher %d waiting since left spoon is unavailable\n", b+1);
   pthread_mutex_lock(&spoon[d]);
   break;
  case 11:
   printf("Philosopher %d waiting since both spoons are unavailable\n", b+1);
   /* Keep the same acquisition order as the non-blocking attempt: odd
      philosophers take the front spoon first, even ones the back spoon.
      The parity must come from the philosopher index `b`; the failure
      code `a` is always 11 in this branch. */
   if(b&1)
   {
    pthread_mutex_lock(&spoon[d]);
    pthread_mutex_lock(&spoon[c]);
   }
   else
   {
    pthread_mutex_lock(&spoon[c]);
    pthread_mutex_lock(&spoon[d]);
   }
   break;
  default:
   /* 0 (nothing missing) or an unknown code: nothing to wait for. */
   break;
 }
 return;
}

/*
 * Simulates philosopher `a` (0-based) eating: prints a start message,
 * sleeps for a random 0-4 seconds, then prints a completion message.
 */
void Eat(int a)
{
 int  number=a+1;          /* philosophers are shown 1-based */

 printf("philosopher %d eating\n", number);
 sleep(rand()%5);
 printf("philosopher %d finished eating\n", number);
}
/*
 * Thread body of one philosopher.
 *
 * arg: pointer to an int holding this philosopher's 0-based index.
 *
 * Repeats forever: think for a random 0-5 seconds, acquire both spoons
 * (blocking for whichever could not be grabbed immediately), eat, and put
 * the spoons back.
 */
void philo(void * arg)
{
 int  id=*((int*)arg);
 int  back;
 int  front;

 phil_init(id, &back, &front);

 for(;;)
 {
  int  missing;

  printf("philosopher %d thinking\n", id+1);
  sleep(rand()%6);

  missing=check_If_Spoons_Are_Available(id, back, front);
  if(missing!=0)
  {
   wait_for_others_to_finish(missing, id, back, front);
  }

  Eat(id);

  Release_Spoons(id, back, front);
 }
}


/*
 * Entry point of the dining-philosophers demo: initialises one mutex per
 * spoon and starts PHILOSOPHERS threads, each given a pointer to its own
 * 0-based index.
 *
 * Exits with status 1 if the index array or a thread cannot be created.
 * Otherwise it blocks in pthread_join(), because philo() loops forever.
 */
int main(int argc, char* argv[])
{
 pthread_t  S[PHILOSOPHERS];
 int   *g;

 for(int i=0; i<PHILOSOPHERS; i++)
  pthread_mutex_init(&spoon[i], NULL);

 /* Each thread needs its own stable int to read its index from. */
 g=malloc(PHILOSOPHERS*sizeof *g);
 if(g==NULL)
 {
  printf("Could not allocate philosopher ids\n");
  exit(1);
 }

 for(int i=0; i<PHILOSOPHERS; i++)
 {
  g[i]=i;
  if(pthread_create(&S[i], NULL, (void*)&philo,(void*)&g[i])!=0)
  {
   printf("Could not start philosopher %d\n", i+1);
   exit(1);
  }
 }

 /* Waiting on the first philosopher keeps the whole process alive. */
 pthread_join(S[0], NULL);
 exit(0);
}


Sleeping Barber Problem:Semaphore Version


#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include "mysem.c"
#define MAX_C 5
/* Id of a set of 5 semaphores:
   { mutex, cond_empty, counting semaphore, waiting semaphore, barber semaphore } */
int sem_ID;
/* Number of customers currently occupying a seat. */
int customers_count=0;
/* eflag is set by the barber while it sleeps waiting for a customer.
   fflag is declared separately because `int eflag=fflag=0;` does not
   declare fflag and therefore does not compile. */
int eflag=0, fflag=0;
/*
 * Barber thread body: loops forever, taking one customer at a time.
 *
 * Semaphores of the set sem_ID as used here:
 *   0 - mutex around customers_count and eflag
 *   1 - waited on while no customer is present
 *   3 - raised to call the next seated customer
 *   4 - waited on until that customer reports the haircut is over
 */
void barber()
{
 printf("barber started\n");
 for(;;)
 {
  sem_change(sem_ID, 0, -1);

  if(customers_count==0)
  {
   /* Nobody is waiting: flag it, drop the mutex and sleep on semaphore 1
      until a customer arrives, then take the mutex back. */
   printf("barber sleeping\n");
   eflag=1;
   sem_change(sem_ID, 0, 1);
   sem_change(sem_ID, 1, -1);
   sem_change(sem_ID, 0, -1);
  }

  customers_count--;
  sem_change(sem_ID, 0, 1);

  sem_change(sem_ID, 3, 1);     /* invite one waiting customer */
  sem_change(sem_ID, 4, -1);    /* wait until the haircut is finished */
 }
}
/*
 * Customer thread body: takes a seat if one is free, waits for the barber,
 * gets a haircut lasting a random 4-8 seconds, and leaves.
 *
 * arg: pointer to this thread's slot in the creator's live-thread table;
 *      the slot is reset to 0 just before the thread finishes so that it
 *      can be reused.
 *
 * NOTE(review): the slot is written here and read by the creating thread
 * without common synchronisation - confirm this is acceptable.
 */
void customer(void *arg)
{
 int *slot=(int*)arg;
 /* pthread_t is not an int; print it through an explicit unsigned long
    conversion so the format specifier matches its argument. */
 unsigned long self=(unsigned long)pthread_self();

 sem_change(sem_ID, 0, -1);
 if(customers_count==MAX_C)       // If all seats are occupied exit the thread
 {
  *slot=0;
  printf("No place for customer %lu so leaving\n", self);
  sem_change(sem_ID, 0, 1);
  return;
 }

 customers_count++;

 /* First customer in an empty shop while the barber sleeps: wake him. */
 if(customers_count==1 && eflag==1)
 {
  sem_change(sem_ID, 1, 1);
  eflag=0;
 }
 sem_change(sem_ID, 0, 1);
 printf("Customer %lu got a place\n", self);

 sem_change(sem_ID, 3, -1);       /* wait until the barber calls */
 printf("Cutting for %lu customer\n", self);
 sleep(rand()%5+4);
 sem_change(sem_ID, 4, 1);        /* tell the barber the haircut is done */

 *slot=0;
}
/*
 * Entry point of the sleeping-barber demo: creates the five-semaphore set,
 * starts the barber thread and then keeps generating customer threads in
 * whichever of the MAX_C+2 slots is currently free.
 *
 * live_threads[i] is 0 when slot i has no running customer and 1 while a
 * customer thread created for that slot is alive (the thread itself resets
 * it to 0 when it finishes).
 *
 * NOTE(review): live_threads[] is read here and written by the customer
 * threads without common synchronisation - confirm this is acceptable.
 */
int main(int argc, char* argv[])
{
 pthread_t  barber_thread;
 int   live_threads[MAX_C+2];
 pthread_t  customer_thread[MAX_C +2];

 for(int i=0; i<MAX_C+2; i++)
  live_threads[i]=0;     // initially all are dead state

 int array[]={1, 0, MAX_C, 0, 0};     // Initial values of different semaphores
 sem_ID=sem_init_diff_val(5,array);

 if(pthread_create(&barber_thread, NULL, (void*)&barber, NULL)!=0)
 {
  printf("Could not start the barber thread\n");
  exit(1);
 }

 sleep(2);
 //Continuous thread generator....
 while(1)
 {
  for(int i=0; i<MAX_C+2; i++)
  {
   if(live_threads[i]==0)
   {
    live_threads[i]=1;
    if(pthread_create(&customer_thread[i], NULL, (void*)&customer, (void*)&live_threads[i])!=0)
    {
     /* No thread was started, so the slot must stay reusable. */
     live_threads[i]=0;
    }
    else
    {
     /* Customers are never joined; detaching lets the system reclaim
        each thread's resources as soon as it finishes. */
     pthread_detach(customer_thread[i]);
    }
    sleep(rand()%4);
   }
  }
 }

 exit(0);
}


Readers Writers Problem:Mutex Version

Note: The execution depends on the selection algorithm of the mutex.

#include<stdio.h>
#include<pthread.h>
pthread_mutex_t no_wait, no_acc, counter;
int no_of_readers=0;



/* These helpers are defined further down in the file; declaring them here
   keeps the calls below from relying on implicit declarations, which would
   clash with the later `void` definitions. */
void check_and_wait(int id);
void read(int id);

/*
 * Reader thread body.
 *
 * arg: pointer to an int holding the reader's id (used only in messages).
 *
 * Repeats forever: pause for a random 0-3 seconds, queue up on the
 * no_wait mutex via check_and_wait(), then perform one read().
 */
void reader(void *arg)
{
 int id=*((int*)arg);
 printf("reader %d started\n", id);
 while(1)
 {
  sleep(rand()%4);
  check_and_wait(id);
  read(id);
 }
}


/* These helpers are defined further down in the file; declaring them here
   keeps the calls below from relying on implicit declarations, which would
   clash with the later `void` definitions. */
void check_and_wait_if_busy(int id);
void write(int id);

/*
 * Writer thread body.
 *
 * arg: pointer to an int holding the writer's id (used only in messages).
 *
 * Repeats forever: pause for a random 0-4 seconds, queue up on the
 * no_wait mutex via check_and_wait_if_busy(), then perform one write().
 */
void writer(void* arg)
{
 int id=*((int*)arg);
 printf("writer %d started\n", id);
 while(1)
 {
  sleep(rand()%5);
  check_and_wait_if_busy(id);
  write(id);
 }
}


/*sub functions for reader and writer threads*/
/*
 * Acquires the no_wait mutex on behalf of writer `id`.
 * If the mutex cannot be taken immediately, a "Waiting" message is printed
 * and the call blocks until the mutex becomes available.
 */
void check_and_wait_if_busy(int id)
{
 if(pthread_mutex_trylock(&no_wait)==0)
  return;                       /* got it without waiting */

 printf("Writer %d Waiting\n", id);
 pthread_mutex_lock(&no_wait);
}
/*
 * Acquires the no_wait mutex on behalf of reader `id`.
 * If the mutex cannot be taken immediately, a "Waiting" message is printed
 * and the call blocks until the mutex becomes available.
 */
void check_and_wait(int id)
{
 if(pthread_mutex_trylock(&no_wait)==0)
  return;                       /* got it without waiting */

 printf("Reader %d Waiting\n", id);
 pthread_mutex_lock(&no_wait);
}
/*
 * Performs one read on behalf of reader `id`. The caller must already hold
 * the no_wait mutex; it is released here once the reader is registered.
 *
 * The first reader to enter locks no_acc (shutting writers out) and the
 * last reader to leave unlocks it. Both the counter update and the
 * "first/last reader" decision are made while holding the counter mutex,
 * so two readers can never act on a stale value of no_of_readers.
 *
 * NOTE(review): no_acc may be unlocked by a different thread than the one
 * that locked it, which default pthread mutexes do not promise to support;
 * a semaphore would be the portable choice - confirm.
 */
void read(int id)
{
 pthread_mutex_lock(&counter);
 no_of_readers++;
 if(no_of_readers==1)
  pthread_mutex_lock(&no_acc);
 pthread_mutex_unlock(&counter);

 pthread_mutex_unlock(&no_wait);
 printf("reader %d reading...\n", id);
 sleep(rand()%5);
 printf("reader %d finished reading\n", id);

 pthread_mutex_lock(&counter);
 no_of_readers--;
 if(no_of_readers==0)
  pthread_mutex_unlock(&no_acc);
 pthread_mutex_unlock(&counter);
}

/*
 * Performs one write on behalf of writer `id`. The caller must already hold
 * the no_wait mutex; it is released here as soon as exclusive access
 * (no_acc) has been obtained. The simulated write lasts a random 2-5
 * seconds.
 */
void write(int id)
{
 int duration;

 pthread_mutex_lock(&no_acc);
 pthread_mutex_unlock(&no_wait);

 printf("Writer %d writing...\n", id);
 duration=rand()%4+2;
 sleep(duration);
 printf("Writer %d finished writing\n", id);

 pthread_mutex_unlock(&no_acc);
}

/**************************************************/
//MAIN
int main(int argc, char* argv[])
{
 pthread_t R[5],W[5];
 int ids[5];
 for(int i=0; i<5; i++)
 {
  ids[i]=i+1;
  pthread_create(&R[i], NULL, (void*)&reader, (void*)&ids[i]);
  pthread_create(&W[i], NULL, (void*)&writer, (void*)&ids[i]);
 }
 pthread_join(R[0], NULL);
 exit(0);
}

<<Prev Next>>

Saturday 8 September 2012

System V IPC:Semaphores

In the previous post, I have described about the Mutexes and Condition Variables of the pthread library. Truly speaking, they don't actually give full support to synchronization. For instance, take the well known Readers-Writers problem, in which any number of readers can read the shared data at once. Can this be implemented by Mutexes alone?? The answer is absolutely no!

So, there comes the Semaphores into the picture.

What is a semaphore?

A semaphore is just like a mutex, but it gives access to shared memory to desired number of processes. Each semaphore will be initialized with a positive value, and this value is decreased or increased by desired amount by each process. When the value reaches zero or when the amount to be decremented takes the value of the semaphore to a negative value the process has to wait( this is optional which we will be seeing shortly), until some other process increments the value.

A mutex is like a semaphore whose initial value is 1

Still confused with the difference between these two? Then click here. It explains nicely with different examples.

System calls related to Semaphores:

Through out this post, our discussion will be about these three Important System calls:

  • semget()
  • semop()
  • semctl()

Where they are stored?

These semaphores are maintained in arrays (not individually) by the kernel. It stores all these values in a special data structure called semid_ds which is a C struct defined as follows:

struct semid_ds {
     struct ipc_perm sem_perm;       /* permissions */
     time_t          sem_otime;      /* last semop time */
     time_t          sem_ctime;      /* last change time */
     struct sem      *sem_base;      /* ptr to first semaphore in array*/
     struct wait_queue *eventn;
     struct wait_queue *eventz;
     struct sem_undo  *undo;         /* undo requests on this array */
     ushort          sem_nsems;      /* no. of semaphores in array */
        };

Now, don't get panicked by seeing the above structure. This is only for your information. Most of them are irrelevant, but some of them are important.

Most of the information about the struct is understood from the comments. The first one stores information about the permissions of the semaphore set like who can read, write and execute the semaphore.
The second and third are the time of the last operation performed on the semaphore set (by semop()) and the time the set was last changed (by semget() and semctl()).
The fourth one is a pointer to first semaphore in the array. All but last one are irrelevant for now and the last one stores the number of semaphores in the array

The semaphore struct:

Previously, we have seen the structure for a semaphore set. Now, here is the structure of each semaphore:

struct sem {
            short   sempid;         /* pid of last operation */
            ushort  semval;         /* current value */
            ushort  semncnt;        /* num procs awaiting increase in semval */
            ushort  semzcnt;        /* num procs awaiting semval = 0 */
        };  

I hope you wont be worrying because this is not so big as previous one :P. As I said before, this is only for information. After all being a computer science student, we shouldn't be so abstract!! ;).

Now, let us enter into the description of the main System calls. Starting with semget().


semget():

The prototype of this system call is as follows:

int semget ( key_t key, int nsems, int semflg )

Now, the first argument is a unique number generated from ftok() system call. Its prototype is as follows:

key_t ftok(const char *path, int id)

The path must name an existing, accessible file or directory. The id is a number which we give as an argument. When we give the same path name and id every time, ftok() always gives the same number. In other words, it generates a key that identifies the given (path, id) pair.

Usage:

key_t G;
G=ftok(".",  12);

Now, G has a some value stored in it. Observe that, the dot which we gave refers to the current directory.(to avoid complications).

This value is passed as the first argument. The second argument is the number of semaphores you want to create in the semaphore set.

The third one, is a bit complex. It is the flags that can be passed. It accepts two flags. IPC_CREAT and IPC_EXCL.

IPC_CREAT:

This flag is used to tell the kernel to create a semaphore set, if it doesn't exists already.

IPC_EXCL:

This flag is used along with the above one, so that the function fails to create a semaphore set if it already exists.

The permissions:

In general, we also specify the permission mode along with the above flags when the semaphore set is being created for the first time. The general mode which we specify is 0660.

0660?

The meaning of the above numbers is simple. If you observe that in Linux every file has file permissions for owner , group and public. There are 3 bits(read, write, execute) for each of them making a total of 9 bits.

Now, to give r,w,x permissions to only the owner, we need to set only the first three bits while the remaining should be cleared making the value 7(111)0(000)0(000).

In the similar manner, 660 means 110 110 000. If we decode it, we find that we are giving read and write permissions on the semaphore set for both the owner and the group which he belongs to.

To give the permissions to public also we specify, 666. Sounds simple right?

All the flags are ORed with each other when specifying multiple flags.

Usage:

int sem_id=semget(key_value, no_of_sems, IPC_CREAT|0660);

I forgot to say an important thing, the return value! It is actually the unique ID for the given semaphore set, which can be used in the later time to perform operations on that set.


semctl():

Just creating the semaphore set is not enough. We have to initialize all the semaphores with some positive values(or may be even zero). This can be done using semctl() system call.The prototype is as follows:

int semctl ( int semid, int semnum, int cmd, union semun arg )

The first argument is the semid which is obtained from semget() system call. The second one is the number of the semaphore in the semaphore set to which you want to initialize the values. It starts from 0. i.e., 0 being the first semaphore in the set.

The third argument is again some what complex. Although there are 10 different commands for this argument, i am gonna explain only the essential ones which are 4 only.

Before saying about these commands, we need to examine the last argument. It is a union of type semun. The structure is as follows:

union semun {
                int val;                /* value for SETVAL */
                struct semid_ds *buf;   
                ushort *array;          /* array for GETALL & SETALL */
                struct seminfo *__buf;  
                void *__pad;
        };

As you see from the comments, only two of them are useful for now, namely the integer val and the unsigned short array named array.

Now, let us discuss about the four important commands.

SETVAL:

When the third argument is SETVAL, then the value of the semaphore pointed by the semnum is set to the value specified in the val field of the semun union.(Confused?)

For example, if you want to set the 4th semaphore of the semaphore set with ID 1234, with the value say 6, then we need to write the following statements.

union semun tmp;
tmp.val=6;
semctl(1234, 3, SETVAL, tmp);

The above statements perform the desired operations. In simpler way, SETVAL is used to set the value of individual semaphore.

GETVAL:

The GETVAL is used to get the value of a specified semaphore in the set.

For example, we want to retrieve the value of 2nd semaphore of the semaphore set with ID 3456. Then, the following statement should be written.

int value=semctl(3456, 1, GETVAL, 0);

This returns the value into the variable value. Note that since there is no need for the union here, we specify it to be zero.

SETALL:

When you want to set the values of all the semaphores in the set at once, you can use this command. It takes the unsigned short array pointed to by array in the semun union and sets the value of each semaphore from it.

For example, you want to initialize, a semaphore set whose ID is 1234 having 4 semaphores, to 2,5,8,3 respectively. Then the instructions used are:

union semun tmp;
ushort g[]={2,5,8,3};
tmp.array=g;
semctl(1234, 4, SETALL, tmp);

This performs the desired operation.

GETALL:

When you want to retrieve the values of all semaphores, then you use this command. It is same as previous one except that after the function call with this command the array of semun has the values of all the semaphores.


semop():

Now, there comes the important one. After creating and initializing the semaphore set, you wish to use them in process synchronization. This can be done with the semop() system call. It has the following prototype.

int semop ( int semid, struct sembuf *sops, unsigned nsops)

The first argument is the semaphore ID on which you want to perform the operations. The second argument is a pointer to a struct of type sembuf. It has the following fields:

 struct sembuf {
         ushort  sem_num;        /* semaphore index in array */
         short   sem_op;         /* semaphore operation */
         short   sem_flg;        /* operation flags */
        };

The usage of this system call is much simpler than one might be expecting. You should be initializing this struct in order to perform the operations.
The first field is the index of the semaphore on which you want to perform the operations.
The second one being the amount by which you want to increment or decrement(specify a negative value) the value of the semaphore.
The third one is the operation flags. It generally takes two flags, IPC_NOWAIT and SEM_UNDO. The first one is important for us.

IPC_NOWAIT

When this flag is specified, the process never sleeps if the operation it performs would make the value of the semaphore negative; instead, semop() fails immediately and returns -1. If no flag is specified, then the process sleeps until it can perform the operation.

For example, the value of 3rd semaphore of a semaphore set with ID 1234 is 5 and a process wants to decrement it by 6, then since the value becomes negative(-1), when this is performed, the process actually gets blocked(depending on the operation flag) until some other process increments the value of the semaphore atleast by 1. If you want the process to be blocked then you need to write the following statements:

struct sembuf tmp;
tmp.sem_num=2; //for 3rd semaphore
tmp.sem_op=-6;
tmp.sem_flg=0;//if you don't want the process to be blocked then specify IPC_NOWAIT

semop(1234, &tmp, 1);

Actually, I forgot to explain the 3rd argument. It is the number of operations you want to perform on the semaphore set. In the above case it is 1. If you want to perform multiple operations, you just need to create an array of sembufs and pass the size of the array as the 3rd argument to the semop function. This is left as an exercise for the reader.


Abstraction:

If you want to write a program using these semaphores, I am sure that you will get annoyed quickly, because you need to write many instructions for a small operation. Abstraction helps to overcome this problem. Don't worry about what that means: I am simply saying that you should define a function for each operation you want to perform.

Here are my functions(you can define your own functions also, but they look nearly like mine ), which are pretty much useful and easily understandable.

#include<stdio.h>
#include<stdlib.h>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<pthread.h>
/* NOTE(review): ID is not referenced by any helper shown in this file. */
int ID;
/* Fourth argument of semctl(); the member that is read depends on the
   command passed to semctl(). */
union semun {
 int val;                     /* value for SETVAL */
 struct semid_ds *buf;        /* not used by the helpers below */
 unsigned short int *array;   /* array for GETALL & SETALL */
 };
/* Project id handed to ftok(); incremented on every semaphore-set creation
   so that each set gets a different key. */
int state=1;
/*
 * Creates (or opens) a System V semaphore set and gives every semaphore in
 * it the same initial value.
 *
 * no_of_sems:          number of semaphores in the set
 * common_intial_value: value assigned to each semaphore with SETVAL
 *
 * Returns the semaphore-set id. On any failure (key generation, creation
 * or initialisation) an error message is printed and the process exits
 * with status 1.
 */
int sem_init(int no_of_sems, int common_intial_value)
{
 /* A fresh project id per call yields a different key for every set. */
 key_t h=ftok(".", state++);
 if(h==(key_t)-1)
 {
  printf("Semaphore init error\n");
  exit(1);
 }

 int sem_id=semget(h, no_of_sems, IPC_CREAT|0666);
 if(sem_id==-1)
 {
  printf("Semaphore init error\n");
  exit(1);
 }

 union semun tmp;
 tmp.val=common_intial_value;

 for(int i=0; i<no_of_sems; i++)
 {
  /* An unset semaphore would silently break every later lock/unlock. */
  if(semctl(sem_id, i, SETVAL, tmp)==-1)
  {
   printf("Semaphore init error\n");
   exit(1);
  }
 }
 return sem_id;
}



/*
 * Creates (or opens) a System V semaphore set and initialises each
 * semaphore with its own value.
 *
 * no_of_sems: number of semaphores in the set
 * array:      no_of_sems initial values, one per semaphore; each must fit
 *             in an unsigned short
 *
 * Returns the semaphore-set id. On any failure an error message is printed
 * and the process exits with status 1.
 */
int sem_init_diff_val(int no_of_sems, int *array)
{
 key_t h=ftok(".", state++);
 if(h==(key_t)-1)
 {
  printf("Semaphore init error\n");
  exit(1);
 }

 int sem_id=semget(h, no_of_sems, IPC_CREAT|0666);
 if(sem_id==-1)
 {
  printf("Semaphore init error\n");
  exit(1);
 }

 /* SETALL reads an array of unsigned short. Reinterpreting the caller's
    int array through a pointer cast would read the wrong bytes, so the
    values are converted element by element into a properly typed copy. */
 unsigned short int *values=malloc((size_t)no_of_sems*sizeof *values);
 if(values==NULL)
 {
  printf("Semaphore init error\n");
  exit(1);
 }
 for(int i=0; i<no_of_sems; i++)
  values[i]=(unsigned short int)array[i];

 union semun tmp;
 tmp.array=values;
 int rc=semctl(sem_id, 0, SETALL, tmp);
 free(values);

 if(rc==-1)
 {
  printf("Semaphore init error\n");
  exit(1);
 }
 return sem_id;
}



/*
 * Adds `amount` to semaphore number `sem_no` of the set `sem_id`, blocking
 * if the operation cannot be completed immediately (no flags are set).
 *
 * If semop() fails, "Sem_op error" is printed and the process exits with
 * status 1.
 */
void sem_change(int sem_id, int sem_no, int amount)
{
 struct sembuf op={ .sem_num=sem_no, .sem_op=amount, .sem_flg=0 };

 if(semop(sem_id, &op, 1)!=-1)
  return;

 printf("Sem_op error\n");
 exit(1);
}

/*
 * Non-blocking variant of sem_change(): tries to add `amount` to semaphore
 * number `sem_no` of the set `sem_id` with IPC_NOWAIT set.
 *
 * Returns whatever semop() returns: 0 when the operation was applied and
 * -1 when it failed (including when it would have had to wait).
 */
int sem_try_change(int sem_id, int sem_no, int amount)
{
 struct sembuf op={ .sem_num=sem_no, .sem_op=amount, .sem_flg=IPC_NOWAIT };

 return semop(sem_id, &op, 1);
}

Conversion between mutex code and semaphores code:

You may be confused with the above functions. Let me convert each mutex function into its equivalent semaphore function.

pthread_mutex_t A,B,C; -----> int sem_ID; //sem_id for three semaphores set
pthread_mutex_init(&A,NULL);
pthread_mutex_init(&B,NULL);
pthread_mutex_init(&C,NULL);

is same as

sem_ID=sem_init(3, 1);

pthread_mutex_lock(&A); -----> sem_change(sem_ID, 0, -1);
pthread_mutex_lock(&B); -----> sem_change(sem_ID, 1, -1);
pthread_mutex_lock(&C); -----> sem_change(sem_ID, 2, -1);

pthread_mutex_unlock(&A); -----> sem_change(sem_ID, 0, 1);
pthread_mutex_unlock(&B); -----> sem_change(sem_ID, 1, 1);
pthread_mutex_unlock(&C); -----> sem_change(sem_ID, 2, 1);

pthread_mutex_trylock(&A); -----> sem_try_change(sem_ID, 0, -1);
pthread_mutex_trylock(&B); -----> sem_try_change(sem_ID, 1, -1);
pthread_mutex_trylock(&C); -----> sem_try_change(sem_ID, 2, -1);

The conversion of condition variables is left as an exercise for the reader. :)


In the above program, you need to declare the union semun. I thought it was a predefined union. But the documentation was saying us to define it. If you further see, there is a state variable, which always increments when a new semaphore is created. This is just to make sure not to generate the same key_t value when we are creating new semaphore sets.
One last thing is that, the above code is yet to be checked for bugs, Although it was working fine with small number of threads.


Additional References:

http://www.cs.cf.ac.uk/Dave/C/node26.html
http://linux.die.net/man/7/svipc
http://cse.yeditepe.edu.tr/~sbaydere/spring2012/cse331/files/SystemVIPC.pdf
<<Prev Next>>

Thursday 6 September 2012

Pthread Library Part:2

Prerequisites: Read the part 1 of pthread library :) :)

In my previous post, I have explained about the mutexes and their importance to avoid race around conditions. But in the real world, only mutual exclusion is not enough. We may sometimes desire a thread to execute only if some condition is satisfied or else it should block.

One might argue that we can use a busy while loop which continuously checks for the required condition. Even I was also implementing using this strategy. But speaking truly, this makes the program quite inefficient due to the unwanted while loop computations.

Condition variables comes to rescue us from this issue. I give you a detailed view about this.

Condition Variables:

These are just like mutexes, not in their semantics, but the syntax which we use to declare it.

pthread_cond_t:

This is the type name used to declare the condition variables.


pthread_cond_init:

This function is used to initialize the condition variables. As I said before, this is much like the mutex declaration. The function prototype is as follows:

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

The first argument is a pointer to a condition variable and the second one is the condition attributes, which for now you can keep it as NULL, which makes it to take the default attributes.


pthread_cond_wait:

Now, here comes the vital function of the condition variables. The prototype is as follows:

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)

It takes two arguments: the first one is the condition variable and the second is the mutex which the calling thread has locked previously. The function releases the calling thread's lock on the mutex and blocks the thread, both in one atomic step. The thread stays blocked until pthread_cond_signal (or pthread_cond_broadcast) is called on the condition variable; before pthread_cond_wait returns, it locks the mutex again.


pthread_cond_signal:

The prototype is as follows:

int pthread_cond_signal(pthread_cond_t *cond)

It takes the condition variable as the argument, and when this function is called, a thread which was blocked in pthread_cond_wait on that condition variable is awakened.

There are two things to take care of when a thread calls this function:
1) The calling thread should hold the lock on the mutex that was released by the thread which is blocked in pthread_cond_wait, so that the change to the shared condition and the signal are seen together.
2) The calling thread must release that mutex afterwards, so that the blocked thread can get the mutex back and resume its execution.


There may be a little bit of confusion with those conditions, but that will be clear if we see an example program.

In this program, two threads initially keep on incrementing the count once in a second.
When the count reaches 20, the first thread gets blocked, because condition wait was called here. Then from here on, the second thread alone increments the count value upto 30.
Then the blocked thread resumes again and both count upto 40. Here, I was comparing two consecutive values because for a single value, there is a possibility of the value getting skipped.


#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#define T 20
int count=0;

pthread_mutex_t J;
pthread_cond_t con;
/*
 * First counting thread: increments the shared count once per second.
 * When the count reaches T (or T+1, in case the other thread produced T),
 * it waits on the condition variable until the count has reached 30, the
 * value at which fun2() sends the signal. It returns once the count is 40
 * or 41.
 */
void fun1()
{
 while(1)
 {
  pthread_mutex_lock(&J);
  count++;
  if(count==T || count-1==T)
  {
   printf("waiting for t2\n");
   /* pthread_cond_wait() may return without a signal having been sent,
      so the wait is repeated until the awaited state (count has reached
      30) really holds. */
   while(count<30)
    pthread_cond_wait(&con, &J);
  }
  if(count==40 || count==41)
  {
   pthread_mutex_unlock(&J);
   return;
  }
  printf("fun1(): count=%d\n", count);
  pthread_mutex_unlock(&J);
  sleep(1);
 }
}
/*
 * Second counting thread: increments the shared count once per second,
 * signals the condition variable when the count hits 30, and returns once
 * the count is 40 or 41.
 */
void fun2()
{
 for(;;)
 {
  int finished;

  pthread_mutex_lock(&J);
  count++;

  finished=(count==40 || count==41);
  if(finished)
  {
   pthread_mutex_unlock(&J);
   return;
  }

  /* Wake the thread parked in fun1(); the mutex is still held here. */
  if(count==30)
  {
   pthread_cond_signal(&con);
  }

  printf("fun2(): count=%d\n", count);
  pthread_mutex_unlock(&J);
  sleep(1);
 }
}
/*
 * Entry point of the condition-variable demo: initialises the condition
 * variable and the mutex, starts both counting threads and waits for both
 * of them to finish before exiting with status 0.
 */
int main(int argc, char* argv[])
{
    pthread_t A,B;
    pthread_cond_init(&con, NULL);
    pthread_mutex_init(&J, NULL);
    pthread_create(&A, NULL, (void*)&fun1, NULL);
    pthread_create(&B, NULL, (void*)&fun2, NULL);
    /* Both threads return once the count reaches 40/41; join both so that
       neither is cut off by the process exiting. */
    pthread_join(A, NULL);
    pthread_join(B, NULL);
    exit(0);
}

Try to copy the code and execute it. You will clearly understand what is going on.


pthread_cond_broadcast:

In addition to the above functions, this function is used when more than one thread is waiting on same condition variable. This function call results in waking up of all those blocked threads. Its prototype is as shown:

int pthread_cond_broadcast(pthread_cond_t *cond)

pthread_exit:

Sometimes, you may want to terminate thread when it is in the middle of execution. You may say that i will just call return. Well, you are half correct then! Because, this idea won't work if a function calls another function and its the worst idea for recursive case!! So, the pthread library provides pthread_exit function, which immediately terminates the thread, even it is the 100th or 1000th function call. Its prototype is as follows:

void pthread_exit(void *value_ptr)

The only argument is the status of exit of the calling thread which is immaterial for now.


pthread_self:

At times you may want to know the ID of the calling thread; you can use this function for that. Note that what you get is a thread ID of type pthread_t, not a process ID, even though the Linux kernel schedules threads much like processes.

pthread_t pthread_self(void)

It returns the thread ID of the calling thread.


pthread_kill:

Like the kill command, in pthreads we have the method pthread_kill function which has nearly the same functionality. The prototype is as follows:

int pthread_kill(pthread_t thread, int sig)

It takes the pthread_t and an integer which indicates the signal. The different signal numbers can be viewed here.


These are the basic functions required to create and run pthreads conveniently. Although, still there are many functions in the library, I will leave it to the reader to explore them

<<Prev Next>>

Pthread Library Part:1

Prerequisite: You may need to read my previous post on threads, because I will be referencing it in this post.

Pthreads, short for POSIX Threads, is a thread library specified by the POSIX standard of the IEEE. This library provides an interface to create threads in a program.

All the threads created using pthread library are purely KERNEL threads. So, what are these kernel threads? These are the threads that run directly on the kernel and Linux kernel especially doesn't actually differentiate between processes and threads. Hence, the threads are directly scheduled by the operating system, along with the other processes.

Why to learn this?

So, you want to write a program using pthreads. But why do you actually need to do that? The main reason is that writing different programs using pthreads especially the Classical Inter Process Communication problems gives you a good idea about the various issues related to parallel processing which in turn helps you visualize what actually operating system does to solve these issues.

Having known the main reason of why to write these programs, let us jump into the library and explore different types of functions and their uses.

Exploring the library

First things first, when you use a pthread library,you should include the pthread.h header file.

#include < pthread.h > 

pthread_t:

A thread object is denoted by pthread_t. This is the type name for threads in a pthread library.


pthread_create:

In order to create a thread, we use the function pthread_create and its prototype is as follows:

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg)

As we see the prototype, it takes four arguments, the first one being a pointer to a pthread_t object, second one will be discussed later, for now put it as NULL

Third argument is the function pointer to the function which you want to execute using this thread. If you closely observe the function should take void* type arguments and return a void pointer. But in general, we just write the functions which are of void type(to avoid confusion) and either take no arguments or a single void pointer. This function is type casted to void pointer while passing to the above function

The fourth one is the argument which you want to pass to the called function. It should be type casted to void pointer. In the function, it should be casted back to its original type and use its value.

The above function creates a thread and this thread executes the function which is passed as argument, and of course, the main method is executed parallelly, and the thread gets terminated once it has finished executing the given function.


pthread_join:

As said above, since the main thread executes parallelly along with the newly created thread, when the main thread is terminated it also kills the created thread due to which we can't see the action of the thread. There should be some mechanism to block the main thread until the created thread completes its execution. Here comes the pthread_join method with the following prototype:

int pthread_join(pthread_t thread, void **retval)

It takes two arguments, the name of thread for which the calling thread has to wait, In the case you call this function from main, the main thread waits for the thread specified by the argument.

The second argument is a double pointer. If it is not NULL, pthread_join stores in it the value that the terminated thread returned from its function (or passed to pthread_exit), or the special value PTHREAD_CANCELED if the thread was cancelled. This is not used very often and can be kept NULL.


We are done with the main functions required to create and run a thread. Now, let us write a simple program which creates two threads, that prints some strings.

#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>

/*
 * Start routine of the first thread: prints one line and finishes.
 * It has exactly the signature pthread_create expects, so it can be passed
 * without a cast. The argument is unused and the result is always NULL.
 */
void *fun1(void *arg)
{
    (void)arg;
    printf("this is from thread 1!\n");
    return NULL;
}

/*
 * Start routine of the second thread: prints one line and finishes.
 * The argument is unused and the result is always NULL.
 */
void *fun2(void *arg)
{
    (void)arg;
    printf("this is from thread 2!\n");
    return NULL;
}

/*
 * Creates two threads and waits for both of them before exiting.
 * Returns EXIT_SUCCESS normally, EXIT_FAILURE if a thread cannot be created.
 */
int main(int argc, char *argv[])
{
    pthread_t t1, t2;
    int rc;

    (void)argc;
    (void)argv;

    /* pthread_create returns 0 on success and an error number otherwise. */
    rc = pthread_create(&t1, NULL, fun1, NULL);
    if (rc != 0) {
        fprintf(stderr, "could not create thread 1 (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&t2, NULL, fun2, NULL);
    if (rc != 0) {
        fprintf(stderr, "could not create thread 2 (error %d)\n", rc);
        /* t1 is already running; let it finish before we leave. */
        pthread_join(t1, NULL);
        return EXIT_FAILURE;
    }

    /* Block main until both threads are done, otherwise returning from
     * main would end the process before they get a chance to print. */
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return EXIT_SUCCESS;
}

Compiling the above program in Linux can be done by adding -pthread argument along with normal cc file_name.c. If you are using Solaris, then use -lpthread instead.

When you run the program, you will see two strings on the console, of course the order may change based on the scheduling of the threads, CPU load and other factors.


The above program is a trivial one, and its threads don't actually share any data. Things change when two threads manipulate a common variable, for example by incrementing or decrementing its value, because those operations are not atomic, i.e., the processor does not execute them in one indivisible step. For example, incrementing a value is a single statement in C/C++:

variable_name++

But, when this is converted to machine instructions, there exists three instructions as follows:

MOV register, [addres_of_the_variable]
INC register
MOV [addres_of_the_variable], register

Strictly speaking, the computer cannot guarantee that these three instructions are executed together without interruption, and there will be chaos when they get interleaved with the decrement instructions of another thread working on the same variable.

Hence, two threads executing parallelly, should never change the value of a shared variable at the same time, because we are not aware which instruction executes first, the value is unpredictable. This is called Race Condition.

This should be strictly avoided and there comes the MUTEX into the picture.


Mutex:

It is the short form of Mutual Exclusion. The code which manipulates the shared variable among the threads is called the Critical section. Mutexes can be used to make sure this critical section of code is executed by exactly one thread at a time, thus avoiding the Race condition.

Let us look how this is accomplished...


pthread_mutex_t:

This is the type name of mutexes in pthread library.


pthread_mutex_init:

This is the function which initializes the mutex. It has the following prototype:

int pthread_mutex_init(pthread_mutex_t *mutex, 
    const pthread_mutexattr_t *attr)

This function takes two arguments. The first one is a pointer to the mutex and second one is the mutex attributes. We normally keep the second argument NULL.


pthread_mutex_lock:

This function is used to lock the mutex. You can imagine this as follows. Every mutex has a 'key' and when any thread calls the lock function, this key is given to the calling thread. The prototype is as follows:

int pthread_mutex_lock(pthread_mutex_t *mutex)

The only argument is the pointer to the mutex variable.


pthread_mutex_unlock:

This function is used to unlock the previously locked mutex. If you try to call this without getting the lock of the mutex, the behavior is undefined for a default mutex (an error-checking mutex returns the error EPERM instead), so only the thread that holds the lock should unlock it. You can imagine that this method actually gives back the 'key' to the mutex. The prototype is as follows:

int pthread_mutex_unlock(pthread_mutex_t *mutex)

This also takes a single argument of mutex pointer type.


I know that you are bit confused with this 'lock' thing. Let us put together what happens.

Two threads having a shared variable consequently have critical section in their function code. Our aim is that only one of the threads executes this code. So, just define a mutex and initialize it. Then, call lock function in both the threads' functions before the critical section and unlock function after the critical section.

Suppose thread A got the 'key' and thread B also called the lock function. Since our mutex has only one key and it's with thread A, thread B has to wait. When thread A returns the 'key' after executing the critical section, the mutex has the key again and gives it to thread B. Now, thread B starts executing, and if thread A wants to execute the code again, it has to wait until B returns the 'key'.

Voila! what we get from the above situation is only one thread is executing the critical code, satisfying the mutual exclusion principle and avoiding the Race Condition.


pthread_mutex_trylock:

As I said before, a thread gets blocked when it calls a lock function but the 'key' is with another thread. Sometimes we don't want a thread to wait for the 'key' but just check if a mutex is locked and get the 'key' if the mutex has the key. That is where the trylock function comes in handy. The prototype is as follows:

int pthread_mutex_trylock(pthread_mutex_t *mutex)

If the mutex has the 'key', the thread gets it and the function returns 0. If another thread already holds the key, the function does not wait; it returns immediately with the error code EBUSY.


Now, let us look at a simple program to prove mutual exclusion. I am not using any shared variables here; instead I print some strings.

#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>

/* Mutex shared by both threads; each thread holds it while printing its pair of lines. */
pthread_mutex_t J;

/*
 * Start routine of thread A: forever prints "1" and "2" while holding J,
 * then sleeps for one second. The argument is unused; the loop never ends,
 * so the function does not return in practice.
 */
void *fun1(void *arg)
{
    (void)arg;
    while (1)
    {
        /* Critical section: both lines are printed under the lock, so the
         * other thread cannot print in between them. */
        pthread_mutex_lock(&J);
        printf("1\n");
        printf("2\n");
        pthread_mutex_unlock(&J);
        sleep(1);   /* slow the loop down so the output can be followed */
    }
    return NULL;    /* not reached */
}

/*
 * Start routine of thread B: forever prints "3" and "4" while holding J,
 * then sleeps for two seconds. The argument is unused; the loop never ends,
 * so the function does not return in practice.
 */
void *fun2(void *arg)
{
    (void)arg;
    while (1)
    {
        pthread_mutex_lock(&J);
        printf("3\n");
        printf("4\n");
        pthread_mutex_unlock(&J);
        sleep(2);
    }
    return NULL;    /* not reached */
}

/*
 * Initializes the mutex, starts both threads and waits for them.
 * Because the threads loop forever, the joins block until the program is
 * interrupted. Returns EXIT_FAILURE if the mutex or a thread cannot be set up.
 */
int main(int argc, char *argv[])
{
    pthread_t A, B;
    int rc;

    (void)argc;
    (void)argv;

    /* The pthread functions return 0 on success and an error number otherwise. */
    rc = pthread_mutex_init(&J, NULL);
    if (rc != 0) {
        fprintf(stderr, "could not initialize the mutex (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&A, NULL, fun1, NULL);
    if (rc != 0) {
        fprintf(stderr, "could not create thread A (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_create(&B, NULL, fun2, NULL);
    if (rc != 0) {
        fprintf(stderr, "could not create thread B (error %d)\n", rc);
        /* Returning from main ends the process, including thread A. */
        return EXIT_FAILURE;
    }

    pthread_join(A, NULL);
    pthread_join(B, NULL);
    return EXIT_SUCCESS;
}

If you observe the above program, I have used sleep function so that we will be able to see the output, and more over if you observe the output you can see that 1&2 always are adjacent and 3&4 are always adjacent.

You can't have output like 1324... because of the mutex, which ensures Mutual Exclusion. The same can be applied to shared variables also.


<<Prev Next>>