这章主要续上章讲 generator 引入的 yield
机制,继续讲协程(coroutines)机制。
yield
除了在数据流动中起作用,它更重要的功能是作为一种控制流设施,来实现协同式(与抢占式不同)地多任务:
Regardless of the flow of data, yield is a control flow device that can be used to implement cooperative multitasking: each coroutine yields control to a central scheduler so that other coroutines can be activated.
理解这些关键点:
- Coroutine 是如何演化而来的
- Coroutine 的状态、API 设计,如何和背后的控制流转换机制结合
yield from
的作用
补充一个书中没有显式指出的概念,coroutine function,即函数体中带有 yield
/ yield from
的。调用这种函数时返回的不是函数体中的 return
的值,而是一个 coroutine object。这个 object 可以被传递,可以(仅有 yield from
时)被 asyncio
的 event loop 调度,可以被 yield from
等。
How Coroutines Evolved from Generators
协程如何从 generator 演变而来:
Basic Behavior of a Generator Used as a Coroutine
def simple_coroutine():
print('-> start ')
x = yield
print('-> received: {}'.format(x))
带 yield
表达式的函数,你可以调用它,获得一个 generator:
>>> coro = simple_coroutine()
>>> type(coro)
generator
Generator 有 4 个状态,可以通过 inspect.getgeneratorstata(coro)
函数获得。含义跟它的字面意义一样
状态 | 调用 next() | 调用 send() |
---|---|---|
'GEN_CREATED' | 变为 GEN_RUNNING | TypeError: can't send non-None value to a just-started generator |
'GEN_RUNNING' | 这个状态下,解释器在运行协程内的代码,所以没有调用 next() 一说 | 同左 |
'GEN_SUSPENDED' | 等同于 send(None) ,变为 GEN_RUNNING | 变为 GEN_RUNNING |
'GEN_CLOSED' | 此时协程的函数体已运行完,抛出 StopIteration 异常 | 同左 |
比如:
>>> coro = simple_coroutine() # 'GEN_CREATED'
>>> next(coro) # 'GEN_RUNNING' then 'GEN_SUSPENDED'
-> start
>>> coro.send(1) # 'GEN_RUNNING' then 'GEN_CLOSED'
-> received 1
>>> next(coro) # `coro.send(2)` would have the same result
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration:
>>> coro = simple_coroutine() # 'GEN_CREATED'
>>> coro.send(1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
一开始的 next(coro)
调用,一般被称为 "priming the coroutine",prime 在这里的意思是 "make sth ready for use or action"。
- Note
next(coro)
和coro.send(None)
的功能是完全一致的。
第二个例子:
def simple_coro2(a):
print("-> Started: a = {}".format(a))
b = yield a
print("-> Received: b = {}".format(b))
c = yield a + b
print("-> Received: c = {}".format(c))
Example: Coroutine to Compute a Running Average
这节描述如何用协程写一个动态计算平均数的程序。相比之前的 generator 写法或者 __call__
函数,用协程写更直观和简洁,不依赖 closure 等。
Decorators for Coroutine Priming
Coroutine 需要 prime 完才能用。作者写了个简单的 decorator 用来自动 prime 协程。
from functools import wraps
def coroutine(func):
"""Decorator: primes `func` by advancing to first `yield`"""
@wraps(func)
def primer(*args,**kwargs):
gen = func(*args,**kwargs)
next(gen)
return gen
return primer
Coroutine Termination and Exception Handling
关于异常处理,有这几个场景:
- Caller 想给 coroutine 发送一个异常。发送异常的原因可能是 coroutine
yield
出来的值有问题,所以 caller 需要触发 coroutine 中走向异常分支。也可能是其他原因 - Caller 想要感知到 coroutine 中发生的异常
对于 caller 给 coroutine 发送异常的,有 generator.throw(exc_type[, exc_value[, traceback]])
函数。Coroutine 中的 yield
语句处会抛异常。
对于 caller 想要感知到 coroutine 中发生的异常,如果 coroutine 中没有捕获这个异常,那么异常会向上抛到 caller 中。
关于 coroutine 的停止,如果 caller 想让 coroutine 停止,直接调用 generator.close()
即可。
Returning a Value from a Coroutine
Coroutine 向 caller 传递数据,之前提到的 yield
关键字是在 coroutine 还没结束时 yield
的。但是如果我们想让 coroutine 像函数一样,在最后执行结束时返回一个值,应该怎么做?
语法上,coroutine 中一样是用 return
返回值。由于 return
机制设计上晚于 coroutine 本身,为了保持与之前行为的一致性,返回值是从 StopIteration
的 exc.value
中取得的:
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average)
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(30)
>>> coro_avg.send(6.5)
>>> try:
... coro_avg.send(None)
... except StopIteration as exc:
... result = exc.value
...
>>> result
Result(count=3, average=15.5)
Using yield from
这一节以及后面的内容,是这章最 重点 的内容。这里先用一套教学代码描述 yield from
的基本能力:
# BEGIN YIELD_FROM_AVERAGER
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# the subgenerator
def averager(): # <1>
total = 0.0
count = 0
average = None
while True:
term = yield # <2>
if term is None: # <3>
break
total += term
count += 1
average = total/count
return Result(count, average) # <4>
# the delegating generator
def grouper(results, key): # <5>
while True: # <6>
results[key] = yield from averager() # <7>
# the client code, a.k.a. the caller
def main(data): # <8>
results = {}
for key, values in data.items():
group = grouper(results, key) # <9>
next(group) # <10>
for value in values:
group.send(value) # <11>
group.send(None) # important! <12>
# print(results) # uncomment to debug
report(results)
# output report
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
当第一次执行到 <9> 处时,代码走进 grouper
中的 yield from
语句,然后会走进 averager
中,到 yield
语句时被 suspended。此后 main
函数中 next()
和 send()
操作,都会通过 grouper
作为管道输送到 averager
中。下面的图描述了这一过程:
grouper
中虽然没有 yield
语句(只有 yield from
),但是它也是 generator,也遵循 generator 的规则。这里的有几个关键点:
yield from
后面接的是一个 generator,会执行这个 generator 中的代码直到yield
把 coroutine suspendgrouper
代码体执行结束时也会抛StopIteration
。这里作者为了容易处理,在 <6> 处加了个while True
,用来避免group.send(None)
时抛异常。我觉得不是太好,这意味着main
中每一次循环结束后,在 <7> 处都会有一个没有实际作用的 generator 实例被产生,然后随着下一次循环开始又被回收掉- 如果 subgenerator 一直没有执行完,那么 delegating generator 会一直被 suspend 在
yield from
处,直到被垃圾回收 - delegating generator 也需要 priming
还有一个点是,yield from
的对象可以是另外一个 generator,也可以是任意 iterable 的对象:
>>> def chain(*iterables):
... for it in iterables:
... yield from it
...
>>> s = 'ABC'
>>> t = tuple(range(3))
>>> list(chain(s, t))
['A', 'B', 'C', 0, 1, 2]
需要注意的是,对于 chain
返回的 generator,你不能使用 .send()
带一个非 None
的值去访问,会抛 AttributeError: 'str_iterator' object has no attribute 'send'
,导致 generator 异常而结束。
The Meaning of yield from
Fluent Python 在前文中提到一个设计上的问题,即 coroutine 和 generator 在语法上用了 def
,但是事实上他们又不像函数一样可以被嵌套调用。yield from
出现用来解决这个问题,它使得:
- Generator 可以方便地包裹 generator
- Caller 可以把数据通过外层 generator 传递到内层 generator 中,反之亦然
- 内层 generator 抛出异常后可以一层一层传递到 caller 中
- Generator 给 caller 传递数据的方式,除了在未结束前通过
yield
传递外,还可以用yield from
将结束时的返回值给到 caller
简单(但是不完整)地说:
“When the iterator is another generator, the effect is the same as if the body of the subgenerator were inlined at the point of the yield from expression. Furthermore, the subgenerator is allowed to execute a return statement with a value, and that value becomes the value of the yield from expression.”
完整的规则在这里不作摘录,需要时翻书。
yield from
还会带来发生异常方面的差异。如果 caller close()
掉 subgenerator,对于 delegating generator 来说,只有一个 yield from
并不能让它感知到 subgenerator 是正常做完,还是被 close()
掉的(它只能感知到再 next()
会抛 StopIteration
。因此特性作者设计出了一种机制,如果 caller close()
掉 subgenerator 时,delegating generator 会收到 GeneratorExit
异常。
对于 iterable 被 yield from
的情况,事情又变复杂了。plain iterator 并不支持 .send()
, .throw()
, .close()
,需要 yield from
在处理时有额外的支持,比如 delegating generator 收到 GeneratorExit
异常时,需要判断 subgenerator 有无 close
属性,有的话再进行调用。但这些细节不是很必要去了解,特性设计者来处理这些 cornor case,使用者就可以享受比较一致的上层模型。
Use Case: Coroutines for Discrete Event Simulation
这节要做一个离散事件模拟器(Discrete Event Simulation)。
About Discrete Event Simulations
书中对 DES (Discrete Event Simulation) 的描述很清晰直观:
In a DES, the simulation “clock” does not advance by fixed increments, but advances directly to the simulated time of the next modeled event.
比如回合制游戏是 DES 的例子,玩家每一个操作对应 DES 中的一个事件,会使整个系统发生变化。而玩家不操作时,一些状态是没有变化的。实时游戏则不是 DES 的例子。
The Taxi Fleet Simulation
这大概是这章最重要的一个例子,书中用了大量篇幅描述。代码在 GitHub 上。这个例子值得一看,重点在于:
- 仅用单线程配合协程完成对离散事件的模拟
- 协程(或者说 generator)会产生一些事件,比如 taxi 离开车库,接到客人等,这些事件通过
yield
给到 caller - Caller(相对于协程而言)维护一个 优先级队列,将事件放入这个队列中。这个队列的排序规则决定了哪些事件会先被处理
- Caller 处理事件时,同时会
send()
数据给 generator 指导 它做事情或者产生下一个事件 - Caller 发现 generator 抛了
StopIteration
时做相应的处理
我觉得这个例子中,作者应该把 caller 跟 coroutine 之间的交互、各自的职责描述出来,但是他没有。我觉得普通人写起这种代码来很纠结,不够能直观地理解逻辑应该放在 caller 还是 coroutine。
Further Reading
照例是非常多好的协程相关的材料。可惜我没有能力去深入阅读,也感觉难以在工程上直接用协程,只在理解框架上会有一些帮助。所以不深入看了。
Soapbox
Raise from lambda
作者表示 Python 语言设计中,对于 keyword 的使用太过谨慎。举了几个例子:
for
/while
/try
都可以搭配一个else
子句,这是对else
的滥用,应该新使用一个then
关键字def
可以表示函数定义,还可以表示 generator 和 coroutine,这也非常地不好。这几者在使用上差异很大yield from
则是最大的混乱。这完全应该用新的关键字,却将已有的两个关键字连起来作为一个新的语义。作者担心后面有raise from lambda
这种奇葩语法出现
但是后面的 async
, await
关键字可以部分解决这上面提到的混乱。