Featured image of post 深入解析嵌入式 Linux 之消息队列:进程间通信的桥梁

深入解析嵌入式 Linux 之消息队列:进程间通信的桥梁

深入解析嵌入式 Linux 之消息队列:进程间通信的桥梁

消息队列的概念

Linux系统中进程间通信的一种方式,能传输用户自定义类型的数据,相同类型数据按到达顺序排队,不同数据类型数据存于不同队列。

这里有个关键细节:不同类型的数据不能放在同一个消息队列中。也就是说,系统中可以同时存在多个消息队列,每个队列只存储一种类型的数据;因此,进程读取消息时,必须指定数据类型,才能从对应队列中获取有效数据。

问题:系统如何管理多个消息队列?进程如何选择队列?

每个消息队列都有一个唯一的键值(key),这个key就相当于消息队列的“身份证”,进程通过指定key值,就能精准定位到目标消息队列,实现消息的收发。

在Linux中,使用命令ipcs -a可以查看系统中所有的IPC对象(包含消息队列、共享内存、信号量组)的详细信息。

image-20260206210514865

image-20260206210315117

创建消息队列:从命令到函数

当系统中不存在目标消息队列时,我们需要先创建它,Linux提供了两种方式:ipcmk命令(快速创建)msgget()函数(编程创建),后者也是实际开发中最常用的方式。

ipcmk命令

该命令用于快速创建IPC对象,创建消息队列的语法如下:

1
2
ipcmk -Q   		  # -Q 表示创建消息队列,默认权限为0644
ipcmk -Q -m 0666  # 可指定权限(八进制),消息队列无需执行权限

image-20260206211005604

image-20260206211118318

msgget()函数

msgget()函数的作用是:创建一个新的消息队列,或打开一个已存在的消息队列,是编程中操作消息队列的“入口”函数。

image-20260206214245402

第一个参数需要传入一个key_t类型的值,该值指的是要创建的消息队列的key键值,key也可以称为密钥。 键值类型key_t其实在内核源码中指的是int类型

第二个参数指的是创建消息队列的标志,其中IPC_CREAT指的是如果消息队列不存在则创建,IPC_EXCL指的是如果消息队列存在则表示函数调用失败。

另外,也可以指定消息队列的权限,权限的结构和open函数的mode类型,采用八进制表示,只不过消息队列不需要设置执行权限,所以权限设置为****0644****即可。

问题:key的值由用户自定义可以吗?

可以由用户指定,但不建议手动指定。因为如果多个进程同时创建消息队列,手动指定的key值可能重复,导致队列创建失败。

正确做法:由系统生成唯一的key值,Linux提供了ftok()函数,专门用于生成System-V IPC对象的唯一key值。

ftok()函数:生成唯一的键值

ftok()函数的核心逻辑:将一个已存在的文件路径和一个用户指定的项目ID,转换为唯一的key值

image-20260206220517365

第一个参数指的是系统中已经存在并且可以访问的一个文件的路径,用户可以指定一个文件,但是该文件必须存在且可以被访问,其实就是为了得到文件的属性信息中的inode编号和设备编号,因为Linux系统中一个文件的inode编号是唯一的。

第二个参数指的是项目ID,这个可以由用户进行指定,虽然参数proj_id的类型是整型int,但是只会用到最低8it,所以这个参数的范围其实是1~255,因为这个参数的值必须是非0值。

ftok()函数生成的键值key的组成:proj_id的低8位+ 设备编号的低8位+ inode编号的低16位。

stat()函数:获取文件信息

stat()函数用于获取文件的详细属性信息。

image-20260206221403994

image-20260206221948544

访问消息队列:收发消息(msgsnd() & msgrcv())

创建消息队列后,进程之间就可以通过它收发消息了。Linux提供了两个核心函数:msgsnd()(发送消息)msgrcv()(接收消息),两者配合使用,实现按类型收发数据。

image-20260206224118388

msgsnd()发送消息

msgsnd()函数的作用:将指定类型的消息,发送到指定的消息队列中

1
2
3
4
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg);  // 成功返回0,失败返回-1

image-20260206225655757

  • int msgid:消息队列的标识符,由msgget()函数返回。

  • const void *msgp:指向消息结构体的指针,该结构体需要用户自定义,必须包含两个成员:

    1
    2
    3
    4
    5
    
     // 自定义消息结构体(固定格式,mtype必须是第一个成员,且大于0)
    struct msgbuf {
        long mtype;  // 消息类型,必须是大于0的正整数(用户自定义)
        char mtext[1024];  // 消息正文,可自定义长度和类型(如数组、其他结构体)
    };
    
  • size_t msgsz:消息正文(mtext)的大小(按字节计算),可以是0(表示无消息正文,仅传递类型),必须是非负整数。

  • int msgflg:发送消息的标志,常用值:

    • 0:默认阻塞模式。如果消息队列剩余空间不足,进程会阻塞,直到队列有足够空间容纳消息。
    • IPC_NOWAIT:非阻塞模式。如果队列剩余空间不足,直接返回-1,并设置错误码(EAGAIN),不阻塞进程。

msgrcv()接收消息

msgrcv()函数的作用:从指定的消息队列中,接收指定类型的消息

1
2
3
4
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msgid, void *msgp, size_t msgsz, long msgtyp, int msgflg);  // 成功返回接收的字节数,失败返回-1

image-20260206230735132

  • int msgid:消息队列的标识符(同msgsnd())。
  • void *msgp:存放接收消息的缓存区,类型为自定义的msgbuf结构体指针(与发送时的结构体一致)。
  • size_t msgsz:缓存区(mtext)的大小(按字节计算),如果消息正文长度大于msgsz,需通过msgflg控制处理方式。
  • long msgtyp:要接收的消息类型,核心参数,有三种取值方式:
    • msgtyp = 0:不区分类型,接收队列中的第一个消息(先进先出)。
    • msgtyp > 0:接收类型为msgtyp的第一个消息;若配合msgflg=MSG_EXCEPT,则接收除msgtyp之外的第一个消息。
    • msgtyp < 0:接收类型小于等于msgtyp绝对值的第一个消息(取最小类型)。例如:msgtyp=-3,会接收类型为1、2、3中最小类型的第一个消息。
  • int msgflg:接收消息的标志,常用值:
    • 0:默认阻塞模式,若队列中无指定类型的消息,进程会阻塞,直到有对应消息。
    • IPC_NOWAIT:非阻塞模式,无对应消息时直接返回-1,不阻塞。
    • MSG_EXCEPT:与msgtyp>0配合,接收除msgtyp之外的第一个消息。
    • MSG_NOERROR:如果消息正文长度大于msgsz,截断消息,只接收msgsz字节,不报错(否则返回-1)。
函数 默认阻塞场景 非阻塞方式 错误码(非阻塞时)
msgsnd() 消息队列满(字节数 / 消息数达上限) `msgflg = IPC_NOWAIT` EAGAIN
msgrcv() 队列中无符合 msgtyp 的消息 `msgflg = IPC_NOWAIT` ENOMSG

实操练习:进程间通过消息队列+信号通信

需求:设计两个进程(A和B),实现以下逻辑:

  1. 进程A创建一个消息队列,然后向进程B发送SIGUSR1信号(通知B队列已创建)。
  2. 进程B捕获SIGUSR1信号后,打开该消息队列,将自己的PID作为消息正文,发送到消息队列中。
  3. 进程B发送消息后,向进程A发送SIGUSR2信号(通知A消息已发送)。
  4. 进程A捕获SIGUSR2信号后,从消息队列中读取消息,输出消息正文(进程B的PID)。

进程A

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

// UNIX 系统调用头文件(按功能分类)
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/msg.h>


int flag = 0;
struct msgbuf
{
	long mtype;  //消息类型,必须大于0
	int  mtext;  //消息正文
}first;

pid_t pid_A;
pid_t pid_B;

void signal_handler(int signum) {
    if (signum == SIGUSR2) {
        printf("【进程A】收到来自进程B的SIGUSR2信号\n");
        flag = 1;
    }

}

int main(int argc, char *argv[]) 
{
    //创建有命名管道
    int mkfifo_result = mkfifo("./myfifo", 0666);
    if (mkfifo_result == -1) {
       fprintf(stderr,"Fifo created failed,error[%d],%s\n",errno,strerror(errno));
        return 1;
    }
    printf("【进程A】命名管道创建成功:%s\n", "./myfifo");
    //打开有命名管道
    int fd = open("./myfifo", O_WRONLY);
    if (fd == -1) {
        fprintf(stderr, "Failed to open FIFO: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }
    //向有命名管道写入自己的pid_A
    pid_A = getpid();
    char pid_str[32];
    sprintf(pid_str, "%d", pid_A);
    write(fd, pid_str, strlen(pid_str)+1);
    close(fd);
    printf("【进程A】已向管道写入自己的PID:%s\n", pid_str);

    //打开管道读取B的pid
    int fd_read = open("./myfifo", O_RDONLY);
    if (fd_read == -1) {
        fprintf(stderr, "Failed to open FIFO for reading: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }

    char buffer[32];
    int bytes_read = read(fd_read, buffer, sizeof(buffer)-1);
    if (bytes_read > 0) {
        buffer[bytes_read] = '\0'; 
        printf("【进程A】从管道读取到的B的PID: %s\n", buffer);
    } else {
        fprintf(stderr, "Failed to read from FIFO: error[%d],%s\n", errno, strerror(errno));
    }
    close(fd_read);

     //获取KEY
    key_t key = ftok(".", 1);
    //创建消息队列
    int msgid = msgget(key, IPC_CREAT | 0666);
    if (msgid == -1) {
        fprintf(stderr, "Failed to create message queue: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }
    printf("【进程A】消息队列创建成功,msgid: %d\n", msgid);

    //向进程B发送信号
    pid_B = atoi(buffer);
    if (kill(pid_B, SIGUSR1) == -1) {
        fprintf(stderr, "Failed to send signal to B: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }
    printf("【进程A】已向进程B发送SIGUSR1信号\n");
    signal(SIGUSR2, signal_handler);
    while (!flag);
    if (flag) {
        if (msgrcv(msgid, &first, sizeof(first.mtext), 1, 0) == -1) {
            fprintf(stderr, "Failed to receive message: error[%d],%s\n", errno, strerror(errno));
            return 1;
        }
        printf("【进程A】从消息队列收到消息,内容为:%d\n", first.mtext);
    }
    return 0;
}

进程B

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

// UNIX 系统调用头文件(按功能分类)
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int flag = 0;
struct msgbuf
{
	long mtype;  //消息类型,必须大于0
	int  mtext;  //消息正文
}first;
pid_t pid_A;
pid_t pid_B;

void signal_handler(int signum) {
    if (signum == SIGUSR1) {
        printf("【进程B】收到来自进程A的SIGUSR1信号\n");
        flag = 1;
    }

}

int main(int argc, char *argv[]) 
{

    //打开有命名管道读取A的pid
    int fd = open("./myfifo", O_RDWR);
    if (fd == -1) {
        fprintf(stderr, "Failed to open FIFO for reading: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }

    char buffer[32];
    int bytes_read = read(fd, buffer, sizeof(buffer)-1);
    if (bytes_read > 0) {
        buffer[bytes_read] = '\0'; 
        printf("【进程B】从管道读取到的A的PID: %s\n", buffer);
        pid_A = atoi(buffer);
    } else {
        fprintf(stderr, "Failed to read from FIFO: error[%d],%s\n", errno, strerror(errno));
    }

    //向有命名管道写入自己的pid
    pid_B = getpid();
    char pid_str[32];
    sprintf(pid_str, "%d", pid_B);
    write(fd, pid_str, strlen(pid_str)+1);
    printf("【进程B】已向管道写入自己的PID:%s\n", pid_str);
    close(fd);

    // 获取消息队列
    key_t key = ftok(".", 1);
    int msgid = msgget(key, 0666);
    if (msgid == -1) {
        fprintf(stderr, "Failed to create message queue: error[%d],%s\n", errno, strerror(errno));
        return 1;
    }
    signal(SIGUSR1, signal_handler);
    while(!flag);

    if(flag){//发送消息队列
    first.mtype = 1;
    first.mtext = pid_B;
    msgsnd(msgid, &first, sizeof(first.mtext), 0);
    printf("【进程B】已向消息队列发送消息,内容为:%d\n", first.mtext);
    //向进程A发送信号
    if (kill(pid_A, SIGUSR2) == -1) {
        fprintf(stderr, "Failed to send signal to A : error[%d],%s\n", errno, strerror(errno));
        return 1;
    }
    printf("【进程B】已向进程A发送SIGUSR2信号\n");
    }
    

    return 0;
}

这里我是用了,管道来发送和接收对方的PID,因为使用信号通信,所以用管道来接收PID

控制消息队列:删除与属性管理(msgctl())

IPC对象(包括消息队列)是持久性资源,不会随进程终止而自动销毁,必须手动删除,否则会一直占用系统资源,导致资源泄露。

Linux提供了msgctl()函数,用于管理消息队列,支持三种操作:获取队列属性、设置队列属性、删除队列(最常用)。

指令删除(快速清理用)

终端中使用ipcrm命令,可快速删除消息队列,语法如下:

1
2
ipcrm -q msgid  # 通过msgid删除(最精准,msgid可通过ipcs -q查看)
ipcrm -Q key    # 通过key值删除

image-20260207104123349

函数控制(编程核心)

image-20260207105726254
image-20260207111614726
image-20260207111835294

msgctl()函数语法如下:

1
2
3
4
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf);  // 成功返回0,失败返回-1
  • int msgid:消息队列的标识符。
  • int cmd:要执行的操作命令,常用值:
    • IPC_RMID:删除消息队列(最常用),删除后队列不可再访问,所有未读取的消息会被清空。
    • IPC_STAT:获取消息队列的属性(如队列容量、消息数量),存储到buf指向的msqid_ds结构体中。
    • IPC_SET:设置消息队列的属性(如权限),通过buf指向的msqid_ds结构体配置。
  • struct msqid_ds *buf:指向msqid_ds结构体的指针,用于存储/设置队列属性;如果cmd=IPC_RMID,可设为NULL(无需属性)。
1
2
3
4
5
6
// 删除msgid对应的消息队列
if (msgctl(msgid, IPC_RMID, NULL) == -1) {
    perror("msgctl error");  // 报错提示
    exit(-1);
}
printf("消息队列删除成功\n");

总结

  1. 消息队列的核心优势:按类型收发数据,支持结构化交互,是System-V IPC对象之一。
  2. 三个核心函数:msgget()(创建/打开队列)、msgsnd()/msgrcv()(收发消息)、msgctl()(删除/管理队列)。
  3. 关键细节:key值的生成(ftok()函数)、消息类型的指定、队列的手动删除(避免资源泄露)。
最后更新于 2026-03-26 23:00
...