actions.py 4.4 KB

from pathlib import Path
import json
import pandas as pd
import requests
from io import StringIO
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Flight Parameter Interpretation API")

# Resolve config/config.json relative to this file's parent directory
current_dir = Path(__file__).parent
json_path = current_dir.parent / "config" / "config.json"

# Load the JSON config
with open(json_path, "r", encoding="utf-8") as f:
    config = json.load(f)
headers = config['mutationFailures']
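
# A minimal sketch of the config.json shape this module assumes, inferred from
# the keys read in this file (startupConditions / logicalFailures /
# mutationFailures); the real file may carry more fields, and the column names
# below are hypothetical examples:
# {
#     "startupConditions": [{"column": "液压泵1压力", "state": 25}],
#     "logicalFailures":   [{"column": "warning_flag"}],
#     "mutationFailures":  [{"column": "液压泵1压力", "variable": 5}]
# }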

# Fault score formula: 60 - 60 * A, where A is a penalty coefficient
# Logical-failure check
def check_abnormal_headers(dataframe):
    abnormal_headers = []
    headers = config['logicalFailures']
    # Walk every configured column (header)
    for column_rule in headers:
        column_name = column_rule["column"]
        # A value of 1 anywhere in the column marks it as abnormal
        if (dataframe[column_name] == 1).any():
            abnormal_headers.append(column_name)
    return abnormal_headers

# Mutation (sudden-change) fault detection
def analyze_excel_data(dataframe):
    results = []
    faults = []         # faults
    degradations = []   # degradations (not populated by the current rules)
    headers = config['mutationFailures']
    for column_rule in headers:
        column_name = column_rule["column"]
        variable = column_rule["variable"]
        # Preprocessing: coerce to numeric, drop anything non-numeric
        col_data = pd.to_numeric(dataframe[column_name], errors="coerce").dropna()
        if len(col_data) == 0:
            continue
        # Adjacent-value change detection ----------------------------------------
        is_fault_detected = False
        # Walk all adjacent data points (needs at least 2 values)
        for i in range(len(col_data) - 1):
            current_val = col_data.iloc[i]
            next_val = col_data.iloc[i + 1]
            # Magnitude of the change between the two adjacent values
            delta = abs(next_val - current_val)
            if delta > variable:
                results.append(f"{column_name} mutation fault")
                faults.append(f"{column_name} mutation fault")
                is_fault_detected = True
                break  # stop scanning this column as soon as a fault is found
        if is_fault_detected:
            continue  # skip any further checks for this column
    return {"results": results, "fault": faults, "degradation": degradations}
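
# Hedged usage sketch for analyze_excel_data, assuming a mutationFailures rule
# {"column": "液压泵1压力", "variable": 5} and this hypothetical frame:
#     df = pd.DataFrame({"液压泵1压力": [26, 27, 40]})
#     analyze_excel_data(df)
# |40 - 27| = 13 > 5, so the call would report a "液压泵1压力 mutation fault".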

if __name__ == '__main__':
    # Load the data (verify the file path and column names)
    csv_path = r"D:\project\daima\huang\Hang_KG\data\test.txt"
    dataframe = pd.read_csv(csv_path, encoding='UTF-8')
    # print("headers", headers)
    # Hydraulic pump pressure columns (adjust to the actual column names)
    # hydraulic_pumps = ["液压泵1压力", "液压泵2压力", "液压泵3压力", "液压泵4压力"]
    # Extract column names and required state thresholds
    conditions = config["startupConditions"]
    required_columns = [cond["column"] for cond in conditions]
    thresholds = {cond["column"]: cond["state"] for cond in conditions}

    # Initialize the response payload
    final_result = {
        "code": 200,
        "msg": "Operation succeeded",
        "data": {
            "score": 100,  # full score by default
            "fault": [],
            "degradation": []
        }
    }

    try:
        # Check that all required columns exist
        missing_cols = [col for col in required_columns if col not in dataframe.columns]
        if missing_cols:
            raise ValueError(f"Missing pressure columns: {', '.join(missing_cols)}")
        # Build the condition mask (rows where every pump meets its threshold)
        condition_mask = pd.DataFrame({
            col: (dataframe[col] >= thresholds[col]) for col in required_columns
        }).all(axis=1)
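        # Sketch of the mask logic, assuming one pump with threshold 25:
        #     pressures [20, 30, 28] -> mask [False, True, True]
        #     mask.any() -> True; mask.idxmax() -> 1 (first qualifying row)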
        if condition_mask.any():
            # idxmax() on a boolean Series returns the first True index,
            # i.e. the first row that satisfies all startup conditions
            start_idx = condition_mask.idxmax()
            # Keep only the data from that row onward
            analysis_data = dataframe.loc[start_idx:]
            # Run the analysis logic
            logical_errors = check_abnormal_headers(analysis_data)
            mutation_res = analyze_excel_data(analysis_data)
            # Merge the results
            final_result["data"]["fault"] = logical_errors + mutation_res["fault"]
            final_result["data"]["degradation"] = mutation_res["degradation"]
            # Score (example formula: 6 points per fault, i.e. 60 - 60 * A
            # with A = error_count / 10; adjust as needed)
            error_count = len(final_result["data"]["fault"])
            final_result["data"]["score"] = max(0, 60 - error_count * 6)
        else:
            final_result["msg"] = "🚀 Startup conditions never met; no data to analyze"
    except Exception as e:
        final_result = {
            "code": 400,
            "msg": f"Error: {str(e)}",
            "data": None
        }
    print(final_result)
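
# Example of the printed payload, assuming a run that found two faults
# (score = max(0, 60 - 2 * 6) = 48; column name hypothetical):
# {'code': 200, 'msg': 'Operation succeeded',
#  'data': {'score': 48, 'fault': ['液压泵1压力 mutation fault', 'warning_flag'],
#           'degradation': []}}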