Python高级技术之:`Python`的`Repository`模式:如何抽象数据访问层。

各位靓仔靓女,晚上好!

今天咱们来聊聊Python里的Repository模式,这玩意儿听起来高大上,其实就是把数据访问这块儿给好好收拾收拾,让代码更干净、更好维护。可以理解为,你不想直接跟数据库打交道,你只想跟一个“仓库管理员”说:“给我拿个用户数据!”

为什么要搞这个Repository模式?

设想一下,你写了一个电商网站,用户管理模块里,你需要从数据库里读取用户数据,更新用户数据。代码可能长这样:

import sqlite3

def get_user(user_id):
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    user = cursor.fetchone()
    conn.close()
    return user

def update_user_email(user_id, new_email):
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    cursor.execute("UPDATE users SET email = ? WHERE id = ?", (new_email, user_id))
    conn.commit()
    conn.close()

# 使用
user = get_user(1)
print(user)
update_user_email(1, "[email protected]")

这段代码能跑,没毛病。但是,如果你的项目越来越大,到处都是这种直接操作数据库的代码,那你就惨了。

  • 代码重复: 每次都要写连接数据库、执行SQL、关闭连接的代码。
  • 难以测试: 没法轻易地mock数据库,测试起来很麻烦。
  • 耦合度高: 业务逻辑和数据访问逻辑混在一起,改个数据库类型,整个模块都要改。
  • 违反单一职责原则: 函数既负责业务逻辑,又负责数据访问,累不累啊?

Repository模式就是来解决这些问题的,它把数据访问逻辑抽象出来,让你的业务代码只跟Repository打交道,而不用关心Repository背后是怎么操作数据库的。

Repository模式长啥样?

Repository模式的核心思想是创建一个抽象层,这个抽象层充当你的应用代码和数据访问层之间的中介。 通常,Repository会定义一个接口,这个接口声明了你的应用需要的各种数据访问方法(比如 get_user()update_user())。然后,你可以创建一个或多个实现这个接口的具体Repository类,每个类对应一种特定的数据存储方式(比如 SQLite、PostgreSQL、MySQL、甚至内存)。

基本结构:

  1. Repository 接口: 定义操作数据的抽象方法。
  2. 具体 Repository 实现: 实现接口,封装具体的数据库操作。
  3. 数据模型: 定义数据的结构。

上代码!

咱们来用Python实现一个简单的Repository模式。

1. 数据模型 (User)

class User:
    def __init__(self, id, name, email):
        self.id = id
        self.name = name
        self.email = email

    def __repr__(self):
        return f"User(id={self.id}, name='{self.name}', email='{self.email}')"

2. Repository 接口 (UserRepository)

from abc import ABC, abstractmethod
from typing import List, Optional

class UserRepository(ABC):
    @abstractmethod
    def get_user(self, user_id: int) -> Optional[User]:
        pass

    @abstractmethod
    def get_all_users(self) -> List[User]:
        pass

    @abstractmethod
    def add_user(self, user: User) -> None:
        pass

    @abstractmethod
    def update_user(self, user: User) -> None:
        pass

    @abstractmethod
    def delete_user(self, user_id: int) -> None:
        pass

3. 具体 Repository 实现 (SqliteUserRepository)

import sqlite3

class SqliteUserRepository(UserRepository):
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._create_table()  # 确保表存在

    def _create_table(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT NOT NULL
            )
        """)
        conn.commit()
        conn.close()

    def get_user(self, user_id: int) -> Optional[User]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        conn.close()
        if row:
            return User(id=row[0], name=row[1], email=row[2])
        return None

    def get_all_users(self) -> List[User]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        rows = cursor.fetchall()
        conn.close()
        return [User(id=row[0], name=row[1], email=row[2]) for row in rows]

    def add_user(self, user: User) -> None:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
                       (user.id, user.name, user.email))
        conn.commit()
        conn.close()

    def update_user(self, user: User) -> None:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("UPDATE users SET name = ?, email = ? WHERE id = ?",
                       (user.name, user.email, user.id))
        conn.commit()
        conn.close()

    def delete_user(self, user_id: int) -> None:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.commit()
        conn.close()

4. 使用 Repository

# 初始化 Repository
user_repository = SqliteUserRepository('my_database.db')

# 添加用户
new_user = User(id=1, name='张三', email='[email protected]')
user_repository.add_user(new_user)

# 获取用户
user = user_repository.get_user(1)
print(user)

# 更新用户
if user:
    user.email = '[email protected]'
    user_repository.update_user(user)

# 获取所有用户
all_users = user_repository.get_all_users()
print(all_users)

# 删除用户
user_repository.delete_user(1)

Repository模式的优势

  1. 解耦: 业务逻辑和数据访问逻辑分离,降低耦合度。
  2. 可测试性: 可以轻松地mock Repository,进行单元测试。
  3. 可维护性: 修改数据访问方式,不会影响业务代码。
  4. 代码复用: Repository可以在多个地方复用。
  5. 更好的抽象: 将数据访问的细节隐藏起来,提供更清晰的接口给业务逻辑使用。

进阶话题

  • 依赖注入 (Dependency Injection): 使用依赖注入框架(例如 injectordependency_injector)来管理 Repository 实例,进一步降低耦合度。
  • Unit of Work模式: 结合 Unit of Work 模式,实现事务管理,保证数据一致性。
  • 泛型 Repository: 使用泛型来创建通用的 Repository,减少重复代码。
  • 使用ORM (Object-Relational Mapper): 结合 SQLAlchemy 或 Django ORM 等 ORM 框架,简化数据库操作。

依赖注入示例 (使用 injector 库)

首先安装 injector:

pip install injector

然后,修改代码:

import injector

# 定义一个配置类
class AppModule(injector.Module):
    def configure(self, binder):
        binder.bind(UserRepository, SqliteUserRepository(db_path='my_database.db'))

# 定义一个需要 UserRepository 的类
class UserService:
    @injector.inject
    def __init__(self, user_repository: UserRepository):
        self.user_repository = user_repository

    def get_user_email(self, user_id: int) -> Optional[str]:
        user = self.user_repository.get_user(user_id)
        if user:
            return user.email
        return None

# 创建一个 Injector 实例
injector_ = injector.Injector([AppModule()])

# 获取 UserService 实例,Injector 会自动注入 UserRepository
user_service = injector_.get(UserService)

# 使用 UserService
email = user_service.get_user_email(1)
print(email)

在这个例子中,AppModule 告诉 injector 如何创建 UserRepository 的实例。UserService 通过 @injector.inject 装饰器声明它需要一个 UserRepository 的实例。injector 会自动创建 SqliteUserRepository 的实例并注入到 UserService 中。

Unit of Work 示例

import sqlite3

class UnitOfWork:
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self.conn.rollback()
        else:
            self.conn.commit()
        self.conn.close()

class SqliteUserRepository(UserRepository):
    def __init__(self, unit_of_work: UnitOfWork):
        self.unit_of_work = unit_of_work
        self._create_table()  # 确保表存在

    def _create_table(self):
        cursor = self.unit_of_work.cursor
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT NOT NULL
            )
        """)

    def get_user(self, user_id: int) -> Optional[User]:
        cursor = self.unit_of_work.cursor
        cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        if row:
            return User(id=row[0], name=row[1], email=row[2])
        return None

    def get_all_users(self) -> List[User]:
        cursor = self.unit_of_work.cursor
        cursor.execute("SELECT * FROM users")
        rows = cursor.fetchall()
        return [User(id=row[0], name=row[1], email=row[2]) for row in rows]

    def add_user(self, user: User) -> None:
        cursor = self.unit_of_work.cursor
        cursor.execute("INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
                       (user.id, user.name, user.email))

    def update_user(self, user: User) -> None:
        cursor = self.unit_of_work.cursor
        cursor.execute("UPDATE users SET name = ?, email = ? WHERE id = ?",
                       (user.name, user.email, user.id))

    def delete_user(self, user_id: int) -> None:
        cursor = self.unit_of_work.cursor
        cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))

# 使用 Unit of Work
with UnitOfWork('my_database.db') as uow:
    user_repository = SqliteUserRepository(uow)
    new_user = User(id=2, name='李四', email='[email protected]')
    user_repository.add_user(new_user)
    # 假设这里发生了错误
    # raise Exception("模拟错误")  # 如果取消注释,则会回滚事务
    user = user_repository.get_user(2)
    print(user) # 如果没有发生错误,则会打印用户信息

在这个例子中,UnitOfWork 负责管理数据库连接和事务。SqliteUserRepository 使用 UnitOfWork 提供的 cursor 执行数据库操作。 with UnitOfWork(...) as uow: 语句确保事务在完成时提交,在发生错误时回滚。

结合ORM(以SQLAlchemy为例)

首先安装SQLAlchemy

pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.ext.declarative import declarative_base
from typing import List, Optional

# 定义数据模型
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

    def __repr__(self):
        return f"User(id={self.id}, name='{self.name}', email='{self.email}')"

# Repository 接口
from abc import ABC, abstractmethod

class UserRepository(ABC):
    @abstractmethod
    def get_user(self, user_id: int) -> Optional[User]:
        pass

    @abstractmethod
    def get_all_users(self) -> List[User]:
        pass

    @abstractmethod
    def add_user(self, user: User) -> None:
        pass

    @abstractmethod
    def update_user(self, user: User) -> None:
        pass

    @abstractmethod
    def delete_user(self, user_id: int) -> None:
        pass

# 具体 Repository 实现
class SQLAlchemyUserRepository(UserRepository):
    def __init__(self, session):
        self.session = session

    def get_user(self, user_id: int) -> Optional[User]:
        return self.session.query(User).filter(User.id == user_id).first()

    def get_all_users(self) -> List[User]:
        return self.session.query(User).all()

    def add_user(self, user: User) -> None:
        self.session.add(user)
        self.session.commit()

    def update_user(self, user: User) -> None:
        self.session.commit() # SQLAlchemy会自动检测变化并更新

    def delete_user(self, user_id: int) -> None:
        user = self.get_user(user_id)
        if user:
            self.session.delete(user)
            self.session.commit()

# 使用
engine = create_engine('sqlite:///my_database.db')  # 使用SQLite
Base.metadata.create_all(engine)  # 创建表
Session = sessionmaker(bind=engine)
session = Session()

user_repository = SQLAlchemyUserRepository(session)

# 添加用户
new_user = User(name='王五', email='[email protected]')
user_repository.add_user(new_user)

# 获取用户
user = user_repository.get_user(1)
print(user)

# 更新用户
if user:
    user.email = '[email protected]'
    user_repository.update_user(user)

# 获取所有用户
all_users = user_repository.get_all_users()
print(all_users)

# 删除用户
user_repository.delete_user(1)

session.close()

总结

Repository模式是一种非常有用的设计模式,可以帮助你更好地组织和管理数据访问代码。 它提高了代码的可测试性、可维护性和可复用性。 结合依赖注入、Unit of Work 和 ORM 框架,可以进一步提升代码质量。

当然,Repository模式也不是银弹。 在一些简单的项目中,过度使用Repository模式可能会增加代码的复杂性。 需要根据实际情况权衡利弊。

最后,希望大家能够掌握Repository模式,写出更优雅、更健壮的Python代码! 下课!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注