123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #开发时间:2024/8/28 10:56
- #开发时间:2024/6/12 9:36
- import numpy as np
- import pandas as pd
- import matplotlib.pyplot as plt
- from matplotlib import rcParams
- from flask import Flask, request, jsonify
- from threading import Lock
- # 移动平均法
- def moving_average(signal, window_size):
- return np.convolve(signal, np.ones(window_size) / window_size, mode='same')
- app = Flask(__name__)
- lock = Lock()
- @app.route('/moving_average', methods=['POST'])
- def set_file_address1():
- try:
- # 检查请求体中是否包含文件地址
- data = request.json
- if 'file_url' not in data:
- return jsonify({'code': 400, 'msg': 'No file URL provided'})
- file_url = request.json.get('file_url')
- result_file_path = request.json.get('result_file_path')
- result_img_path = request.json.get('result_img_path')
- # 读取数据集
- noisy_signal = pd.read_csv('{0}'.format(file_url))
- plt.rcParams['xtick.direction'] = 'in'
- plt.rcParams['ytick.direction'] = 'in'
- plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
- plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
- config = {
- "font.family": 'serif',
- "font.size": 20,
- "mathtext.fontset": 'stix',
- "font.serif": ['Times New Roman'], # 宋体
- 'axes.unicode_minus': False # 处理负号
- }
- rcParams.update(config)
- # 检查是否包含时间列
- if 'Time' in noisy_signal.columns:
- time_column = noisy_signal['Time'].values.reshape(-1, 1)
- other_columns = noisy_signal.drop(columns='Time')
- else:
- time_column = None
- other_columns = noisy_signal
- # 应用去噪方法
- window_size = 20 # 窗口宽度
- denoised_signals = pd.DataFrame()
- for column in other_columns.columns:
- denoised_signals[column] = moving_average(other_columns[column].values, window_size)
- # 如果有时间列,将其添加回去
- if time_column is not None:
- denoised_signals.insert(0, 'time', time_column)
- # 保存去噪后的信号到CSV文件
- denoised_signals.to_csv(result_file_path, index=False)
- # 绘图
- plt.figure(figsize=(12, 10))
- # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
- plt.subplot(2, 1, 1)
- plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
- plt.legend()
- plt.subplot(2, 1, 2)
- plt.plot(denoised_signals.iloc[:, 1], label='Moving Average')
- plt.legend()
- plt.tight_layout()
- plt.savefig(result_img_path)
- plt.close() # 关闭图形以释放资源
- return jsonify({
- 'code': 200,
- 'msg': 'moving_average_denoise completed'
- })
- except Exception as e:
- return jsonify({
- 'code': 500,
- 'msg': str(e)
- })
- # 傅里叶变换去噪
- from scipy.fft import fft, ifft
- def fft_denoise(signal, cutoff_freq, fs):
- signal_fft = fft(signal)
- frequencies = np.fft.fftfreq(len(signal), 1 / fs)
- signal_fft[np.abs(frequencies) > cutoff_freq] = 0
- return ifft(signal_fft).real
- @app.route('/fft_denoise', methods=['POST'])
- def set_file_address2():
- try:
- # 检查请求体中是否包含文件地址
- data = request.json
- if 'file_url' not in data:
- return jsonify({'code': 400, 'msg': 'No file URL provided'})
- file_url = request.json.get('file_url')
- result_file_path = request.json.get('result_file_path')
- result_img_path = request.json.get('result_img_path')
- # 读取数据集
- noisy_signal = pd.read_csv('{0}'.format(file_url))
- plt.rcParams['xtick.direction'] = 'in'
- plt.rcParams['ytick.direction'] = 'in'
- plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
- plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
- config = {
- "font.family": 'serif',
- "font.size": 20,
- "mathtext.fontset": 'stix',
- "font.serif": ['Times New Roman'], # 宋体
- 'axes.unicode_minus': False # 处理负号
- }
- rcParams.update(config)
- # 读取信号
- fs = 1000
- # 检查是否包含时间列
- if 'Time' in noisy_signal.columns:
- time_column = noisy_signal['Time'].values.reshape(-1, 1)
- other_columns = noisy_signal.drop(columns='Time')
- else:
- time_column = None
- other_columns = noisy_signal
- # 应用去噪方法
- cutoff_freq = 10 # 截止频率
- denoised_signals = pd.DataFrame()
- for column in other_columns.columns:
- denoised_signals[column] = fft_denoise(other_columns[column].values, cutoff_freq, fs)
- # 如果有时间列,将其添加回去
- if time_column is not None:
- denoised_signals.insert(0, 'Time', time_column)
- # 保存去噪后的信号到CSV文件
- denoised_signals.to_csv(result_file_path, index=False)
- # 绘图
- plt.figure(figsize=(12, 10))
- # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
- plt.subplot(2, 1, 1)
- plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
- plt.legend()
- plt.subplot(2, 1, 2)
- plt.plot(denoised_signals.iloc[:, 1], label='FFT Denoised')
- plt.legend()
- plt.tight_layout()
- plt.savefig(result_img_path)
- plt.close() # 关闭图形以释放资源
- return jsonify({
- 'code': 200,
- 'msg': 'fft_denoise completed'
- })
- except Exception as e:
- return jsonify({
- 'code': 500,
- 'msg': str(e)
- })
- # 卡尔曼滤波去噪
- import numpy as np
- import pandas as pd
- from filterpy.kalman import KalmanFilter
- import matplotlib.pyplot as plt
- from matplotlib import rcParams
- from flask import Flask, request, jsonify
- def kalman_denoise(signal):
- kf = KalmanFilter(dim_x=2, dim_z=1)
- kf.x = np.array([0., 0.])
- kf.F = np.array([[1., 1.],
- [0., 1.]])
- kf.H = np.array([[1., 0.]])
- kf.P *= 1000.
- kf.R = 5
- kf.Q = np.array([[1e-5, 0.],
- [0., 1e-5]])
- filtered_signal = []
- for z in signal:
- kf.predict()
- kf.update(z)
- filtered_signal.append(kf.x[0])
- return np.array(filtered_signal)
- @app.route('/kalman_denoise', methods=['POST'])
- def set_file_address3():
- try:
- # 检查请求体中是否包含文件地址
- data = request.json
- if 'file_url' not in data:
- return jsonify({'code': 400, 'msg': 'No file URL provided'})
- file_url = request.json.get('file_url')
- result_file_path = request.json.get('result_file_path')
- result_img_path = request.json.get('result_img_path')
- # 读取数据集
- noisy_signal = pd.read_csv('{0}'.format(file_url))
- plt.rcParams['xtick.direction'] = 'in'
- plt.rcParams['ytick.direction'] = 'in'
- plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
- plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
- config = {
- "font.family": 'serif',
- "font.size": 20,
- "mathtext.fontset": 'stix',
- "font.serif": ['Times New Roman'], # 宋体
- 'axes.unicode_minus': False # 处理负号
- }
- rcParams.update(config)
- # 检查是否包含时间列
- if 'Time' in noisy_signal.columns:
- time_column = noisy_signal['Time'].values.reshape(-1, 1)
- other_columns = noisy_signal.drop(columns='Time')
- else:
- time_column = None
- other_columns = noisy_signal
- # 应用去噪方法
- denoised_signals = pd.DataFrame()
- for column in other_columns.columns:
- denoised_signals[column] = kalman_denoise(other_columns[column].values)
- # 如果有时间列,将其添加回去
- if time_column is not None:
- denoised_signals.insert(0, 'time', time_column)
- # 保存去噪后的信号到CSV文件
- denoised_signals.to_csv(result_file_path, index=False)
- # 绘图
- plt.figure(figsize=(12, 10))
- # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
- plt.subplot(2, 1, 1)
- plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
- plt.legend()
- plt.subplot(2, 1, 2)
- plt.plot(denoised_signals.iloc[:, 1], label='Kalman Filtered')
- plt.legend()
- plt.tight_layout()
- plt.savefig(result_img_path)
- plt.close() # 关闭图形以释放资源
- return jsonify({
- 'code': 200,
- 'msg': 'kalman_denoise completed'
- })
- except Exception as e:
- return jsonify({
- 'code': 500,
- 'msg': str(e)
- })
- if __name__ == '__main__':
- app.run(port=10004, debug=True, use_reloader=False)
|