SQLAlchemy Core API:构建 SQL 表达式与数据库抽象层

好的,各位小伙伴,欢迎来到今天的“SQLAlchemy Core API:构建 SQL 表达式与数据库抽象层”专场!今天咱们不讲虚的,直接上手,用最接地气的方式,把 SQLAlchemy Core API 这玩意儿给它扒个底朝天。

一、啥是 SQLAlchemy Core API?为啥要学它?

首先,咱们得搞清楚 SQLAlchemy 这大家伙,它其实是个超级厉害的 Python SQL 工具包,分两层:

  • SQLAlchemy Core: 负责构建 SQL 表达式,让你像搭积木一样拼 SQL 语句,然后直接跟数据库对话。
  • SQLAlchemy ORM: 在 Core 的基础上,加了一层对象关系映射(ORM),可以把数据库表映射成 Python 类,操作对象就像操作数据库,更爽!

今天咱们先啃 Core 这块硬骨头。为啥要学 Core 呢?原因很简单:

  1. 灵活性爆炸: Core 让你完全掌控 SQL 语句,想怎么写就怎么写,没有 ORM 的条条框框。
  2. 性能更可控: ORM 帮你做了很多事情,但也可能带来性能损耗。Core 让你直接写 SQL,可以针对特定场景做优化。
  3. ORM 的基石: 理解了 Core,才能更好地理解 ORM 的底层原理,用好 ORM。

简单来说,Core 就是内功,ORM 是招式,内功深厚,招式才能更厉害!

二、Core API 的核心概念:元数据、表、列、表达式

Core API 的世界里,有几个核心概念必须搞清楚:

  • 元数据 (MetaData): 相当于一个数据库的“目录”,里面记录了所有表的信息。
  • 表 (Table): 就是数据库里的表,包含表名、列信息等。
  • 列 (Column): 表里的每一列,有列名、数据类型、约束等。
  • 表达式 (Expression): 各种 SQL 操作,比如 SELECT, WHERE, ORDER BY 等,都可以用表达式来表示。

咱们先来写个简单的例子,创建元数据和表:

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

# 创建数据库引擎,连接到 SQLite 数据库(也可以是 MySQL, PostgreSQL 等)
engine = create_engine('sqlite:///:memory:')  # 使用内存数据库,方便测试

# 创建元数据对象
metadata = MetaData()

# 创建表对象
users_table = Table(
    'users',  # 表名
    metadata,  # 所属的元数据对象
    Column('id', Integer, primary_key=True),  # 列:id,整数类型,主键
    Column('name', String(50)),  # 列:name,字符串类型,最大长度 50
    Column('age', Integer)  # 列:age,整数类型
)

# 在数据库中创建表
metadata.create_all(engine)

这段代码干了啥?

  1. create_engine('sqlite:///:memory:') 创建了一个数据库引擎,连接到内存中的 SQLite 数据库。
  2. MetaData() 创建了一个元数据对象,用来管理数据库的结构信息。
  3. Table(...) 创建了一个表对象,指定了表名、列信息等。
  4. metadata.create_all(engine) 根据元数据中定义的表结构,在数据库中创建表。

现在,咱们的数据库里就多了一个 users 表,包含 id, name, age 三列。

三、用表达式构建 SQL 语句:增删改查

有了表,接下来就是增删改查了。Core API 提供了一套强大的表达式系统,可以让你像搭积木一样拼 SQL 语句。

1. 插入数据 (INSERT)

from sqlalchemy import insert

# 构建 INSERT 语句
insert_stmt = insert(users_table).values(name='Alice', age=30)

# 执行 INSERT 语句
with engine.connect() as connection:
    result = connection.execute(insert_stmt)
    connection.commit()  # 提交事务
    print(result.inserted_primary_key) # 输出插入数据的主键

这段代码:

  1. insert(users_table).values(name='Alice', age=30) 构建了一个 INSERT 语句,向 users 表插入一条记录,name 为 ‘Alice’, age 为 30。
  2. engine.connect() 获取一个数据库连接。
  3. connection.execute(insert_stmt) 执行 INSERT 语句。
  4. connection.commit() 提交事务,将数据写入数据库。

2. 查询数据 (SELECT)

from sqlalchemy import select

# 构建 SELECT 语句
select_stmt = select(users_table).where(users_table.c.age > 25)

# 执行 SELECT 语句
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)

这段代码:

  1. select(users_table).where(users_table.c.age > 25) 构建了一个 SELECT 语句,查询 users 表中 age 大于 25 的所有记录。 users_table.c.age 表示 users 表的 age 列。
  2. connection.execute(select_stmt) 执行 SELECT 语句。
  3. for row in result: 遍历查询结果,row 是一个元组,包含每一列的值。

3. 更新数据 (UPDATE)

from sqlalchemy import update

# 构建 UPDATE 语句
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=35)

# 执行 UPDATE 语句
with engine.connect() as connection:
    result = connection.execute(update_stmt)
    connection.commit()
    print(result.rowcount) # 输出受影响的行数

这段代码:

  1. update(users_table).where(users_table.c.name == 'Alice').values(age=35) 构建了一个 UPDATE 语句,将 users 表中 name 为 ‘Alice’ 的记录的 age 更新为 35。
  2. connection.execute(update_stmt) 执行 UPDATE 语句。
  3. connection.commit() 提交事务。

4. 删除数据 (DELETE)

from sqlalchemy import delete

# 构建 DELETE 语句
delete_stmt = delete(users_table).where(users_table.c.age > 32)

# 执行 DELETE 语句
with engine.connect() as connection:
    result = connection.execute(delete_stmt)
    connection.commit()
    print(result.rowcount) # 输出受影响的行数

这段代码:

  1. delete(users_table).where(users_table.c.age > 32) 构建了一个 DELETE 语句,删除 users 表中 age 大于 32 的所有记录。
  2. connection.execute(delete_stmt) 执行 DELETE 语句。
  3. connection.commit() 提交事务。

四、高级表达式:函数、连接、子查询

Core API 提供的表达式系统非常强大,可以构建各种复杂的 SQL 语句。

1. 使用函数

from sqlalchemy import func

# 构建 SELECT 语句,使用 func.count() 函数
select_stmt = select(func.count()).select_from(users_table)

# 执行 SELECT 语句
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    count = result.scalar()  # 获取单个标量值
    print(count)

这段代码:

  1. func.count() 表示 SQL 的 COUNT() 函数。
  2. select(func.count()).select_from(users_table) 构建了一个 SELECT 语句,查询 users 表的记录总数。
  3. result.scalar() 获取查询结果中的单个标量值,也就是记录总数。

2. 表连接 (JOIN)

假设我们还有一个 orders 表,记录用户的订单信息:

from sqlalchemy import ForeignKey

# 创建 orders 表
orders_table = Table(
    'orders',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('users.id')),  # 外键,关联到 users 表的 id 列
    Column('product_name', String(50))
)

metadata.create_all(engine)

现在,我们可以使用 JOIN 查询用户的订单信息:

# 构建 JOIN 查询
select_stmt = select(users_table.c.name, orders_table.c.product_name).join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id)

# 执行 JOIN 查询
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)  # 输出 (user_name, product_name)

这段代码:

  1. ForeignKey('users.id') 定义了一个外键,将 orders 表的 user_id 列关联到 users 表的 id 列。
  2. join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id) 构建了一个 JOIN 查询,连接 users 表和 orders 表,连接条件是 users.id == orders.user_id

3. 子查询 (Subquery)

# 构建子查询
subquery = select(users_table.c.id).where(users_table.c.age > 28).scalar_subquery()

# 构建主查询,使用子查询
select_stmt = select(orders_table).where(orders_table.c.user_id.in_(subquery))

# 执行查询
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)

这段代码:

  1. scalar_subquery() 将一个 SELECT 语句转换为一个标量子查询,也就是返回单个值的子查询。
  2. orders_table.c.user_id.in_(subquery) 表示 user_id 列的值在子查询的结果集中。

五、事务处理:保证数据一致性

在数据库操作中,事务是非常重要的,可以保证数据的一致性。Core API 提供了事务处理的支持。

# 使用 try...except...finally 块处理事务
with engine.connect() as connection:
    transaction = connection.begin()  # 开启事务
    try:
        # 执行一系列数据库操作
        insert_stmt = insert(users_table).values(name='Bob', age=40)
        connection.execute(insert_stmt)

        update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=36)
        connection.execute(update_stmt)

        transaction.commit()  # 提交事务
        print("事务提交成功")
    except Exception as e:
        transaction.rollback()  # 回滚事务
        print("事务回滚", e)
    finally:
        transaction.close()  # 关闭事务

这段代码:

  1. connection.begin() 开启一个事务。
  2. 如果在 try 块中的数据库操作都成功执行,则 transaction.commit() 提交事务,将所有修改写入数据库。
  3. 如果在 try 块中发生异常,则 transaction.rollback() 回滚事务,撤销所有修改,保证数据的一致性。
  4. transaction.close() 关闭事务。

六、绑定参数:防止 SQL 注入

在构建 SQL 语句时,最好使用绑定参数,可以防止 SQL 注入攻击。

# 使用绑定参数
select_stmt = select(users_table).where(users_table.c.name == :name)

# 执行查询,传入参数
with engine.connect() as connection:
    result = connection.execute(select_stmt, {"name": "Alice"}) # 传入参数
    for row in result:
        print(row)

这段代码:

  1. users_table.c.name == :name 使用 :name 作为参数占位符。
  2. connection.execute(select_stmt, {"name": "Alice"}) 执行查询,并将参数 {"name": "Alice"} 传递给数据库。

七、总结:Core API 的优势与局限

Core API 提供了强大的 SQL 表达式系统,可以灵活地构建各种 SQL 语句。它的优势在于:

  • 灵活性: 可以完全掌控 SQL 语句。
  • 性能: 可以针对特定场景做优化。
  • 底层: 理解 ORM 的基础。

但是,Core API 也有一些局限性:

  • 代码量: 需要手动编写 SQL 语句,代码量较大。
  • 学习成本: 需要熟悉 SQL 语法和 SQLAlchemy 的表达式系统。

总的来说,Core API 适合对 SQL 有深入了解,需要对性能进行优化的场景。如果对 SQL 不熟悉,或者需要快速开发,可以使用 ORM。

八、代码示例汇总

为了方便大家查阅,这里把所有的代码示例汇总一下:

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, ForeignKey, insert, select, update, delete, func

# 创建数据库引擎
engine = create_engine('sqlite:///:memory:')

# 创建元数据对象
metadata = MetaData()

# 创建 users 表
users_table = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('age', Integer)
)

# 创建 orders 表
orders_table = Table(
    'orders',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('product_name', String(50))
)

# 在数据库中创建表
metadata.create_all(engine)

# 插入数据
insert_stmt = insert(users_table).values(name='Alice', age=30)
with engine.connect() as connection:
    result = connection.execute(insert_stmt)
    connection.commit()
    print(result.inserted_primary_key)

# 查询数据
select_stmt = select(users_table).where(users_table.c.age > 25)
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)

# 更新数据
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=35)
with engine.connect() as connection:
    result = connection.execute(update_stmt)
    connection.commit()
    print(result.rowcount)

# 删除数据
delete_stmt = delete(users_table).where(users_table.c.age > 32)
with engine.connect() as connection:
    result = connection.execute(delete_stmt)
    connection.commit()
    print(result.rowcount)

# 使用函数
select_stmt = select(func.count()).select_from(users_table)
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    count = result.scalar()
    print(count)

# 表连接
select_stmt = select(users_table.c.name, orders_table.c.product_name).join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id)
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)

# 子查询
subquery = select(users_table.c.id).where(users_table.c.age > 28).scalar_subquery()
select_stmt = select(orders_table).where(orders_table.c.user_id.in_(subquery))
with engine.connect() as connection:
    result = connection.execute(select_stmt)
    for row in result:
        print(row)

# 事务处理
with engine.connect() as connection:
    transaction = connection.begin()
    try:
        insert_stmt = insert(users_table).values(name='Bob', age=40)
        connection.execute(insert_stmt)

        update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=36)
        connection.execute(update_stmt)

        transaction.commit()
        print("事务提交成功")
    except Exception as e:
        transaction.rollback()
        print("事务回滚", e)
    finally:
        transaction.close()

# 绑定参数
select_stmt = select(users_table).where(users_table.c.name == :name)
with engine.connect() as connection:
    result = connection.execute(select_stmt, {"name": "Alice"})
    for row in result:
        print(row)

九、练习题

最后,给大家留几个练习题,巩固一下今天所学的内容:

  1. 创建一个 products 表,包含 id, name, price 三列,然后插入几条记录。
  2. 查询 products 表中 price 大于 100 的所有记录。
  3. 更新 products 表中 name 为 ‘iPhone’ 的记录的 price 为 999.99。
  4. 删除 products 表中 price 小于 50 的所有记录。
  5. 使用 JOIN 查询 users 表和 orders 表,显示用户名和订单总数。

好了,今天的 SQLAlchemy Core API 专场就到这里了。希望大家通过今天的学习,对 Core API 有了更深入的了解。记住,多练习,多实践,才能真正掌握这门技术!下次咱们再聊 ORM! 拜拜!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注