Dataset이란?
Airflow 2.4부터 도입된 개념으로,
데이터 파일이나 테이블 등 “데이터 그 자체”를 DAG 간 연결의 중심으로 삼는 방법입니다.
즉, “데이터가 갱신되면 다른 DAG을 실행시키자”
이런 형태의 데이터 중심 스케줄링을 가능하게 하는 기능이에요.
Producer & Consumer 개념
Dataset은 데이터 생산자(Producer) 와 소비자(Consumer) DAG을 연결합니다.
- 🏭 Producer DAG → 어떤 데이터 파일이나 테이블을 만들거나 갱신하는 역할
- 🧑💻 Consumer DAG → 그 데이터가 업데이트될 때 자동 실행되는 DAG
👉 데이터가 갱신되면, Consumer DAG이 “아, 새로운 데이터가 왔네!” 하면서 자동으로 실행되는 구조예요.
코드로 이해하기
(1) Dataset 정의
from airflow.datasets import Dataset
example_dataset = Dataset("s3://dataset-bucket/example.csv")
- Dataset() 은 데이터를 URI 형식 문자열로 정의합니다.
- 보통 파일 경로나 테이블명을 넣어요.
- 정규식(Regex)은 지원하지 않습니다 — 단순 문자열로만 인식합니다.
(2) Producer DAG
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime
local_file = Dataset("/tmp/sample.txt")
with DAG(
dag_id="my_producer",
schedule="0 0 * * *", # 매일 자정 실행
start_date=datetime(2023, 7, 1),
catchup=False
) as dag:
task_producer = BashOperator(
task_id="producer1",
bash_command="echo 'hello world' >> /tmp/sample.txt",
outlets=[local_file] # 여기서 Dataset을 "생산"
)
핵심 포인트:
- outlets=[local_file]
→ 이 DAG이 어떤 Dataset을 갱신하는지 Airflow에 알려주는 부분입니다. - 즉, /tmp/sample.txt 파일이 갱신되면
이 Dataset이 “업데이트됨” 으로 표시됩니다.
(3) Consumer DAG
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime
local_file = Dataset("/tmp/sample.txt")
with DAG(
dag_id="my_consumer",
schedule=[local_file], # Dataset을 스케줄 트리거로 사용!
start_date=datetime(2023, 7, 1),
catchup=False
) as dag:
task_consumer = BashOperator(
task_id="consumer1",
bash_command="cat /tmp/sample.txt"
)
핵심 포인트:
- schedule=[local_file]
→ 이 DAG은 스케줄 주기 없이,
오직 해당 Dataset이 업데이트될 때만 자동 실행됩니다. - 즉, my_producer DAG이 /tmp/sample.txt를 수정하면
Airflow가 자동으로 my_consumer DAG을 트리거합니다.
동작 순서 (ASCII 흐름도)
+---------------------+
| my_producer DAG |
| (echo >> sample.txt) |
+----------+----------+
|
v
[ /tmp/sample.txt ] ---> Dataset Updated!
|
v
+---------------------+
| my_consumer DAG |
| (cat sample.txt) |
+---------------------+
정리하면
| 역할 | DAG 이름 | 키 포인 |
| 🏭 Producer | my_producer | outlets=[dataset] — Dataset을 갱신함 |
| 🧑💻 Consumer | my_consumer | schedule=[dataset] — Dataset이 바뀔 때 실행됨 |
Airflow Dataset은
“데이터가 갱신되면 다음 DAG이 자동 실행되게 만드는 연결 고리”
역할을 합니다.
즉, “스케줄 기반”이 아니라
데이터 기반으로 DAG을 연결하는 새로운 방식이에요
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow Worker별 Queue 설정하기 (0) | 2025.10.19 |
|---|---|
| Airflow Queue란? (0) | 2025.10.19 |
| Airflow의 Backfill & Catchup 이해하기 (0) | 2025.10.19 |
| Airflow schedule_interval 완벽 정리 (0) | 2025.10.19 |
| Airflow DAG Scheduling 이해 (0) | 2025.10.19 |