"""Celery tasks that build report export files and purge expired ones."""

from __future__ import annotations

from datetime import timedelta

from celery import shared_task
from django.conf import settings
from django.core.files.base import ContentFile
from django.urls import reverse
from django.utils import timezone

from apps.notifications.services import RedisNotificationStore
from apps.reports.models import ReportExportJob
from apps.reports.services import build_table_report, build_user_scoped_table_reports
from apps.reports.services.export_i18n import build_export_locale
from apps.reports.services.exporters import build_excel_report, build_pdf_report

_EXCEL_MIME_TYPE = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
_PDF_MIME_TYPE = "application/pdf"


def _build_export_action_url(job: ReportExportJob) -> str:
    """Return the download URL for *job*.

    The URL is the reversed ``report-export-job-download`` route, prefixed
    with ``settings.BASE_URL`` (trailing slashes stripped) when that setting
    is truthy; otherwise the bare path is returned.
    """
    path = reverse("report-export-job-download", kwargs={"pk": job.id})
    if settings.BASE_URL:
        return f"{settings.BASE_URL.rstrip('/')}{path}"
    return path


def _render_export(job: ReportExportJob) -> tuple:
    """Build the export payload for *job*; return ``(content, suffix, mime_type)``."""
    locale = build_export_locale(job.filters.get("language"))
    report_data = build_table_report(job.requesting_user, job.filters)

    if job.export_type == ReportExportJob.ExportType.EXCEL:
        per_user_reports = build_user_scoped_table_reports(job.requesting_user, job.filters)
        content = build_excel_report(
            report_data=report_data,
            locale=locale,
            per_user_reports=per_user_reports,
        )
        return content, "xlsx", _EXCEL_MIME_TYPE

    # Every export type other than EXCEL is rendered as a PDF.
    content = build_pdf_report(report_data=report_data, locale=locale)
    return content, "pdf", _PDF_MIME_TYPE


def _store_completed_export(job: ReportExportJob, content, suffix: str) -> str:
    """Attach *content* to *job*, mark it completed, and return the file name."""
    file_name = f"workspace-report-{timezone.now().strftime('%Y%m%d-%H%M%S')}.{suffix}"
    storage_name = f"reports/exports/{job.id}-{file_name}"
    # save=False: the model row is written once, by job.save() below.
    job.file.save(storage_name, ContentFile(content), save=False)

    job.status = ReportExportJob.Status.COMPLETED
    job.file_name = file_name
    job.completed_at = timezone.now()
    # Use datetime.timedelta directly; django.utils.timezone only exposes
    # ``timedelta`` as an incidental internal import, not as public API.
    job.expires_at = job.completed_at + timedelta(days=settings.REPORT_EXPORT_RETENTION_DAYS)
    job.error_message = ""
    job.save(
        update_fields=[
            "file",
            "status",
            "file_name",
            "completed_at",
            "expires_at",
            "error_message",
            "updated_at",
        ]
    )
    return file_name


def _notify_export_ready(job: ReportExportJob, file_name: str, mime_type: str) -> None:
    """Push the "export ready" notification to the requesting user."""
    action_url = _build_export_action_url(job)
    RedisNotificationStore.add(
        str(job.requesting_user_id),
        {
            "type": "report_export_ready",
            "title": "Report export is ready",
            "message": f"Your {job.export_type.upper()} report for {job.workspace.name} is ready to download.",
            "level": "success",
            "action_url": action_url,
            "entity_type": "report_export",
            "entity_id": str(job.id),
            "meta": {
                "workspace_id": str(job.workspace_id),
                "workspace_name": job.workspace.name,
                "export_type": job.export_type,
                "file_name": file_name,
                "download_url": action_url,
                "mime_type": mime_type,
            },
        },
    )


def _notify_export_failed(job: ReportExportJob) -> None:
    """Push the "export failed" notification to the requesting user."""
    RedisNotificationStore.add(
        str(job.requesting_user_id),
        {
            "type": "report_export_failed",
            "title": "Report export failed",
            "message": f"Your {job.export_type.upper()} report for {job.workspace.name} could not be generated.",
            "level": "error",
            "action_url": "/reports",
            "entity_type": "report_export",
            "entity_id": str(job.id),
            "meta": {
                "workspace_id": str(job.workspace_id),
                "workspace_name": job.workspace.name,
                "export_type": job.export_type,
            },
        },
    )


@shared_task(name="reports.generate_export")
def generate_report_export_task(job_id: str) -> str | None:
    """Generate the export file for a ``ReportExportJob`` and notify its requester.

    Args:
        job_id: Primary key of the ``ReportExportJob`` to process.

    Returns:
        The job id as a string on success, or ``None`` when no job with that
        id exists.

    Raises:
        Exception: Any error raised while rendering or storing the export is
            re-raised after the job has been marked failed and a failure
            notification has been sent. Errors raised while delivering the
            success notification also propagate, but leave the job completed.
    """
    job = ReportExportJob.objects.filter(id=job_id).first()
    if not job:
        return None

    job.mark_processing()
    try:
        content, suffix, mime_type = _render_export(job)
        file_name = _store_completed_export(job, content, suffix)
    except Exception as exc:  # noqa: BLE001
        job.mark_failed(str(exc))
        _notify_export_failed(job)
        raise

    # Deliberately outside the try block: by now the file is stored and the
    # job is saved as COMPLETED, so a notification/URL error must not flip
    # the job to failed.
    _notify_export_ready(job, file_name, mime_type)
    return str(job.id)


@shared_task(name="reports.cleanup_expired_exports")
def cleanup_expired_report_exports_task() -> int:
    """Delete files of completed exports past ``expires_at`` and mark them expired.

    Returns:
        The number of jobs processed, whether or not a file was attached.
    """
    expired_jobs = ReportExportJob.objects.filter(
        status=ReportExportJob.Status.COMPLETED,
        expires_at__lte=timezone.now(),
    )
    removed = 0
    for job in expired_jobs:
        if job.file:
            # save=False: persisting the row is left to mark_expired().
            job.file.delete(save=False)
        job.mark_expired()
        removed += 1
    return removed