Python高级技术之:`SQLAlchemy`的`subquery`和`CTE`:如何构建复杂的查询。

嘿,大家好!今天咱们来聊聊 SQLAlchemy 里的两个神器:subqueryCTE,这俩玩意儿能帮你构建那些“绕来绕去”的复杂 SQL 查询,让你在数据世界里玩得更溜!

开场白:SQL 为什么需要复杂查询?

想象一下,你是一家电商公司的数据分析师,老板突然拍着桌子说:“我要知道每个月销售额最高的商品是什么,还要列出这些商品的平均价格,以及它们占当月总销售额的比例!”

听到这,你是不是感觉头皮发麻?这需要多个步骤才能完成,光靠简单的 SELECT * FROM table 肯定是不行的。这时候,subqueryCTE 就派上用场了,它们能把复杂的查询拆解成小块,一步一步地得出结果。

第一幕:Subquery(子查询)—— 查询里的“俄罗斯套娃”

Subquery,顾名思义,就是嵌套在另一个查询语句里的查询。你可以把它想象成一个“俄罗斯套娃”,一个查询里面藏着另一个查询。

1. 基本语法

Subquery 可以出现在 SELECT, FROM, WHEREHAVING 子句中。

  • SELECT 中的 Subquery:

    from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.declarative import declarative_base
    from datetime import datetime
    
    # 数据库连接配置
    engine = create_engine('sqlite:///:memory:')  # 使用内存数据库,方便演示
    Base = declarative_base()
    
    # 定义模型
    class Product(Base):
        __tablename__ = 'products'
        id = Column(Integer, primary_key=True)
        name = Column(String)
        price = Column(Float)
        category = Column(String)
    
    class Order(Base):
        __tablename__ = 'orders'
        id = Column(Integer, primary_key=True)
        product_id = Column(Integer)
        quantity = Column(Integer)
        order_date = Column(DateTime)
    
    Base.metadata.create_all(engine)
    
    # 创建 Session
    Session = sessionmaker(bind=engine)
    session = Session()
    
    # 插入一些示例数据
    product1 = Product(name='Laptop', price=1200.00, category='Electronics')
    product2 = Product(name='Mouse', price=25.00, category='Electronics')
    product3 = Product(name='T-Shirt', price=15.00, category='Clothing')
    product4 = Product(name='Jeans', price=60.00, category='Clothing')
    session.add_all([product1, product2, product3, product4])
    session.commit()
    
    order1 = Order(product_id=1, quantity=1, order_date=datetime(2023, 1, 15))
    order2 = Order(product_id=2, quantity=2, order_date=datetime(2023, 1, 20))
    order3 = Order(product_id=3, quantity=3, order_date=datetime(2023, 2, 10))
    order4 = Order(product_id=4, quantity=1, order_date=datetime(2023, 2, 28))
    session.add_all([order1, order2, order3, order4])
    session.commit()
    
    from sqlalchemy import select
    
    # 查询每个产品的价格和该产品价格占所有产品平均价格的比例
    subquery = select(func.avg(Product.price)).scalar_subquery()  # 生成标量子查询
    query = select(Product.name, Product.price, (Product.price / subquery).label("price_ratio"))
    
    result = session.execute(query).all()
    for row in result:
        print(f"Product: {row.name}, Price: {row.price}, Price Ratio: {row.price_ratio:.2f}")

    代码解释:

    • func.avg(Product.price) 计算所有产品的平均价格。
    • .scalar_subquery() 将子查询转换为标量子查询,使其能够返回单个值。
    • (Product.price / subquery).label("price_ratio") 计算每个产品的价格与平均价格的比率,并将其命名为 "price_ratio"。
  • FROM 中的 Subquery (Derived Table):

    # 查询销售额超过 50 的产品名称和总销售额
    subquery = (
        select(Order.product_id, func.sum(Order.quantity * Product.price).label('total_sales'))
        .join(Product, Order.product_id == Product.id)
        .group_by(Order.product_id)
        .having(func.sum(Order.quantity * Product.price) > 50)
        .subquery()
    )
    
    query = select(Product.name, subquery.c.total_sales).join(Product, Product.id == subquery.c.product_id)
    
    result = session.execute(query).all()
    for row in result:
        print(f"Product: {row.name}, Total Sales: {row.total_sales}")

    代码解释:

    • select(...).subquery() 创建一个子查询,它返回产品 ID 和总销售额。
    • .having(func.sum(Order.quantity * Product.price) > 50) 过滤掉总销售额小于等于 50 的产品。
    • 主查询连接 Product 表和子查询,获取产品名称和总销售额。
  • WHERE 中的 Subquery:

    # 查询价格高于平均价格的产品
    subquery = select(func.avg(Product.price)).scalar_subquery()
    query = select(Product.name, Product.price).where(Product.price > subquery)
    
    result = session.execute(query).all()
    for row in result:
        print(f"Product: {row.name}, Price: {row.price}")

    代码解释:

    • 子查询计算所有产品的平均价格。
    • where(Product.price > subquery) 过滤掉价格低于或等于平均价格的产品。

2. Subquery 的类型

  • Scalar Subquery (标量子查询): 返回单个值的子查询。 就像上面的 SELECT 中的 Subquery例子。
  • Row Subquery (行子查询): 返回单行数据的子查询。
  • Column Subquery (列子查询): 返回单列数据的子查询。
  • Table Subquery (表子查询/Derived Table): 返回一个结果集,就像一张表一样,可以被主查询引用。 就像上面的 FROM 中的 Subquery例子。
  • Correlated Subquery (相关子查询): 子查询的执行依赖于外部查询的值。 性能通常较差。

3. Subquery 的优缺点

  • 优点: 结构清晰,易于理解,可以将复杂逻辑分解成小块。
  • 缺点: 性能可能较差,特别是嵌套层数过多时。相关子查询的性能问题尤其突出。

第二幕:CTE (Common Table Expression)—— 查询里的“临时表”

CTE 可以理解为 SQL 查询中的“临时表”,它允许你定义一个命名的子查询,然后在后面的查询中多次引用它。

1. 基本语法

WITH cte_name AS (
    SELECT ...
    FROM ...
    WHERE ...
)
SELECT ...
FROM cte_name
WHERE ...

在 SQLAlchemy 中,可以使用 with_cte() 方法来创建 CTE。

from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, func
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

# 数据库连接配置
engine = create_engine('sqlite:///:memory:')  # 使用内存数据库,方便演示
Base = declarative_base()

# 定义模型
class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    price = Column(Float)
    category = Column(String)

class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    product_id = Column(Integer)
    quantity = Column(Integer)
    order_date = Column(DateTime)

Base.metadata.create_all(engine)

# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

# 插入一些示例数据
product1 = Product(name='Laptop', price=1200.00, category='Electronics')
product2 = Product(name='Mouse', price=25.00, category='Electronics')
product3 = Product(name='T-Shirt', price=15.00, category='Clothing')
product4 = Product(name='Jeans', price=60.00, category='Clothing')
session.add_all([product1, product2, product3, product4])
session.commit()

order1 = Order(product_id=1, quantity=1, order_date=datetime(2023, 1, 15))
order2 = Order(product_id=2, quantity=2, order_date=datetime(2023, 1, 20))
order3 = Order(product_id=3, quantity=3, order_date=datetime(2023, 2, 10))
order4 = Order(product_id=4, quantity=1, order_date=datetime(2023, 2, 28))
session.add_all([order1, order2, order3, order4])
session.commit()

from sqlalchemy import select

# 查找每个类别中最贵的产品
subquery = (
    select(
        Product.category,
        func.max(Product.price).label('max_price')
    ).group_by(Product.category).cte('max_prices') # 定义 CTE
)

query = select(
    Product.name,
    Product.price,
    Product.category
).join(
    subquery,
    (Product.category == subquery.c.category) & (Product.price == subquery.c.max_price)
)

results = session.execute(query).all()

for row in results:
    print(f"Product: {row.name}, Price: {row.price}, Category: {row.category}")

代码解释:

  • select(...).group_by(...).cte('max_prices') 创建一个名为 max_prices 的 CTE,它返回每个类别的最高价格。
  • join(subquery, ...)Product 表和 max_prices CTE 连接起来,找到每个类别中价格等于最高价格的产品。

2. CTE 的类型

  • Non-Recursive CTE (非递归 CTE): 就像上面这个例子,只执行一次。
  • Recursive CTE (递归 CTE): 用于处理具有层级关系的数据,例如组织结构、树形结构等。 这个比较高级,我们稍后再说。

3. CTE 的优缺点

  • 优点:

    • 提高查询的可读性和可维护性。
    • 允许在单个查询中多次引用同一个子查询,避免重复计算。
    • 可以用于递归查询,处理层级数据。
    • 理论上,数据库可以更好地优化 CTE,因为它可以更好地理解查询的结构。
  • 缺点:

    • 某些情况下,性能可能不如直接使用子查询,具体取决于数据库的优化器。

第三幕:Subquery vs. CTE:选哪个?

既然 subqueryCTE 都能实现复杂查询,那我们到底该选哪个呢?这取决于具体情况。

特性 Subquery CTE
可读性 对于简单的查询,可读性尚可;但嵌套层数过多时,可读性较差。 可读性更好,特别是对于复杂的查询,可以将查询分解成逻辑块。
可维护性 维护性相对较差,修改嵌套的子查询比较困难。 维护性更好,可以独立修改 CTE,而不会影响到整个查询。
重用性 只能在定义它的查询中使用一次。 可以在单个查询中多次引用。
递归查询 不支持。 支持递归查询,可以处理层级数据。
性能 某些情况下,性能可能较差,特别是相关子查询。 理论上,数据库可以更好地优化 CTE,但实际效果取决于数据库的优化器。
使用场景 简单的、只需要使用一次的子查询。 复杂的、需要多次引用子查询、或者需要进行递归查询的场景。
SQLAlchemy 使用 select(...).subquery() 创建。 使用 select(...).cte('cte_name') 创建。

一般来说:

  • 如果查询比较简单,只需要使用一次子查询,那么 subquery 就可以了。
  • 如果查询比较复杂,需要多次引用同一个子查询,或者需要进行递归查询,那么 CTE 是更好的选择。

第四幕:Recursive CTE (递归 CTE)—— 征服层级数据

Recursive CTE 是一种特殊的 CTE,它可以递归地引用自身,用于处理具有层级关系的数据。

场景: 假设我们有一个 Employee 表,记录了员工的 ID、姓名和上级领导的 ID。我们要查询某个员工的所有下属。

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 数据库连接配置
engine = create_engine('sqlite:///:memory:')  # 使用内存数据库,方便演示
Base = declarative_base()

# 定义模型
class Employee(Base):
    __tablename__ = 'employees'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    manager_id = Column(Integer)  # 上级领导的 ID

Base.metadata.create_all(engine)

# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

# 插入一些示例数据
employee1 = Employee(name='Alice', manager_id=None)  # Alice 是 CEO
employee2 = Employee(name='Bob', manager_id=1)  # Bob 的上级是 Alice
employee3 = Employee(name='Charlie', manager_id=1)  # Charlie 的上级是 Alice
employee4 = Employee(name='David', manager_id=2)  # David 的上级是 Bob
employee5 = Employee(name='Eve', manager_id=2)  # Eve 的上级是 Bob
session.add_all([employee1, employee2, employee3, employee4, employee5])
session.commit()

from sqlalchemy import select, and_

# 查找 Alice 的所有下属
def find_subordinates(employee_name):
    EmployeeAlias = Employee.__table__.alias()

    # 定义递归 CTE
    subordinates_cte = (
        select(
            Employee.id,
            Employee.name,
            Employee.manager_id
        ).where(Employee.name == employee_name) # 初始查询,找到 Alice
        .union_all(  # 使用 union_all 连接递归查询
            select(
                Employee.id,
                Employee.name,
                Employee.manager_id
            ).join(
                EmployeeAlias, Employee.manager_id == EmployeeAlias.c.id
            ).where(EmployeeAlias.c.name == employee_name)
        )
        .cte("subordinates", recursive=True)
    )

    # 执行查询
    query = select(subordinates_cte.c.name).where(subordinates_cte.c.name != employee_name)
    results = session.execute(query).all()

    for row in results:
        print(f"Subordinate: {row.name}")

find_subordinates('Alice')

代码解释:

  1. 定义递归 CTE:

    • subordinates_cte = select(...).where(Employee.name == employee_name).union_all(...) 创建一个名为 subordinates 的递归 CTE。 recursive=True 必须设置。
  2. 初始查询 (Anchor Member):

    • select(...).where(Employee.name == employee_name) 是初始查询,它找到 Alice。
  3. 递归查询 (Recursive Member):

    • select(...).join(EmployeeAlias, Employee.manager_id == EmployeeAlias.c.id) 是递归查询,它找到所有上级是 Alice 的员工。
    • union_all 将初始查询和递归查询的结果连接起来。
  4. 递归过程:

    • 递归查询会一直执行,直到找不到新的下属为止。
  5. 执行查询:

    • query = select(subordinates_cte.c.name) 查询 CTE 中的所有员工的姓名。

第五幕:优化建议

  • 避免过度嵌套: 尽量减少 subqueryCTE 的嵌套层数,避免查询过于复杂。
  • 使用索引: 确保相关的列上有索引,以提高查询性能。
  • 分析查询计划: 使用数据库的查询计划工具,分析查询的执行效率,找出性能瓶颈。
  • 考虑物化视图: 对于复杂的查询,可以考虑使用物化视图,将结果预先计算好并存储起来,提高查询速度。
  • 合理使用 CTE: CTE 可以提高可读性,但并非总是性能最佳。 在某些情况下,直接使用子查询可能会更快。 需要根据实际情况进行测试和比较。

总结陈词

SubqueryCTE 是 SQLAlchemy 中构建复杂查询的利器。 掌握它们,你就能轻松应对各种数据分析任务,成为数据世界的“超级英雄”! 记住,没有银弹,选择哪个取决于你的具体场景和需求。 多实践,多思考,你就能熟练运用它们,写出高效、易懂的 SQL 查询!

希望今天的讲座对大家有所帮助! 下次再见!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注