Python中的时间序列异常检测:基于自回归模型与深度学习的算法

Python时间序列异常检测:基于自回归模型与深度学习的算法

大家好,今天我们来聊聊时间序列的异常检测,重点关注两种方法:基于自回归模型的方法和基于深度学习的方法。时间序列异常检测在很多领域都有应用,例如金融风控、网络安全、设备维护等等。我们的目标是识别出与正常模式显著不同的数据点,这些数据点可能预示着潜在的问题或者机会。

1. 时间序列异常检测概述

在深入具体算法之前,我们先简单回顾一下时间序列异常检测的基本概念。

什么是时间序列?

时间序列是按照时间顺序排列的一系列数据点。例如,股票价格、服务器 CPU 使用率、传感器读数等。

什么是异常?

异常(Anomaly),也称为离群点(Outlier),是指与时间序列中的其他数据点显著不同的数据点。异常可能是由各种因素引起的,例如设备故障、人为错误、欺诈行为等等。

异常检测的类型:

  • 点异常(Point Anomaly): 单个数据点与整体序列模式明显不同。
  • 上下文异常(Contextual Anomaly): 数据点本身在全局范围内可能并不异常,但在特定的时间上下文中显得异常。例如,在夏季销售额很高是正常的,但在冬季销售额很高可能就是一个异常。
  • 集体异常(Collective Anomaly): 一组数据点共同构成一个异常,即使单个数据点本身并不异常。例如,一段时间内网络流量突然持续增加。

异常检测的基本方法:

  • 基于统计的方法: 利用统计模型拟合时间序列数据,然后将与模型预测值偏差较大的数据点标记为异常。
  • 基于机器学习的方法: 利用机器学习算法训练模型,学习正常数据的模式,然后将与模型预测不符的数据点标记为异常。
  • 基于深度学习的方法: 利用深度学习模型自动学习时间序列的复杂模式,然后将与模型预测不符的数据点标记为异常。

2. 基于自回归模型的异常检测

自回归模型(AR)是一种常用的时间序列模型,它假设当前时刻的值与过去若干时刻的值之间存在线性关系。我们可以利用 AR 模型预测时间序列的未来值,然后将实际值与预测值进行比较,从而检测异常。

2.1 AR 模型简介

AR(p) 模型表示当前时刻的值 $yt$ 与过去 p 个时刻的值 $y{t-1}, y{t-2}, …, y{t-p}$ 之间存在线性关系:

$y_t = c + phi1 y{t-1} + phi2 y{t-2} + … + phip y{t-p} + epsilon_t$

其中:

  • $c$ 是常数项。
  • $phi_1, phi_2, …, phi_p$ 是自回归系数。
  • $epsilon_t$ 是白噪声,表示随机误差。

2.2 AR 模型参数估计

我们可以使用多种方法估计 AR 模型的参数,例如:

  • Yule-Walker 方程: 基于样本自相关函数求解线性方程组。
  • 最小二乘法: 最小化预测误差的平方和。

在 Python 中,我们可以使用 statsmodels 库来估计 AR 模型的参数:

import pandas as pd
from statsmodels.tsa.ar_model import AutoReg
from sklearn.metrics import mean_squared_error
import numpy as np

# 创建一个示例时间序列
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
series = pd.Series(data)

# 设置滞后阶数 (p)
p = 3

# 将数据分为训练集和测试集
train_data = series[:-5]
test_data = series[-5:]

# 训练 AR 模型
model = AutoReg(train_data, lags=p)
model_fit = model.fit()

# 打印模型参数
print('Coefficients: %s' % model_fit.params)

# 进行预测
predictions = model_fit.predict(start=len(train_data), end=len(series)-1)

# 评估模型
rmse = np.sqrt(mean_squared_error(test_data, predictions))
print('RMSE: %.3f' % rmse)

# 可视化结果
import matplotlib.pyplot as plt

plt.plot(series, label='Original')
plt.plot(predictions, label='Predictions')
plt.legend()
plt.show()

2.3 异常检测方法

基于 AR 模型的异常检测方法的基本思路是:

  1. 使用历史数据训练 AR 模型。
  2. 利用训练好的 AR 模型预测未来值。
  3. 计算实际值与预测值之间的残差(Residual)。
  4. 如果残差超过预定义的阈值,则将该数据点标记为异常。

残差分析:

残差是实际值与预测值之间的差异。在正常的序列中,残差应该服从均值为 0 的正态分布。如果残差显著偏离 0,或者残差的分布不符合正态分布,则可能存在异常。

阈值设定:

阈值的设定是一个关键问题。常用的方法包括:

  • 固定阈值: 根据经验或者领域知识设定一个固定的阈值。
  • 基于标准差的阈值: 计算残差的标准差,然后将阈值设定为 $k times sigma$,其中 $k$ 是一个常数,通常取 2 或 3。
  • 基于百分位数的阈值: 将阈值设定为残差的某个百分位数。
# 异常检测函数
def detect_anomalies_ar(series, p, threshold):
    train_data = series[:-5]
    test_data = series[-5:]
    model = AutoReg(train_data, lags=p)
    model_fit = model.fit()
    predictions = model_fit.predict(start=len(train_data), end=len(series)-1)
    residuals = test_data - predictions

    anomalies = []
    for i, residual in enumerate(residuals):
        if abs(residual) > threshold:
            anomalies.append(len(train_data) + i)  # 记录异常点的索引

    return anomalies, predictions

# 设置阈值 (基于标准差)
residuals = series[-5:] - model_fit.predict(start=len(series)-5, end=len(series)-1) #必须使用模型的预测值,如果用上面的predictions,会因为test_data和predictions的长度不一致报错
threshold = 2 * np.std(residuals) #注意这里标准差的计算,需要使用测试集的残差来计算标准差
print(f"Threshold: {threshold}")

# 检测异常
anomalies, predictions = detect_anomalies_ar(series, p, threshold)
print(f"Anomalies found at indices: {anomalies}")

# 可视化结果
plt.plot(series, label='Original')
plt.plot(predictions, label='Predictions')
plt.scatter(anomalies, series[anomalies], color='red', label='Anomalies')  # 标记异常点
plt.legend()
plt.show()

2.4 AR 模型的优缺点

优点:

  • 简单易懂,易于实现。
  • 计算效率高。
  • 适用于线性时间序列。

缺点:

  • 对非线性时间序列效果较差。
  • 需要手动选择滞后阶数 p,选择不当会影响模型性能。
  • 对阈值的选择比较敏感。

3. 基于深度学习的异常检测

深度学习模型具有强大的特征学习能力,可以自动学习时间序列的复杂模式,因此在时间序列异常检测领域得到了广泛应用。

3.1 常用的深度学习模型

  • 循环神经网络(RNN): RNN 擅长处理序列数据,可以捕捉时间序列中的长期依赖关系。常用的 RNN 变体包括 LSTM 和 GRU。
  • 自编码器(Autoencoder): 自编码器是一种无监督学习模型,可以学习将输入数据压缩成低维表示,然后再从低维表示重构回原始数据。异常数据通常难以被重构,因此可以利用重构误差来检测异常。
  • 卷积神经网络(CNN): CNN 主要用于图像处理,但也可以应用于时间序列异常检测。可以将时间序列数据看作一维图像,然后使用 CNN 提取特征。

3.2 基于 LSTM 的异常检测

LSTM (Long Short-Term Memory) 是一种特殊的 RNN,可以有效解决 RNN 中的梯度消失问题,更好地捕捉时间序列中的长期依赖关系。

模型结构:

  1. LSTM 层: 用于学习时间序列的模式。
  2. 全连接层: 用于将 LSTM 层的输出映射到预测值。

训练方法:

使用历史数据训练 LSTM 模型,目标是最小化预测误差。

异常检测方法:

  1. 使用训练好的 LSTM 模型预测未来值。
  2. 计算实际值与预测值之间的残差。
  3. 如果残差超过预定义的阈值,则将该数据点标记为异常。
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# 创建示例时间序列数据
data = np.sin(np.linspace(0, 10 * np.pi, 200)) + np.random.normal(0, 0.1, 200)
series = pd.Series(data)

# 数据预处理
scaler = MinMaxScaler()
series_scaled = scaler.fit_transform(series.values.reshape(-1, 1))

# 创建训练数据集
def create_dataset(series, look_back=1):
    X, Y = [], []
    for i in range(len(series) - look_back - 1):
        a = series[i:(i + look_back), 0]
        X.append(a)
        Y.append(series[i + look_back, 0])
    return np.array(X), np.array(Y)

look_back = 10
X, Y = create_dataset(series_scaled, look_back)

# 划分训练集和测试集
train_size = int(len(X) * 0.8)
test_size = len(X) - train_size
X_train, X_test = X[0:train_size,:], X[train_size:len(X),:]
Y_train, Y_test = Y[0:train_size], Y[train_size:len(X)]

# reshape input to be [samples, time steps, features] which is required for LSTM
X_train = np.reshape(X_train, (X_train.shape[0], 1, X_train.shape[1]))
X_test = np.reshape(X_test, (X_test.shape[0], 1, X_test.shape[1]))

# 构建 LSTM 模型
model = Sequential()
model.add(LSTM(50, input_shape=(1, look_back)))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')

# 训练模型
model.fit(X_train, Y_train, epochs=50, batch_size=1, verbose=0)

# 预测
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

# 逆缩放
train_predict = scaler.inverse_transform(train_predict)
Y_train = scaler.inverse_transform(Y_train.reshape(-1,1))
test_predict = scaler.inverse_transform(test_predict)
Y_test = scaler.inverse_transform(Y_test.reshape(-1,1))

# 计算 RMSE
train_rmse = np.sqrt(mean_squared_error(Y_train, train_predict))
test_rmse = np.sqrt(mean_squared_error(Y_test, test_predict))
print(f'Train RMSE: {train_rmse:.3f}')
print(f'Test RMSE: {test_rmse:.3f}')

# 异常检测
def detect_anomalies_lstm(predictions, actual, threshold):
    residuals = actual - predictions
    anomalies = np.where(np.abs(residuals) > threshold)[0] + train_size + look_back + 1 #index adjust
    return anomalies.tolist()

# 设置阈值
threshold = 2 * np.std(Y_test - test_predict)  # 基于测试集残差的标准差
print(f"Threshold: {threshold}")

# 检测异常
anomalies = detect_anomalies_lstm(test_predict, Y_test, threshold)
print(f"Anomalies found at indices: {anomalies}")

# 可视化结果
plt.plot(series, label='Original')
# Adjust the index for plotting.  Predictions start at the end of the lookback window.
train_predict_plot = np.empty_like(series)
train_predict_plot[:] = np.nan
train_predict_plot[look_back+1:len(train_predict)+look_back+1] = train_predict[:, 0] #shift the index to match the original
plt.plot(train_predict_plot, label='Train Predictions')

test_predict_plot = np.empty_like(series)
test_predict_plot[:] = np.nan
test_predict_plot[len(train_predict)+look_back+1+1:len(series)] = test_predict[:, 0] #shift the index to match the original
plt.plot(test_predict_plot, label='Test Predictions')

plt.scatter(anomalies, series[anomalies], color='red', label='Anomalies')
plt.legend()
plt.show()

3.3 基于 Autoencoder 的异常检测

自编码器是一种无监督学习模型,它学习将输入数据压缩成低维表示,然后再从低维表示重构回原始数据。 异常数据通常难以被重构,因此可以利用重构误差来检测异常。

模型结构:

  1. 编码器(Encoder): 将输入数据压缩成低维表示。
  2. 解码器(Decoder): 从低维表示重构回原始数据。

训练方法:

使用历史数据训练自编码器,目标是最小化重构误差。

异常检测方法:

  1. 使用训练好的自编码器重构输入数据。
  2. 计算重构误差。
  3. 如果重构误差超过预定义的阈值,则将该数据点标记为异常。
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# 创建示例时间序列数据
data = np.sin(np.linspace(0, 10 * np.pi, 200)) + np.random.normal(0, 0.1, 200)
series = pd.Series(data)

# 数据预处理
scaler = MinMaxScaler()
series_scaled = scaler.fit_transform(series.values.reshape(-1, 1))

# 构建 Autoencoder 模型
model = Sequential()
model.add(Dense(10, activation='relu', input_shape=(1,)))
model.add(Dense(5, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(1, activation='linear'))  # Linear activation for output

# 编译模型
model.compile(optimizer='adam', loss='mse')

# 划分训练集和测试集
train_size = int(len(series_scaled) * 0.8)
train_data = series_scaled[:train_size]
test_data = series_scaled[train_size:]

# 训练模型
model.fit(train_data, train_data, epochs=50, batch_size=1, verbose=0)

# 预测 (重构)
train_predictions = model.predict(train_data)
test_predictions = model.predict(test_data)

# 逆缩放
train_predictions = scaler.inverse_transform(train_predictions)
test_predictions = scaler.inverse_transform(test_predictions)
train_data_original = scaler.inverse_transform(train_data)
test_data_original = scaler.inverse_transform(test_data)

# 计算 RMSE
train_rmse = np.sqrt(mean_squared_error(train_data_original, train_predictions))
test_rmse = np.sqrt(mean_squared_error(test_data_original, test_predictions))
print(f'Train RMSE: {train_rmse:.3f}')
print(f'Test RMSE: {test_rmse:.3f}')

# 异常检测
def detect_anomalies_autoencoder(predictions, actual, threshold):
    residuals = actual - predictions
    anomalies = np.where(np.abs(residuals) > threshold)[0] + train_size
    return anomalies.tolist()

# 设置阈值
threshold = 2 * np.std(test_data_original - test_predictions)  # 基于测试集残差的标准差
print(f"Threshold: {threshold}")

# 检测异常
anomalies = detect_anomalies_autoencoder(test_predictions, test_data_original, threshold)
print(f"Anomalies found at indices: {anomalies}")

# 可视化结果
plt.plot(series, label='Original')
plt.plot(np.arange(len(train_predictions)), train_predictions, label='Train Predictions')
plt.plot(np.arange(len(train_predictions), len(series)), test_predictions, label='Test Predictions')
plt.scatter(anomalies, series[anomalies], color='red', label='Anomalies')
plt.legend()
plt.show()

3.4 深度学习模型的优缺点

优点:

  • 可以自动学习时间序列的复杂模式。
  • 对非线性时间序列效果较好。
  • 可以处理高维时间序列数据。

缺点:

  • 需要大量的训练数据。
  • 计算成本高。
  • 模型结构复杂,难以解释。
  • 容易过拟合。

4. 总结选择合适的异常检测方法

我们讨论了基于自回归模型和基于深度学习的时间序列异常检测方法。每种方法都有其优缺点,选择哪种方法取决于具体的应用场景和数据特点。

方法 优点 缺点 适用场景
AR 模型 简单易懂,计算效率高,适用于线性时间序列 对非线性时间序列效果差,需手动选择滞后阶数,对阈值敏感 线性时间序列,对实时性要求高的场景
LSTM 模型 可以自动学习复杂模式,对非线性时间序列效果好 需要大量数据,计算成本高,模型复杂,易过拟合 非线性时间序列,对精度要求高的场景
Autoencoder 模型 可以自动学习复杂模式,适用于无监督学习 需要大量数据,计算成本高,模型复杂,易过拟合 无监督学习场景,对数据分布没有先验知识的场景

5. 未来研究方向

时间序列异常检测是一个活跃的研究领域,未来有以下几个值得关注的方向:

  • 可解释性: 如何提高深度学习模型的透明度和可解释性,让人们更好地理解异常的原因。
  • 自适应性: 如何让模型能够自动适应时间序列的变化,提高模型的鲁棒性。
  • 多模态数据融合: 如何将时间序列数据与其他类型的数据(例如文本、图像)进行融合,提高异常检测的准确率。
  • 小样本学习: 如何在数据量较少的情况下,训练出有效的异常检测模型。

希望今天的分享对大家有所帮助。谢谢!

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注