Dart Isolate 的 Actor 模型:基于消息传递的并发与状态隔离

尊敬的各位同仁,女士们,先生们,

欢迎来到今天的讲座。我们将深入探讨一个在现代并发编程中至关重要且日益流行的范式:Actor 模型,以及它如何在 Dart 语言的强大 Isolate 机制中得到优雅而高效的实现。随着多核处理器的普及,如何有效地利用计算资源,同时避免并发编程常见的陷阱,成为了软件工程师面临的核心挑战。今天,我们将看到 Dart Isolate 和 Actor 模型的结合,如何为我们提供一个清晰、健壮且可扩展的解决方案,以实现基于消息传递的并发与彻底的状态隔离。

1. 并发编程的挑战与Dart的独特视角

在软件开发的历史长河中,并发编程一直是把双刃剑。它承诺了更高的吞吐量、更快的响应速度和更好的资源利用率,但也常常伴随着复杂性、难以调试的错误和系统的不稳定性。

1.1 共享内存并发的困境

传统的并发模型,尤其是基于线程和共享内存的模型,是许多问题的根源。在这种模型中,多个线程可以同时访问和修改同一块内存区域。这导致了以下一系列臭名昭著的问题:

  • 竞态条件 (Race Conditions): 当多个线程尝试同时访问和修改共享数据时,最终结果取决于线程执行的非确定性顺序。这可能导致数据不一致或程序行为异常。
  • 死锁 (Deadlocks): 两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行。
  • 活锁 (Livelocks): 线程虽然没有阻塞,但却在不断地改变状态以响应其他线程,却始终无法完成实际工作,像是在“礼貌地”互相避让,最终谁也过不去。
  • 数据损坏 (Data Corruption): 在没有适当同步机制的情况下,对共享数据的并发写入可能导致数据被部分更新或覆盖,从而使其处于无效状态。
  • 复杂性与维护成本: 为了避免上述问题,开发者不得不引入锁 (locks)、互斥量 (mutexes)、信号量 (semaphores) 等同步原语。这些机制本身就增加了代码的复杂性,且极易出错。一个不当的锁粒度、一个遗漏的锁,都可能导致严重的并发问题。

尽管这些机制在某些场景下是必要的,但它们强迫我们以一种非局部的方式思考程序的状态,使得推理并发行为变得异常困难。

1.2 Dart的并发哲学:无共享内存的独立世界

Dart 语言的设计者深刻理解了共享内存并发的痛点,并选择了另一条路径:Isolates。Dart 的并发模型与许多其他语言(如 Java、C#)的线程模型截然不同。

什么是 Isolate?

一个 Dart Isolate 可以被理解为一个完全独立的 Dart 运行时实例。它拥有自己独立的内存堆 (heap) 和事件循环 (event loop)。这意味着,Isates之间不共享任何可变状态。一个 Isolate 无法直接访问另一个 Isolate 的内存,也无法直接修改另一个 Isolate 中的对象。这从根本上杜绝了竞态条件、死锁和数据损坏的根源,因为根本不存在“共享可变状态”的概念。

这种设计哲学带来了巨大的好处:

  • 彻底的状态隔离 (Complete State Isolation): 每个 Isolate 都是一个独立的沙箱。你无需担心一个 Isolate 中的错误或不当操作会影响到另一个 Isolate 的内部状态。
  • 简化并发推理: 由于没有共享内存,你可以在很大程度上将每个 Isolate 的逻辑视为独立的、顺序执行的单元。你只需要关注它们之间的通信方式。
  • 真正的并行性 (True Parallelism): 在多核处理器上,不同的 Isolate 可以同时在不同的 CPU 核上运行,从而实现真正的并行计算,而不仅仅是并发。

Isolate 之间的通信:消息传递

如果 Isolate 之间不共享内存,那么它们如何协作呢?答案是:消息传递 (Message Passing)。Isolate 通过发送和接收消息来进行通信。Dart 提供了 SendPortReceivePort 这两个核心抽象来支持这种通信。

  • ReceivePort: 这是一个监听器,用于接收从其他 Isolate 发送过来的消息。当你创建一个 ReceivePort 时,它会暴露一个对应的 SendPort
  • SendPort: 这是一个句柄,可以用来向某个特定的 ReceivePort 发送消息。你可以将 SendPort 传递给其他 Isolate,以便它们可以向你的 Isolate 发送消息。

消息通过 Isolate 之间的通信通道进行传递。当一个 Isolate 发送一个对象作为消息时,Dart 运行时会尝试序列化 (serialize) 该对象,并在接收 Isolate 中反序列化 (deserialize) 它。这意味着,发送的不是对象的引用,而是其内容的副本。这进一步强化了状态隔离的原则。

让我们看一个最基本的 Isolate 通信示例:

import 'dart:isolate';

// Isolate 内部执行的函数
void isolateEntry(SendPort mainSendPort) {
  // Isolate 自己的接收端口,用于接收来自 main Isolate 的消息
  ReceivePort isolateReceivePort = ReceivePort();
  // 将自己的 SendPort 发送给 main Isolate,以便 main Isolate 可以向它发送消息
  mainSendPort.send(isolateReceivePort.sendPort);

  print('Isolate: 启动成功,正在等待消息...');

  // 监听来自 main Isolate 的消息
  isolateReceivePort.listen((message) {
    if (message is String) {
      print('Isolate: 收到消息 "$message"');
      if (message == 'Hello from main!') {
        // 回复 main Isolate
        mainSendPort.send('Hello back from Isolate!');
      } else if (message == 'stop') {
        print('Isolate: 收到停止指令,正在关闭...');
        isolateReceivePort.close(); // 关闭自己的接收端口
        Isolate.current.kill(priority: Isolate.beforeNextEvent); // 终止 Isolate
      }
    } else if (message is SendPort) {
      // 理论上,Isolate 也可以接收 SendPort,但这里为了演示简洁,我们假设只接收字符串。
      // 在更复杂的场景中,这很常见,例如 Actor 获取其他 Actor 的引用。
      print('Isolate: 收到一个 SendPort,通常用于建立双向通信或将此Isolate的SendPort传递给其他actor。');
    }
  });
}

void main() async {
  print('Main Isolate: 启动...');

  // 创建一个接收端口,用于接收来自新 Isolate 的消息(包括它的 SendPort)
  ReceivePort mainReceivePort = ReceivePort();

  // 启动一个新的 Isolate,并传递 mainReceivePort 的 SendPort 给它
  // 新 Isolate 会用这个 SendPort 来回复 main Isolate
  Isolate newIsolate = await Isolate.spawn(isolateEntry, mainReceivePort.sendPort);

  print('Main Isolate: 等待 Isolate 的 SendPort...');

  // 等待 Isolate 发送回它的 SendPort
  SendPort? isolateSendPort;
  await for (var message in mainReceivePort) {
    if (message is SendPort) {
      isolateSendPort = message;
      print('Main Isolate: 收到 Isolate 的 SendPort.');
      break; // 收到 SendPort 后即可跳出循环
    } else if (message is String) {
      print('Main Isolate: 收到 Isolate 的回复: "$message"');
    }
  }

  if (isolateSendPort != null) {
    // 现在我们可以向 Isolate 发送消息了
    print('Main Isolate: 向 Isolate 发送消息 "Hello from main!"');
    isolateSendPort.send('Hello from main!');

    // 等待 Isolate 的回复
    await for (var message in mainReceivePort) {
      if (message is String) {
        print('Main Isolate: 收到 Isolate 的回复: "$message"');
        if (message == 'Hello back from Isolate!') {
          // 发送停止指令
          print('Main Isolate: 向 Isolate 发送停止指令...');
          isolateSendPort.send('stop');
          break; // 收到回复后停止监听
        }
      }
    }
  }

  // 关闭主 Isolate 的接收端口
  mainReceivePort.close();
  // 终止 Isolate (如果它自己没有终止的话,这里确保清理)
  // newIsolate.kill(); // 通常在 Isolate 内部处理关闭,这里只是作为兜底
  print('Main Isolate: 结束。');
}

这段代码展示了 Isolate 如何通过 SendPortReceivePort 建立双向通信。主 Isolate 启动子 Isolate,并将自己的 SendPort 传递过去。子 Isolate 收到后,再将自己的 SendPort 回传给主 Isolate。一旦双方都拥有了对方的 SendPort,就可以自由地发送消息了。这种模式是构建基于 Actor 模型的基石。

2. Actor 模型:并发的优雅范式

理解了 Dart Isolate 的基础后,我们现在可以引入 Actor 模型。Actor 模型提供了一种高度抽象和强大的方式来思考和构建并发系统,它与 Dart Isolate 的设计理念不谋而合。

2.1 Actor 的定义与核心原则

Actor 模型由 Carl Hewitt 在 1973 年提出,其核心思想是:一切皆 Actor。一个 Actor 是并发计算的基本单元。它具有以下几个核心特征:

  • 封装状态 (Encapsulates State): 每个 Actor 都有自己的私有状态。这个状态只能由 Actor 自身访问和修改。这是 Actor 模型实现状态隔离的关键。
  • 行为 (Behavior): Actor 定义了如何响应接收到的消息。当 Actor 收到消息时,它会执行其内部逻辑,根据消息内容可能修改其内部状态。
  • 邮箱 (Mailbox): 每个 Actor 都有一个邮箱,用于接收来自其他 Actor 的消息。消息是异步地发送到邮箱中的,Actor 会顺序地处理邮箱中的消息,一次处理一个。
  • 消息传递 (Message Passing): Actor 之间只能通过异步发送消息进行通信。一个 Actor 无法直接调用另一个 Actor 的方法,也无法直接访问或修改另一个 Actor 的状态。
  • 创建新的 Actor (Create New Actors): Actor 可以创建其他 Actor。
  • 改变自身行为 (Change Own Behavior): Actor 在处理消息后,可以改变其处理后续消息的行为。

核心原则总结:

  1. 彻底的状态隔离: Actor 的内部状态是私有的,只能由 Actor 自身访问和修改。这消除了竞态条件和死锁的可能。
  2. 异步消息传递: Actor 之间通过异步消息进行通信。发送消息是非阻塞的,发送者不需要等待接收者处理消息。
  3. 单线程处理消息: 每个 Actor 在任何给定时间点都只处理一个消息。这意味着 Actor 内部的代码是顺序执行的,无需担心并发访问其内部状态。
  4. 无共享可变状态: 这是 Actor 模型与 Dart Isolate 完美契合的核心。

2.2 Actor 模型的优势

Actor 模型带来了显著的优势,尤其是在构建大规模、高并发和分布式系统时:

  • 简化并发推理: 由于状态隔离和单线程处理消息的特性,开发者可以专注于单个 Actor 的逻辑,而无需担心全局的并发问题。这极大地降低了心智负担。
  • 高可伸缩性 (High Scalability): Actor 是轻量级的,可以创建成千上万个 Actor。由于它们通过消息传递通信,可以很容易地将 Actor 分布到不同的处理器核心甚至不同的机器上。
  • 高容错性 (High Fault Tolerance): 如果一个 Actor 崩溃,它不会直接影响到其他 Actor 的状态。通过“监督者 (Supervisor)”模式(即一个 Actor 负责监控和重启其子 Actor),可以构建出高度弹性的系统。
  • 解耦 (Decoupling): Actor 之间只通过消息接口进行交互,它们彼此之间高度解耦,易于独立开发、测试和维护。
  • 响应性 (Responsiveness): 异步消息传递使得系统能够保持响应性,不会因为某个 Actor 的阻塞而导致整个系统停滞。

2.3 Actor 模型与传统并发模型的对比

下表总结了 Actor 模型与传统共享内存并发模型在关键方面的对比:

特性 传统共享内存模型 (e.g., 线程+锁) Actor 模型 (e.g., Dart Isolate + Actor)
状态管理 共享可变状态 彻底的状态隔离
通信方式 直接方法调用,共享内存读写 异步消息传递
同步机制 锁、互斥量、信号量、条件变量 无需显式同步,Actor 内部顺序处理消息
并发问题 竞态条件、死锁、活锁、数据损坏 基本消除竞态条件和死锁
可伸缩性 随着锁竞争增加,伸缩性受限 高度可伸缩,易于分布式
容错性 错误可能蔓延到整个系统 错误隔离,易于通过监督者模式恢复
编程复杂性 高,需要关注底层同步细节 相对低,关注业务逻辑和消息流

3. 使用 Dart Isolates 实现 Actor 模型

现在我们已经理解了 Dart Isolate 的工作原理和 Actor 模型的核心概念,是时候将它们结合起来了。Dart Isolate 提供了一个天然的、强大的运行时环境来实现 Actor 模型。

3.1 核心映射关系

  • 一个 Actor (的实例): 在 Dart 中,一个 Actor 通常由一个独立的 Isolate 实例承载。这个 Isolate 运行着 Actor 的逻辑。
  • Actor 的地址/引用 (ActorRef): 一个 ActorRef 是一个 SendPort 的包装。它允许你向特定的 Actor 发送消息。
  • Actor 的邮箱 (Mailbox): 每个 Actor 内部会持有一个 ReceivePort。这个 ReceivePort 充当 Actor 的邮箱,接收所有发往该 Actor 的消息。
  • Actor 的消息 (Message): 任何可以被 Dart Isolate 跨 Isolate 传递的对象都可以作为 Actor 的消息。通常,我们会定义特定的消息类来表示不同的操作或事件。
  • Actor 的行为 (Behavior): 这是 Actor 内部处理消息的逻辑。它通常是一个 switchif-else if 结构,根据消息类型执行相应的操作。

3.2 设计一个基础 Actor 框架

为了方便地创建和管理 Actor,我们可以设计一个简单的 Actor 框架。

3.2.1 消息定义

首先,定义一个抽象的 ActorMessage 类,作为所有 Actor 消息的基类。这有助于类型安全和模式匹配。

// actor_messages.dart
abstract class ActorMessage {}

// 示例消息:用于请求一个计算任务
class ComputeMessage extends ActorMessage {
  final int value;
  final SendPort? replyTo; // 用于回复的端口

  ComputeMessage(this.value, {this.replyTo});

  @override
  String toString() => 'ComputeMessage(value: $value, replyTo: $replyTo)';
}

// 示例消息:用于通知一个Actor停止
class StopActorMessage extends ActorMessage {
  @override
  String toString() => 'StopActorMessage';
}

// 示例消息:用于请求Actor的当前状态
class GetStateMessage extends ActorMessage {
  final SendPort replyTo;

  GetStateMessage(this.replyTo);

  @override
  String toString() => 'GetStateMessage(replyTo: $replyTo)';
}

3.2.2 Actor 引用 (ActorRef)

ActorRef 封装了 SendPort,提供一个类型安全的发送消息的接口。

// actor_ref.dart
import 'dart:isolate';
import 'actor_messages.dart';

/// ActorRef 是一个句柄,用于向 Actor 发送消息。
/// 它封装了底层 Isolate 的 SendPort。
class ActorRef {
  final SendPort _sendPort;

  ActorRef(this._sendPort);

  /// 向 Actor 发送一个消息。
  /// 消息必须是可跨 Isolate 传递的对象。
  void tell(ActorMessage message) {
    try {
      _sendPort.send(message);
    } catch (e) {
      print('Error sending message to Actor: $e');
      // 可以在这里添加更复杂的错误处理,例如Actor是否已死亡
    }
  }

  /// 异步地向 Actor 发送消息并等待回复。
  /// 这需要 Actor 在处理消息时将回复发送到提供的 replyPort。
  Future<T> ask<T>(ActorMessage message, {Duration? timeout}) async {
    final ReceivePort replyPort = ReceivePort();
    final SendPort replyTo = replyPort.sendPort;

    // 修改消息,使其包含 replyTo 端口
    // 注意:这里需要消息类型支持这种修改,或者提供一个工厂方法
    // 假设我们的消息支持通过构造函数传入replyTo
    ActorMessage messageWithReplyTo;
    if (message is ComputeMessage) {
      messageWithReplyTo = ComputeMessage(message.value, replyTo: replyTo);
    } else if (message is GetStateMessage) {
      messageWithReplyTo = GetStateMessage(replyTo);
    } else {
      // 对于不支持 replyTo 的消息类型,可以抛出错误或提供默认行为
      replyPort.close();
      throw ArgumentError('Message type does not support replyTo for ask pattern.');
    }

    tell(messageWithReplyTo);

    try {
      var future = replyPort.first.timeout(timeout ?? const Duration(seconds: 5));
      var result = await future;
      if (result is T) {
        return result;
      } else {
        throw Exception('Unexpected reply type: ${result.runtimeType}, expected $T');
      }
    } on TimeoutException {
      throw TimeoutException('Actor did not reply within the given timeout.', timeout);
    } finally {
      replyPort.close();
    }
  }

  // 可以重写 equals 和 hashCode,以便在集合中使用 ActorRef
  @override
  bool operator ==(Object other) =>
      identical(this, other) ||
      other is ActorRef && runtimeType == other.runtimeType && _sendPort == other._sendPort;

  @override
  int get hashCode => _sendPort.hashCode;
}

ActorRef 提供了 tell 方法用于单向消息发送(“fire and forget”)和 ask 方法用于请求-响应模式。ask 方法在内部创建一个临时的 ReceivePort,将其 SendPort 附加到发送的消息中,并等待 Actor 的回复。

3.2.3 Actor 基类

我们需要一个抽象的 Actor 基类,来处理 Isolate 的启动、消息循环和端口管理。

// actor.dart
import 'dart:async';
import 'dart:isolate';
import 'actor_messages.dart';
import 'actor_ref.dart';

/// Actor 是所有具体 Actor 的基类。
/// 它负责管理 Isolate 的生命周期和消息循环。
abstract class Actor {
  late ReceivePort _receivePort;
  late SendPort _actorSendPort; // Actor 自己的 SendPort,用于发送给其他 Actor 或回复 Ask
  late StreamSubscription _messageSubscription;
  ActorRef? _self; // Actor 自己的引用

  ActorRef get self => _self!;

  /// Actor 的启动方法。
  /// 此方法将在新的 Isolate 中执行。
  static void _startActor(List<Object?> args) async {
    final SendPort mainSendPort = args[0] as SendPort;
    final ActorCreator creator = args[1] as ActorCreator;

    // 创建 Actor 实例
    final Actor actor = creator();

    // 创建自己的接收端口
    actor._receivePort = ReceivePort();
    actor._actorSendPort = actor._receivePort.sendPort;
    actor._self = ActorRef(actor._actorSendPort);

    // 将自己的 SendPort 发送回主 Isolate
    mainSendPort.send(actor._actorSendPort);

    // 打印启动信息
    print('${actor.runtimeType} Actor: Started on Isolate ${Isolate.current.debugName}');

    // 调用 Actor 生命周期方法
    await actor.preStart();

    // 监听消息
    actor._messageSubscription = actor._receivePort.listen((message) async {
      if (message is ActorMessage) {
        if (message is StopActorMessage) {
          print('${actor.runtimeType} Actor: Received StopActorMessage. Shutting down...');
          await actor.postStop();
          actor._messageSubscription.cancel();
          actor._receivePort.close();
          Isolate.current.kill(priority: Isolate.beforeNextEvent);
        } else {
          // 处理具体消息
          await actor.onMessage(message);
        }
      } else {
        print('${actor.runtimeType} Actor: Received unknown message type: ${message.runtimeType}');
      }
    }, onError: (error) {
      print('${actor.runtimeType} Actor: Error in message stream: $error');
      actor.onError(error);
    }, onDone: () {
      print('${actor.runtimeType} Actor: Message stream done.');
      actor.postStop();
    });
  }

  /// 在 Actor 启动前执行的初始化逻辑。
  /// 子类可以重写此方法。
  Future<void> preStart() async {
    print('${runtimeType} Actor: preStart called.');
  }

  /// 在 Actor 停止后执行的清理逻辑。
  /// 子类可以重写此方法。
  Future<void> postStop() async {
    print('${runtimeType} Actor: postStop called.');
  }

  /// 抽象方法,子类必须实现以处理接收到的消息。
  Future<void> onMessage(ActorMessage message);

  /// 处理消息流中的错误。
  void onError(dynamic error) {
    print('${runtimeType} Actor: Unhandled error: $error');
  }
}

/// ActorCreator 是一个工厂函数类型,用于创建 Actor 实例。
typedef ActorCreator = Actor Function();

这个 Actor 基类承担了以下职责:

  • _startActor 静态方法: 这是 Isolate.spawn 调用的入口点。它负责创建 Actor 实例、设置其 ReceivePort、将 SendPort 回传给父 Isolate,并启动消息监听循环。
  • _receivePort_messageSubscription: 管理 Actor 的消息邮箱和消息流订阅。
  • preStart()postStop(): 生命周期钩子,允许子类在 Actor 启动和停止时执行自定义逻辑。
  • onMessage(ActorMessage message): 抽象方法,这是子类实现其核心业务逻辑的地方。
  • onError(dynamic error): 错误处理钩子。
  • StopActorMessage 处理: 基类自动处理 StopActorMessage 以实现优雅停机。

3.2.4 Actor 系统 (ActorSystem)

ActorSystem 是管理和创建 Actor 的中心组件。

// actor_system.dart
import 'dart:async';
import 'dart:isolate';
import 'actor.dart';
import 'actor_ref.dart';
import 'actor_messages.dart';

/// ActorSystem 负责管理 Actor 的生命周期和提供 ActorRef。
class ActorSystem {
  final String name;
  final Map<String, ActorRef> _actorRegistry = {}; // 存储 ActorRef 以便通过名称查找

  ActorSystem(this.name);

  /// 启动一个新的 Actor。
  /// [name] 是 Actor 的唯一标识符。
  /// [creator] 是一个工厂函数,用于创建 Actor 实例。
  Future<ActorRef> spawn<T extends Actor>(String name, ActorCreator creator) async {
    if (_actorRegistry.containsKey(name)) {
      throw ArgumentError('Actor with name "$name" already exists.');
    }

    final ReceivePort mainReceivePort = ReceivePort();
    final SendPort mainSendPort = mainReceivePort.sendPort;

    // 启动 Isolate,并传递 mainSendPort 和 ActorCreator
    await Isolate.spawn(Actor._startActor, [mainSendPort, creator], debugName: name);

    // 等待 Actor 将其 SendPort 发送回来
    // 使用 Completer 来等待第一个 SendPort 消息
    final Completer<ActorRef> completer = Completer<ActorRef>();
    late StreamSubscription subscription;

    subscription = mainReceivePort.listen((message) {
      if (message is SendPort) {
        final ActorRef actorRef = ActorRef(message);
        _actorRegistry[name] = actorRef;
        completer.complete(actorRef);
        subscription.cancel(); // 收到 SendPort 后取消订阅
        mainReceivePort.close(); // 关闭临时的接收端口
      } else {
        // 可以在这里处理其他来自 Isolate 的早期消息
        print('ActorSystem: Received unexpected message from $name Isolate during spawn: $message');
      }
    }, onError: (error) {
      completer.completeError(error);
      subscription.cancel();
      mainReceivePort.close();
    });

    return completer.future;
  }

  /// 通过名称查找已注册的 ActorRef。
  ActorRef? getActor(String name) {
    return _actorRegistry[name];
  }

  /// 停止一个 Actor。
  /// 向 Actor 发送 StopActorMessage。
  Future<void> stop(String name) async {
    final ActorRef? actorRef = _actorRegistry[name];
    if (actorRef != null) {
      print('ActorSystem: Sending StopActorMessage to Actor "$name"...');
      actorRef.tell(StopActorMessage());
      _actorRegistry.remove(name);
      // 可以添加更复杂的逻辑,例如等待 Actor 真正停止
      // 但对于简单演示,发送消息后移除即可
    } else {
      print('ActorSystem: Actor "$name" not found.');
    }
  }

  /// 停止所有 Actor。
  Future<void> shutdown() async {
    print('ActorSystem: Shutting down all actors...');
    for (final name in _actorRegistry.keys.toList()) {
      await stop(name);
    }
    _actorRegistry.clear();
    print('ActorSystem: All actors shut down.');
  }
}

ActorSystem 提供 spawn 方法来创建 Actor,并维护一个 _actorRegistry 来通过名称查找 ActorRefspawn 方法处理了 Isolate 启动的异步性,确保在返回 ActorRef 之前,Actor 已经成功启动并将其 SendPort 回传。

3.3 示例:一个计数器 Actor

现在,让我们创建一个具体的 Actor:一个简单的计数器。

// counter_actor.dart
import 'actor.dart';
import 'actor_messages.dart'; // 确保引入了 ActorMessage
import 'dart:isolate'; // 引入 Isolate 以便使用 SendPort

// 定义计数器 Actor 的特定消息
class IncrementCounter extends ActorMessage {
  @override
  String toString() => 'IncrementCounter';
}

class DecrementCounter extends ActorMessage {
  @override
  String toString() => 'DecrementCounter';
}

class GetCounterValue extends ActorMessage {
  final SendPort replyTo; // 用于回复 Ask 请求

  GetCounterValue(this.replyTo);

  @override
  String toString() => 'GetCounterValue(replyTo: $replyTo)';
}

// CounterActor 实现
class CounterActor extends Actor {
  int _count = 0; // Actor 的私有状态

  @override
  Future<void> preStart() async {
    await super.preStart();
    print('CounterActor: Initialized with count $_count.');
  }

  @override
  Future<void> onMessage(ActorMessage message) async {
    if (message is IncrementCounter) {
      _count++;
      print('CounterActor: Incremented count to $_count.');
    } else if (message is DecrementCounter) {
      _count--;
      print('CounterActor: Decremented count to $_count.');
    } else if (message is GetCounterValue) {
      // 回复 ask 请求
      print('CounterActor: Responding with current count: $_count.');
      message.replyTo.send(_count);
    } else {
      print('CounterActor: Unhandled message: $message');
    }
  }

  @override
  Future<void> postStop() async {
    await super.postStop();
    print('CounterActor: Final count was $_count.');
  }
}

3.3.1 主程序与 CounterActor 交互

// main.dart
import 'dart:async';
import 'actor_system.dart';
import 'counter_actor.dart'; // 引入 CounterActor 和其消息

void main() async {
  print('Main Isolate: Starting Actor System...');
  final ActorSystem system = ActorSystem('MySystem');

  // 1. 启动 CounterActor
  print('Main Isolate: Spawning CounterActor...');
  final counterActorRef = await system.spawn('myCounter', () => CounterActor());
  print('Main Isolate: CounterActor spawned.');

  // 2. 发送 Increment 消息 (tell - fire and forget)
  print('Main Isolate: Sending 3 Increment messages...');
  counterActorRef.tell(IncrementCounter());
  counterActorRef.tell(IncrementCounter());
  counterActorRef.tell(IncrementCounter());
  // 稍微等待一下,让 Actor 有时间处理消息
  await Future.delayed(Duration(milliseconds: 100));

  // 3. 发送 Decrement 消息
  print('Main Isolate: Sending 1 Decrement message...');
  counterActorRef.tell(DecrementCounter());
  await Future.delayed(Duration(milliseconds: 100));

  // 4. 使用 ask 模式获取当前计数
  print('Main Isolate: Asking CounterActor for current value...');
  try {
    final currentCount = await counterActorRef.ask<int>(GetCounterValue(system.getActor('myCounter')!.self._sendPort), timeout: Duration(seconds: 2));
    print('Main Isolate: CounterActor replied with current value: $currentCount');
  } on TimeoutException {
    print('Main Isolate: Failed to get counter value: Timeout occurred.');
  } catch (e) {
    print('Main Isolate: Error asking CounterActor: $e');
  }

  // 5. 再次发送消息并获取
  print('Main Isolate: Sending another Increment and asking again...');
  counterActorRef.tell(IncrementCounter());
  await Future.delayed(Duration(milliseconds: 100)); // 确保增量消息被处理
  try {
    final finalCount = await counterActorRef.ask<int>(GetCounterValue(system.getActor('myCounter')!.self._sendPort), timeout: Duration(seconds: 2));
    print('Main Isolate: CounterActor replied with final value: $finalCount');
  } on TimeoutException {
    print('Main Isolate: Failed to get final counter value: Timeout occurred.');
  } catch (e) {
    print('Main Isolate: Error asking CounterActor: $e');
  }

  // 6. 停止 Actor
  print('Main Isolate: Stopping CounterActor...');
  await system.stop('myCounter');

  // 7. 尝试向已停止的 Actor 发送消息 (会失败)
  print('Main Isolate: Attempting to send message to stopped Actor...');
  counterActorRef.tell(IncrementCounter()); // 这条消息可能会被丢弃或导致 Isolate 错误,取决于 Isolate 停止的实际时机

  print('Main Isolate: Actor System shut down.');
}

运行结果分析:

当你运行 main.dart 时,你会看到类似以下的输出(顺序可能略有不同,因为涉及并发和异步):

Main Isolate: Starting Actor System...
Main Isolate: Spawning CounterActor...
CounterActor Actor: preStart called.
CounterActor Actor: Initialized with count 0.
Main Isolate: CounterActor spawned.
Main Isolate: Sending 3 Increment messages...
CounterActor: Incremented count to 1.
CounterActor: Incremented count to 2.
CounterActor: Incremented count to 3.
Main Isolate: Sending 1 Decrement message...
CounterActor: Decremented count to 2.
Main Isolate: Asking CounterActor for current value...
CounterActor: Responding with current count: 2.
Main Isolate: CounterActor replied with current value: 2
Main Isolate: Sending another Increment and asking again...
CounterActor: Incremented count to 3.
CounterActor: Responding with current count: 3.
Main Isolate: CounterActor replied with final value: 3
Main Isolate: Stopping CounterActor...
ActorSystem: Sending StopActorMessage to Actor "myCounter"...
CounterActor Actor: Received StopActorMessage. Shutting down...
CounterActor Actor: postStop called.
CounterActor Actor: Final count was 3.
CounterActor Actor: Message stream done.
Main Isolate: Attempting to send message to stopped Actor...
Error sending message to Actor: Invalid argument(s): Cannot send messages to a closed port.
Main Isolate: Actor System shut down.

这个例子清晰地展示了:

  • 状态隔离: _count 变量只在 CounterActor 所在的 Isolate 内部被访问和修改。主 Isolate 无法直接操作它。
  • 消息传递: 主 Isolate 通过 ActorRef.tellActorRef.askCounterActor 发送消息。
  • 请求-响应: ask 方法允许主 Isolate 发送 GetCounterValue 消息并等待 CounterActor 回复其当前计数。
  • Actor 生命周期: preStartpostStop 钩子被调用,StopActorMessage 触发了 Actor 的优雅关闭。
  • 错误处理: 尝试向已关闭的端口发送消息会抛出异常,这表明 Isolate 间的通信通道已断开。

3.4 进阶通信模式

3.4.1 One-Way Messaging (Tell)

我们已经看到了 ActorRef.tell()。这是最简单的通信模式,发送者不关心接收者何时处理消息,也不期望任何回复。适用于通知、事件广播等场景。

3.4.2 Request-Response (Ask)

ActorRef.ask() 模式非常常见,它允许发送者发送消息并等待一个回复。这在需要获取 Actor 状态或计算结果时非常有用。其实现的关键在于在发送消息时附带一个临时的 SendPort 作为回复地址。

3.4.3 Actor 间通信

Actor 不仅可以与主 Isolate 通信,它们也可以相互通信。一个 Actor 可以在其 onMessage 方法中接收另一个 Actor 的 ActorRef,然后使用这个 ActorRef 来发送消息。

例如,如果我们有一个 ManagerActor,它可以创建并管理多个 WorkerActor

// worker_actor.dart
import 'actor.dart';
import 'actor_messages.dart';
import 'actor_ref.dart';
import 'dart:async';

class WorkMessage extends ActorMessage {
  final int input;
  final ActorRef replyTo; // 回复给谁

  WorkMessage(this.input, this.replyTo);
}

class WorkDoneMessage extends ActorMessage {
  final int result;
  final String workerName;

  WorkDoneMessage(this.result, this.workerName);
}

class WorkerActor extends Actor {
  String name;
  WorkerActor(this.name);

  @override
  Future<void> onMessage(ActorMessage message) async {
    if (message is WorkMessage) {
      print('$name WorkerActor: Received work for input ${message.input}.');
      // 模拟耗时计算
      await Future.delayed(Duration(milliseconds: 500));
      int result = message.input * 2;
      print('$name WorkerActor: Finished work, result $result. Replying to ${message.replyTo}.');
      message.replyTo.tell(WorkDoneMessage(result, name));
    } else {
      print('$name WorkerActor: Unhandled message: $message');
    }
  }
}

// manager_actor.dart
import 'actor.dart';
import 'actor_messages.dart';
import 'actor_ref.dart';
import 'actor_system.dart';
import 'worker_actor.dart'; // 引入 WorkerActor 及其消息
import 'dart:async';

class CreateWorkerMessage extends ActorMessage {
  final String workerName;
  CreateWorkerMessage(this.workerName);
}

class StartWorkMessage extends ActorMessage {
  final int input;
  StartWorkMessage(this.input);
}

class ManagerActor extends Actor {
  final ActorSystem _system; // ManagerActor 需要访问 ActorSystem 来创建 Worker
  final Map<String, ActorRef> _workers = {};
  int _completedWorks = 0;

  ManagerActor(this._system);

  @override
  Future<void> onMessage(ActorMessage message) async {
    if (message is CreateWorkerMessage) {
      if (!_workers.containsKey(message.workerName)) {
        print('ManagerActor: Creating worker ${message.workerName}...');
        final workerRef = await _system.spawn(message.workerName, () => WorkerActor(message.workerName));
        _workers[message.workerName] = workerRef;
        print('ManagerActor: Worker ${message.workerName} created.');
      } else {
        print('ManagerActor: Worker ${message.workerName} already exists.');
      }
    } else if (message is StartWorkMessage) {
      if (_workers.isEmpty) {
        print('ManagerActor: No workers available to process work for input ${message.input}.');
        return;
      }
      // 将工作分配给第一个可用的 Worker
      final workerRef = _workers.values.first;
      print('ManagerActor: Assigning work (input: ${message.input}) to ${workerRef}...');
      workerRef.tell(WorkMessage(message.input, self)); // 将 ManagerActor 的 self 引用作为 replyTo
    } else if (message is WorkDoneMessage) {
      _completedWorks++;
      print('ManagerActor: Received work done from ${message.workerName} with result ${message.result}. Total completed: $_completedWorks.');
    } else {
      print('ManagerActor: Unhandled message: $message');
    }
  }

  @override
  Future<void> postStop() async {
    await super.postStop();
    print('ManagerActor: Stopping all managed workers...');
    for (final name in _workers.keys.toList()) {
      await _system.stop(name); // Manager 也需要通知 system 停止其创建的 worker
    }
    print('ManagerActor: Managed workers stopped. Total completed works: $_completedWorks.');
  }
}

// main_manager.dart
import 'dart:async';
import 'actor_system.dart';
import 'manager_actor.dart';
import 'worker_actor.dart'; // 确保引入 WorkDoneMessage

void main() async {
  print('Main Isolate: Starting Actor System...');
  final ActorSystem system = ActorSystem('WorkerSystem');

  // 1. 启动 ManagerActor
  print('Main Isolate: Spawning ManagerActor...');
  final managerActorRef = await system.spawn('myManager', () => ManagerActor(system));
  print('Main Isolate: ManagerActor spawned.');

  // 2. 让 ManagerActor 创建 WorkerActors
  print('Main Isolate: Asking ManagerActor to create Worker1...');
  managerActorRef.tell(CreateWorkerMessage('Worker1'));
  await Future.delayed(Duration(milliseconds: 100)); // 等待 Worker1 启动
  print('Main Isolate: Asking ManagerActor to create Worker2...');
  managerActorRef.tell(CreateWorkerMessage('Worker2'));
  await Future.delayed(Duration(milliseconds: 100)); // 等待 Worker2 启动

  // 3. 让 ManagerActor 分配工作给 WorkerActors
  print('Main Isolate: Asking ManagerActor to start work...');
  managerActorRef.tell(StartWorkMessage(10));
  managerActorRef.tell(StartWorkMessage(20));
  managerActorRef.tell(StartWorkMessage(30));

  await Future.delayed(Duration(seconds: 2)); // 等待工作完成

  // 4. 停止 ManagerActor (它会负责停止其 Worker)
  print('Main Isolate: Stopping ManagerActor...');
  await system.stop('myManager');

  print('Main Isolate: Actor System shut down.');
}

这个 ManagerActorWorkerActor 的例子展示了 Actor 如何相互通信、创建新的 Actor,以及如何处理更复杂的业务流程。WorkerActor 完成工作后,会回复 ManagerActor,而 ManagerActor 则会更新其内部状态 (_completedWorks)。这完美体现了 Actor 模型中“行为”和“状态”的封装。

4. 实际应用场景与最佳实践

Actor 模型与 Dart Isolates 的结合,在多种场景下都能发挥巨大优势。

4.1 典型应用场景

  • 长时间运行的计算任务:

    • 图像/视频处理: 将图像帧或视频片段分发给不同的 Actor 进行并行处理。
    • 数据分析/科学计算: 大数据集的分块处理,每个 Actor 处理一部分数据并汇聚结果。
    • 加密/解密: 批量文件的加解密操作。
    • 示例: 一个 ImageProcessorActor 接收 ProcessImageMessage(imageData, replyTo),在 Isolate 中执行耗时的图像滤镜操作,然后将结果通过 replyTo 发送回去。这可以防止 UI 线程阻塞。
  • I/O 密集型操作:

    • 网络请求: 发送大量 HTTP 请求,每个请求由一个 Actor 负责,避免主 Isolate 阻塞。
    • 文件操作: 读写大文件,或同时处理多个文件。
    • 数据库交互: 运行在独立 Isolate 中的数据库连接池 Actor,处理所有数据库请求。
    • 示例: 一个 NetworkActor 接收 HttpRequestMessage(url, body, replyTo),使用 http 库执行请求,并将响应回传。
  • 状态管理服务:

    • 缓存服务: 一个 CacheActor 维护一个内存缓存,所有对缓存的读写请求都通过消息发送给它。由于 Actor 内部顺序处理消息,因此无需担心缓存的一致性问题。
    • 游戏服务器逻辑: 模拟游戏世界中的实体(玩家、NPC、物品),每个实体都是一个 Actor,维护自身状态并响应事件。
    • 分布式锁服务: 一个 Actor 充当锁管理器,处理所有锁请求。
  • 并发事件处理系统:

    • 消息队列消费者: 每个 Actor 消费一个或多个消息队列的主题,独立处理消息。
    • WebSocket 服务器: 每个连接由一个 Actor 表示,处理来自客户端的消息并发送响应。

4.2 错误处理与容错性

在 Actor 系统中,错误处理与传统的 try-catch 块有所不同。当一个 Actor 内部发生未捕获的异常时,其所在的 Isolate 可能会崩溃。Actor 模型鼓励使用“监督者策略”来实现容错:

  • 监督者 (Supervisor): 一个父 Actor 负责创建并监督其子 Actor。当子 Actor 失败时,父 Actor 可以决定如何处理:
    • 重启 (Restart): 将子 Actor 的状态重置并重新启动。
    • 停止 (Stop): 永久终止子 Actor。
    • 升级 (Escalate): 将错误报告给自己的监督者。
  • Dart Isolate 的错误处理: 当一个 Isolate 抛出未捕获的异常时,它会发送一个错误消息到其 ReceivePort 上。在 Isolate.spawn 返回的 Future 中,可以捕获这些错误。或者,在 mainReceivePort.listenonError 回调中处理。

在我们的基础框架中,Actor 基类的 onError 方法可以作为一个起点。更健壮的实现会涉及父 Actor 监听子 Actor 的错误流,并根据策略进行处理。

// 简化的错误监听示例 (在 ActorSystem.spawn 方法中可以这样增强)
// ...
late Isolate newIsolate;
try {
  newIsolate = await Isolate.spawn(Actor._startActor, [mainSendPort, creator], debugName: name);

  // 监听 Isolate 的错误流
  newIsolate.errors.listen((error) {
    print('ActorSystem: Error from Isolate $name: $error');
    // 根据错误类型或策略决定是否重启、停止或记录
    // 例如:
    // _actorRegistry.remove(name); // 将其从注册表中移除
    // mainReceivePort.close();
    // completer.completeError(Exception('Actor $name crashed: $error'));
  });

} catch (e) {
  completer.completeError(e);
}
// ...

4.3 性能考量

  • 消息序列化/反序列化: Dart Isolate 之间的消息传递涉及数据的序列化和反序列化。对于复杂或大型对象,这可能会引入显著的开销。尽量使用简单、高效的数据结构作为消息,或者只传递必要的 ID/引用。

  • Isolate 数量: 创建过多的 Isolate 会增加内存和上下文切换的开销。通常,Isolate 的数量不应超过 CPU 的逻辑核心数太多。对于大量轻量级任务,可以考虑使用一个 Isolate 池,或者一个 Actor 负责管理多个子任务。

  • CPU 密集型与 I/O 密集型:

    • CPU 密集型任务: 每个 Isolate 占用一个 CPU 核心,适合进行并行计算。
    • I/O 密集型任务: Isolate 在等待 I/O 时不会阻塞整个进程。但过多的 I/O 密集型 Isolate 可能导致操作系统层面的资源瓶颈。
  • 避免不必要的 Isolate 间通信: 消息传递有开销。如果两个组件需要频繁地、同步地交换大量数据,并且它们不是性能瓶颈,那么将它们放在同一个 Isolate 中可能更高效。Isolates 适用于需要严格隔离、并行执行或处理阻塞 I/O 的场景。

4.4 消息设计原则

  • 不可变性 (Immutability): 消息对象应该是不可变的。一旦发送,其内容就不应被修改。这避免了在消息传递过程中可能出现的意外副作用。
  • 原子性 (Atomicity): 每个消息都应该代表一个完整的、原子性的操作或事件。
  • 最小化数据: 消息中只包含完成操作所需的最小数据集。避免传递整个对象图。
  • 类型安全: 使用 Dart 的类型系统(如 sealed classenum)来定义消息类型,以便在 onMessage 方法中进行模式匹配,提高代码的可读性和健壮性。

4.5 与 Dart async/await 的关系

Dart 的 async/await 机制与 Isolate 和 Actor 模型是互补的,而不是互斥的。

  • async/await: 解决了单个 Isolate 内部的异步操作问题,允许非阻塞的顺序代码流。它在单个线程(Isolate 的事件循环)上管理异步任务。
  • Isolates/Actors: 解决了跨 Isolate 的并行性、状态隔离和并发问题。

在一个 Actor 内部,你仍然会大量使用 async/await 来处理异步操作(例如,网络请求、文件读写)。onMessage 方法本身可以是一个 async 方法,允许 Actor 在处理消息时执行异步操作,而不会阻塞其自己的事件循环。

// 示例:Actor 内部使用 async/await
class AsyncWorkerActor extends Actor {
  @override
  Future<void> onMessage(ActorMessage message) async {
    if (message is WorkMessage) {
      print('AsyncWorkerActor: Starting heavy async work for input ${message.input}...');
      // 模拟一个异步操作,例如网络请求
      final response = await Future.delayed(Duration(seconds: 1), () => 'Processed: ${message.input * 10}');
      print('AsyncWorkerActor: Work finished, result: $response. Replying.');
      message.replyTo.tell(WorkDoneMessage(int.parse(response.split(': ')[1]), runtimeType.toString()));
    }
  }
}

5. 更坚固的 Actor 框架:一些高级考量

我们之前构建的 ActorSystemActorRef 提供了 Actor 模型的基础实现。为了构建更坚固、更具生产力的框架,可以考虑以下高级特性:

5.1 Actor 路径与寻址

在一个复杂的系统中,可能需要通过一个唯一的路径来引用 Actor,类似于文件系统路径或 URL。例如 /user/myManager/worker-1。这有助于在分布式环境中定位 Actor。

5.2 监督者策略 (Supervision Strategies)

如前所述,当子 Actor 失败时,父 Actor 应该有一个明确的策略来处理。一个成熟的 Actor 框架会提供可配置的监督者策略:

  • OneForOneStrategy: 仅重启失败的子 Actor。
  • AllForOneStrategy: 重启所有兄弟子 Actor,如果其中一个失败。

实现监督者策略需要父 Actor 监听子 Actor 的错误流,并在其 onMessage 方法或专门的错误处理回调中执行相应的逻辑。

5.3 消息调度与优先级

默认情况下,ReceivePort 会按照消息到达的顺序处理。但在某些场景下,可能需要为消息设置优先级,确保高优先级消息被优先处理。这可以通过自定义 ReceivePort 的监听逻辑或使用更高级的队列实现。

5.4 Actor 生命周期管理

除了 preStartpostStop,还可以引入更多的生命周期钩子:

  • preRestart(): 在 Actor 重启前调用。
  • postRestart(): 在 Actor 重启后调用。

5.5 集群与分布式 Actor (Cluster & Distributed Actors)

对于更复杂的应用,Actor 可能需要跨多个进程或多台机器进行通信。这需要更高级的网络通信、序列化/反序列化机制和 Actor 寻址服务。虽然 Dart Isolate 本身是单进程内的,但其消息传递机制为构建分布式 Actor 框架提供了良好的基础。

5.6 示例:带监督者策略的简单 Actor

为了演示监督者策略,我们修改 ActorSystemActor,让父 Actor 可以捕获子 Actor 的错误。

// supervisor_strategy.dart
enum SupervisionDirective {
  restart,
  stop,
  escalate,
  resume, // 继续执行,忽略错误
}

typedef SupervisionStrategy = SupervisionDirective Function(
    ActorRef child, dynamic reason, StackTrace stackTrace);

// 默认策略:遇到错误就停止
SupervisionDirective defaultSupervisionStrategy(ActorRef child, dynamic reason, StackTrace stackTrace) {
  print('Default Supervision Strategy: Child $child failed with $reason. Stopping.');
  return SupervisionDirective.stop;
}

// actor_system.dart (修改部分)
// ...
class ActorSystem {
  // ...
  final Map<String, Isolate> _activeIsolates = {}; // 存储活跃的 Isolate 实例

  Future<ActorRef> spawn<T extends Actor>(String name, ActorCreator creator,
      {SupervisionStrategy? strategy}) async {
    // ... (省略 ActorRef 创建和注册部分)

    final Isolate newIsolate = await Isolate.spawn(Actor._startActor, [mainSendPort, creator], debugName: name);
    _activeIsolates[name] = newIsolate; // 注册 Isolate 实例

    // 监听 Isolate 的错误流
    newIsolate.errors.listen((error) async {
      print('ActorSystem: Error from Isolate $name: $error');
      // 错误消息通常是 List<String> [errorMsg, stackTrace]
      String errorMessage = 'Unknown error';
      StackTrace? stackTrace;
      if (error is List && error.length >= 2) {
        errorMessage = error[0] as String;
        stackTrace = StackTrace.fromString(error[1] as String);
      }

      final ActorRef? childRef = _actorRegistry[name];
      if (childRef != null) {
        final directive = strategy?.call(childRef, errorMessage, stackTrace!) ??
            defaultSupervisionStrategy(childRef, errorMessage, stackTrace!);

        switch (directive) {
          case SupervisionDirective.restart:
            print('ActorSystem: Restarting Actor "$name"...');
            await stop(name); // 停止旧的 Actor
            await Future.delayed(Duration(milliseconds: 100)); // 稍作等待
            await spawn(name, creator, strategy: strategy); // 重新启动新的 Actor
            break;
          case SupervisionDirective.stop:
            print('ActorSystem: Stopping Actor "$name" due to error.');
            await stop(name); // 停止 Actor
            break;
          case SupervisionDirective.escalate:
            // 向上级报告,这里简单地打印
            print('ActorSystem: Error escalated for Actor "$name".');
            await stop(name); // 停止 Actor
            break;
          case SupervisionDirective.resume:
            print('ActorSystem: Resuming Actor "$name" despite error.');
            // 对于 Isolate 崩溃的情况,resume 不太可行,通常用于 Actor 内部捕获的错误。
            // 这里为了演示,如果 Isolate 崩溃,我们仍然会停止它。
            await stop(name);
            break;
        }
      }
    });

    // ... (返回 completer.future 部分)
  }

  @override
  Future<void> stop(String name) async {
    final ActorRef? actorRef = _actorRegistry[name];
    if (actorRef != null) {
      print('ActorSystem: Sending StopActorMessage to Actor "$name"...');
      actorRef.tell(StopActorMessage());
      _actorRegistry.remove(name);
      final Isolate? iso = _activeIsolates.remove(name);
      if (iso != null) {
        // give it a moment to process StopActorMessage
        await Future.delayed(Duration(milliseconds: 50));
        iso.kill(priority: Isolate.immediate); // 确保 Isolate 被终止
      }
    } else {
      print('ActorSystem: Actor "$name" not found.');
    }
  }
  // ...
}

// FaultyActor.dart
import 'actor.dart';
import 'actor_messages.dart';
import 'dart:math';

class TriggerFaultMessage extends ActorMessage {
  @override
  String toString() => 'TriggerFaultMessage';
}

class FaultyActor extends Actor {
  int _state = 0;

  @override
  Future<void> onMessage(ActorMessage message) async {
    if (message is TriggerFaultMessage) {
      print('FaultyActor: Received TriggerFaultMessage. Current state: $_state');
      // 模拟一个随机错误
      if (Random().nextBool()) {
        throw Exception('Simulated crash in FaultyActor!');
      } else {
        _state++;
        print('FaultyActor: State incremented to $_state.');
      }
    } else if (message is GetStateMessage) {
      message.replyTo.send(_state);
    } else {
      print('FaultyActor: Unhandled message: $message');
    }
  }

  @override
  Future<void> preStart() async {
    await super.preStart();
    print('FaultyActor: Pre-start with state $_state.');
  }

  @override
  Future<void> postStop() async {
    await super.postStop();
    print('FaultyActor: Post-stop with state $_state.');
  }
}

// main_faulty.dart
import 'dart:async';
import 'actor_system.dart';
import 'faulty_actor.dart';
import 'actor_messages.dart';
import 'supervisor_strategy.dart';

void main() async {
  print('Main Isolate: Starting Actor System with FaultyActor...');
  final ActorSystem system = ActorSystem('FaultySystem');

  // 定义一个重启策略
  SupervisionStrategy restartStrategy = (child, reason, stackTrace) {
    print('Custom Strategy: Child $child failed ($reason). Attempting to RESTART.');
    return SupervisionDirective.restart;
  };

  // 1. 启动 FaultyActor,并应用重启策略
  print('Main Isolate: Spawning FaultyActor...');
  final faultyActorRef = await system.spawn('myFaultyActor', () => FaultyActor(), strategy: restartStrategy);
  print('Main Isolate: FaultyActor spawned.');

  // 2. 连续触发几次错误
  for (int i = 0; i < 5; i++) {
    print('nMain Isolate: Triggering fault attempt ${i + 1}...');
    faultyActorRef.tell(TriggerFaultMessage());
    await Future.delayed(Duration(milliseconds: 700)); // 等待 Actor 处理消息或崩溃
  }

  // 3. 尝试获取状态,验证是否重启成功
  print('nMain Isolate: Asking FaultyActor for final state...');
  try {
    final state = await faultyActorRef.ask<int>(GetStateMessage(system.getActor('myFaultyActor')!.self._sendPort), timeout: Duration(seconds: 2));
    print('Main Isolate: FaultyActor replied with final state: $state');
  } on TimeoutException {
    print('Main Isolate: Failed to get state: Timeout occurred.');
  } catch (e) {
    print('Main Isolate: Error asking FaultyActor: $e');
  }

  // 4. 停止 Actor
  print('nMain Isolate: Stopping FaultyActor...');
  await system.stop('myFaultyActor');

  print('Main Isolate: Actor System shut down.');
}

运行 main_faulty.dart,你会看到 FaultyActor 可能会崩溃,但由于我们设置了 restartStrategyActorSystem 会尝试重新启动它。这演示了 Actor 模型的容错性。

总结

Dart Isolate 与 Actor 模型的结合,为构建高并发、高弹性且易于推理的系统提供了一条清晰的路径。通过利用 Dart Isolate 的彻底状态隔离和消息传递机制,我们可以自然地实现 Actor 模型的核心原则。这种模式不仅简化了并发编程的复杂性,还为应用程序带来了卓越的可伸缩性和容错能力。

在今天的讲座中,我们探讨了并发编程的挑战,深入理解了 Dart Isolate 的独特之处,剖析了 Actor 模型的核心思想与优势,并通过一系列代码示例展示了如何在 Dart 中构建一个基础的 Actor 框架,并探讨了实际应用场景、错误处理与高级考量。希望这次深入的探讨能为各位在未来的 Dart 项目中应用 Actor 模型提供有益的启发和指导。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注