Apache 기반 Airflow 기본기 다지기
💨

Apache 기반 Airflow 기본기 다지기

Tags
Data
Airflow
Published
May 23, 2024
Author
유레미 EUREMI

개요

주요 용어

  • DAG: Directed Acyclic Graph(대그), 방향성 비순 환 그래프
    • DAG 파일은 파이썬으로 이루어져있음
  • backfilling(백필): DAG의 과거 시점을 지정해 실행하는 프로세스
  • 태스크
  • 오퍼레이터

그래프 기반 표현의 특성

  • 독립적인 태스크의 경우 태스크를 병렬로 실행할 수 있음
    • 날씨 예보 가져오기, 판매 데이터 가져오기는 독립적임
  • 모놀리식(단일) 스크립트가 X, 점진적인 태스크로 명확하게 분리할 수 있음
    • 중간에 실패하면 실패한 태스크만 재실행하면 됨

주요 구성 요소

  • Airflow 스케줄러: 현재 시점에서 DAG의 스케줄이 지난 경우 Airflow 워커에 DAG의 태스크를 예약함
  • Airflow 워커: 예약된 태스크를 선택하고 실행함
  • Airflow 웹 서버: 스케줄러에서 분석한 DAG를 시각화하고 DAG 실행과 결과를 확인할 수 있는 인터페이스를 제공함

Airflow의 장점

  • bash 쉘과 python을 동시에 활용할 수 있다.
  • 앞 태스크가 실패했을 경우, 뒷 태스크는 실행되지 않으며 실패한 태스크만 재실행할 수 있다.

튜토리얼

시작을 해보자! 3개의 태스크로 구성되어있는 파이프라인을 만드려고 한다. 먼저, curl을 이용해 로켓의 발사 정보를 가져올 수 있는 api를 이용해서 launches.json 파일을 받아온다. 그다음 launches.json 파일 안에 image url을 파이썬을 사용해 다운 받는다. 마지막으로 이미지 갯수를 확인하는 bash shell 명령어를 입력한다.
아래 코드는 Apache Airflow 기반의 데이터 파이프라인에서 발췌했으며 도커 이미지가 없어 다른 이미지로 변경하였다.
# download_rocket_launches.pyimport json import pathlib import airflow.utils.dates import requests import requests.exceptions as requests_exceptions from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator dag = DAG( dag_id="download_rocket_launches", description="Download rocket pictures of recently launched rockets.", start_date=airflow.utils.dates.days_ago(14), schedule_interval="@daily", ) download_launches = BashOperator( task_id="download_launches", bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'",# noqa: E501 dag=dag, ) def _get_pictures(): # Ensure directory exists pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True) # Download all pictures in launches.jsonwith open("/tmp/launches.json") as f: launches = json.load(f) image_urls = [launch["image"] for launch in launches["results"]] for image_url in image_urls: try: response = requests.get(image_url) image_filename = image_url.split("/")[-1] target_file = f"/tmp/images/{image_filename}" with open(target_file, "wb") as f: f.write(response.content) print(f"Downloaded {image_url} to {target_file}") except requests_exceptions.MissingSchema: print(f"{image_url} appears to be an invalid URL.") except requests_exceptions.ConnectionError: print(f"Could not connect to {image_url}.") get_pictures = PythonOperator( task_id="get_pictures", python_callable=_get_pictures, dag=dag ) notify = BashOperator( task_id="notify", bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."', dag=dag, ) download_launches >> get_pictures >> notify
airflow를 실행하기 위해서 docker를 사용한다. docker를 실행했을때 로그가 출력되는 걸 확인할 수 있다.
docker run \\ -ti \\ -p 8080:8080 \\ -v {download_rocket_launches.py 파일 위치}:/opt/airflow/dags/download_rocket_launches.py \\ --name airflow \\ --entrypoint=/bin/bash \\ apache/airflow:slim-latest-python3.8 \\ -c '( \\ airflow db init && \\ airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org \\ ); \\ airflow webserver & \\ airflow scheduler \\ '
성공적으로 실행됐다면 http://localhost:8080으로 들어가보자
왼쪽 토글을 오른쪽으로 옮기고 재생을 누르면 실행된다!

스케줄

기본적인 스케줄 간격, cron 기반의 스케줄 간격, 빈도 기반의 스케줄 간격이 존재한다.
가장 먼저 기본적인 스케줄 간격을 알아보자. schedule_interval을 @daily로 한다면 실행은 매일 자정이다. 예를 들어 2019-01-01에 DAG를 실행하면 1월 2일 자정에 처음으로 실행된다. start_date가 2019-01-01, end date가 2019-01-05라면 아래와 같이 다섯번 실행된다.
  • 2019-01-02: 최초 실행
  • 2019-01-03: 두번째 실행
  • 2019-01-04: 세번째 실행
  • 2019-01-05: 네번째 실행
  • 2019-01-06: 다섯번째 실행
cron 기반의 스케줄 간격은 5개의 구성 요소가 있고 맨 앞 순서부터 분(0~59), 시간(0~23), 일(1~31), 월(1~12), 요일(0~6)으로 되어있다. *는 애스터리스크로 제한되지 않은 필드로 정의한다.
  • 0 0 * * 0이라면 일요일 자정에 실행
  • 2 13 2 * *이라면 매월 2일 13시 2분에 실행
  • 0 0,12 * * MON-FRI라면 매월 월-금 자정, 정오에 실행
빈도 기반의 스케줄 간격은 cron의 제약을 해결할 수 있다. 예를 들어, 3일마다 한 번씩 실행하기 위해선 cron 기반은 가능하지 않다. 이럴떈 timedelta 인스턴스를 사용해 빈도 기반의 스케줄 간격을 정의힌다. 아래와 같이 dag를 만들면 2019년 1월 4일에 실행될 것이다.
dag=DAG( dag_id="time_delta", schedule_interval=dt.timedelta(days=3), start_date=dt.datetime(year=2019, month=1, day=1), end_date=dt.datetime(year=2019, month=1, day=5)

의존성

두 태스크가 의존성을 가질때는 아래 의존성을 추가해야한다.
fetch_weather >> clean_weather fetch_sales >> clean_sales
하나의 태스크가 2개의 태스크에 의존성을 가진다면 (예를 들어, 데이터셋을 만들기 위해 날씨 데이터와 판매 데이터 모두를 가져와야한다면) 아래와 같이 팬인(일 대 다) 의존성 태스크를 생성해야한다.
[fetch_weather, fetch_sales] >> join_datasets
반대라면 팬아웃(다 대 일)이라고 한다.
start >> [fetch_weather, fetch_sales]
위와 같은 의존성일 때 DAG를 실행하면 fetch_weather와 fetch_sales는 병렬로 실행된다.

브랜치

조건에 의해 다른 task를 실행해야한다면 BranchPythonOperator를 사용한다. 선택하지 않은 브랜치 작업은 모두 건너뛴다.

XCom

태스크들은 독립적으로 실행되기 때문에 태스크 간에 데이터를 공유하기 위해선 XCom을 사용해야 한다.
def _train_mode(**context): model_id=str(uuid.uuid4()) context["task_instance"].xcom_push(key="model_id", value="model_id") def _deploy_model(**context): model_id=context["task_instance"].xcom.pull( "task_ids="train_model, key="model_id" ) join_datasets >> train_model >> deploy_model
이 접근 방식은 함수를 정의한 후 PythonOperator를 이용해 Airflow 태스크를 생성해야하기 때문에 Taskflow API를 사용한다. @task 라는 데코레이터로 변환할 수 있다.
from airflow.decorators import task with DAG(...) as dag: @task def train_model(): model_id = str(uuid.uuid4()) return model_id @task def deploy_model(model_id: str): print(f"Deploying model {model_id")

참고자료

  • Apache Airflow 기반의 데이터 파이프라인 책