Python实现因果发现算法(PC/FCI):从观测数据中学习因果图结构与独立性检验
大家好,今天我们来探讨一个非常有趣且重要的领域:因果发现。在传统机器学习中,我们通常关注预测,即给定一些输入,预测输出。然而,因果发现更进一步,它试图从数据中学习变量之间的因果关系,这对于理解世界、做出更合理的决策至关重要。本次讲座我们将聚焦于两种经典的因果发现算法:PC算法和FCI算法,并使用Python进行实现。
1. 因果发现的挑战与基本概念
首先,我们需要明确因果关系与相关关系的区别。“相关性不等于因果性”是统计学中的一句至理名言。例如,冰淇淋销量与犯罪率可能存在正相关,但这并不意味着吃冰淇淋会导致犯罪,而是因为两者都可能受到夏季气温升高的影响。
因果发现的目标是从观测数据中推断出变量之间的因果图结构。一个因果图是一个有向无环图(DAG),其中节点代表变量,有向边表示因果关系。例如,A -> B 表示 A 直接影响 B。
在因果发现中,我们需要解决几个关键挑战:
- 混淆变量 (Confounding Variables): 就像冰淇淋和犯罪率的例子一样,隐藏的混淆变量可能会导致我们错误地推断因果关系。
- 选择偏差 (Selection Bias): 如果我们的数据不是随机抽样的,而是有选择性地收集的,那么我们可能会得到错误的因果结论。
- 因果方向的确定: 即使我们知道两个变量之间存在因果关系,也很难确定是 A 导致 B 还是 B 导致 A。
为了应对这些挑战,因果发现算法通常依赖于以下假设:
- 因果马尔可夫条件 (Causal Markov Condition): 给定一个变量的直接原因(即父母节点),该变量与其非后代节点条件独立。
- 忠实性条件 (Faithfulness Condition): 数据中的所有条件独立关系都反映在因果图中。这个假设意味着数据没有“神奇”的独立性,即不是由于参数的特殊选择导致的独立性。
- 因果充分性 (Causal Sufficiency): 数据中没有未观测到的混淆变量。如果这个假设不成立,我们将只能学习到变量之间的部分因果关系。
2. PC算法:从条件独立性测试到因果图结构
PC算法是一种经典的基于约束的因果发现算法。它基于因果马尔可夫条件和忠实性条件,通过一系列条件独立性测试来逐步构建因果图结构。
PC算法的主要步骤如下:
- 骨架发现 (Skeleton Discovery):
- 首先,构建一个完全图,即所有变量之间都存在边。
- 然后,通过条件独立性测试,逐步删除不必要的边。具体来说,对于每一对变量 X 和 Y,我们尝试找到一个变量集合 S,使得 X 和 Y 在给定 S 的情况下条件独立。如果找到了这样的 S,我们就删除 X 和 Y 之间的边。S 的大小逐渐增加,从 0 开始,直到达到一个预定的最大值。
- V结构发现 (V-Structure Discovery):
- 在骨架发现之后,我们得到一个无向图。接下来,我们需要确定边的方向。
- V结构是指形如 X – > Z < – Y 的结构,其中 X 和 Y 之间没有直接的边。如果我们在骨架中发现 X – Z – Y,并且 X 和 Y 在给定 Z 的情况下不条件独立,但在给定任何包含 Z 的集合 S 的情况下条件独立,那么我们就推断出 X -> Z <- Y 是一个 V结构。
- 规则1:
- 如果 A -> B – C,且A 和 C 不相邻,那么 B – C 变成 B -> C
- 规则2:
- 如果 A -> B -> C, A – C, 那么A – C 变成 A -> C
- 规则3:
- 如果 A – B – C, A – D – C, B 和 D 不相邻, B – C, D – C, A -> B, D -> C, 那么B – C 变成 B -> C
下面是一个使用 Python 实现 PC算法的例子。为了简化,我们使用一个预定义的条件独立性测试函数 conditional_independence_test。在实际应用中,可以使用卡方检验、G平方检验等统计方法来实现这个函数。
import pandas as pd
from itertools import combinations, chain
def powerset(iterable):
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
def conditional_independence_test(data, X, Y, Z):
"""
一个占位符函数,用于测试 X 和 Y 在给定 Z 的情况下是否条件独立。
在实际应用中,需要使用统计检验方法(例如卡方检验)来实现。
这里我们使用一个简单的模拟,基于数据生成过程。
"""
# 假设 X, Y, Z 是列名
df = pd.DataFrame(data)
X_data = df[X].values
Y_data = df[Y].values
Z_data = df[list(Z)].values if Z else None
# 条件独立性的简单模拟 (仅用于演示)
# !!!在实际应用中,需要替换为统计检验!!!
if Z_data is None:
correlation = pd.Series(X_data).corr(pd.Series(Y_data))
return abs(correlation) < 0.1 # 如果相关性很小,则认为独立
else:
# 使用线性回归模拟条件独立性
from sklearn.linear_model import LinearRegression
model_X = LinearRegression()
model_Y = LinearRegression()
# 预测 X 和 Y 基于 Z
model_X.fit(Z_data, X_data)
model_Y.fit(Z_data, Y_data)
X_residual = X_data - model_X.predict(Z_data)
Y_residual = Y_data - model_Y.predict(Z_data)
correlation = pd.Series(X_residual).corr(pd.Series(Y_residual))
return abs(correlation) < 0.1 # 如果残差相关性很小,则认为条件独立
def pc_algorithm(data, alpha=0.05, max_cond_size=3):
"""
PC算法的Python实现。
Args:
data: Pandas DataFrame,包含观测数据。
alpha: 显著性水平,用于条件独立性测试。
max_cond_size: 条件集合的最大大小。
Returns:
一个字典,表示学习到的因果图结构。
键是节点,值是该节点的邻居集合。
"""
variables = list(data.columns)
graph = {var: set(variables) - {var} for var in variables} # 初始化为完全图
# 1. 骨架发现
for i in range(max_cond_size + 1):
for X in variables:
for Y in graph[X].copy():
if Y not in graph[X]: # 检查Y是否仍然是X的邻居
continue
neighbors_X = graph[X] - {Y}
for Z in combinations(neighbors_X, i):
if conditional_independence_test(data, X, Y, set(Z)):
graph[X].remove(Y)
graph[Y].remove(X)
break #找到分离集后,跳出内层循环
# 2. V结构发现
for Z in variables:
for X in variables:
if X == Z or Z not in graph[X]:
continue
for Y in variables:
if Y == Z or Y not in graph[Z] or X == Y or X in graph[Y]:
continue
if Y not in graph[X]: # 确认X和Y不相邻
# 检查X和Y在给定Z的情况下是否条件独立
if not conditional_independence_test(data, X, Y, {Z}):
# X -> Z <- Y 是一个V结构
if Y in graph[X] or X in graph[Y]:
continue
graph[X].add(Z+"_arrow")
graph[Z].add(X+"_arrow")
graph[Y].add(Z+"_arrow")
graph[Z].add(Y+"_arrow")
graph[Z].discard(X)
graph[X].discard(Z)
graph[Z].discard(Y)
graph[Y].discard(Z)
# 3. 应用规则1
for B in variables:
for A in variables:
if A == B or (A+"_arrow") not in graph[B]:
continue
for C in variables:
if C == B or C == A or (C not in graph[B] and (C+"_arrow") not in graph[B]) or A in graph[C]:
continue
if C not in graph[A]:
graph[B].add(C+"_arrow")
graph[C].add(B+"_arrow")
graph[B].discard(C)
graph[C].discard(B)
# 4. 应用规则2
for B in variables:
for A in variables:
if A == B or (A+"_arrow") not in graph[B]:
continue
for C in variables:
if C == B or C == A or (B+"_arrow") not in graph[C] or (A+"_arrow") not in graph[C]:
continue
if C in graph[A]:
graph[A].add(C+"_arrow")
graph[C].add(A+"_arrow")
graph[A].discard(C)
graph[C].discard(A)
# 5. 应用规则3
for B in variables:
for A in variables:
if A == B or A not in graph[B]:
continue
for C in variables:
if C == B or C == A or C not in graph[B]:
continue
for D in variables:
if D == B or D == A or D == C or D not in graph[C] or D not in graph[A]:
continue
if (D in graph[B]) or ((D+"_arrow") in graph[B]):
continue
if B not in graph[D] and (B+"_arrow") not in graph[D]:
continue
if (D in graph[B]) or ((D+"_arrow") in graph[B]):
continue
if (B in graph[A]) or ((B+"_arrow") in graph[A]):
continue
if (D in graph[C]) or ((D+"_arrow") in graph[C]):
continue
graph[B].add(C+"_arrow")
graph[C].add(B+"_arrow")
graph[B].discard(C)
graph[C].discard(B)
return graph
# 示例用法
# 创建一个示例数据集
data = pd.DataFrame({
'A': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'B': [2, 4, 6, 8, 10, 12, 14, 16, 18, 20],
'C': [3, 6, 9, 12, 15, 18, 21, 24, 27, 30],
'D': [4, 8, 12, 16, 20, 24, 28, 32, 36, 40]
})
# 运行PC算法
graph = pc_algorithm(data)
# 打印结果
for node, neighbors in graph.items():
print(f"{node}: {neighbors}")
3. FCI算法:处理未观测的混淆变量
PC算法的一个重要假设是因果充分性,即没有未观测到的混淆变量。然而,在现实世界中,这个假设往往不成立。FCI算法 (Fast Causal Inference) 是一种扩展的因果发现算法,它可以处理未观测的混淆变量。
FCI算法与PC算法的主要区别在于:
- FCI算法不仅学习变量之间的直接因果关系,还学习变量与潜在混淆变量之间的关系。
- FCI算法使用不同的符号来表示不同的因果关系:
A -> B: A 是 B 的直接原因。A -o B: A 和 B 之间可能存在一个指向 B 的未观测的混淆变量。A o- B: A 和 B 之间可能存在一个指向 A 的未观测的混淆变量。A o-o B: A 和 B 之间可能存在一个未观测的混淆变量,方向未知。
FCI算法的输出是一个 Partial Ancestral Graph (PAG),它表示了变量之间的潜在因果关系。
由于FCI算法的复杂性,这里我们不提供完整的 Python 实现。但是,我们可以简单地描述一下FCI算法的主要步骤:
- 骨架发现: 与 PC 算法类似,但使用更严格的条件独立性测试,以避免错误地删除边。
- 碰撞器发现: 识别 V 结构,并使用特殊规则来处理潜在的混淆变量。
- 传播规则: 应用一系列规则来传播方向信息,并确定潜在的混淆变量的位置。
4. 条件独立性检验的具体实现
在上述 PC 算法的实现中,我们使用了一个占位符函数 conditional_independence_test。现在,让我们讨论如何使用统计方法来实现这个函数。
常用的条件独立性检验方法包括:
- 卡方检验 (Chi-Square Test): 适用于离散变量。
- G平方检验 (G-Square Test): 类似于卡方检验,但更适用于小样本数据。
- 条件互信息 (Conditional Mutual Information): 适用于连续变量和离散变量的混合。
- 线性回归 (Linear Regression): 如果变量之间存在线性关系,可以使用线性回归来测试条件独立性。
下面是一个使用卡方检验实现 conditional_independence_test 的例子:
from scipy.stats import chi2_contingency
def conditional_independence_test_chisq(data, X, Y, Z, alpha=0.05):
"""
使用卡方检验测试 X 和 Y 在给定 Z 的情况下是否条件独立。
适用于离散变量。
"""
df = pd.DataFrame(data)
observed = pd.crosstab(df[X], df[Y])
if Z:
# 如果存在条件变量Z
p_value = 0
for z_val in df[list(Z)[0]].unique(): #只考虑一个条件变量
subset = df[df[list(Z)[0]] == z_val]
if len(subset[X].unique()) > 1 and len(subset[Y].unique()) > 1:
observed = pd.crosstab(subset[X], subset[Y])
chi2, p, dof, expected = chi2_contingency(observed)
p_value += p
p_value = p_value/len(df[list(Z)[0]].unique()) #取平均
else:
chi2, p, dof, expected = chi2_contingency(observed)
p_value = p
return p_value > alpha # p > alpha, 则认为独立
# 示例用法
# 创建一个示例数据集
data = pd.DataFrame({
'A': [0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
'B': [0, 0, 1, 1, 0, 0, 1, 1, 0, 0],
'C': [0, 1, 0, 1, 1, 0, 1, 0, 0, 1],
})
# 测试 A 和 B 在给定 C 的情况下是否条件独立
is_independent = conditional_independence_test_chisq(data, 'A', 'B', {'C'})
print(f"A and B are conditionally independent given C: {is_independent}")
5. 因果发现算法的应用与局限性
因果发现算法在许多领域都有广泛的应用,例如:
- 医学: 发现疾病的风险因素和治疗方法。
- 经济学: 了解经济变量之间的因果关系,制定更有效的政策。
- 社会科学: 研究社会现象的原因和影响。
- 人工智能: 构建更智能的决策系统。
然而,因果发现算法也存在一些局限性:
- 计算复杂度: PC算法和FCI算法的计算复杂度较高,尤其是在变量数量较多时。
- 数据质量: 因果发现算法对数据质量要求较高,噪声和缺失值会影响结果的准确性。
- 假设的限制: 因果发现算法依赖于一些假设,如果这些假设不成立,可能会得到错误的结论。
- 因果方向的不确定性: 即使在理想情况下,因果发现算法也可能无法完全确定因果方向,只能学习到变量之间的部分因果关系。
总结:代码是骨架,理论是灵魂,应用是价值
本次讲座,我们深入探讨了 PC 算法和 FCI 算法,并提供了使用 Python 实现 PC 算法的示例代码。我们还讨论了条件独立性检验的具体实现,以及因果发现算法的应用与局限性。希望这次讲座能帮助大家更好地理解因果发现的原理和方法,并在实际应用中取得更好的效果。
核心步骤:骨架发现,V结构识别,规则应用
PC算法通过骨架发现,V结构识别和一系列规则的应用,从数据中推断出可能的因果图结构。
复杂场景:潜在混淆,关系模糊,方向未知
FCI算法解决了PC算法不能处理潜在混淆变量的局限性,能够发现变量与潜在混淆变量之间的关系。
数据质量:模型假设,计算限制,方向确定
因果发现算法对数据质量要求较高,并且受到模型假设和计算资源的限制,同时因果方向的确定也是一个挑战。
更多IT精英技术系列讲座,到智猿学院