시스템 개발 및 관리/Dagster 사용법

Dagster 자산 완벽 해부: DuckDB를 활용한 products 테이블 생성 및 미리보기

Data Jun 2025. 5. 5. 23:30

Dagster는 데이터 파이프라인을 선언적으로 정의하고 관리하는 강력한 도구입니다. 이번 블로그 포스팅에서는 Dagster의 핵심 개념 중 하나인 자산(Asset) 을 사용하여 DuckDB 데이터베이스에 products 테이블을 생성하고, 그 결과를 Dagster UI에서 효과적으로 확인하는 방법을 자세히 살펴보겠습니다.

 

@dg.asset(
    compute_kind="duckdb",
    group_name="ingestion",
)
def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
    with duckdb.get_connection() as conn:
        conn.execute(
            """
            create or replace table products as (
                select * from read_csv_auto('data/products.csv')
            )
            """
        )

        preview_query = "select * from products limit 10"
        preview_df = conn.execute(preview_query).fetchdf()
        row_count = conn.execute("select count(*) from products").fetchone()
        count = row_count[0] if row_count else 0

        return dg.MaterializeResult(
            metadata={
                "row_count": dg.MetadataValue.int(count),
                "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
            }
        )

코드 분석:

  1. @dg.asset(...) 데코레이터:
    • @dg.asset 데코레이터는 해당 파이썬 함수(products)를 Dagster의 자산으로 정의합니다. 자산은 Dagster가 관리하는 데이터의 논리적 단위이며, 테이블, 파일, 머신러닝 모델 등 다양한 형태를 가질 수 있습니다.
    • compute_kind="duckdb": 이 속성은 해당 자산을 생성하는 연산이 DuckDB 엔진을 사용한다는 것을 Dagster에 알립니다. Dagster UI에서 이 자산의 실행 정보를 추적하고 관리하는 데 도움이 됩니다.
    • group_name="ingestion": 이 속성은 Dagster UI에서 자산을 논리적으로 그룹화하는 데 사용됩니다. 여기서는 데이터 수집(ingestion) 관련 자산임을 나타냅니다.
  2. def products(duckdb: DuckDBResource) -> dg.MaterializeResult: 함수 정의:
    • products: 자산의 이름입니다. Dagster UI 및 코드에서 이 이름으로 자산을 참조합니다. 일반적인 파이썬 함수는 load_data()와 같이 동사로 시작하는 경향이 있지만, @dg.asset 데코레이터가 적용된 함수의 이름은 products와 같이 명사 형태를 사용하는 것이 Dagster의 컨벤션에 부합하며, 코드의 의미를 더 명확하게 전달하고 UI에서의 가독성을 높이는 좋은 방법입니다.
    • duckdb: DuckDBResource: 이 부분은 의존성 주입(Dependency Injection) 을 보여줍니다. Dagster는 resources 정의에 있는 DuckDBResource 인스턴스를 products 함수의 인자로 자동으로 전달합니다. 이를 통해 자산 함수 내에서 DuckDB 데이터베이스와 상호작용할 수 있습니다. 타입 힌팅(DuckDBResource)은 코드의 가독성을 높이고 정적 분석 도구의 활용을 돕습니다.
    • -> dg.MaterializeResult: 이 부분은 함수의 반환 타입 힌트입니다. Dagster 자산 함수는 일반적으로 dg.MaterializeResult 객체를 반환하여 자산의 구체화 결과와 관련 메타데이터를 Dagster에 보고합니다.
  3. with duckdb.get_connection() as conn::
    • duckdb.get_connection() 메서드를 사용하여 DuckDB 데이터베이스에 대한 연결(conn)을 안전하게 획득합니다. with 구문을 사용했으므로 블록이 종료되면 연결이 자동으로 닫힙니다.
  4. conn.execute(...):
    • conn.execute() 메서드를 사용하여 DuckDB 데이터베이스에 SQL 쿼리를 실행합니다.
    • create or replace table products as ( select * from read_csv_auto('data/products.csv') ): 이 SQL 쿼리는 data 폴더에 있는 products.csv 파일을 자동으로 읽어들여 products 라는 이름의 새로운 테이블을 생성하거나 기존 테이블이 있다면 덮어씁니다. read_csv_auto 함수는 CSV 파일의 구조를 자동으로 추론하여 테이블을 생성하는 편리한 DuckDB 기능입니다.
  5. 데이터 미리보기 및 행 개수 조회:
    • preview_query = "select * from products limit 10": products 테이블의 처음 10개 행을 조회하는 SQL 쿼리를 정의합니다.
    • preview_df = conn.execute(preview_query).fetchdf(): 쿼리를 실행하고 결과를 Pandas DataFrame(preview_df)으로 가져옵니다. 이는 Dagster UI에서 테이블 내용을 미리보기 형태로 보여주기 위함입니다.
    • row_count = conn.execute("select count(*) from products").fetchone(): products 테이블의 총 행 개수를 조회하는 SQL 쿼리를 실행하고 결과를 가져옵니다.
    • count = row_count[0] if row_count else 0: 조회 결과에서 행 개수를 추출합니다. 결과가 없는 경우 기본값으로 0을 설정합니다.
  6. return dg.MaterializeResult(...):
    • dg.MaterializeResult 객체를 생성하여 자산의 구체화 결과를 Dagster에 보고합니다.
    • metadata={...}: metadata 속성을 사용하여 자산에 대한 추가 정보를 Dagster UI에 제공합니다.
      • "row_count": dg.MetadataValue.int(count): products 테이블의 총 행 개수를 정수 형태의 메타데이터로 Dagster에 전달합니다. Dagster UI에서 이 정보를 확인할 수 있습니다.
      • "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)): products 테이블의 처음 10개 행을 Markdown 형태로 변환하여 Dagster UI에 미리보기 형태로 표시합니다. 이를 통해 데이터의 내용을 빠르게 확인할 수 있습니다.

결론:

이 코드는 Dagster를 사용하여 DuckDB 데이터베이스를 관리하는 기본적인 워크플로우를 보여줍니다. @dg.asset 데코레이터를 통해 데이터 처리 단위를 자산으로 정의하고, DuckDBResource를 통해 DuckDB와 쉽게 상호작용할 수 있습니다. 특히 dg.MaterializeResult의 metadata를 활용하여 자산의 실행 결과를 Dagster UI에 풍부하게 시각화함으로써 데이터 파이프라인의 이해도와 관리 효율성을 높일 수 있습니다. 이 코드를 기반으로 더 복잡한 데이터 처리 로직과 다양한 Dagster 기능을 통합하여 강력한 데이터 파이프라인을 구축해 보세요.