ETC/Pattern

Producer-consumer pattern

seokhyun2 2020. 8. 2. 22:54

자연어처리에서는 거대한 언어 모델을 학습하기 위해 방대한 양의 학습 데이터를 사용합니다.

학습 데이터의 전처리도 성능에 많은 영향을 끼치면서 전처리를 많이 하고 있는데 학습 데이터의 양이 너무 크다보니 오히려 학습보다 전처리가 더 오래 걸리기도 하여 학습 데이터의 전처리에서는 병렬처리가 필수가 되고 있습니다.

 

그래서 오늘은 병렬처리를 할 때, 유용한 패턴을 하나 소개해보겠습니다.

병렬처리에 대해서 공부하게 되면, 꼭 빠지지 않고 등장하는 것 중에 하나가 producer-consumer pattern(프로듀서-컨슈머 패턴;생산자-소비자 패턴)입니다.

RabbitMQ, Kafka와 같은 메시징 큐를 보면 producer와 consumer는 절대 빠지지 않습니다.

producer와 consumer가 무엇인지 알아보기 전에 아래의 그림을 먼저 보도록 하겠습니다.

 

producer-consumer pattern은 위의 그림처럼 구성이 됩니다.

여러 개의 producer는 각각 동일한 작업을 해서 queue에 집어넣고, consumer에서는 queue에 있는 작업을 가져가서 각자 동일한 작업으로 수행합니다.

여기서 여러 개의 producer와 consumer들이 각각 동일한 작업을 수행하는 여러개의 쓰레드 혹은 프로세스로 구성이 되므로 병렬처리가 되는 것입니다.

 

예시로 살펴보면, 하나의 파일에 엄청나게 많은 문장이 적혀있고 우리는 그 문장들을 전처리를 수행하려고 합니다.

이 때, 한 문장씩 순차적으로 하는 것 보다는 동시에 여러 문장을 각각 전처리하면 훨씬 빠르게 수행할 수 있겠죠?

그래서 하나의 producer로 구성하여 producer는 파일을 읽어서 한 문장 씩 queue에 보내주고, 각 consumer에서는 queue에 들어있는 문장들을 꺼내서 전처리를 수행하도록 구성할 수 있습니다.

그러면 각 consumer는 동일한 작업을 동시에 처리하기 때문에 병렬처리를 쉽게 할 수 있는 구조가 됩니다.

 

자바에서는 멀티쓰레드를 활용하여 쉽게 구현할 수 있는데 파이썬의 경우에는 GIL(Global Interpreter Lock)이 존재하여 멀티 쓰레드보다는 멀티 프로세스로 구현을 해주어야 합니다. 

 

그럼 파이썬으로는 어떻게 producer-consumer pattern을 구현할 수 있는지 한 번 예제를 보도록 하겠습니다.

import time
import os
import random
from multiprocessing import Process, Queue, Lock
 
 
# Producer function that places data on the Queue
def producer(queue, lock, names):
    # Synchronize access to the console
    with lock:
        print('Starting producer => {}'.format(os.getpid()))
         
    # Place our names on the Queue
    for name in names:
        time.sleep(random.randint(0, 10))
        queue.put(name)
 
    # Synchronize access to the console
    with lock:
        print('Producer {} exiting...'.format(os.getpid()))
 
 
# The consumer function takes data off of the Queue
def consumer(queue, lock):
    # Synchronize access to the console
    with lock:
        print('Starting consumer => {}'.format(os.getpid()))
     
    # Run indefinitely
    while True:
        time.sleep(random.randint(0, 10))
         
        # If the queue is empty, queue.get() will block until the queue has data
        name = queue.get()
 
        # Synchronize access to the console
        with lock:
            print('{} got {}'.format(os.getpid(), name))
 
 
if __name__ == '__main__':
     
    # Some lists with our favorite characters
    names = [['Master Shake', 'Meatwad', 'Frylock', 'Carl'],
             ['Early', 'Rusty', 'Sheriff', 'Granny', 'Lil'],
             ['Rick', 'Morty', 'Jerry', 'Summer', 'Beth']]
 
    # Create the Queue object
    queue = Queue()
     
    # Create a lock object to synchronize resource access
    lock = Lock()
 
    producers = []
    consumers = []
 
    for n in names:
        # Create our producer processes by passing the producer function and it's arguments
        producers.append(Process(target=producer, args=(queue, lock, n)))
 
    # Create consumer processes
    for i in range(len(names) * 2):
        p = Process(target=consumer, args=(queue, lock))
         
        # This is critical! The consumer function has an infinite loop
        # Which means it will never exit unless we set daemon to true
        p.daemon = True
        consumers.append(p)
 
    # Start the producers and consumer
    # The Python VM will launch new independent processes for each Process object
    for p in producers:
        p.start()
 
    for c in consumers:
        c.start()
 
    # Like threading, we have a join() method that synchronizes our program
    for p in producers:
        p.join()
 
    print('Parent process exiting...')

위의 예제 소스코드에서 producer는 data를 queue에 넣는 역할을 하고, consumer는 queue에서 data를 꺼내도록 동작합니다.

그래서 producer는 names에 있는 3개의 리스트에 대하여 각 리스트를 queue에 하나씩 집어넣으면서 3개의 프로세스를 시작하도록 구성하였으며, consumer는 6개의 프로세스를 시작하도록 구성하였는데 consumer는 내부에 무한 루프로 구현이 되어 있어 프로세스가 꺼지지 않기 때문에 p.daemon = True를 활용하여 부모 프로세스가 꺼지는 시점에 consumer들의 프로세스가 같이 꺼지도록 구성하였습니다.

 

정리하면, producer는 생산자로 queue에 집어넣는 역할을 하고 consumer는 소비자로 queue에서 꺼내도록 하여, 동일한 작업을 하는 여러 개의 producer 혹은 consumer가 동시에 동작하도록 구성하여 병렬처리를 할 때 유용한 패턴입니다.