深入 ‘Local Embedding’ 离线加速:在多核 CPU 上利用 OpenVINO 优化向量计算的延迟

各位同仁,下午好!

今天,我们将深入探讨一个在现代AI应用中至关重要的话题:如何对“Local Embedding”进行离线加速,特别是在多核CPU上利用OpenVINO工具套件,以显著优化向量计算的延迟。在RAG(检索增强生成)、语义搜索、推荐系统等领域,快速、高效地生成文本嵌入向量是核心需求。当面临大规模离线数据处理或对成本、隐私有严格要求时,将嵌入模型部署到本地CPU上进行加速,而非依赖云服务,便成为一个极具吸引力的方案。

1. 本地嵌入与计算瓶颈的深层剖析

1.1 什么是本地嵌入(Local Embedding)?

本地嵌入指的是在本地硬件设备上运行预训练好的深度学习模型(通常是基于Transformer架构),将文本数据转换成高维稠密向量的过程。这些向量捕捉了文本的语义信息,使得相似的文本在向量空间中距离相近。与依赖远程API(如OpenAI Embeddings)不同,本地嵌入具有以下显著优势:

  • 成本效益: 避免了按使用量付费的模式,长期运行成本更低。
  • 数据隐私: 敏感数据无需离开本地环境,符合严格的数据安全和隐私法规。
  • 低延迟与高吞吐: 在优化得当的情况下,可以实现极低的推理延迟和极高的吞吐量,尤其适用于离线批处理场景。
  • 离线可用性: 无需网络连接即可工作。

常见的本地嵌入模型包括Sentence-Transformers系列(如all-MiniLM-L6-v2, bge-small-zh-v1.5等)、Hugging Face上托管的各种Transformer模型。它们通常由一个tokenizer(分词器)和一个编码器模型(Encoder Model)组成。

1.2 本地嵌入的计算瓶颈

尽管本地嵌入优势明显,但其计算密集性也带来了挑战。一个典型的文本到向量的转换流程如下:

  1. 文本预处理与分词 (Tokenization): 将原始文本分解成模型可以理解的token ID序列,并生成注意力掩码(attention mask)等辅助信息。这通常涉及字符串处理、字典查找等操作。
  2. 模型推理 (Model Inference): 将token ID序列输入到深度学习模型中。对于Transformer模型,这包括:
    • 词嵌入层 (Embedding Layer): 将token ID映射到词向量。
    • 多头自注意力机制 (Multi-Head Self-Attention): 计算token之间的相互关系。这是Transformer的核心,涉及大量的矩阵乘法。
    • 前馈网络 (Feed-Forward Network): 对每个token的表示进行非线性变换,同样包含密集的矩阵乘法。
    • 层归一化 (Layer Normalization) 和激活函数 (Activation Functions): 辅助稳定训练和引入非线性。
  3. 池化操作 (Pooling): 将模型最后一层输出的每个token的向量(序列长度 x 隐藏层维度)聚合为一个单一的句子或文档向量(1 x 隐藏层维度)。常见的有均值池化(mean pooling)。

在这些步骤中,模型推理,特别是其中的矩阵乘法 (GEMM – General Matrix Multiply),是主要的计算瓶颈。Transformer模型由多层堆叠而成,每层都包含多次矩阵乘法,其计算复杂度随着序列长度和模型维度的增加而急剧上升。在CPU上,如何高效执行这些矩阵乘法,并充分利用多核并行能力,是优化延迟的关键。

2. OpenVINO:CPU推理优化的利器

为了应对CPU上的AI推理挑战,Intel推出了OpenVINO™ Toolkit。OpenVINO是一个开源工具包,旨在帮助开发者优化和部署高性能AI推理应用程序,尤其擅长在Intel硬件(包括CPU、集成显卡、Movidius VPU和FPGA)上加速推理。

2.1 OpenVINO的核心优势

  • 模型优化器 (Model Optimizer): 这是一个命令行工具或Python API,用于将各种训练框架(如PyTorch, TensorFlow, ONNX)的模型转换为OpenVINO的中间表示(IR – Intermediate Representation)格式。在转换过程中,Model Optimizer会执行一系列图优化,如层融合、死代码消除、常量折叠等,以提高执行效率。
  • 推理引擎 (Inference Engine): 这是OpenVINO的核心运行时组件,负责加载IR模型并在目标硬件上执行推理。它提供了统一的API,能够根据不同的硬件自动选择最优的执行路径。
  • 硬件加速: OpenVINO能够自动检测并利用Intel CPU上的高级指令集,如AVX2、AVX512甚至最新的AMX(针对第五代Xeon处理器),显著加速矩阵乘法和其他向量操作。
  • 多线程与并行处理: OpenVINO推理引擎内置了高效的多线程调度机制,能够充分利用多核CPU的并行计算能力,通过配置不同的策略(如流数量、线程亲和性)来优化吞吐量和延迟。
  • 精度优化: 支持模型量化(如FP32到FP16或INT8),在保持可接受精度损失的前提下,大幅减少模型大小和计算量,进一步提升性能。

2.2 OpenVINO如何加速本地嵌入模型?

OpenVINO对本地嵌入模型的加速体现在以下几个方面:

  1. 模型图优化: 将Transformer模型中复杂的层结构(如注意力机制)优化为更高效的计算图,减少不必要的内存访问和计算冗余。
  2. 硬件指令集利用: 自动将矩阵乘法等核心操作映射到CPU的SIMD(单指令多数据)指令,实现数据级并行。
  3. 多核并行: 通过在多个CPU核心上并行执行不同的推理请求(或单个请求的不同部分),提升整体吞吐量或降低单个请求的延迟。
  4. 内存优化: 减少中间结果的内存占用,优化内存访问模式,降低缓存未命中率。

3. 准备本地嵌入模型用于OpenVINO

在利用OpenVINO加速之前,我们需要将基于PyTorch或TensorFlow训练的嵌入模型转换为OpenVINO的IR格式。通常,这需要先将模型导出为ONNX (Open Neural Network Exchange) 格式,因为ONNX是一种通用的模型表示,OpenVINO对它有很好的支持。

我们将以sentence-transformers/all-MiniLM-L6-v2模型为例进行说明。这是一个轻量级的英文嵌入模型,非常适合CPU部署。对于中文模型,可以考虑BAAI/bge-small-zh-v1.5

3.1 环境准备

首先,确保安装了必要的库:

pip install torch transformers sentence-transformers onnx openvino

3.2 加载原始模型并进行基准测试

为了衡量优化效果,我们首先使用原始的Sentence-Transformers库进行推理,并记录其延迟。

import time
from sentence_transformers import SentenceTransformer
import torch

# 1. 加载原始Sentence-Transformer模型
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
# model_name = 'BAAI/bge-small-zh-v1.5' # 如果需要中文模型,请取消注释并安装相关依赖
original_model = SentenceTransformer(model_name)
original_model.eval() # 设置为评估模式

# 准备测试数据
texts = [
    "This is a sample sentence for benchmarking.",
    "OpenVINO is great for optimizing AI inference on CPUs.",
    "How to accelerate local embedding models with OpenVINO?",
    "Machine learning models are becoming increasingly powerful and complex."
] * 25 # 批量处理100个句子

print(f"原始模型加载完成: {model_name}")

# 进行一次预热,确保缓存和初始化完成
_ = original_model.encode(["warm up sentence"], convert_to_tensor=False)

# 测量原始模型推理延迟
start_time = time.perf_counter()
embeddings_original = original_model.encode(texts, convert_to_tensor=False, show_progress_bar=False)
end_time = time.perf_counter()
original_latency = (end_time - start_time) * 1000
print(f"原始模型推理延迟 (批量大小={len(texts)}): {original_latency:.2f} ms")
print(f"原始模型输出嵌入形状: {embeddings_original.shape}")

3.3 导出为ONNX格式

Sentence-Transformers库提供了一个方便的方法来导出其模型到ONNX。这个方法会处理好模型的输入/输出结构。

import os
import torch.onnx

# 定义ONNX模型保存路径
onnx_model_path = "model/all-MiniLM-L6-v2.onnx"
os.makedirs(os.path.dirname(onnx_model_path), exist_ok=True)

# 使用SentenceTransformer的export_to_onnx方法
# 注意:Sentence-Transformers在内部会处理好token_type_ids,
# 对于某些模型(如BERT),它可能需要,但对于MiniLM通常不需要,
# 但为了兼容性,可以假设它存在。
# 我们需要确保导出的ONNX模型输入与OpenVINO的期望一致。
# 通常Sentence-Transformers的ONNX导出处理了这些细节。
# 如果直接从transformers库导出,需要手动构建输入张量。

# 对于sentence-transformers库,通常可以直接调用其方法进行ONNX导出
# 它会处理好输入,通常是 input_ids, attention_mask, token_type_ids
# 我们需要一个样本输入来确定模型期望的动态维度
dummy_input_ids = torch.randint(0, original_model.tokenizer.vocab_size, (1, 128), dtype=torch.int64)
dummy_attention_mask = torch.ones((1, 128), dtype=torch.int64)
# MiniLM-L6-v2通常不需要token_type_ids,但某些模型需要
# dummy_token_type_ids = torch.zeros((1, 128), dtype=torch.int64)

# SentenceTransformer的export_to_onnx方法会自行处理输入和输出
# 它会创建一个WrapperModel,使得ONNX导出更顺畅
try:
    original_model.export_to_onnx(onnx_model_path, opset=13, use_external_data_format=False)
    print(f"模型成功导出到ONNX: {onnx_model_path}")
except Exception as e:
    print(f"导出ONNX失败: {e}")
    print("尝试手动导出(更复杂,但如果SentenceTransformer的方法失败,可以尝试)...")
    # 如果export_to_onnx失败,可以尝试更底层的导出,但这需要更深入了解模型结构
    # 这里我们假设export_to_onnx是成功的。
    # 示例手动导出(通常用于直接从transformers库加载的模型):
    # from transformers import AutoTokenizer, AutoModel
    # model_hf = AutoModel.from_pretrained(model_name)
    # tokenizer_hf = AutoTokenizer.from_pretrained(model_name)
    #
    # class MyEncoder(torch.nn.Module):
    #     def __init__(self, encoder, tokenizer):
    #         super().__init__()
    #         self.encoder = encoder
    #         self.tokenizer = tokenizer
    #
    #     def forward(self, input_ids, attention_mask):
    #         model_output = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
    #         # Sentence-Transformers通常对最后一层输出进行均值池化
    #         # 假设这里是均值池化,但实际模型可能更复杂
    #         sentence_embeddings = self.mean_pooling(model_output, attention_mask)
    #         return sentence_embeddings
    #
    #     def mean_pooling(self, model_output, attention_mask):
    #         token_embeddings = model_output[0] # First element of model_output contains all token embeddings
    #         input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    #         sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, 1)
    #         sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    #         return sum_embeddings / sum_mask
    #
    # dummy_model = MyEncoder(model_hf, tokenizer_hf)
    #
    # torch.onnx.export(dummy_model,
    #                   args=(dummy_input_ids, dummy_attention_mask),
    #                   f=onnx_model_path,
    #                   input_names=['input_ids', 'attention_mask'],
    #                   output_names=['sentence_embeddings'],
    #                   dynamic_axes={'input_ids': {0: 'batch_size', 1: 'sequence_length'},
    #                                 'attention_mask': {0: 'batch_size', 1: 'sequence_length'},
    #                                 'sentence_embeddings': {0: 'batch_size'}},
    #                   opset_version=13,
    #                   do_constant_folding=True)
    # print(f"手动模型成功导出到ONNX: {onnx_model_path}")

# 保存tokenizer,因为OpenVINO模型不包含分词逻辑
original_model.save_pretrained("model/tokenizer_only")
print("Tokenizer已保存到 model/tokenizer_only/")

导出的ONNX模型会包含模型的核心计算图。请注意,分词器(tokenizer)是一个独立的组件,OpenVINO只处理数值计算图,因此我们需要将分词器单独保存,并在推理时手动进行预处理。

3.4 OpenVINO模型转换

现在,我们将ONNX模型转换为OpenVINO的IR格式。这可以通过ov.convert_model函数(OpenVINO 2023.0及更高版本推荐)或旧版Model Optimizer工具完成。

import openvino as ov

# 定义OpenVINO IR模型保存路径
ov_model_path = "model/all-MiniLM-L6-v2.xml" # .xml是模型描述文件
ov_weights_path = "model/all-MiniLM-L6-v2.bin" # .bin是权重文件

# 使用ov.convert_model将ONNX模型转换为OpenVINO IR
try:
    # 假设ONNX模型有三个输入: input_ids, attention_mask, token_type_ids
    # 对于all-MiniLM-L6-v2,通常只需要input_ids和attention_mask。
    # Sentence-Transformers的ONNX导出默认会处理好输入名称。
    # 我们需要加载ONNX模型来检查其输入名称
    from onnx import load as load_onnx
    onnx_model = load_onnx(onnx_model_path)
    input_names = [inp.name for inp in onnx_model.graph.input]
    # print(f"ONNX模型输入名称: {input_names}")

    # 定义模型的输入形状。
    # OpenVINO支持动态形状,但为获得最佳性能,可以指定批大小和序列长度的上限。
    # 或者使用-1表示动态。这里我们设置为动态。
    # 注意:这里的input参数需要与ONNX模型的实际输入名称和顺序对应
    # Sentence-Transformers导出的ONNX模型通常只有一个输入叫 "input_ids"
    # 或者可能会有 "input_ids" 和 "attention_mask"
    # 需要根据实际导出的ONNX模型结构来确定
    # 我们可以尝试使用OpenVINO的默认转换,它通常能处理ONNX的动态输入

    # 如果ONNX模型只有一个输入 (input_ids), 并且attention_mask是内部生成的或不需要
    # 则输入定义可以简化。但更常见的是,attention_mask也是一个输入。
    # 通常Sentence-Transformers导出的ONNX模型,输入是 `input_ids`, `attention_mask`
    # 有时候还会有 `token_type_ids`
    # 为确保兼容性,我们先检查原始模型的tokenizer的call方法
    # print(original_model.tokenizer.model_input_names) # 通常是 ['input_ids', 'attention_mask']

    # 假设输入为 input_ids, attention_mask,且都是动态形状
    # OpenVINO 2023.0+ 的 convert_model 能够智能处理ONNX的动态形状
    model_ov = ov.convert_model(
        onnx_model_path,
        # input=[
        #     ov.Type.i64, # input_ids
        #     ov.Type.i64, # attention_mask
        #     # ov.Type.i64, # token_type_ids, 如果模型需要
        # ],
        # shape=[
        #     ov.Dimension(1, -1), # input_ids: batch_size, sequence_length
        #     ov.Dimension(1, -1), # attention_mask: batch_size, sequence_length
        #     # ov.Dimension(1, -1), # token_type_ids: batch_size, sequence_length
        # ],
        # dynamic_shapes=True # 明确声明支持动态形状
    )

    ov.save_model(model_ov, ov_model_path, compress_to_fp16=False) # 默认FP32
    print(f"ONNX模型成功转换为OpenVINO IR: {ov_model_path}")

except Exception as e:
    print(f"OpenVINO模型转换失败: {e}")
    print("请检查ONNX模型路径和其输入结构是否正确。")
    print("如果模型转换失败,可能是因为原始模型对ONNX导出的兼容性问题。")
    print("在这种情况下,可能需要使用Model Optimizer CLI工具,并手动指定输入参数。")

# 加载保存的tokenizer
from transformers import AutoTokenizer
tokenizer_ov = AutoTokenizer.from_pretrained("model/tokenizer_only")
print("OpenVINO推理使用的Tokenizer已加载。")

转换后的OpenVINO IR模型由两个文件组成:.xml文件(描述模型网络结构)和.bin文件(包含模型权重)。现在,我们已经准备好在OpenVINO推理引擎中加载和运行模型。

4. OpenVINO推理引擎深度解析:多核CPU优化策略

OpenVINO推理引擎是实现高性能的关键。它提供了一系列配置选项,允许我们精细控制模型在CPU上的行为,从而最大化利用多核资源。

4.1 核心概念

  • ov.Core: OpenVINO推理引擎的入口点。它负责管理设备、插件和模型加载。
  • ov.CompiledModel: 将IR模型加载到特定设备(如CPU)后得到的编译模型。这个对象是线程安全的,可以创建多个推理请求。
  • ov.InferRequest: 代表一个独立的推理任务。每个InferRequest都有自己的输入/输出缓冲区,可以独立提交和等待结果。

4.2 CPU特定配置属性

OpenVINO通过设置ov.properties.intel_cpu下的属性来调整CPU的行为。以下是一些最常用的属性:

| 属性名称 | 类型 | 描述

OpenVINO:在多核CPU上加速本地嵌入模型的深度优化与延迟优化

各位尊敬的与会者,各位同仁,

大家好!

今天,我们将聚焦一个在人工智能领域日益受到关注,并且在实际应用中非常重要的议题:如何利用OpenVINO工具套件,在多核CPU上对“Local Embedding”模型进行深度优化,从而显著降低向量计算的延迟。

在过去几年里,以Transformer为代表的预训练语言模型(PLMs)彻底改变了自然语言处理(NLP)的面貌。从语义搜索、信息检索、推荐系统到当前的检索增强生成(RAG)架构,文本嵌入(Text Embedding)作为将非结构化文本转化为机器可理解的数值向量的关键步骤,其性能直接影响着整个系统的效率和用户体验。

当我们的应用场景需要处理大量离线数据、对推理成本敏感、或出于数据隐私考虑必须在本地部署模型时,“Local Embedding”成为了首选。然而,Transformer模型的复杂性意味着其计算量巨大,如何在CPU这一通用且成本效益高的硬件平台上实现低延迟、高吞吐的本地嵌入,是摆在我们面前的一个重要挑战。

本次讲座,我将作为一名编程专家,带领大家系统地探索从模型选择、ONNX导出、OpenVINO转换,到深入理解OpenVINO推理引擎的CPU优化机制,并结合具体的代码实践,展示如何一步步实现高性能的本地嵌入推理。


第一章:本地嵌入模型的本质与计算瓶颈

在开始优化之旅前,我们首先要对本地嵌入模型有一个清晰的认识,并准确识别其计算上的“痛点”。

1.1 什么是本地嵌入?

本地嵌入,顾名思义,是指将预训练的嵌入模型部署在本地计算资源上,直接对输入的文本进行编码,生成对应的密集向量。与依赖云端API(如OpenAI API)相比,本地部署具有以下显著优势:

  • 成本控制: 避免了按API调用次数付费的模式,对于大规模数据处理,长期运行成本更低。
  • 数据隐私与安全: 敏感数据无需传输到第三方服务,完全在本地环境处理,满足合规性要求。
  • 延迟与吞吐量: 在经过充分优化后,本地模型可以实现极低的推理延迟和极高的吞吐量,尤其适合离线批处理。
  • 离线可用性: 无需互联网连接即可进行推理,增强了应用的鲁棒性。

目前主流的本地嵌入模型多基于Transformer架构,例如著名的Sentence-Transformers库中集成的各类模型,如all-MiniLM-L6-v2bge-small-zh-v1.5等。这些模型通常由两个核心部分组成:

  1. Tokenizer (分词器): 负责将原始文本转换为模型能够理解的数值序列(token IDs)、注意力掩码(attention mask)等。
  2. Encoder Model (编码器模型): 通常是Transformer编码器,接收token IDs作为输入,输出每个token的上下文表示,最终通过池化(Pooling)操作(如均值池化)得到整个文本的固定维度向量。

1.2 计算瓶颈的深层剖析

一个文本经过嵌入模型生成向量的典型流程是:

原始文本 → Tokenizer → 数值输入(input_ids, attention_mask等) → Encoder Model → token表示 → Pooling → 最终嵌入向量

在这个流程中,计算资源主要消耗在以下几个环节:

  1. 分词器 (Tokenizer):
    • 操作类型: 字符串处理、字典查找、填充(padding)、截断(truncation)。
    • 计算特点: CPU密集型,但通常是轻量级的,尤其对于现代Rust实现的分词器(如Hugging Face tokenizers库)。对于大量短文本,其串行处理可能积累成瓶颈。
  2. 编码器模型 (Encoder Model) 推理:
    • 操作类型: 这是主要的计算瓶颈。Transformer模型的核心是多层堆叠的编码器,每层都包含:
      • 词嵌入查找: 将token ID映射到稠密向量。
      • 多头自注意力机制 (Multi-Head Self-Attention): 计算输入序列中不同token之间的相互关系。其核心是大量的矩阵乘法(GEMM – General Matrix Multiply)操作,例如 $Q cdot K^T$、$Softmax(QK^T) cdot V$等。
      • 前馈网络 (Feed-Forward Network): 对自注意力层的输出进行非线性变换,同样包含密集的矩阵乘法
      • 层归一化 (Layer Normalization) 和激活函数: 提供稳定性并引入非线性。
    • 计算特点: 极度计算密集型,尤其依赖于高效的矩阵乘法库(如BLAS/MKL)。其计算复杂度随序列长度的平方和模型维度线性增长。在CPU上,如何将这些庞大的矩阵乘法并行化并映射到CPU的SIMD(Single Instruction, Multiple Data)指令集(如AVX2, AVX512)是性能优化的关键。
  3. 池化操作 (Pooling):
    • 操作类型: 聚合操作,如均值池化(对所有token的表示求平均)。
    • 计算特点: 通常是轻量级的向量求和或求平均操作,相比模型推理,其耗时可忽略不计。

核心瓶颈聚焦: 显然,编码器模型中的密集矩阵乘法是性能优化的重中之重。在多核CPU上,我们不仅需要利用单核的SIMD能力加速单个矩阵乘法,更需要通过合理的并行策略,让多个核心协同工作,以实现整体吞吐量和延迟的优化。


第二章:OpenVINO:CPU推理加速的强大引擎

OpenVINO™ Toolkit 是Intel为加速AI推理而设计的一套全面的开源工具和运行时。它旨在将深度学习模型从训练框架(如PyTorch、TensorFlow)高效地部署到各种Intel硬件平台,包括CPU、集成显卡(iGPU)、VPU(如Movidius)和FPGA。对于本次讲座的主题——在多核CPU上进行加速,OpenVINO提供了无与伦比的优化能力。

2.1 OpenVINO的核心组件与优势

OpenVINO的主要组件及其在优化本地嵌入模型中的作用:

  1. 模型优化器 (Model Optimizer – MO):
    • 功能: 这是一个Python工具或API,负责将训练好的模型(ONNX、PyTorch、TensorFlow等)转换为OpenVINO的中间表示(IR – Intermediate Representation)格式。IR模型由两个文件组成:一个XML文件(描述模型网络结构)和一个BIN文件(包含模型权重)。
    • 优化作用: 在转换过程中,MO执行一系列图级优化,包括:
      • 层融合 (Layer Fusion): 将多个连续的、计算相关的层合并为一个高效的内部操作,减少内存访问和调度开销。例如,卷积-BN-ReLU序列可以融合成一个操作。
      • 常量折叠 (Constant Folding): 预先计算并替换模型中的常量表达式。
      • 死代码消除 (Dead Code Elimination): 移除对推理结果没有贡献的部分。
      • 布局优化: 调整数据在内存中的布局,以适应目标硬件的最佳访问模式。
  2. 推理引擎 (Inference Engine):
    • 功能: OpenVINO的核心运行时库,负责加载IR模型并在指定硬件上执行推理。它提供了一套统一的C++和Python API。
    • 优化作用:
      • 硬件指令集利用: 自动检测并利用Intel CPU的SIMD指令集(如SSE、AVX2、AVX512、AMX),将底层的矩阵乘法和向量操作编译成高度优化的机器码,实现数据级并行。
      • 多线程与并行调度: 内置了复杂的线程管理和任务调度机制,能够智能地将推理任务分配到多个CPU核心上并行执行。支持同步和异步推理模式,以及多种性能配置。
      • 内存优化: 优化内存分配和访问模式,降低缓存未命中率,提高数据局部性。
      • 精度优化: 支持FP32、FP16、INT8等不同精度,通过量化可以在保持可接受精度损失的前提下,显著提升推理速度和降低内存占用。

2.2 OpenVINO在CPU上的特定优化能力

OpenVINO针对Intel CPU进行了深度优化,其优势体现在:

  • Intel MKL-DNN (oneDNN) 集成: OpenVINO底层严重依赖oneDNN(以前称为MKL-DNN),这是一个高度优化的深度学习原语库,为Intel CPU提供了极致性能的矩阵乘法、卷积等操作。
  • 线程亲和性与调度: 允许用户精细控制推理线程如何绑定到CPU核心,以优化缓存利用率和减少上下文切换开销。
  • 异构计算支持: 即使模型部分在CPU上运行,部分在其他加速器上运行,OpenVINO也能提供统一的接口和协调能力。
  • 缓存管理: 优化了数据流,使得数据能够更好地驻留在CPU的高速缓存中,减少对主内存的访问。

简而言之,OpenVINO不仅仅是一个模型转换工具,更是一个智能的运行时,它能够理解深度学习模型的计算图,并将其高效地映射到CPU的底层硬件特性上,从而释放出CPU的全部潜力。


第三章:本地嵌入模型的OpenVINO化实践

本章我们将通过具体的代码示例,展示如何将一个基于PyTorch和Sentence-Transformers的本地嵌入模型,转换为OpenVINO IR格式,并为后续的推理优化做准备。

3.1 环境设置与模型加载(复习与准备)

我们将继续使用 sentence-transformers/all-MiniLM-L6-v2 作为示例模型。

import time
import os
import torch
import openvino as ov
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer

# 1. 确保目录存在
os.makedirs("model", exist_ok=True)

# 2. 加载原始Sentence-Transformer模型
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
original_model = SentenceTransformer(model_name)
original_model.eval()
print(f"原始Sentence-Transformer模型 '{model_name}' 已加载。")

# 3. 保存Tokenizer
# OpenVINO模型不包含分词逻辑,因此需要单独保存分词器。
tokenizer_path = "model/tokenizer_for_ov"
original_model.tokenizer.save_pretrained(tokenizer_path)
print(f"Tokenizer已保存到 '{tokenizer_path}'。")

# 4. 准备测试数据
texts = [
    "OpenVINO is an open-source toolkit for optimizing and deploying AI inference.",
    "Accelerating local embedding models on multi-core CPUs with OpenVINO.",
    "Efficient vector computation is crucial for semantic search and RAG applications.",
    "Intel provides various tools to boost machine learning performance.",
    "This is a longer sentence to test maximum sequence length handling and performance implications.",
    "Another example sentence for demonstration purposes.",
    "Local embedding offers privacy, cost-effectiveness, and low latency for AI workloads."
] * 10 # 批量大小为70

# 5. 原始模型推理基准测试
# 进行一次预热,确保缓存和初始化完成
_ = original_model.encode(["warm up sentence"], convert_to_tensor=False)

start_time = time.perf_counter()
embeddings_original = original_model.encode(texts, convert_to_tensor=False, show_progress_bar=False)
end_time = time.perf_counter()
original_latency = (end_time - start_time) * 1000
print(f"n--- 原始模型基准测试 ---")
print(f"原始模型推理延迟 (批量大小={len(texts)}, 句子平均长度={len(texts[0])}): {original_latency:.2f} ms")
print(f"原始模型输出嵌入形状: {embeddings_original.shape}")

3.2 导出为ONNX格式

SentenceTransformer库提供了便利的export_to_onnx方法。它会封装原始PyTorch模型,使其输出符合Sentence-Transformers的池化逻辑,并生成ONNX格式。

关键在于确定ONNX模型的输入。对于all-MiniLM-L6-v2这样的模型,通常需要input_idsattention_masktoken_type_ids在某些BERT-like模型中需要,但MiniLM通常不需要。export_to_onnx方法会智能处理这些。

# 6. 导出为ONNX格式
onnx_model_path = "model/all-MiniLM-L6-v2.onnx"

print(f"n--- 导出模型到ONNX ---")
try:
    # opset_version=13 是一个比较稳定且兼容性好的ONNX操作集版本
    # use_external_data_format=False 对于较小的模型可以不用外部文件
    original_model.export_to_onnx(onnx_model_path, opset=13, use_external_data_format=False)
    print(f"模型成功导出到ONNX: {onnx_model_path}")
except Exception as e:
    print(f"导出ONNX失败: {e}")
    print("请检查Sentence-Transformers版本或模型兼容性。")

# 验证ONNX模型输入名称 (可选)
import onnx
try:
    onnx_graph = onnx.load(onnx_model_path)
    input_names_onnx = [node.name for node in onnx_graph.graph.input]
    print(f"ONNX模型输入名称: {input_names_onnx}")
    # 期望看到类似 ['input_ids', 'attention_mask'] 或 ['input_ids', 'attention_mask', 'token_type_ids']
except Exception as e:
    print(f"无法加载ONNX模型或获取输入名称: {e}")

3.3 OpenVINO模型转换

我们将ONNX模型转换为OpenVINO的IR格式(.xml.bin)。OpenVINO 2023.0及更高版本推荐使用ov.convert_model函数,它比旧的mo.py脚本更方便且功能强大。convert_model能够自动检测ONNX模型的动态输入。

# 7. OpenVINO模型转换
ov_model_xml_path = "model/all-MiniLM-L6-v2.xml"

print(f"n--- 转换为OpenVINO IR格式 ---")
try:
    # ov.convert_model 能够智能处理ONNX模型的动态形状,无需手动指定
    # compress_to_fp16=False 保持FP32精度,如果需要FP16可以设为True
    model_ov_ir = ov.convert_model(onnx_model_path, compress_to_fp16=False)
    ov.save_model(model_ov_ir, ov_model_xml_path)
    print(f"ONNX模型成功转换为OpenVINO IR: {ov_model_xml_path}")
except Exception as e:
    print(f"OpenVINO模型转换失败: {e}")
    print("请确保ONNX模型路径正确,并且OpenVINO安装完整。")
    print("如果转换失败,可以尝试使用OpenVINO的Model Optimizer CLI工具进行更详细的配置。")

至此,我们已经成功将原始的Sentence-Transformers模型转换为OpenVINO的IR格式,并保存了对应的分词器。接下来,我们将进入OpenVINO推理引擎的核心部分,探索如何利用多核CPU进行高效推理。


第四章:OpenVINO推理引擎深度解析:多核CPU优化策略

OpenVINO推理引擎是实现高性能的关键。它提供了一系列配置选项,允许我们精细控制模型在CPU上的行为,从而最大化利用多核资源。

4.1 核心概念

  • ov.Core: OpenVINO推理引擎的入口点。它负责管理设备、插件和模型加载。
  • ov.CompiledModel: 将IR模型加载到特定设备(如CPU)后得到的编译模型。这个对象是线程安全的,可以创建多个推理请求。
  • ov.InferRequest: 代表一个独立的推理任务。每个InferRequest都有自己的输入/输出缓冲区,可以独立提交和等待结果。

4.2 CPU特定配置属性详解

OpenVINO通过设置ov.properties.intel_cpu下的属性来调整CPU的行为。这些属性对于在多核CPU上优化延迟和吞吐量至关重要。

| 属性名称 | 类型/值 | 描述

import numpy as np
import openvino.properties as ov_properties
import openvino.properties.intel_cpu as ov_properties_intel_cpu
from transformers import AutoTokenizer

# 确保 OpenVINO IR 模型和 tokenizer 已存在
ov_model_xml_path = "model/all-MiniLM-L6-v2.xml"
tokenizer_path = "model/tokenizer_for_ov"

if not os.path.exists(ov_model_xml_path) or not os.path.exists(tokenizer_path):
    print("错误:OpenVINO IR 模型或 Tokenizer 路径不存在。请先运行前面的模型转换代码。")
    exit()

# 1. 加载OpenVINO核心
core = ov.Core()

# 2. 加载OpenVINO模型
model_ov_ir = core.read_model(ov_model_xml_path)

# 获取模型输入信息
input_names = [inp.any_name for inp in model_ov_ir.inputs]
output_names = [out.any_name for out in model_ov_ir.outputs]
print(f"OpenVINO模型输入名称: {input_names}")
print(f"OpenVINO模型输出名称: {output_names}")

# 3. 加载Tokenizer
tokenizer_ov = AutoTokenizer.from_pretrained(tokenizer_path)

# 4. 准备测试数据 (与基准测试相同)
texts = [
    "OpenVINO is an open-source toolkit for optimizing and deploying AI inference.",
    "Accelerating local embedding models on multi-core CPUs with OpenVINO.",
    "Efficient vector computation is crucial for semantic search and RAG applications.",
    "Intel provides various tools to boost machine learning performance.",
    "This is a longer sentence to test maximum sequence length handling and performance implications.",
    "Another example sentence for demonstration purposes.",
    "Local embedding offers privacy, cost-effectiveness, and low latency for AI workloads."
] * 10 # 批量大小为70
MAX_SEQ_LENGTH = 128 # 依据模型训练时的最大序列长度

#### 4.3 同步推理与CPU性能配置

首先,我们演示同步推理,并尝试配置CPU属性。

```python
print("n--- OpenVINO 同步推理与CPU配置 ---")

# 配置CPU属性
# 通常,NUM_STREAMS=AUTO 配合 PERFORMANCE_HINT=THROUGHPUT 是一个好的起点
# 或者 NUM_STREAMS=n 来指定 n 个并行流
cpu_config = {
    # 性能提示:优先吞吐量,OpenVINO会尝试使用更多CPU核心
    ov_properties.hint.MODEL_PRIORITY: ov_properties.hint.Priority.HIGH,
    ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.THROUGHPUT,
    # ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.LATENCY, # 如果更关注单次推理延迟

    # NUM_STREAMS: 控制并行推理流的数量。
    # AUTO: OpenVINO根据CPU核心数自动决定。
    # n (整数): 明确指定流数量。通常可以设置为物理核心数或逻辑核心数的一半。
    ov_properties_intel_cpu.NUM_STREAMS: ov_properties_intel_cpu.NUM_STREAMS.AUTO,
    # ov_properties_intel_cpu.NUM_STREAMS: os.cpu_count() // 2, # 例如,设置为物理核心数

    # AFFINITY: 线程亲和性,控制推理线程如何绑定到CPU核心。
    # CORE: 将线程绑定到物理核心。对于吞吐量优化通常更优。
    # HYBRID_AWARE: 适用于Intel大小核架构 (P-cores/E-cores),OpenVINO会智能调度。
    # NONE: 不绑定,由操作系统调度。
    ov_properties_intel_cpu.AFFINITY: ov_properties_intel_cpu.Affinity.CORE,

    # INFERENCE_PRECISION_HINT: 推理精度。
    # FP32: 默认精度。
    # FP16: 半精度,减少内存带宽和计算量,可能轻微损失精度。
    # INT8: 8位整数,进一步提升性能,但需要校准(PTQ),精度损失可能较大。
    ov_properties.hint.INFERENCE_PRECISION: ov.Type.f32, # 保持FP32,如果模型未进行INT8量化

    # ENABLE_CPU_PINNING (OpenVINO 2023.0+): 显式启用/禁用CPU线程绑定
    # ov_properties_intel_cpu.ENABLE_CPU_PINNING: True, # 默认为True

    # NUM_THREADS: 每个推理请求使用的线程数。通常由 NUM_STREAMS 和 AFFINITY 更好地管理。
    # ov_properties_intel_cpu.NUM_THREADS: os.cpu_count(), # 谨慎使用,可能与NUM_STREAMS冲突
}

# 编译模型到CPU设备
try:
    compiled_model = core.compile_model(model_ov_ir, "CPU", cpu_config)
    print(f"OpenVINO模型已编译到CPU,配置: {cpu_config}")
except Exception as e:
    print(f"编译OpenVINO模型失败: {e}")
    exit()

# 获取输入/输出节点
input_ids_name = compiled_model.input(0).any_name
attention_mask_name = compiled_model.input(1).any_name
output_name = compiled_model.output(0).any_name

# OpenVINO同步推理函数
def run_ov_inference_sync(texts_batch):
    # 分词
    tokenized_inputs = tokenizer_ov(
        texts_batch,
        padding='max_length',
        truncation=True,
        max_length=MAX_SEQ_LENGTH,
        return_tensors='np' # OpenVINO通常使用NumPy数组
    )

    # 准备输入字典
    inputs = {
        input_ids_name: tokenized_inputs['input_ids'],
        attention_mask_name: tokenized_inputs['attention_mask']
    }
    # 如果模型需要 token_type_ids,也需要添加到inputs中
    # if 'token_type_ids' in tokenized_inputs:
    #     inputs['token_type_ids'] = tokenized_inputs['token_type_ids']

    # 执行推理
    result = compiled_model(inputs) # 简化的同步推理调用
    return result[output_name]

# 预热
_ = run_ov_inference_sync(["warm up sentence"])

start_time_ov_sync = time.perf_counter()
embeddings_ov_sync = run_ov_inference_sync(texts)
end_time_ov_sync = time.perf_counter()
ov_sync_latency = (end_time_ov_sync - start_time_ov_sync) * 1000

print(f"OpenVINO同步推理延迟 (批量大小={len(texts)}): {ov_sync_latency:.2f} ms")
print(f"OpenVINO同步推理输出嵌入形状: {embeddings_ov_sync.shape}")

配置详解:

  • PERFORMANCE_HINT
    • THROUGHPUT:OpenVINO会尝试最大化每秒处理的请求数量。它通常会创建与CPU核心数相近的推理流,并让它们并行运行。这对于离线批处理这种需要高吞吐的场景非常适用。
    • LATENCY:OpenVINO会尝试最小化单个推理请求的完成时间。它通常只创建一个推理流,并为该流分配尽可能多的计算资源。
  • NUM_STREAMS 这是控制并行度的最重要参数。
    • AUTO:OpenVINO根据启发式算法和当前CPU类型自动选择最佳流数。对于大多数情况,这是一个很好的起点。
    • n (整数):手动指定要创建的并行推理流的数量。例如,在一个16核CPU上,设置NUM_STREAMS=8可能意味着8个推理流并行运行,每个流使用一部分CPU核心资源。对于离线加速,通常会设置一个等于或略小于物理核心数的流数,以充分利用CPU。
  • AFFINITY 控制推理线程与CPU核心的绑定策略。
    • CORE:将推理线程绑定到物理核心。这通常能减少缓存冲突,提高性能。
    • HYBRID_AWARE:适用于具有大小核架构(如Intel的Alder Lake/Raptor Lake)的CPU。OpenVINO会智能地将任务调度到P-cores(性能核心)或E-cores(能效核心),以平衡性能和功耗。
    • NONE:不进行线程绑定,由操作系统决定。
  • INFERENCE_PRECISION_HINT 建议的推理精度。FP32是默认,FP16可以提升速度但可能损失精度,INT8则需要专门的量化流程。

4.4 异步推理与手动批处理(离线加速核心)

对于离线加速,我们通常会处理一个大型数据集。将整个数据集一次性加载到内存并推理是不现实的。相反,我们倾向于将数据切分成若干批次(mini-batches),然后并行处理这些批次。异步推理在这里发挥着关键作用,它允许我们在一个批次计算的同时,准备下一个批次的数据,从而隐藏数据预处理和传输的延迟。


import numpy as np
from threading import Thread, Event
from collections import deque

print("n--- OpenVINO 异步推理与手动批处理 (离线加速) ---")

# 重新编译模型,这次我们更倾向于吞吐量,并使用更多的流
# 假设我们有8个物理核心,可以尝试8个流
num_inference_streams = os.cpu_count() # 或者 os.cpu_count() // 2
print(f"尝试使用 {num_inference_streams} 个推理流。")

cpu_config_async = {
    ov_properties.hint.PERFORMANCE_HINT: ov_properties.hint.PerformanceHint.THROUGHPUT,
    ov_properties_intel_cpu.NUM_STREAMS: num_inference_streams,
    ov_properties_intel_cpu.AFFINITY: ov_properties_intel_cpu.Affinity.CORE,
    ov_properties.hint.INFERENCE_PRECISION: ov.Type.f32,
}

try:
    compiled_model_async = core.compile_model(model_ov_ir, "CPU", cpu_config_async)
    print(f"OpenVINO模型已编译到CPU,配置: {cpu_config_async}")
except Exception as e:
    print(f"编译OpenVINO模型失败: {e}")
    exit()

# 获取输入/输出节点 (与同步推理相同)
input_ids_name_async = compiled_model_async.input(0).any_name
attention_mask_name_async = compiled_model_async.input(1).any_name
output_name_async = compiled_model_async.output(0).any_name

# 异步推理的生产者-消费者模式
# 生产者:负责分词和将输入数据放入队列
# 消费者:从队列中取出数据,执行异步推理,并收集结果

class AsyncInferencer:
    def __init__(self, compiled_model, tokenizer, max_seq_length, batch_size, num_streams):
        self.compiled_model = compiled_model
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length
        self.batch_size = batch_size
        self.num_streams = num_streams

        self.infer_requests = []
        for _ in range(self.num_streams):
            self.infer_requests.append(compiled_model.create_infer_request())

        self.input_queue = deque() # 存放待处理的文本批次
        self.output_queue = deque() # 存放已完成的嵌入批次
        self.stop_event = Event()
        self.producer_thread = None
        self.consumer_thread = None

        self.input_names = [inp.any_name for inp in compiled_model.inputs]
        self.output_name = compiled_model.output(0).any_name

    def _tokenize_batch(self, texts_batch):
        return self.tokenizer(
            texts_batch,
            padding='max_length',
            truncation=True,
            max_length=self.max_seq_length,
            return_tensors='np'
        )

    def _producer_loop(self, all_texts):
        for i in range(0, len(all_texts), self.batch_size):
            batch_texts = all_texts[i:i + self.batch_size]
            tokenized_batch = self._tokenize_batch(batch_texts)
            self.input_queue.append(tokenized_batch)
            # print(f"Producer: Added batch {i//self.batch_size + 1}/{len(all_texts)//self.batch_size + 1} to queue.")
        self.stop_event.set() # 生产完成信号

    def _consumer_loop(self):
        req_idx = 0
        active_requests = {} # 存储正在进行的请求及其对应的批次索引

        total_batches_processed = 0
        total_batches_to_process = (len(self.all_texts_for_producer) + self.batch_size - 1) // self.batch_size

        while not self.stop_event.is_set() or len(self.input_queue) > 0 or len(active_requests) > 0:
            # 尝试提交新的请求
            if len(self.input_queue) > 0:
                for _ in range(self.num_streams): # 尝试填充所有可用流
                    if len(self.input_queue) > 0 and req_idx not in active_requests:
                        tokenized_batch = self.input_queue.popleft()

                        inputs = {
                            self.input_names[0]: tokenized_batch['input_ids'],
                            self.input_names[1]: tokenized_batch['attention_mask']
                        }

                        request = self.infer_requests[req_idx]
                        request.set_inputs(inputs)
                        request.start_async()
                        active_requests[req_idx] = True # 标记此请求正在运行
                        # print(f"Consumer: Started async request {req_idx}.")
                        req_idx = (req_idx + 1) % self.num_streams
                    else:
                        break # 没有更多输入或没有空闲请求

            # 检查已完成的请求
            for i in list(active_requests.keys()): # 遍历活动请求的副本
                request = self.infer_requests[i]
                if request.wait_for(0) == ov.WaitStatus.RESULT_READY: # 检查是否完成,不阻塞
                    result = request.get_output_tensor(self.output_name).data
                    self.output_queue.append(result)
                    del active_requests[i]
                    total_batches_processed += 1
                    # print(f"Consumer: Completed request {i}. Total processed: {total_batches_processed}/{total_batches_to_process}")

            # 如果没有活动请求且生产已停止且输入队列为空,则退出
            if self.stop_event.is_set() and len(self.input_queue) == 0 and len(active_requests) == 0:
                break

            time.sleep(0.001) # 短暂休眠,避免忙等待

    def infer(self, all_texts):
        self.all_texts_for_producer = all_texts
        self.stop_event.clear()
        self.input_queue.clear()
        self.output_queue.clear()

        self.producer_thread = Thread(target=self._producer_loop, args=(all_texts,))
        self.consumer_thread = Thread(target=self._consumer_loop)

        self.producer_thread.start()
        self.consumer_thread.start()

        self.producer_thread.join() # 等待所有文本被分词并放入队列
        self.consumer_thread.join() # 等待所有推理请求完成

        # 收集所有结果
        all_embeddings = []
        while len(self.output_queue) > 0:
            all_embeddings.append(self.output_queue.popleft())

        if not all_embeddings:
            return np.array([]) # 处理空结果情况
        return np.vstack(all_embeddings)

# 设置批处理大小
inference_batch_size = 32 # 调整这个值以找到最佳性能点

# 初始化异步推理器
async_inferencer = AsyncInferencer(
    compiled_model=compiled_model_async,
    tokenizer=tokenizer_ov,
    max_seq_length=MAX_SEQ_LENGTH,
    batch_size=inference_batch_size,
    num_streams=num_inference_streams
)

# 预热 (对于异步推理,预热可能需要更复杂的逻辑,这里简化为运行一次小批次推理)
_ = async_inferencer.infer(["warm up sentence"] * inference_batch_size)

start_time_ov_async = time.perf_counter()
embeddings_ov_async = async_inferencer.infer(texts)
end_time_ov_async = time.perf_counter()
ov_async_latency = (end_time_ov_async - start_time_ov_async) * 1000

print(f"OpenVINO异步推理延迟 (总文本数={len(texts)}, 批大小={inference_batch_size}, 流数={num_inference_streams}): {ov_async

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注