Redis 设计模式:用 Redis 解决常见系统设计问题

各位观众老爷们,大家好!今天咱们来聊聊Redis这位“老司机”在系统设计中是如何大显身手的。别看它是个内存数据库,但用对了地方,那效果杠杠的!咱们今天不搞虚的,直接上干货,聊聊几个常见的Redis设计模式,看看它怎么解决实际问题。

一、缓存(Cache-Aside)模式:Redis的看家本领

这是Redis最常见的应用场景,也是它赖以成名的绝技。简单来说,就是把热点数据放到Redis里,减少数据库的压力。

工作流程:

  1. 查询数据: 先查Redis,如果命中(Cache Hit),直接返回。
  2. 未命中: 如果Redis没找到(Cache Miss),再去数据库查。
  3. 更新缓存: 从数据库拿到数据后,把它写入Redis,然后再返回给用户。

代码示例(Python):

import redis
import time

# 假设我们有个数据库操作函数
def get_data_from_db(key):
    print(f"从数据库读取数据,key={key}")
    time.sleep(1)  # 模拟数据库查询耗时
    # 实际应用中,这里会连接数据库并查询
    data = f"Data from DB for {key}"
    return data

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def get_data(key):
    """
    从缓存中获取数据,如果不存在则从数据库获取并更新缓存。
    """
    try:
        data = r.get(key)
        if data:
            data = data.decode('utf-8') #从字节码转换成字符串
            print(f"从缓存读取数据,key={key}, value={data}")
            return data
        else:
            print(f"缓存未命中,key={key}")
            data = get_data_from_db(key)
            r.set(key, data, ex=60)  # 设置过期时间为60秒
            print(f"数据已写入缓存,key={key}, value={data}")
            return data
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        # 如果Redis连接失败,直接从数据库获取数据
        data = get_data_from_db(key)
        return data

# 测试
print(get_data("user:123"))
print(get_data("user:123")) # 第二次直接从缓存读取
print(get_data("product:456"))

优点:

  • 简单粗暴: 代码逻辑清晰,易于理解和实现。
  • 读写分离: 数据库只负责存储,Redis负责加速读取,减轻数据库压力。

缺点:

  • 缓存穿透: 如果请求的key在数据库中也不存在,每次都会打到数据库。解决方案:缓存空对象或使用布隆过滤器。
  • 缓存击穿: 如果某个热点key过期了,大量请求同时到达,会直接打到数据库。解决方案:设置永不过期或使用互斥锁。
  • 缓存雪崩: 大量key同时过期,导致大量请求打到数据库。解决方案:设置不同的过期时间或使用二级缓存。
  • 数据一致性问题: 当数据库数据更新时,需要同时更新缓存,否则可能出现数据不一致。解决方案:先更新数据库,再删除缓存;或者使用Canal等工具监听数据库变更。

二、发布/订阅(Pub/Sub)模式:消息的广播站

Redis的Pub/Sub功能可以实现消息的发布和订阅,类似于一个消息广播站。发布者发布消息,订阅者接收消息。

工作流程:

  1. 发布者: 向指定的频道(Channel)发布消息。
  2. 订阅者: 订阅一个或多个频道,接收发布者发布的消息。

代码示例(Python):

import redis
import threading
import time

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

# 发布者
def publisher():
    pub = r.pubsub()
    for i in range(5):
        message = f"Message {i} from publisher"
        r.publish('my_channel', message)
        print(f"Published: {message}")
        time.sleep(1)

# 订阅者
def subscriber():
    pub = r.pubsub()
    pub.subscribe('my_channel')
    print("Subscriber is listening...")
    for message in pub.listen():
        if message['type'] == 'message':
            data = message['data'].decode('utf-8')
            print(f"Received: {data}")

# 创建线程
publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)

# 启动线程
subscriber_thread.start()
time.sleep(0.5) # 确保订阅者先启动
publisher_thread.start()

publisher_thread.join()
# subscriber_thread.join()  # 注意:这里注释掉,因为订阅者会一直监听,直到手动停止

优点:

  • 解耦: 发布者和订阅者之间无需知道彼此的存在,降低了系统耦合度。
  • 实时性: 消息可以实时地传递给订阅者。
  • 扩展性: 可以轻松地增加或减少订阅者。

缺点:

  • 不可靠: 如果订阅者离线,消息会丢失。如果需要可靠的消息传递,应该使用消息队列(如RabbitMQ、Kafka)。
  • 无持久化: 消息不会被持久化,Redis重启后消息会丢失。

适用场景:

  • 实时消息推送: 比如聊天室、实时监控等。
  • 事件通知: 比如用户注册成功后,通知其他模块。
  • 配置更新: 比如配置中心更新后,通知所有应用。

三、计数器(Counter)模式:记录你的每一次点击

Redis的INCR命令可以实现原子性的自增操作,非常适合用于计数器。

代码示例(Python):

import redis

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def increment_counter(key):
    """
    原子性地增加计数器的值。
    """
    try:
        count = r.incr(key)
        print(f"Counter {key} incremented to {count}")
        return count
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

def get_counter(key):
    """
    获取计数器的值。
    """
    try:
        count = r.get(key)
        if count:
            count = int(count.decode('utf-8')) #从字节码转换成字符串并转成int
            print(f"Counter {key} value is {count}")
            return count
        else:
            print(f"Counter {key} not found")
            return 0
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

# 测试
increment_counter("page_views:home")
increment_counter("page_views:home")
get_counter("page_views:home")
increment_counter("page_views:product:123")
get_counter("page_views:product:123")

适用场景:

  • 页面浏览量统计: 记录每个页面的访问次数。
  • 点赞数统计: 记录每篇文章或视频的点赞数。
  • 在线用户数统计: 记录当前在线用户数。
  • 限制请求频率(Rate Limiting): 限制用户在一定时间内发送请求的次数。

四、限流(Rate Limiting)模式:别太贪心哦

Redis可以用来实现各种限流策略,防止恶意用户或爬虫对系统造成冲击。

常见限流算法:

  • 计数器(Counter): 在一定时间内,记录请求次数,超过阈值则拒绝请求。
  • 滑动窗口(Sliding Window): 比计数器更精确,可以平滑地限制请求速率。
  • 漏桶(Leaky Bucket): 请求先进入漏桶,然后以恒定的速率流出,超过容量则丢弃请求。
  • 令牌桶(Token Bucket): 系统以恒定的速率向桶中放入令牌,请求需要先获取令牌才能通过,如果桶中没有令牌则拒绝请求。

代码示例(Python,基于计数器):

import redis
import time

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def is_rate_limited(user_id, limit, period):
    """
    基于计数器的限流。

    Args:
        user_id: 用户ID。
        limit: 时间窗口内的最大请求次数。
        period: 时间窗口的秒数。

    Returns:
        True: 超过限制,False: 未超过限制。
    """
    key = f"rate_limit:{user_id}"
    now = int(time.time())

    try:
        count = r.get(key)
        if count:
            count = int(count.decode('utf-8')) #从字节码转换成字符串并转成int
            if count >= limit:
                print(f"用户 {user_id} 超过请求限制")
                return True
            else:
                r.incr(key)
                print(f"用户 {user_id} 请求次数增加到 {count + 1}")
                return False
        else:
            # 如果key不存在,则设置key并设置过期时间
            r.set(key, 1, ex=period)
            print(f"用户 {user_id} 首次请求,设置计数器")
            return False
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return True # 假设连接错误时,也进行限流

# 测试
user_id = "user:123"
limit = 5
period = 60  # 60秒

for i in range(10):
    if is_rate_limited(user_id, limit, period):
        print("请求被拒绝")
    else:
        print("请求被允许")
    time.sleep(5) # 模拟请求

五、排行榜(Leaderboard)模式:谁是第一名?

Redis的Sorted Set(有序集合)非常适合用于实现排行榜。它可以根据分数(Score)对成员进行排序,并快速地获取排名信息。

代码示例(Python):

import redis

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def update_score(user_id, score):
    """
    更新用户在排行榜中的分数。
    """
    try:
        r.zadd("leaderboard", {user_id: score})
        print(f"用户 {user_id} 的分数更新为 {score}")
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")

def get_rank(user_id):
    """
    获取用户在排行榜中的排名。
    """
    try:
        rank = r.zrevrank("leaderboard", user_id)
        if rank is not None:
            rank = int(rank) + 1  # 排名从1开始
            print(f"用户 {user_id} 的排名是 {rank}")
            return rank
        else:
            print(f"用户 {user_id} 不在排行榜中")
            return None
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

def get_top_n(n):
    """
    获取排行榜前N名。
    """
    try:
        top_n = r.zrevrange("leaderboard", 0, n - 1, withscores=True)
        print(f"排行榜前 {n} 名:")
        for user, score in top_n:
            user = user.decode('utf-8') #从字节码转换成字符串
            print(f"  {user}: {score}")
        return top_n
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

# 测试
update_score("user:1", 100)
update_score("user:2", 200)
update_score("user:3", 150)
update_score("user:4", 250)

get_rank("user:3")
get_top_n(3)

六、会话管理(Session Management)模式:记住你的登录状态

Redis可以用来存储用户的会话信息,实现分布式会话管理。

工作流程:

  1. 用户登录: 用户登录成功后,生成一个唯一的Session ID。
  2. 存储会话信息: 将用户的会话信息(如用户名、用户ID等)存储到Redis中,Key为Session ID,Value为会话信息。
  3. 设置过期时间: 设置会话的过期时间,防止会话长期占用Redis资源。
  4. 验证会话: 每次用户请求时,从Cookie中获取Session ID,然后从Redis中获取会话信息,验证用户是否已登录。

代码示例(Python,简化版):

import redis
import uuid
import json

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def create_session(user_id):
    """
    创建会话并存储到Redis。
    """
    session_id = str(uuid.uuid4())
    session_data = {"user_id": user_id, "login_time": int(time.time())}
    try:
        r.setex(session_id, 3600, json.dumps(session_data)) # 过期时间 3600 秒
        print(f"为用户 {user_id} 创建会话,Session ID: {session_id}")
        return session_id
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

def get_session(session_id):
    """
    从Redis获取会话信息。
    """
    try:
        session_data = r.get(session_id)
        if session_data:
            session_data = json.loads(session_data.decode('utf-8')) #从字节码转换成字符串并转成json
            print(f"获取到会话信息: {session_data}")
            return session_data
        else:
            print("会话不存在或已过期")
            return None
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

def delete_session(session_id):
    """
    从Redis删除会话。
    """
    try:
        r.delete(session_id)
        print(f"会话 {session_id} 已删除")
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")

# 测试
user_id = "user:123"
session_id = create_session(user_id)

if session_id:
    session = get_session(session_id)
    if session:
        print(f"用户ID: {session['user_id']}, 登录时间: {session['login_time']}")
    delete_session(session_id)

七、地理位置(Geospatial)模式:附近的人都在哪?

Redis 3.2 版本之后,增加了对 Geospatial 数据类型的支持,可以用来存储地理位置信息,并进行地理位置相关的计算。

常用命令:

  • GEOADD: 添加地理位置信息。
  • GEODIST: 计算两个地理位置之间的距离。
  • GEORADIUS: 查找指定半径内的地理位置。
  • GEORADIUSBYMEMBER: 查找以某个成员为中心,指定半径内的地理位置。
  • GEOHASH: 获取地理位置的 Geohash 值。

代码示例(Python):

import redis

# Redis连接配置
redis_host = 'localhost'
redis_port = 6379
redis_db = 0

# 创建Redis连接
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

def add_location(place_id, longitude, latitude):
    """
    添加地理位置信息。
    """
    try:
        r.geoadd("places", (longitude, latitude, place_id))
        print(f"添加地理位置信息:{place_id} - ({longitude}, {latitude})")
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")

def get_distance(place_id1, place_id2, unit="km"):
    """
    计算两个地理位置之间的距离。
    """
    try:
        distance = r.geodist("places", place_id1, place_id2, unit=unit)
        if distance:
            print(f"{place_id1} 和 {place_id2} 之间的距离是 {distance} {unit}")
            return distance
        else:
            print(f"{place_id1} 或 {place_id2} 不存在")
            return None
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

def find_nearby(longitude, latitude, radius, unit="km"):
    """
    查找指定半径内的地理位置。
    """
    try:
        nearby_places = r.georadius("places", longitude, latitude, radius, unit=unit, withdist=True, withcoord=True, sort="ASC")
        print(f"附近 {radius} {unit} 内的地点:")
        for place in nearby_places:
            place_id = place[0].decode('utf-8') #从字节码转换成字符串
            distance = place[1]
            longitude = place[2][0]
            latitude = place[2][1]
            print(f"  {place_id}: 距离 {distance} {unit}, 坐标 ({longitude}, {latitude})")
        return nearby_places
    except redis.exceptions.ConnectionError as e:
        print(f"Redis连接错误: {e}")
        return None

# 测试
add_location("place:1", 116.4074, 39.9042)  # 北京
add_location("place:2", 121.4737, 31.2304)  # 上海
add_location("place:3", 114.0579, 22.5431)  # 深圳

get_distance("place:1", "place:2")
find_nearby(116.4074, 39.9042, 500, unit="km") # 查找北京附近500公里内的地点

总结

Redis的设计模式多种多样,远不止上面提到的这些。关键在于理解Redis的特性,然后根据实际业务场景灵活运用。记住,没有最好的模式,只有最适合的模式。希望今天的分享能帮助大家更好地使用Redis,让你的系统跑得更快、更稳!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注