Airflow를 사용하다 보면, 워크플로우가 복잡해지면서 태스크들을 논리적으로 묶고 싶을 때가 있습니다.
예전에는 이를 위해 SubDagOperator를 사용했지만,
Airflow 2.x 이후로는 TaskGroup이 공식적으로 권장되는 방법입니다.
이번 글에서는 TaskGroup을 사용해 여러 태스크를 깔끔하게 묶는 방법을 살펴봅니다.
SubDAG의 한계와 TaskGroup의 등장
SubDagOperator는 과거에 DAG 안에 또 다른 DAG을 중첩해서 관리할 수 있는 기능이었습니다.
하지만 다음과 같은 문제들이 있었습니다.
- 스케줄러가 중복으로 동작해 성능 저하 발생
- 병렬 실행이 어렵고 SequentialExecutor로 제한
- DAG 구조가 복잡해져 의존성 관리가 어려움
- Airflow UI에서 시각적으로 혼란스러움
이러한 문제를 해결하기 위해 등장한 것이 TaskGroup입니다.
TaskGroup은 단순히 시각적이고 논리적인 그룹화만 제공하고,
실행은 기존 DAG 안에서 그대로 이뤄집니다.
예제 코드: TaskGroup으로 워크플로우 구성하기
from airflow.decorators import dag, task_group
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
# 실행할 Python 함수 정의
def print_hello():
print("Hello")
def print_world():
print("World")
@dag(
schedule_interval='@daily',
start_date=days_ago(1),
catchup=False
)
def my_task_group():
# 시작 태스크
start = EmptyOperator(task_id='start')
# 첫 번째 그룹 정의
@task_group(group_id='group_1')
def group_1_task():
task_1 = PythonOperator(task_id='print_hello', python_callable=print_hello)
task_2 = PythonOperator(task_id='print_world', python_callable=print_world)
# 두 번째 그룹 정의
@task_group(group_id='group_2')
def group_2_task():
task_3 = PythonOperator(task_id='print_airflow', python_callable=print_hello)
task_4 = PythonOperator(task_id='print_fastcampus', python_callable=print_world)
# 그룹 인스턴스 생성
task_group_1 = group_1_task()
task_group_2 = group_2_task()
# 종료 태스크
end = EmptyOperator(task_id='end')
# 실행 순서 정의
start >> task_group_1 >> task_group_2 >> end
dag = my_task_group()
코드 설명
1️⃣ DAG 정의
@dag(schedule_interval='@daily', start_date=days_ago(1), catchup=False)
- 하루에 한 번(@daily) 실행되는 DAG을 정의합니다.
- catchup=False로 과거 날짜에 대한 자동 실행은 방지합니다.
2️⃣ @task_group으로 그룹 정의
@task_group(group_id='group_1')
def group_1_task():
...
- @task_group 데코레이터로 태스크 묶음을 정의합니다.
- group_id는 UI에서 표시될 그룹 이름입니다.
- 그룹 안에는 PythonOperator나 BashOperator 등 원하는 태스크를 자유롭게 넣을 수 있습니다.
3️⃣ 그룹 간 순서 정의
start >> task_group_1 >> task_group_2 >> end
- 그룹 자체가 태스크처럼 동작하므로, >> 연산자를 통해 순서를 지정할 수 있습니다.
- 그룹 안의 태스크들은 병렬로 실행되지만, 그룹 간에는 순차적으로 실행됩니다.
TaskGroup DAG 구조 (ASCII 도식)
+----------------+
| start |
+----------------+
│
▼
+----------------------+
| group_1 |
| ├─ print_hello |
| └─ print_world |
+----------------------+
│
▼
+----------------------+
| group_2 |
| ├─ print_airflow |
| └─ print_fastcampus |
+----------------------+
│
▼
+----------------+
| end |
+----------------+
- 각 TaskGroup은 내부적으로 여러 태스크를 병렬로 실행합니다.
- UI에서도 group_1, group_2가 접히는 형태로 표시되어 보기 깔끔합니다.
정리하면
| 항목 | 설명0 |
| @task_group | 여러 태스크를 논리적으로 묶는 데코레이터 |
| PythonOperator | 파이썬 함수를 실행하는 오퍼레이터 |
| EmptyOperator | 실행 로직 없이 DAG 구조를 정의하는 용도 |
| 장점 | SubDAG의 복잡성을 제거하고, UI 가독성과 관리성을 향상 |
| 실행 순서 | start → group_1 → group_2 → end |
🔹 TaskGroup은 SubDAG의 문제를 해결한 현대적 대안입니다.
🔹 단순히 UI 그룹화를 위해 사용되며, 병렬 실행 및 가시성이 뛰어납니다.
🔹 복잡한 DAG이라면 TaskGroup으로 논리적 구획을 만들어 관리하세요.
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow ExternalTaskSensor 이해하기 (0) | 2025.10.19 |
|---|---|
| Airflow에서 한 DAG이 다른 DAG을 트리거(Trigger)하기 (0) | 2025.10.18 |
| Airflow에서 요일 기반 분기 — BranchDayOfWeekOperator 활용하기 (0) | 2025.10.18 |
| Airflow에서 날짜 기반 분기 — BranchDateTimeOperator 활용하기 (1) | 2025.10.18 |
| BranchOperator vs BranchPythonOperator 비교 (0) | 2025.10.18 |