이전 포스팅에선 병렬처리를 위해 multiprocessing을 사용했지만 요즘 들어서는 ray를 활용하고 있습니다. 왜 multiprocessing에서 ray로 갈아탔는지에 대한 이유와 ray 사용법에 대해 정리하였습니다.
🙋🏻♀️ multiprocessing이란?
multiprocessing은 프로세스 스포닝(process spawning)을 통해 여러 프로세서를 활용한다. 여기서 말하는 프로세스 스포닝에 대해 설명하자면 우선, 스포닝은 알을 낳는다란 의미이다. 즉 부모 프로세스가 자식 프로세스를 만드는 과정을 의미한다. Pool 객체를 통해 병렬 처리를 하며 비동기처리, 이터레이터 등의 차이에 따라 map, imap, map_async, imap_unordered 등의 메서드를 사용한다.
multiprocessing 라이브러리에서 프로세스 간에 객체를 전달할 때 pickle을 사용해 전달하는데 pickling이 가능한 객체(pickle-able)만 사용 가능하다. pickle은 파이썬 객체를 바이트 스트림으로 직렬화하고 다시 역직렬화 하여 객체를 재구성하는 방식이다. pickle 모듈 문서에선 pickling이 가능한 모듈을 다음과 같은 type이 있다고 한다. 이런 pickling 방식은 모든 프로세스가 데이터에 대한 복사본을 만들어야 하며, 큰 메모리를 할당하고 역직렬화에서 발생하는 오버헤드를 가질 수밖에 없다.
- None, true, and false
- Integers, long integers, floating point numbers, complex numbers
- Normal and Unicode strings
- Tuples, lists, sets, and dictionaries containing only picklable objects
- Functions defined at the top level of a module
- Built-in functions defined at the top level of a module
- Classes that are defined at the top level of a module
🤷🏻♀️ ray는 다른게 뭔데?
ray는 병렬 처리를 위한 오픈 소스 통합 프레임워크이다. 데이터 전처리 뿐만 아니라 분산 training, hyperparameter tuing, model serving에서도 사용할 수 있다. multiprocessing 라이브러리와 다르게 기존에 작성한 코드를 조금만 수정하면 병렬처리를 할 수 있고 메모리 누수 현상이 발생하지 않는다. ray는 직렬화 오버헤드가 적은 Apache Arrow를 사용하여 Zero-copy 직렬화를 수행한다. zero-copy 직렬화란 데이터를 복사하지 않고 직접 메모리에 접근하여 직렬화를 수행하기 때문에 복사 연산을 피할 수 있어 대규모 데이터 처리 작업에 유용하다. 즉, ray는 high-level에서 쉽게 분산 가능하며 병렬적으로 실행시킬 수 있는 시스템을 구축할 수 있다. 또한, 분산/병렬 시스템의 아래와 같은 요구사항들을 강력하고 쉬운 방법으로 풀 수 있도록 도와준다.
- 다수의 컴퓨터에서 동일한 코드로 실행시킬 수 있어야 한다.
- stateful하고 통신 가능한 Microservice 및 Actor를 구축할 수 있어야 한다.
- 기계 고장 및 시스템 고장을 훌륭하게 처리할 수 있어야 한다.
- 거대한 데이터와 수체 데이터를 효율적으로 처리할 수 있어야 한다.
🤔 ray 사용해보기
ray를 설치하고 간단히 사용해보자.
pip install ray
먼저, ray.init()으로 ray cluster를 실행한다. ray.init을 통해 dashboard 실행, cpu core 갯수를 설정할 수 있다. 병렬처리를 하고 싶은 함수 위에 ray.remote 데코레이터로 감싸준다. 데코레이터로 감싸준 함수는 remote(ObjectRef) 메서드를 호출할 수 있다. remote 메서드를 호출하면 Object(Future 객체) Ref(공유 메모리 주소)를 반환한다. ray.put()을 이용하면 데이터를 공유 메모리에 저장하여 복사본을 만들지 않고 모든 프로세스에서 접근할 수 있다. ray.get(ObjectRef)로 값을 반환받을 수 있다.
%%time import numpy as np import ray ray.init() arr = np.random.random(100000000) @ray.remote def mul(x): return x * x arr = ray.put(arr) result = ray.get(mul.remote(arr)) # CPU times: user 2.99 s, sys: 5.24 s, total: 8.22 s# Wall time: 12 s
ray를 다 사용했으면 ray.shutdown()을 이용해 프로세스를 종료해야한다.
ray.shutdown()
🤖 dashboard로 ray가 잘 실행되고 있는지 확인하기
먼저 아래와 같이 default로 설치해줘야한다.
pip install "ray[default]"
ray.init 시 parameter에 아래와 같이 추가해주면 0.0.0.0:8265로 dashboard를 확인할 수 있다.
ray.init(ignore_reinit_error=True, dashboard_host="0.0.0.0", dashboard_port=8265, include_dashboard=True)
cluster에서 cpu 사용량과 memory를 확인할 수 있고 jobs에서 task들을 확인할 수 있다.
간단히, cli로도 확인할 수 있는데 ray status를 입력하면 사용하고 있는 cpu core, memory를 확인할 수 있다.
ray status ======== Autoscaler status: 2023-04-14 15:08:54.505294 ======== Node status --------------------------------------------------------------- Healthy: 1 node_a97857866db864797421c265df4c57eb57b2c4025798091347b49a9c Pending: (no pending nodes) Recent failures: (no failures) Resources --------------------------------------------------------------- Usage: 16.0/16.0 CPU 0.00/4.369 GiB memory 0.00/2.000 GiB object_store_memory Demands: {'CPU': 1.0}: 65115+ pending tasks/actors
✏️ 마치며
이렇게 ray와 multiprocessing의 차이와 ray 사용법에 대해 다뤄봤다. multiprocessing을 사용할 때 스포닝 안되는 라이브러리를 활용했다가 이유가 뭔지 잘 이해하지 못했었는데 이번 포스팅을 작성하면서 이해할 수 있게 됐다. multiprocessing을 활용할 땐 어느정도 task가 진행되고 있는지 알 수 없어 불편했는데 ray는 보기 좋은 dashboard를 제공해줘서 일의 진척도도 쉽게 파악할 수 있을 것 같다. 문서에서는 머신러닝 모델을 학습할 때도 ray를 활용할 수 있다고 소개하고 있는데 이 부분도 좀 더 파고 들어보면 좋을 것 같다.