컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

[프로젝트] Docker Compose 기반 Airflow와 PostgreSQL로 구성한 SQL 중심 ETL 파이프라인

Data Jun 2026. 2. 9. 11:38

Airflow를 처음 접할 때 가장 헷갈리는 지점은
그래서 실제로 어떤 식으로 ETL을 구성해야 하지?”라는 부분입니다.

 

이 글에서는
Docker Compose로 Airflow를 띄우고, PostgresOperator와 SQL 파일만으로 ETL을 구성한 구조를 정리합니다.

복잡한 Python 로직 없이, SQL과 스케줄링의 역할 분리에 초점을 둔 구현입니다.

 

1. 전체 구조 한 번에 이해하기

이 파이프라인은 조립 라인과 유사합니다.

  • 원천 데이터는 그대로 가져오고
  • 중간에서 한 번 정리한 뒤
  • 최종 테이블에 안정적으로 적재

Airflow는 이 라인이 순서대로, 반복 가능하게 흐르도록 제어만 합니다

 

중요한 포인트는

“Airflow는 로직을 담당하지 않고, 흐름(스케줄링)만 관리한다”
라는 역할 분리입니다.

 

 

2. 구조 아키텍처

아키텍처 구조

 

3. 디렉터리 구조

airflow-etl/
├─ dags/
│ ├─ etl_dag.py # ETL DAG 정의
│ └─ sql/
│ ├─ extract_orders.sql
│ ├─ transform_orders.sql
│ └─ load_orders.sql
├─ scripts/
│ └─ setup.sh # Airflow 초기화 및 실행 스크립트
├─ docker-compose.yaml
├─ airflow.cfg
└─ README.md

 

 

4. Docker Compose로 Airflow 환경 구성

Airflow는 Docker Compose로 구성되어 있습니다.

  • 웹서버, 스케줄러, 메타DB가 컨테이너 단위로 분리
  • 로컬에서도 동일한 실행 환경 보장
  • 초기화와 실행을 Bash Script로 자동화

이 방식은
“환경 설정 → 실행”을 한 번에 처리하는 스위치를 만든 것과 같습니다.

bash scripts/setup.sh
  • env 환경 변수 로드
  • Airflow DB 초기화
  • 컨테이너 실행
  • PostgreSQL Connection 자동 등록
#!/bin/bash

set -e # 스크립트 실행 중 어떤 명령이든 하나라도 실패하면 즉시 스크립트를 중단(exit)시키는 옵션.

echo "▶ Airflow setup started"

# 1. 환경 변수 로드 (.env 파일: 파일 존재 및 일반파일 검사함)
if [ ! -f .env ]; then
  echo "❌ .env file not found"
  exit 1
fi

source .env # 현재 셀에 변 수를 로드함.

# 2. Airflow 초기화 (DB + 기본 계정 생성)
docker compose up airflow-init

# 3. Airflow 컨테이너 기동
docker compose up -d

# 4. Airflow Postgres Connection 생성
docker compose exec airflow-webserver airflow connections add my_postgres_connection \
    --conn-type postgres \
    --conn-host $POSTGRES_HOST \
    --conn-schema $POSTGRES_DB \
    --conn-login $POSTGRES_USER \
    --conn-password $POSTGRES_PASSWORD \
    --conn-port $POSTGRES_PORT || true # 이 줄에서 에러 나도 괜찮다, 계속 가라”는 의도 표현


echo "✅ Airflow setup completed"
echo "👉 Airflow UI: http://localhost:8080"
echo "👉 Login: $AIRFLOW_ADMIN_USER / $AIRFLOW_ADMIN_PASSWORD"

이 한 줄로 아래 작업이 모두 수행됩니다.

 

5. DAG 설계: SQL 중심 ETL

ETL은 하나의 DAG 안에서 Extract → Transform → Load 순서로 구성됩니다.

 

각 단계는 모두 PostgresOperator를 사용하며,
Python 코드 안에는 SQL 파일 경로만 정의합니다.

 

extract_task >> transform_task >> load_task
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

default_args = {"start_date": datetime(2025, 12, 30)}

with DAG(
    'postgres_etl',
    description='Extract, transform, and load data using PostgreSQL.',
    default_args=default_args,
    schedule='0 0 * * *',
    catchup=False
) as dag:

    extract_task = PostgresOperator(
        task_id = 'extract_from_postgres',
        postgres_conn_id='my_postgres_connection',
        sql="sql/extract_orders.sql"
    )

    transform_task = PostgresOperator(
        task_id = 'transform_from_postgres',
        postgres_conn_id='my_postgres_connection',
        sql="sql/transform_orders.sql"
    )

    load_task = PostgresOperator(
        task_id = 'load_from_postgres',
        postgres_conn_id='my_postgres_connection',
        sql="sql/load_orders.sql"
    )

    extract_task >> transform_task >> load_task

이는
파이프를 직렬로 연결한 구조와 같아
앞 단계가 끝나야 다음 단계가 흐를 수 있습니다.

 

Airflow는 이 의존성만 관리하고,
실제 데이터 처리는 전부 SQL이 담당합니다.

 

6. Load 단계에서의 핵심: Upsert 전략

# extract_order.sql

INSERT INTO staging.order (
    order_id,
    user_id,
    order_amount,
    order_time,
    status
)
SELECT  
    order_id,
    user_id,
    order_amount,
    order_time,
    status
FROM raw."order"
ON CONFLICT (order_id)
DO UPDATE SET
    user_id      = EXCLUDED.user_id,
    order_amount = EXCLUDED.order_amount,
    order_time   = EXCLUDED.order_time,
    status       = EXCLUDED.status;
# transform_orders.sql

UPDATE staging.order
SET order_time = NOW()
WHERE order_time IS NULL;
# load_orders.sql

INSERT INTO mart.order (
    order_id,
    user_id,
    order_amount,
    order_time,
    status 
)
SELECT 
    order_id,
    user_id,
    order_amount,
    order_time,
    status
FROM staging.order
ON CONFLICT (order_id)
DO UPDATE SET
    user_id      = EXCLUDED.user_id,
    order_amount = EXCLUDED.order_amount,
    order_time   = EXCLUDED.order_time,
    status        = EXCLUDED.status;

load_orders.sql에서는 ON CONFLICT 구문을 사용합니다.

이 방식의 핵심은 단순합니다.

  • 이미 존재하는 주문 → UPDATE
  • 새로 들어온 주문 → INSERT

즉, 배치를 여러 번 실행해도 데이터가 망가지지 않습니다.

 

이는 ETL에서 매우 중요한 특성으로,
*“실패해도 다시 실행할 수 있는 파이프라인”*을 만드는 기본 조건입니다.

 

7. 이 구조의 장점 정리

이 구현의 핵심은  역할 분리입니다.

  • Airflow: 스케줄링과 흐름 제어
  • SQL: 데이터 로직 전담
  • Bash: 실행 자동화

각 도구가 자기 역할만 수행하도록 설계되어
확장과 유지보수가 쉬운 구조를 만듭니다.

 

Docker Compose 기반 Airflow 환경에서 SQL 중심으로 PostgreSQL ETL을 구성하고, Bash 스크립트로 실행까지 자동화한 파이프라인 정리입니다.

 

깃허브 주소: AnalyzeGit/airflow-etl

 

GitHub - AnalyzeGit/airflow-etl

Contribute to AnalyzeGit/airflow-etl development by creating an account on GitHub.

github.com