이번 글에서는 Apache Airflow를 이용해
외부 API에서 데이터를 가져오고, CSV로 저장한 뒤
PostgreSQL 데이터베이스에 자동으로 적재하는 과정을 만들어봅니다.
1. DAG 전체 구조
전체 흐름은 다음과 같습니다.
┌──────────────────────┐
│ ① create_table_op │
│ (PostgresOperator) │
│ └ 테이블 생성 │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ ② get_op │
│ (SimpleHttpOperator) │
│ └ Star Wars API 호출 │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ ③ extract_data_op │
│ (PythonOperator) │
│ └ JSON → CSV 변환 │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ ④ store_op │
│ (PythonOperator + │
│ PostgresHook) │
│ └ CSV → PostgreSQL 적재 │
└──────────────────────┘
create_table_op → get_op → extract_data_op → store_op
API → JSON 응답 → CSV 저장 → PostgreSQL 로드
이 모든 단계를 Airflow DAG이 순차적으로 자동 실행합니다 🚀
- 테이블 생성 (PostgresOperator)
PostgreSQL에 데이터를 저장할 테이블이 없다면 새로 만듭니다. - API 호출 (SimpleHttpOperator)
Star Wars API로부터 캐릭터 정보를 요청합니다. - 데이터 추출 및 CSV 변환 (PythonOperator)
응답 데이터를 CSV 파일로 변환해 /tmp/ 경로에 저장합니다. - 데이터 적재 (PostgresHook)
CSV 파일을 PostgreSQL 테이블에 빠르게 적재합니다.
2. DAG 코드 분석
1️⃣ 테이블 생성
task_create_table_op = PostgresOperator(
task_id='create_table_op',
postgres_conn_id='meta_db',
sql="""
CREATE TABLE IF NOT EXISTS starwars_character (
name TEXT NOT NULL,
height TEXT NOT NULL,
mass TEXT NOT NULL
);
"""
)
- PostgresOperator 는 SQL문을 그대로 실행하는 Operator입니다.
- 이 태스크는 starwars_character 테이블이 없으면 자동으로 생성합니다.
2️⃣ Star Wars API 호출
task_get_op = SimpleHttpOperator(
task_id="get_op",
http_conn_id="my_http_connection",
endpoint="/api/people/1/",
headers={"Content-Type": "application/json"},
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
- SimpleHttpOperator는 REST API를 쉽게 호출할 수 있는 Operator입니다.
- http_conn_id는 Airflow Connection 설정에 등록한 API 기본 URL을 사용합니다.
- response_filter를 통해 응답(JSON)을 바로 Python 객체로 변환합니다.
- API 결과 예시:
{
"name": "Luke Skywalker",
"height": "172",
"mass": "77",
...
}
3️⃣ 데이터 추출 및 CSV로 저장
def _extract_data(**context):
response = context['ti'].xcom_pull(task_ids='get_op')
starwars_character = json_normalize({
"name": response['name'],
"height": response["height"],
"mass": response["mass"]
})
starwars_character.to_csv("/tmp/starwars_character.csv", header=False, index=None)
- 이전 단계(get_op)의 API 결과를 XCom으로 받아옵니다.
- 필요한 필드만 추출 (name, height, mass)
- pandas.json_normalize()로 테이블 형태로 변환 후 CSV로 저장합니다.
📁 생성되는 파일 예시:
/tmp/starwars_character.csv
Luke Skywalker,172,77
4️⃣ PostgreSQL에 데이터 적재
def _store_character(**context):
hook = PostgresHook(postgres_conn_id='meta_db')
hook.copy_expert(
filename="/tmp/starwars_character.csv",
sql="COPY starwars_character FROM stdin WITH DELIMITER as ','"
)
- PostgresHook 은 실제 PostgreSQL DB에 연결하는 클래스입니다.
- copy_expert() 메서드는 PostgreSQL의 COPY 명령을 이용해
CSV 파일을 빠르게 테이블로 적재합니다. - INSERT보다 훨씬 빠르며, 대용량 데이터 로딩 시 유용합니다.
5️⃣ 태스크 연결
task_create_table_op >> task_get_op >> task_extract_data_op >> task_store_op
- >> 연산자는 태스크 간 실행 순서를 정의합니다.
- 즉, 테이블 생성 → API 호출 → CSV 생성 → DB 적재 순서로 실행됩니다.
3. 전체 DAG 코드 한눈에 보기
with DAG(
'http_dag',
description='http dag',
schedule_interval='0 0 * * *',
start_date=datetime(2023, 7, 1),
catchup=False
) as dag:
task_create_table_op = PostgresOperator(
task_id='create_table_op',
postgres_conn_id='meta_db',
sql="""
CREATE TABLE IF NOT EXISTS starwars_character (
name TEXT NOT NULL,
height TEXT NOT NULL,
mass TEXT NOT NULL
);
"""
)
task_get_op = SimpleHttpOperator(
task_id="get_op",
http_conn_id="my_http_connection",
endpoint="/api/people/1/",
headers={"Content-Type": "application/json"},
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
task_extract_data_op = PythonOperator(
task_id="extract_data_op",
python_callable=_extract_data,
provide_context=True
)
task_store_op = PythonOperator(
task_id="store_op",
python_callable=_store_character
)
task_create_table_op >> task_get_op >> task_extract_data_op >> task_store_op
정리하면
이 DAG은 Airflow에서 흔히 사용하는 ETL(Extract → Transform → Load) 구조의 기본 예시입니다.
- Extract → 외부 API 호출
- Transform → JSON → CSV 가공
- Load → DB 적재
핵심 포인트:
단순 SQL은 PostgresOperator,
파일 적재나 복잡한 로직은 PythonOperator + Hook 조합이 유리하다!
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow schedule_interval 완벽 정리 (0) | 2025.10.19 |
|---|---|
| Airflow DAG Scheduling 이해 (0) | 2025.10.19 |
| Airflow ExternalTaskMarker 이해하기 (0) | 2025.10.19 |
| Airflow ExternalTaskSensor 이해하기 (0) | 2025.10.19 |
| Airflow에서 한 DAG이 다른 DAG을 트리거(Trigger)하기 (0) | 2025.10.18 |