from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from algorithms import (
    detect_outliers_3sigma,
    detect_outliers_gesd,
    detect_outliers_grubbs,
    detect_outliers_iqr,
)


class OutlierDetector:
    """Load a time series from CSV, run an outlier detector on one column, and plot it.

    Every public method except ``get_detection_results`` reports its outcome as
    a ``(success, payload)`` tuple instead of raising, so callers can show the
    message directly.

    The detector functions come from the ``algorithms`` module. Judging by the
    keys read in this class, each is expected to return a mapping with at least
    ``series``, ``series_name``, ``mean``, ``upper_bound``, ``outliers`` and
    ``outlier_indices``; ``lower_bound`` is treated as optional.
    """

    def __init__(self):
        # DataFrame indexed by 'timestamp'; populated by load_data().
        self.data = None
        # Result mapping from the most recent *successful* detection.
        self.results = None
        # Name of the method that produced self.results.
        self.current_method = None

    def load_data(self, file_path):
        """Load time-series data from a CSV file.

        The file must contain a ``timestamp`` column, which is parsed as dates
        and used as the index.

        Args:
            file_path: Path (or anything ``pandas.read_csv`` accepts) of the CSV.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on failure.
        """
        try:
            frame = pd.read_csv(file_path, parse_dates=['timestamp'])
            frame = frame.set_index('timestamp')
        except Exception as e:
            return False, f"数据加载失败: {str(e)}"

        # Assign only after every step succeeded so a failed load never
        # leaves a half-initialised frame behind.
        self.data = frame
        return True, "数据加载成功"

    def detect_outliers(self, method, column, **kwargs):
        """Detect outliers in ``column`` using the named method.

        Args:
            method: One of ``'3sigma'``, ``'iqr'``, ``'grubbs'``, ``'gesd'``.
            column: Name of the column in the loaded data to analyse.
            **kwargs: Forwarded unchanged to the selected detector function.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` otherwise.
            On failure the previously stored results and method are left
            untouched.
        """
        if self.data is None:
            return False, "请先加载数据"
        if column not in self.data.columns:
            return False, f"列'{column}'不存在"

        series = self.data[column].dropna()
        if series.empty:
            return False, "所选列没有有效数据"

        # Built per call so the module-level detector names are resolved at
        # call time, exactly as the original if/elif chain did.
        detectors = {
            '3sigma': detect_outliers_3sigma,
            'iqr': detect_outliers_iqr,
            'grubbs': detect_outliers_grubbs,
            'gesd': detect_outliers_gesd,
        }
        # Non-string (possibly unhashable) values are simply unsupported.
        detector = detectors.get(method) if isinstance(method, str) else None
        if detector is None:
            return False, "不支持的检测方法"

        try:
            results = detector(series, **kwargs)
        except Exception as e:
            return False, f"离群点检测失败: {str(e)}"

        # Commit results and method together so they can never disagree.
        self.results = results
        self.current_method = method
        return True, "离群点检测成功"

    def get_detection_results(self):
        """Return a summary dict of the last successful detection.

        Returns:
            ``None`` if no detection has succeeded yet; otherwise a dict with
            the keys ``method``, ``series_name``, ``upper_bound``,
            ``lower_bound``, ``mean``, ``outliers`` and ``outlier_indices``.
            ``lower_bound`` is ``None`` when the detector did not provide one.
        """
        if self.results is None:
            return None

        results = self.results
        # plot_results() treats 'lower_bound' as optional; mirror that here
        # rather than raising KeyError.
        lower_bound = results['lower_bound'] if 'lower_bound' in results else None
        return {
            'method': self.current_method,
            'series_name': results['series_name'],
            'upper_bound': results['upper_bound'],
            'lower_bound': lower_bound,
            'mean': results['mean'],
            'outliers': results['outliers'],
            'outlier_indices': results['outlier_indices'],
        }

    def plot_results(self):
        """Plot the last detection result on a new matplotlib figure.

        Draws the series, its mean, the upper bound, the lower bound (when
        present) and the detected outliers.

        Returns:
            ``(False, message)`` if there is nothing to plot, otherwise
            ``(True, plt)`` where ``plt`` is the ``matplotlib.pyplot`` module
            with the new figure current.
        """
        if self.results is None:
            return False, "没有可用的检测结果"

        results = self.results
        series = results['series']
        outlier_indices = results['outlier_indices']

        plt.figure(figsize=(12, 6))

        # Raw data.
        plt.plot(series, 'b-', label='原始数据')

        # Mean line.
        plt.axhline(results['mean'], color='g', linestyle='--', label='均值')

        # Boundary lines; the lower bound is optional.
        plt.axhline(results['upper_bound'], color='r', linestyle='--',
                    label='上边界')
        if 'lower_bound' in results:
            plt.axhline(results['lower_bound'], color='r', linestyle='--',
                        label='下边界')

        # Mark outliers; outlier_indices are positional (used with .iloc).
        outlier_dates = series.index[outlier_indices]
        outlier_values = series.iloc[outlier_indices]
        plt.plot(outlier_dates, outlier_values, 'ro', markersize=8,
                 label='离群点')

        plt.title(f"离群点检测 - {self.current_method.upper()}方法")
        plt.xlabel('时间')
        plt.ylabel('数值')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        return True, plt