Redis `Streams` 作为消息队列:精确一次消费与消息重试

大家好!今天咱们来聊聊 Redis Streams,这玩意儿作为消息队列,怎么实现“精确一次消费”和“消息重试”。记住,咱们的目标是既要确保消息不丢,又要避免重复处理,还得优雅地处理消费失败的情况。

Redis Streams:不止是个日志

首先,别把 Redis Streams 仅仅看成一个高级版的日志系统。它虽然能记录事件流,但更重要的是,它提供了强大的消费组 (Consumer Groups) 功能,这让它具备了成为一个靠谱的消息队列的潜力。

精确一次消费:理论与现实

“精确一次消费”(Exactly-Once Semantics)听起来很美好,但实现起来异常复杂。在分布式系统中,彻底的精确一次消费几乎是不可能的,或者说成本太高。我们通常追求的是“至少一次” (At-Least-Once) 结合“幂等性” (Idempotence) 来模拟“精确一次”。

  • 至少一次 (At-Least-Once): 保证每条消息至少被消费一次。这意味着消息可能被重复消费。
  • 幂等性 (Idempotence): 如果一个操作执行多次,其结果与执行一次相同,那么这个操作就是幂等的。

Streams + Consumer Groups:消费的基础

Streams 的 Consumer Groups 允许一组消费者共同消费一个 Stream 的消息。 每个消息只会被分配给 Consumer Group 中的一个消费者。 这避免了多个消费者重复处理同一条消息。

代码示例:生产者

import redis

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 消息数据
message = {'event': 'user_registered', 'user_id': 123, 'timestamp': 1678886400}

# 添加消息到 Stream
message_id = r.xadd('user_events', message, maxlen=1000, approximate=True)  #maxlen限制长度,近似值

print(f"消息已发送,ID: {message_id}")

这段代码很简单,就是把一个用户注册事件添加到名为 user_events 的 Stream 中。xadd 命令负责添加消息。maxlen 参数限制 Stream 的长度,防止无限增长。approximate=True 允许 Redis 近似地修剪 Stream,提高性能。

代码示例:消费者 (初始消费)

import redis
import time

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 消费组和消费者信息
group_name = 'user_registration_group'
consumer_name = 'consumer_1'
stream_name = 'user_events'

# 尝试创建消费组 (如果不存在)
try:
    r.xgroup_create(stream_name, group_name, id='0-0', mkstream=True) #从头开始消费,mkstream=true stream不存在自动创建
except redis.exceptions.ResponseError as e:
    if str(e) == 'BUSYGROUP Consumer Group name already exists':
        print("消费组已存在,跳过创建")
    else:
        raise e

# 循环消费消息
while True:
    try:
        # 从 Stream 中读取消息
        response = r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '>'}, count=1, block=5000) #block 阻塞5秒
        if response:
            stream_name, messages = response[0]
            for message_id, message_data in messages:
                print(f"已收到消息: {message_id}, 数据: {message_data}")

                # 在这里处理消息 (模拟业务逻辑)
                try:
                    process_message(message_data) # 模拟业务逻辑
                    # 确认消息已处理
                    r.xack(stream_name, group_name, message_id)
                    print(f"消息 {message_id} 已确认")

                except Exception as e:
                    print(f"处理消息 {message_id} 失败: {e}")
                    # 暂时不确认消息,稍后重试
                    # 可以选择将错误信息记录下来

        else:
            print("没有新消息,等待...")
    except Exception as e:
        print(f"消费过程中发生错误: {e}")
    time.sleep(1)  # 稍微休息一下

def process_message(message_data):
    # 模拟业务逻辑:从消息数据中提取 user_id 并进行处理
    user_id = int(message_data.get(b'user_id').decode())  # 假设 user_id 是字节串
    print(f"正在处理用户 ID: {user_id}")
    # 模拟处理过程 (例如,发送欢迎邮件,更新数据库等)
    # ...
    if user_id % 2 == 0:
        raise Exception("模拟处理错误: 用户ID是偶数") #模拟出错
    print(f"用户 ID {user_id} 处理完成")
  • xgroup_create: 创建 Consumer Group。mkstream=True 表示如果 Stream 不存在,则自动创建。id='0-0' 表示从 Stream 的开头开始消费。
  • xreadgroup: 从 Consumer Group 中读取消息。streams={stream_name: '>'} 表示读取所有尚未被消费的消息。count=1 限制每次读取的消息数量。block=5000 表示如果 Stream 中没有消息,则阻塞 5 秒。
  • xack: 确认消息已处理。只有确认了消息,Redis 才会将消息从 Pending Entries List (PEL) 中移除。
  • process_message: 模拟处理消息的业务逻辑。

消息重试:PEL 的作用

Streams 的 Pending Entries List (PEL) 是实现消息重试的关键。PEL 记录了所有已分发给 Consumer Group,但尚未被确认 (ACKed) 的消息。

如果消费者在处理消息时崩溃或发生错误,消息将仍然保留在 PEL 中。 当消费者重新启动时,它可以检查 PEL 并重新处理这些未确认的消息。

代码示例:消费者 (处理 PEL 中的消息)

import redis
import time

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 消费组和消费者信息
group_name = 'user_registration_group'
consumer_name = 'consumer_1'
stream_name = 'user_events'

def process_pending_messages():
    """处理 PEL 中的消息"""
    pending_messages = r.xpending(stream_name, groupname=group_name, start='-', end='+', count=10) #一次最多处理10条
    if pending_messages and pending_messages[0] > 0: #确认有积压消息
        print("发现未处理的消息,正在尝试重新处理...")
        # 获取属于当前消费者的未确认消息
        response = r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '0'}, count=10, block=None) #block=None 不阻塞
        if response:
            stream_name, messages = response[0]
            for message_id, message_data in messages:
                print(f"重新处理消息: {message_id}, 数据: {message_data}")
                try:
                    process_message(message_data)
                    r.xack(stream_name, group_name, message_id)
                    print(f"消息 {message_id} 重新确认成功")
                except Exception as e:
                    print(f"重新处理消息 {message_id} 失败: {e}")
                    # 可以考虑增加重试次数限制,超过限制则记录错误并放弃重试
                    # 或者将消息发送到死信队列 (Dead Letter Queue)

# 循环消费消息 (包括 PEL 中的消息)
while True:
    try:
        # 先处理 PEL 中的消息
        process_pending_messages()

        # 从 Stream 中读取新消息
        response = r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '>'}, count=1, block=5000)
        if response:
            stream_name, messages = response[0]
            for message_id, message_data in messages:
                print(f"已收到消息: {message_id}, 数据: {message_data}")

                # 在这里处理消息 (模拟业务逻辑)
                try:
                    process_message(message_data)
                    # 确认消息已处理
                    r.xack(stream_name, group_name, message_id)
                    print(f"消息 {message_id} 已确认")

                except Exception as e:
                    print(f"处理消息 {message_id} 失败: {e}")
                    # 暂时不确认消息,稍后重试
                    # 可以选择将错误信息记录下来
        else:
            print("没有新消息,等待...")
    except Exception as e:
        print(f"消费过程中发生错误: {e}")
    time.sleep(1)  # 稍微休息一下

def process_message(message_data):
    # 模拟业务逻辑:从消息数据中提取 user_id 并进行处理
    user_id = int(message_data.get(b'user_id').decode())  # 假设 user_id 是字节串
    print(f"正在处理用户 ID: {user_id}")
    # 模拟处理过程 (例如,发送欢迎邮件,更新数据库等)
    # ...
    if user_id % 2 == 0:
        raise Exception("模拟处理错误: 用户ID是偶数") #模拟出错
    print(f"用户 ID {user_id} 处理完成")
  • xpending: 获取 PEL 中的消息信息。它可以告诉你某个 Consumer Group 中有多少未处理的消息。
  • xreadgroup (使用 0 作为消息 ID): 从 PEL 中读取属于当前消费者的未确认消息。
  • process_pending_messages: 定期检查并处理 PEL 中的消息。

幂等性:避免重复处理

即使我们使用了消息重试机制,仍然可能出现消息被重复处理的情况。 为了解决这个问题,我们需要确保消息处理的逻辑是幂等的。

实现幂等性的方法:

  1. 唯一 ID: 为每个消息分配一个全局唯一的 ID。在处理消息之前,检查这个 ID 是否已经被处理过。 如果是,则直接跳过。
  2. 版本号/时间戳: 如果消息涉及到数据的更新,可以使用版本号或时间戳来确保只有最新的消息被处理。
  3. 数据库约束: 利用数据库的唯一约束来防止重复插入数据。

代码示例:使用唯一 ID 实现幂等性

import redis
import uuid

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 生产者
def send_message(message_data):
    message_id = str(uuid.uuid4()) #生成唯一ID
    message = {'message_id': message_id, **message_data}
    r.xadd('user_events', message, maxlen=1000, approximate=True)
    print(f"消息已发送,ID: {message_id}")

# 消费者
def process_message(message_data):
    message_id = message_data.get(b'message_id').decode()
    user_id = int(message_data.get(b'user_id').decode())

    # 检查消息是否已经被处理过
    if is_message_processed(message_id):
        print(f"消息 {message_id} 已经被处理过,跳过")
        return

    print(f"正在处理消息: {message_id}, 用户 ID: {user_id}")
    # 模拟业务逻辑
    # ...

    # 标记消息为已处理
    mark_message_as_processed(message_id)
    print(f"消息 {message_id} 处理完成")

def is_message_processed(message_id):
    """检查消息是否已经被处理过 (使用 Redis Set 存储已处理的消息 ID)"""
    return r.sismember('processed_messages', message_id)

def mark_message_as_processed(message_id):
    """标记消息为已处理"""
    r.sadd('processed_messages', message_id)

# 使用示例
# send_message({'event': 'user_registered', 'user_id': 456, 'timestamp': 1678887000})

在这个例子中,我们使用 Redis Set processed_messages 来存储已经处理过的消息 ID。在处理消息之前,我们检查消息 ID 是否存在于 Set 中。 如果存在,则说明消息已经被处理过,直接跳过。

死信队列 (Dead Letter Queue):处理失败的消息

有些消息可能因为各种原因无法被成功处理,例如:

  • 消息格式错误
  • 依赖的服务不可用
  • 业务逻辑错误

对于这些无法处理的消息,我们可以将它们发送到死信队列 (Dead Letter Queue, DLQ)。 DLQ 是一个特殊的 Stream,用于存储所有处理失败的消息。

代码示例:使用死信队列

import redis
import time

# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# 消费组和消费者信息
group_name = 'user_registration_group'
consumer_name = 'consumer_1'
stream_name = 'user_events'
dlq_stream_name = 'user_events_dlq'

MAX_RETRIES = 3 #最大重试次数

def process_message(message_data, retry_count=0):
    user_id = int(message_data.get(b'user_id').decode())
    print(f"正在处理用户 ID: {user_id}, 重试次数: {retry_count}")
    # 模拟业务逻辑
    # ...
    if user_id % 2 == 0:
        raise Exception("模拟处理错误: 用户ID是偶数")
    print(f"用户 ID {user_id} 处理完成")

def send_to_dlq(message_id, message_data, error_message):
    """将消息发送到死信队列"""
    dlq_message = {'original_message_id': message_id.decode(), 'data': message_data, 'error': error_message}
    r.xadd(dlq_stream_name, dlq_message)
    print(f"消息 {message_id} 已发送到死信队列")

# 循环消费消息
while True:
    try:
        response = r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '>'}, count=1, block=5000)
        if response:
            stream_name, messages = response[0]
            for message_id, message_data in messages:
                try:
                    process_message(message_data)
                    r.xack(stream_name, group_name, message_id)
                    print(f"消息 {message_id} 已确认")

                except Exception as e:
                    print(f"处理消息 {message_id} 失败: {e}")
                    # 获取消息的重试次数
                    pending_info = r.xpending(stream_name, groupname=group_name, start=message_id, end=message_id, count=1) #获取消息积压信息
                    retry_count = pending_info[0][3] if pending_info[0] else 0

                    if retry_count < MAX_RETRIES:
                        print(f"消息 {message_id} 重试次数 {retry_count + 1} < {MAX_RETRIES}, 稍后重试")
                        # 不确认消息,等待重试
                    else:
                        print(f"消息 {message_id} 达到最大重试次数,发送到死信队列")
                        send_to_dlq(message_id, message_data, str(e))
                        r.xack(stream_name, group_name, message_id) #确认消息,防止无限重试
        else:
            print("没有新消息,等待...")
    except Exception as e:
        print(f"消费过程中发生错误: {e}")
    time.sleep(1)
  • dlq_stream_name: 定义死信队列的 Stream 名称。
  • send_to_dlq: 将消息发送到死信队列。
  • MAX_RETRIES: 定义最大重试次数。
  • process_message 中,如果处理消息失败,我们会检查消息的重试次数。 如果重试次数小于 MAX_RETRIES,则不确认消息,等待重试。 如果重试次数达到 MAX_RETRIES,则将消息发送到死信队列并确认消息。

总结:Redis Streams + 精确一次消费

技术点 说明 代码示例
Consumer Groups 允许多个消费者共同消费一个 Stream,每个消息只会被分配给一个消费者。 r.xgroup_create(stream_name, group_name, id='0-0', mkstream=True)
r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '>'}, count=1, block=5000)
PEL 记录所有已分发但未确认的消息。 消费者可以检查 PEL 并重新处理未确认的消息。 r.xpending(stream_name, groupname=group_name, start='-', end='+', count=10)
r.xreadgroup(groupname=group_name, consumername=consumer_name, streams={stream_name: '0'}, count=10, block=None)
幂等性 确保消息处理操作执行多次的结果与执行一次的结果相同。 可以使用唯一 ID、版本号/时间戳、数据库约束等方法实现。 生产者: 添加 message_id = str(uuid.uuid4())
消费者: if is_message_processed(message_id): return
mark_message_as_processed(message_id)
is_message_processed(message_id)
死信队列 用于存储处理失败的消息。 可以将无法处理的消息发送到 DLQ,以便后续分析和处理。 dlq_stream_name = 'user_events_dlq'
send_to_dlq(message_id, message_data, str(e))
r.xadd(dlq_stream_name, dlq_message)
重试机制 在处理消息失败时,可以进行重试。可以设置最大重试次数,超过最大重试次数则将消息放入死信队列。 获取重试次数:pending_info = r.xpending(stream_name, groupname=group_name, start=message_id, end=message_id, count=1)
判断是否重试:if retry_count < MAX_RETRIES:
发送到DLQ:send_to_dlq(message_id, message_data, str(e))

总而言之,利用 Redis Streams 的 Consumer Groups 和 PEL,结合幂等性设计和死信队列,我们可以构建一个相对可靠的消息队列系统, 尽可能地接近“精确一次消费”的目标。 当然,这需要根据具体的业务场景进行调整和优化。 希望今天的分享对大家有所帮助!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注