Redis 大 Key 发现与优化:拆分、过期与异步删除

各位观众,大家好!今天咱们来聊聊Redis里的“大块头”——大Key。这玩意儿,就像你家冰箱里塞满了过期食品,看着挺唬人,用起来卡得让你怀疑人生。所以,咱得想办法把它们揪出来,好好收拾收拾。

啥叫大Key?为啥要怕它?

所谓大Key,就是指Redis里Value特别大的Key。具体多大算大?这没个绝对标准,得看你的Redis配置和业务场景。一般来说,String类型超过几兆,Hash、List、Set、ZSet类型元素数量超过几千,就可以算作大Key了。

为啥要怕它?因为大Key会带来一堆问题:

  • 读写慢: 读写大Key需要传输大量数据,消耗大量CPU和网络带宽,直接影响Redis的性能。
  • 阻塞Redis: Redis是单线程的,如果一个大Key的读写操作耗时过长,会阻塞其他请求,导致整个Redis服务响应变慢。
  • 内存爆炸: 大Key占用大量内存,如果Redis内存不足,可能导致OOM(Out Of Memory)错误,直接让Redis崩溃。
  • 主从同步延迟: 主节点同步大Key到从节点需要传输大量数据,导致主从同步延迟,影响数据一致性。

总之,大Key就像定时炸弹,随时可能给你的Redis服务带来麻烦。

如何发现大Key?

发现大Key的方法有很多,咱们来逐一看看:

  1. redis-cli --bigkeys 命令:

这是Redis官方提供的命令行工具,可以扫描整个Redis实例,找出占用内存最多的Key。这个命令会分析键的类型、大小,并给出一些建议。

redis-cli --bigkeys -h 127.0.0.1 -p 6379

这个命令会连接到本地6379端口的Redis实例,并扫描大Key。执行结果会告诉你每种数据类型最大的Key,以及它们的占用内存大小。

这个命令的原理是遍历所有的key,然后使用debug object命令获取每个key的序列化长度,然后按照类型进行分类,最终展示每个类型最大的key。

  1. redis-rdb-tools 工具:

这是一个开源的RDB文件分析工具,可以分析Redis的RDB快照文件,找出占用内存最多的Key。 相比于redis-cli --bigkeys,它可以离线分析,不会影响Redis的在线性能。

首先,你需要安装 redis-rdb-tools

pip install redis-rdb-tools

然后,可以使用 rdb --command memory 命令分析 RDB 文件:

rdb -c memory dump.rdb

这个命令会分析 dump.rdb 文件,并按照内存占用大小排序,列出最大的Key。

  1. SCAN 命令:

如果你不想使用外部工具,也可以自己编写脚本,使用 SCAN 命令遍历Redis实例,然后使用 MEMORY USAGE 命令获取每个Key的占用内存大小。

这是一个Python示例:

import redis

def find_big_keys(host='localhost', port=6379, db=0, threshold=1024 * 1024):  # 默认1MB
    """
    扫描Redis实例,找出大于指定阈值的Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    cursor = '0'
    big_keys = []

    while cursor != '0':
        cursor, keys = r.scan(cursor=cursor, count=1000)  # 每次扫描1000个key
        for key in keys:
            memory_usage = r.memory_usage(key)
            if memory_usage > threshold:
                key_str = key.decode('utf-8')
                type = r.type(key).decode('utf-8')
                big_keys.append((key_str, memory_usage, type))

    # 按照内存占用大小排序
    big_keys.sort(key=lambda x: x[1], reverse=True)

    print("Found the following big keys:")
    for key, memory_usage, type in big_keys:
        print(f"Key: {key}, Memory Usage: {memory_usage} bytes, Type: {type}")

if __name__ == '__main__':
    find_big_keys()

这个脚本会连接到本地6379端口的Redis实例,扫描所有Key,找出大于1MB的Key,并按照内存占用大小排序输出。

  1. 监控系统:

如果你的Redis实例已经接入了监控系统(比如 Prometheus、Grafana),可以监控Redis的内存使用情况、CPU使用率、网络带宽等指标。如果发现这些指标异常升高,可能就是大Key导致的。 很多监控系统也会提供单独的大key监控功能,比如通过分析 slowlog 找到潜在的大key访问。

如何优化大Key?

发现了大Key,接下来就要想办法优化它们。优化的方法有很多,咱们来逐一看看:

  1. 拆分:

这是最常用的方法,将一个大Key拆分成多个小Key。拆分的方法取决于Key的数据类型和业务场景。

  • String: 如果String类型的Key存储的是一个大的JSON字符串,可以将其拆分成多个小JSON字符串,每个小JSON字符串对应一部分数据。
import redis
import json

def split_string_key(host='localhost', port=6379, db=0, key='big_string_key', chunk_size=1024):
    """
    将一个String类型的Key拆分成多个小Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    value = r.get(key)
    if value is None:
        print(f"Key {key} not found.")
        return

    value_str = value.decode('utf-8')
    value_len = len(value_str)
    num_chunks = (value_len + chunk_size - 1) // chunk_size  # 计算需要拆分成多少个chunk

    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, value_len)
        chunk = value_str[start:end]
        new_key = f"{key}:part{i}"  # 新key的命名规则
        r.set(new_key, chunk)
        print(f"Created new key: {new_key}")

    # 删除原来的大Key
    r.delete(key)
    print(f"Deleted original key: {key}")

if __name__ == '__main__':
    # 示例:假设你的Redis中有一个名为 'big_string_key' 的String类型的Key
    # 先设置一个大的String值
    r = redis.Redis(host='localhost', port=6379, db=0)
    big_value = json.dumps({"data": [f"item_{i}" for i in range(5000)]})  # 模拟一个大的JSON
    r.set('big_string_key', big_value)
    split_string_key(key='big_string_key')
  • Hash: 如果Hash类型的Key存储的是大量的字段和值,可以将其拆分成多个小Hash,每个小Hash存储一部分字段和值。或者,将Hash结构转换成String结构存储,key的命名方式为 原key:field
import redis

def split_hash_key(host='localhost', port=6379, db=0, key='big_hash_key', chunk_size=100):
    """
    将一个Hash类型的Key拆分成多个小Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    hash_data = r.hgetall(key)
    if not hash_data:
        print(f"Key {key} not found or is not a Hash.")
        return

    keys = list(hash_data.keys())
    num_chunks = (len(keys) + chunk_size - 1) // chunk_size

    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, len(keys))
        new_key = f"{key}:part{i}"
        chunk_data = {}
        for k in keys[start:end]:
            chunk_data[k] = hash_data[k]
        r.hmset(new_key, chunk_data)
        print(f"Created new key: {new_key} with {len(chunk_data)} fields.")

    r.delete(key)
    print(f"Deleted original key: {key}")

if __name__ == '__main__':
    # 示例:假设你的Redis中有一个名为 'big_hash_key' 的Hash类型的Key
    # 先设置一个大的Hash值
    r = redis.Redis(host='localhost', port=6379, db=0)
    big_hash = {f"field_{i}": f"value_{i}" for i in range(500)}
    r.hmset('big_hash_key', big_hash)
    split_hash_key(key='big_hash_key')
  • List: 如果List类型的Key存储的是大量的元素,可以将其拆分成多个小List,每个小List存储一部分元素。或者,可以将List结构转换成String结构存储,用分隔符分隔每个元素。
import redis

def split_list_key(host='localhost', port=6379, db=0, key='big_list_key', chunk_size=100):
    """
    将一个List类型的Key拆分成多个小Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    list_len = r.llen(key)
    if list_len == 0:
        print(f"Key {key} not found or is empty.")
        return

    num_chunks = (list_len + chunk_size - 1) // chunk_size

    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, list_len)
        new_key = f"{key}:part{i}"
        chunk_values = r.lrange(key, start, end - 1)
        if chunk_values:
            r.rpush(new_key, *chunk_values)  # 使用 rpush 一次性添加多个元素
            print(f"Created new key: {new_key} with {len(chunk_values)} elements.")

    r.delete(key)
    print(f"Deleted original key: {key}")

if __name__ == '__main__':
    # 示例:假设你的Redis中有一个名为 'big_list_key' 的List类型的Key
    # 先设置一个大的List值
    r = redis.Redis(host='localhost', port=6379, db=0)
    big_list = [f"item_{i}" for i in range(500)]
    r.rpush('big_list_key', *big_list)
    split_list_key(key='big_list_key')
  • Set: 如果Set类型的Key存储的是大量的元素,可以将其拆分成多个小Set,每个小Set存储一部分元素。
import redis

def split_set_key(host='localhost', port=6379, db=0, key='big_set_key', chunk_size=100):
    """
    将一个Set类型的Key拆分成多个小Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    set_members = r.smembers(key)
    if not set_members:
        print(f"Key {key} not found or is empty.")
        return

    member_list = list(set_members) # 将 set 转换为 list
    num_chunks = (len(member_list) + chunk_size - 1) // chunk_size

    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, len(member_list))
        new_key = f"{key}:part{i}"
        chunk_members = member_list[start:end]
        if chunk_members:
            r.sadd(new_key, *chunk_members)  # 使用 sadd 一次性添加多个元素
            print(f"Created new key: {new_key} with {len(chunk_members)} members.")

    r.delete(key)
    print(f"Deleted original key: {key}")

if __name__ == '__main__':
    # 示例:假设你的Redis中有一个名为 'big_set_key' 的Set类型的Key
    # 先设置一个大的Set值
    r = redis.Redis(host='localhost', port=6379, db=0)
    big_set = {f"item_{i}" for i in range(500)}
    r.sadd('big_set_key', *big_set)
    split_set_key(key='big_set_key')
  • ZSet: 如果ZSet类型的Key存储的是大量的元素和分数,可以将其拆分成多个小ZSet,每个小ZSet存储一部分元素和分数。
import redis

def split_zset_key(host='localhost', port=6379, db=0, key='big_zset_key', chunk_size=100):
    """
    将一个ZSet类型的Key拆分成多个小Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    zset_data = r.zrange(key, 0, -1, withscores=True) # 获取所有元素和分数
    if not zset_data:
        print(f"Key {key} not found or is empty.")
        return

    num_chunks = (len(zset_data) + chunk_size - 1) // chunk_size

    for i in range(num_chunks):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, len(zset_data))
        new_key = f"{key}:part{i}"
        chunk_data = zset_data[start:end]
        if chunk_data:
            # 使用 pipeline 批量添加元素和分数
            with r.pipeline() as pipe:
                for member, score in chunk_data:
                    pipe.zadd(new_key, {member: score})
                pipe.execute()
            print(f"Created new key: {new_key} with {len(chunk_data)} members.")

    r.delete(key)
    print(f"Deleted original key: {key}")

if __name__ == '__main__':
    # 示例:假设你的Redis中有一个名为 'big_zset_key' 的ZSet类型的Key
    # 先设置一个大的ZSet值
    r = redis.Redis(host='localhost', port=6379, db=0)
    big_zset = {f"item_{i}": i for i in range(500)}
    r.zadd('big_zset_key', big_zset)
    split_zset_key(key='big_zset_key')

拆分之后,你需要修改你的应用程序代码,将对大Key的读写操作转换为对多个小Key的读写操作。

  1. 过期:

如果大Key存储的是一些临时数据,可以给它设置一个过期时间,让Redis自动删除它。设置过期时间可以使用 EXPIRE 命令。

import redis

def set_expiration(host='localhost', port=6379, db=0, key='big_key', expiration_time=3600):
    """
    设置Key的过期时间。
    """
    r = redis.Redis(host=host, port=port, db=db)
    result = r.expire(key, expiration_time)
    if result:
        print(f"Key {key} set to expire in {expiration_time} seconds.")
    else:
        print(f"Key {key} not found.")

if __name__ == '__main__':
    # 示例:设置 'big_key' 的过期时间为 1 小时 (3600 秒)
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('big_key', 'some_big_value') # 先设置一个key
    set_expiration(key='big_key')
  1. 异步删除:

如果大Key的删除操作耗时过长,会阻塞Redis。可以使用异步删除的方式,将删除操作放到后台执行。Redis 4.0 之后提供了 UNLINK 命令,可以异步删除Key。UNLINK 命令会将Key从Redis的数据结构中unlink,然后由后台线程异步删除Key的Value。

import redis

def unlink_key(host='localhost', port=6379, db=0, key='big_key'):
    """
    异步删除Key。
    """
    r = redis.Redis(host=host, port=port, db=db)
    result = r.unlink(key)
    if result:
        print(f"Key {key} unlinked for asynchronous deletion.")
    else:
        print(f"Key {key} not found.")

if __name__ == '__main__':
    # 示例:异步删除 'big_key'
    r = redis.Redis(host='localhost', port=6379, db=0)
    r.set('big_key', 'some_big_value') # 先设置一个key
    unlink_key(key='big_key')
  1. 数据压缩:

如果大Key存储的是一些可以压缩的数据,可以先对数据进行压缩,然后再存储到Redis中。常用的压缩算法有Gzip、Snappy、LZ4等。

import redis
import gzip

def compress_and_set(host='localhost', port=6379, db=0, key='big_key', value='some_big_value'):
    """
    压缩数据并存储到Redis中。
    """
    r = redis.Redis(host=host, port=port, db=db)
    value_bytes = value.encode('utf-8')
    compressed_value = gzip.compress(value_bytes)  # 使用 gzip 压缩
    r.set(key, compressed_value)
    print(f"Compressed and set key: {key}")

def get_and_decompress(host='localhost', port=6379, db=0, key='big_key'):
    """
    从Redis中获取数据并解压缩。
    """
    r = redis.Redis(host=host, port=port, db=db)
    compressed_value = r.get(key)
    if compressed_value:
        decompressed_value = gzip.decompress(compressed_value).decode('utf-8')
        print(f"Got and decompressed value for key: {key}")
        return decompressed_value
    else:
        print(f"Key {key} not found.")
        return None

if __name__ == '__main__':
    # 示例:压缩并存储数据,然后获取并解压缩数据
    big_value = "This is a very long string that we want to compress. " * 1000 # 模拟一个大的字符串
    compress_and_set(key='big_key', value=big_value)
    retrieved_value = get_and_decompress(key='big_key')
    #print(retrieved_value)
  1. 使用更高效的数据结构:

有时候,使用不合适的数据结构也会导致大Key。比如,如果你的应用程序需要存储大量的键值对,但是键的类型比较简单,可以使用String类型的Key,将键值对序列化成JSON字符串存储。

总结:

大Key是Redis的性能杀手,需要及时发现和优化。优化的方法有很多,需要根据具体情况选择合适的方法。

咱们来总结一下今天讲的内容:

问题 解决方案
如何发现大Key redis-cli --bigkeys 命令、redis-rdb-tools 工具、SCAN 命令、监控系统
如何优化大Key 拆分、过期、异步删除、数据压缩、使用更高效的数据结构
String类型大Key 拆分成多个String,或使用压缩存储
Hash类型大Key 拆分成多个Hash,或者将Hash结构转换成String结构存储
List类型大Key 拆分成多个List,或者将List结构转换成String结构存储
Set类型大Key 拆分成多个Set
ZSet类型大Key 拆分成多个ZSet

希望今天的讲座能帮助大家更好地管理Redis,让你的Redis服务跑得更快、更稳! 谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注