컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow XCom과 ShortCircuitOperator 이해하기

Data Jun 2025. 10. 18. 09:52

이번 포스팅에서는 Airflow의 Variable, XCom, ShortCircuitOperator를 활용해
사용자 입력값에 따라 DAG의 흐름을 제어하는 로직을 살펴봅니다.

목표는 다음과 같습니다 👇

 

Airflow Variable에 저장된 값이 10보다 크면 이후 태스크를 실행하고,

10 이하라면 실행을 중단(SKIP)하는 조건 분기 DAG 만들기

 

코드 전체 구조

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.models import Variable
from datetime import datetime

dag = DAG(
    'ShortCircuitOperator',
    start_date=datetime(2025, 10, 18),
    schedule_interval=None
)

def receive_user_input(**kwargs):
    user_input_value = Variable.get('user_input_value', default_var=0)
    if user_input_value:
        return int(user_input_value)
    else:
        print("No value set for 'user_input_value' variable.")
        return 0

receive_input_task = PythonOperator(
    task_id='receive_user_input',
    python_callable=receive_user_input,
    provide_context=True,
    dag=dag
)

def complex_condition_check(**kwargs):
    ti = kwargs['ti']
    received_value = ti.xcom_pull(task_ids='receive_user_input', dag_id='ShortCircuitOperator')
    if received_value > 10:
        return True
    return False

complex_condition_task = ShortCircuitOperator(
    task_id='complex_condition_check',
    python_callable=complex_condition_check,
    provide_context=True,
    dag=dag
)

def run_this_task():
    print("✅ 조건을 통과했습니다. 다음 작업을 실행합니다.")

run_this_task = PythonOperator(
    task_id='run_this_task',
    python_callable=run_this_task,
    dag=dag
)

receive_input_task >> complex_condition_task >> run_this_task

 

1. 단계별 동작 설명

 1.1 receive_user_input

 

Airflow의 Variable에서 'user_input_value' 값을 읽어옵니다.
없으면 기본값 0을 반환하고, 반환된 값은 자동으로 XCom에 저장됩니다.

def receive_user_input(**kwargs):
    user_input_value = Variable.get('user_input_value', default_var=0)
    return int(user_input_value)

 

  1.2  complex_condition_check

 

이전 태스크(receive_user_input)의 결과를 XCom에서 가져와 조건 검사를 수행합니다.

received_value = ti.xcom_pull(task_ids='receive_user_input')
if received_value > 10:
    return True  # 이후 태스크 실행
return False     # 이후 태스크 중단

ShortCircuitOperator는 반환값이 True면 다음 태스크를 실행하고,
False면 다음 모든 downstream 태스크를 Skip 처리합니다.

 

 1.3  run_this_task

조건을 통과했을 때만 실행되는 태스크입니다

 

 

2. **kwargs를 왜 쓰는가?

receive_user_input()은 이전 태스크에서 데이터를 받지 않지만,
Airflow는 PythonOperator 실행 시 컨텍스트(context) 정보를 자동으로 넘겨줍니다.

def receive_user_input(**kwargs):
    ...

 

이를 받지 않으면 Airflow가 내부적으로 python_callable(**context) 형태로 호출할 때
“unexpected keyword argument” 오류가 발생할 수 있습니다.

즉, **kwargs는 필요하든 아니든 Airflow 컨텍스트를 안전하게 받기 위한 표준 인자입니다.

 

 

3. XCom의 개념 정리

XCom이란?

XCom은 “Cross-Communication”의 줄임말로,
태스크 간 데이터를 주고받기 위한 Airflow 내부 메커니즘입니다.

 

한 태스크의 리턴값을 다른 태스크가 가져올 수 있도록 저장·전달하는 구조

 

3.1 작동 방식

 

 (1)  Push 단계 (저장)

def push_func(**kwargs):
    return 42  # 자동으로 XCom push

 

 (2) Pull 단계 (조회)

def pull_func(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='push_func')
    print(value)  # 42

 

 

3.2 튜플/딕셔너리 리턴 시 XCom 동작

 

Airflow는 리턴값이 무엇이든 하나의 객체로 직렬화하여 저장합니다.
즉, 튜플을 리턴하면 전체가 한 덩어리로 들어갑니다.

 

예시

def push_tuple():
    return (10, 20, 30)

➡ XCom 저장 내용:

(10, 20, 30)

가져올 때:

result = ti.xcom_pull(task_ids='push_tuple')
print(result[0])  # 10

 

3.3 여러 값을 개별로 저장하고 싶다면?

 

직접 xcom_push()를 사용해야 합니다 👇

def push_multiple(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='a', value=1)
    ti.xcom_push(key='b', value=2)

조회 시:

ti.xcom_pull(task_ids='push_multiple', key='a')

 

 

전체 흐름 요약 다이어그램

+---------------------+
| receive_user_input  |  → Variable 값 읽기
+---------------------+
           |
           ▼
+----------------------------+
| complex_condition_check    |  → 값 > 10 ? True : False
+----------------------------+
           |
      True | False
           ▼
+----------------+
| run_this_task  |  → 조건 True일 때만 실행
+----------------+

 

 

정리하면

구성요소 역할
Variable.get() Airflow Variable 값 읽기
PythonOperator 파이썬 함수 실행
ShotCircuitOperator 조건에 따라 이우 태스크 실행/중단
xcom_pull() 이전 태스크 결과 가져오기
**kwargs Airflow 컨텍스트 수신용

Airflow를 처음 접하면 “값 전달이 어떻게 이루어지는지”, “조건 분기는 어디서 하는지”가 헷갈리지만,
이번 예제처럼 Variable → XCom → ShortCircuitOperator 구조를 익히면
데이터 흐름을 제어하는 DAG를 훨씬 유연하게 설계할 수 있습니다.

 

핵심 키워드

XCom = Cross Communication
ShortCircuitOperator = 조건 분기 컨트롤러