引言:Flutter FFI 与跨语言并发的挑战
在现代应用开发中,Flutter 以其出色的跨平台能力和高效的UI渲染机制赢得了广泛的赞誉。然而,当Flutter应用需要与底层硬件交互、利用现有原生库或执行计算密集型任务时,纯Dart环境的局限性便会显现。此时,Flutter FFI (Foreign Function Interface) 应运而生,它提供了一条优雅的途径,使Dart代码能够直接调用C语言接口,从而间接与C++、Rust等其他原生语言编写的功能进行交互。
FFI的引入极大地扩展了Flutter的能力边界,但同时也带来了新的挑战,尤其是在并发编程领域。Dart语言采用独特的“Isolate”模型来实现并发,每个Isolate都有自己的内存空间和事件循环,并且默认是单线程的。与之相对,原生世界(如C、C++、Rust)则通常采用共享内存、多线程的模型。当原生代码在自己的线程中执行任务,并需要将结果或事件异步地回调给Dart时,如何确保这些跨语言的线程交互是安全、高效且不阻塞UI的,便成为了一个核心问题——这就是本文将深入探讨的“原生线程与Dart Isolate的线程亲和性管理”。
理解并妥善管理这种线程亲和性至关重要。如果处理不当,轻则可能导致UI卡顿、数据不同步,重则可能引发应用崩溃,甚至更难以诊断的内存错误。本讲座将从Dart的并发模型入手,逐步深入到FFI的机制,最终提供一套全面的解决方案,并通过丰富的代码示例,展示如何在Flutter FFI中构建健壮、高效的跨语言并发应用。
Dart 的并发模型:隔离(Isolates)与事件循环
要理解FFI中的线程亲和性问题,我们首先需要深刻理解Dart语言自身的并发模型。Dart并没有采用传统意义上的“共享内存多线程”模型,而是选择了“隔离”(Isolate)模型。
什么是 Isolate?
Isolate 可以被理解为一个独立的 Dart 虚拟机实例。每个 Isolate 都拥有:
- 独立的内存堆:一个 Isolate 无法直接访问另一个 Isolate 的内存。这意味着Isolate之间不存在共享内存带来的数据竞争问题,从而简化了并发编程的复杂性。
- 独立的事件循环:每个 Isolate 都有一个自己的事件循环,负责处理该 Isolate 的所有异步事件(如用户输入、网络请求、定时器、FFI回调等)。
- 独立的执行线程:在底层,每个 Isolate 通常会运行在一个单独的操作系统线程上。
由于内存隔离,Isolate 之间不能直接共享变量。它们之间通信的唯一方式是通过“消息传递”(message passing)。这种模型类似于Erlang或Actor模型,强调无共享状态的并发。
事件循环与单线程执行
尽管 Dart 应用程序可以运行多个 Isolate,但每个 Isolate 内部都是单线程执行的。这意味着在一个 Isolate 中,Dart 代码在任何给定时间点都只有一个任务在执行。当一个任务正在运行时,其他任务(如UI更新、网络响应)必须等待。
事件循环是 Isolate 运行的核心。它不断地从事件队列中取出事件并执行其关联的回调函数。这个队列包含了所有待处理的异步操作。一个典型的 Dart Isolate 的事件循环流程如下:
- 检查“微任务队列”(Microtask Queue):优先处理所有微任务(如
Future.microtask、then回调等)。 - 检查“事件队列”(Event Queue):处理来自外部的事件,如计时器、I/O完成、用户交互、Isolate间消息、FFI回调等。
- 重复以上过程,直到两个队列都为空,此时 Isolate 可能会退出(如果不再有任何待处理的异步任务)。
这种单线程模型极大地简化了同步问题,避免了死锁和竞态条件。然而,它也意味着任何长时间运行的同步操作都会阻塞整个 Isolate,包括其事件循环,从而导致UI冻结(如果是在主 Isolate 上)或异步事件无法及时处理。
SendPort / ReceivePort:Isolate 间通信机制
如前所述,Isolate 之间通过消息传递进行通信。Dart 提供了 SendPort 和 ReceivePort 这对核心类来实现这一机制:
ReceivePort:一个 Isolate 通过创建一个ReceivePort来监听来自其他 Isolate 的消息。当消息到达时,ReceivePort会触发一个事件,并在其事件循环中执行回调函数。SendPort:当一个 Isolate 想要向另一个 Isolate 发送消息时,它需要持有目标 Isolate 的SendPort。SendPort是ReceivePort的一个“发送端句柄”。
一个 ReceivePort 可以有多个 SendPort 与之关联,但一个 SendPort 只能发送消息给一个特定的 ReceivePort。消息是按顺序发送和接收的,并且通常是“深度复制”的,这意味着传递的对象会在接收 Isolate 中重新创建,而不是共享引用(除非是原始类型或不可变对象)。
import 'dart:isolate';
// 模拟一个在后台Isolate中执行的耗时任务
void heavyComputation(SendPort sendPort) {
var result = 0;
for (var i = 0; i < 1000000000; i++) {
result += 1;
}
sendPort.send('Computation finished with result: $result');
}
void main() async {
print('Main Isolate started.');
// 1. 创建一个ReceivePort来接收来自后台Isolate的消息
final receivePort = ReceivePort();
// 2. 获取ReceivePort的SendPort,用于传递给后台Isolate
final sendPort = receivePort.sendPort;
// 3. 监听来自ReceivePort的消息
receivePort.listen((message) {
print('Main Isolate received: $message');
// 当收到消息后,可以关闭ReceivePort,或者根据需要继续监听
receivePort.close();
});
// 4. 启动一个新的Isolate,并传递SendPort给它
print('Spawning a new Isolate...');
await Isolate.spawn(heavyComputation, sendPort);
print('Main Isolate continues its work...');
// 主Isolate可以继续执行其他任务,不会被后台Isolate阻塞
await Future.delayed(Duration(seconds: 1));
print('Main Isolate finished its other work.');
}
输出示例:
Main Isolate started.
Spawning a new Isolate...
Main Isolate continues its work...
Main Isolate finished its other work.
Main Isolate received: Computation finished with result: 1000000000
这个例子清晰地展示了 Isolate.spawn 和 SendPort/ReceivePort 的基本用法。主 Isolate 启动了一个后台 Isolate 来执行耗时计算,而自身并未被阻塞。当后台 Isolate 完成计算后,通过 sendPort 将结果发送回主 Isolate,主 Isolate 的 receivePort.listen 回调被触发,从而接收到消息。
UI线程的特殊性
在 Flutter 应用中,主 Dart Isolate 承担着渲染UI、处理用户输入和管理应用状态的关键职责。因此,主 Isolate 也是 Flutter 的 UI 线程。任何阻塞主 Isolate 事件循环的操作都会导致UI无响应(“掉帧”),严重影响用户体验。
正是因为主 Isolate 的这种特殊性和单线程特性,我们在处理 FFI 回调时必须格外小心。原生代码在后台线程中执行任务后,如果需要更新UI或通知Dart应用状态变化,它绝不能直接在任意原生线程中调用Dart代码,更不能阻塞主 Isolate。我们必须确保所有与UI相关的操作都通过安全的消息传递机制,最终在主 Isolate 的事件循环中执行。
原生并发模型概述
与 Dart 的 Isolate 模型截然不同,C、C++、Rust 等原生语言通常采用基于操作系统线程的并发模型。理解这些模型的特点对于设计安全的 FFI 交互至关重要。
主流原生语言(C/C++/Rust)的线程模型
-
C/C++ (POSIX Threads/WinAPI):
- C语言本身没有内置的线程支持,通常依赖于操作系统提供的API。在类Unix系统上,这通常是 POSIX Threads (pthreads) 库;在Windows上,则是 WinAPI 提供的线程函数。
- 共享内存:原生线程默认共享进程的地址空间。这意味着它们可以直接访问和修改相同的全局变量或堆内存。
- 同步机制:为了避免数据竞争和确保内存一致性,C/C++ 程序员必须显式地使用互斥锁(mutexes)、读写锁、条件变量、信号量等同步原语。
- 线程生命周期:线程需要被创建 (
pthread_create/CreateThread),执行一个入口函数,并在完成后被加入 (pthread_join/WaitForSingleObject) 或分离 (pthread_detach)。 - 回调函数:原生库常常会提供回调函数机制,允许用户代码注册一个函数指针,当特定事件发生时,库会在某个线程(可能是主线程,也可能是其内部工作线程)中调用这个函数。
-
Rust (标准库线程):
- Rust 的标准库 (
std::thread) 提供了创建和管理操作系统线程的抽象。 - 所有权与借用:Rust 的一个核心优势是其所有权系统和借用检查器,它在编译时强制执行内存安全和数据竞争预防。即使在多线程环境中,Rust 也能在很大程度上防止数据竞争,除非显式使用
unsafe代码。 - 消息传递与共享状态:Rust 鼓励使用消息传递(如通道
std::sync::mpsc)在线程间安全地通信,或者通过Arc<Mutex<T>>/Arc<RwLock<T>>等智能指针安全地共享可变状态。 - 内存安全:Rust 的类型系统和生命周期规则使其在原生语言中独树一帜,能够提供强大的并发安全保证,减少了许多与C/C++相关的常见并发错误。
- Rust 的标准库 (
线程创建、同步与生命周期
无论是 C、C++ 还是 Rust,原生线程的创建和管理都涉及以下几个基本方面:
- 创建:启动一个新的执行流,通常需要指定一个入口函数(线程函数)。
- 同步:当多个线程需要访问共享资源时,必须使用同步机制来协调它们的访问,防止数据损坏。
- 通信:线程之间可能需要交换数据。这可以通过共享内存(配合同步)或消息队列实现。
- 生命周期:线程从创建到执行完成,再到最终销毁。正确管理线程的生命周期对于防止资源泄露和程序崩溃至关重要。例如,一个分离的线程(detached thread)在完成后会自动释放资源,而一个可加入的线程(joinable thread)则需要另一个线程显式地调用
join来等待其完成并回收资源。
原生回调机制
许多原生库都会提供回调函数机制。例如,一个网络库可能会在数据到达时调用用户注册的onDataReceived函数;一个硬件驱动可能会在某个事件发生时调用onHardwareEvent。这些回调函数在被调用时,其所在的线程上下文是不确定的:
- 可能在调用者线程:如果回调是同步的,或者库内部在调用者线程上直接触发。
- 可能在库的内部工作线程:许多库会创建自己的线程池来处理异步任务,并在这些线程上触发回调。
- 可能在操作系统事件循环线程:例如,GUI框架的事件处理通常在主UI线程上。
这种不确定性是 FFI 线程亲和性问题的核心所在。当原生代码在某个后台线程中触发一个回调,而这个回调最终需要将数据发送给 Dart 时,我们必须确保这个数据能够安全地传递到正确的 Dart Isolate(通常是主 Isolate),并且不会在不合适的线程中直接执行 Dart 代码。
Flutter FFI 基础:从 Dart 到 原生的桥梁
dart:ffi 库是 Flutter FFI 的核心。它允许 Dart 代码直接调用 C 语言风格的函数,并与 C 语言数据结构进行交互。
dart:ffi 库的核心组件
-
DynamicLibrary:用于加载原生共享库(.soon Linux/Android,.dylibon macOS/iOS,.dllon Windows)。import 'dart:ffi'; import 'dart:io'; DynamicLibrary openDynamicLibrary() { if (Platform.isAndroid) { return DynamicLibrary.open('libmy_native_lib.so'); } else if (Platform.isIOS) { return DynamicLibrary.process(); // iOS bundles dylibs into the app executable } else if (Platform.isMacOS) { return DynamicLibrary.open('libmy_native_lib.dylib'); } else if (Platform.isWindows) { return DynamicLibrary.open('my_native_lib.dll'); } else { throw UnsupportedError('Unsupported platform'); } } final DynamicLibrary myNativeLib = openDynamicLibrary(); -
lookupFunction:从加载的库中查找并绑定一个原生函数到 Dart 函数签名。它需要两个类型参数:原生函数的C类型签名和对应的Dart函数类型签名。// C function signature: int add(int a, int b); typedef AddFuncC = Int32 Function(Int32 a, Int32 b); // Dart function signature: int add(int a, int b); typedef AddFuncDart = int Function(int a, int b); final AddFuncDart add = myNativeLib .lookupFunction<AddFuncC, AddFuncDart>('add'); // Usage: // int result = add(10, 20); // result is 30 -
Pointer<T>:代表一个指向 C 类型T的内存地址。它是 FFI 中最核心的概念之一,用于处理 C 语言中的指针。Pointer<Int32>: Cint*Pointer<Utf8>: Cchar*(用于字符串)Pointer<Void>: Cvoid*(通用指针)Pointer<NativeFunction<T>>: C 函数指针
-
Struct和Union:允许 Dart 定义与 C 结构体或联合体布局相匹配的类,从而在 Dart 和原生之间传递复杂的数据结构。// C struct: // typedef struct { // int x; // int y; // } Point; import 'package:ffi/ffi.dart'; // For @sizeOf, allocate, free class Point extends Struct { @Int32() external int x; @Int32() external int y; } // Usage: // final Pointer<Point> p = calloc<Point>(); // p.ref.x = 10; // p.ref.y = 20; // someNativeFunctionThatTakesAPointPointer(p); // calloc.free(p);
类型映射与内存管理
Dart FFI 提供了完善的类型映射机制,将 Dart 类型与 C 类型进行对应:
| Dart FFI Type | C Type | Dart Primitive Type | 说明 |
|---|---|---|---|
Int8 |
int8_t, char |
int |
8位有符号整数 |
Uint8 |
uint8_t, unsigned char |
int |
8位无符号整数 |
Int16 |
int16_t, short |
int |
16位有符号整数 |
Uint16 |
uint16_t, unsigned short |
int |
16位无符号整数 |
Int32 |
int32_t, int |
int |
32位有符号整数 |
Uint32 |
uint32_t, unsigned int |
int |
32位无符号整数 |
Int64 |
int64_t, long long |
int |
64位有符号整数 |
Uint64 |
uint64_t, unsigned long long |
int |
64位无符号整数 |
Float |
float |
double |
32位浮点数 |
Double |
double |
double |
64位浮点数 |
Bool |
_Bool (C99), bool |
bool |
布尔值 (0/1) |
Void |
void |
void |
无类型 |
Pointer<T> |
T* |
Pointer<T> |
指针类型 |
Pointer<Utf8> |
char* |
String |
C 字符串 (UTF-8编码) |
Pointer<NativeFunction<T>> |
T (*)(...) |
Function |
C 函数指针 |
Handle |
Dart_Handle |
Object |
Dart 对象引用 (仅用于回调,不推荐直接传递) |
内存管理:
由于 Dart 有垃圾回收机制,而 C/C++ 需要手动管理内存,因此在使用 FFI 时,内存管理是一个关键点。
- Dart 分配,原生使用:如果 Dart 分配内存(例如使用
calloc或malloc),并将其指针传递给原生代码,那么 Dart 必须负责在不再需要时释放这块内存。 - 原生分配,Dart 使用:如果原生代码分配内存,并将其指针返回给 Dart,那么通常原生代码也应该提供一个函数让 Dart 调用来释放这块内存,或者 Dart 必须非常小心地管理其生命周期。
- 字符串: Dart 字符串 (
String) 和 C 字符串 (char*) 之间不能直接传递。需要使用Utf8.toUtf8()将 Dart 字符串转换为Pointer<Utf8>,并记得在原生使用后释放内存 (calloc.free)。从 C 返回的char*需要使用toDartString()转换为 DartString。
调用原生函数
以下是一个简单的 C 库和 Dart FFI 调用的例子,展示了同步调用。
C 代码 (my_native_lib.c)
#include <stdio.h>
#include <stdlib.h> // For malloc/free
// Function to add two integers
int add(int a, int b) {
return a + b;
}
// Function to reverse a string
// Caller is responsible for freeing the returned string
char* reverse_string(const char* input) {
if (input == NULL) {
return NULL;
}
int len = 0;
while (input[len] != '') {
len++;
}
char* reversed = (char*) malloc(len + 1);
if (reversed == NULL) {
return NULL;
}
for (int i = 0; i < len; i++) {
reversed[i] = input[len - 1 - i];
}
reversed[len] = '';
return reversed;
}
// Function to free a string allocated by reverse_string
void free_string(char* ptr) {
free(ptr);
}
编译 C 代码 (Linux/macOS)
gcc -shared -o libmy_native_lib.so my_native_lib.c
# For macOS: gcc -shared -o libmy_native_lib.dylib my_native_lib.c
# For Windows: Use MSVC or MinGW to compile to .dll
Dart 代码 (main.dart)
import 'dart:ffi';
import 'dart:io';
import 'package:ffi/ffi.dart'; // For Utf8.toUtf8, calloc.free
// 1. 加载原生库
DynamicLibrary _openDynamicLibrary() {
if (Platform.isAndroid || Platform.isLinux) {
return DynamicLibrary.open('libmy_native_lib.so');
} else if (Platform.isIOS || Platform.isMacOS) {
// For iOS, native libraries are often bundled directly into the executable.
// For macOS, you might specify the full path or use .dylib.
return DynamicLibrary.open('libmy_native_lib.dylib');
} else if (Platform.isWindows) {
return DynamicLibrary.open('my_native_lib.dll');
} else {
throw UnsupportedError('Unsupported platform');
}
}
final DynamicLibrary _myNativeLib = _openDynamicLibrary();
// 2. 定义原生函数签名
// C: int add(int a, int b);
typedef AddFuncC = Int32 Function(Int32 a, Int32 b);
// Dart: int add(int a, int b);
typedef AddFuncDart = int Function(int a, int b);
// C: char* reverse_string(const char* input);
typedef ReverseStringFuncC = Pointer<Utf8> Function(Pointer<Utf8> input);
// Dart: Pointer<Utf8> reverse_string(Pointer<Utf8> input);
typedef ReverseStringFuncDart = Pointer<Utf8> Function(Pointer<Utf8> input);
// C: void free_string(char* ptr);
typedef FreeStringFuncC = Void Function(Pointer<Utf8> ptr);
// Dart: void free_string(Pointer<Utf8> ptr);
typedef FreeStringFuncDart = void Function(Pointer<Utf8> ptr);
// 3. 查找并绑定原生函数
final AddFuncDart add = _myNativeLib
.lookupFunction<AddFuncC, AddFuncDart>('add');
final ReverseStringFuncDart reverseString = _myNativeLib
.lookupFunction<ReverseStringFuncC, ReverseStringFuncDart>('reverse_string');
final FreeStringFuncDart freeString = _myNativeLib
.lookupFunction<FreeStringFuncC, FreeStringFuncDart>('free_string');
void main() {
print('Calling native add function...');
int sum = add(100, 200);
print('100 + 200 = $sum'); // Output: 100 + 200 = 300
print('nCalling native reverse_string function...');
final originalDartString = 'Hello, FFI!';
// Allocate C string from Dart string
final Pointer<Utf8> cString = originalDartString.toUtf8();
// Call native function
final Pointer<Utf8> reversedCString = reverseString(cString);
// Convert C string back to Dart string
final String reversedDartString = reversedCString.toDartString();
print('Original: "$originalDartString"');
print('Reversed: "$reversedDartString"');
// Free memory allocated by Dart for original string
calloc.free(cString);
// Free memory allocated by native for reversed string
freeString(reversedCString);
print('Memory freed.');
}
这个例子展示了基本的 FFI 调用和手动内存管理。然而,这些都是同步调用。当原生函数执行时间较长时,它们会阻塞 Dart Isolate,尤其是在主 Isolate 中,这将导致UI卡顿。这正是我们需要解决的核心问题。
核心挑战:原生线程与 Dart Isolate 的线程亲和性
当我们开始在 FFI 中引入并发和回调时,原生线程与 Dart Isolate 之间的线程亲和性管理便成为一个复杂且关键的问题。理解挑战的根源是找到正确解决方案的第一步。
为什么会出现问题?
-
UI 更新限制:Flutter 的 UI 渲染引擎要求所有 UI 更新(例如通过
setState触发的 Widget 重建)都必须在主 Dart Isolate 上执行。这是为了保证 UI 状态的一致性和避免复杂的并发同步问题。 -
Dart Isolate 的单线程特性:每个 Dart Isolate 内部都是严格单线程的,通过事件循环处理所有任务。如果一个长时间运行的同步任务(包括同步 FFI 调用)在主 Isolate 中执行,它会阻塞事件循环,导致 UI 冻结,用户输入无法响应,动画停止。
-
原生回调的线程上下文:原生库通常在其内部的工作线程中执行耗时操作,并在这些线程中触发回调。如果我们将一个 Dart 函数指针直接传递给原生,让原生在任意线程中调用它,这会带来严重问题。Dart VM 并不期望在任意操作系统线程上执行 Dart 代码。Dart VM 运行时需要特定的上下文才能安全执行 Dart 代码,这个上下文通常绑定到它自己的 Isolate 线程。
-
直接在任意原生线程调用 Dart 的风险:
- VM 崩溃:Dart VM 并非线程安全的,如果在非 Dart Isolate 关联的线程中尝试执行 Dart 代码,极有可能导致 VM 崩溃。
- 内存损坏:由于 Dart 有垃圾回收机制,而不相关的原生线程可能在 Dart GC 运行时访问或修改 Dart 对象,导致内存损坏。
- 未定义的行为:即使没有立即崩溃,也可能导致难以追踪的错误和未定义的行为。
-
阻塞 Dart UI Isolate 的危害:
- UI 冻结:最直接的影响,用户界面停止响应。
- 帧率下降:动画不再流畅,用户体验极差。
- ANR (Application Not Responding):在 Android 等平台上,长时间阻塞主线程会导致系统提示应用无响应。
- 数据不一致:如果后台任务与 UI 状态紧密相关,阻塞可能导致数据更新延迟或错乱。
简而言之,核心问题在于:原生世界的“多线程共享内存”模型与 Dart 世界的“多隔离消息传递”模型之间的不兼容性。我们不能简单地让原生代码在任何线程中“自由地”调用 Dart 代码,也不能让耗时的原生操作阻塞 Dart 的主 Isolate。
错误的示例(概念性,切勿尝试):
假设我们有一个 C 函数,它在后台线程中执行一个耗时任务,并在完成后直接调用一个 Dart 函数指针。
C 代码(错误示范)
// ... Dart_PostCObject_DL 初始化等略过
// DartFunctionPointer is a pointer to a Dart function.
// This is overly simplified and dangerous.
typedef void (*DartCallback)(int result);
DartCallback global_dart_callback = NULL;
void register_dart_callback(DartCallback callback) {
global_dart_callback = callback;
}
void long_running_task_in_native_thread() {
// This function runs in a *native background thread*
// ... simulate heavy computation ...
int result = 42;
if (global_dart_callback != NULL) {
// !!! DANGER: Calling Dart from an arbitrary native thread is UNSAFE !!!
// This will likely crash the Dart VM or lead to undefined behavior.
global_dart_callback(result);
}
}
// A wrapper to start the task in a new native thread
void start_native_long_task() {
// In a real scenario, you'd create a pthread or similar here
// For demonstration, let's just imagine this runs in a new thread
printf("Native: Starting long running task in a new thread.n");
long_running_task_in_native_thread(); // This should be spawned in a new thread
}
Dart 代码(错误示范)
import 'dart:ffi';
// C type for the Dart callback
typedef DartCallbackC = Void Function(Int32 result);
// Dart type for the Dart callback
typedef DartCallbackDart = void Function(int result);
// Native function to register the callback
typedef RegisterDartCallbackFuncC = Void Function(Pointer<NativeFunction<DartCallbackC>> callback);
typedef RegisterDartCallbackFuncDart = void Function(Pointer<NativeFunction<DartCallbackC>> callback);
// Native function to start the long task
typedef StartNativeLongTaskFuncC = Void Function();
typedef StartNativeLongTaskFuncDart = void Function();
// ... FFI setup for _myNativeLib ...
// Bind native functions
final RegisterDartCallbackFuncDart registerDartCallback = _myNativeLib
.lookupFunction<RegisterDartCallbackFuncC, RegisterDartCallbackFuncDart>('register_dart_callback');
final StartNativeLongTaskFuncDart startNativeLongTask = _myNativeLib
.lookupFunction<StartNativeLongTaskFuncC, StartNativeLongTaskFuncDart>('start_native_long_task');
// This is the Dart function that will be called by native
void myDartCallback(int result) {
print('Dart: Native task finished with result: $result');
// Update UI or state...
}
void main() {
// Get a pointer to the Dart function
final Pointer<NativeFunction<DartCallbackC>> dartCallbackPointer =
Pointer.fromFunction<DartCallbackC>(myDartCallback, 0); // 0 is for error value
// Register the Dart callback with native
registerDartCallback(dartCallbackPointer);
// Start the native long task (which will call myDartCallback from a native thread)
startNativeLongTask();
}
上述代码展示了将 Dart 函数指针直接传递给原生,并让原生在任意线程中调用的错误模式。这种做法是极其危险的,因为它违反了 Dart VM 的线程安全原则。Dart VM 期望 Dart 代码在其 Isolate 线程上执行,而不是任意的原生线程。
为了安全地实现原生异步回调,我们必须采用一种消息传递机制,将原生线程中的数据安全地“投递”到目标 Dart Isolate 的事件队列中,从而在 Dart Isolate 的主线程中执行后续的 Dart 逻辑。这就是 NativePort 和 Dart_PostCObject_DL 的用武之地。
解决方案一:基于端口(Port)的回调机制
NativePort 是 Dart FFI 专门为原生代码与 Dart Isolate 之间进行异步消息传递而设计的一种机制。它允许原生代码向一个特定的 Dart ReceivePort 发送消息,这些消息会被安全地投递到目标 Isolate 的事件队列中,并在该 Isolate 的主线程上处理。
NativePort、SendPort 与 ReceivePort 的协同
整个流程可以概括为:
- Dart 端创建
ReceivePort:首先,Dart Isolate(通常是主 Isolate)创建一个ReceivePort来监听消息。 - 获取
SendPort地址:从ReceivePort获取其对应的SendPort,然后通过NativePort.sendPortToNativePort方法将其转换为一个可以传递给原生代码的整数句柄(Dart_Port类型)。 - 传递
Dart_Port给原生:Dart 将这个整数句柄(Dart_Port)通过 FFI 调用传递给原生代码。 - 原生端存储
Dart_Port:原生代码接收并存储这个Dart_Port。 - 原生端初始化
Dart_PostCObject_DL:原生代码需要通过Dart_InitializeApiDL初始化 Dart API 函数指针,特别是Dart_PostCObject_DL。 - 原生端发送消息:当原生代码需要向 Dart 发送消息时(例如,在后台线程中完成任务后),它会构建一个
Dart_CObject结构体来封装数据,然后调用Dart_PostCObject_DL(dart_port, &message_object)将消息发送到对应的 DartReceivePort。 - Dart 端接收消息:
ReceivePort监听器会收到消息,并在其 Isolate 的事件循环中执行回调。
从 Dart 创建 NativePort 并传递其地址
Dart 提供 NativePort.sendPortToNativePort 来获取 SendPort 的原生句柄。
import 'dart:ffi';
import 'dart:isolate';
// ... FFI setup for _myNativeLib ...
// In Dart code:
final ReceivePort receivePort = ReceivePort();
// Get the native port ID (Dart_Port) associated with this ReceivePort
final int nativePort = receivePort.sendPort.nativePort;
// Now, pass 'nativePort' (which is an int representing a Dart_Port) to native code via FFI
// Example:
// myNativeLib.lookupFunction<...>(...)('register_callback_port_dl')(nativePort);
// Listen for messages on this port
receivePort.listen((message) {
print('Dart received message from native: $message');
// Process message, update UI, etc.
});
原生侧获取 NativePort 地址并使用 Dart_PostCObject_DL
原生代码需要 Dart_InitializeApiDL 来获取 Dart_PostCObject_DL 函数指针。这个函数指针是 Dart VM 提供给原生的,用于安全地从任意原生线程向 Dart Isolate 发送消息。
C 代码 (my_native_lib.c)
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h> // For native threading
// Include Dart's C API header
// You'll need to find this in your Flutter SDK, e.g.,
// flutter/bin/cache/pkg/sky_engine/lib/ffi/api/dart_api_dl.h
#include "dart_api_dl.h"
// Store the Dart_PostCObject_DL function pointer
static Dart_PostCObject_DL_Type Dart_PostCObject_DL_ptr;
// Global variable to store the Dart_Port
static Dart_Port main_dart_port = ILLEGAL_PORT;
// Initialize Dart API (must be called once from Dart)
void init_dart_api_dl(void* data) {
Dart_PostCObject_DL_ptr = data;
printf("Native: Dart_PostCObject_DL initialized.n");
}
// Register the Dart_Port from Dart
void register_port(Dart_Port port) {
main_dart_port = port;
printf("Native: Registered Dart Port: %lldn", (long long)port);
}
// --- Native background task example ---
typedef struct {
char* data;
int length;
} NativeMessage;
void* native_long_task_thread(void* arg) {
printf("Native thread: Started long running task...n");
// Simulate some work
for (int i = 0; i < 5; i++) {
printf("Native thread: Working... %dn", i + 1);
usleep(1000 * 1000); // Sleep for 1 second
if (main_dart_port != ILLEGAL_PORT) {
// Send progress update to Dart
Dart_CObject message;
message.type = Dart_CObject_kInt64;
message.value.as_int64 = i + 1; // Send progress as an integer
bool posted = Dart_PostCObject_DL_ptr(main_dart_port, &message);
if (!posted) {
fprintf(stderr, "Native thread: Failed to post progress message to Dart port.n");
}
}
}
// Simulate final result
const char* result_str = "Task completed successfully!";
if (main_dart_port != ILLEGAL_PORT) {
// Prepare Dart_CObject for string
Dart_CObject message;
message.type = Dart_CObject_kString;
message.value.as_string = (char*)result_str; // Cast to char* is safe as we're not modifying
printf("Native thread: Sending final result to Dart.n");
bool posted = Dart_PostCObject_DL_ptr(main_dart_port, &message);
if (!posted) {
fprintf(stderr, "Native thread: Failed to post final result message to Dart port.n");
}
} else {
printf("Native thread: Dart Port not registered, cannot send result.n");
}
printf("Native thread: Long running task finished.n");
return NULL;
}
// Function to start the native background task
void start_native_long_task_async() {
pthread_t thread_id;
// Create a detached thread, so no need to join it from the main thread
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int ret = pthread_create(&thread_id, &attr, native_long_task_thread, NULL);
pthread_attr_destroy(&attr);
if (ret != 0) {
fprintf(stderr, "Native: Failed to create native thread: %dn", ret);
} else {
printf("Native: Started native long task in background thread (ID: %lu).n", (unsigned long)thread_id);
}
}
编译 C 代码 (Linux/macOS)
# Important: Link with -ldart_api_dl and -lpthread
# You need to find the dart_api_dl.h and libdart_api_dl.a (or .so) from your Flutter SDK.
# Example path: flutter/bin/cache/pkg/sky_engine/lib/ffi/api
# Replace <FLUTTER_SDK_PATH> with your actual Flutter SDK path.
# For Linux (adjust paths):
gcc -shared -o libmy_native_lib.so my_native_lib.c
-I<FLUTTER_SDK_PATH>/bin/cache/pkg/sky_engine/lib/ffi/api
-L<FLUTTER_SDK_PATH>/bin/cache/artifacts/engine/linux-x64/dart-sdk/lib
-ldart_api_dl -lpthread
# For macOS (adjust paths, use .dylib):
gcc -shared -o libmy_native_lib.dylib my_native_lib.c
-I<FLUTTER_SDK_PATH>/bin/cache/pkg/sky_engine/lib/ffi/api
-L<FLUTTER_SDK_PATH>/bin/cache/artifacts/engine/darwin-x64/dart-sdk/lib
-ldart_api_dl -lpthread
Dart_CObject:数据封装与类型转换
Dart_PostCObject_DL 函数接收一个 Dart_Port 和一个 Dart_CObject 指针。Dart_CObject 是一个联合体,用于在原生和 Dart 之间传递各种类型的数据。
Dart_CObject_Type |
value 字段 |
Dart 接收类型 | 说明 |
|---|---|---|---|
Dart_CObject_kNull |
N/A | null |
空值 |
Dart_CObject_kBool |
as_bool |
bool |
布尔值 |
Dart_CObject_kInt32 |
as_int32 |
int |
32位整数 |
Dart_CObject_kInt64 |
as_int64 |
int |
64位整数 |
Dart_CObject_kDouble |
as_double |
double |
双精度浮点数 |
Dart_CObject_kString |
as_string |
String |
UTF-8 编码的 C 字符串 |
Dart_CObject_kArray |
as_array |
List<Object?> |
包含其他 Dart_CObject 的数组 |
Dart_CObject_kTypedData |
as_typed_data |
TypedData |
Uint8List, Int332List 等类型化的字节数组 |
Dart_CObject_kExternalTypedData |
as_external_typed_data |
TypedData |
外部(原生分配)的类型化字节数组,Dart 不拥有内存 |
Dart_CObject_kSendPort |
as_send_port |
SendPort |
传递 SendPort 对象 |
Dart_CObject_kCapability |
as_capability |
Capability |
传递 Capability 对象 |
注意:Dart_CObject_kString 传递的字符串,原生代码不需要手动释放,Dart VM 会在接收后将其复制到 Dart 堆中。但如果是 Dart_CObject_kExternalTypedData,则需要提供一个回调函数,让 Dart 在不再需要时通知原生释放内存。
代码示例:Dart-Native 异步回调
现在,我们将上述 C 代码与 Dart 代码结合起来。
Dart 代码 (main.dart)
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
// 1. 加载原生库 (同前文)
DynamicLibrary _openDynamicLibrary() {
if (Platform.isAndroid || Platform.isLinux) {
return DynamicLibrary.open('libmy_native_lib.so');
} else if (Platform.isIOS || Platform.isMacOS) {
return DynamicLibrary.open('libmy_native_lib.dylib');
} else if (Platform.isWindows) {
return DynamicLibrary.open('my_native_lib.dll');
} else {
throw UnsupportedError('Unsupported platform');
}
}
final DynamicLibrary _myNativeLib = _openDynamicLibrary();
// 2. 定义原生函数签名
// C: void init_dart_api_dl(void* data);
typedef InitDartApiDlFuncC = Void Function(Pointer<Void> data);
typedef InitDartApiDlFuncDart = void Function(Pointer<Void> data);
// C: void register_port(Dart_Port port);
typedef RegisterPortFuncC = Void Function(Int64 port); // Dart_Port is 64-bit int
typedef RegisterPortFuncDart = void Function(int port);
// C: void start_native_long_task_async();
typedef StartNativeLongTaskAsyncFuncC = Void Function();
typedef StartNativeLongTaskAsyncFuncDart = void Function();
// 3. 查找并绑定原生函数
final InitDartApiDlFuncDart initDartApiDl = _myNativeLib
.lookupFunction<InitDartApiDlFuncC, InitDartApiDlFuncDart>('init_dart_api_dl');
final RegisterPortFuncDart registerPort = _myNativeLib
.lookupFunction<RegisterPortFuncC, RegisterPortFuncDart>('register_port');
final StartNativeLongTaskAsyncFuncDart startNativeLongTaskAsync = _myNativeLib
.lookupFunction<StartNativeLongTaskAsyncFuncC, StartNativeLongTaskAsyncFuncDart>('start_native_long_task_async');
void main() async {
print('Dart: Main Isolate started.');
// 4. 初始化 Dart API DL (必须在任何 Dart_PostCObject_DL_ptr 调用之前完成)
// `NativeApi.initializeApiDLData` 获取了 Dart VM 的内部指针,
// 供原生代码初始化 Dart_PostCObject_DL_ptr
initDartApiDl(NativeApi.initializeApiDLData);
// 5. 创建 ReceivePort 并获取其 nativePort
final ReceivePort receivePort = ReceivePort();
final int nativePort = receivePort.sendPort.nativePort;
// 6. 将 nativePort 注册到原生侧
registerPort(nativePort);
print('Dart: Registered native port $nativePort with native code.');
// 7. 监听来自原生侧的消息
receivePort.listen((message) {
if (message is int) {
print('Dart: Received progress from native: $message');
} else if (message is String) {
print('Dart: Received final result from native: "$message"');
// 当收到最终结果后,关闭 ReceivePort
receivePort.close();
print('Dart: ReceivePort closed.');
} else {
print('Dart: Received unknown message type: $message');
}
});
// 8. 启动原生后台任务
print('Dart: Starting native long task asynchronously...');
startNativeLongTaskAsync();
print('Dart: Main Isolate continues its work...');
// 主 Isolate 可以继续执行其他任务,不会被原生后台任务阻塞
await Future.delayed(Duration(seconds: 1));
print('Dart: Main Isolate finished its other work.');
}
运行结果示例:
Dart: Main Isolate started.
Native: Dart_PostCObject_DL initialized.
Dart: Registered native port 123456789 with native code.
Dart: Starting native long task asynchronously...
Native: Started native long task in background thread (ID: 140735165934592).
Dart: Main Isolate continues its work...
Dart: Main Isolate finished its other work.
Native thread: Started long running task...
Native thread: Working... 1
Dart: Received progress from native: 1
Native thread: Working... 2
Dart: Received progress from native: 2
Native thread: Working... 3
Dart: Received progress from native: 3
Native thread: Working... 4
Dart: Received progress from native: 4
Native thread: Working... 5
Dart: Received progress from native: 5
Native thread: Sending final result to Dart.
Native thread: Long running task finished.
Dart: Received final result from native: "Task completed successfully!"
Dart: ReceivePort closed.
这个例子展示了基于 NativePort 和 Dart_PostCObject_DL 的异步回调机制。原生代码在后台线程中执行任务,并通过 Dart_PostCObject_DL 定期发送进度和最终结果到 Dart。Dart 的主 Isolate 保持响应,并通过 receivePort.listen 接收并处理这些消息。这是 Flutter FFI 中处理异步回调最核心和推荐的方法。
解决方案二:Dart Isolate 的后台计算与 FFI 结合
有时,我们可能需要在后台 Isolate 中执行 FFI 调用,而不是在主 Isolate 中。这在以下场景中非常有用:
- FFI 调用本身是同步的,但耗时较长,我们希望它不阻塞主 Isolate。
- FFI 调用返回的数据需要在 Dart 侧进行大量处理,而这些处理也可能阻塞主 Isolate。
- 将 FFI 相关的逻辑封装在一个独立的 Isolate 中,以实现更好的模块化和资源管理。
这种方案结合了 Dart 的 Isolate.spawn 机制和 FFI 调用。
Isolate.spawn:在后台 Isolate 执行耗时 Dart 任务
Isolate.spawn 允许我们启动一个新的 Dart Isolate,并在其中执行一个指定的 Dart 函数。这个函数会接收一个 SendPort 参数,用于将结果发送回启动它的 Isolate。
FFI 调用在后台 Isolate 中执行
当我们在 Isolate.spawn 启动的函数中执行 FFI 调用时:
- 内存隔离:后台 Isolate 有自己的内存空间。它会加载原生库并执行 FFI 调用。
- 阻塞隔离:如果 FFI 调用是同步且耗时的,它只会阻塞当前的后台 Isolate,而不会影响主 Isolate 的 UI 响应。
- 回调机制:如果原生库需要异步回调,后台 Isolate 同样需要创建一个
ReceivePort,将其nativePort传递给原生,并通过Dart_PostCObject_DL接收消息。
后台 Isolate 与主 Isolate 通过端口通信
整个流程如下:
- 主 Isolate 准备:主 Isolate 创建一个
ReceivePort来接收来自后台 Isolate 的消息。 - 启动后台 Isolate:主 Isolate 调用
Isolate.spawn,将自己的SendPort传递给后台 Isolate。 - 后台 Isolate 初始化:
- 接收主 Isolate 的
SendPort。 - 加载原生库 (
DynamicLibrary.open)。 - (可选)如果原生需要回调,后台 Isolate 也需要创建一个自己的
ReceivePort,并将其nativePort传递给原生。 - 执行 FFI 调用。
- 接收主 Isolate 的
- 后台 Isolate 回传结果:后台 Isolate 将 FFI 调用的结果(或原生回调的结果)通过主 Isolate 提供的
SendPort发送回主 Isolate。 - 主 Isolate 接收结果:主 Isolate 的
ReceivePort监听器收到消息并处理。
代码示例:后台 Isolate 执行 FFI 调用,并将结果传回主 Isolate
我们将使用之前定义的 C 函数 reverse_string,但在后台 Isolate 中调用它。
C 代码 (my_native_lib.c)
(保持不变,只需要 add, reverse_string, free_string 函数)
Dart 代码 (main.dart)
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'package:ffi/ffi.dart';
// 1. FFI 相关的定义 (同前文,但现在是私有或在单独文件)
// Helper to open the dynamic library
DynamicLibrary _openDynamicLibrary() {
if (Platform.isAndroid || Platform.isLinux) {
return DynamicLibrary.open('libmy_native_lib.so');
} else if (Platform.isIOS || Platform.isMacOS) {
return DynamicLibrary.open('libmy_native_lib.dylib');
} else if (Platform.isWindows) {
return DynamicLibrary.open('my_native_lib.dll');
} else {
throw UnsupportedError('Unsupported platform');
}
}
// C: char* reverse_string(const char* input);
typedef ReverseStringFuncC = Pointer<Utf8> Function(Pointer<Utf8> input);
typedef ReverseStringFuncDart = Pointer<Utf8> Function(Pointer<Utf8> input);
// C: void free_string(char* ptr);
typedef FreeStringFuncC = Void Function(Pointer<Utf8> ptr);
typedef FreeStringFuncDart = void Function(Pointer<Utf8> ptr);
// Function to perform FFI calls in a background Isolate
void _ffiWorker(SendPort mainIsolateSendPort) {
print('FFI Worker Isolate: Started.');
final DynamicLibrary nativeLib = _openDynamicLibrary();
final ReverseStringFuncDart reverseString = nativeLib
.lookupFunction<ReverseStringFuncC, ReverseStringFuncDart>('reverse_string');
final FreeStringFuncDart freeString = nativeLib
.lookupFunction<FreeStringFuncC, FreeStringFuncDart>('free_string');
// Simulate a long-running FFI task
final originalString = 'This is a long string to reverse in FFI worker.';
print('FFI Worker Isolate: Reversing string: "$originalString"');
final Pointer<Utf8> cString = originalString.toUtf8();
final Pointer<Utf8> reversedCString = reverseString(cString);
final String reversedDartString = reversedCString.toDartString();
// Free native memory
calloc.free(cString);
freeString(reversedCString);
print('FFI Worker Isolate: String reversed: "$reversedDartString"');
// Send result back to the main Isolate
mainIsolateSendPort.send(reversedDartString);
print('FFI Worker Isolate: Sent result to main Isolate, exiting.');
}
void main() async {
print('Main Isolate: Started.');
// 1. 创建 ReceivePort 以接收来自后台 Isolate 的消息
final ReceivePort mainReceivePort = ReceivePort();
// 2. 获取主 Isolate 的 SendPort,以便传递给后台 Isolate
final SendPort mainIsolateSendPort = mainReceivePort.sendPort;
// 3. 监听来自后台 Isolate 的消息
mainReceivePort.listen((message) {
if (message is String) {
print('Main Isolate: Received reversed string from FFI worker: "$message"');
} else {
print('Main Isolate: Received unknown message: $message');
}
mainReceivePort.close(); // Close port after receiving result
print('Main Isolate: ReceivePort closed.');
});
// 4. 启动 FFI 工作 Isolate
print('Main Isolate: Spawning FFI worker Isolate...');
await Isolate.spawn(_ffiWorker, mainIsolateSendPort);
print('Main Isolate: Continues its work while FFI worker is busy...');
// 主 Isolate 可以继续执行其他任务,不会被 FFI worker 阻塞
await Future.delayed(Duration(seconds: 2));
print('Main Isolate: Finished its other work.');
}
运行结果示例:
Main Isolate: Started.
Main Isolate: Spawning FFI worker Isolate...
Main Isolate: Continues its work while FFI worker is busy...
FFI Worker Isolate: Started.
FFI Worker Isolate: Reversing string: "This is a long string to reverse in FFI worker."
Main Isolate: Finished its other work.
FFI Worker Isolate: String reversed: ".rekrow IFF ni esrever ot gnirts gnol a si sihT"
FFI Worker Isolate: Sent result to main Isolate, exiting.
Main Isolate: Received reversed string from FFI worker: ".rekrow IFF ni esrever ot gnirts gnol a si sihT"
Main Isolate: ReceivePort closed.
这个例子展示了如何利用 Dart 的 Isolate 模型来将耗时的 FFI 调用从主 Isolate 中分流出去。后台 Isolate 负责执行 FFI 调用,并将结果通过 SendPort 发送回主 Isolate,从而确保了 UI 的流畅性。这种模式非常适合需要执行计算密集型或 I/O 密集型 FFI 调用的场景。
解决方案三:原生侧线程管理与消息队列
在前一个解决方案中,我们让 Dart 的后台 Isolate 负责 FFI 调用。但在某些更复杂的场景中,原生库可能已经拥有自己的线程池、事件循环或复杂的内部并发模型。在这种情况下,让原生库自行管理线程,并在其内部线程中完成任务后,再将结果“推送”给 Dart,可能更加自然和高效。
此方案的核心思想是:
- 原生层创建并管理工作线程:原生库根据需要创建和管理自己的工作线程(例如,一个C++线程池或Rust的
rayon库)。 - 原生工作线程执行任务:这些原生工作线程执行耗时的计算或I/O操作。
- 原生消息队列:当原生工作线程完成任务或需要报告进度时,它将结果或事件放入一个原生层维护的消息队列中。
- 专门的原生“通知”线程:一个专门的原生线程(或者在某些情况下,可能是主线程的事件循环)负责从这个原生消息队列中取出消息。
- 通过
NativePort发送给 Dart:这个“通知”线程(而不是任意工作线程)从队列中取出消息后,将其封装为Dart_CObject,并通过之前注册的Dart_Port调用Dart_PostCObject_DL发送给 Dart。
为什么要使用原生消息队列和专用通知线程?
虽然 Dart_PostCObject_DL 理论上可以在任何原生线程中调用,但为了更好的控制和简化调试,将其封装在一个专门的通知机制中通常是更好的实践:
- 解耦:将工作线程的逻辑与 Dart 回调逻辑解耦。工作线程只关心完成其核心任务,并将结果放入队列。通知线程只关心从队列中取出结果并转发给 Dart。
- 错误隔离:如果
Dart_PostCObject_DL调用失败(例如,Dart Port 已关闭),错误可以被通知线程处理,而不会影响工作线程。 - 流量控制:可以在通知线程中实现消息限流或合并,避免 Dart Isolate 被过多消息淹没。
- 上下文一致性:尽管
Dart_PostCObject_DL是线程安全的,但将所有 Dart 回调集中在一个或少数几个已知线程中处理,有助于更清晰地理解和调试跨语言的并发流。
代码示例:原生线程池与消息回传机制
我们将模拟一个原生线程池,其中工作线程生成数据,然后通过一个消息队列将数据传递给一个专门的通知线程,该通知线程再将数据发送给 Dart。
C 代码 (my_native_lib.c)
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h> // For usleep
#include "dart_api_dl.h"
// --- Dart API Initialization ---
static Dart_PostCObject_DL_Type Dart_PostCObject_DL_ptr;
static Dart_Port main_dart_port = ILLEGAL_PORT;
void init_dart_api_dl(void* data) {
Dart_PostCObject_DL_ptr = data;
printf("Native: Dart_PostCObject_DL initialized.n");
}
void register_port(Dart_Port port) {
main_dart_port = port;
printf("Native: Registered Dart Port: %lldn", (long long)port);
}
// --- Native Message Queue for internal communication ---
typedef struct {
int type; // 0 for progress, 1 for final result
char* data; // For string results
int int_val; // For integer progress
} NativeQueueMessage;
#define MAX_QUEUE_SIZE 100
NativeQueueMessage message_queue[MAX_QUEUE_SIZE];
int head = 0;
int tail = 0;
int count = 0;
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queue_not_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t queue_not_full = PTHREAD_COND_INITIALIZER;
void enqueue_message(NativeQueueMessage msg) {
pthread_mutex_lock(&queue_mutex);
while (count == MAX_QUEUE_SIZE) {
printf("Native Queue: Full, waiting...n");
pthread_cond_wait(&queue_not_full, &queue_mutex);
}
message_queue[tail] = msg;
tail = (tail + 1) % MAX_QUEUE_SIZE;
count++;
pthread_cond_signal(&queue_not_empty);
pthread_mutex_unlock(&queue_mutex);
}
NativeQueueMessage dequeue_message() {
pthread_mutex_lock(&queue_mutex);
while (count == 0) {
printf("Native Queue: Empty, waiting...n");
pthread_cond_wait(&queue_not_empty, &queue_mutex);
}
NativeQueueMessage msg = message_queue[head];
head = (head + 1) % MAX_QUEUE_SIZE;
count--;
pthread_cond_signal(&queue_not_full);
pthread_mutex_unlock(&queue_mutex);
return msg;
}
// --- Native Worker Thread ---
void* worker_thread_func(void* arg) {
printf("Native Worker Thread: Started.n");
for (int i = 0; i < 3; i++) { // Simulate 3 steps of work
usleep(1000 * 1000); // Work for 1 second
NativeQueueMessage progress_msg;
progress_msg.type = 0; // Progress
progress_msg.int_val = i + 1;
progress_msg.data = NULL;
enqueue_message(progress_msg);
printf("Native Worker Thread: Enqueued progress %d.n", i + 1);
}
usleep(1000 * 1000); // Final work
NativeQueueMessage final_msg;
final_msg.type = 1; // Final result
final_msg.int_val = 0;
final_msg.data = strdup("Native task completed via message queue!"); // Duplicate string for queue
enqueue_message(final_msg);
printf("Native Worker Thread: Enqueued final result.n");
printf("Native Worker Thread: Exiting.n");
return NULL;
}
// --- Native Notifier Thread ---
static pthread_t notifier_thread_id;
static bool notifier_running = false;
void* notifier_thread_func(void* arg) {
printf("Native Notifier Thread: Started.n");
notifier_running = true;
while (notifier_running) {
NativeQueueMessage msg = dequeue_message(); // Blocks if queue is empty
if (main_dart_port == ILLEGAL_PORT) {
fprintf(stderr, "Native Notifier Thread: Dart Port not registered, dropping message.n");
if (msg.type == 1 && msg.data != NULL) {
free(msg.data); // Free duplicated string
}
continue;
}
Dart_CObject dart_msg;
bool posted = false;
if (msg.type == 0) { // Progress
dart_msg.type = Dart_CObject_kInt64;
dart_msg.value.as_int64 = msg.int_val;
posted = Dart_PostCObject_DL_ptr(main_dart_port, &dart_msg);
printf("Native Notifier Thread: Posted progress %lld to Dart.n", dart_msg.value.as_int64);
} else if (msg.type == 1) { // Final result
dart_msg.type = Dart_CObject_kString;
dart_msg.value.as_string = msg.data;
posted = Dart_PostCObject_DL_ptr(main_dart_port, &dart_msg);
printf("Native Notifier Thread: Posted final result '%s' to Dart.n", dart_msg.value.as_string);
free(msg.data); // Free the duplicated string after posting (Dart VM makes its own copy)
}
if (!posted) {
fprintf(stderr, "Native Notifier Thread: Failed to post message to Dart port.n");
}
}
printf("Native Notifier Thread: Exiting.n");
return NULL;
}
// --- Control Functions ---
void start_native_workers_and_notifier() {
// Start notifier thread if not already running
if (!notifier_running) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // Detach notifier thread
int ret = pthread_create(¬ifier_thread_id, &attr, notifier_thread_func, NULL);
pthread_attr_destroy(&attr);
if (ret != 0) {
fprintf(stderr, "Native: Failed to create notifier thread: %dn", ret);
return;
}
}
// Start a worker thread (can be multiple workers in a real pool)
pthread_attr_t worker_attr;
pthread_attr_init(&worker_attr);
pthread_attr_setdetachstate(&worker_attr, PTHREAD_CREATE_DETACHED); // Detach worker thread
pthread_t worker_thread_id;
int ret = pthread_create(&worker_thread_id, &worker_attr, worker_thread_func, NULL);
pthread_attr_destroy(&worker_attr);
if (ret != 0) {
fprintf(stderr, "Native: Failed to create worker thread: %dn", ret);
} else {
printf("Native: Started native worker in background thread (ID: %lu).n", (unsigned long)worker_thread_id);
}
}
// Function to stop the notifier thread gracefully (optional, but good for cleanup)
void stop_native_notifier() {
pthread_mutex_lock(&queue_mutex);
notifier_running = false;
// Signal the condition variable to wake up the notifier thread if it's waiting
pthread_cond_signal(&queue_not_empty);
pthread_mutex_unlock(&queue_mutex);
// If the thread is detached, we can't join it.
// In a real app, you might use a joinable thread and join it here.
printf("Native: Signaled notifier thread to stop.n");
}
编译 C 代码 (同前文,注意链接 pthread)
Dart 代码 (main.dart)
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
// 1. FFI 相关的定义 (同前文,用于初始化 Dart API DL 和注册端口)
DynamicLibrary _openDynamicLibrary() {
if (Platform.isAndroid || Platform.isLinux) {
return DynamicLibrary.open('libmy_native_lib.so');
} else if (Platform.isIOS || Platform.isMacOS) {
return DynamicLibrary.open('libmy_native_lib.dylib');
} else if (Platform.isWindows) {
return DynamicLibrary.open('my_native_lib.dll');
} else {
throw UnsupportedError('Unsupported platform');
}
}
final DynamicLibrary _myNativeLib = _openDynamicLibrary();
// C: void init_dart_api_dl(void* data);
typedef InitDartApiDlFuncC = Void Function(Pointer<Void> data);
typedef InitDartApiDlFuncDart = void Function(Pointer<Void> data);
// C: void register_port(Dart_Port port);
typedef RegisterPortFuncC = Void Function(Int64 port);
typedef RegisterPortFuncDart = void Function(int port);
// C: void start_native_workers_and_notifier();
typedef StartNativeWorkersAndNotifierFuncC = Void Function();
typedef StartNativeWorkersAndNotifierFuncDart = void Function();
// C: void stop_native_notifier(); (Optional cleanup)
typedef StopNativeNotifierFuncC = Void Function();
typedef StopNativeNotifierFuncDart = void Function();
final InitDartApiDlFuncDart initDartApiDl = _myNativeLib
.lookupFunction<InitDartApiDlFuncC, InitDartApiDlFuncDart>('init_dart_api_dl');
final RegisterPortFuncDart registerPort = _myNativeLib
.lookupFunction<RegisterPortFuncC, RegisterPortFuncDart>('register_port');
final StartNativeWorkersAndNotifierFuncDart startNativeWorkersAndNotifier = _myNativeLib
.lookupFunction<StartNativeWorkersAndNotifierFuncC, StartNativeWorkersAndNotifierFuncDart>('start_native_workers_and_notifier');
final StopNativeNotifierFuncDart stopNativeNotifier = _myNativeLib
.lookupFunction<StopNativeNotifierFuncC, StopNativeNotifierFuncDart>('stop_native_notifier');
void main() async {
print('Dart: Main Isolate started.');
initDartApiDl(NativeApi.initializeApiDLData);
final ReceivePort receivePort = ReceivePort();
final int nativePort = receivePort.sendPort.nativePort;
registerPort(nativePort);
print('Dart: Registered native port $nativePort with native code.');
receivePort.listen((message) {
if (message is int) {
print('Dart: Received progress from native via queue: $message');
} else if (message is String) {
print('Dart: Received final result from native via queue: "$message"');
receivePort.close();
stopNativeNotifier(); // Call native cleanup
print('Dart: ReceivePort closed, native notifier stopped.');
} else {
print('Dart: Received unknown message type: $message');
}
});
print('Dart: Starting native workers and notifier...');
startNativeWorkersAndNotifier();
print('Dart: Main Isolate continues its work...');
await Future.delayed(Duration(seconds: 1));
print('Dart: Main Isolate finished its initial work.');
}
运行结果示例:
Dart: Main Isolate started.
Native: Dart_PostCObject_DL initialized.
Dart: Registered native port 123456789 with native code.
Dart: Starting native workers and notifier...
Native: Started native worker in background thread (ID: 140735165934592).
Native Notifier Thread: Started.
Dart: Main Isolate continues its work...
Dart: Main Isolate finished its initial work.
Native Worker Thread: Started.
Native Worker Thread: Enqueued progress 1.
Native Queue: Empty, waiting...
Native Notifier Thread: Posted progress 1 to Dart.
Dart: Received progress from native via queue: 1
Native Worker Thread: Enqueued progress 2.
Native Notifier Thread: Posted progress 2 to Dart.
Dart: Received progress from native via queue: 2
Native Worker Thread: Enqueued progress 3.
Native Notifier Thread: Posted progress 3 to Dart.
Dart: Received progress from native via queue: 3
Native Worker Thread: Enqueued final result.
Native Worker Thread: Exiting.
Native Notifier Thread: Posted final result 'Native task completed via message queue!' to Dart.
Dart: Received final result from native via queue: "Native task completed via message queue!"
Dart: ReceivePort closed, native notifier stopped.
Native: Signaled notifier thread to stop.
Native Notifier Thread: Exiting.
这个示例展示了如何在原生层实现更复杂的线程管理和消息队列机制。一个原生工作线程生成数据并将其放入一个线程安全的消息队列。另一个专门的原生通知线程从队列中取出数据,并负责将其安全地发送给 Dart Isolate。这种模式在原生库本身需要复杂并发控制时非常有用,它将原生并发的复杂性与 Dart 的异步回调机制清晰地分离。
UI 线程安全与状态更新
在 Flutter FFI 的上下文中,UI 线程安全是一个不容妥协的原则。所有涉及 Flutter Widget 树修改的操作都必须在主 Dart Isolate 上执行。
强制在主 Dart Isolate 更新 UI
Flutter 框架对 UI 更新有严格的要求。如果你尝试在非主 Isolate 的上下文中调用 setState 或其他 UI 更新方法,Flutter 会抛出错误或导致未定义的行为。这意味着,无论你的数据是从 FFI 的 NativePort 回调、还是从后台 Dart Isolate 接收,最终更新 UI 的逻辑都必须发生在主 Dart Isolate 中。
setState 的调用上下文
当 ReceivePort 在主 Dart Isolate 中监听消息时,其 listen 回调函数本身就运行在主 Isolate 的事件循环中。因此,在这个回调函数内部直接调用 setState 是完全安全的。
// main.dart (Flutter Widget)
import 'package:flutter/material.dart';
import 'dart:isolate';
import 'dart:ffi';
// ... other FFI imports and definitions for native calls
class MyFFIApp extends StatefulWidget {
@override
_MyFFIAppState createState() => _MyFFIAppState();
}
class _MyFFIAppState extends State<MyFFIApp> {
String _statusMessage = 'Waiting for native task...';
int _progress = 0;
ReceivePort? _receivePort;
@override
void initState() {
super.initState();
_initFFIAndStartTask();
}
void _initFFIAndStartTask() {
// Initialize Dart API DL for native callbacks (only once)
initDartApiDl(NativeApi.initializeApiDLData);
_receivePort = ReceivePort();
final int nativePort = _receivePort!.sendPort.nativePort;
registerPort(nativePort);
print('Dart UI: Registered native port $nativePort with native code.');
_receivePort!.listen((message) {
// This callback runs on the main Dart Isolate, so setState is safe here.
if (message is int) {
setState(() {
_progress = message;
_statusMessage = 'Progress: $_progress/5';
});
print('Dart UI: Received progress: $message');
} else if (message is String) {
setState(() {
_statusMessage = message;
_progress = 5; // Indicate completion
});
print('Dart UI: Received final result: "$message"');
_receivePort!.close();
_receivePort = null;
stopNativeNotifier(); // Call native cleanup
} else {
print('Dart UI: Received unknown message type: $message');
}
});
startNativeWorkersAndNotifier();
print('Dart UI: Started native workers and notifier.');
}
@override
void dispose() {
_receivePort?.close();
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('Flutter FFI Async Demo')),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
CircularProgressIndicator(value: _progress / 5.0),
SizedBox(height: 20),
Text(_statusMessage, style: TextStyle(fontSize: 18)),
SizedBox(height: 20),
ElevatedButton(
onPressed: _receivePort == null ? null : () => print('Task running...'),
child: Text('Simulate UI interaction'),
),
],
),
),
);
}
}
void main() {
runApp(MaterialApp(home: MyFFIApp()));
}
在这个 Flutter Widget 示例中,_receivePort!.listen 的回调直接在主 Isolate 上执行,因此可以安全地调用 setState 来更新 UI 状态 (_statusMessage 和 _progress)。