去噪.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. #开发时间:2024/8/28 10:56
  2. #开发时间:2024/6/12 9:36
  3. import numpy as np
  4. import pandas as pd
  5. import matplotlib.pyplot as plt
  6. from matplotlib import rcParams
  7. from flask import Flask, request, jsonify
  8. from threading import Lock
  9. # 移动平均法
  10. def moving_average(signal, window_size):
  11. return np.convolve(signal, np.ones(window_size) / window_size, mode='same')
# Flask application object; the @app.route decorators in this file register
# their endpoints on it.
app = Flask(__name__)
# Module-level lock available to the request handlers.
# NOTE(review): presumably meant to serialize access to matplotlib's global
# pyplot state across concurrent requests - confirm the intended use.
lock = Lock()
  14. @app.route('/moving_average', methods=['POST'])
  15. def set_file_address1():
  16. try:
  17. # 检查请求体中是否包含文件地址
  18. data = request.json
  19. if 'file_url' not in data:
  20. return jsonify({'code': 400, 'msg': 'No file URL provided'})
  21. file_url = request.json.get('file_url')
  22. result_file_path = request.json.get('result_file_path')
  23. result_img_path = request.json.get('result_img_path')
  24. # 读取数据集
  25. noisy_signal = pd.read_csv('{0}'.format(file_url))
  26. plt.rcParams['xtick.direction'] = 'in'
  27. plt.rcParams['ytick.direction'] = 'in'
  28. plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
  29. plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
  30. config = {
  31. "font.family": 'serif',
  32. "font.size": 20,
  33. "mathtext.fontset": 'stix',
  34. "font.serif": ['Times New Roman'], # 宋体
  35. 'axes.unicode_minus': False # 处理负号
  36. }
  37. rcParams.update(config)
  38. # 检查是否包含时间列
  39. if 'Time' in noisy_signal.columns:
  40. time_column = noisy_signal['Time'].values.reshape(-1, 1)
  41. other_columns = noisy_signal.drop(columns='Time')
  42. else:
  43. time_column = None
  44. other_columns = noisy_signal
  45. # 应用去噪方法
  46. window_size = 20 # 窗口宽度
  47. denoised_signals = pd.DataFrame()
  48. for column in other_columns.columns:
  49. denoised_signals[column] = moving_average(other_columns[column].values, window_size)
  50. # 如果有时间列,将其添加回去
  51. if time_column is not None:
  52. denoised_signals.insert(0, 'time', time_column)
  53. # 保存去噪后的信号到CSV文件
  54. denoised_signals.to_csv(result_file_path, index=False)
  55. # 绘图
  56. plt.figure(figsize=(12, 10))
  57. # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
  58. plt.subplot(2, 1, 1)
  59. plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
  60. plt.legend()
  61. plt.subplot(2, 1, 2)
  62. plt.plot(denoised_signals.iloc[:, 1], label='Moving Average')
  63. plt.legend()
  64. plt.tight_layout()
  65. plt.savefig(result_img_path)
  66. plt.close() # 关闭图形以释放资源
  67. return jsonify({
  68. 'code': 200,
  69. 'msg': 'moving_average_denoise completed'
  70. })
  71. except Exception as e:
  72. return jsonify({
  73. 'code': 500,
  74. 'msg': str(e)
  75. })
  76. # 傅里叶变换去噪
  77. from scipy.fft import fft, ifft
  78. def fft_denoise(signal, cutoff_freq, fs):
  79. signal_fft = fft(signal)
  80. frequencies = np.fft.fftfreq(len(signal), 1 / fs)
  81. signal_fft[np.abs(frequencies) > cutoff_freq] = 0
  82. return ifft(signal_fft).real
  83. @app.route('/fft_denoise', methods=['POST'])
  84. def set_file_address2():
  85. try:
  86. # 检查请求体中是否包含文件地址
  87. data = request.json
  88. if 'file_url' not in data:
  89. return jsonify({'code': 400, 'msg': 'No file URL provided'})
  90. file_url = request.json.get('file_url')
  91. result_file_path = request.json.get('result_file_path')
  92. result_img_path = request.json.get('result_img_path')
  93. # 读取数据集
  94. noisy_signal = pd.read_csv('{0}'.format(file_url))
  95. plt.rcParams['xtick.direction'] = 'in'
  96. plt.rcParams['ytick.direction'] = 'in'
  97. plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
  98. plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
  99. config = {
  100. "font.family": 'serif',
  101. "font.size": 20,
  102. "mathtext.fontset": 'stix',
  103. "font.serif": ['Times New Roman'], # 宋体
  104. 'axes.unicode_minus': False # 处理负号
  105. }
  106. rcParams.update(config)
  107. # 读取信号
  108. fs = 1000
  109. # 检查是否包含时间列
  110. if 'Time' in noisy_signal.columns:
  111. time_column = noisy_signal['Time'].values.reshape(-1, 1)
  112. other_columns = noisy_signal.drop(columns='Time')
  113. else:
  114. time_column = None
  115. other_columns = noisy_signal
  116. # 应用去噪方法
  117. cutoff_freq = 10 # 截止频率
  118. denoised_signals = pd.DataFrame()
  119. for column in other_columns.columns:
  120. denoised_signals[column] = fft_denoise(other_columns[column].values, cutoff_freq, fs)
  121. # 如果有时间列,将其添加回去
  122. if time_column is not None:
  123. denoised_signals.insert(0, 'Time', time_column)
  124. # 保存去噪后的信号到CSV文件
  125. denoised_signals.to_csv(result_file_path, index=False)
  126. # 绘图
  127. plt.figure(figsize=(12, 10))
  128. # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
  129. plt.subplot(2, 1, 1)
  130. plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
  131. plt.legend()
  132. plt.subplot(2, 1, 2)
  133. plt.plot(denoised_signals.iloc[:, 1], label='FFT Denoised')
  134. plt.legend()
  135. plt.tight_layout()
  136. plt.savefig(result_img_path)
  137. plt.close() # 关闭图形以释放资源
  138. return jsonify({
  139. 'code': 200,
  140. 'msg': 'fft_denoise completed'
  141. })
  142. except Exception as e:
  143. return jsonify({
  144. 'code': 500,
  145. 'msg': str(e)
  146. })
  147. # 卡尔曼滤波去噪
  148. import numpy as np
  149. import pandas as pd
  150. from filterpy.kalman import KalmanFilter
  151. import matplotlib.pyplot as plt
  152. from matplotlib import rcParams
  153. from flask import Flask, request, jsonify
  154. def kalman_denoise(signal):
  155. kf = KalmanFilter(dim_x=2, dim_z=1)
  156. kf.x = np.array([0., 0.])
  157. kf.F = np.array([[1., 1.],
  158. [0., 1.]])
  159. kf.H = np.array([[1., 0.]])
  160. kf.P *= 1000.
  161. kf.R = 5
  162. kf.Q = np.array([[1e-5, 0.],
  163. [0., 1e-5]])
  164. filtered_signal = []
  165. for z in signal:
  166. kf.predict()
  167. kf.update(z)
  168. filtered_signal.append(kf.x[0])
  169. return np.array(filtered_signal)
  170. @app.route('/kalman_denoise', methods=['POST'])
  171. def set_file_address3():
  172. try:
  173. # 检查请求体中是否包含文件地址
  174. data = request.json
  175. if 'file_url' not in data:
  176. return jsonify({'code': 400, 'msg': 'No file URL provided'})
  177. file_url = request.json.get('file_url')
  178. result_file_path = request.json.get('result_file_path')
  179. result_img_path = request.json.get('result_img_path')
  180. # 读取数据集
  181. noisy_signal = pd.read_csv('{0}'.format(file_url))
  182. plt.rcParams['xtick.direction'] = 'in'
  183. plt.rcParams['ytick.direction'] = 'in'
  184. plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
  185. plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
  186. config = {
  187. "font.family": 'serif',
  188. "font.size": 20,
  189. "mathtext.fontset": 'stix',
  190. "font.serif": ['Times New Roman'], # 宋体
  191. 'axes.unicode_minus': False # 处理负号
  192. }
  193. rcParams.update(config)
  194. # 检查是否包含时间列
  195. if 'Time' in noisy_signal.columns:
  196. time_column = noisy_signal['Time'].values.reshape(-1, 1)
  197. other_columns = noisy_signal.drop(columns='Time')
  198. else:
  199. time_column = None
  200. other_columns = noisy_signal
  201. # 应用去噪方法
  202. denoised_signals = pd.DataFrame()
  203. for column in other_columns.columns:
  204. denoised_signals[column] = kalman_denoise(other_columns[column].values)
  205. # 如果有时间列,将其添加回去
  206. if time_column is not None:
  207. denoised_signals.insert(0, 'time', time_column)
  208. # 保存去噪后的信号到CSV文件
  209. denoised_signals.to_csv(result_file_path, index=False)
  210. # 绘图
  211. plt.figure(figsize=(12, 10))
  212. # 绘制原始信号和去噪后信号(这里只绘制第一列作为示例)
  213. plt.subplot(2, 1, 1)
  214. plt.plot(noisy_signal.iloc[:, 1], label='Noisy Signal')
  215. plt.legend()
  216. plt.subplot(2, 1, 2)
  217. plt.plot(denoised_signals.iloc[:, 1], label='Kalman Filtered')
  218. plt.legend()
  219. plt.tight_layout()
  220. plt.savefig(result_img_path)
  221. plt.close() # 关闭图形以释放资源
  222. return jsonify({
  223. 'code': 200,
  224. 'msg': 'kalman_denoise completed'
  225. })
  226. except Exception as e:
  227. return jsonify({
  228. 'code': 500,
  229. 'msg': str(e)
  230. })
# Script entry point: start Flask's built-in server on port 10004 with the
# auto-reloader switched off.
# NOTE(review): debug=True turns on Flask's debug mode, which is intended for
# development only - confirm this is never the way the service runs in
# production.
if __name__ == '__main__':
    app.run(port=10004, debug=True, use_reloader=False)