Python实现因果图模型的贝叶斯网络:结构学习与参数推断的算法细节
大家好,今天我们来深入探讨如何使用Python实现因果图模型的贝叶斯网络,包括结构学习和参数推断两个关键环节。贝叶斯网络是一种强大的概率图模型,能够有效地表示变量之间的概率依赖关系,并进行推断。理解其内部机制对于实际应用至关重要。
一、贝叶斯网络基础
在深入算法细节之前,我们先回顾一下贝叶斯网络的基本概念。
-
有向无环图 (DAG): 贝叶斯网络的核心是一个有向无环图,其中节点代表变量,边代表变量之间的直接因果关系。箭头从“原因”指向“结果”。 无环保证了概率计算的合理性。
-
条件概率分布 (CPD): 每个节点都关联一个条件概率分布,描述了在给定其父节点状态下,该节点取值的概率。对于没有父节点的节点,CPD退化为先验概率分布。
-
联合概率分布: 贝叶斯网络定义了一个联合概率分布,可以分解为每个节点在其父节点条件下的条件概率的乘积:
P(X1, X2, …, Xn) = Π P(Xi | Parents(Xi))
其中,Parents(Xi)表示节点Xi的所有父节点。
二、结构学习:从数据中发现因果关系
结构学习的目标是根据观测数据,学习出最能描述变量之间依赖关系的DAG。这是一个具有挑战性的问题,因为可能的DAG结构的数量随变量数量呈指数增长。常用的结构学习方法包括基于约束的方法和基于评分的方法。
1. 基于约束的方法:PC算法
PC算法是一种经典的基于约束的结构学习算法。它的核心思想是利用条件独立性测试,逐步构建DAG。
-
算法流程:
-
骨架学习:
- 构造完全无向图。
- 移除所有变量对之间的边,如果它们是无条件独立的。
- 迭代地进行条件独立性测试,在逐渐增大的条件下(即条件变量的数量),移除更多的边。 例如,先测试X和Y是否独立,然后测试X和Y在Z的条件下是否独立,再测试在Z1和Z2的条件下是否独立,直到达到某个预设的最大条件集大小。
-
V-结构学习:
- 识别所有未碰撞的V-结构 (X -> Z <- Y),即X和Y不是直接相连的,但它们都指向Z。
- 将未碰撞的V-结构的方向确定为X -> Z <- Y,只要Z不是X和Y的共同原因。
-
方向传播:
- 应用一系列方向传播规则,尽可能地确定剩余边的方向,同时避免引入环或新的V-结构。
-
-
代码实现 (简化版):
import numpy as np
from scipy.stats import chi2_contingency
def conditional_independence_test(data, X, Y, Z, alpha=0.05):
"""
条件独立性测试 (卡方检验)
data: 数据集 (numpy array)
X, Y, Z: 变量索引 (列索引)
alpha: 显著性水平
"""
# 创建列联表
observed = np.zeros((len(np.unique(data[:, X])), len(np.unique(data[:, Y]))))
for i in range(len(data)):
observed[data[i, X], data[i, Y]] += 1
# 如果Z为空,则进行独立性测试
if not Z:
chi2, p, _, _ = chi2_contingency(observed)
else:
# 将Z离散化
z_values = np.unique(data[:, Z])
expected = np.zeros((len(np.unique(data[:, X])), len(np.unique(data[:, Y])), len(z_values)))
for k, z_val in enumerate(z_values):
data_subset = data[data[:, Z] == z_val]
observed_subset = np.zeros((len(np.unique(data[:, X])), len(np.unique(data_subset[:, Y]))))
for i in range(len(data_subset)):
observed_subset[data_subset[i, X], data_subset[i, Y]] += 1
chi2, p, _, _ = chi2_contingency(observed_subset)
if k == 0:
total_chi2 = chi2
else:
total_chi2 += chi2
# 计算自由度
dof = (len(np.unique(data[:, X])) - 1) * (len(np.unique(data[:, Y])) - 1) * len(z_values)
p = 1 - chi2.cdf(total_chi2, dof)
return p > alpha # 如果p值大于alpha,则认为X和Y在Z的条件下独立
def pc_algorithm(data, alpha=0.05, max_cond_size=3):
"""
PC算法 (简化版)
data: 数据集 (numpy array)
alpha: 显著性水平
max_cond_size: 最大条件集大小
"""
n_vars = data.shape[1]
# 1. 骨架学习
adjacency_matrix = np.ones((n_vars, n_vars), dtype=bool) # 初始为完全图
for i in range(n_vars):
adjacency_matrix[i, i] = False # 移除自环
for l in range(max_cond_size + 1): # 条件集大小
for X in range(n_vars):
for Y in range(X + 1, n_vars): # 避免重复检查
if adjacency_matrix[X, Y]:
# 寻找可能的条件集Z
neighbors_X = np.where(adjacency_matrix[X])[0]
possible_Z = [z for z in neighbors_X if z != Y] # X的邻居且不是Y
if len(possible_Z) >= l:
# 遍历所有大小为l的条件集
from itertools import combinations
for Z_set in combinations(possible_Z, l):
if conditional_independence_test(data, X, Y, list(Z_set), alpha):
adjacency_matrix[X, Y] = False
adjacency_matrix[Y, X] = False
print(f"移除边 {X}-{Y} 在条件 {Z_set} 下")
break # 如果找到一个条件集使得X和Y条件独立,则移除边
# 2. V-结构学习
directed_adjacency_matrix = np.zeros((n_vars, n_vars), dtype=int) # 0: 无边, 1: X->Y, -1: Y->X
for Z in range(n_vars):
parents = np.where(adjacency_matrix[:, Z])[0]
if len(parents) >= 2:
for X, Y in combinations(parents, 2):
if not adjacency_matrix[X, Y]: # X和Y不相邻
# 找到一个未碰撞的V-结构
directed_adjacency_matrix[X, Z] = 1
directed_adjacency_matrix[Y, Z] = 1
print(f"发现 V-结构 {X}->{Z}<-{Y}")
# 3. 方向传播 (简化,仅处理一种情况)
# 如果 X->Y, Y-Z 且没有X->Z,则推断 Y->Z
for Y in range(n_vars):
parents = np.where(directed_adjacency_matrix[:, Y] == 1)[0]
neighbors = np.where(adjacency_matrix[Y])[0]
for X in parents:
for Z in neighbors:
if directed_adjacency_matrix[X,Z] == 0 and directed_adjacency_matrix[Z,X] == 0: # 没有X->Z 或 Z->X
directed_adjacency_matrix[Y, Z] = 1
print(f"方向传播: {Y}->{Z}")
return directed_adjacency_matrix
# 示例数据
data = np.random.randint(0, 2, size=(1000, 4)) # 1000个样本,4个变量 (0或1)
directed_adj_matrix = pc_algorithm(data)
print("学习到的有向邻接矩阵:n", directed_adj_matrix)
-
算法分析:
- 优点: 相对简单,易于实现。
- 缺点: 对条件独立性测试的准确性高度敏感。 卡方检验对样本量有要求。 算法复杂度较高,尤其是在变量数量较多时。 PC算法学习到的DAG可能不是唯一的,存在马尔科夫等价类。
2. 基于评分的方法:Hill-Climbing算法
Hill-Climbing算法是一种贪心搜索算法,通过不断修改DAG结构,并根据评分函数评估修改后的结构,选择评分最高的结构作为下一步搜索的方向。
-
算法流程:
- 初始化: 随机生成一个DAG结构,或者从一个空的DAG开始。
- 迭代优化:
- 尝试对当前DAG进行一系列修改操作,例如添加边、删除边、反转边的方向。
- 对于每个修改后的DAG,计算其评分函数值。
- 选择评分最高的DAG作为新的当前DAG。
- 如果没有找到评分更高的DAG,则算法停止。
-
评分函数: 常用的评分函数包括:
- 贝叶斯信息准则 (BIC): BIC = log P(D|G) – (k/2) log N,其中D是数据,G是图结构,k是模型参数数量,N是样本数量。 BIC倾向于选择更简单的模型。
- 赤池信息准则 (AIC): AIC = log P(D|G) – k,其中D是数据,G是图结构,k是模型参数数量。 AIC对模型复杂度惩罚较小,可能导致过拟合。
-
代码实现 (简化版):
import numpy as np
import pandas as pd
from pgmpy.estimators import BicScore, K2Score, BDeuScore
from pgmpy.models import BayesianNetwork
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.inference import VariableElimination
def hill_climbing(data, scoring_method="bic", initial_model=None, max_iter=100):
"""
Hill-Climbing算法
data: 数据集 (pandas DataFrame)
scoring_method: 评分方法 ("bic", "k2", "bdeu")
initial_model: 初始模型 (pgmpy BayesianNetwork对象)
max_iter: 最大迭代次数
"""
if scoring_method == "bic":
scoring_function = BicScore(data)
elif scoring_method == "k2":
scoring_function = K2Score(data)
elif scoring_method == "bdeu":
scoring_function = BDeuScore(data)
else:
raise ValueError("Invalid scoring method. Choose 'bic', 'k2', or 'bdeu'.")
if initial_model is None:
model = BayesianNetwork()
nodes = data.columns.tolist()
model.add_nodes_from(nodes)
else:
model = initial_model
best_score = scoring_function.score(model)
best_model = model.copy()
for _ in range(max_iter):
best_candidate = None
best_candidate_score = best_score
# 尝试添加边
for u in data.columns:
for v in data.columns:
if u != v and (u, v) not in model.edges():
candidate_model = model.copy()
try:
candidate_model.add_edge(u, v)
if not candidate_model.is_dag(): # 检查是否引入环
continue
score = scoring_function.score(candidate_model)
if score > best_candidate_score:
best_candidate_score = score
best_candidate = candidate_model
except:
pass
# 尝试删除边
for u, v in list(model.edges()):
candidate_model = model.copy()
candidate_model.remove_edge(u, v)
score = scoring_function.score(candidate_model)
if score > best_candidate_score:
best_candidate_score = score
best_candidate = candidate_model
# 尝试反转边
for u, v in list(model.edges()):
candidate_model = model.copy()
try:
candidate_model.remove_edge(u, v)
candidate_model.add_edge(v, u)
if not candidate_model.is_dag():
continue
score = scoring_function.score(candidate_model)
if score > best_candidate_score:
best_candidate_score = score
best_candidate = candidate_model
except:
pass
if best_candidate is not None:
model = best_candidate
best_score = best_candidate_score
best_model = best_candidate.copy()
else:
break
return best_model
# 示例数据
data = pd.DataFrame(np.random.randint(0, 2, size=(500, 4)), columns=['A', 'B', 'C', 'D'])
learned_model = hill_climbing(data, scoring_method="bic")
print("学习到的结构:", learned_model.edges())
# 参数估计
emv = MaximumLikelihoodEstimator(learned_model, data)
learned_model.fit(data, estimator=emv)
# 推断
inference = VariableElimination(learned_model)
query = inference.query(variables=['A'], evidence={'B': 1})
print(query)
-
算法分析:
- 优点: 相对简单,易于实现。 可以使用不同的评分函数。
- 缺点: 容易陷入局部最优解。 搜索空间巨大,计算成本较高。 对初始模型敏感。
三、参数推断:学习条件概率分布
在确定了贝叶斯网络的结构之后,我们需要学习每个节点的条件概率分布 (CPD)。 常用的参数推断方法是最大似然估计 (MLE) 和贝叶斯估计。
1. 最大似然估计 (MLE)
MLE的目标是找到使得观测数据出现的概率最大化的参数值。 对于离散变量,MLE就是简单地计算每个状态组合的频率。
-
公式:
P(Xi = xi | Parents(Xi) = parents(xi)) = Count(Xi = xi, Parents(Xi) = parents(xi)) / Count(Parents(Xi) = parents(xi))
-
代码实现 (基于频率计数):
在上面的Hill-Climbing算法的代码示例中,已经包含了使用
pgmpy.estimators.MaximumLikelihoodEstimator进行参数估计的部分。
2. 贝叶斯估计
贝叶斯估计将参数视为随机变量,并使用先验概率分布来表示我们对参数的初始信念。 在观测到数据后,我们使用贝叶斯公式更新先验概率分布,得到后验概率分布。
-
公式:
P(θ | D) ∝ P(D | θ) * P(θ)
其中,θ是参数,D是数据,P(D | θ)是似然函数,P(θ)是先验概率分布,P(θ | D)是后验概率分布。
-
狄利克雷先验:
对于离散变量的条件概率分布,常用的先验分布是狄利克雷分布。 狄利克雷分布是多项式分布的共轭先验,这意味着如果先验分布是狄利克雷分布,那么后验分布也是狄利克雷分布。
-
代码实现 (使用狄利克雷先验):
from pgmpy.estimators import BayesianEstimator
# ... (前面的代码,包括数据生成和结构学习)
# 贝叶斯估计
be = BayesianEstimator(learned_model, data)
learned_model.fit(data, estimator=be, prior_type="dirichlet", pseudo_count=1) # pseudo_count 是狄利克雷先验的参数
# 推断 (与之前相同)
inference = VariableElimination(learned_model)
query = inference.query(variables=['A'], evidence={'B': 1})
print(query)
-
算法分析:
- MLE优点: 简单易懂,计算效率高。
- MLE缺点: 容易过拟合,尤其是在数据量较少时。 无法利用先验知识。
- 贝叶斯估计优点: 可以利用先验知识,减少过拟合。
- 贝叶斯估计缺点: 计算复杂度较高,需要选择合适的先验分布。
四、其他重要考虑因素
-
数据预处理: 贝叶斯网络对数据的质量非常敏感。 在进行结构学习和参数推断之前,需要进行数据清洗、缺失值处理、离散化等预处理操作。
-
模型评估: 需要使用合适的指标评估学习到的贝叶斯网络的性能,例如准确率、召回率、F1值、AUC等。 可以使用交叉验证等方法进行模型评估。
-
因果推断: 学习到的贝叶斯网络可以用于进行因果推断,例如预测干预的效果、发现反事实等。 但是,需要注意贝叶斯网络只能表示观测数据中的因果关系,不能直接推断干预的效果。 需要使用因果推断的方法,例如Do-算子,才能进行因果推断。
-
pgmpy库:
pgmpy是一个强大的Python库,提供了贝叶斯网络建模、结构学习、参数推断、推断等功能。 可以大大简化贝叶斯网络的实现。
五、总结:结构学习与参数推断的迭代优化
结构学习和参数推断是贝叶斯网络构建的两个核心步骤。结构学习的目标是从数据中发现变量之间的依赖关系,而参数推断则是学习条件概率分布。 这两个过程可以迭代进行,通过不断优化结构和参数,最终得到一个能够准确描述数据的贝叶斯网络模型。
更多IT精英技术系列讲座,到智猿学院