BranchOperator란?
Airflow의 BranchOperator는 워크플로우 내에서 조건문(if-else) 역할을 수행하는 연산자입니다.
즉, 특정 조건에 따라 다음에 실행될 Task를 선택하게 합니다.
예를 들어,
- CPU 사용량이 50% 이상이면 branch_a 실행
- 그렇지 않으면 branch_b 실행
이런 식으로 분기 제어를 할 수 있습니다.
DAG 구조
이번 예제에서는 다음과 같은 간단한 DAG을 구성합니다.
start_task
↓
branch_task (MyBranchOperator)
↓
┌──────────────┬──────────────┐
│ │ │
branch_a branch_b
│ │
└──────────────┴──────────────┘
↓
end_task
코드 구조 살펴보기
from airflow.operators.bash import BashOperator
from branch_operator.default_dag import create_dag
from branch_operator.my_branch_operator import MyBranchOperator
dag = create_dag("BranchOperator")
start = BashOperator(
task_id='start_task',
bash_command='echo start',
dag=dag,
)
branch = MyBranchOperator(
task_id='branch_task',
dag=dag,
)
branch_a = BashOperator(
task_id='branch_a',
bash_command='echo "a!!!(50>)"; exit 0;',
dag=dag,
)
branch_b = BashOperator(
task_id='branch_b',
bash_command='echo "b!!!(50<)"; exit 0',
dag=dag,
)
end = BashOperator(
task_id='end_task',
bash_command='echo end',
dag=dag,
)
start >> branch
branch >> [branch_a, branch_b] >> end
주요 코드 설명
1. create_dag()
default_dag.py 내부에서 DAG 기본 설정을 반환하는 함수입니다.
예를 들어 다음과 같은 형태로 정의되어 있을 수 있습니다:
from airflow import DAG
from datetime import datetime
def create_dag(dag_id: str):
return DAG(
dag_id=dag_id,
start_date=datetime(2025, 10, 18),
schedule_interval=None,
catchup=False,
)
2. MyBranchOperator
BaseBranchOperator를 상속받아 분기 로직을 직접 구현한 커스텀 오퍼레이터입니다.
예시 구현:
from airflow.operators.branch import BaseBranchOperator
import random
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
random_value = random.randint(1, 100)
if random_value > 50:
return 'branch_a'
else:
return 'branch_b'
여기서 choose_branch() 메서드는 다음에 실행할 task_id를 문자열로 반환합니다.
3. BashOperator
각 브랜치에서 실행되는 실제 Task입니다.
bash_command='echo "a!!!(50>)"; exit 0;'
명령은 다음을 수행합니다:
- "a!!!(50>)" 라는 문자열을 출력 (echo)
- 정상 종료 (exit 0) — Airflow는 종료 코드 0을 “성공”으로 판단합니다.
즉, 로그 상에는 단순히 문자열만 찍히고 태스크는 성공 처리됩니다.\]
실행 흐름
- DAG 실행 시 start_task → branch_task 순서로 실행됩니다.
- MyBranchOperator의 choose_branch() 함수가 실행되어 조건 판별 수행
- 조건 결과에 따라 branch_a 또는 branch_b 중 하나만 실행
- 이후 공통적으로 end_task로 흐름이 합쳐짐
정리하면
Airflow의 BranchOperator를 활용해
조건 기반 분기(Conditional Branching) 를 구현해봤습니다.
핵심 요점은 다음과 같습니다:
- BaseBranchOperator 상속 후 choose_branch() 구현
- 반환된 task_id만 실행됨 (나머지는 skip)
- BashOperator를 이용해 각 브랜치 동작 명확히 확인 가능
이제 이 개념을 확장해서
예를 들어 API 응답, DB 쿼리 결과, CPU 사용량, 날짜 조건 등
다양한 실시간 조건에 따라 워크플로우를 유연하게 제어할 수 있습니다.
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow에서 날짜 기반 분기 — BranchDateTimeOperator 활용하기 (1) | 2025.10.18 |
|---|---|
| BranchOperator vs BranchPythonOperator 비교 (0) | 2025.10.18 |
| Airflow Sensor의 soft_fail 이해하기 (0) | 2025.10.18 |
| Airflow Sensor 실습 — PythonSensor로 파일 확인 (0) | 2025.10.18 |
| Airflow XCom과 ShortCircuitOperator 이해하기 (0) | 2025.10.18 |