시스템 개발 및 관리/MLOps 핵심 기술: 머신러닝 자동화

자동화된 데이터 필터링과 스케줄링 구축

Data Jun 2025. 3. 5. 10:41

데이터 분석 및 시스템 모니터링에서는 일정 주기로 데이터를 수집하고 처리하는 자동화된 파이프라인이 필수적이다. 이를 위해 DataFilterManager를 활용하여 데이터를 필터링하고, 특정 조건을 만족하는 경우 건강 평가 알고리즘을 적용하는 구조를 구축할 수 있다. 또한, 스케줄링 기능을 통해 일정한 간격으로 데이터를 자동으로 가져와 분석하는 프로세스를 운영할 수 있다. 본 글에서는 이러한 자동화 데이터 파이프라인의 구성 요소와 흐름을 설명하고, DataFilterManager를 활용한 데이터 필터링 및 스케줄링 설정 과정을 정리한다.

 

import sys
import os

# 경로 설정: 스크립트 경로에서 상위 디렉토리로 이동한 후 src 경로 추가
health_data_path = os.path.abspath(os.path.join('..', 'src'))
health_learning_data_path = os.path.abspath(os.path.join(os.getcwd(), "../../HealthModelPipeline/dataflow/src"))
preprocessing_path = os.path.abspath(os.path.join(os.getcwd(), "../../PipelinePrep/src"))

paths = [health_data_path, health_learning_data_path, preprocessing_path]

def add_paths(paths):
    """
    지정된 경로들이 sys.path에 없으면 추가하는 함수.
    
    Parameters:
    - paths (list): 추가하려는 경로들의 리스트.
    """
    for path in paths:
        if path not in sys.path:
            sys.path.append(path)
            print(f"Path added: {path}")
        else:
            print(f"Path already exists: {path}")
            
add_paths(paths)

# basic
import pandas as pd

# time
import schedule
import time

# module.healthchecker
from stat_healthchecker.total_system_health_algorithm import apply_system_health_algorithms_with_total
from models_healthchecker.total_system_health_learning_algorithm import apply_system_health_learning_algorithms_with_total
from prep.load_processing import distribute_by_application

# module.dataline
from stat_dataline.logger_confg import logger
from stat_dataline.search_status_flag import DataPipelineManager

특정 경로를 sys.path에 추가하여 모듈을 불러올 수 있도록 설정하는 부분이 포함되어 있습니다.

  • 경로 설정
    • health_data_path, health_learning_data_path, preprocessing_path라는 세 개의 경로를 정의하고 있음.
    • os.path.abspath와 os.path.join을 사용하여 현재 스크립트의 위치를 기준으로 상대 경로를 절대 경로로 변환.
  • 경로 추가 함수 (add_paths)
    • sys.path에 지정한 경로들을 추가하는 add_paths(paths) 함수를 정의.
    • sys.path에 해당 경로가 없는 경우에만 추가하고, 이미 존재하면 추가하지 않음.
  • 모듈 임포트
    • pandas, schedule, time 같은 기본 라이브러리를 불러옴.
    • stat_healthchecker, models_healthchecker, prep.load_processing 등 사용자 정의 패키지를 임포트.
    • stat_dataline.logger_config, stat_dataline.search_status_flag에서 logger와 DataPipelineManager를 불러옴.
  • 프로젝트에서 데이터 처리 및 시스템 건강도를 평가하는 기능을 수행하는 코드 패키지 및 모듈 임포트
  • apply_system_health_algorithms_with_total, apply_system_health_learning_algorithm 등의 함수명이 시스템 건강 분석과 관련된 작업을 수행
  • distribute_by_application 함수는 데이터를 특정 기준으로 분배하는 역할

 

def schedule_health_assessment():
    grouped_index, fetched_data = get_latest_date_on_schedule()
    
    for index in grouped_index:
        # 해당 오퍼레이션 선박, 인덱스, 섹션 추출
        ship_id =  index[0]
        op_index = index[1]
        section =  index[2] 
        
        # 데이터 처리를 위한 갯수 조건을 만족하는지 판단 
        selected_df = fetched_data[(fetched_data['SHIP_ID']==ship_id) & (fetched_data['OP_INDEX']==op_index) & (fetched_data['SECTION']==section)]

        # 해당 오퍼레이션 데이터 길이 추출
        data_len = len(selected_df)

        # 해당 오퍼레이션 시작 시간 추출
        date_time = selected_df.iloc[0]['DATA_TIME']
       
        print(f'SHIP_ID : {ship_id} / OP_INDEX : {op_index} / SECTION : {section} -  데이터 선택 ({data_len})')
        
        if (data_len>=160) :
            
            print(f'SHIP_ID : {ship_id} / OP_INDEX : {op_index} / SECTION : {section} -  조건 통과')      
            
            try:
                sensor, preprocessed = distribute_by_application(ship_id=ship_id, op_index=op_index, section=section)
                if sensor is None and preprocessed is None:
                    print("선박 데이터 프레임이 존재하지 않습니다.")
                    continue       

                elif preprocessed is not None:
                    logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY=The results were derived from the model and statistics package | TYPE=all | IS_PROCESSED=True')
                    print("전처리 후 학습 데이터 프레임이 존재합니다.")
                    apply_system_health_algorithms_with_total(sensor, ship_id, op_index, section)
                    apply_system_health_learning_algorithms_with_total(data=preprocessed, ship_id=ship_id, op_index=op_index, section=section)
                else:
                    logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time}  | LOG_ENTRY=After preprocessing, the model data frame does not exist, so only the statistical algorithm proceeds alone | TYPE=stats | IS_PROCESSED=True')
                    print("전처리 후 모델 데이터 프레임이 존재하지 않아 통계 알고리즘 단독 진행합니다.")
                    apply_system_health_algorithms_with_total(data=sensor, ship_id=ship_id, op_index=op_index, section=section)
                
            except ValueError as e :
                logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
                print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
                continue  # 에러 발생 시 다음 반복으로 넘어감\

            except KeyError as e :
                logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
                print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')

            except TypeError as e :
                logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
                print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
                continue  # 에러 발생 시 다음 반복으로 넘어감
                
            except IndexError as e :
                print(f'에러 발생: {e}. 다음 반복으로 넘어갑니다.')
                logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY={e} | TYPE=exceptional_handling | IS_PROCESSED=False')
                continue  # 에러 발생 시 다음 반복으로 넘어감 
        else:
            logger.info(f'SHIP_ID={ship_id} | OP_INDEX={op_index} | SECTION={section} | START_TIME={date_time} | LOG_ENTRY=The data length is {data_len} and does not satisfy the condition | TYPE=data_length_limit | IS_PROCESSED=False')

이 코드는 선박 운영 데이터의 건강 상태를 평가하는 스케줄링 함수를 구현한 것으로, 특정 조건을 충족하는 데이터를 처리한 후, 적절한 알고리즘을 적용하여 평가를 수행한다.

  • get_latest_date_on_schedule()을 호출하여 최신 데이터를 가져오고, SHIP_ID, OP_INDEX, SECTION을 추출함.
  • 추출된 정보를 기반으로 데이터를 필터링하고, 데이터 개수가 160개 이상인지 확인.
  • 조건을 충족하면 distribute_by_application()을 호출하여 센서 데이터 및 전처리된 데이터 생성.
  • 전처리 데이터가 존재함 -> apply_system_health_algorithms_with_total()의  통계 알고리즘과 apply_system_health_learning_algorithms_with_total()을 실행하여 모델 기반 건강 평가 수행.
  • 전처리 데이터가 없음 -> apply_system_health_algorithms_with_total()을 실행하여 통계 알고리즘만 적용.
  • 데이터 개수가 부족하면 로그를 남기고 처리하지 않음.
  • 실행 중 ValueError, KeyError, TypeError, IndexError 등의 예외 발생 시 로그를 기록하고 다음 반복으로 진행.
  • 개별 데이터 단위로 독립적인 처리가 가능하며, 조건 미충족 또는 오류 발생 시에도 로그를 남겨 추후 분석 가능하도록 설계됨.

데이터 필터링 및 그룹화 후 건강 평가 스케줄링 과정

def get_latest_date_on_schedule():
    pipeline = DataPipelineManager()
    fetched_data = pipeline.filter_by_flag_status()
    
    # 해당 추출 데이터 그룹화
    grouped_data = fetched_data.groupby(['SHIP_ID','OP_INDEX','SECTION']).count() 
    
    # 그룹 후 인덱스 추출 
    grouped_index = grouped_data.index
    
    return grouped_index, fetched_data

데이터 그룹화 (get_latest_date_on_schedule())

  • filter_by_flag_status()에서 반환된 데이터를 받아 처리.
  • SHIP_ID, OP_INDEX, SECTION을 기준으로 groupby()를 수행하여 그룹화.
  • 그룹화된 데이터의 인덱스를 grouped_index로 저장.

 

DataFilterManager의 역할 및 위치


class ScheduledDataFetcher:
    """ [책임] 주어진 일정에 따라 데이터를 필터링하고 반환
    """
    
    def __init__(self, db_engine:DatabaseEngine, table_name):
        self.__engine = db_engine.engine # [캡슐화] DB 엔진 접근
        self.__table_name = table_name # [캡슐화] 테이블 이름

    
    def __fetch_data(self, start_time, end_time):
        """  [내부 전용 메서드] 주어진 시간 범위에서 데이터 조회
        """

        query = f"""
        SELECT * FROM `{self.__table_name}` 
        WHERE `REG_DATE` BETWEEN '{start_time}' AND '{end_time}' AND `FLAG` = 0;
        """
        df = pd.read_sql(query, self.__engine)

        return df

    
    def __update_flag(self, start_time, end_time):
        """ [내부 전용 메서드]
        """

        update_query = f"""
        UPDATE `{self.__table_name}`
        SET `FLAG` = 1
        WHERE `REG_DATE` BETWEEN '{start_time}' AND '{end_time}' AND `FLAG` = 0;
        """
        with self.__engine.begin() as connection:
            connection.execute(update_query)

    
    def fetch_data_on_schedule(self, start_time, end_time):
        """ [행동] 일정에 따라 데이터 필터링 및 FLAG 업데이트트
        """
        
        df = self.__fetch_data(start_time, end_time)
        if df.empty:
            print("[ScheduledDataFetcher] No data found for the given schedule.")
            return None
        
        self.__update_flag(start_time, end_time)
        
        return df
        
class DataFilterManager:
    """ [책임] 날짜를 기반으로 데이터 필터링 및 스케줄링 관리 
    """

    def __init__(self, reference_date_manager:ReferenceDateManager, scheduled_fetcher:ScheduledDataFetcher):
        self.__reference_date_manager = reference_date_manager  # [캡슐화] ReferenceDateManager 의존성
        self.__scheduled_fetcher = scheduled_fetcher  # [캡슐화] ScheduledDataFetcher 의존성

    
    def filter_by_flag_status(self):
        """ [행동] 날짜 목록을 기준으로 데이터를 필터링
        """

        reference_dates  = self.__reference_date_manager.get_reference_dates()
        if not reference_dates:
            print("[DataFilterManager] No reference dates available.")
            return None
        
        filtered_dataframes = []

        for date in reference_dates:
            start_time = pd.Timestamp(date)
            end_time = start_time + pd.Timedelta(days=1)

            filtered_data = self.__scheduled_fetcher.fetch_data_on_schedule(start_time, end_time)
            if filtered_data is not None:
                filtered_dataframes.append(filtered_data)

        print("\n[건전성 분석 데이터 리턴...]")
        return pd.concat(filtered_dataframes, ignore_index=True) if filtered_dataframes else None

데이터 필터링 (DataFilterManager)

  • filter_by_flag_status()에서 날짜 목록을 가져와 기준 설정.
  • ScheduledDataFetcher를 사용해 해당 날짜의 데이터를 조회.
  • 조회된 데이터들을 리스트에 저장한 후, pd.concat()을 통해 하나의 데이터프레임으로 반환.

DataFilterManager 클래스는 src/stat_dataline/search_status_flag.py 내에 존재하며, 날짜를 기준으로 데이터를 필터링하고, 스케줄링을 관리하는 역할을 한다. get_latest_date_on_schedule() 함수에서 DataFilterManager의 filter_by_flag_status() 메서드를 호출하여 최신 데이터를 조회 및 그룹화한 후, 건강 평가 스케줄링(schedule_health_assessment)에 활용된다. 즉, DataFilterManager는 search_status_flag에서 데이터를 필터링하고, 이후 건강 평가 프로세스의 입력으로 활용하는 핵심적인 역할을 담당한다.

 

해당 로그는 선박 데이터의 건강 평가 과정에서 데이터 개수 조건을 만족하지 못한 항목들을 기록한 로그이다.

  • SHIP_ID=xxx-xxx→ 선박 ID
  • OP_INDEX → 오퍼레이션 인덱스
  • SECTION=0 → 섹션 정보
  • START_TIME=2023-09-13 ... → 해당 데이터의 시작 시간
  • LOG_ENTRY=The data length is XX and does not satisfy the condition → 데이터 개수가 XX개로, 최소 조건(160개 이상)을 충족하지 못함

즉, 각 선박 및 오퍼레이션별로 데이터 개수를 확인한 후, 조건을 충족하지 않는 경우 로그를 남기는 과정을 보여준다.

 

라이브러리 구조 및 DataFilterManager 위치

이 프로젝트의 디렉토리 구조는 크게 스크립트 실행, 데이터 처리, 상태 관리, 건강 평가, 시각화 모듈로 나뉘어 있다.

 

  • scripts/ → 실행 관련 파일 (main.py, main_runserver.py 등) 및 로그 파일 저장
  • src/ → 주요 소스 코드 폴더

stat_dataline/ → 데이터 관련 관리 패키지

search_status_flag.py → DataFilterManager 포함

load_database.py, logger_config.py 등 데이터 로드 및 로깅 관련 모듈

stat_healthchecker/ → 건강 평가 알고리즘 관련 패키지

stat_model/ → 모델 관련 패키지

stat_visualizer/ → 시각화 관련 패키지

 

# 스케줄 설정: 3일에 한 번씩 데이터 가져오기
schedule.every(1).days.at("15:02").do(schedule_health_assessment)

# 스케줄 지속 실행
while True:
    print("스케줄 시작")
    schedule.run_pending()
    time.sleep(1)

이 코드는 스케줄링을 통해 일정 주기로 schedule_health_assessment() 함수를 실행하는 기능을 한다.

  • schedule.every(1).days.at("15:02").do(schedule_health_assessment) → 매일 오후 3시 2분에 schedule_health_assessment() 실행
  • while True: → 무한 루프를 돌며 스케줄을 지속적으로 확인
  • schedule.run_pending() → 예약된 작업이 있으면 실행
  • time.sleep(1) → 1초 대기 후 다시 스케줄 확인

즉, 설정된 시간마다 자동으로 데이터를 가져와 건강 평가를 수행하는 자동화 프로세스이다.

 

전체 흐름 정리

데이터 필터링 → 선박별 그룹화 → 평가 조건 확인 → 건강 평가 알고리즘 적용.
이를 통해 특정 날짜 범위의 데이터를 선별하고, 선박 운영 상태를 체계적으로 분석할 수 있도록 설계됨.