Airflow를 처음 접할 때 가장 헷갈리는 지점은
“그래서 실제로 어떤 식으로 ETL을 구성해야 하지?”라는 부분입니다.
이 글에서는
Docker Compose로 Airflow를 띄우고, PostgresOperator와 SQL 파일만으로 ETL을 구성한 구조를 정리합니다.
복잡한 Python 로직 없이, SQL과 스케줄링의 역할 분리에 초점을 둔 구현입니다.
1. 전체 구조 한 번에 이해하기
이 파이프라인은 조립 라인과 유사합니다.
- 원천 데이터는 그대로 가져오고
- 중간에서 한 번 정리한 뒤
- 최종 테이블에 안정적으로 적재
Airflow는 이 라인이 순서대로, 반복 가능하게 흐르도록 제어만 합니다
중요한 포인트는
“Airflow는 로직을 담당하지 않고, 흐름(스케줄링)만 관리한다”
라는 역할 분리입니다.
2. 구조 아키텍처

3. 디렉터리 구조
airflow-etl/
├─ dags/
│ ├─ etl_dag.py # ETL DAG 정의
│ └─ sql/
│ ├─ extract_orders.sql
│ ├─ transform_orders.sql
│ └─ load_orders.sql
├─ scripts/
│ └─ setup.sh # Airflow 초기화 및 실행 스크립트
├─ docker-compose.yaml
├─ airflow.cfg
└─ README.md
4. Docker Compose로 Airflow 환경 구성
Airflow는 Docker Compose로 구성되어 있습니다.
- 웹서버, 스케줄러, 메타DB가 컨테이너 단위로 분리
- 로컬에서도 동일한 실행 환경 보장
- 초기화와 실행을 Bash Script로 자동화
이 방식은
“환경 설정 → 실행”을 한 번에 처리하는 스위치를 만든 것과 같습니다.
bash scripts/setup.sh
- env 환경 변수 로드
- Airflow DB 초기화
- 컨테이너 실행
- PostgreSQL Connection 자동 등록
#!/bin/bash
set -e # 스크립트 실행 중 어떤 명령이든 하나라도 실패하면 즉시 스크립트를 중단(exit)시키는 옵션.
echo "▶ Airflow setup started"
# 1. 환경 변수 로드 (.env 파일: 파일 존재 및 일반파일 검사함)
if [ ! -f .env ]; then
echo "❌ .env file not found"
exit 1
fi
source .env # 현재 셀에 변 수를 로드함.
# 2. Airflow 초기화 (DB + 기본 계정 생성)
docker compose up airflow-init
# 3. Airflow 컨테이너 기동
docker compose up -d
# 4. Airflow Postgres Connection 생성
docker compose exec airflow-webserver airflow connections add my_postgres_connection \
--conn-type postgres \
--conn-host $POSTGRES_HOST \
--conn-schema $POSTGRES_DB \
--conn-login $POSTGRES_USER \
--conn-password $POSTGRES_PASSWORD \
--conn-port $POSTGRES_PORT || true # 이 줄에서 에러 나도 괜찮다, 계속 가라”는 의도 표현
echo "✅ Airflow setup completed"
echo "👉 Airflow UI: http://localhost:8080"
echo "👉 Login: $AIRFLOW_ADMIN_USER / $AIRFLOW_ADMIN_PASSWORD"
이 한 줄로 아래 작업이 모두 수행됩니다.
5. DAG 설계: SQL 중심 ETL
ETL은 하나의 DAG 안에서 Extract → Transform → Load 순서로 구성됩니다.
각 단계는 모두 PostgresOperator를 사용하며,
Python 코드 안에는 SQL 파일 경로만 정의합니다.
extract_task >> transform_task >> load_task
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
default_args = {"start_date": datetime(2025, 12, 30)}
with DAG(
'postgres_etl',
description='Extract, transform, and load data using PostgreSQL.',
default_args=default_args,
schedule='0 0 * * *',
catchup=False
) as dag:
extract_task = PostgresOperator(
task_id = 'extract_from_postgres',
postgres_conn_id='my_postgres_connection',
sql="sql/extract_orders.sql"
)
transform_task = PostgresOperator(
task_id = 'transform_from_postgres',
postgres_conn_id='my_postgres_connection',
sql="sql/transform_orders.sql"
)
load_task = PostgresOperator(
task_id = 'load_from_postgres',
postgres_conn_id='my_postgres_connection',
sql="sql/load_orders.sql"
)
extract_task >> transform_task >> load_task
이는
파이프를 직렬로 연결한 구조와 같아
앞 단계가 끝나야 다음 단계가 흐를 수 있습니다.
Airflow는 이 의존성만 관리하고,
실제 데이터 처리는 전부 SQL이 담당합니다.
6. Load 단계에서의 핵심: Upsert 전략
# extract_order.sql
INSERT INTO staging.order (
order_id,
user_id,
order_amount,
order_time,
status
)
SELECT
order_id,
user_id,
order_amount,
order_time,
status
FROM raw."order"
ON CONFLICT (order_id)
DO UPDATE SET
user_id = EXCLUDED.user_id,
order_amount = EXCLUDED.order_amount,
order_time = EXCLUDED.order_time,
status = EXCLUDED.status;
# transform_orders.sql
UPDATE staging.order
SET order_time = NOW()
WHERE order_time IS NULL;
# load_orders.sql
INSERT INTO mart.order (
order_id,
user_id,
order_amount,
order_time,
status
)
SELECT
order_id,
user_id,
order_amount,
order_time,
status
FROM staging.order
ON CONFLICT (order_id)
DO UPDATE SET
user_id = EXCLUDED.user_id,
order_amount = EXCLUDED.order_amount,
order_time = EXCLUDED.order_time,
status = EXCLUDED.status;
load_orders.sql에서는 ON CONFLICT 구문을 사용합니다.
이 방식의 핵심은 단순합니다.
- 이미 존재하는 주문 → UPDATE
- 새로 들어온 주문 → INSERT
즉, 배치를 여러 번 실행해도 데이터가 망가지지 않습니다.
이는 ETL에서 매우 중요한 특성으로,
*“실패해도 다시 실행할 수 있는 파이프라인”*을 만드는 기본 조건입니다.
7. 이 구조의 장점 정리
이 구현의 핵심은 역할 분리입니다.
- Airflow: 스케줄링과 흐름 제어
- SQL: 데이터 로직 전담
- Bash: 실행 자동화
각 도구가 자기 역할만 수행하도록 설계되어
확장과 유지보수가 쉬운 구조를 만듭니다.
Docker Compose 기반 Airflow 환경에서 SQL 중심으로 PostgreSQL ETL을 구성하고, Bash 스크립트로 실행까지 자동화한 파이프라인 정리입니다.
깃허브 주소: AnalyzeGit/airflow-etl
GitHub - AnalyzeGit/airflow-etl
Contribute to AnalyzeGit/airflow-etl development by creating an account on GitHub.
github.com
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow에서 Worker에만 pip 설치 가능한 경우의 해결 전략 (0) | 2026.02.24 |
|---|---|
| [프로젝트] Apache Airflow 기반 해양 센서 데이터 ETL 자동화 설계 (0) | 2026.02.09 |
| Airflow Task 상태를 기반으로 ETL 적재 성공·실패를 판단하는 방법 (0) | 2026.02.03 |
| Airflow PythonOperator에서 params로 값을 전달하고 함수에서 사용하는 방법 (0) | 2026.02.03 |
| trigger_rule="all_done" 이란 무엇인가 (0) | 2026.02.03 |