test2.py

from pathlib import Path
import json
import pandas as pd
import requests
from io import StringIO
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from models import AnalysisRequest
import uvicorn
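
# NOTE: AnalysisRequest lives in models.py (not shown here). Judging from the
# usage in analyze_engine_health below, it is assumed to be a Pydantic model
# with a single `url` field, roughly:
#
#     class AnalysisRequest(BaseModel):
#         url: str  # URL of the CSV file to analyse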

app = FastAPI(title="飞参判读API")

# Get the parent directory of this file's parent directory
current_dir = Path(__file__).parent
json_path = current_dir.parent / "config" / "config.json"
# Load the JSON configuration
with open(json_path, "r", encoding="utf-8") as f:
    config = json.load(f)
headers = config['mutationFailures']
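
# Illustrative sketch of the expected config.json layout, inferred from how the
# keys are consumed below; the column names and numeric values are placeholders,
# not values from the real configuration:
#
#     {
#       "logicalFailures":   [{"column": "<discrete fault flag column>"}],
#       "mutationFailures":  [{"column": "<sensor column>", "variable": 5.0}],
#       "startupConditions": [{"column": "<pump pressure column>", "state": 25}]
#     }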

# Fault score formula: 60 - 60 * A, where A is the coefficient
# Logical-failure check
def check_abnormal_headers(dataframe):
    abnormal_headers = []
    headers = config['logicalFailures']
    # Iterate over all configured columns (headers)
    for column_rule in headers:
        column_name = column_rule["column"]
        # A value of 1 anywhere in the column marks a logical failure
        if (dataframe[column_name] == 1).any():
            abnormal_headers.append(column_name)
    return abnormal_headers

# Sudden-change (mutation) fault detection
def analyze_excel_data(dataframe):
    results = []
    faults = []         # faults
    degradations = []   # degradations
    headers = config['mutationFailures']
    for column_rule in headers:
        column_name = column_rule["column"]
        variable = column_rule["variable"]
        # Data preprocessing: coerce to numeric, drop non-numeric values
        col_data = pd.to_numeric(dataframe[column_name], errors="coerce").dropna()
        if len(col_data) == 0:
            continue
        # Adjacent-value change detection ----------------------------------------
        is_fault_detected = False
        # Walk over adjacent data points (needs at least 2 values)
        for i in range(len(col_data) - 1):
            current_val = col_data.iloc[i]
            next_val = col_data.iloc[i + 1]
            # Absolute change between adjacent samples
            delta = abs(next_val - current_val)
            if delta > variable:
                results.append(f"{column_name}突变故障")
                faults.append(f"{column_name}突变故障")
                is_fault_detected = True
                break  # stop scanning this column once a fault is found
        if is_fault_detected:
            continue  # skip any further checks for this column
    return {"results": results, "fault": faults, "degradation": degradations}
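
# Worked example (hypothetical rule and values): given a mutationFailures entry
# {"column": "EGT", "variable": 5} and adjacent samples 612 and 620,
# delta = |620 - 612| = 8 > 5, so "EGT突变故障" is recorded once for that column
# and the remaining rows of that column are skipped.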

@app.post("/process/faultDiagnosis", summary="分析发动机健康状态")
async def analyze_engine_health(request: AnalysisRequest):
    """
    Analyse the engine state from a CSV data URL and return:
    - health score
    - fault list
    - performance degradation indicators
    """
    try:
        # Get the data URL from the request
        data_url = request.url
        # Fetch and parse the data
        response = requests.get(data_url)
        response.encoding = 'UTF-8'
        csv_data = StringIO(response.text)
        dataframe = pd.read_csv(csv_data, index_col=False)
        # Hydraulic pump pressure columns (adjust to the actual column names)
        # hydraulic_pumps = ["液压泵1压力", "液压泵2压力", "液压泵3压力", "液压泵4压力"]
        # Extract column names and required states from the startup conditions
        conditions = config["startupConditions"]
        required_columns = [cond["column"] for cond in conditions]
        thresholds = {cond["column"]: cond["state"] for cond in conditions}
        # Initialise the response payload
        final_result = {
            "code": 200,
            "msg": "操作成功",
            "data": {
                "score": 100,  # default: full score
                "fault": [],
                "degradation": []
            }
        }
        try:
            # Make sure all required columns are present
            missing_cols = [col for col in required_columns if col not in dataframe]
            if missing_cols:
                raise ValueError(f"缺少压力列: {', '.join(missing_cols)}")
            # Build the condition mask (rows where every pump is at or above its configured threshold, e.g. >= 25)
            condition_mask = pd.DataFrame({
                col: (dataframe[col] >= thresholds[col]) for col in required_columns
            }).all(axis=1)
            if condition_mask.any():
                # Index of the first row that satisfies the startup condition
                start_idx = condition_mask.idxmax()
                # Keep the data from that row onwards
                analysis_data = dataframe.loc[start_idx:]
                # Run the analysis logic
                logical_errors = check_abnormal_headers(analysis_data)
                mutation_res = analyze_excel_data(analysis_data)
                # Merge the results
                final_result["data"]["fault"] = logical_errors + mutation_res["fault"]
                final_result["data"]["degradation"] = mutation_res["degradation"]
                # Compute the score (example formula, adjust as needed)
                error_count = len(final_result["data"]["fault"])
                final_result["data"]["score"] = max(0, 60 - error_count * 6)
            else:
                final_result["msg"] = "🚀 未成功启动,无分析数据"
        except Exception as e:
            final_result = {
                "code": 400,
                "msg": f"错误: {str(e)}",
                "data": None
            }
        print(final_result)
        return final_result
    except requests.RequestException as e:
        raise HTTPException(status_code=400, detail=f"数据获取失败: {str(e)}")
    except pd.errors.ParserError as e:
        raise HTTPException(status_code=422, detail="CSV 数据解析失败")
    except KeyError as e:
        raise HTTPException(status_code=500, detail=f"配置字段缺失: {str(e)}")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8848)
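
# Example call (a sketch, assuming the service runs locally and the CSV behind
# `url` contains the columns referenced in config.json; the URL below is a
# placeholder):
#
#     import requests
#     resp = requests.post(
#         "http://127.0.0.1:8848/process/faultDiagnosis",
#         json={"url": "http://example.com/flight_data.csv"},
#     )
#     print(resp.json())  # e.g. {"code": 200, "msg": "操作成功", "data": {...}}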