이번 포스팅에서는 Airflow의 Variable, XCom, ShortCircuitOperator를 활용해
사용자 입력값에 따라 DAG의 흐름을 제어하는 로직을 살펴봅니다.
목표는 다음과 같습니다 👇
Airflow Variable에 저장된 값이 10보다 크면 이후 태스크를 실행하고,
10 이하라면 실행을 중단(SKIP)하는 조건 분기 DAG 만들기
코드 전체 구조
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.models import Variable
from datetime import datetime
dag = DAG(
'ShortCircuitOperator',
start_date=datetime(2025, 10, 18),
schedule_interval=None
)
def receive_user_input(**kwargs):
user_input_value = Variable.get('user_input_value', default_var=0)
if user_input_value:
return int(user_input_value)
else:
print("No value set for 'user_input_value' variable.")
return 0
receive_input_task = PythonOperator(
task_id='receive_user_input',
python_callable=receive_user_input,
provide_context=True,
dag=dag
)
def complex_condition_check(**kwargs):
ti = kwargs['ti']
received_value = ti.xcom_pull(task_ids='receive_user_input', dag_id='ShortCircuitOperator')
if received_value > 10:
return True
return False
complex_condition_task = ShortCircuitOperator(
task_id='complex_condition_check',
python_callable=complex_condition_check,
provide_context=True,
dag=dag
)
def run_this_task():
print("✅ 조건을 통과했습니다. 다음 작업을 실행합니다.")
run_this_task = PythonOperator(
task_id='run_this_task',
python_callable=run_this_task,
dag=dag
)
receive_input_task >> complex_condition_task >> run_this_task
1. 단계별 동작 설명
1.1 receive_user_input
Airflow의 Variable에서 'user_input_value' 값을 읽어옵니다.
없으면 기본값 0을 반환하고, 반환된 값은 자동으로 XCom에 저장됩니다.
def receive_user_input(**kwargs):
user_input_value = Variable.get('user_input_value', default_var=0)
return int(user_input_value)
1.2 complex_condition_check
이전 태스크(receive_user_input)의 결과를 XCom에서 가져와 조건 검사를 수행합니다.
received_value = ti.xcom_pull(task_ids='receive_user_input')
if received_value > 10:
return True # 이후 태스크 실행
return False # 이후 태스크 중단
ShortCircuitOperator는 반환값이 True면 다음 태스크를 실행하고,
False면 다음 모든 downstream 태스크를 Skip 처리합니다.
1.3 run_this_task
조건을 통과했을 때만 실행되는 태스크입니다
2. **kwargs를 왜 쓰는가?
receive_user_input()은 이전 태스크에서 데이터를 받지 않지만,
Airflow는 PythonOperator 실행 시 컨텍스트(context) 정보를 자동으로 넘겨줍니다.
def receive_user_input(**kwargs):
...
이를 받지 않으면 Airflow가 내부적으로 python_callable(**context) 형태로 호출할 때
“unexpected keyword argument” 오류가 발생할 수 있습니다.
즉, **kwargs는 필요하든 아니든 Airflow 컨텍스트를 안전하게 받기 위한 표준 인자입니다.
3. XCom의 개념 정리
XCom이란?
XCom은 “Cross-Communication”의 줄임말로,
태스크 간 데이터를 주고받기 위한 Airflow 내부 메커니즘입니다.
한 태스크의 리턴값을 다른 태스크가 가져올 수 있도록 저장·전달하는 구조
3.1 작동 방식
(1) Push 단계 (저장)
def push_func(**kwargs):
return 42 # 자동으로 XCom push
(2) Pull 단계 (조회)
def pull_func(**kwargs):
ti = kwargs['ti']
value = ti.xcom_pull(task_ids='push_func')
print(value) # 42
3.2 튜플/딕셔너리 리턴 시 XCom 동작
Airflow는 리턴값이 무엇이든 하나의 객체로 직렬화하여 저장합니다.
즉, 튜플을 리턴하면 전체가 한 덩어리로 들어갑니다.
예시
def push_tuple():
return (10, 20, 30)
➡ XCom 저장 내용:
(10, 20, 30)
가져올 때:
result = ti.xcom_pull(task_ids='push_tuple')
print(result[0]) # 10
3.3 여러 값을 개별로 저장하고 싶다면?
직접 xcom_push()를 사용해야 합니다 👇
def push_multiple(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='a', value=1)
ti.xcom_push(key='b', value=2)
조회 시:
ti.xcom_pull(task_ids='push_multiple', key='a')
전체 흐름 요약 다이어그램
+---------------------+
| receive_user_input | → Variable 값 읽기
+---------------------+
|
▼
+----------------------------+
| complex_condition_check | → 값 > 10 ? True : False
+----------------------------+
|
True | False
▼
+----------------+
| run_this_task | → 조건 True일 때만 실행
+----------------+
정리하면
| 구성요소 | 역할 |
| Variable.get() | Airflow Variable 값 읽기 |
| PythonOperator | 파이썬 함수 실행 |
| ShotCircuitOperator | 조건에 따라 이우 태스크 실행/중단 |
| xcom_pull() | 이전 태스크 결과 가져오기 |
| **kwargs | Airflow 컨텍스트 수신용 |
Airflow를 처음 접하면 “값 전달이 어떻게 이루어지는지”, “조건 분기는 어디서 하는지”가 헷갈리지만,
이번 예제처럼 Variable → XCom → ShortCircuitOperator 구조를 익히면
데이터 흐름을 제어하는 DAG를 훨씬 유연하게 설계할 수 있습니다.
핵심 키워드
XCom = Cross Communication
ShortCircuitOperator = 조건 분기 컨트롤러
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow Sensor의 soft_fail 이해하기 (0) | 2025.10.18 |
|---|---|
| Airflow Sensor 실습 — PythonSensor로 파일 확인 (0) | 2025.10.18 |
| Airflow ShortCircuitOperator — 조건에 따라 Task 실행 제어하기 (0) | 2025.10.18 |
| Airflow에서 system_site_packages와 requirements 함께 쓰기 (0) | 2025.10.17 |
| Airflow의 PythonVirtualenvOperator — 왜 requirements를 지정해야 할까? (0) | 2025.10.17 |