Quellcode durchsuchen

事务封装、处理excel和txt导入

R vor 1 Jahr
Ursprung
Commit
cbfc4276ee
4 geänderte Dateien mit 136 neuen und 111 gelöschten Zeilen
  1. 69 37
      router/dataMark/dataset.js
  2. 33 53
      utils/dbHelper.js
  3. 19 0
      utils/index.js
  4. 15 21
      utils/tool.js

+ 69 - 37
router/dataMark/dataset.js

@@ -3,10 +3,11 @@ const router = express.Router()
 const utils = require('../../utils/index.js')
 const pools = require('../../utils/pools.js')
 const xlsx = require('node-xlsx')
+const fs = require('fs')
 const path = require('path')
 const fileEvent = require('../../utils/file')
-const execTrans = require('../../utils/dbHelper.js')
-const getNewSqlParamEntity = require('../../utils/tool.js')
+const execTransection = require('../../utils/dbHelper.js')
+const datetimeToTime = require('../../utils/tool.js')
 const uuid = require('uuid')
 // 增加数据集
 router.post('/addDataset', async (req, res) => {
@@ -66,14 +67,22 @@ router.post('/delDataset', async (req, res) => {
 
 // 查询导入记录
 router.post('/getImportRecord', async (req, res) => {
-  let user = await utils.getUserInfo({ req, res }),
-    obj = req.body
-  let sql = `SELECT id,name,importer_id,time,data_volume FROM file_import_t WHERE 1=1` // WHERE 1=1是一个始终为真的条件,后续动态添加查询条件的起点
-  sql = utils.setLike(sql, 'name', obj.name) // 添加一个基于name字段的模糊匹配条件,匹配的值来自obj.name
+  let user = await utils.getUserInfo({ req, res })
+  obj = req.body
+  console.log('req.body', req.body)
+  let sql = `SELECT 
+    f.id,
+    f.name,
+    (SELECT name from user u where f.importer_id = u.id) as importerName,
+    f.time,
+    f.data_volume
+  FROM 
+  file_import_t f WHERE f.dataset_id = ?`
+  sql = utils.setLike(sql, 'name', obj.name)
   sql = utils.setMoreId(sql, user)
-  let { total } = await utils.getSum({ sql, name: 'file_import_t', res, req }) // 执行SQL查询并获取结果中的总数
-  sql = utils.pageSize(sql, obj.page, obj.size) // 函数来修改SQL语句,添加分页功能。它使用obj.page和obj.size来确定返回结果的范围。
-  let { result } = await pools({ sql, res, req })
+  let { total } = await utils.getSumWhere({ sql: '', val: [obj.dataset_id], name: 'file_import_t f', res, req })
+  sql = utils.pageSize(sql, obj.page, obj.size)
+  let { result } = await pools({ sql, val: [obj.dataset_id], req })
   res.send(utils.returnData({ data: result, total }))
 })
 
@@ -81,46 +90,69 @@ router.post('/getImportRecord', async (req, res) => {
 router.post('/getDataList', async (req, res) => {
   let user = await utils.getUserInfo({ req, res }),
     obj = req.body
-  let sql = `SELECT id,text,is_mark,state FROM mark_data_t WHERE 1=1` // WHERE 1=1是一个始终为真的条件,后续动态添加查询条件的起点
-  sql = utils.setLike(sql, 'text', obj.text) // 添加一个基于name字段的模糊匹配条件,匹配的值来自obj.name
+  let sql = `SELECT id,text,is_mark,state FROM mark_data_t WHERE 1=1`
+  sql = utils.setLike(sql, 'text', obj.text)
   sql = utils.setMoreId(sql, user)
-  let { total } = await utils.getSum({ sql, name: 'mark_data_t', res, req }) // 执行SQL查询并获取结果中的总数
-  sql = utils.pageSize(sql, obj.page, obj.size) // 函数来修改SQL语句,添加分页功能。它使用obj.page和obj.size来确定返回结果的范围。
+  let { total } = await utils.getSum({ sql, name: 'mark_data_t', res, req })
+  sql = utils.pageSize(sql, obj.page, obj.size)
   let { result } = await pools({ sql, res, req })
   res.send(utils.returnData({ data: result, total }))
 })
 
 // 导入数据
 router.post('/importData', async (req, res) => {
-  var sqlParamsEntity = []
-  let fileImportSql = 'INSERT INTO file_import_t(`id`,`name`,`importer_id`,`data_volume`,`dataset_id`) VALUES ?'
-  // let sql1 = 'INSERT INTO mark_data_t(`text`,`file_id`,`time`,`data_volume`,`dataset_id`) VALUES ?'
   let fileArr = await fileEvent(req, res)
+  let filename = fileArr[0].filename
+  const fileType = req.files[0].mimetype
+  let list
+  //配置获取文件路径
+  if (fileType.includes('text')) {
+    const path = `public/uploadFile/${filename}`
+    fs.readFile(path, 'utf-8', (err, data) => {
+      if (err) {
+        res.send(utils.returnData({ code: -1, msg: '文件解析失败' }))
+      }
+      const data1 = data.split(/\r?\n/)
+      list = data1.map((line) => [line])
+      writeToDatabase(fileArr, list, req, res)
+    })
+  } else {
+    let xlsxRes = xlsx.parse(`${path.join(__dirname, '../../', 'public/uploadFile/')}${filename}`, { cellDates: true })
+    list = xlsxRes[0].data
+    list.splice(0, 1)
+    writeToDatabase(fileArr, list, req, res)
+  }
+})
+const writeToDatabase = (fileArr, list, req, res) => {
+  let sqlParamsEntity = []
+  let fileImportSql = 'INSERT INTO file_import_t(`id`,`name`,`importer_id`,`time`,`data_volume`,`dataset_id`) VALUES (?,?,?,?,?,?)'
   let params = JSON.parse(fileArr[0].params.listType)
   const uniqueId = uuid.v4()
-  console.log('fileArr', fileArr)
-  let filename = fileArr[0].filename
   let name = fileArr[0].originalname
-  //配置获取文件路径
-  let xlsxRes = xlsx.parse(`${path.join(__dirname, '../../', 'public/uploadFile/')}${filename}`, { cellDates: true })
-  let list = xlsxRes[0].data
-  list.splice(0, 1)
-  var fileImportParam = { id: uniqueId, name, importer_id: params.importer_id, data_volume: list.length, dataset_id: params.dataset_id }
-  sqlParamsEntity.push(getNewSqlParamEntity({ sql: fileImportSql, fileImportParam }))
-  let markDataSql = 'INSERT INTO file_import_t(text,`file_id`,`dataset_id`,is_mark,state) VALUES ?'
+  let { importer_id: importerId, dataset_id: datasetId } = params
+  let currentTime = datetimeToTime()
+  let dataVolume = list.length
+  var fileImportParam = [uniqueId, name, importerId, currentTime, dataVolume, datasetId]
+  sqlParamsEntity.push({
+    sql: fileImportSql,
+    values: fileImportParam
+  })
+  let markDataSql = 'INSERT INTO mark_data_t(text,`file_id`,`dataset_id`,is_mark,state) VALUES (?,?,?,?,?)'
+  console.log('list.length', list.length)
   for (let i = 0; i < list.length; i++) {
     let item = list[i]
-    console.log('item', item)
-    // var markDataParams = { id: uniqueId, name, importer_id: params.importer_id, data_volume: list.length, dataset_id: params.dataset_id }
-    // sqlParamsEntity.push(getNewSqlParamEntity({ sql: fileImportSql, fileImportParam }))
+    var markDataParams = [item[0], uniqueId, datasetId, true, false]
+    sqlParamsEntity.push({
+      sql: markDataSql,
+      values: markDataParams
+    })
   }
-  // await pools({sql,val:[list],run:false,res,req,msg:"请确认文档导入值没有问题!!!"});
-  execTrans(sqlParamsEntity, function (err, info) {
-    if (err) {
-      console.error('事务执行失败', err)
-    } else {
-      console.log('done.')
-    }
-  })
-})
+  execTransection(sqlParamsEntity)
+    .then((resp) => {
+      res.send(utils.returnData({ code: 1, msg: '导入成功', data: [] }))
+    })
+    .catch((err) => {
+      res.send(utils.returnData({ code: -1, msg, err, req }))
+    })
+}
 module.exports = router

+ 33 - 53
utils/dbHelper.js

@@ -1,63 +1,43 @@
+// const pool = require('../db/mysql') // 导入pool对象
 const pool = require('../pool.js')
-// var async = require('async')
 
-// module.exports = {
-//   execTrans: execTrans
-// }
-
-module.exports = function execTrans(sqlparamsEntities, callback) {
-  pool.getConnection(function (err, connection) {
-    if (err) {
-      return callback(err, null)
-    }
-    connection.beginTransaction(function (err) {
+module.exports = function execTransection(sqlArr) {
+  return new Promise((resolve, reject) => {
+    var promiseArr = []
+    pool.getConnection(function (err, connection) {
       if (err) {
-        return callback(err, null)
+        return reject(err)
       }
-      console.log('开始执行transaction,共执行' + sqlparamsEntities.length + '条数据')
-      var funcAry = []
-      sqlparamsEntities.forEach(function (sql_param) {
-        var temp = function (cb) {
-          var sql = sql_param.sql
-          var param = sql_param.params
-          connection.query(sql, param, function (tErr, rows, fields) {
-            if (tErr) {
-              connection.rollback(function () {
-                console.log('事务失败,' + sql_param + ',ERROR:' + tErr)
-                throw tErr
-              })
-            } else {
-              return cb(null, 'ok')
-            }
-          })
-        }
-        funcAry.push(temp)
-      })
-
-      async.series(funcAry, function (err, result) {
-        console.log('transaction error: ' + err)
+      connection.beginTransaction((err) => {
         if (err) {
-          connection.rollback(function (err) {
-            console.log('transaction error: ' + err)
-            connection.release()
-            return callback(err, null)
+          return reject('开启事务失败')
+        }
+        // 将所有需要执行的sql封装为数组
+        promiseArr = sqlArr.map(({ sql, values }) => {
+          return new Promise((resolve, reject) => {
+            connection.query(sql, values, (e, rows, fields) => {
+              e ? reject(e) : resolve({ rows, success: true })
+            })
           })
-        } else {
-          connection.commit(function (err, info) {
-            console.log('transaction info: ' + JSON.stringify(info))
-            if (err) {
-              console.log('执行事务失败,' + err)
-              connection.rollback(function (err) {
-                console.log('transaction error: ' + err)
-                connection.release()
-                return callback(err, null)
-              })
-            } else {
-              connection.release()
-              return callback(null, info)
-            }
+        })
+        // Promise调用所有sql,一旦出错,回滚,否则,提交事务并释放链接
+        Promise.all(promiseArr)
+          .then((res) => {
+            connection.commit((error) => {
+              if (error) {
+                console.log('事务提交失败' + error)
+                reject(error)
+              }
+            })
+            connection.release() // 释放链接
+            resolve({ res })
+          })
+          .catch((err) => {
+            connection.rollback(() => {
+              console.log('数据操作回滚' + err)
+            })
+            reject(err)
           })
-        }
       })
     })
   })

+ 19 - 0
utils/index.js

@@ -259,6 +259,25 @@ module.exports = {
     let { result: resultRes } = await this.poolsEvent()({ sql: sqlRes, res, req })
     return { total: resultRes[0]['count(1)'] }
   },
+
+  /**
+   * 查询总数
+   * @param sql  sql语句
+   * @param val ?另加值
+   * @param name  表名
+   * @param res  响应主体
+   * @param req  请求主体
+   * */
+  async getSumWhere({ sql = '', val = [], name, res, req }) {
+    const regex = /WHERE(.+)/
+    const result = sql.match(regex)
+    console.log('result', result)
+    let where = '1=1'
+    if (result && result[1]) where = result[1].trim()
+    let sqlRes = `SELECT count(1) FROM ${name} WHERE ${where}`
+    let { result: resultRes } = await this.poolsEvent()({ sql: sqlRes, val, res, req })
+    return { total: resultRes[0]['count(1)'] }
+  },
   /**
    * 将多账户id加入sql判断
    * @param sql  sql语句

+ 15 - 21
utils/tool.js

@@ -1,22 +1,16 @@
-module.exports = function getNewSqlParamEntity(sql, params, callback) {
-  if (callback) {
-    return callback(null, {
-      sql: sql,
-      params: params
-    })
-  }
-  return {
-    sql: sql,
-    params: params
-  }
-}
-module.exports = function timestampToTime(timestamp) {
-  var date = new Date(timestamp * 1000) //时间戳为10位需*1000,时间戳为13位的话不需乘1000
-  var Y = date.getFullYear() + '-'
-  var M = (date.getMonth() + 1 < 10 ? '0' + (date.getMonth() + 1) : date.getMonth() + 1) + '-'
-  var D = (date.getDate() < 10 ? '0' + date.getDate() : date.getDate()) + ' '
-  var h = (date.getHours() < 10 ? '0' + date.getHours() : date.getHours()) + ':'
-  var m = (date.getMinutes() < 10 ? '0' + date.getMinutes() : date.getMinutes()) + ':'
-  var s = date.getSeconds() < 10 ? '0' + date.getSeconds() : date.getSeconds()
-  return Y + M + D + h + m + s
+module.exports = function datetimeToTime() {
+  var date = new Date()
+  return (
+    date.getFullYear() +
+    '-' +
+    ('0' + (date.getMonth() + 1)).slice(-2) +
+    '-' +
+    ('0' + date.getDate()).slice(-2) +
+    ' ' +
+    ('0' + date.getHours()).slice(-2) +
+    ':' +
+    ('0' + date.getMinutes()).slice(-2) +
+    ':' +
+    ('0' + date.getSeconds()).slice(-2)
+  )
 }