컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow로 Star Wars API 데이터를 PostgreSQL에 적재하기

Data Jun 2025. 10. 19. 09:51

이번 글에서는 Apache Airflow를 이용해
외부 API에서 데이터를 가져오고, CSV로 저장한 뒤
PostgreSQL 데이터베이스에 자동으로 적재하는 과정을 만들어봅니다.

 

 

1. DAG 전체 구조

전체 흐름은 다음과 같습니다.

┌──────────────────────┐
│ ① create_table_op    │
│ (PostgresOperator)   │
│ └ 테이블 생성         │
└────────────┬─────────┘
             │
             ▼
┌──────────────────────┐
│ ② get_op             │
│ (SimpleHttpOperator) │
│ └ Star Wars API 호출  │
└────────────┬─────────┘
             │
             ▼
┌──────────────────────┐
│ ③ extract_data_op    │
│ (PythonOperator)     │
│ └ JSON → CSV 변환     │
└────────────┬─────────┘
             │
             ▼
┌──────────────────────┐
│ ④ store_op           │
│ (PythonOperator +    │
│  PostgresHook)       │
│ └ CSV → PostgreSQL 적재 │
└──────────────────────┘


create_table_op → get_op → extract_data_op → store_op

API → JSON 응답 → CSV 저장 → PostgreSQL 로드
이 모든 단계를 Airflow DAG이 순차적으로 자동 실행합니다 🚀
  • 테이블 생성 (PostgresOperator)
    PostgreSQL에 데이터를 저장할 테이블이 없다면 새로 만듭니다.
  • API 호출 (SimpleHttpOperator)
    Star Wars API로부터 캐릭터 정보를 요청합니다.
  • 데이터 추출 및 CSV 변환 (PythonOperator)
    응답 데이터를 CSV 파일로 변환해 /tmp/ 경로에 저장합니다.
  • 데이터 적재 (PostgresHook)
    CSV 파일을 PostgreSQL 테이블에 빠르게 적재합니다.

 

 

2. DAG 코드 분석

1️⃣ 테이블 생성

task_create_table_op = PostgresOperator(
    task_id='create_table_op',
    postgres_conn_id='meta_db',
    sql="""
    CREATE TABLE IF NOT EXISTS starwars_character (
        name TEXT NOT NULL,
        height TEXT NOT NULL,
        mass TEXT NOT NULL
    );
    """
)
  • PostgresOperator 는 SQL문을 그대로 실행하는 Operator입니다.
  • 이 태스크는 starwars_character 테이블이 없으면 자동으로 생성합니다.

2️⃣ Star Wars API 호출

task_get_op = SimpleHttpOperator(
    task_id="get_op",
    http_conn_id="my_http_connection",
    endpoint="/api/people/1/",
    headers={"Content-Type": "application/json"},
    method='GET',
    response_filter=lambda response: json.loads(response.text),
    log_response=True
)
  • SimpleHttpOperator는 REST API를 쉽게 호출할 수 있는 Operator입니다.
  • http_conn_id는 Airflow Connection 설정에 등록한 API 기본 URL을 사용합니다.
  • response_filter를 통해 응답(JSON)을 바로 Python 객체로 변환합니다.
  • API 결과 예시:
{
  "name": "Luke Skywalker",
  "height": "172",
  "mass": "77",
  ...
}

 

3️⃣ 데이터 추출 및 CSV로 저장

def _extract_data(**context):
    response = context['ti'].xcom_pull(task_ids='get_op')
    starwars_character = json_normalize({
        "name": response['name'],
        "height": response["height"],
        "mass": response["mass"]
    })
    starwars_character.to_csv("/tmp/starwars_character.csv", header=False, index=None)
  • 이전 단계(get_op)의 API 결과를 XCom으로 받아옵니다.
  • 필요한 필드만 추출 (name, height, mass)
  • pandas.json_normalize()로 테이블 형태로 변환 후 CSV로 저장합니다.

📁 생성되는 파일 예시:

/tmp/starwars_character.csv
Luke Skywalker,172,77

 

4️⃣ PostgreSQL에 데이터 적재

def _store_character(**context):
    hook = PostgresHook(postgres_conn_id='meta_db')
    hook.copy_expert(
        filename="/tmp/starwars_character.csv",
        sql="COPY starwars_character FROM stdin WITH DELIMITER as ','"
    )
  • PostgresHook 은 실제 PostgreSQL DB에 연결하는 클래스입니다.
  • copy_expert() 메서드는 PostgreSQL의 COPY 명령을 이용해
    CSV 파일을 빠르게 테이블로 적재합니다.
  • INSERT보다 훨씬 빠르며, 대용량 데이터 로딩 시 유용합니다.

5️⃣ 태스크 연결

task_create_table_op >> task_get_op >> task_extract_data_op >> task_store_op
  • >> 연산자는 태스크 간 실행 순서를 정의합니다.
  • 즉, 테이블 생성 → API 호출 → CSV 생성 → DB 적재 순서로 실행됩니다.

 

3. 전체 DAG 코드 한눈에 보기

with DAG(
    'http_dag',
    description='http dag',
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 7, 1),
    catchup=False
) as dag:

    task_create_table_op = PostgresOperator(
        task_id='create_table_op',
        postgres_conn_id='meta_db',
        sql="""
        CREATE TABLE IF NOT EXISTS starwars_character (
            name TEXT NOT NULL,
            height TEXT NOT NULL,
            mass TEXT NOT NULL
        );
        """
    )

    task_get_op = SimpleHttpOperator(
        task_id="get_op",
        http_conn_id="my_http_connection",
        endpoint="/api/people/1/",
        headers={"Content-Type": "application/json"},
        method='GET',
        response_filter=lambda response: json.loads(response.text),
        log_response=True
    )

    task_extract_data_op = PythonOperator(
        task_id="extract_data_op",
        python_callable=_extract_data,
        provide_context=True
    )

    task_store_op = PythonOperator(
        task_id="store_op",
        python_callable=_store_character
    )

    task_create_table_op >> task_get_op >> task_extract_data_op >> task_store_op

 

 

 

정리하면

 

이 DAG은 Airflow에서 흔히 사용하는 ETL(Extract → Transform → Load) 구조의 기본 예시입니다.

  • Extract → 외부 API 호출
  • Transform → JSON → CSV 가공
  • Load → DB 적재

 

핵심 포인트:

단순 SQL은 PostgresOperator,
파일 적재나 복잡한 로직은 PythonOperator + Hook 조합이 유리하다!