# Source listing metadata (file-viewer residue): 98 lines, 3.6 KiB, Python
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
from datetime import datetime
|
|
from algorithms import detect_outliers_3sigma, detect_outliers_iqr, detect_outliers_grubbs, detect_outliers_gesd
|
|
|
|
class OutlierDetector:
    """Load a time series from CSV, run an outlier detector on it, and plot it.

    Every public method that can fail reports the failure through its return
    value (a ``(success, message_or_payload)`` tuple) instead of raising, so
    callers can surface the message directly.

    Attributes:
        data: DataFrame indexed by the ``timestamp`` column, or None before a
            successful ``load_data`` call.
        results: Result mapping returned by the most recent successful
            detection, or None.
        current_method: Name of the method that produced ``results``, or None.
    """

    def __init__(self):
        self.data = None
        self.results = None
        self.current_method = None

    def load_data(self, file_path):
        """Load time-series data from a CSV file.

        The CSV must contain a ``timestamp`` column; it is parsed as dates and
        becomes the index of ``self.data``.

        Args:
            file_path: Anything ``pandas.read_csv`` accepts as its first
                argument (path or file-like object).

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on failure.
            On failure the previously loaded data and results are untouched.
        """
        try:
            frame = pd.read_csv(file_path, parse_dates=['timestamp'])
            frame = frame.set_index('timestamp')
        except Exception as e:
            return False, f"数据加载失败: {e}"

        self.data = frame
        # Results computed on a previous dataset no longer describe the data
        # that is now loaded, so drop them instead of letting them leak into
        # get_detection_results() / plot_results().
        self.results = None
        self.current_method = None
        return True, "数据加载成功"

    def detect_outliers(self, method, column, **kwargs):
        """Detect outliers in one column using the named method.

        Args:
            method: One of ``'3sigma'``, ``'iqr'``, ``'grubbs'``, ``'gesd'``.
            column: Name of the column in the loaded data to analyse. NaN
                values are dropped before detection.
            **kwargs: Forwarded unchanged to the selected detection function.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` otherwise.
            ``self.results`` and ``self.current_method`` are only updated on
            success, so they always describe the same detection run.
        """
        if self.data is None:
            return False, "请先加载数据"

        if column not in self.data.columns:
            return False, f"列'{column}'不存在"

        series = self.data[column].dropna()
        if series.empty:
            return False, "所选列没有有效数据"

        try:
            # Kept as an explicit chain so an unsupported method returns
            # before any detector is looked up or called.
            if method == '3sigma':
                results = detect_outliers_3sigma(series, **kwargs)
            elif method == 'iqr':
                results = detect_outliers_iqr(series, **kwargs)
            elif method == 'grubbs':
                results = detect_outliers_grubbs(series, **kwargs)
            elif method == 'gesd':
                results = detect_outliers_gesd(series, **kwargs)
            else:
                return False, "不支持的检测方法"
        except Exception as e:
            return False, f"离群点检测失败: {e}"

        # Commit both fields together: a failed or unsupported call must not
        # relabel the results of an earlier successful run.
        self.results = results
        self.current_method = method
        return True, "离群点检测成功"

    def get_detection_results(self):
        """Return a summary of the latest detection, or None if there is none.

        Returns:
            A dict with keys ``method``, ``series_name``, ``upper_bound``,
            ``lower_bound``, ``mean``, ``outliers`` and ``outlier_indices``.
            ``lower_bound`` is None when the stored results do not provide
            one (``plot_results`` treats that key as optional as well).
        """
        if self.results is None:
            return None

        return {
            'method': self.current_method,
            'series_name': self.results['series_name'],
            'upper_bound': self.results['upper_bound'],
            'lower_bound': self.results.get('lower_bound'),
            'mean': self.results['mean'],
            'outliers': self.results['outliers'],
            'outlier_indices': self.results['outlier_indices'],
        }

    def plot_results(self):
        """Plot the analysed series with its mean, bounds and outliers.

        Returns:
            ``(False, message)`` when no detection results are available,
            otherwise ``(True, plt)`` where ``plt`` is the pyplot module
            holding the newly created current figure.
        """
        if self.results is None:
            return False, "没有可用的检测结果"

        series = self.results['series']
        outlier_positions = self.results['outlier_indices']

        plt.figure(figsize=(12, 6))

        # Raw data
        plt.plot(series, 'b-', label='原始数据')

        # Mean line
        plt.axhline(self.results['mean'], color='g', linestyle='--', label='均值')

        # Bound lines; the lower bound is drawn only when the results have one
        plt.axhline(self.results['upper_bound'], color='r', linestyle='--', label='上边界')
        if 'lower_bound' in self.results:
            plt.axhline(self.results['lower_bound'], color='r', linestyle='--', label='下边界')

        # Mark outliers; outlier_indices are used as positions into the
        # series (index[...] / .iloc[...]), not as index labels
        outlier_dates = series.index[outlier_positions]
        outlier_values = series.iloc[outlier_positions]
        plt.plot(outlier_dates, outlier_values, 'ro', markersize=8, label='离群点')

        plt.title(f"离群点检测 - {self.current_method.upper()}方法")
        plt.xlabel('时间')
        plt.ylabel('数值')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        return True, plt