FluentPythonCh17

15th February 2019 at 9:55am
Fluent Python

这一章主要讲 features 模块是如何把并发抽象成一个简单易用的模型。这里的内容对接下来的 asyncio 模块有帮助。

重点是:

  • 关注 Future 这种模型的设计思想、使用方法
  • 总结出常见场景的实用代码片断,便于快速使用

Example: Web Downloads in Three Styles

演示了两个下载多图片的程序,一个是顺序下载,一个是用 futures 开线程池下载。线程池的核心代码如下:

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:   #1
        res = executor.map(download_one, sorted(cc_list))   #2
    return len(list(res))   #3

这里面有几点值得注意:

  • #1 处的 with 块会在执行结束时调用 executor.__exit__ 方法,这里面会调用 executor.shutdown(wait=True) 来等待各线程执行完成
  • #2 处返回了一个生成器,用来取所有的执行结果
  • #3list(res) 会遍历 res 这个生成器,这个过程实际上是调用每个 Future.result() 方法(下文描述)。如果有哪个任务在执行过程中抛出异常,它在被 next() 到时也会将异常抛出

在上面的例子我们并不直接操作 future 对象。在标准库中有两个 Future 类:concurrent.futures.Futureasyncio.Future。一个 Future 实例表示的是一个可能完成也可能没有完成的,被延迟的计算过程。原文是:

an instance of either Future class represents a deferred computation that may or may not have completed.

这个概念与 Twisted 中的 Deferred,Tornado 中的 Future,以及 JavaScript 中的 Promise 类似。

Future 的作用是封装了对一个延迟(对比于实时 blocking)计算的后续操作,使得你可以把它放进一个队列里,再去队列中查询它的状态(成功或失败)和计算结果等。一般 Future 对象应该由并发框架(如 asyncio, concurrent.futures)去生成,而不由用户手工去生成。因为 Future 对象代表的计算过程如果不被执行,就失去了它的意义。而计划执行 Future 对象的工作往往由并发框架去做。

Future 对象的基本操作:

  • .done(): 查询是否结束
  • .add_done_callback(): 要求结束时执行相应 callback
  • .result(): 查询执行完返回的结果,或者其中抛出的异常(reraise);在 Future 执行过程中调用此函数时会 block,如果执行完则不会

上面的代码例子中,Future 对象仅在 executor.map() 内部使用,暴露给用户的是它的结果。

def download_many(cc_list):
    cc_list = cc_list[:5]   
    with futures.ThreadPoolExecutor(max_workers=3) as executor:   
        to_do = []  #1
        for cc in sorted(cc_list):   
            future = executor.submit(download_one, cc)   
            to_do.append(future)   
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))   
            
        results = []
        for future in futures.as_completed(to_do):  #2
            res = future.result()  #3
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

这段代码演示了对 Future 对象的直接访问,值得关注的点:

  • #1 中的 to_do list 存储了一组 Future 实例
  • #2 futures.as_completed() 接受一个包含 Future 实例的 iterable 实例,并返回一个 generator。一旦有 Future 对象执行结束,这个 generator 就会将其 yield 出
  • #3 future.result() 会取出结果。并且由于这个 future 是执行结束的,.result() 操作不会 block

Blocking I/O and the GIL

存在 GIL (Global Interpreter Lock) 的原因是,CPython 解释器本身不是线程安全的,所以同时只能有一个线程在执行 Python 字节码。但是 C 写的 Python 扩展库可以手动释放 GIL。同时标准库中涉及到 I/O 操作的函数、sleep() 函数都会释放 GIL。这是为何 Python 的多线程在做 I/O 操作(比如上面例子中的下载图片)时可以起到加速作用的原因。但是对于 CPU 密集型的操作,Python 的多线程就没什么作用,需要用多进程。

Launching Processes with concurrent.futures

对多 CPU 密集型操作,futures 库提供了 ProcessPoolExecutor。用法跟 ThreadPoolExecutor 类型,区别在于可以不手动指定 worker 数量,默认是 CPU 的核数。

Experimenting with Executor.map

这里面的例子又展示出 executor.map 的另一种惯常写法:

def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)   
    results = executor.map(loiter, range(5))   
    
    # 遍历 result 会调用其中每一个 Future 的 `.result()`,如果任务还没完成会 block
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))

遍历 result 返回的结果,会保证跟你传给 executor.map() 的任务顺序是一样的。这意味着如果第一个任务很久才完成,那么 for 循环会在第一个任务处等很久。如果你不要求顺序,可以用上一个例子的 futures.as_completed

Downloads with Progress Display and Error Handling

进度条显示,可以用 tqdm 库。异常处理,在调用 future.result() 时包裹 try-catch。另外有个惯常用法时,在 executor.submit() 后把生成的 Future 作为 key 放进 dict(如下文的 to_do_map)中,这样 Future 乱序完成后,你还可以获得这个任务相关联的信息。示例代码如下,做了简化:

def download_many(cc_list, base_url, verbose, concur_req):
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:   
        # 用来存储 Future => country code,方便 Future 完成后查询
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)
            to_do_map[future] = cc
            
        done_iter = futures.as_completed(to_do_map)   
        for future in done_iter:   
            try:
                res = future.result()   
            except requests.exceptions.HTTPError as exc:   
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                
            if error_msg:
                cc = to_do_map[future]   
                print('*** Error for {}: {}'.format(cc, error_msg))

Threading and Multiprocessing Alternatives

如果 concurrent.future 的方法对你来说缺乏灵活性,那你可以用 threading 模块搭载 Thread, Lock, Semaphore 来做你的解决方案。可以用线程安全的 queue 模块来传递数据。CPU 密集型程序,可以用多进程的 multiprocessing 来解决,它提供了一些在进程中传递数据的设施。

Further Reading

给了很多相关材料:

  • 还有什么其他并发模型?
  • 怎样理解 GIL?
  • 每样更方便使用多进程和多线程?