上传文件至 /
This commit is contained in:
commit
207553457f
98
1.py
Normal file
98
1.py
Normal file
@ -0,0 +1,98 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
from datetime import datetime
|
||||
from algorithms import detect_outliers_3sigma, detect_outliers_iqr, detect_outliers_grubbs, detect_outliers_gesd
|
||||
|
||||
class OutlierDetector:
    """Load a time series from CSV, run an outlier detector on it and plot it.

    Methods report problems through their return value (a
    ``(success, message_or_payload)`` tuple, or ``None``) rather than by
    raising, so a caller can show the message directly.
    """

    def __init__(self):
        # DataFrame indexed by its 'timestamp' column; None until a load succeeds.
        self.data = None
        # Result dict returned by the last successful detection; None otherwise.
        self.results = None
        # Name of the method that produced ``self.results``.
        self.current_method = None

    def load_data(self, file_path):
        """Read a CSV file and index it by its ``timestamp`` column.

        Args:
            file_path: Anything ``pandas.read_csv`` accepts as a source.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` when reading
            or indexing fails. On failure the previously loaded data and
            results are left untouched.
        """
        try:
            frame = pd.read_csv(file_path, parse_dates=['timestamp'])
            frame = frame.set_index('timestamp')
        except Exception as e:
            return False, f"数据加载失败: {str(e)}"

        self.data = frame
        # Results computed on the previous dataset no longer describe the
        # loaded data, so drop them instead of letting them be plotted.
        self.results = None
        self.current_method = None
        return True, "数据加载成功"

    def detect_outliers(self, method, column, **kwargs):
        """Run the detector named ``method`` on one column of the loaded data.

        Args:
            method: One of ``'3sigma'``, ``'iqr'``, ``'grubbs'``, ``'gesd'``.
            column: Name of the column to analyse; missing values are dropped
                before detection.
            **kwargs: Forwarded unchanged to the selected detector.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
            ``self.results`` and ``self.current_method`` are only updated on
            success, so they always describe the same run.
        """
        if self.data is None:
            return False, "请先加载数据"

        if column not in self.data.columns:
            return False, f"列'{column}'不存在"

        series = self.data[column].dropna()
        if len(series) == 0:
            return False, "所选列没有有效数据"

        try:
            if method == '3sigma':
                results = detect_outliers_3sigma(series, **kwargs)
            elif method == 'iqr':
                results = detect_outliers_iqr(series, **kwargs)
            elif method == 'grubbs':
                results = detect_outliers_grubbs(series, **kwargs)
            elif method == 'gesd':
                results = detect_outliers_gesd(series, **kwargs)
            else:
                return False, "不支持的检测方法"
        except Exception as e:
            return False, f"离群点检测失败: {str(e)}"

        self.results = results
        self.current_method = method
        return True, "离群点检测成功"

    @staticmethod
    def _result_mean(results):
        """Return the mean stored in ``results``, or the series mean if absent.

        Not every detector reports a ``'mean'`` entry (the IQR detector
        reports quartiles instead), so fall back to the mean of the analysed
        series rather than failing with ``KeyError``.
        """
        mean = results.get('mean')
        if mean is None:
            mean = results['series'].mean()
        return mean

    def get_detection_results(self):
        """Return a summary dict of the last detection, or ``None`` if none ran."""
        if self.results is None:
            return None

        results = self.results
        return {
            'method': self.current_method,
            'series_name': results['series_name'],
            'upper_bound': results['upper_bound'],
            'lower_bound': results['lower_bound'],
            'mean': self._result_mean(results),
            'outliers': results['outliers'],
            'outlier_indices': results['outlier_indices'],
        }

    def plot_results(self):
        """Draw the analysed series, its mean, the bounds and the outliers.

        Returns:
            ``(False, message)`` when there is nothing to plot, otherwise
            ``(True, plt)`` where ``plt`` is the ``matplotlib.pyplot`` module
            holding the newly created figure.
        """
        if self.results is None:
            return False, "没有可用的检测结果"

        results = self.results
        series = results['series']

        plt.figure(figsize=(12, 6))

        # Raw data.
        plt.plot(series, 'b-', label='原始数据')

        # Mean line (computed from the series when the detector gave none).
        plt.axhline(self._result_mean(results), color='g', linestyle='--', label='均值')

        # Boundary lines.
        plt.axhline(results['upper_bound'], color='r', linestyle='--', label='上边界')
        if 'lower_bound' in results:
            plt.axhline(results['lower_bound'], color='r', linestyle='--', label='下边界')

        # Outlier markers; ``outlier_indices`` are positions, hence ``iloc``.
        positions = results['outlier_indices']
        outlier_dates = series.index[positions]
        outlier_values = series.iloc[positions]
        plt.plot(outlier_dates, outlier_values, 'ro', markersize=8, label='离群点')

        plt.title(f"离群点检测 - {self.current_method.upper()}方法")
        plt.xlabel('时间')
        plt.ylabel('数值')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        return True, plt
|
145
2.py
Normal file
145
2.py
Normal file
@ -0,0 +1,145 @@
|
||||
import numpy as np
|
||||
from scipy import stats
|
||||
from collections import defaultdict
|
||||
|
||||
def detect_outliers_3sigma(series, threshold=3):
    """Flag points lying more than ``threshold`` standard deviations from the mean.

    Args:
        series: pandas Series of numeric values.
        threshold: Half-width of the accepted band, in standard deviations.

    Returns:
        dict with the analysed ``series``, its ``series_name``, the ``mean``
        and ``std`` (``np.std`` default, i.e. population form), the
        ``upper_bound``/``lower_bound`` of the band, the flagged ``outliers``
        (a Series), their positional ``outlier_indices`` and the ``threshold``
        used.
    """
    mean = np.mean(series)
    std = np.std(series)

    half_width = threshold * std
    upper_bound = mean + half_width
    lower_bound = mean - half_width

    outliers = (series > upper_bound) | (series < lower_bound)
    outlier_indices = np.where(outliers)[0]

    # Only substitute the placeholder when the series is really unnamed; a
    # plain truthiness test would also discard valid labels such as column 0.
    series_name = series.name
    if series_name is None or series_name == '':
        series_name = '序列'

    return {
        'series': series,
        'series_name': series_name,
        'mean': mean,
        'std': std,
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        'outliers': series[outliers],
        'outlier_indices': outlier_indices,
        'threshold': threshold,
    }
|
||||
|
||||
def detect_outliers_iqr(series, k=1.5):
    """Flag points outside the quartile fences ``[Q1 - k*IQR, Q3 + k*IQR]``.

    Args:
        series: pandas Series of numeric values.
        k: Fence multiplier applied to the inter-quartile range.

    Returns:
        dict with the analysed ``series``, its ``series_name``, the quartiles
        ``q1``/``q3``, the ``iqr``, the fences ``upper_bound``/``lower_bound``,
        the flagged ``outliers`` (a Series), their positional
        ``outlier_indices`` and the ``k`` used.
    """
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1

    margin = k * iqr
    upper_bound = q3 + margin
    lower_bound = q1 - margin

    outliers = (series > upper_bound) | (series < lower_bound)
    outlier_indices = np.where(outliers)[0]

    # Only substitute the placeholder when the series is really unnamed; a
    # plain truthiness test would also discard valid labels such as column 0.
    series_name = series.name
    if series_name is None or series_name == '':
        series_name = '序列'

    return {
        'series': series,
        'series_name': series_name,
        'q1': q1,
        'q3': q3,
        'iqr': iqr,
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        'outliers': series[outliers],
        'outlier_indices': outlier_indices,
        'k': k,
    }
|
||||
|
||||
def detect_outliers_grubbs(series, alpha=0.05):
    """Detect outliers by applying the two-sided Grubbs test repeatedly.

    At each step the point farthest from the mean is tested; if its
    statistic ``G = |x - mean| / s`` exceeds the critical value it is recorded
    as an outlier, removed, and the test is repeated on the remaining points.
    ``s`` is the sample standard deviation (``ddof=1``), which is the form
    the critical-value formula is derived for.

    Args:
        series: pandas Series of numeric values.
        alpha: Significance level of each individual test.

    Returns:
        dict with the analysed ``series``, its ``series_name``, the ``mean``
        and ``std`` of the points used in the last test performed, the
        ``upper_bound``/``lower_bound`` (``mean ± critical * std`` from that
        last test), the flagged ``outliers`` (a Series, in detection order),
        their positions in ``series`` as the list ``outlier_indices`` and the
        ``alpha`` used. With fewer than three points no test can be run; the
        bounds are then NaN and no outlier is reported.
    """
    values = np.asarray(series.values, dtype=float)
    # Position in the ORIGINAL series of every value still under test.
    # Deleting from ``values`` shifts later elements, so the local argmax
    # cannot be used as an index into ``series`` after the first removal.
    positions = np.arange(values.size)
    n = values.size
    outlier_indices = []

    # Defaults for inputs too short to test (the test needs n >= 3).
    nan = float('nan')
    mean = np.mean(values) if n > 0 else nan
    std = np.std(values, ddof=1) if n > 1 else nan
    critical = nan

    while n > 2:
        mean = np.mean(values)
        std = np.std(values, ddof=1)

        t = stats.t.ppf(1 - alpha / (2 * n), n - 2)
        critical = (n - 1) / np.sqrt(n) * np.sqrt(t**2 / (n - 2 + t**2))

        if std == 0:
            # All remaining values are identical: nothing deviates.
            break

        abs_dev = np.abs(values - mean)
        worst = int(np.argmax(abs_dev))
        if abs_dev[worst] / std <= critical:
            break

        outlier_indices.append(int(positions[worst]))
        values = np.delete(values, worst)
        positions = np.delete(positions, worst)
        n -= 1

    upper_bound = mean + critical * std
    lower_bound = mean - critical * std

    # Only substitute the placeholder when the series is really unnamed.
    series_name = series.name
    if series_name is None or series_name == '':
        series_name = '序列'

    return {
        'series': series,
        'series_name': series_name,
        'mean': mean,
        'std': std,
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        # ``outlier_indices`` are positions, so select with ``iloc``; plain
        # ``series[...]`` would look the integers up as index labels.
        'outliers': series.iloc[outlier_indices],
        'outlier_indices': outlier_indices,
        'alpha': alpha,
    }
|
||||
|
||||
def _gesd_stage(values, alpha):
    """Compute one GESD stage on ``values`` (a float array of size >= 3).

    Returns ``(worst, r, lam, mean, std)``: the position in ``values`` of the
    point farthest from the mean, its studentized deviation ``r`` (using the
    sample standard deviation), the critical value ``lam`` for a sample of
    this size, and the mean / sample standard deviation of ``values``.
    """
    m = values.size
    mean = values.mean()
    std = values.std(ddof=1)
    deviations = np.abs(values - mean)
    worst = int(np.argmax(deviations))
    # Identical values have zero spread; report "no deviation" instead of
    # dividing by zero.
    r = deviations[worst] / std if std > 0 else 0.0

    p = 1 - alpha / (2 * m)
    t = stats.t.ppf(p, m - 2)
    lam = (m - 1) * t / np.sqrt((m - 2 + t**2) * m)
    return worst, r, lam, mean, std


def detect_outliers_gesd(series, alpha=0.05, max_outliers=None):
    """Detect outliers with the generalized ESD (GESD) procedure.

    The most extreme point is removed ``max_outliers`` times, recording at
    each stage ``i`` the statistic ``R_i`` and its critical value
    ``lambda_i``. The number of outliers is the LARGEST ``i`` with
    ``R_i > lambda_i``; the first that many removed points are the outliers.
    All stages are evaluated even after a non-significant one, because
    several outliers can mask each other in the early stages.

    Args:
        series: pandas Series of numeric values.
        alpha: Significance level.
        max_outliers: Upper limit on the number of outliers; defaults to 10%
            of the points. Stages that would leave no degrees of freedom
            (beyond ``len(series) - 2``) are not evaluated.

    Returns:
        dict with the analysed ``series``, its ``series_name``, the ``mean``
        and ``std`` of the full series, ``upper_bound``/``lower_bound``, the
        flagged ``outliers`` (a Series, in removal order), their positions in
        ``series`` as the list ``outlier_indices``, ``alpha``,
        ``max_outliers`` and the per-stage ``r_values`` / ``lambda_values``.
        The bounds are ``mean ± lambda * std`` of the points that were NOT
        flagged, so they fence the retained data; when fewer than three
        points are retained they fall back to ``mean ± 3 * std`` of the
        whole series.
    """
    data = np.asarray(series.values, dtype=float)
    n = data.size
    if max_outliers is None:
        max_outliers = n // 10  # by default at most 10% of the points

    # Stage i works on n - i + 1 points and needs n - i - 1 >= 1 degrees of
    # freedom, so at most n - 2 stages can be evaluated.
    stages = max(0, min(int(max_outliers), n - 2))

    remaining = data
    # Position in the original series of every value still in ``remaining``.
    positions = np.arange(n)
    removal_order = []
    r_values = []
    lambda_values = []

    for _stage in range(stages):
        worst, r, lam, _mean, _std = _gesd_stage(remaining, alpha)
        r_values.append(r)
        lambda_values.append(lam)
        removal_order.append(int(positions[worst]))
        remaining = np.delete(remaining, worst)
        positions = np.delete(positions, worst)

    n_outliers = 0
    for stage, (r, lam) in enumerate(zip(r_values, lambda_values), start=1):
        if r > lam:
            n_outliers = stage
    outlier_indices = removal_order[:n_outliers]

    keep = np.ones(n, dtype=bool)
    keep[np.asarray(outlier_indices, dtype=int)] = False
    retained = data[keep]
    if retained.size >= 3:
        _worst, _r, lam, center, spread = _gesd_stage(retained, alpha)
        upper_bound = center + lam * spread
        lower_bound = center - lam * spread
    else:
        upper_bound = series.mean() + 3 * series.std()
        lower_bound = series.mean() - 3 * series.std()

    # Only substitute the placeholder when the series is really unnamed.
    series_name = series.name
    if series_name is None or series_name == '':
        series_name = '序列'

    return {
        'series': series,
        'series_name': series_name,
        'mean': np.mean(series),
        'std': np.std(series),
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        # Positions, not labels: select with ``iloc``.
        'outliers': series.iloc[outlier_indices],
        'outlier_indices': outlier_indices,
        'alpha': alpha,
        'max_outliers': max_outliers,
        'r_values': r_values,
        'lambda_values': lambda_values,
    }
|
Loading…
Reference in New Issue
Block a user