컨테이너·워크플로우 자동화/Dagster 사용법

자산 팩토리란? 반복되는 자산을 함수로 자동화하기

Data Jun 2025. 6. 10. 23:08

Dagster로 데이터 파이프라인을 개발하다 보면, 비슷한 구조의 자산(asset)을 여러 개 만들어야 하는 상황을 자주 마주치게 됩니다.
예를 들어, 서로 다른 파일에 CSV 데이터를 저장하는 자산을 만들 때, 자산 정의만 반복되는 경우가 생기죠.

이럴 때 유용한 패턴이 바로 **자산 팩토리(asset factory)**입니다.

 

 

자산 팩토리란?

📌 자산(asset)을 생성하는 함수를 만들어, 유사한 자산을 반복적으로 생성하는 구조

즉, 자산 정의를 함수화해서 매개변수만 바꿔 여러 자산을 만들어낼 수 있도록 하는 패턴입니다.

 

예시: CSV 저장 자산을 반복 생성하는 팩토리

from dagster import asset, Definitions, AssetExecutionContext
from typing import Callable
import pandas as pd

def make_csv_writer_asset(asset_name: str, file_path: str, get_df_fn) -> Callable[[AssetExecutionContext], str]:
    @asset(name=asset_name)
    def csv_writer_asset(context):
        df = get_df_fn()
        df.to_csv(file_path, index=False)
        context.log.info(f"✅ CSV saved to {file_path}")
        return f"Saved: {file_path}"
    
    return csv_writer_asset

이 함수는 다음과 같은 파라미터를 받아 자산을 생성합니다:

  • asset_name: 자산의 이름
  • file_path: CSV 저장 경로
  • get_df_fn: DataFrame을 반환하는 함수

 

자산 팩토리로 여러 자산 만들기

def generate_customer_data():
    return pd.DataFrame({
        "name": ["Alice", "Bob", "Charlie"],
        "age": [30, 25, 35]
    })

defs = Definitions(
    assets=[
        make_csv_writer_asset(
            asset_name="save_customer_csv",
            file_path="./customers.csv",
            get_df_fn=generate_customer_data
        ),
        make_csv_writer_asset(
            asset_name="save_customer_csv_v2",
            file_path="./customers_v2.csv",
            get_df_fn=generate_customer_data
        ),
    ]
)

이처럼 동일한 구조의 자산을 파라미터만 바꿔 여러 개 생성할 수 있어 코드 중복이 확 줄어듭니다.

 

자산 팩토리를 쓰는 이유

 

 

마무리

자산 팩토리는 복잡한 자산 로직을 더 명확하고 재사용 가능한 구조로 관리할 수 있게 해주는 실용적인 방법입니다.
Dagster를 통해 데이터 파이프라인을 설계한다면, 자산 팩토리 패턴을 꼭 한 번 적용해보세요!