## 1. Feature-value clustering analysis: assess the quality of the clustering and whether degradation is present.
##    The silhouette coefficient ranges over [-1, 1]; values close to 1 indicate good clustering.
##    The elbow method varies the number of clusters, computes the within-cluster sum of squares (WCSS) for each
##    count, and picks the point after which the WCSS drops markedly more slowly as the right cluster count.
##    The Calinski-Harabasz index measures between-cluster separation against within-cluster tightness; a larger CH index indicates better clustering.
##    The Davies-Bouldin index measures within-cluster similarity against between-cluster dissimilarity; a smaller DB index indicates better clustering.
from flask import Flask, request, jsonify
from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score
from statsmodels.tsa.arima.model import ARIMA
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # non-interactive backend, so figures can be rendered inside Flask workers
import matplotlib.pyplot as plt
import warnings

app = Flask(__name__)
@app.route('/cluster_analysis', methods=['POST'])
def cluster_analysis():
    try:
        # Make sure the request body contains a file URL
        data = request.get_json(silent=True) or {}
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')
        df = pd.read_csv(file_url)
        # Keep the raw features separate so the label column does not leak into later fits
        features = df.copy()
        # Apply KMeans clustering
        kmeans = KMeans(n_clusters=2, random_state=0).fit(features)
        labels = kmeans.labels_
        df['cluster'] = labels
        # Elbow method: try 1 to 10 clusters and record the WCSS of each fit
        wcss = []
        for i in range(1, 11):
            elbow_model = KMeans(n_clusters=i, init='k-means++', max_iter=300, n_init=10, random_state=0)
            elbow_model.fit(features)
            wcss.append(elbow_model.inertia_)
        # Visualize the elbow curve
        plt.figure()
        plt.plot(range(1, 11), wcss)
        plt.title('Elbow Method')
        plt.xlabel('Number of clusters')
        plt.ylabel('WCSS')
        plt.savefig(result_img_path)  # save the figure
        plt.close()
        # Silhouette coefficient (computed on the 2-cluster labels, not the last elbow fit)
        silhouette_avg = silhouette_score(features, labels)
        # Davies-Bouldin index
        db_index = davies_bouldin_score(features, labels)
        # Calinski-Harabasz index
        ch_index = calinski_harabasz_score(features, labels)
        # Collect the metrics in a DataFrame
        results_df = pd.DataFrame({
            'Metric': ['Silhouette Coefficient', 'Davies-Bouldin Index', 'Calinski-Harabasz Index'],
            'Value': [silhouette_avg, db_index, ch_index]
        })
        # Save the results to a CSV file
        results_df.to_csv(result_file_path, index=False)
        # Return the result file paths to the client
        return jsonify({
            'code': 200,
            'msg': 'Cluster analysis completed',
            'result_file_path': result_file_path,
            'result_img_path': result_img_path,
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
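
# A hedged sketch of picking the elbow automatically from a `wcss` list like the one
# computed above, using the largest second difference of the curve as the knee. This
# heuristic is an assumption, not part of the original service; packages such as
# "kneed" implement more robust knee detection.
def pick_elbow_k(wcss):
    """Return the cluster count at which the WCSS curve bends the most."""
    wcss = np.asarray(wcss, dtype=float)
    if len(wcss) < 3:
        return 1
    # Second difference of the curve; +2 maps the array index back to k (k starts at 1)
    second_diff = wcss[:-2] - 2 * wcss[1:-1] + wcss[2:]
    return int(np.argmax(second_diff)) + 2

# Example: a curve that flattens after k=3
# pick_elbow_k([1000, 800, 300, 250, 230, 220])  # -> 3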
## 2. Statistics-based degradation assessment: compute the mean and standard deviation
##    of every column in the DataFrame and plot a histogram of each feature.
def statistical_degradation(df):
    means = df.mean()
    std_devs = df.std()
    return means, std_devs

def save_to_csv(means, std_devs, file_path):
    stats_df = pd.DataFrame({'Mean': means, 'Standard Deviation': std_devs})
    stats_df.to_csv(file_path, index=True)  # keep the index so the feature names are saved

def plot_feature_distribution(df, means, std_devs, result_img_path):
    # squeeze=False guarantees a 2-D array of axes even when there is a single column
    fig, axes = plt.subplots(nrows=len(df.columns), ncols=1, figsize=(10, 5 * len(df.columns)), squeeze=False)
    axes = axes.ravel()
    for i, (column, ax) in enumerate(zip(df.columns, axes)):
        df[column].plot(kind='hist', ax=ax, bins=20, alpha=0.5)
        # Index the statistics by column name, not position, so the labels stay aligned
        ax.axvline(means[column], color='r', linestyle='--', label=f'Mean: {means[column]:.2f}')
        ax.axvline(means[column] - std_devs[column], color='g', linestyle='-', label='-1 Std Dev')
        ax.axvline(means[column] + std_devs[column], color='g', linestyle='-')
        ax.set_title(f'Distribution of {column}')
        ax.legend(loc='upper left')
        if i != len(df.columns) - 1:
            ax.xaxis.set_visible(False)
        if i == 0:
            ax.set_ylabel('Frequency')
        else:
            ax.yaxis.set_visible(False)
    fig.tight_layout()
    fig.savefig(result_img_path)
    plt.close(fig)
@app.route('/analyze_features', methods=['POST'])
def analyze_features():
    try:
        # Make sure the request body contains a file URL
        data = request.get_json(silent=True) or {}
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')
        df = pd.read_csv(file_url)
        # Compute the summary statistics
        means, std_devs = statistical_degradation(df)
        save_to_csv(means, std_devs, result_file_path)
        # Plot the distribution of each feature
        plot_feature_distribution(df, means, std_devs, result_img_path)
        # Return the result file paths to the client
        return jsonify({
            'code': 200,
            'msg': 'Feature analysis completed',
            'result_file_path': result_file_path,
            'result_img_path': result_img_path,
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
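
# The endpoint above reports means and standard deviations but leaves the degradation
# call to the reader. A hedged sketch of one way to use those statistics: flag samples
# that drift outside the baseline mean +/- 3 standard deviations. The 3-sigma threshold
# and the baseline/recent split are assumptions, not part of the original service.
def flag_degradation(baseline_df, recent_df, n_sigma=3.0):
    """Return, per feature, the fraction of recent samples outside the baseline band."""
    means = baseline_df.mean()
    stds = baseline_df.std().replace(0, 1e-12)  # avoid division by zero on constant features
    z = (recent_df - means) / stds
    return (z.abs() > n_sigma).mean()  # per-column share of out-of-band samples

# Example: treat the first 80% of rows as the healthy baseline
# split = int(len(df) * 0.8)
# out_of_band = flag_degradation(df.iloc[:split], df.iloc[split:])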
## 3. Trend-analysis method: fit each feature's trend over time; a slope that is
##    significantly non-zero may indicate degradation.
def trend_analysis(df, some_threshold):
    results_list = []
    for column in df.columns:
        X = np.arange(len(df[column])).reshape(-1, 1)
        y = df[column].values
        model = LinearRegression().fit(X, y)
        slope = model.coef_[0]
        results_list.append({
            'Feature': column,
            'Slope': slope,
            'Intercept': model.intercept_,  # kept so the trend line can be drawn correctly
            'Significant': abs(slope) > some_threshold
        })
    results_df = pd.DataFrame(results_list)
    return results_df
@app.route('/trend_analysis', methods=['POST'])
def trend_analysis_endpoint():
    try:
        # Make sure the request body contains a file URL
        data = request.get_json(silent=True) or {}
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')
        df = pd.read_csv(file_url)
        # Threshold for calling a slope significant
        some_threshold = 0.01
        # Run the trend analysis
        slopes = trend_analysis(df, some_threshold)
        # Save the results to a CSV file
        slopes.to_csv(result_file_path, index=False)
        # Keep only the significant features; skip plotting when there are none
        significant = slopes[slopes['Significant']]
        num_features = len(significant)
        if num_features > 0:
            # Size the figure dynamically: 15 inches wide, 4 inches of height per subplot
            plt.figure(figsize=(15, 4 * num_features))
            for i, row in enumerate(significant.itertuples()):
                plt.subplot(num_features, 1, i + 1)
                plt.scatter(range(len(df)), df[row.Feature], label='Data')
                # Draw the fitted line from its own slope and intercept
                plt.plot(range(len(df)), row.Slope * np.arange(len(df)) + row.Intercept,
                         color='red', label=f'Trend line with slope {row.Slope:.4f}')
                plt.xlabel('Time')
                plt.ylabel(row.Feature)
                plt.title(f'Trend Analysis for {row.Feature}')
                plt.legend()
            plt.tight_layout()  # avoid overlapping subplots
            plt.savefig(result_img_path)
            plt.close()  # release the figure
        # Return the result file and trend-plot paths to the client
        return jsonify({
            'code': 200,
            'msg': 'Trend analysis completed',
            'result_file_path': result_file_path,
            'result_img_path': result_img_path,
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
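
# The fixed `some_threshold` above treats "significant" as "slope magnitude over 0.01",
# which depends on the scale of each feature. A hedged alternative sketch that tests
# H0 "slope == 0" per feature with scipy.stats.linregress; SciPy is an added dependency
# here, and the 0.05 significance level is an assumption.
def trend_analysis_pvalue(df, alpha=0.05):
    from scipy import stats  # added dependency, not used by the original service
    rows = []
    t = np.arange(len(df))
    for column in df.columns:
        result = stats.linregress(t, df[column].values)
        rows.append({
            'Feature': column,
            'Slope': result.slope,
            'p-value': result.pvalue,  # two-sided p-value for the slope being zero
            'Significant': result.pvalue < alpha,  # scale-free, unlike a raw slope cutoff
        })
    return pd.DataFrame(rows)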
## 4. Time-series-analysis method: identify periodic changes or abnormal patterns in the feature values.
def time_series_degradation_multicolumn(df):
    aic_values = pd.Series(dtype=float)
    for column in df.columns:
        data = df[column].values
        model = ARIMA(data, order=(1, 1, 1))
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=FutureWarning)
            results = model.fit()
        aic_value = results.aic
        # print(f"AIC for {column}:", aic_value)  # could be emitted via the Flask logger instead
        aic_values[column] = aic_value
    return aic_values
@app.route('/time_series_analysis', methods=['POST'])
def time_series_analysis():
    try:
        # Make sure the request body contains a file URL
        data = request.get_json(silent=True) or {}
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')
        df = pd.read_csv(file_url)
        # Run the multivariate time-series degradation assessment
        aic_values = time_series_degradation_multicolumn(df)
        # Save the AIC values to a CSV file
        aic_values.to_csv(result_file_path, index=True)
        # Select the N features with the highest AIC values
        N = 10
        top_features = aic_values.sort_values(ascending=False).index[:N]
        # Lay out one subplot per feature; squeeze=False keeps axes an array
        # even when there is only a single feature
        num_features = len(top_features)
        fig, axes = plt.subplots(num_features, 1, figsize=(10, 4 * num_features), sharex=True, squeeze=False)
        axes = axes.ravel()
        for i, column in enumerate(top_features):
            df[column].plot(ax=axes[i], label=column)
            axes[i].set_title(f'Time Series Plot for {column}')
            axes[i].set_xlabel('Time')
            axes[i].set_ylabel(column)
            axes[i].legend()
        plt.tight_layout()
        plt.savefig(result_img_path)
        plt.close(fig)  # release the figure
        # Return the result file and time-series plot paths to the client
        return jsonify({
            'code': 200,
            'msg': 'Time series analysis completed',
            'result_file_path': result_file_path,
            'result_img_path': result_img_path,
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
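
# The AIC ranking above measures how well a simple ARIMA(1, 1, 1) fits each column,
# but the section header also promises to identify periodic changes. A hedged sketch
# using seasonal decomposition from statsmodels; period=24 is an assumed cycle length
# and would need to match the real sampling cadence.
def decompose_feature(series, period=24):
    from statsmodels.tsa.seasonal import seasonal_decompose
    result = seasonal_decompose(series, model='additive', period=period)
    # A strong seasonal component suggests periodicity; large residuals suggest anomalies
    seasonal_strength = result.seasonal.std() / series.std()
    residual_outliers = int((result.resid.abs() > 3 * result.resid.std()).sum())
    return result, seasonal_strength, residual_outliers

# Example:
# result, strength, n_outliers = decompose_feature(df['some_feature'])
# result.plot()  # four stacked panels: observed, trend, seasonal, residual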
## 5. Frequency-domain analysis: use the Fourier transform to analyze the frequency
##    content of each signal; abnormal frequency components may indicate degradation.
def perform_fft_analysis(df):
    n_columns = df.shape[1]
    fig, axes = plt.subplots(n_columns, 1, figsize=(10, 4 * n_columns), squeeze=False)
    axes = axes.ravel()
    fft_results = []
    for i, column in enumerate(df.columns):
        data = df[column].values
        fft = np.fft.fft(data)
        frequencies = np.fft.fftfreq(len(data), d=1)
        # Search only the positive frequencies and skip the DC component at index 0,
        # which would otherwise dominate the peak for any signal with a non-zero mean
        half = len(data) // 2
        peak_frequency_index = np.argmax(np.abs(fft[1:half])) + 1
        peak_frequency = frequencies[peak_frequency_index]
        peak_amplitude = np.abs(fft[peak_frequency_index])
        # Plot the positive half of the spectrum (the negative half mirrors it for real signals)
        axes[i].plot(frequencies[:half], np.abs(fft[:half]))
        axes[i].set_title(f'Frequency Spectrum of {column}')
        axes[i].set_xlabel('Frequency (Hz)')
        axes[i].set_ylabel('Amplitude')
        axes[i].grid(True)
        fft_results.append({
            'Feature': column,
            'Peak Frequency (Hz)': peak_frequency,
            'Peak Amplitude': peak_amplitude
        })
    plt.tight_layout()
    plt.subplots_adjust(hspace=0.5)
    return fig, fft_results
@app.route('/fft_analysis', methods=['POST'])
def fft_analysis():
    try:
        # Make sure the request body contains a file URL
        data = request.get_json(silent=True) or {}
        if 'file_url' not in data:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = data.get('file_url')
        result_file_path = data.get('result_file_path')
        result_img_path = data.get('result_img_path')
        df = pd.read_csv(file_url)
        # Run the FFT analysis
        fig, fft_results = perform_fft_analysis(df)
        # Save the figure
        fig.savefig(result_img_path)
        plt.close(fig)  # release the figure
        # Save the FFT results to CSV
        fft_df = pd.DataFrame(fft_results)
        fft_df.to_csv(result_file_path, index=False)
        # Return the result file and spectrum plot paths to the client
        return jsonify({
            'code': 200,
            'msg': 'FFT analysis completed',
            'result_file_path': result_file_path,
            'result_img_path': result_img_path,
        })
    except Exception as e:
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
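
# The FFT endpoint reports only the single strongest component per feature. A hedged
# sketch of comparing band energies between a healthy baseline window and a recent
# window, so new frequency content shows up as a band with a large energy ratio; the
# window split, the 8 bands, and the 2x ratio threshold are all assumptions.
def band_energy(signal, n_bands=8):
    spectrum = np.abs(np.fft.rfft(signal)) ** 2    # power spectrum, positive freqs only
    bands = np.array_split(spectrum[1:], n_bands)  # skip DC, split into equal bands
    return np.array([band.sum() for band in bands])

def abnormal_bands(baseline, recent, threshold=2.0):
    # Assumes both windows have (roughly) equal length so the bands line up
    ratio = band_energy(recent) / (band_energy(baseline) + 1e-12)
    return np.where(ratio > threshold)[0]          # indices of suspicious bands

# Example: compare the first and second half of one feature
# mid = len(df) // 2
# suspicious = abnormal_bands(df['some_feature'].values[:mid], df['some_feature'].values[mid:])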
if __name__ == '__main__':
    app.run(debug=True, port=10005, host='0.0.0.0')
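
# For reference, a minimal client call against one of the endpoints, assuming the
# service is running locally on port 10005 and the paths below exist on the server
# (all paths are placeholders). Kept as a comment so it does not run at import time:
#
#   import requests
#   resp = requests.post(
#       'http://127.0.0.1:10005/trend_analysis',
#       json={
#           'file_url': 'data/features.csv',        # placeholder input CSV
#           'result_file_path': 'out/trend.csv',    # placeholder output CSV
#           'result_img_path': 'out/trend.png',     # placeholder output figure
#       },
#   )
#   print(resp.json())  # e.g. {'code': 200, 'msg': 'Trend analysis completed', ...}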