컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow ShortCircuitOperator — 조건에 따라 Task 실행 제어하기

Data Jun 2025. 10. 18. 09:24

Airflow를 사용하다 보면 특정 조건이 충족될 때만 다음 Task를 실행하고 싶은 경우가 있습니다.

 

예를 들어,

  • 데이터가 존재할 때만 분석을 실행하거나
  • 특정 요일(예: 월요일)일 때만 알림을 보내는 경우죠.

이때 유용하게 사용할 수 있는 오퍼레이터가 바로 ShortCircuitOperator 입니다.

 

ShortCircuitOperator란?

ShortCircuitOperator는 현재 Task의 반환값(True/False)에 따라,

이후 Task를 실행할지 건너뛸지를 결정하는 제어용 오퍼레이터입니다.

즉, DAG 안에서 “if 조건문” 역할을 하는 오퍼레이터예요.

 

예제 코드

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator

# 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
dag = DAG(
    'ShortCircuitOperator',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 8, 12),
    catchup=False,
)

# 1️⃣ 조건 함수
def condition_check():
    return True  # True면 다음 Task 실행, False면 다음 Task 모두 skip

# 2️⃣ 다음 Task
def run_this_func():
    print('run_this_func executed!')

# ShortCircuitOperator 설정
condition_task = ShortCircuitOperator(
    task_id='condition_check',
    python_callable=condition_check,
    dag=dag
)

# PythonOperator 설정
run_this_task = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag
)

# Task 순서 정의
condition_task >> run_this_task

 

동작 방식

단계
1단계 condition_task 실행  →  condition_check()  함수의 반환값 확인
2단계 결과가 True이면 → run_this_task 실행
3단계 결과가 False이면 → run_this_task는 skipped(건너뜀)처리

즉, 현재 오퍼레이터의 불리언 값(True/False) 을 기준으로
다음 Task의 실행 여부를 결정하게 됩니다.

 

 

정리하면

 

ShortCircuitOperator는 Airflow에서 조건 분기(if문) 역할을 담당하는 오퍼레이터입니다.

현재 Task의 반환값(True/False)을 기준으로
다음 Task를 실행할지 건너뛸지를 결정할 수 있어
DAG 제어를 더욱 유연하게 만들어 줍니다.