BranchOperator (또는 BaseBranchOperator)
BranchOperator는 “직접 상속받아서 새 오퍼레이터를 만드는 용도”입니다.
즉, 아래처럼 클래스를 새로 만들어서 로직을 커스터마이징해야 합니다.
from airflow.operators.branch import BaseBranchOperator
import random
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
random_value = random.randint(1, 100)
if random_value > 50:
return 'branch_a'
else:
return 'branch_b'
- 핵심 포인트는 choose_branch() 메서드를 반드시 구현해야 한다는 점입니다.
- Airflow는 이 메서드의 반환값을 이용해 어떤 task를 실행할지 결정합니다.
- 반환값은 다음 중 하나여야 합니다:
- 단일 task_id (문자열)
- 여러 개의 task_id (리스트나 튜플)
즉, 이건 새로운 오퍼레이터를 설계할 때 사용합니다.
BranchPythonOperator
반대로, BranchPythonOperator는
“그냥 DAG 안에서 간단한 조건 분기만 하고 싶다”는 경우에 사용합니다.
별도의 클래스를 정의할 필요 없이 함수로 분기 조건을 정의하면 됩니다.
from airflow.operators.python import BranchPythonOperator
def _choose_branch(**context):
import random
if random.randint(1, 100) > 50:
return 'branch_a'
else:
return 'branch_b'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=_choose_branch,
dag=dag,
)
차이점은 choose_branch()를 직접 override하지 않고
python_callable로 전달된 함수가 내부적으로 호출된다는 점이에요.
내부 구조 관계
사실 BranchPythonOperator는 내부적으로 BaseBranchOperator를 상속받습니다.
즉, 이런 관계입니다 👇
BaseOperator
└── BaseBranchOperator
└── BranchPythonOperator
즉, BranchPythonOperator는 **“BaseBranchOperator + PythonCallable 기능”**이 결합된 버전이라고 보면 됩니다.
언제 어떤 걸 써야 할까?
| 상황 | 추천 오퍼레이터 |
| 간단한 조건 분기 | BranchPythonOperator |
| 복잡한 조건 로직 (API, DB, 외부 의존성 등) | BranchPythonOperator |
| 재사용 가능한 커스텀 연산자 설계 | BaseBranchOperator |
| Airflow Plugin 개발, Operator 확장 | BaseBranchOperator |
예시 비교
🔸 BranchPythonOperator 버전
from airflow.operators.python import BranchPythonOperator
def _choose_branch():
import random
if random.randint(1, 100) > 50:
return 'branch_a'
return 'branch_b'
branch = BranchPythonOperator(
task_id='branch_task',
python_callable=_choose_branch,
dag=dag,
)
🔸 BaseBranchOperator 상속 버전
from airflow.operators.branch import BaseBranchOperator
import random
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
if random.randint(1, 100) > 50:
return 'branch_a'
return 'branch_b'
branch = MyBranchOperator(
task_id='branch_task',
dag=dag,
)
정리하면
| 항목 | BranchOperator | BranchPythonOperator |
| 용도 | 커스텀 Operator 개발 | DAG 내 간단한 분기 |
| 로직 정의 방식 | choose_branch() 메서드 오버라이딩 | python_callable 함수 전달 |
| 코드량 | 많음 | 적음 |
| 확장성 | 높음(클래스 단위) | 제한적(함수 단위) |
| 추천 상황 | 복잡한 분기 처리, Operator 재사용 | DAG 내부 단순 조건 분 |
🔸 BranchPythonOperator는 “함수형 분기”
🔸 BaseBranchOperator는 “클래스형 분기”
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow에서 요일 기반 분기 — BranchDayOfWeekOperator 활용하기 (0) | 2025.10.18 |
|---|---|
| Airflow에서 날짜 기반 분기 — BranchDateTimeOperator 활용하기 (1) | 2025.10.18 |
| Airflow BranchOperator로 조건 분기 처리하기 (0) | 2025.10.18 |
| Airflow Sensor의 soft_fail 이해하기 (0) | 2025.10.18 |
| Airflow Sensor 실습 — PythonSensor로 파일 확인 (0) | 2025.10.18 |