这里用到的第三方包是xlsxwriter
,这个包可以导出特殊字符和emoji表情
第一步,新建一个序列化器serialze.py
from typing import Dict, List
from django.core.exceptions import FieldDoesNotExist
from django.db import models
"""自定义序列化
"""
def string_convert(string: str):
"""
将连字符格式转成小驼峰格式
example: user_name -> userName
"""
char_list = string.split("_")
# 首个单词不转换
for index, char in enumerate(char_list[1:], start=1):
char_list[index] = char.capitalize()
return "".join(char_list)
def serialize_model(model: models.Model, excludes: list = None) -> Dict:
"""
模型序列化,会根据 select_related 和 prefetch_related 关联查询的结果进行序列化,可以在查询时使用 only、defer 来筛选序列化的字段。
它不会自做主张的去查询数据库,只用你查询出来的结果,成功避免了 N+1 查询问题。
# 具体请看:
https://aber.sh/articles/A-new-idea-of-serializing-Django-model/
"""
serialized = set()
def _serialize_model(obj) -> Dict:
# 当 model 存在一对一字段时,会陷入循环,使用闭包的自由变量存储已序列化的 model,
# 在第二次循环到该 model 时直接返回 model.pk,不再循环。
nonlocal serialized
if obj in serialized:
return obj.pk
else:
serialized.add(obj)
# 当 model 存在一对一或一对多字段,且该字段的值为 None 时,直接返回空{},否则会报错。
if obj is None:
return {}
result = {
string_convert(name): _serialize_model(foreign_key)
for name, foreign_key in obj.__dict__["_state"].__dict__.get("fields_cache", {}).items()
}
for name, value in obj.__dict__.items():
buried_fields = getattr(obj, "buried_fields", [])
if name in buried_fields:
# 不可暴露的字段
continue
try:
obj._meta.get_field(name) # noqa
except FieldDoesNotExist:
# 非模型字段
continue
else:
result[string_convert(name)] = value
for name, queryset in obj.__dict__.get("_prefetched_objects_cache", {}).items():
result[string_convert(name)] = serialize_queryset(queryset)
return result
results = _serialize_model(model)
# 剔除排斥的字段
excludes = excludes or []
for field in excludes:
try:
del results[string_convert(field)]
except KeyError:
# 这一步忽略foreign_key会爆异常,临时解决一下
del results[string_convert(field + "Id")]
return results
def serialize_queryset(queryset: models.QuerySet) -> List[Dict]:
return [serialize_model(model) for model in queryset]
第二步,实现核心功能 export_extensions.py
import time
import xlsxwriter
from django.conf import settings
from django.http import StreamingHttpResponse
# 这里导包注意下路径
from app1.serialze import serialize_model
def file_iterator(file, offset=512):
with open(file, 'rb') as f:
while True:
c = f.read(offset)
if c:
yield c
else:
break
def get_excel_response(queryset, excludes=[]):
"""
excludes: 排除哪些字段
"""
start_time = time.time()
meta = queryset.model._meta # noqa
excel_writer = settings.BASE_DIR / f'media/temp_{meta.model_name}.xlsx'
workbook = xlsxwriter.Workbook(excel_writer)
worksheet = workbook.add_worksheet('sheet1')
stu = meta.fields
col_name = [stu[x].verbose_name or stu[x].name
for x in range(len(stu))
if stu[x].name not in excludes]
worksheet.write_row('A1', col_name)
# data_list = serialize_queryset(queryset)
for row, model in enumerate(queryset):
obj = serialize_model(model, excludes=excludes)
for col, (k, v) in enumerate(obj.items()):
worksheet.write(row + 1, col, v)
workbook.close()
response = StreamingHttpResponse(file_iterator(excel_writer.as_posix()))
response['Content-Type'] = 'application/octet-stream'
response['Content-Disposition'] = f'attachment;filename="export_{meta.model_name}.xlsx"'
print('导出耗时:', time.time() - start_time)
return response
第三步,在admin.py里添加action
@admin.register(Country)
class CountryAdmin(admin.ModelAdmin):
list_display = ['id', 'name', 'code', 'district']
actions = ['export_excel', ]
@admin.action(description='导出选中Excel')
def export_excel(self, request, queryset):
return get_excel_response(queryset, ['id'])
以上完毕