from pathlib import Path import json import pandas as pd import requests from io import StringIO from fastapi import FastAPI, HTTPException from pydantic import BaseModel app = FastAPI(title="飞参判读API") # 获取当前文件的父目录的父目录 current_dir = Path(__file__).parent json_path = current_dir.parent / "config" / "config.json" # 读取 JSON with open(json_path, "r", encoding="utf-8") as f: config = json.load(f) headers = config['mutationFailures'] # 故障分数公式为 60 - 60 * A(A)为系数 # 逻辑判断函数 def check_abnormal_headers(dataframe): abnormal_headers = [] headers = config['logicalFailures'] # 遍历所有列(表头) for column_rule in headers: column_name = column_rule["column"] if (dataframe[column_name] == 1).any(): abnormal_headers.append(column_rule['column']) return abnormal_headers # 突发故障判断逻辑 def analyze_excel_data(dataframe): results = [] faults = [] # 故障 degradations = [] # 退化 headers = config['mutationFailures'] for column_rule in headers: column_name = column_rule["column"] variable = column_rule["variable"] # 数据预处理 col_data = pd.to_numeric(dataframe[column_name], errors="coerce").dropna() if len(col_data) == 0: continue # 相邻值变化量检测 ---------------------------------------- is_fault_detected = False # 遍历所有相邻数据点(至少需要2个数据) for i in range(len(col_data) - 1): current_val = col_data.iloc[i] next_val = col_data.iloc[i + 1] # 判断两个值是否都处于退化区间内 delta = abs(next_val - current_val) if delta > variable: results.append(f"{column_name}突变故障") faults.append(f"{column_name}突变故障") is_fault_detected = True break # 发现故障后立即终止循环 if is_fault_detected: continue # 跳过后续判断 return {"results": results, "fault": faults, "degradation": degradations} if __name__ == '__main__': # 读取数据(请确认文件路径和列名) csv_path = r"D:\project\daima\huang\Hang_KG\data\test.txt" dataframe = pd.read_csv(csv_path, encoding='UTF-8') # print("hearder",headers) # 定义液压泵压力列(根据实际列名修改) # hydraulic_pumps = ["液压泵1压力", "液压泵2压力", "液压泵3压力", "液压泵4压力"] # 提取列名和状态值 conditions = config["startupConditions"] required_columns = [cond["column"] for cond in conditions] thresholds = {cond["column"]: cond["state"] for cond in conditions} # 初始化返回结果 final_result = { "code": 200, "msg": "操作成功", "data": { "score": 100, # 默认满分 "fault": [], "degradation": [] } } try: # 检查列是否存在 missing_cols = [col for col in required_columns if col not in dataframe] if missing_cols: raise ValueError(f"缺少压力列: {', '.join(missing_cols)}") # 生成条件掩码(所有泵≥25的行) condition_mask = pd.DataFrame({ col: (dataframe[col] >= thresholds[col]) for col in required_columns }).all(axis=1) if condition_mask.any(): # 获取首个满足条件的行号 start_idx = condition_mask.idxmax() # 截取从该行开始的后续数据 analysis_data = dataframe.loc[start_idx:] # 执行分析逻辑 logical_errors = check_abnormal_headers(analysis_data) mutation_res = analyze_excel_data(analysis_data) # 合并结果 final_result["data"]["fault"] = logical_errors + mutation_res["fault"] final_result["data"]["degradation"] = mutation_res["degradation"] # 计算得分(示例公式,按需调整) error_count = len(final_result["data"]["fault"]) final_result["data"]["score"] = max(0, 60 - error_count * 6) else: final_result["msg"] = "🚀 未成功启动,无分析数据" except Exception as e: final_result = { "code": 400, "msg": f"错误: {str(e)}", "data": None } print(final_result)