API接口的Token刷新机制:设计JWT的Refresh Token与黑名单管理

API接口的Token刷新机制:设计JWT的Refresh Token与黑名单管理

大家好,今天我们来深入探讨API接口的Token刷新机制,重点是如何利用JWT(JSON Web Token)的Refresh Token以及黑名单管理来保障系统的安全性和用户体验。

1. 为什么需要Token刷新机制?

在基于Token的身份认证系统中,通常的做法是用户登录后,服务器会颁发一个Token(例如JWT)给客户端,客户端在后续的请求中携带这个Token,服务器验证Token的有效性,从而确认用户的身份。

然而,Token的有效期是一个关键问题。如果Token有效期太长,一旦Token泄露,风险就会很大。如果Token有效期太短,用户频繁地需要重新登录,体验会很差。

因此,我们需要一种机制,既能保证安全性,又能兼顾用户体验。这就是Token刷新机制的目的。

2. JWT(JSON Web Token)简介

在深入Refresh Token之前,我们先简单回顾一下JWT。JWT是一个开放标准(RFC 7519),它定义了一种紧凑且自包含的方式,用于在各方之间安全地传输信息,作为JSON对象。

JWT通常包含三部分:

  • Header(头部): 定义了Token的类型(通常是"JWT")和使用的签名算法(例如"HS256")。
  • Payload(载荷): 包含了声明(claims),这些声明是关于用户或其他数据的断言。例如,用户ID、用户名、权限等。
  • Signature(签名): 使用Header中指定的算法,对Header和Payload进行签名,防止篡改。

一个典型的JWT如下:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

3. Refresh Token机制

Refresh Token机制的核心思想是:颁发两种Token:

  • Access Token: 有效期较短,用于正常的API请求。
  • Refresh Token: 有效期较长,用于在Access Token过期后,获取新的Access Token。

流程如下:

  1. 用户登录成功后,服务器颁发Access Token和Refresh Token给客户端。
  2. 客户端在每次API请求中携带Access Token。
  3. 当Access Token过期时,客户端使用Refresh Token向服务器请求新的Access Token。
  4. 服务器验证Refresh Token的有效性,如果有效,则颁发新的Access Token和Refresh Token(可选,可以复用之前的Refresh Token)。
  5. 如果Refresh Token无效,则要求用户重新登录。

3.1 实现示例(Python + Flask):

首先,我们需要安装必要的库:

pip install Flask PyJWT

然后,定义一个简单的Flask应用:

import jwt
import datetime
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'  # 生产环境务必使用强密钥

# 模拟数据库存储用户和refresh token
users = {
    "admin": "password"
}
refresh_tokens = {}

def generate_access_token(user_id):
    payload = {
        'user_id': user_id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=15)  # Access Token有效期15分钟
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def generate_refresh_token(user_id):
    payload = {
        'user_id': user_id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)  # Refresh Token有效期30天
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def verify_token(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'message': 'Token is missing'}), 401

        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']  # 这里假设user_id是唯一的
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token'}), 401

        return f(current_user, *args, **kwargs)

    return decorated

@app.route('/login', methods=['POST'])
def login():
    auth = request.authorization

    if not auth or not auth.username or not auth.password:
        return jsonify({'message': 'Could not verify'}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}

    if auth.username in users and users[auth.username] == auth.password:
        access_token = generate_access_token(auth.username)
        refresh_token = generate_refresh_token(auth.username)
        refresh_tokens[auth.username] = refresh_token #Store refresh token
        return jsonify({'access_token': access_token, 'refresh_token': refresh_token})

    return jsonify({'message': 'Invalid credentials'}), 401

@app.route('/refresh', methods=['POST'])
def refresh():
    refresh_token = request.json.get('refresh_token')
    if not refresh_token:
        return jsonify({'message': 'Refresh token is missing'}), 400

    try:
        data = jwt.decode(refresh_token, app.config['SECRET_KEY'], algorithms=['HS256'])
        user_id = data['user_id']
        #Check if the stored refresh token matches the one received
        if user_id in refresh_tokens and refresh_tokens[user_id] == refresh_token:
            access_token = generate_access_token(user_id)
            new_refresh_token = generate_refresh_token(user_id) # Generate a new refresh token
            refresh_tokens[user_id] = new_refresh_token # update stored refresh token
            return jsonify({'access_token': access_token, 'refresh_token': new_refresh_token})
        else:
            return jsonify({'message': 'Invalid refresh token'}), 401

    except jwt.ExpiredSignatureError:
        return jsonify({'message': 'Refresh token has expired, please login again'}), 401
    except jwt.InvalidTokenError:
        return jsonify({'message': 'Invalid refresh token'}), 401

@app.route('/protected', methods=['GET'])
@verify_token
def protected(current_user):
    return jsonify({'message': f'Hello, {current_user}! This is a protected resource.'})

if __name__ == '__main__':
    app.run(debug=True)

代码解释:

  • generate_access_token(user_id): 生成Access Token,有效期为15分钟。
  • generate_refresh_token(user_id): 生成Refresh Token,有效期为30天。
  • verify_token(f): 一个装饰器,用于验证Access Token的有效性。
  • /login: 登录接口,验证用户名和密码,如果成功,则颁发Access Token和Refresh Token。
  • /refresh: 刷新Token接口,验证Refresh Token的有效性,如果有效,则颁发新的Access Token和Refresh Token。
  • /protected: 一个受保护的接口,需要携带有效的Access Token才能访问。

重要提示:

  • 在生产环境中,务必使用强密钥,并将其存储在安全的地方。
  • 上述示例中,Refresh Token存储在内存中,在生产环境中,应该将其存储在数据库中,并与用户ID关联。
  • 每次刷新Token时,都应该生成新的Refresh Token,并更新数据库中的值,防止Refresh Token被滥用。

3.2 客户端实现(JavaScript):

// 假设我们已经获取到 access_token 和 refresh_token
let accessToken = localStorage.getItem('access_token');
let refreshToken = localStorage.getItem('refresh_token');

// 定时检查 access_token 是否过期
setInterval(() => {
  if (isAccessTokenExpired(accessToken)) {
    // 调用 refresh 接口获取新的 access_token
    fetch('/refresh', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ refresh_token: refreshToken })
    })
    .then(response => response.json())
    .then(data => {
      if (data.access_token && data.refresh_token) {
        accessToken = data.access_token;
        refreshToken = data.refresh_token;
        localStorage.setItem('access_token', accessToken);
        localStorage.setItem('refresh_token', refreshToken);
        console.log('Access token refreshed successfully!');
      } else {
        console.error('Failed to refresh access token:', data.message);
        // 强制用户重新登录
        window.location.href = '/login';
      }
    })
    .catch(error => {
      console.error('Error refreshing access token:', error);
      // 强制用户重新登录
      window.location.href = '/login';
    });
  }
}, 60 * 1000); // 每分钟检查一次

function isAccessTokenExpired(token) {
  if (!token) return true; //如果token为空,则认为过期
  try {
    const payloadBase64 = token.split('.')[1];
    const payloadJson = atob(payloadBase64);
    const payload = JSON.parse(payloadJson);
    const expiryTimestamp = payload.exp * 1000; // Convert to milliseconds

    return Date.now() >= expiryTimestamp;
  } catch (error) {
    console.error("Error decoding token:", error);
    return true; // If decoding fails, treat it as expired
  }
}

// 在请求 API 时,携带 access_token
function fetchData(url) {
  fetch(url, {
    headers: {
      'Authorization': accessToken
    }
  })
  .then(response => response.json())
  .then(data => {
    console.log(data);
  })
  .catch(error => {
    console.error('Error fetching data:', error);
  });
}

代码解释:

  • isAccessTokenExpired(token): 判断Access Token是否过期,通过解析JWT的Payload中的exp字段来判断。
  • setInterval: 定时检查Access Token是否过期,如果过期,则调用/refresh接口获取新的Access Token。
  • fetchData(url): 在请求API时,携带Access Token。

4. 黑名单管理

即使有了Refresh Token机制,仍然存在一些安全风险。例如,如果用户在多个设备上登录,并丢失了其中一个设备的Refresh Token,攻击者可以使用该Refresh Token来获取新的Access Token,从而冒充用户。

为了解决这个问题,我们需要引入黑名单管理。黑名单是一个存储已失效或被吊销的Token的列表。

4.1 实现思路

  1. 当用户注销登录时,将对应的Refresh Token加入黑名单。
  2. 当用户修改密码时,将所有的Refresh Token加入黑名单。
  3. 当检测到异常行为时(例如,在短时间内从多个不同的IP地址请求刷新Token),将相关的Refresh Token加入黑名单。
  4. /refresh接口中,先检查Refresh Token是否在黑名单中,如果在,则拒绝刷新Token的请求。

4.2 实现示例(Python + Flask):

import jwt
import datetime
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'  # 生产环境务必使用强密钥

# 模拟数据库存储用户和refresh token
users = {
    "admin": "password"
}
refresh_tokens = {}
blacklist = set() # Use a set for efficient membership checking

def generate_access_token(user_id):
    payload = {
        'user_id': user_id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=15)  # Access Token有效期15分钟
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def generate_refresh_token(user_id):
    payload = {
        'user_id': user_id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)  # Refresh Token有效期30天
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def verify_token(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'message': 'Token is missing'}), 401

        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
            current_user = data['user_id']  # 这里假设user_id是唯一的
        except jwt.ExpiredSignatureError:
            return jsonify({'message': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'message': 'Invalid token'}), 401

        return f(current_user, *args, **kwargs)

    return decorated

@app.route('/login', methods=['POST'])
def login():
    auth = request.authorization

    if not auth or not auth.username or not auth.password:
        return jsonify({'message': 'Could not verify'}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}

    if auth.username in users and users[auth.username] == auth.password:
        access_token = generate_access_token(auth.username)
        refresh_token = generate_refresh_token(auth.username)
        refresh_tokens[auth.username] = refresh_token  # Store refresh token
        return jsonify({'access_token': access_token, 'refresh_token': refresh_token})

    return jsonify({'message': 'Invalid credentials'}), 401

@app.route('/refresh', methods=['POST'])
def refresh():
    refresh_token = request.json.get('refresh_token')
    if not refresh_token:
        return jsonify({'message': 'Refresh token is missing'}), 400

    # Check if the refresh token is in the blacklist
    if refresh_token in blacklist:
        return jsonify({'message': 'Invalid refresh token'}), 401

    try:
        data = jwt.decode(refresh_token, app.config['SECRET_KEY'], algorithms=['HS256'])
        user_id = data['user_id']

        # Check if the stored refresh token matches the one received
        if user_id in refresh_tokens and refresh_tokens[user_id] == refresh_token:
            access_token = generate_access_token(user_id)
            new_refresh_token = generate_refresh_token(user_id)  # Generate a new refresh token
            refresh_tokens[user_id] = new_refresh_token  # Update stored refresh token
            return jsonify({'access_token': access_token, 'refresh_token': new_refresh_token})
        else:
            return jsonify({'message': 'Invalid refresh token'}), 401

    except jwt.ExpiredSignatureError:
        return jsonify({'message': 'Refresh token has expired, please login again'}), 401
    except jwt.InvalidTokenError:
        return jsonify({'message': 'Invalid refresh token'}), 401

@app.route('/logout', methods=['POST'])
def logout():
    refresh_token = request.json.get('refresh_token')
    if not refresh_token:
        return jsonify({'message': 'Refresh token is missing'}), 400

    # Add the refresh token to the blacklist
    blacklist.add(refresh_token)
    # Optionally, remove the refresh token from refresh_tokens dictionary
    # if it's not needed anymore

    return jsonify({'message': 'Successfully logged out'})

@app.route('/protected', methods=['GET'])
@verify_token
def protected(current_user):
    return jsonify({'message': f'Hello, {current_user}! This is a protected resource.'})

if __name__ == '__main__':
    app.run(debug=True)

代码解释:

  • blacklist = set(): 使用一个set来存储黑名单中的Refresh Token,set的查找效率很高。
  • /logout: 注销接口,将Refresh Token加入黑名单。
  • /refresh接口中,先检查Refresh Token是否在黑名单中。

重要提示:

  • 在生产环境中,黑名单应该存储在数据库中,并定期清理过期的Token。
  • 黑名单的大小可能会很大,因此需要考虑使用合适的数据结构和存储方式,以提高查找效率。可以使用Redis等缓存数据库来加速黑名单的查询。
  • 可以考虑使用JTI (JWT ID) claim,将每个refresh token都赋予一个唯一的ID,将此ID加入黑名单,而不是整个token。

5. 安全性考虑

  • 使用HTTPS: 确保所有的通信都使用HTTPS,防止Token被窃听。
  • 存储Refresh Token: Refresh Token应该存储在安全的地方,例如数据库中,并使用加密算法进行加密。
  • 防止CSRF攻击: 在刷新Token的接口中,需要防止CSRF攻击。
  • 监控和审计: 对Token的颁发、刷新和使用进行监控和审计,及时发现异常行为。
  • Token过期时间: Access Token的过期时间应该尽可能短,Refresh Token的过期时间可以稍长,但也要合理设置。
  • Rotate Refresh Token: 每次使用 Refresh Token 换取新的 Access Token 时,也同时颁发一个新的 Refresh Token。旧的 Refresh Token 立即失效。

6. Refresh Token管理策略对比

策略 优点 缺点 适用场景
一次性Refresh Token 每次使用后都失效,安全性高,可以有效防止Refresh Token泄露后的滥用。 实现复杂度较高,需要更频繁的交互,可能影响性能。 对安全性要求极高的场景,例如金融支付等。
固定Refresh Token 实现简单,易于管理。 安全性相对较低,一旦Refresh Token泄露,风险较高。 对安全性要求不高的场景,例如内部系统等。
Rotate Refresh Token 安全性较高,即使Refresh Token泄露,也能在下次使用时被检测出来并失效。 实现复杂度较高,需要维护Refresh Token的状态。 安全性要求较高的场景,但可以接受一定的复杂性。
短生命周期Refresh Token 降低了Refresh Token泄露后的风险窗口。 需要更频繁的刷新操作,可能影响性能。 希望在安全性和性能之间取得平衡的场景。
可撤销Refresh Token 可以随时撤销Refresh Token的权限,例如用户登出或修改密码时。 需要维护一个黑名单或类似的数据结构来存储已撤销的Refresh Token。 需要灵活控制用户权限的场景,例如用户管理系统等。

7. 总结

今天我们详细讨论了API接口的Token刷新机制,包括JWT的Refresh Token以及黑名单管理。通过合理的Refresh Token策略和黑名单机制,可以有效地提高系统的安全性和用户体验。希望今天的分享对大家有所帮助。

8. 关键点回顾

  • Refresh Token 用于在 Access Token 过期后获取新的 Access Token。
  • 黑名单用于存储已失效或被吊销的 Token,防止 Token 被滥用。
  • 在实际应用中,需要根据具体的业务场景选择合适的 Refresh Token 管理策略和黑名单实现方式。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注