C++ 进程间通信(IPC):共享内存、消息队列、管道

各位观众,各位大佬,欢迎来到今天的IPC(Inter-Process Communication,进程间通信)技术讲座! 今天我们要聊的是C++世界里,进程之间如何愉快地交流,互通有无,一起协作完成任务。 咱们要讲的重点是三种非常经典且常用的IPC方式:共享内存、消息队列和管道。

开场白:为什么我们需要IPC?

想象一下,你是一个乐队的指挥,手下有吉他手、鼓手、贝斯手等等不同的乐手,他们分别负责不同的乐器。 如果每个人都只顾着自己演奏,那肯定是一锅粥,噪音污染! 要想演奏出美妙的音乐,他们必须相互协调,互相配合,这就是IPC的作用!

在计算机世界里,进程就像乐队里的乐手,它们是独立的个体,有自己的内存空间,互不干扰。 但是,很多时候,我们需要多个进程一起工作,共同完成一个复杂的任务。 这时候,就需要一种机制,让它们能够互相交流信息,协同工作,这就是进程间通信(IPC)。

第一乐章:共享内存——“嘿,哥们,我把东西放这儿了,你自己拿!”

共享内存就像一个公共的储物柜,多个进程都可以访问它。 一个进程可以把数据放进储物柜,另一个进程可以从储物柜里取出数据。 这种方式非常高效,因为进程不需要复制数据,直接访问共享的内存区域。

优点:

  • 速度快:进程直接访问内存,无需数据复制。
  • 延迟低:适用于对实时性要求高的场景。

缺点:

  • 同步复杂:多个进程同时访问共享内存,需要复杂的同步机制(例如互斥锁、信号量)来避免数据竞争。
  • 安全性:如果一个进程写入了错误的数据,可能会影响到其他进程。

代码示例(基于POSIX):

#include <iostream>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h> // For mutex

// 定义共享内存的大小
#define SHM_SIZE 1024

// 定义共享内存的键值
#define SHM_KEY 1234

// 互斥锁,用于同步对共享内存的访问
pthread_mutex_t mutex;

//生产者进程
void producer() {
    // 获取共享内存ID
    int shmid = shmget(SHM_KEY, SHM_SIZE, IPC_CREAT | 0666);
    if (shmid == -1) {
        perror("shmget failed");
        exit(1);
    }

    // 将共享内存映射到进程的地址空间
    char* shm = (char*)shmat(shmid, NULL, 0);
    if (shm == (char*) -1) {
        perror("shmat failed");
        exit(1);
    }

    // 准备要写入的数据
    const char* message = "Hello from producer!";

    // 上锁,保证只有一个进程可以写入共享内存
    pthread_mutex_lock(&mutex);

    // 写入数据
    strncpy(shm, message, SHM_SIZE - 1);
    shm[SHM_SIZE - 1] = ''; // 确保字符串以 null 结尾

    std::cout << "Producer wrote: " << message << std::endl;

    // 解锁
    pthread_mutex_unlock(&mutex);

    // 分离共享内存
    if (shmdt(shm) == -1) {
        perror("shmdt failed");
        exit(1);
    }
}

//消费者进程
void consumer() {
    // 获取共享内存ID
    int shmid = shmget(SHM_KEY, SHM_SIZE, 0666);
    if (shmid == -1) {
        perror("shmget failed");
        exit(1);
    }

    // 将共享内存映射到进程的地址空间
    char* shm = (char*)shmat(shmid, NULL, 0);
    if (shm == (char*) -1) {
        perror("shmat failed");
        exit(1);
    }

    // 上锁,保证只有一个进程可以读取共享内存
    pthread_mutex_lock(&mutex);

    // 读取数据
    std::cout << "Consumer read: " << shm << std::endl;

    // 解锁
    pthread_mutex_unlock(&mutex);

    // 分离共享内存
    if (shmdt(shm) == -1) {
        perror("shmdt failed");
        exit(1);
    }

    // 删除共享内存 (可选,由一个进程负责)
    if (shmctl(shmid, IPC_RMID, NULL) == -1) {
        perror("shmctl failed");
        exit(1);
    }
}

int main() {
    // 初始化互斥锁
    if (pthread_mutex_init(&mutex, NULL) != 0) {
        perror("Mutex init failed");
        return 1;
    }

    // 创建子进程
    pid_t pid = fork();

    if (pid == 0) {
        // 子进程 (消费者)
        consumer();
    } else if (pid > 0) {
        // 父进程 (生产者)
        producer();
        wait(NULL); // 等待子进程结束
    } else {
        perror("fork failed");
        return 1;
    }

    // 销毁互斥锁
    pthread_mutex_destroy(&mutex);

    return 0;
}

代码解释:

  1. shmget(): 创建或获取一个共享内存段的ID。SHM_KEY 是一个唯一的键值,用于标识共享内存段。 IPC_CREAT 表示如果共享内存段不存在,则创建它。 0666 是权限,允许所有用户读写。
  2. shmat(): 将共享内存段映射到进程的地址空间。 shm 是一个指向共享内存段的指针。
  3. shmdt(): 分离共享内存段,取消映射。
  4. shmctl(): 控制共享内存段,例如删除它。 IPC_RMID 表示删除共享内存段。
  5. pthread_mutex_t mutex;: 定义了一个互斥锁,用于同步对共享内存的访问。
  6. pthread_mutex_lock(&mutex);: 尝试获取互斥锁。 如果锁已经被其他线程/进程持有,则当前线程/进程会阻塞,直到锁可用。
  7. pthread_mutex_unlock(&mutex);: 释放互斥锁。

重点提示:

  • 共享内存需要手动管理,包括创建、映射、分离和删除。
  • 同步机制(如互斥锁)是使用共享内存的关键,否则会出现数据竞争。
  • 错误处理很重要,要检查每个函数的返回值,确保程序能够正确处理错误。

第二乐章:消息队列——“快递小哥,帮我把这个包裹送到隔壁老王家!”

消息队列就像一个邮局,进程可以把消息发送到队列中,另一个进程可以从队列中接收消息。 这种方式允许进程异步通信,发送者不需要等待接收者,提高了程序的并发性。

优点:

  • 异步通信:发送者和接收者不需要同时在线。
  • 解耦:发送者和接收者不需要知道彼此的细节。
  • 灵活:可以发送不同类型和大小的消息。

缺点:

  • 速度相对较慢:需要数据复制。
  • 可能存在消息丢失:如果队列满了,新的消息可能会被丢弃。
  • 需要额外的管理:例如,需要定义消息的格式和大小。

代码示例(基于POSIX):

#include <iostream>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cstring>
#include <unistd.h>

// 定义消息结构体
struct message {
    long msg_type; // 消息类型,必须是 long 类型
    char msg_text[100]; // 消息内容
};

// 定义消息队列的键值
#define MSG_KEY 5678

//发送者进程
void sender() {
    // 获取消息队列ID
    int msgid = msgget(MSG_KEY, IPC_CREAT | 0666);
    if (msgid == -1) {
        perror("msgget failed");
        exit(1);
    }

    // 创建消息
    message msg;
    msg.msg_type = 1; // 消息类型
    strcpy(msg.msg_text, "Hello from sender!");

    // 发送消息
    if (msgsnd(msgid, &msg, sizeof(msg.msg_text), 0) == -1) {
        perror("msgsnd failed");
        exit(1);
    }

    std::cout << "Sender sent: " << msg.msg_text << std::endl;
}

//接收者进程
void receiver() {
    // 获取消息队列ID
    int msgid = msgget(MSG_KEY, 0666);
    if (msgid == -1) {
        perror("msgget failed");
        exit(1);
    }

    // 接收消息
    message msg;
    if (msgrcv(msgid, &msg, sizeof(msg.msg_text), 1, 0) == -1) {
        perror("msgrcv failed");
        exit(1);
    }

    std::cout << "Receiver received: " << msg.msg_text << std::endl;

    // 删除消息队列 (可选,由一个进程负责)
    if (msgctl(msgid, IPC_RMID, NULL) == -1) {
        perror("msgctl failed");
        exit(1);
    }
}

int main() {
    // 创建子进程
    pid_t pid = fork();

    if (pid == 0) {
        // 子进程 (接收者)
        receiver();
    } else if (pid > 0) {
        // 父进程 (发送者)
        sender();
        wait(NULL); // 等待子进程结束
    } else {
        perror("fork failed");
        return 1;
    }

    return 0;
}

代码解释:

  1. msgget(): 创建或获取一个消息队列的ID。MSG_KEY 是一个唯一的键值,用于标识消息队列。 IPC_CREAT 表示如果消息队列不存在,则创建它。 0666 是权限,允许所有用户读写。
  2. msgsnd(): 向消息队列发送消息。 msgid 是消息队列的ID,&msg 是要发送的消息的地址,sizeof(msg.msg_text) 是消息的大小,0 是标志位,通常设置为 0。
  3. msgrcv(): 从消息队列接收消息。 msgid 是消息队列的ID,&msg 是用于存储接收到的消息的地址,sizeof(msg.msg_text) 是消息的最大大小,1 是消息类型,0 是标志位,通常设置为 0。
  4. msgctl(): 控制消息队列,例如删除它。 IPC_RMID 表示删除消息队列。
  5. struct message: 定义了消息的结构体,msg_type 必须是 long 类型,用于消息的过滤。

重点提示:

  • 消息队列需要定义消息的结构体,包括消息类型和消息内容。
  • 消息类型可以用于过滤消息,接收者可以选择只接收特定类型的消息。
  • 消息队列的大小是有限制的,需要合理设置。
  • 删除消息队列通常由一个进程负责,避免多个进程同时删除。

第三乐章:管道——“对讲机,一头说,一头听!”

管道就像一个单向的对讲机,一个进程可以向管道写入数据,另一个进程可以从管道读取数据。 管道分为两种:匿名管道和命名管道(也叫FIFO)。

  • 匿名管道: 只能用于父子进程或兄弟进程之间的通信。
  • 命名管道(FIFO): 可以用于任意进程之间的通信。

优点:

  • 简单易用:管道是操作系统提供的基本IPC机制。
  • 数据流式传输:数据以字节流的形式传输。

缺点:

  • 单向通信:只能在一个方向上进行通信。
  • 速度相对较慢:需要数据复制。
  • 需要同步:需要避免多个进程同时读写管道。

代码示例(匿名管道):

#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>

int main() {
    int pipefd[2]; // pipefd[0] 是读端, pipefd[1] 是写端
    pid_t pid;
    std::string message = "Hello from parent!";

    // 创建管道
    if (pipe(pipefd) == -1) {
        perror("pipe failed");
        return 1;
    }

    // 创建子进程
    pid = fork();

    if (pid == 0) {
        // 子进程 (读取管道)
        close(pipefd[1]); // 关闭写端

        char buffer[100];
        ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);
        if (bytes_read == -1) {
            perror("read failed");
            return 1;
        }

        buffer[bytes_read] = ''; // 确保字符串以 null 结尾
        std::cout << "Child received: " << buffer << std::endl;

        close(pipefd[0]); // 关闭读端
    } else if (pid > 0) {
        // 父进程 (写入管道)
        close(pipefd[0]); // 关闭读端

        write(pipefd[1], message.c_str(), message.length());
        std::cout << "Parent sent: " << message << std::endl;

        close(pipefd[1]); // 关闭写端
        wait(NULL); // 等待子进程结束
    } else {
        perror("fork failed");
        return 1;
    }

    return 0;
}

代码解释:

  1. pipe(): 创建一个管道。 pipefd[0] 是管道的读端,pipefd[1] 是管道的写端。
  2. close(): 关闭管道的读端或写端。
  3. read(): 从管道的读端读取数据。
  4. write(): 向管道的写端写入数据。

代码示例(命名管道):

#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <sys/stat.h>
#include <fcntl.h>

#define FIFO_PATH "/tmp/my_fifo"

//发送者进程
void sender() {
    // 创建 FIFO (如果不存在)
    if (mkfifo(FIFO_PATH, 0666) == -1 && errno != EEXIST) {
        perror("mkfifo failed");
        exit(1);
    }

    // 打开 FIFO 用于写入
    int fd = open(FIFO_PATH, O_WRONLY);
    if (fd == -1) {
        perror("open failed");
        exit(1);
    }

    std::string message = "Hello from sender!";
    write(fd, message.c_str(), message.length());
    std::cout << "Sender sent: " << message << std::endl;

    close(fd);
}

//接收者进程
void receiver() {
    // 打开 FIFO 用于读取
    int fd = open(FIFO_PATH, O_RDONLY);
    if (fd == -1) {
        perror("open failed");
        exit(1);
    }

    char buffer[100];
    ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
    if (bytes_read == -1) {
        perror("read failed");
        exit(1);
    }

    buffer[bytes_read] = '';
    std::cout << "Receiver received: " << buffer << std::endl;

    close(fd);

    // 删除 FIFO (可选,由一个进程负责)
    unlink(FIFO_PATH);
}

int main() {
    // 创建子进程
    pid_t pid = fork();

    if (pid == 0) {
        // 子进程 (接收者)
        receiver();
    } else if (pid > 0) {
        // 父进程 (发送者)
        sender();
        wait(NULL); // 等待子进程结束
    } else {
        perror("fork failed");
        return 1;
    }

    return 0;
}

代码解释:

  1. mkfifo(): 创建一个命名管道(FIFO)。 FIFO_PATH 是FIFO的路径,0666 是权限,允许所有用户读写。 EEXIST 错误表示FIFO已经存在。
  2. open(): 打开FIFO用于读取或写入。 O_RDONLY 表示只读,O_WRONLY 表示只写。
  3. unlink(): 删除FIFO。

重点提示:

  • 匿名管道只能用于父子进程或兄弟进程之间的通信。
  • 命名管道可以用于任意进程之间的通信。
  • 管道是单向的,如果需要双向通信,需要创建两个管道。
  • 需要注意管道的阻塞行为,如果管道为空,read() 函数会阻塞,直到有数据可读;如果管道满了,write() 函数会阻塞,直到有空间可写。

总结:选择合适的IPC方式

选择哪种IPC方式取决于具体的应用场景和需求。 下面是一个简单的比较表格:

特性 共享内存 消息队列 管道
速度
同步 复杂 简单 简单
灵活性
适用场景 大量数据传输,实时性要求高 异步通信,解耦 父子进程通信,简单数据传输
共享范围 本机 本机 本机

结束语:IPC的艺术

IPC就像一门艺术,需要根据实际情况选择合适的工具和技巧。 掌握了这些IPC技术,你就可以让你的C++程序更好地协作,构建出更强大的应用!希望今天的讲座对大家有所帮助,谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注