Python高级技术之:`SQLAlchemy`的`Alembic`:如何进行数据库`Schema`的迁移。

各位观众老爷,晚上好!我是今天的主讲人,很高兴能在这里和大家聊聊Python高级技术中的一个重要组成部分—— SQLAlchemy 的 Alembic,以及它如何帮助我们优雅地进行数据库 Schema 的迁移。

咱们今天的主题是数据库 Schema 迁移,这玩意儿听起来高大上,实际上就是数据库结构的升级和变更。想象一下,你的 App 从 1.0 升级到 2.0,数据库表里要加几个字段,删几个索引,甚至整个表都要重构,这时候,Alembic 就派上大用场了。

一、 为什么要用 Alembic?

直接上手改数据库结构?也不是不行,但风险太大。手动改容易出错,而且一旦出错,回滚起来非常麻烦,甚至可能导致数据丢失。想象一下,半夜三更被叫起来修复数据库,那种感觉,啧啧…

Alembic 就像数据库的 Git,它可以:

  • 版本控制: 记录每次数据库 Schema 的变更,方便追溯和回滚。
  • 自动化迁移: 自动生成迁移脚本,只需简单命令即可完成数据库升级。
  • 团队协作: 方便团队成员之间同步数据库 Schema,避免冲突。

简单来说,用了 Alembic,你就可以像管理代码一样管理数据库 Schema,告别手动修改的痛苦,拥抱自动化迁移的便捷。

二、 Alembic 的基本概念

在深入代码之前,咱们先来了解一下 Alembic 的几个核心概念:

  • Migration Repository (迁移仓库): 存放所有迁移脚本的目录。
  • Migration Script (迁移脚本): 包含数据库 Schema 变更的具体操作,例如创建表、添加字段等。
  • Revision (修订版本): 迁移脚本的版本号,Alembic 通过修订版本来跟踪数据库 Schema 的演变。
  • Head (头): 当前数据库 Schema 的最新修订版本。
  • Version Table (版本表): 数据库中用于记录当前 Schema 版本的表,通常命名为 alembic_version

三、 Alembic 的安装和配置

废话不多说,咱们直接上手。首先,安装 Alembic:

pip install alembic

安装完成后,初始化 Alembic 仓库:

alembic init alembic

这会在当前目录下创建一个名为 alembic 的目录,里面包含了 Alembic 的配置文件 alembic.ini 和迁移脚本存放目录 alembic/versions

接下来,修改 alembic.ini 文件,配置数据库连接信息:

sqlalchemy.url = postgresql://user:password@host:port/database

这里的 sqlalchemy.url 是 SQLAlchemy 的数据库连接字符串,根据你的数据库类型和连接方式进行修改。

此外,还需要配置 version_tableversion_table_pk 两个参数,分别指定版本表的名称和主键列名:

version_table = alembic_version
version_table_pk = version_num

四、 创建第一个迁移脚本

配置完成后,就可以创建第一个迁移脚本了。使用 alembic revision 命令创建一个新的迁移脚本:

alembic revision -m "Create user table"

-m 参数用于指定迁移脚本的描述信息,方便日后查看。

执行上述命令后,Alembic 会在 alembic/versions 目录下生成一个新的 Python 文件,文件名类似于 xxxxxxxxxxxx_create_user_table.py

打开这个文件,你会看到 upgrade()downgrade() 两个函数。upgrade() 函数用于执行数据库 Schema 升级操作,downgrade() 函数用于执行回滚操作。

咱们来创建一个简单的用户表:

# alembic/versions/xxxxxxxxxxxx_create_user_table.py

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.Integer, primary_key=True),
        sa.Column('username', sa.String(50), nullable=False),
        sa.Column('email', sa.String(100), nullable=False, unique=True),
        sa.Column('password', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, server_default=sa.func.now(), onupdate=sa.func.now())
    )

def downgrade():
    op.drop_table('users')

upgrade() 函数使用 op.create_table() 创建了一个名为 users 的表,包含了 id、username、email、password 等字段。downgrade() 函数使用 op.drop_table() 删除了 users 表。

五、 执行和回滚迁移

编写完迁移脚本后,就可以执行迁移了。使用 alembic upgrade head 命令将数据库 Schema 升级到最新版本:

alembic upgrade head

执行上述命令后,Alembic 会读取 alembic/versions 目录下的所有迁移脚本,并依次执行 upgrade() 函数,最终将数据库 Schema 升级到最新版本。

如果需要回滚到之前的版本,可以使用 alembic downgrade <revision> 命令:

alembic downgrade 1234567890ab

这里的 <revision> 是要回滚到的修订版本号。执行上述命令后,Alembic 会执行指定版本之后的所有迁移脚本的 downgrade() 函数,将数据库 Schema 回滚到指定版本。

六、 常用 Alembic 命令

为了方便大家使用,我整理了一些常用的 Alembic 命令:

命令 描述
alembic init <directory> 初始化 Alembic 仓库,<directory> 是仓库目录名。
alembic revision -m "<message>" 创建一个新的迁移脚本,-m 参数用于指定描述信息。
alembic upgrade head 将数据库 Schema 升级到最新版本。
alembic downgrade <revision> 将数据库 Schema 回滚到指定版本,<revision> 是要回滚到的修订版本号。
alembic history 查看迁移历史记录。
alembic current 查看当前数据库 Schema 的修订版本。
alembic stamp head 将数据库 Schema 标记为最新版本,但不执行任何迁移操作。通常用于在已存在的数据库上使用 Alembic。
alembic show <revision> 显示指定修订版本的迁移脚本内容。

七、 高级用法

Alembic 除了基本用法外,还有很多高级用法,例如:

  • 自动生成迁移脚本: Alembic 可以自动比较 SQLAlchemy 模型和数据库 Schema,生成迁移脚本,大大简化了迁移过程。
  • 多数据库支持: Alembic 可以同时管理多个数据库的 Schema 迁移。
  • 自定义迁移操作: 可以编写自定义的迁移操作,例如执行 SQL 语句、导入数据等。

1. 自动生成迁移脚本

要使用自动生成迁移脚本的功能,需要配置 alembic.ini 文件中的 sqlalchemy.urlscript_location 参数,以及在 env.py 文件中配置 SQLAlchemy 的 metadata。

首先,定义 SQLAlchemy 模型:

# models.py

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), nullable=False)
    email = Column(String(100), nullable=False, unique=True)
    password = Column(String(255), nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

然后,修改 env.py 文件:

# alembic/env.py

import os
import sys
from logging.config import fileConfig

from sqlalchemy import create_engine
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
sys.path.append(os.getcwd())  # Add the current directory to the Python path
from models import Base  # Import your SQLAlchemy models
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the script
    directly to the console.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = create_engine(config.get_main_option("sqlalchemy.url"))

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

关键在于 target_metadata = Base.metadata 这一行,它将 SQLAlchemy 模型的 metadata 赋值给 target_metadata 变量,Alembic 就可以通过它来比较模型和数据库 Schema。

现在,可以执行 alembic revision --autogenerate -m "Add user table" 命令来自动生成迁移脚本:

alembic revision --autogenerate -m "Add user table"

Alembic 会自动比较 SQLAlchemy 模型和数据库 Schema,生成创建 users 表的迁移脚本。

2. 多数据库支持

Alembic 可以通过配置多个数据库连接字符串来支持多数据库迁移。需要在 alembic.ini 文件中配置多个 sqlalchemy.url 参数,并在迁移脚本中使用不同的连接字符串。

例如,配置两个数据库连接:

sqlalchemy.url = postgresql://user:password@host:port/database1
sqlalchemy.url2 = mysql://user:password@host:port/database2

然后在迁移脚本中使用 op.execute() 函数执行 SQL 语句,指定连接字符串:

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.execute("CREATE TABLE users (id INT PRIMARY KEY, username VARCHAR(50))", execution_options={"url": "sqlalchemy.url"})
    op.execute("CREATE TABLE products (id INT PRIMARY KEY, name VARCHAR(50))", execution_options={"url": "sqlalchemy.url2"})

def downgrade():
    op.execute("DROP TABLE users", execution_options={"url": "sqlalchemy.url"})
    op.execute("DROP TABLE products", execution_options={"url": "sqlalchemy.url2"})

3. 自定义迁移操作

Alembic 允许编写自定义的迁移操作,例如执行 SQL 语句、导入数据等。可以使用 op.execute() 函数执行 SQL 语句,或者使用 op.get_bind() 函数获取数据库连接,然后执行自定义操作。

例如,执行 SQL 语句:

from alembic import op
import sqlalchemy as sa

def upgrade():
    op.execute("ALTER TABLE users ADD COLUMN age INT")

def downgrade():
    op.execute("ALTER TABLE users DROP COLUMN age")

或者,导入数据:

from alembic import op
import sqlalchemy as sa
import csv

def upgrade():
    connection = op.get_bind()
    with open("data.csv", "r") as f:
        reader = csv.reader(f)
        next(reader)  # Skip header row
        for row in reader:
            connection.execute(
                "INSERT INTO users (id, username, email) VALUES (%s, %s, %s)",
                row
            )

def downgrade():
    op.execute("DELETE FROM users")

八、 最佳实践

  • 保持迁移脚本的原子性: 每个迁移脚本只包含一个逻辑上的变更,方便回滚。
  • 编写完善的 downgrade() 函数: 确保每个迁移脚本都有对应的 downgrade() 函数,以便回滚到之前的版本。
  • 使用事务: 在迁移脚本中使用事务,确保所有操作要么全部成功,要么全部失败。
  • 测试迁移脚本: 在生产环境之前,务必在测试环境测试迁移脚本,确保没有错误。
  • 定期备份数据库: 以防万一,定期备份数据库,以便在出现问题时可以恢复数据。

九、 总结

Alembic 是一个强大的数据库 Schema 迁移工具,它可以帮助我们自动化管理数据库 Schema 的变更,提高开发效率,降低出错风险。掌握 Alembic 的基本概念和用法,对于 Python Web 开发人员来说至关重要。

希望今天的讲座能帮助大家更好地理解和使用 Alembic。记住,熟能生巧,多练习,多实践,才能真正掌握这门技术。

最后,祝大家编程愉快,Bug 远离!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注