Airflow의 Sensor 중 하나인 PythonSensor를 사용해서
특정 경로의 파일이 생길 때까지 대기하고,
파일이 생성되면 다음 태스크를 실행하는 로직을 구현해봅니다.
1. 구현 코드
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, PythonSensor
from datetime import datetime
dag = DAG(
'python_sensor_example',
start_date=datetime(2025, 10, 18),
schedule_interval=None
)
def check_file_exists():
return True if os.path.exists('/opt/airflow/dags/python_operator/dummy.txt') else False
wait_for_file = PythonSensor(
task_id='wait_for_file',
python_callable=check_file_exists,
timeout=600, # 최대 10분간 기다림
poke_interval=30, # 30초마다 파일 존재 여부 확인
mode='poke', # poke 모드 (blocking 방식)
dag=dag,
)
def run_after_sensor():
print('✅ run_after_sensor executed!')
dummy_task_poke = PythonOperator(
task_id='run_after_poke_task',
python_callable=run_after_sensor,
dag=dag,
)
wait_for_file >> dummy_task_poke
2. 코드 설명
2.1 check_file_exists() – 파일 존재 여부 확인 함수
def check_file_exists():
return True if os.path.exists('/opt/airflow/dags/python_operator/dummy.txt') else False
- /opt/airflow/dags/python_operator/dummy.txt 경로에 파일이 있는지를 확인합니다.
- True → 파일 존재 → 센서 성공
- False → 파일 없음 → 주기적으로 다시 확인(poke)
즉, Airflow Sensor가 실행될 때마다
이 함수가 호출되어 파일이 생겼는지 감시하는 역할을 합니다.
2.2 PythonSensor – 조건이 충족될 때까지 대기
wait_for_file = PythonSensor(
task_id='wait_for_file',
python_callable=check_file_exists,
timeout=600,
poke_interval=30,
mode='poke',
dag=dag,
)
주요 파라미터 설명
| 파라미터 | 의미 |
| python_callable | 감시할 조건 지정 |
| timeout | 최대 대기 시간(초 단위, 여기서는 10분) |
| poke_interval | 파일 확인 주기(초 단위) |
| mode='poke' | poke 모드: worker 해당 태스크를 점유하며 대기 |
poke 모드란?
- Sensor가 실행 중인 worker가 계속 살아 있으면서 주기적으로 함수를 호출(polling)합니다.
- mode='reschedule' 로 설정하면 worker를 점유하지 않고, 일정 시간 후 재예약(비동기) 방식으로 동작합니다.
2.3 run_after_sensor() – 이후 실행될 태스크
def run_after_sensor():
print('✅ run_after_sensor executed!')
PythonSensor가 성공적으로 종료(True 반환)하면
이 태스크가 실행됩니다.
2.4 태스크 의존성 설정
wait_for_file >> dummy_task_poke
즉, 실행 순서는 다음과 같습니다 👇
(wait_for_file) → (run_after_poke_task)
3. 동작 흐름 요약
1️⃣ Sensor 실행 → check_file_exists() 호출
2️⃣ 파일이 없으면 → 30초 후 다시 호출 (poke_interval)
3️⃣ 파일이 생기면 → True 반환 → Sensor 성공
4️⃣ 다음 태스크(run_after_poke_task) 실행
5️⃣ 600초(10분) 지나도 파일이 없으면 → Sensor 실패 (DAG 실패)\
4. 시각적 다이어그램
+-------------------------------------+
| wait_for_file (PythonSensor) |
|-------------------------------------|
| check_file_exists() |
| - dummy.txt 있으면 True |
| - 없으면 30초마다 재시도 |
| - 600초 초과 시 실패 |
+-------------------------------------+
|
▼
+-------------------------------+
| run_after_poke_task |
| (PythonOperator) |
| print("run_after_sensor...") |
+-------------------------------+
정리하면
이번 실습에서는 Airflow의 PythonSensor를 이용해
특정 조건(파일 존재 여부)이 만족될 때까지 DAG 실행을 일시 정지(pause) 시키는 방법을 배웠습니다.
포인트
- Sensor는 DAG의 흐름을 제어하는 강력한 도구다.
- poke vs reschedule 모드를 적절히 사용하면 리소스 효율적으로 감시 가능하다.
- 단순한 파일 감시부터 API 응답, DB 상태 확인 등 다양한 용도로 확장 가능하다.
'컨테이너·워크플로우 자동화 > Airflow로 워크플로우 자동화하기' 카테고리의 다른 글
| Airflow BranchOperator로 조건 분기 처리하기 (0) | 2025.10.18 |
|---|---|
| Airflow Sensor의 soft_fail 이해하기 (0) | 2025.10.18 |
| Airflow XCom과 ShortCircuitOperator 이해하기 (0) | 2025.10.18 |
| Airflow ShortCircuitOperator — 조건에 따라 Task 실행 제어하기 (0) | 2025.10.18 |
| Airflow에서 system_site_packages와 requirements 함께 쓰기 (0) | 2025.10.17 |