这一章主要讲 features
模块是如何把并发抽象成一个简单易用的模型。这里的内容对接下来的 asyncio
模块有帮助。
重点是:
- 关注
Future
这种模型的设计思想、使用方法 - 总结出常见场景的实用代码片断,便于快速使用
Example: Web Downloads in Three Styles
演示了两个下载多图片的程序,一个是顺序下载,一个是用 futures
开线程池下载。线程池的核心代码如下:
def download_many(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor: #1
res = executor.map(download_one, sorted(cc_list)) #2
return len(list(res)) #3
这里面有几点值得注意:
#1
处的with
块会在执行结束时调用executor.__exit__
方法,这里面会调用executor.shutdown(wait=True)
来等待各线程执行完成#2
处返回了一个生成器,用来取所有的执行结果#3
中list(res)
会遍历res
这个生成器,这个过程实际上是调用每个Future
的.result()
方法(下文描述)。如果有哪个任务在执行过程中抛出异常,它在被next()
到时也会将异常抛出
在上面的例子我们并不直接操作 future 对象。在标准库中有两个 Future 类:concurrent.futures.Future
和 asyncio.Future
。一个 Future 实例表示的是一个可能完成也可能没有完成的,被延迟的计算过程。原文是:
an instance of either Future class represents a deferred computation that may or may not have completed.
这个概念与 Twisted 中的 Deferred
,Tornado 中的 Future
,以及 JavaScript 中的 Promise
类似。
Future 的作用是封装了对一个延迟(对比于实时 blocking)计算的后续操作,使得你可以把它放进一个队列里,再去队列中查询它的状态(成功或失败)和计算结果等。一般 Future
对象应该由并发框架(如 asyncio
, concurrent.futures
)去生成,而不由用户手工去生成。因为 Future
对象代表的计算过程如果不被执行,就失去了它的意义。而计划执行 Future 对象的工作往往由并发框架去做。
Future
对象的基本操作:
.done()
: 查询是否结束.add_done_callback()
: 要求结束时执行相应 callback.result()
: 查询执行完返回的结果,或者其中抛出的异常(reraise);在 Future 执行过程中调用此函数时会 block,如果执行完则不会
上面的代码例子中,Future
对象仅在 executor.map()
内部使用,暴露给用户的是它的结果。
def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do = [] #1
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do.append(future)
msg = 'Scheduled for {}: {}'
print(msg.format(cc, future))
results = []
for future in futures.as_completed(to_do): #2
res = future.result() #3
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)
return len(results)
这段代码演示了对 Future
对象的直接访问,值得关注的点:
#1
中的to_do
list 存储了一组Future
实例#2
futures.as_completed()
接受一个包含Future
实例的 iterable 实例,并返回一个 generator。一旦有Future
对象执行结束,这个 generator 就会将其 yield 出#3
future.result()
会取出结果。并且由于这个 future 是执行结束的,.result()
操作不会 block
Blocking I/O and the GIL
存在 GIL (Global Interpreter Lock) 的原因是,CPython 解释器本身不是线程安全的,所以同时只能有一个线程在执行 Python 字节码。但是 C 写的 Python 扩展库可以手动释放 GIL。同时标准库中涉及到 I/O 操作的函数、sleep()
函数都会释放 GIL。这是为何 Python 的多线程在做 I/O 操作(比如上面例子中的下载图片)时可以起到加速作用的原因。但是对于 CPU 密集型的操作,Python 的多线程就没什么作用,需要用多进程。
Launching Processes with concurrent.futures
对多 CPU 密集型操作,futures
库提供了 ProcessPoolExecutor
。用法跟 ThreadPoolExecutor
类型,区别在于可以不手动指定 worker 数量,默认是 CPU 的核数。
Experimenting with Executor.map
这里面的例子又展示出 executor.map
的另一种惯常写法:
def main():
display('Script starting.')
executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5))
# 遍历 result 会调用其中每一个 Future 的 `.result()`,如果任务还没完成会 block
for i, result in enumerate(results):
display('result {}: {}'.format(i, result))
遍历 result 返回的结果,会保证跟你传给 executor.map()
的任务顺序是一样的。这意味着如果第一个任务很久才完成,那么 for
循环会在第一个任务处等很久。如果你不要求顺序,可以用上一个例子的 futures.as_completed
。
Downloads with Progress Display and Error Handling
进度条显示,可以用 tqdm 库。异常处理,在调用 future.result()
时包裹 try-catch。另外有个惯常用法时,在 executor.submit()
后把生成的 Future
作为 key 放进 dict
(如下文的 to_do_map
)中,这样 Future
乱序完成后,你还可以获得这个任务相关联的信息。示例代码如下,做了简化:
def download_many(cc_list, base_url, verbose, concur_req):
with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
# 用来存储 Future => country code,方便 Future 完成后查询
to_do_map = {}
for cc in sorted(cc_list):
future = executor.submit(download_one, cc, base_url, verbose)
to_do_map[future] = cc
done_iter = futures.as_completed(to_do_map)
for future in done_iter:
try:
res = future.result()
except requests.exceptions.HTTPError as exc:
error_msg = 'HTTP {res.status_code} - {res.reason}'
if error_msg:
cc = to_do_map[future]
print('*** Error for {}: {}'.format(cc, error_msg))
Threading and Multiprocessing Alternatives
如果 concurrent.future
的方法对你来说缺乏灵活性,那你可以用 threading
模块搭载 Thread
, Lock
, Semaphore
来做你的解决方案。可以用线程安全的 queue
模块来传递数据。CPU 密集型程序,可以用多进程的 multiprocessing
来解决,它提供了一些在进程中传递数据的设施。
Further Reading
给了很多相关材料:
- 还有什么其他并发模型?
- 怎样理解 GIL?
- 每样更方便使用多进程和多线程?