Dagster는 데이터 파이프라인을 선언적으로 정의하고 관리하는 강력한 도구입니다. 이번 블로그 포스팅에서는 Dagster의 핵심 개념 중 하나인 자산(Asset) 을 사용하여 DuckDB 데이터베이스에 products 테이블을 생성하고, 그 결과를 Dagster UI에서 효과적으로 확인하는 방법을 자세히 살펴보겠습니다.
@dg.asset(
compute_kind="duckdb",
group_name="ingestion",
)
def products(duckdb: DuckDBResource) -> dg.MaterializeResult:
with duckdb.get_connection() as conn:
conn.execute(
"""
create or replace table products as (
select * from read_csv_auto('data/products.csv')
)
"""
)
preview_query = "select * from products limit 10"
preview_df = conn.execute(preview_query).fetchdf()
row_count = conn.execute("select count(*) from products").fetchone()
count = row_count[0] if row_count else 0
return dg.MaterializeResult(
metadata={
"row_count": dg.MetadataValue.int(count),
"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)),
}
)
코드 분석:
- @dg.asset(...) 데코레이터:
- @dg.asset 데코레이터는 해당 파이썬 함수(products)를 Dagster의 자산으로 정의합니다. 자산은 Dagster가 관리하는 데이터의 논리적 단위이며, 테이블, 파일, 머신러닝 모델 등 다양한 형태를 가질 수 있습니다.
- compute_kind="duckdb": 이 속성은 해당 자산을 생성하는 연산이 DuckDB 엔진을 사용한다는 것을 Dagster에 알립니다. Dagster UI에서 이 자산의 실행 정보를 추적하고 관리하는 데 도움이 됩니다.
- group_name="ingestion": 이 속성은 Dagster UI에서 자산을 논리적으로 그룹화하는 데 사용됩니다. 여기서는 데이터 수집(ingestion) 관련 자산임을 나타냅니다.
- def products(duckdb: DuckDBResource) -> dg.MaterializeResult: 함수 정의:
- products: 자산의 이름입니다. Dagster UI 및 코드에서 이 이름으로 자산을 참조합니다. 일반적인 파이썬 함수는 load_data()와 같이 동사로 시작하는 경향이 있지만, @dg.asset 데코레이터가 적용된 함수의 이름은 products와 같이 명사 형태를 사용하는 것이 Dagster의 컨벤션에 부합하며, 코드의 의미를 더 명확하게 전달하고 UI에서의 가독성을 높이는 좋은 방법입니다.
- duckdb: DuckDBResource: 이 부분은 의존성 주입(Dependency Injection) 을 보여줍니다. Dagster는 resources 정의에 있는 DuckDBResource 인스턴스를 products 함수의 인자로 자동으로 전달합니다. 이를 통해 자산 함수 내에서 DuckDB 데이터베이스와 상호작용할 수 있습니다. 타입 힌팅(DuckDBResource)은 코드의 가독성을 높이고 정적 분석 도구의 활용을 돕습니다.
- -> dg.MaterializeResult: 이 부분은 함수의 반환 타입 힌트입니다. Dagster 자산 함수는 일반적으로 dg.MaterializeResult 객체를 반환하여 자산의 구체화 결과와 관련 메타데이터를 Dagster에 보고합니다.
- with duckdb.get_connection() as conn::
- duckdb.get_connection() 메서드를 사용하여 DuckDB 데이터베이스에 대한 연결(conn)을 안전하게 획득합니다. with 구문을 사용했으므로 블록이 종료되면 연결이 자동으로 닫힙니다.
- conn.execute(...):
- conn.execute() 메서드를 사용하여 DuckDB 데이터베이스에 SQL 쿼리를 실행합니다.
- create or replace table products as ( select * from read_csv_auto('data/products.csv') ): 이 SQL 쿼리는 data 폴더에 있는 products.csv 파일을 자동으로 읽어들여 products 라는 이름의 새로운 테이블을 생성하거나 기존 테이블이 있다면 덮어씁니다. read_csv_auto 함수는 CSV 파일의 구조를 자동으로 추론하여 테이블을 생성하는 편리한 DuckDB 기능입니다.
- 데이터 미리보기 및 행 개수 조회:
- preview_query = "select * from products limit 10": products 테이블의 처음 10개 행을 조회하는 SQL 쿼리를 정의합니다.
- preview_df = conn.execute(preview_query).fetchdf(): 쿼리를 실행하고 결과를 Pandas DataFrame(preview_df)으로 가져옵니다. 이는 Dagster UI에서 테이블 내용을 미리보기 형태로 보여주기 위함입니다.
- row_count = conn.execute("select count(*) from products").fetchone(): products 테이블의 총 행 개수를 조회하는 SQL 쿼리를 실행하고 결과를 가져옵니다.
- count = row_count[0] if row_count else 0: 조회 결과에서 행 개수를 추출합니다. 결과가 없는 경우 기본값으로 0을 설정합니다.
- return dg.MaterializeResult(...):
- dg.MaterializeResult 객체를 생성하여 자산의 구체화 결과를 Dagster에 보고합니다.
- metadata={...}: metadata 속성을 사용하여 자산에 대한 추가 정보를 Dagster UI에 제공합니다.
- "row_count": dg.MetadataValue.int(count): products 테이블의 총 행 개수를 정수 형태의 메타데이터로 Dagster에 전달합니다. Dagster UI에서 이 정보를 확인할 수 있습니다.
- "preview": dg.MetadataValue.md(preview_df.to_markdown(index=False)): products 테이블의 처음 10개 행을 Markdown 형태로 변환하여 Dagster UI에 미리보기 형태로 표시합니다. 이를 통해 데이터의 내용을 빠르게 확인할 수 있습니다.
결론:
이 코드는 Dagster를 사용하여 DuckDB 데이터베이스를 관리하는 기본적인 워크플로우를 보여줍니다. @dg.asset 데코레이터를 통해 데이터 처리 단위를 자산으로 정의하고, DuckDBResource를 통해 DuckDB와 쉽게 상호작용할 수 있습니다. 특히 dg.MaterializeResult의 metadata를 활용하여 자산의 실행 결과를 Dagster UI에 풍부하게 시각화함으로써 데이터 파이프라인의 이해도와 관리 효율성을 높일 수 있습니다. 이 코드를 기반으로 더 복잡한 데이터 처리 로직과 다양한 Dagster 기능을 통합하여 강력한 데이터 파이프라인을 구축해 보세요.
'시스템 개발 및 관리 > Dagster 사용법' 카테고리의 다른 글
Dagster의 강력한 기능, MonthlyPartitionsDefinition으로 월별 데이터 처리 자동화하기! (0) | 2025.05.06 |
---|---|
Dagster deps 파헤치기: 자산 간 의존성 관리의 핵심 (0) | 2025.05.06 |
Dagster Resource 완전 정복: 파이프라인의 든든한 연결 다리 (1) | 2025.05.05 |
Dagster 개발 환경 빠르게 시작하기: dagster dev -f <파일 경로> 완벽 분석 (0) | 2025.05.05 |
Dagster로 시작하는 데이터 엔지니어링 오케스트레이션 (0) | 2025.05.01 |