#开发时间:2024/8/28 10:56 #开发时间:2024/6/12 9:36 import numpy as np import pandas as pd import matplotlib.pyplot as plt from matplotlib import rcParams from flask import Flask, request, jsonify from threading import Lock # 移动平均法 def moving_average(signal, window_size): return np.convolve(signal, np.ones(window_size) / window_size, mode='same') app = Flask(__name__) lock = Lock() @app.route('/moving_average', methods=['POST']) def set_file_address1(): try: # 检查请求体中是否包含文件地址 data = request.json if 'file_url' not in data: return jsonify({'code': 400, 'msg': 'No file URL provided'}) file_url = request.json.get('file_url') result_file_path = request.json.get('result_file_path') result_img_path = request.json.get('result_img_path') # 读取数据集 noisy_signal = pd.read_csv('{0}'.format(file_url)) plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 config = { "font.family": 'serif', "font.size": 20, "mathtext.fontset": 'stix', "font.serif": ['Times New Roman'], # 宋体 'axes.unicode_minus': False # 处理负号 } rcParams.update(config) # 检查是否包含时间列 if 'Time' in noisy_signal.columns: time_column = noisy_signal['Time'].values.reshape(-1, 1) other_columns = noisy_signal.drop(columns='Time') else: time_column = None other_columns = noisy_signal # 应用去噪方法 window_size = 20 # 窗口宽度 denoised_signals = pd.DataFrame() for column in other_columns.columns: denoised_signals[column] = moving_average(other_columns[column].values, window_size) # 如果有时间列,将其添加回去 if time_column is not None: denoised_signals.insert(0, 'time', time_column) # 保存去噪后的信号到CSV文件 denoised_signals.to_csv(result_file_path, index=False) # 绘图 plt.figure(figsize=(12, 10)) # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例) plt.subplot(2, 1, 1) plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal') plt.legend() plt.subplot(2, 1, 2) plt.plot(denoised_signals.iloc[:, 1], label='Moving Average') plt.legend() plt.tight_layout() plt.savefig(result_img_path) plt.close() # 关闭图形以释放资源 return jsonify({ 'code': 200, 'msg': 'moving_average_denoise completed' }) except Exception as e: return jsonify({ 'code': 500, 'msg': str(e) }) # 傅里叶变换去噪 from scipy.fft import fft, ifft def fft_denoise(signal, cutoff_freq, fs): signal_fft = fft(signal) frequencies = np.fft.fftfreq(len(signal), 1 / fs) signal_fft[np.abs(frequencies) > cutoff_freq] = 0 return ifft(signal_fft).real @app.route('/fft_denoise', methods=['POST']) def set_file_address2(): try: # 检查请求体中是否包含文件地址 data = request.json if 'file_url' not in data: return jsonify({'code': 400, 'msg': 'No file URL provided'}) file_url = request.json.get('file_url') result_file_path = request.json.get('result_file_path') result_img_path = request.json.get('result_img_path') # 读取数据集 noisy_signal = pd.read_csv('{0}'.format(file_url)) plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 config = { "font.family": 'serif', "font.size": 20, "mathtext.fontset": 'stix', "font.serif": ['Times New Roman'], # 宋体 'axes.unicode_minus': False # 处理负号 } rcParams.update(config) # 读取信号 fs = 1000 # 检查是否包含时间列 if 'Time' in noisy_signal.columns: time_column = noisy_signal['Time'].values.reshape(-1, 1) other_columns = noisy_signal.drop(columns='Time') else: time_column = None other_columns = noisy_signal # 应用去噪方法 cutoff_freq = 10 # 截止频率 denoised_signals = pd.DataFrame() for column in other_columns.columns: denoised_signals[column] = fft_denoise(other_columns[column].values, cutoff_freq, fs) # 如果有时间列,将其添加回去 if time_column is not None: denoised_signals.insert(0, 'Time', time_column) # 保存去噪后的信号到CSV文件 denoised_signals.to_csv(result_file_path, index=False) # 绘图 plt.figure(figsize=(12, 10)) # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例) plt.subplot(2, 1, 1) plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal') plt.legend() plt.subplot(2, 1, 2) plt.plot(denoised_signals.iloc[:, 1], label='FFT Denoised') plt.legend() plt.tight_layout() plt.savefig(result_img_path) plt.close() # 关闭图形以释放资源 return jsonify({ 'code': 200, 'msg': 'fft_denoise completed' }) except Exception as e: return jsonify({ 'code': 500, 'msg': str(e) }) # 卡尔曼滤波去噪 import numpy as np import pandas as pd from filterpy.kalman import KalmanFilter import matplotlib.pyplot as plt from matplotlib import rcParams from flask import Flask, request, jsonify def kalman_denoise(signal): kf = KalmanFilter(dim_x=2, dim_z=1) kf.x = np.array([0., 0.]) kf.F = np.array([[1., 1.], [0., 1.]]) kf.H = np.array([[1., 0.]]) kf.P *= 1000. kf.R = 5 kf.Q = np.array([[1e-5, 0.], [0., 1e-5]]) filtered_signal = [] for z in signal: kf.predict() kf.update(z) filtered_signal.append(kf.x[0]) return np.array(filtered_signal) @app.route('/kalman_denoise', methods=['POST']) def set_file_address3(): try: # 检查请求体中是否包含文件地址 data = request.json if 'file_url' not in data: return jsonify({'code': 400, 'msg': 'No file URL provided'}) file_url = request.json.get('file_url') result_file_path = request.json.get('result_file_path') result_img_path = request.json.get('result_img_path') # 读取数据集 noisy_signal = pd.read_csv('{0}'.format(file_url)) plt.rcParams['xtick.direction'] = 'in' plt.rcParams['ytick.direction'] = 'in' plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签 plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 config = { "font.family": 'serif', "font.size": 20, "mathtext.fontset": 'stix', "font.serif": ['Times New Roman'], # 宋体 'axes.unicode_minus': False # 处理负号 } rcParams.update(config) # 检查是否包含时间列 if 'Time' in noisy_signal.columns: time_column = noisy_signal['Time'].values.reshape(-1, 1) other_columns = noisy_signal.drop(columns='Time') else: time_column = None other_columns = noisy_signal # 应用去噪方法 denoised_signals = pd.DataFrame() for column in other_columns.columns: denoised_signals[column] = kalman_denoise(other_columns[column].values) # 如果有时间列,将其添加回去 if time_column is not None: denoised_signals.insert(0, 'time', time_column) # 保存去噪后的信号到CSV文件 denoised_signals.to_csv(result_file_path, index=False) # 绘图 plt.figure(figsize=(12, 10)) # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例) plt.subplot(2, 1, 1) plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal') plt.legend() plt.subplot(2, 1, 2) plt.plot(denoised_signals.iloc[:, 1], label='Kalman Filtered') plt.legend() plt.tight_layout() plt.savefig(result_img_path) plt.close() # 关闭图形以释放资源 return jsonify({ 'code': 200, 'msg': 'kalman_denoise completed' }) except Exception as e: return jsonify({ 'code': 500, 'msg': str(e) }) if __name__ == '__main__': app.run(port=10004, debug=True, use_reloader=False)