Redis 延迟队列:基于 ZSet 或 Stream 的实现

好的,咱们今天来聊聊Redis的延迟队列,这玩意儿在异步任务处理中可是个宝贝。别害怕,咱们不搞那些高深的理论,就用大白话和实际代码,把基于ZSet和Stream两种方式的实现给它扒个精光。

啥是延迟队列?为啥要用Redis?

想象一下,你搞了个电商网站,用户下单后不是立马就要发货,可能要等个30分钟,让用户有机会取消订单。或者说,你有个定时任务,需要在每天凌晨3点跑一下。这些场景,都需要用到延迟队列。

延迟队列,说白了,就是把要执行的任务先“藏”起来,等到预定的时间点再拿出来执行。

那为啥要用Redis来实现呢?原因很简单:

  • 速度快: Redis是基于内存的,读写速度嗖嗖的。
  • 简单易用: Redis的数据结构非常适合实现延迟队列。
  • 持久化: Redis支持持久化,不怕数据丢了。
  • 成熟稳定: Redis经过了大量的实践检验,稳定性没得说。

第一种方法:ZSet(Sorted Set)实现延迟队列

ZSet,有序集合,是Redis里面一个非常有用的数据结构。它里面的每个元素都有一个score,可以根据score进行排序。这简直就是为延迟队列量身定做的啊!

原理:

  1. 把要延迟执行的任务作为ZSet的member,把任务的执行时间戳作为score。
  2. 使用一个后台线程,不停地轮询ZSet,取出score小于当前时间戳的任务。
  3. 把取出的任务丢给worker线程去执行。

代码示例(Python):

import redis
import time
import json
import threading

class RedisDelayQueueZSet:
    def __init__(self, redis_host='localhost', redis_port=6379, queue_name='delay_queue_zset'):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.queue_name = queue_name

    def enqueue(self, task, delay_seconds):
        """
        添加任务到延迟队列
        :param task: 任务内容 (可以是任何可序列化的对象)
        :param delay_seconds: 延迟秒数
        """
        execute_time = time.time() + delay_seconds
        task_json = json.dumps(task) # 将task序列化为JSON字符串
        self.redis_client.zadd(self.queue_name, {task_json: execute_time})
        print(f"Task enqueued: {task}, execute at: {execute_time}")

    def dequeue(self):
        """
        从延迟队列中取出到期任务
        :return: 到期任务列表
        """
        now = time.time()
        # 获取score小于等于当前时间的所有任务
        tasks = self.redis_client.zrangebyscore(self.queue_name, min=0, max=now)
        if not tasks:
            return []

        # 移除取出的任务,保证原子性
        with self.redis_client.pipeline() as pipe:
            pipe.zremrangebyscore(self.queue_name, min=0, max=now)
            pipe.execute()

        # 反序列化任务
        return [json.loads(task.decode('utf-8')) for task in tasks]

    def process_tasks(self):
        """
        处理到期任务的后台线程
        """
        while True:
            tasks = self.dequeue()
            if tasks:
                for task in tasks:
                    print(f"Processing task: {task}")
                    # 在这里执行你的任务逻辑,比如发送邮件、更新数据库等
            else:
                time.sleep(1) # 没有任务时,稍微休息一下

    def start_consumer(self):
        """
        启动消费者线程
        """
        consumer_thread = threading.Thread(target=self.process_tasks)
        consumer_thread.daemon = True # 设置为守护线程
        consumer_thread.start()

# 使用示例
if __name__ == '__main__':
    delay_queue = RedisDelayQueueZSet()
    delay_queue.start_consumer()

    # 添加一些测试任务
    delay_queue.enqueue({'message': 'Hello, world!'}, 5)
    delay_queue.enqueue({'message': 'Another task!'}, 10)

    # 主线程可以继续做其他事情
    time.sleep(15) # 运行15秒,看看任务是否被执行
    print("Done.")

代码解释:

  • enqueue(task, delay_seconds):添加任务到队列,计算执行时间戳,并使用zadd命令添加到ZSet中。
  • dequeue():从队列中取出到期任务,使用zrangebyscore命令获取score小于等于当前时间的所有任务,然后使用zremrangebyscore命令删除这些任务,保证原子性。
  • process_tasks():后台线程,不断轮询队列,取出到期任务并执行。
  • start_consumer():启动消费者线程。

ZSet实现的优缺点:

  • 优点: 实现简单,容易理解。
  • 缺点:
    • 轮询: 需要一个后台线程不停地轮询ZSet,比较消耗CPU资源。
    • 并发问题: 如果多个线程同时轮询,可能会取出重复的任务。代码中使用了pipeline和zremrangebyscore解决了并发问题
    • 无法保证任务的顺序: ZSet是根据score排序的,如果多个任务的score相同,那么它们的执行顺序是不确定的。
    • 如果任务量非常大,ZSet会变得很大,影响性能。

第二种方法:Stream实现延迟队列

Redis 5.0 引入了Stream数据结构,这玩意儿可是个好东西,它支持消息的持久化、消息的ID、消费组等特性,非常适合用来实现消息队列。

原理:

  1. 把要延迟执行的任务作为Stream的message,使用XADD命令添加到Stream中。XADD命令可以指定MAXLEN参数,限制Stream的长度,防止Stream无限增长。
  2. 使用XREAD命令,指定BLOCK参数,阻塞式地读取Stream中的消息。
  3. 当消息的添加时间加上延迟时间小于当前时间时,说明消息到期,可以取出并执行。

代码示例(Python):

import redis
import time
import json
import threading

class RedisDelayQueueStream:
    def __init__(self, redis_host='localhost', redis_port=6379, stream_name='delay_queue_stream', group_name='my_group', consumer_name='consumer_1'):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)
        self.stream_name = stream_name
        self.group_name = group_name
        self.consumer_name = consumer_name

        try:
            self.redis_client.xgroup_create(self.stream_name, self.group_name, id='0', mkstream=True)
        except redis.exceptions.ResponseError as e:
            if str(e).startswith('BUSYGROUP'):
                print("Consumer Group already exists. Continuing...")
            else:
                raise e

    def enqueue(self, task, delay_seconds):
        """
        添加任务到延迟队列
        :param task: 任务内容 (可以是任何可序列化的对象)
        :param delay_seconds: 延迟秒数
        """
        execute_time = time.time() + delay_seconds
        task_json = json.dumps(task)
        message = {'task': task_json, 'execute_time': execute_time}
        self.redis_client.xadd(self.stream_name, message)
        print(f"Task enqueued: {task}, execute at: {execute_time}")

    def dequeue(self):
        """
        从延迟队列中取出到期任务
        :return: 到期任务列表
        """
        while True:
            try:
                # 使用 XREADGROUP 命令从消费组中读取消息,BLOCK 阻塞等待
                messages = self.redis_client.xreadgroup(groupname=self.group_name, consumername=self.consumer_name, streams={self.stream_name: '>'}, block=1000) # block=1000 阻塞1秒
                if messages:
                    stream_name, message_list = messages[0]
                    for message_id, message_data in message_list:
                        try:
                            task = json.loads(message_data[b'task'].decode('utf-8'))
                            execute_time = float(message_data[b'execute_time'].decode('utf-8'))

                            if time.time() >= execute_time:
                                yield task, message_id
                            else:
                                # 重新入队,延迟时间缩短
                                remaining_delay = execute_time - time.time()
                                self.enqueue(task, remaining_delay)
                                #确认消费,避免消息丢失
                                self.redis_client.xack(self.stream_name, self.group_name, message_id)
                                print(f"Task requeued with delay: {remaining_delay}")

                        except Exception as e:
                            print(f"Error processing message: {e}")
                        finally:
                            # 确认消息已经被处理,从 Stream 中移除
                            self.redis_client.xack(self.stream_name, self.group_name, message_id)
                            print(f"Task acknowledged: {message_id}")

            except redis.exceptions.ConnectionError as e:
                print(f"Connection error: {e}. Reconnecting in 5 seconds...")
                time.sleep(5)
            except Exception as e:
                print(f"Unexpected error: {e}")

    def process_tasks(self):
        """
        处理到期任务的后台线程
        """
        for task, message_id in self.dequeue():
            print(f"Processing task: {task}")
            # 在这里执行你的任务逻辑,比如发送邮件、更新数据库等
            # 确认消息已经被处理,从 Stream 中移除 (在dequeue中已经处理)
            # self.redis_client.xack(self.stream_name, self.group_name, message_id)
            # print(f"Task acknowledged: {message_id}")

    def start_consumer(self):
        """
        启动消费者线程
        """
        consumer_thread = threading.Thread(target=self.process_tasks)
        consumer_thread.daemon = True  # 设置为守护线程
        consumer_thread.start()

# 使用示例
if __name__ == '__main__':
    delay_queue = RedisDelayQueueStream()
    delay_queue.start_consumer()

    # 添加一些测试任务
    delay_queue.enqueue({'message': 'Hello, world!'}, 5)
    delay_queue.enqueue({'message': 'Another task!'}, 10)

    # 主线程可以继续做其他事情
    time.sleep(15)  # 运行15秒,看看任务是否被执行
    print("Done.")

代码解释:

  • enqueue(task, delay_seconds):添加任务到队列,计算执行时间戳,并使用xadd命令添加到Stream中。
  • dequeue():使用xreadgroup命令从消费组中读取消息,BLOCK参数指定阻塞等待,避免空轮询。如果消息未到期,重新入队。
  • process_tasks():后台线程,不断轮询队列,取出到期任务并执行。
  • start_consumer():启动消费者线程。

Stream实现的优缺点:

  • 优点:
    • 阻塞式读取: 使用XREAD命令的BLOCK参数,避免了空轮询,节省CPU资源。
    • 消息持久化: Stream本身就支持消息的持久化,不用担心数据丢失。
    • 消费组: Stream支持消费组,可以实现多个消费者并行处理任务。
    • 有序性: Stream保证消息的顺序性。
  • 缺点:
    • 实现相对复杂: 相对于ZSet,Stream的实现要稍微复杂一些。
    • 需要维护消费组: 需要创建和维护消费组。

ZSet vs Stream:选哪个?

特性 ZSet Stream
实现难度 简单 相对复杂
CPU消耗 轮询,较高 阻塞式读取,较低
数据持久化 需要配置RDB或AOF 内置支持
消息顺序性 无法保证 保证
并发处理 需要自己处理,容易出现重复消费 消费组支持,方便实现并发处理
适用场景 任务量不大,对顺序性要求不高 任务量大,对顺序性要求高,需要并发处理
消息丢失风险 有一定风险,需要额外处理保证原子性 较低,通过ACK机制保证

总结:

  • 如果你的任务量不大,对顺序性要求不高,而且你追求简单,那么ZSet是个不错的选择。
  • 如果你的任务量很大,对顺序性要求很高,而且你希望实现并发处理,那么Stream是更好的选择。

一些额外的建议:

  • 监控: 一定要对延迟队列进行监控,包括队列长度、任务执行情况等,及时发现问题。
  • 重试机制: 对于执行失败的任务,可以考虑加入重试机制,增加任务执行成功的概率。
  • 死信队列: 对于重试多次仍然失败的任务,可以将其放入死信队列,方便后续处理。
  • 序列化: 任务内容可以使用JSON或其他序列化方式,方便存储和传输。
  • 错误处理: 在处理任务时,一定要做好错误处理,防止程序崩溃。

好了,今天关于Redis延迟队列的分享就到这里。希望通过今天的讲解,你能够对Redis延迟队列有更深入的理解,并在实际项目中灵活运用。记住,没有最好的方案,只有最适合你的方案!编码愉快!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注