이전 포스팅에서는 Dagster 스케줄링과 자산 자동화를 통해 데이터 파이프라인을 관리하는 방법을 살펴보았습니다. 이번에는 Dagster의 강력한 기능 중 하나인 **센서(Sensor)**를 활용하여 외부 이벤트나 특정 조건을 감지하고, 이에 따라 워크플로우를 자동으로 트리거하는 방법을 소개합니다. 특히 데이터가 불규칙적으로 도착하거나 실시간 처리가 필요한 상황에서 센서는 매우 유용합니다.
센서란 무엇일까요? 🤔
Dagster 센서는 외부 시스템이나 조건을 지속적으로 모니터링하고, 정의된 조건이 충족되면 Dagster Job을 실행하는 기능을 제공합니다. 스케줄이 정해진 시간에 작업을 실행하는 것과 달리, 센서는 이벤트 발생을 기반으로 워크플로우를 자동화합니다.
언제 센서를 사용해야 할까요? 🚦
- 이벤트 기반 워크플로우: 새로운 데이터 파일이 특정 디렉토리에 도착했을 때, API 응답이 특정 값으로 변경되었을 때 등 외부 이벤트에 따라 워크플로우를 실행해야 하는 경우.
- 조건부 실행: 특정 조건(예: 데이터 품질 검사 실패, 특정 임계값 초과)이 만족되었을 때만 작업을 실행하여 불필요한 연산을 줄이고자 하는 경우.
- 실시간 처리: 데이터를 사용할 수 있게 되는 즉시 (예약된 시간을 기다리지 않고) 처리해야 하는 경우.
1. 이벤트 기반 자산 만들기: adhoc_request 🎯
이번 단계에서는 경영진이 특정 부서 및 제품별 판매 결과에 대한 피벗 테이블 보고서를 실시간으로 요청하는 상황을 모델링합니다. 이를 위해 adhoc_request라는 새로운 Dagster 자산을 정의합니다.
import dagster as dg
from dagster_duckdb import DuckDBResource
class AdhocRequestConfig(dg.Config):
department: str
product: str
start_date: str
end_date: str
@dg.asset(
deps=["joined_data"],
compute_kind="python",
)
def adhoc_request(
config: AdhocRequestConfig, duckdb: DuckDBResource
) -> dg.MaterializeResult:
query = f"""
select
department,
rep_name,
product_name,
sum(dollar_amount) as total_sales
from joined_data
where date >= '{config.start_date}'
and date < '{config.end_date}'
and department = '{config.department}'
and product_name = '{config.product}'
group by
department,
rep_name,
product_name
"""
with duckdb.get_connection() as conn:
preview_df = conn.execute(query).fetchdf()
return dg.MaterializeResult(
metadata={"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))}
)
코드 설명:
- AdhocRequestConfig(dg.Config): 이 클래스는 adhoc_request 자산을 Materialize할 때 필요한 설정 정보를 정의합니다. 경영진의 요청에 따라 department, product, start_date, end_date를 입력받도록 설계되었습니다.
- @dg.asset(...) 데코레이터: adhoc_request 함수를 Dagster 자산으로 정의합니다.
- deps=["joined_data"]: 이 자산은 joined_data 자산에 의존합니다. 즉, joined_data가 Materialize된 후에 실행될 수 있습니다.
- compute_kind="python": 이 자산은 Python 코드로 실행될 것임을 나타냅니다.
- adhoc_request(config: AdhocRequestConfig, duckdb: DuckDBResource) -> dg.MaterializeResult: 자산 함수는 AdhocRequestConfig 타입의 config 인자를 통해 요청 정보를 받습니다. 또한 DuckDB 리소스에 접근하여 데이터를 쿼리합니다.
- SQL 쿼리: 입력받은 config 값을 기반으로 joined_data 테이블에서 특정 기간, 부서, 제품에 대한 총 판매액을 집계하는 SQL 쿼리를 생성합니다.
- DuckDB 연결 및 쿼리 실행: DuckDB 리소스를 사용하여 데이터베이스에 연결하고 쿼리를 실행하여 결과를 Pandas DataFrame으로 가져옵니다.
- dg.MaterializeResult: 쿼리 결과를 미리보기 형식의 Markdown으로 변환하여 Dagster UI에 표시하는 메타데이터와 함께 Materialize 결과를 반환합니다.
이 adhoc_request 자산은 특정 조건(경영진의 요청)에 따라 실행되어야 하므로, 스케줄링보다는 센서를 통해 자동화하는 것이 더 적합합니다
'시스템 개발 및 관리 > Dagster 사용법' 카테고리의 다른 글
Dagster 똑똑하게 실행하기: -m과 -f 옵션, 언제 어떻게 사용할까? (1) | 2025.05.08 |
---|---|
Dagster 센서 구축: 파일 시스템 이벤트를 감지하여 실시간 보고서 생성하기 (2) | 2025.05.07 |
Dagster 정의(Definitions) 파일로 데이터 파이프라인 통합 관리하기 (0) | 2025.05.07 |
Dagster로 데이터 자산 Materialize하기: 당신의 데이터를 현실로 만드는 마법 (0) | 2025.05.07 |
Dagster 자산 자동화로 데이터 파이프라인 똑똑하게 관리하기: Eager Automation (0) | 2025.05.07 |