"""Flask service that extracts signal features from CSV files.

Each endpoint accepts a JSON body with ``file_url`` (the CSV to read) and
``result_file_path`` (where the feature table is written as CSV).  Every
column of the input is processed independently and one feature row is
produced per column window (or per column for ``/morphology``).
"""
import math
import os

import numpy as np
import pandas as pd
import requests
from flask import Flask, jsonify, request
from scipy import fftpack
from scipy.fft import fft, fftfreq
from scipy.signal import find_peaks
from scipy.stats import kurtosis, skew

app = Flask(__name__)

# Number of samples per feature window.
# NOTE(review): the endpoints also pass this value to the extractors as the
# sampling rate ``fs`` -- confirm that the data really is sampled at 300 Hz.
WINDOW_SIZE = 300


def _extract_features(df, extractor, window=None):
    """Run ``extractor`` over every column of ``df`` and stack the rows.

    Args:
        df: Input table; each column is treated as an independent signal.
        extractor: Callable returning a one-row DataFrame.  It is called as
            ``extractor(chunk, window)`` when ``window`` is given and as
            ``extractor(column)`` otherwise.
        window: Samples per chunk, or ``None`` to process whole columns.

    Returns:
        DataFrame with one row per processed chunk, columns first and
        windows second (empty when there was nothing to process).
    """
    frames = []
    for col in range(df.shape[1]):
        column = df.iloc[:, col].to_numpy()
        if window is None:
            frames.append(extractor(column))
            continue
        for start in range(0, len(column), window):
            frames.append(extractor(column[start:start + window], window))
    # Concatenate once: growing a DataFrame inside the loop is quadratic.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def _handle_feature_request(extractor, window=None, drop_time=False):
    """Shared body of the feature endpoints.

    Reads ``file_url`` and ``result_file_path`` from the JSON request body,
    extracts features and writes them to ``result_file_path``.

    Args:
        extractor: Feature function forwarded to ``_extract_features``.
        window: Samples per chunk, or ``None`` to process whole columns.
        drop_time: When true, a column named ``Time`` is removed before
            feature extraction.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing and 500 when processing fails.
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or not payload.get('file_url'):
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # Without a target path ``to_csv`` writes nothing, so report it
            # instead of claiming success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # SECURITY(review): both paths are caller-controlled.  ``read_csv``
        # accepts local paths as well as URLs and ``to_csv`` overwrites its
        # target, so this service must only be reachable by trusted clients
        # unless the paths are validated.
        df = pd.read_csv(str(payload['file_url']))
        if drop_time and 'Time' in df.columns:
            df = df.drop(columns='Time')

        features = _extract_features(df, extractor, window)
        features.to_csv(result_file_path, index=False)
        return jsonify({'code': 200, 'msg': '文件处理完成'})
    except Exception as exc:  # top-level boundary: report instead of crashing
        app.logger.exception('Feature extraction failed')
        return jsonify({'code': 500, 'msg': str(exc)})


# 1. Basic statistical features
def ae_feature(raw, fs):
    """Return basic statistics of one signal window as a one-row DataFrame.

    Args:
        raw: 1-D numeric samples.
        fs: Unused; kept so all windowed extractors share one signature.

    Returns:
        DataFrame with columns ``mean``, ``std_dev``, ``max_value``,
        ``min_value``, ``variance``, ``skewness``, ``kurt``, ``range``, ``rms``.
    """
    values = np.asarray(raw)
    max_value = np.max(values)
    min_value = np.min(values)
    # Root mean square: the square root was previously missing, which made
    # this column the mean square instead.
    rms = np.sqrt(np.mean(np.square(values.astype(float))))
    return pd.DataFrame({
        'mean': [np.mean(values)],
        'std_dev': [np.std(values)],
        'max_value': [max_value],
        'min_value': [min_value],
        'variance': [np.var(values)],
        'skewness': [skew(values)],
        'kurt': [kurtosis(values)],
        'range': [max_value - min_value],
        'rms': [rms],
    })


@app.route('/statistical', methods=['POST'])
def upload_file1():
    """POST /statistical: basic statistics per window of every column."""
    # NOTE(review): this is the only endpoint that drops the ``Time`` column;
    # the others treat it as a signal if present -- confirm that is intended.
    return _handle_feature_request(ae_feature, window=WINDOW_SIZE, drop_time=True)


# 2. Time-domain features
def time_features(raw, fs):
    """Return time-domain features of one signal window.

    Args:
        raw: 1-D numeric samples (at least one sample).
        fs: Sampling rate in Hz, used to convert the peak FFT bin to Hz.

    Returns:
        One-row DataFrame with the peak frequency (kHz), mean, standard
        deviation, kurtosis, mean absolute amplitude, RMS, form factor,
        peak value and crest factor.
    """
    values = np.asarray(raw)
    n = len(values)

    # Single-sided amplitude spectrum; only the first half carries the
    # non-negative frequencies of a real signal.
    spectrum = np.abs(fftpack.fft(values, n) * 2 / n)
    half = spectrum[:n // 2]
    # Windows shorter than two samples have no usable bin: report 0 Hz.
    peak_index = int(np.argmax(half)) if half.size else 0
    peak_freq = peak_index * fs / n  # FFT bin k corresponds to k * fs / n Hz

    xm = np.mean(values)
    xstd = np.std(values)
    kur = np.mean((values - xm) ** 4) / (xstd ** 4)  # non-excess kurtosis
    cal_ave_amp = np.mean(np.abs(values))
    cal_rms = np.sqrt(np.mean(np.square(values.astype(float))))
    cal_form = cal_rms / cal_ave_amp  # form factor: RMS / mean absolute amplitude
    cal_peak = np.max(np.abs(values))
    cal_crest = cal_peak / cal_rms  # crest factor: peak / RMS

    return pd.DataFrame({
        'peak_freq_khz': [peak_freq / 1000],  # Hz -> kHz
        '均值v': [xm],
        '标准差': [xstd],
        '峭度': [kur],
        '平均幅值': [cal_ave_amp],
        '均方根': [cal_rms],
        '波峰因子': [cal_form],
        '峰值': [cal_peak],
        '峰值因子': [cal_crest],
    })


@app.route('/time', methods=['POST'])
def upload_file2():
    """POST /time: time-domain features per window of every column."""
    return _handle_feature_request(time_features, window=WINDOW_SIZE)


# 3. Frequency-domain features
def fe_feature(signal, fs):
    """Return frequency-domain features of one signal window.

    Args:
        signal: 1-D numeric samples (at least one sample).
        fs: Sampling rate in Hz.

    Returns:
        One-row DataFrame with total energy, total power, RMS, spectral
        centroid, spectral bandwidth, peak frequency and the skewness and
        kurtosis of the magnitude-weighted positive frequencies.  Features
        that are undefined for the window (zero energy, too few bins) are NaN.
    """
    samples = np.asarray(signal)
    n = len(samples)
    # Keep frequencies and spectrum in the same (unshifted) bin order; mixing
    # unshifted frequencies with a shifted spectrum misaligns every bin.
    freqs = np.abs(fftfreq(n, d=1 / fs))
    magnitude = np.abs(fft(samples))
    power = magnitude ** 2

    # NOTE(review): these follow the unnormalised FFT convention, so
    # ``rms`` equals sqrt(sum(x**2)) rather than the time-domain RMS.
    total_energy = np.sum(power)
    total_power = total_energy / n
    rms = np.sqrt(total_power)

    if total_energy > 0:
        freq_centroid = np.sum(freqs * power) / total_energy
        # Power-weighted RMS spread of frequency about the centroid.
        freq_bandwidth = np.sqrt(
            np.sum((freqs - freq_centroid) ** 2 * power) / total_energy
        )
    else:
        freq_centroid = np.nan
        freq_bandwidth = np.nan
    peak_freq = freqs[np.argmax(magnitude)]

    # Positive frequencies without DC, scaled by the normalised magnitude.
    band = slice(1, n // 2)
    band_magnitude = magnitude[band]
    if band_magnitude.size and np.max(band_magnitude) > 0:
        weighted = freqs[band] * (band_magnitude / np.max(band_magnitude))
        freq_skewness = skew(weighted)
        freq_kurtosis = kurtosis(weighted)
    else:
        freq_skewness = np.nan
        freq_kurtosis = np.nan

    return pd.DataFrame({
        '总能量': [total_energy],
        '总功率': [total_power],
        '均方根': [rms],
        '频率中心': [freq_centroid],
        '频率带宽': [freq_bandwidth],
        '峰值频率': [peak_freq],
        '频率偏度': [freq_skewness],
        '频率峰度': [freq_kurtosis],
    })


@app.route('/frequency', methods=['POST'])
def upload_file3():
    """POST /frequency: frequency-domain features per window of every column."""
    return _handle_feature_request(fe_feature, window=WINDOW_SIZE)


# 4. Time-frequency features
def tf_features(raw, fs):
    """Return combined time- and frequency-domain features of one window.

    Args:
        raw: 1-D numeric samples (at least one sample).
        fs: Sampling rate in Hz.

    Returns:
        One-row DataFrame with time-domain statistics followed by statistics
        of the FFT bin frequencies whose power is non-zero.  The frequency
        statistics are NaN when no bin has power (all-zero window).
    """
    values = np.asarray(raw)
    n = len(values)
    freq = fftfreq(n, 1 / fs)
    power_spectrum = np.square(np.abs(fft(values))) / n

    # NOTE(review): the statistics below are taken over the bin frequencies
    # themselves, not weighted by power -- confirm this is the intent.
    active = freq[power_spectrum > 0]
    if active.size:
        mean_freq = np.mean(active)
        std_freq = np.std(active)
        freq_skewness = skew(active)
        freq_kurt = kurtosis(active)
        bandwidth = np.max(active) - np.min(active)
    else:
        # An all-zero window has no active bins; np.max would raise on it.
        mean_freq = std_freq = freq_skewness = freq_kurt = bandwidth = np.nan
    peak_freq = freq[np.argmax(power_spectrum)]

    return pd.DataFrame({
        '均值': [np.mean(values)],
        '标准差': [np.std(values)],
        '最大值': [np.max(values)],
        '最小值': [np.min(values)],
        '方差': [np.var(values)],
        '偏度': [skew(values)],
        '峰度': [kurtosis(values)],
        '频率均值': [mean_freq],
        '频率标准差': [std_freq],
        '频率偏度': [freq_skewness],
        '频率峰度': [freq_kurt],
        '峰值频率': [peak_freq],
        '带宽': [bandwidth],
        '能量': [np.sum(power_spectrum)],
    })


@app.route('/time-frequency', methods=['POST'])
def upload_file4():
    """POST /time-frequency: combined features per window of every column."""
    return _handle_feature_request(tf_features, window=WINDOW_SIZE)


# 5. Morphological features
def morphological_features(raw):
    """Return shape features of a whole signal, assuming unit sample spacing.

    Args:
        raw: 1-D numeric samples.

    Returns:
        One-row DataFrame with the non-zero sample count, mean slope, number
        of peaks and troughs, mean spacing between adjacent peaks and between
        adjacent troughs (0 when fewer than two exist), summed squared second
        difference and summed absolute change of the first difference.
    """
    samples = np.asarray(raw, dtype=float)

    # With unit spacing the derivatives are plain finite differences.
    first_derivative = np.diff(samples)
    second_derivative = np.diff(first_derivative)
    mean_slope = np.mean(first_derivative) if first_derivative.size else np.nan

    peaks, _ = find_peaks(samples)
    troughs, _ = find_peaks(-samples)  # troughs are peaks of the negated signal
    # Width is approximated by the distance between adjacent extrema.
    # ``find_peaks`` returns ascending indices, so a single diff is the
    # spacing; differencing twice produced second differences.
    peak_widths = np.diff(peaks)
    trough_widths = np.diff(troughs)

    return pd.DataFrame({
        '非零元素数': [np.count_nonzero(samples)],
        '平均斜率': [mean_slope],
        '峰值数量': [len(peaks)],
        '谷值数量': [len(troughs)],
        '平均峰宽': [np.mean(peak_widths)] if peak_widths.size else [0],
        '平均谷宽': [np.mean(trough_widths)] if trough_widths.size else [0],
        '曲率': [np.sum(second_derivative ** 2)],  # simple curvature estimate
        '加速度变化': [np.sum(np.abs(second_derivative))],
    })


@app.route('/morphology', methods=['POST'])
def upload_file5():
    """POST /morphology: shape features of every whole column."""
    return _handle_feature_request(morphological_features)


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must
    # not be enabled by default on a server bound to all interfaces.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(debug=debug_enabled, port=10002, host='0.0.0.0')