컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow TaskGroup — 복잡한 DAG을 깔끔하게 묶는 방법

Data Jun 2025. 10. 19. 17:16

Airflow로 DAG을 만들다 보면,
비슷한 종류의 작업(Task)이 반복되거나
논리적으로 하나의 단위로 묶는 게 더 보기 좋은 경우가 많습니다.

예를 들어 👇

  • 데이터 수집, 전처리, 저장을 하나의 단계로 묶고 싶거나
  • 동일한 구조의 Task를 여러 개 반복할 때

이럴 때 유용하게 쓰이는 기능이 바로 TaskGroup입니다.

 

TaskGroup이란?

TaskGroup은 여러 Task를 하나의 “그룹”으로 묶어서
DAG 그래프 상에서 논리적 단위로 표현할 수 있게 해주는 기능입니다.

 

즉, DAG를 시각적으로도 깔끔하게 정리하면서
코드 구조도 모듈화할 수 있습니다.

 

코드 예시

with DAG(
    dag_id="task_group",
    start_date=datetime.datetime(2023, 7, 1),
    schedule="@daily",
    default_args={"retries": 1},
):
    @task_group(default_args={"retries": 3})
    def group1():
        """This docstring will become the tooltip for the TaskGroup."""
        task1 = EmptyOperator(task_id="task1")
        task2 = BashOperator(
            task_id="task2",
            bash_command="echo Hello World!",
            retries=2
        )
        print(task1.retries)  # 3
        print(task2.retries)  # 2

    task3 = EmptyOperator(task_id="task3")

    group1() >> task3

 

코드 동작 해설

 1. DAG 기본 설정

default_args={"retries": 1}

 

  • DAG 전체에 공통적으로 적용되는 기본 설정입니다.
  • 즉, 아무런 설정이 없으면 각 Task는 실패 시 1회 재시도(retry) 합니다.

2. TaskGroup 정의

@task_group(default_args={"retries": 3})
def group1():

 

 

3. 그룹 내부 Task

task1 = EmptyOperator(task_id="task1")
task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
  • task1은 별도의 retries 설정이 없으므로 → 그룹 기본값(3회 재시도) 사용
  • task2는 직접 retries=2로 지정했으므로 → 2회 재시도

즉, TaskGroup 내부에서도 각 Task별로 설정을 재정의할 수 있습니다.

 

4. Task 간 관계

group1() >> task3
  • group1() 실행 시, TaskGroup 전체가 하나의 노드처럼 동작합니다.
  • 따라서 DAG 그래프에서는 group1 박스 안에 task1, task2가 묶여 있고,
    그 다음에 task3이 실행됩니다.

시각적으로 보면 이렇게 보입니다 👇

[group1]
   ├── task1
   └── task2
        ↓
      task3

 

 

정리하면

 

TaskGroup은 DAG 안의 여러 Task를 하나의 논리적 블록으로 묶어서,
DAG를 깔끔하게 정리하고 유지보수를 쉽게 만들어주는 기능이다