컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow 실전 팁 정리 — DAG 설계부터 변수 관리까지

Data Jun 2025. 10. 19. 20:43

Apache Airflow를 사용하다 보면, 단순한 DAG 정의를 넘어서
코드를 더 깔끔하고 확장성 있게 구성하는 방법이 필요합니다.
오늘은 그중에서도 DAG 문맥(context) 활용법, Task 관리,
그리고 변수·파라미터 전달 팁을 중심으로 정리해봤습니다.

 

1. DAG와 Context Manager (with 구문)

Airflow의 DAG는 단순히 작업(Task)을 담는 컨테이너가 아니라
컨텍스트 매니저(Context Manager) 로 동작합니다.

with DAG('example_dag', start_date=days_ago(2)) as dag:
    task1 = DummyOperator(task_id='task1')
    task2 = DummyOperator(task_id='task2')
    task3 = DummyOperator(task_id='task3')

task1 >> task2 >> task3
  • with 블록 내부에서 정의된 Task들은 자동으로 해당 DAG에 속합니다.
  • 즉, dag=dag를 명시적으로 넘길 필요가 없습니다.
  • DAG 클래스는 __enter__() / __exit__() 메서드를 통해
    블록 진입 시 DAG 컨텍스트를 활성화하고, 종료 시 닫습니다.

핵심 포인트

Airflow의 with DAG(...) 구문은 단순한 문법적 편의가 아니라,
“현재 DAG에 속한 작업들을 자동으로 관리하는 문맥(Context)”을 제공하는 기능입니다.

 

2. Task 관계를 리스트로 정의하기

Task 간의 의존성을 정의할 때는 >> 연산자를 사용하죠.
하지만 Task가 많아질수록 연결 코드가 길어집니다.
이럴 때는 리스트를 사용해 훨씬 간결하게 표현할 수 있습니다.

task1 >> [task2, task3, task4] >> task5

이 한 줄로 아래 코드와 같은 의미를 가집니다.

task1 >> task2
task1 >> task3
task1 >> task4
task2 >> task5
task3 >> task5
task4 >> task5

핵심 포인트

리스트를 활용하면 DAG의 의존 관계를 간결하고 직관적으로 표현할 수 있습니다.

 

3. default_args로 기본 설정 관리하기

여러 Task에 반복되는 설정(start_date, owner, retries 등)을
default_args 딕셔너리로 한 번에 관리할 수 있습니다.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2)
}

with DAG('default_arg_example', default_args=default_args) as dag:
    t1 = DummyOperator(task_id='task1')
    t2 = DummyOperator(task_id='task2')
    t3 = DummyOperator(task_id='task3')

핵심 포인트

DAG 전체에 공통적으로 적용할 인자를 default_args로 정의하면
유지보수성이 높아지고 코드가 깔끔해집니다.

 

4. params 인자로 동적 파라미터 전달하기

Airflow에서는 params 인자를 통해 템플릿에 변수를 주입할 수 있습니다.

params = {'p1': 'v1', 'p2': 'v2'}

with DAG('params_example', start_date=days_ago(2), params=params):
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo {{ params.p1 }}'  # 출력: v1
    )
    task2 = BashOperator(
        task_id='task2',
        bash_command='echo {{ params.p2 }} {{ params.p3 }}',  # 출력: v2 v3
        params={'p3': 'v3'}  # DAG-level params를 오버라이드
    )

task1 >> task2

핵심 포인트

params는 DAG 레벨에서 전역 변수처럼 활용 가능하며,
Jinja 템플릿({{ params.key }})을 통해 동적으로 명령어에 삽입됩니다.

  • DAG 단위로 정의된 params는 모든 Task에서 공통적으로 참조 가능
  • Task 내부에서 params를 오버라이드(override) 하면
    DAG-level params보다 Task-level params가 우선 적용
  • {{ params.key }} 문법으로 Jinja 템플릿 내에서 접근 가능

 

5. BaseHook으로 민감 데이터 관리하기

 

DB 접속 정보나 API 키 등은 코드에 직접 쓰면 보안상 위험합니다.
이럴 때 Airflow의 Connection 기능과 BaseHook을 사용하세요.

from airflow.hooks.base_hook import BaseHook

def _access_connection(ti):
    conn = BaseHook.get_connection('my_postgres_connection')
    print(conn.host, conn.password)

핵심 포인트

Connection은 Airflow UI의 Admin → Connections에서 등록 후,
BaseHook.get_connection()으로 안전하게 불러올 수 있습니다.

 

6. Variable로 환경별 값 관리하기

Airflow의 Variable은 환경 설정이나 외부 파라미터를 저장하는 Key-Value 저장소입니다.

from airflow.models import Variable

# 개별 호출 (비추천 — 요청 과다)
print(Variable.get("var1"))
print(Variable.get("var2"))

# 추천 방식: JSON 형태로 묶어서 가져오기
var_dict = Variable.get("var_dict", deserialize_json=True)
print(var_dict['var1'])
print(var_dict['var2'])

핵심 포인트

 

  • Variable.get(key)는 Airflow 메타DB에 직접 접근하기 때문에
    여러 번 호출하면 비효율적입니다.
  • JSON으로 묶은 뒤 deserialize_json=True 옵션을 주면
    한 번의 쿼리로 여러 값을 로드할 수 있습니다.

 

7. 동적 Task 생성하기 (Dynamic Task)

Task 수가 많거나 반복 구조가 필요할 때는 루프를 이용해 Task를 동적으로 생성할 수 있습니다.

with DAG('dynamic_task', start_date=days_ago(2)) as dag:
    tasks = []
    for i in range(5):
        tasks.append(DummyOperator(task_id=f'task_{i}'))
        if i != 0:
            tasks[i-1] >> tasks[i]

위 코드는 task_0 → task_1 → task_2 → task_3 → task_4 구조를 자동으로 생성합니다.

핵심 포인트

반복문을 활용하면 Task 수에 따라 유연하게 DAG를 구성할 수 있습니다.
동적 DAG 생성은 데이터 파이프라인 자동화의 핵심 도구입니다.

 

 

정리하면

 

Airflow는 단순히 Task를 스케줄링하는 도구가 아니라,
데이터 파이프라인을 구조적으로 관리하는 프레임워크입니다.

컨텍스트 매니저(with DAG(...))를 활용하고,
default_args, params, Variable, BaseHook 등을 적절히 사용하면
유지보수성 높은 DAG를 설계할 수 있습니다.