다른 DAG(Task)의 실행 완료를 기다리는 센서
Airflow로 여러 DAG을 운영하다 보면,
서로 다른 DAG 간에 실행 순서를 맞춰야 할 때가 있습니다.
예를 들어,
“데이터 전처리 DAG이 끝난 다음 모델 학습 DAG이 실행돼야 한다.”
이럴 때 사용하는 게 바로 ExternalTaskSensor 입니다.
1. ExternalTaskSensor란?
ExternalTaskSensor는 이름 그대로
“외부(External) DAG의 특정 태스크(Task)가 성공할 때까지 기다리는 센서(Sensor)” 입니다.
즉, 현재 DAG의 특정 태스크가 다른 DAG의 상태를 감시하며 대기하다가,
그 DAG의 특정 태스크가 완료되면 이후 단계를 진행하는 구조예요.
2. 기본 구조
Airflow에서 DAG은 기본적으로 독립적인 파이썬 파일입니다.
따라서 DAG 간 직접적인 연결(>>)은 불가능합니다.
ExternalTaskSensor는 Airflow 내부 메타데이터 DB를 이용해
다른 DAG(Task)의 상태를 조회하며 기다립니다.
3. 예시 코드
(1) upstream_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def preprocess_data():
print("데이터 전처리 완료!")
with DAG(
dag_id='upstream_dag',
start_date=datetime(2023, 8, 12),
schedule_interval='@daily'
) as dag:
data_preprocessing = PythonOperator(
task_id='data_preprocessing',
python_callable=preprocess_data
)
➡️ upstream_dag은 데이터 전처리를 수행하는 DAG입니다
(2) downstream_dag.py
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator
from datetime import datetime
def train_model():
print("모델 학습 시작!")
with DAG(
dag_id='downstream_dag',
start_date=datetime(2023, 8, 12),
schedule_interval='@daily'
) as dag:
wait_for_preprocessing = ExternalTaskSensor(
task_id='wait_for_data_preprocessing',
external_dag_id='upstream_dag', # 기다릴 DAG
external_task_id='data_preprocessing', # 기다릴 Task
allowed_states=['success'], # 성공 시 통과
failed_states=['failed', 'skipped'], # 실패 시 중단
poke_interval=60, # 60초마다 상태 확인
timeout=3600 # 1시간 후 타임아웃
)
model_training = PythonOperator(
task_id='train_model',
python_callable=train_model
)
wait_for_preprocessing >> model_training
4. 동작 흐름 (ASCII 다이어그램)
┌──────────────────────────┐
│ upstream_dag │
│ └── data_preprocessing │
│ (success) │
└────────────┬─────────────┘
│
▼
┌──────────────────────────┐
│ downstream_dag │
│ └── ExternalTaskSensor │
│ │
│ ▼
│ train_model 실행 │
└──────────────────────────┘
- downstream_dag의 센서는
upstream_dag.data_preprocessing 태스크가 성공할 때까지 기다립니다. - 성공 시 → 모델 학습 시작
- 실패 시 → 센서가 중단되며 전체 DAG 실패 처리
5. 주요 파라미터 요약
| 파라미터 | 설명 |
| external_dag_id | 감시할 DAG 이름 |
| external_task_id | 감시할 Task 이름 |
| allowed_states | 통과 가능한 상태 (보통 'success') |
| failed_states | 실패로 간주할 상태 |
| poke_interval | 상태 확인 주기(초 단위) |
| timeout | 최대 대기 시간 (초 단위) |
external_task_id 미지정 또는 None의 경우
바라보는 대상
→ upstream_dag의 DagRun 전체 상태
성공 조건
→ DAG Run = success
정리하면
| 항목 | 설명 |
| 기능 | 다른 DAG(Task)의 상태를 감시하며 기다림 |
| 주도권 | 다운스트림 DAG이 감시자 역할 |
| 활용 예시 | 전처리 DAG → 모델 학습 DAG, ETL → 리포트 생성 등 |
| 핵심 장점 | DAG 간 실행 순서를 명확히 제어 가능 |
ExternalTaskSensor는 Airflow에서 DAG 간 순서를 제어하는 가장 안전한 방법이다.
DAG 간 직접 연결이 불가능한 상황에서,
다른 DAG(Task)의 성공 여부를 감시하며 실행 타이밍을 제어할 수 있다.
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow로 Star Wars API 데이터를 PostgreSQL에 적재하기 (0) | 2025.10.19 |
|---|---|
| Airflow ExternalTaskMarker 이해하기 (0) | 2025.10.19 |
| Airflow에서 한 DAG이 다른 DAG을 트리거(Trigger)하기 (0) | 2025.10.18 |
| Airflow에서 SubDAG 대신 TaskGroup 사용하기 (0) | 2025.10.18 |
| Airflow에서 요일 기반 분기 — BranchDayOfWeekOperator 활용하기 (0) | 2025.10.18 |