各位靓仔靓女,晚上好!
今天咱们来聊聊Python里的Repository模式,这玩意儿听起来高大上,其实就是把数据访问这块儿给好好收拾收拾,让代码更干净、更好维护。可以理解为,你不想直接跟数据库打交道,你只想跟一个“仓库管理员”说:“给我拿个用户数据!”
为什么要搞这个Repository模式?
设想一下,你写了一个电商网站,用户管理模块里,你需要从数据库里读取用户数据,更新用户数据。代码可能长这样:
import sqlite3
def get_user(user_id):
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
conn.close()
return user
def update_user_email(user_id, new_email):
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute("UPDATE users SET email = ? WHERE id = ?", (new_email, user_id))
conn.commit()
conn.close()
# 使用
user = get_user(1)
print(user)
update_user_email(1, "[email protected]")
这段代码能跑,没毛病。但是,如果你的项目越来越大,到处都是这种直接操作数据库的代码,那你就惨了。
- 代码重复: 每次都要写连接数据库、执行SQL、关闭连接的代码。
- 难以测试: 没法轻易地mock数据库,测试起来很麻烦。
- 耦合度高: 业务逻辑和数据访问逻辑混在一起,改个数据库类型,整个模块都要改。
- 违反单一职责原则: 函数既负责业务逻辑,又负责数据访问,累不累啊?
Repository模式就是来解决这些问题的,它把数据访问逻辑抽象出来,让你的业务代码只跟Repository打交道,而不用关心Repository背后是怎么操作数据库的。
Repository模式长啥样?
Repository模式的核心思想是创建一个抽象层,这个抽象层充当你的应用代码和数据访问层之间的中介。 通常,Repository会定义一个接口,这个接口声明了你的应用需要的各种数据访问方法(比如 get_user()
、update_user()
)。然后,你可以创建一个或多个实现这个接口的具体Repository类,每个类对应一种特定的数据存储方式(比如 SQLite、PostgreSQL、MySQL、甚至内存)。
基本结构:
- Repository 接口: 定义操作数据的抽象方法。
- 具体 Repository 实现: 实现接口,封装具体的数据库操作。
- 数据模型: 定义数据的结构。
上代码!
咱们来用Python实现一个简单的Repository模式。
1. 数据模型 (User)
class User:
def __init__(self, id, name, email):
self.id = id
self.name = name
self.email = email
def __repr__(self):
return f"User(id={self.id}, name='{self.name}', email='{self.email}')"
2. Repository 接口 (UserRepository)
from abc import ABC, abstractmethod
from typing import List, Optional
class UserRepository(ABC):
@abstractmethod
def get_user(self, user_id: int) -> Optional[User]:
pass
@abstractmethod
def get_all_users(self) -> List[User]:
pass
@abstractmethod
def add_user(self, user: User) -> None:
pass
@abstractmethod
def update_user(self, user: User) -> None:
pass
@abstractmethod
def delete_user(self, user_id: int) -> None:
pass
3. 具体 Repository 实现 (SqliteUserRepository)
import sqlite3
class SqliteUserRepository(UserRepository):
def __init__(self, db_path: str):
self.db_path = db_path
self._create_table() # 确保表存在
def _create_table(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL
)
""")
conn.commit()
conn.close()
def get_user(self, user_id: int) -> Optional[User]:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
conn.close()
if row:
return User(id=row[0], name=row[1], email=row[2])
return None
def get_all_users(self) -> List[User]:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
conn.close()
return [User(id=row[0], name=row[1], email=row[2]) for row in rows]
def add_user(self, user: User) -> None:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
(user.id, user.name, user.email))
conn.commit()
conn.close()
def update_user(self, user: User) -> None:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("UPDATE users SET name = ?, email = ? WHERE id = ?",
(user.name, user.email, user.id))
conn.commit()
conn.close()
def delete_user(self, user_id: int) -> None:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
conn.commit()
conn.close()
4. 使用 Repository
# 初始化 Repository
user_repository = SqliteUserRepository('my_database.db')
# 添加用户
new_user = User(id=1, name='张三', email='[email protected]')
user_repository.add_user(new_user)
# 获取用户
user = user_repository.get_user(1)
print(user)
# 更新用户
if user:
user.email = '[email protected]'
user_repository.update_user(user)
# 获取所有用户
all_users = user_repository.get_all_users()
print(all_users)
# 删除用户
user_repository.delete_user(1)
Repository模式的优势
- 解耦: 业务逻辑和数据访问逻辑分离,降低耦合度。
- 可测试性: 可以轻松地mock Repository,进行单元测试。
- 可维护性: 修改数据访问方式,不会影响业务代码。
- 代码复用: Repository可以在多个地方复用。
- 更好的抽象: 将数据访问的细节隐藏起来,提供更清晰的接口给业务逻辑使用。
进阶话题
- 依赖注入 (Dependency Injection): 使用依赖注入框架(例如
injector
或dependency_injector
)来管理 Repository 实例,进一步降低耦合度。 - Unit of Work模式: 结合 Unit of Work 模式,实现事务管理,保证数据一致性。
- 泛型 Repository: 使用泛型来创建通用的 Repository,减少重复代码。
- 使用ORM (Object-Relational Mapper): 结合 SQLAlchemy 或 Django ORM 等 ORM 框架,简化数据库操作。
依赖注入示例 (使用 injector
库)
首先安装 injector
:
pip install injector
然后,修改代码:
import injector
# 定义一个配置类
class AppModule(injector.Module):
def configure(self, binder):
binder.bind(UserRepository, SqliteUserRepository(db_path='my_database.db'))
# 定义一个需要 UserRepository 的类
class UserService:
@injector.inject
def __init__(self, user_repository: UserRepository):
self.user_repository = user_repository
def get_user_email(self, user_id: int) -> Optional[str]:
user = self.user_repository.get_user(user_id)
if user:
return user.email
return None
# 创建一个 Injector 实例
injector_ = injector.Injector([AppModule()])
# 获取 UserService 实例,Injector 会自动注入 UserRepository
user_service = injector_.get(UserService)
# 使用 UserService
email = user_service.get_user_email(1)
print(email)
在这个例子中,AppModule
告诉 injector
如何创建 UserRepository
的实例。UserService
通过 @injector.inject
装饰器声明它需要一个 UserRepository
的实例。injector
会自动创建 SqliteUserRepository
的实例并注入到 UserService
中。
Unit of Work 示例
import sqlite3
class UnitOfWork:
def __init__(self, db_path):
self.db_path = db_path
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self.conn.rollback()
else:
self.conn.commit()
self.conn.close()
class SqliteUserRepository(UserRepository):
def __init__(self, unit_of_work: UnitOfWork):
self.unit_of_work = unit_of_work
self._create_table() # 确保表存在
def _create_table(self):
cursor = self.unit_of_work.cursor
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL
)
""")
def get_user(self, user_id: int) -> Optional[User]:
cursor = self.unit_of_work.cursor
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
if row:
return User(id=row[0], name=row[1], email=row[2])
return None
def get_all_users(self) -> List[User]:
cursor = self.unit_of_work.cursor
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
return [User(id=row[0], name=row[1], email=row[2]) for row in rows]
def add_user(self, user: User) -> None:
cursor = self.unit_of_work.cursor
cursor.execute("INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
(user.id, user.name, user.email))
def update_user(self, user: User) -> None:
cursor = self.unit_of_work.cursor
cursor.execute("UPDATE users SET name = ?, email = ? WHERE id = ?",
(user.name, user.email, user.id))
def delete_user(self, user_id: int) -> None:
cursor = self.unit_of_work.cursor
cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
# 使用 Unit of Work
with UnitOfWork('my_database.db') as uow:
user_repository = SqliteUserRepository(uow)
new_user = User(id=2, name='李四', email='[email protected]')
user_repository.add_user(new_user)
# 假设这里发生了错误
# raise Exception("模拟错误") # 如果取消注释,则会回滚事务
user = user_repository.get_user(2)
print(user) # 如果没有发生错误,则会打印用户信息
在这个例子中,UnitOfWork
负责管理数据库连接和事务。SqliteUserRepository
使用 UnitOfWork
提供的 cursor
执行数据库操作。 with UnitOfWork(...) as uow:
语句确保事务在完成时提交,在发生错误时回滚。
结合ORM(以SQLAlchemy为例)
首先安装SQLAlchemy
pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.ext.declarative import declarative_base
from typing import List, Optional
# 定义数据模型
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String)
def __repr__(self):
return f"User(id={self.id}, name='{self.name}', email='{self.email}')"
# Repository 接口
from abc import ABC, abstractmethod
class UserRepository(ABC):
@abstractmethod
def get_user(self, user_id: int) -> Optional[User]:
pass
@abstractmethod
def get_all_users(self) -> List[User]:
pass
@abstractmethod
def add_user(self, user: User) -> None:
pass
@abstractmethod
def update_user(self, user: User) -> None:
pass
@abstractmethod
def delete_user(self, user_id: int) -> None:
pass
# 具体 Repository 实现
class SQLAlchemyUserRepository(UserRepository):
def __init__(self, session):
self.session = session
def get_user(self, user_id: int) -> Optional[User]:
return self.session.query(User).filter(User.id == user_id).first()
def get_all_users(self) -> List[User]:
return self.session.query(User).all()
def add_user(self, user: User) -> None:
self.session.add(user)
self.session.commit()
def update_user(self, user: User) -> None:
self.session.commit() # SQLAlchemy会自动检测变化并更新
def delete_user(self, user_id: int) -> None:
user = self.get_user(user_id)
if user:
self.session.delete(user)
self.session.commit()
# 使用
engine = create_engine('sqlite:///my_database.db') # 使用SQLite
Base.metadata.create_all(engine) # 创建表
Session = sessionmaker(bind=engine)
session = Session()
user_repository = SQLAlchemyUserRepository(session)
# 添加用户
new_user = User(name='王五', email='[email protected]')
user_repository.add_user(new_user)
# 获取用户
user = user_repository.get_user(1)
print(user)
# 更新用户
if user:
user.email = '[email protected]'
user_repository.update_user(user)
# 获取所有用户
all_users = user_repository.get_all_users()
print(all_users)
# 删除用户
user_repository.delete_user(1)
session.close()
总结
Repository模式是一种非常有用的设计模式,可以帮助你更好地组织和管理数据访问代码。 它提高了代码的可测试性、可维护性和可复用性。 结合依赖注入、Unit of Work 和 ORM 框架,可以进一步提升代码质量。
当然,Repository模式也不是银弹。 在一些简单的项目中,过度使用Repository模式可能会增加代码的复杂性。 需要根据实际情况权衡利弊。
最后,希望大家能够掌握Repository模式,写出更优雅、更健壮的Python代码! 下课!