컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow Dataset 완전 정리

Data Jun 2025. 10. 19. 10:52

Dataset이란?

Airflow 2.4부터 도입된 개념으로,
데이터 파일이나 테이블 등 “데이터 그 자체”를 DAG 간 연결의 중심으로 삼는 방법입니다.

즉, “데이터가 갱신되면 다른 DAG을 실행시키자”
이런 형태의 데이터 중심 스케줄링을 가능하게 하는 기능이에요.

 

Producer & Consumer 개념

Dataset은 데이터 생산자(Producer)소비자(Consumer) DAG을 연결합니다.

  • 🏭 Producer DAG → 어떤 데이터 파일이나 테이블을 만들거나 갱신하는 역할
  • 🧑‍💻 Consumer DAG → 그 데이터가 업데이트될 때 자동 실행되는 DAG

👉 데이터가 갱신되면, Consumer DAG이 “아, 새로운 데이터가 왔네!” 하면서 자동으로 실행되는 구조예요.

 

 

코드로 이해하기

 (1) Dataset 정의

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

 

  • Dataset() 은 데이터를 URI 형식 문자열로 정의합니다.
  • 보통 파일 경로나 테이블명을 넣어요.
  • 정규식(Regex)은 지원하지 않습니다 — 단순 문자열로만 인식합니다.

 

 

(2) Producer DAG

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime

local_file = Dataset("/tmp/sample.txt")

with DAG(
    dag_id="my_producer",
    schedule="0 0 * * *",  # 매일 자정 실행
    start_date=datetime(2023, 7, 1),
    catchup=False
) as dag:

    task_producer = BashOperator(
        task_id="producer1",
        bash_command="echo 'hello world' >> /tmp/sample.txt",
        outlets=[local_file]  # 여기서 Dataset을 "생산"
    )​

핵심 포인트:

  • outlets=[local_file]
    → 이 DAG이 어떤 Dataset을 갱신하는지 Airflow에 알려주는 부분입니다.
  • 즉, /tmp/sample.txt 파일이 갱신되면
    이 Dataset이 “업데이트됨” 으로 표시됩니다.

 

(3) Consumer DAG

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from datetime import datetime

local_file = Dataset("/tmp/sample.txt")

with DAG(
    dag_id="my_consumer",
    schedule=[local_file],  # Dataset을 스케줄 트리거로 사용!
    start_date=datetime(2023, 7, 1),
    catchup=False
) as dag:

    task_consumer = BashOperator(
        task_id="consumer1",
        bash_command="cat /tmp/sample.txt"
    )

핵심 포인트:

  • schedule=[local_file]
    → 이 DAG은 스케줄 주기 없이,
    오직 해당 Dataset이 업데이트될 때만 자동 실행됩니다.
  • 즉, my_producer DAG이 /tmp/sample.txt를 수정하면
    Airflow가 자동으로 my_consumer DAG을 트리거합니다.

 

동작 순서 (ASCII 흐름도)

+---------------------+
|   my_producer DAG   |
|  (echo >> sample.txt) |
+----------+----------+
           |
           v
  [ /tmp/sample.txt ]  --->  Dataset Updated!
           |
           v
+---------------------+
|   my_consumer DAG   |
|  (cat sample.txt)   |
+---------------------+

 

 

 

정리하면

역할 DAG 이름 키 포인
🏭 Producer my_producer outlets=[dataset] — Dataset을 갱신함
🧑‍💻 Consumer my_consumer schedule=[dataset] — Dataset이 바뀔 때 실행됨
Airflow Dataset은
“데이터가 갱신되면 다음 DAG이 자동 실행되게 만드는 연결 고리”
역할을 합니다.

 

즉, “스케줄 기반”이 아니라
데이터 기반으로 DAG을 연결하는 새로운 방식이에요