# ## 1. Feature-value cluster analysis: assess clustering quality and possible degradation.
# - The silhouette coefficient ranges over [-1, 1]; values near 1 indicate good clustering.
# - The elbow method varies the cluster count, computes the within-cluster sum of squares
#   (WCSS) for each, and picks the point after which WCSS decreases markedly more slowly —
#   that point is a suitable number of clusters.
# - The Calinski-Harabasz index measures between-cluster separation vs. within-cluster
#   tightness; larger means better clustering.
# - The Davies-Bouldin index measures within-cluster similarity vs. between-cluster
#   dissimilarity; smaller means better clustering.
from flask import Flask, request, jsonify
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import os
import requests

app = Flask(__name__)


@app.route('/cluster_analysis', methods=['POST'])
def cluster_analysis():
    """Cluster the features in an input CSV, save an elbow plot and quality metrics.

    Expects a JSON body with 'file_url' (CSV to read), 'result_file_path'
    (where the metrics CSV is written) and 'result_img_path' (where the
    elbow-method PNG is written).  Returns a JSON status payload.
    """
    try:
        # Validate that the request body names the input file.
        data = request.json
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')

        df = pd.read_csv(file_url)

        # Fit the reference 2-cluster model on the raw features and record labels.
        # BUG FIX: keep a pristine copy of the feature matrix; the original
        # appended the 'cluster' column to df and then fed df (labels included)
        # back into the elbow fits and the quality metrics.
        features = df.copy()
        base_kmeans = KMeans(n_clusters=2, random_state=0).fit(features)
        df['cluster'] = base_kmeans.labels_

        # Elbow method: WCSS for 1..10 clusters on the feature matrix only.
        wcss = []
        for k in range(1, 11):
            km = KMeans(n_clusters=k, init='k-means++', max_iter=300,
                        n_init=10, random_state=0)
            km.fit(features)
            wcss.append(km.inertia_)

        # Plot and save the elbow curve.
        plt.figure()
        plt.plot(range(1, 11), wcss)
        plt.title('Elbow Method')
        plt.xlabel('Number of clusters')
        plt.ylabel('WCSS')
        plt.savefig(result_img_path)
        plt.close()  # release figure resources (matches the other endpoints)

        # BUG FIX: score the 2-cluster labels.  The original reused the name
        # `kmeans` in the elbow loop, so these metrics were silently computed
        # with the last (10-cluster) labels instead of the fitted model's.
        labels = base_kmeans.labels_
        silhouette_avg = silhouette_score(features, labels)
        db_index = davies_bouldin_score(features, labels)
        ch_index = calinski_harabasz_score(features, labels)

        # Persist the three quality metrics.
        results_df = pd.DataFrame({
            'Metric': ['Silhouette Coefficient', 'Davies-Bouldin Index',
                       'Calinski-Harabasz Index'],
            'Value': [silhouette_avg, db_index, ch_index]
        })
        results_df.to_csv(result_file_path, index=False)

        return jsonify({
            'code': 200,
            'msg': 'Cluster analysis completed',
        })
    except Exception as e:
        # Top-level boundary: report the failure back to the client.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
# ## 2. Statistics-based degradation assessment: compute each column's mean and
# standard deviation, and plot a histogram of every feature's values.


def statistical_degradation(df):
    """Return (means, std_devs) Series, one entry per column of *df*."""
    means = df.mean()
    std_devs = df.std()
    return means, std_devs


def save_to_csv(means, std_devs, file_path):
    """Write the per-feature mean / standard-deviation table to *file_path*.

    NOTE(review): index=False drops the feature names from the CSV; kept
    as-is for backward compatibility with existing consumers of the file.
    """
    stats_df = pd.DataFrame({'Mean': means, 'Standard Deviation': std_devs})
    stats_df.to_csv(file_path, index=False)


def plot_feature_distribution(df, means, std_devs, result_img_path):
    """Plot one histogram per feature with mean and ±1 std-dev markers."""
    fig, axes = plt.subplots(nrows=len(df.columns), ncols=1,
                             figsize=(10, 5 * len(df.columns)))
    # BUG FIX: with a single column plt.subplots returns a bare Axes, which
    # the original zip() choked on; normalise to a 1-D array of Axes.
    axes = np.atleast_1d(axes)
    for i, (column, ax) in enumerate(zip(df.columns, axes)):
        df[column].plot(kind='hist', ax=ax, bins=20, alpha=0.5)
        # BUG FIX: use positional .iloc — integer fallback indexing on a
        # label-indexed Series is removed in modern pandas.
        mean_i = means.iloc[i]
        std_i = std_devs.iloc[i]
        ax.axvline(mean_i, color='r', linestyle='--', label=f'Mean: {mean_i:.2f}')
        ax.axvline(mean_i - std_i, color='g', linestyle='-', label=f'-1 Std Dev')
        ax.axvline(mean_i + std_i, color='g', linestyle='-')
        ax.set_title(f'Distribution of {column}')
        ax.legend(loc='upper left')
        # Only the bottom panel keeps its x axis; only the top keeps a y axis.
        if i != len(df.columns) - 1:
            ax.xaxis.set_visible(False)
        if i == 0:
            ax.set_ylabel('Frequency')
        else:
            ax.yaxis.set_visible(False)
    fig.tight_layout()
    plt.savefig(result_img_path)
    plt.close(fig)  # release figure resources


@app.route('/analyze_features', methods=['POST'])
def analyze_features():
    """Compute per-feature statistics and distribution plots for an input CSV."""
    try:
        # Validate that the request body names the input file.
        data = request.json
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')

        df = pd.read_csv(file_url)

        # Compute statistics, persist them, then render the distributions.
        means, std_devs = statistical_degradation(df)
        save_to_csv(means, std_devs, result_file_path)
        plot_feature_distribution(df, means, std_devs, result_img_path)

        return jsonify({
            'code': 200,
            'msg': 'Feature analysis completed',
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })


# ## 3. Trend analysis: fit a linear trend to each feature over time; a slope
# significantly different from zero may indicate degradation.
from flask import Flask, request, jsonify
from sklearn.linear_model import LinearRegression


def trend_analysis(df, some_threshold):
    """Fit a per-column linear regression of feature value against row index.

    Returns a DataFrame with columns Feature / Slope / Significant, where
    Significant means abs(slope) > some_threshold.
    """
    results_list = []
    for column in df.columns:
        X = np.arange(len(df[column])).reshape(-1, 1)
        y = df[column].values
        model = LinearRegression().fit(X, y)
        slope = model.coef_[0]
        results_list.append({'Feature': column, 'Slope': slope,
                             'Significant': abs(slope) > some_threshold})
    results_df = pd.DataFrame(results_list)
    return results_df


@app.route('/trend_analysis', methods=['POST'])
def trend_analysis_endpoint():
    """Run the linear trend analysis and plot every significant feature."""
    try:
        # Validate that the request body names the input file.
        data = request.json
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')

        df = pd.read_csv(file_url)

        # Threshold below which a slope is not considered significant.
        some_threshold = 0.01

        # Run the analysis and persist the slope table.
        slopes = trend_analysis(df, some_threshold)
        slopes.to_csv(result_file_path, index=False)

        # Keep only the significant features for plotting.
        significant_columns = slopes[slopes['Significant'] == True]['Feature']
        num_features = len(significant_columns)

        # ROBUSTNESS FIX: when nothing is significant the original built a
        # zero-height figure (figsize=(15, 0)), which matplotlib rejects;
        # clamp to at least one panel's height (the loop body simply won't run).
        plt.figure(figsize=(15, 4 * max(num_features, 1)))
        for i, column in enumerate(significant_columns):
            plt.subplot(num_features, 1, i + 1)  # one stacked panel per feature
            plt.scatter(range(len(df)), df[column], label='Data')
            significant_slope = slopes[slopes['Feature'] == column]['Slope'].values[0]
            # Trend line anchored at the first observation of the feature.
            plt.plot(range(len(df)),
                     significant_slope * np.arange(len(df)) + df[column].iloc[0],
                     color='red',
                     label=f'Trend line with slope {significant_slope:.4f}')
            plt.xlabel('Time')
            plt.ylabel(column)
            plt.title(f'Trend Analysis for {column}')
            plt.legend()
        plt.tight_layout()  # avoid overlapping subplots
        plt.savefig(result_img_path)
        plt.close()  # release figure resources

        return jsonify({
            'code': 200,
            'msg': 'Trend analysis completed',
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })


# ## 4. Time-series analysis: fit an ARIMA model per feature to surface
# periodic changes or anomalous patterns, compared via the models' AIC.
from statsmodels.tsa.arima.model import ARIMA
import warnings


def time_series_degradation_multicolumn(df):
    """Fit ARIMA(1,1,1) to every column and return a Series of AIC values."""
    # BUG FIX: an empty pd.Series() without an explicit dtype is deprecated;
    # AIC values are floats, so declare that up front.
    aic_values = pd.Series(dtype=float)
    for column in df.columns:
        data = df[column].values
        model = ARIMA(data, order=(1, 1, 1))
        with warnings.catch_warnings():
            # statsmodels emits FutureWarnings during fitting; keep logs clean.
            warnings.filterwarnings('ignore', category=FutureWarning)
            results = model.fit()
        aic_values[column] = results.aic
    return aic_values


@app.route('/time_series_analysis', methods=['POST'])
def time_series_analysis():
    """Score every feature with ARIMA AIC and plot the highest-AIC series."""
    try:
        # Validate that the request body names the input file.
        data = request.json
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')

        df = pd.read_csv(file_url)

        # Multivariate time-series degradation assessment.
        aic_values = time_series_degradation_multicolumn(df)
        aic_values.to_csv(result_file_path, index=True)

        # Plot the N features with the highest AIC.
        N = 10
        top_features = aic_values.sort_values(ascending=False).index[:N]
        num_features = len(top_features)
        fig, axes = plt.subplots(num_features, 1,
                                 figsize=(10, 4 * num_features), sharex=True)
        # BUG FIX: with one feature plt.subplots returns a bare Axes, so
        # axes[i] raised TypeError inside the loop before the original's
        # `if num_features == 1` check could ever run; normalise up front
        # (which also makes that dead check unnecessary).
        axes = np.atleast_1d(axes)
        for i, column in enumerate(top_features):
            df[column].plot(ax=axes[i], label=column)
            axes[i].set_title(f'Time Series Plot for {column}')
            axes[i].set_xlabel('Time')
            axes[i].set_ylabel(column)
            axes[i].legend()
        plt.tight_layout()
        plt.savefig(result_img_path)
        plt.close()  # release figure resources

        return jsonify({
            'code': 200,
            'msg': 'Time series analysis completed',
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })


# # 5. Frequency-domain analysis: use the Fourier transform to inspect each
# signal's frequency content; anomalous components may indicate degradation.
from flask import Flask, request, jsonify
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import requests


def perform_fft_analysis(df):
    """Plot the amplitude spectrum of every column and collect peak info.

    Returns (figure, fft_results) where fft_results is a list of dicts with
    keys 'Feature', 'Peak Frequency (Hz)' and 'Peak Amplitude'.
    """
    n_columns = df.shape[1]
    fig, axes = plt.subplots(n_columns, 1, figsize=(10, 4 * n_columns))
    # BUG FIX: a single-column input yields a bare Axes; normalise so that
    # axes[i] below works for any column count.
    axes = np.atleast_1d(axes)
    fft_results = []
    for i, column in enumerate(df.columns):
        data = df[column].values
        fft = np.fft.fft(data)
        # Sample spacing d=1: frequencies are in cycles per sample.
        frequencies = np.fft.fftfreq(len(data), d=1)
        peak_frequency_index = np.argmax(np.abs(fft))
        peak_frequency = frequencies[peak_frequency_index]
        peak_amplitude = np.abs(fft[peak_frequency_index])
        axes[i].plot(frequencies, np.abs(fft))
        axes[i].set_title(f'Frequency Spectrum of {column}')
        axes[i].set_xlabel('Frequency (Hz)')
        axes[i].set_ylabel('Amplitude')
        axes[i].grid(True)
        fft_results.append({
            'Feature': column,
            'Peak Frequency (Hz)': peak_frequency,
            'Peak Amplitude': peak_amplitude
        })
    plt.tight_layout()
    plt.subplots_adjust(hspace=0.5)
    return fig, fft_results


@app.route('/fft_analysis', methods=['POST'])
def fft_analysis():
    """Run the FFT analysis, save the spectrum plots and the peak table."""
    try:
        # Validate that the request body names the input file.
        data = request.json
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')

        df = pd.read_csv(file_url)

        # Run the FFT analysis and persist the figure.
        fig, fft_results = perform_fft_analysis(df)
        plt.savefig(result_img_path)
        plt.close()  # release figure resources

        # Persist the per-feature peak table.
        fft_df = pd.DataFrame(fft_results)
        fft_df.to_csv(result_file_path, index=False)

        return jsonify({
            'code': 200,
            'msg': 'FFT analysis completed',
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })


if __name__ == '__main__':
    app.run(debug=True, port=10005, host='0.0.0.0')