# Expansion 1 -- Random sampling: a simple and direct data-augmentation method that enlarges the
# dataset by randomly drawing samples from the existing data. For imbalanced datasets, biased
# sampling can be used to balance the classes: oversample the minority class and undersample the
# majority class. (Note: the active implementation below preserves temporal order and duplicates
# the tail of the series rather than sampling at random.)
import pandas as pd

def extend_data_with_ordered_sampling(df, expansion_ratio=0.2):
    """
    Expand the whole dataset by ordered sampling, excluding the time column (if present).
    :param df: original DataFrame.
    :param expansion_ratio: fraction of the original data to append as new rows.
    :return: expanded DataFrame.
    """
    # Drop the 'Time' column if it exists
    if 'Time' in df.columns:
        df = df.drop(columns=['Time'])

    # Number of rows to append
    n_samples = int(len(df) * expansion_ratio)

    # Sort by index (assumes df is already in time order; add explicit sorting logic otherwise)
    df_sorted = df.sort_index()

    # Take the last n_samples rows, in order, as the expansion data
    sampled_data = df_sorted.tail(n_samples).copy()

    # Concatenate the original DataFrame with the sampled rows
    final_data = pd.concat([df, sampled_data], ignore_index=True)

    return final_data

# Load the data
data_path = r'D:\验收材料\空工大-装备性能退化评估和故障预测健康管理软件\里程碑最终算法\01补全\源代码\补全后的数据.csv'
data = pd.read_csv(data_path)

expansion_ratio = 0.2  # fraction of the original data to append

try:
    extended_data = extend_data_with_ordered_sampling(data, expansion_ratio)
    extended_data.to_csv('扩充数据-随机采样.csv', index=False)
except Exception as e:
    print(f"发生错误: {e}")
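
# A minimal sketch of the class-balancing variant described in the header above (random
# over-sampling of minority classes with replacement), kept commented out like the alternative
# methods below. It assumes a hypothetical label column named 'label', which is not present in
# the dataset used by this script.
# def balance_by_random_oversampling(df, label_col='label', random_state=0):
#     """Randomly oversample every minority class (with replacement) up to the majority class size."""
#     counts = df[label_col].value_counts()
#     target = counts.max()
#     parts = []
#     for cls, n in counts.items():
#         subset = df[df[label_col] == cls]
#         if n < target:
#             # draw with replacement until this class matches the majority class size
#             subset = subset.sample(n=target, replace=True, random_state=random_state)
#         parts.append(subset)
#     return pd.concat(parts, ignore_index=True)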

# Expansion 2 -- Data perturbation: generate new data points by adding small random perturbations
# to the original data. This method is suitable for numerical data and can help the model learn
# more generalizable feature representations.
# import numpy as np
# import pandas as pd
#
# def add_random_perturbation(series, sigma):
#     """
#     Add Gaussian random perturbation to a numerical series.
#     """
#     return series + np.random.normal(0, sigma, size=len(series))
#
# def extend_data_with_perturbation(df, sigma, expansion_ratio):
#     """
#     Perturb the numerical columns of the DataFrame and expand the data.
#     """
#     # Drop the 'Time' column if it exists
#     if 'Time' in df.columns:
#         df = df.drop(columns=['Time'])
#
#     numerical_columns = df.select_dtypes(include=[np.number]).columns
#     extended_data = df.copy()
#
#     for col in numerical_columns:
#         extended_data[col] = add_random_perturbation(df[col], sigma)
#
#     # Number of rows to append
#     n_samples = int(len(df) * expansion_ratio)
#
#     # Take the last n_samples perturbed rows as the expansion data
#     expanded_data = extended_data.iloc[-n_samples:].copy()
#
#     # Concatenate the original data with the expansion data
#     final_data = pd.concat([df, expanded_data], ignore_index=True)
#
#     return final_data
#
# # Load the data
# data_path = r'E:\BaiduSyncdisk\zhiguan\01最近做的\算法调试\自己调试--Y\里程碑最终算法\01补全\源代码\补全后的数据-无time.csv'
# data = pd.read_csv(data_path)
#
# sigma = 0.05           # standard deviation of the perturbation
# expansion_ratio = 0.2  # fraction of the original data to append
#
# try:
#     extended_data = extend_data_with_perturbation(data, sigma, expansion_ratio)
#     extended_data.to_csv('扩充后的数据-数据扰动.csv', index=False)
# except Exception as e:
#     print(f"发生错误: {e}")

# Expansion 3 -- Wavelet transform: decompose the signal into sub-signals of different frequency
# bands, then generate expansion data by processing those sub-signals.
# import numpy as np
# import pandas as pd
# import pywt
#
# def wavelet_transform(series, wavelet='db1', level=1):
#     """
#     Apply a wavelet decomposition to a one-dimensional numerical series.
#     """
#     return pywt.wavedec(series, wavelet, level=level)
#
# def wavelet_reconstruct(coeffs, wavelet='db1'):
#     """
#     Reconstruct the data from its wavelet coefficients.
#     """
#     return pywt.waverec(coeffs, wavelet)
#
# def perturb_coeffs(coeffs, sigma):
#     """
#     Perturb the wavelet coefficients with Gaussian noise of standard deviation sigma.
#     """
#     perturbed_coeffs = []
#     for coeff in coeffs:
#         # Perturb every numeric coefficient array (approximation and detail alike)
#         if np.issubdtype(coeff.dtype, np.number):
#             perturbed_coeff = coeff + sigma * np.random.randn(*coeff.shape)
#         else:
#             perturbed_coeff = coeff
#         perturbed_coeffs.append(perturbed_coeff)
#     return perturbed_coeffs
#
# def extend_data_with_wavelet(df, wavelet='db1', level=1, sigma=0.05, expansion_ratio=0.2):
#     # Drop the 'Time' column if it exists
#     if 'Time' in df.columns:
#         df = df.drop(columns=['Time'])
#
#     numerical_columns = df.select_dtypes(include=[np.number]).columns
#     extended_data = df.copy()
#
#     for col in numerical_columns:
#         coeffs = wavelet_transform(df[col], wavelet, level)
#         perturbed_coeffs = perturb_coeffs(coeffs, sigma)
#         reconstructed_series = wavelet_reconstruct(perturbed_coeffs, wavelet)
#         extended_data[col] = reconstructed_series[:len(df[col])]  # keep the original length
#
#     # Number of rows to append
#     n_samples = int(len(df) * expansion_ratio)
#
#     # Take the last n_samples reconstructed rows as the expansion data
#     expanded_data = extended_data.iloc[-n_samples:].copy()
#
#     # Concatenate the reconstructed data with the expansion data
#     final_data = pd.concat([extended_data, expanded_data], ignore_index=True)
#
#     return final_data
#
# # Load the data
# data_path = r'E:\BaiduSyncdisk\zhiguan\01最近做的\算法调试\自己调试--Y\里程碑最终算法\01补全\源代码\补全后的数据-无time.csv'
# data = pd.read_csv(data_path)
#
# wavelet = 'db1'        # wavelet basis
# level = 1              # decomposition level
# sigma = 0.05           # standard deviation of the perturbation
# expansion_ratio = 0.2  # fraction of the original data to append
#
# try:
#     extended_data = extend_data_with_wavelet(data, wavelet, level, sigma, expansion_ratio)
#     extended_data.to_csv('扩充后的数据-wavelet.csv', index=False)
# except Exception as e:
#     print(f"发生错误: {e}")

# Expansion 4 -- Wavelet coefficient perturbation: add random noise to the wavelet coefficients,
# i.e. Gaussian noise with standard deviation sigma, and additionally scale the detail
# (high-frequency) coefficients before reconstruction.
# import numpy as np
# import pandas as pd
# import pywt
#
# def wavelet_transform(series, wavelet='db1', level=1):
#     return pywt.wavedec(series, wavelet, level=level)
#
# def wavelet_reconstruct(coeffs, wavelet='db1'):
#     return pywt.waverec(coeffs, wavelet)
#
# def perturb_coeffs(coeffs, sigma):
#     perturbed_coeffs = []
#     for coeff in coeffs:
#         if np.issubdtype(coeff.dtype, np.number):
#             perturbed_coeff = coeff + sigma * np.random.randn(*coeff.shape)
#         else:
#             perturbed_coeff = coeff
#         perturbed_coeffs.append(perturbed_coeff)
#     return perturbed_coeffs
#
# def enhance_or_reduce(coeffs, factor):
#     """
#     Amplify or attenuate the high-frequency (detail) coefficients of the wavelet decomposition.
#     """
#     enhanced_coeffs = []
#     for i, coeff in enumerate(coeffs):
#         # Detail coefficients start at index 1; scale them by the given factor
#         if i > 0:
#             enhanced_coeffs.append(coeff * factor)
#         else:
#             enhanced_coeffs.append(coeff)
#     return enhanced_coeffs
#
# def extend_data_with_wavelet(df, wavelet='db1', level=1, sigma=0.05, expansion_ratio=0.2):
#     # Drop the 'Time' column if it exists
#     if 'Time' in df.columns:
#         df = df.drop(columns=['Time'])
#
#     numerical_columns = df.select_dtypes(include=[np.number]).columns
#     extended_data = df.copy()
#
#     for col in numerical_columns:
#         coeffs = wavelet_transform(df[col], wavelet, level)
#         perturbed_coeffs = perturb_coeffs(coeffs, sigma)
#         enhanced_coeffs = enhance_or_reduce(perturbed_coeffs, factor=1.1)  # amplify the detail coefficients
#         reconstructed_series = wavelet_reconstruct(enhanced_coeffs, wavelet)
#         extended_data[col] = reconstructed_series[:len(df[col])]  # keep the original length
#
#     # Number of rows to append
#     n_samples = int(len(df) * expansion_ratio)
#
#     # Take the last n_samples reconstructed rows as the expansion data
#     expanded_data = extended_data.iloc[-n_samples:].copy()
#
#     # Concatenate the original data with the expansion data
#     final_data = pd.concat([df, expanded_data], axis=0, ignore_index=True)
#
#     return final_data
#
#
# data = pd.read_csv(r'E:\BaiduSyncdisk\zhiguan\01最近做的\算法调试\自己调试--Y\里程碑最终算法\01补全\源代码\补全后的数据-无time.csv')
# wavelet = 'db1'        # wavelet basis
# level = 1              # decomposition level
# sigma = 0.05           # standard deviation of the perturbation
# expansion_ratio = 0.2  # fraction of the original data to append
#
# try:
#     extended_data = extend_data_with_wavelet(data, wavelet, level, sigma, expansion_ratio)
#     extended_data.to_csv('扩充后的数据-Wavelet变换.csv', index=False)
# except Exception as e:
#     print(f"发生错误: {e}")
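
# Why the reconstructions above are sliced back to len(df[col]): for an odd-length signal,
# pywt.waverec returns one extra sample. A quick commented-out check (a sketch; it assumes
# PyWavelets' default 'symmetric' padding mode):
# x = np.arange(7, dtype=float)               # odd-length signal
# cA, cD = pywt.wavedec(x, 'db1', level=1)    # each coefficient array has length 4
# y = pywt.waverec([cA, cD], 'db1')           # reconstruction has length 8, one more than x
# print(len(x), len(cA), len(cD), len(y))     # -> 7 4 4 8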

# Expansion 5 -- Wavelet linear interpolation: linearly interpolate the perturbed wavelet
# coefficients before reconstruction.
# import numpy as np
# import pandas as pd
# import pywt
#
# def wavelet_transform(series, wavelet='db1', level=1):
#     """
#     Apply a wavelet decomposition to a one-dimensional numerical series.
#     """
#     return pywt.wavedec(series, wavelet, level=level)
#
# def wavelet_reconstruct(coeffs, wavelet='db1'):
#     """
#     Reconstruct the data from its wavelet coefficients.
#     """
#     return pywt.waverec(coeffs, wavelet)
#
# def perturb_coeffs(coeffs, sigma):
#     """
#     Perturb the wavelet coefficients with Gaussian noise of standard deviation sigma.
#     """
#     perturbed_coeffs = []
#     for coeff in coeffs:
#         if np.issubdtype(coeff.dtype, np.number):
#             perturbed_coeff = coeff + sigma * np.random.randn(*coeff.shape)
#         else:
#             perturbed_coeff = coeff
#         perturbed_coeffs.append(perturbed_coeff)
#     return perturbed_coeffs
#
# def interpolate_coeffs(coeffs, new_length):
#     """
#     Linearly interpolate each wavelet coefficient array to new_length samples.
#     """
#     interpolated_coeffs = []
#     for coeff in coeffs:
#         if new_length:
#             coeff = np.interp(np.arange(new_length), np.arange(len(coeff)), coeff)
#         interpolated_coeffs.append(coeff)
#     return interpolated_coeffs
#
# def extend_data_with_wavelet(df, wavelet='db1', level=1, sigma=0.05, expansion_ratio=0.2, new_length=None):
#     # Drop the 'Time' column if it exists
#     if 'Time' in df.columns:
#         df = df.drop(columns=['Time'])
#
#     numerical_columns = df.select_dtypes(include=[np.number]).columns
#     extended_data = df.copy()
#
#     for col in numerical_columns:
#         coeffs = wavelet_transform(df[col], wavelet, level)
#         perturbed_coeffs = perturb_coeffs(coeffs, sigma)
#         if new_length is not None:
#             perturbed_coeffs = interpolate_coeffs(perturbed_coeffs, new_length)
#         reconstructed_series = wavelet_reconstruct(perturbed_coeffs, wavelet)
#         extended_data[col] = reconstructed_series[:len(df[col])]  # keep the original length
#
#     # Number of rows to append
#     n_samples = int(len(df) * expansion_ratio)
#
#     # Take the last n_samples reconstructed rows as the expansion data
#     expanded_data = extended_data.iloc[-n_samples:].copy()
#
#     # Concatenate the reconstructed data with the expansion data
#     final_data = pd.concat([extended_data, expanded_data], ignore_index=True)
#
#     return final_data
#
# # Load the data
# data = pd.read_csv(r'E:\BaiduSyncdisk\zhiguan\01最近做的\算法调试\自己调试--Y\里程碑最终算法\01补全\源代码\补全后的数据-无time.csv')
#
# wavelet = 'db1'        # wavelet basis
# level = 1              # decomposition level
# sigma = 0.05           # standard deviation of the perturbation
# expansion_ratio = 0.2  # fraction of the original data to append
# new_length = None      # target coefficient length, if resampling is desired
#
# try:
#     extended_data = extend_data_with_wavelet(data, wavelet, level, sigma, expansion_ratio, new_length)
#     extended_data.to_csv('扩充后的数据-小波线性.csv', index=False)
# except Exception as e:
#     print(f"发生错误: {e}")
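
# A note on new_length (a sketch under the same PyWavelets assumptions as above): waverec roughly
# doubles the coefficient length, so after interpolating every coefficient array to new_length the
# reconstruction has about 2*new_length samples. new_length therefore needs to be at least about
# half the column length for the [:len(df[col])] slice above to keep working; e.g.:
# x = np.random.randn(100)
# cA, cD = pywt.wavedec(x, 'db1', level=1)                  # length 50 each
# cA_i = np.interp(np.arange(60), np.arange(len(cA)), cA)   # resample to 60 samples
# cD_i = np.interp(np.arange(60), np.arange(len(cD)), cD)
# y = pywt.waverec([cA_i, cD_i], 'db1')
# print(len(y))                                             # -> 120, still >= 100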