컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow Operator — 하나의 작업에는 하나의 오퍼레이터

Data Jun 2025. 10. 13. 22:36

Airflow를 사용할 때 가장 중요한 원칙 중 하나는 **“한 Operator에는 한 Task만 맡기기”**입니다.
즉, Extract, Transform, Load 과정을 각각 분리해야 관리와 복구가 쉬워집니다.

 

잘못된 예시

 이미지 처럼 PythonOperator 하나에

  • Extract Data
  • Transform Data
  • Load Data
    를 모두 넣는 방식은 좋지 않습니다.

이 경우 “Transform Data” 단계에서 오류가 발생하면, 전체 작업이 실패하고
앞 단계인 Extract Data부터 다시 실행해야 합니다.
이는 불필요한 리소스 낭비와 디버깅 어려움을 초래합니다.

 

 

올바른 예시

이미지 처럼 각 단계를 별도의 PythonOperator로 분리하는 것이 바람직합니다.

Extract Data → Transform Data → Load Data
이렇게 구성하면 “Transform Data”에서 실패하더라도,
해당 단계만 재시도할 수 있어 효율적입니다.
Airflow의 장점인 태스크 단위 재실행(Retry) 기능도 제대로 활용할 수 있습니다.

 

예제 코드

아래 예시는 PythonOperator와 DummyOperator를 사용한 간단한 DAG입니다.
각 작업을 별도로 정의하고, 순서대로 실행되도록 구성했습니다.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    print("Hello Airflow!")

# Define the DAG
with DAG('sample_dag',
        description='A simple DAG', 
        schedule_interval='0 0 * * *',
        start_date=datetime(2053, 10, 14),
        catchup=False) as dag:
    
    # Task 1: Print "Hello Airflow!"
    task1 = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello,
        dag=dag
    )

    # Task 2: Dummy task
    task2 = DummyOperator(
        task_id='dummy_task',
        dag=dag
    )

# Define the task dependencies
task1 >> task2

이 DAG는 단순히 “Hello Airflow!”를 출력한 후,
더미 태스크(DummyOperator)를 실행합니다.
실제 프로젝트에서는 Extract, Transform, Load 단계별 함수를 따로 정의해
각각의 PythonOperator에 연결하면 됩니다.

 

 

정리하면

 

“한 Operator = 한 Task” 원칙을 지키면 Airflow DAG의 안정성과 가시성이 크게 향상됩니다.