FluentPythonCh18

29th January 2021 at 11:07am
Fluent Python
Concurrency is about dealing with lots of things at once.
Parallelism is about doing lots of things at once.
— Rob Pike, Co-inventor of the Go language

Rob Pike 阐述了并发(concurrency)和并行(parallelism)的区别。简单的例子是,「并发」是让单核 CPU 能跑非常多进程的原因;「并行」是多核 CPU 能同行参与计算的原因。

In this chapter we’ll see:

  • A comparison between a simple threaded program and the asyncio equivalent, showing the relationship between threads and asynchronous tasks
  • How the asyncio.Future class differs from concurrent.futures.Future
  • Asynchronous versions of the flag download examples from Chapter 17
  • How asynchronous programming manages high concurrency in network applications, without using threads or processes
  • How coroutines are a major improvement over callbacks for asynchronous programming
  • How to avoid blocking the event loop by offloading blocking operations to a thread pool
  • Writing asyncio servers, and how to rethink web applications for high concurrency
  • Why asyncio is poised to have a big impact in the Python ecosystem

Python 在后来提供了 async / await 关键字用来替代 @ayncio.coroutine 以及 yield from。下面的很多代码例子已经不是最佳实践,但是原理仍然是可行的。

Thread Versus Coroutine: A Comparison

通过对比同一个程序用线程写和用 asyncio 写的区别来引入 asyncio

线程版:

def slow_function():   
    # pretend waiting a long time for I/O
    time.sleep(3)
    return 42

def supervisor():   
    signal = Signal()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

asyncio 版:

@asyncio.coroutine
def slow_function():   
    # pretend waiting a long time for I/O
    yield from asyncio.sleep(3)
    return 42

@asyncio.coroutine
def supervisor():
    # asyncio.async(…) schedules the spin coroutine to run, wrapping it in a Task
    # object, which is returned immediately.
    spinner = asyncio.ensure_future(spin('thinking!'))   
    
    print('spinner object:', spinner)   
    
    # 需等待 slow_function 中的 sleep 结束后才被 event loop 调度,继续执行
    result = yield from slow_function()
    
    spinner.cancel()
    return result

这里的 核心 是:

  • 线程作为一种抢占式多任务(preemptive multitasking)的模型,不同的线程间能否被运行是由 CPU 进行调度的
  • asyncio 使用了协作式 (cooperative) 或称非抢占式 (non-preemptive) 多任务,因此设计了协程和 event loop 概念,由 event loop 驱动多个协程去工作;每个协程能被调度到的前提是,之前的协程主动让出了 CPU 或者没有其他协程

因此 API 设计 上:

  • 线程需要:
    • 起新的线程,如 spinner.start()
    • 线程间通信,如 signal.go = False(虽然它并不是操作系统层面的通信,而是在 GIL 体系下的通信)
    • 等待线程结束,如 spinner.join()
    • 线程间并没有父子关系,所以没有父线程终止子线程之说,因此也没有 API 去做这个事情
  • asyncio 需要:
    • 把新的任务放进 event loop 中调度,如 asyncio.ensure_future();由于任务由 event loop 调度,因此你也不需要手动告诉 event loop 要执行这个任务(不需要 task.start();放进 event loop 即表示要执行
    • 主动让出 CPU 的机制,如 I/O 调用,如 asyncio.sleep(3)
    • 简单的同步机制,或称等待某一任务完成的能力,如 yield from,以及后面提到的 await
    • 告诉某个协程可以停止运行的机制,如 spinner.cancel()

线程(抢占式)与协程(非抢占式)两种模型的 区别

  • 在线程中,scheduler 可能在线程运行的任意时刻打断线程,因此你需要各种机制(如加锁)来保护关键数据,避免数据处于不完整的状态
  • 在协程中,仅有 yield / yield from 时,scheduler 才能拿到控制权,因此设计上避免了打断;即使你调用了 task.cancel() 函数,也仅在 task 中调用了 yield 时才会抛出异常,因此数据不会有中间状态

asyncio 的 coroutine 概念会比普通 coroutine 要严格一点:

A coroutine suitable for use with the asyncio API must use yield from and not yield in its body. Also, an asyncio coroutine should be driven by a caller invoking it through yield from or by passing the coroutine to one of the asyncio functions such as asyn cio.async(…) and others covered in this chapter. Finally, the @asyncio.coroutine decorator should be applied to coroutines.

为啥 asyncio coroutine 里面一定要使用 yield from 而不能是 yield

asyncio.Future: Nonblocking by Design

asyncio.ensure_future() 底下调用的是 BaseEventLoop.create_task(…),它以一个协程为参数,调度它去运行,同时返回一个 asyncio.Task。而这个 asyncio.Taskasyncio.Future 的子类。

asyncio.Futureconcurrent.futures.Future 不能互用,但是它们 API 设计上是很像的:

  • 加入调度的 API 类似,分别是 BaseEventLoop.create_task(…)Executor.submit(…)
  • 一样有 .done(), .add_done_callback(), .result() 方法

但是实际的惯常用法,并不太一样。asyncio.Future.result() 方法是 non-blocking 的,因为整个 asyncio 体系都是 non-blocking 的。如果这个 asyncio.Future 还没有完成,调用 .result() 会抛 asyncio.InvalidStateError 异常出来。

如果要等待 asyncio.Future 完成,使用 yield from,它即不堵塞 event loop,也可以返回内层的协程的返回值。这导致 .add_done_callback(), .done(), .result() 这些方法一般不被需要到。

Note
asyncio 体系中,yield from 被用来将控制权给回 event loop。

Yielding from Futures, Tasks, and Coroutines

在 asyncio 体系中,Future / Task 和 coroutine 是紧密联系的。比如 res = yield from foo()foo() 可以是一个 coroutine function,也可以是个返回 Future / Task 实例的普通函数。

看看 Future / Task__iter__ 函数实现,可能是它们可以被 yield from 的原因。

如果想在控制台上跑一小段 asyncio 代码,可以这样写:

>>> import asyncio
>>> def run_sync(coro_or_future):
...     loop = asyncio.get_event_loop()
...     return loop.run_until_complete(coro_or_future)
...
>>> a = run_sync(some_coroutine())

作者写书时用的 Python 是 3.4 版本。3.7 版本可以用 asyncio.run() 来做这件事情,更加简单。

Downloading with asyncio and aiohttp

这一段又给出一个运行 多协程 的惯常用法:

@asyncio.coroutine
def download_one(cc):   
    image = yield from get_flag(cc)   
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc
    
def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]   
    
    # 这一行是重点,返回一个协程,它的作用是 to_do 中的协程全部结束后它才结束
    wait_coro = asyncio.wait(to_do)
    
    res, _ = loop.run_until_complete(wait_coro)   
    loop.close()  

如果你用了一些框架,它来管理 event loop 的话,那你不应该调用 loop.close(),比如一些 GUI 框架。

作者总结了 yield from 的两大规则:

  • Every arrangement of coroutines chained with yield from must be ultimately driven by a caller that is not a coroutine, which invokes next(…) or .send(…) on the outermost delegating generator, explicitly or implicitly (e.g., in a for loop).
  • The innermost subgenerator in the chain must be a simple generator that uses just yield—or an iterable object.

比如上面例子中,

  • 协程们最终都是由 loop.run_until_complete(…) 来驱动;run_until_complete(…) 本身不是一个协程函数
  • 最内层的 subgenerator 只会用 yield / iterable;我猜它也可能是一段 C 的函数,一样实现 yield 的能力,有空深挖下 asyncio transports
  • 即使是 yield from aiohttp.request(…),最终估计也是 yield asyncio 库里面的某些函数

Running Circling Around Blocking Calls

这一段写得 非常好。很值得一看,同时内容的演进手法也很赞。

读一次 L1 cache 的时间如果是 3 秒,那么读一次网络设备数据的时间约为 7.6 年。因此同步、阻塞型程序,如果涉及到 I/O 操作,CPU 基本上就不在经常在等不在干活。解决这个问题一般是两个思路:

  • 每个阻塞操作都在单独的线程做;缺点是资源消耗很大
  • 每个阻塞操作都变成事件驱动式的异步调用

对于异步调用,很早前就有类似的机制,如硬件中断(但是这并不表示编程语言的异步调用机制就是构建在硬件中断上的)。Callback 和 event loop 的底层实现,可以利用操作系统的各种基础设施,如中断、线程、polling、后台进程等等。比如 Node.js 为了实现异步的文件操作 API,在底层用 libeio 实现了个线程池用来支持这个操作。因为当时(2014 年)的操作系统还没有提供可靠的、通用的异步文件操作 API。

目前 asyncio 库本身并不提供异步文件 API(参考这里)。aiofiles 提供了异步文件 API 接口,底层是用线程实现的。作者在本书里提到可以使用 loop.run_in_executor(None, …) 起个新的线程写,可能写书时 aiofiles 还没成为主流。

Enhancing the asyncio downloader Script

这里给 asyncio 版的下载国旗例子加上进度条和错误控制。代码没有太多特别。但是值得注意的是 asyncio.Semaphore 的使用,它支持 context manager,但是需要以 yield from 的形式来使用:

semaphore = asyncio.Semaphore(concur_req)
with (yield from semaphore):
    image = yield from get_flag(base_url, cc)

不过这种形式已经被 deprecated,目前推荐的方式是 async with lock,它跟 with (yield from semaphore) 功能相同,但是语法上更清晰。

另外 asyncio.as_completed 中返回的 Future 结构,跟传入的并不是同一个;这个行为与 concurrent.futures 中的不一致。所以内层 coro 抛异常时,考虑使用自定义的 Exception 类将相关信息一起带出来。

From Callbacks to Futures and Coroutines

这节为了说明 asyncio 的协程模型带来的好处,先举了 JavaScript 的 callback hell 作为对比。JS 的 callback 模型中,需要传两个 callback 函数,分别表示操作成功和失败后的 callback,整个逻辑非常难维护。协程非常好地解决了这个心智负担。

但是协程也带来了自己的问题。比如在这个体系中,你必须都使用协程函数,而不能使用普通函数,这意味着之前的大量历史代码不能在这套体系中使用。

Writing asyncio Servers

用 TCP 和 HTTP 写一个查询 Unicode 字符的程序。

An asyncio TCP Server

这节给了一个 TCP Server 的代码示例,用了 asyncio Streams API。需要写时再参考即可。Stream API 是高层封装,底层是 Transports and Protocols API,和 Twisted 的 API 很像。

An aiohttp Web Server

给了一个使用 aiohttp 写一个简单 HTTP Sever 的代码示例。

这两个 server 中的关键点是,将启动 server 的协程扔进 event loop 的过程变得隐晦起来:

# TCP 版
def main(address='127.0.0.1', port=2323):
    loop = asyncio.get_event_loop()
    server_coro = asyncio.start_server(handle_queries, address, port, loop=loop)  # 1
    server = loop.run_until_complete(server_coro)
    loop.run_forever()

# HTTP 版
@asyncio.coroutine
def init(loop, address, port):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', home)
    handler = app.make_handler()
    server = yield from loop.create_server(handler, address, port)  # 3
    return server.sockets[0].getsockname()


def main(address="127.0.0.1", port=8888):
    loop = asyncio.get_event_loop()
    host = loop.run_until_complete(init(loop, address, port))  # 2
    loop.run_forever()

这里面的代码看起来让人很模糊。比如 <1> 处的 asyncio.start_server() 返回的 coroutine object,为啥传给下一行的 loop.run_until_complete() 不会引起 server 开始进入 while 循环去服务客户端请求,而需要再调用一个 loop.run_forever()?为啥 <2> 处的调用可以马上返回 host,而不是在 <3> 处开始服务?

原因是,loop.create_server() 事实上是生成一个任务放进 loop 中准备被调度。而最终的 loop.run_forever 才让这个 server 真正服务起来。asyncio.start_server() 底层也是调用的 loop.create_server()

Futher Reading