✨ multiprocessing이 필요한 이유
많은 개발자들이 하는 일 중 하나는 기다리기일 것이다. 최근 회사에 취업을 하고 나서 제일 먼저 맡은 일은 두개의 데이터베이스에 존재하는 sequence 정보들을 가지고 similarity를 구해 relation 데이터베이스를 만드는 것인데 거의 일주일 넘게 삽질을 하였다. (혼남의 연속..🙇🏻♀️) 각 데이터프레임의 sequence들을 cross mapping하여 계산해야하기 때문에 대략적으로 코드를 짜면 아래와 같다(sequence similarity 계산은 생략하고 sequence를 단순히 더하는 계산 코드임)
def calculate(first_df, second_df): results = [] for first_index, first_seq in first_df.values: for second_index, second_seq in second_df.values: results.append(first_seq+second_seq) return results
데이터가 적으면 돌려도 오래 걸릴 일이 없지만 나같은 경우는..3000 row의 데이터베이스와 8000 row의 데이터베이스라 대략 계산했을 때 약 14시간 실제로는 더 오래 걸릴 것이다. 개발자에게 이런 비효율적인 코드는 효율화의 욕구를 불러일으킬 것이다. 그래서 사용한게 multiprocessing module이다.
✨ multiprocessing 전 알아야 할 것
💡 CPU가 뭐지?
multiprocessing을 사용하기 전에 기본적으로 알아야할 것들이 있다. 난 이 부분을 몰라서 엄청 삽질했다😞 다른 언어는 잘 모르겠지만 python은 1개의 cpu 위에서 코드가 실행된다. cpu 많이 들어봤을거다. python에서 아래와 같이 코드를 입력하면 cpu 갯수가 나온다. cpu는 컴퓨터의 머리라고 생각하면 되는데 우리가 수식을 계산할 때 머리를 사용하는 것처럼 컴퓨터가 연산할 때 cpu를 사용한다. 위처럼 for문에 for문을 돌리는 경우는 cpu가 열심히 일하는 경우이다. multiprocessing에선 pool을 사용하면 된다.
💡 process는 뭔데?
multiprocessing documentation을 보면 multiprocessing에는 process와 pool이 있다. pool은 내가 위에서 말한 것처럼 cpu를 여러개 돌리기 위해 쓰는 것이고(cpu bound를 위한 것이라 할 수 있다.) process는 I/O bound를 위한 것이다. I/O bound가 뭔데? I/O bound는 input/output bound인데 예를 들면 이런거다. 내가 코딩을 하기 위해 pycharm을 켜놓는다. 그리고 옆에 구글링을 위해 크롬을 킨다. 그리고 오늘 배운 것을 적어놓기 위해 노션을 켜놓고 중간중간 쉴때 글을 쓰기 위해 velog를 켜놓는다. 나는 이제 코딩을 시작할 것이다. 먼저 코딩을 한다. 그러다 모르는게 생겼다. 그러면 바로 구글링을 한다. 그러다 알아냈으면 다시 코딩 그러다 지식을 터득했으면 노션에 따로 정리 이렇게 하나의 몸과 머리로 여러개를 조금씩 잘라서 동시에 실행하는 것을 I/O bound라고 한다.
import multiprocessing multiprocessing.cpu_count()#난 10개
cpu bound는 회사에서 10개의 작업을 진행하게 되었다면 직원 2명한테 5개씩 나눠서 일을 시킨다. 그러면 동시에 각자 작업을 진행하게 될거고 마지막에 병합하면 된다. 이 것을 cpu bound라고 한다. 그래서 결국 난 for문의 for문을 돌려서 연산을 해야하기 때문에 pool을 사용할 것이다.
✨ multiprocessing 사용법
우선 제일 먼저 해야하는 것은 각 cpu에 나눠줄 수 있게 데이터를 비슷한 양으로 쪼개는 것이다. 내가 사용할 cpu의 갯수를 10개라고 한다면 아래와 같이 데이터를 쪼갤 수 있다. 참고로 사용할 cpu를 multiprocessing에선 worker라고 부른다. second_df를 출력해보면 10개의 리스트로 되어있고 리스트 안에는 데이터프레임이 들어있다. 10개의 리스트 하나씩 10개의 worker들이 일하게 된다.
import numpy as np max_worker = 10 second_df = np.array_split(second_df, max_worker)
이 후에 pool을 실행해보자. 하나씩 설명하자면 먼저 Pool을 열어 map을 실행하는데 여기서 궁금한 점 한가지 partial이 뭐지? partial은 고정시켜주는 것이다. 결국 난 for문에 for문을 돌려야한다. 그러면 map에는 이런 쌍들이 들어가야 한다. first_df의 갯수가 3000, second_df의 갯수가 8000이라할 때 (first_seq1, second_seq1), (first_seq1, second_seq2), (first_seq1, second_seq3), …, (first_seq3000, second_seq7999), (first_seq3000, second_seq8000) 이렇게 cross mapping으로 들어가야한다. 이 부분을 더 효율화하는 것이 partial이다. first_df는 고정해주면 second_df가 반복하는 만큼 알아서 반복시켜준다. 일반적인 map함수와 비슷하게 들어가면 된다. 그러고 각 worker들이 열심히 계산한 것이 쪼개져서 results에 저장된다. 그래서 chain.from_iterable로 shape을 줄여준다.
with Pool(processes=max_worker) as pool: results = pool.map(partial(calculate, first_df), second_df) df = pd.DataFrame(list(chain.from_iterable(results))
모든 코드가 완성되었다. 코드를 합쳐보면 이렇게 된다.
import numpy as np import multiprocessing from functools import partial from itertools import chain max_worker = 10 def calculate(first_df, second_df): results = [] for first_index, first_seq in first_df.values: for second_index, second_seq in second_df.values: results.append(first_seq+second_seq) return results second_df = np.array_split(second_df, max_worker) with Pool(processes=max_worker) as pool: results = pool.map(partial(calculate, first_df), second_df) df = pd.DataFrame(list(chain.from_iterable(results))
추가로 tqdm도 사용할 수 있다. 내 코드에선 tqdm(first_df.values)로 chop하면 된다.
✨ 주의할 점
multiprocessing을 사용할 땐 꼭 아래의 코드와 함께 써야한다고 적혀있다. 써야하는 이유는 module을 import할 때 미리 실행되는 것을 방지하기 위해서이다. 자세한 내용은 구글링하면 많이 나온다.
if__name__ == '__main__'