컨테이너·워크플로우 자동화/Airflow로 워크플로우 자동화하기

Airflow의 Operator와 Sensor — 태스크의 핵심 구성요소 이해하기

Data Jun 2025. 10. 14. 23:16

Airflow를 처음 배우면 “DAG, Task, Operator, Sensor” 같은 용어가 자주 등장합니다.
이 중에서도 Operator는 Airflow의 핵심 구성요소로,
하나의 작업(Task)이 무엇을 할지 정의하는 단위입니다.

 

1. Operator란?

Airflow의 DAG 안에서 Task는 모두 특정 Operator를 기반으로 만들어집니다.
즉, Operator는 Task의 “행동”을 결정하는 클래스라고 볼 수 있습니다.

Operator = Airflow에서 “무엇을 할지”를 정의하는 실행 단위입니다.

 

예를 들어 👇

  • BashOperator: 셸 명령 실행
  • PythonOperator: 파이썬 함수 실행
  • PostgresOperator: SQL 쿼리 실행
  • EmailOperator: 이메일 발송

이처럼 각 Operator는 특정 동작을 담당하며,
여러 Operator를 조합해 데이터 파이프라인(Workflow) 를 구성합니다.

 

간단한 예시

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello():
    print("Hello, Airflow!")

with DAG(
    dag_id="example_operator",
    start_date=datetime(2025, 10, 14),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    task1 = BashOperator(
        task_id="print_date",
        bash_command="date"
    )

    task2 = PythonOperator(
        task_id="print_hello",
        python_callable=print_hello
    )

    task1 >> task2

이 DAG는
1️⃣ date 명령을 실행하고
2️⃣ 이후 파이썬 함수 print_hello() 를 실행합니다.

여기서 각각의 Task가 Operator를 기반으로 만들어진 것입니다.

 

 

2. Sensor란?

Sensor는 특정 조건이 충족될 때까지 대기하는 Operator의 한 종류입니다

 

Sensor는 보통 다음 세 가지 속성을 중심으로 동작합니다.

속성 설명
poke_interval 몇 초 간격으로 조건을 체크할지 
timeout 대기 제한 시간 (초 단위)
mode 동작 방식 (poke or reschedule)
  • poke 모드: 워커가 해당 태스크를 계속 점유하며 체크함 (리소스 더 많이 사용)
  • reschedule 모드: 일정 간격마다 체크하고, 그 외 시간에는 워커를 해제함 (리소스 효율적)

 

Sensor의 종류별 예시

1. FileSensor

 

로컬 파일 시스템에서 특정 파일이 존재할 때까지 대기

from airflow.sensors.filesystem import FileSensor
from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="file_sensor_example",
    start_date=datetime(2025, 10, 14),
    schedule_interval=None,
    catchup=False
) as dag:

    wait_for_file = FileSensor(
        task_id="wait_for_input_file",
        filepath="/opt/airflow/data/input.csv",
        poke_interval=30,
        timeout=600,
        mode='poke'
    )

활용 예시:

  • ETL 파이프라인에서 “데이터 파일이 수집 폴더에 올라올 때까지” 대기

2. S3KeySensor


“데이터가 도착할 때까지 기다린다”거나 “파일이 생길 때까지 대기한다”는 식으로 동작합니다.

예를 들어, S3 버킷에 데이터가 업로드될 때까지 기다리고 싶다면 👇

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_s3_file',
    bucket_key='path/to/data.csv',
    bucket_name='my-bucket',
    poke_interval=60,
    timeout=3600
)

poke_interval은 몇 초마다 체크할지,
timeout은 최대 대기 시간을 의미합니다.

 

3. ExternalTaskSensor

다른 DAG의 특정 태스크가 끝날 때까지 대기

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_other_dag = ExternalTaskSensor(
    task_id='wait_for_upstream_task',
    external_dag_id='data_ingestion_dag',
    external_task_id='load_to_db',
    poke_interval=60,
    timeout=3600
)

활용 예시:

  • DAG 간 의존성 설정 (예: “데이터 적재 DAG” → “모델 학습 DAG”)

4. SqlSensor

SQL 쿼리 결과가 특정 조건을 만족할 때까지 대기

from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_rows = SqlSensor(
    task_id='wait_for_new_rows',
    conn_id='my_postgres_connection',
    sql="SELECT COUNT(*) FROM sample_table WHERE status='ready';",
    poke_interval=30,
    timeout=600
)

활용 예시:

  • 데이터베이스에 새로운 배치가 들어올 때까지 대기

5. TimeSensor

특정 시간이 될 때까지 대기

from airflow.sensors.time_sensor import TimeSensor
from datetime import time

wait_until_nine_am = TimeSensor(
    task_id='wait_until_9am',
    target_time=time(9, 0, 0)
)

 

활용 예시:

  • 매일 오전 9시 이후에만 실행되어야 하는 태스크

6. HttpSensor

외부 API의 응답 상태를 주기적으로 체크

from airflow.providers.http.sensors.http import HttpSensor

wait_for_api = HttpSensor(
    task_id='wait_for_api_response',
    http_conn_id='my_api_connection',
    endpoint='status',
    poke_interval=60,
    timeout=1200
)

활용 예시:

  • 외부 REST API가 “Ready” 상태가 될 때까지 대기

 

 

정리하면

 

Airflow의 Operator는 모든 Task의 실행 단위이며,
Sensor는 그중에서도 “조건을 기다리는 특수한 Operator”입니다.

이 두 가지를 잘 이해하면,
데이터 파이프라인에서 “언제 무엇을 실행할지”를 유연하게 제어할 수 있습니다.
결국, Airflow의 강력함은 이 Operator 체계의 확장성에서 나온다고 할 수 있습니다