컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow BranchOperator로 조건 분기 처리하기

Data Jun 2025. 10. 18. 16:05

BranchOperator란?

Airflow의 BranchOperator는 워크플로우 내에서 조건문(if-else) 역할을 수행하는 연산자입니다.
즉, 특정 조건에 따라 다음에 실행될 Task를 선택하게 합니다.

예를 들어,

  • CPU 사용량이 50% 이상이면 branch_a 실행
  • 그렇지 않으면 branch_b 실행
    이런 식으로 분기 제어를 할 수 있습니다.

DAG 구조

이번 예제에서는 다음과 같은 간단한 DAG을 구성합니다.

start_task
   ↓
branch_task  (MyBranchOperator)
   ↓
 ┌──────────────┬──────────────┐
 │              │              │
branch_a    branch_b
 │              │
 └──────────────┴──────────────┘
         ↓
      end_task

 

코드 구조 살펴보기

from airflow.operators.bash import BashOperator
from branch_operator.default_dag import create_dag
from branch_operator.my_branch_operator import MyBranchOperator

dag = create_dag("BranchOperator")

start = BashOperator(
    task_id='start_task',
    bash_command='echo start',
    dag=dag,
)

branch = MyBranchOperator(
    task_id='branch_task',
    dag=dag,
)

branch_a = BashOperator(
    task_id='branch_a',
    bash_command='echo "a!!!(50>)"; exit 0;',
    dag=dag,
)

branch_b = BashOperator(
    task_id='branch_b',
    bash_command='echo "b!!!(50<)"; exit 0',
    dag=dag,
)

end = BashOperator(
    task_id='end_task',
    bash_command='echo end',
    dag=dag,
)

start >> branch
branch >> [branch_a, branch_b] >> end

 

주요 코드 설명

 1. create_dag()

 

default_dag.py 내부에서 DAG 기본 설정을 반환하는 함수입니다.
예를 들어 다음과 같은 형태로 정의되어 있을 수 있습니다:

from airflow import DAG
from datetime import datetime

def create_dag(dag_id: str):
    return DAG(
        dag_id=dag_id,
        start_date=datetime(2025, 10, 18),
        schedule_interval=None,
        catchup=False,
    )

 

2. MyBranchOperator

 

BaseBranchOperator를 상속받아 분기 로직을 직접 구현한 커스텀 오퍼레이터입니다.

예시 구현:

from airflow.operators.branch import BaseBranchOperator
import random

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        random_value = random.randint(1, 100)
        if random_value > 50:
            return 'branch_a'
        else:
            return 'branch_b'

여기서 choose_branch() 메서드는 다음에 실행할 task_id를 문자열로 반환합니다.

 

3. BashOperator

 

각 브랜치에서 실행되는 실제 Task입니다.

bash_command='echo "a!!!(50>)"; exit 0;'

명령은 다음을 수행합니다:

  1. "a!!!(50>)" 라는 문자열을 출력 (echo)
  2. 정상 종료 (exit 0) — Airflow는 종료 코드 0을 “성공”으로 판단합니다.

즉, 로그 상에는 단순히 문자열만 찍히고 태스크는 성공 처리됩니다.\]

 

 

실행 흐름

  1. DAG 실행 시 start_task → branch_task 순서로 실행됩니다.
  2. MyBranchOperator의 choose_branch() 함수가 실행되어 조건 판별 수행
  3. 조건 결과에 따라 branch_a 또는 branch_b 중 하나만 실행
  4. 이후 공통적으로 end_task로 흐름이 합쳐짐

 

 

정리하면

 

Airflow의 BranchOperator를 활용해
조건 기반 분기(Conditional Branching) 를 구현해봤습니다.

핵심 요점은 다음과 같습니다:

  • BaseBranchOperator 상속 후 choose_branch() 구현
  • 반환된 task_id만 실행됨 (나머지는 skip)
  • BashOperator를 이용해 각 브랜치 동작 명확히 확인 가능

이제 이 개념을 확장해서
예를 들어 API 응답, DB 쿼리 결과, CPU 사용량, 날짜 조건
다양한 실시간 조건에 따라 워크플로우를 유연하게 제어할 수 있습니다.