How-To's Tutorials Web Development

Schnelleres Python: Nebenläufigkeit bei async/await und Threading

Read this post in other languages:

Faster Python Concurrency in async/await and threading

Wenn Sie schon eine Weile mit Python programmieren, und insbesondere wenn Sie Frameworks und Bibliotheken wie FastAPI und discord.py nutzen, haben Sie wahrscheinlich schon einmal async/await oder asyncio verwendet. Vielleicht haben Sie auch schon Aussagen wie „Es gibt kein echtes Multithreading in Python“ gehört, und vielleicht kennen Sie auch den berühmt-berüchtigten GIL in Python. Angesichts des „unechten“ Multithreadings in Python fragen Sie sich vielleicht, was der Unterschied zwischen async/await und Multithreading in Wirklichkeit ist – insbesondere bei der Programmierung in Python. Im folgenden Artikel beantworten wir Ihnen diese Frage!

Was ist Multithreading?

In der Programmierung bezieht sich Multithreading auf die Fähigkeit eines Programms, mehrere sequenzielle Aufgaben (sogenannte Threads) gleichzeitig auszuführen. Diese Threads können auf einem einzelnen Prozessorkern oder auf mehreren Kernen ausgeführt werden. Aufgrund der Beschränkung durch den Global Interpreter Lock (GIL) wird bei Multithreading in Python jedoch nur ein einziger Kern verwendet. Die Ausnahme ist die NoGIL-Version von Python (auch „thread-free“ genannt, also „mit freiem Threading“), die den GIL abschafft und auf die wir in Teil 2 dieser Reihe eingehen werden. In diesem Blogartikel gehen wir davon aus, dass der GIL stets vorhanden ist.

Was ist Nebenläufigkeit?

In der Programmierung bedeutet Nebenläufigkeit, dass der Computer mehr als eine Aufgabe gleichzeitig abarbeitet – oder zumindest diesen Anschein erweckt, auch wenn die unterschiedlichen Aufgaben auf einem einzigen Prozessor ausgeführt werden. Durch die Verwaltung von Ressourcen und Interaktionen zwischen unterschiedlichen Teilen eines Programms können verschiedene Aufgaben unabhängig voneinander und in sich überschneidenden Zeitintervallen voranschreiten.

In Python sind sowohl asyncio als auch threading scheinbar nebenläufig

Flüchtig betrachtet vermitteln die Python-Bibliotheken asyncio und threading beide den Eindruck von Nebenläufigkeit. In Wirklichkeit führen Ihre CPUs jedoch nicht mehrere Aufgaben zur genau gleichen Zeit aus. Sie erwecken nur diesen Eindruck.

Stellen Sie sich vor, Sie laden Gäste zu einem mehrgängigen Abendessen ein. Die Zubereitung einiger dieser Gänge ist zeitaufwändig – der Kuchen braucht Zeit, um im Ofen braun zu werden, und die Suppe muss länger auf dem Herd köcheln. Während wir darauf warten, dass diese Gerichte gar sind, stehen wir nicht nur herum und warten. In der Zwischenzeit tun wir etwas anderes. Ähnlich ist es auch mit der Nebenläufigkeit in Python. Manchmal muss Ihr Python-Prozess darauf warten, dass etwas erledigt wird. Zum Beispiel werden einige Ein-/Ausgabeprozesse (E/A) vom Betriebssystem abgewickelt, und diese Zeit muss der Python-Prozess einfach abwarten. Während dieser Wartezeit können wir mit async einen anderen Python-Prozess ausführen.

Python-Multithreading und asyncio

Der Unterschied liegt darin, wer das Sagen hat

Wenn sowohl asyncio als auch threading nebenläufig erscheinen, was ist dann der eigentliche Unterschied zwischen ihnen? Nun, der Hauptunterschied besteht darin, wer entscheidet, welcher Prozess wann ausgeführt wird. Der Ansatz von async/await wird manchmal auch als kooperative Nebenläufigkeit bezeichnet. Eine Coroutine oder ein Future gibt die Kontrolle an eine andere Coroutine oder ein anderes Future ab, damit auch andere Programmteile zum Zug kommen. Bei Threading hingegen hat der betriebssystemeigene Manager die Kontrolle darüber, welcher Prozess gerade ausgeführt wird.

Kooperative Nebenläufigkeit ist wie ein Meeting, bei dem das Mikrofon herumgereicht wird, damit die Teilnehmenden alle sprechen können. Die Person, die das Mikrofon hat, kann sprechen, und wenn sie fertig ist, gibt sie das Mikrofon an die nächste Person weiter. Im Gegensatz dazu entspricht Multithreading einem Meeting, bei dem jemand den Vorsitz hat, und diese Person bestimmt, wer zu einem bestimmten Zeitpunkt das Wort hat.

Nebenläufige Programmierung in Python

Sehen wir uns an, wie Nebenläufigkeit in Python funktioniert, indem wir einige beispielhafte Codeabschnitte schreiben. Wir werden ein Fastfood-Restaurant simulieren und dabei sowohl asyncio als auch threading einsetzen.

So funktioniert async/await in Python

Das Paket asyncio wurde in Python 3.4 und die Schlüsselwörter async und await in Python 3.5 eingeführt. Einer der wichtigsten Aspekte, die async/await möglich machen, ist die Verwendung von Coroutinen. Coroutinen in Python sind in Wirklichkeit Generatoren, die so umfunktioniert wurden, dass sie anhalten und die Kontrolle an die Hauptfunktion zurückgeben können.

Nehmen wir jetzt an, in einem Burger-Restaurant arbeitet gerade nur eine Person. Die Aufträge werden in der Reihenfolge ihres Eingangs abgearbeitet, und es sind keine asynchronen Operationen möglich:

import time


def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    time.sleep(5) # time for making the burger
    print(f"Burger made #{order_num}")


def main():
    for i in range(3):
        make_burger(i)


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")

Dies wird eine Weile dauern:

Preparing burger #0...

Burger made #0

Preparing burger #1...

Burger made #1

Preparing burger #2...

Burger made #2

Orders completed in 15.01 seconds.

Nehmen wir jetzt an, das Restaurant stellt mehr Personal ein, sodass die Arbeit parallel erledigt werden kann:

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

Der Unterschied zwischen den beiden Szenarien ist deutlich:

Preparing burger #0...

Preparing burger #1...

Preparing burger #2...

Burger made #0

Burger made #1

Burger made #2

Orders completed in 5.00 seconds.

Mit den von asyncio bereitgestellten Funktionen wie run und gather sowie den Schlüsselwörtern async und await haben wir Coroutinen erstellt, die mehrere Burger parallel zubereiten können.

Wir gehen nun einen Schritt weiter und erstellen eine kompliziertere Simulation. Stellen Sie sich vor, wir haben nur zwei Arbeitskräfte und können nur zwei Burger auf einmal zubereiten.

import asyncio

import time

order_queue = asyncio.Queue()

def take_order():

  for i in range(3):

      order_queue.put_nowait(make_burger(i))

async def make_burger(order_num):

  print(f"Preparing burger #{order_num}...")

  await asyncio.sleep(5)  # time for making the burger

  print(f"Burger made #{order_num}")

class Staff:

  def __init__(self, name):

      self.name = name

  async def working(self):

      while order_queue.qsize() > 0:

          print(f"{self.name} is working...")

          task = await order_queue.get()

          await task

          print(f"{self.name} finished a task...")

async def main():

  staff1 = Staff(name="John")

  staff2 = Staff(name="Jane")

  take_order()

  await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

  s = time.perf_counter()

  asyncio.run(main())

  elapsed = time.perf_counter() - s

  print(f"Orders completed in {elapsed:0.2f} seconds.")

Wir speichern die Aufgaben in einer Warteschlange, und die Mitarbeitenden holen sie ab.

John is working...

Preparing burger #0...

Jane is working...

Preparing burger #1...

Burger made #0

John finished a task...

John is working...

Preparing burger #2...

Burger made #1

Jane finished a task...

Burger made #2

John finished a task...

Orders completed in 10.00 seconds.

In diesem Beispiel verwenden wir asyncio.Queue zum Speichern der Aufgaben – das ist insbesondere bei mehreren Aufgabenarten nützlich, wie wir sie im nächsten Beispiel verwenden werden.

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = input("Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = input("Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

In diesem Beispiel gibt es mehrere Aufgaben, darunter die Zubereitung von Pommes frites, die weniger Zeit in Anspruch nimmt, und das Aufnehmen von Bestellungen, also das Einholen von Benutzereingaben.

Beachten Sie, dass das Programm beim Warten auf Benutzereingaben anhält, und sogar die anderen Mitarbeitenden, die nicht mit dem Aufnehmen der Bestellung beschäftigt sind, stellen ihre Arbeit im Hintergrund ein. Das liegt daran, dass die Funktion input nicht asynchron ist und daher nicht await verwendet. Denken Sie daran: Die Kontrolle in asynchronem Code wird nur abgegeben, wenn await verwendet wird. Um dies zu beheben, ersetzen wir

input("Number of burgers:")

durch

await asyncio.to_thread(input, "Number of burgers:")

Das Gleiche tun wir auch bei der Pommes-Zubereitung – siehe den Code unten. Beachten Sie, dass das Programm jetzt in einer Endlosschleife läuft. Wenn wir es beenden wollen, können wir das Programm mit einer ungültigen Eingabe absichtlich zum Absturz bringen.

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = await asyncio.to_thread(input, "Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = await asyncio.to_thread(input, "Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

Durch die Verwendung von asyncio.to_thread haben wir die input-Funktion in einen separaten Thread ausgelagert (siehe diese Referenz). Beachten Sie jedoch: Wenn der Python-GIL vorhanden ist, funktioniert dieser Trick nur mit E/A-gebundenen Aufgaben.

Beim Ausführen des obigen Codes dürfte Ihnen außerdem auffallen, dass die Standard-Eingabe/Ausgabe im Terminal durcheinandergewürfelt ist. Die Benutzer-E/A sollte von der Protokollierung der Vorgänge getrennt werden. Wir können die Vorgänge in einem Protokoll speichern, um sie später zu untersuchen.

import asyncio

import logging

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename='pyburger.log', level=logging.INFO)

task_queue = asyncio.Queue()

order_num = 0

closing = False

async def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = await asyncio.to_thread(input, "Number of burgers:")

       for i in range(int(burger_num)):

           await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = await asyncio.to_thread(input, "Number of fries:")

       for i in range(int(fries_num)):

           await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       await task_queue.put(take_order())

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

async def make_burger(order_num):

   logger.info(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   logger.info(f"Burger made #{order_num}")

async def make_fries(order_num):

   logger.info(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   logger.info(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               logger.info(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               task_queue.task_done()

               logger.info(f"{self.name} finish task.")

           elif closing:

               return

           else:

               await asyncio.sleep(1) #rest

async def main():

   global task_queue

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   await asyncio.gather(staff1.working(), staff2.working())

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

In diesem letzten Codeblock haben wir die Infoausgaben zur Simulation in pyburger.log protokolliert und das Terminal für Mitteilungen an die Kundschaft reserviert. Außerdem fangen wir ungültige Eingaben während des Bestellvorgangs ab und setzen dann das closing-Flag auf True, in der Annahme, dass hierdurch das Programm beendet werden soll. Sobald das closing-Flag auf True gesetzt ist, kehrt der Worker mit return zurück, und damit wird auch die while-Endlosschleife der Coroutine beendet.

Wie funktioniert threading in Python?

Im obigen Beispiel haben wir eine E/A-gebundene Aufgabe in einen anderen Thread ausgelagert. Vielleicht fragen Sie sich, ob wir alle Aufgaben in separate Threads packen können, um sie parallel auszuführen. Versuchen wir, asyncio durch threading zu ersetzen.

Sehen Sie sich den unten gezeigten Code an, in dem wir Burger parallel zubereiten, ohne jegliche Einschränkungen vorzugeben:

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

```

Instead of creating async coroutines to make the burgers, we can just send functions down different threads like this:

```

import threading

import time

def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   time.sleep(5) # time for making the burger

   print(f"Burger made #{order_num}")

def main():

   order_queue = []

   for i in range(3):

       task = threading.Thread(target=make_burger, args=(i,))

       order_queue.append(task)

       task.start()

   for task in order_queue:

       task.join()

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

In der ersten for-Schleife in main werden Aufgaben in unterschiedlichen Threads erstellt und gestartet. Die zweite for-Schleife sorgt dafür, dass alle Burger fertig sind, bevor das Programm weitergeht (d. h. bevor es zu main zurückkehrt).

Wenn wir nur zwei Mitarbeitende haben, ist es komplizierter. Jede Arbeitskraft wird durch einen Thread repräsentiert, und sie entnehmen ihre Aufgaben aus einer normalen Liste, in der alle Aufgaben gespeichert sind.

import threading

import time

order_queue = []

def take_order():

   for i in range(3):

       order_queue.append(make_burger(i))

def make_burger(order_num):

   def making_burger():

       print(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       print(f"Burger made #{order_num}")

   return making_burger

def working():

     while len(order_queue) > 0:

         print(f"{threading.current_thread().name} is working...")

         task = order_queue.pop(0)

         task()

         print(f"{threading.current_thread().name} finish task...")

def main():

   take_order()

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

if __name__ == "__main__":

 s = time.perf_counter()

 main()

 elapsed = time.perf_counter() - s

 print(f"Orders completed in {elapsed:0.2f} seconds.")

Wenn Sie den obigen Code ausführen, kann in einem der Threads ein Fehler auftreten, wenn versucht wird, eine Aufgabe aus einer leeren Liste abzuholen. Vielleicht fragen Sie sich, warum dies passiert, wenn wir doch eine Bedingung in der while-Schleife haben, die bewirken soll, dass sie nur fortgesetzt wird, wenn task_queue nicht leer ist. Trotzdem ist es zum Fehler gekommen, weil eine „Wettlaufsituation“ aufgetreten ist – eine sogenannte Race Condition.

Race Conditions

Race Conditions können auftreten, wenn mehrere Threads gleichzeitig versuchen, auf dieselbe Ressource oder dieselben Daten zuzugreifen, und es dadurch zu Problemen im System kommt. Das Timing und die Reihenfolge der Zugriffe auf eine Ressource sind wichtig für die Programmlogik. Ein unvorhergesehener zeitlicher Ablauf oder der parallele Zugriff mehrerer Threads zum Ändern gemeinsam genutzter Daten kann Fehler verursachen.

Um Race Conditions in unserem Programm zu vermeiden, versehen wir task_queue mit einer Sperre:

queue_lock = threading.Lock()

Für unsere Arbeit müssen wir sicherstellen, dass wir Zugriffsrechte auf die Warteschlange haben, wenn wir ihre Länge überprüfen und Aufgaben aus ihr abrufen. Solange wir diese Rechte haben, können andere Threads nicht auf die Warteschlange zugreifen:

def working():

   while True:

       with queue_lock:

           if len(order_queue) == 0:

               return

           else:

               task = order_queue.pop(0)

       print(f"{threading.current_thread().name} is working...")

       task()

       print(f"{threading.current_thread().name} finish task...")

```

Based on what we have learned so far, we can complete our final code with threading like this:

```

import logging

import threading

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename="pyburger_threads.log", level=logging.INFO)

queue_lock = threading.Lock()

task_queue = []

order_num = 0

closing = False

def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = input("Number of burgers:")

       for i in range(int(burger_num)):

           with queue_lock:

               task_queue.append(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = input("Number of fries:")

       for i in range(int(fries_num)):

           with queue_lock:

               task_queue.append(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       with queue_lock:

           task_queue.append(take_order)

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

def make_burger(order_num):

   def making_burger():

       logger.info(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       logger.info(f"Burger made #{order_num}")

   return making_burger

def make_fries(order_num):

   def making_fries():

       logger.info(f"Preparing fried #{order_num}...")

       time.sleep(2)  # time for making fries

       logger.info(f"Fries made #{order_num}")

   return making_fries

def working():

   while True:

       with queue_lock:

           if len(task_queue) == 0:

               if closing:

                   return

               else:

                   task = None

           else:

               task = task_queue.pop(0)

       if task:

           logger.info(f"{threading.current_thread().name} is working...")

           task()

           logger.info(f"{threading.current_thread().name} finish task...")

       else:

           time.sleep(1)  # rest

def main():

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   task_queue.append(take_order)

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

Wenn Sie die beiden Codeblöcke mit asyncio bzw. threading vergleichen, sollten sie ähnliche Ergebnisse liefern. Vielleicht fragen Sie sich, welches der beiden Modelle besser ist und warum Sie das eine dem anderen vorziehen sollten.

In der Praxis ist das Schreiben von asyncio-Code einfacher als Multithreading, weil wir uns nicht selbst um potenzielle Race Conditions und Deadlocks kümmern müssen. Die Kontrolle wird standardmäßig von Coroutinen weitergegeben, sodass keine Sperren erforderlich sind. Allerdings können Python-Threads durchaus parallel ausgeführt werden – nur ist dies selten der Fall, wenn der GIL vorhanden ist. Wir werden darauf zurückkommen, wenn wir im nächsten Blogartikel auf die NoGIL-Version von Python mit freiem Threading eingehen.

Nebenläufigkeit sinnvoll einsetzen

Warum sollten wir bei der Programmierung auf Nebenläufigkeit setzen? Der Hauptgrund ist Geschwindigkeit. Wie wir oben gesehen haben, können Aufgaben schneller erledigt werden, wenn wir die Wartezeiten verkürzen können. Es gibt unterschiedliche Arten des Wartens bei Rechenvorgängen, davon abhängig verwenden wir unterschiedliche Methoden, um Zeit zu sparen.

E/A-gebundene Aufgaben

Eine Aufgabe oder ein Programm gilt als E/A-gebunden, wenn die Ausführungsgeschwindigkeit in erster Linie durch die Geschwindigkeit von Eingabe-/Ausgabe-Vorgänge begrenzt wird – etwa das Einlesen von Daten aus einer Datei oder einem Netzlaufwerk oder das Warten auf Benutzereingaben. E/A-Vorgänge sind meist langsamer als andere CPU-Operationen, und daher können Aufgaben, die zahlreiche solche Vorgänge enthalten, mit einem deutlichen Zeitaufschlag verbunden sein. Typische Beispiele für diese Aufgaben sind das Einlesen von Daten aus einer Datenbank, die Bearbeitung von Internetanfragen oder die Arbeit mit großen Dateien.

Nebenläufigkeit mit async/await kann dazu beitragen, die Wartezeit bei E/A-gebundenen Aufgaben zu optimieren, indem die Abfolge der Verarbeitungsschritte freigegeben wird und bei Wartezeiten andere Aufgaben erledigt werden können.

Die Verwendung von async/await ist in vielen Python-Anwendungen von Vorteil, z. B. in Web-Anwendungen, die häufig mit Datenbanken kommunizieren oder Web-Anfragen bearbeiten. Grafische Bedienoberflächen (GUIs) können ebenfalls von async/await profitieren, indem die Ausführung von Hintergrundaufgaben ermöglicht wird, während die Benutzer*innen mit der Anwendung interagieren.

CPU-gebundene Aufgaben

Eine Aufgabe oder ein Programm gilt als CPU-gebunden, wenn die Ausführungsgeschwindigkeit in erster Linie durch die Geschwindigkeit der CPU begrenzt ist. Typische Beispiele sind die Bild- oder Videobearbeitung, zum Beispiel Größenanpassungen oder andere Bildmanipulationen, sowie komplexe mathematische Berechnungen, etwa die Matrixmultiplikation oder das Modelltraining beim maschinellen Lernen.

Im Gegensatz zu E/A-gebundenen Aufgaben können CPU-gebundene Aufgaben nur selten durch async/await optimiert werden, da die CPU mit der Bearbeitung der Aufgaben bereits ausgelastet ist. Wenn Ihr System mehrere CPUs besitzt oder einige der Aufgaben an eine oder mehrere GPUs ausgelagert werden können, dann können CPU-gebundene Aufgaben schneller erledigt werden, indem mehrere Threads erstellt werden und Multiprocessing verwendet wird. Durch Multiprocessing kann die Auslastung dieser CPUs und GPUs optimiert werden. Dies ist auch der Grund, warum viele Machine-Learning-Systeme und KI-Modelle heutzutage auf Systemen mit vielen GPUs trainiert werden.

Mit reinem Python-Code ist dies jedoch schwer zu bewerkstelligen, da Python so konzipiert ist, dass den Benutzer*innen die Kontrolle über die grundlegenden Berechnungsprozesse abgenommen wird. Darüber hinaus schränkt der GIL die gemeinsame Nutzung von Ressourcen durch mehrere Python-Threads ein. Seit Python 3.13 ist es jedoch möglich, ohne GIL zu arbeiten, und dies ermöglicht echtes Multithreading. Der GIL – und die Möglichkeit, ohne ihn zu arbeiten – wird das Thema unseres nächsten Artikels sein.

Manchmal ist keine der oben genannten Methoden in der Lage, CPU-gebundene Aufgaben zufriedenstellend zu beschleunigen. In solchen Fällen müssen die CPU-gebundenen Aufgaben eventuell in kleinere Aufgaben gegliedert werden, damit sie auf mehrere Threads, mehrere Prozessoren oder gar mehrere Systeme aufgeteilt werden können. Für diese Art der Parallelverarbeitung müssen Sie Ihren Code allerdings möglicherweise komplett überarbeiten. In Python bietet das Paket multiprocessing sowohl lokale als auch Remote-Nebenläufigkeit, mit der die GIL-bedingten Einschränkungen umgangen werden können. Auch diesbezüglich wir werden uns im nächsten Blogartikel einige Beispiele ansehen.

Debuggen von nebenläufigem Code in PyCharm

Das Debuggen von asynchronem oder nebenläufigem Code kann schwierig sein, da die Programmausführung nicht sequenziell erfolgt, es ist also schwer zu erkennen, wo und wann der Code ausgeführt wird. Viele Entwickler*innen verwenden print, um den Codefluss nachzuvollziehen. Dieser Ansatz ist jedoch nicht zu empfehlen, da er sehr schwerfällig ist und die Untersuchung eines Programms, das Komplexitäten wie Nebenläufigkeit aufweist, damit nicht einfach ist. Außerdem ist es umständlich, den Code danach aufzuräumen.

Viele IDEs stellen Debugger bereit, die sich hervorragend zum Untersuchen von Variablen und des Programmablaufs eignen. Debugger stellen auch einen klaren Stack-Trace über mehrere Threads hinweg bereit. Sehen wir uns an, wie wir in PyCharm verfolgen können, was in unserer Restaurantsimulation mit task_queue geschieht.

Zunächst setzen wir einige Haltepunkte in unserem Code. Dazu können wir auf die Nummer der Zeile klicken, in der der Debugger die Ausführung anhalten soll. Die Zeilennummer wird durch einen roten Punkt ersetzt, um anzuzeigen, dass dort ein Haltepunkt gesetzt ist. Wir setzen Haltepunkte in den Zeilen 23, 27 und 65, wo task_queue in verschiedenen Threads geändert wird.

Dann können wir das Programm im Debug-Modus ausführen, indem wir oben rechts auf das kleine Käfersymbol klicken.

Nach Anklicken des Symbols öffnet sich das Debug-Fenster. Das Programm wird ausgeführt, bis es auf den ersten im Code markierten Haltepunkt trifft.

Hier sehen wir, dass der Thread John versucht, eine Aufgabe abzuholen, und Zeile 65 ist markiert. Zu diesem Zeitpunkt ist die markierte Zeile noch nicht ausgeführt worden. Dies ist nützlich, wenn wir die Variablen untersuchen wollen, bevor der Code am Haltepunkt ausgeführt wird.

Sehen wir uns an, was sich in task_queue befindet. Das lässt sich ganz einfach bewerkstelligen, indem wir im Debug-Fenster wie unten abgebildet mit der Eingabe des Variablennamens beginnen.

Wählen Sie „task_queue“ aus oder geben Sie es ein, und drücken Sie dann Enter. Sie werden sehen, dass sich die Aufgabe take_order in der Warteschlange befindet.

Führen Sie nun den Haltepunkt aus, indem Sie wie unten abgebildet auf die Schaltfläche Step in klicken.

Nachdem Sie auf diese Schaltfläche geklickt und das sich öffnende Fenster Special Variables untersucht haben, sehen Sie, dass die Aufgabenvariable jetzt take_order im Thread John ist.

Wenn Sie sich task_queue erneut ansehen, werden Sie feststellen, dass die Liste jetzt leer ist.

Klicken Sie nun auf die Schaltfläche Resume Program und lassen Sie das Programm weiterlaufen.

Wenn das Programm die Benutzereingabe erreicht, zeigt uns PyCharm das Console-Fenster an, um uns die Eingabe zu ermöglichen. Nehmen wir an, wir möchten zwei Burger. Wir geben 2 ein und drücken Enter.

Jetzt kommen wir zum zweiten Haltepunkt. Wenn wir auf Threads & Variables klicken, um zu diesem Fenster zurückzukehren, sehen wir, dass burger_num jetzt entsprechend unserer Eingabe den Wert 2 hat.

Wir führen nun mit einem Einzelschritt den Code am Haltepunkt aus und überprüfen wie zuvor die Variable task_queue. Wir sehen, dass eine Aufgabe make_burger hinzugefügt wurde.

Wir lassen das Programm erneut weiterlaufen, und wenn wir nach dem erneuten Anhalten den Haltepunkt mit einem Einzelschritt ausführen, sehen wir, dass Jane die Aufgabe abholt.

Den Rest des Codes können Sie nun eigenständig untersuchen. Wenn Sie fertig sind, klicken Sie einfach auf die rote Stop-Schaltfläche am oberen Fensterrand.

Mit dem Debugger in PyCharm können Sie die Ausführung Ihres Programms über verschiedene Threads hinweg verfolgen und sehr einfach unterschiedliche Variablen untersuchen.


Fazit

Jetzt kennen Sie die Grundlagen der Nebenläufigkeit in Python, und ich bin zuversichtlich, dass Sie das Thema mit etwas Übung meistern werden. Im nächsten Blogartikel werfen wir einen Blick auf den GIL in Python, die Rolle, die dieser spielt, und was sich ändert, wenn er nicht vorhanden ist.

PyCharm stellt leistungsstarke Werkzeuge für die Arbeit mit nebenläufigem Python-Code bereit. Wie in diesem Blogartikel gezeigt, ermöglicht der Debugger das schrittweise Untersuchen von asynchron oder in Threads ausgeführtem Code und hilft Ihnen damit, den Ausführungsfluss zu verfolgen, gemeinsame Ressourcen zu überwachen und Probleme zu erkennen. Mit intuitiven Haltepunkten, der Anzeige von Variableninhalten in Echtzeit, nahtloser Einbindung der Konsole für Benutzereingaben und einer robusten Protokollierungsunterstützung erleichtert Ihnen PyCharm das Schreiben, Testen und Debuggen von Anwendungen.

Autorin des ursprünglichen Blogposts

image description