更快的 Python:async/await 和 threading 中的并发

如果您使用 Python 编程已经有一段时间,特别是使用过 FastAPI 和 discord.py 这样的框架和库,那么您可能已经接触过 async/await
或 asyncio
。 您也许听说过“Python 中不存在多线程处理”这种说法,也可能知道 Python 中大名鼎鼎(或臭名昭著)的 GIL。 考虑到 Python 中多线程受到的否定,您可能好奇 async/await
与多线程处理之间到底有什么区别,尤其是在 Python 编程中。 如果上述情况与您相符,那么这篇博文就是为您准备的!
什么是多线程处理?
在编程中,多线程处理是指程序同时执行多个顺序任务(即线程)的能力。 这些线程可以在单个处理器核心上运行,也可以跨多个核心运行。 不过,由于全局解释器锁 (GIL) 的限制,Python 中的多线程处理只能在单个核心上处理。 nogil(也称无线程)Python 是个例外,它消除了 GIL,本系列的第二部分将对此进行介绍。 在这篇博文中,我们假设 GIL 始终存在。
什么是并发?
编程中的并发是指计算机同时执行多个任务,或者似乎在同时执行多个任务,即使不同的任务是在单个处理器上执行。 通过管理程序不同部分之间的资源和交互,不同的任务可以在重叠时间间隔内独立取得进展。
asyncio
和 threading
在 Python 中都表现为并发
粗略地说,asyncio
和 threading
Python 库都支持并发。 不过,您的 CPU 并没有在完全相同的时间执行多个任务, 只是看起来像而已。
假设您要为几位客人准备一顿多道菜的晚餐。 有些菜需要时间烹制,例如,烤箱中烘烤的派或者炉子上慢炖的汤。 在这些菜肴烹制期间,我们不会干等着, 而是会去忙其他事。 这就类似于 Python 中的并发。 有时,Python 进程会等待某个任务完成。 例如,当一些输入/输出 (I/O) 进程由操作系统处理时,Python 进程只是在等待。 这时,我们可以使用 async 让另一个 Python 进程在等待期间运行。

区别在于谁说了算
如果 asyncio
和 threading
都表现为并发,它们之间有什么区别? 主要区别在于谁负责哪个进程在什么时候运行。 对于 async/await
,这种方法有时被称为合作并发。 一个协程或 future 将其控制权转交给另一个协程或 future。 另一方面,在 threading
中,操作系统的管理器将控制哪个进程运行。
以会议作为类比,在合作并发的会议中,一个麦克风被传来传去。 拿着麦克风的人可以发言,当他发言完毕,他会将麦克风传递给下一个人。 而在多线程处理的会议中,则是由一个主席决定谁在某段时间内可以发言。
在 Python 中编写并发代码
我们编写一些示例代码来了解 Python 中并发的运作方式。 我们将使用 asyncio
和 threading
创建一个快餐店模拟。
Python 中 async/await
的运作方式
asyncio
软件包在 Python 3.4 中引入,而 async
和 await
关键字在 Python 3.5 中引入。 async/await
之所以成为可能,主要原因之一是协程的使用。 Python 中的协程实际上是重新设计的生成器,能够暂停并传回 main 函数。
现在,想象一下一家汉堡店,只有一名员工在工作。 订单按照先进先出的队列准备,不能执行异步操作:
import time def make_burger(order_num): print(f"Preparing burger #{order_num}...") time.sleep(5) # time for making the burger print(f"Burger made #{order_num}") def main(): for i in range(3): make_burger(i) if __name__ == "__main__": s = time.perf_counter() main() elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
这将需要一些时间来完成:
Preparing burger #0... Burger made #0 Preparing burger #1... Burger made #1 Preparing burger #2... Burger made #2 Orders completed in 15.01 seconds.
现在,假设餐厅增加了人手,员工们同时工作:
import asyncio import time async def make_burger(order_num): print(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger print(f"Burger made #{order_num}") async def main(): order_queue = [] for i in range(3): order_queue.append(make_burger(i)) await asyncio.gather(*(order_queue)) if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
我们可以看到两者的区别:
Preparing burger #0... Preparing burger #1... Preparing burger #2... Burger made #0 Burger made #1 Burger made #2 Orders completed in 5.00 seconds.
使用 asyncio
提供的函数,例如 run
和 gather
,以及关键字 async
和 await
,我们创建出可以并发制作汉堡的协程。
接下来,我们更进一步,创建一个更复杂的模拟。 假设我们只有两个工作进程,一次只能制作两个汉堡。
import asyncio import time order_queue = asyncio.Queue() def take_order(): for i in range(3): order_queue.put_nowait(make_burger(i)) async def make_burger(order_num): print(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger print(f"Burger made #{order_num}") class Staff: def __init__(self, name): self.name = name async def working(self): while order_queue.qsize() > 0: print(f"{self.name} is working...") task = await order_queue.get() await task print(f"{self.name} finished a task...") async def main(): staff1 = Staff(name="John") staff2 = Staff(name="Jane") take_order() await asyncio.gather(staff1.working(), staff2.working()) if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
我们用队列保存任务,然后由员工领取。
John is working... Preparing burger #0... Jane is working... Preparing burger #1... Burger made #0 John finished a task... John is working... Preparing burger #2... Burger made #1 Jane finished a task... Burger made #2 John finished a task... Orders completed in 10.00 seconds.
在这个示例中,我们使用 asyncio.Queue
存储任务,不过,如果有多种类型的任务,它会更实用,如下例所示。
import asyncio import time task_queue = asyncio.Queue() order_num = 0 async def take_order(): global order_num order_num += 1 print(f"Order burger and fries for order #{order_num:04d}:") burger_num = input("Number of burgers:") for i in range(int(burger_num)): await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}")) fries_num = input("Number of fries:") for i in range(int(fries_num)): await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}")) print(f"Order #{order_num:04d} queued.") await task_queue.put(take_order()) async def make_burger(order_num): print(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger print(f"Burger made #{order_num}") async def make_fries(order_num): print(f"Preparing fries #{order_num}...") await asyncio.sleep(2) # time for making fries print(f"Fries made #{order_num}") class Staff: def __init__(self, name): self.name = name async def working(self): while True: if task_queue.qsize() > 0: print(f"{self.name} is working...") task = await task_queue.get() await task print(f"{self.name} finish task...") else: await asyncio.sleep(1) #rest async def main(): task_queue.put_nowait(take_order()) staff1 = Staff(name="John") staff2 = Staff(name="Jane") await asyncio.gather(staff1.working(), staff2.working()) if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
这个示例中有多个任务,包括制作薯条(耗时较少)和接受订单(需要用户输入)。
可以看到程序停止来等待用户输入,甚至其他没有接受订单的员工也停止了后台工作。 这是因为 input
函数不是异步的,因此不会被等待。 记住,异步代码中的控制只在被等待时才会释放。 为了解决这个问题,我们可以将:
input("Number of burgers:")
替换为
await asyncio.to_thread(input, "Number of burgers:")
对薯条也是一样,如以下代码所示。 注意,现在程序在无限循环中运行。 如果需要停止,我们可以故意使用无效输入使程序崩溃。
import asyncio import time task_queue = asyncio.Queue() order_num = 0 async def take_order(): global order_num order_num += 1 print(f"Order burger and fries for order #{order_num:04d}:") burger_num = await asyncio.to_thread(input, "Number of burgers:") for i in range(int(burger_num)): await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}")) fries_num = await asyncio.to_thread(input, "Number of fries:") for i in range(int(fries_num)): await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}")) print(f"Order #{order_num:04d} queued.") await task_queue.put(take_order()) async def make_burger(order_num): print(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger print(f"Burger made #{order_num}") async def make_fries(order_num): print(f"Preparing fries #{order_num}...") await asyncio.sleep(2) # time for making fries print(f"Fries made #{order_num}") class Staff: def __init__(self, name): self.name = name async def working(self): while True: if task_queue.qsize() > 0: print(f"{self.name} is working...") task = await task_queue.get() await task print(f"{self.name} finish task...") else: await asyncio.sleep(1) #rest async def main(): task_queue.put_nowait(take_order()) staff1 = Staff(name="John") staff2 = Staff(name="Jane") await asyncio.gather(staff1.working(), staff2.working()) if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
使用 asyncio.to_thread
,我们将 input
函数放入了一个单独的线程(参阅此参考)。 不过,需要注意的是,这个技巧只有在 Python GIL 存在时才能疏通 I/O 绑定任务。
如果运行上面的代码,您可能还会发现终端中的标准 I/O 出现混乱。 用户 I/O 和发生的事件的记录应该是分开的。 我们可以将记录放入日志,以后再检查。
import asyncio import logging import time logger = logging.getLogger(__name__) logging.basicConfig(filename='pyburger.log', level=logging.INFO) task_queue = asyncio.Queue() order_num = 0 closing = False async def take_order(): global order_num, closing try: order_num += 1 logger.info(f"Taking Order #{order_num:04d}...") print(f"Order burger and fries for order #{order_num:04d}:") burger_num = await asyncio.to_thread(input, "Number of burgers:") for i in range(int(burger_num)): await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}")) fries_num = await asyncio.to_thread(input, "Number of fries:") for i in range(int(fries_num)): await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}")) logger.info(f"Order #{order_num:04d} queued.") print(f"Order #{order_num:04d} queued, please wait.") await task_queue.put(take_order()) except ValueError: print("Goodbye!") logger.info("Closing down... stop taking orders and finish all tasks.") closing = True async def make_burger(order_num): logger.info(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger logger.info(f"Burger made #{order_num}") async def make_fries(order_num): logger.info(f"Preparing fries #{order_num}...") await asyncio.sleep(2) # time for making fries logger.info(f"Fries made #{order_num}") class Staff: def __init__(self, name): self.name = name async def working(self): while True: if task_queue.qsize() > 0: logger.info(f"{self.name} is working...") task = await task_queue.get() await task task_queue.task_done() logger.info(f"{self.name} finish task.") elif closing: return else: await asyncio.sleep(1) #rest async def main(): global task_queue task_queue.put_nowait(take_order()) staff1 = Staff(name="John") staff2 = Staff(name="Jane") print("Welcome to Pyburger!") logger.info("Ready for business!") await asyncio.gather(staff1.working(), staff2.working()) logger.info("All tasks finished. Closing now.") if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s logger.info(f"Orders completed in {elapsed:0.2f} seconds.")
在这最后的代码块中,我们将模拟信息记录在 pyburger.log
中,并为客户消息预留了终端。 我们还会在点餐过程中捕获无效输入,并在输入无效时将 closing
标志切换为 True
(假设用户想要退出)。 在 closing
标志设为 True
后,工作线程将 return
,结束协程的无限 while
循环。
Python 中 threading
的运作方式
在上面的示例中,我们将 I/O 绑定任务放入另一个线程。 您可能想知道,是否可以将所有任务放入单独的线程,并让它们并发运行。 我们尝试使用 threading
而不是 asyncio
。
在下面的代码中我们并发制作汉堡,没有限制:
import asyncio import time async def make_burger(order_num): print(f"Preparing burger #{order_num}...") await asyncio.sleep(5) # time for making the burger print(f"Burger made #{order_num}") async def main(): order_queue = [] for i in range(3): order_queue.append(make_burger(i)) await asyncio.gather(*(order_queue)) if __name__ == "__main__": s = time.perf_counter() asyncio.run(main()) elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.") ``` Instead of creating async coroutines to make the burgers, we can just send functions down different threads like this: ``` import threading import time def make_burger(order_num): print(f"Preparing burger #{order_num}...") time.sleep(5) # time for making the burger print(f"Burger made #{order_num}") def main(): order_queue = [] for i in range(3): task = threading.Thread(target=make_burger, args=(i,)) order_queue.append(task) task.start() for task in order_queue: task.join() if __name__ == "__main__": s = time.perf_counter() main() elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
在 main
的第一个 for
循环中,任务在不同的线程中创建并启动。 第二个 for
循环确保所有汉堡都制作完成后,程序才能继续运行(即返回 main
之前)。
当我们只有两名员工时,情况更加复杂。 每名员工都由一个线程表示,他/她从一个存储所有任务的普通列表中获取任务。
import threading import time order_queue = [] def take_order(): for i in range(3): order_queue.append(make_burger(i)) def make_burger(order_num): def making_burger(): print(f"Preparing burger #{order_num}...") time.sleep(5) # time for making the burger print(f"Burger made #{order_num}") return making_burger def working(): while len(order_queue) > 0: print(f"{threading.current_thread().name} is working...") task = order_queue.pop(0) task() print(f"{threading.current_thread().name} finish task...") def main(): take_order() staff1 = threading.Thread(target=working, name="John") staff1.start() staff2 = threading.Thread(target=working, name="Jane") staff2.start() staff1.join() staff2.join() if __name__ == "__main__": s = time.perf_counter() main() elapsed = time.perf_counter() - s print(f"Orders completed in {elapsed:0.2f} seconds.")
运行上面的代码时,其中一个线程可能会出现错误,指示它正在尝试从空列表中获取任务。 您可能想知道为什么会出现这种情况,因为 while
循环中已经有一个条件,只有 task_queue
不为空时才会继续执行。 但还是出错了,这是因为我们遇到了竞争条件。
竞争条件
当多个线程同时尝试访问同一资源或数据时,就会发生竞争条件,导致系统出现问题。 访问资源的时间和顺序对程序逻辑至关重要,不可预测的时间或多个线程交叉访问和修改共享数据可能导致错误。
为了解决程序中的竞争条件,我们将向 task_queue
部署一个锁:
queue_lock = threading.Lock()
为了奏效,我们需要确保在检查队列长度以及从队列中获取任务时拥有访问权限。 当我们拥有权限时,其他线程无法访问队列:
def working(): while True: with queue_lock: if len(order_queue) == 0: return else: task = order_queue.pop(0) print(f"{threading.current_thread().name} is working...") task() print(f"{threading.current_thread().name} finish task...") ``` Based on what we have learned so far, we can complete our final code with threading like this: ``` import logging import threading import time logger = logging.getLogger(__name__) logging.basicConfig(filename="pyburger_threads.log", level=logging.INFO) queue_lock = threading.Lock() task_queue = [] order_num = 0 closing = False def take_order(): global order_num, closing try: order_num += 1 logger.info(f"Taking Order #{order_num:04d}...") print(f"Order burger and fries for order #{order_num:04d}:") burger_num = input("Number of burgers:") for i in range(int(burger_num)): with queue_lock: task_queue.append(make_burger(f"{order_num:04d}-burger{i:02d}")) fries_num = input("Number of fries:") for i in range(int(fries_num)): with queue_lock: task_queue.append(make_fries(f"{order_num:04d}-fries{i:02d}")) logger.info(f"Order #{order_num:04d} queued.") print(f"Order #{order_num:04d} queued, please wait.") with queue_lock: task_queue.append(take_order) except ValueError: print("Goodbye!") logger.info("Closing down... stop taking orders and finish all tasks.") closing = True def make_burger(order_num): def making_burger(): logger.info(f"Preparing burger #{order_num}...") time.sleep(5) # time for making the burger logger.info(f"Burger made #{order_num}") return making_burger def make_fries(order_num): def making_fries(): logger.info(f"Preparing fried #{order_num}...") time.sleep(2) # time for making fries logger.info(f"Fries made #{order_num}") return making_fries def working(): while True: with queue_lock: if len(task_queue) == 0: if closing: return else: task = None else: task = task_queue.pop(0) if task: logger.info(f"{threading.current_thread().name} is working...") task() logger.info(f"{threading.current_thread().name} finish task...") else: time.sleep(1) # rest def main(): print("Welcome to Pyburger!") logger.info("Ready for business!") task_queue.append(take_order) staff1 = threading.Thread(target=working, name="John") staff1.start() staff2 = threading.Thread(target=working, name="Jane") staff2.start() staff1.join() staff2.join() logger.info("All tasks finished. Closing now.") if __name__ == "__main__": s = time.perf_counter() main() elapsed = time.perf_counter() - s logger.info(f"Orders completed in {elapsed:0.2f} seconds.")
如果您比较使用 asyncio
和 threading
的两个代码段,它们应该会有相似的结果。 您可能想知道哪一个更好,该怎样做出选择。
实际上,编写 asyncio
代码比多线程处理更容易,因为我们不必自己处理潜在的竞争条件和死锁。 控制默认在协程之间传递,因此不需要锁。 不过,Python 线程确实有并行运行的潜力,只是在 GIL 存在的情况下大多数时候无法实现。 我们可以在下一篇博文中讨论 nogil(无线程)Python 时回到这个问题。
从并发中受益
为什么要在编程中使用并发? 一个主要原因是:速度。 如上文所述,如果我们能缩短等待时间,任务就可以更快完成。 计算中存在不同类型的等待,针对每一种等待,我们倾向于使用不同的方法来节省时间。
I/O 绑定任务
当任务或程序的执行速度主要受限于 I/O 操作(例如从文件或网络读取数据,或等待用户输入)的速度时,该任务或程序就被视为输入/输出 (I/O) 绑定。 I/O 操作通常比其他 CPU 操作慢,因此,涉及大量 I/O 操作的任务可能会花费更多时间。 这些任务的典型示例包括从数据库读取数据、处理 Web 请求或处理大型文件。
使用 async/await
并发可以疏通处理序列,并让其他任务在等待期间得到处理,从而有助于优化 I/O 绑定任务的等待时间。
async/await
并发在许多 Python 应用程序中都非常有用,例如涉及与数据库进行大量通信和处理 Web 请求的 Web 应用程序。 GUI(图形用户界面)也可以从 async/await
并发中受益,允许在用户与应用程序交互时执行后台任务。
CPU 绑定任务
当任务或程序的执行速度主要受限于 CPU 的速度时,该任务或程序就被视为 CPU 绑定。 典型示例包括图像或视频处理,例如调整大小或编辑,以及复杂的数学计算,例如矩阵乘法或训练机器学习模型。
与 I/O 绑定任务不同,CPU 绑定任务很少能够通过使用 async/await
并发优化,因为 CPU 已经在忙于处理这些任务。 如果您的机器有多个 CPU,或者您可以将部分任务卸载到一个或多个 GPU,那么可以创建更多线程并执行多处理来更快完成 CPU 绑定任务。 多处理可以优化这些 CPU 和 GPU 的使用方式,正因如此,如今许多机器学习和 AI 模型都在多个 GPU 上训练。
不过,这很难通过纯 Python 代码实现,因为 Python 本身就是为提供抽象层而设计,让用户无需控制低级别计算过程。 此外,Python 的 GIL 限制了计算机上多个线程之间 Python 资源的共享。 最近,Python 3.13 允许移除 GIL 以实现真正的多线程处理。 我们将在下一篇博文中讨论 GIL 以及如何将其移除。
有时,我们上面提到的方法都无法充分加速 CPU 绑定任务。 在这种情况下,CPU 绑定任务可能需要被分解成更小的任务,以便在多个线程、多个处理器甚至多台机器上同时执行。 这就是并行处理,您可能需要完全重写代码才能实现。 在 Python 中,multiprocessing
软件包提供本地和远程并发,可以用来绕过 GIL 的限制。 我们也将在下一篇博文中展示一些相关示例。
在 PyCharm 中调试并发代码
调试异步或并发代码可能很困难,因为程序不是按顺序执行,这意味着很难看到代码的执行位置和时间。 许多开发者使用 print
帮助追踪代码流,但不推荐这种方式,因为它非常笨拙,用它来研究复杂的程序(例如并发程序)并不容易。 而且,后续整理也很麻烦。
许多 IDE 提供调试器,它们非常适合检查变量和程序流。 调试器还能提供跨多个线程的清晰堆栈跟踪。 来看看如何在 PyCharm 中跟踪示例餐厅模拟的 task_queue
。
首先,我们将在代码中设置一些断点。 为此,请点击您希望调试器暂停的行号。 行号将变成红点,表示这里设置了断点。 我们将在第 23、27 和 65 行设置断点,在这些行中,task_queue
在不同的线程中被更改。


然后,我们点击右上角的小虫子图标,以调试模式运行程序。

点击图标后,将打开 Debug(调试)窗口。 程序开始运行,直到代码中高亮显示的第一个断点。

在这里,我们看到 John
线程正在尝试接收任务,第 65 行被高亮显示。 此时,高亮显示的行尚未执行。 当我们想在进入断点之前检查变量时,这很有用。
我们检查一下 task_queue
中的内容。 在 Debug(调试)窗口中开始输入即可,如下所示。

选择或输入“task_queue”,然后按 Enter。 您将看到 take_order
任务在队列中。

现在,我们点击 Step in(步入)按钮执行断点,如下所示。

按下按钮并查看过弹出的 Special Variables(特殊变量)窗口后,我们看到任务变量现在为 John
线程中的 take_order
。

再次查询 task_queue
时,我们发现列表现在为空。

点击 Resume Program(恢复程序)按钮,让程序运行。

当程序到达用户输入部分时,PyCharm 会跳转到 Console(控制台)窗口,让我们提供输入。 假设我们想要两个汉堡。 输入“2”,然后按 Enter。

现在,我们到达第二个断点。 如果点击 Threads & Variables(线程和变量)回到该窗口,我们将看到 burger_num
为 2,与我们输入的一样。

现在,我们步入断点,检查 task_queue
,就像之前一样。 我们可以看到新增了一个 make_burger
任务。

让程序再次运行,如果我们在程序停止时步入断点,就可以看到 Jane
正在接手任务。

您可以自行检查其余代码。 完成后,按窗口顶部的红色 Stop(停止)按钮即可。

使用 PyCharm 中的调试器,您可以轻松跨线程跟踪程序的执行情况并检查不同的变量。
结论
现在,我们已经学习了 Python 中并发的基础知识,希望您能够通过实践练习加以掌握。 在下一篇博文中,我们将探讨 Python GIL、它的作用以及它缺失时会有什么变化。
PyCharm 为处理并发 Python 代码提供了强大的工具。 如本文所示,调试器允许对异步和线程代码的逐步检查,帮助您跟踪执行流、监控共享资源和检测问题。 PyCharm 拥有直观的断点、实时变量视图、用户输入的无缝控制台集成以及强大的日志记录支持,让编写、测试和调试应用程序变得更加轻松、可靠和清晰。
本博文英文原作者: