컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

BranchOperator vs BranchPythonOperator 비교

Data Jun 2025. 10. 18. 16:15

BranchOperator (또는 BaseBranchOperator)

BranchOperator는 “직접 상속받아서 새 오퍼레이터를 만드는 용도”입니다.
즉, 아래처럼 클래스를 새로 만들어서 로직을 커스터마이징해야 합니다.

from airflow.operators.branch import BaseBranchOperator
import random

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        random_value = random.randint(1, 100)
        if random_value > 50:
            return 'branch_a'
        else:
            return 'branch_b'
  • 핵심 포인트는 choose_branch() 메서드를 반드시 구현해야 한다는 점입니다.
  • Airflow는 이 메서드의 반환값을 이용해 어떤 task를 실행할지 결정합니다.
  • 반환값은 다음 중 하나여야 합니다:
    • 단일 task_id (문자열)
    • 여러 개의 task_id (리스트나 튜플)

즉, 이건 새로운 오퍼레이터를 설계할 때 사용합니다.

 

BranchPythonOperator

반대로, BranchPythonOperator는
“그냥 DAG 안에서 간단한 조건 분기만 하고 싶다”는 경우에 사용합니다.
별도의 클래스를 정의할 필요 없이 함수로 분기 조건을 정의하면 됩니다.

from airflow.operators.python import BranchPythonOperator

def _choose_branch(**context):
    import random
    if random.randint(1, 100) > 50:
        return 'branch_a'
    else:
        return 'branch_b'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=_choose_branch,
    dag=dag,
)

차이점은 choose_branch()를 직접 override하지 않고
python_callable로 전달된 함수가 내부적으로 호출된다는 점이에요.

 

내부 구조 관계

사실 BranchPythonOperator는 내부적으로 BaseBranchOperator를 상속받습니다.
즉, 이런 관계입니다 👇

BaseOperator
   └── BaseBranchOperator
         └── BranchPythonOperator

즉, BranchPythonOperator는 **“BaseBranchOperator + PythonCallable 기능”**이 결합된 버전이라고 보면 됩니다.

 

언제 어떤 걸 써야 할까?

상황 추천 오퍼레이터
간단한 조건 분기 BranchPythonOperator
복잡한 조건 로직 (API, DB, 외부 의존성 등) BranchPythonOperator
재사용 가능한 커스텀 연산자 설계 BaseBranchOperator
Airflow Plugin 개발, Operator 확장 BaseBranchOperator

 

예시 비교

🔸 BranchPythonOperator 버전

from airflow.operators.python import BranchPythonOperator

def _choose_branch():
    import random
    if random.randint(1, 100) > 50:
        return 'branch_a'
    return 'branch_b'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=_choose_branch,
    dag=dag,
)

🔸 BaseBranchOperator 상속 버전

from airflow.operators.branch import BaseBranchOperator
import random

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        if random.randint(1, 100) > 50:
            return 'branch_a'
        return 'branch_b'

branch = MyBranchOperator(
    task_id='branch_task',
    dag=dag,
)

 

정리하면

항목 BranchOperator BranchPythonOperator
용도 커스텀 Operator 개발 DAG 내 간단한 분기
로직 정의 방식 choose_branch() 메서드 오버라이딩 python_callable 함수 전달
코드량 많음 적음
확장성 높음(클래스 단위) 제한적(함수 단위)
추천 상황 복잡한 분기 처리, Operator 재사용 DAG 내부 단순 조건 분

🔸 BranchPythonOperator는 “함수형 분기”
🔸 BaseBranchOperator는 “클래스형 분기”