Saturday, 10 November 2012

System V IPC: Message Queues

In this post, I will cover another method of inter-process communication: message queues.

Why message queues?

Message queues are a widely used method of IPC because they are more flexible than shared memory. With message queues, we can define our own protocol for passing messages between two processes. I don't say this isn't possible with shared memory, but it's a bit tedious to do. Let us start learning them now.


The data structure:

Each message queue has an associated structure that stores various information about the queue. It is defined as follows:


struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;  /* first message on queue */
    struct msg *msg_last;   /* last message in queue */
    time_t msg_stime;       /* last msgsnd time */
    time_t msg_rtime;       /* last msgrcv time */
    time_t msg_ctime;       /* last change time */
    struct wait_queue *wwait;
    struct wait_queue *rwait;
    ushort msg_cbytes;      /* current number of bytes on queue */
    ushort msg_qnum;        /* number of messages in queue */
    ushort msg_qbytes;      /* max number of bytes on queue */
    ushort msg_lspid;       /* pid of last msgsnd */
    ushort msg_lrpid;       /* last receive pid */
};

Just have a glance at the various fields used in the structure.
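To see some of these fields for a live queue, here is a small sketch that asks the kernel for a copy of the structure with msgctl and IPC_STAT. The helper name print_queue_info is my own, not part of any API. Note that the msqid_ds visible through <sys/msg.h> typically exposes only the bookkeeping fields (counts, sizes, PIDs, times), not kernel pointers such as msg_first.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper (not a system call): print a few msqid_ds
 * fields for the queue `id`. Returns 0 on success, -1 if msgctl fails. */
int print_queue_info(int id)
{
    struct msqid_ds info;

    /* IPC_STAT copies the kernel's bookkeeping data into `info`. */
    if (msgctl(id, IPC_STAT, &info) == -1) {
        perror("msgctl(IPC_STAT)");
        return -1;
    }
    printf("messages on queue : %lu\n", (unsigned long)info.msg_qnum);
    printf("max bytes allowed : %lu\n", (unsigned long)info.msg_qbytes);
    printf("last msgsnd pid   : %ld\n", (long)info.msg_lspid);
    printf("last msgrcv pid   : %ld\n", (long)info.msg_lrpid);
    return 0;
}
```

Call it with an ID returned by msgget, for example print_queue_info(msg_id).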


System calls:

There are four system calls for message queues:

msgget()
msgrcv()
msgsnd()
msgctl()

Starting with the first one...

msgget():

This is similar to the other IPC get calls (such as shmget) and has the following prototype:

int msgget ( key_t key, int msgflg );

The first argument is a key value, described here. The flags associated with message queues are exactly the same as those for shared memory (from my previous post).

On success, it returns the message queue ID; on failure it returns -1 and sets errno appropriately.
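As a minimal sketch of how the flags combine, the helper below first tries to create the queue exclusively and, if it already exists, falls back to a plain lookup. The name open_queue and the mode 0660 are my own choices for illustration. Note that the permission bits are written in octal.

```c
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: open the queue for `key`, creating it if needed.
 * *created is set to 1 if this call created the queue, 0 otherwise.
 * Returns the queue ID, or -1 on error. */
int open_queue(key_t key, int *created)
{
    /* With IPC_EXCL, creation fails with EEXIST if the queue exists. */
    int id = msgget(key, IPC_CREAT | IPC_EXCL | 0660);
    if (id != -1) {
        *created = 1;
        return id;
    }
    if (errno != EEXIST) {
        perror("msgget");
        return -1;
    }
    *created = 0;
    return msgget(key, 0);   /* no IPC_CREAT: only look up the queue */
}
```

Knowing whether the queue was freshly created is handy when one of the cooperating processes is responsible for removing it later.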


msgsnd():

This system call is used to put the messages on to the queue and it has the following prototype:

int msgsnd ( int msqid, const void *msgp, size_t msgsz, int msgflg );

The first argument is the message queue ID returned from msgget. The second argument is a pointer to a user-defined structure that should have at least two fields: a long integer that specifies the type of the message (it must be greater than 0), followed by the message data, for example a character array that stores the message. Of course, you can add as many other fields as you wish.

The third argument is the size of the message data, that is, the size of the user-defined structure minus the size of the message type field.

Sometimes the message queue is full and a process tries to put another message onto it. What happens then depends on the flags. If IPC_NOWAIT is not specified, the process blocks until there is room for its message. If IPC_NOWAIT is specified, msgsnd returns -1 immediately with errno set to EAGAIN, and the message is not queued.

It returns 0 on success and -1 on failure.
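Here is a rough sketch of a non-blocking send that puts these pieces together. The struct text_msg and the helper try_send are names I made up for illustration; the only real calls are msgsnd, strncpy and perror.

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Illustrative message layout: the long type field must come first. */
struct text_msg {
    long mtype;        /* must be greater than 0 */
    char text[64];
};

/* Hypothetical helper: try to queue `text` without blocking.
 * Returns 0 on success, 1 if the queue is full, -1 on other errors. */
int try_send(int id, long type, const char *text)
{
    struct text_msg m;

    m.mtype = type;
    strncpy(m.text, text, sizeof(m.text) - 1);
    m.text[sizeof(m.text) - 1] = '\0';

    /* The size passed to msgsnd excludes the leading long. */
    if (msgsnd(id, &m, sizeof(m) - sizeof(long), IPC_NOWAIT) == 0)
        return 0;
    if (errno == EAGAIN)
        return 1;          /* queue full and IPC_NOWAIT was given */
    perror("msgsnd");
    return -1;
}
```

A caller that gets 1 back can retry later or drop the message, instead of sleeping inside msgsnd.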


msgrcv():

This system call is used to retrieve the messages from the queue. It has the following prototype:

ssize_t msgrcv ( int msqid, void *msgp, size_t msgsz, long mtype, int msgflg );

The first argument is the message queue ID. The second is a pointer to the user-defined message structure. The third is the maximum size of the message data to accept (again not counting the type field). The fourth is the type of message the process wants to retrieve. On success, the call returns the number of bytes copied into the data part of the structure.
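The type argument is what lets us build small protocols on top of one queue. The sketch below shows the three ways of selecting a message; send_note, recv_note and struct note are hypothetical names used only for this illustration.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct note {
    long mtype;
    char text[16];
};

/* Hypothetical helper: queue a short string under the given type.
 * Returns 0 on success, -1 on error. */
int send_note(int id, long type, const char *text)
{
    struct note n;

    n.mtype = type;
    strncpy(n.text, text, sizeof(n.text) - 1);
    n.text[sizeof(n.text) - 1] = '\0';
    return msgsnd(id, &n, sizeof(n.text), 0);
}

/* Hypothetical helper: block until a message matching `type` arrives.
 *   type == 0 : first message on the queue, whatever its type
 *   type  > 0 : first message of exactly that type
 *   type  < 0 : message with the lowest type <= |type|
 * Returns the number of data bytes received, or -1 on error. */
long recv_note(int id, long type, struct note *out)
{
    return (long)msgrcv(id, out, sizeof(out->text), type, 0);
}
```

For example, if messages of type 2 and then type 1 are queued, recv_note(id, 1, &n) skips over the type 2 message and returns the type 1 message first.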

If the available message is bigger than the maximum size specified, we can either leave the message on the queue or remove it and truncate it to fit the size specified. The MSG_NOERROR bit controls this. If it is set, the message is truncated; otherwise the call fails, returning -1 with errno set to E2BIG, and the queue is left unchanged.

The other flag is IPC_NOWAIT. If it is not specified, the calling process blocks until a message of the requested type is available. If the flag is specified, the call does not block: it returns -1 immediately with errno set to ENOMSG.
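Both msgrcv flags can be combined. The following sketch polls a queue without blocking and accepts oversized messages by truncating them; the tiny 8-byte buffer and the names small_msg and try_receive are assumptions made just to show the truncation.

```c
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct small_msg {
    long mtype;
    char text[8];      /* deliberately small receive buffer */
};

/* Hypothetical helper: fetch one message of `type` without blocking.
 * Returns the number of bytes copied into out->text, -2 if no such
 * message is queued, or -1 on any other error. Messages longer than
 * the buffer are truncated and the extra bytes are lost. */
long try_receive(int id, long type, struct small_msg *out)
{
    ssize_t n = msgrcv(id, out, sizeof(out->text), type,
                       IPC_NOWAIT | MSG_NOERROR);
    if (n >= 0)
        return (long)n;
    if (errno == ENOMSG)
        return -2;         /* nothing of that type on the queue */
    perror("msgrcv");
    return -1;
}
```

Without MSG_NOERROR, the same call on an oversized message would fail and leave the message on the queue.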


msgctl():

msgctl works exactly like shmctl, except that the struct argument has a different type. Its prototype is as follows:

int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );

You can see the details here.
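One use of msgctl worth showing is removal, because a System V queue stays in the system until it is removed explicitly (or the machine reboots). This is only a sketch; remove_queue is a hypothetical wrapper name.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical helper: remove the queue identified by `id`.
 * Returns 0 on success, -1 on failure. */
int remove_queue(int id)
{
    /* The buf argument is ignored for IPC_RMID, so NULL is fine. */
    if (msgctl(id, IPC_RMID, NULL) == -1) {
        perror("msgctl(IPC_RMID)");
        return -1;
    }
    return 0;
}
```

Any process blocked in msgsnd or msgrcv on that queue is woken up and its call fails with errno set to EIDRM.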


Example:

Let us see an example program with message queues. I will reuse the example from the previous post, this time with message queues.

These programs don't need any semaphores. Guess why? Because a process blocks until it finds a message of the type it wants. So, here is the code of the first program (it doesn't have to be the first one you start).


#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

struct mymsg
{
    long type;      /* message type, must be > 0 */
    char msg[30];
    char from[10];
};

struct mymsg Msg;

int main()
{
    /* the mode must be octal: 0660, not decimal 660 */
    int msg_id = msgget(1456, IPC_CREAT | 0660);
    if (msg_id == -1)
    {
        perror("msgget error");
        exit(1);
    }
    int h = fork();
    if (h == 0)
    {
        /* this forked process takes the i/p from console
         * and puts into the message queue
         */
        char tmp[30];
        int n;
        while (1)
        {
            /* leave room for the terminating '\0' */
            n = read(0, tmp, sizeof(tmp) - 1);
            if (n <= 0)
                break; /* EOF or read error: stop sending */
            tmp[n] = '\0';
            strcpy(Msg.msg, tmp);
            Msg.type = 1; //this process sends type 1 msgs
            strcpy(Msg.from, "p1");
            if (msgsnd(msg_id, &Msg, sizeof(struct mymsg) - sizeof(long), 0) == -1)
            {
                perror("send error");
                exit(1);
            }
        }
    }
    else if (h > 0)
    {
        /* this one reads the msgs from the queue and
         * displays to the console...
         */
        struct mymsg Buf;
        while (1)
        {
            //it tries to fetch type 2 messages
            if (msgrcv(msg_id, &Buf, sizeof(struct mymsg) - sizeof(long), 2, 0) == -1)
            {
                perror("rec. error");
                exit(1);
            }
            printf("%s:%s", Buf.from, Buf.msg);
        }
    }
    else
    {
        perror("fork error");
        exit(1);
    }
    exit(0);
}


The code of the second program is the same as the first, except for the message types it sends and receives. It is as follows:


#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>

struct mymsg
{
    long mtype;     /* message type, must be > 0 */
    char msg[30];
    char from[10];
};

struct mymsg Msg;

int main()
{
    /* the mode must be octal: 0660, not decimal 660 */
    int msg_id = msgget(1456, IPC_CREAT | 0660);
    if (msg_id == -1)
    {
        perror("msgget error");
        exit(1);
    }
    int h = fork();
    if (h == 0)
    {
        /* this forked process takes the i/p from console
         * and puts into the message queue
         */
        char tmp[30];
        int n;
        while (1)
        {
            /* leave room for the terminating '\0' */
            n = read(0, tmp, sizeof(tmp) - 1);
            if (n <= 0)
                break; /* EOF or read error: stop sending */
            tmp[n] = '\0';
            strcpy(Msg.msg, tmp);
            Msg.mtype = 2; //this process sends type 2 msgs
            strcpy(Msg.from, "p2");
            if (msgsnd(msg_id, &Msg, sizeof(struct mymsg) - sizeof(long), 0) == -1)
            {
                perror("send error");
                exit(1);
            }
        }
    }
    else if (h > 0)
    {
        /* this one reads the msgs from the queue and
         * displays to the console...
         */
        struct mymsg Buf;
        while (1)
        {
            //it tries to fetch type 1 messages
            if (msgrcv(msg_id, &Buf, sizeof(struct mymsg) - sizeof(long), 1, 0) == -1)
            {
                perror("rec. error");
                exit(1);
            }
            printf("%s:%s", Buf.from, Buf.msg);
        }
    }
    else
    {
        perror("fork error");
        exit(1);
    }
    exit(0);
}


When you run both the programs, you can see that a string typed in one window appears in the other one.

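If the calls fail with a permission error, first check that the mode passed to msgget is written in octal (0660, not 660). The decimal value 660 sets the permission bits to 0224, which denies the owner read access, so msgrcv then works only for the super-user. On a system that really does restrict message queues, you can run the program as super-user as follows: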

sudo ./a.out

