各位同仁,下午好!
今天,我们将深入探讨一个在现代AI应用中至关重要的话题:如何对“Local Embedding”进行离线加速,特别是在多核CPU上利用OpenVINO工具套件,以显著优化向量计算的延迟。在RAG(检索增强生成)、语义搜索、推荐系统等领域,快速、高效地生成文本嵌入向量是核心需求。当面临大规模离线数据处理或对成本、隐私有严格要求时,将嵌入模型部署到本地CPU上进行加速,而非依赖云服务,便成为一个极具吸引力的方案。
1. 本地嵌入与计算瓶颈的深层剖析
1.1 什么是本地嵌入(Local Embedding)?
本地嵌入指的是在本地硬件设备上运行预训练好的深度学习模型(通常是基于Transformer架构),将文本数据转换成高维稠密向量的过程。这些向量捕捉了文本的语义信息,使得相似的文本在向量空间中距离相近。与依赖远程API(如OpenAI Embeddings)不同,本地嵌入具有以下显著优势:
- 成本效益: 避免了按使用量付费的模式,长期运行成本更低。
- 数据隐私: 敏感数据无需离开本地环境,符合严格的数据安全和隐私法规。
- 低延迟与高吞吐: 在优化得当的情况下,可以实现极低的推理延迟和极高的吞吐量,尤其适用于离线批处理场景。
- 离线可用性: 无需网络连接即可工作。
常见的本地嵌入模型包括Sentence-Transformers系列(如all-MiniLM-L6-v2, bge-small-zh-v1.5等)、Hugging Face上托管的各种Transformer模型。它们通常由一个tokenizer(分词器)和一个编码器模型(Encoder Model)组成。
1.2 本地嵌入的计算瓶颈
尽管本地嵌入优势明显,但其计算密集性也带来了挑战。一个典型的文本到向量的转换流程如下:
- 文本预处理与分词 (Tokenization): 将原始文本分解成模型可以理解的token ID序列,并生成注意力掩码(attention mask)等辅助信息。这通常涉及字符串处理、字典查找等操作。
- 模型推理 (Model Inference): 将token ID序列输入到深度学习模型中。对于Transformer模型,这包括:
- 词嵌入层 (Embedding Layer): 将token ID映射到词向量。
- 多头自注意力机制 (Multi-Head Self-Attention): 计算token之间的相互关系。这是Transformer的核心,涉及大量的矩阵乘法。
- 前馈网络 (Feed-Forward Network): 对每个token的表示进行非线性变换,同样包含密集的矩阵乘法。
- 层归一化 (Layer Normalization) 和激活函数 (Activation Functions): 辅助稳定训练和引入非线性。
- 池化操作 (Pooling): 将模型最后一层输出的每个token的向量(序列长度 x 隐藏层维度)聚合为一个单一的句子或文档向量(1 x 隐藏层维度)。常见的有均值池化(mean pooling)。
在这些步骤中,模型推理,特别是其中的矩阵乘法 (GEMM – General Matrix Multiply),是主要的计算瓶颈。Transformer模型由多层堆叠而成,每层都包含多次矩阵乘法,其计算复杂度随着序列长度和模型维度的增加而急剧上升。在CPU上,如何高效执行这些矩阵乘法,并充分利用多核并行能力,是优化延迟的关键。
2. OpenVINO:CPU推理优化的利器
为了应对CPU上的AI推理挑战,Intel推出了OpenVINO™ Toolkit。OpenVINO是一个开源工具包,旨在帮助开发者优化和部署高性能AI推理应用程序,尤其擅长在Intel硬件(包括CPU、集成显卡、Movidius VPU和FPGA)上加速推理。
2.1 OpenVINO的核心优势
- 模型优化器 (Model Optimizer): 这是一个命令行工具或Python API,用于将各种训练框架(如PyTorch, TensorFlow, ONNX)的模型转换为OpenVINO的中间表示(IR – Intermediate Representation)格式。在转换过程中,Model Optimizer会执行一系列图优化,如层融合、死代码消除、常量折叠等,以提高执行效率。
- 推理引擎 (Inference Engine): 这是OpenVINO的核心运行时组件,负责加载IR模型并在目标硬件上执行推理。它提供了统一的API,能够根据不同的硬件自动选择最优的执行路径。
- 硬件加速: OpenVINO能够自动检测并利用Intel CPU上的高级指令集,如AVX2、AVX512甚至最新的AMX(针对第五代Xeon处理器),显著加速矩阵乘法和其他向量操作。
- 多线程与并行处理: OpenVINO推理引擎内置了高效的多线程调度机制,能够充分利用多核CPU的并行计算能力,通过配置不同的策略(如流数量、线程亲和性)来优化吞吐量和延迟。
- 精度优化: 支持模型量化(如FP32到FP16或INT8),在保持可接受精度损失的前提下,大幅减少模型大小和计算量,进一步提升性能。
2.2 OpenVINO如何加速本地嵌入模型?
OpenVINO对本地嵌入模型的加速体现在以下几个方面:
- 模型图优化: 将Transformer模型中复杂的层结构(如注意力机制)优化为更高效的计算图,减少不必要的内存访问和计算冗余。
- 硬件指令集利用: 自动将矩阵乘法等核心操作映射到CPU的SIMD(单指令多数据)指令,实现数据级并行。
- 多核并行: 通过在多个CPU核心上并行执行不同的推理请求(或单个请求的不同部分),提升整体吞吐量或降低单个请求的延迟。
- 内存优化: 减少中间结果的内存占用,优化内存访问模式,降低缓存未命中率。
3. 准备本地嵌入模型用于OpenVINO
在利用OpenVINO加速之前,我们需要将基于PyTorch或TensorFlow训练的嵌入模型转换为OpenVINO的IR格式。通常,这需要先将模型导出为ONNX (Open Neural Network Exchange) 格式,因为ONNX是一种通用的模型表示,OpenVINO对它有很好的支持。
我们将以sentence-transformers/all-MiniLM-L6-v2模型为例进行说明。这是一个轻量级的英文嵌入模型,非常适合CPU部署。对于中文模型,可以考虑BAAI/bge-small-zh-v1.5。
3.1 环境准备
首先,确保安装了必要的库:
pip install torch transformers sentence-transformers onnx openvino
3.2 加载原始模型并进行基准测试
为了衡量优化效果,我们首先使用原始的Sentence-Transformers库进行推理,并记录其延迟。
import time
from sentence_transformers import SentenceTransformer
import torch
# 1. 加载原始Sentence-Transformer模型
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
# model_name = 'BAAI/bge-small-zh-v1.5' # 如果需要中文模型,请取消注释并安装相关依赖
original_model = SentenceTransformer(model_name)
original_model.eval() # 设置为评估模式
# 准备测试数据
texts = [
"This is a sample sentence for benchmarking.",
"OpenVINO is great for optimizing AI inference on CPUs.",
"How to accelerate local embedding models with OpenVINO?",
"Machine learning models are becoming increasingly powerful and complex."
] * 25 # 批量处理100个句子
print(f"原始模型加载完成: {model_name}")
# 进行一次预热,确保缓存和初始化完成
_ = original_model.encode(["warm up sentence"], convert_to_tensor=False)
# 测量原始模型推理延迟
start_time = time.perf_counter()
embeddings_original = original_model.encode(texts, convert_to_tensor=False, show_progress_bar=False)
end_time = time.perf_counter()
original_latency = (end_time - start_time) * 1000
print(f"原始模型推理延迟 (批量大小={len(texts)}): {original_latency:.2f} ms")
print(f"原始模型输出嵌入形状: {embeddings_original.shape}")
3.3 导出为ONNX格式
Sentence-Transformers库提供了一个方便的方法来导出其模型到ONNX。这个方法会处理好模型的输入/输出结构。
import os
import torch.onnx
# 定义ONNX模型保存路径
onnx_model_path = "model/all-MiniLM-L6-v2.onnx"
os.makedirs(os.path.dirname(onnx_model_path), exist_ok=True)
# 使用SentenceTransformer的export_to_onnx方法
# 注意:Sentence-Transformers在内部会处理好token_type_ids,
# 对于某些模型(如BERT),它可能需要,但对于MiniLM通常不需要,
# 但为了兼容性,可以假设它存在。
# 我们需要确保导出的ONNX模型输入与OpenVINO的期望一致。
# 通常Sentence-Transformers的ONNX导出处理了这些细节。
# 如果直接从transformers库导出,需要手动构建输入张量。
# 对于sentence-transformers库,通常可以直接调用其方法进行ONNX导出
# 它会处理好输入,通常是 input_ids, attention_mask, token_type_ids
# 我们需要一个样本输入来确定模型期望的动态维度
dummy_input_ids = torch.randint(0, original_model.tokenizer.vocab_size, (1, 128), dtype=torch.int64)
dummy_attention_mask = torch.ones((1, 128), dtype=torch.int64)
# MiniLM-L6-v2通常不需要token_type_ids,但某些模型需要
# dummy_token_type_ids = torch.zeros((1, 128), dtype=torch.int64)
# SentenceTransformer的export_to_onnx方法会自行处理输入和输出
# 它会创建一个WrapperModel,使得ONNX导出更顺畅
try:
original_model.export_to_onnx(onnx_model_path, opset=13, use_external_data_format=False)
print(f"模型成功导出到ONNX: {onnx_model_path}")
except Exception as e:
print(f"导出ONNX失败: {e}")
print("尝试手动导出(更复杂,但如果SentenceTransformer的方法失败,可以尝试)...")
# 如果export_to_onnx失败,可以尝试更底层的导出,但这需要更深入了解模型结构
# 这里我们假设export_to_onnx是成功的。
# 示例手动导出(通常用于直接从transformers库加载的模型):
# from transformers import AutoTokenizer, AutoModel
# model_hf = AutoModel.from_pretrained(model_name)
# tokenizer_hf = AutoTokenizer.from_pretrained(model_name)
#
# class MyEncoder(torch.nn.Module):
# def __init__(self, encoder, tokenizer):
# super().__init__()
# self.encoder = encoder
# self.tokenizer = tokenizer
#
# def forward(self, input_ids, attention_mask):
# model_output = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
# # Sentence-Transformers通常对最后一层输出进行均值池化
# # 假设这里是均值池化,但实际模型可能更复杂
# sentence_embeddings = self.mean_pooling(model_output, attention_mask)
# return sentence_embeddings
#
# def mean_pooling(self, model_output, attention_mask):
# token_embeddings = model_output[0] # First element of model_output contains all token embeddings
# input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
# sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
# sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
# return sum_embeddings / sum_mask
#
# dummy_model = MyEncoder(model_hf, tokenizer_hf)
#
# torch.onnx.export(dummy_model,
# args=(dummy_input_ids, dummy_attention_mask),
# f=onnx_model_path,
# input_names=['input_ids', 'attention_mask'],
# output_names=['sentence_embeddings'],
# dynamic_axes={'input_ids': {0: 'batch_size', 1: 'sequence_length'},
# 'attention_mask': {0: 'batch_size', 1: 'sequence_length'},
# 'sentence_embeddings': {0: 'batch_size'}},
# opset_version=13,
# do_constant_folding=True)
# print(f"手动模型成功导出到ONNX: {onnx_model_path}")
# 保存tokenizer,因为OpenVINO模型不包含分词逻辑
original_model.save_pretrained("model/tokenizer_only")
print("Tokenizer已保存到 model/tokenizer_only/")
导出的ONNX模型会包含模型的核心计算图。请注意,分词器(tokenizer)是一个独立的组件,OpenVINO只处理数值计算图,因此我们需要将分词器单独保存,并在推理时手动进行预处理。
3.4 OpenVINO模型转换
现在,我们将ONNX模型转换为OpenVINO的IR格式。这可以通过ov.convert_model函数(OpenVINO 2023.0及更高版本推荐)或旧版Model Optimizer工具完成。
import openvino as ov
# 定义OpenVINO IR模型保存路径
ov_model_path = "model/all-MiniLM-L6-v2.xml" # .xml是模型描述文件
ov_weights_path = "model/all-MiniLM-L6-v2.bin" # .bin是权重文件
# 使用ov.convert_model将ONNX模型转换为OpenVINO IR
try:
# 假设ONNX模型有三个输入: input_ids, attention_mask, token_type_ids
# 对于all-MiniLM-L6-v2,通常只需要input_ids和attention_mask。
# Sentence-Transformers的ONNX导出默认会处理好输入名称。
# 我们需要加载ONNX模型来检查其输入名称
from onnx import load as load_onnx
onnx_model = load_onnx(onnx_model_path)
input_names = [inp.name for inp in onnx_model.graph.input]
# print(f"ONNX模型输入名称: {input_names}")
# 定义模型的输入形状。
# OpenVINO支持动态形状,但为获得最佳性能,可以指定批大小和序列长度的上限。
# 或者使用-1表示动态。这里我们设置为动态。
# 注意:这里的input参数需要与ONNX模型的实际输入名称和顺序对应
# Sentence-Transformers导出的ONNX模型通常只有一个输入叫 "input_ids"
# 或者可能会有 "input_ids" 和 "attention_mask"
# 需要根据实际导出的ONNX模型结构来确定
# 我们可以尝试使用OpenVINO的默认转换,它通常能处理ONNX的动态输入
# 如果ONNX模型只有一个输入 (input_ids), 并且attention_mask是内部生成的或不需要
# 则输入定义可以简化。但更常见的是,attention_mask也是一个输入。
# 通常Sentence-Transformers导出的ONNX模型,输入是 `input_ids`, `attention_mask`
# 有时候还会有 `token_type_ids`
# 为确保兼容性,我们先检查原始模型的tokenizer的call方法
# print(original_model.tokenizer.model_input_names) # 通常是 ['input_ids', 'attention_mask']
# 假设输入为 input_ids, attention_mask,且都是动态形状
# OpenVINO 2023.0+ 的 convert_model 能够智能处理ONNX的动态形状
model_ov = ov.convert_model(
onnx_model_path,
# input=[
# ov.Type.i64, # input_ids
# ov.Type.i64, # attention_mask
# # ov.Type.i64, # token_type_ids, 如果模型需要
# ],
# shape=[
# ov.Dimension(1, -1), # input_ids: batch_size, sequence_length
# ov.Dimension(1, -1), # attention_mask: batch_size, sequence_length
# # ov.Dimension(1, -1), # token_type_ids: batch_size, sequence_length
# ],
# dynamic_shapes=True # 明确声明支持动态形状
)
ov.save_model(model_ov, ov_model_path, compress_to_fp16=False) # 默认FP32
print(f"ONNX模型成功转换为OpenVINO IR: {ov_model_path}")
except Exception as e:
print(f"OpenVINO模型转换失败: {e}")
print("请检查ONNX模型路径和其输入结构是否正确。")
print("如果模型转换失败,可能是因为原始模型对ONNX导出的兼容性问题。")
print("在这种情况下,可能需要使用Model Optimizer CLI工具,并手动指定输入参数。")
# 加载保存的tokenizer
from transformers import AutoTokenizer
tokenizer_ov = AutoTokenizer.from_pretrained("model/tokenizer_only")
print("OpenVINO推理使用的Tokenizer已加载。")
转换后的OpenVINO IR模型由两个文件组成:.xml文件(描述模型网络结构)和.bin文件(包含模型权重)。现在,我们已经准备好在OpenVINO推理引擎中加载和运行模型。
4. OpenVINO推理引擎深度解析:多核CPU优化策略
OpenVINO推理引擎是实现高性能的关键。它提供了一系列配置选项,允许我们精细控制模型在CPU上的行为,从而最大化利用多核资源。
4.1 核心概念
ov.Core: OpenVINO推理引擎的入口点。它负责管理设备、插件和模型加载。ov.CompiledModel: 将IR模型加载到特定设备(如CPU)后得到的编译模型。这个对象是线程安全的,可以创建多个推理请求。ov.InferRequest: 代表一个独立的推理任务。每个InferRequest都有自己的输入/输出缓冲区,可以独立提交和等待结果。
4.2 CPU特定配置属性
OpenVINO通过设置ov.properties.intel_cpu下的属性来调整CPU的行为。以下是一些最常用的属性:
| 属性名称 | 类型 | 描述
OpenVINO:在多核CPU上加速本地嵌入模型的深度优化与延迟优化
各位尊敬的与会者,各位同仁,
大家好!
今天,我们将聚焦一个在人工智能领域日益受到关注,并且在实际应用中非常重要的议题:如何利用OpenVINO工具套件,在多核CPU上对“Local Embedding”模型进行深度优化,从而显著降低向量计算的延迟。
在过去几年里,以Transformer为代表的预训练语言模型(PLMs)彻底改变了自然语言处理(NLP)的面貌。从语义搜索、信息检索、推荐系统到当前的检索增强生成(RAG)架构,文本嵌入(Text Embedding)作为将非结构化文本转化为机器可理解的数值向量的关键步骤,其性能直接影响着整个系统的效率和用户体验。
当我们的应用场景需要处理大量离线数据、对推理成本敏感、或出于数据隐私考虑必须在本地部署模型时,“Local Embedding”成为了首选。然而,Transformer模型的复杂性意味着其计算量巨大,如何在CPU这一通用且成本效益高的硬件平台上实现低延迟、高吞吐的本地嵌入,是摆在我们面前的一个重要挑战。
本次讲座,我将作为一名编程专家,带领大家系统地探索从模型选择、ONNX导出、OpenVINO转换,到深入理解OpenVINO推理引擎的CPU优化机制,并结合具体的代码实践,展示如何一步步实现高性能的本地嵌入推理。
第一章:本地嵌入模型的本质与计算瓶颈
在开始优化之旅前,我们首先要对本地嵌入模型有一个清晰的认识,并准确识别其计算上的“痛点”。
1.1 什么是本地嵌入?
本地嵌入,顾名思义,是指将预训练的嵌入模型部署在本地计算资源上,直接对输入的文本进行编码,生成对应的密集向量。与依赖云端API(如OpenAI API)相比,本地部署具有以下显著优势:
- 成本控制: 避免了按API调用次数付费的模式,对于大规模数据处理,长期运行成本更低。
- 数据隐私与安全: 敏感数据无需传输到第三方服务,完全在本地环境处理,满足合规性要求。
- 延迟与吞吐量: 在经过充分优化后,本地模型可以实现极低的推理延迟和极高的吞吐量,尤其适合离线批处理。
- 离线可用性: 无需互联网连接即可进行推理,增强了应用的鲁棒性。
目前主流的本地嵌入模型多基于Transformer架构,例如著名的Sentence-Transformers库中集成的各类模型,如all-MiniLM-L6-v2、bge-small-zh-v1.5等。这些模型通常由两个核心部分组成:
- Tokenizer (分词器): 负责将原始文本转换为模型能够理解的数值序列(token IDs)、注意力掩码(attention mask)等。
- Encoder Model (编码器模型): 通常是Transformer编码器,接收token IDs作为输入,输出每个token的上下文表示,最终通过池化(Pooling)操作(如均值池化)得到整个文本的固定维度向量。
1.2 计算瓶颈的深层剖析
一个文本经过嵌入模型生成向量的典型流程是:
原始文本 → Tokenizer → 数值输入(input_ids, attention_mask等) → Encoder Model → token表示 → Pooling → 最终嵌入向量
在这个流程中,计算资源主要消耗在以下几个环节:
- 分词器 (Tokenizer):
- 操作类型: 字符串处理、字典查找、填充(padding)、截断(truncation)。
- 计算特点: CPU密集型,但通常是轻量级的,尤其对于现代Rust实现的分词器(如Hugging Face
tokenizers库)。对于大量短文本,其串行处理可能积累成瓶颈。
- 编码器模型 (Encoder Model) 推理:
- 操作类型: 这是主要的计算瓶颈。Transformer模型的核心是多层堆叠的编码器,每层都包含:
- 词嵌入查找: 将token ID映射到稠密向量。
- 多头自注意力机制 (Multi-Head Self-Attention): 计算输入序列中不同token之间的相互关系。其核心是大量的矩阵乘法(GEMM – General Matrix Multiply)操作,例如 $Q cdot K^T$、$Softmax(QK^T) cdot V$等。
- 前馈网络 (Feed-Forward Network): 对自注意力层的输出进行非线性变换,同样包含密集的矩阵乘法。
- 层归一化 (Layer Normalization) 和激活函数: 提供稳定性并引入非线性。
- 计算特点: 极度计算密集型,尤其依赖于高效的矩阵乘法库(如BLAS/MKL)。其计算复杂度随序列长度的平方和模型维度线性增长。在CPU上,如何将这些庞大的矩阵乘法并行化并映射到CPU的SIMD(Single Instruction, Multiple Data)指令集(如AVX2, AVX512)是性能优化的关键。
- 操作类型: 这是主要的计算瓶颈。Transformer模型的核心是多层堆叠的编码器,每层都包含:
- 池化操作 (Pooling):
- 操作类型: 聚合操作,如均值池化(对所有token的表示求平均)。
- 计算特点: 通常是轻量级的向量求和或求平均操作,相比模型推理,其耗时可忽略不计。
核心瓶颈聚焦: 显然,编码器模型中的密集矩阵乘法是性能优化的重中之重。在多核CPU上,我们不仅需要利用单核的SIMD能力加速单个矩阵乘法,更需要通过合理的并行策略,让多个核心协同工作,以实现整体吞吐量和延迟的优化。
第二章:OpenVINO:CPU推理加速的强大引擎
OpenVINO™ Toolkit 是Intel为加速AI推理而设计的一套全面的开源工具和运行时。它旨在将深度学习模型从训练框架(如PyTorch、TensorFlow)高效地部署到各种Intel硬件平台,包括CPU、集成显卡(iGPU)、VPU(如Movidius)和FPGA。对于本次讲座的主题——在多核CPU上进行加速,OpenVINO提供了无与伦比的优化能力。
2.1 OpenVINO的核心组件与优势
OpenVINO的主要组件及其在优化本地嵌入模型中的作用:
- 模型优化器 (Model Optimizer – MO):
- 功能: 这是一个Python工具或API,负责将训练好的模型(ONNX、PyTorch、TensorFlow等)转换为OpenVINO的中间表示(IR – Intermediate Representation)格式。IR模型由两个文件组成:一个XML文件(描述模型网络结构)和一个BIN文件(包含模型权重)。
- 优化作用: 在转换过程中,MO执行一系列图级优化,包括:
- 层融合 (Layer Fusion): 将多个连续的、计算相关的层合并为一个高效的内部操作,减少内存访问和调度开销。例如,卷积-BN-ReLU序列可以融合成一个操作。
- 常量折叠 (Constant Folding): 预先计算并替换模型中的常量表达式。
- 死代码消除 (Dead Code Elimination): 移除对推理结果没有贡献的部分。
- 布局优化: 调整数据在内存中的布局,以适应目标硬件的最佳访问模式。
- 推理引擎 (Inference Engine):
- 功能: OpenVINO的核心运行时库,负责加载IR模型并在指定硬件上执行推理。它提供了一套统一的C++和Python API。
- 优化作用:
- 硬件指令集利用: 自动检测并利用Intel CPU的SIMD指令集(如SSE、AVX2、AVX512、AMX),将底层的矩阵乘法和向量操作编译成高度优化的机器码,实现数据级并行。
- 多线程与并行调度: 内置了复杂的线程管理和任务调度机制,能够智能地将推理任务分配到多个CPU核心上并行执行。支持同步和异步推理模式,以及多种性能配置。
- 内存优化: 优化内存分配和访问模式,降低缓存未命中率,提高数据局部性。
- 精度优化: 支持FP32、FP16、INT8等不同精度,通过量化可以在保持可接受精度损失的前提下,显著提升推理速度和降低内存占用。
2.2 OpenVINO在CPU上的特定优化能力
OpenVINO针对Intel CPU进行了深度优化,其优势体现在:
- Intel MKL-DNN (oneDNN) 集成: OpenVINO底层严重依赖oneDNN(以前称为MKL-DNN),这是一个高度优化的深度学习原语库,为Intel CPU提供了极致性能的矩阵乘法、卷积等操作。
- 线程亲和性与调度: 允许用户精细控制推理线程如何绑定到CPU核心,以优化缓存利用率和减少上下文切换开销。
- 异构计算支持: 即使模型部分在CPU上运行,部分在其他加速器上运行,OpenVINO也能提供统一的接口和协调能力。
- 缓存管理: 优化了数据流,使得数据能够更好地驻留在CPU的高速缓存中,减少对主内存的访问。
简而言之,OpenVINO不仅仅是一个模型转换工具,更是一个智能的运行时,它能够理解深度学习模型的计算图,并将其高效地映射到CPU的底层硬件特性上,从而释放出CPU的全部潜力。
第三章:本地嵌入模型的OpenVINO化实践
本章我们将通过具体的代码示例,展示如何将一个基于PyTorch和Sentence-Transformers的本地嵌入模型,转换为OpenVINO IR格式,并为后续的推理优化做准备。
3.1 环境设置与模型加载(复习与准备)
我们将继续使用 sentence-transformers/all-MiniLM-L6-v2 作为示例模型。
import time
import os
import torch
import openvino as ov
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
# 1. 确保目录存在
os.makedirs("model", exist_ok=True)
# 2. 加载原始Sentence-Transformer模型
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
original_model = SentenceTransformer(model_name)
original_model.eval()
print(f"原始Sentence-Transformer模型 '{model_name}' 已加载。")
# 3. 保存Tokenizer
# OpenVINO模型不包含分词逻辑,因此需要单独保存分词器。
tokenizer_path = "model/tokenizer_for_ov"
original_model.tokenizer.save_pretrained(tokenizer_path)
print(f"Tokenizer已保存到 '{tokenizer_path}'。")
# 4. 准备测试数据
texts = [
"OpenVINO is an open-source toolkit for optimizing and deploying AI inference.",
"Accelerating local embedding models on multi-core CPUs with OpenVINO.",
"Efficient vector computation is crucial for semantic search and RAG applications.",
"Intel provides various tools to boost machine learning performance.",
"This is a longer sentence to test maximum sequence length handling and performance implications.",
"Another example sentence for demonstration purposes.",
"Local embedding offers privacy, cost-effectiveness, and low latency for AI workloads."
] * 10 # 批量大小为70
# 5. 原始模型推理基准测试
# 进行一次预热,确保缓存和初始化完成
_ = original_model.encode(["warm up sentence"], convert_to_tensor=False)
start_time = time.perf_counter()
embeddings_original = original_model.encode(texts, convert_to_tensor=False, show_progress_bar=False)
end_time = time.perf_counter()
original_latency = (end_time - start_time) * 1000
print(f"n--- 原始模型基准测试 ---")
print(f"原始模型推理延迟 (批量大小={len(texts)}, 句子平均长度={len(texts[0])}): {original_latency:.2f} ms")
print(f"原始模型输出嵌入形状: {embeddings_original.shape}")
3.2 导出为ONNX格式
SentenceTransformer库提供了便利的export_to_onnx方法。它会封装原始PyTorch模型,使其输出符合Sentence-Transformers的池化逻辑,并生成ONNX格式。
关键在于确定ONNX模型的输入。对于all-MiniLM-L6-v2这样的模型,通常需要input_ids和attention_mask。token_type_ids在某些BERT-like模型中需要,但MiniLM通常不需要。export_to_onnx方法会智能处理这些。
# 6. 导出为ONNX格式
onnx_model_path = "model/all-MiniLM-L6-v2.onnx"
print(f"n--- 导出模型到ONNX ---")
try:
# opset_version=13 是一个比较稳定且兼容性好的ONNX操作集版本
# use_external_data_format=False 对于较小的模型可以不用外部文件
original_model.export_to_onnx(onnx_model_path, opset=13, use_external_data_format=False)
print(f"模型成功导出到ONNX: {onnx_model_path}")
except Exception as e:
print(f"导出ONNX失败: {e}")
print("请检查Sentence-Transformers版本或模型兼容性。")
# 验证ONNX模型输入名称 (可选)
import onnx
try:
onnx_graph = onnx.load(onnx_model_path)
input_names_onnx = [node.name for node in onnx_graph.graph.input]
print(f"ONNX模型输入名称: {input_names_onnx}")
# 期望看到类似 ['input_ids', 'attention_mask'] 或 ['input_ids', 'attention_mask', 'token_type_ids']
except Exception as e:
print(f"无法加载ONNX模型或获取输入名称: {e}")
3.3 OpenVINO模型转换
我们将ONNX模型转换为OpenVINO的IR格式(.xml和.bin)。OpenVINO 2023.0及更高版本推荐使用ov.convert_model函数,它比旧的mo.py脚本更方便且功能强大。convert_model能够自动检测ONNX模型的动态输入。
# 7. OpenVINO模型转换
ov_model_xml_path = "model/all-MiniLM-L6-v2.xml"
print(f"n--- 转换为OpenVINO IR格式 ---")
try:
# ov.convert_model 能够智能处理ONNX模型的动态形状,无需手动指定
# compress_to_fp16=False 保持FP32精度,如果需要FP16可以设为True
model_ov_ir = ov.convert_model(onnx_model_path, compress_to_fp16=False)
ov.save_model(model_ov_ir, ov_model_xml_path)
print(f"ONNX模型成功转换为OpenVINO IR: {ov_model_xml_path}")
except Exception as e:
print(f"OpenVINO模型转换失败: {e}")
print("请确保ONNX模型路径正确,并且OpenVINO安装完整。")
print("如果转换失败,可以尝试使用OpenVINO的Model Optimizer CLI工具进行更详细的配置。")
至此,我们已经成功将原始的Sentence-Transformers模型转换为OpenVINO的IR格式,并保存了对应的分词器。接下来,我们将进入OpenVINO推理引擎的核心部分,探索如何利用多核CPU进行高效推理。
第四章:OpenVINO推理引擎深度解析:多核CPU优化策略
OpenVINO推理引擎是实现高性能的关键。它提供了一系列配置选项,允许我们精细控制模型在CPU上的行为,从而最大化利用多核资源。
4.1 核心概念
ov.Core: OpenVINO推理引擎的入口点。它负责管理设备、插件和模型加载。ov.CompiledModel: 将IR模型加载到特定设备(如CPU)后得到的编译模型。这个对象是线程安全的,可以创建多个推理请求。ov.InferRequest: 代表一个独立的推理任务。每个InferRequest都有自己的输入/输出缓冲区,可以独立提交和等待结果。
4.2 CPU特定配置属性详解
OpenVINO通过设置ov.properties.intel_cpu下的属性来调整CPU的行为。这些属性对于在多核CPU上优化延迟和吞吐量至关重要。
| 属性名称 | 类型/值 | 描述
import numpy as np
import openvino.properties as ov_properties
import openvino.properties.intel_cpu as ov_properties_intel_cpu
from transformers import AutoTokenizer
# 确保 OpenVINO IR 模型和 tokenizer 已存在
ov_model_xml_path = "model/all-MiniLM-L6-v2.xml"
tokenizer_path = "model/tokenizer_for_ov"
if not os.path.exists(ov_model_xml_path) or not os.path.exists(tokenizer_path):
print("错误:OpenVINO IR 模型或 Tokenizer 路径不存在。请先运行前面的模型转换代码。")
exit()
# 1. 加载OpenVINO核心
core = ov.Core()
# 2. 加载OpenVINO模型
model_ov_ir = core.read_model(ov_model_xml_path)
# 获取模型输入信息
input_names = [inp.any_name for inp in model_ov_ir.inputs]
output_names = [out.any_name for out in model_ov_ir.outputs]
print(f"OpenVINO模型输入名称: {input_names}")
print(f"OpenVINO模型输出名称: {output_names}")
# 3. 加载Tokenizer
tokenizer_ov = AutoTokenizer.from_pretrained(tokenizer_path)
# 4. 准备测试数据 (与基准测试相同)
texts = [
"OpenVINO is an open-source toolkit for optimizing and deploying AI inference.",
"Accelerating local embedding models on multi-core CPUs with OpenVINO.",
"Efficient vector computation is crucial for semantic search and RAG applications.",
"Intel provides various tools to boost machine learning performance.",
"This is a longer sentence to test maximum sequence length handling and performance implications.",
"Another example sentence for demonstration purposes.",
"Local embedding offers privacy, cost-effectiveness, and low latency for AI workloads."
] * 10 # 批量大小为70
MAX_SEQ_LENGTH = 128 # 依据模型训练时的最大序列长度
#### 4.3 同步推理与CPU性能配置
首先,我们演示同步推理,并尝试配置CPU属性。
```python
print("n--- OpenVINO 同步推理与CPU配置 ---")
# 配置CPU属性
# 通常,NUM_STREAMS=AUTO 配合 PERFORMANCE_HINT=THROUGHPUT 是一个好的起点
# 或者 NUM_STREAMS=n 来指定 n 个并行流
cpu_config = {
# 性能提示:优先吞吐量,OpenVINO会尝试使用更多CPU核心
ov_properties.hint.MODEL_PRIORITY: ov_properties.hint.Priority.HIGH,
ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.THROUGHPUT,
# ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.LATENCY, # 如果更关注单次推理延迟
# NUM_STREAMS: 控制并行推理流的数量。
# AUTO: OpenVINO根据CPU核心数自动决定。
# n (整数): 明确指定流数量。通常可以设置为物理核心数或逻辑核心数的一半。
ov_properties_intel_cpu.NUM_STREAMS: ov_properties_intel_cpu.NUM_STREAMS.AUTO,
# ov_properties_intel_cpu.NUM_STREAMS: os.cpu_count() // 2, # 例如,设置为物理核心数
# AFFINITY: 线程亲和性,控制推理线程如何绑定到CPU核心。
# CORE: 将线程绑定到物理核心。对于吞吐量优化通常更优。
# HYBRID_AWARE: 适用于Intel大小核架构 (P-cores/E-cores),OpenVINO会智能调度。
# NONE: 不绑定,由操作系统调度。
ov_properties_intel_cpu.AFFINITY: ov_properties_intel_cpu.Affinity.CORE,
# INFERENCE_PRECISION_HINT: 推理精度。
# FP32: 默认精度。
# FP16: 半精度,减少内存带宽和计算量,可能轻微损失精度。
# INT8: 8位整数,进一步提升性能,但需要校准(PTQ),精度损失可能较大。
ov_properties.hint.INFERENCE_PRECISION: ov.Type.f32, # 保持FP32,如果模型未进行INT8量化
# ENABLE_CPU_PINNING (OpenVINO 2023.0+): 显式启用/禁用CPU线程绑定
# ov_properties_intel_cpu.ENABLE_CPU_PINNING: True, # 默认为True
# NUM_THREADS: 每个推理请求使用的线程数。通常由 NUM_STREAMS 和 AFFINITY 更好地管理。
# ov_properties_intel_cpu.NUM_THREADS: os.cpu_count(), # 谨慎使用,可能与NUM_STREAMS冲突
}
# 编译模型到CPU设备
try:
compiled_model = core.compile_model(model_ov_ir, "CPU", cpu_config)
print(f"OpenVINO模型已编译到CPU,配置: {cpu_config}")
except Exception as e:
print(f"编译OpenVINO模型失败: {e}")
exit()
# 获取输入/输出节点
input_ids_name = compiled_model.input(0).any_name
attention_mask_name = compiled_model.input(1).any_name
output_name = compiled_model.output(0).any_name
# OpenVINO同步推理函数
def run_ov_inference_sync(texts_batch):
# 分词
tokenized_inputs = tokenizer_ov(
texts_batch,
padding='max_length',
truncation=True,
max_length=MAX_SEQ_LENGTH,
return_tensors='np' # OpenVINO通常使用NumPy数组
)
# 准备输入字典
inputs = {
input_ids_name: tokenized_inputs['input_ids'],
attention_mask_name: tokenized_inputs['attention_mask']
}
# 如果模型需要 token_type_ids,也需要添加到inputs中
# if 'token_type_ids' in tokenized_inputs:
# inputs['token_type_ids'] = tokenized_inputs['token_type_ids']
# 执行推理
result = compiled_model(inputs) # 简化的同步推理调用
return result[output_name]
# 预热
_ = run_ov_inference_sync(["warm up sentence"])
start_time_ov_sync = time.perf_counter()
embeddings_ov_sync = run_ov_inference_sync(texts)
end_time_ov_sync = time.perf_counter()
ov_sync_latency = (end_time_ov_sync - start_time_ov_sync) * 1000
print(f"OpenVINO同步推理延迟 (批量大小={len(texts)}): {ov_sync_latency:.2f} ms")
print(f"OpenVINO同步推理输出嵌入形状: {embeddings_ov_sync.shape}")
配置详解:
PERFORMANCE_HINT:THROUGHPUT:OpenVINO会尝试最大化每秒处理的请求数量。它通常会创建与CPU核心数相近的推理流,并让它们并行运行。这对于离线批处理这种需要高吞吐的场景非常适用。LATENCY:OpenVINO会尝试最小化单个推理请求的完成时间。它通常只创建一个推理流,并为该流分配尽可能多的计算资源。
NUM_STREAMS: 这是控制并行度的最重要参数。AUTO:OpenVINO根据启发式算法和当前CPU类型自动选择最佳流数。对于大多数情况,这是一个很好的起点。n(整数):手动指定要创建的并行推理流的数量。例如,在一个16核CPU上,设置NUM_STREAMS=8可能意味着8个推理流并行运行,每个流使用一部分CPU核心资源。对于离线加速,通常会设置一个等于或略小于物理核心数的流数,以充分利用CPU。
AFFINITY: 控制推理线程与CPU核心的绑定策略。CORE:将推理线程绑定到物理核心。这通常能减少缓存冲突,提高性能。HYBRID_AWARE:适用于具有大小核架构(如Intel的Alder Lake/Raptor Lake)的CPU。OpenVINO会智能地将任务调度到P-cores(性能核心)或E-cores(能效核心),以平衡性能和功耗。NONE:不进行线程绑定,由操作系统决定。
INFERENCE_PRECISION_HINT: 建议的推理精度。FP32是默认,FP16可以提升速度但可能损失精度,INT8则需要专门的量化流程。
4.4 异步推理与手动批处理(离线加速核心)
对于离线加速,我们通常会处理一个大型数据集。将整个数据集一次性加载到内存并推理是不现实的。相反,我们倾向于将数据切分成若干批次(mini-batches),然后并行处理这些批次。异步推理在这里发挥着关键作用,它允许我们在一个批次计算的同时,准备下一个批次的数据,从而隐藏数据预处理和传输的延迟。
import numpy as np
from threading import Thread, Event
from collections import deque
print("n--- OpenVINO 异步推理与手动批处理 (离线加速) ---")
# 重新编译模型,这次我们更倾向于吞吐量,并使用更多的流
# 假设我们有8个物理核心,可以尝试8个流
num_inference_streams = os.cpu_count() # 或者 os.cpu_count() // 2
print(f"尝试使用 {num_inference_streams} 个推理流。")
cpu_config_async = {
ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.THROUGHPUT,
ov_properties_intel_cpu.NUM_STREAMS: num_inference_streams,
ov_properties_intel_cpu.AFFINITY: ov_properties_intel_cpu.Affinity.CORE,
ov_properties.hint.INFERENCE_PRECISION: ov.Type.f32,
}
try:
compiled_model_async = core.compile_model(model_ov_ir, "CPU", cpu_config_async)
print(f"OpenVINO模型已编译到CPU,配置: {cpu_config_async}")
except Exception as e:
print(f"编译OpenVINO模型失败: {e}")
exit()
# 获取输入/输出节点 (与同步推理相同)
input_ids_name_async = compiled_model_async.input(0).any_name
attention_mask_name_async = compiled_model_async.input(1).any_name
output_name_async = compiled_model_async.output(0).any_name
# 异步推理的生产者-消费者模式
# 生产者:负责分词和将输入数据放入队列
# 消费者:从队列中取出数据,执行异步推理,并收集结果
class AsyncInferencer:
def __init__(self, compiled_model, tokenizer, max_seq_length, batch_size, num_streams):
self.compiled_model = compiled_model
self.tokenizer = tokenizer
self.max_seq_length = max_seq_length
self.batch_size = batch_size
self.num_streams = num_streams
self.infer_requests = []
for _ in range(self.num_streams):
self.infer_requests.append(compiled_model.create_infer_request())
self.input_queue = deque() # 存放待处理的文本批次
self.output_queue = deque() # 存放已完成的嵌入批次
self.stop_event = Event()
self.producer_thread = None
self.consumer_thread = None
self.input_names = [inp.any_name for inp in compiled_model.inputs]
self.output_name = compiled_model.output(0).any_name
def _tokenize_batch(self, texts_batch):
return self.tokenizer(
texts_batch,
padding='max_length',
truncation=True,
max_length=self.max_seq_length,
return_tensors='np'
)
def _producer_loop(self, all_texts):
for i in range(0, len(all_texts), self.batch_size):
batch_texts = all_texts[i:i + self.batch_size]
tokenized_batch = self._tokenize_batch(batch_texts)
self.input_queue.append(tokenized_batch)
# print(f"Producer: Added batch {i//self.batch_size + 1}/{len(all_texts)//self.batch_size + 1} to queue.")
self.stop_event.set() # 生产完成信号
def _consumer_loop(self):
req_idx = 0
active_requests = {} # 存储正在进行的请求及其对应的批次索引
total_batches_processed = 0
total_batches_to_process = (len(self.all_texts_for_producer) + self.batch_size - 1) // self.batch_size
while not self.stop_event.is_set() or len(self.input_queue) > 0 or len(active_requests) > 0:
# 尝试提交新的请求
if len(self.input_queue) > 0:
for _ in range(self.num_streams): # 尝试填充所有可用流
if len(self.input_queue) > 0 and req_idx not in active_requests:
tokenized_batch = self.input_queue.popleft()
inputs = {
self.input_names[0]: tokenized_batch['input_ids'],
self.input_names[1]: tokenized_batch['attention_mask']
}
request = self.infer_requests[req_idx]
request.set_inputs(inputs)
request.start_async()
active_requests[req_idx] = True # 标记此请求正在运行
# print(f"Consumer: Started async request {req_idx}.")
req_idx = (req_idx + 1) % self.num_streams
else:
break # 没有更多输入或没有空闲请求
# 检查已完成的请求
for i in list(active_requests.keys()): # 遍历活动请求的副本
request = self.infer_requests[i]
if request.wait_for(0) == ov.WaitStatus.RESULT_READY: # 检查是否完成,不阻塞
result = request.get_output_tensor(self.output_name).data
self.output_queue.append(result)
del active_requests[i]
total_batches_processed += 1
# print(f"Consumer: Completed request {i}. Total processed: {total_batches_processed}/{total_batches_to_process}")
# 如果没有活动请求且生产已停止且输入队列为空,则退出
if self.stop_event.is_set() and len(self.input_queue) == 0 and len(active_requests) == 0:
break
time.sleep(0.001) # 短暂休眠,避免忙等待
def infer(self, all_texts):
self.all_texts_for_producer = all_texts
self.stop_event.clear()
self.input_queue.clear()
self.output_queue.clear()
self.producer_thread = Thread(target=self._producer_loop, args=(all_texts,))
self.consumer_thread = Thread(target=self._consumer_loop)
self.producer_thread.start()
self.consumer_thread.start()
self.producer_thread.join() # 等待所有文本被分词并放入队列
self.consumer_thread.join() # 等待所有推理请求完成
# 收集所有结果
all_embeddings = []
while len(self.output_queue) > 0:
all_embeddings.append(self.output_queue.popleft())
if not all_embeddings:
return np.array([]) # 处理空结果情况
return np.vstack(all_embeddings)
# 设置批处理大小
inference_batch_size = 32 # 调整这个值以找到最佳性能点
# 初始化异步推理器
async_inferencer = AsyncInferencer(
compiled_model=compiled_model_async,
tokenizer=tokenizer_ov,
max_seq_length=MAX_SEQ_LENGTH,
batch_size=inference_batch_size,
num_streams=num_inference_streams
)
# 预热 (对于异步推理,预热可能需要更复杂的逻辑,这里简化为运行一次小批次推理)
_ = async_inferencer.infer(["warm up sentence"] * inference_batch_size)
start_time_ov_async = time.perf_counter()
embeddings_ov_async = async_inferencer.infer(texts)
end_time_ov_async = time.perf_counter()
ov_async_latency = (end_time_ov_async - start_time_ov_async) * 1000
print(f"OpenVINO异步推理延迟 (总文本数={len(texts)}, 批大小={inference_batch_size}, 流数={num_inference_streams}): {ov_async