如何使用`Psycopg2`或`PyMySQL`进行`数据库`的`连接`和`操作`。

好的,下面我将以讲座的模式,详细讲解如何使用 Psycopg2PyMySQL 进行数据库的连接和操作。

数据库连接与操作:Psycopg2 和 PyMySQL 实战

大家好,今天我们来深入探讨 Python 中连接和操作数据库的两个常用库:Psycopg2PyMySQLPsycopg2 用于连接 PostgreSQL 数据库,而 PyMySQL 则用于连接 MySQL 数据库。我们将详细讲解如何安装、连接、执行 SQL 语句、处理结果以及进行一些高级操作。

1. 环境准备与安装

首先,我们需要确保已经安装了 Python 和相应的数据库(PostgreSQL 或 MySQL)。然后,通过 pip 安装所需的库:

# 安装 psycopg2
pip install psycopg2-binary

# 安装 PyMySQL
pip install pymysql
  • psycopg2-binarypsycopg2 的一个简化版本,它包含了预编译的二进制文件,安装更加方便。在生产环境中,建议使用完整的 psycopg2 包,它提供了更多的功能和更好的性能。

2. Psycopg2:连接 PostgreSQL

2.1 建立连接

连接 PostgreSQL 数据库是使用 Psycopg2 的第一步。我们需要提供数据库的主机名、数据库名、用户名和密码。

import psycopg2

# 数据库连接信息
db_host = "localhost"
db_name = "mydatabase"
db_user = "myuser"
db_password = "mypassword"
db_port = "5432" # postgresql 默认端口是5432

try:
    # 建立连接
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port
    )
    print("成功连接到 PostgreSQL 数据库!")

except psycopg2.Error as e:
    print(f"连接 PostgreSQL 数据库失败:{e}")
    exit()

# 关闭连接 (稍后会演示关闭,这里先建立连接)

2.2 创建游标

游标 (Cursor) 用于执行 SQL 语句和获取结果。

try:
    # 创建游标对象
    cur = conn.cursor()
    print("成功创建游标对象!")

except psycopg2.Error as e:
    print(f"创建游标失败:{e}")
    conn.close()  # 连接创建成功,但是游标创建失败,需要关闭连接
    exit()

2.3 执行 SQL 语句

我们可以使用游标执行各种 SQL 语句,包括 SELECTINSERTUPDATEDELETE

try:
    # 执行 SELECT 语句
    cur.execute("SELECT version();")  # 获取 PostgreSQL 版本信息

    # 获取查询结果
    version = cur.fetchone()[0]
    print(f"PostgreSQL 版本:{version}")

except psycopg2.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback() # 如果执行失败,回滚事务。
    cur.close()
    conn.close()
    exit()

2.4 获取查询结果

Psycopg2 提供了多种方法来获取查询结果:

  • fetchone(): 获取下一行结果。
  • fetchall(): 获取所有结果。
  • fetchmany(size): 获取指定数量的结果。
try:
    # 创建表
    cur.execute("""
        CREATE TABLE IF NOT EXISTS employees (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            salary DECIMAL(10, 2)
        )
    """)

    # 插入数据
    cur.execute("""
        INSERT INTO employees (name, salary) VALUES
        ('Alice', 50000.00),
        ('Bob', 60000.00),
        ('Charlie', 70000.00)
    """)

    # 查询数据
    cur.execute("SELECT * FROM employees")

    # 获取所有结果
    employees = cur.fetchall()

    # 打印结果
    for employee in employees:
        print(employee)

except psycopg2.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()

2.5 参数化查询

为了防止 SQL 注入攻击,应该使用参数化查询。

try:
    # 参数化查询
    name = "David"
    salary = 80000.00

    cur.execute("INSERT INTO employees (name, salary) VALUES (%s, %s)", (name, salary))

    # 查询特定员工
    cur.execute("SELECT * FROM employees WHERE name = %s", (name,))
    david = cur.fetchone()
    print(f"David 的信息:{david}")
except psycopg2.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()

2.6 事务处理

Psycopg2 默认开启事务。这意味着,除非显式提交,否则所有的更改都会被回滚。

try:
    # 提交事务
    conn.commit()
    print("事务提交成功!")

except psycopg2.Error as e:
    print(f"事务提交失败:{e}")
    conn.rollback() # 回滚事务,确保数据一致性
    cur.close()
    conn.close()
    exit()
finally:
    # 关闭游标和连接
    if cur:
        cur.close()
        print("游标已关闭")
    if conn:
        conn.close()
        print("数据库连接已关闭")

3. PyMySQL:连接 MySQL

3.1 建立连接

连接 MySQL 数据库与连接 PostgreSQL 类似,需要提供主机名、数据库名、用户名和密码。

import pymysql

# 数据库连接信息
db_host = "localhost"
db_name = "mydatabase"
db_user = "myuser"
db_password = "mypassword"
db_port = 3306 # MySQL 默认端口是3306

try:
    # 建立连接
    conn = pymysql.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port,
        charset='utf8mb4', # 设置字符集
        cursorclass=pymysql.cursors.DictCursor # 返回字典类型结果
    )
    print("成功连接到 MySQL 数据库!")

except pymysql.Error as e:
    print(f"连接 MySQL 数据库失败:{e}")
    exit()

# 关闭连接 (稍后会演示关闭,这里先建立连接)
  • charset='utf8mb4':设置字符集为 utf8mb4,支持存储 Unicode 字符。
  • cursorclass=pymysql.cursors.DictCursor:设置游标类型为 DictCursor,这样获取的结果将以字典的形式返回,方便操作。

3.2 创建游标

try:
    # 创建游标对象
    cur = conn.cursor()
    print("成功创建游标对象!")

except pymysql.Error as e:
    print(f"创建游标失败:{e}")
    conn.close()
    exit()

3.3 执行 SQL 语句

try:
    # 执行 SELECT 语句
    cur.execute("SELECT version()")

    # 获取查询结果
    version = cur.fetchone()[0]
    print(f"MySQL 版本:{version}")

except pymysql.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()

3.4 获取查询结果

Psycopg2 类似,PyMySQL 也提供了 fetchone()fetchall()fetchmany(size) 方法来获取查询结果。

try:
    # 创建表
    cur.execute("""
        CREATE TABLE IF NOT EXISTS employees (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            salary DECIMAL(10, 2)
        )
    """)

    # 插入数据
    cur.execute("""
        INSERT INTO employees (name, salary) VALUES
        ('Alice', 50000.00),
        ('Bob', 60000.00),
        ('Charlie', 70000.00)
    """)

    # 查询数据
    cur.execute("SELECT * FROM employees")

    # 获取所有结果
    employees = cur.fetchall()

    # 打印结果
    for employee in employees:
        print(employee)

except pymysql.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()

3.5 参数化查询

try:
    # 参数化查询
    name = "David"
    salary = 80000.00

    cur.execute("INSERT INTO employees (name, salary) VALUES (%s, %s)", (name, salary))

    # 查询特定员工
    cur.execute("SELECT * FROM employees WHERE name = %s", (name,))
    david = cur.fetchone()
    print(f"David 的信息:{david}")

except pymysql.Error as e:
    print(f"执行 SQL 语句失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()

3.6 事务处理

Psycopg2 不同,PyMySQL 默认不开启事务。需要显式地开启事务。

try:
    # 开启事务
    conn.begin()

    # 执行多个 SQL 语句
    cur.execute("UPDATE employees SET salary = salary * 1.1 WHERE name = 'Alice'")
    cur.execute("UPDATE employees SET salary = salary * 1.2 WHERE name = 'Bob'")

    # 提交事务
    conn.commit()
    print("事务提交成功!")

except pymysql.Error as e:
    print(f"事务提交失败:{e}")
    conn.rollback()
    cur.close()
    conn.close()
    exit()
finally:
    # 关闭游标和连接
    if cur:
        cur.close()
        print("游标已关闭")
    if conn:
        conn.close()
        print("数据库连接已关闭")

4. 错误处理

在数据库操作中,错误处理至关重要。我们需要捕获可能发生的异常,并进行相应的处理,例如回滚事务、关闭连接等。

try:
    # 数据库操作
    pass # 数据库操作代码
except (psycopg2.Error, pymysql.Error) as e:
    print(f"数据库操作出错:{e}")
    # 回滚事务
    if conn:
        conn.rollback()
    # 关闭游标和连接
    if cur:
        cur.close()
    if conn:
        conn.close()
    exit()

5. 代码示例:完整的 PostgreSQL 操作

import psycopg2

# 数据库连接信息
db_host = "localhost"
db_name = "mydatabase"
db_user = "myuser"
db_password = "mypassword"
db_port = "5432"

conn = None
cur = None

try:
    # 建立连接
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port
    )
    print("成功连接到 PostgreSQL 数据库!")

    # 创建游标
    cur = conn.cursor()
    print("成功创建游标对象!")

    # 创建表
    cur.execute("""
        CREATE TABLE IF NOT EXISTS employees (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            salary DECIMAL(10, 2)
        )
    """)

    # 插入数据
    cur.execute("""
        INSERT INTO employees (name, salary) VALUES
        ('Alice', 50000.00),
        ('Bob', 60000.00),
        ('Charlie', 70000.00)
    """)

    # 参数化查询
    name = "David"
    salary = 80000.00
    cur.execute("INSERT INTO employees (name, salary) VALUES (%s, %s)", (name, salary))

    # 查询数据
    cur.execute("SELECT * FROM employees")
    employees = cur.fetchall()
    print("所有员工信息:")
    for employee in employees:
        print(employee)

    # 提交事务
    conn.commit()
    print("事务提交成功!")

except psycopg2.Error as e:
    print(f"数据库操作出错:{e}")
    if conn:
        conn.rollback()  # 回滚事务

finally:
    # 关闭游标和连接
    if cur:
        cur.close()
        print("游标已关闭")
    if conn:
        conn.close()
        print("数据库连接已关闭")

6. 代码示例:完整的 MySQL 操作

import pymysql

# 数据库连接信息
db_host = "localhost"
db_name = "mydatabase"
db_user = "myuser"
db_password = "mypassword"
db_port = 3306

conn = None
cur = None

try:
    # 建立连接
    conn = pymysql.connect(
        host=db_host,
        database=db_name,
        user=db_user,
        password=db_password,
        port=db_port,
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    print("成功连接到 MySQL 数据库!")

    # 创建游标
    cur = conn.cursor()
    print("成功创建游标对象!")

    # 创建表
    cur.execute("""
        CREATE TABLE IF NOT EXISTS employees (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            salary DECIMAL(10, 2)
        )
    """)

    # 插入数据
    cur.execute("""
        INSERT INTO employees (name, salary) VALUES
        ('Alice', 50000.00),
        ('Bob', 60000.00),
        ('Charlie', 70000.00)
    """)

    # 参数化查询
    name = "David"
    salary = 80000.00
    cur.execute("INSERT INTO employees (name, salary) VALUES (%s, %s)", (name, salary))

    # 查询数据
    cur.execute("SELECT * FROM employees")
    employees = cur.fetchall()
    print("所有员工信息:")
    for employee in employees:
        print(employee)

    # 提交事务
    conn.commit()
    print("事务提交成功!")

except pymysql.Error as e:
    print(f"数据库操作出错:{e}")
    if conn:
        conn.rollback()  # 回滚事务

finally:
    # 关闭游标和连接
    if cur:
        cur.close()
        print("游标已关闭")
    if conn:
        conn.close()
        print("数据库连接已关闭")

7. 两种数据库操作的对比

特性 Psycopg2 (PostgreSQL) PyMySQL (MySQL)
默认事务 开启 关闭
参数化查询方式 %s %s
字符集设置 连接时设置 连接时设置
游标类型 默认元组,可自定义 默认元组,可自定义为字典
是否线程安全

8. 高级操作

  • 连接池: 在高并发场景下,使用连接池可以提高性能。Psycopg2PyMySQL 都有连接池的实现。
  • 异步操作: 使用异步库(如 asyncpg)可以进行非阻塞的数据库操作,提高程序的响应速度。
  • ORM 框架: 使用 ORM 框架(如 SQLAlchemy)可以简化数据库操作,提高开发效率。

内容要点回顾

我们了解了如何使用 Psycopg2PyMySQL 连接和操作 PostgreSQL 和 MySQL 数据库,包括安装、连接、执行 SQL 语句、获取结果、参数化查询和事务处理。掌握这些基础知识,可以进行更加复杂的数据库应用开发。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注