Python的ETL框架:如何使用`Pandera`和`Great Expectations`进行数据验证和质量保证。

Python ETL 框架中的数据验证与质量保证:Pandera 与 Great Expectations 实战

各位朋友,大家好!今天我们来聊聊在 Python ETL (Extract, Transform, Load) 框架中,如何利用 PanderaGreat Expectations 这两个强大的工具进行数据验证和质量保证。数据质量是 ETL 流程的核心,直接影响下游分析和决策的准确性。PanderaGreat Expectations 为我们提供了不同的数据验证方法,可以有效地保证数据的完整性、准确性和一致性。

1. 数据质量的重要性与挑战

在 ETL 流程中,数据质量问题可能出现在任何阶段:

  • 提取阶段: 数据源可能存在错误或不完整的数据。
  • 转换阶段: 数据转换逻辑的错误可能引入新的数据问题。
  • 加载阶段: 数据目标端可能对数据格式或约束有特定的要求。

如果没有有效的数据验证和质量保证机制,这些问题可能会悄无声息地渗透到下游系统,导致错误的分析结果和错误的决策。

以下是数据质量可能面临的一些常见挑战:

  • 缺失值: 数据集中缺少某些字段的值。
  • 异常值: 数据集中存在超出正常范围的值。
  • 数据类型错误: 字段的数据类型与预期不符。
  • 数据格式错误: 字段的格式与预期不符。
  • 数据一致性问题: 同一个实体在不同的数据集中存在不同的表示。
  • 数据重复: 数据集中存在重复的记录。

解决这些挑战,需要一个强大的数据验证和质量保证框架。这就是 PanderaGreat Expectations 能够发挥作用的地方。

2. Pandera:基于 Schema 的数据验证

Pandera 是一个轻量级的 Python 库,用于在 pandas DataFrame 上执行数据验证。它基于 Schema 的思想,允许你定义 DataFrame 的结构(例如列名、数据类型、约束等),并使用这些 Schema 来验证 DataFrame 的数据。

2.1 Pandera 的安装

pip install pandera

2.2 定义 Pandera Schema

下面是一个简单的例子,展示如何使用 Pandera 定义一个 Schema:

import pandas as pd
import pandera as pa
from pandera import Column, Check, Index

# 定义一个 DataFrame Schema
class MySchema(pa.Schema):
    user_id: Column[int] = Column(checks=pa.Check.greater_than(0))  # user_id 必须是 int 类型,且大于 0
    username: Column[str] = Column(checks=pa.Check.str_length(1, 50))  # username 必须是 str 类型,长度在 1 到 50 之间
    age: Column[int] = Column(checks=[pa.Check.greater_than_or_equal_to(0), pa.Check.less_than_or_equal_to(120)])  # age 必须是 int 类型,在 0 到 120 之间
    signup_date: Column[pd.Timestamp] = Column(nullable=True)  # signup_date 必须是 Timestamp 类型, 允许为 Null

    class Config:
        strict = True # 严格模式,如果 DataFrame 中存在 Schema 中未定义的列,则会报错
        coerce = True # 强制类型转换,如果 DataFrame 中的数据类型与 Schema 不符,则尝试进行转换

# 创建一个 DataFrame
data = {
    'user_id': [1, 2, 3, 4, 5],
    'username': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 22, 40, 18],
    'signup_date': ['2023-01-01', '2023-02-15', '2023-03-20', '2023-04-05', '2023-05-10']
}
df = pd.DataFrame(data)
df['signup_date'] = pd.to_datetime(df['signup_date'])

# 使用 Schema 验证 DataFrame
try:
    MySchema.validate(df, lazy=True) # 启用 lazy 模式,可以一次性报告所有错误
    print("DataFrame 验证通过!")
except pa.errors.SchemaErrors as e:
    print("DataFrame 验证失败!")
    print(e.failure_cases)  # 显示所有失败的案例
    print(e.formatted_error) # 格式化错误信息

在这个例子中,我们定义了一个 MySchema 类,它继承自 pa.Schema。在 MySchema 类中,我们定义了 DataFrame 的各个列,并为每个列指定了数据类型和约束条件。例如,user_id 列必须是 int 类型,且大于 0。username 列必须是 str 类型,长度在 1 到 50 之间。

我们还设置了 Config 类,其中 strict = True 表示严格模式,如果 DataFrame 中存在 Schema 中未定义的列,则会报错。coerce = True 表示强制类型转换,如果 DataFrame 中的数据类型与 Schema 不符,则尝试进行转换。

最后,我们使用 MySchema.validate(df, lazy=True) 方法来验证 DataFrame。lazy=True 启用 lazy 模式,可以一次性报告所有错误。

2.3 Pandera 常用功能

  • 数据类型验证: 可以验证列的数据类型是否符合预期。
  • 约束条件验证: 可以验证列的值是否满足特定的约束条件,例如范围、长度、正则表达式等。
  • 空值验证: 可以验证列是否允许空值。
  • 唯一性验证: 可以验证列的值是否唯一。
  • 自定义验证: 可以使用自定义函数来验证列的值。
  • Schema 继承: 可以继承现有的 Schema,并添加或修改其中的列。
  • Index 验证: 可以验证 DataFrame 的索引。

2.4 Pandera 的高级用法

  • 使用 Check 类定义更复杂的约束条件: Check 类提供了许多内置的约束条件,例如 Check.greater_thanCheck.less_thanCheck.isinCheck.str_matches 等。你也可以使用自定义函数来定义更复杂的约束条件。
  • 使用 Column 类的 alias 参数来定义列的别名: alias 参数允许你使用不同的名称来引用同一个列。
  • 使用 Schema 类的 columns 参数来定义列的顺序: columns 参数允许你指定 DataFrame 中列的顺序。
  • 使用 Schema 类的 dtype 参数来指定 DataFrame 的默认数据类型: dtype 参数允许你为 DataFrame 中的所有列指定默认的数据类型。
  • 使用 SchemaModel 定义 Schema: 可以使用 pandera.model.SchemaModel 来定义 Schema, 可以获得更好的类型提示和代码补全功能。

下面是一个使用 Check 类定义更复杂的约束条件的例子:

import pandas as pd
import pandera as pa
from pandera import Column, Check

class ProductSchema(pa.Schema):
    product_id: Column[int] = Column(checks=pa.Check.greater_than(0))
    product_name: Column[str] = Column(checks=pa.Check.str_length(1, 100))
    price: Column[float] = Column(checks=[pa.Check.greater_than(0), pa.Check.less_than(1000)])
    category: Column[str] = Column(checks=pa.Check.isin(['Electronics', 'Clothing', 'Books']))
    discount: Column[float] = Column(nullable=True, checks=pa.Check.in_range(0, 0.5)) # 折扣,允许为空,范围在 0 到 0.5 之间

    @pa.check("price", name="price_must_be_positive", error="Price must be positive")
    def price_must_be_positive(cls, price: pd.Series) -> pd.Series:
        """自定义校验价格必须为正数"""
        return price > 0

    class Config:
        strict = True

data = {
    'product_id': [1, 2, 3, 4, 5],
    'product_name': ['Laptop', 'T-Shirt', 'Python Crash Course', 'Headphones', 'Jeans'],
    'price': [1200, 25, 30, 80, 45], # Laptop价格错误,超出范围
    'category': ['Electronics', 'Clothing', 'Books', 'Electronics', 'Clothing'],
    'discount': [0.1, 0.2, 0.0, None, 0.3]
}
df = pd.DataFrame(data)

try:
    ProductSchema.validate(df, lazy=True)
    print("DataFrame 验证通过!")
except pa.errors.SchemaErrors as e:
    print("DataFrame 验证失败!")
    print(e.failure_cases)
    print(e.formatted_error)

这个例子展示了如何使用 Check 类来定义更复杂的约束条件,例如 Check.isinCheck.in_range。它还展示了如何使用自定义函数来验证列的值。

3. Great Expectations:基于期望的数据验证

Great Expectations 是一个更全面的数据验证框架,它允许你定义数据的“期望”,并使用这些期望来验证数据。与 Pandera 相比,Great Expectations 更加灵活,可以验证各种类型的数据,包括 DataFrame、数据库表、文件等。

3.1 Great Expectations 的安装

pip install great_expectations

3.2 初始化 Great Expectations 项目

在使用 Great Expectations 之前,你需要初始化一个项目。你可以使用以下命令来初始化一个项目:

great_expectations init

这个命令会创建一个名为 great_expectations 的目录,其中包含项目的配置文件、数据源定义、期望定义等。

3.3 创建数据源

Great Expectations 中,数据源被称为 "Datasource"。你需要定义一个 Datasource 来告诉 Great Expectations 如何访问你的数据。你可以使用以下命令来创建一个 Datasource:

great_expectations datasource new

这个命令会引导你完成 Datasource 的创建过程。你可以选择不同的 Datasource 类型,例如 pandassparksqlalchemy 等。

3.4 创建期望

Great Expectations 中,期望被称为 "Expectation"。你需要定义一系列的 Expectation 来描述你的数据的预期状态。你可以使用以下命令来创建一个 Expectation:

great_expectations suite new

这个命令会引导你完成 Expectation 的创建过程。你可以选择不同的 Expectation 类型,例如 expect_column_values_to_not_be_nullexpect_column_values_to_be_uniqueexpect_column_values_to_be_in_set 等。

3.5 验证数据

在定义好 Datasource 和 Expectation 之后,你可以使用以下命令来验证数据:

great_expectations check <expectation_suite_name>

这个命令会运行你定义的 Expectation,并生成验证报告。验证报告会告诉你哪些 Expectation 成功了,哪些 Expectation 失败了。

3.6 一个简单的例子

下面是一个简单的例子,展示如何使用 Great Expectations 验证 DataFrame 的数据:

import pandas as pd
import great_expectations as gx

# 创建一个 Great Expectations 上下文
context = gx.get_context()

# 创建一个 DataFrame
data = {
    'user_id': [1, 2, 3, 4, 5],
    'username': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 22, 40, 18]
}
df = pd.DataFrame(data)

# 创建一个 Datasource
datasource = context.sources.add_pandas(name="my_pandas_datasource")

# 创建一个 Data Asset
data_asset = datasource.add_dataframe_asset(name="my_dataframe", dataframe=df)

# 创建一个 Batch Request
batch_request = data_asset.build_batch_request()

# 创建一个 Expectation Suite
expectation_suite_name = "my_expectation_suite"
try:
    expectation_suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f"已存在名为 {expectation_suite_name} 的 Expectation Suite,将继续使用。")
except gx.exceptions.DataContextError:
    expectation_suite = context.create_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f"创建名为 {expectation_suite_name} 的 Expectation Suite。")

# 定义 Expectation
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null(column="user_id")
validator.expect_column_values_to_be_unique(column="user_id")
validator.expect_column_values_to_be_in_type_list(column="age", type_list=["INT64"])
validator.expect_column_values_to_be_between(column="age", min_value=0, max_value=120)

# 验证数据
checkpoint = context.add_or_update_checkpoint(
    name="my_checkpoint",
    validator=validator,
)

checkpoint_result = checkpoint.run()

# 查看验证结果
if checkpoint_result["success"]:
    print("数据验证通过!")
else:
    print("数据验证失败!")
    print(checkpoint_result)

# 可视化验证结果
context.view_validation_result(checkpoint_result)

在这个例子中,我们首先创建了一个 Great Expectations 上下文。然后,我们创建了一个 DataFrame,并将其转换为 Great Expectations 可以理解的 Data Asset。接着,我们创建了一个 Expectation Suite,并在其中定义了一些 Expectation。最后,我们使用 validator.validate() 方法来验证数据,并查看验证结果。

3.7 Great Expectations 的常用功能

  • 多种数据源支持: 支持 pandas DataFrame、数据库表、文件等多种数据源。
  • 丰富的 Expectation 类型: 提供了大量的内置 Expectation 类型,可以满足各种数据验证需求。
  • 自定义 Expectation: 允许你使用自定义函数来定义 Expectation。
  • 自动数据剖析: 可以自动分析数据,并生成推荐的 Expectation。
  • 数据文档生成: 可以根据 Expectation 和验证结果生成数据文档。
  • 集成到 CI/CD 流程: 可以将数据验证集成到 CI/CD 流程中,以确保数据质量。

3.8 Great Expectations 的高级用法

  • 使用 Validator 对象来定义 Expectation: Validator 对象提供了更灵活的方式来定义 Expectation。
  • 使用 Checkpoint 对象来运行验证: Checkpoint 对象允许你配置验证的各种参数,例如数据源、Expectation Suite、验证报告等。
  • 使用 Data Docs 来查看验证结果: Data DocsGreat Expectations 提供的可视化工具,可以让你方便地查看验证结果。
  • Great Expectations 集成到 Airflow 中: 可以将 Great Expectations 集成到 Airflow 中,以实现自动化数据验证。

4. Pandera 与 Great Expectations 的比较

PanderaGreat Expectations 都是优秀的数据验证工具,但它们的设计理念和适用场景有所不同。

特性 Pandera Great Expectations
设计理念 基于 Schema 的数据验证 基于期望的数据验证
验证对象 pandas DataFrame 多种数据源,包括 DataFrame、数据库表、文件等
灵活性 较低,更适合结构化的数据 较高,更适合各种类型的数据
易用性 较高,API 简单易懂 较低,需要学习一些概念和配置
功能 数据类型验证、约束条件验证、空值验证、唯一性验证 丰富的 Expectation 类型、自定义 Expectation、自动数据剖析、数据文档生成、集成到 CI/CD 流程
适用场景 简单的 DataFrame 数据验证 复杂的数据验证场景,需要验证各种类型的数据,并生成数据文档
与 ETL 集成 易于集成到 ETL 流程中,特别是基于 Pandas 的 ETL 更加灵活,可以通过 Checkpoint 等机制方便地集成到各种 ETL 框架中,例如 Airflow
学习曲线 相对平缓 相对陡峭

选择哪个工具取决于你的具体需求。如果你只需要验证简单的 DataFrame 数据,Pandera 可能是一个更好的选择。如果你需要验证各种类型的数据,并生成数据文档,Great Expectations 可能更适合你。

5. 在 ETL 流程中应用 Pandera 和 Great Expectations

在 ETL 流程中,你可以将 PanderaGreat Expectations 应用于不同的阶段:

  • 提取阶段: 验证从数据源提取的数据是否符合预期。
  • 转换阶段: 验证数据转换的结果是否正确。
  • 加载阶段: 验证加载到数据目标端的数据是否符合目标端的要求。

一个常见的做法是在 ETL 流程的每个阶段都添加数据验证步骤,以确保数据质量。

例如,你可以使用 Pandera 来验证从 CSV 文件读取的 DataFrame 的数据类型和约束条件。然后,你可以使用 Great Expectations 来验证数据转换的结果是否符合业务规则。最后,你可以使用 Great Expectations 来验证加载到数据库表的数据是否符合表结构的定义。

6. 实战案例:使用 Pandera 和 Great Expectations 构建数据质量 Pipeline

假设我们有一个电商网站的订单数据,我们需要构建一个 ETL 流程来将这些数据加载到数据仓库中。

6.1 数据源

我们的数据源是一个 CSV 文件,包含以下字段:

  • order_id: 订单 ID (int)
  • user_id: 用户 ID (int)
  • product_id: 产品 ID (int)
  • order_date: 订单日期 (datetime)
  • order_amount: 订单金额 (float)

6.2 ETL 流程

我们的 ETL 流程包含以下步骤:

  1. 提取: 从 CSV 文件读取数据。
  2. 转换: 将订单日期转换为指定的格式,并计算订单的总金额。
  3. 加载: 将数据加载到数据仓库的订单表中。

6.3 使用 Pandera 进行数据验证

在提取阶段,我们可以使用 Pandera 来验证从 CSV 文件读取的数据是否符合预期。

import pandas as pd
import pandera as pa
from pandera import Column, Check

class OrderSchema(pa.Schema):
    order_id: Column[int] = Column(checks=pa.Check.greater_than(0))
    user_id: Column[int] = Column(checks=pa.Check.greater_than(0))
    product_id: Column[int] = Column(checks=pa.Check.greater_than(0))
    order_date: Column[pd.Timestamp]
    order_amount: Column[float] = Column(checks=pa.Check.greater_than(0))

    class Config:
        strict = True
        coerce = True

def extract_data(file_path: str) -> pd.DataFrame:
    """从 CSV 文件读取数据,并使用 Pandera 验证数据"""
    try:
        df = pd.read_csv(file_path)
        OrderSchema.validate(df, lazy=True)
        print("数据提取成功,且通过 Pandera 验证!")
        return df
    except pa.errors.SchemaErrors as e:
        print("数据提取失败,Pandera 验证不通过!")
        print(e.failure_cases)
        print(e.formatted_error)
        raise # 抛出异常,中断 ETL 流程
    except Exception as e:
        print(f"数据提取失败,发生其他错误: {e}")
        raise

# 示例用法
file_path = "orders.csv" # 假设CSV文件名为 orders.csv
try:
    orders_df = extract_data(file_path)
except Exception:
    print("数据提取过程失败,请检查数据源。")
    orders_df = None

if orders_df is not None:
    print(orders_df.head())

6.4 使用 Great Expectations 进行数据验证

在转换阶段和加载阶段,我们可以使用 Great Expectations 来验证数据转换的结果是否正确,以及加载到数据仓库的数据是否符合目标端的要求。

import pandas as pd
import great_expectations as gx

def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """转换数据:将订单日期转换为指定格式,并计算订单总金额"""
    df['order_date'] = pd.to_datetime(df['order_date'])
    df['total_amount'] = df['order_amount'] * 1.1 # 假设加上 10% 的税费
    return df

def load_data(df: pd.DataFrame, table_name: str, context: gx.DataContext):
    """加载数据到数据仓库 (这里只是模拟,实际需要连接数据库) 并使用 Great Expectations 验证数据"""
    # 模拟加载到数据库
    print(f"模拟加载数据到表:{table_name}")
    print(df.head())

    # 创建 Great Expectations 上下文 (假设已初始化)
    # context = gx.get_context()

    # 获取或创建 Datasource
    datasource_name = "my_pandas_datasource"  # 替换为你实际的 Datasource 名称
    try:
        datasource = context.get_datasource(datasource_name)
        print(f"已存在 Datasource: {datasource_name}")
    except gx.exceptions.DataContextError:
         datasource = context.sources.add_pandas(name=datasource_name)
         print(f"创建 Datasource: {datasource_name}")

    # 将 DataFrame 转换为 Great Expectations 的 Data Asset
    data_asset_name = "transformed_orders"  # 替换为你实际的 Data Asset 名称
    data_asset = datasource.add_dataframe_asset(name=data_asset_name, dataframe=df)

    # 创建 Batch Request
    batch_request = data_asset.build_batch_request()

    # 获取或创建 Expectation Suite
    expectation_suite_name = "transformed_orders_suite"  # 替换为你实际的 Expectation Suite 名称
    try:
        expectation_suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
        print(f"已存在 Expectation Suite: {expectation_suite_name}")
    except gx.exceptions.DataContextError:
        expectation_suite = context.create_expectation_suite(expectation_suite_name=expectation_suite_name)
        print(f"创建 Expectation Suite: {expectation_suite_name}")

    # 定义 Expectation
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name,
    )
    validator.expect_column_values_to_not_be_null(column="order_id")
    validator.expect_column_values_to_be_unique(column="order_id")
    validator.expect_column_values_to_be_of_type(column="total_amount", type_="FLOAT")
    validator.expect_column_values_to_be_between(column="total_amount", min_value=0)

    # 验证数据
    checkpoint = context.add_or_update_checkpoint(
        name="my_checkpoint",  # 替换为你实际的 Checkpoint 名称
        validator=validator,
    )

    checkpoint_result = checkpoint.run()

    # 查看验证结果
    if checkpoint_result["success"]:
        print("数据加载成功,且通过 Great Expectations 验证!")
    else:
        print("数据加载失败,Great Expectations 验证不通过!")
        print(checkpoint_result)

    # 可视化验证结果
    context.view_validation_result(checkpoint_result)

# 示例用法
if orders_df is not None:
    transformed_df = transform_data(orders_df)
    context = gx.get_context() # 假设 Great Expectations 项目已初始化
    load_data(transformed_df, "orders_table", context)

这个例子展示了如何在 ETL 流程中使用 PanderaGreat Expectations 来进行数据验证和质量保证。通过在 ETL 流程的每个阶段都添加数据验证步骤,我们可以有效地保证数据的完整性、准确性和一致性。

选择合适的工具,提高数据质量

总而言之,PanderaGreat Expectations 都是强大的数据验证和质量保证工具,可以帮助你构建健壮的 ETL 流程。选择哪个工具取决于你的具体需求。

希望今天的分享能够帮助你更好地理解如何在 Python ETL 框架中使用 PanderaGreat Expectations 进行数据验证和质量保证。谢谢大家!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注