컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow에서 PostgresOperator만으로 ETL을 한다는 것은

Data Jun 2025. 12. 30. 18:05

처음 Airflow를 접하면 ETL을 이렇게 생각하기 쉽다.

“Extract = 데이터를 Airflow로 가져오는 것”
“Transform = Python에서 가공”
“Load = 다시 DB에 넣기”

하지만 PostgresOperator만 사용하는 ETL은 이 생각을 완전히 바꿔준다,

 

핵심 관점 전환

Airflow는 데이터를 처리하지 않는다.
Airflow는 DB에게 ‘언제 어떤 SQL을 실행할지’만 지시한다.

즉,

  • 데이터는 항상 DB 안에 있고
  • ETL은 SQL로 수행
  • Airflow는 순서와 실행만 관리

 

1. 우리가 선택한 ETL 구조

이번 연습에서 사용한 구조는 실무에서 가장 흔한 형태다.

raw.orders      (원본 A)
   ↓ Extract
staging.orders  (기준 A′)
   ↓ Transform
mart.orders     (결과 B)
  • raw : 원본 데이터 (건드리지 않음)
  • staging : ETL 기준 데이터 (가공 OK)
  • mart : 최종 결과 (분석/서빙용)

ETL을 안전하게 하려면 최소 이 3단계가 가장 깔끔하다.

 

2. PostgresOperator로 ETL을 나누는 이유

Airflow DAG에서는 PostgresOperator 3개만 사용했다.

extract_orders   → transform_orders → load_orders

각 Task는 역할이 명확하다.

단계 역할
Extract 데이터를 읽는 것이 아니라 기준 상태를 만드는 것 
Transform 기준 데이터(staging)를 가공
Load 최종 결과(mart)에 반영

 

3. 가장 헷갈렸던 Extract 개념

처음 가장 헷갈렸던 질문은 이거였다.

“그냥 SELECT로 가져오면 되는데,
왜 굳이 INSERT를 하지?”

SELECT * FROM raw.orders;

 

  • 이건 잠깐 조회
  • DB에 아무것도 남지 않음
  • 다음 단계가 참조할 수 없음
INSERT INTO staging.orders (...)
SELECT ...
FROM raw.orders;

Extract란
데이터를 ‘가져오는 것’이 아니라
데이터를 DB에 ‘기억으로 남기는 것’이다.

 

이 시점의 데이터를 ETL 기준 상태(staging) 로 고정하는 게 목적이다.

 

4. Extract SQL 전체 코드

이번에 사용한 Extract 쿼리는 아래와 같다.

INSERT INTO staging.orders (
    order_id,
    user_id,
    order_amount,
    order_time,
    status
)
SELECT
    order_id,
    user_id,
    order_amount,
    order_time,
    status
FROM raw.orders
ON CONFLICT (order_id)
DO UPDATE SET
    user_id      = EXCLUDED.user_id,
    order_amount = EXCLUDED.order_amount,
    order_time   = EXCLUDED.order_time,
    status       = EXCLUDED.status;

 

 

5. 여기서 가장 많이 헷갈리는 포인트들 

1️⃣ INSERT + SELECT는 서브쿼리가 아니다

INSERT INTO table
SELECT ...
FROM other_table;

 

  • ❌ 서브쿼리 아님
  • SELECT 결과를 INSERT의 입력값으로 사용하는 구조

2️⃣ ON CONFLICT는 SELECT에 붙는 게 아니다

ON CONFLICT (order_id)
DO UPDATE SET ...

 

  • ❌ SELECT 옵션 아님
  • INSERT의 옵션

 

즉 구조는 항상 이렇다.

INSERT
 ├─ VALUES 또는 SELECT
 └─ ON CONFLICT
     └─ DO UPDATE

 

 

3️⃣ EXCLUDED는 무엇인가?

EXCLUDED는 이 의미다.

“INSERT하려다 중복 때문에 들어가지 못한 ‘새 데이터’”

즉,

  • 기존 데이터 ❌
  • 새로 들어오려던 값 ⭕

그래서 이렇게 쓴다.

status = EXCLUDED.status

기존 행의 값을, 새 값으로 덮어쓴다

 

6. PostgresOperator ETL의 핵심 정리

Airflow + PostgresOperator 기반 ETL의 본질은 이것이다.

  • Airflow는 데이터를 들고 다니지 않는다
  • 데이터는 항상 DB 안에 있다
  • ETL은 SQL로 수행한다
  • Extract는 “조회”가 아니라 “기준 상태 생성”
  • EXCLUDED는 “배제된 새 값”
  • ON CONFLICT는 INSERT의 일부

 

7. 정리하면

PostgresOperator만으로 ETL을 구성하면
ETL의 본질(SQL, 데이터 흐름, 기준 상태)이
가장 또렷하게 보인다.

 

지금 이 과정을 한 번 제대로 이해하면
Python ETL, Spark, DBT로 넘어가도
개념이 흔들리지 않는다