컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow ExternalTaskSensor 이해하기

Data Jun 2025. 10. 19. 07:50

다른 DAG(Task)의 실행 완료를 기다리는 센서

Airflow로 여러 DAG을 운영하다 보면,
서로 다른 DAG 간에 실행 순서를 맞춰야 할 때가 있습니다.
예를 들어,

 

“데이터 전처리 DAG이 끝난 다음 모델 학습 DAG이 실행돼야 한다.”

 

이럴 때 사용하는 게 바로 ExternalTaskSensor 입니다.

 

1. ExternalTaskSensor란?

ExternalTaskSensor는 이름 그대로

“외부(External) DAG의 특정 태스크(Task)가 성공할 때까지 기다리는 센서(Sensor)” 입니다.

즉, 현재 DAG의 특정 태스크가 다른 DAG의 상태를 감시하며 대기하다가,
그 DAG의 특정 태스크가 완료되면 이후 단계를 진행하는 구조예요.

 

2. 기본 구조

Airflow에서 DAG은 기본적으로 독립적인 파이썬 파일입니다.
따라서 DAG 간 직접적인 연결(>>)은 불가능합니다.

ExternalTaskSensor는 Airflow 내부 메타데이터 DB를 이용해
다른 DAG(Task)의 상태를 조회하며 기다립니다.

 

3. 예시 코드

 (1) upstream_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def preprocess_data():
    print("데이터 전처리 완료!")

with DAG(
    dag_id='upstream_dag',
    start_date=datetime(2023, 8, 12),
    schedule_interval='@daily'
) as dag:

    data_preprocessing = PythonOperator(
        task_id='data_preprocessing',
        python_callable=preprocess_data
    )

➡️ upstream_dag은 데이터 전처리를 수행하는 DAG입니다

 

(2) downstream_dag.py

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator
from datetime import datetime

def train_model():
    print("모델 학습 시작!")

with DAG(
    dag_id='downstream_dag',
    start_date=datetime(2023, 8, 12),
    schedule_interval='@daily'
) as dag:

    wait_for_preprocessing = ExternalTaskSensor(
        task_id='wait_for_data_preprocessing',
        external_dag_id='upstream_dag',          # 기다릴 DAG
        external_task_id='data_preprocessing',   # 기다릴 Task
        allowed_states=['success'],              # 성공 시 통과
        failed_states=['failed', 'skipped'],     # 실패 시 중단
        poke_interval=60,                        # 60초마다 상태 확인
        timeout=3600                             # 1시간 후 타임아웃
    )

    model_training = PythonOperator(
        task_id='train_model',
        python_callable=train_model
    )

    wait_for_preprocessing >> model_training

 

4. 동작 흐름 (ASCII 다이어그램)

┌──────────────────────────┐
│     upstream_dag         │
│  └── data_preprocessing  │
│        (success)         │
└────────────┬─────────────┘
             │
             ▼
┌──────────────────────────┐
│     downstream_dag       │
│  └── ExternalTaskSensor  │
│        │
│        ▼
│    train_model 실행      │
└──────────────────────────┘
  • downstream_dag의 센서는
    upstream_dag.data_preprocessing 태스크가 성공할 때까지 기다립니다.
  • 성공 시 → 모델 학습 시작
  • 실패 시 → 센서가 중단되며 전체 DAG 실패 처리

 

5. 주요 파라미터 요약

파라미터 설명
external_dag_id 감시할 DAG 이름
external_task_id 감시할 Task 이름
allowed_states 통과 가능한 상태 (보통 'success')
failed_states 실패로 간주할 상태
poke_interval 상태 확인 주기(초 단위)
timeout 최대 대기 시간 (초 단위) 

external_task_id 미지정 또는 None의 경우 

바라보는 대상
→ upstream_dag의 DagRun 전체 상태

성공 조건
→ DAG Run = success

 

 

정리하면

항목 설명
기능 다른 DAG(Task)의 상태를 감시하며 기다림
주도권 다운스트림 DAG이 감시자 역할
활용 예시 전처리 DAG → 모델 학습 DAG, ETL → 리포트 생성 등
핵심 장점 DAG 간 실행 순서를 명확히 제어 가능

ExternalTaskSensor는 Airflow에서 DAG 간 순서를 제어하는 가장 안전한 방법이다.

DAG 간 직접 연결이 불가능한 상황에서,
다른 DAG(Task)의 성공 여부를 감시하며 실행 타이밍을 제어할 수 있다.