컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow에서 한 DAG이 다른 DAG을 트리거(Trigger)하기

Data Jun 2025. 10. 18. 23:23

데이터 파이프라인을 운영하다 보면, 한 DAG이 다른 DAG을 자동으로 실행시키는 구조가 필요할 때가 있습니다.
예를 들어, 전처리 DAG이 완료된 뒤 모델 학습 DAG을 자동으로 실행시키는 경우죠.

 

이번 포스트에서는 **Airflow의 TriggerDagRunOperator**를 활용하여
DAG 간에 실행 신호를 전달하고, 설정(conf) 값까지 함께 넘기는 방법을 알아보겠습니다.

 

1. 트리거 역할 DAG — conf_trigger_dag

먼저, 다른 DAG을 실행시키는 트리거용 DAG를 만듭니다.

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

conf_dag_trigger = DAG(
    'conf_trigger_dag',
    start_date=datetime(2023, 8, 12),
    schedule_interval=None
)

conf_trigger_task = TriggerDagRunOperator(
    task_id='conf_trigger_dagrun',
    trigger_dag_id='conf_target_dag',
    conf={"passenger": "fastcampus"},
    dag=conf_dag_trigger,
)

코드 설명

요소 설명
DAG('conf_trigger_dag') 트리거 역할을 담당하는 DAG ID
schedule_interval=None 수동으로만 실행되도록 설정
TriggerDagRunOperator 다른 DAG을 트리거하는 오퍼레이터
trigger_dag_id='conf_target_dag' 실행시킬 대상 DAG ID
cof = {"passenger": "fastcampus"} 대상 DAG으로 전달할 설정 값
dag=conf_dag_trigger 이 태스크가 속한 DAG 지정

즉,
이 DAG을 실행하면 conf_target_dag이 자동으로 실행되며,
“passenger: fastcampus”라는 conf 값이 함께 전달됩니다.

 

2. 대상 DAG — conf_target_dag

이제 트리거로부터 신호를 받아 실제 작업을 수행할 대상 DAG을 작성합니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

conf_dag_target = DAG(
    'conf_target_dag',
    start_date=datetime(2023, 8, 12),
    schedule_interval=None
)

def print_conf(**kwargs):
    print(kwargs['dag_run'].conf['passenger'])

print_conf_task = PythonOperator(
    task_id='print_conf',
    python_callable=print_conf,
    provide_context=True,
    dag=conf_dag_target
)
코드 설명

 

요소 설명
DAG('conf_target_dag') 트리거에 의해 실행되는 대상 DAG
def print_conf(**kwargs) 컨텍스트를 받아 conf 값을 출력하는 함수
kwargs['dag_run'].conf['passenger'] 트리거 DAG에서 전달된 conf 값 접근
PythonOperator Python 함수를 실행하는 오퍼레이터
provide_context=True Airflow의 실행 컨텍스트를 kwargs로 전달

이 DAG은 실행되면 트리거로부터 전달된 값을 출력합니다.

fastcampus

 

3. DAG 간 실행 흐름 요약

전체 데이터 흐름은 다음과 같습니다:

conf_trigger_dag
    ↓
TriggerDagRunOperator
    ↓ (conf={"passenger": "fastcampus"})
conf_target_dag
    ↓
PythonOperator → print_conf(**kwargs)
    ↓
print("fastcampus")

즉, 한 DAG에서 다른 DAG을 자동으로 실행시키면서 파라미터(conf) 까지 전달하는 구조입니다.

 

 

정리하면

 

Airflow에서 TriggerDagRunOperator를 활용하면 DAG 간 실행 제어가 매우 유연해집니다.
파이프라인을 더 모듈화하고, 각 DAG을 독립적으로 관리하면서도 전체 흐름을 하나로 연결할 수 있습니다.