各位技术同仁,下午好!
今天,我们齐聚一堂,共同探讨一个在AI时代背景下,对企业品牌声誉至关重要的议题:如何构建一个“AI展现预警系统”,在第一时间发现AI搜索结果中对品牌不利的言论。
随着大型语言模型(LLM)的飞速发展和普及,以ChatGPT、Bard、Copilot为代表的AI搜索引擎正在深刻改变用户获取信息的方式。它们不再仅仅是索引网页,而是通过理解、总结、生成内容,直接向用户呈现答案。这无疑为信息传播带来了前所未有的效率,但也为品牌管理带来了新的挑战。
传统意义上的舆情监控系统,侧重于监测社交媒体、新闻网站、论坛等平台。然而,当用户直接通过AI搜索引擎提问,而AI给出的回答中包含了对品牌的不利信息时,这种负面传播的隐蔽性、权威性和扩散速度都远超以往。一条由AI“权威”生成的负面评价,其杀伤力可能相当于数百条社交媒体评论。更重要的是,用户可能不会再去验证AI的回答,而是直接采信。
因此,我们迫切需要一套全新的预警机制,能够深入AI搜索结果的“内部”,理解其生成内容的语义,并及时识别出任何可能损害品牌声誉的言论。这正是我们今天的主题:构建一个智能化、自动化、高效的AI展现预警系统。
本次讲座,我将从一名编程专家的视角,为大家详细剖析这个系统的架构、核心模块的实现细节,并提供大量的代码示例,帮助大家将理论转化为实践。我们将覆盖从数据采集、预处理、智能分析到预警通知的整个流程。
一、AI搜索时代的品牌挑战与预警系统的必要性
1.1 传统舆情监控的局限性
传统的舆情监控系统通常依赖于关键词匹配、爬虫抓取固定网页、以及基于规则的情感分析。它们在面对AI搜索结果时面临以下挑战:
- 动态生成性: AI搜索结果是动态生成的,每次查询甚至在不同上下文中都可能产生差异。这使得传统的固定URL抓取变得无效。
- 语义复杂性: AI生成的文本往往高度概括和总结,其中可能包含隐晦的负面含义、讽刺或基于误解的偏见,而这些是简单关键词匹配难以捕捉的。
- 黑盒性: 我们无法直接访问AI搜索引擎的内部数据或API(除非它们提供),只能通过模拟用户查询来获取结果。
- 权威性影响: AI生成的回答在用户心中具有更高的“权威性”,一旦出现负面信息,其对品牌信任度的损害更为严重且难以挽回。
1.2 为什么需要“第一时间”发现
“第一时间”在这里是关键。负面信息,尤其是通过AI渠道传播的,具有极强的扩散性和固化性。一旦负面叙事形成并被AI反复生成和传播,纠正成本将呈指数级增长。
- 快速响应: 及时发现能够为品牌争取宝贵的应对时间,无论是内部调查、澄清事实,还是进行公关干预。
- 损害控制: 在负面信息扩散初期进行干预,可以有效阻止其进一步蔓延,将损害降到最低。
- 品牌声誉维护: 持续监测和快速响应是维护品牌长期声誉的基石。
构建这样一个系统,不仅仅是技术挑战,更是品牌在数字时代生存和发展的战略需求。
二、系统架构总览:蓝图与核心组件
一个健壮的AI展现预警系统需要多个模块协同工作。我将它设计为一个可伸缩、模块化的架构,便于迭代和维护。
高层架构视图:
+---------------------+ +------------------------+ +--------------------------+
| 1. 数据采集模块 | | 2. 数据预处理与知识库 | | 3. AI情感分析与意图识别 |
| (AI Search Crawler) | | (Text Cleaning, Embedding, RAG) | | (LLM, Prompt Engineering) |
+----------+----------+ +------------+-----------+ +------------+-------------+
| | |
v v v
+-----------------------------------------------------------------------------------------+
| 消息队列 (Message Queue) |
| (e.g., Kafka, AWS SQS) |
+-----------------------------------------------------------------------------------------+
|
v
+--------------------------+ +------------------------+
| 4. 预警规则与通知模块 |----->| 5. 数据存储与可视化 |
| (Rule Engine, Notifier) | | (Database, Dashboard) |
+----------+----------+ +------------------------+
|
v
+--------------------------+
| 通知渠道 (Email, SMS, |
| Slack, Webhook) |
+--------------------------+
核心组件介绍:
- 数据采集模块: 负责模拟用户在AI搜索引擎上的行为,抓取AI生成的回答内容。
- 数据预处理与知识库模块: 对抓取到的原始文本进行清洗、格式化,并构建品牌相关的知识库,为后续的AI分析提供上下文和事实依据。
- AI情感分析与意图识别模块: 利用大型语言模型(LLM)对文本进行深入语义分析,判断其情感倾向(正面、中性、负面)及潜在意图(抱怨、诽谤、误导等)。
- 消息队列: 作为解耦各个模块的中间件,确保数据流的稳定性和系统的可伸缩性。
- 预警规则与通知模块: 根据预设的规则(如情感阈值、特定关键词、意图匹配),触发预警,并通过多种渠道通知相关人员。
- 数据存储与可视化: 存储所有抓取和分析的数据,并提供仪表盘进行趋势监控和历史追溯。
技术栈选择(示例):
- 编程语言: Python (生态丰富,适合NLP/ML)
- 爬虫框架: Playwright/Selenium (处理动态JS加载页面)
- 文本处理: NLTK, spaCy, RegEx
- 向量数据库: Pinecone, Weaviate, Milvus, Qdrant (用于RAG和知识库)
- LLM提供商: OpenAI API (GPT-4), Anthropic Claude, Google Gemini
- 消息队列: Apache Kafka / AWS SQS / RabbitMQ
- 数据存储: PostgreSQL / MongoDB (结构化/非结构化数据)
- 通知服务: SMTP (Email), Twilio (SMS), Slack Webhooks, DingTalk/WeChat Work APIs
- 云平台: AWS / Google Cloud Platform / Azure (提供Serverless functions, Managed databases, etc.)
接下来,我们将深入探讨每个核心模块的实现细节和代码示例。
三、核心模块详解与代码实现
3.1 数据采集模块:深入AI搜索结果
数据采集是整个系统的起点。由于AI搜索结果的动态性和黑盒性,我们不能像抓取静态网页那样简单。
挑战:
- 反爬机制: AI搜索引擎可能部署了复杂的反爬机制,包括IP限制、User-Agent检测、JS混淆等。
- 动态渲染: 结果页面通常由JavaScript动态渲染,需要一个能够执行JS的浏览器自动化工具。
- 会话管理: 模拟用户登录、保持会话等复杂操作。
- 速率限制: 过快请求可能导致IP被封禁或触发验证码。
策略:
- 浏览器自动化工具: 使用Playwright或Selenium模拟真实用户操作,打开浏览器,输入查询,等待结果加载。
- 代理IP池: 轮换IP地址,避免被目标网站识别为爬虫。
- User-Agent轮换: 模拟不同浏览器和操作系统。
- 请求间隔: 引入随机延迟,模拟人类浏览行为。
- 错误处理与重试: 对网络错误、页面加载失败等情况进行健壮处理。
实现:Python + Playwright
Playwright是一个强大的浏览器自动化库,支持Chromium、Firefox和WebKit,并且提供了同步和异步API。它特别适合处理JavaScript渲染的页面。
import asyncio
from playwright.async_api import async_playwright, Page, TimeoutError
import logging
import random
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 模拟的AI搜索引擎URL和搜索框选择器 (示例,实际需根据目标AI搜索引擎调整)
# 注意:以下URL和选择器均为虚构,实际使用时请替换为真实AI搜索引擎的URL和元素选择器
AI_SEARCH_URL = "https://example-ai-search.com/"
SEARCH_INPUT_SELECTOR = "input[name='q']"
SEARCH_RESULT_SELECTOR = "div.ai-generated-answer" # AI生成回答的CSS选择器
# 代理IP池 (示例,实际应从服务商获取或自建)
PROXY_LIST = [
"http://user:[email protected]:8080",
"http://user:[email protected]:8080",
# ... 更多代理
]
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/109.0.0.0 Safari/537.36",
# ... 更多User-Agent
]
async def get_ai_search_result(query: str) -> str | None:
"""
模拟在AI搜索引擎中进行搜索,并提取AI生成的回答。
"""
selected_proxy = random.choice(PROXY_LIST) if PROXY_LIST else None
selected_user_agent = random.choice(USER_AGENTS)
browser_config = {
'headless': True, # 生产环境通常使用无头模式
'proxy': {'server': selected_proxy} if selected_proxy else None
}
async with async_playwright() as p:
try:
browser = await p.chromium.launch(**browser_config)
context = await browser.new_context(user_agent=selected_user_agent)
page = await context.new_page()
logging.info(f"Navigating to {AI_SEARCH_URL} with proxy: {selected_proxy} and UA: {selected_user_agent[:50]}...")
await page.goto(AI_SEARCH_URL, wait_until='domcontentloaded', timeout=60000)
# 确保搜索框可见并可交互
await page.wait_for_selector(SEARCH_INPUT_SELECTOR, state='visible', timeout=30000)
await page.fill(SEARCH_INPUT_SELECTOR, query)
await page.press(SEARCH_INPUT_SELECTOR, "Enter")
logging.info(f"Searching for: '{query}'")
# 等待AI生成结果的元素出现
# 通常AI结果加载需要时间,这里设置较长的等待时间
await page.wait_for_selector(SEARCH_RESULT_SELECTOR, state='visible', timeout=90000)
ai_answer_element = await page.query_selector(SEARCH_RESULT_SELECTOR)
if ai_answer_element:
ai_answer_text = await ai_answer_element.inner_text()
logging.info(f"Successfully retrieved AI answer for '{query}'. Length: {len(ai_answer_text)} chars.")
return ai_answer_text
else:
logging.warning(f"Could not find AI answer element for '{query}'. Selector: {SEARCH_RESULT_SELECTOR}")
return None
except TimeoutError:
logging.error(f"Timeout occurred while searching for '{query}'. Page did not load or element not found.")
return None
except Exception as e:
logging.error(f"An error occurred during AI search for '{query}': {e}")
return None
finally:
if 'browser' in locals():
await browser.close()
# 引入随机延迟,模拟人类行为,避免触发反爬
time.sleep(random.uniform(5, 15))
# 示例使用
async def main():
brand_queries = [
"关于 [你的品牌名] 的最新评价",
"是 [你的品牌名] 的产品好用吗",
"[你的品牌名] 负面新闻",
"[你的品牌名] 质量问题",
"如何评价 [你的品牌名]",
"谁是 [你的品牌名] 的竞争对手",
"[你的品牌名] 的服务怎么样",
]
results = {}
for query in brand_queries:
ai_response = await get_ai_search_result(query)
if ai_response:
results[query] = ai_response
# 这里可以将结果发送到消息队列供下一个模块处理
print(f"Query: {query}nAI Response:n{ai_response[:200]}...n---")
else:
print(f"Query: {query}nNo AI Response found or error occurred.n---")
# 避免过于频繁的请求
await asyncio.sleep(random.uniform(10, 30))
# 实际应用中,会将results发送到消息队列
# 例如:send_to_message_queue(results)
if __name__ == "__main__":
asyncio.run(main())
代码说明:
get_ai_search_result函数封装了整个爬取逻辑。- 通过
async_playwright()启动无头浏览器,并设置代理和User-Agent。 page.goto()导航到AI搜索引擎页面。page.fill()填充搜索框,page.press("Enter")提交查询。page.wait_for_selector()等待AI生成结果的特定HTML元素加载完成。ai_answer_element.inner_text()提取AI回答的纯文本内容。- 包含了错误处理、超时机制和随机延迟,增强爬虫的鲁棒性。
main函数展示了如何批量查询与品牌相关的疑问。
重要提示: 请务必遵守目标网站的服务条款和robots.txt协议。未经授权的爬取可能导致法律风险或IP被封禁。上述代码中的URL和选择器仅为演示目的,实际使用时必须替换为真实的AI搜索引擎及其对应的HTML元素。某些AI搜索可能集成验证码或更复杂的反爬机制,可能需要额外的解决方案(如接入打码平台)。
3.2 数据预处理与知识库构建模块
从AI搜索结果中获取的原始文本,通常需要清洗才能进行高质量的LLM分析。同时,构建一个品牌知识库对于提高分析的准确性和避免LLM“幻觉”至关重要。
目的:
- 文本清洗: 移除无关字符、HTML标签、乱码等。
- 文本标准化: 统一格式,如转换为小写,处理数字和日期。
- 语义增强: 通过分词、命名实体识别(NER)等技术提取关键信息。
- 知识库构建: 存储品牌的官方信息、产品特点、常见问题解答、正面宣传材料,用于RAG(检索增强生成)机制。
实现:Python + NLTK/spaCy + OpenAI Embedding + Vector DB
3.2.1 文本清洗与分块
LLM通常有输入长度限制(context window),因此长文本需要分块(chunking)。
import re
import jieba # 假设处理中文,可以使用jieba进行分词
from typing import List, Dict
def clean_text(text: str) -> str:
"""
清洗文本,移除HTML标签、多余空格、特殊字符等。
"""
if not isinstance(text, str):
return ""
# 移除HTML标签
clean = re.compile('<.*?>')
text = re.sub(clean, '', text)
# 移除URLs
text = re.sub(r'httpS+|wwwS+|httpsS+', '', text, flags=re.MULTILINE)
# 移除邮件地址
text = re.sub(r'S*@S*s?', '', text)
# 移除特殊字符和数字,只保留中英文、数字和常见标点
text = re.sub(r'[^u4e00-u9fa5a-zA-Z0-9.,!?;,。!?;]', ' ', text)
# 移除多个空格
text = re.sub(r's+', ' ', text).strip()
return text
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
"""
将长文本分割成固定大小的块,并带有重叠。
"""
if not text:
return []
tokens = list(jieba.cut(text)) # 假设中文,使用jieba分词
# 如果是英文,可以使用 text.split() 或更高级的tokenizers
chunks = []
i = 0
while i < len(tokens):
chunk = tokens[i:i + chunk_size]
chunks.append("".join(chunk))
i += chunk_size - overlap
if i >= len(tokens): # 确保最后一个chunk也包含
break
return chunks
# 示例使用
raw_ai_response = """
<p>关于<a href="https://yourbrand.com">你的品牌名</a>的最新评价:</p>
<p>最近用户反映其产品A的续航能力不及宣传,导致部分消费者不满。虽然官方宣称有<b>30小时</b>续航,但实际使用中可能只有20小时。此外,客服响应速度也有待提高。</p>
<p>但也有用户表示,产品B的性价比很高,是同类产品中的佼佼者。其创新设计和稳定性能获得了广泛好评。总体而言,该品牌在市场上有一定的竞争力。</p>
"""
cleaned_text = clean_text(raw_ai_response)
print(f"Cleaned Text:n{cleaned_text}n")
chunks = chunk_text(cleaned_text, chunk_size=100, overlap=20)
for i, chunk in enumerate(chunks):
print(f"Chunk {i+1}:n{chunk}n---")
3.2.2 知识库构建与向量化
品牌知识库应包含官方文档、FAQ、产品手册等。这些文本经过清洗和分块后,需要通过Embedding模型转换为向量,存储在向量数据库中。
向量数据库的优势:
| 特性 | 传统关系型数据库(RDB) | 向量数据库(Vector DB) |
|---|---|---|
| 数据类型 | 结构化数据,如整数、字符串、日期 | 高维向量(浮点数数组) |
| 查询方式 | 精确匹配、范围查询、JOIN | 近似最近邻(ANN)搜索,相似度查询 |
| 核心用例 | 事务处理、报表、数据分析 | 语义搜索、推荐系统、RAG、异常检测 |
| 扩展性 | 通常通过分库分表或读写分离 | 针对高维向量搜索优化,水平扩展能力强 |
| 代表产品 | MySQL, PostgreSQL, Oracle | Pinecone, Weaviate, Milvus, Qdrant, Chroma |
我们将使用OpenAI的Embedding API将文本转换为向量。
import os
from openai import OpenAI
from typing import List, Dict
# 假设您已设置OPENAI_API_KEY环境变量
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def get_embeddings(texts: List[str]) -> List[List[float]]:
"""
使用OpenAI Embedding API将文本转换为向量。
"""
if not texts:
return []
try:
response = client.embeddings.create(
input=texts,
model="text-embedding-ada-002" # 当前推荐的embedding模型
)
return [data.embedding for data in response.data]
except Exception as e:
logging.error(f"Error getting embeddings: {e}")
return []
# 模拟品牌知识库内容
brand_knowledge_base_raw = [
"我们的品牌名是'你的品牌名',致力于提供高质量的智能家居产品。",
"产品A是一款智能音箱,拥有30小时超长续航,支持语音助手功能。",
"产品B是一款智能摄像头,特点是1080P高清画质和AI人形识别。",
"我们的客服团队24/7在线,承诺2小时内响应客户咨询。",
"所有产品均享受一年质保和七天无理由退换。",
# ... 更多官方、正面的品牌信息
]
# 清洗并分块知识库内容
cleaned_knowledge_chunks = []
for doc in brand_knowledge_base_raw:
cleaned_doc = clean_text(doc)
cleaned_knowledge_chunks.extend(chunk_text(cleaned_doc, chunk_size=100, overlap=0))
# 获取知识库文本的向量
knowledge_embeddings = get_embeddings(cleaned_knowledge_chunks)
# 模拟存储到向量数据库 (这里仅打印,实际需调用向量数据库SDK)
# 假设我们有一个简单的结构来存储
knowledge_vectors: List[Dict] = []
for i, (text_chunk, embedding) in enumerate(zip(cleaned_knowledge_chunks, knowledge_embeddings)):
knowledge_vectors.append({
"id": f"kb_chunk_{i}",
"text": text_chunk,
"embedding": embedding
})
# print(f"Knowledge Chunk {i+1}: {text_chunk[:50]}... Embedding length: {len(embedding)}")
# 示例:将知识库存储到Pinecone (概念性代码)
# from pinecone import Pinecone, Index
# pinecone_api_key = os.environ.get("PINECONE_API_KEY")
# pinecone_environment = os.environ.get("PINECONE_ENVIRONMENT")
# pc = Pinecone(api_key=pinecone_api_key, environment=pinecone_environment)
# index_name = "brand-knowledge-base"
# if index_name not in pc.list_indexes():
# pc.create_index(name=index_name, dimension=len(knowledge_embeddings[0]), metric='cosine')
# index: Index = pc.Index(index_name)
# index.upsert(vectors=[
# {"id": item["id"], "values": item["embedding"], "metadata": {"text": item["text"]}}
# for item in knowledge_vectors
# ])
# logging.info(f"Knowledge base loaded with {len(knowledge_vectors)} chunks into Pinecone.")
代码说明:
clean_text函数用于移除HTML、URL、特殊字符等。chunk_text函数将长文本按字数(或token数)分割成小块,并引入重叠以保留上下文。get_embeddings函数通过OpenAI API将文本块转换为高维向量。- 展示了如何将清洗、分块和向量化的知识库内容概念性地存储到向量数据库中。实际使用时,需要安装并配置相应的向量数据库客户端(如
pinecone-client)。
3.3 AI情感分析与意图识别模块:LLM的深度洞察
这是预警系统的核心,利用LLM的强大理解能力,超越简单的关键词匹配,进行深层次的语义分析。
挑战:
- 识别细微情感: 负面情绪可能不直接表达,而是通过暗示、比较、讽刺等方式体现。
- 区分事实与观点: LLM可能混淆用户观点和客观事实。
- 幻觉问题: LLM可能生成不准确甚至虚假的信息。
- 上下文理解: 准确判断负面言论需要结合品牌自身的背景信息。
策略:
- Prompt Engineering: 精心设计Prompt,引导LLM输出结构化、准确的结果。
- Few-shot Learning: 在Prompt中提供少量负面言论示例及其分析结果,帮助LLM理解任务。
- RAG (Retrieval Augmented Generation): 结合品牌知识库,为LLM提供额外上下文,进行事实核查,减少幻觉,并帮助LLM更好地判断言论是否与品牌官方信息相悖。
- 结构化输出: 要求LLM以JSON格式返回结果,便于后续程序处理。
实现:Python + OpenAI API + RAG
import os
from openai import OpenAI
import json
import logging
from typing import Dict, List, Any
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# 模拟向量数据库查询函数 (实际应调用Pinecone/Weaviate等SDK)
def retrieve_relevant_knowledge(query_text: str, top_k: int = 3) -> List[str]:
"""
根据查询文本,从向量数据库中检索最相关的品牌知识。
这里仅为模拟,实际需实现向量搜索逻辑。
"""
# 实际步骤:
# 1. 将 query_text 转换为 embedding
# query_embedding = get_embeddings([query_text])[0]
# 2. 调用向量数据库进行相似度搜索
# search_results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
# 3. 提取相关文本
# relevant_texts = [match.metadata['text'] for match in search_results.matches]
# 简单模拟返回,实际应从知识库中检索
simulated_knowledge = [
"我们的产品A确实拥有30小时的超长续航能力,这是经过严格测试的官方数据。",
"我们承诺2小时内响应客户咨询,并持续优化客服系统以提供更快的响应。",
"产品B的1080P高清画质和AI人形识别是其核心卖点,深受用户喜爱。"
]
# 根据查询文本的关键词简单筛选模拟结果,实际是向量搜索
relevant = [k for k in simulated_knowledge if any(word in query_text for word in ["续航", "客服", "产品B"])]
return relevant if relevant else simulated_knowledge[:top_k] # 如果没有匹配,返回前top_k个
def analyze_brand_mention_with_llm(
mention_text: str,
brand_name: str,
context_knowledge: List[str] = None
) -> Dict[str, Any] | None:
"""
使用LLM分析品牌提及的情感和意图,并结合知识库进行事实核查。
"""
if context_knowledge is None:
context_knowledge = []
knowledge_str = "n".join([f"- {k}" for k in context_knowledge])
if knowledge_str:
knowledge_str = "nn以下是关于品牌的一些官方信息和背景知识,请在分析时作为参考,用于核实事实或提供上下文:n" + knowledge_str
prompt = f"""
你是一个专业的品牌舆情分析师,你的任务是分析一段关于品牌'{brand_name}'的言论。
请根据以下要求,以JSON格式输出分析结果:
1. **sentiment (情感倾向):** 'Positive', 'Neutral', 'Negative', 'Mixed'。
2. **intent (意图):** 识别言论的潜在意图,例如 'Complaint' (抱怨), 'Misinformation' (虚假信息/误导), 'Defamation' (诽谤), 'Bug Report' (Bug报告), 'Feature Request' (功能请求), 'General Review' (一般评价), 'Praise' (赞扬), 'Comparison' (比较), 'Question' (提问) 等。如果存在多个意图,请列出主要意图。
3. **keywords (关键词):** 提取与品牌、产品、负面/正面信息直接相关的关键词列表。
4. **summary (总结):** 对该言论进行简明扼要的总结。
5. **is_negative_impact (是否产生负面影响):** 布尔值,判断该言论是否可能对品牌产生负面影响。如果情感是Negative或Mixed,且意图是Complaint, Misinformation, Defamation,则通常为True。
6. **details (详细分析):** 解释为什么你认为它会产生负面影响,或者为什么是某种情感和意图。如果言论内容与提供的品牌知识存在矛盾,请指出矛盾点。
7. **confidence (置信度):** 你对分析结果的置信度,范围0-100。
请分析以下言论:
---
言论内容:
{mention_text}
---
{knowledge_str}
请直接输出JSON格式的结果,不要包含任何额外文字或解释。
"""
try:
response = client.chat.completions.create(
model="gpt-4o", # 使用最新的模型,性能更优
messages=[
{"role": "system", "content": "You are a helpful assistant designed to output JSON."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}, # 强制LLM输出JSON
temperature=0.2 # 较低的温度值有助于获得更稳定、一致的输出
)
result_json_str = response.choices[0].message.content
return json.loads(result_json_str)
except json.JSONDecodeError as e:
logging.error(f"LLM output was not valid JSON: {result_json_str}. Error: {e}")
return None
except Exception as e:
logging.error(f"Error analyzing mention with LLM: {e}")
return None
# 示例使用
ai_search_query_result = """
最近用户反映'你的品牌名'的产品A续航能力不及宣传,导致部分消费者不满。虽然官方宣称有30小时续航,但实际使用中可能只有20小时。此外,客服响应速度也有待提高。
"""
# 结合RAG
relevant_knowledge = retrieve_relevant_knowledge(ai_search_query_result)
print(f"Retrieved relevant knowledge: {relevant_knowledge}n")
analysis_result = analyze_brand_mention_with_llm(
mention_text=ai_search_query_result,
brand_name="你的品牌名",
context_knowledge=relevant_knowledge
)
if analysis_result:
print(json.dumps(analysis_result, indent=2, ensure_ascii=False))
else:
print("Failed to get LLM analysis.")
# 另一个例子:正面评价
positive_query_result = """
'你的品牌名'的产品B性价比很高,是同类产品中的佼佼者。其创新设计和稳定性能获得了广泛好评。
"""
positive_analysis_result = analyze_brand_mention_with_llm(
mention_text=positive_query_result,
brand_name="你的品牌名"
)
if positive_analysis_result:
print("n--- Positive Example ---n")
print(json.dumps(positive_analysis_result, indent=2, ensure_ascii=False))
代码说明:
retrieve_relevant_knowledge函数模拟从向量数据库中检索与当前待分析文本最相关的品牌知识。这是RAG机制的关键一步。analyze_brand_mention_with_llm函数是核心。- Prompt Engineering: 构建了详细的Prompt,明确角色、任务、期望的JSON输出格式,并列出了可能的情感和意图类型。
- RAG集成: 将
context_knowledge作为Prompt的一部分,为LLM提供额外的背景信息,帮助其进行更准确的分析和事实核查。 - 强制JSON输出: 利用
response_format={"type": "json_object"}确保LLM返回有效的JSON字符串。 - 使用
gpt-4o模型,因为它在理解复杂指令和生成结构化输出方面表现出色。
- 通过
json.loads()解析LLM的输出,以便程序化处理。
3.4 预警规则与通知模块
当AI情感分析与意图识别模块得出结论后,预警规则模块会根据这些结果判断是否需要触发预警。
目的:
- 规则评估: 根据预设条件(情感、意图、关键词、置信度等)判断是否达到预警级别。
- 通知分发: 通过多种渠道及时通知相关负责人。
预警规则示例(可配置):
| 规则ID | 规则名称 | 触发条件 | 预警级别 | 通知渠道 | 接收人/组 |
|---|---|---|---|---|---|
| R001 | 严重负面评价 | sentiment == ‘Negative’ 且 is_negative_impact == True 且 confidence >= 80 |
紧急 | 邮件, Slack | 品牌公关部, CEO |
| R002 | 虚假信息/诽谤 | intent 包含 ‘Misinformation’ 或 ‘Defamation’ 且 confidence >= 70 |
紧急 | 邮件, 短信 | 品牌公关部, 法务部 |
| R003 | 批量负面抱怨 | 在短时间内(如1小时内)发现5条以上 sentiment == ‘Negative’ 且 intent == ‘Complaint’ 的言论 |
高 | 邮件, Slack | 产品经理, 客户服务部 |
| R004 | 特定关键词预警 | keywords 包含 ‘质量问题’, ‘诈骗’, ‘退款难’ 等敏感词 |
高 | 邮件, 钉钉 | 相关产品负责人, 客户服务部 |
| R005 | 中度负面评价 | sentiment == ‘Negative’ 或 ‘Mixed’ 且 is_negative_impact == True |
中 | 邮件 | 品牌公关部 |
实现:Python + 消息队列 + 通知服务
消息队列在这里起到关键作用,它解耦了分析模块和通知模块,提高了系统的鲁棒性和可伸缩性。分析结果首先发送到消息队列,然后由通知服务异步消费。
import os
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import json
import logging
from typing import Dict, Any, List
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- 消息队列模拟 (实际应集成Kafka/SQS等) ---
class MessageQueue:
def __init__(self):
self.queue = []
def publish(self, topic: str, message: Dict[str, Any]):
logging.info(f"Published to topic '{topic}': {message.get('alert_id', 'N/A')}")
self.queue.append({'topic': topic, 'message': message})
def consume(self, topic: str) -> List[Dict[str, Any]]:
# 实际消费会有更复杂的机制,这里简单模拟
consumed_messages = [item['message'] for item in self.queue if item['topic'] == topic]
self.queue = [item for item in self.queue if item['topic'] != topic] # 消费后移除
return consumed_messages
message_queue = MessageQueue()
# --- 通知服务实现 ---
def send_email_notification(recipient_email: str, subject: str, body: str):
"""发送邮件通知"""
sender_email = os.environ.get("SENDER_EMAIL")
sender_password = os.environ.get("SENDER_PASSWORD")
smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
smtp_port = int(os.environ.get("SMTP_PORT", 587))
if not all([sender_email, sender_password, recipient_email]):
logging.error("Email configuration missing. Skipping email notification.")
return False
try:
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = recipient_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain', 'utf-8'))
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(sender_email, sender_password)
server.send_message(msg)
logging.info(f"Email notification sent to {recipient_email} for subject: {subject}")
return True
except Exception as e:
logging.error(f"Failed to send email to {recipient_email}: {e}")
return False
def send_slack_notification(webhook_url: str, message: str):
"""发送Slack通知"""
if not webhook_url:
logging.error("Slack webhook URL not configured. Skipping Slack notification.")
return False
payload = {'text': message}
try:
response = requests.post(webhook_url, json=payload, timeout=10)
response.raise_for_status()
logging.info(f"Slack notification sent. Status: {response.status_code}")
return True
except requests.exceptions.RequestException as e:
logging.error(f"Failed to send Slack notification: {e}")
return False
# 假设还有send_sms_notification, send_dingtalk_notification等
# --- 预警规则评估器 ---
def evaluate_alert_rules(analysis_result: Dict[str, Any], brand_name: str) -> List[Dict[str, Any]]:
"""
根据LLM的分析结果评估预警规则。
返回触发的预警列表。
"""
triggered_alerts = []
sentiment = analysis_result.get('sentiment')
intent = analysis_result.get('intent', [])
is_negative_impact = analysis_result.get('is_negative_impact', False)
confidence = analysis_result.get('confidence', 0)
keywords = analysis_result.get('keywords', [])
mention_summary = analysis_result.get('summary', analysis_result.get('details', '无总结'))
original_text = analysis_result.get('original_text', 'N/A') # 假设LLM结果中包含原始文本
# 规则1: 严重负面评价
if sentiment == 'Negative' and is_negative_impact and confidence >= 80:
triggered_alerts.append({
"alert_id": "R001",
"level": "紧急",
"title": f"【紧急预警】品牌'{brand_name}'出现严重负面评价",
"message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}, 置信度: {confidence}",
"channels": ["email", "slack"],
"recipients": {"email": ["[email protected]", "[email protected]"], "slack": ["pr_channel_webhook_url"]}
})
# 规则2: 虚假信息/诽谤
if any(i in intent for i in ['Misinformation', 'Defamation']) and confidence >= 70:
triggered_alerts.append({
"alert_id": "R002",
"level": "紧急",
"title": f"【紧急预警】品牌'{brand_name}'可能存在虚假信息或诽谤",
"message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}, 置信度: {confidence}n详细分析: {analysis_result.get('details', '')}",
"channels": ["email", "sms"],
"recipients": {"email": ["[email protected]", "[email protected]"], "sms": ["+8613800138000"]}
})
# 规则3: 特定敏感关键词
sensitive_keywords = ['质量问题', '诈骗', '退款难', '虚假宣传']
if any(k in sensitive_keywords for k in keywords):
triggered_alerts.append({
"alert_id": "R004",
"level": "高",
"title": f"【高风险预警】品牌'{brand_name}'出现敏感关键词",
"message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n涉及关键词: {', '.join(keywords)}n情感: {sentiment}, 意图: {intent}",
"channels": ["email", "dingtalk"],
"recipients": {"email": ["[email protected]"], "dingtalk": ["dingtalk_webhook_url"]}
})
# 规则4: 中度负面评价
if (sentiment == 'Negative' or sentiment == 'Mixed') and is_negative_impact and not triggered_alerts: # 如果没有被更高级别的规则捕获
triggered_alerts.append({
"alert_id": "R005",
"level": "中",
"title": f"【中度预警】品牌'{brand_name}'出现负面评价",
"message": f"言论摘要: {mention_summary}n原始文本: {original_text[:200]}...n情感: {sentiment}, 意图: {intent}",
"channels": ["email"],
"recipients": {"email": ["[email protected]"]}
})
return triggered_alerts
def process_alert_message(alert_message: Dict[str, Any]):
"""
处理单个预警消息,并发送到指定渠道。
"""
alert_title = alert_message['title']
alert_body = alert_message['message']
channels = alert_message['channels']
recipients = alert_message['recipients']
if "email" in channels and "email" in recipients:
for email in recipients["email"]:
send_email_notification(email, alert_title, alert_body)
if "slack" in channels and "slack" in recipients:
for webhook_url in recipients["slack"]:
send_slack_notification(webhook_url, f"*{alert_title}*n{alert_body}")
# ... 其他通知渠道,如短信、钉钉等
# --- 主流程集成示例 ---
async def integrated_workflow(query_text: str, brand_name: str):
# 1. 数据采集
ai_response_text = await get_ai_search_result(query_text)
if not ai_response_text:
logging.warning(f"No AI response for query: {query_text}")
return
# 2. 数据预处理
cleaned_text = clean_text(ai_response_text)
# 3. RAG - 检索相关知识
relevant_knowledge = retrieve_relevant_knowledge(cleaned_text)
# 4. AI情感分析与意图识别
analysis_result = analyze_brand_mention_with_llm(
mention_text=cleaned_text,
brand_name=brand_name,
context_knowledge=relevant_knowledge
)
if not analysis_result:
logging.error(f"Failed to get analysis for: {cleaned_text[:100]}...")
return
# 将原始文本也加入分析结果,方便追溯
analysis_result['original_text'] = cleaned_text
logging.info(f"LLM Analysis Result: {json.dumps(analysis_result, ensure_ascii=False)}")
# 5. 预警规则评估
triggered_alerts = evaluate_alert_rules(analysis_result, brand_name)
# 6. 发布到消息队列
if triggered_alerts:
for alert in triggered_alerts:
message_queue.publish("brand_alerts", alert)
else:
logging.info("No alerts triggered for this mention.")
# 7. 模拟消息队列消费和通知发送 (在实际系统中,这通常是另一个独立的消费者服务)
consumed_alerts = message_queue.consume("brand_alerts")
for alert in consumed_alerts:
logging.info(f"Consuming alert: {alert['title']}")
process_alert_message(alert)
# 运行集成示例
async def run_integration_example():
test_brand_name = "你的品牌名"
test_queries = [
"关于 你的品牌名 的产品A续航太差了",
"有人说 你的品牌名 的客服态度不好",
"对 你的品牌名 的产品B非常满意,强烈推荐",
"你的品牌名 真的有质量问题吗?网上都在传"
]
for q in test_queries:
print(f"n--- Processing Query: {q} ---")
await integrated_workflow(q, test_brand_name)
await asyncio.sleep(random.uniform(5, 10)) # 避免过于频繁请求
if __name__ == "__main__":
# 配置环境变量 (请替换为您的实际值)
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
os.environ["SENDER_EMAIL"] = "[email protected]"
os.environ["SENDER_PASSWORD"] = "your_email_app_password" # 或SMTP密码
# os.environ["SMTP_SERVER"] = "smtp.office365.com"
# os.environ["SMTP_PORT"] = "587"
# os.environ["SLACK_WEBHOOK_URL"] = "YOUR_SLACK_WEBHOOK_URL"
# os.environ["DINGTALK_WEBHOOK_URL"] = "YOUR_DINGTALK_WEBHOOK_URL"
asyncio.run(run_integration_example())
代码说明:
- 消息队列模拟:
MessageQueue类简单模拟了消息的发布和消费,实际生产环境应使用Kafka、AWS SQS等专业消息队列服务。 - 通知服务:
send_email_notification和send_slack_notification演示了如何通过SMTP和HTTP Webhook发送通知。这些函数需要配置相应的环境变量(如邮箱账号密码、Webhook URL)。 evaluate_alert_rules函数: 这是预警逻辑的核心。它接收LLM的分析结果,并根据预设的规则列表进行匹配。每条规则定义了触发条件(如情感、意图、置信度、关键词)和触发后的预警级别、标题、消息内容以及通知渠道和接收人。process_alert_message函数: 负责根据预警消息中指定的渠道,调用相应的通知发送函数。integrated_workflow函数: 将之前所有模块串联起来,展示了从数据采集到最终预警通知的完整流程。- 环境变量: 所有的API密钥、Webhook URL等敏感信息都通过环境变量加载,增强安全性。
四、系统部署与运维考虑
一个实用的预警系统不仅要能工作,还要能稳定、高效、安全地运行。
4.1 可伸缩性(Scalability)
- 数据采集: 采用分布式爬虫架构。多个爬虫实例并行工作,配合代理IP池和User-Agent轮换,提高抓取效率和抗封禁能力。可以部署在Kubernetes集群或Serverless函数(如AWS Lambda, Google Cloud Functions)上。
- 消息队列: 使用Kafka、AWS SQS等托管服务,轻松处理高并发数据流,解耦各模块。
- LLM分析: LLM API本身具有高可用性和可伸缩性。在本地部署开源LLM时,考虑使用GPU集群。
- 向量数据库: 选择云托管的向量数据库服务(如Pinecone, Weaviate Cloud),它们通常自带高可用和弹性伸缩能力。
- 通知服务: 同样可以利用Serverless函数处理通知逻辑,按需伸缩。
4.2 可靠性(Reliability)
- 监控与日志: 部署全面的监控系统(如Prometheus + Grafana)来跟踪系统各项指标(CPU、内存、请求速率、错误率、LLM调用次数、预警触发次数)。详细记录所有模块的日志(如ELK Stack或Loki),便于问题排查。
- 错误处理与重试: 在每个模块中实现健壮的错误处理机制,例如网络请求失败时的指数退避重试、LLM调用超时重试等。
- 数据持久化: 关键数据(原始AI结果、分析结果、触发的预警)应持久化存储到数据库,以便审计、追溯和后续分析。
- 幂等性设计: 消息队列消费者应设计为幂等性,即多次处理同一条消息不会产生副作用,避免重复通知或数据错误。
4.3 安全性(Security)
- API密钥管理: 绝不将API密钥硬编码到代码中。使用环境变量、Secret Manager (如AWS Secrets Manager, Azure Key Vault) 或HashiCorp Vault等工具进行安全管理。
- 数据加密: 传输中的数据(TLS/SSL)和静态数据(数据库加密)都应进行加密。
- 访问控制: 对系统各个组件(数据库、云资源、API端点)实施最小权限原则,限制访问。
- 代理IP安全: 确保代理IP提供商的信誉,避免使用不安全的代理。
4.4 成本优化(Cost Optimization)
- LLM调用成本: LLM API调用是主要的成本来源。
- 模型选择: 根据任务复杂度选择合适的模型,不一定所有任务都需要GPT-4o。对简单任务使用更便宜的模型。
- Prompt优化: 精简Prompt,减少输入Token数量。
- 批处理: 尽可能批处理LLM请求,减少API调用的次数。
- 缓存: 对重复的查询或分析结果进行缓存。
- 向量数据库成本: 根据数据量和查询频率选择合适的向量数据库实例大小和类型。
- 计算资源成本: 利用Serverless计算(Lambda, Cloud Functions)实现按需付费,避免资源闲置浪费。
- 爬虫成本: 代理IP服务通常按流量或请求量收费,合理规划和选择服务商。
4.5 持续优化(Continuous Optimization)
- Prompt迭代: LLM的性能高度依赖于Prompt。需要持续测试和优化Prompt,提高分析准确性。
- 模型微调: 如果有大量的特定领域标注数据,可以考虑对开源LLM进行微调,以提高特定任务的性能并降低成本。
- 规则更新: 预警规则需要根据品牌策略、市场变化和实际效果进行动态调整。
- 反馈闭环: 收集预警系统的误报和漏报数据,形成反馈闭环,持续改进LLM分析和预警规则。
五、挑战与未来展望
5.1 当前挑战
- AI搜索结果的黑盒性与变化性: 目标AI搜索引擎可能随时更新其UI、反爬机制或底层模型,导致爬虫失效或分析结果不稳定。
- LLM的幻觉与偏见: 尽管RAG能有效缓解,但LLM仍可能生成不完全准确或带有偏见的回答,需要人工复核。
- 多语言支持的复杂性: 品牌可能面临多语言市场,需要针对不同语言配置不同的分词器、Embedding模型和LLM Prompt。
- 成本控制: 大规模部署和高频调用LLM可能产生不小的费用,需要精细化管理和优化。
- 误报与漏报: 这是一个持续的挑战,需要通过反馈循环和迭代优化来逐步改善。
5.2 未来展望
- 更高级的意图识别与情绪粒度: 发展更精细的LLM模型,不仅能识别基本情感,还能区分情绪强度、识别讽刺、暗示等复杂语境,甚至理解用户的情绪演变。
- 自动化响应建议: 系统在发现负面言论后,除了预警,还能根据品牌知识库和预设策略,为公关团队生成初步的应对建议或草稿,进一步提升响应效率。
- 多模态预警: 随着AI搜索引擎集成图片、视频等多模态内容,未来的预警系统需要能够分析图片中的品牌Logo、视频中的语音内容等,实现更全面的监控。
- 与CRM/BI系统深度集成: 将预警数据无缝集成到客户关系管理(CRM)或商业智能(BI)系统中,为品牌决策提供更全面的数据支持,例如将负面评价与具体客户ID关联,或分析负面趋势对销售的影响。
- 零样本/少样本学习: 减少对大量标注数据的依赖,通过更智能的Prompt工程和少量示例,让系统能够快速适应新的负面言论模式。
六、结语
今天,我们详细探讨了如何建立一个AI展现预警系统,以应对AI时代品牌声誉管理的新挑战。从数据采集、预处理、LLM深度分析,到智能预警和通知,我们构建了一个端到端的解决方案。这个系统不仅仅是技术上的创新,更是品牌在复杂多变的信息环境中保护自身、赢得用户信任的关键防线。
技术的道路永无止境,这个系统也需要持续的迭代和优化。我鼓励各位将今天所学的理论和代码付诸实践,结合您自身品牌的具体需求,不断探索和完善。只有这样,我们才能真正驾驭AI的力量,让它成为我们品牌增长和声誉维护的强大助力。
谢谢大家!期待与各位共同见证AI技术在品牌管理领域的更多创新。