FluentPythonCh16

 28th February 2019 at 9:52am

这章主要续上章讲 generator 引入的 yield 机制,继续讲协程(coroutines)机制。

yield 除了在数据流动中起作用,它更重要的功能是作为一种控制流设施,来实现协同式(与抢占式不同)地多任务:

Regardless of the flow of data, yield is a control flow device that can be used to implement cooperative multitasking: each coroutine yields control to a central scheduler so that other coroutines can be activated.

理解这些关键点:

  • Coroutine 是如何演化而来的
  • Coroutine 的状态、API 设计,如何和背后的控制流转换机制结合
  • yield from 的作用

补充一个书中没有显式指出的概念,coroutine function,即函数体中带有 yield / yield from 的。调用这种函数时返回的不是函数体中的 return 的值,而是一个 coroutine object。这个 object 可以被传递,可以(仅有 yield from 时)被 asyncio 的 event loop 调度,可以被 yield from 等。

How Coroutines Evolved from Generators

协程如何从 generator 演变而来:

  1. PEP 342 — Coroutines via Enhanced Generators
  2. PEP 380 - Syntax for Delegating to a Subgenerator

Basic Behavior of a Generator Used as a Coroutine

def simple_coroutine():
    print('-> start ')
    x = yield
    print('-> received: {}'.format(x))

yield 表达式的函数,你可以调用它,获得一个 generator:

>>> coro = simple_coroutine()
>>> type(coro)
generator

Generator 有 4 个状态,可以通过 inspect.getgeneratorstata(coro) 函数获得。含义跟它的字面意义一样

状态 调用 next() 调用 send()
'GEN_CREATED'变为 GEN_RUNNINGTypeError: can't send non-None value to a just-started generator
'GEN_RUNNING'这个状态下,解释器在运行协程内的代码,所以没有调用 next() 一说同左
'GEN_SUSPENDED'等同于 send(None),变为 GEN_RUNNING变为 GEN_RUNNING
'GEN_CLOSED'此时协程的函数体已运行完,抛出 StopIteration 异常同左

比如:

>>> coro = simple_coroutine()   # 'GEN_CREATED'
>>> next(coro)					# 'GEN_RUNNING' then 'GEN_SUSPENDED'
-> start
>>> coro.send(1)				# 'GEN_RUNNING' then 'GEN_CLOSED'
-> received 1
>>> next(coro)					# `coro.send(2)` would have the same result
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration:
>>> coro = simple_coroutine()   # 'GEN_CREATED'
>>> coro.send(1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator

一开始的 next(coro) 调用,一般被称为 "priming the coroutine",prime 在这里的意思是 "make sth ready for use or action"。

Note
next(coro)coro.send(None) 的功能是完全一致的。

第二个例子:

def simple_coro2(a):
    print("-> Started: a = {}".format(a))
    b = yield a
    print("-> Received: b = {}".format(b))
    c = yield a + b
    print("-> Received: c = {}".format(c))

Example: Coroutine to Compute a Running Average

这节描述如何用协程写一个动态计算平均数的程序。相比之前的 generator 写法或者 __call__ 函数,用协程写更直观和简洁,不依赖 closure 等。

Decorators for Coroutine Priming

Coroutine 需要 prime 完才能用。作者写了个简单的 decorator 用来自动 prime 协程。

from functools import wraps

def coroutine(func):
    """Decorator: primes `func` by advancing to first `yield`"""
    @wraps(func)
    def primer(*args,**kwargs):
        gen = func(*args,**kwargs)
        next(gen)
        return gen
    return primer

Coroutine Termination and Exception Handling

关于异常处理,有这几个场景:

  • Caller 想给 coroutine 发送一个异常。发送异常的原因可能是 coroutine yield 出来的值有问题,所以 caller 需要触发 coroutine 中走向异常分支。也可能是其他原因
  • Caller 想要感知到 coroutine 中发生的异常

对于 caller 给 coroutine 发送异常的,有 generator.throw(exc_type[, exc_value[, traceback]]) 函数。Coroutine 中的 yield 语句处会抛异常。

对于 caller 想要感知到 coroutine 中发生的异常,如果 coroutine 中没有捕获这个异常,那么异常会向上抛到 caller 中。

关于 coroutine 的停止,如果 caller 想让 coroutine 停止,直接调用 generator.close() 即可。

Returning a Value from a Coroutine

Coroutine 向 caller 传递数据,之前提到的 yield 关键字是在 coroutine 还没结束时 yield 的。但是如果我们想让 coroutine 像函数一样,在最后执行结束时返回一个值,应该怎么做?

语法上,coroutine 中一样是用 return 返回值。由于 return 机制设计上晚于 coroutine 本身,为了保持与之前行为的一致性,返回值是从 StopIterationexc.value 中取得的:

from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)

>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(30)
>>> coro_avg.send(6.5)
>>> try:
...     coro_avg.send(None)
... except StopIteration as exc:
...     result = exc.value
...
>>> result
Result(count=3, average=15.5)

Using yield from

这一节以及后面的内容,是这章最 重点 的内容。这里先用一套教学代码描述 yield from 的基本能力:

# BEGIN YIELD_FROM_AVERAGER
from collections import namedtuple

Result = namedtuple('Result', 'count average')


# the subgenerator
def averager():  # <1>
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield  # <2>
        if term is None:  # <3>
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)  # <4>


# the delegating generator
def grouper(results, key):  # <5>
    while True:  # <6>
        results[key] = yield from averager()  # <7>


# the client code, a.k.a. the caller
def main(data):  # <8>
    results = {}
    for key, values in data.items():
        group = grouper(results, key)  # <9>
        next(group)  # <10>
        for value in values:
            group.send(value)  # <11>
        group.send(None)  # important! <12>

    # print(results)  # uncomment to debug
    report(results)


# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))


data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}


if __name__ == '__main__':
    main(data)

当第一次执行到 <9> 处时,代码走进 grouper 中的 yield from 语句,然后会走进 averager 中,到 yield 语句时被 suspended。此后 main 函数中 next()send() 操作,都会通过 grouper 作为管道输送到 averager 中。下面的图描述了这一过程:

grouper 中虽然没有 yield 语句(只有 yield from),但是它也是 generator,也遵循 generator 的规则。这里的有几个关键点:

  • yield from 后面接的是一个 generator,会执行这个 generator 中的代码直到 yield 把 coroutine suspend
  • grouper 代码体执行结束时也会抛 StopIteration。这里作者为了容易处理,在 <6> 处加了个 while True,用来避免 group.send(None) 时抛异常。我觉得不是太好,这意味着 main 中每一次循环结束后,在 <7> 处都会有一个没有实际作用的 generator 实例被产生,然后随着下一次循环开始又被回收掉
  • 如果 subgenerator 一直没有执行完,那么 delegating generator 会一直被 suspend 在 yield from 处,直到被垃圾回收
  • delegating generator 也需要 priming

还有一个点是,yield from 的对象可以是另外一个 generator,也可以是任意 iterable 的对象:

>>> def chain(*iterables):
...     for it in iterables:
...         yield from it
...
>>> s = 'ABC'
>>> t = tuple(range(3))
>>> list(chain(s, t))
['A', 'B', 'C', 0, 1, 2]

需要注意的是,对于 chain 返回的 generator,你不能使用 .send() 带一个非 None 的值去访问,会抛 AttributeError: 'str_iterator' object has no attribute 'send',导致 generator 异常而结束。

The Meaning of yield from

Fluent Python 在前文中提到一个设计上的问题,即 coroutine 和 generator 在语法上用了 def,但是事实上他们又不像函数一样可以被嵌套调用。yield from 出现用来解决这个问题,它使得:

  • Generator 可以方便地包裹 generator
  • Caller 可以把数据通过外层 generator 传递到内层 generator 中,反之亦然
  • 内层 generator 抛出异常后可以一层一层传递到 caller 中
  • Generator 给 caller 传递数据的方式,除了在未结束前通过 yield 传递外,还可以用 yield from 将结束时的返回值给到 caller

简单(但是不完整)地说:

“When the iterator is another generator, the effect is the same as if the body of the subgenerator were inlined at the point of the yield from expression. Furthermore, the subgenerator is allowed to execute a return statement with a value, and that value becomes the value of the yield from expression.”

完整的规则在这里不作摘录,需要时翻书。

yield from 还会带来发生异常方面的差异。如果 caller close() 掉 subgenerator,对于 delegating generator 来说,只有一个 yield from 并不能让它感知到 subgenerator 是正常做完,还是被 close() 掉的(它只能感知到再 next() 会抛 StopIteration。因此特性作者设计出了一种机制,如果 caller close() 掉 subgenerator 时,delegating generator 会收到 GeneratorExit 异常。

对于 iterable 被 yield from 的情况,事情又变复杂了。plain iterator 并不支持 .send(), .throw(), .close(),需要 yield from 在处理时有额外的支持,比如 delegating generator 收到 GeneratorExit 异常时,需要判断 subgenerator 有无 close 属性,有的话再进行调用。但这些细节不是很必要去了解,特性设计者来处理这些 cornor case,使用者就可以享受比较一致的上层模型。

Use Case: Coroutines for Discrete Event Simulation

这节要做一个离散事件模拟器(Discrete Event Simulation)。

About Discrete Event Simulations

书中对 DES (Discrete Event Simulation) 的描述很清晰直观:

In a DES, the simulation “clock” does not advance by fixed increments, but advances directly to the simulated time of the next modeled event.

比如回合制游戏是 DES 的例子,玩家每一个操作对应 DES 中的一个事件,会使整个系统发生变化。而玩家不操作时,一些状态是没有变化的。实时游戏则不是 DES 的例子。

The Taxi Fleet Simulation

这大概是这章最重要的一个例子,书中用了大量篇幅描述。代码在 GitHub 上。这个例子值得一看,重点在于:

  • 仅用单线程配合协程完成对离散事件的模拟
  • 协程(或者说 generator)会产生一些事件,比如 taxi 离开车库,接到客人等,这些事件通过 yield 给到 caller
  • Caller(相对于协程而言)维护一个 优先级队列,将事件放入这个队列中。这个队列的排序规则决定了哪些事件会先被处理
  • Caller 处理事件时,同时会 send() 数据给 generator 指导 它做事情或者产生下一个事件
  • Caller 发现 generator 抛了 StopIteration 时做相应的处理

我觉得这个例子中,作者应该把 caller 跟 coroutine 之间的交互、各自的职责描述出来,但是他没有。我觉得普通人写起这种代码来很纠结,不够能直观地理解逻辑应该放在 caller 还是 coroutine。

Further Reading

照例是非常多好的协程相关的材料。可惜我没有能力去深入阅读,也感觉难以在工程上直接用协程,只在理解框架上会有一些帮助。所以不深入看了。

Soapbox

Raise from lambda

作者表示 Python 语言设计中,对于 keyword 的使用太过谨慎。举了几个例子:

  • for / while / try 都可以搭配一个 else 子句,这是对 else 的滥用,应该新使用一个 then 关键字
  • def 可以表示函数定义,还可以表示 generator 和 coroutine,这也非常地不好。这几者在使用上差异很大
  • yield from 则是最大的混乱。这完全应该用新的关键字,却将已有的两个关键字连起来作为一个新的语义。作者担心后面有 raise from lambda 这种奇葩语法出现

但是后面的 async, await 关键字可以部分解决这上面提到的混乱。