Generator 函数在控制流中的高级应用:实现协作式多任务

好的,各位观众老爷们,欢迎来到“Generator的奇妙冒险”特别节目!我是你们的老朋友,Bug终结者,今天咱们不聊枯燥的理论,要玩点刺激的——用Generator函数实现协作式多任务!

准备好了吗?系好安全带,我们要发车啦!🚀

第一幕:Generator,一个被低估的英雄

在很多人眼里,Generator函数可能就是个“迭代器plus”,能省点内存,生成斐波那契数列啥的。但我要告诉你,这简直是暴殄天物!Generator真正的潜力,在于它能暂停和恢复执行的能力,这简直就是控制流的魔法棒啊!✨

想象一下,你是一位指挥家,手下的乐器(函数)们各自演奏着不同的乐章。Generator函数就像一个可以随时暂停和恢复乐器演奏的遥控器,让你能够精妙地控制整个交响乐团的节奏。

什么是Generator函数?

先来个快速回顾,免得有小伙伴掉队:

  • 普通函数: 一口气执行到底,要么返回结果,要么抛出异常,干脆利落,不拖泥带水。
  • Generator函数: 遇到yield关键字就暂停,把yield后面的值“吐”出来,然后乖乖等着下次被唤醒。
def my_generator():
    print("准备开始...")
    yield 1
    print("暂停一下...")
    yield 2
    print("结束了!")

gen = my_generator()
print(next(gen))  # 输出:准备开始...  1
print(next(gen))  # 输出:暂停一下...  2
print(next(gen))  # 输出:结束了! StopIteration

看到没?next()函数就像一个闹钟,每次叫醒Generator函数,让它执行到下一个yield,然后又进入休眠状态。

第二幕:协作式多任务,让程序优雅地“轮班”

传统的并发编程,比如多线程或者多进程,就像一群人争抢一个麦克风🎤,谁抢到就唱两句,然后让给别人。这种方式虽然也能实现并发,但切换任务的成本比较高,而且容易出现锁的问题,一不小心就变成“死锁之歌”。

协作式多任务则不同,它更像一群有礼貌的歌手,大家轮流唱歌,唱完一段自觉地把麦克风交给下一个人。每个任务执行一段时间后,主动让出控制权,让其他任务有机会执行。

这种方式的好处是:

  • 切换成本低: 因为是任务主动让出控制权,所以不需要操作系统来调度,省去了上下文切换的开销。
  • 避免锁的问题: 由于同一时刻只有一个任务在执行,所以不需要考虑锁的问题,避免了死锁的风险。
  • 更易于理解和调试: 任务之间的切换点是明确的,更容易理解程序的执行流程,也更容易调试。

第三幕:Generator + 调度器,打造多任务引擎

要实现协作式多任务,我们需要两个关键组件:

  1. Generator函数: 负责执行具体的任务,并在适当的时候让出控制权。
  2. 调度器: 负责管理所有的Generator函数,并按照一定的策略轮流执行它们。

下面,我们来撸一个简单的调度器:

class Scheduler:
    def __init__(self):
        self.tasks = []  # 任务队列

    def add_task(self, task):
        self.tasks.append(task)

    def run(self):
        while self.tasks:
            task = self.tasks.pop(0)  # 从队列头部取出一个任务
            try:
                next(task)  # 执行任务
                self.tasks.append(task)  # 如果任务没有完成,放回队列尾部
            except StopIteration:
                pass  # 任务完成,忽略

# 示例任务
def task1():
    print("任务1开始...")
    yield
    print("任务1继续...")
    yield
    print("任务1结束!")

def task2():
    print("任务2开始...")
    yield
    print("任务2继续...")
    yield
    print("任务2结束!")

# 创建调度器并添加任务
scheduler = Scheduler()
scheduler.add_task(task1())
scheduler.add_task(task2())

# 运行调度器
scheduler.run()

运行结果:

任务1开始...
任务2开始...
任务1继续...
任务2继续...
任务1结束!
任务2结束!

看到没?任务1和任务2交替执行,实现了简单的协作式多任务。🎉

第四幕:进阶技巧,让多任务更强大

上面的调度器只是一个简单的原型,还有很多可以改进的地方。

  1. 优先级: 可以给任务设置优先级,让优先级高的任务先执行。
class Scheduler:
    def __init__(self):
        self.tasks = []  # 任务队列,存储 (优先级, 任务)

    def add_task(self, task, priority=0):
        self.tasks.append((priority, task))
        self.tasks.sort(key=lambda x: x[0])  # 按照优先级排序

    def run(self):
        while self.tasks:
            priority, task = self.tasks.pop(0)  # 从队列头部取出一个任务
            try:
                next(task)  # 执行任务
                self.tasks.append((priority, task))  # 如果任务没有完成,放回队列尾部
                self.tasks.sort(key=lambda x: x[0]) # 重新排序
            except StopIteration:
                pass  # 任务完成,忽略

# 示例任务
def task1():
    print("任务1开始...")
    yield
    print("任务1继续...")
    yield
    print("任务1结束!")

def task2():
    print("任务2开始...")
    yield
    print("任务2继续...")
    yield
    print("任务2结束!")

# 创建调度器并添加任务
scheduler = Scheduler()
scheduler.add_task(task1(), priority=1)  # 任务1优先级高
scheduler.add_task(task2(), priority=0)  # 任务2优先级低

# 运行调度器
scheduler.run()
  1. 阻塞: 有些任务可能需要等待某个事件发生才能继续执行,比如等待网络请求完成。我们可以使用yield来模拟阻塞。
import time

class Scheduler:
    # ... (省略其他代码)

    def block(self, task, event):
        """阻塞一个任务,直到某个事件发生"""
        self.blocked_tasks[event] = self.blocked_tasks.get(event, []) + [task]

    def unblock(self, event, result=None):
        """解除阻塞的任务"""
        if event in self.blocked_tasks:
            for task in self.blocked_tasks.pop(event):
                self.add_task(task)

    def run(self):
        while self.tasks or self.blocked_tasks: # 任务队列和阻塞队列都为空时退出
            if self.tasks:
                priority, task = self.tasks.pop(0)
                try:
                    result = next(task)
                    if isinstance(result, Blocked): #如果返回的是阻塞信号
                        self.block(task,result.event)
                    else:
                        self.tasks.append((priority, task))
                        self.tasks.sort(key=lambda x: x[0])
                except StopIteration:
                    pass
            else: # 如果任务队列为空,则检查阻塞队列
                time.sleep(0.1) # 避免空转

class Blocked(object):
    def __init__(self, event):
        self.event = event

def task1():
    print("任务1开始...")
    yield Blocked("网络请求")
    print("任务1继续...")
    yield
    print("任务1结束!")

def task2():
    print("任务2开始...")
    yield
    print("任务2继续...")
    yield
    print("任务2结束!")

# 创建调度器并添加任务
scheduler = Scheduler()
scheduler.add_task(task1(), priority=1)
scheduler.add_task(task2(), priority=0)

# 模拟网络请求完成
def simulate_network_request():
    time.sleep(2)  # 模拟网络延迟
    scheduler.unblock("网络请求", "网络数据")

# 启动网络请求模拟
import threading
threading.Thread(target=simulate_network_request).start()

# 运行调度器
scheduler.run()

在这个例子中,task1遇到了Blocked("网络请求"),表示它需要等待网络请求完成。调度器会将task1放入阻塞队列,直到simulate_network_request函数调用scheduler.unblock("网络请求"),才会将task1重新加入任务队列,让它继续执行。

  1. 异常处理: Generator函数中可能会抛出异常,我们需要在调度器中处理这些异常,避免程序崩溃。
class Scheduler:
    # ... (省略其他代码)

    def run(self):
        while self.tasks:
            task = self.tasks.pop(0)
            try:
                next(task)
                self.tasks.append(task)
            except StopIteration:
                pass
            except Exception as e:
                print(f"任务 {task} 发生异常:{e}")  # 捕获并打印异常

第五幕:Generator的威力,远不止于此

除了协作式多任务,Generator函数还有很多其他的应用场景:

  • 数据流处理: 可以用Generator函数构建数据管道,将数据从一个阶段传递到另一个阶段,实现复杂的数据处理流程。
  • 状态机: 可以用Generator函数来实现状态机,控制程序的不同状态之间的转换。
  • 异步编程: 配合asyncawait关键字,可以实现更高效的异步编程。

总结:

Generator函数就像一把瑞士军刀,功能强大,用途广泛。只要你敢于探索,就能发现它更多的可能性。希望今天的节目能让你对Generator函数有更深入的了解,并能在实际项目中灵活运用它。

记住,编程的乐趣在于创造,在于不断地学习和探索。让我们一起用代码改变世界吧!🌍

最后的彩蛋:

如果你想更深入地了解Generator函数,可以参考以下资源:

感谢大家的收看,我们下期再见!👋

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注