ML Recommender System架构:双塔模型、召回与排序阶段的Python实现
大家好,今天我们来深入探讨机器学习驱动的推荐系统架构,重点关注双塔模型以及召回和排序这两个关键阶段,并通过Python代码进行实践。
一、推荐系统架构概述
推荐系统旨在根据用户的历史行为和偏好,为用户推荐其可能感兴趣的物品。一个典型的推荐系统架构通常包含以下几个主要阶段:
-
数据收集与处理: 收集用户行为数据(例如点击、购买、评分等)以及物品的属性数据(例如类别、价格、描述等)。进行数据清洗、预处理和特征工程。
-
召回(Retrieval): 从海量物品库中快速筛选出用户可能感兴趣的一小部分候选物品。目标是覆盖尽可能多的相关物品,牺牲一定的精度。常见的召回方法包括:
- 协同过滤(Collaborative Filtering)
- 基于内容的推荐(Content-Based Recommendation)
- 基于标签的推荐(Tag-Based Recommendation)
- 双塔模型(Two-Tower Model)
-
排序(Ranking): 对召回阶段筛选出的候选物品进行精细化排序,预测用户对每个物品的偏好程度。目标是提高推荐的准确性。常见的排序模型包括:
- 逻辑回归(Logistic Regression)
- 梯度提升树(Gradient Boosting Decision Tree,GBDT)
- 深度神经网络(Deep Neural Network,DNN)
- Wide & Deep 模型
-
重排序(Re-ranking): 对排序结果进行进一步调整,例如考虑多样性、新颖性、公平性等因素。
-
展示(Display): 将最终的推荐结果展示给用户。
今天我们将重点关注双塔模型在召回阶段的应用,以及排序阶段的实现。
二、双塔模型(Two-Tower Model)
双塔模型是一种常用的召回模型,其核心思想是将用户和物品分别映射到同一个低维向量空间中,然后通过计算用户向量和物品向量的相似度来衡量用户对该物品的偏好程度。
- 用户塔(User Tower): 负责将用户特征映射成用户向量。
- 物品塔(Item Tower): 负责将物品特征映射成物品向量。
2.1 双塔模型架构
+--------------------+ +--------------------+
| User Tower | | Item Tower |
+--------------------+ +--------------------+
| User Features | | Item Features |
| | | | | |
| Embedding Layer | | Embedding Layer |
| | | | | |
| MLP/DNN Layers | | MLP/DNN Layers |
| | | | | |
| User Embedding | | Item Embedding |
+--------------------+ +--------------------+
| |
+------- Dot Product -------+
|
Similarity Score
2.2 双塔模型优势
- 高效性: 可以预先计算所有物品的向量,并存储在索引中,在线服务时只需要计算用户向量,然后与物品向量进行快速相似度匹配。
- 可扩展性: 用户塔和物品塔可以独立训练,方便新增用户和物品。
- 灵活性: 用户塔和物品塔可以使用不同的特征和模型结构。
2.3 双塔模型训练
双塔模型通常采用pairwise learning或者negative sampling的方式进行训练。
- Pairwise Learning: 对于每个正样本(用户点击过的物品),随机选择一个负样本(用户未点击过的物品),然后训练模型,使得正样本的相似度高于负样本的相似度。
- Negative Sampling: 对于每个正样本,随机选择多个负样本,然后训练模型,使得正样本的相似度尽可能高,负样本的相似度尽可能低。
三、Python实现:双塔模型召回
我们使用TensorFlow/Keras来实现一个简单的双塔模型。
3.1 数据准备
假设我们有以下用户和物品数据:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
# 模拟用户数据
user_data = {
'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'age': [25, 30, 22, 35, 28, 40, 27, 32, 29, 31],
'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F', 'M', 'F'],
'city': ['Beijing', 'Shanghai', 'Guangzhou', 'Shanghai', 'Beijing', 'Guangzhou', 'Beijing', 'Shanghai', 'Guangzhou', 'Beijing']
}
user_df = pd.DataFrame(user_data)
# 模拟物品数据
item_data = {
'item_id': [101, 102, 103, 104, 105, 106, 107, 108, 109, 110],
'category': ['Movie', 'Book', 'Movie', 'Book', 'Movie', 'Book', 'Movie', 'Book', 'Movie', 'Book'],
'price': [20, 30, 25, 35, 22, 32, 28, 38, 26, 34]
}
item_df = pd.DataFrame(item_data)
# 模拟用户-物品交互数据(点击)
interaction_data = {
'user_id': [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10],
'item_id': [101, 102, 103, 104, 101, 105, 102, 106, 103, 107, 104, 108, 101, 109, 102, 110, 103, 105, 104, 106],
'rating': [5, 4, 3, 2, 4, 5, 2, 3, 5, 4, 3, 2, 4, 5, 2, 3, 5, 4, 3, 2] # 添加评分数据
}
interaction_df = pd.DataFrame(interaction_data)
# 合并数据
df = pd.merge(interaction_df, user_df, on='user_id')
df = pd.merge(df, item_df, on='item_id')
# 创建用户和物品ID的映射
user_ids = df["user_id"].unique().tolist()
item_ids = df["item_id"].unique().tolist()
user_id_mapping = {id: i for i, id in enumerate(user_ids)}
item_id_mapping = {id: i for i, id in enumerate(item_ids)}
df["user_id_encoded"] = df["user_id"].map(user_id_mapping)
df["item_id_encoded"] = df["item_id"].map(item_id_mapping)
# 划分训练集和测试集
train, test = train_test_split(df, test_size=0.2, random_state=42)
NUM_USERS = len(user_ids)
NUM_ITEMS = len(item_ids)
print("Number of users:", NUM_USERS)
print("Number of items:", NUM_ITEMS)
3.2 模型定义
EMBEDDING_DIM = 64
# 用户塔
def create_user_tower(num_users, embedding_dim):
user_input = layers.Input(shape=(1,), name='user_input')
embedding = layers.Embedding(num_users, embedding_dim, name='user_embedding')(user_input)
flatten = layers.Flatten()(embedding)
dense1 = layers.Dense(128, activation='relu')(flatten)
dense2 = layers.Dense(embedding_dim, activation='relu')(dense1) # 输出维度等于embedding_dim
return models.Model(inputs=user_input, outputs=dense2)
# 物品塔
def create_item_tower(num_items, embedding_dim):
item_input = layers.Input(shape=(1,), name='item_input')
embedding = layers.Embedding(num_items, embedding_dim, name='item_embedding')(item_input)
flatten = layers.Flatten()(embedding)
dense1 = layers.Dense(128, activation='relu')(flatten)
dense2 = layers.Dense(embedding_dim, activation='relu')(dense1) # 输出维度等于embedding_dim
return models.Model(inputs=item_input, outputs=dense2)
user_tower = create_user_tower(NUM_USERS, EMBEDDING_DIM)
item_tower = create_item_tower(NUM_ITEMS, EMBEDDING_DIM)
# 创建双塔模型
user_input = layers.Input(shape=(1,), name='user_id')
item_input = layers.Input(shape=(1,), name='item_id')
user_embedding = user_tower(user_input)
item_embedding = item_tower(item_input)
# 计算相似度
dot_product = layers.Dot(axes=1)([user_embedding, item_embedding]) # 修改为计算点积
output = layers.Activation('sigmoid')(dot_product) # 使用sigmoid激活函数
model = models.Model(inputs=[user_input, item_input], outputs=output)
model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy']) # 修改为二元交叉熵损失
print(model.summary())
3.3 模型训练
BATCH_SIZE = 64
EPOCHS = 10
# 准备训练数据
user_ids_train = train["user_id_encoded"].values
item_ids_train = train["item_id_encoded"].values
ratings_train = train["rating"].apply(lambda x: 1 if x > 3 else 0).values # 将评分转化为二元标签
# 准备测试数据
user_ids_test = test["user_id_encoded"].values
item_ids_test = test["item_id_encoded"].values
ratings_test = test["rating"].apply(lambda x: 1 if x > 3 else 0).values # 将评分转化为二元标签
# 训练模型
model.fit(
x=[user_ids_train, item_ids_train],
y=ratings_train,
batch_size=BATCH_SIZE,
epochs=EPOCHS,
validation_data=([user_ids_test, item_ids_test], ratings_test)
)
3.4 召回
# 获取所有物品的向量表示
item_embeddings = item_tower.predict(np.array(range(NUM_ITEMS)))
def recommend(user_id, top_n=10):
# 获取用户向量
user_encoded = user_id_mapping[user_id]
user_embedding = user_tower.predict(np.array([user_encoded]))
# 计算用户向量和所有物品向量的相似度
similarities = np.dot(user_embedding, item_embeddings.T).flatten()
# 获取相似度最高的top_n个物品
top_item_indices = np.argsort(similarities)[::-1][:top_n]
# 将物品索引转换为物品ID
recommended_item_ids = [list(item_id_mapping.keys())[list(item_id_mapping.values()).index(i)] for i in top_item_indices]
return recommended_item_ids
# 为用户1推荐10个物品
recommendations = recommend(1, top_n=10)
print(f"为用户1推荐的物品:{recommendations}")
四、Python实现:排序阶段
在召回阶段,我们筛选出了一部分候选物品。接下来,我们使用排序模型对这些候选物品进行精细化排序。这里我们使用一个简单的Wide & Deep模型作为示例。
4.1 特征工程
from sklearn.preprocessing import LabelEncoder, StandardScaler
# 对类别特征进行编码
le_gender = LabelEncoder()
df['gender_encoded'] = le_gender.fit_transform(df['gender'])
le_category = LabelEncoder()
df['category_encoded'] = le_category.fit_transform(df['category'])
# 对数值特征进行标准化
scaler = StandardScaler()
df[['age', 'price']] = scaler.fit_transform(df[['age', 'price']])
# 划分训练集和测试集
train, test = train_test_split(df, test_size=0.2, random_state=42)
# 定义特征列
wide_features = ['user_id_encoded', 'item_id_encoded', 'gender_encoded', 'category_encoded']
deep_features = ['age', 'price']
X_train_wide = train[wide_features].values
X_train_deep = train[deep_features].values
y_train = train['rating'].apply(lambda x: 1 if x > 3 else 0).values
X_test_wide = test[wide_features].values
X_test_deep = test[deep_features].values
y_test = test['rating'].apply(lambda x: 1 if x > 3 else 0).values
4.2 模型定义:Wide & Deep 模型
def create_wide_deep_model(num_wide_features, num_deep_features):
# Wide 部分
wide_input = layers.Input(shape=(num_wide_features,), name='wide_input')
wide_output = layers.Dense(1, activation='sigmoid')(wide_input)
# Deep 部分
deep_input = layers.Input(shape=(num_deep_features,), name='deep_input')
deep_dense1 = layers.Dense(64, activation='relu')(deep_input)
deep_dense2 = layers.Dense(32, activation='relu')(deep_dense1)
deep_output = layers.Dense(1, activation='sigmoid')(deep_dense2)
# 合并 Wide 和 Deep 部分
merged_output = layers.concatenate([wide_output, deep_output])
final_output = layers.Dense(1, activation='sigmoid')(merged_output) # 修改:添加一层全连接层
model = models.Model(inputs=[wide_input, deep_input], outputs=final_output)
model.compile(optimizer=optimizers.Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
return model
model_wd = create_wide_deep_model(len(wide_features), len(deep_features))
print(model_wd.summary())
4.3 模型训练与预测
model_wd.fit(
x=[X_train_wide, X_train_deep],
y=y_train,
batch_size=32,
epochs=10,
validation_data=([X_test_wide, X_test_deep], y_test)
)
# 预测
predictions = model_wd.predict([X_test_wide, X_test_deep])
#print(predictions)
#评估模型
_, accuracy = model_wd.evaluate([X_test_wide, X_test_deep], y_test)
print('Accuracy: %.2f' % (accuracy*100))
4.4 整合召回和排序
现在,我们将召回阶段的结果作为排序阶段的输入。
def recommend_and_rank(user_id, top_n_recall=20, top_n_rank=10):
# 召回阶段:获取候选物品
candidate_item_ids = recommend(user_id, top_n=top_n_recall)
# 准备排序数据
user_data = df[df['user_id'] == user_id].iloc[0] # 获取用户数据
rank_data = []
for item_id in candidate_item_ids:
item_data = df[df['item_id'] == item_id].iloc[0] # 获取物品数据
rank_data.append({
'user_id_encoded': user_data['user_id_encoded'],
'item_id_encoded': item_data['item_id_encoded'],
'gender_encoded': user_data['gender_encoded'],
'category_encoded': item_data['category_encoded'],
'age': user_data['age'],
'price': item_data['price'],
'item_id': item_id # 保存item_id
})
rank_df = pd.DataFrame(rank_data)
X_rank_wide = rank_df[wide_features].values
X_rank_deep = rank_df[deep_features].values
# 排序
rank_scores = model_wd.predict([X_rank_wide, X_rank_deep]).flatten()
# 获取排序后的物品ID
ranked_item_indices = np.argsort(rank_scores)[::-1][:top_n_rank]
ranked_item_ids = [rank_df['item_id'].iloc[i] for i in ranked_item_indices]
return ranked_item_ids
# 为用户1推荐10个物品(经过召回和排序)
final_recommendations = recommend_and_rank(1, top_n_recall=20, top_n_rank=10)
print(f"为用户1最终推荐的物品:{final_recommendations}")
五、模型评估与优化
- 离线评估: 使用AUC、Precision@K、Recall@K、NDCG@K等指标评估模型的性能。
- 在线评估: 通过A/B测试等方式评估模型在真实环境中的效果。
- 优化方向:
- 特征工程:挖掘更多有用的特征。
- 模型结构:尝试更复杂的模型结构,例如深度神经网络。
- 训练方法:使用更有效的训练方法,例如负采样、知识蒸馏。
- 超参数调优:使用网格搜索、贝叶斯优化等方法优化模型的超参数。
六、实际应用中的考量
- 冷启动问题: 对于新用户和新物品,由于缺乏历史数据,推荐效果可能较差。可以采用一些冷启动策略,例如基于人口统计信息的推荐、基于内容的推荐等。
- 多样性: 推荐结果应该具有一定的多样性,避免过度集中于少数几个物品。
- 新颖性: 推荐结果应该包含一些用户未曾见过的物品,避免用户感到厌倦。
- 可解释性: 推荐结果应该具有一定的可解释性,让用户了解推荐的原因。
- 实时性: 推荐系统应该能够根据用户的实时行为进行调整。
双塔模型召回和Wide&Deep排序模型的实践
我们一起构建了一个简单的双塔模型进行召回,并使用 Wide & Deep 模型进行排序。需要注意的是,这只是一个基础示例,实际应用中需要根据具体业务场景进行调整和优化。可以通过调整模型结构、优化特征工程、使用更先进的训练方法等方式来提高推荐效果。
更多IT精英技术系列讲座,到智猿学院