123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- from flask import Flask, request, jsonify
- import pandas as pd
- import joblib
- from sklearn.model_selection import train_test_split
- from sklearn.ensemble import RandomForestClassifier
- import json
- from sklearn.pipeline import Pipeline
- from sklearn.feature_extraction.text import TfidfVectorizer
# Flask application object shared by every route in this file.
app = Flask(__name__)

# Emit non-ASCII characters (e.g. the Chinese status messages) verbatim
# instead of \uXXXX escapes, and advertise UTF-8 in the response mimetype.
# These config keys are the mechanism used by older Flask releases.
app.config['JSON_AS_ASCII'] = False
app.config['JSONIFY_MIMETYPE'] = "application/json;charset=utf-8"

# Newer Flask releases expose the JSON provider as ``app.json`` and no longer
# read the config keys above, so mirror the same settings on the provider
# when it is available.
if hasattr(app, 'json'):
    app.json.ensure_ascii = False
    app.json.mimetype = "application/json;charset=utf-8"
def load_model(model_path):
    """Load a model object that was previously saved with joblib.

    Args:
        model_path: Path of the serialized model file.

    Returns:
        The object stored in the file.

    Raises:
        Exception: If loading fails for any reason. The message is prefixed
            with "Failed to load the model: " and the original error is
            attached as ``__cause__``.
    """
    # SECURITY NOTE(review): joblib.load unpickles the file, which can run
    # arbitrary code. Callers in this file build the path from request data,
    # so the set of loadable paths should be restricted -- confirm with owner.
    try:
        return joblib.load(model_path)
    except Exception as e:
        # Chain the original error so the root cause stays in the traceback.
        raise Exception(f"Failed to load the model: {str(e)}") from e
def make_recommendations(model, use_scene, search_condition):
    """Predict the five recommended result ids for one query.

    The two inputs are joined into a single space-separated text, exactly as
    the training code in this file prepares its features, and passed to
    ``model.predict``.

    Args:
        model: Object exposing ``predict``; its first returned row must
            support ``tolist()`` and hold at least five values.
        use_scene: Value placed in the ``useScene`` feature.
        search_condition: Value placed in the ``searchCondition`` feature.

    Returns:
        dict with the echoed ``useScene`` / ``searchCondition`` and the keys
        ``result1Id`` .. ``result5Id`` taken from the first predicted row.

    Raises:
        Exception: On any failure. The message is prefixed with
            "Failed to make recommendations: " and the original error is
            attached as ``__cause__``.
    """
    try:
        # Build the input in the same shape used for training.
        input_data = pd.DataFrame({'useScene': [use_scene], 'searchCondition': [search_condition]})
        X_input = input_data[['useScene', 'searchCondition']].apply(
            lambda row: ' '.join(row.astype(str)), axis=1
        )

        # Only one sample is submitted, so only the first row is relevant.
        y_pred = model.predict(X_input)
        y_pred_list = y_pred[0].tolist()

        output_data = {
            'useScene': use_scene,
            'searchCondition': search_condition,
        }
        for rank in range(5):
            output_data[f'result{rank + 1}Id'] = y_pred_list[rank]
        return output_data
    except Exception as e:
        # Chain the original error so the root cause stays in the traceback.
        raise Exception(f"Failed to make recommendations: {str(e)}") from e
@app.route('/recommend', methods=['POST'])
def recommend_endpoint():
    """Handle POST /recommend.

    Reads ``modelPath``, ``useScene`` and ``searchCondition`` from the JSON
    request body, loads the model stored at ``modelPath + 'model.pkl'`` and
    returns its recommendations.

    Returns:
        JSON ``{'code': 200, 'msg': '成功', 'data': ...}`` on success, or
        ``{'code': 500, 'msg': <error text>}`` when anything raises.
    """
    try:
        payload = request.json
        model_prefix = payload.get('modelPath')
        scene = payload.get('useScene')
        condition = payload.get('searchCondition')

        # 'model.pkl' is appended to the prefix as-is (no path separator is
        # inserted here).
        # NOTE(review): the prefix comes straight from the request body and
        # ends up in a model load -- confirm callers are trusted.
        recommender = load_model(model_prefix + 'model.pkl')

        recommendations = make_recommendations(recommender, scene, condition)
        body = {'code': 200, 'msg': '成功', 'data': recommendations}
        return jsonify(body)
    except Exception as e:
        failure = {'code': 500, 'msg': str(e)}
        return jsonify(failure)
# Training API -- finished version
def train_and_save_model(data, model_path):
    """Fit a TF-IDF + random-forest pipeline and save it with joblib.

    Args:
        data: Mapping whose ``'dataSet'`` entry is convertible to a
            DataFrame containing ``useScene``, ``searchCondition`` and
            ``result1Id`` .. ``result5Id`` columns.
        model_path: Prefix to which ``'model.pkl'`` is appended to form the
            output file name.

    Returns:
        ``{'code': 200, 'msg': '成功'}`` on success, otherwise
        ``{'code': 500, 'msg': <error text>}``; exceptions are not propagated.
    """
    feature_columns = ['useScene', 'searchCondition']
    label_columns = ['result1Id', 'result2Id', 'result3Id', 'result4Id', 'result5Id']
    try:
        # The incoming JSON records become the training table.
        frame = pd.DataFrame(data['dataSet'])

        steps = [
            ('vectorizer', TfidfVectorizer()),
            ('classifier', RandomForestClassifier(n_estimators=400, random_state=42)),
        ]
        model = Pipeline(steps)

        # One space-joined text per row is the feature; the five id columns
        # are the targets.
        texts = frame[feature_columns].apply(lambda row: ' '.join(row.astype(str)), axis=1)
        targets = frame[label_columns]
        model.fit(texts, targets)

        # Persist the fitted pipeline.
        joblib.dump(model, model_path + 'model.pkl')
    except Exception as e:
        return {'code': 500, 'msg': str(e)}
    return {'code': 200, 'msg': '成功'}
@app.route('/train', methods=['POST'])
def train_model():
    """Handle POST /train.

    Passes the JSON request body and its ``modelPath`` entry to
    ``train_and_save_model`` and returns that function's result as JSON.
    Any exception raised here is reported as
    ``{'code': 500, 'msg': <error text>}``.
    """
    try:
        payload = request.json
        outcome = train_and_save_model(payload, payload.get('modelPath'))
        return jsonify(outcome)
    except Exception as e:
        return jsonify({'code': 500, 'msg': str(e)})
# Verification API -- finished version 2
# Train the model used for verification
def train_model2(data):
    """Split *data* 70/30 and fit a TF-IDF + random-forest model on the 70%.

    Args:
        data: DataFrame with ``useScene``, ``searchCondition`` and
            ``result1Id`` .. ``result5Id`` columns.

    Returns:
        Tuple ``(vectorizer, rf_classifier, test_data, y_test, train_data)``:
        the fitted TfidfVectorizer, the fitted RandomForestClassifier, the
        held-out rows, the label columns of the held-out rows, and the
        training rows with ``trained=1`` / ``tested=0`` columns added.
    """
    feature_columns = ['useScene', 'searchCondition']
    label_columns = ['result1Id', 'result2Id', 'result3Id', 'result4Id', 'result5Id']

    train_data, test_data = train_test_split(data, test_size=0.3, random_state=48)
    # Marker columns are added below; work on an explicit copy so the
    # assignment does not target a slice of the caller's frame.
    train_data = train_data.copy()

    X_train = train_data[feature_columns].apply(lambda row: ' '.join(row.astype(str)), axis=1)
    y_train = train_data[label_columns]
    y_test = test_data[label_columns]

    # Labels were extracted above, so the markers never reach the model.
    train_data['trained'] = 1
    train_data['tested'] = 0

    # Turn the joined text into TF-IDF features.
    vectorizer = TfidfVectorizer()
    X_train_matrix = vectorizer.fit_transform(X_train)

    rf_classifier = RandomForestClassifier(n_estimators=400, random_state=42)
    rf_classifier.fit(X_train_matrix, y_train)
    return vectorizer, rf_classifier, test_data, y_test, train_data
# Prediction and scoring for the verification API
def _nan_to_none(records):
    """Return *records* with every float NaN value replaced by None."""
    # ``value != value`` is true only for NaN.
    return [
        {
            key: (None if isinstance(value, float) and value != value else value)
            for key, value in record.items()
        }
        for record in records
    ]


def _mean_percentage(ratios):
    """Return the mean of *ratios* as a percentage rounded to 2 places (0.0 if empty)."""
    if not ratios:
        return 0.0
    return round((sum(ratios) / len(ratios)) * 100, 2)


def predict(vectorizer, rf_classifier, test_data, y_test, train_data):
    """Predict the held-out rows, score them and build the response payload.

    Args:
        vectorizer: Fitted object whose ``transform`` accepts the joined text.
        rf_classifier: Fitted object whose ``predict`` returns five values
            per row.
        test_data: Held-out rows; must contain ``id``, ``useScene``,
            ``searchCondition`` and ``result1Id`` .. ``result5Id``.
        y_test: Label columns of ``test_data``, in the same row order.
        train_data: Training rows, already carrying ``trained``/``tested``.

    Returns:
        dict with ``code`` 200, ``msg`` '成功' and ``data`` holding
        ``accuracyRate`` and ``recallRate`` (percentages rounded to two
        places) plus ``dataSet``: all rows ordered by original index, with
        ``calculate1Id`` .. ``calculate5Id`` filled for tested rows and
        ``None`` for rows that have no value.
    """
    result_columns = ['result1Id', 'result2Id', 'result3Id', 'result4Id', 'result5Id']
    calculated_columns = ['calculate1Id', 'calculate2Id', 'calculate3Id', 'calculate4Id', 'calculate5Id']

    X_test = test_data[['useScene', 'searchCondition']].apply(
        lambda row: ' '.join(row.astype(str)), axis=1
    )
    y_pred = rf_classifier.predict(vectorizer.transform(X_test))
    y_pred_df = pd.DataFrame(y_pred, columns=calculated_columns)
    # Align predictions with the rows they belong to before concatenating.
    y_pred_df.index = test_data.index

    output_df = pd.concat(
        [test_data[['id', 'useScene', 'searchCondition'] + result_columns], y_pred_df],
        axis=1,
    )
    output_df['trained'] = 0  # 0 indicates the record is not in the training set
    output_df['tested'] = 1  # 1 indicates the record is in the testing set
    full_data = pd.concat([train_data, output_df]).sort_index().reset_index(drop=True)

    # Compute accuracy and recall per row, comparing ids as strings.
    accuracy_list = []
    recall_list = []
    for position in range(y_test.shape[0]):
        expected = y_test.iloc[position, :].astype(str)
        predicted = y_pred_df.iloc[position, :].astype(str)

        true_pos = int(expected.isin(predicted).sum())
        false_neg = len(expected) - true_pos
        false_pos = int((~predicted.isin(expected)).sum())
        # NOTE(review): the true-negative count has always been computed with
        # the same expression as the false-negative count. It is kept as-is so
        # reported accuracy does not change -- confirm the intended metric.
        true_neg = false_neg

        total = true_pos + true_neg + false_pos + false_neg
        accuracy_list.append((true_pos + true_neg) / total if total != 0 else 0)
        relevant = true_pos + false_neg
        recall_list.append(true_pos / relevant if relevant != 0 else 0)

    # Training rows have no calculate*Id values; after the concat they hold
    # NaN, which would be serialized as the invalid JSON token ``NaN``.
    records = _nan_to_none(full_data.to_dict('records'))

    # Assemble the JSON-ready result.
    return {
        'code': 200,
        'msg': '成功',
        'data': {
            'accuracyRate': _mean_percentage(accuracy_list),
            'recallRate': _mean_percentage(recall_list),
            'dataSet': records,
        },
    }
# Route and handler for the verification API
@app.route('/verification', methods=['POST'])
def verification():
    """Handle POST /verification.

    Builds a DataFrame from the ``dataSet`` entry of the JSON request body,
    trains on a split of it via ``train_model2`` and returns the payload
    produced by ``predict`` for the held-out rows.

    Returns:
        The JSON payload from ``predict`` on success, or
        ``{'code': 500, 'msg': <error text>}`` when anything raises, matching
        the error envelope of the other endpoints in this file.
    """
    try:
        # Read the JSON body and turn its records into a DataFrame.
        input_data = request.get_json()
        data = pd.DataFrame(input_data['dataSet'])

        # Train the model on the training split.
        vectorizer, rf_classifier, test_data, y_test, train_data = train_model2(data)

        # Predict and score the held-out split.
        output_data = predict(vectorizer, rf_classifier, test_data, y_test, train_data)

        return jsonify(output_data)
    except Exception as e:
        return jsonify({'code': 500, 'msg': str(e)})
# Run the application
if __name__ == '__main__':
    import os

    # The server listens on every interface (0.0.0.0), so the interactive
    # debugger must not be on by default. Set FLASK_DEBUG=1 to opt in during
    # local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, port=8081, host='0.0.0.0')
|