爱程序网

linux消息队列通信

来源: 阅读:

IPC机制

      进程间通信机制(Inter Process Communication,IPC),这些IPC机制的存在使UNIX在进程通信领域手段相当丰富,也使得程序员在开发一个由多个进程协作的任务组成的系统时,可以采用多种方法。这些高级IPC机制可分为以下三类:消息传递.信号量.共享存储。

       信息传递构造一个消息队列,进程间通过发送消息和接收信息进行通信。

       消息实质上就是一些字或字节的序列(未必以空字符结尾)。它通过消息队列的方法在进程间传递。

      进程间通信通过IPC对象,每个IPC对象都有唯一的ID号,通信双方需要获取该ID,创建者通过创建函数可以得到该值,可是另外的进程不能随意访问创建者的空间,于是约定利用相同的KEY值对于相同的ID,

      系统为每一个IPC对象保存一个ipc_perm结构体,该结构说明了IPC对象的权限和所有者,每一个版本的内核各有不用的ipc_perm结构成员。若要查看详细的定义请参阅文件<sys/ipc.h>。

 

ipc_perm 结构定义于/usr/include/bits/ipc.h中,原型如下:
struct ipc_perm
{
     key_t key; //关键字
     uid_t uid; /*共享内存所有者的有效用户ID */
     gid_t gid; /* 共享内存所有者所属组的有效组ID*/
     uid_t cuid; /* 共享内存创建 者的有效用户ID*/
     gid_t cgid; /* 共享内存创建者所属组的有效组ID*/
     unsigned short mode; /* Permissions + SHM_DEST和SHM_LOCKED标志*/
      unsignedshort seq; /* 序列号*/
};

 

Ftok函数:

     系统建立IPC通讯(如消息队列、共享内存时)必须指定一个ID值。通常情况下,该id值通过ftok函数得到。
ftok原型:
ftok(char *pathname, int projid)
参数:
pathname:文件名(含路径),通常设置为当前目录“.” 比如projid'a',则为"./a"文件
projid:项目ID,必须为非0整数(0-255).

其实它就是用来实现从文件名到关键值的映射。

 

(一)msgsnd函数:

功能: 进程利用它来向消息队列发送消息。

函数声明: int msgsnd ( int msqid, struct msgbuf *msgp, int msgsz, int msgflg )
返回值: 调用成功:返回0值;调用失败: -1

参数介绍:

第一个参数msqid: 是消息队列对象的标识符(由msgget()函数得到),

第二个参数msgp : 指向要发送的消息所在的内存,

第三个参数msgsz: 是要发送信息的长度(字节数),可以用以下的公式计算:
(msgsz = sizeof(struct mymsgbuf) - sizeof(long);)
第四个参数:是控制函数行为的标志,可以取以下的值:
0,忽略标志位;
IPC_NOWAIT:如果消息队列已满,消息将不被写入队列,控制权返回调用函数的线
程。如果不指定这个参数,线程将被阻塞直到消息被可以被写入。

 

(二)msgget()函数:

功能: 用来创建新的消息队列获取已有的消息队列

函数声明: int msgget ( key_t key, int msgflg )
返回值: 调用成功:消息队列的标识符;调用失败: -1:

参数介绍:

第一个参数key 消息队列对象的关键字(key),函数将它与已有的消息队列对象的关键字进行比较来判断消息队列对象是否已经创建。

第二个参数msgflg:msgflg 控制函数具体操作。它可以取下面的几个值:
IPC_CREAT :如果消息队列对象不存在,则创建之,否则则进行打开操作;
IPC_EXCL: 和IPC_CREAT 一起使用(用”|”连接),如果消息对象不存在则创建之,否则产生一个错误并返回-1。

除了以上的两个标志以外,在msgflg 标志中还可以有存取权限控制符。这种控制符的
意义和文件系统中的权限控制符是类似的。

 

(三)msgrcv()函数:

功能:用来从消息队列中取出消息。
函数声明: int msgrcv ( int msqid, struct msgbuf *msgp, int msgsz, long
mtype,int msgflg )
返回值: 调用成功:从消息队列里拷贝过来的字节数;调用失败:返回-1;

参数介绍:

函数的前三个参数和msgsnd()函数中对应的参数的含义是相同的。

第四个参数mtype:

      指定了函数从队列中所取的消息的类型。函数将从队列中搜索类型与之匹配的消息并将之返回。如果mtype 的值是零的话,函数将不做类型检查而自动返回队列中的最旧(优先级最小的)的消息。
第五个参数依然是是控制函数行为的标志,取值可以是:
    0,表示忽略;

    MSG_NOERROR,如果函数取得的消息长度大于msgsz,将只返回msgsz 长度的信息,
剩下的部分被丢弃了。如果不指定这个参数,E2BIG 将被返回,而消息则留在队列中不被
取出。    
     IPC_NOWAIT,如果消息队列为空,则返回一个ENOMSG,并将控制权交回调用函数
的进程。如果不指定这个参数,那么进程将被阻塞直到函数可以从队列中得到符合条件的
消息为止。如果一个client 正在等待消息的时候队列被删除,EIDRM 就会被返回。如果进
程在阻塞等待过程中收到了系统的中断信号,EINTR 就会被返回。  

(注意:当消息从队列内取出后,相应的消息就从队列中删除了。)

 

msgctl()函数:

功能:直接控制消息队列的行为。

函数声明: int msgctl ( int msgqid, int cmd, struct msqid_ds *buf )
返回值:调用成功就返回0;失败就返回-1

参数介绍:

第一个参数msgqid :是消息队列对象的标识符。
第二个参数是函数要对消息队列进行的操作,它可以是:
      IPC_STAT:取出系统保存的消息队列的msqid_ds 数据,并将其存入参数buf 指向的msqid_ds 结构
中。
      IPC_SET:设定消息队列的msqid_ds 数据中的msg_perm 成员。设定的值由buf 指向的msqid_ds
结构给出。
      IPC_EMID:将队列从系统内核中删除。
     这三个命令的功能都是明显的,所以就不多解释了。唯一需要强调的是在IPC_STAT
命令中队列的msqid_ds 数据中唯一能被设定的只有msg_perm 成员,其是ipc_perm 类型的
数据。而ipc_perm 中能被修改的只有mode,pid 和uid 成员。其他的都是只能由系统来设定
的。

 

实验代码:

(一)简单的test.c

//test.c#include<sys/msg.h>
#include<sys/ipc.h>
struct mymsg{
    long mtype;         /* 消息类别 */
    char mtext [30];   /* message text */ 
};
int main()
{
    key_t key = ftok(".",4);                                  //以当前的路径为基础调用ftok函数返回一个关键值
        printf("当前文件夹的关键值是:%d\n",key);
    int mqid = msgget(key,0644|IPC_CREAT|IPC_EXCL);  //使用关键值来创建或打开一个消息,并返回它的标识符
        printf("消息队列的标识符是:%d\n",mqid);
    struct mymsg msg_snd={1,"qwerty"};               //定义放进消息队列里面的消息    
    int a = msgsnd(mqid,&msg_snd,6,0);               //调用msgsnd函数把消息写进消息队列里面
    if(a==0)                                               //写入队列成功,a值为0,否则为-1
    {
        struct mymsg msg_rcv;                           
        msg_rcv.mtype=1;
        int b = msgrcv(mqid,&msg_rcv,7,msg_rcv.mtype,0);      //调用msgrcv函数把消息读出来
        if(b!=-1)
        {printf("通过msgrcv从消息队列里面读取的消息:%s\n",msg_rcv.mtext);}
     }
    return 0;
}
View Code

实验前:

image

实验后:

image

 

如果去除将消息取出的部分函数,则编译结果中将含有6个字节数,一个消息:

image

实验分析:

      了解好前面介绍的几个函数后,这个小实验就很好理解了。Ftok函数以当前路径为基础返回一个关键值,megget又根据关键值创建了消息队列(这段代码里是创建队列,而非打开);而msgsnd函数和msgrcv函数都是根据消息队列标识符mqid(message queue id)来进行相应操作的,要注意的是,当消息被取走后,消息队列还在,但消息内容为空。

 

(二)具有优先级的队列

       实现一个排队系统,使队列中的每个成员都被赋予一个优先级。一个服务进程从队列中取出成员并按照通常的方式处理。例如,排队的成员可以是文件名,此时服务进程就把文件复制到一个打印机上。

 实验代码:

头文件:

//用于消息传递实验的头文件
/*q.h--header for message facility example */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <error.h>

#define QKEY  (key_t)0105    //给出消息队列的关键值
#define QPERM  0660    //
#define MAXOBN  50    //maximum length of obj.name  给消息队列里面的消息加以限制
#define MAXPRIOR  10    //最大优先级

struct q_entry {
    long mtype;
    char mtext[MAXOBN+1];
}; //注意不要漏掉此处的分号,否则会导致包含该q.h文件的C程序出现语法错误
View Code

enter方法

#include"q.h"
#include<stdio.h>
int init_queue(void) {                      
    int queue_id;
    if ((queue_id=msgget(QKEY, IPC_CREAT|QPERM))==-1)  //使用msgget创建或打开一个消息队列,宏定义里面已经定义了关键值
        perror("获取队列的关键值失败!!!!!!!!!");
    return(queue_id);
}
int warn (char*s){
    fprintf(stderr,"错误: %s\n",s);
}
int enter(char *objname, int priority) {    //各种if检测判断,但主要是调用msgsnd
    int len, s_qid;
    struct q_entry s_entry;       //消息中转站,也是结构体类型

    if ((len=strlen(objname))>MAXOBN) { //要先检查名称长度和队列的权限
        warn("队列名字太长拉,出错拉!");
        return(-1);
    }
    if (priority>MAXPRIOR||priority<0) {  //priority的最小值要为1
        warn("队列的权限超出了限制或则小于0");
        return(-1);
    }
    
    
    if ((s_qid=init_queue())==-1)          //调用init_queue打开消息队列
        return(-1);
     /*
    if(priority!=1)
    {
        warn("不是消息对列的专用类型队\n");
        return(-1);
    }
    */    
    s_entry.mtype=(long)priority;       //中转站的消息初始化
    strncpy(s_entry.mtext, objname, MAXOBN);    
    
    if(s_entry.mtype==1)
    {return 0;}

    if (msgsnd(s_qid, &s_entry, len, 0)==-1) {    //把消息从中转站写进去消息队列里面
        printf("消息发送到队列里失败");
        return(-1);    
    }    
     else {return(0);}    
}
View Code

serve方法

#include"q.h"
 serve(void) {              //这个函数是用于处理消息队列中的服务程序,像是enter的翻版
    int mlen, r_qid;
    struct q_entry r_entry;
    
    if ((r_qid=init_queue())==-1)  //通过标识符检查消息队列是否存在
        return(-1);

    for(;;) {
        if ((mlen=msgrcv(r_qid, &r_entry, MAXOBN, 
                         (-1*MAXPRIOR), MSG_NOERROR))==-1)
        {
            perror("从消息队列里面读取消息失败\n");
            return(-1);
        }
         else 
         {
            r_entry.mtext[mlen]='\0';

            //pro_obj完成实际的处理工作,就好比打印机系统把文件输出到打印设备上
             proc_obj(&r_entry);
        }
    }
}
View Code

etest函数(向队列里面放入一个成员,而stest则就可以处理队列中的一个成员

#include<stdio.h>
#include<stdlib.h>
#include"q.h"
//etest向队列里面放入一个成员,而stest则就可以处理队列中的一个成员
main(int argc, char **argv) {
    int priority,n;
    if (argc!=3) {

        fprintf(stderr,"usage: %s 对象的优先权:\n",argv[0]);
        exit(1);
    }
    if ((priority=atol(argv[2]))<=0|priority>MAXPRIOR) {
        warn("队列优先顺序无效。");
        exit(2);
    }
    if (enter(argv[1], priority)<0) {
        warn("enter failure");
        exit(3);
    }
    exit(0);
}
View Code

stest函数(服务器进程函数,处理队列里面元素的函数)

#include <sys/types.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
truct msg_buf{
     int mtype;//消息类型
     char data[255];//数据
};

int main(int argc,  char *argv[])
{
      key_t key;
      int msgid;
      int ret;
      struct msg_buf msgbuf;
      //获取key值
      key = ftok(".",  'a');
      printf("key = [%x]\n",  key);

      //创建消息队列
      msgid = msgget(key,  IPC_CREAT|0666);/*通过文件对应*/
      if(msgid == -1)
      {
            printf("creat error\n");
            return -1;
      }

      //以当前进程类型,非阻塞方式发送"test data"到消息队列
      msgbuf.mtype = getpid();
      strcpy(msgbuf.data,  "test data");
      ret = msgsnd(msgid,  &msgbuf,  sizeof(msgbuf.data),  IPC_NOWAIT);
      if(ret == -1)
      {
            printf("send message err\n");
            return -1;
      }

      //以非阻塞方式接收数据
      memset(&msgbuf,  0,  sizeof(msgbuf));
      ret = msgrcv(msgid,  &msgbuf,  sizeof(msgbuf.data), getpid(),  IPC_NOWAIT);
      if(ret == -1)
      {
            printf("receive message err\n");
            return -1;
      }
      printf("receive msg = [%s]\n",  msgbuf.data);
      return 0;
}
View Code

 

实验结果:

运行这两个简单程序。在服务器进程stest启动之前,etest向队列放入3条消息。注意,这些消息最终被输出的顺序。

image

 

删除消息队列:

image

 

相关思考
1、 清空消息队列里面的全部消息
方法:更改proc_obj函数:

int proc_obj(struct q_entry *msg) {

return 0;//printf("优先权: %ld 对象名: %s \n",msg->mtype, msg->mtext);

}

分析:被改后的函数不再把消息输出到界面,而是往缓存里面里返回0值,此前缓存里面是存放取出的消息内容,被替换成0值了。消息也就不存在了。

结果如下:

image

2、消息队列成为某一特定优先级的专用队列:
方法:

只要在enter函数里面增加一个if判段就可以了,这里假定消息队列只给优先级为1的用,如下图:

image

结果如下:

image

3、过滤给定优先级的所有消息
方法:

可以再把所给定的优先级的消息过滤掉,方法是在enter函数加上一个优先级的判断语句,但暂时没有实现清空已有优先级的消息。

添加的判断语句为:

image

结果如下:

image

实验扩展补充:

linux下进程间通信的几种主要手段简介:

  1. 管道(Pipe)及有名管道(named pipe):管道可用于具有亲缘关系进程间的通信,有名管道克服了管道没有名字的限制,因此,除具有管道所具有的功能外,它还允许无亲缘关系进程间的通信;
  2. 信号(Signal):信号是比较复杂的通信方式,用于通知接受进程有某种事件发生,除了用于进程间通信外,进程还可以发送信号给进程本身;linux除了支持Unix早期信号语义函数sigal外,还支持语义符合Posix.1标准的信号函数sigaction(实际上,该函数是基于BSD的,BSD为了实现可靠信号机制,又能够统一对外接口,用sigaction函数重新实现了signal函数);
  3. 报文(Message)队列(消息队列):消息队列是消息的链接表,包括Posix消息队列system V消息队列。有足够权限的进程可以向队列中添加消息,被赋予读权限的进程则可以读走队列中的消息。消息队列克服了信号承载信息量少,管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  4. 共享内存:使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
  5. 信号量(semaphore):主要作为进程间以及同一进程不同线程之间的同步手段。
  6. 套接口(Socket):更为一般的进程间通信机制,可用于不同机器之间的进程间通信。起初是由Unix系统的BSD分支开发出来的,但现在一般可以移植到其它类Unix系统上:Linux和System V的变种都支持套接字。

关于爱程序网 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 人才招聘 - 帮助