尊敬的各位开发者,
欢迎来到今天的讲座。我们将深入探讨Dart FFI中一个至关重要且常被误解的主题:异步回调的Isolate端口管理,特别是ReceivePort的生命周期与使用策略。在现代应用开发中,利用原生代码库的能力是提升性能、访问系统API或复用现有C/C++代码的常见需求。Dart的FFI(Foreign Function Interface)为我们提供了这座桥梁。然而,当原生代码需要异步地将结果或事件通知回Dart时,问题就变得复杂起来,尤其是在Dart的并发模型——Isolate——的背景下。
我们将从FFI的基础开始,逐步构建起对异步回调机制的理解,详细讲解SendPort和ReceivePort如何成为跨Isolate通信的关键,并最终聚焦于ReceivePort的创建、使用、销毁及高级管理策略。
第一章:Dart FFI与异步回调的挑战
1.1 FFI:连接Dart与原生世界的桥梁
Dart FFI允许Dart代码直接调用C语言接口的函数,以及被C语言接口调用。这开启了Dart应用与操作系统API、高性能计算库、现有C/C++代码库等原生资源深度集成的可能性。
在Dart中,与FFI相关的核心概念包括:
dart:ffi库:提供了与原生代码交互的所有工具。DynamicLibrary:用于加载原生动态链接库(.so,.dylib,.dll)。lookupFunction:将原生函数指针转换为Dart函数。Pointer:表示原生内存地址。NativeFunction/DartFunction:定义原生函数和对应的Dart函数签名。
// 示例:一个简单的同步FFI调用
import 'dart:ffi';
import 'dart:io' show Platform;
// 1. 定义原生函数的C类型签名
typedef NativeAdd = Int32 Function(Int32 a, Int32 b);
// 2. 定义对应的Dart函数类型签名
typedef DartAdd = int Function(int a, int b);
void main() {
// 3. 加载动态库
// 假设我们有一个名为 'my_native_lib' 的库,其中包含一个 'add' 函数
final DynamicLibrary myNativeLib = Platform.isMacOS || Platform.isIOS
? DynamicLibrary.open('my_native_lib.dylib')
: Platform.isWindows
? DynamicLibrary.open('my_native_lib.dll')
: DynamicLibrary.open('my_native_lib.so');
// 4. 查找并绑定原生函数
final DartAdd add = myNativeLib
.lookupFunction<NativeAdd, DartAdd>('add');
// 5. 调用原生函数
final result = add(5, 7);
print('5 + 7 = $result'); // 期望输出 12
}
1.2 同步回调与异步回调:本质的区别
上述示例展示了同步FFI调用。当add(5, 7)被调用时,Dart代码会暂停执行,直到原生add函数返回结果。
然而,在许多实际场景中,原生操作可能是耗时或非阻塞的:
- 文件I/O:读取大文件、网络请求。
- 系统事件监听:例如文件系统变更、传感器数据。
- 长时间运行的计算:图像处理、AI推理。
- 复杂的UI组件:原生地图、视频播放器回调。
在这种情况下,我们不希望Dart主线程被阻塞。原生库通常会提供一个回调机制,允许它在操作完成或有新事件发生时,“调用回”应用程序代码。这就是异步回调。
挑战在于: 原生代码与Dart代码的执行环境可能完全不同。原生回调可能发生在任何线程上,而Dart代码运行在严格的Isolate模型中。
第二章:Dart的Isolate模型与跨Isolate通信
2.1 深入理解Dart Isolate
Dart语言是单线程模型的,但通过Isolate实现了并发。
- 内存隔离:每个Isolate都有自己独立的内存堆,变量不能直接在Isolate之间共享。这意味着没有共享内存并发带来的锁和竞态条件问题。
- 事件循环:每个Isolate都有自己的事件循环。UI更新、网络请求、定时器等所有异步操作都在各自Isolate的事件循环中排队执行。
- 天然的并发单元:Flutter应用的主UI线程就是一个Isolate。当我们需要执行耗时操作而不阻塞UI时,通常会创建一个新的后台Isolate。
2.2 原生回调的Isolate困境
当原生代码通过FFI调用Dart函数时,这个Dart函数会在哪个Isolate上执行呢?
答案是:它会在注册该回调的Isolate上执行。
假设你在主UI Isolate上注册了一个FFI回调,那么当原生代码触发这个回调时,它就会在UI Isolate上执行。如果这个回调执行时间过长,它会阻塞UI Isolate,导致应用卡顿。
更复杂的情况是,原生代码可能在它自己的某个后台线程中触发回调。Dart FFI机制会尽力将这个回调调度到注册它的Dart Isolate的事件循环中。但这仍然意味着:
- 如果回调数据需要在UI Isolate上展示,直接在另一个后台Isolate注册回调是不足够的。
- 如果回调频率非常高,即使在注册它的后台Isolate上执行,也可能导致该Isolate的任务队列过载。
核心问题: 原生代码不知道Dart的Isolate概念。它只知道一个函数指针。我们如何才能将原生回调的数据,安全、高效地从原生世界,跨越可能存在的线程边界,最终送达目标Dart Isolate(例如,主UI Isolate)?
2.3 SendPort与ReceivePort:跨Isolate的信使
Dart提供了一套专门用于Isolate间通信的机制:SendPort和ReceivePort。
ReceivePort:一个Isolate可以通过创建一个ReceivePort来监听传入的消息。它有一个内部队列,用于接收其他Isolate发送的消息。SendPort:每个ReceivePort都有一个对应的SendPort。SendPort可以被发送到其他Isolate。一旦其他Isolate持有了这个SendPort,它就可以向原始的ReceivePort发送消息。
通信模型:
- 目标Isolate (例如,UI Isolate) 创建一个
ReceivePort。 - 目标Isolate 获取其
ReceivePort的SendPort。 - 目标Isolate 将这个
SendPort发送给另一个Isolate (例如,一个执行FFI操作的后台Isolate,或者通过FFI传递给原生代码)。 - 其他Isolate或原生代码通过这个
SendPort向目标Isolate发送消息。 - 目标Isolate的
ReceivePort监听这些消息,并在其事件循环中处理它们。
关键点: SendPort.nativePort属性。这个属性返回一个int64,代表了SendPort在原生内存中的地址或标识符。这个int64可以安全地通过FFI传递给原生代码。原生代码不需要理解SendPort是什么,它只需要在需要回调时,将这个int64以及回调数据一起传递给一个特殊的Dart FFI回调函数。这个特殊的Dart FFI回调函数再利用这个int64重建SendPort,并发送消息。
第三章:构建异步FFI回调机制
现在,我们来详细构建一个完整的异步FFI回调系统。我们将以一个模拟文件下载进度的场景为例。原生库将模拟下载过程,并周期性地通过回调报告进度。
3.1 预备知识:Pointer.fromFunction
Pointer.fromFunction是Dart FFI中用于创建可供原生代码调用的Dart回调函数的关键。
// 1. 定义Dart回调的C签名
typedef NativeProgressCallback = Void Function(Int64 port, Double progress);
// 2. 定义Dart回调的Dart签名
typedef DartProgressCallback = void Function(int port, double progress);
// 3. 创建一个Dart函数作为原生回调的“入口”
// 这个函数必须是静态的或顶级的,因为它不能捕获任何外部上下文(闭包)
@pragma('vm:entry-point') // 确保在AOT编译时不会被优化掉
void _progressCallback(int port, double progress) {
// 在这里处理原生回调
// 但我们如何知道哪个ReceivePort需要这个进度?
// 这就是'port'参数的作用
print('Received progress from native: $progress for port $port');
}
// 4. 获取原生可调用的函数指针
final Pointer<NativeFunction<NativeProgressCallback>> progressCallbackPtr =
Pointer.fromFunction<NativeProgressCallback>(_progressCallback, 0.0); // 第二个参数是发生异常时的默认返回值
注意_progressCallback函数必须是顶级函数或静态函数,并且通常需要@pragma('vm:entry-point')注解。这是因为原生代码调用的是一个固定的内存地址,它不能像Dart闭包那样捕获外部环境。
3.2 原生C/C++代码设计
我们假设有一个简单的C库,用于模拟下载。
my_native_lib.h:
#ifndef MY_NATIVE_LIB_H
#define MY_NATIVE_LIB_H
#include <stdint.h> // For int64_t
// 定义回调函数类型
typedef void (*ProgressCallback)(int64_t port, double progress);
// 注册回调函数和用户数据(这里是SendPort的nativePort)
void register_progress_callback(ProgressCallback cb);
// 模拟一个异步下载操作
// 注意:这个函数会启动一个后台线程来模拟下载,并在该线程中调用回调
void start_download_async(int64_t port_id, const char* filename);
#endif // MY_NATIVE_LIB_H
my_native_lib.c:
#include "my_native_lib.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> // For sleep
#include <pthread.h> // For threading
// 存储Dart传入的回调函数指针
static ProgressCallback global_progress_callback = NULL;
// 线程参数结构体
typedef struct {
int64_t port_id;
const char* filename;
} DownloadArgs;
// 模拟下载任务的线程函数
void* download_task(void* arg) {
DownloadArgs* args = (DownloadArgs*)arg;
int64_t current_port_id = args->port_id;
const char* current_filename = args->filename;
printf("Native: Starting download for %s on thread %lun", current_filename, pthread_self());
for (int i = 0; i <= 10; i++) {
double progress = i * 10.0;
if (global_progress_callback != NULL) {
// 调用Dart的回调函数,传入port_id和进度
global_progress_callback(current_port_id, progress);
}
usleep(500 * 1000); // 模拟0.5秒的下载
}
// 下载完成,发送100%进度
if (global_progress_callback != NULL) {
global_progress_callback(current_port_id, 100.0);
}
printf("Native: Download for %s finished.n", current_filename);
free(args); // 释放线程参数
return NULL;
}
// FFI接口:注册回调
void register_progress_callback(ProgressCallback cb) {
global_progress_callback = cb;
printf("Native: Progress callback registered.n");
}
// FFI接口:开始异步下载
void start_download_async(int64_t port_id, const char* filename) {
if (global_progress_callback == NULL) {
fprintf(stderr, "Native: Error: Callback not registered before starting download.n");
return;
}
DownloadArgs* args = (DownloadArgs*)malloc(sizeof(DownloadArgs));
if (args == NULL) {
perror("Failed to allocate DownloadArgs");
return;
}
args->port_id = port_id;
args->filename = filename; // 注意:这里简单传递字符串指针,实际应用中可能需要深拷贝
pthread_t tid;
// 创建一个新线程来执行下载任务
if (pthread_create(&tid, NULL, download_task, (void*)args) != 0) {
perror("Failed to create download thread");
free(args);
} else {
pthread_detach(tid); // 让线程在结束时自动释放资源
}
}
编译这个C文件为动态库(例如my_native_lib.so)。
3.3 Dart FFI绑定与“跳板”回调(Trampoline Callback)
现在,我们要在Dart中定义FFI绑定,并实现一个“跳板”回调函数。这个函数是Pointer.fromFunction创建的,它接收原生回调,并负责将数据转发到正确的ReceivePort。
dart_ffi_bindings.dart:
import 'dart:ffi';
import 'dart:isolate';
import 'dart:io' show Platform;
// 1. 加载原生库
final DynamicLibrary _nativeLib = Platform.isMacOS || Platform.isIOS
? DynamicLibrary.open('my_native_lib.dylib')
: Platform.isWindows
? DynamicLibrary.open('my_native_lib.dll')
: DynamicLibrary.open('my_native_lib.so');
// 2. 定义原生函数类型签名
// C函数签名:void register_progress_callback(void (*cb)(int64_t port, double progress));
typedef _NativeRegisterProgressCallback = Void Function(Pointer<NativeFunction<NativeProgressCallback>> cb);
typedef _DartRegisterProgressCallback = void Function(Pointer<NativeFunction<NativeProgressCallback>> cb);
// C函数签名:void start_download_async(int64_t port_id, const char* filename);
typedef _NativeStartDownloadAsync = Void Function(Int64 port_id, Pointer<Utf8> filename);
typedef _DartStartDownloadAsync = void Function(int port_id, Pointer<Utf8> filename);
// 3. 查找并绑定原生函数
final _registerProgressCallback = _nativeLib.lookupFunction<
_NativeRegisterProgressCallback,
_DartRegisterProgressCallback>('register_progress_callback');
final _startDownloadAsync = _nativeLib.lookupFunction<
_NativeStartDownloadAsync,
_DartStartDownloadAsync>('start_download_async');
// 4. 定义Dart回调函数及其原生签名
// 这个回调函数将作为“跳板”接收来自原生的数据
typedef NativeProgressCallback = Void Function(Int64 port, Double progress);
typedef DartProgressCallback = void Function(int port, double progress);
// 5. 实现“跳板”回调函数
// 这是一个顶级函数,因为它将被Pointer.fromFunction引用,不能是闭包。
// 它的职责是接收原生数据,并将其发送到正确的Dart Isolate。
@pragma('vm:entry-point')
void _dartProgressTrampoline(int port, double progress) {
// 从原生端口ID重建SendPort
final SendPort sendPort = SendPort.fromRawPort(port);
if (sendPort == null) {
// 这通常不应该发生,除非port ID无效
print('Error: Could not reconstruct SendPort from raw port: $port');
return;
}
// 将数据发送到目标ReceivePort
sendPort.send(progress);
}
// 6. 获取“跳板”回调的指针
final Pointer<NativeFunction<NativeProgressCallback>> _progressTrampolinePtr =
Pointer.fromFunction<NativeProgressCallback>(_dartProgressTrampoline, 0.0); // 0.0是发生异常时的默认返回值
// 7. 封装FFI操作的Dart类
class NativeDownloader {
static bool _callbackRegistered = false;
NativeDownloader() {
if (!_callbackRegistered) {
// 在第一次实例化时注册回调
_registerProgressCallback(_progressTrampolinePtr);
_callbackRegistered = true;
}
}
// 启动下载并返回一个Stream,用于接收进度更新
Stream<double> downloadFile(String filename) {
// 1. 创建一个ReceivePort来接收原生回调
final ReceivePort receivePort = ReceivePort();
// 2. 获取ReceivePort对应的SendPort的nativePort,并将其传递给原生
final int sendPortRaw = receivePort.sendPort.nativePort;
// 3. 将文件名转换为C字符串
final Pointer<Utf8> filenameC = filename.toNativeUtf8();
// 4. 调用原生函数开始下载
_startDownloadAsync(sendPortRaw, filenameC);
// 5. 释放C字符串内存
calloc.free(filenameC);
// 6. 将ReceivePort转换为Stream,方便Dart异步处理
// 并在流关闭时关闭ReceivePort
return receivePort.cast<double>().asBroadcastStream()
..listen(
null, // 不需要额外的监听器,StreamController会处理
onDone: () {
print('Dart: ReceivePort for $filename download closed.');
receivePort.close();
},
onError: (error) {
print('Dart: Error on ReceivePort for $filename: $error');
receivePort.close();
},
);
}
}
3.4 ReceivePort的创建、监听与销毁
在NativeDownloader.downloadFile方法中,我们看到了ReceivePort的完整生命周期管理:
- 创建 (
final ReceivePort receivePort = ReceivePort();):每当启动一个需要回调的新异步操作时,我们都创建一个新的ReceivePort。这是为了隔离不同操作的回调消息,避免混淆。如果多个操作共享一个ReceivePort,消息就需要包含一个标识符来区分它们属于哪个操作,这会增加复杂性。 - 获取
SendPort的nativePort(final int sendPortRaw = receivePort.sendPort.nativePort;):这是将ReceivePort与原生世界连接的关键。int类型的nativePort可以安全地传递给原生代码。 - 监听 (
return receivePort.cast<double>().asBroadcastStream()..listen(...)):ReceivePort本身是一个Stream。我们可以像处理任何Dart Stream一样处理它。cast<double>()是为了将接收到的Object?转换为预期的double类型。asBroadcastStream()使得可以有多个监听器,虽然在这个场景中可能只有一个。 - 销毁 (
receivePort.close();):这是最重要的一步。当异步操作完成(无论是成功、失败还是取消),或者不再需要接收回调时,必须调用receivePort.close()。
为什么销毁如此重要?
- 资源释放:每个
ReceivePort都持有系统资源。如果不关闭,这些资源会一直占用,导致内存泄漏。 - 避免僵尸Isolate:如果一个Isolate只因为一个未关闭的
ReceivePort而无法被垃圾回收,它将成为一个“僵尸Isolate”,继续占用内存和CPU周期。 - 清理事件循环:
ReceivePort上的监听器会在Isolate的事件循环中注册任务。关闭它会移除这些任务,保持事件循环的整洁高效。
3.5 完整的Dart应用示例
将上述组件整合到一个简单的Dart控制台应用中。
main.dart:
import 'dart:async';
import 'dart:ffi';
import 'dart:io';
import 'package:ffi/ffi.dart'; // For .toNativeUtf8() and calloc
// 引入我们的FFI绑定
import 'dart_ffi_bindings.dart';
void main() async {
print('Dart: Application started.');
// 实例化原生下载器
final NativeDownloader downloader = NativeDownloader();
// 启动两个并发下载任务
final Completer<void> download1Completer = Completer<void>();
final Completer<void> download2Completer = Completer<void>();
print('Dart: Starting download of file1.txt');
downloader.downloadFile('file1.txt').listen(
(progress) {
print('Download 1 (file1.txt) Progress: ${progress.toStringAsFixed(2)}%');
if (progress >= 100.0) {
download1Completer.complete();
}
},
onError: (error) {
print('Download 1 (file1.txt) Error: $error');
download1Completer.completeError(error);
},
onDone: () {
print('Download 1 (file1.txt) Finished.');
},
);
print('Dart: Starting download of file2.txt');
downloader.downloadFile('file2.txt').listen(
(progress) {
print('Download 2 (file2.txt) Progress: ${progress.toStringAsFixed(2)}%');
if (progress >= 100.0) {
download2Completer.complete();
}
},
onError: (error) {
print('Download 2 (file2.txt) Error: $error');
download2Completer.completeError(error);
},
onDone: () {
print('Download 2 (file2.txt) Finished.');
},
);
// 等待所有下载完成
await Future.wait([download1Completer.future, download2Completer.future]);
print('Dart: All downloads completed. Exiting application.');
}
运行这个应用,你将看到来自两个不同下载任务的进度更新,它们通过各自的ReceivePort和SendPort机制异步地将数据传递回Dart主Isolate。
执行步骤:
- 保存C代码为
my_native_lib.c和my_native_lib.h。 - 使用C编译器编译共享库:
- Linux:
gcc -shared -o my_native_lib.so my_native_lib.c -pthread - macOS:
gcc -shared -o my_native_lib.dylib my_native_lib.c -pthread - Windows (MinGW-w64):
gcc -shared -o my_native_lib.dll my_native_lib.c -pthread
- Linux:
- 将编译好的库文件放到Dart程序能够找到的位置(例如,与
main.dart同目录)。 - 保存Dart代码为
dart_ffi_bindings.dart和main.dart。 - 运行Dart程序:
dart run main.dart
你将观察到类似这样的输出:
Dart: Application started.
Dart: Starting download of file1.txt
Dart: Starting download of file2.txt
Native: Progress callback registered.
Native: Starting download for file1.txt on thread 12345
Native: Starting download for file2.txt on thread 67890
Download 1 (file1.txt) Progress: 0.00%
Download 2 (file2.txt) Progress: 0.00%
Download 1 (file1.txt) Progress: 10.00%
Download 2 (file2.txt) Progress: 10.00%
...
Download 1 (file1.txt) Progress: 100.00%
Download 1 (file1.txt) Finished.
Dart: ReceivePort for file1.txt download closed.
Download 2 (file2.txt) Progress: 100.00%
Download 2 (file2.txt) Finished.
Dart: ReceivePort for file2.txt download closed.
Native: Download for file1.txt finished.
Native: Download for file2.txt finished.
Dart: All downloads completed. Exiting application.
可以看到,_dartProgressTrampoline函数成功地接收了来自原生后台线程的回调,并通过SendPort.fromRawPort重建了SendPort,将进度发送到了主Isolate的相应ReceivePort。
第四章:ReceivePort管理的高级策略与最佳实践
上面的例子展示了“一操作一端口”的基本模式,它简单直接。但在更复杂的应用中,可能需要更精细的管理。
4.1 错误处理
原生代码中发生的错误也需要通过SendPort传回Dart。我们可以扩展消息的格式。
修改原生回调和Dart跳板:
my_native_lib.h (添加错误码):
// 定义回调函数类型,可以包含错误码
typedef void (*ProgressCallback)(int64_t port, double progress, int error_code);
// start_download_async 签名也需要更新
void start_download_async(int64_t port_id, const char* filename); // 假设这里不直接传error_code,而是通过回调传
my_native_lib.c (模拟错误):
// ... (之前的代码)
void* download_task(void* arg) {
DownloadArgs* args = (DownloadArgs*)arg;
int64_t current_port_id = args->port_id;
const char* current_filename = args->filename;
printf("Native: Starting download for %s on thread %lun", current_filename, pthread_self());
// 模拟一个随机错误
int error_code = 0;
if (rand() % 10 == 0) { // 10% 几率发生错误
error_code = -1; // 模拟一个错误码
printf("Native: Simulated error for %s!n", current_filename);
}
for (int i = 0; i <= 10; i++) {
if (error_code != 0) {
// 如果有错误,直接发送错误并结束
if (global_progress_callback != NULL) {
global_progress_callback(current_port_id, -1.0, error_code); // 进度设为-1表示错误
}
break;
}
double progress = i * 10.0;
if (global_progress_callback != NULL) {
global_progress_callback(current_port_id, progress, 0); // 正常进度,无错误
}
usleep(500 * 1000);
}
if (error_code == 0 && global_progress_callback != NULL) {
global_progress_callback(current_port_id, 100.0, 0);
}
printf("Native: Download for %s finished (error: %d).n", current_filename, error_code);
free(args);
return NULL;
}
// ... (其他函数)
dart_ffi_bindings.dart (更新回调签名和跳板逻辑):
// 更新NativeProgressCallback和DartProgressCallback签名
typedef NativeProgressCallback = Void Function(Int64 port, Double progress, Int32 errorCode);
typedef DartProgressCallback = void Function(int port, double progress, int errorCode);
// 更新_dartProgressTrampoline
@pragma('vm:entry-point')
void _dartProgressTrampoline(int port, double progress, int errorCode) {
final SendPort sendPort = SendPort.fromRawPort(port);
if (sendPort == null) {
print('Error: Could not reconstruct SendPort from raw port: $port');
return;
}
// 发送一个包含进度和错误码的Map或List
sendPort.send({'progress': progress, 'errorCode': errorCode});
}
// 更新NativeDownloader.downloadFile
class NativeDownloader {
// ... (其他代码)
Stream<Map<String, dynamic>> downloadFile(String filename) {
final ReceivePort receivePort = ReceivePort();
final int sendPortRaw = receivePort.sendPort.nativePort;
final Pointer<Utf8> filenameC = filename.toNativeUtf8();
_startDownloadAsync(sendPortRaw, filenameC);
calloc.free(filenameC);
return receivePort.cast<Map<String, dynamic>>().asBroadcastStream()
..listen(
null,
onDone: () {
print('Dart: ReceivePort for $filename download closed.');
receivePort.close();
},
onError: (error) {
print('Dart: Error on ReceivePort for $filename: $error');
receivePort.close();
},
);
}
}
main.dart (更新监听器):
void main() async {
// ... (之前的代码)
downloader.downloadFile('file1.txt').listen(
(data) {
final progress = data['progress'] as double;
final errorCode = data['errorCode'] as int;
if (errorCode != 0) {
print('Download 1 (file1.txt) FAILED with error code: $errorCode');
download1Completer.completeError('Native error: $errorCode');
return;
}
print('Download 1 (file1.txt) Progress: ${progress.toStringAsFixed(2)}%');
if (progress >= 100.0) {
download1Completer.complete();
}
},
// ...
);
downloader.downloadFile('file2.txt').listen(
(data) {
final progress = data['progress'] as double;
final errorCode = data['errorCode'] as int;
if (errorCode != 0) {
print('Download 2 (file2.txt) FAILED with error code: $errorCode');
download2Completer.completeError('Native error: $errorCode');
return;
}
print('Download 2 (file2.txt) Progress: ${progress.toStringAsFixed(2)}%');
if (progress >= 100.0) {
download2Completer.complete();
}
},
// ...
);
// ...
}
通过发送Map或其他结构化的数据,可以轻松地在原生回调中传递多种类型的信息,包括错误状态、进度、结果数据等。
4.2 数据序列化
SendPort.send()方法可以发送以下类型的数据:
null- 基本类型:
int,double,bool,String List<Object?>Map<Object?, Object?>SendPort(是的,你可以在Isolate之间发送SendPort,这在构建复杂的通信网络时非常有用)TypedData(例如Uint8List)Capability(用于权限系统)
对于更复杂的数据结构,你需要手动将其序列化为上述可发送的类型(例如,JSON字符串、Map、List或Uint8List),在接收端再反序列化。
4.3 ReceivePort的多种管理策略
4.3.1 单次使用端口 (One-Shot Port)
如我们示例所示,为每个FFI异步操作创建一个新的ReceivePort。
- 优点:简单,消息隔离,易于生命周期管理。
- 缺点:对于大量短期操作,频繁创建和销毁
ReceivePort可能带来轻微开销。
适用场景:文件下载、一次性传感器读取、单个网络请求等。
4.3.2 长生命周期服务端口 (Long-Lived Service Port)
一个专用的FFI服务或管理器类可以拥有一个长生命周期的ReceivePort。所有来自原生的回调都通过这个端口发送。
- 优点:减少端口创建/销毁开销,集中管理所有FFI事件。
- 缺点:消息需要包含一个“类型”或“请求ID”来区分是哪个操作的回调,增加了消息处理的复杂性。
消息结构示例 (Map):
| Key | Value Type | Description |
|---|---|---|
'type' |
String |
消息类型(例如 'progress', 'result', 'error') |
'id' |
int or String |
关联到特定请求的唯一ID |
'data' |
Object? |
实际的回调数据(例如进度值、结果对象) |
'errorCode' |
int |
错误码(如果有的话) |
// 在NativeDownloader类中
// 假设_sendPortToService是发送给原生库的SendPort,它属于一个ServiceReceivePort
// 原生代码会把这个SendPort的nativePort存起来,并用它来发送所有回调
// Dart跳板回调 (只有一个,处理所有事件)
@pragma('vm:entry-point')
void _unifiedTrampoline(int port, int eventType, int requestId, dynamic data) {
final SendPort sendPort = SendPort.fromRawPort(port);
if (sendPort == null) return;
sendPort.send({'type': eventType, 'id': requestId, 'data': data});
}
// FFI服务类
class FfiService {
final ReceivePort _serviceReceivePort = ReceivePort();
final Map<int, Completer<dynamic>> _requestCompleters = {};
int _nextRequestId = 0;
FfiService() {
_serviceReceivePort.listen((message) {
final Map<String, dynamic> msg = message as Map<String, dynamic>;
final int requestId = msg['id'] as int;
final int eventType = msg['type'] as int; // 1: progress, 2: result, 3: error
final dynamic data = msg['data'];
if (_requestCompleters.containsKey(requestId)) {
final completer = _requestCompleters[requestId]!;
if (eventType == 1) { // Progress
// 可以通过StreamController发送进度
} else if (eventType == 2) { // Result
completer.complete(data);
_requestCompleters.remove(requestId);
} else if (eventType == 3) { // Error
completer.completeError(data);
_requestCompleters.remove(requestId);
}
}
});
// 将_serviceReceivePort.sendPort.nativePort传递给原生库注册
// _registerUnifiedCallback(_unifiedTrampolinePtr, _serviceReceivePort.sendPort.nativePort);
}
Future<dynamic> performOperation(String operationName, dynamic args) async {
final requestId = _nextRequestId++;
final completer = Completer<dynamic>();
_requestCompleters[requestId] = completer;
// 调用原生FFI函数,传入 requestId 和操作参数
// _startNativeOperation(requestId, operationName.toNativeUtf8(), ...);
return completer.future;
}
void dispose() {
_serviceReceivePort.close();
// 清理所有未完成的completers
}
}
适用场景:复杂的Ffi库,需要管理多种类型事件和多个并发请求,例如一个图像处理库可能同时进行多个图像的滤镜操作,每个操作都有进度和最终结果。
4.3.3 与StreamController结合
ReceivePort本身就是一个Stream,但它是一个单订阅流。如果需要多订阅(例如,多个UI组件监听同一个FFI事件),或者需要更细粒度地控制流的生命周期,可以将其包装到StreamController中。
class FfiManager {
final ReceivePort _receivePort = ReceivePort();
// 使用一个BroadcastStreamController来允许多个监听器
final StreamController<double> _progressController = StreamController<double>.broadcast();
Stream<double> get progressStream => _progressController.stream;
FfiManager() {
// 监听ReceivePort,并将收到的消息添加到StreamController
_receivePort.listen((message) {
if (message is double) {
_progressController.add(message);
} else if (message is String && message == 'done') {
_progressController.close(); // 当收到“完成”消息时关闭流
_receivePort.close(); // 同时关闭ReceivePort
}
});
// 将_receivePort.sendPort.nativePort传递给原生库注册
// _registerSomeCallback(_progressTrampolinePtr, _receivePort.sendPort.nativePort);
}
void dispose() {
// 确保在管理器销毁时关闭所有资源
_progressController.close();
_receivePort.close();
}
}
通过StreamController,我们可以对事件流进行更多的控制,例如在某个条件满足时关闭流,或者在流中插入额外的处理逻辑。
4.4 Isolate.spawn与后台Isolate
对于耗时的FFI操作,即使回调最终通过SendPort回到主UI Isolate,FFI调用的发起本身也可能阻塞UI。在这种情况下,应该在后台Isolate中执行FFI调用。
- 主Isolate:
- 创建自己的
ReceivePort,用于接收后台Isolate的SendPort和最终结果。 Isolate.spawn启动后台Isolate,并把自己的SendPort传给后台Isolate。
- 创建自己的
- 后台Isolate:
- 创建自己的
ReceivePort。 - 把自己的
SendPort发送回主Isolate。 - 接收主Isolate发来的FFI请求。
- 为每个FFI请求,创建它自己的
ReceivePort,并将其SendPort.nativePort传递给原生。 - 后台Isolate的FFI回调跳板会接收原生数据,并通过FFI请求对应的
SendPort发回给后台Isolate的ReceivePort。 - 后台Isolate处理完数据后,将最终结果发回主Isolate。
- 创建自己的
这是一个多层通信的复杂结构,但其核心仍然是SendPort和ReceivePort的协同工作。
4.5 NativeCallable (Dart 3.x+)
在Dart 3.x及更高版本中,NativeCallable是创建原生可调用Dart回调的推荐方式,它提供了比Pointer.fromFunction更好的生命周期管理和类型安全性。
NativeCallable可以从一个Dart函数或一个StaticDartRuntime.postCObject回调创建。
import 'dart:ffi';
import 'dart:isolate';
import 'package:ffi/ffi.dart';
// 假设我们有一个名为'my_native_lib'的库,其中包含一个'set_callback'函数
// 和一个'trigger_callback'函数
final DynamicLibrary _nativeLib = DynamicLibrary.process(); // 假设库已经加载
typedef NativeSetCallback = Void Function(Pointer<NativeFunction<NativeCallable<Void Function(Int64, Double)>>> cb_ptr);
typedef DartSetCallback = void Function(Pointer<NativeFunction<NativeCallable<Void Function(Int64, Double)>>> cb_ptr);
typedef NativeTriggerCallback = Void Function(Int64 port_id, Double value);
typedef DartTriggerCallback = void Function(int port_id, double value);
final _setCallback = _nativeLib.lookupFunction<NativeSetCallback, DartSetCallback>('set_callback');
final _triggerCallback = _nativeLib.lookupFunction<NativeTriggerCallback, DartTriggerCallback>('trigger_callback');
// 我们的Dart回调函数
@pragma('vm:entry-point')
void _myDartCallback(int port, double value) {
final SendPort sendPort = SendPort.fromRawPort(port);
if (sendPort == null) {
print('Error: Could not reconstruct SendPort from raw port: $port');
return;
}
sendPort.send(value);
}
void main() {
// 1. 创建一个NativeCallable
final nativeCallable = NativeCallable<Void Function(Int64, Double)>.listener(_myDartCallback);
// 2. 将NativeCallable的指针传递给原生代码
_setCallback(nativeCallable.nativeFunction);
// 3. 准备一个ReceivePort来接收回调
final receivePort = ReceivePort();
receivePort.listen((message) {
print('Received from native via NativeCallable: $message');
if (message == 100.0) {
// 达到100%时,关闭port并释放NativeCallable
receivePort.close();
nativeCallable.close();
print('NativeCallable and ReceivePort closed.');
}
});
// 4. 触发原生回调,并传入SendPort的nativePort
_triggerCallback(receivePort.sendPort.nativePort, 0.0); // 假设原生会逐步增加这个值
}
NativeCallable.listener创建的回调会确保在回调执行时,Dart运行时是活跃的。NativeCallable.close()方法用于在不再需要回调时安全地释放原生资源。这使得NativeCallable在管理FFI回调的生命周期方面更加健壮和方便。它的使用模式与Pointer.fromFunction非常相似,但更推荐。
5. ReceivePort管理总结
Dart FFI中的异步回调是连接高性能原生代码与响应式Dart应用的关键环节。ReceivePort是实现这一机制的核心组件,它允许不同Isolate间进行安全、高效的消息传递。
妥善管理ReceivePort的生命周期至关重要:
- 创建:根据业务需求,为每个异步操作创建独立的
ReceivePort,或使用一个长生命周期的服务端口。 - 传递:将
ReceivePort的SendPort.nativePort作为int64传递给原生代码。 - 跳板:实现一个Dart顶层/静态函数作为原生回调的“跳板”,它负责从
int64重建SendPort并发送消息。 - 监听:在目标Isolate中监听
ReceivePort的Stream,处理接收到的消息。 - 销毁:在操作完成、出错或不再需要时,务必调用
receivePort.close()以释放资源,避免内存泄漏和僵尸Isolate。
通过理解和实践这些原则,你将能够构建出健壮、高效且易于维护的Dart FFI异步回调系统。