"""Time-series outlier detection: four detection algorithms and a front end.

The patch this module was recovered from adds two files: ``1.py`` (the
``OutlierDetector`` class, which imported its algorithms from a module named
``algorithms``) and ``2.py`` (the four ``detect_outliers_*`` functions).
Both are presented here as one module, so the class resolves the functions
locally instead of through that import.

Every ``detect_outliers_*`` function takes a ``pandas.Series`` and returns a
dict that always contains the keys ``series``, ``series_name``, ``mean``,
``upper_bound``, ``lower_bound``, ``outliers`` (the flagged values, as a
Series) and ``outlier_indices`` (their integer positions in ``series``),
plus method-specific extras.
"""

from collections import defaultdict  # noqa: F401  (kept from the original 2.py)
from datetime import datetime  # noqa: F401  (kept from the original 1.py)

import numpy as np
import pandas as pd
from scipy import stats

_DEFAULT_SERIES_NAME = '序列'


def _series_label(series):
    """Return the series name, or the default label when the name is falsy."""
    return series.name if series.name else _DEFAULT_SERIES_NAME


def _mean_or_nan(values):
    """Mean of a 1-D float array; NaN (without a warning) when it is empty."""
    return float(np.mean(values)) if values.size else float('nan')


def _sample_std(values):
    """Sample standard deviation (ddof=1); NaN when fewer than two points."""
    if values.size < 2:
        return float('nan')
    return float(np.std(values, ddof=1))


def _flag_outside(series, lower_bound, upper_bound):
    """Return (flagged values, their positions) for points outside the bounds."""
    mask = (series > upper_bound) | (series < lower_bound)
    return series[mask], np.flatnonzero(np.asarray(mask))


def _grubbs_critical(n, alpha):
    """Two-sided Grubbs critical value for a sample of size ``n`` (n > 2)."""
    t = stats.t.ppf(1 - alpha / (2 * n), n - 2)
    return (n - 1) / np.sqrt(n) * np.sqrt(t ** 2 / (n - 2 + t ** 2))


def detect_outliers_3sigma(series, threshold=3):
    """Flag points farther than ``threshold`` standard deviations from the mean.

    Args:
        series: pandas Series of numeric values.
        threshold: Half-width of the accepted band, in standard deviations.

    Returns:
        Result dict (see module docstring) with extra keys ``std`` and
        ``threshold``. ``std`` is the population standard deviation (ddof=0).
    """
    mean = np.mean(series)
    std = np.std(series)
    upper_bound = mean + threshold * std
    lower_bound = mean - threshold * std
    outliers, outlier_indices = _flag_outside(series, lower_bound, upper_bound)

    return {
        'series': series,
        'series_name': _series_label(series),
        'mean': mean,
        'std': std,
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        'outliers': outliers,
        'outlier_indices': outlier_indices,
        'threshold': threshold,
    }


def detect_outliers_iqr(series, k=1.5):
    """Flag points outside ``[Q1 - k*IQR, Q3 + k*IQR]``.

    Args:
        series: pandas Series of numeric values.
        k: Multiplier applied to the inter-quartile range.

    Returns:
        Result dict (see module docstring) with extra keys ``q1``, ``q3``,
        ``iqr`` and ``k``. ``mean`` is included so that consumers which read
        ``results['mean']`` (as ``OutlierDetector`` does) work for this method.
    """
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    upper_bound = q3 + k * iqr
    lower_bound = q1 - k * iqr
    outliers, outlier_indices = _flag_outside(series, lower_bound, upper_bound)

    return {
        'series': series,
        'series_name': _series_label(series),
        'mean': np.mean(series),
        'q1': q1,
        'q3': q3,
        'iqr': iqr,
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        'outliers': outliers,
        'outlier_indices': outlier_indices,
        'k': k,
    }


def detect_outliers_grubbs(series, alpha=0.05):
    """Repeatedly apply the two-sided Grubbs test, removing one point per pass.

    Each pass tests the point farthest from the mean of the remaining data,
    using the sample standard deviation (ddof=1) that the critical-value
    formula assumes. Iteration stops at the first non-significant point, when
    the remaining data are constant, or when fewer than three points remain.

    Args:
        series: pandas Series of numeric values.
        alpha: Significance level of each individual test.

    Returns:
        Result dict (see module docstring) with extra keys ``std`` and
        ``alpha``. ``outlier_indices`` is a list of positions in ``series``
        in order of detection. ``mean`` and ``std`` describe the data that
        remain after removing the flagged points; the bounds are
        ``mean +/- critical * std`` for that remaining data, or NaN when fewer
        than three points remain and no critical value exists.
    """
    values = np.asarray(series, dtype=float)
    # positions[j] is the location in the ORIGINAL series of values[j]; it is
    # shrunk in lockstep with `values` so removals never shift the bookkeeping.
    positions = np.arange(values.size)
    outlier_indices = []

    while values.size > 2:
        std = _sample_std(values)
        if not std > 0:  # constant (or NaN) data: the statistic is undefined
            break
        abs_dev = np.abs(values - values.mean())
        worst = int(np.argmax(abs_dev))
        g = abs_dev[worst] / std
        if not g > _grubbs_critical(values.size, alpha):
            break
        outlier_indices.append(int(positions[worst]))
        values = np.delete(values, worst)
        positions = np.delete(positions, worst)

    mean = _mean_or_nan(values)
    std = _sample_std(values)
    if values.size > 2:
        spread = _grubbs_critical(values.size, alpha) * std
    else:
        spread = float('nan')

    return {
        'series': series,
        'series_name': _series_label(series),
        'mean': mean,
        'std': std,
        'upper_bound': mean + spread,
        'lower_bound': mean - spread,
        'outliers': series.iloc[outlier_indices],
        'outlier_indices': outlier_indices,
        'alpha': alpha,
    }


def detect_outliers_gesd(series, alpha=0.05, max_outliers=None):
    """Detect outliers with the generalized ESD (extreme studentized deviate) test.

    For i = 1..r the most extreme remaining point is removed and its statistic
    R_i (computed with the sample standard deviation) is compared with the
    critical value lambda_i. The number of outliers is the LARGEST i with
    R_i > lambda_i, so a pair of outliers that mask each other in the first
    pass is still found.

    Args:
        series: pandas Series of numeric values.
        alpha: Significance level.
        max_outliers: Upper bound r on the number of outliers; defaults to
            10% of the series length. Internally limited to ``len(series) - 2``
            so the t-distribution keeps at least one degree of freedom.

    Returns:
        Result dict (see module docstring) with extra keys ``std``, ``alpha``,
        ``max_outliers``, ``r_values`` and ``lambda_values`` (one entry per
        pass actually computed). ``outlier_indices`` is a list of positions in
        ``series`` in order of removal. ``mean``/``std`` describe the full
        series (ddof=0). When outliers exist the bounds are an envelope
        around the flagged points (their max/min widened by 0.1 sample
        standard deviations), not decision thresholds; otherwise they are
        mean +/- 3 sample standard deviations of the series.
    """
    values = np.asarray(series, dtype=float)
    n = values.size
    if max_outliers is None:
        max_outliers = n // 10  # default: at most 10% of the points
    rounds = max(0, min(int(max_outliers), n - 2))

    positions = np.arange(n)  # original location of each remaining value
    removed = []
    r_values = []
    lambda_values = []
    n_outliers = 0

    for i in range(1, rounds + 1):
        std = _sample_std(values)
        if not std > 0:  # remaining data are constant: nothing left to test
            break
        abs_dev = np.abs(values - values.mean())
        worst = int(np.argmax(abs_dev))
        r = abs_dev[worst] / std

        p = 1 - alpha / (2 * (n - i + 1))
        t = stats.t.ppf(p, n - i - 1)
        lambda_val = (n - i) * t / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))

        r_values.append(r)
        lambda_values.append(lambda_val)
        removed.append(int(positions[worst]))
        if r > lambda_val:
            n_outliers = i

        values = np.delete(values, worst)
        positions = np.delete(positions, worst)

    outlier_indices = removed[:n_outliers]

    if outlier_indices:
        flagged = series.iloc[outlier_indices]
        margin = 0.1 * series.std()
        upper_bound = flagged.max() + margin
        lower_bound = flagged.min() - margin
    else:
        upper_bound = series.mean() + 3 * series.std()
        lower_bound = series.mean() - 3 * series.std()

    return {
        'series': series,
        'series_name': _series_label(series),
        'mean': np.mean(series),
        'std': np.std(series),
        'upper_bound': upper_bound,
        'lower_bound': lower_bound,
        'outliers': series.iloc[outlier_indices],
        'outlier_indices': outlier_indices,
        'alpha': alpha,
        'max_outliers': max_outliers,
        'r_values': r_values,
        'lambda_values': lambda_values,
    }


class OutlierDetector:
    """Stateful wrapper: load a CSV, run one detection method, inspect/plot it.

    Methods report their outcome as ``(ok, payload)`` tuples instead of
    raising, so a caller can show ``payload`` directly as a status message.

    Attributes:
        data: Loaded DataFrame indexed by ``timestamp``, or None.
        results: Result dict of the last successful detection, or None.
        current_method: Name of the method that produced ``results``, or None.
    """

    # Method name -> detection function.
    _METHODS = {
        '3sigma': detect_outliers_3sigma,
        'iqr': detect_outliers_iqr,
        'grubbs': detect_outliers_grubbs,
        'gesd': detect_outliers_gesd,
    }

    def __init__(self):
        self.data = None
        self.results = None
        self.current_method = None

    def load_data(self, file_path):
        """Load a CSV with a ``timestamp`` column and index the frame by it.

        On failure the previously loaded data and results are left untouched.
        On success any results from the previous data set are discarded.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on failure.
        """
        try:
            frame = pd.read_csv(file_path, parse_dates=['timestamp'])
            frame = frame.set_index('timestamp')
        except Exception as e:  # status-tuple boundary: report, don't raise
            return False, f"数据加载失败: {e}"

        self.data = frame
        self.results = None
        self.current_method = None
        return True, "数据加载成功"

    def detect_outliers(self, method, column, **kwargs):
        """Run ``method`` on the non-null values of ``column``.

        Args:
            method: One of ``'3sigma'``, ``'iqr'``, ``'grubbs'``, ``'gesd'``.
            column: Name of the column in the loaded data.
            **kwargs: Forwarded to the detection function.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` otherwise.
            ``results`` and ``current_method`` change only on success, so they
            always describe the same run.
        """
        if self.data is None:
            return False, "请先加载数据"
        if column not in self.data.columns:
            return False, f"列'{column}'不存在"

        series = self.data[column].dropna()
        if series.empty:
            return False, "所选列没有有效数据"

        detector = self._METHODS.get(method)
        if detector is None:
            return False, "不支持的检测方法"

        try:
            results = detector(series, **kwargs)
        except Exception as e:  # status-tuple boundary: report, don't raise
            return False, f"离群点检测失败: {e}"

        self.results = results
        self.current_method = method
        return True, "离群点检测成功"

    def get_detection_results(self):
        """Return a summary dict of the last detection, or None if there is none."""
        if self.results is None:
            return None

        results = self.results
        return {
            'method': self.current_method,
            'series_name': results['series_name'],
            'upper_bound': results['upper_bound'],
            'lower_bound': results['lower_bound'],
            # .get keeps this usable with result dicts that carry no mean.
            'mean': results.get('mean'),
            'outliers': results['outliers'],
            'outlier_indices': results['outlier_indices'],
        }

    def plot_results(self):
        """Draw the series, its mean and bounds, and the flagged points.

        Returns:
            ``(True, plt)`` where ``plt`` is the ``matplotlib.pyplot`` module
            holding the new figure, or ``(False, message)`` when no detection
            has been run.
        """
        if self.results is None:
            return False, "没有可用的检测结果"

        # Imported here so detection itself does not require matplotlib.
        import matplotlib.pyplot as plt

        results = self.results
        series = results['series']

        plt.figure(figsize=(12, 6))

        # Raw data.
        plt.plot(series, 'b-', label='原始数据')

        # Mean line (skipped when the result dict carries no mean).
        mean = results.get('mean')
        if mean is not None:
            plt.axhline(mean, color='g', linestyle='--', label='均值')

        # Bound lines.
        plt.axhline(results['upper_bound'], color='r', linestyle='--', label='上边界')
        if 'lower_bound' in results:
            plt.axhline(results['lower_bound'], color='r', linestyle='--', label='下边界')

        # Flagged points; an explicit int array keeps an empty list valid.
        positions = np.asarray(results['outlier_indices'], dtype=int)
        plt.plot(series.index[positions], series.iloc[positions], 'ro',
                 markersize=8, label='离群点')

        plt.title(f"离群点检测 - {self.current_method.upper()}方法")
        plt.xlabel('时间')
        plt.ylabel('数值')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        return True, plt