컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow Sensor 실습 — PythonSensor로 파일 확인

Data Jun 2025. 10. 18. 10:24

Airflow의 Sensor 중 하나인 PythonSensor를 사용해서
특정 경로의 파일이 생길 때까지 대기하고,
파일이 생성되면 다음 태스크를 실행하는 로직을 구현해봅니다.

 

1. 구현 코드

import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, PythonSensor
from datetime import datetime

dag = DAG(
    'python_sensor_example',
    start_date=datetime(2025, 10, 18),
    schedule_interval=None
)

def check_file_exists():
    return True if os.path.exists('/opt/airflow/dags/python_operator/dummy.txt') else False

wait_for_file = PythonSensor(
    task_id='wait_for_file',
    python_callable=check_file_exists,
    timeout=600,           # 최대 10분간 기다림
    poke_interval=30,      # 30초마다 파일 존재 여부 확인
    mode='poke',           # poke 모드 (blocking 방식)
    dag=dag,
)

def run_after_sensor():
    print('✅ run_after_sensor executed!')

dummy_task_poke = PythonOperator(
    task_id='run_after_poke_task',
    python_callable=run_after_sensor,
    dag=dag,
)

wait_for_file >> dummy_task_poke

 

2. 코드 설명

  2.1 check_file_exists() – 파일 존재 여부 확인 함수

def check_file_exists():
    return True if os.path.exists('/opt/airflow/dags/python_operator/dummy.txt') else False
  • /opt/airflow/dags/python_operator/dummy.txt 경로에 파일이 있는지를 확인합니다.
  • True → 파일 존재 → 센서 성공
  • False → 파일 없음 → 주기적으로 다시 확인(poke)

즉, Airflow Sensor가 실행될 때마다
이 함수가 호출되어 파일이 생겼는지 감시하는 역할을 합니다.

 

 2.2 PythonSensor – 조건이 충족될 때까지 대기

wait_for_file = PythonSensor(
    task_id='wait_for_file',
    python_callable=check_file_exists,
    timeout=600,
    poke_interval=30,
    mode='poke',
    dag=dag,
)

주요 파라미터 설명

파라미터 의미
python_callable 감시할 조건 지정
timeout 최대 대기 시간(초 단위, 여기서는 10분)
poke_interval 파일 확인 주기(초 단위)
mode='poke' poke 모드: worker 해당 태스크를 점유하며 대기

poke 모드란?

  • Sensor가 실행 중인 worker가 계속 살아 있으면서 주기적으로 함수를 호출(polling)합니다.
  • mode='reschedule' 로 설정하면 worker를 점유하지 않고, 일정 시간 후 재예약(비동기) 방식으로 동작합니다.

 

2.3 run_after_sensor() – 이후 실행될 태스크

def run_after_sensor():
    print('✅ run_after_sensor executed!')

PythonSensor가 성공적으로 종료(True 반환)하면
이 태스크가 실행됩니다.

 

2.4 태스크 의존성 설정

wait_for_file >> dummy_task_poke

즉, 실행 순서는 다음과 같습니다 👇

(wait_for_file)  →  (run_after_poke_task)

 

3. 동작 흐름 요약

1️⃣ Sensor 실행 → check_file_exists() 호출
2️⃣ 파일이 없으면 → 30초 후 다시 호출 (poke_interval)
3️⃣ 파일이 생기면 → True 반환 → Sensor 성공
4️⃣ 다음 태스크(run_after_poke_task) 실행
5️⃣ 600초(10분) 지나도 파일이 없으면 → Sensor 실패 (DAG 실패)\

 

4. 시각적 다이어그램

+-------------------------------------+
|  wait_for_file (PythonSensor)       |
|-------------------------------------|
| check_file_exists()                 |
| - dummy.txt 있으면 True             |
| - 없으면 30초마다 재시도           |
| - 600초 초과 시 실패                |
+-------------------------------------+
                |
                ▼
+-------------------------------+
| run_after_poke_task           |
| (PythonOperator)              |
| print("run_after_sensor...")  |
+-------------------------------+

 

 

정리하면

 

이번 실습에서는 Airflow의 PythonSensor를 이용해
특정 조건(파일 존재 여부)이 만족될 때까지 DAG 실행을 일시 정지(pause) 시키는 방법을 배웠습니다.

 

포인트

  • Sensor는 DAG의 흐름을 제어하는 강력한 도구다.
  • poke vs reschedule 모드를 적절히 사용하면 리소스 효율적으로 감시 가능하다.
  • 단순한 파일 감시부터 API 응답, DB 상태 확인 등 다양한 용도로 확장 가능하다.