原文由 Jim Anderson 发表于 Jan 14, 2019 。原文链接: Speed Up Your Python Program With Concurrency

如果你听说 asyncio 加入 Python ,但是好奇它与其他的并发方法有什么不同。或者想知道什么是并发,并发如何加速你的程序,那么你来对了。

在本文中,你将学到:

  • 什么是 并发
  • 什么是 并行
  • 几种 Python并发方法 的比较,包括 threading, asyncio 和 multiprocessing
  • 在程序中 什么时候使用并发 ,以及应该使用哪个模块

本文假设你有 Python 基础,并且使用 3.6 及以上版本来运行示例。你可以在 Real Python Github 仓库 下载示例程序。

什么是并发?

并发在字典中的定义是同时发生。而在 Python 中,同时发生的事情有几种不同的形式(线程,任务,进程)。但是在更高层面上看,它们都指的是按顺序运行的一系列指令。

我喜欢把它们看作是不同的 trains of thought 。每一个可以在某些点停止, 这样正在处理它们的 CPU 或大脑可以切换到另一个。每一个的状态被保存起来,这样就可以正好在被打断的地方重新启动。

你或许奇怪为什么 Python 对同一个的概念使用不同的名字。那是因为线程,任务和进程,只是在高级层面上看起来一样。当你深入细节,它们都代表不同的东西。通过查看示例,你将看到更多的不同之处。

现在,我们来讨论这个定义相同的部分。你必须小心,因为当你深入细节,实际上只有 multiprocessing 是真正的同时运行 trains of thought 。 threading 和 asyncio 都只运行在一个处理器上,因此每次只运行一个。它们只是巧妙地找到替换的方法,来加快整个过程。尽管没有同时运行不同的 trains of throught ,我们仍然称之为并发。

线程或任务替换的方式是 threading 和 asyncio 最大的区别。对于 threading ,操作系统知道每个线程,并且可以在任意时间中断它然后运行其他的线程。这被称为 抢占式多任务 ,因为操作系统可以抢占线程来完成切换。

抢占式多任务是方便的,因为线程中的代码无需做任何事来完成切换。它也是困难的,因为可以切换“在任何时间”。这种切换可能发生在一条 Python 语句之中,即使是最简单的 x = x +1

另一方面, asyncio ,使用 协作式多任务 。任务必须声称它们准备好被换出,来完成协作。这意味着,任务中的代码必须做出改变来完成切换。

这些额外工作带来的好处是你总是知道任务在哪里被换出。不会出现 Python 与据中被换出,除非语句被标记。稍后你会看到这将如何简化你的设计部分。

什么是并行?

到目前为止,你已经看到了发生在一个处理器上的的并发。那么你的新笔记本电脑的有的所有 CPU 内核呢?如何使用它们?答案就是 multiprocessing 。

通过 multiprocessing , Python 可以创建新进程。一个进程可以当作是一个几乎完全不同的程序,技术上说,进程被定义为包含一组资源,这些资源包括内存,文件句柄等。思考它的一种方式是,每个进程运行自己的 Python 解释器。

因为它们是不同的进程,每个在多进程程序中的 trains of thought 可以运行在不同的内核上。运行在不同的内核上意味着实际上可以同时运行,真是太棒了。这样做会产生一些困难,不过在大多数情况下, Python 很好地解决了它们。

现在你理解了什么是并发和并行,让我们回顾一下它们的不同,然后我们看看它们为什么有用:

并发类型 切换决策 处理器个数
抢占式多任务(threading) 操作系统在 Python 外部决定什么时候切换任务 1
协作式多任务(asyncio) 任务决定什么时候放弃控制 1
多进程(multiprocessing) 进程同时运行在不同的处理器上 多个

每个并发类型都是有用的。我们来看看它们可以帮助你提速什么类型的程序。

什么时候使用并发?

并发对两类问题有重大影响。它们是 CPU 密集型和 I/O 密集型。

I/O 密集型问题导致你的程序变慢,是因为经常必须等待外部资源的输入输出(I/O)。它们经常在你的程序与比 CPU 慢的多的东西交互时发生。

比 CPU 慢的东西有很多,幸运的是,程序不需要与其中的大多数交互。程序交互的慢设备,常见的是文件系统和网络连接。

让我们看看这是怎样的:

在上面的示意图中,蓝盒子表示程序工作的时间,红盒子表示花费在等待 I/O 完成的时间。图中并不是等比例的,因为互联网上的请求比 CPU 指令慢了几个数量级,所以你的程序最终花费了大多数时间等待。这也是你的浏览器多数时间所做的。

反过来说,有一类程序做重要计算而不访问网络或访问文件。它们是 CPU 密集型程序,因为限制这些程序速度的资源是 CPU ,而不是网络或文件系统。

这是 CPU 密集型程序对应的示意图:

通过学习后面的示例,你将看到 CPU 密集型或 I/O 密集型程序使用不同形式并发的优点或缺点。在程序中使用并发,增加了额外的代码和复杂度,所以你需要决定可能的提速是否值得额外的工作。读完本文,你将有足够的信息来做这个决定。

下面是说明这个概念的快速总结:

I/O 密集型 CPU 密集型
程序花费大量的时间与慢设备交互,比如网络连接,硬盘,或打印机 程序消耗时间做 CPU 操作
加速应该重叠花费在等待设备上的时间 加速应该寻找在相同时间里做更多计算的方法

你将先看到 I/O 密集型程序。然后,看到一些 CPU 密集型的程序。

如何加速 I/O 密集型程序

让我们聚焦于 I/O 密集型程序和一个普遍问题:通过网络下载内容。我们的示例中,你将从一些网站下载网页,但其实可以是任何网络通信。使用网页,只是为了简化观察和设置。

同步版本

我们首先从一个非并发版本开始。注意这个程序需要 requests 模块。在运行它之前,你应该首先运行 pip install requests ,或许使用 virtualenv 。这个版本完全没有使用并发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import requests
import time


def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.itme()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Download {len(sites)} in {duration} seconds")

如你所见,这是一个很短的程序。 download_site() 函数仅仅从 URL 下载内容然后输出内容大小。有一点需要指出,我们使用了 requests 中的 Session 对象。

也可以简单地直接使用 requests 中的 get() 方法,但是创建一个 Session 对象允许 requests 做一些花哨的网络技巧并且真正提速。

download_all_sites() 创建 Session 然后遍历网站列表,并按顺序下载。最后,输出这个程序使用的时间,所有你可以清楚地看到在后面的示例中,并发帮了我们多少。

这个程序的执行示意图很像上一节中的 I/O 密集型示意图。

为什么同步版本很棒

这个版本代码最大的好处就是简单。编写和调试都相对简单。思路也很直接。只有一个 train of thought 在其中运行,所以你可以预测下一步和它的行为。

同步版本的问题

这里最大的问题是,它比后面提供的其他解决方案相对较慢。在我电脑的上,最后输出的示例是:

1
2
3
$ ./io_non_concurrent.py
[跳过大多数输出]
Downloaded 160 in 14.289619207382202 seconds

不过,慢并不总是一个大问题。如果你运行同步版本的程序只用了 2 秒,而且很少运行,就很可能不需要使用并发。你可以到此为止。

如果你的程序经常运行呢?如果它需要数小时来运行?让我们转到并发,使用 threading 来重写这个程序。

threading 版本

你可能猜到了,编写一个线程程序需要做更多工作。不过,简单情况下只需要很少的额外工作。这是使用 threading 的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
if not getattr(thread_local, "session", None):
thread_local.session = requests.Session()
return thread_local.session


def download_site(url):
session = get_session()
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_worker=5) as executor:
executor.map(download_site, sites)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")

引入了 threading ,不过代码的整体结构是一样的,你只需要做小部分修改。 download_all_sites() 从每个网址调用一次函数变成更复杂的结构。

这个版本中,你创建了一个看起来复杂的 ThreadPoolExecutor 。让我们分解一下: ThreadPoolExecutor = Thread(线程) + Pool(池) + Executor(执行者)

你已经知道 Thread 。它是前面提到过的一种 train of thought 。从 Pool 开始变得有趣。这个对象创建一个线程池,其中每一个都可以并发地运行。最后, Executor 管理池中的线程如何运行和何时运行。它将在池中处理请求。

幸运的是,标准库将 ThreadPoolExecutor 实现为一个上下文管理器,这样你可以使用 with 语法来管理创建或释放线程池。

有了 ThreadPoolExecutor ,你就可以使用方便的 .map() 方法。这个方法对列表中的每个网址运行传入的函数。最重要的是,它使用所管理的线程池自动并发地运行那些函数。

如果你使用过其他语言,甚至是 Python 2,也许会奇怪,以前处理 threading 时,那些管理细节的常见对象和函数去哪了,比如 Thread.start()Thread.join()Queue

它们仍然在,你也可以用它们实现对运行线程的细粒度控制。但是,从 Python 3.2 开始,标准库添加了高级抽象层 Executors ,你不需要细粒度控制的时候,它可以帮你管理很多细节。

示例中另一个有趣的变化是,每个线程创建了自己的 requests.Session() 对象。当你查看 requests 的文档时,这并不容易理解,但是查看这个 issue ,很明显你需要给每个线程分别创建 Session

这是使用 threading 时有趣且困难的问题之一。因为操作系统控制何时切换任务,所以任何线程共享的数据都需要保护,或者称为线程安全。不幸的是, requests.Session() 不是线程安全的。

数据访问线程安全有多种策略,取决于是什么数据和你如何使用它。其中之一是使用线程安全的数据结构,比如 Python queue 模块中的 Queue

这些对象使用像 threading.Lock 这样的低级原语,确保同一时间只有一个线程可以访问一段代码或一块内存。使用 ThreadPoolExecutor 对象,就间接地使用了这种策略。

另一种策略是使用线程本地存储。 Threading.local() 创建一个看似全局实际是每个线程所特有的对象。在示例中,由 threadLocalget_session() 完成:

1
2
3
4
5
6
7
threadLocal = threading.local()


def get_session():
if getattr(threadLocal, "session", None) is None:
threadLocal.session = requests.Session()
return threadLocal.session

threading 模块中的 ThreadLocal 专门处理这个问题。它看起来有点奇怪,但是你只想创建这些对象中的一个,而不是一个给每个线程。这个对象处理不同线程对不同数据的隔离访问。

调用 get_session() 时, session 指定给运行它的线程。所以每个线程第一次运行 get_session() 时创建 session,然后在以后的生命周期中,每次调用都使用这个 session 。

最后,一个关于线程数选择的提醒。前面示例代码中使用了 5 个线程。你可以随便修改这个数字,然后观察整个时间的变化。你可能觉得每个下载一个线程是最快的情况,但是,至少在我的系统中不是这样的。我发现最快的情况是使用 5~10 个线程中的某一个。如果你使用了比这个更多的线程,那么创建或销毁线程的额外开销将抹去节省的时间。

困难的是,对于不同的任务,合适的线程数并不是固定的。你需要做一些试验。

为什么 threading 版本很棒

它很快!这是我测试中最快的一次运行。记得非并发版本花费了超过 14 秒:

1
2
3
$ ./io_threading.py
[跳过大部分输出]
Downloaded 160 in 3.7238826751708984 seconds

本示例的执行时间示意图:

使用多线程同时对网站发出多个请求,允许程序重叠等待时间,更快获取最终结果!开心!这正是我们的目标。

threading 版本的问题

好吧,如你所见,线程版本使用了更多的代码,而且你必须考虑哪些数据是线程共享的。

线程之间使用微秒且很难查明的方式交互。这些交互会引起 竞态条件 ,经常导致随机的、间歇的而且很难查找的 bug 。

asyncio 版本

在深入 asyncio 示例代码之前,让我们先看看 asyncio 是怎样工作的。

asyncio 基础
这是简单版的 asyncio 。忽略了很多细节,但是仍然传达了它的工作理念。

asyncio 的一般概念是,一个叫事件循环的 Python 对象控制每个任务怎样和何时运行。事件循环观察每个任务,知道它处于什么状态。实际上,任务的状态有很多种,但是现在让我们想象一个只有两种状态的简单版事件循环。

准备好状态表示一个任务有工作要做且已经准备好,等待状态表示一个任务在等待外部事件完成,比如网络操作。

简单版事件循环管理两个任务列表,每个列表与它们的状态对应。它首先选出一个准备好的任务,启动并运行。这个任务完全控制,直到它协作地将控制交还给事件循环。

当运行的任务将控制交还给事件循环后,事件循环把这个任务放入准备好或等待列表,然后遍历等待列表中的每个任务,查看它是否有完成 I/O 操作已经准备好。事件循环知道在准备好列表中的任务仍然是准备好的,因为它们还没有运行。

一旦所有任务重新分配在正确的列表中,事件循环就选取下一个任务来执行,然后重复这个过程。简单版事件循环选取等待时间最长的任务来执行。重复这个过程直到事件循环结束。

asyncio 的一个关键点是除非任务有意所为,否则永远不会放弃控制。它们不会在操作中中断。这使我们在 asyncio 中共享资源比在线程中更简单。你不必担心使代码线程安全。

这是 asyncio 发生什么的高级描述。如果你想了解更多, 这个 StackOverflow 答案 提供了一些很好的更深入的细节。

async 和 await

现在我们讨论两个新加入 Python 的关键字: asyncawait 。跟据上面的讨论,你可以把 await 当作允许任务把控制交还给事件循环的魔法。当你的代码等待一个函数调用,这表示调用需要花费一些时间,所以任务应该放弃控制。

最简单的是把 async 看成 Python 的一个标志,由它定义的函数可以使用 await 。有些情况下,这并不准确,比如异步生成器,但是这适用于很多情况,并且在开始时给你一个简单模型。

async with 声明是一个特例,它从你通常等待的对象创建一个上下文管理器。虽然有点奇怪,想法却是相同的:把这个上下文管理器标志为可以换出。

你一定能想到,在管理事件循环和任务的交互时比较复杂。对于刚开始接触 asyncio 的开发者,这些细节并不重要,但是需要记住任何函数调用 await 必须使用 async 标记。否则,就会有语法错误。

回到代码

现在你对 asyncio 有了基本的了解,让我们通过 asyncio 版本的示例代码,理解它是如何工作的。注意这个版本引入了 aiohttp 。你应该先运行 pip install aiohttp 来安装它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time
import aiohttp


async def download_site(session, url):
async with session.get(url) as response:
print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
duration = time.time() - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")

这个版本比前两个更复杂。它有类似的结构,但是比起创建 ThreadPoolExecutor 它需要做更多工作来设置任务。让我们从示例顶端开始。

download_site()

download_site() 几乎与线程版本相同,不同的是在函数定义时使用的 async 关键字,和调用 session.get() 时的 async with 关键字。后面你将看到为什么这里 Session 可以传送,而不是使用线程安全存储。

download_all_sites()

download_all_sites() 你将看到与 threading 示例最大的不同。

你可以在所有任务中分享 session,所以 session 创建作为一个上下文管理器。任务可以共享 session ,因为它们都运行在同一个线程上。没有一个任务去中断另一个,导致 session 被破坏的情况。

在上下文管理器中,它使用 asyncio.ensure_future() 创建一个任务列表,并且负责启动它们。一旦所有任务被创建,这个函数使用 asyncio.gather() 保持 session 上下文存活,直到所有任务完成。

threading 代码与此相似,但是在 ThreadPoolExecutor 中细节更容易操作。现在不是 AsyncioPoolExecutor 类。

不过,细节里隐藏了一个很小却很重要的改变。记得我们讨论需要创建的线程数?在 threading 示例中,并不能得出明显合适的数字。

asyncio 的优势之一是它比 threading 好得多。比起线程,每个任务使用更少的资源,更短的创建时间,所以创建并运行更多的 asyncio 也可以很好地工作。这个示例为每个网址创建了任务,并且工作地很好。

main

最后, asyncio 要求你启动事件循环,并且告诉它运行哪个任务。代码末尾的 main 使用 get_event_loop()run_until_complete() 。如无意外,它们完成杰出的工作像它们的名字一样。

如果你升级到 Python 3.7 , Python 核心开发者简化了这个语法。不再使用 asyncio.get_event_loop().run_until_complete() 这种方式,可以使用 asyncio.run()

为什么 asyncio 版本很棒

它真的很快!在我电脑的测试中,这是最快的版本,效率很高:

1
2
3
$ ./io_asyncio.py
[跳过大多数输出]
Downloaded 160 in 2.5727896690368652 seconds

它的执行时间示意图跟 threading 示例很像。不同的是 I/O 请求都是由同一个线程完成的:

缺少像 ThreadPoolExecutor 一样的包裹,让这个代码比 threading 示例更复杂。这是一种你必须做更多的工作来获取更好性能的情况。

另外,必须在合适的位置使用 async 和 await 也引起普遍争议。
在很小程度上,这是真的。反过来说,它促使你思考一个任务何时换出,可以帮助你做更好,更快的设计。

数量问题也很突出。运行 threading 示例时,每个网址一个线程明显要慢于使用少量线程。而运行 asyncio 示例时,使用成百个任务则一点没有变慢。

asyncio 版本的问题

使用 asyncio 有几个问题。首先,你必须使用 async 版本的库来获取 asyncio 的所有优势。如果你使用了 requests 来下载网站,将会更慢,这是因为 requests 没有设计成阻塞时通知事件循环。随着时间推移,这个问题会慢慢解决,更多的库正在适应 asyncio 。

另一个更严重的问题是,如果其中一个任务不协作,协作式多任务的所有优势都会消失。代码中一个很小的错误,都会引起任务终止并占用处理器很长时间,使其他需要运行的任务不能运行。如果一个任务没有将控制交还给它,时间循环没有任何办法介入。

考虑到这一点,让我们走进一种截然不同的并发, multiprocessing 。

multiprocessing 版本

不像前面的方法, multiprocessing 版本代码充分利用了电脑的多 CPU 。让我们从代码开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import requests
import multiprocessin
import time

session = None


def set_global_session():
global session
if not session:
session = requests.Session()


def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process().name
print(f"{name}: Read {len(response.content)} from {url}")


def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)


if __name__ == "__main__":
sites = [
"https://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Download {len(sites)} in {duration} seconds")

这比 asyncio 示例短得多,实际上看起来很像 threading 版本,在我们深入代码之前,先看看 multiprocessing 做了什么。

multiprocessing 简单介绍

在此之前,文中所有并发示例都只使用电脑的一个 CPU 或核。原因是现在 CPython 的设计,和一个叫做全局解释锁的东西,或者 GIL 。

本文并不深入讨论 GIL 。现在,只要知道同步,线程和 asyncio 版本的示例都运行在单 CPU 就足够了。

标准库中的 multiprocessing 设计为打破界限,使代码运行在多 CPU 上。在高级层面上,它在每个 CPU 上创建一个新的 Python 解释器实例,然后移交部分程序在上面运行。

可以想象,启动一个不同的 Python 解释器不会像在 Python 解释器上启动一个进程那么快。这是一个重量级操作,同时也有一些限制和困难,但是对于适合的问题,它可以带来很大的不同。

multiprocessing 代码
相对于同步版本,只有很少的改变。第一个是 download_all_sites() 。不同与简单地重复调用 download_site() ,它创建一个 multiprocessing.Pool 对象,并且将 download_site 和可迭代的网站对应起来。这看起来很像线程示例。

Pool 创建了几个不同的 Python 解释器进程,每个解释器对可迭代的项目运行特定的函数,对于我们的情况就是网站列表。主进程和其他进程之间的通信由 multiprocessing 模块来负责。

需要注意创建 Pool 的代码。首先,它没有指定在池中创建多少个进程,尽管这是可选参数。默认情况下, multiprocessing.Pool() 将检查电脑中 CPU 个数并与之相同。这通常是最好的选择,也适用于我们的情况。

对于这个问题,增加进程数并不能变得更快。实际上,它会变得更慢,因为花费在设置和关闭进程的时间多于并行 I/O 请求节省的时间。

接下来,分析 initializer=set_global_session 。记得池中的每个进程都有自己的内存空间。这意味着它们不能共享 Session 对象这样的东西。你并不想每次调用函数时创建一个新 Session ,你想为每个进程创建一个。

initializer 函数的参数用来处理这种情况。在 download_site() 进程中,没有可以从 initializer 传递返回值给调用函数的方法,但是你可以初始化一个全局 session 变量来维持每个进程一个 session 。因为每个进程都有自己的内存空间,每个的全局变量都是不同的。

这就是全部了。代码的其他部分跟之前差不多。

为什么 multiprocessing 版本很棒

示例中的 multiprocessing 版本很好,因为设置相对简单,而且只需要很少的额外代码。同时充分利用了 CPU 的能力。代码的执行时间示意图:

multiprocessing 版本的问题

这个版本的代码需要一些额外的设置,并且全局 session 对象很奇怪。你必须花时间考虑哪个变量会在每个进程中访问。

最后,在这个示例中,它明显慢于 asyncio 和 threading 版本。

1
2
3
$ ./io_mp.py
[跳过大多数输出]
Downloaded 160 in 5.718175172805786 seconds

这并不奇怪, multiprocessing 的存在不并使为了解决 I/O 密集型问题。在下一节,你将看到更多,并将查看 CPU 密集型示例。

怎样加速 CPU 密集型程序

之前的示例都处理 I/O 密集型问题。现在,你将深入 CPU 密集型问题。 I/O 密集型问题花费多数时间等待外部操作,比如网络调用。 CPU 密集型问题,却很少做 I/O 操作,它的全部执行时间是多快处理请求数据的一个因素。

为了举例,我们使用一个笨函数来创建在 CPU 上运行很长时间。这个函数计算从0到传入值每个数字的平方和:

1
2
def cpu_bound(number):
return sum(i * i for i in range(number))

你将传入很大的数字,所以会花费一些时间。记住,这只是代码中的一个占位函数,实际上应该做一些有用的且消耗时间的事,比如计算方程式的根或者给大数据结构排序。

CPU 密集型同步版本

现在,我们看看这个示例的非并发版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time


def cpu_bound(number):
return sum(i * i for i in range(number))


def find_sums(numbers):
for number in numbers:
cpu_bound(number)


if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds")

这段代码调用 cpu_bound() 20 次,每次使用一个不同的大数。它在单一线程单一进程在一个 CPU 上完成这些。执行时间示意图:

不像 I/O 密集型示例, CPU 密集型示例的运行时间经常完全一致。这个示例在我的电脑上花费 7.8 秒:

1
2
$ ./cpu_non_concurrent.py
Duration 7,834432125091553 seconds

很明显,我们可能做的更好。这个示例都运行在一个 CPU 上,没有使用并发。我们看看如何做的更好。

threading 和 asyncio 版本

你觉得重写代码使用 threading 或 asyncio 能提速多少?

如果你的答案是“完全不能”,奖励自己一块小饼干。如果你回答,“它会降速”,奖励自己两块小饼干。

原因如下:在 I/O 密集型示例中,大部分时间花在等待慢操作完成。 threading 和 asyncio 可以提速,是允许你重叠等待时间而不是顺序执行。

不过,在 CPU 密集型问题上,没有等待时间。 CPU 全力运行直到完成问题。在 Python 中,线程和任务运行在同一个 CPU 同一个进程。这就意味着非并发代码只有一个 CPU 在做所有的工作,还要加上设置线程或任务的额外工作。它花费超过 10 秒:

1
2
$ ./cpu_threading.py
Duration 10.407078266143799 seconds

我也编写了 threading 版本,也放在 Github 库 中,你可以自己去测试。不过,请继续看。

CPU 密集型 multiprocessing 版本

现在,终于到了 multiprocessing 发光的地方。不同于其他并发库, multiprocessing 是显式地设计成通过多 CPU 来分担重 CPU 工作负载。这是它的执行时间示意图:

这是代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import multiprocessing
import time


def cpu_bound(number):
return sum(i * i for i in range(number))


def find_sums(numbers):
with multiprocessing.Pool() as pool:
pool.map(cpu_bound, numbers)


if __name__ == "__main__":
numbers = [5_000_000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duratino {duration} seconds")

只需从非并发版本做少量修改。你必须 import multiprocessing 然后将循环改成创建 multiprocessing.Pool 对象,使用它的 .map() 方法来发送独立的数字到自由的工作进程。

这就是你需要做的,但是你不用关心 Session 对象。

像之前提到的,需要注意 multiprocessing.Pool() 构造器的进程可选参数。你可以指定在池中你想创建或管理多少进程对象。默认情况下,它检查电脑中 CPU 个数,并为每个 CPU 创建进程。这对我们的简单示例很有效,你或许想在生产环境有更多控制。

同样的,像之前提到的线程, multiprocessing.Pool 代码是建立在像 QueueSemaphore 的区块上,如果你使用其他语言写过多线程和多进程代码,这会很熟悉。

为什么 multiprocessing 版本很棒

这个示例中的 multiprocessing 版本很棒,因为它设置简单,且仅需要少量额外代码。而且充分利用了电脑上的 CPU 。

这就是我说的 multiprocessing 。很明显,这是最好的选择。在我的电脑上花费 2.5 秒:

1
2
$ ./cpu_mp.py
Duration 2.5175397396087646 seconds

比其他选择好得多。

multiprocessing 版本的问题

使用 multiprocessing 也有一些缺点。在这个简单示例中没有呈现,但是拆分问题使每个处理器独立工作,有时会很困难。

同样的,许多解决方案需要在进程之间更多通信。这会在解决方案中增加复杂度,而非并发程序则不需要考虑。

什么时候使用并发

你已经了解了很多内容,让我们回顾一下关键点,然后讨论一些决定点,帮助你决定哪个并发模块可以使用在你的项目中。

第一步是,决定是否需要使用并发模块。虽然,示例中每个库看起来都很简单,不过并发总是带来额外的复杂度,并经常导致很难查找的 bug 。

除非你明确有性能问题,否则不要使用并发。然后决定使用哪种并发。正如 Donald Knuth 所说,“过早优化是万恶之源(至少多数情况下)”。

一旦你决定优化你的程序,下一步就是分清程序是 CPU 密集型还是 I/O 密集型。记得, I/O 密集型程序是花费大量时间等待事物发生,而 CPU 密集型程序花费时间以最快的数度处理数据或处理数字。

如你所见, CPU 密集型问题只能从 multiprocessing 获得好处。 threading 和 asyncio 对这种问题没有任何帮助。

对于 I/O 密集型问题, Python 社区有一般经验法则:“如果可以,使用 asyncio ,如果必须则使用 threading ”。 asyncio 可以对这种程序提供最快的加速,但是有时你需要的关键库还没有迁移来发挥 asyncio 的优势。记住,任何任务没有放弃控制给事件循环将阻塞其他所有任务。

总结

你已经看到了 Python 中并发的基本类型:

  • threading
  • asyncio
  • multiprocessing

你已经了解了对于给定问题应该使用哪种并发方法,或者是否需要使用!另外,你对可以使用并发解决问题有了更多的了解。