import_export_mixin.py 15 KB


  1. # -*- coding: utf-8 -*-
  2. from types import FunctionType, MethodType
  3. from urllib.parse import quote
  4. from django.db import transaction
  5. from django.http import HttpResponse
  6. from openpyxl import Workbook
  7. from openpyxl.worksheet.datavalidation import DataValidation
  8. from openpyxl.utils import get_column_letter, quote_sheetname
  9. from openpyxl.worksheet.table import Table, TableStyleInfo
  10. from rest_framework.decorators import action
  11. from rest_framework.request import Request
  12. from dvadmin.utils.import_export import import_to_data
  13. from dvadmin.utils.json_response import DetailResponse
  14. from dvadmin.utils.request_util import get_verbose_name
  15. class ImportSerializerMixin:
  16. """
  17. 自定义导入模板、导入功能
  18. """
  19. # 导入字段
  20. import_field_dict = {}
  21. # 导入序列化器
  22. import_serializer_class = None
  23. # 表格表头最大宽度,默认50个字符
  24. export_column_width = 50
  25. def is_number(self,num):
  26. try:
  27. float(num)
  28. return True
  29. except ValueError:
  30. pass
  31. try:
  32. import unicodedata
  33. unicodedata.numeric(num)
  34. return True
  35. except (TypeError, ValueError):
  36. pass
  37. return False
  38. def get_string_len(self, string):
  39. """
  40. 获取字符串最大长度
  41. :param string:
  42. :return:
  43. """
  44. length = 4
  45. if string is None:
  46. return length
  47. if self.is_number(string):
  48. return length
  49. for char in string:
  50. length += 2.1 if ord(char) > 256 else 1
  51. return round(length, 1) if length <= self.export_column_width else self.export_column_width
  52. @action(methods=['get','post'],detail=False)
  53. @transaction.atomic # Django 事务,防止出错
  54. def import_data(self, request: Request, *args, **kwargs):
  55. """
  56. 导入模板
  57. :param request:
  58. :param args:
  59. :param kwargs:
  60. :return:
  61. """
  62. assert self.import_field_dict, "'%s' 请配置对应的导出模板字段。" % self.__class__.__name__
  63. if isinstance(self.import_field_dict, MethodType) or isinstance(self.import_field_dict, FunctionType):
  64. self.import_field_dict = self.import_field_dict()
  65. # 导出模板
  66. if request.method == "GET":
  67. # 示例数据
  68. queryset = self.filter_queryset(self.get_queryset())
  69. # 导出excel 表
  70. response = HttpResponse(content_type="application/msexcel")
  71. response["Access-Control-Expose-Headers"] = f"Content-Disposition"
  72. response[
  73. "Content-Disposition"
  74. ] = f'attachment;filename={quote(str(f"导入{get_verbose_name(queryset)}模板.xlsx"))}'
  75. wb = Workbook()
  76. ws1 = wb.create_sheet("data", 1)
  77. ws1.sheet_state = "hidden"
  78. ws = wb.active
  79. row = get_column_letter(len(self.import_field_dict) + 1)
  80. column = 10
  81. header_data = [
  82. "序号",
  83. ]
  84. validation_data_dict = {}
  85. for index, ele in enumerate(self.import_field_dict.values()):
  86. if isinstance(ele, dict):
  87. header_data.append(ele.get("title"))
  88. choices = ele.get("choices", {})
  89. if choices.get("data"):
  90. data_list = []
  91. data_list.extend(choices.get("data").keys())
  92. validation_data_dict[ele.get("title")] = data_list
  93. elif choices.get("queryset") and choices.get("values_name"):
  94. data_list = choices.get("queryset").values_list(choices.get("values_name"), flat=True)
  95. validation_data_dict[ele.get("title")] = list(data_list)
  96. else:
  97. continue
  98. column_letter = get_column_letter(len(validation_data_dict))
  99. dv = DataValidation(
  100. type="list",
  101. formula1=f"{quote_sheetname('data')}!${column_letter}$2:${column_letter}${len(validation_data_dict[ele.get('title')]) + 1}",
  102. allow_blank=True,
  103. )
  104. ws.add_data_validation(dv)
  105. dv.add(f"{get_column_letter(index + 2)}2:{get_column_letter(index + 2)}1048576")
  106. else:
  107. header_data.append(ele)
  108. # 添加数据列
  109. ws1.append(list(validation_data_dict.keys()))
  110. for index, validation_data in enumerate(validation_data_dict.values()):
  111. for inx, ele in enumerate(validation_data):
  112. ws1[f"{get_column_letter(index + 1)}{inx + 2}"] = ele
  113. # 插入导出模板正式数据
  114. df_len_max = [self.get_string_len(ele) for ele in header_data]
  115. ws.append(header_data)
  116. #  更新列宽
  117. for index, width in enumerate(df_len_max):
  118. ws.column_dimensions[get_column_letter(index + 1)].width = width
  119. tab = Table(displayName="Table1", ref=f"A1:{row}{column}") # 名称管理器
  120. style = TableStyleInfo(
  121. name="TableStyleLight11",
  122. showFirstColumn=True,
  123. showLastColumn=True,
  124. showRowStripes=True,
  125. showColumnStripes=True,
  126. )
  127. tab.tableStyleInfo = style
  128. ws.add_table(tab)
  129. wb.save(response)
  130. return response
  131. else:
  132. # 从excel中组织对应的数据结构,然后使用序列化器保存
  133. queryset = self.filter_queryset(self.get_queryset())
  134. # 获取多对多字段
  135. m2m_fields = [
  136. ele.name
  137. for ele in queryset.model._meta.get_fields()
  138. if hasattr(ele, "many_to_many") and ele.many_to_many == True
  139. ]
  140. import_field_dict = {'id':'更新主键(勿改)',**self.import_field_dict}
  141. data = import_to_data(request.data.get("url"), import_field_dict, m2m_fields)
  142. for ele in data:
  143. filter_dic = {'id':ele.get('id')}
  144. instance = filter_dic and queryset.filter(**filter_dic).first()
  145. # print(156,ele)
  146. serializer = self.import_serializer_class(instance, data=ele, request=request)
  147. serializer.is_valid(raise_exception=True)
  148. serializer.save()
  149. return DetailResponse(msg=f"导入成功!")
  150. @action(methods=['get'],detail=False)
  151. def update_template(self,request):
  152. queryset = self.filter_queryset(self.get_queryset())
  153. assert self.import_field_dict, "'%s' 请配置对应的导入模板字段。" % self.__class__.__name__
  154. assert self.import_serializer_class, "'%s' 请配置对应的导入序列化器。" % self.__class__.__name__
  155. data = self.import_serializer_class(queryset, many=True, request=request).data
  156. if isinstance(self.import_field_dict, MethodType) or isinstance(self.import_field_dict, FunctionType):
  157. self.import_field_dict = self.import_field_dict()
  158. # 导出excel 表
  159. response = HttpResponse(content_type="application/msexcel")
  160. response["Access-Control-Expose-Headers"] = f"Content-Disposition"
  161. response["content-disposition"] = f'attachment;filename={quote(str(f"导出{get_verbose_name(queryset)}.xlsx"))}'
  162. wb = Workbook()
  163. ws1 = wb.create_sheet("data", 1)
  164. ws1.sheet_state = "hidden"
  165. ws = wb.active
  166. import_field_dict = {}
  167. header_data = ["序号","更新主键(勿改)"]
  168. hidden_header = ["#","id"]
  169. #----设置选项----
  170. validation_data_dict = {}
  171. for index, item in enumerate(self.import_field_dict.items()):
  172. items = list(item)
  173. key = items[0]
  174. value = items[1]
  175. if isinstance(value, dict):
  176. header_data.append(value.get("title"))
  177. hidden_header.append(value.get('display'))
  178. choices = value.get("choices", {})
  179. if choices.get("data"):
  180. data_list = []
  181. data_list.extend(choices.get("data").keys())
  182. validation_data_dict[value.get("title")] = data_list
  183. elif choices.get("queryset") and choices.get("values_name"):
  184. data_list = choices.get("queryset").values_list(choices.get("values_name"), flat=True)
  185. validation_data_dict[value.get("title")] = list(data_list)
  186. else:
  187. continue
  188. column_letter = get_column_letter(len(validation_data_dict))
  189. dv = DataValidation(
  190. type="list",
  191. formula1=f"{quote_sheetname('data')}!${column_letter}$2:${column_letter}${len(validation_data_dict[value.get('title')]) + 1}",
  192. allow_blank=True,
  193. )
  194. ws.add_data_validation(dv)
  195. dv.add(f"{get_column_letter(index + 3)}2:{get_column_letter(index + 3)}1048576")
  196. else:
  197. header_data.append(value)
  198. hidden_header.append(key)
  199. # 添加数据列
  200. ws1.append(list(validation_data_dict.keys()))
  201. for index, validation_data in enumerate(validation_data_dict.values()):
  202. for inx, ele in enumerate(validation_data):
  203. ws1[f"{get_column_letter(index + 1)}{inx + 2}"] = ele
  204. #--------
  205. df_len_max = [self.get_string_len(ele) for ele in header_data]
  206. row = get_column_letter(len(hidden_header) + 1)
  207. column = 1
  208. ws.append(header_data)
  209. for index, results in enumerate(data):
  210. results_list = []
  211. for h_index, h_item in enumerate(hidden_header):
  212. for key, val in results.items():
  213. if key == h_item:
  214. if val is None or val == "":
  215. results_list.append("")
  216. elif isinstance(val,list):
  217. results_list.append(str(val))
  218. else:
  219. results_list.append(val)
  220. # 计算最大列宽度
  221. if isinstance(val,str):
  222. result_column_width = self.get_string_len(val)
  223. if h_index != 0 and result_column_width > df_len_max[h_index]:
  224. df_len_max[h_index] = result_column_width
  225. ws.append([index+1,*results_list])
  226. column += 1
  227. #  更新列宽
  228. for index, width in enumerate(df_len_max):
  229. ws.column_dimensions[get_column_letter(index + 1)].width = width
  230. tab = Table(displayName="Table", ref=f"A1:{row}{column}") # 名称管理器
  231. style = TableStyleInfo(
  232. name="TableStyleLight11",
  233. showFirstColumn=True,
  234. showLastColumn=True,
  235. showRowStripes=True,
  236. showColumnStripes=True,
  237. )
  238. tab.tableStyleInfo = style
  239. ws.add_table(tab)
  240. wb.save(response)
  241. return response
  242. class ExportSerializerMixin:
  243. """
  244. 自定义导出功能
  245. """
  246. # 导出字段
  247. export_field_label = []
  248. # 导出序列化器
  249. export_serializer_class = None
  250. # 表格表头最大宽度,默认50个字符
  251. export_column_width = 50
  252. def is_number(self,num):
  253. try:
  254. float(num)
  255. return True
  256. except ValueError:
  257. pass
  258. try:
  259. import unicodedata
  260. unicodedata.numeric(num)
  261. return True
  262. except (TypeError, ValueError):
  263. pass
  264. return False
  265. def get_string_len(self, string):
  266. """
  267. 获取字符串最大长度
  268. :param string:
  269. :return:
  270. """
  271. length = 4
  272. if string is None:
  273. return length
  274. if self.is_number(string):
  275. return length
  276. for char in string:
  277. length += 2.1 if ord(char) > 256 else 1
  278. return round(length, 1) if length <= self.export_column_width else self.export_column_width
  279. @action(methods=['get'],detail=False)
  280. def export_data(self, request: Request, *args, **kwargs):
  281. """
  282. 导出功能
  283. :param request:
  284. :param args:
  285. :param kwargs:
  286. :return:
  287. """
  288. queryset = self.filter_queryset(self.get_queryset())
  289. assert self.export_field_label, "'%s' 请配置对应的导出模板字段。" % self.__class__.__name__
  290. assert self.export_serializer_class, "'%s' 请配置对应的导出序列化器。" % self.__class__.__name__
  291. data = self.export_serializer_class(queryset, many=True, request=request).data
  292. # 导出excel 表
  293. response = HttpResponse(content_type="application/msexcel")
  294. response["Access-Control-Expose-Headers"] = f"Content-Disposition"
  295. response["content-disposition"] = f'attachment;filename={quote(str(f"导出{get_verbose_name(queryset)}.xlsx"))}'
  296. wb = Workbook()
  297. ws = wb.active
  298. header_data = ["序号", *self.export_field_label.values()]
  299. hidden_header = ["#", *self.export_field_label.keys()]
  300. df_len_max = [self.get_string_len(ele) for ele in header_data]
  301. row = get_column_letter(len(self.export_field_label) + 1)
  302. column = 1
  303. ws.append(header_data)
  304. for index, results in enumerate(data):
  305. results_list = []
  306. for h_index, h_item in enumerate(hidden_header):
  307. for key,val in results.items():
  308. if key == h_item:
  309. if val is None or val=="":
  310. results_list.append("")
  311. else:
  312. results_list.append(val)
  313. # 计算最大列宽度
  314. result_column_width = self.get_string_len(val)
  315. if h_index !=0 and result_column_width > df_len_max[h_index]:
  316. df_len_max[h_index] = result_column_width
  317. ws.append([index + 1, *results_list])
  318. column += 1
  319. #  更新列宽
  320. for index, width in enumerate(df_len_max):
  321. ws.column_dimensions[get_column_letter(index + 1)].width = width
  322. tab = Table(displayName="Table", ref=f"A1:{row}{column}") # 名称管理器
  323. style = TableStyleInfo(
  324. name="TableStyleLight11",
  325. showFirstColumn=True,
  326. showLastColumn=True,
  327. showRowStripes=True,
  328. showColumnStripes=True,
  329. )
  330. tab.tableStyleInfo = style
  331. ws.add_table(tab)
  332. wb.save(response)
  333. return response