Developer 2024 🎇限时优惠进行中,现在购买立即享受

现在购买

自己动手撸一个导出excel文件功能

avatar底层群员-吴彦祖Pro
7月27日636次阅读

这里用到的第三方包是xlsxwriter,这个包可以导出特殊字符和emoji表情

第一步,新建一个序列化器serialze.py

from typing import Dict, List

from django.core.exceptions import FieldDoesNotExist
from django.db import models

"""自定义序列化
"""


def string_convert(string: str):
    """
    将连字符格式转成小驼峰格式
    example: user_name -> userName
    """

    char_list = string.split("_")

    # 首个单词不转换
    for index, char in enumerate(char_list[1:], start=1):
        char_list[index] = char.capitalize()

    return "".join(char_list)


def serialize_model(model: models.Model, excludes: list = None) -> Dict:
    """
    模型序列化,会根据 select_related 和 prefetch_related 关联查询的结果进行序列化,可以在查询时使用 only、defer 来筛选序列化的字段。
    它不会自做主张的去查询数据库,只用你查询出来的结果,成功避免了 N+1 查询问题。

    # 具体请看:
    https://aber.sh/articles/A-new-idea-of-serializing-Django-model/
    """
    serialized = set()

    def _serialize_model(obj) -> Dict:

        # 当 model 存在一对一字段时,会陷入循环,使用闭包的自由变量存储已序列化的 model,
        # 在第二次循环到该 model 时直接返回 model.pk,不再循环。
        nonlocal serialized
        if obj in serialized:
            return obj.pk
        else:
            serialized.add(obj)

        # 当 model 存在一对一或一对多字段,且该字段的值为 None 时,直接返回空{},否则会报错。
        if obj is None:
            return {}

        result = {
            string_convert(name): _serialize_model(foreign_key)
            for name, foreign_key in obj.__dict__["_state"].__dict__.get("fields_cache", {}).items()
        }

        for name, value in obj.__dict__.items():
            buried_fields = getattr(obj, "buried_fields", [])
            if name in buried_fields:
                # 不可暴露的字段
                continue
            try:
                obj._meta.get_field(name)  # noqa
            except FieldDoesNotExist:
                # 非模型字段
                continue
            else:
                result[string_convert(name)] = value

        for name, queryset in obj.__dict__.get("_prefetched_objects_cache", {}).items():
            result[string_convert(name)] = serialize_queryset(queryset)

        return result

    results = _serialize_model(model)

    # 剔除排斥的字段
    excludes = excludes or []
    for field in excludes:
        try:
            del results[string_convert(field)]
        except KeyError:
            # 这一步忽略foreign_key会爆异常,临时解决一下
            del results[string_convert(field + "Id")]

    return results


def serialize_queryset(queryset: models.QuerySet) -> List[Dict]:
    return [serialize_model(model) for model in queryset]

第二步,实现核心功能 export_extensions.py

import time

import xlsxwriter
from django.conf import settings
from django.http import StreamingHttpResponse

# 这里导包注意下路径
from app1.serialze import serialize_model


def file_iterator(file, offset=512):
    with open(file, 'rb') as f:
        while True:
            c = f.read(offset)
            if c:
                yield c
            else:
                break


def get_excel_response(queryset, excludes=[]):
    """
    excludes: 排除哪些字段
    """
    start_time = time.time()
    meta = queryset.model._meta  # noqa
    excel_writer = settings.BASE_DIR / f'media/temp_{meta.model_name}.xlsx'
    workbook = xlsxwriter.Workbook(excel_writer)
    worksheet = workbook.add_worksheet('sheet1')

    stu = meta.fields

    col_name = [stu[x].verbose_name or stu[x].name
                for x in range(len(stu))
                if stu[x].name not in excludes]

    worksheet.write_row('A1', col_name)
    # data_list = serialize_queryset(queryset)
    for row, model in enumerate(queryset):
        obj = serialize_model(model, excludes=excludes)
        for col, (k, v) in enumerate(obj.items()):
            worksheet.write(row + 1, col, v)

    workbook.close()
    response = StreamingHttpResponse(file_iterator(excel_writer.as_posix()))
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Disposition'] = f'attachment;filename="export_{meta.model_name}.xlsx"'
    print('导出耗时:', time.time() - start_time)
    return response

第三步,在admin.py里添加action

@admin.register(Country)
class CountryAdmin(admin.ModelAdmin):
    list_display = ['id', 'name', 'code', 'district']
    actions = ['export_excel', ]

    @admin.action(description='导出选中Excel')
    def export_excel(self, request, queryset):
        return get_excel_response(queryset, ['id'])

以上完毕

发布评论
登录后发表内容
1个评论