# ## 1. PCA + K-means clustering
# import matplotlib
# matplotlib.use('Qt5Agg')
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

# Read the CSV file
'''df = pd.read_csv(r'/03特征提取/源代码/特征值文件-频域.csv')

# Feature scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# Dimensionality reduction with PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# K-means clustering
kmeans = KMeans(n_clusters=2, n_init=10, random_state=42)
kmeans.fit(X_pca)

# Colour list: one colour per cluster
colors = ['r', 'g', 'b']

# Plot the results in a single subplot
fig, ax = plt.subplots(figsize=(15, 5))

# Make sure the number of clusters does not exceed the number of predefined colours
if kmeans.n_clusters <= len(colors):
    # Plot the members of each cluster
    for i in range(kmeans.n_clusters):
        cluster_members = X_pca[kmeans.labels_ == i]
        ax.scatter(cluster_members[:, 0], cluster_members[:, 1], c=colors[i], label=f'Cluster {i + 1}')
    # Plot the centroids
    ax.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1],
               s=300, c='k', marker='x', label='Centroids')
    # Title and legend
    ax.set_title('PCA and KMeans Clustering')
    ax.legend()
    plt.savefig('PCA and KMeans.jpg')
    plt.show()
    # Save the PCA coordinates and cluster labels to CSV
    pca_df = pd.DataFrame(data=X_pca, columns=['PCA1', 'PCA2'])
    pca_df['Cluster'] = kmeans.labels_
    output_csv_path = '../../05故障预测/源代码/PCA_Results.csv'
    pca_df.to_csv(output_csv_path, index=False)
else:
    print("The number of clusters exceeds the number of predefined colours.")'''

# ## 2. AutoEncoder: an unsupervised neural network that learns a compact
#      representation of the input data for feature learning.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from keras.layers import Input, Dense
from keras.models import Model

# Read the CSV file
df = pd.read_csv('特征提取.csv')

# Feature scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# Define the autoencoder architecture
input_dim = X_scaled.shape[1]  # input dimensionality
encoding_dim = 8  # size of the bottleneck; adjust to your data

# Input layer
input_layer = Input(shape=(input_dim,))
# Encoder layer
encoded = Dense(encoding_dim, activation='relu')(input_layer)
# Decoder layer; linear output, since standardised features are not confined to [0, 1]
decoded = Dense(input_dim, activation='linear')(encoded)

# Build and compile the autoencoder
autoencoder = Model(input_layer, decoded)
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Train the autoencoder to reconstruct its own input
autoencoder.fit(X_scaled, X_scaled, epochs=50, batch_size=256, shuffle=True)

# Separate encoder model for extracting the learned representation
encoder = Model(input_layer, encoded)
X_encoded = encoder.predict(X_scaled)
# Reconstruct from the original (scaled) input; the autoencoder expects
# input_dim features, not the 8-dimensional encoding
X_decoded = autoencoder.predict(X_scaled)

# Per-sample reconstruction error
reconstruction_error = np.mean(np.square(X_scaled - X_decoded), axis=1)

# Plot the distribution of the reconstruction error
plt.hist(reconstruction_error, bins=50, color='blue', alpha=0.7)
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.title('Reconstruction Error Distribution')
plt.savefig('Reconstruction_Error.jpg')
# plt.show()

# Save the reconstruction error to CSV
reconstruction_error_df = pd.DataFrame(reconstruction_error, columns=['Reconstruction_Error'])
output_csv_path = 'Reconstruction_Error.csv'
reconstruction_error_df.to_csv(output_csv_path, index=False)

# Anomaly detection from the reconstruction error:
# points above mean + 2 * std are flagged as anomalies
mean = np.mean(reconstruction_error)
std = np.std(reconstruction_error)
threshold = mean + 2 * std
outliers = reconstruction_error > threshold

# Print the indices of the anomalous points
print("Indices of abnormal data points:", np.where(outliers)[0])
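
# A minimal optional sketch (an addition, not part of the original pipeline): the
# cut-off can also be tied to an assumed contamination rate with a percentile rule
# instead of mean + 2 * std. `assumed_contamination` is a hypothetical parameter
# chosen here purely for illustration.
assumed_contamination = 0.05  # hypothetical expected fraction of anomalies
percentile_threshold = np.percentile(reconstruction_error, 100 * (1 - assumed_contamination))
print("Percentile-based threshold:", percentile_threshold)
print("Indices above the percentile threshold:",
      np.where(reconstruction_error > percentile_threshold)[0])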
# ## 3. Gaussian Mixture Model (GMM): assumes the data is generated by a mixture of
#      several Gaussians and estimates their parameters with the EM algorithm,
#      which yields a clustering.
'''
import pandas as pd
import matplotlib
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.mixture import GaussianMixture
from sklearn.decomposition import PCA

# Read the CSV file
df = pd.read_csv(r'/03特征提取/源代码/特征值文件-频域.csv')

# Feature scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# Assuming the data falls into just two classes (normal and anomalous),
# first reduce to 2 dimensions with PCA to simplify the GMM computation
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# GMM with 2 Gaussian components (i.e. 2 clusters)
gmm = GaussianMixture(n_components=2, random_state=42)
gmm.fit(X_pca)

# Per-sample log-likelihood under the fitted mixture
log_prob = gmm.score_samples(X_pca)

# Assign each point to the component with the highest posterior probability
predicted_labels = gmm.predict(X_pca)

# Plot the results, one scatter call per cluster so the legend labels are correct
# (passing a list to `label=` does not produce per-cluster legend entries)
plt.figure(figsize=(15, 5))
for i in range(gmm.n_components):
    members = X_pca[predicted_labels == i]
    plt.scatter(members[:, 0], members[:, 1], label=f'Cluster {i + 1}')

# Plot the component means
plt.scatter(gmm.means_[:, 0], gmm.means_[:, 1], s=300, c='k', marker='x', label='Centroids')
plt.title('Gaussian Mixture Model Clustering')
plt.legend()
plt.savefig('GMM_Clustering.jpg')
plt.show()

# Save the GMM results to CSV
gmm_df = pd.DataFrame(data=X_pca, columns=['PCA1', 'PCA2'])
gmm_df['Cluster'] = predicted_labels
output_csv_path = 'GMM_Results.csv'
gmm_df.to_csv(output_csv_path, index=False)
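
# A minimal optional sketch (an addition, not in the original script; inert inside this
# disabled block like the GMM code above): instead of fixing n_components = 2, the
# number of mixture components can be chosen by comparing BIC scores. The candidate
# range 1-6 is an arbitrary choice for illustration.
bic_scores = []
for k in range(1, 7):
    candidate = GaussianMixture(n_components=k, random_state=42).fit(X_pca)
    bic_scores.append(candidate.bic(X_pca))
print("BIC per n_components (1-6):", bic_scores)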
# ## 4. Isolation Forest: an anomaly-detection algorithm that "isolates" outliers by
#      randomly selecting features and split values.
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Qt5Agg')
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA

# Read the CSV file
df = pd.read_csv(r'/03特征提取/源代码/特征值文件-频域.csv')

# Feature scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# PCA to 2 dimensions, used here only for visualisation
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)

# Isolation Forest anomaly detection
iforest = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
iforest.fit(X_scaled)

# Anomaly scores: lower values are more anomalous
scores = iforest.decision_function(X_scaled)

# Label the points with the model's own prediction, which respects the contamination
# rate (predict returns 1 for normal, -1 for anomalies); a median split would always
# flag half the data as anomalous
predicted_labels = (iforest.predict(X_scaled) == 1).astype(int)  # 1 = normal, 0 = anomaly

# Plot the results, one scatter call per class so the legend labels are correct
plt.figure(figsize=(15, 5))
normal = X_pca[predicted_labels == 1]
anomalies = X_pca[predicted_labels == 0]
plt.scatter(normal[:, 0], normal[:, 1], c='green', label='Normal')
plt.scatter(anomalies[:, 0], anomalies[:, 1], c='red', label='Anomaly')
plt.title('Isolation Forest Anomaly Detection')
plt.legend()
plt.savefig('Isolation_Forest_Anomaly_Detection.jpg')
plt.show()

# Save the Isolation Forest results to CSV
iforest_df = pd.DataFrame({
    'PCA1': X_pca[:, 0],
    'PCA2': X_pca[:, 1],
    'Anomaly_Score': scores,
    'Anomaly_Label': predicted_labels
})
output_csv_path = 'Isolation_Forest_Results.csv'
iforest_df.to_csv(output_csv_path, index=False)

# ## 5. Variational AutoEncoder (VAE): a generative model that learns a probabilistic
#      representation of the input data for feature learning.
# import matplotlib
# matplotlib.use('Qt5Agg')
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from keras.layers import Input, Dense, Lambda
from keras.models import Model
from keras import backend as K

# Read the CSV file
df = pd.read_csv(r'/03特征提取/源代码/特征值文件-频域.csv')

# Feature scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)

# VAE hyperparameters
input_dim = X_scaled.shape[1]  # dimensionality of the input data
intermediate_dim = 64  # size of the hidden layer
batch_size = 128  # training batch size
epochs = 50  # number of training epochs

# VAE architecture; note the latent space here has the same dimensionality as the input
input_layer = Input(shape=(input_dim,), name='encoder_input')
x = Dense(intermediate_dim, activation='relu')(input_layer)
# Mean and log-variance of the latent Gaussian; working with the log-variance and
# exponentiating it later keeps the variance positive
z_mean = Dense(input_dim, name='z_mean')(x)
z_log_var = Dense(input_dim, name='z_log_var')(x)

# Reparameterisation trick: sample z = mean + std * epsilon
def sampling(args):
    z_mean, z_log_var = args
    batch = K.shape(z_mean)[0]
    dim = K.int_shape(z_mean)[1]
    epsilon = K.random_normal(shape=(batch, dim))
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling, output_shape=(input_dim,), name='z')([z_mean, z_log_var])

# Instantiate the encoder model
encoder = Model(input_layer, [z_mean, z_log_var, z], name='encoder')
encoder.summary()

# The decoder takes samples from the latent space and reconstructs the input;
# linear output, since the standardised features are not confined to [0, 1]
decoder_input = Input(shape=(input_dim,), name='decoder_input')
x = Dense(intermediate_dim, activation='relu')(decoder_input)
x = Dense(input_dim, activation='linear')(x)

# Instantiate the decoder model
decoder = Model(decoder_input, x, name='decoder')
decoder.summary()

# End-to-end VAE model
z_mean, z_log_var, z = encoder(input_layer)
x_decoded = decoder(z)
vae = Model(input_layer, [x_decoded, z_mean, z_log_var])

# VAE loss: MSE reconstruction term (binary cross-entropy assumes inputs in [0, 1],
# which standardised features are not) plus the KL divergence
reconstruction_loss = K.mean(K.square(input_layer - x_decoded), axis=-1)
reconstruction_loss *= input_dim
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)
vae.compile(optimizer='adam')
vae.summary()

# Train the VAE (the loss is attached via add_loss, so no target is passed)
vae.fit(X_scaled, epochs=epochs, batch_size=batch_size, shuffle=True)

# Predict with the VAE; the model returns a list, whose first element is the reconstruction
X_decoded = vae.predict(X_scaled)[0]

# Flatten any trailing dimensions before computing the per-sample reconstruction error
if X_decoded.ndim == 2:
    X_decoded = X_decoded.reshape(X_decoded.shape[0], -1)
reconstruction_error = np.mean(np.square(X_scaled - X_decoded), axis=1)

# Plot the distribution of the reconstruction error
plt.figure(figsize=(10, 6))
plt.hist(reconstruction_error, bins=50, color='blue', alpha=0.7)
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.title('Reconstruction Error Distribution')
plt.savefig('VAE_Reconstruction_Error.jpg')
plt.show()

# Save the reconstruction error to CSV
reconstruction_error_df = pd.DataFrame(reconstruction_error, columns=['Reconstruction_Error'])
output_csv_path = '../../05故障预测/源代码/VAE_Reconstruction_Error.csv'
reconstruction_error_df.to_csv(output_csv_path, index=False)

# Anomaly detection from the reconstruction error:
# points above mean + 2 * std are flagged as anomalies
mean = np.mean(reconstruction_error)
std = np.std(reconstruction_error)
threshold = mean + 2 * std
outliers = reconstruction_error > threshold

# Print the indices of the anomalous points
print("Indices of abnormal data points:", np.where(outliers)[0])
'''
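
# A minimal optional sketch (an addition, kept disabled like the VAE block above): the
# trained VAE encoder's z_mean output can also serve as a learned feature representation,
# e.g. for inspecting the latent space. `encoder` refers to the VAE encoder defined in
# the disabled block above; only its first two latent dimensions are plotted.
'''
z_mean_pred, z_log_var_pred, z_pred = encoder.predict(X_scaled)
plt.figure(figsize=(10, 6))
plt.scatter(z_mean_pred[:, 0], z_mean_pred[:, 1], alpha=0.5)
plt.xlabel('z_mean[0]')
plt.ylabel('z_mean[1]')
plt.title('VAE Latent Means (first two dimensions)')
plt.savefig('VAE_Latent_Means.jpg')
'''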