解析 ‘Agentic Load Testing’:如何利用‘机器人用户’模拟百万级并发交互以压测系统的逻辑稳定性

各位同仁,各位技术专家,晚上好。

今天,我们聚焦一个在现代复杂系统测试中日益关键的话题:’Agentic Load Testing’,即“机器人用户”压测。这不仅仅是关于每秒处理多少请求的性能数字,更是深入探讨系统在百万级并发交互下,其核心业务逻辑是否依然坚如磐石,数据是否依然保持完整与一致。我们将深入剖析如何构建和利用这些智能“机器人用户”来模拟真实世界的复杂行为,从而揭示系统深层次的逻辑稳定性问题。

传统压测的局限性与“机器人用户”的崛起

在过去的几十年里,负载测试(Load Testing)一直是软件质量保障的关键环节。它通过模拟大量用户请求,评估系统在预期负载下的性能表现,如响应时间、吞吐量和资源利用率。然而,随着分布式系统、微服务架构以及用户行为日益复杂化,传统负载测试方法逐渐暴露出其局限性:

  1. 无状态或弱状态模拟: 许多传统工具侧重于发送大量独立的HTTP请求,这些请求往往缺乏上下文,不模拟用户的真实会话(session)和状态(state)。例如,一个用户会先登录,然后浏览商品,添加购物车,最后结账。这些步骤是串联且依赖前一步骤的状态的。
  2. 单一路径测试: 传统测试倾向于执行预设的、线性的业务流程。然而,真实用户在系统中并非总是按部就班,他们可能会在不同页面间跳转,重复某些操作,甚至触发异常流程。
  3. 忽略逻辑稳定性: 关注点主要集中在性能指标上,而较少关注在高并发下业务逻辑是否正确执行。例如,库存是否在高并发下单时出现超卖?优惠券是否被重复核销?用户账户余额是否因并发操作出现不一致?这些都是逻辑稳定性问题,而非单纯的性能瓶颈。
  4. 难以发现死锁与竞态条件: 复杂的并发交互可能导致数据库死锁、数据不一致、业务流程中断等难以复现的竞态条件(Race Conditions)。传统工具的脚本通常难以有效探测此类问题。

正是在这样的背景下,“Agentic Load Testing”应运而生。它引入了“机器人用户”(Robot Users),这些机器人不仅仅是请求生成器,更是具有以下核心特征的智能代理(Agents):

  • 状态感知与维持: 每个机器人用户都维护自己的独立状态,如登录状态、购物车内容、当前浏览的页面、已完成的交易等。
  • 决策能力: 机器人用户可以根据系统响应、预设的概率分布或甚至简单的决策逻辑,选择下一步的行动。例如,如果商品缺货,它可能会选择浏览其他商品,而不是继续添加到购物车。
  • 模拟真实用户行为: 它们能模拟复杂的、非线性的用户旅程,包含思考时间、随机操作、异常路径等。
  • 业务逻辑验证: 其核心目标之一是验证在高并发下,系统的业务逻辑是否依然正确、数据是否依然一致。

通过赋予这些“机器人用户”更高级别的智能和自治性,我们能够更深入、更全面地模拟真实世界的使用场景,从而有效地发现传统压测难以触及的逻辑稳定性问题。

“机器人用户”的核心概念与构建要素

构建百万级并发的“机器人用户”系统,需要理解其核心组成部分。

1. 机器人用户(Agent)的定义与行为模式

一个“机器人用户”是一个模拟真实用户行为的软件实体。它不是简单的HTTP客户端,而是一个具备以下特征的独立执行单元:

  • 身份(Identity): 每个机器人用户都应有唯一的身份标识,如用户ID、会话ID,甚至完整的用户档案(用户名、密码、地址、支付信息等)。
  • 状态(State): 维护当前会话的状态。这可能包括:
    • is_logged_in (布尔值)
    • cart_items (列表,包含商品ID和数量)
    • current_page (字符串,表示用户当前所在页面)
    • order_id (字符串,如果已下单)
    • session_token (用于认证)
  • 行为序列(Behavior Sequence/Workflow): 定义了机器人用户可能执行的动作序列。这通常是一个状态机或一个流程图,描述了从登录到注销的各种可能路径。
  • 决策逻辑(Decision Logic): 根据当前状态、系统响应或随机因素来决定下一步动作。
    • 例如:if inventory_low: browse_other_products() else: add_to_cart()
    • random_choice([view_product_details, add_to_wishlist, search_product])
  • 数据驱动(Data-Driven): 行为可以由外部数据驱动,例如从预设的商品列表中选择商品,或者使用从数据库中读取的特定用户数据。
  • 错误处理(Error Handling): 当系统返回错误时,机器人应能识别并记录,甚至尝试不同的操作或重试。

2. 逻辑稳定性:不仅仅是性能

逻辑稳定性是Agentic Load Testing的核心关注点。它涵盖了:

  • 数据一致性: 在高并发读写下,数据库中的数据是否保持一致。例如,银行转账的原子性,商品库存的准确性。
  • 业务规则正确性: 优惠券只能使用一次、订单状态流转正确、权限控制在高并发下不失效等。
  • 交易完整性: 复杂交易(如支付)的每一个步骤在高并发下都能正确完成,不会出现部分成功、部分失败的中间状态。
  • 竞态条件识别: 发现和验证系统在并发操作下是否会遇到死锁、数据覆盖、更新丢失等问题。
  • 错误传播与隔离: 一个子系统的错误是否在高并发下被放大并导致整个系统崩溃,还是能被有效隔离和处理。

这些问题往往难以通过单元测试或集成测试完全暴露,因为它们需要大规模的并发交互才能显现。

3. 压测架构的扩展性挑战

要模拟百万级并发的机器人用户,需要一个高度可伸缩的压测架构:

  • 分布式负载生成器: 单台机器无法承载如此多的并发连接和计算资源。需要将负载生成器分布在多台机器甚至多个数据中心。
  • 数据管理: 为每个机器人用户提供唯一的、真实的测试数据(用户账户、商品、订单等)。这需要高效的数据生成、分发和管理策略。
  • 结果聚合与分析: 在如此大规模的测试中,收集、存储和分析性能指标、逻辑错误日志等数据,并从中提取有价值的洞察,是巨大的挑战。

压测架构与核心组件

为了实现Agentic Load Testing,一个健壮且可伸缩的架构至关重要。

核心组件概述

组件名称 职责 关键技术
Agent Orchestrator 任务调度、管理机器人用户的生命周期、分发测试场景、协调负载生成器。 Kubernetes (调度器), Message Queues (Kafka/RabbitMQ), 自定义调度逻辑
Agent Executors / Load Generators 运行机器人用户实例,执行定义的行为模式,与目标系统进行交互,收集原始性能和逻辑数据。 k6, Gatling, Locust, 自定义Python/Go程序, Docker, Kubernetes Pods
Test Scenarios / Workflows 定义机器人用户的行为逻辑、状态机、决策规则和数据模板。 DSL (Gatling), Python脚本, JavaScript (k6), YAML/JSON配置
Data Providers 生成、存储和分发唯一的测试数据(用户、商品、支付信息等)给机器人用户。 Faker库, 关系型/NoSQL数据库, CSV文件, Redis (缓存)
Metrics Collectors 收集 Agent Executors 上报的性能指标(响应时间、吞吐量、错误率)和逻辑错误(业务逻辑错误)。 Prometheus Exporters, InfluxDB, ELK Stack, OpenTelemetry
Monitoring & Dashboard 可视化实时和历史压测数据,提供系统健康状态、性能趋势和逻辑错误报告。 Grafana, Kibana
Reporting & Analysis 生成详细的压测报告,分析瓶颈、逻辑缺陷和数据一致性问题,提供优化建议。 自定义报告工具, Jupyter Notebooks (数据分析)

架构示意图

在高并发场景下,我们通常会采用云原生技术栈来构建这个架构:

+---------------------+      +---------------------------------+      +---------------------+
|                     |      |                                 |      |                     |
|  Test Scenario      |      |  Data Providers                 |      |  Target System      |
|  (User Journeys,    |      |  (User Data, Product Data, ...) |      |  (Backend APIs,     |
|  Decision Logic)    +----->+---------------------------------+      |  Databases,         |
|                     |             ^                                 |  Microservices)     |
+---------------------+             |                                 +---------------------+
           |                        |                                             ^
           v                        |                                             |
+---------------------+      +---------------------+      +---------------------+
|                     |      |                     |      |                     |
|  Agent Orchestrator |      |  Message Queue      |      |  Metrics Collector  |
|  (Kubernetes,       |      |  (Kafka/RabbitMQ)   |      |  (Prometheus,      |
|  Custom Scheduler)  +----->+---------------------+<-----+  OpenTelemetry)   |
|                     |             ^                        |                     |
+---------------------+             |                        +---------------------+
           |                        |                                     ^
           |  (Spawns/Manages)      | (Logs/Metrics)                      | (Query/Visualize)
           v                        |                                     |
+---------------------+             |                                     |
|                     |             |                                     |
|  Agent Executors    |             |                                     |
|  (k6/Gatling Pods,   +-------------+-------------------------------------+
|  Python Async Apps) |
|                     |
+---------------------+ (Multiple instances running on Kubernetes/VMs)
  1. Agent Orchestrator (调度器): 通常部署在Kubernetes集群上。它负责根据预设的测试计划(例如,在未来一小时内启动一百万个机器人用户,每个用户执行X个循环),动态地创建和销毁Agent Executors。它还负责将测试场景和所需数据分发给各个执行器。
  2. Message Queue (消息队列): 作为中心枢纽,用于Agent OrchestratorAgent Executors之间的通信,以及Agent Executors上报的原始日志和指标数据。这确保了高吞吐量和解耦。
  3. Agent Executors (执行器): 这是真正运行机器人用户逻辑的地方。每个执行器可以是一个Docker容器(或Kubernetes Pod),内部运行着一个或多个机器人用户实例。它们根据接收到的场景和数据,向目标系统发送请求,模拟用户交互,并记录操作结果、响应时间以及任何业务逻辑错误。
  4. Data Providers (数据提供者): 在测试开始前或动态地为机器人用户生成和提供大量唯一的测试数据。这可能是一个独立的微服务,或者是一个预先填充好的数据库。
  5. Metrics Collectors (指标收集器) & Monitoring (监控): Agent Executors会将性能指标(如请求延迟、错误率)和逻辑错误(如“订单创建失败:库存不足”)上报到中心化的指标收集系统(如Prometheus)。同时,目标系统本身的资源利用率、服务日志也应被收集。Grafana等工具用于实时可视化这些数据。
  6. Target System (目标系统): 这是我们正在压测的实际应用、API或微服务集群。

设计与实现“机器人用户”的行为流

构建一个有效的机器人用户,其行为流的设计至关重要。

1. 用户旅程映射与状态机

首先,需要详细绘制出目标系统的关键用户旅程(User Journey)。例如,一个电商应用的典型旅程可能包括:

  • 未登录状态: 浏览首页 -> 搜索商品 -> 查看商品详情 -> 注册/登录
  • 登录状态: 浏览首页 -> 搜索商品 -> 查看商品详情 -> 添加到购物车 -> 查看购物车 -> 更新购物车 -> 下单 -> 支付 -> 查看订单
  • 其他路径: 更新个人资料、查看历史订单、联系客服、使用优惠券等。

这些旅程可以抽象为一个状态机。每个状态代表用户在系统中的一个位置或操作阶段,而状态之间的转换则是通过特定的动作触发。

                          +-----------------+
                          |    Start/Init   |
                          +-----------------+
                                  |
                                  v
                          +-----------------+
                          |     Browse      |  (Homepage, Category, Search)
                          +-----------------+
                                  |
                                  v
                          +-----------------+
                          |  View Product   |
                          +-----------------+
                                  |
                                  v
                  +--------------------------------+
                  |  Decision: Login or Add to Cart? |
                  +--------------------------------+
                  /                                  
                 /                                    
                v                                      v
        +-----------------+                      +-----------------+
        |      Login      |                      |  Add to Cart    |
        +-----------------+                      +-----------------+
                |                                          |
                v                                          v
        +-----------------+                      +-----------------+
        |  Authenticated  |                      |  View Cart      |
        |     Browse      |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |  Checkout       |
        |  Add to Cart    |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |   Place Order   |
        |   View Cart     |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |     Payment     |
        |     Checkout    |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |    Order Conf.  |
        |   Place Order   |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |      Logout     |
        |     Payment     |                      +-----------------+
        +-----------------+                              |
                |                                          v
                v                                  +-----------------+
        +-----------------+                      |       End       |
        |    Order Conf.  |                      +-----------------+
        +-----------------+
                |
                v
        +-----------------+
        |      Logout     |
        +-----------------+
                |
                v
        +-----------------+
        |       End       |
        +-----------------+

2. 实现机器人用户(Python 示例)

我们将使用Python来演示一个简单的机器人用户。Python的requests库(或httpx用于异步)非常适合模拟HTTP请求,而asyncio可以用于实现并发。

首先,定义一个Agent类,它将封装用户的状态和行为。

import httpx
import asyncio
import random
import time
from faker import Faker # 用于生成随机数据

# 初始化Faker,用于生成模拟的用户数据
fake = Faker('zh_CN')

class Product:
    """模拟商品信息"""
    def __init__(self, product_id, name, price, stock):
        self.product_id = product_id
        self.name = name
        self.price = price
        self.stock = stock

    def __repr__(self):
        return f"Product({self.name}, ID:{self.product_id}, Price:{self.price}, Stock:{self.stock})"

# 模拟一个简单的商品数据库(在实际中,这会从API获取或从数据提供者加载)
MOCK_PRODUCTS = [
    Product("P001", "智能手机X", 4999.00, 100),
    Product("P002", "无线耳机Pro", 1299.00, 50),
    Product("P003", "智能手表", 1999.00, 200),
    Product("P004", "笔记本电脑Ultra", 9999.00, 30),
    Product("P005", "智能家居套装", 899.00, 150),
]

class ShoppingAgent:
    def __init__(self, base_url, user_data):
        self.base_url = base_url
        self.user_data = user_data # 包含 username, password, address等
        self.client = httpx.AsyncClient(base_url=self.base_url, timeout=10.0)
        self.session_token = None
        self.cart_items = {} # {product_id: quantity}
        self.is_logged_in = False
        self.user_id = user_data.get("username") # 假设username就是用户ID
        self.current_page = "/"
        self.current_browsing_products = [] # 存储当前页面展示的商品
        self.logical_errors = [] # 记录逻辑错误

    async def _request(self, method, path, **kwargs):
        """封装请求,并记录响应,模拟思考时间"""
        try:
            # 模拟用户思考时间
            await asyncio.sleep(random.uniform(0.1, 0.5))
            response = await self.client.request(method, path, **kwargs)
            response.raise_for_status() # 抛出HTTP错误
            return response
        except httpx.HTTPStatusError as e:
            self.logical_errors.append(f"HTTP Error {e.response.status_code} for {method} {path}: {e.response.text}")
            print(f"Agent {self.user_id} HTTP Error: {e}")
            return None
        except httpx.RequestError as e:
            self.logical_errors.append(f"Request Error for {method} {path}: {e}")
            print(f"Agent {self.user_id} Request Error: {e}")
            return None

    async def login(self):
        """模拟用户登录"""
        print(f"Agent {self.user_id}: Attempting to login...")
        payload = {
            "username": self.user_data["username"],
            "password": self.user_data["password"]
        }
        response = await self._request("POST", "/api/login", json=payload)
        if response and response.status_code == 200:
            data = response.json()
            if data.get("success"):
                self.session_token = data.get("token")
                self.is_logged_in = True
                print(f"Agent {self.user_id}: Logged in successfully. Token: {self.session_token[:10]}...")
                return True
            else:
                self.logical_errors.append(f"Login failed for {self.user_id}: {data.get('message')}")
                print(f"Agent {self.user_id}: Login failed: {data.get('message')}")
                return False
        return False

    async def browse_products(self):
        """模拟用户浏览商品列表"""
        print(f"Agent {self.user_id}: Browsing products...")
        headers = {"Authorization": f"Bearer {self.session_token}"} if self.is_logged_in else {}
        response = await self._request("GET", "/api/products", headers=headers)
        if response and response.status_code == 200:
            products_data = response.json().get("products", [])
            self.current_browsing_products = [Product(**p) for p in products_data]
            print(f"Agent {self.user_id}: Found {len(self.current_browsing_products)} products.")
            return True
        return False

    async def view_product_detail(self, product_id):
        """模拟用户查看商品详情"""
        print(f"Agent {self.user_id}: Viewing product detail for {product_id}...")
        headers = {"Authorization": f"Bearer {self.session_token}"} if self.is_logged_in else {}
        response = await self._request("GET", f"/api/products/{product_id}", headers=headers)
        if response and response.status_code == 200:
            product_data = response.json().get("product")
            if product_data:
                product = Product(**product_data)
                print(f"Agent {self.user_id}: Viewed product {product.name}.")
                return product
            else:
                self.logical_errors.append(f"Product {product_id} detail not found.")
                print(f"Agent {self.user_id}: Product {product_id} detail not found.")
                return None
        return None

    async def add_to_cart(self, product_id, quantity=1):
        """模拟用户添加商品到购物车"""
        if not self.is_logged_in:
            self.logical_errors.append(f"Agent {self.user_id} tried to add to cart without login.")
            print(f"Agent {self.user_id}: Cannot add to cart, not logged in.")
            return False

        print(f"Agent {self.user_id}: Adding {quantity} of {product_id} to cart...")
        headers = {"Authorization": f"Bearer {self.session_token}"}
        payload = {"product_id": product_id, "quantity": quantity}
        response = await self._request("POST", "/api/cart/add", headers=headers, json=payload)
        if response and response.status_code == 200:
            data = response.json()
            if data.get("success"):
                self.cart_items[product_id] = self.cart_items.get(product_id, 0) + quantity
                print(f"Agent {self.user_id}: Added {quantity} of {product_id} to cart. Current cart: {self.cart_items}")
                return True
            else:
                self.logical_errors.append(f"Add to cart failed for {product_id} by {self.user_id}: {data.get('message')}")
                print(f"Agent {self.user_id}: Add to cart failed: {data.get('message')}")
                return False
        return False

    async def view_cart(self):
        """模拟用户查看购物车"""
        if not self.is_logged_in:
            self.logical_errors.append(f"Agent {self.user_id} tried to view cart without login.")
            print(f"Agent {self.user_id}: Cannot view cart, not logged in.")
            return False

        print(f"Agent {self.user_id}: Viewing cart...")
        headers = {"Authorization": f"Bearer {self.session_token}"}
        response = await self._request("GET", "/api/cart", headers=headers)
        if response and response.status_code == 200:
            cart_data = response.json().get("cart_items", [])
            # 验证购物车数据是否与本地状态一致(逻辑稳定性检查)
            remote_cart = {item['product_id']: item['quantity'] for item in cart_data}
            if remote_cart != self.cart_items:
                self.logical_errors.append(f"Cart inconsistency detected for {self.user_id}: Local {self.cart_items}, Remote {remote_cart}")
                print(f"Agent {self.user_id}: !!! Cart inconsistency detected: Local {self.cart_items}, Remote {remote_cart}")
            else:
                print(f"Agent {self.user_id}: Cart consistent: {self.cart_items}")
            return True
        return False

    async def checkout(self):
        """模拟用户结账"""
        if not self.is_logged_in or not self.cart_items:
            self.logical_errors.append(f"Agent {self.user_id} tried to checkout without login or empty cart.")
            print(f"Agent {self.user_id}: Cannot checkout, not logged in or cart empty.")
            return False

        print(f"Agent {self.user_id}: Proceeding to checkout...")
        headers = {"Authorization": f"Bearer {self.session_token}"}
        payload = {"address": self.user_data["address"], "payment_method": "credit_card"} # 简化支付
        response = await self._request("POST", "/api/checkout", headers=headers, json=payload)
        if response and response.status_code == 200:
            data = response.json()
            if data.get("success"):
                order_id = data.get("order_id")
                print(f"Agent {self.user_id}: Checkout successful, Order ID: {order_id}. Clearing cart.")
                self.cart_items = {} # 清空购物车
                return True
            else:
                self.logical_errors.append(f"Checkout failed for {self.user_id}: {data.get('message')}")
                print(f"Agent {self.user_id}: Checkout failed: {data.get('message')}")
                return False
        return False

    async def run_scenario(self):
        """定义机器人用户的主要行为场景"""
        print(f"Agent {self.user_id}: Starting scenario...")

        # 1. 尝试登录
        if not await self.login():
            print(f"Agent {self.user_id}: Scenario aborted due to login failure.")
            return

        # 2. 浏览产品
        if not await self.browse_products():
            print(f"Agent {self.user_id}: Scenario aborted due to product browsing failure.")
            return

        if not self.current_browsing_products:
            print(f"Agent {self.user_id}: No products to browse, scenario ending.")
            return

        # 3. 随机选择1-3个产品添加到购物车
        num_products_to_add = random.randint(1, min(3, len(self.current_browsing_products)))
        selected_products = random.sample(self.current_browsing_products, num_products_to_add)

        for product in selected_products:
            # 随机决定是否查看详情
            if random.random() < 0.7: # 70%的几率查看详情
                detailed_product = await self.view_product_detail(product.product_id)
                if detailed_product and detailed_product.stock > 0:
                    await self.add_to_cart(detailed_product.product_id, quantity=random.randint(1, min(detailed_product.stock, 2)))
                else:
                    self.logical_errors.append(f"Agent {self.user_id} tried to add out-of-stock product {product.product_id}.")
                    print(f"Agent {self.user_id}: Product {product.product_id} is out of stock or not found, skipping add to cart.")
            else: # 直接添加到购物车
                await self.add_to_cart(product.product_id, quantity=random.randint(1, 2))

        # 4. 查看购物车
        if not await self.view_cart():
            print(f"Agent {self.user_id}: Scenario continuing despite cart viewing failure.")

        # 5. 如果购物车有商品,尝试结账
        if self.cart_items:
            await self.checkout()
        else:
            print(f"Agent {self.user_id}: Cart is empty, skipping checkout.")

        print(f"Agent {self.user_id}: Scenario finished. Total logical errors: {len(self.logical_errors)}")
        await self.client.aclose() # 关闭 httpx 客户端

# 模拟一个简单的后端API服务器 (使用 FastAPI 作为例子)
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Dict, Optional

app = FastAPI()

# 模拟数据库
users_db = {} # {username: {password: ..., address: ...}}
products_db = {p.product_id: p for p in MOCK_PRODUCTS}
carts_db = {} # {user_id: {product_id: quantity}}
orders_db = {} # {order_id: {user_id: ..., items: ..., status: ...}}
next_order_id = 1

# 简单JWT模拟
SECRET_KEY = "your-secret-key" # 生产环境应使用更安全的密钥
ALGORITHM = "HS256"

# For simplicity, we'll use a mock token check
def verify_token(token: str = Depends(lambda token: token)):
    if token and token.startswith("Bearer "):
        actual_token = token.split(" ")[1]
        # In a real app, you'd decode and validate the token
        # For this example, any non-empty token is considered valid
        if actual_token:
            return True
    raise HTTPException(status_code=401, detail="Invalid or missing token")

@app.on_event("startup")
async def startup_event():
    # 预填充一些用户数据
    for i in range(100):
        username = fake.user_name()
        users_db[username] = {
            "password": fake.password(),
            "address": fake.address(),
            "username": username # Store username for easy retrieval
        }
    print(f"Initialized {len(users_db)} mock users.")

class LoginRequest(BaseModel):
    username: str
    password: str

@app.post("/api/login")
async def login(request: LoginRequest):
    user = users_db.get(request.username)
    if not user or user["password"] != request.password:
        return {"success": False, "message": "Invalid credentials"}
    # Simplified token generation
    token = f"mock_jwt_token_for_{request.username}_{int(time.time())}"
    return {"success": True, "token": token}

@app.get("/api/products")
async def get_products():
    return {"products": [p.__dict__ for p in products_db.values()]}

@app.get("/api/products/{product_id}")
async def get_product_detail(product_id: str):
    product = products_db.get(product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return {"product": product.__dict__}

class AddToCartRequest(BaseModel):
    product_id: str
    quantity: int

@app.post("/api/cart/add")
async def add_to_cart(request: AddToCartRequest, token_valid: bool = Depends(verify_token)):
    # Extract user_id from token in real app, here we mock it
    user_id = token_valid # This is just a placeholder for simplicity. In a real app, parse token.
    if isinstance(user_id, bool) and user_id: # If token_valid is True, it means a valid token was passed
        # We need to extract the actual user_id from the token or headers.
        # For this mock, let's assume the user_id is passed in a custom header or derived differently.
        # For the Agent, user_id is its username
        user_id = "mock_user" # Placeholder: in a real system, the token would carry the user_id

    product = products_db.get(request.product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    if product.stock < request.quantity:
        return {"success": False, "message": f"Insufficient stock for {product.name}"}

    # Simulate race condition/inconsistency
    # In a real scenario, this would involve database transactions and locking
    # For demonstration, we'll just update directly.
    # To introduce a potential race condition, we could add a small random delay here.
    # await asyncio.sleep(random.uniform(0.01, 0.05))

    with asyncio.Lock(): # Simulate a lock for critical section in a single-instance API
        # This is a simplification; in real distributed systems, use distributed locks
        product.stock -= request.quantity
        carts_db.setdefault(user_id, {})
        carts_db[user_id][request.product_id] = carts_db[user_id].get(request.product_id, 0) + request.quantity
        print(f"API: {user_id} added {request.quantity} of {request.product_id}. Remaining stock: {product.stock}")
        return {"success": True, "message": "Added to cart"}

@app.get("/api/cart")
async def get_cart(token_valid: bool = Depends(verify_token)):
    user_id = "mock_user" # Placeholder
    cart_items = carts_db.get(user_id, {})
    formatted_cart = [{"product_id": pid, "quantity": qty} for pid, qty in cart_items.items()]
    return {"cart_items": formatted_cart}

class CheckoutRequest(BaseModel):
    address: str
    payment_method: str

@app.post("/api/checkout")
async def checkout(request: CheckoutRequest, token_valid: bool = Depends(verify_token)):
    global next_order_id
    user_id = "mock_user" # Placeholder
    cart_items = carts_db.get(user_id)
    if not cart_items:
        return {"success": False, "message": "Cart is empty"}

    # Simulate complex transaction: deduct stock, create order, process payment
    # This is where race conditions are most critical
    # In a real system, this would be a transaction involving multiple services/DB operations

    # Introduce a potential delay for demonstrating logical stability issues
    await asyncio.sleep(random.uniform(0.05, 0.15)) 

    # Check stock again before finalizing (crucial for concurrency)
    for product_id, quantity in cart_items.items():
        product = products_db.get(product_id)
        if not product or product.stock < quantity:
            # If stock became insufficient during the delay, rollback or fail
            return {"success": False, "message": f"Checkout failed: Insufficient stock for {product_id}"}

    # All checks passed, proceed with order creation
    order_id = f"ORD-{next_order_id:05d}"
    next_order_id += 1
    orders_db[order_id] = {
        "user_id": user_id,
        "items": cart_items,
        "address": request.address,
        "payment_method": request.payment_method,
        "status": "pending"
    }

    # Clear cart
    carts_db.pop(user_id)
    print(f"API: Order {order_id} created for {user_id}. Cart cleared.")
    return {"success": True, "order_id": order_id}

# --- 压测 Orchestrator 示例 ---
async def generate_user_data(num_users):
    """生成一批唯一的机器人用户数据"""
    users = []
    for _ in range(num_users):
        username = fake.user_name() + str(random.randint(1000, 9999)) # 确保唯一性
        users_db[username] = {
            "password": fake.password(),
            "address": fake.address(),
            "username": username
        }
        users.append(users_db[username])
    print(f"Generated {len(users)} unique user data sets for agents.")
    return users

async def orchestrate_agents(base_url, num_agents, duration_seconds):
    """
    协调多个机器人用户并发执行场景。
    这个函数模拟了 Orchestrator 的部分职责,在实际中,这会是一个分布式系统。
    """
    print(f"Orchestrator: Starting {num_agents} agents for {duration_seconds} seconds.")

    agent_data = await generate_user_data(num_agents)
    agents = []
    for user_d in agent_data:
        agents.append(ShoppingAgent(base_url, user_d))

    start_time = time.monotonic()
    tasks = []

    # 简单地让每个 agent 运行一次完整场景
    # 在真实压测中,可能需要循环运行,或者根据TPS目标动态调整
    for agent in agents:
        tasks.append(agent.run_scenario())

    await asyncio.gather(*tasks) # 等待所有 agent 任务完成

    end_time = time.monotonic()
    print(f"Orchestrator: All agents finished in {end_time - start_time:.2f} seconds.")

    total_logical_errors = 0
    for agent in agents:
        total_logical_errors += len(agent.logical_errors)
        if agent.logical_errors:
            print(f"Agent {agent.user_id} reported {len(agent.logical_errors)} logical errors:")
            for error in agent.logical_errors:
                print(f"  - {error}")

    print(f"nTotal logical errors across all agents: {total_logical_errors}")
    if total_logical_errors > 0:
        print("!!! Logical stability issues detected during agentic load testing. !!!")
    else:
        print("No logical stability issues reported by agents.")

# --- 如何运行 ---
# 1. 启动 FastAPI 服务器:
#    uvicorn your_module_name:app --host 0.0.0.0 --port 8000 --reload
#    (将 your_module_name 替换为你的Python文件名)
# 2. 在另一个终端运行 Orchestrator:
#    asyncio.run(orchestrate_agents("http://localhost:8000", num_agents=10, duration_seconds=60))
#    (这里 duration_seconds 在目前的简化例子中不完全起作用,因为 agents 运行一次就结束)

# 实际运行 Orchestrator 的代码
if __name__ == "__main__":
    # 为了简化演示,这里直接运行 Orchestrator。
    # 在实际场景中,FastAPI应用和Orchestrator会是独立的进程或服务。
    # 假设 FastAPI 已经在 localhost:8000 运行
    BASE_URL = "http://localhost:8000"
    NUM_AGENTS = 100 # 模拟100个并发用户
    TEST_DURATION = 30 # 秒

    print("--- Starting Agentic Load Test ---")
    asyncio.run(orchestrate_agents(BASE_URL, NUM_AGENTS, TEST_DURATION))
    print("--- Agentic Load Test Finished ---")

    # 打印最终的库存状态,检查是否出现负库存(超卖)
    print("n--- Final Product Stock Status ---")
    for product_id, product in products_db.items():
        print(f"Product {product.name} (ID: {product_id}): Remaining Stock = {product.stock}")
        if product.stock < 0:
            print(f"!!! CRITICAL: Product {product.name} (ID: {product_id}) shows negative stock (oversold)! !!!")

代码解释:

  1. Product类和MOCK_PRODUCTS 模拟商品数据,包含库存信息,这是检测超卖的关键。
  2. ShoppingAgent类:
    • __init__ 初始化机器人用户的状态(is_logged_in, cart_items, session_token, user_id, logical_errors等)。使用httpx.AsyncClient进行异步HTTP请求。
    • _request 封装了HTTP请求,加入了随机的“思考时间” (asyncio.sleep),使行为更真实。同时捕获HTTP和请求错误,并记录为逻辑错误。
    • login, browse_products, view_product_detail, add_to_cart, view_cart, checkout 这些方法分别模拟了用户在电商系统中的各个操作。它们会更新机器人内部状态,并向模拟后端API发送请求。
    • 逻辑稳定性检查:view_cart中,机器人会比较本地维护的购物车状态与从API获取的购物车状态是否一致。如果出现不一致,则记录为逻辑错误。在add_to_cartcheckout中,会检查库存是否充足,如果不足则记录错误。
    • run_scenario 定义了机器人用户的完整行为路径。它是一个复杂的流程,包含条件判断(是否登录)、随机选择(选择哪些商品)、以及循环操作(添加多个商品)。
  3. FastAPI 模拟后端:
    • 这是一个非常简化的后端API,用于接收机器人用户的请求。
    • users_db, products_db, carts_db, orders_db:模拟了内存中的数据库,用于存储用户、商品、购物车和订单数据。
    • @app.on_event("startup"):在应用启动时生成一些初始用户数据。
    • login:简单的用户认证。
    • get_products, get_product_detail:获取商品信息。
    • add_to_cart这是关键点之一。 它会减少商品库存。为了模拟并发问题,我们可以在这里加入延迟,并假设在分布式环境中,多个并发请求可能导致库存检查与扣减之间出现竞态条件。asyncio.Lock()在此处用于模拟单个API实例内部的并发保护,但在分布式系统中需要分布式锁。
    • get_cart:返回购物车内容。
    • checkout另一个关键点。 在真实系统中,结账是一个复杂的事务,可能涉及多次库存检查和扣减。我们在这里引入了随机延迟,以增加并发冲突的可能性。在扣减库存之前,再次检查库存是防止超卖的常见手段,但高并发下仍需强一致性保证。
  4. orchestrate_agents
    • 这是一个简单的协调器,它生成指定数量的机器人用户数据,然后创建ShoppingAgent实例,并使用asyncio.gather并发运行所有机器人的run_scenario方法。
    • 它在测试结束后聚合所有机器人报告的逻辑错误。
  5. if __name__ == "__main__": 运行压测的主入口。

如何观察逻辑稳定性问题:

  • 购物车不一致: 当多个机器人用户并发操作购物车时,如果后端没有正确处理并发,ShoppingAgentview_cart方法可能会发现其本地维护的购物车状态与API返回的状态不一致。
  • 超卖:add_to_cartcheckout过程中,如果products_db中的商品库存变为负数,则表示发生了超卖。在checkout中,即使初步检查库存充足,在处理过程中也可能因为其他并发操作导致库存不足。
  • 结账失败: 如果checkout返回失败信息(例如库存不足),则表示业务逻辑在高并发下未能正确完成。
  • 错误日志: logical_errors列表会记录所有机器人用户在执行过程中遇到的业务逻辑错误或非预期的系统行为。

通过这种方式,我们不仅仅测量了响应时间,更重要的是,我们让机器人用户像真实用户一样“思考”和“验证”,从而在高并发下暴露系统深层次的业务逻辑缺陷。

数据管理:百万级并发的基石

模拟百万级并发交互,意味着我们需要百万级的独立用户身份、商品、订单等测试数据。

1. 唯一性与真实性

  • 用户数据: 每个机器人用户都需要一个唯一的用户名、密码。可能还需要唯一的地址、电话号码、支付方式等。使用Faker这样的库可以生成大量看起来真实的假数据。
  • 商品数据: 如果系统涉及商品,需要确保有足够的商品种类和库存。在某些场景下,可能需要模拟商品库存的动态变化。
  • 事务数据: 订单ID、交易流水号等需要保证唯一性。

2. 数据生成与分发策略

  • 预生成: 在测试开始前生成所有所需数据,并存储在数据库或文件中。然后,机器人用户从这些预生成的数据池中领取自己的身份和相关数据。
  • 动态生成: 在测试运行过程中,根据需要实时生成少量数据。这适用于某些无法预知的场景或数据量极大的情况。
  • 数据池管理: 使用Redis或其他缓存系统作为数据池,机器人用户从池中原子性地获取数据,确保每个数据只被一个机器人使用。
  • 数据分区: 将数据分发到不同的负载生成器,减少单点瓶颈。

3. 数据清理与重置

每次压测结束后,为了保证测试结果的独立性和可重复性,需要:

  • 清理脏数据: 清除测试过程中生成的用户、订单、购物车等数据。
  • 重置系统状态: 将商品库存、用户余额等业务状态恢复到初始值。
  • 数据库快照/容器化: 使用数据库快照或容器(如Docker Compose或Kubernetes)来快速部署和重置测试环境,确保每次测试都在一个干净的环境中运行。

指标与可观测性:洞察系统行为

Agentic Load Testing不仅关注传统的性能指标,更强调对逻辑稳定性的监控和分析。

1. 传统性能指标

这些是任何负载测试都应收集的基础数据:

  • 响应时间 (Latency): 请求从发出到接收响应所需的时间。关注平均值、P90、P99等分位数。
  • 吞吐量 (Throughput): 单位时间内系统处理的请求数量(RPS/TPS)。
  • 错误率 (Error Rate): 请求失败的百分比(HTTP 5xx错误,连接错误等)。
  • 资源利用率: CPU、内存、网络IO、磁盘IO等在服务器、数据库和消息队列上的使用情况。
  • 并发用户数: 同时活跃的机器人用户数量。

2. 机器人用户特有指标(逻辑稳定性指标)

这些指标是Agentic Load Testing的核心价值所在:

  • 业务流程成功率: 机器人用户成功完成一个完整业务流程(如登录-浏览-添加-结账)的百分比。
  • 逻辑错误率: 机器人用户在执行业务逻辑时遇到的错误数量或百分比。例如:
    • “库存不足”错误(即使HTTP状态码是200)
    • “购物车不一致”错误
    • “优惠券已使用”错误
    • “订单状态流转异常”
  • 数据一致性检查结果: 机器人用户在执行过程中进行的各种数据一致性验证的结果。
  • 状态转换成功率: 机器人用户从一个业务状态成功转换到下一个业务状态的百分比。
  • 用户旅程分布: 不同类型用户旅程的执行频率和成功率。

3. 日志与分布式追踪

  • 机器人用户日志: 每个机器人用户应记录其执行的每一步操作、发送的请求、接收的响应以及任何遇到的逻辑错误。这些日志是调试和分析逻辑问题的关键。
  • 后端服务日志: 目标系统的所有服务日志,包括应用日志、数据库日志、Web服务器日志等,需要集中收集(如ELK Stack)。
  • 分布式追踪: 使用OpenTelemetry、Jaeger或Zipkin等工具,追踪一个请求在分布式系统中穿梭的完整路径。这对于在高并发下定位复杂业务流程中的逻辑瓶颈和错误传播至关重要。

4. 可视化与报告

  • 实时仪表盘: 使用Grafana等工具构建实时仪表盘,展示性能指标、逻辑错误趋势、资源利用率等,以便在测试进行时发现问题。
  • 详细报告: 测试结束后,生成详细的报告,包括所有指标的统计数据、发现的逻辑错误列表、瓶颈分析和建议。

挑战与最佳实践

Agentic Load Testing虽然强大,但也伴随着一系列挑战,需要采取相应的最佳实践。

1. 挑战

  • 复杂性: 机器人用户的行为设计、状态管理、数据生成以及分布式架构的实现都比传统压测复杂得多。
  • 资源消耗: 模拟百万级机器人用户需要大量的计算资源(CPU、内存)来运行负载生成器。此外,目标系统也需要足够的资源来应对测试负载。
  • 调试困难: 在高并发下,定位特定的逻辑错误或竞态条件可能非常困难,因为它们往往难以复现。
  • 数据污染: 不当的测试数据管理可能导致测试数据与生产数据混淆,或者测试数据之间相互影响。
  • 结果分析: 大量的数据和日志需要高效的工具和方法进行分析,以提取有意义的洞察。
  • 维护成本: 随着系统功能迭代,机器人用户的行为脚本也需要同步更新,维护成本较高。

2. 最佳实践

  • 渐进式构建: 从简单的用户旅程开始,逐步增加机器人用户的复杂性和随机性。不要试图一次性模拟所有可能的行为。
  • 模块化设计: 将机器人用户的行为拆分成独立的、可复用的模块(如登录模块、购物车模块),便于维护和组合。
  • 数据驱动: 尽可能将机器人用户的行为参数化,通过外部数据(CSV、数据库)驱动其行为,而不是硬编码在脚本中。
  • 充分的数据准备: 投入足够的时间和资源来生成和管理大规模、高质量的测试数据,确保数据唯一性和真实性。
  • 强大的可观测性: 建立全面的监控、日志和分布式追踪系统,不仅关注性能,更要关注业务逻辑的正确性和数据一致性。
  • 自动化验证: 在机器人用户的行为流中嵌入断言和验证逻辑,使其能够自动检测业务逻辑错误和数据不一致。
  • 环境隔离: 确保压测环境与生产环境完全隔离,避免数据污染。建议使用可快速部署和重置的容器化环境。
  • 团队协作: 压测不仅仅是测试团队的职责,开发、运维团队也应积极参与,共同定义测试场景,分析问题,并优化系统。
  • 小规模验证: 在进行大规模压测之前,先在小规模下运行机器人用户,验证其行为逻辑是否正确,以及能否发现预期的问题。
  • 故障注入: 结合混沌工程(Chaos Engineering)思想,在压测过程中主动注入故障(如网络延迟、服务下线),观察系统在压力和故障双重打击下的逻辑稳定性。

结语

Agentic Load Testing代表了负载测试领域的一个重要演进,它将我们的视野从单纯的性能指标拓展到更深层次的业务逻辑与数据一致性。通过赋予“机器人用户”以智能和自治性,我们得以在高并发的复杂交互场景中,揭示系统潜在的逻辑缺陷,确保关键业务流程的健壮与可靠。这不仅是技术上的挑战,更是对系统质量保障理念的革新。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注