Dart FFI 异步回调的 Isolate 端口(ReceivePort)管理

尊敬的各位开发者,

欢迎来到今天的讲座。我们将深入探讨Dart FFI中一个至关重要且常被误解的主题:异步回调的Isolate端口管理,特别是ReceivePort的生命周期与使用策略。在现代应用开发中,利用原生代码库的能力是提升性能、访问系统API或复用现有C/C++代码的常见需求。Dart的FFI(Foreign Function Interface)为我们提供了这座桥梁。然而,当原生代码需要异步地将结果或事件通知回Dart时,问题就变得复杂起来,尤其是在Dart的并发模型——Isolate——的背景下。

我们将从FFI的基础开始,逐步构建起对异步回调机制的理解,详细讲解SendPortReceivePort如何成为跨Isolate通信的关键,并最终聚焦于ReceivePort的创建、使用、销毁及高级管理策略。


第一章:Dart FFI与异步回调的挑战

1.1 FFI:连接Dart与原生世界的桥梁

Dart FFI允许Dart代码直接调用C语言接口的函数,以及被C语言接口调用。这开启了Dart应用与操作系统API、高性能计算库、现有C/C++代码库等原生资源深度集成的可能性。

在Dart中,与FFI相关的核心概念包括:

  • dart:ffi:提供了与原生代码交互的所有工具。
  • DynamicLibrary:用于加载原生动态链接库(.so, .dylib, .dll)。
  • lookupFunction:将原生函数指针转换为Dart函数。
  • Pointer:表示原生内存地址。
  • NativeFunction / DartFunction:定义原生函数和对应的Dart函数签名。
// 示例:一个简单的同步FFI调用
import 'dart:ffi';
import 'dart:io' show Platform;

// 1. 定义原生函数的C类型签名
typedef NativeAdd = Int32 Function(Int32 a, Int32 b);

// 2. 定义对应的Dart函数类型签名
typedef DartAdd = int Function(int a, int b);

void main() {
  // 3. 加载动态库
  // 假设我们有一个名为 'my_native_lib' 的库,其中包含一个 'add' 函数
  final DynamicLibrary myNativeLib = Platform.isMacOS || Platform.isIOS
      ? DynamicLibrary.open('my_native_lib.dylib')
      : Platform.isWindows
          ? DynamicLibrary.open('my_native_lib.dll')
          : DynamicLibrary.open('my_native_lib.so');

  // 4. 查找并绑定原生函数
  final DartAdd add = myNativeLib
      .lookupFunction<NativeAdd, DartAdd>('add');

  // 5. 调用原生函数
  final result = add(5, 7);
  print('5 + 7 = $result'); // 期望输出 12
}

1.2 同步回调与异步回调:本质的区别

上述示例展示了同步FFI调用。当add(5, 7)被调用时,Dart代码会暂停执行,直到原生add函数返回结果。

然而,在许多实际场景中,原生操作可能是耗时或非阻塞的:

  • 文件I/O:读取大文件、网络请求。
  • 系统事件监听:例如文件系统变更、传感器数据。
  • 长时间运行的计算:图像处理、AI推理。
  • 复杂的UI组件:原生地图、视频播放器回调。

在这种情况下,我们不希望Dart主线程被阻塞。原生库通常会提供一个回调机制,允许它在操作完成或有新事件发生时,“调用回”应用程序代码。这就是异步回调

挑战在于: 原生代码与Dart代码的执行环境可能完全不同。原生回调可能发生在任何线程上,而Dart代码运行在严格的Isolate模型中。


第二章:Dart的Isolate模型与跨Isolate通信

2.1 深入理解Dart Isolate

Dart语言是单线程模型的,但通过Isolate实现了并发。

  • 内存隔离:每个Isolate都有自己独立的内存堆,变量不能直接在Isolate之间共享。这意味着没有共享内存并发带来的锁和竞态条件问题。
  • 事件循环:每个Isolate都有自己的事件循环。UI更新、网络请求、定时器等所有异步操作都在各自Isolate的事件循环中排队执行。
  • 天然的并发单元:Flutter应用的主UI线程就是一个Isolate。当我们需要执行耗时操作而不阻塞UI时,通常会创建一个新的后台Isolate。

2.2 原生回调的Isolate困境

当原生代码通过FFI调用Dart函数时,这个Dart函数会在哪个Isolate上执行呢?
答案是:它会在注册该回调的Isolate上执行。

假设你在主UI Isolate上注册了一个FFI回调,那么当原生代码触发这个回调时,它就会在UI Isolate上执行。如果这个回调执行时间过长,它会阻塞UI Isolate,导致应用卡顿。

更复杂的情况是,原生代码可能在它自己的某个后台线程中触发回调。Dart FFI机制会尽力将这个回调调度到注册它的Dart Isolate的事件循环中。但这仍然意味着:

  1. 如果回调数据需要在UI Isolate上展示,直接在另一个后台Isolate注册回调是不足够的。
  2. 如果回调频率非常高,即使在注册它的后台Isolate上执行,也可能导致该Isolate的任务队列过载。

核心问题: 原生代码不知道Dart的Isolate概念。它只知道一个函数指针。我们如何才能将原生回调的数据,安全、高效地从原生世界,跨越可能存在的线程边界,最终送达目标Dart Isolate(例如,主UI Isolate)?

2.3 SendPortReceivePort:跨Isolate的信使

Dart提供了一套专门用于Isolate间通信的机制:SendPortReceivePort

  • ReceivePort:一个Isolate可以通过创建一个ReceivePort来监听传入的消息。它有一个内部队列,用于接收其他Isolate发送的消息。
  • SendPort:每个ReceivePort都有一个对应的SendPortSendPort可以被发送到其他Isolate。一旦其他Isolate持有了这个SendPort,它就可以向原始的ReceivePort发送消息。

通信模型:

  1. 目标Isolate (例如,UI Isolate) 创建一个ReceivePort
  2. 目标Isolate 获取其ReceivePortSendPort
  3. 目标Isolate 将这个SendPort发送给另一个Isolate (例如,一个执行FFI操作的后台Isolate,或者通过FFI传递给原生代码)。
  4. 其他Isolate或原生代码通过这个SendPort向目标Isolate发送消息。
  5. 目标Isolate的ReceivePort监听这些消息,并在其事件循环中处理它们。

关键点: SendPort.nativePort属性。这个属性返回一个int64,代表了SendPort在原生内存中的地址或标识符。这个int64可以安全地通过FFI传递给原生代码。原生代码不需要理解SendPort是什么,它只需要在需要回调时,将这个int64以及回调数据一起传递给一个特殊的Dart FFI回调函数。这个特殊的Dart FFI回调函数再利用这个int64重建SendPort,并发送消息。


第三章:构建异步FFI回调机制

现在,我们来详细构建一个完整的异步FFI回调系统。我们将以一个模拟文件下载进度的场景为例。原生库将模拟下载过程,并周期性地通过回调报告进度。

3.1 预备知识:Pointer.fromFunction

Pointer.fromFunction是Dart FFI中用于创建可供原生代码调用的Dart回调函数的关键。

// 1. 定义Dart回调的C签名
typedef NativeProgressCallback = Void Function(Int64 port, Double progress);

// 2. 定义Dart回调的Dart签名
typedef DartProgressCallback = void Function(int port, double progress);

// 3. 创建一个Dart函数作为原生回调的“入口”
// 这个函数必须是静态的或顶级的,因为它不能捕获任何外部上下文(闭包)
@pragma('vm:entry-point') // 确保在AOT编译时不会被优化掉
void _progressCallback(int port, double progress) {
  // 在这里处理原生回调
  // 但我们如何知道哪个ReceivePort需要这个进度?
  // 这就是'port'参数的作用
  print('Received progress from native: $progress for port $port');
}

// 4. 获取原生可调用的函数指针
final Pointer<NativeFunction<NativeProgressCallback>> progressCallbackPtr =
    Pointer.fromFunction<NativeProgressCallback>(_progressCallback, 0.0); // 第二个参数是发生异常时的默认返回值

注意_progressCallback函数必须是顶级函数或静态函数,并且通常需要@pragma('vm:entry-point')注解。这是因为原生代码调用的是一个固定的内存地址,它不能像Dart闭包那样捕获外部环境。

3.2 原生C/C++代码设计

我们假设有一个简单的C库,用于模拟下载。

my_native_lib.h:

#ifndef MY_NATIVE_LIB_H
#define MY_NATIVE_LIB_H

#include <stdint.h> // For int64_t

// 定义回调函数类型
typedef void (*ProgressCallback)(int64_t port, double progress);

// 注册回调函数和用户数据(这里是SendPort的nativePort)
void register_progress_callback(ProgressCallback cb);

// 模拟一个异步下载操作
// 注意:这个函数会启动一个后台线程来模拟下载,并在该线程中调用回调
void start_download_async(int64_t port_id, const char* filename);

#endif // MY_NATIVE_LIB_H

my_native_lib.c:

#include "my_native_lib.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> // For sleep
#include <pthread.h> // For threading

// 存储Dart传入的回调函数指针
static ProgressCallback global_progress_callback = NULL;

// 线程参数结构体
typedef struct {
    int64_t port_id;
    const char* filename;
} DownloadArgs;

// 模拟下载任务的线程函数
void* download_task(void* arg) {
    DownloadArgs* args = (DownloadArgs*)arg;
    int64_t current_port_id = args->port_id;
    const char* current_filename = args->filename;

    printf("Native: Starting download for %s on thread %lun", current_filename, pthread_self());

    for (int i = 0; i <= 10; i++) {
        double progress = i * 10.0;
        if (global_progress_callback != NULL) {
            // 调用Dart的回调函数,传入port_id和进度
            global_progress_callback(current_port_id, progress);
        }
        usleep(500 * 1000); // 模拟0.5秒的下载
    }

    // 下载完成,发送100%进度
    if (global_progress_callback != NULL) {
        global_progress_callback(current_port_id, 100.0);
    }
    printf("Native: Download for %s finished.n", current_filename);

    free(args); // 释放线程参数
    return NULL;
}

// FFI接口:注册回调
void register_progress_callback(ProgressCallback cb) {
    global_progress_callback = cb;
    printf("Native: Progress callback registered.n");
}

// FFI接口:开始异步下载
void start_download_async(int64_t port_id, const char* filename) {
    if (global_progress_callback == NULL) {
        fprintf(stderr, "Native: Error: Callback not registered before starting download.n");
        return;
    }

    DownloadArgs* args = (DownloadArgs*)malloc(sizeof(DownloadArgs));
    if (args == NULL) {
        perror("Failed to allocate DownloadArgs");
        return;
    }
    args->port_id = port_id;
    args->filename = filename; // 注意:这里简单传递字符串指针,实际应用中可能需要深拷贝

    pthread_t tid;
    // 创建一个新线程来执行下载任务
    if (pthread_create(&tid, NULL, download_task, (void*)args) != 0) {
        perror("Failed to create download thread");
        free(args);
    } else {
        pthread_detach(tid); // 让线程在结束时自动释放资源
    }
}

编译这个C文件为动态库(例如my_native_lib.so)。

3.3 Dart FFI绑定与“跳板”回调(Trampoline Callback)

现在,我们要在Dart中定义FFI绑定,并实现一个“跳板”回调函数。这个函数是Pointer.fromFunction创建的,它接收原生回调,并负责将数据转发到正确的ReceivePort

dart_ffi_bindings.dart:

import 'dart:ffi';
import 'dart:isolate';
import 'dart:io' show Platform;

// 1. 加载原生库
final DynamicLibrary _nativeLib = Platform.isMacOS || Platform.isIOS
    ? DynamicLibrary.open('my_native_lib.dylib')
    : Platform.isWindows
        ? DynamicLibrary.open('my_native_lib.dll')
        : DynamicLibrary.open('my_native_lib.so');

// 2. 定义原生函数类型签名

// C函数签名:void register_progress_callback(void (*cb)(int64_t port, double progress));
typedef _NativeRegisterProgressCallback = Void Function(Pointer<NativeFunction<NativeProgressCallback>> cb);
typedef _DartRegisterProgressCallback = void Function(Pointer<NativeFunction<NativeProgressCallback>> cb);

// C函数签名:void start_download_async(int64_t port_id, const char* filename);
typedef _NativeStartDownloadAsync = Void Function(Int64 port_id, Pointer<Utf8> filename);
typedef _DartStartDownloadAsync = void Function(int port_id, Pointer<Utf8> filename);

// 3. 查找并绑定原生函数
final _registerProgressCallback = _nativeLib.lookupFunction<
    _NativeRegisterProgressCallback,
    _DartRegisterProgressCallback>('register_progress_callback');

final _startDownloadAsync = _nativeLib.lookupFunction<
    _NativeStartDownloadAsync,
    _DartStartDownloadAsync>('start_download_async');

// 4. 定义Dart回调函数及其原生签名
// 这个回调函数将作为“跳板”接收来自原生的数据
typedef NativeProgressCallback = Void Function(Int64 port, Double progress);
typedef DartProgressCallback = void Function(int port, double progress);

// 5. 实现“跳板”回调函数
// 这是一个顶级函数,因为它将被Pointer.fromFunction引用,不能是闭包。
// 它的职责是接收原生数据,并将其发送到正确的Dart Isolate。
@pragma('vm:entry-point')
void _dartProgressTrampoline(int port, double progress) {
  // 从原生端口ID重建SendPort
  final SendPort sendPort = SendPort.fromRawPort(port);
  if (sendPort == null) {
    // 这通常不应该发生,除非port ID无效
    print('Error: Could not reconstruct SendPort from raw port: $port');
    return;
  }
  // 将数据发送到目标ReceivePort
  sendPort.send(progress);
}

// 6. 获取“跳板”回调的指针
final Pointer<NativeFunction<NativeProgressCallback>> _progressTrampolinePtr =
    Pointer.fromFunction<NativeProgressCallback>(_dartProgressTrampoline, 0.0); // 0.0是发生异常时的默认返回值

// 7. 封装FFI操作的Dart类
class NativeDownloader {
  static bool _callbackRegistered = false;

  NativeDownloader() {
    if (!_callbackRegistered) {
      // 在第一次实例化时注册回调
      _registerProgressCallback(_progressTrampolinePtr);
      _callbackRegistered = true;
    }
  }

  // 启动下载并返回一个Stream,用于接收进度更新
  Stream<double> downloadFile(String filename) {
    // 1. 创建一个ReceivePort来接收原生回调
    final ReceivePort receivePort = ReceivePort();

    // 2. 获取ReceivePort对应的SendPort的nativePort,并将其传递给原生
    final int sendPortRaw = receivePort.sendPort.nativePort;

    // 3. 将文件名转换为C字符串
    final Pointer<Utf8> filenameC = filename.toNativeUtf8();

    // 4. 调用原生函数开始下载
    _startDownloadAsync(sendPortRaw, filenameC);

    // 5. 释放C字符串内存
    calloc.free(filenameC);

    // 6. 将ReceivePort转换为Stream,方便Dart异步处理
    // 并在流关闭时关闭ReceivePort
    return receivePort.cast<double>().asBroadcastStream()
      ..listen(
        null, // 不需要额外的监听器,StreamController会处理
        onDone: () {
          print('Dart: ReceivePort for $filename download closed.');
          receivePort.close();
        },
        onError: (error) {
          print('Dart: Error on ReceivePort for $filename: $error');
          receivePort.close();
        },
      );
  }
}

3.4 ReceivePort的创建、监听与销毁

NativeDownloader.downloadFile方法中,我们看到了ReceivePort的完整生命周期管理:

  1. 创建 (final ReceivePort receivePort = ReceivePort();):每当启动一个需要回调的新异步操作时,我们都创建一个新的ReceivePort。这是为了隔离不同操作的回调消息,避免混淆。如果多个操作共享一个ReceivePort,消息就需要包含一个标识符来区分它们属于哪个操作,这会增加复杂性。
  2. 获取SendPortnativePort (final int sendPortRaw = receivePort.sendPort.nativePort;):这是将ReceivePort与原生世界连接的关键。int类型的nativePort可以安全地传递给原生代码。
  3. 监听 (return receivePort.cast<double>().asBroadcastStream()..listen(...))ReceivePort本身是一个Stream。我们可以像处理任何Dart Stream一样处理它。cast<double>()是为了将接收到的Object?转换为预期的double类型。asBroadcastStream()使得可以有多个监听器,虽然在这个场景中可能只有一个。
  4. 销毁 (receivePort.close();):这是最重要的一步。当异步操作完成(无论是成功、失败还是取消),或者不再需要接收回调时,必须调用receivePort.close()

为什么销毁如此重要?

  • 资源释放:每个ReceivePort都持有系统资源。如果不关闭,这些资源会一直占用,导致内存泄漏。
  • 避免僵尸Isolate:如果一个Isolate只因为一个未关闭的ReceivePort而无法被垃圾回收,它将成为一个“僵尸Isolate”,继续占用内存和CPU周期。
  • 清理事件循环ReceivePort上的监听器会在Isolate的事件循环中注册任务。关闭它会移除这些任务,保持事件循环的整洁高效。

3.5 完整的Dart应用示例

将上述组件整合到一个简单的Dart控制台应用中。

main.dart:

import 'dart:async';
import 'dart:ffi';
import 'dart:io';
import 'package:ffi/ffi.dart'; // For .toNativeUtf8() and calloc

// 引入我们的FFI绑定
import 'dart_ffi_bindings.dart';

void main() async {
  print('Dart: Application started.');

  // 实例化原生下载器
  final NativeDownloader downloader = NativeDownloader();

  // 启动两个并发下载任务
  final Completer<void> download1Completer = Completer<void>();
  final Completer<void> download2Completer = Completer<void>();

  print('Dart: Starting download of file1.txt');
  downloader.downloadFile('file1.txt').listen(
    (progress) {
      print('Download 1 (file1.txt) Progress: ${progress.toStringAsFixed(2)}%');
      if (progress >= 100.0) {
        download1Completer.complete();
      }
    },
    onError: (error) {
      print('Download 1 (file1.txt) Error: $error');
      download1Completer.completeError(error);
    },
    onDone: () {
      print('Download 1 (file1.txt) Finished.');
    },
  );

  print('Dart: Starting download of file2.txt');
  downloader.downloadFile('file2.txt').listen(
    (progress) {
      print('Download 2 (file2.txt) Progress: ${progress.toStringAsFixed(2)}%');
      if (progress >= 100.0) {
        download2Completer.complete();
      }
    },
    onError: (error) {
      print('Download 2 (file2.txt) Error: $error');
      download2Completer.completeError(error);
    },
    onDone: () {
      print('Download 2 (file2.txt) Finished.');
    },
  );

  // 等待所有下载完成
  await Future.wait([download1Completer.future, download2Completer.future]);

  print('Dart: All downloads completed. Exiting application.');
}

运行这个应用,你将看到来自两个不同下载任务的进度更新,它们通过各自的ReceivePortSendPort机制异步地将数据传递回Dart主Isolate。

执行步骤:

  1. 保存C代码为my_native_lib.cmy_native_lib.h
  2. 使用C编译器编译共享库:
    • Linux: gcc -shared -o my_native_lib.so my_native_lib.c -pthread
    • macOS: gcc -shared -o my_native_lib.dylib my_native_lib.c -pthread
    • Windows (MinGW-w64): gcc -shared -o my_native_lib.dll my_native_lib.c -pthread
  3. 将编译好的库文件放到Dart程序能够找到的位置(例如,与main.dart同目录)。
  4. 保存Dart代码为dart_ffi_bindings.dartmain.dart
  5. 运行Dart程序: dart run main.dart

你将观察到类似这样的输出:

Dart: Application started.
Dart: Starting download of file1.txt
Dart: Starting download of file2.txt
Native: Progress callback registered.
Native: Starting download for file1.txt on thread 12345
Native: Starting download for file2.txt on thread 67890
Download 1 (file1.txt) Progress: 0.00%
Download 2 (file2.txt) Progress: 0.00%
Download 1 (file1.txt) Progress: 10.00%
Download 2 (file2.txt) Progress: 10.00%
...
Download 1 (file1.txt) Progress: 100.00%
Download 1 (file1.txt) Finished.
Dart: ReceivePort for file1.txt download closed.
Download 2 (file2.txt) Progress: 100.00%
Download 2 (file2.txt) Finished.
Dart: ReceivePort for file2.txt download closed.
Native: Download for file1.txt finished.
Native: Download for file2.txt finished.
Dart: All downloads completed. Exiting application.

可以看到,_dartProgressTrampoline函数成功地接收了来自原生后台线程的回调,并通过SendPort.fromRawPort重建了SendPort,将进度发送到了主Isolate的相应ReceivePort


第四章:ReceivePort管理的高级策略与最佳实践

上面的例子展示了“一操作一端口”的基本模式,它简单直接。但在更复杂的应用中,可能需要更精细的管理。

4.1 错误处理

原生代码中发生的错误也需要通过SendPort传回Dart。我们可以扩展消息的格式。

修改原生回调和Dart跳板:

my_native_lib.h (添加错误码):

// 定义回调函数类型,可以包含错误码
typedef void (*ProgressCallback)(int64_t port, double progress, int error_code);

// start_download_async 签名也需要更新
void start_download_async(int64_t port_id, const char* filename); // 假设这里不直接传error_code,而是通过回调传

my_native_lib.c (模拟错误):

// ... (之前的代码)

void* download_task(void* arg) {
    DownloadArgs* args = (DownloadArgs*)arg;
    int64_t current_port_id = args->port_id;
    const char* current_filename = args->filename;

    printf("Native: Starting download for %s on thread %lun", current_filename, pthread_self());

    // 模拟一个随机错误
    int error_code = 0;
    if (rand() % 10 == 0) { // 10% 几率发生错误
        error_code = -1; // 模拟一个错误码
        printf("Native: Simulated error for %s!n", current_filename);
    }

    for (int i = 0; i <= 10; i++) {
        if (error_code != 0) {
            // 如果有错误,直接发送错误并结束
            if (global_progress_callback != NULL) {
                global_progress_callback(current_port_id, -1.0, error_code); // 进度设为-1表示错误
            }
            break;
        }
        double progress = i * 10.0;
        if (global_progress_callback != NULL) {
            global_progress_callback(current_port_id, progress, 0); // 正常进度,无错误
        }
        usleep(500 * 1000);
    }

    if (error_code == 0 && global_progress_callback != NULL) {
        global_progress_callback(current_port_id, 100.0, 0);
    }
    printf("Native: Download for %s finished (error: %d).n", current_filename, error_code);

    free(args);
    return NULL;
}

// ... (其他函数)

dart_ffi_bindings.dart (更新回调签名和跳板逻辑):

// 更新NativeProgressCallback和DartProgressCallback签名
typedef NativeProgressCallback = Void Function(Int64 port, Double progress, Int32 errorCode);
typedef DartProgressCallback = void Function(int port, double progress, int errorCode);

// 更新_dartProgressTrampoline
@pragma('vm:entry-point')
void _dartProgressTrampoline(int port, double progress, int errorCode) {
  final SendPort sendPort = SendPort.fromRawPort(port);
  if (sendPort == null) {
    print('Error: Could not reconstruct SendPort from raw port: $port');
    return;
  }
  // 发送一个包含进度和错误码的Map或List
  sendPort.send({'progress': progress, 'errorCode': errorCode});
}

// 更新NativeDownloader.downloadFile
class NativeDownloader {
  // ... (其他代码)

  Stream<Map<String, dynamic>> downloadFile(String filename) {
    final ReceivePort receivePort = ReceivePort();
    final int sendPortRaw = receivePort.sendPort.nativePort;
    final Pointer<Utf8> filenameC = filename.toNativeUtf8();
    _startDownloadAsync(sendPortRaw, filenameC);
    calloc.free(filenameC);

    return receivePort.cast<Map<String, dynamic>>().asBroadcastStream()
      ..listen(
        null,
        onDone: () {
          print('Dart: ReceivePort for $filename download closed.');
          receivePort.close();
        },
        onError: (error) {
          print('Dart: Error on ReceivePort for $filename: $error');
          receivePort.close();
        },
      );
  }
}

main.dart (更新监听器):

void main() async {
  // ... (之前的代码)

  downloader.downloadFile('file1.txt').listen(
    (data) {
      final progress = data['progress'] as double;
      final errorCode = data['errorCode'] as int;

      if (errorCode != 0) {
        print('Download 1 (file1.txt) FAILED with error code: $errorCode');
        download1Completer.completeError('Native error: $errorCode');
        return;
      }
      print('Download 1 (file1.txt) Progress: ${progress.toStringAsFixed(2)}%');
      if (progress >= 100.0) {
        download1Completer.complete();
      }
    },
    // ...
  );

  downloader.downloadFile('file2.txt').listen(
    (data) {
      final progress = data['progress'] as double;
      final errorCode = data['errorCode'] as int;

      if (errorCode != 0) {
        print('Download 2 (file2.txt) FAILED with error code: $errorCode');
        download2Completer.completeError('Native error: $errorCode');
        return;
      }
      print('Download 2 (file2.txt) Progress: ${progress.toStringAsFixed(2)}%');
      if (progress >= 100.0) {
        download2Completer.complete();
      }
    },
    // ...
  );

  // ...
}

通过发送Map或其他结构化的数据,可以轻松地在原生回调中传递多种类型的信息,包括错误状态、进度、结果数据等。

4.2 数据序列化

SendPort.send()方法可以发送以下类型的数据:

  • null
  • 基本类型:int, double, bool, String
  • List<Object?>
  • Map<Object?, Object?>
  • SendPort (是的,你可以在Isolate之间发送SendPort,这在构建复杂的通信网络时非常有用)
  • TypedData (例如Uint8List)
  • Capability (用于权限系统)

对于更复杂的数据结构,你需要手动将其序列化为上述可发送的类型(例如,JSON字符串、MapListUint8List),在接收端再反序列化。

4.3 ReceivePort的多种管理策略

4.3.1 单次使用端口 (One-Shot Port)

如我们示例所示,为每个FFI异步操作创建一个新的ReceivePort

  • 优点:简单,消息隔离,易于生命周期管理。
  • 缺点:对于大量短期操作,频繁创建和销毁ReceivePort可能带来轻微开销。

适用场景:文件下载、一次性传感器读取、单个网络请求等。

4.3.2 长生命周期服务端口 (Long-Lived Service Port)

一个专用的FFI服务或管理器类可以拥有一个长生命周期的ReceivePort。所有来自原生的回调都通过这个端口发送。

  • 优点:减少端口创建/销毁开销,集中管理所有FFI事件。
  • 缺点:消息需要包含一个“类型”或“请求ID”来区分是哪个操作的回调,增加了消息处理的复杂性。

消息结构示例 (Map):

Key Value Type Description
'type' String 消息类型(例如 'progress', 'result', 'error'
'id' int or String 关联到特定请求的唯一ID
'data' Object? 实际的回调数据(例如进度值、结果对象)
'errorCode' int 错误码(如果有的话)
// 在NativeDownloader类中
// 假设_sendPortToService是发送给原生库的SendPort,它属于一个ServiceReceivePort
// 原生代码会把这个SendPort的nativePort存起来,并用它来发送所有回调

// Dart跳板回调 (只有一个,处理所有事件)
@pragma('vm:entry-point')
void _unifiedTrampoline(int port, int eventType, int requestId, dynamic data) {
  final SendPort sendPort = SendPort.fromRawPort(port);
  if (sendPort == null) return;
  sendPort.send({'type': eventType, 'id': requestId, 'data': data});
}

// FFI服务类
class FfiService {
  final ReceivePort _serviceReceivePort = ReceivePort();
  final Map<int, Completer<dynamic>> _requestCompleters = {};
  int _nextRequestId = 0;

  FfiService() {
    _serviceReceivePort.listen((message) {
      final Map<String, dynamic> msg = message as Map<String, dynamic>;
      final int requestId = msg['id'] as int;
      final int eventType = msg['type'] as int; // 1: progress, 2: result, 3: error
      final dynamic data = msg['data'];

      if (_requestCompleters.containsKey(requestId)) {
        final completer = _requestCompleters[requestId]!;
        if (eventType == 1) { // Progress
          // 可以通过StreamController发送进度
        } else if (eventType == 2) { // Result
          completer.complete(data);
          _requestCompleters.remove(requestId);
        } else if (eventType == 3) { // Error
          completer.completeError(data);
          _requestCompleters.remove(requestId);
        }
      }
    });

    // 将_serviceReceivePort.sendPort.nativePort传递给原生库注册
    // _registerUnifiedCallback(_unifiedTrampolinePtr, _serviceReceivePort.sendPort.nativePort);
  }

  Future<dynamic> performOperation(String operationName, dynamic args) async {
    final requestId = _nextRequestId++;
    final completer = Completer<dynamic>();
    _requestCompleters[requestId] = completer;

    // 调用原生FFI函数,传入 requestId 和操作参数
    // _startNativeOperation(requestId, operationName.toNativeUtf8(), ...);

    return completer.future;
  }

  void dispose() {
    _serviceReceivePort.close();
    // 清理所有未完成的completers
  }
}

适用场景:复杂的Ffi库,需要管理多种类型事件和多个并发请求,例如一个图像处理库可能同时进行多个图像的滤镜操作,每个操作都有进度和最终结果。

4.3.3 与StreamController结合

ReceivePort本身就是一个Stream,但它是一个单订阅流。如果需要多订阅(例如,多个UI组件监听同一个FFI事件),或者需要更细粒度地控制流的生命周期,可以将其包装到StreamController中。

class FfiManager {
  final ReceivePort _receivePort = ReceivePort();
  // 使用一个BroadcastStreamController来允许多个监听器
  final StreamController<double> _progressController = StreamController<double>.broadcast();

  Stream<double> get progressStream => _progressController.stream;

  FfiManager() {
    // 监听ReceivePort,并将收到的消息添加到StreamController
    _receivePort.listen((message) {
      if (message is double) {
        _progressController.add(message);
      } else if (message is String && message == 'done') {
        _progressController.close(); // 当收到“完成”消息时关闭流
        _receivePort.close(); // 同时关闭ReceivePort
      }
    });

    // 将_receivePort.sendPort.nativePort传递给原生库注册
    // _registerSomeCallback(_progressTrampolinePtr, _receivePort.sendPort.nativePort);
  }

  void dispose() {
    // 确保在管理器销毁时关闭所有资源
    _progressController.close();
    _receivePort.close();
  }
}

通过StreamController,我们可以对事件流进行更多的控制,例如在某个条件满足时关闭流,或者在流中插入额外的处理逻辑。

4.4 Isolate.spawn与后台Isolate

对于耗时的FFI操作,即使回调最终通过SendPort回到主UI Isolate,FFI调用的发起本身也可能阻塞UI。在这种情况下,应该在后台Isolate中执行FFI调用。

  1. 主Isolate
    • 创建自己的ReceivePort,用于接收后台Isolate的SendPort和最终结果。
    • Isolate.spawn启动后台Isolate,并把自己的SendPort传给后台Isolate。
  2. 后台Isolate
    • 创建自己的ReceivePort
    • 把自己的SendPort发送回主Isolate。
    • 接收主Isolate发来的FFI请求。
    • 为每个FFI请求,创建它自己的ReceivePort,并将其SendPort.nativePort传递给原生。
    • 后台Isolate的FFI回调跳板会接收原生数据,并通过FFI请求对应的SendPort发回给后台Isolate的ReceivePort
    • 后台Isolate处理完数据后,将最终结果发回主Isolate。

这是一个多层通信的复杂结构,但其核心仍然是SendPortReceivePort的协同工作。

4.5 NativeCallable (Dart 3.x+)

在Dart 3.x及更高版本中,NativeCallable是创建原生可调用Dart回调的推荐方式,它提供了比Pointer.fromFunction更好的生命周期管理和类型安全性。

NativeCallable可以从一个Dart函数或一个StaticDartRuntime.postCObject回调创建。

import 'dart:ffi';
import 'dart:isolate';
import 'package:ffi/ffi.dart';

// 假设我们有一个名为'my_native_lib'的库,其中包含一个'set_callback'函数
// 和一个'trigger_callback'函数
final DynamicLibrary _nativeLib = DynamicLibrary.process(); // 假设库已经加载

typedef NativeSetCallback = Void Function(Pointer<NativeFunction<NativeCallable<Void Function(Int64, Double)>>> cb_ptr);
typedef DartSetCallback = void Function(Pointer<NativeFunction<NativeCallable<Void Function(Int64, Double)>>> cb_ptr);

typedef NativeTriggerCallback = Void Function(Int64 port_id, Double value);
typedef DartTriggerCallback = void Function(int port_id, double value);

final _setCallback = _nativeLib.lookupFunction<NativeSetCallback, DartSetCallback>('set_callback');
final _triggerCallback = _nativeLib.lookupFunction<NativeTriggerCallback, DartTriggerCallback>('trigger_callback');

// 我们的Dart回调函数
@pragma('vm:entry-point')
void _myDartCallback(int port, double value) {
  final SendPort sendPort = SendPort.fromRawPort(port);
  if (sendPort == null) {
    print('Error: Could not reconstruct SendPort from raw port: $port');
    return;
  }
  sendPort.send(value);
}

void main() {
  // 1. 创建一个NativeCallable
  final nativeCallable = NativeCallable<Void Function(Int64, Double)>.listener(_myDartCallback);

  // 2. 将NativeCallable的指针传递给原生代码
  _setCallback(nativeCallable.nativeFunction);

  // 3. 准备一个ReceivePort来接收回调
  final receivePort = ReceivePort();
  receivePort.listen((message) {
    print('Received from native via NativeCallable: $message');
    if (message == 100.0) {
      // 达到100%时,关闭port并释放NativeCallable
      receivePort.close();
      nativeCallable.close();
      print('NativeCallable and ReceivePort closed.');
    }
  });

  // 4. 触发原生回调,并传入SendPort的nativePort
  _triggerCallback(receivePort.sendPort.nativePort, 0.0); // 假设原生会逐步增加这个值
}

NativeCallable.listener创建的回调会确保在回调执行时,Dart运行时是活跃的。NativeCallable.close()方法用于在不再需要回调时安全地释放原生资源。这使得NativeCallable在管理FFI回调的生命周期方面更加健壮和方便。它的使用模式与Pointer.fromFunction非常相似,但更推荐。


5. ReceivePort管理总结

Dart FFI中的异步回调是连接高性能原生代码与响应式Dart应用的关键环节。ReceivePort是实现这一机制的核心组件,它允许不同Isolate间进行安全、高效的消息传递。

妥善管理ReceivePort的生命周期至关重要:

  • 创建:根据业务需求,为每个异步操作创建独立的ReceivePort,或使用一个长生命周期的服务端口。
  • 传递:将ReceivePortSendPort.nativePort作为int64传递给原生代码。
  • 跳板:实现一个Dart顶层/静态函数作为原生回调的“跳板”,它负责从int64重建SendPort并发送消息。
  • 监听:在目标Isolate中监听ReceivePort的Stream,处理接收到的消息。
  • 销毁:在操作完成、出错或不再需要时,务必调用receivePort.close()以释放资源,避免内存泄漏和僵尸Isolate。

通过理解和实践这些原则,你将能够构建出健壮、高效且易于维护的Dart FFI异步回调系统。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注