Concurrency is about dealing with lots of things at once.Parallelism is about doing lots of things at once.— Rob Pike, Co-inventor of the Go language
Rob Pike 阐述了并发(concurrency)和并行(parallelism)的区别。简单的例子是,「并发」是让单核 CPU 能跑非常多进程的原因;「并行」是多核 CPU 能同行参与计算的原因。
In this chapter we’ll see:
- A comparison between a simple threaded program and the asyncio equivalent, showing the relationship between threads and asynchronous tasks
- How the
asyncio.Future
class differs fromconcurrent.futures.Future
- Asynchronous versions of the flag download examples from Chapter 17
- How asynchronous programming manages high concurrency in network applications, without using threads or processes
- How coroutines are a major improvement over callbacks for asynchronous programming
- How to avoid blocking the event loop by offloading blocking operations to a thread pool
- Writing
asyncio
servers, and how to rethink web applications for high concurrency - Why
asyncio
is poised to have a big impact in the Python ecosystem
Thread Versus Coroutine: A Comparison
通过对比同一个程序用线程写和用 asyncio
写的区别来引入 asyncio
。
线程版:
def slow_function():
# pretend waiting a long time for I/O
time.sleep(3)
return 42
def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,
args=('thinking!', signal))
print('spinner object:', spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result
asyncio
版:
@asyncio.coroutine
def slow_function():
# pretend waiting a long time for I/O
yield from asyncio.sleep(3)
return 42
@asyncio.coroutine
def supervisor():
# asyncio.async(…) schedules the spin coroutine to run, wrapping it in a Task
# object, which is returned immediately.
spinner = asyncio.ensure_future(spin('thinking!'))
print('spinner object:', spinner)
# 需等待 slow_function 中的 sleep 结束后才被 event loop 调度,继续执行
result = yield from slow_function()
spinner.cancel()
return result
这里的 核心 是:
- 线程作为一种抢占式多任务(preemptive multitasking)的模型,不同的线程间能否被运行是由 CPU 进行调度的
asyncio
使用了协作式 (cooperative) 或称非抢占式 (non-preemptive) 多任务,因此设计了协程和 event loop 概念,由 event loop 驱动多个协程去工作;每个协程能被调度到的前提是,之前的协程主动让出了 CPU 或者没有其他协程
因此 API 设计 上:
- 线程需要:
- 起新的线程,如
spinner.start()
- 线程间通信,如
signal.go = False
(虽然它并不是操作系统层面的通信,而是在 GIL 体系下的通信) - 等待线程结束,如
spinner.join()
- 线程间并没有父子关系,所以没有父线程终止子线程之说,因此也没有 API 去做这个事情
- 起新的线程,如
asyncio
需要:- 把新的任务放进 event loop 中调度,如
asyncio.ensure_future()
;由于任务由 event loop 调度,因此你也不需要手动告诉 event loop 要执行这个任务(不需要task.start()
;放进 event loop 即表示要执行 - 主动让出 CPU 的机制,如 I/O 调用,如
asyncio.sleep(3)
- 简单的同步机制,或称等待某一任务完成的能力,如
yield from
,以及后面提到的await
- 告诉某个协程可以停止运行的机制,如
spinner.cancel()
- 把新的任务放进 event loop 中调度,如
线程(抢占式)与协程(非抢占式)两种模型的 区别:
- 在线程中,scheduler 可能在线程运行的任意时刻打断线程,因此你需要各种机制(如加锁)来保护关键数据,避免数据处于不完整的状态
- 在协程中,仅有
yield
/yield from
时,scheduler 才能拿到控制权,因此设计上避免了打断;即使你调用了task.cancel()
函数,也仅在task
中调用了yield
时才会抛出异常,因此数据不会有中间状态
asyncio
的 coroutine 概念会比普通 coroutine 要严格一点:
A coroutine suitable for use with the asyncio API must useyield from
and notyield
in its body. Also, an asyncio coroutine should be driven by a caller invoking it through yield from or by passing the coroutine to one of the asyncio functions such as asyn cio.async(…) and others covered in this chapter. Finally, the @asyncio.coroutine decorator should be applied to coroutines.
asyncio.Future
: Nonblocking by Design
asyncio.ensure_future()
底下调用的是 BaseEventLoop.create_task(…)
,它以一个协程为参数,调度它去运行,同时返回一个 asyncio.Task
。而这个 asyncio.Task
是 asyncio.Future
的子类。
asyncio.Future
和 concurrent.futures.Future
不能互用,但是它们 API 设计上是很像的:
- 加入调度的 API 类似,分别是
BaseEventLoop.create_task(…)
和Executor.submit(…)
- 一样有
.done()
,.add_done_callback()
,.result()
方法
但是实际的惯常用法,并不太一样。asyncio.Future
的 .result()
方法是 non-blocking 的,因为整个 asyncio 体系都是 non-blocking 的。如果这个 asyncio.Future
还没有完成,调用 .result()
会抛 asyncio.InvalidStateError
异常出来。
如果要等待 asyncio.Future
完成,使用 yield from
,它即不堵塞 event loop,也可以返回内层的协程的返回值。这导致 .add_done_callback()
, .done()
, .result()
这些方法一般不被需要到。
- Note
- 在
asyncio
体系中,yield from
被用来将控制权给回 event loop。
Yielding from Futures, Tasks, and Coroutines
在 asyncio 体系中,Future
/ Task
和 coroutine 是紧密联系的。比如 res = yield from foo()
,foo()
可以是一个 coroutine function,也可以是个返回 Future
/ Task
实例的普通函数。
如果想在控制台上跑一小段 asyncio
代码,可以这样写:
>>> import asyncio
>>> def run_sync(coro_or_future):
... loop = asyncio.get_event_loop()
... return loop.run_until_complete(coro_or_future)
...
>>> a = run_sync(some_coroutine())
Downloading with asyncio and aiohttp
这一段又给出一个运行 多协程 的惯常用法:
@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
# 这一行是重点,返回一个协程,它的作用是 to_do 中的协程全部结束后它才结束
wait_coro = asyncio.wait(to_do)
res, _ = loop.run_until_complete(wait_coro)
loop.close()
作者总结了 yield from
的两大规则:
- Every arrangement of coroutines chained with yield from must be ultimately driven by a caller that is not a coroutine, which invokes
next(…)
or.send(…)
on the outermost delegating generator, explicitly or implicitly (e.g., in a for loop). - The innermost subgenerator in the chain must be a simple generator that uses just yield—or an iterable object.
比如上面例子中,
- 协程们最终都是由
loop.run_until_complete(…)
来驱动;run_until_complete(…)
本身不是一个协程函数 - 最内层的 subgenerator 只会用
yield
/ iterable;我猜它也可能是一段 C 的函数,一样实现 yield 的能力,有空深挖下 asyncio transports - 即使是
yield from aiohttp.request(…)
,最终估计也是yield
asyncio 库里面的某些函数
Running Circling Around Blocking Calls
这一段写得 非常好。很值得一看,同时内容的演进手法也很赞。
读一次 L1 cache 的时间如果是 3 秒,那么读一次网络设备数据的时间约为 7.6 年。因此同步、阻塞型程序,如果涉及到 I/O 操作,CPU 基本上就不在经常在等不在干活。解决这个问题一般是两个思路:
- 每个阻塞操作都在单独的线程做;缺点是资源消耗很大
- 每个阻塞操作都变成事件驱动式的异步调用
对于异步调用,很早前就有类似的机制,如硬件中断(但是这并不表示编程语言的异步调用机制就是构建在硬件中断上的)。Callback 和 event loop 的底层实现,可以利用操作系统的各种基础设施,如中断、线程、polling、后台进程等等。比如 Node.js 为了实现异步的文件操作 API,在底层用 libeio
实现了个线程池用来支持这个操作。因为当时(2014 年)的操作系统还没有提供可靠的、通用的异步文件操作 API。
Enhancing the asyncio downloader Script
这里给 asyncio
版的下载国旗例子加上进度条和错误控制。代码没有太多特别。但是值得注意的是 asyncio.Semaphore
的使用,它支持 context manager,但是需要以 yield from
的形式来使用:
semaphore = asyncio.Semaphore(concur_req)
with (yield from semaphore):
image = yield from get_flag(base_url, cc)
不过这种形式已经被 deprecated,目前推荐的方式是 async with lock
,它跟 with (yield from semaphore)
功能相同,但是语法上更清晰。
另外 asyncio.as_completed
中返回的 Future
结构,跟传入的并不是同一个;这个行为与 concurrent.futures
中的不一致。所以内层 coro 抛异常时,考虑使用自定义的 Exception 类将相关信息一起带出来。
From Callbacks to Futures and Coroutines
这节为了说明 asyncio
的协程模型带来的好处,先举了 JavaScript 的 callback hell 作为对比。JS 的 callback 模型中,需要传两个 callback 函数,分别表示操作成功和失败后的 callback,整个逻辑非常难维护。协程非常好地解决了这个心智负担。
但是协程也带来了自己的问题。比如在这个体系中,你必须都使用协程函数,而不能使用普通函数,这意味着之前的大量历史代码不能在这套体系中使用。
Writing asyncio Servers
用 TCP 和 HTTP 写一个查询 Unicode 字符的程序。
An asyncio TCP Server
这节给了一个 TCP Server 的代码示例,用了 asyncio Streams API。需要写时再参考即可。Stream API 是高层封装,底层是 Transports and Protocols API,和 Twisted 的 API 很像。
An aiohttp Web Server
给了一个使用 aiohttp
写一个简单 HTTP Sever 的代码示例。
这两个 server 中的关键点是,将启动 server 的协程扔进 event loop 的过程变得隐晦起来:
# TCP 版
def main(address='127.0.0.1', port=2323):
loop = asyncio.get_event_loop()
server_coro = asyncio.start_server(handle_queries, address, port, loop=loop) # 1
server = loop.run_until_complete(server_coro)
loop.run_forever()
# HTTP 版
@asyncio.coroutine
def init(loop, address, port):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', home)
handler = app.make_handler()
server = yield from loop.create_server(handler, address, port) # 3
return server.sockets[0].getsockname()
def main(address="127.0.0.1", port=8888):
loop = asyncio.get_event_loop()
host = loop.run_until_complete(init(loop, address, port)) # 2
loop.run_forever()
这里面的代码看起来让人很模糊。比如 <1> 处的 asyncio.start_server()
返回的 coroutine object,为啥传给下一行的 loop.run_until_complete()
不会引起 server 开始进入 while
循环去服务客户端请求,而需要再调用一个 loop.run_forever()
?为啥 <2> 处的调用可以马上返回 host
,而不是在 <3> 处开始服务?
原因是,loop.create_server()
事实上是生成一个任务放进 loop 中准备被调度。而最终的 loop.run_forever
才让这个 server 真正服务起来。asyncio.start_server()
底层也是调用的 loop.create_server()
。
Futher Reading
- Guido’s “Deconstructing Deferred” 说明了 asyncio 的 Future 设计上为啥跟 Twisted Defered 不同;asyncio 在很多设计上参考了 Twisted