Airflow를 사용하다 보면 특정 조건이 충족될 때만 다음 Task를 실행하고 싶은 경우가 있습니다.
예를 들어,
- 데이터가 존재할 때만 분석을 실행하거나
- 특정 요일(예: 월요일)일 때만 알림을 보내는 경우죠.
이때 유용하게 사용할 수 있는 오퍼레이터가 바로 ShortCircuitOperator 입니다.
ShortCircuitOperator란?
ShortCircuitOperator는 현재 Task의 반환값(True/False)에 따라,
이후 Task를 실행할지 건너뛸지를 결정하는 제어용 오퍼레이터입니다.
즉, DAG 안에서 “if 조건문” 역할을 하는 오퍼레이터예요.
예제 코드
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
# 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'ShortCircuitOperator',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 8, 12),
catchup=False,
)
# 1️⃣ 조건 함수
def condition_check():
return True # True면 다음 Task 실행, False면 다음 Task 모두 skip
# 2️⃣ 다음 Task
def run_this_func():
print('run_this_func executed!')
# ShortCircuitOperator 설정
condition_task = ShortCircuitOperator(
task_id='condition_check',
python_callable=condition_check,
dag=dag
)
# PythonOperator 설정
run_this_task = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag
)
# Task 순서 정의
condition_task >> run_this_task
동작 방식
| 단계 | 설 |
| 1단계 | condition_task 실행 → condition_check() 함수의 반환값 확인 |
| 2단계 | 결과가 True이면 → run_this_task 실행 |
| 3단계 | 결과가 False이면 → run_this_task는 skipped(건너뜀)처리 |
즉, 현재 오퍼레이터의 불리언 값(True/False) 을 기준으로
다음 Task의 실행 여부를 결정하게 됩니다.
정리하면
ShortCircuitOperator는 Airflow에서 조건 분기(if문) 역할을 담당하는 오퍼레이터입니다.
현재 Task의 반환값(True/False)을 기준으로
다음 Task를 실행할지 건너뛸지를 결정할 수 있어
DAG 제어를 더욱 유연하게 만들어 줍니다.
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow Sensor 실습 — PythonSensor로 파일 확인 (0) | 2025.10.18 |
|---|---|
| Airflow XCom과 ShortCircuitOperator 이해하기 (0) | 2025.10.18 |
| Airflow에서 system_site_packages와 requirements 함께 쓰기 (0) | 2025.10.17 |
| Airflow의 PythonVirtualenvOperator — 왜 requirements를 지정해야 할까? (0) | 2025.10.17 |
| Airflow의 PythonVirtualenvOperator — 왜 함수 안에 import를 넣어야 할까? (0) | 2025.10.17 |