자동화된 데이터 필터링과 스케줄링 구축
데이터 분석 및 시스템 모니터링에서는 일정 주기로 데이터를 수집하고 처리하는 자동화된 파이프라인이 필수적이다. 이를 위해 DataFilterManager를 활용하여 데이터를 필터링하고, 특정 조건을 만족하는 경우 건강 평가 알고리즘을 적용하는 구조를 구축할 수 있다. 또한, 스케줄링 기능을 통해 일정한 간격으로 데이터를 자동으로 가져와 분석하는 프로세스를 운영할 수 있다. 본 글에서는 이러한 자동화 데이터 파이프라인의 구성 요소와 흐름을 설명하고, DataFilterManager를 활용한 데이터 필터링 및 스케줄링 설정 과정을 정리한다.
import sys
import os
# 경로 설정: 스크립트 경로에서 상위 디렉토리로 이동한 후 src 경로 추가
health_data_path = os.path.abspath(os.path.join('..', 'src'))
health_learning_data_path = os.path.abspath(os.path.join(os.getcwd(), "../../HealthModelPipeline/dataflow/src"))
preprocessing_path = os.path.abspath(os.path.join(os.getcwd(), "../../PipelinePrep/src"))
paths = [health_data_path, health_learning_data_path, preprocessing_path]
def add_paths(paths):
"""
지정된 경로들이 sys.path에 없으면 추가하는 함수.
Parameters:
- paths (list): 추가하려는 경로들의 리스트.
"""
for path in paths:
if path not in sys.path:
sys.path.append(path)
print(f"Path added: {path}")
else:
print(f"Path already exists: {path}")
add_paths(paths)
# basic
import pandas as pd
# time
import schedule
import time
# module.healthchecker
from stat_healthchecker.total_system_health_algorithm import apply_system_health_algorithms_with_total
from models_healthchecker.total_system_health_learning_algorithm import apply_system_health_learning_algorithms_with_total
from prep.load_processing import distribute_by_application
# module.dataline
from stat_dataline.logger_confg import logger
from stat_dataline.search_status_flag import DataPipelineManager
특정 경로를 sys.path에 추가하여 모듈을 불러올 수 있도록 설정하는 부분이 포함되어 있습니다.
- 경로 설정
- health_data_path, health_learning_data_path, preprocessing_path라는 세 개의 경로를 정의하고 있음.
- os.path.abspath와 os.path.join을 사용하여 현재 스크립트의 위치를 기준으로 상대 경로를 절대 경로로 변환.
- 경로 추가 함수 (add_paths)
- sys.path에 지정한 경로들을 추가하는 add_paths(paths) 함수를 정의.
- sys.path에 해당 경로가 없는 경우에만 추가하고, 이미 존재하면 추가하지 않음.
- 모듈 임포트
- pandas, schedule, time 같은 기본 라이브러리를 불러옴.
- stat_healthchecker, models_healthchecker, prep.load_processing 등 사용자 정의 패키지를 임포트.
- stat_dataline.logger_config, stat_dataline.search_status_flag에서 logger와 DataPipelineManager를 불러옴.
- 프로젝트에서 데이터 처리 및 시스템 건강도를 평가하는 기능을 수행하는 코드 패키지 및 모듈 임포트
- apply_system_health_algorithms_with_total, apply_system_health_learning_algorithm 등의 함수명이 시스템 건강 분석과 관련된 작업을 수행
- distribute_by_application 함수는 데이터를 특정 기준으로 분배하는 역할
def schedule_health_assessment():
grouped_index, fetched_data = get_latest_date_on_schedule()
for index in grouped_index:
# 해당 오퍼레이션 선박, 인덱스, 섹션 추출
ship_id = index[0]
op_index = index[1]
section = index[2]
# 데이터 처리를 위한 갯수 조건을 만족하는지 판단
selected_df = fetched_data[(fetched_data['SHIP_ID']==ship_id) & (fetched_data['OP_INDEX']==op_index) & (fetched_data['SECTION']==section)]
# 해당 오퍼레이션 데이터 길이 추출
data_len = len(selected_df)
# 해당 오퍼레이션 시작 시간 추출
date_time = selected_df.iloc[0]['DATA_TIME']
print(f'SHIP_ID : {ship_id} / OP_INDEX : {op_index} / SECTION : {section} - 데이터 선택 ({data_len})')
if (data_len>=160) :
print(f'SHIP_ID : {ship_id} / OP_INDEX : {op_index} / SECTION : {section} - 조건 통과')
try:
sensor, preprocessed = distribute_by_application(ship_id=ship_id, op_index=op_index, section=section)
if sensor is None and preprocessed is None:
print("선박 데이터 프레임이 존재하지 않습니다.")
continue
elif preprocessed is not None:
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY=The results were derived from the model and statistics package | TYPE=all | IS_PROCESSED=True')
print("전처리 후 학습 데이터 프레임이 존재합니다.")
apply_system_health_algorithms_with_total(sensor, ship_id, op_index, section)
apply_system_health_learning_algorithms_with_total(data=preprocessed, ship_id=ship_id, op_index=op_index, section=section)
else:
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY=After preprocessing, the model data frame does not exist, so only the statistical algorithm proceeds alone | TYPE=stats | IS_PROCESSED=True')
print("전처리 후 모델 데이터 프레임이 존재하지 않아 통계 알고리즘 단독 진행합니다.")
apply_system_health_algorithms_with_total(data=sensor, ship_id=ship_id, op_index=op_index, section=section)
except ValueError as e :
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
continue # 에러 발생 시 다음 반복으로 넘어감\
except KeyError as e :
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
except TypeError as e :
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
continue # 에러 발생 시 다음 반복으로 넘어감
except IndexError as e :
print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
continue # 에러 발생 시 다음 반복으로 넘어감
else:
logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY=The data length is {data_len} and does not satisfy the condition | TYPE=data_length_limit | IS_PROCESSED=False')
이 코드는 선박 운영 데이터의 건강 상태를 평가하는 스케줄링 함수를 구현한 것으로, 특정 조건을 충족하는 데이터를 처리한 후, 적절한 알고리즘을 적용하여 평가를 수행한다.
- get_latest_date_on_schedule()을 호출하여 최신 데이터를 가져오고, SHIP_ID, OP_INDEX, SECTION을 추출함.
- 추출된 정보를 기반으로 데이터를 필터링하고, 데이터 개수가 160개 이상인지 확인.
- 조건을 충족하면 distribute_by_application()을 호출하여 센서 데이터 및 전처리된 데이터 생성.
- 전처리 데이터가 존재함 -> apply_system_health_algorithms_with_total()의 통계 알고리즘과 apply_system_health_learning_algorithms_with_total()을 실행하여 모델 기반 건강 평가 수행.
- 전처리 데이터가 없음 -> apply_system_health_algorithms_with_total()을 실행하여 통계 알고리즘만 적용.
- 데이터 개수가 부족하면 로그를 남기고 처리하지 않음.
- 실행 중 ValueError, KeyError, TypeError, IndexError 등의 예외 발생 시 로그를 기록하고 다음 반복으로 진행.
- 개별 데이터 단위로 독립적인 처리가 가능하며, 조건 미충족 또는 오류 발생 시에도 로그를 남겨 추후 분석 가능하도록 설계됨.
데이터 필터링 및 그룹화 후 건강 평가 스케줄링 과정
def get_latest_date_on_schedule():
pipeline = DataPipelineManager()
fetched_data = pipeline.filter_by_flag_status()
# 해당 추출 데이터 그룹화
grouped_data = fetched_data.groupby(['SHIP_ID','OP_INDEX','SECTION']).count()
# 그룹 후 인덱스 추출
grouped_index = grouped_data.index
return grouped_index, fetched_data
데이터 그룹화 (get_latest_date_on_schedule())
- filter_by_flag_status()에서 반환된 데이터를 받아 처리.
- SHIP_ID, OP_INDEX, SECTION을 기준으로 groupby()를 수행하여 그룹화.
- 그룹화된 데이터의 인덱스를 grouped_index로 저장.
DataFilterManager의 역할 및 위치
class ScheduledDataFetcher:
""" [책임] 주어진 일정에 따라 데이터를 필터링하고 반환
"""
def __init__(self, db_engine:DatabaseEngine, table_name):
self.__engine = db_engine.engine # [캡슐화] DB 엔진 접근
self.__table_name = table_name # [캡슐화] 테이블 이름
def __fetch_data(self, start_time, end_time):
""" [내부 전용 메서드] 주어진 시간 범위에서 데이터 조회
"""
query = f"""
SELECT * FROM `{self.__table_name}`
WHERE `REG_DATE` BETWEEN '{start_time}' AND '{end_time}' AND `FLAG` = 0;
"""
df = pd.read_sql(query, self.__engine)
return df
def __update_flag(self, start_time, end_time):
""" [내부 전용 메서드]
"""
update_query = f"""
UPDATE `{self.__table_name}`
SET `FLAG` = 1
WHERE `REG_DATE` BETWEEN '{start_time}' AND '{end_time}' AND `FLAG` = 0;
"""
with self.__engine.begin() as connection:
connection.execute(update_query)
def fetch_data_on_schedule(self, start_time, end_time):
""" [행동] 일정에 따라 데이터 필터링 및 FLAG 업데이트트
"""
df = self.__fetch_data(start_time, end_time)
if df.empty:
print("[ScheduledDataFetcher] No data found for the given schedule.")
return None
self.__update_flag(start_time, end_time)
return df
class DataFilterManager:
""" [책임] 날짜를 기반으로 데이터 필터링 및 스케줄링 관리
"""
def __init__(self, reference_date_manager:ReferenceDateManager, scheduled_fetcher:ScheduledDataFetcher):
self.__reference_date_manager = reference_date_manager # [캡슐화] ReferenceDateManager 의존성
self.__scheduled_fetcher = scheduled_fetcher # [캡슐화] ScheduledDataFetcher 의존성
def filter_by_flag_status(self):
""" [행동] 날짜 목록을 기준으로 데이터를 필터링
"""
reference_dates = self.__reference_date_manager.get_reference_dates()
if not reference_dates:
print("[DataFilterManager] No reference dates available.")
return None
filtered_dataframes = []
for date in reference_dates:
start_time = pd.Timestamp(date)
end_time = start_time + pd.Timedelta(days=1)
filtered_data = self.__scheduled_fetcher.fetch_data_on_schedule(start_time, end_time)
if filtered_data is not None:
filtered_dataframes.append(filtered_data)
print("\n[건전성 분석 데이터 리턴...]")
return pd.concat(filtered_dataframes, ignore_index=True) if filtered_dataframes else None
데이터 필터링 (DataFilterManager)
- filter_by_flag_status()에서 날짜 목록을 가져와 기준 설정.
- ScheduledDataFetcher를 사용해 해당 날짜의 데이터를 조회.
- 조회된 데이터들을 리스트에 저장한 후, pd.concat()을 통해 하나의 데이터프레임으로 반환.
DataFilterManager 클래스는 src/stat_dataline/search_status_flag.py 내에 존재하며, 날짜를 기준으로 데이터를 필터링하고, 스케줄링을 관리하는 역할을 한다. get_latest_date_on_schedule() 함수에서 DataFilterManager의 filter_by_flag_status() 메서드를 호출하여 최신 데이터를 조회 및 그룹화한 후, 건강 평가 스케줄링(schedule_health_assessment)에 활용된다. 즉, DataFilterManager는 search_status_flag에서 데이터를 필터링하고, 이후 건강 평가 프로세스의 입력으로 활용하는 핵심적인 역할을 담당한다.
해당 로그는 선박 데이터의 건강 평가 과정에서 데이터 개수 조건을 만족하지 못한 항목들을 기록한 로그이다.
- SHIP_ID=xxx-xxx→ 선박 ID
- OP_INDEX → 오퍼레이션 인덱스
- SECTION=0 → 섹션 정보
- START_TIME=2023-09-13 ... → 해당 데이터의 시작 시간
- LOG_ENTRY=The data length is XX and does not satisfy the condition → 데이터 개수가 XX개로, 최소 조건(160개 이상)을 충족하지 못함
즉, 각 선박 및 오퍼레이션별로 데이터 개수를 확인한 후, 조건을 충족하지 않는 경우 로그를 남기는 과정을 보여준다.
라이브러리 구조 및 DataFilterManager 위치
이 프로젝트의 디렉토리 구조는 크게 스크립트 실행, 데이터 처리, 상태 관리, 건강 평가, 시각화 모듈로 나뉘어 있다.
- scripts/ → 실행 관련 파일 (main.py, main_runserver.py 등) 및 로그 파일 저장
- src/ → 주요 소스 코드 폴더
stat_dataline/ → 데이터 관련 관리 패키지
search_status_flag.py → DataFilterManager 포함
load_database.py, logger_config.py 등 데이터 로드 및 로깅 관련 모듈
stat_healthchecker/ → 건강 평가 알고리즘 관련 패키지
stat_model/ → 모델 관련 패키지
stat_visualizer/ → 시각화 관련 패키지
# 스케줄 설정: 3일에 한 번씩 데이터 가져오기
schedule.every(1).days.at("15:02").do(schedule_health_assessment)
# 스케줄 지속 실행
while True:
print("스케줄 시작")
schedule.run_pending()
time.sleep(1)
이 코드는 스케줄링을 통해 일정 주기로 schedule_health_assessment() 함수를 실행하는 기능을 한다.
- schedule.every(1).days.at("15:02").do(schedule_health_assessment) → 매일 오후 3시 2분에 schedule_health_assessment() 실행
- while True: → 무한 루프를 돌며 스케줄을 지속적으로 확인
- schedule.run_pending() → 예약된 작업이 있으면 실행
- time.sleep(1) → 1초 대기 후 다시 스케줄 확인
즉, 설정된 시간마다 자동으로 데이터를 가져와 건강 평가를 수행하는 자동화 프로세스이다.
전체 흐름 정리
데이터 필터링 → 선박별 그룹화 → 평가 조건 확인 → 건강 평가 알고리즘 적용.
이를 통해 특정 날짜 범위의 데이터를 선별하고, 선박 운영 상태를 체계적으로 분석할 수 있도록 설계됨.