How-To's Tutorials Web Development

より高速な Python 開発へ: async/await と threading における並行性

Read this post in other languages:

ある程度 Python のコーディング経験がある方(特に FastAPI や discord.py などのフレームワークとライブラリを使っている方)は、async/awaitasyncio を使用したことがあるかと思います。 「Python でのマルチスレッド処理は本物ではない」との言説を聞いたことがあり、Python の有名な(あるいは悪名高い)GIL のこともご存知かもしれません。 Python のマルチスレッド処理に対する否定的な意見を考慮すると、特に Python のプログラミングにおける async/await とマルチスレッド処理の実際の違いが気になっているのではないでしょうか。 このブログ記事はそんな方にぴったりです!

マルチスレッド処理とは?

プログラミングにおけるマルチスレッド処理とは、あるプログラムが複数の連続するタスク(スレッドと呼ばれる)を並行して実行する機能を指します。 このようなスレッドは 1 つのプロセッサコアか複数のコアで実行できますが、 グローバルインタープリターロック(GIL)の制限により、Python のマルチスレッド処理は 1 つのコアでしか実行されません。 GIL を取り除く nogil(またはスレッドフリー)Python はこの例外ですが、これについてはこの連載の第 2 部で説明します。 このブログ記事では、GIL が必ず存在することを前提とします。

並行性とは?

プログラミングにおける並行性とは、コンピューターが同時に複数の処理を行っている、または行っているように見えることを指します。これは、複数の異なるタスクが 1 つのプロセッサ上で実行されている場合も当てはまります。 プログラムを構成する複数の異なる部分間のリソースや連携処理を管理することによって、別々のタスクを同じ時間枠内で独立して進められるようになります。

Python では asynciothreading も並行処理に見える

Python ライブラリの asynciothreading は概してどちらも並行性を実現しているかのように機能しますが、 CPU が実際に同時に複数の処理を行っているわけではありません。 ただそのように見えるだけです。

数名のゲストをコース料理のディナーに招待している状況を想像してください。 コース料理の中にはオーブンで焼く必要のあるパイやコンロで煮込むスープなど、調理に時間がかかるものもあります。 食事が準備されるのを待つ間、何もせずにただじっとしている人はいません。 その間は他のことをするでしょう。 これは Python における並行性に似ています。 Python のプロセスは何らかの処理が完了するのを待機している場合があります。 たとえば、一部の入出力(I/O)プロセスはオペレーティングシステムによって処理されている間、Python プロセスはただ待機しています。 その際に async を使用すると、そのプロセスが待機中に他の Python プロセスを実行させることができます。

Python のマルチスレッド処理と asyncio の比較

実行の主体に違いがある

asynciothreading が並行しているように見える場合、それらの違いは何でしょうか? 一番の違いは、実行するプロセスとその実行タイミングを管理する主体です。 async/await の場合、この手法は協調的並行性と呼ばれることもあります。 コルーチンや Future はその制御権を別のコルーチンや Future に委譲し、他の処理を実行できるようにします。 これに対し、threading の場合はオペレーティングシステムのマネージャーがどのプロセスが実行されるかどうかを制御します。

協調的並行性は、複数人が発言できるようにマイクが回される会議に似ています。 マイクを持った人が発言でき、発言が終わるか、発言すべきことがなくなると、次の人にマイクが渡されます。 それとは対照的に、マルチスレッド処理は特定のタイミングで発言する人を司会者が指名する会議です。

Python で並行処理コードを書く

サンプルコードを書いて Python における並行性の仕組みを見てみましょう。 asynciothreading の両方を使用し、ファストフード店のシミュレーションを作成してみましょう。

Python の async/await の仕組み

asyncio パッケージは Python 3.4 で、async キーワードと await キーワードは Python 3.5 で導入されました。 async/await を実現する上での重要事項の 1 つには、コルーチンを使用することがあります。 Python のコルーチンは、実際には一時停止や main 関数への返却を可能とするように転用されるジェネレーターです。

では、スタッフメンバーが 1 人しか働いていないバーガーショップを想像してみましょう。 注文の品は注文順に準備され、async 操作は一切行われません。

import time


def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    time.sleep(5) # time for making the burger
    print(f"Burger made #{order_num}")


def main():
    for i in range(3):
        make_burger(i)


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")

この場合、完了するまで少々時間がかかります。

Preparing burger #0...

Burger made #0

Preparing burger #1...

Burger made #1

Preparing burger #2...

Burger made #2

Orders completed in 15.01 seconds.

では、並行して作業できるようにショップのスタッフを増員した場合はどうでしょうか。

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

この 2 つには違いがあるのが分かります。

Preparing burger #0...

Preparing burger #1...

Preparing burger #2...

Burger made #0

Burger made #1

Burger made #2

Orders completed in 5.00 seconds.

asyncio が提供する rungather などの関数や、asyncawait などのキーワードを使用することで、ハンバーガーを並行して作成できるコルーチンが出来上がりました。

次はさらに一歩進んで、より複雑なシミュレーションを作成してみましょう。 スタッフは 2 人だけで、一度に 2 つのハンバーガーを作れる状態を想像してみましょう。

import asyncio

import time

order_queue = asyncio.Queue()

def take_order():

  for i in range(3):

      order_queue.put_nowait(make_burger(i))

async def make_burger(order_num):

  print(f"Preparing burger #{order_num}...")

  await asyncio.sleep(5)  # time for making the burger

  print(f"Burger made #{order_num}")

class Staff:

  def __init__(self, name):

      self.name = name

  async def working(self):

      while order_queue.qsize() > 0:

          print(f"{self.name} is working...")

          task = await order_queue.get()

          await task

          print(f"{self.name} finished a task...")

async def main():

  staff1 = Staff(name="John")

  staff2 = Staff(name="Jane")

  take_order()

  await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

  s = time.perf_counter()

  asyncio.run(main())

  elapsed = time.perf_counter() - s

  print(f"Orders completed in {elapsed:0.2f} seconds.")

ここでは、キューを使用してタスクを格納し、スタッフがタスクを取得できるようにします。

John is working...

Preparing burger #0...

Jane is working...

Preparing burger #1...

Burger made #0

John finished a task...

John is working...

Preparing burger #2...

Burger made #1

Jane finished a task...

Burger made #2

John finished a task...

Orders completed in 10.00 seconds.

この例では asyncio.Queue を使用してタスクを格納していますが、次の例のようにタスクの種類が複数ある場合場合はさらに便利になります。

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = input("Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = input("Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

この例では、比較的時間のかからないフライドポテト作りやユーザーからの入力を必要とする注文の受け付けなどの複数のタスクがあります。

このプログラムはユーザーからの入力を待機中に停止しますが、バックグラウンドで注文を受け付けていない他のスタッフの作業も停止していることに注意してください。 これは、input 関数が async ではないため、await の対象になっていないためです。 非同期コードでの制御は、待機対象になっている場合にのみ解放されることを覚えておきましょう。 これを修正するには、

input("Number of burgers:")

を次に置き換えます。

await asyncio.to_thread(input, "Number of burgers:")

また、フライドポテトにも同じ修正を行います。以下のコードを参照してください。 これで、プログラムが無限ループで実行されるようになりました。 このループを停止させたい場合は、無効な入力によって意図的にプログラムをクラッシュさせることができます。

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = await asyncio.to_thread(input, "Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = await asyncio.to_thread(input, "Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

asyncio.to_thread を使用することで、input 関数を別のスレッドに入れました(こちらのリファレンスを参照)。 ただし、この技でブロックを解除できるのは、Python GIL が存在する状況の I/O バウンドのタスクだけです。

上記のコードを実行すると、ターミナルの標準 I/O が入り乱れて見えるかもしれません。 ユーザーの I/O と処理内容の記録は分離する必要があります。 記録をログに入れ、後で検査することができます。

import asyncio

import logging

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename='pyburger.log', level=logging.INFO)

task_queue = asyncio.Queue()

order_num = 0

closing = False

async def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = await asyncio.to_thread(input, "Number of burgers:")

       for i in range(int(burger_num)):

           await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = await asyncio.to_thread(input, "Number of fries:")

       for i in range(int(fries_num)):

           await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       await task_queue.put(take_order())

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

async def make_burger(order_num):

   logger.info(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   logger.info(f"Burger made #{order_num}")

async def make_fries(order_num):

   logger.info(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   logger.info(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               logger.info(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               task_queue.task_done()

               logger.info(f"{self.name} finish task.")

           elif closing:

               return

           else:

               await asyncio.sleep(1) #rest

async def main():

   global task_queue

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   await asyncio.gather(staff1.working(), staff2.working())

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

この最終的なコードブロックでは、シミュレーション情報を pyburger.log に記録し、ターミナルを顧客向けのメッセージ専用にしました。 また、注文を処理中に無効な入力を捕捉し、入力が無効な場合はユーザーが終了を望んでいるものとして closing フラグをTrue に切り替えています。 closing フラグが True に設定されると、ワーカーが return となり、コルーチンの無限 while ループが終了します。

Python の threading の仕組み

上記の例では、I/O バウンドのタスクを別のスレッドに入れました。 すべてのタスクを別々のスレッドに入れ、それらを同時に実行できるのではないかと思うのではないでしょうか。 asyncio の代わりに threading を使用してみましょう。

以下に示すコードを考察しましょう。このコードでは制限を設けずに複数のハンバーガーを同時に作成しています。

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

```

Instead of creating async coroutines to make the burgers, we can just send functions down different threads like this:

```

import threading

import time

def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   time.sleep(5) # time for making the burger

   print(f"Burger made #{order_num}")

def main():

   order_queue = []

   for i in range(3):

       task = threading.Thread(target=make_burger, args=(i,))

       order_queue.append(task)

       task.start()

   for task in order_queue:

       task.join()

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

main の最初の for ループでは、タスクが別々のスレッドで作成されて開始しています。 2 つ目の for ループでは、すべてのハンバーガーがプログラムが進行する前に(つまり、main に戻る前に)確実に作られるようにしています。

スタッフメンバーが 2 人だけの場合はさらに複雑です。 それぞれのスタッフはスレッドで表現され、すべてのタスクが格納されている通常のリストからタスクを取得します。

import threading

import time

order_queue = []

def take_order():

   for i in range(3):

       order_queue.append(make_burger(i))

def make_burger(order_num):

   def making_burger():

       print(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       print(f"Burger made #{order_num}")

   return making_burger

def working():

     while len(order_queue) > 0:

         print(f"{threading.current_thread().name} is working...")

         task = order_queue.pop(0)

         task()

         print(f"{threading.current_thread().name} finish task...")

def main():

   take_order()

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

if __name__ == "__main__":

 s = time.perf_counter()

 main()

 elapsed = time.perf_counter() - s

 print(f"Orders completed in {elapsed:0.2f} seconds.")

上記のコードを実行すると、空のリストからタスクを取得しようとしているというエラーがいずれかのスレッドで発生する可能性があります。 while ループには task_queue が空でない場合にのみ続行される条件が含まれているため、そのようなエラーは発生する理由が分からないかもしれません。 そのような条件でもエラーになるのは、競合状態が発生しているためです。

競合状態

競合状態は複数のスレッドが同じリソースやデータに同時にアクセスしようとした場合に発生し、システムに問題を引き起こします。 プログラムのロジックでは、リソースがアクセスされる際のタイミングと順序は重要です。また、共有データにアクセスして変更を行う複数のスレッドのタイミングやインターリーブを予測できない場合、エラーが発生する可能性があります。

プログラム内の競合状態を解決するため、task_queue に lock を導入しましょう。

queue_lock = threading.Lock()

working では、キューの長さをチェックしてタスクを取得する際にキューへのアクセス権があることを確認する必要があります。 権限があっても、他のスレッドがキューにアクセスできないようにする必要があります。

def working():

   while True:

       with queue_lock:

           if len(order_queue) == 0:

               return

           else:

               task = order_queue.pop(0)

       print(f"{threading.current_thread().name} is working...")

       task()

       print(f"{threading.current_thread().name} finish task...")

```

Based on what we have learned so far, we can complete our final code with threading like this:

```

import logging

import threading

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename="pyburger_threads.log", level=logging.INFO)

queue_lock = threading.Lock()

task_queue = []

order_num = 0

closing = False

def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = input("Number of burgers:")

       for i in range(int(burger_num)):

           with queue_lock:

               task_queue.append(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = input("Number of fries:")

       for i in range(int(fries_num)):

           with queue_lock:

               task_queue.append(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       with queue_lock:

           task_queue.append(take_order)

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

def make_burger(order_num):

   def making_burger():

       logger.info(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       logger.info(f"Burger made #{order_num}")

   return making_burger

def make_fries(order_num):

   def making_fries():

       logger.info(f"Preparing fried #{order_num}...")

       time.sleep(2)  # time for making fries

       logger.info(f"Fries made #{order_num}")

   return making_fries

def working():

   while True:

       with queue_lock:

           if len(task_queue) == 0:

               if closing:

                   return

               else:

                   task = None

           else:

               task = task_queue.pop(0)

       if task:

           logger.info(f"{threading.current_thread().name} is working...")

           task()

           logger.info(f"{threading.current_thread().name} finish task...")

       else:

           time.sleep(1)  # rest

def main():

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   task_queue.append(take_order)

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

asynciothreading を使用する 2 つのコードスニペットを比較すると、どちらも同じような結果になります。 どちらのスニペットが優れており、なぜどちらか一方を選択すべきなのかが気になるかもしれません。

実際には、潜在的な競合状態やデッドロックを自分で処理する必要がない asyncio コードを書く方がマルチスレッド処理よりも簡単です。 制御はデフォルトでコルーチンによって渡されるため、ロックは必要ありません。 ただし、Python のスレッドは並行して実行される可能性があります。しかし、GIL が存在するため、ほとんどの場合は並列では実行されません。 これについては、次のブログ記事で nogil(スレッドフリー)Python について説明する際に話しましょう。

並行性の活用

プログラミングで並行性を利用したい理由は何でしょうか。 主な理由の 1 つは、スピードです。 上記のように、待機時間を短縮できればタスクをより迅速に完了できます。 コンピューティングではさまざまな待機処理が存在し、それぞれの処理に対してさまざまな時間節約の手法が使用される傾向があります。

I/O バウンドのタスク

タスクやプログラムの実行速度が主にファイルやネットワークからの読み取りやユーザー入力の待機などの I/O 処理の速度に制限されている場合、そのタスクやプログラムは入出力(I/O)バウンドだと考えられます。 I/O 処理は概して他の CPU 処理よりも遅いため、多数の I/O 処理が発生するタスクはかなりの長時間を要する可能性があります。 このようなタスクの典型的な例には、データベースからのデータの読み取り、ウェブリクエストの処理、大きなファイルの操作などがあります。

async/await の並行性を利用して処理シーケンスのブロックを解除し、他のタスクが待機中に処理されるようにすることで、I/O バウンドのタスクを実行中の待機時間を最適化することができます。

async/await の並行性は、データベース通信やウェブリクエストの処理が大量に発生するウェブアプリケーションなどの多くの Python アプリケーションに恩恵をもたらします。 ユーザーがアプリケーションと対話している間にバックグラウンドタスクを実行できるようにすると、GUI(グラフィカルユーザーインターフェース)も async/await による並行性の恩恵を受けられます。

CPU バウンドのタスク

タスクやプログラムの実行速度が主に CPU の速度に制限されている場合、そのタスクやプログラムは CPU バウンドだと考えられます。 このようなタスクの典型的な例には、サイズの変更や編集といった画像や動画の処理や、行列の操作や機械学習モデルのトレーニングといった複雑な数学計算などがあります。

CPU は元々複数のタスクを処理しているため、CPU バウンドのタスクは I/O バウンドのタスクのように async/await による並行性を利用した最適化はほぼ不可能です。 このような CPU バウンドのタスクは、マシンに複数の CPU が搭載されている場合や、タスクの一部を 1 つ以上の GPU にオフロードできる場合は、さらにスレッドを作成してマルチプロセス処理を実行することでより迅速に完了できます。 マルチプロセス処理は、このような CPU や GPU の使用方法を最適化できます。最近の多くの機械学習や AI モデルは複数の GPU でトレーニングされているのも、これが理由です。

ただし、純粋な Python コードでこれを実行するのは困難です。Python 自体が抽象レイヤーを提供しており、ユーザーが下位計算プロセスを制御する必要がないように設計されているためです。 さらに、Python の GIL が Python リソースをコンピューター上の複数のスレッドで共有することを制限しています。 最近、Python 3.13 によって GIL を取り除くことが可能になり、本来のマルチスレッド処理が可能になりました。 GIL 自体と GIL なしで処理する方法については、次回のブログ記事で説明します。

前述のどの手法でも CPU バウンドのタスクを十分に高速化できない場合があります。 そのような場合は CPU バウンドのタスクをより小さなタスクに分解し、複数のスレッド、複数のプロセッサ、さらには複数のマシンでそれらのタスクを同時に実行する必要があるかもしれません。 それは並列処理となるため、コードを完全に書き直して実装しなければならない場合があります。 Python の multiprocessing パッケージはローカルとリモートの両方の並行性が提供しているため、GIL の制限を回避するための手段として利用できます。 そのいくつかの例についても、次のブログ記事でご紹介します。

PyCharm で並行処理コードをデバッグする

非同期コードや並行処理コードのデバッグは困難な場合があります。プログラムが逐次的に実行されず、コードの実行箇所とタイミングを把握するのが難しいためです。 多くの開発者は print を使用してコードの流れを追跡していますが、このやり方はお勧めできません。非常に不格好であり、並行プログラムのような複雑なプログラムの調査に使用するのは簡単ではないためです。 また、後片付けも面倒です。

多くの IDE には、変数やプログラムの流れを検査するのに便利なデバッガーが備わっています。 また、デバッガーでは複数のスレッドを対象とするスタックトレースが分かりやすく表示されます。 このバーガーショップの例の task_queuePyCharm でどのように追跡できるか見てみましょう。

まず、コード内にブレークポイントをいくつか設定します。 ブレークポイントを設定するには、デバッガーを停止させたい行の行番号をクリックします。 行番号が赤い点に変わると、ブレークポイントが設定されたことになります。 task_queue が別々のスレッドで変更される 23 行目、27 行目、65 行目にブレークポイントを設定します。

その後、右上にある小さな虫のアイコンをクリックして、プログラムをデバッグモードで実行します。

アイコンをクリックすると、Debug(デバッグ)ウィンドウが開きます。 コード内でハイライトされている最初のブレークポイントに到達するまでプログラムが実行されます。

ここでは、John スレッドがタスクを取得しようとしており、65 行目がハイライトされているのが分かります。 この時点では、ハイライトされた行はまだ実行されていません。 これは、ブレークポイントに到達する前に変数を検査する必要がある場合に便利です。

では、task_queue の中を検査してみましょう。 Debug(デバッグ)ウィンドウで、以下のように入力を開始してください。

「task_queue」を選択するか入力し、Enter を押します。 take_order タスクがキューに格納されているのが分かります。

次に、以下のように Step in(ステップ実行)をクリックしてブレークポイントを実行しましょう。

それをクリックした後にポップアップ表示される Special Variables(特殊変数)ウィンドウを調べると、タスク変数が John スレッドで take_order となっていることが分かります。

もう一度 task_queue を検索すると、リストが空になります。

では、Resume Program(プログラムの再開)ボタンをクリックしてプログラムを実行しましょう。

プログラムがユーザー入力部分に到達すると、PyCharm で Console (コンソール)ウィンドウが開き、そこに入力できるようになります。 ハンバーガーを 2 個注文するとしましょう。 「2」と入力して Enter を押します。

2 つ目のブレークポイントに到達しました。 Threads & Variables(スレッドと変数)をクリックしてそのウィンドウに戻ると、入力したとおりに burger_num が「2」になっていることを確認できます。

次に、前回と同様にブレークポイントにステップインして task_queue を調べましょう。 make_burger タスクが 1 つ追加されたことが分かります。

もう一度プログラムを実行し、停止した時点でブレークポイントにステップインすると、Jane がタスクを取得しているのが分かります。

残りのコードはご自身で調べてみてください。 作業が完了したら、ウィンドウの上にある Stop(停止)ボタンを押します。

PyCharm のデバッガーを使用すると、複数の異なるスレッドにわたってプログラムの実行を追跡し、複数の異なる変数を非常に簡単に調べることができます。


まとめ

Python における並行性の基本を学習したので、後は実践を通じて習得していきましょう。 次のブログ記事では、Python GIL、その役割、および GIL が存在しない場合の影響について説明します。

PyCharm には、Python の並行処理コードを操作するための強力なツールが備わっています。 このブログ記事でご覧に入れたように、デバッガーでは非同期コードとスレッドコードの両方をステップ単位で調べることができるため、実行の流れを追跡し、共有リソースを監視し、問題を検出することができます。 PyCharm には直感的なブレークポイント、リアルタイムの変数ビュー、ユーザー入力用のシームレスなコンソールの統合、および堅牢なログ記録のサポートが備わっており、信頼性が高く分かりやすいため、アプリケーションをより簡単に作成、テスト、デバッグすることができます。

オリジナル(英語)ブログ投稿記事の作者:

Cheuk Ting Ho

Cheuk Ting Ho

image description

Discover more