123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
# 1. Basic statistical features
- from flask import Flask, request, jsonify
- import pandas as pd
- from scipy.stats import skew, kurtosis
- import numpy as np
- import os
- import requests
- import math
- app = Flask(__name__)
def ae_feature(raw, fs):
    """Compute basic statistical features of one signal segment.

    Args:
        raw: 1-D sequence of numeric samples (converted with ``np.asarray``).
        fs: Unused; kept so existing callers that pass it keep working.

    Returns:
        A single-row ``pandas.DataFrame`` with the columns ``mean``,
        ``std_dev``, ``max_value``, ``min_value``, ``variance``,
        ``skewness``, ``kurt``, ``range`` and ``rms``. Skewness and kurtosis
        come from ``scipy.stats.skew`` / ``scipy.stats.kurtosis`` called with
        their default arguments.
    """
    samples = np.asarray(raw)
    max_value = np.max(samples)
    min_value = np.min(samples)
    # Root mean square: the square root was missing, so the column used to
    # hold the mean square instead. Squaring is done in float to avoid
    # integer overflow on integer-typed input.
    rms = np.sqrt(np.mean(samples.astype(float) ** 2))
    return pd.DataFrame({
        'mean': [np.mean(samples)],
        'std_dev': [np.std(samples)],
        'max_value': [max_value],
        'min_value': [min_value],
        'variance': [np.var(samples)],
        'skewness': [skew(samples)],
        'kurt': [kurtosis(samples)],
        'range': [max_value - min_value],
        'rms': [rms],
    })
@app.route('/statistical', methods=['POST'])
def upload_file1():
    """Extract basic statistical features from a CSV file and save them as CSV.

    Expects a JSON body with ``file_url`` (passed to ``pandas.read_csv``) and
    ``result_file_path`` (where the feature table is written). A ``Time``
    column, if present, is dropped. Every remaining column is cut into
    consecutive segments of 300 rows and each segment yields one output row
    from ``ae_feature``.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing, or 500 (with the exception text in
        ``msg``) on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets the 400 answer below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'file_url' not in payload:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = payload.get('file_url')
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # DataFrame.to_csv(None) returns the CSV text and writes nothing,
            # which used to be reported as a success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # NOTE(review): both paths come straight from the request, so a client
        # can read any CSV/URL the server can reach and overwrite any writable
        # file. Restrict them if this service is exposed to untrusted callers.
        data = pd.read_csv(str(file_url))
        df = data.drop(columns='Time') if 'Time' in data.columns else data

        fs = 300  # number of rows per feature segment
        frames = []
        for col in range(df.shape[1]):
            for start in range(0, len(df), fs):
                # iloc clamps the slice end, so the last segment may be shorter.
                segment = np.array(df.iloc[start:start + fs, col])
                frames.append(ae_feature(segment, fs))
        # Concatenate once; growing a DataFrame inside the loop is quadratic.
        all_features_df = (
            pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        )
        all_features_df.to_csv(result_file_path, index=False)

        return jsonify({
            'code': 200,
            'msg': '文件处理完成',
        })
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
# 2. Time-domain features
- from scipy import fftpack
def time_features(raw, fs):
    """Compute time-domain features (plus the peak frequency) of one segment.

    Args:
        raw: 1-D sequence of numeric samples (converted with ``np.asarray``).
        fs: Sampling frequency in Hz, used to convert the peak FFT bin into a
            frequency.

    Returns:
        A single-row ``pandas.DataFrame`` with the columns ``peak_freq_khz``,
        ``均值v`` (mean), ``标准差`` (standard deviation), ``峭度`` (kurtosis,
        fourth central moment over std**4), ``平均幅值`` (mean absolute
        value), ``均方根`` (RMS), ``波峰因子`` (RMS / mean absolute value),
        ``峰值`` (largest absolute value) and ``峰值因子`` (peak / RMS).
    """
    samples = np.asarray(raw)
    n = len(samples)

    # Single-sided amplitude spectrum. The search covers bins [0, n // 2) and
    # therefore includes the DC bin; at least one bin is always searched so a
    # one-sample segment no longer fails on an empty slice.
    spectrum = np.abs(fftpack.fft(samples, n)) * 2 / n
    half = max(n // 2, 1)
    peak_bin = int(np.argmax(spectrum[:half]))
    # Bin k of an n-point FFT sits at k * fs / n Hz. The previous axis,
    # linspace(0, fs, n) * fs / n, was stretched by n / (n - 1) and scaled
    # again by fs / n.
    peak_freq = peak_bin * fs / n

    mean_val = np.mean(samples)
    std_val = np.std(samples)
    kurtosis_val = (np.sum((samples - mean_val) ** 4) / n) / (std_val ** 4)
    mean_abs = np.sum(np.abs(samples)) / n
    rms = np.sqrt(np.sum(np.power(samples, 2)) / n)
    # RMS / mean absolute value; the column name is kept unchanged so the
    # header of existing result files stays the same.
    form_factor = rms / mean_abs
    peak = np.max(np.abs(samples))
    crest_factor = peak / rms

    return pd.DataFrame({
        'peak_freq_khz': [peak_freq / 1000],  # Hz -> kHz
        '均值v': [mean_val],
        '标准差': [std_val],
        '峭度': [kurtosis_val],
        '平均幅值': [mean_abs],
        '均方根': [rms],
        '波峰因子': [form_factor],
        '峰值': [peak],
        '峰值因子': [crest_factor],
    })
@app.route('/time', methods=['POST'])
def upload_file2():
    """Extract time-domain features from a CSV file and save them as CSV.

    Expects a JSON body with ``file_url`` (passed to ``pandas.read_csv``) and
    ``result_file_path`` (where the feature table is written). Every column
    of the CSV is cut into consecutive segments of 300 rows and each segment
    yields one output row from ``time_features``.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing, or 500 (with the exception text in
        ``msg``) on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets the 400 answer below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'file_url' not in payload:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = payload.get('file_url')
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # DataFrame.to_csv(None) returns the CSV text and writes nothing,
            # which used to be reported as a success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # NOTE(review): both paths come straight from the request, so a client
        # can read any CSV/URL the server can reach and overwrite any writable
        # file. Restrict them if this service is exposed to untrusted callers.
        df = pd.read_csv(str(file_url))

        # Segment length in rows; the same value is handed to time_features,
        # which uses it as the sampling frequency.
        fs = 300
        frames = []
        for col in range(df.shape[1]):
            for start in range(0, len(df), fs):
                # iloc clamps the slice end, so the last segment may be shorter.
                segment = np.array(df.iloc[start:start + fs, col])
                frames.append(time_features(segment, fs))
        # Concatenate once; growing a DataFrame inside the loop is quadratic.
        all_features_df = (
            pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        )
        all_features_df.to_csv(result_file_path, index=False)

        return jsonify({
            'code': 200,
            'msg': '文件处理完成',
        })
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
# 3. Frequency-domain features
def fe_feature(signal, fs):
    """Compute frequency-domain features of one signal segment.

    Args:
        signal: 1-D sequence of numeric samples (converted with
            ``np.asarray``).
        fs: Sampling frequency in Hz.

    Returns:
        A single-row ``pandas.DataFrame`` with the columns
        ``总能量`` (sum of squared FFT magnitudes), ``总功率`` (that sum
        divided by the segment length), ``均方根`` (square root of
        ``总功率``), ``频率中心`` (power-weighted mean of ``|f|``),
        ``频率带宽`` (power-weighted RMS spread of ``|f|`` around the
        centroid), ``峰值频率`` (``|f|`` of the strongest bin, DC included),
        ``频率偏度`` and ``频率峰度`` (skewness / kurtosis of the positive
        frequencies scaled by their normalised magnitude). The last two are
        NaN when the segment has fewer than 4 samples or no energy in the
        positive bins; centroid and bandwidth are NaN for an all-zero segment.
    """
    samples = np.asarray(signal)
    n = len(samples)
    freqs = fftfreq(n, d=1 / fs)
    magnitude = np.abs(fft(samples))
    power = magnitude ** 2

    total_energy = np.sum(power)
    total_power = total_energy / n
    rms = np.sqrt(total_power)

    # fftfreq returns frequencies in the same (unshifted) order as fft, so the
    # spectrum must not be fftshift-ed before being combined with `freqs`.
    abs_freqs = np.abs(freqs)
    if total_energy > 0:
        freq_centroid = np.sum(abs_freqs * power) / total_energy
        # Spectral spread around the centroid. The previous expression was a
        # telescoping sum that reduced to a single boundary term.
        freq_bandwidth = np.sqrt(
            np.sum((abs_freqs - freq_centroid) ** 2 * power) / total_energy
        )
    else:
        freq_centroid = np.nan
        freq_bandwidth = np.nan
    # Map the strongest bin to its frequency instead of scaling an index of
    # the shifted array.
    peak_freq = abs_freqs[np.argmax(magnitude)]

    # Positive-frequency bins, excluding DC and Nyquist.
    positive = slice(1, n // 2)
    positive_mag = magnitude[positive]
    if positive_mag.size and np.max(positive_mag) > 0:
        weighted = freqs[positive] * (positive_mag / np.max(positive_mag))
        freq_skewness = skew(weighted)
        freq_kurtosis = kurtosis(weighted)
    else:
        # Too short (or silent) to have usable positive bins; previously this
        # raised from np.max on an empty slice.
        freq_skewness = np.nan
        freq_kurtosis = np.nan

    return pd.DataFrame({
        '总能量': [total_energy],
        '总功率': [total_power],
        '均方根': [rms],
        '频率中心': [freq_centroid],
        '频率带宽': [freq_bandwidth],
        '峰值频率': [peak_freq],
        '频率偏度': [freq_skewness],
        '频率峰度': [freq_kurtosis]
    })
@app.route('/frequency', methods=['POST'])
def upload_file3():
    """Extract frequency-domain features from a CSV file and save them as CSV.

    Expects a JSON body with ``file_url`` (passed to ``pandas.read_csv``) and
    ``result_file_path`` (where the feature table is written). Every column
    of the CSV is cut into consecutive segments of 300 rows and each segment
    yields one output row from ``fe_feature``.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing, or 500 (with the exception text in
        ``msg``) on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets the 400 answer below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'file_url' not in payload:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = payload.get('file_url')
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # DataFrame.to_csv(None) returns the CSV text and writes nothing,
            # which used to be reported as a success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # NOTE(review): both paths come straight from the request, so a client
        # can read any CSV/URL the server can reach and overwrite any writable
        # file. Restrict them if this service is exposed to untrusted callers.
        df = pd.read_csv(str(file_url))

        # Used both as the segment length in rows and as the sampling
        # frequency handed to fe_feature; adjust to the real acquisition rate.
        fs = 300
        frames = []
        for col in range(df.shape[1]):
            for start in range(0, len(df), fs):
                # iloc clamps the slice end, so the last segment may be shorter.
                segment = df.iloc[start:start + fs, col].to_numpy()
                frames.append(fe_feature(segment, fs))
        # Concatenate once; growing a DataFrame inside the loop is quadratic.
        all_features_df = (
            pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        )
        all_features_df.to_csv(result_file_path, index=False, header=True)

        return jsonify({
            'code': 200,
            'msg': '文件处理完成',
        })
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
# 4. Time-frequency features
- from scipy.stats import skew, kurtosis
- from scipy.fft import fft, fftfreq
def tf_features(raw, fs):
    """Compute combined time- and frequency-domain features of one segment.

    Args:
        raw: 1-D sequence of numeric samples (converted with ``np.asarray``).
        fs: Sampling frequency in Hz.

    Returns:
        A single-row ``pandas.DataFrame``. Time-domain columns: ``均值``
        (mean), ``标准差`` (std), ``最大值`` (max), ``最小值`` (min), ``方差``
        (variance), ``偏度`` (skewness), ``峰度`` (kurtosis). Frequency-domain
        columns: ``频率均值``, ``频率标准差``, ``频率偏度``, ``频率峰度``
        (mean / std / skewness / kurtosis of the FFT bin frequencies whose
        power is non-zero), ``峰值频率`` (frequency of the strongest bin),
        ``带宽`` (max minus min of those bin frequencies) and ``能量`` (sum of
        the power spectrum). For an all-zero segment the four frequency
        statistics are NaN and ``带宽`` is 0.0.

    NOTE(review): the frequency statistics are taken over bin positions only
    and are not weighted by power, so they depend mostly on the segment
    length. Confirm whether power-weighted spectral moments were intended.
    """
    samples = np.asarray(raw)
    n = len(samples)

    power_spectrum = np.square(np.abs(fft(samples))) / n
    freq = fftfreq(n, 1 / fs)

    # Bin frequencies that carry any power; computed once and reused.
    occupied = freq[power_spectrum > 0]
    if occupied.size:
        mean_freq = np.mean(occupied)
        std_freq = np.std(occupied)
        freq_skewness = skew(occupied)
        freq_kurt = kurtosis(occupied)
        bandwidth = np.max(occupied) - np.min(occupied)
    else:
        # All-zero segment: no bin carries power. np.max on the empty
        # selection used to raise and abort the whole request.
        mean_freq = std_freq = freq_skewness = freq_kurt = np.nan
        bandwidth = 0.0

    peak_freq = freq[np.argmax(power_spectrum)]
    energy = np.sum(power_spectrum)

    return pd.DataFrame({
        '均值': [np.mean(samples)],
        '标准差': [np.std(samples)],
        '最大值': [np.max(samples)],
        '最小值': [np.min(samples)],
        '方差': [np.var(samples)],
        '偏度': [skew(samples)],
        '峰度': [kurtosis(samples)],
        '频率均值': [mean_freq],
        '频率标准差': [std_freq],
        '频率偏度': [freq_skewness],
        '频率峰度': [freq_kurt],
        '峰值频率': [peak_freq],
        '带宽': [bandwidth],
        '能量': [energy]
    })
@app.route('/time-frequency', methods=['POST'])
def upload_file4():
    """Extract time-frequency features from a CSV file and save them as CSV.

    Expects a JSON body with ``file_url`` (passed to ``pandas.read_csv``) and
    ``result_file_path`` (where the feature table is written). Every column
    of the CSV is cut into consecutive segments of 300 rows and each segment
    yields one output row from ``tf_features``.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing, or 500 (with the exception text in
        ``msg``) on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets the 400 answer below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'file_url' not in payload:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = payload.get('file_url')
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # DataFrame.to_csv(None) returns the CSV text and writes nothing,
            # which used to be reported as a success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # NOTE(review): both paths come straight from the request, so a client
        # can read any CSV/URL the server can reach and overwrite any writable
        # file. Restrict them if this service is exposed to untrusted callers.
        df = pd.read_csv(str(file_url))

        # Used both as the segment length in rows and as the sampling
        # frequency handed to tf_features.
        fs = 300
        frames = []
        for col in range(df.shape[1]):
            for start in range(0, len(df), fs):
                # iloc clamps the slice end, so the last segment may be shorter.
                segment = df.iloc[start:start + fs, col].to_numpy()
                frames.append(tf_features(segment, fs))
        # Concatenate once; growing a DataFrame inside the loop is quadratic.
        all_features_df = (
            pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        )
        all_features_df.to_csv(result_file_path, index=False, header=True)

        return jsonify({
            'code': 200,
            'msg': '文件处理完成',
        })
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
# 5. Morphological feature extraction
- from flask import Flask, request, jsonify
- import pandas as pd
- from scipy.signal import find_peaks
- import numpy as np
def morphological_features(raw):
    """Compute shape (morphological) features of a uniformly sampled signal.

    Args:
        raw: 1-D sequence of numeric samples (converted to a float array).
            Samples are treated as one index step apart.

    Returns:
        A single-row ``pandas.DataFrame`` with the columns ``非零元素数``
        (count of non-zero samples), ``平均斜率`` (mean first difference),
        ``峰值数量`` / ``谷值数量`` (number of local maxima / minima found by
        ``scipy.signal.find_peaks``), ``平均峰宽`` / ``平均谷宽`` (mean
        distance in samples between neighbouring peaks / troughs, 0 when
        there are fewer than two), ``曲率`` (sum of squared second
        differences) and ``加速度变化`` (sum of absolute second differences).
    """
    samples = np.asarray(raw, dtype=float)

    # With unit sample spacing the derivatives are plain finite differences.
    first_derivative = np.diff(samples)
    second_derivative = np.diff(first_derivative)

    peaks, _ = find_peaks(samples)
    troughs, _ = find_peaks(-samples)  # minima are maxima of the negated signal

    # Distance between neighbouring extrema. find_peaks returns ascending
    # indices, so a single diff is enough; the previous code differenced the
    # distances a second time and reported 0 for exactly two peaks.
    peak_widths = np.diff(peaks)
    trough_widths = np.diff(troughs)

    mean_slope = np.mean(first_derivative) if first_derivative.size else np.nan

    return pd.DataFrame({
        '非零元素数': [np.count_nonzero(samples)],
        '平均斜率': [mean_slope],
        '峰值数量': [len(peaks)],
        '谷值数量': [len(troughs)],
        '平均峰宽': [np.mean(peak_widths)] if peak_widths.size else [0],
        '平均谷宽': [np.mean(trough_widths)] if trough_widths.size else [0],
        '曲率': [np.sum(second_derivative ** 2)],
        '加速度变化': [np.sum(np.abs(second_derivative))]
    })
@app.route('/morphology', methods=['POST'])
def upload_file5():
    """Extract morphological features from a CSV file and save them as CSV.

    Expects a JSON body with ``file_url`` (passed to ``pandas.read_csv``) and
    ``result_file_path`` (where the feature table is written). Each column of
    the CSV is processed as a whole (no segmentation) and yields one output
    row from ``morphological_features``.

    Returns:
        A JSON response whose ``code`` field is 200 on success, 400 when a
        required field is missing, or 500 (with the exception text in
        ``msg``) on any other failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets the 400 answer below.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'file_url' not in payload:
            return jsonify({'code': 400, 'msg': 'No file URL provided'})
        file_url = payload.get('file_url')
        result_file_path = payload.get('result_file_path')
        if not result_file_path:
            # DataFrame.to_csv(None) returns the CSV text and writes nothing,
            # which used to be reported as a success.
            return jsonify({'code': 400, 'msg': 'No result file path provided'})

        # NOTE(review): both paths come straight from the request, so a client
        # can read any CSV/URL the server can reach and overwrite any writable
        # file. Restrict them if this service is exposed to untrusted callers.
        df = pd.read_csv(str(file_url))

        # Samples are assumed to be equally spaced, so no sampling rate is
        # passed to the feature function.
        frames = [
            morphological_features(df.iloc[:, col].to_numpy())
            for col in range(df.shape[1])
        ]
        # Concatenate once; growing a DataFrame inside the loop is quadratic.
        all_features_df = (
            pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        )
        all_features_df.to_csv(result_file_path, index=False, header=True)

        return jsonify({
            'code': 200,
            'msg': '文件处理完成',
        })
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON.
        return jsonify({
            'code': 500,
            'msg': str(e)
        })
if __name__ == '__main__':
    # The interactive debugger enabled by debug=True lets a client execute
    # arbitrary code, which is unsafe while listening on every interface
    # (0.0.0.0). Debug mode is therefore off unless FLASK_DEBUG is set to a
    # truthy value for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, port=10002, host='0.0.0.0')
|