Python实现因果图模型的贝叶斯网络:结构学习与参数推断的算法细节

Python实现因果图模型的贝叶斯网络:结构学习与参数推断的算法细节

大家好,今天我们来深入探讨如何使用Python实现因果图模型的贝叶斯网络,包括结构学习和参数推断两个关键环节。贝叶斯网络是一种强大的概率图模型,能够有效地表示变量之间的概率依赖关系,并进行推断。理解其内部机制对于实际应用至关重要。

一、贝叶斯网络基础

在深入算法细节之前,我们先回顾一下贝叶斯网络的基本概念。

  • 有向无环图 (DAG): 贝叶斯网络的核心是一个有向无环图,其中节点代表变量,边代表变量之间的直接因果关系。箭头从“原因”指向“结果”。 无环保证了概率计算的合理性。

  • 条件概率分布 (CPD): 每个节点都关联一个条件概率分布,描述了在给定其父节点状态下,该节点取值的概率。对于没有父节点的节点,CPD退化为先验概率分布。

  • 联合概率分布: 贝叶斯网络定义了一个联合概率分布,可以分解为每个节点在其父节点条件下的条件概率的乘积:

    P(X1, X2, …, Xn) = Π P(Xi | Parents(Xi))

    其中,Parents(Xi)表示节点Xi的所有父节点。

二、结构学习:从数据中发现因果关系

结构学习的目标是根据观测数据,学习出最能描述变量之间依赖关系的DAG。这是一个具有挑战性的问题,因为可能的DAG结构的数量随变量数量呈指数增长。常用的结构学习方法包括基于约束的方法和基于评分的方法。

1. 基于约束的方法:PC算法

PC算法是一种经典的基于约束的结构学习算法。它的核心思想是利用条件独立性测试,逐步构建DAG。

  • 算法流程:

    1. 骨架学习:

      • 构造完全无向图。
      • 移除所有变量对之间的边,如果它们是无条件独立的。
      • 迭代地进行条件独立性测试,在逐渐增大的条件下(即条件变量的数量),移除更多的边。 例如,先测试X和Y是否独立,然后测试X和Y在Z的条件下是否独立,再测试在Z1和Z2的条件下是否独立,直到达到某个预设的最大条件集大小。
    2. V-结构学习:

      • 识别所有未碰撞的V-结构 (X -> Z <- Y),即X和Y不是直接相连的,但它们都指向Z。
      • 将未碰撞的V-结构的方向确定为X -> Z <- Y,只要Z不是X和Y的共同原因。
    3. 方向传播:

      • 应用一系列方向传播规则,尽可能地确定剩余边的方向,同时避免引入环或新的V-结构。
  • 代码实现 (简化版):

import numpy as np
from scipy.stats import chi2_contingency

def conditional_independence_test(data, X, Y, Z, alpha=0.05):
    """
    条件独立性测试 (卡方检验)
    data: 数据集 (numpy array)
    X, Y, Z: 变量索引 (列索引)
    alpha: 显著性水平
    """
    # 创建列联表
    observed = np.zeros((len(np.unique(data[:, X])), len(np.unique(data[:, Y]))))
    for i in range(len(data)):
        observed[data[i, X], data[i, Y]] += 1

    # 如果Z为空,则进行独立性测试
    if not Z:
        chi2, p, _, _ = chi2_contingency(observed)
    else:
        # 将Z离散化
        z_values = np.unique(data[:, Z])
        expected = np.zeros((len(np.unique(data[:, X])), len(np.unique(data[:, Y])), len(z_values)))
        for k, z_val in enumerate(z_values):
            data_subset = data[data[:, Z] == z_val]
            observed_subset = np.zeros((len(np.unique(data[:, X])), len(np.unique(data_subset[:, Y]))))
            for i in range(len(data_subset)):
                observed_subset[data_subset[i, X], data_subset[i, Y]] += 1
            chi2, p, _, _ = chi2_contingency(observed_subset)
            if k == 0:
                total_chi2 = chi2
            else:
                total_chi2 += chi2

        # 计算自由度
        dof = (len(np.unique(data[:, X])) - 1) * (len(np.unique(data[:, Y])) - 1) * len(z_values)
        p = 1 - chi2.cdf(total_chi2, dof)

    return p > alpha  # 如果p值大于alpha,则认为X和Y在Z的条件下独立

def pc_algorithm(data, alpha=0.05, max_cond_size=3):
    """
    PC算法 (简化版)
    data: 数据集 (numpy array)
    alpha: 显著性水平
    max_cond_size: 最大条件集大小
    """
    n_vars = data.shape[1]

    # 1. 骨架学习
    adjacency_matrix = np.ones((n_vars, n_vars), dtype=bool)  # 初始为完全图
    for i in range(n_vars):
        adjacency_matrix[i, i] = False  # 移除自环

    for l in range(max_cond_size + 1): # 条件集大小
        for X in range(n_vars):
            for Y in range(X + 1, n_vars): # 避免重复检查
                if adjacency_matrix[X, Y]:
                    # 寻找可能的条件集Z
                    neighbors_X = np.where(adjacency_matrix[X])[0]
                    possible_Z = [z for z in neighbors_X if z != Y] # X的邻居且不是Y
                    if len(possible_Z) >= l:
                        # 遍历所有大小为l的条件集
                        from itertools import combinations
                        for Z_set in combinations(possible_Z, l):
                            if conditional_independence_test(data, X, Y, list(Z_set), alpha):
                                adjacency_matrix[X, Y] = False
                                adjacency_matrix[Y, X] = False
                                print(f"移除边 {X}-{Y} 在条件 {Z_set} 下")
                                break # 如果找到一个条件集使得X和Y条件独立,则移除边

    # 2. V-结构学习
    directed_adjacency_matrix = np.zeros((n_vars, n_vars), dtype=int) # 0: 无边, 1: X->Y, -1: Y->X
    for Z in range(n_vars):
        parents = np.where(adjacency_matrix[:, Z])[0]
        if len(parents) >= 2:
            for X, Y in combinations(parents, 2):
                if not adjacency_matrix[X, Y]: # X和Y不相邻
                    # 找到一个未碰撞的V-结构
                    directed_adjacency_matrix[X, Z] = 1
                    directed_adjacency_matrix[Y, Z] = 1
                    print(f"发现 V-结构 {X}->{Z}<-{Y}")

    # 3.  方向传播 (简化,仅处理一种情况)
    #   如果 X->Y, Y-Z 且没有X->Z,则推断 Y->Z
    for Y in range(n_vars):
        parents = np.where(directed_adjacency_matrix[:, Y] == 1)[0]
        neighbors = np.where(adjacency_matrix[Y])[0]
        for X in parents:
            for Z in neighbors:
                if directed_adjacency_matrix[X,Z] == 0 and directed_adjacency_matrix[Z,X] == 0: # 没有X->Z 或 Z->X
                    directed_adjacency_matrix[Y, Z] = 1
                    print(f"方向传播: {Y}->{Z}")

    return directed_adjacency_matrix

# 示例数据
data = np.random.randint(0, 2, size=(1000, 4)) # 1000个样本,4个变量 (0或1)
directed_adj_matrix = pc_algorithm(data)
print("学习到的有向邻接矩阵:n", directed_adj_matrix)
  • 算法分析:

    • 优点: 相对简单,易于实现。
    • 缺点: 对条件独立性测试的准确性高度敏感。 卡方检验对样本量有要求。 算法复杂度较高,尤其是在变量数量较多时。 PC算法学习到的DAG可能不是唯一的,存在马尔科夫等价类。

2. 基于评分的方法:Hill-Climbing算法

Hill-Climbing算法是一种贪心搜索算法,通过不断修改DAG结构,并根据评分函数评估修改后的结构,选择评分最高的结构作为下一步搜索的方向。

  • 算法流程:

    1. 初始化: 随机生成一个DAG结构,或者从一个空的DAG开始。
    2. 迭代优化:
      • 尝试对当前DAG进行一系列修改操作,例如添加边、删除边、反转边的方向。
      • 对于每个修改后的DAG,计算其评分函数值。
      • 选择评分最高的DAG作为新的当前DAG。
      • 如果没有找到评分更高的DAG,则算法停止。
  • 评分函数: 常用的评分函数包括:

    • 贝叶斯信息准则 (BIC): BIC = log P(D|G) – (k/2) log N,其中D是数据,G是图结构,k是模型参数数量,N是样本数量。 BIC倾向于选择更简单的模型。
    • 赤池信息准则 (AIC): AIC = log P(D|G) – k,其中D是数据,G是图结构,k是模型参数数量。 AIC对模型复杂度惩罚较小,可能导致过拟合。
  • 代码实现 (简化版):

import numpy as np
import pandas as pd
from pgmpy.estimators import BicScore, K2Score, BDeuScore
from pgmpy.models import BayesianNetwork
from pgmpy.estimators import MaximumLikelihoodEstimator
from pgmpy.inference import VariableElimination

def hill_climbing(data, scoring_method="bic", initial_model=None, max_iter=100):
    """
    Hill-Climbing算法
    data: 数据集 (pandas DataFrame)
    scoring_method: 评分方法 ("bic", "k2", "bdeu")
    initial_model: 初始模型 (pgmpy BayesianNetwork对象)
    max_iter: 最大迭代次数
    """
    if scoring_method == "bic":
        scoring_function = BicScore(data)
    elif scoring_method == "k2":
        scoring_function = K2Score(data)
    elif scoring_method == "bdeu":
        scoring_function = BDeuScore(data)
    else:
        raise ValueError("Invalid scoring method. Choose 'bic', 'k2', or 'bdeu'.")

    if initial_model is None:
        model = BayesianNetwork()
        nodes = data.columns.tolist()
        model.add_nodes_from(nodes)
    else:
        model = initial_model

    best_score = scoring_function.score(model)
    best_model = model.copy()

    for _ in range(max_iter):
        best_candidate = None
        best_candidate_score = best_score

        # 尝试添加边
        for u in data.columns:
            for v in data.columns:
                if u != v and (u, v) not in model.edges():
                    candidate_model = model.copy()
                    try:
                        candidate_model.add_edge(u, v)
                        if not candidate_model.is_dag():  # 检查是否引入环
                            continue
                        score = scoring_function.score(candidate_model)
                        if score > best_candidate_score:
                            best_candidate_score = score
                            best_candidate = candidate_model
                    except:
                        pass

        # 尝试删除边
        for u, v in list(model.edges()):
            candidate_model = model.copy()
            candidate_model.remove_edge(u, v)
            score = scoring_function.score(candidate_model)
            if score > best_candidate_score:
                best_candidate_score = score
                best_candidate = candidate_model

        # 尝试反转边
        for u, v in list(model.edges()):
            candidate_model = model.copy()
            try:
                candidate_model.remove_edge(u, v)
                candidate_model.add_edge(v, u)

                if not candidate_model.is_dag():
                    continue
                score = scoring_function.score(candidate_model)
                if score > best_candidate_score:
                    best_candidate_score = score
                    best_candidate = candidate_model
            except:
                pass

        if best_candidate is not None:
            model = best_candidate
            best_score = best_candidate_score
            best_model = best_candidate.copy()

        else:
            break

    return best_model

# 示例数据
data = pd.DataFrame(np.random.randint(0, 2, size=(500, 4)), columns=['A', 'B', 'C', 'D'])
learned_model = hill_climbing(data, scoring_method="bic")

print("学习到的结构:", learned_model.edges())

# 参数估计
emv = MaximumLikelihoodEstimator(learned_model, data)
learned_model.fit(data, estimator=emv)

# 推断
inference = VariableElimination(learned_model)
query = inference.query(variables=['A'], evidence={'B': 1})
print(query)
  • 算法分析:

    • 优点: 相对简单,易于实现。 可以使用不同的评分函数。
    • 缺点: 容易陷入局部最优解。 搜索空间巨大,计算成本较高。 对初始模型敏感。

三、参数推断:学习条件概率分布

在确定了贝叶斯网络的结构之后,我们需要学习每个节点的条件概率分布 (CPD)。 常用的参数推断方法是最大似然估计 (MLE) 和贝叶斯估计。

1. 最大似然估计 (MLE)

MLE的目标是找到使得观测数据出现的概率最大化的参数值。 对于离散变量,MLE就是简单地计算每个状态组合的频率。

  • 公式:

    P(Xi = xi | Parents(Xi) = parents(xi)) = Count(Xi = xi, Parents(Xi) = parents(xi)) / Count(Parents(Xi) = parents(xi))

  • 代码实现 (基于频率计数):

    在上面的Hill-Climbing算法的代码示例中,已经包含了使用pgmpy.estimators.MaximumLikelihoodEstimator进行参数估计的部分。

2. 贝叶斯估计

贝叶斯估计将参数视为随机变量,并使用先验概率分布来表示我们对参数的初始信念。 在观测到数据后,我们使用贝叶斯公式更新先验概率分布,得到后验概率分布。

  • 公式:

    P(θ | D) ∝ P(D | θ) * P(θ)

    其中,θ是参数,D是数据,P(D | θ)是似然函数,P(θ)是先验概率分布,P(θ | D)是后验概率分布。

  • 狄利克雷先验:

    对于离散变量的条件概率分布,常用的先验分布是狄利克雷分布。 狄利克雷分布是多项式分布的共轭先验,这意味着如果先验分布是狄利克雷分布,那么后验分布也是狄利克雷分布。

  • 代码实现 (使用狄利克雷先验):

from pgmpy.estimators import BayesianEstimator

# ... (前面的代码,包括数据生成和结构学习)

# 贝叶斯估计
be = BayesianEstimator(learned_model, data)
learned_model.fit(data, estimator=be, prior_type="dirichlet", pseudo_count=1)  # pseudo_count 是狄利克雷先验的参数

# 推断 (与之前相同)
inference = VariableElimination(learned_model)
query = inference.query(variables=['A'], evidence={'B': 1})
print(query)
  • 算法分析:

    • MLE优点: 简单易懂,计算效率高。
    • MLE缺点: 容易过拟合,尤其是在数据量较少时。 无法利用先验知识。
    • 贝叶斯估计优点: 可以利用先验知识,减少过拟合。
    • 贝叶斯估计缺点: 计算复杂度较高,需要选择合适的先验分布。

四、其他重要考虑因素

  • 数据预处理: 贝叶斯网络对数据的质量非常敏感。 在进行结构学习和参数推断之前,需要进行数据清洗、缺失值处理、离散化等预处理操作。

  • 模型评估: 需要使用合适的指标评估学习到的贝叶斯网络的性能,例如准确率、召回率、F1值、AUC等。 可以使用交叉验证等方法进行模型评估。

  • 因果推断: 学习到的贝叶斯网络可以用于进行因果推断,例如预测干预的效果、发现反事实等。 但是,需要注意贝叶斯网络只能表示观测数据中的因果关系,不能直接推断干预的效果。 需要使用因果推断的方法,例如Do-算子,才能进行因果推断。

  • pgmpy库: pgmpy 是一个强大的Python库,提供了贝叶斯网络建模、结构学习、参数推断、推断等功能。 可以大大简化贝叶斯网络的实现。

五、总结:结构学习与参数推断的迭代优化

结构学习和参数推断是贝叶斯网络构建的两个核心步骤。结构学习的目标是从数据中发现变量之间的依赖关系,而参数推断则是学习条件概率分布。 这两个过程可以迭代进行,通过不断优化结构和参数,最终得到一个能够准确描述数据的贝叶斯网络模型。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注