好的,各位小伙伴,欢迎来到今天的“SQLAlchemy Core API:构建 SQL 表达式与数据库抽象层”专场!今天咱们不讲虚的,直接上手,用最接地气的方式,把 SQLAlchemy Core API 这玩意儿给它扒个底朝天。
一、啥是 SQLAlchemy Core API?为啥要学它?
首先,咱们得搞清楚 SQLAlchemy 这大家伙,它其实是个超级厉害的 Python SQL 工具包,分两层:
- SQLAlchemy Core: 负责构建 SQL 表达式,让你像搭积木一样拼 SQL 语句,然后直接跟数据库对话。
- SQLAlchemy ORM: 在 Core 的基础上,加了一层对象关系映射(ORM),可以把数据库表映射成 Python 类,操作对象就像操作数据库,更爽!
今天咱们先啃 Core 这块硬骨头。为啥要学 Core 呢?原因很简单:
- 灵活性爆炸: Core 让你完全掌控 SQL 语句,想怎么写就怎么写,没有 ORM 的条条框框。
- 性能更可控: ORM 帮你做了很多事情,但也可能带来性能损耗。Core 让你直接写 SQL,可以针对特定场景做优化。
- ORM 的基石: 理解了 Core,才能更好地理解 ORM 的底层原理,用好 ORM。
简单来说,Core 就是内功,ORM 是招式,内功深厚,招式才能更厉害!
二、Core API 的核心概念:元数据、表、列、表达式
Core API 的世界里,有几个核心概念必须搞清楚:
- 元数据 (MetaData): 相当于一个数据库的“目录”,里面记录了所有表的信息。
- 表 (Table): 就是数据库里的表,包含表名、列信息等。
- 列 (Column): 表里的每一列,有列名、数据类型、约束等。
- 表达式 (Expression): 各种 SQL 操作,比如
SELECT
,WHERE
,ORDER BY
等,都可以用表达式来表示。
咱们先来写个简单的例子,创建元数据和表:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
# 创建数据库引擎,连接到 SQLite 数据库(也可以是 MySQL, PostgreSQL 等)
engine = create_engine('sqlite:///:memory:') # 使用内存数据库,方便测试
# 创建元数据对象
metadata = MetaData()
# 创建表对象
users_table = Table(
'users', # 表名
metadata, # 所属的元数据对象
Column('id', Integer, primary_key=True), # 列:id,整数类型,主键
Column('name', String(50)), # 列:name,字符串类型,最大长度 50
Column('age', Integer) # 列:age,整数类型
)
# 在数据库中创建表
metadata.create_all(engine)
这段代码干了啥?
create_engine('sqlite:///:memory:')
创建了一个数据库引擎,连接到内存中的 SQLite 数据库。MetaData()
创建了一个元数据对象,用来管理数据库的结构信息。Table(...)
创建了一个表对象,指定了表名、列信息等。metadata.create_all(engine)
根据元数据中定义的表结构,在数据库中创建表。
现在,咱们的数据库里就多了一个 users
表,包含 id
, name
, age
三列。
三、用表达式构建 SQL 语句:增删改查
有了表,接下来就是增删改查了。Core API 提供了一套强大的表达式系统,可以让你像搭积木一样拼 SQL 语句。
1. 插入数据 (INSERT)
from sqlalchemy import insert
# 构建 INSERT 语句
insert_stmt = insert(users_table).values(name='Alice', age=30)
# 执行 INSERT 语句
with engine.connect() as connection:
result = connection.execute(insert_stmt)
connection.commit() # 提交事务
print(result.inserted_primary_key) # 输出插入数据的主键
这段代码:
insert(users_table).values(name='Alice', age=30)
构建了一个 INSERT 语句,向users
表插入一条记录,name
为 ‘Alice’,age
为 30。engine.connect()
获取一个数据库连接。connection.execute(insert_stmt)
执行 INSERT 语句。connection.commit()
提交事务,将数据写入数据库。
2. 查询数据 (SELECT)
from sqlalchemy import select
# 构建 SELECT 语句
select_stmt = select(users_table).where(users_table.c.age > 25)
# 执行 SELECT 语句
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row)
这段代码:
select(users_table).where(users_table.c.age > 25)
构建了一个 SELECT 语句,查询users
表中age
大于 25 的所有记录。users_table.c.age
表示users
表的age
列。connection.execute(select_stmt)
执行 SELECT 语句。for row in result:
遍历查询结果,row
是一个元组,包含每一列的值。
3. 更新数据 (UPDATE)
from sqlalchemy import update
# 构建 UPDATE 语句
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=35)
# 执行 UPDATE 语句
with engine.connect() as connection:
result = connection.execute(update_stmt)
connection.commit()
print(result.rowcount) # 输出受影响的行数
这段代码:
update(users_table).where(users_table.c.name == 'Alice').values(age=35)
构建了一个 UPDATE 语句,将users
表中name
为 ‘Alice’ 的记录的age
更新为 35。connection.execute(update_stmt)
执行 UPDATE 语句。connection.commit()
提交事务。
4. 删除数据 (DELETE)
from sqlalchemy import delete
# 构建 DELETE 语句
delete_stmt = delete(users_table).where(users_table.c.age > 32)
# 执行 DELETE 语句
with engine.connect() as connection:
result = connection.execute(delete_stmt)
connection.commit()
print(result.rowcount) # 输出受影响的行数
这段代码:
delete(users_table).where(users_table.c.age > 32)
构建了一个 DELETE 语句,删除users
表中age
大于 32 的所有记录。connection.execute(delete_stmt)
执行 DELETE 语句。connection.commit()
提交事务。
四、高级表达式:函数、连接、子查询
Core API 提供的表达式系统非常强大,可以构建各种复杂的 SQL 语句。
1. 使用函数
from sqlalchemy import func
# 构建 SELECT 语句,使用 func.count() 函数
select_stmt = select(func.count()).select_from(users_table)
# 执行 SELECT 语句
with engine.connect() as connection:
result = connection.execute(select_stmt)
count = result.scalar() # 获取单个标量值
print(count)
这段代码:
func.count()
表示 SQL 的COUNT()
函数。select(func.count()).select_from(users_table)
构建了一个 SELECT 语句,查询users
表的记录总数。result.scalar()
获取查询结果中的单个标量值,也就是记录总数。
2. 表连接 (JOIN)
假设我们还有一个 orders
表,记录用户的订单信息:
from sqlalchemy import ForeignKey
# 创建 orders 表
orders_table = Table(
'orders',
metadata,
Column('id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.id')), # 外键,关联到 users 表的 id 列
Column('product_name', String(50))
)
metadata.create_all(engine)
现在,我们可以使用 JOIN 查询用户的订单信息:
# 构建 JOIN 查询
select_stmt = select(users_table.c.name, orders_table.c.product_name).join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id)
# 执行 JOIN 查询
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row) # 输出 (user_name, product_name)
这段代码:
ForeignKey('users.id')
定义了一个外键,将orders
表的user_id
列关联到users
表的id
列。join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id)
构建了一个 JOIN 查询,连接users
表和orders
表,连接条件是users.id == orders.user_id
。
3. 子查询 (Subquery)
# 构建子查询
subquery = select(users_table.c.id).where(users_table.c.age > 28).scalar_subquery()
# 构建主查询,使用子查询
select_stmt = select(orders_table).where(orders_table.c.user_id.in_(subquery))
# 执行查询
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row)
这段代码:
scalar_subquery()
将一个 SELECT 语句转换为一个标量子查询,也就是返回单个值的子查询。orders_table.c.user_id.in_(subquery)
表示user_id
列的值在子查询的结果集中。
五、事务处理:保证数据一致性
在数据库操作中,事务是非常重要的,可以保证数据的一致性。Core API 提供了事务处理的支持。
# 使用 try...except...finally 块处理事务
with engine.connect() as connection:
transaction = connection.begin() # 开启事务
try:
# 执行一系列数据库操作
insert_stmt = insert(users_table).values(name='Bob', age=40)
connection.execute(insert_stmt)
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=36)
connection.execute(update_stmt)
transaction.commit() # 提交事务
print("事务提交成功")
except Exception as e:
transaction.rollback() # 回滚事务
print("事务回滚", e)
finally:
transaction.close() # 关闭事务
这段代码:
connection.begin()
开启一个事务。- 如果在
try
块中的数据库操作都成功执行,则transaction.commit()
提交事务,将所有修改写入数据库。 - 如果在
try
块中发生异常,则transaction.rollback()
回滚事务,撤销所有修改,保证数据的一致性。 transaction.close()
关闭事务。
六、绑定参数:防止 SQL 注入
在构建 SQL 语句时,最好使用绑定参数,可以防止 SQL 注入攻击。
# 使用绑定参数
select_stmt = select(users_table).where(users_table.c.name == :name)
# 执行查询,传入参数
with engine.connect() as connection:
result = connection.execute(select_stmt, {"name": "Alice"}) # 传入参数
for row in result:
print(row)
这段代码:
users_table.c.name == :name
使用:name
作为参数占位符。connection.execute(select_stmt, {"name": "Alice"})
执行查询,并将参数{"name": "Alice"}
传递给数据库。
七、总结:Core API 的优势与局限
Core API 提供了强大的 SQL 表达式系统,可以灵活地构建各种 SQL 语句。它的优势在于:
- 灵活性: 可以完全掌控 SQL 语句。
- 性能: 可以针对特定场景做优化。
- 底层: 理解 ORM 的基础。
但是,Core API 也有一些局限性:
- 代码量: 需要手动编写 SQL 语句,代码量较大。
- 学习成本: 需要熟悉 SQL 语法和 SQLAlchemy 的表达式系统。
总的来说,Core API 适合对 SQL 有深入了解,需要对性能进行优化的场景。如果对 SQL 不熟悉,或者需要快速开发,可以使用 ORM。
八、代码示例汇总
为了方便大家查阅,这里把所有的代码示例汇总一下:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, ForeignKey, insert, select, update, delete, func
# 创建数据库引擎
engine = create_engine('sqlite:///:memory:')
# 创建元数据对象
metadata = MetaData()
# 创建 users 表
users_table = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('age', Integer)
)
# 创建 orders 表
orders_table = Table(
'orders',
metadata,
Column('id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.id')),
Column('product_name', String(50))
)
# 在数据库中创建表
metadata.create_all(engine)
# 插入数据
insert_stmt = insert(users_table).values(name='Alice', age=30)
with engine.connect() as connection:
result = connection.execute(insert_stmt)
connection.commit()
print(result.inserted_primary_key)
# 查询数据
select_stmt = select(users_table).where(users_table.c.age > 25)
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row)
# 更新数据
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=35)
with engine.connect() as connection:
result = connection.execute(update_stmt)
connection.commit()
print(result.rowcount)
# 删除数据
delete_stmt = delete(users_table).where(users_table.c.age > 32)
with engine.connect() as connection:
result = connection.execute(delete_stmt)
connection.commit()
print(result.rowcount)
# 使用函数
select_stmt = select(func.count()).select_from(users_table)
with engine.connect() as connection:
result = connection.execute(select_stmt)
count = result.scalar()
print(count)
# 表连接
select_stmt = select(users_table.c.name, orders_table.c.product_name).join_from(users_table, orders_table, users_table.c.id == orders_table.c.user_id)
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row)
# 子查询
subquery = select(users_table.c.id).where(users_table.c.age > 28).scalar_subquery()
select_stmt = select(orders_table).where(orders_table.c.user_id.in_(subquery))
with engine.connect() as connection:
result = connection.execute(select_stmt)
for row in result:
print(row)
# 事务处理
with engine.connect() as connection:
transaction = connection.begin()
try:
insert_stmt = insert(users_table).values(name='Bob', age=40)
connection.execute(insert_stmt)
update_stmt = update(users_table).where(users_table.c.name == 'Alice').values(age=36)
connection.execute(update_stmt)
transaction.commit()
print("事务提交成功")
except Exception as e:
transaction.rollback()
print("事务回滚", e)
finally:
transaction.close()
# 绑定参数
select_stmt = select(users_table).where(users_table.c.name == :name)
with engine.connect() as connection:
result = connection.execute(select_stmt, {"name": "Alice"})
for row in result:
print(row)
九、练习题
最后,给大家留几个练习题,巩固一下今天所学的内容:
- 创建一个
products
表,包含id
,name
,price
三列,然后插入几条记录。 - 查询
products
表中price
大于 100 的所有记录。 - 更新
products
表中name
为 ‘iPhone’ 的记录的price
为 999.99。 - 删除
products
表中price
小于 50 的所有记录。 - 使用 JOIN 查询
users
表和orders
表,显示用户名和订单总数。
好了,今天的 SQLAlchemy Core API 专场就到这里了。希望大家通过今天的学习,对 Core API 有了更深入的了解。记住,多练习,多实践,才能真正掌握这门技术!下次咱们再聊 ORM! 拜拜!