How-To's Tutorials Web Development

더 빠른 Python: async/await와 threading을 활용한 동시성

Read this post in other languages:

Python으로 코딩한 경험이 있다면, 특히 FastAPI나 discord.py 같은 프레임워크나 라이브러리를 사용해 본 적이 있다면 async/await 또는 asyncio를 사용해 보셨을 겁니다. “Python의 멀티스레딩은 진짜가 아니다”라는 말을 들어보셨을 수도 있고, Python의 유명한(혹은 악명 높은) GIL을 알고 계실 수도 있습니다. 이처럼 Python의 멀티스레딩에 회의적인 상황에서 async/await와 멀티스레딩이 특히 Python 프로그래밍 차원에서 실제로 어떻게 다른지 궁금하실 수 있습니다. 그렇다면 이 블로그 글은 바로 여러분을 위한 것입니다!

멀티스레딩이란?

프로그래밍에서 멀티스레딩이란 하나의 프로그램이 여러 개의 연속적인 작업(즉, 스레드)을 동시에 실행할 수 있는 능력을 의미합니다. 이러한 스레드는 단일 프로세서 코어에서 실행될 수도 있고, 여러 코어에 분산되어 실행될 수도 있습니다. 하지만 Python에서는 Global Interpreter Lock(GIL)의 제약 때문에 멀티스레딩이 단일 코어에서만 처리됩니다. 예외적으로 GIL을 제거한 nogil(스레드 프리라고도 함) Python도 있지만, 이에 대해서는 이 시리즈의 2부에서 다룰 예정입니다. 이 블로그 글에서는 GIL이 항상 있다고 전제하겠습니다.

동시성이란?

프로그래밍에서 동시성이란 컴퓨터가 한 번에 여러 작업을 수행하거나, 실제로는 단일 프로세서에서 여러 작업이 번갈아 실행되더라도 동시에 여러 작업을 수행하는 것처럼 보이는 상태를 의미합니다. 프로그램의 여러 구성 요소 간 리소스와 상호작용을 관리하여 서로 다른 작업이 독립적으로, 시간을 겹쳐 진행될 수 있게 합니다.

동시적으로 보이는 Python의 asynciothreading

대략적으로 말하자면, Python의 asynciothreading 라이브러리는 모두 동시성이 구현된 것처럼 보이도록 합니다. 하지만 실제로 CPU가 완전히 동시에 여러 작업을 처리하고 있는 것은 아닙니다. 단지 그렇게 보일 뿐이죠.

손님을 초대해 여러 디너 코스를 준비하고 있다고 상상해 보세요. 일부 요리는 조리하는 데 시간이 걸립니다. 예를 들어, 오븐에서 구워야 하는 파이나 가스레인지 위에서 천천히 끓여야 하는 수프가 그렇습니다. 이 요리가 완성되기를 기다리는 동안 가만히 서서 기다리기만 하지는 않습니다. 그 사이에 다른 일을 처리합니다. 이것이 바로 Python의 동시성과 유사합니다. Python 프로세스가 어떤 작업이 완료되기를 기다리기는 경우도 있습니다. 예를 들어, 입출력(I/O) 프로세스를 운영 체제가 처리하고 있으면 Python 프로세스가 그저 기다리고 있을 수 있습니다. 이때 async를 사용하면 기다리는 동안 다른 Python 프로세스를 실행할 수 있습니다.

Python의 멀티스레딩과 asyncio 비교

제어의 주체 차이

asynciothreading이 모두 동시적으로 보인다면 둘 사이에는 어떤 차이가 있을까요? 가장 큰 차이점은 언제 어떤 프로세스를 실행할지 결정하는 주체에 있습니다. async/await 방식은 때때로 협력형 동시성이라고 불립니다. 코루틴이나 퓨처는 제어권을 다른 코루틴이나 퓨처에 넘겨, 다른 프로세스가 실행되도록 합니다. 반면 threading에서는 어떤 프로세스를 실행할지 운영 체제의 관리자가 제어합니다.

협력형 동시성은 마이크를 돌려가며 발언하는 회의에 비유할 수 있습니다. 마이크를 가진 사람이 말을 하고, 더 이상 할 말이 없으면 다른 사람에게 마이크를 넘기는 것이죠. 반면, 멀티스레딩은 회의 사회자가 누가 발언할지를 지정해주는 방식입니다. 

Python으로 동시성 코드 작성해 보기

이제 Python에서 동시성이 실제로 어떻게 동작하는지를 확인해 보겠습니다. asynciothreading을 모두 사용하여 패스트푸드 음식점 시뮬레이션을 만들어 보겠습니다.

Python에서 async/await의 작동 방

asyncio 패키지는 Python 3.4에서 도입되었으며, asyncawait 키워드는 Python 3.5에서 도입되었습니다. async/await를 가능하게 하는 주요 요소 중 하나는 코루틴 사용입니다. Python의 코루틴은 일시 중지하고 메인 함수로 제어를 다시 넘길 수 있도록 재구성된 제너레이터입니다.

햄버거 가게에 근무 중인 직원이 한 명뿐이라고 가정해 보겠습니다. 주문은 선입선출 방식의 대기열로 처리되며, 비동기 작업은 수행할 수 없습니다.

import time


def make_burger(order_num):
    print(f"Preparing burger #{order_num}...")
    time.sleep(5) # time for making the burger
    print(f"Burger made #{order_num}")


def main():
    for i in range(3):
        make_burger(i)


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"Orders completed in {elapsed:0.2f} seconds.")

이 작업은 완료되는 데 시간이 좀 걸립니다.

Preparing burger #0...

Burger made #0

Preparing burger #1...

Burger made #1

Preparing burger #2...

Burger made #2

Orders completed in 15.01 seconds.

이제 가게에 직원이 더 늘어나서, 여러 작업을 동시에 처리할 수 있다고 가정해 보겠습니다.

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

두 방식의 차이가 뚜렷해집니다.

Preparing burger #0...

Preparing burger #1...

Preparing burger #2...

Burger made #0

Burger made #1

Burger made #2

Orders completed in 5.00 seconds.

asyncio에서 제공하는 rungather 같은 함수, 그리고 asyncawait 키워드를 사용하여 햄버거를 동시에 만들 수 있는 코루틴을 생성했습니다.

이제 한 걸음 더 나아가 좀 더 복잡한 시뮬레이션을 만들어 보겠습니다. 직원이 두 명뿐이라서 한 번에 햄버거를 두 개만 만들 수 있다고 가정해 봅시다.

import asyncio

import time

order_queue = asyncio.Queue()

def take_order():

  for i in range(3):

      order_queue.put_nowait(make_burger(i))

async def make_burger(order_num):

  print(f"Preparing burger #{order_num}...")

  await asyncio.sleep(5)  # time for making the burger

  print(f"Burger made #{order_num}")

class Staff:

  def __init__(self, name):

      self.name = name

  async def working(self):

      while order_queue.qsize() > 0:

          print(f"{self.name} is working...")

          task = await order_queue.get()

          await task

          print(f"{self.name} finished a task...")

async def main():

  staff1 = Staff(name="John")

  staff2 = Staff(name="Jane")

  take_order()

  await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

  s = time.perf_counter()

  asyncio.run(main())

  elapsed = time.perf_counter() - s

  print(f"Orders completed in {elapsed:0.2f} seconds.")

대기열을 사용해 작업을 보류시키면 직원들이 작업을 하나씩 가져가 처리하게 됩니다.

John is working...

Preparing burger #0...

Jane is working...

Preparing burger #1...

Burger made #0

John finished a task...

John is working...

Preparing burger #2...

Burger made #1

Jane finished a task...

Burger made #2

John finished a task...

Orders completed in 10.00 seconds.

이 예시에서는 asyncio.Queue를 작업 저장용으로 사용하지만, 다음 예시처럼 여러 종류의 작업이 있는 경우에 더 다양하게 활용할 수 있습니다.

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = input("Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = input("Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

이 예시에는 여러 개의 작업이 있으며, 감자튀김 만들기처럼 시간이 적게 걸리는 일도 있고, 주문 받기처럼 사용자의 입력이 필요한 일도 포함되어 있습니다. 

주목할 점은 프로그램이 사용자 입력을 기다리면서 멈추고, 주문을 받지 않는 다른 직원까지도 뒤에서 작업을 멈춘다는 것입니다. 이는 input 함수가 비동기가 아니기 때문에 await할 수 없어서입니다. 비동기 코드에서는 await될 때에만 제어권이 해제된다는 점을 기억하세요. 이 문제를 해결하려면 다음 코드를

input("Number of burgers:")

다음으로 바꿉니다. 

await asyncio.to_thread(input, "Number of burgers:")

감자튀김에도 똑같이 적용합니다. 아래 코드를 참고하세요. 현재 프로그램은 무한 루프로 실행되므로 중지하려면 일부러 잘못 입력하여 프로그램을 종료할 수 있습니다.

import asyncio

import time

task_queue = asyncio.Queue()

order_num = 0

async def take_order():

   global order_num

   order_num += 1

   print(f"Order burger and fries for order #{order_num:04d}:")

   burger_num = await asyncio.to_thread(input, "Number of burgers:")

   for i in range(int(burger_num)):

       await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

   fries_num = await asyncio.to_thread(input, "Number of fries:")

   for i in range(int(fries_num)):

       await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

   print(f"Order #{order_num:04d} queued.")

   await task_queue.put(take_order())

async def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   print(f"Burger made #{order_num}")

async def make_fries(order_num):

   print(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   print(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               print(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               print(f"{self.name} finish task...")

           else:

               await asyncio.sleep(1) #rest

async def main():

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   await asyncio.gather(staff1.working(), staff2.working())

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

asyncio.to_thread를 사용하여 input 함수를 별도의 스레드에 배치했습니다(이 문서 참조). 단, 이 방식은 Python GIL이 있을 때에만 I/O 바운드 작업을 블로킹 해제할 수 있다는 점에 유의해야 합니다.

위 코드를 실행해 보면, 터미널의 표준 I/O가 뒤섞여 출력되는 것을 볼 수 있습니다. 사용자 I/O와 프로그램 실행 기록은 분리되어야 합니다. 실행 기록은 로그로 저장해 두었다가 나중에 확인할 수 있습니다. 

import asyncio

import logging

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename='pyburger.log', level=logging.INFO)

task_queue = asyncio.Queue()

order_num = 0

closing = False

async def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = await asyncio.to_thread(input, "Number of burgers:")

       for i in range(int(burger_num)):

           await task_queue.put(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = await asyncio.to_thread(input, "Number of fries:")

       for i in range(int(fries_num)):

           await task_queue.put(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       await task_queue.put(take_order())

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

async def make_burger(order_num):

   logger.info(f"Preparing burger #{order_num}...")

   await asyncio.sleep(5)  # time for making the burger

   logger.info(f"Burger made #{order_num}")

async def make_fries(order_num):

   logger.info(f"Preparing fries #{order_num}...")

   await asyncio.sleep(2)  # time for making fries

   logger.info(f"Fries made #{order_num}")

class Staff:

   def __init__(self, name):

       self.name = name

   async def working(self):

       while True:

           if task_queue.qsize() > 0:

               logger.info(f"{self.name} is working...")

               task = await task_queue.get()

               await task

               task_queue.task_done()

               logger.info(f"{self.name} finish task.")

           elif closing:

               return

           else:

               await asyncio.sleep(1) #rest

async def main():

   global task_queue

   task_queue.put_nowait(take_order())

   staff1 = Staff(name="John")

   staff2 = Staff(name="Jane")

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   await asyncio.gather(staff1.working(), staff2.working())

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   asyncio.run(main())

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

마지막 코드 블록에서는 시뮬레이션 정보를 pyburger.log에 기록하고, 터미널은 고객 메시지용으로만 두었습니다. 또한 주문 과정 중에 잘못된 입력이 발생하면 이를 포착하여, 사용자가 종료를 원한다고 가정하고 closing 플래그를 True로 전환합니다. closing 플래그가 True로 설정되면, 직원은 return을 실행하여 코루틴의 무한 while 루프를 종료합니다.

Python에서 threading의 작동 방식

위의 예시에서는 I/O 바운드 작업을 별도의 스레드에 배치했습니다. 그런데 모든 작업을 각각의 스레드에 넣고 동시에 실행되도록 만들 수는 없을까요? 이번에는 asyncio 대신 threading을 사용해 보겠습니다.

아래에 나와 있는 코드처럼 아무런 제한 없이 햄버거를 동시에 만드는 구조를 생각해 봅시다.

import asyncio

import time

async def make_burger(order_num):

    print(f"Preparing burger #{order_num}...")

    await asyncio.sleep(5) # time for making the burger

    print(f"Burger made #{order_num}")

async def main():

    order_queue = []

    for i in range(3):

        order_queue.append(make_burger(i))

    await asyncio.gather(*(order_queue))

if __name__ == "__main__":

    s = time.perf_counter()

    asyncio.run(main())

    elapsed = time.perf_counter() - s

    print(f"Orders completed in {elapsed:0.2f} seconds.")

```

Instead of creating async coroutines to make the burgers, we can just send functions down different threads like this:

```

import threading

import time

def make_burger(order_num):

   print(f"Preparing burger #{order_num}...")

   time.sleep(5) # time for making the burger

   print(f"Burger made #{order_num}")

def main():

   order_queue = []

   for i in range(3):

       task = threading.Thread(target=make_burger, args=(i,))

       order_queue.append(task)

       task.start()

   for task in order_queue:

       task.join()

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   print(f"Orders completed in {elapsed:0.2f} seconds.")

main 함수의 첫 번째 for 루프에서는, 각 작업이 서로 다른 스레드에서 생성되어 실행되기 시작합니다. 두 번째 for 루프는 프로그램이 그 다음 단계로 넘어가기 전에(즉, main으로 돌아가기 전에) 모든 햄버거가 제작 완료되도록 합니다.

직원이 두 명뿐일 경우에는 상황이 더 복잡해집니다. 직원 각각은 하나의 스레드로 표현되며, 모든 작업이 저장된 일반 목록에서 작업을 가져갑니다.

import threading

import time

order_queue = []

def take_order():

   for i in range(3):

       order_queue.append(make_burger(i))

def make_burger(order_num):

   def making_burger():

       print(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       print(f"Burger made #{order_num}")

   return making_burger

def working():

     while len(order_queue) > 0:

         print(f"{threading.current_thread().name} is working...")

         task = order_queue.pop(0)

         task()

         print(f"{threading.current_thread().name} finish task...")

def main():

   take_order()

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

if __name__ == "__main__":

 s = time.perf_counter()

 main()

 elapsed = time.perf_counter() - s

 print(f"Orders completed in {elapsed:0.2f} seconds.")

위 코드를 실행하면 빈 목록에서 작업을 가져오려 한다는 메시지와 함께 오류가 한 스레드에서 발생할 수 있습니다. task_queue가 비어 있지 않을 때만 while 루프가 계속 실행되도록 조건을 걸어두었는데 왜 이런 오류가 발생하는지 궁금하실 수 있습니다. 그 이유는 경쟁 조건이 발생했기 때문입니다.

경쟁 조건

경쟁 조건은 여러 스레드가 동시에 동일한 리소스나 데이터에 액세스하려고 할 때 발생하며 시스템에 문제를 일으킬 수 있습니다. 리소스에 접근하는 타이밍과 순서는 프로그램 로직에 중요하며, 여러 스레드가 예측할 수 없는 타이밍에 또는 서로 겹쳐서 공유 데이터에 액세스하거나 이를 수정하면 오류가 발생할 수 있습니다.

프로그램에서 발생하는 경쟁 조건을 해결하기 위해 task_queue에 lock을 넣겠습니다.

queue_lock = threading.Lock()

작업을 수행하려면, 대기열의 길이를 확인하거나 작업을 가져올 때 해당 대기열에 액세스할 수 있어야 합니다. 한 스레드에 액세스 권한이 있는 동안에는 다른 스레드가 해당 대기열에 액세스할 수 없습니다.

def working():

   while True:

       with queue_lock:

           if len(order_queue) == 0:

               return

           else:

               task = order_queue.pop(0)

       print(f"{threading.current_thread().name} is working...")

       task()

       print(f"{threading.current_thread().name} finish task...")

```

Based on what we have learned so far, we can complete our final code with threading like this:

```

import logging

import threading

import time

logger = logging.getLogger(__name__)

logging.basicConfig(filename="pyburger_threads.log", level=logging.INFO)

queue_lock = threading.Lock()

task_queue = []

order_num = 0

closing = False

def take_order():

   global order_num, closing

   try:

       order_num += 1

       logger.info(f"Taking Order #{order_num:04d}...")

       print(f"Order burger and fries for order #{order_num:04d}:")

       burger_num = input("Number of burgers:")

       for i in range(int(burger_num)):

           with queue_lock:

               task_queue.append(make_burger(f"{order_num:04d}-burger{i:02d}"))

       fries_num = input("Number of fries:")

       for i in range(int(fries_num)):

           with queue_lock:

               task_queue.append(make_fries(f"{order_num:04d}-fries{i:02d}"))

       logger.info(f"Order #{order_num:04d} queued.")

       print(f"Order #{order_num:04d} queued, please wait.")

       with queue_lock:

           task_queue.append(take_order)

   except ValueError:

       print("Goodbye!")

       logger.info("Closing down... stop taking orders and finish all tasks.")

       closing = True

def make_burger(order_num):

   def making_burger():

       logger.info(f"Preparing burger #{order_num}...")

       time.sleep(5)  # time for making the burger

       logger.info(f"Burger made #{order_num}")

   return making_burger

def make_fries(order_num):

   def making_fries():

       logger.info(f"Preparing fried #{order_num}...")

       time.sleep(2)  # time for making fries

       logger.info(f"Fries made #{order_num}")

   return making_fries

def working():

   while True:

       with queue_lock:

           if len(task_queue) == 0:

               if closing:

                   return

               else:

                   task = None

           else:

               task = task_queue.pop(0)

       if task:

           logger.info(f"{threading.current_thread().name} is working...")

           task()

           logger.info(f"{threading.current_thread().name} finish task...")

       else:

           time.sleep(1)  # rest

def main():

   print("Welcome to Pyburger!")

   logger.info("Ready for business!")

   task_queue.append(take_order)

   staff1 = threading.Thread(target=working, name="John")

   staff1.start()

   staff2 = threading.Thread(target=working, name="Jane")

   staff2.start()

   staff1.join()

   staff2.join()

   logger.info("All tasks finished. Closing now.")

if __name__ == "__main__":

   s = time.perf_counter()

   main()

   elapsed = time.perf_counter() - s

   logger.info(f"Orders completed in {elapsed:0.2f} seconds.")

asynciothreading을 사용한 두 코드 스니펫을 비교해 보면 결과는 비슷합니다. 그러면 더 나은 것은 무엇이고, 둘 중 하나를 선택해야 하는 이유는 무엇일까요?

실용적인 관점에서 보면 asyncio 코드를 작성하는 것이 멀티스레딩보다 더 쉽습니다. 경쟁 조건이나 교착 상태 같은 문제를 신경 쓸 필요가 없기 때문입니다. 제어가 기본적으로 코루틴 간에 전달되기 때문에 lock이 필요 없습니다. 반면, Python 스레드는 병렬 실행이 가능하지만, GIL이 있는 한 대부분의 경우에는 실제로 병렬로 실행되지 않습니다. 이에 대해서는 다음 블로그 글에서 nogil(스레드 프리) Python을 다룰 때 다시 이야기하겠습니다.

동시성의 이점

프로그래밍에서 왜 동시성을 사용해야 할까요? 바로 속도 때문입니다. 앞서 살펴본 것처럼 대기 시간을 줄이면 전체 작업을 더 빠르게 끝낼 수 있습니다. 컴퓨팅에는 다양한 형태의 대기 시간이 있으며, 각각의 경우에 따라 시간을 절약하기 위해 서로 다른 방법을 사용합니다.

I/O 바운드 작업

작업 또는 프로그램이 I/O 바운드라고 할 수 있는 경우는 해당 작업의 실행 속도가 기본적으로 파일이나 네트워크에서 읽어오거나 사용자 입력을 대기하는 등의 입출력을 처리하는 속도로 제한되는 경우입니다. I/O 처리는 일반적으로 CPU 처리보다 느리므로 I/O가 많은 작업은 상당히 오래 걸릴 수 있습니다. 이러한 작업의 대표적인 예로는 데이터베이스에서 데이터 읽어오기, 웹 요청 처리, 대용량 파일 작업 등이 있습니다.

async/await 동시성을 사용하면 처리 흐름을 블로킹 해제하고 대기하는 동안 다른 작업이 처리되도록 하여, I/O 바운드 작업 중 발생하는 대기 시간을 최적화할 수 있습니다.

async/await 동시성은 데이터베이스와의 통신이나 웹 요청 처리가 많은 웹 애플리케이션 등의 여러 Python 애플리케이션에서 유용하게 활용됩니다. GUI(그래픽 사용자 인터페이스)에서도 async/await 동시성을 활용하면 사용자가 애플리케이션과 상호작용하는 동안 백그라운드 작업이 실행되도록 할 수 있습니다.

CPU 바운드 작업

작업이나 프로그램이 CPU 바운드라고 할 수 있는 경우는 실행 속도가 기본적으로 CPU의 처리 속도로 제한될 때입니다. 대표적인 예로는 이미지 또는 동영상 처리(예: 크기 변경, 편집)와 행렬 곱셈이나 머신러닝 모델 트레이닝과 같은 복잡한 수치 계산 작업이 있습니다.

I/O 바운드 작업과 달리, CPU 바운드 작업은 CPU가 이미 해당 작업을 처리하는 데 사용되고 있기 때문에 async/await 동시성으로는 거의 최적화할 수 없습니다. 시스템에 CPU가 두 개 이상 있거나 일부 작업을 하나 이상의 GPU로 오프로드할 수 있다면, 스레드를 늘리고 멀티프로세싱을 수행하여 CPU 바운드 작업을 더 빠르게 완료할 수 있습니다. 멀티프로세싱은 CPU와 GPU의 활용을 최적화할 수 있으며, 이는 요즘 많은 머신러닝 및 AI 모델이 여러 GPU에서 트레이닝되는 이유이기도 합니다.

그러나 이는 순수 Python 코드만으로는 수행하기 어렵습니다. Python 자체가 사용자가 저수준 컴퓨팅 프로세스를 직접 제어하지 않아도 되도록 추상화된 계층 구조로 설계되었기 때문입니다. 게다가 Python의 GIL은 하나의 컴퓨터에서 여러 스레드가 Python 리소스를 공유하는 것을 제한합니다. 최근 Python 3.13에서는 GIL을 제거할 수 있는 기능이 도입되어 진정한 멀티스레딩이 가능해졌습니다. GIL과 이를 제거하고 실행하는 방법에 대해서는 다음 블로그 글에서 자세히 다루겠습니다.

경우에 따라 위에서 소개한 방법으로도 CPU 바운드 작업의 속도를 충분히 끌어올리지 못할 수 있습니다. 그럴 경우, CPU 바운드 작업을 더 세부적으로 나눈 뒤, 여러 스레드, 여러 프로세서 또는 여러 대의 컴퓨터에 분산해 동시에 처리해야 할 수도 있습니다. 이는 병렬 처리에 해당하며, 이를 구현하려면 코드를 처음부터 다시 작성해야 할 수도 있습니다. Python에서는 multiprocessing 패키지를 통해 로컬 및 원격 동시성을 모두 구현할 수 있으며, 이를 활용하면 GIL의 제약을 우회할 수 있습니다. 이와 관련된 예시는 다음 블로그 글에서 함께 살펴보겠습니다.

PyCharm의 동시성 코드 디버그

비동기 또는 동시성 코드는 프로그램이 순차적으로 실행되지 않기 때문에 언제 어디에서 코드가 실행되고 있는지 파악하기 어려워서 디버그하기가 까다롭습니다. 많은 개발자들이 코드 흐름을 추적하기 위해 print를 사용하지만, 이 방법은 매우 까다로울 뿐만 아니라 동시성처럼 복잡한 프로그램을 조사할 때 사용하기 쉽지 않기 때문에 권장되지 않습니다. 게다가 지저분해서 나중에 정리해야 합니다.

대부분의 IDE는 프로그램의 변수와 흐름을 검사하는 데 적합한 디버거를 제공합니다. 또한 디버거는 여러 스레드에 걸쳐 명확한 스택 추적을 수행합니다. PyCharm으로 음식점을 시뮬레이션한 이 예시에서 task_queue를 어떻게 추적할 수 있는지 살펴보겠습니다.

먼저 코드에 몇 개의 중단점을 설정하겠습니다. 디버거를 일시 중지시킬 위치의 줄 번호를 클릭하면 중단점을 설정할 수 있습니다. 그러면 중단점이 설정되었음을 나타내는 빨간 점이 줄 번호에 표시됩니다. task_queue가 서로 다른 스레드에서 변경되는 위치인 23번, 27번, 65번 줄에 중단점을 설정하겠습니다.

그런 다음, 오른쪽 상단에 있는 작은 벌레 모양 아이콘을 클릭하면 디버그 모드로 프로그램을 실행할 수 있습니다.

아이콘을 클릭하면 Debug(디버그) 창이 열립니다. 프로그램은 코드에서 강조 표시된 첫 번째 중단점에 도달할 때까지 실행됩니다.

여기서는 John 스레드가 작업을 가져가려는 중이며, 65번 줄이 강조 표시되어 있는 것을 확인할 수 있습니다. 이 시점에는 강조 표시된 줄이 아직 실행되지 않았습니다. 이 방법은 중단점에 진입하기 전에 변수를 검사하고 싶을 때 유용합니다.

task_queue에 어떤 값이 들어 있는지 확인해 보겠습니다. 아래와 같이 Debug(디버그) 창에 입력을 시작하면 간단하게 확인할 수 있습니다.

“task_queue”를 선택하거나 직접 입력한 뒤 Enter를 누르세요. take_order 작업이 대기열에 들어 있는 것을 확인할 수 있습니다.

이제 아래와 같이 Step in(스텝인) 버튼을 클릭하여 중단점을 실행해 보겠습니다.

해당 버튼을 누른 뒤 나타나는 Special Variables(특수 변수) 창을 보면, John 스레드에서 task 변수가 take_order인 것을 확인할 수 있습니다.

task_queue를 다시 검색해 보면, 이제 목록이 비어 있음을 알 수 있습니다.

이제 Resume Program(프로그램 재개) 버튼을 클릭하여 프로그램을 계속 실행해 보겠습니다.

프로그램이 사용자 입력 부분에 도달하면, PyCharm이 Console(콘솔) 창을 표시하여 사용자가 입력을 제공할 수 있도록 합니다. 예를 들어, 햄버거 두 개를 주문한다고 가정해 보겠습니다. “2”를 입력한 다음 Enter를 누르세요.

이제 두 번째 중단점에 도달했습니다. Threads & Variables(스레드 및 변수) 창으로 다시 돌아가면, burger_num 값이 우리가 입력한 대로 2로 설정되어 있는 것을 확인할 수 있습니다.

이제 이전과 마찬가지로 중단점을 따라 들어가 task_queue를 확인해 보겠습니다. make_burger 작업이 하나 추가된 것을 확인할 수 있습니다.

프로그램을 다시 실행한 뒤, 중단 시점에서 중단점에 들어가 보면 Jane이 작업을 가져가고 있는 것을 확인할 수 있습니다.

나머지 코드는 직접 살펴보시기 바랍니다. 작업이 끝나면, 창 상단에 있는 빨간색 Stop(중지) 버튼을 누르면 됩니다.

PyCharm의 디버거를 사용하면 여러 스레드에 걸쳐 프로그램의 실행을 따라가고, 다양한 변수를 매우 쉽게 검사할 수 있습니다.


결론

지금까지 Python의 동시성에 대해 기본적인 내용을 살펴보았습니다. 이를 연습하여 완벽하게 다루게 되시길 바랍니다. 다음 블로그 글에서는 Python GIL에 대해 알아보고, 그 역할과 GIL이 없을 때 달라지는 점도 함께 살펴보겠습니다.

PyCharm은 동시성 Python 코드 작업에 강력한 도구를 제공합니다. 이 블로그에서 소개한 것처럼 PyCharm의 디버거를 사용하면 비동기 코드와 스레드 코드 모두를 단계별로 검사하고, 실행 흐름을 추적하고, 공유 리소스를 모니터링하며, 문제를 빠르게 탐지할 수 있습니다. PyCharm은 직관적인 중단점, 실시간 변수 뷰, 사용자 입력을 위한 매끄러운 콘솔 통합, 강력한 로깅 지원을 통해 애플리케이션을 자신 있고 명확하고 쉽게 작성 및 테스트하고, 디버그할 수 있도록 돕습니다.

 
 
게시물 원문 작성자
Cheuk Ting Ho

Cheuk Ting Ho

image description

Discover more