POSIX Message Queues in Linux

July 24, 2021 0 Comments

POSIX Message Queues in Linux

For inter-process communication (IPC), Linux provides a mechanism called a message queue. Message queues allow processes on the same machine to exchange data in the form of discrete messages. A process can either create a new message queue or open one that has already been created by another process.


Each POSIX message queue is identified by a unique name of the form /somename; no two message queues can have the same name. Message queues are managed by the kernel.

On Linux, POSIX message queues are implemented in the kernel, and the queues are exposed through the mqueue virtual file system. Applications operate on a queue through a descriptor-based API that resembles the file API (open, send/receive, close, unlink). Each message is sent with a priority.

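Messages are always read in priority order: the highest-priority message is delivered first, followed by lower-priority messages, and messages of equal priority are delivered in the order they were sent. There is no random access to the queue.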

mq_open:

mq_open() creates a new POSIX message queue or opens an existing queue. The queue is identified by name. When O_CREAT is given in oflag, the mode and attr arguments must also be supplied.

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);

Link with -lrt.

mq_close:

mq_close() closes the message queue descriptor mqdes.

#include <mqueue.h>

int mq_close(mqd_t mqdes);

Link with -lrt.

mq_send:

mq_send() adds the message pointed to by msg_ptr to the message queue referred to by the message queue descriptor mqdes. The msg_len argument specifies the length of the message pointed to by msg_ptr; this length must be less than or equal to the queue’s mq_msgsize attribute.

The msg_prio argument is a non-negative integer that specifies the priority of this message.

#include <mqueue.h>

int mq_send(mqd_t mqdes, const char *msg_ptr,
                     size_t msg_len, unsigned int msg_prio);


Link with -lrt.

mq_receive:

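mq_receive() removes the oldest of the highest-priority messages from the queue referred to by the descriptor mqdes and copies it into the buffer pointed to by msg_ptr. The msg_len argument is the size of that buffer; it must be greater than or equal to the queue's mq_msgsize attribute. The received data is not NUL-terminated; the return value is the number of bytes in the message.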

#include <mqueue.h>

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
                          size_t msg_len, unsigned int *msg_prio);


Link with -lrt.

mq_unlink:

mq_unlink() removes the specified message queue name. The name is removed immediately. The queue itself is destroyed once all other processes that have the queue open close their descriptors referring to it.

#include <mqueue.h>

int mq_unlink(const char *name);

Link with -lrt.

sender example:

#include<stdio.h>
#include<fcntl.h>           /* For O_* constants */
#include<sys/stat.h>        /* For mode constants */
#include<mqueue.h>
#include<string.h>
/*
 * Sender: opens "/test_queue" (creating it if necessary) and posts one
 * "hello world" message with priority 0.
 *
 * Prints the return value of mq_send() (0 on success).
 * Returns 0 on success, -1 if mq_open() or mq_send() fails; the failing call
 * is reported together with the errno text via perror().
 */
int main(void)
{
        const char msg[] = "hello world";
        /*
         * Attributes used only if this process ends up creating the queue.
         * They match the receiver example (10 messages of up to 8096 bytes)
         * so that the receiver's buffer is large enough no matter which
         * program creates the queue first.
         */
        struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 8096 };
        mqd_t mq;
        int rc;

        /*
         * With O_CREAT the mode and attr arguments are mandatory.  A mode of 0
         * would create a queue that no unprivileged process can open again,
         * so grant the owner read/write access explicitly.
         */
        mq = mq_open("/test_queue", O_WRONLY | O_CREAT, 0644, &attr);
        if (mq == (mqd_t)-1) {
                perror("mq_open");
                return -1;
        }

        /* Only strlen(msg) bytes are sent: the terminating NUL is NOT part
         * of the message, so the reader has to terminate its own buffer. */
        rc = mq_send(mq, msg, strlen(msg), 0);
        if (rc == -1) {
                perror("mq_send");
                mq_close(mq);
                return -1;
        }

        printf("%d\n", rc);
        mq_close(mq);
        return 0;
}

receiver example:

#include<stdio.h>
#include<fcntl.h>           /* For O_* constants */
#include<sys/stat.h>        /* For mode constants */
#include<mqueue.h>
#include<string.h>
/*
 * Receiver: opens "/test_queue" (creating it with room for 10 messages of
 * up to 8096 bytes if it does not exist yet), blocks until one message is
 * available, then prints the message length and its text.
 *
 * Returns 0 on success, -1 if mq_open() or mq_receive() fails; the failing
 * call is reported together with the errno text via perror().
 */
int main(void)
{
        enum { MAX_MSG_SIZE = 8096 };

        /* One spare byte so the payload can always be NUL-terminated. */
        char buff[MAX_MSG_SIZE + 1];
        struct mq_attr attr = {
                .mq_flags = 0,
                .mq_maxmsg = 10,
                .mq_msgsize = MAX_MSG_SIZE,
                .mq_curmsgs = 0,
        };
        ssize_t len;
        mqd_t mq;

        /* attr only takes effect when this call actually creates the queue. */
        mq = mq_open("/test_queue", O_CREAT | O_RDWR, 0644, &attr);
        if (mq == (mqd_t)-1) {
                perror("mq_open");
                return -1;
        }

        /*
         * The length passed here must be >= the queue's mq_msgsize, otherwise
         * mq_receive() fails with EMSGSIZE (this happens if the queue already
         * existed with a larger message size).
         */
        len = mq_receive(mq, buff, MAX_MSG_SIZE, NULL);
        if (len == -1) {
                perror("mq_receive");
                mq_close(mq);
                return -1;
        }

        /* Messages are raw bytes; the sender does not transmit a NUL, so
         * terminate before treating the payload as a C string. */
        buff[len] = '\0';

        printf("%zd\n", len);
        printf("%s\n", buff);
        mq_close(mq);
        return 0;
}
Output:
gcc receiver.c -o receiver -lrt
gcc  sender.c -o sender -lrt

run receiver on one terminal
./receiver
11
hello world
run sender on other terminal
./sender
0

Instead of blocking in mq_receive(), a reader can ask to be notified when a new message arrives on an empty message queue. A process registers for this notification with mq_notify(); the registration is one-shot, so it must be renewed after each notification. Either of the following approaches can be used to deliver the notification.
1. Signal notification
2. Thread notification

Signal notification:

Example Signal notification:

#include <stdio.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

mqd_t mqid;

/*
 * SIGUSR1 handler: reads one message from the global queue descriptor mqid
 * and prints it.
 *
 * x is the signal number and is not used.  On mq_receive() failure the error
 * is reported with perror() and the process exits with status 1.
 *
 * NOTE(review): mq_receive(), printf(), perror() and exit() are not on the
 * POSIX list of async-signal-safe functions.  That is tolerable here only
 * because main() does nothing but wait; production code should record the
 * signal in the handler and do the receive from the main loop.
 */
void handler(int x)
{
        /* Must be >= the queue's mq_msgsize or mq_receive() fails with
         * EMSGSIZE. */
        enum { MSG_BUF_LEN = 8496 };

        /* The buffer really is as large as the length handed to
         * mq_receive(), plus one byte for the terminating NUL. */
        char msg[MSG_BUF_LEN + 1];
        unsigned int pri;
        ssize_t mrec;

        (void)x;

        mrec = mq_receive(mqid, msg, MSG_BUF_LEN, &pri);
        if (mrec < 0)
        {
                perror("mq_receive");
                exit(1);
        }

        /* The payload arrives without a NUL terminator. */
        msg[mrec] = '\0';
        printf("data received is %s\n", msg);
}
/*
 * Signal-notification example: installs handler() for SIGUSR1, opens
 * "/test_queue" (creating it if necessary) and registers with mq_notify() so
 * that SIGUSR1 is raised when a message arrives on the empty queue.
 *
 * The process then sleeps until signals arrive; it never returns normally.
 * Exit status: 1 if the handler cannot be installed, 2 if mq_open() fails,
 * 3 if mq_notify() fails.
 */
int main(int argc, char **argv)
{
        struct sigevent event;
        int ret;

        (void)argc;
        (void)argv;

        if (signal(SIGUSR1, handler) == SIG_ERR)
        {
                perror("signal");
                exit(1);
        }

        memset(&event, 0, sizeof(event));
        event.sigev_notify = SIGEV_SIGNAL;
        event.sigev_signo = SIGUSR1;

        /*
         * O_CREAT needs a usable mode (0 would create a queue nobody can open
         * again) and a real null pointer for attr: mq_open() is variadic, so a
         * plain integer 0 is not converted to a pointer.
         */
        mqid = mq_open("/test_queue", O_RDWR | O_CREAT, 0644, NULL);
        if (mqid == (mqd_t)-1)
        {
                perror("mq_open");
                exit(2);
        }

        /*
         * The registration is one-shot and fires only when a message arrives
         * on a queue that was empty; it is removed once the notification has
         * been delivered.
         */
        ret = mq_notify(mqid, &event);
        if (ret < 0)
        {
                perror("mq_notify");
                mq_close(mqid);
                exit(3);
        }

        /* Sleep until a signal is delivered instead of spinning a CPU core. */
        for (;;)
                pause();
}
Output:
gcc signal_posix_msq.c -o signal_posix_msq -lrt
gcc  sender.c -o sender -lrt

run signal_posix_msq on one terminal
./signal_posix_msq
data received is hello world

run sender on other terminal
./sender
0

Thread notification:

Example Thread notification

#include <fcntl.h>           /* For O_* constants */
#include <mqueue.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>        /* For mode constants */
#include <unistd.h>

mqd_t mqid;

void *threadfun(void *p)
{
        printf("threadfun created\n");
        char msg[256];

        ssize_t mrec;
        int pri = ((union sigval *)p)->sival_int;
        mrec = mq_receive(mqid,msg,8496,&pri);
        if(mrec < 0)
        {
                perror("mq_receive");
        }
        printf("data is %s\n",msg);

        pthread_exit(NULL);

}

int main(int argc, char *argv[])
{
        int ret;

        pthread_t t1;

        /*To set msgQ attributes*/
        struct mq_attr attr;
        attr.mq_flags = 0;
        attr.mq_maxmsg = 10;
        attr.mq_msgsize = 4096;
        attr.mq_curmsgs = 0;
        
        if ((mqid  = mq_open ("/test_queue", O_RDONLY | O_CREAT, 0660, &attr)) == -1) {
                printf ("Client: mq_open failed");
                exit (1);
        }


        struct sigevent  info;
        memset(&info,0,sizeof(info));

        info.sigev_notify = SIGEV_THREAD;
        info.sigev_value.sival_int = 30;
        info.sigev_notify_function = (void *)threadfun;
        info.sigev_notify_attributes = NULL;

        ret = mq_notify(mqid,&info);
        if(ret < 0)
        {
                perror("mq_notify");
                exit(2);
        }

        ret = pthread_create(&t1,NULL,threadfun,&(info.sigev_value));
        if(ret != 0)
        {
                perror("pthread_creat");
                exit(3);
        }

        sleep(10);
        pthread_join(t1,NULL);
        mq_close(mqid);
        return 0;


}
Output:
gcc thread_posix_msq.c -o thread_posix_msq -lrt -lpthread

run thread_posix_msq on one terminal
./thread_posix_msq
threadfun created
data is hello world

run sender on other terminal
./sender
0
Share This: