컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow TaskFlow API: 더 직관적인 파이썬 방식으로 DAG를 설계

Data Jun 2025. 10. 19. 20:59

Apache Airflow는 데이터 파이프라인을 관리하는 대표적인 워크플로우 오케스트레이션 툴입니다.
Airflow 2.0부터 새롭게 도입된 TaskFlow API는 기존의 Operator 기반 DAG 설계보다 훨씬 더 “파이썬다운(Pythonic)” 방식으로 코드를 작성할 수 있게 해줍니다.

 

이 글에서는 TaskFlow API의 핵심 개념과 주요 기능, 그리고 언제 사용하면 좋은지까지 정리해보겠습니다.

 

1. Task Flow API란?

TaskFlow API는 Airflow 2.0에서 도입된 기능으로,
데코레이터와 네이티브 파이썬 함수 기반의 워크플로우 정의 방식을 제공합니다.

 

기존에는 PythonOperator 등을 이용해 Operator 인스턴스를 직접 생성해야 했다면,
이제는 단순히 함수 위에 @task 데코레이터를 붙여 Task를 정의할 수 있습니다.

👉 핵심 장점

  • 더 직관적이고 Pythonic한 코드 스타일
  • Task 간 데이터 전달이 자연스럽고 간결
  • 코드 가독성과 유지보수성 대폭 향상

 

2. 주요 특징 (Key Features)

 1️⃣ 함수 기반 Task 정의

 

이제 Task를 생성하기 위해 Operator 클래스를 인스턴스화할 필요가 없습니다.
함수에 @task 데코레이터만 붙이면 그것이 바로 Airflow의 하나의 Task가 됩니다.

from airflow.decorators import task

@task
def extract():
    # 데이터 추출 로직
    return data

핵심 포인트:

Task 정의가 단순 함수로 바뀌면서 코드가 훨씬 깔끔해지고,
Airflow 초보자에게도 접근성이 좋아졌습니다.

 

 2️⃣ XCom을 이용한 데이터 전달 (자동 데이터 Passing)

 

Task 간 데이터를 주고받을 때, 이제는 xcom_push, xcom_pull을 직접 호출할 필요가 없습니다.
함수의 return 값과 파라미터만으로 데이터가 자동으로 전달됩니다.

@task
def transform(data):
    # 데이터 변환
    return transformed_data

@task
def load(data):
    # 데이터 적재
    print("데이터 적재 완료")

processed_data = transform(raw_data)
load(processed_data)

핵심 포인트:

“명시적 push/pull 없이도 자동으로 XCom이 연결된다.”
더 이상 ti.xcom_push()와 같은 코드로 복잡하게 처리하지 않아도 됩니다.

 

 3️⃣ XCom JSON 직렬화 활용 (고급 예시)

 

TaskFlow API 이전에는 XCom을 통해 JSON 데이터를 주고받으려면 아래처럼
명시적으로 ti.xcom_push()를 사용해야 했습니다.

def transform(**kwargs):
    ti = kwargs['ti']
    extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data')
    order_data = json.loads(extract_data_string)
    ...
    ti.xcom_push('total_order_value', json.dumps(total_value))

하지만 이제는 단순히 dict 형태를 반환하면 Airflow가 자동으로 JSON 직렬화/역직렬화를 처리합니다.

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}

핵심 포인트:

multiple_outputs=True 옵션을 주면 함수의 반환값 딕셔너리가 자동으로 여러 XCom으로 분리되어 관리됩니다.

 

 4️⃣ @dag 데코레이터로 간결한 DAG 정의

 

이제 with DAG(...) 구문 없이도, @dag 데코레이터를 통해
TaskFlow DAG를 함수 하나로 감쌀 수 있습니다.

from airflow.decorators import dag
from datetime import datetime

@dag(schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False)
def my_etl_dag():
    data = extract()
    transformed = transform(data)
    load(transformed)

dag = my_etl_dag()

핵심 포인트:

@dag 데코레이터는 Task 정의와 의존성 설정을 하나의 함수 안에 캡슐화해
DAG 전체를 더 직관적으로 표현할 수 있습니다.

 

 5️⃣ 가독성과 유지보수성 향상

 

TaskFlow API를 사용하면 Airflow DAG 코드가 일반적인 Python 코드처럼 보이게 됩니다.
즉, “운영자가 읽기 쉽고, 개발자가 유지보수하기 좋은” 코드로 변모합니다.

 

핵심 포인트:

기존 Airflow 특유의 장황한 Operator 기반 코드 대신,
함수 중심의 Pythonic 코드 스타일을 도입할 수 있습니다.

 

 

3. TaskFlow API의 장점 (Benefits)

항목 설명
Easae of Use 함수 기반 설계로 직관적이며 학습 곡선이 완화됨
 Enhanced Maintainability 코드 구조가 단순해지고 수정이 용이함
Better Data Handling Task 간 데이터 전달을 자동화하여 오류 가능성 감

 

4. 언제 TaskFlow API를 사용하면 좋을까?

사용 사례 설명
ETL 파이프라인 여러 단계를 거치는 데이터 흐름 관리에 이상적
단순한 Workflow 함수형 설계에 잘 어울리는 구조의 DAG
프로토타이핑 빠른 테스트 및 반복 개발 시 유용

 

 

정리하면

 

TaskFlow API는 Airflow를 훨씬 더 Pythonic하게 만들어주는 혁신적인 기능입니다.
복잡한 Operator 코드를 최소화하고, 함수 중심의 코드 설계로 가독성을 높이며,
XCom을 통한 데이터 전달을 자동화하여 개발 효율을 크게 끌어올립니다.

데이터 파이프라인을 코드로 깔끔하게 설계하고 싶다면,
이제는 TaskFlow API를 기본으로 사용하는 것이 표준이 되어가고 있습니다.