test(backend): convert existing app suites to unittest

This commit is contained in:
2026-04-30 12:41:54 +03:30
parent 204225dd16
commit 8774a4d4dc
16 changed files with 1785 additions and 1780 deletions

View File

@@ -1,264 +1,111 @@
from datetime import timedelta
from decimal import Decimal
from io import BytesIO
from unittest.mock import patch
import pytest
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from django.test import TestCase
from django.utils import timezone
from openpyxl import load_workbook
from apps.notifications.services import store as notification_store
from apps.reports.models import ReportExportJob
from apps.reports.tasks import cleanup_expired_report_exports_task, generate_report_export_task
from apps.time_entries.models import TimeEntry
from apps.reports.tasks import (
cleanup_expired_report_exports_task,
generate_report_export_task,
)
from apps.users.models import User
from apps.workspaces.models import Workspace
class FakeRedis:
def pipeline(self):
return self
class ReportTaskTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = User.objects.create_user(
mobile="09129990001",
password="secret123",
first_name="Owner",
last_name="User",
)
cls.workspace = Workspace.objects.create(name="Exports", owner=cls.owner)
def zadd(self, *args, **kwargs):
return self
def test_generate_excel_export_marks_job_complete_and_sends_notification(self):
job = ReportExportJob.objects.create(
requesting_user=self.owner,
workspace=self.workspace,
export_type=ReportExportJob.ExportType.EXCEL,
filters={
"workspace": str(self.workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": str(self.owner.id),
"client": None,
"project": None,
"tags": [],
"language": "en",
},
)
def hset(self, *args, **kwargs):
return self
with patch("apps.reports.tasks.build_table_report", return_value={"scope": {}, "summary": {}, "days": [], "clients": [], "projects": [], "tags": []}) as build_table_report:
with patch("apps.reports.tasks.build_user_scoped_table_reports", return_value=[]) as build_user_reports:
with patch("apps.reports.tasks.build_excel_report", return_value=b"excel-content") as build_excel_report:
with patch("apps.reports.tasks.RedisNotificationStore.add") as notify:
generate_report_export_task(str(job.id))
def sadd(self, *args, **kwargs):
return self
job.refresh_from_db()
self.assertEqual(job.status, ReportExportJob.Status.COMPLETED)
self.assertTrue(bool(job.file))
self.assertTrue(default_storage.exists(job.file.name))
build_table_report.assert_called_once()
build_user_reports.assert_called_once()
build_excel_report.assert_called_once()
notify.assert_called_once()
self.assertEqual(notify.call_args.args[0], str(self.owner.id))
self.assertEqual(notify.call_args.args[1]["type"], "report_export_ready")
def execute(self):
return []
def test_generate_pdf_export_failure_marks_job_failed_and_notifies(self):
job = ReportExportJob.objects.create(
requesting_user=self.owner,
workspace=self.workspace,
export_type=ReportExportJob.ExportType.PDF,
filters={
"workspace": str(self.workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": str(self.owner.id),
"client": None,
"project": None,
"tags": [],
"language": "fa",
},
)
def publish(self, *args, **kwargs):
return None
with patch("apps.reports.tasks.build_table_report", return_value={"scope": {}, "summary": {}, "days": [], "clients": [], "projects": [], "tags": []}):
with patch("apps.reports.tasks.build_pdf_report", side_effect=RuntimeError("boom")):
with patch("apps.reports.tasks.RedisNotificationStore.add") as notify:
with self.assertRaises(RuntimeError):
generate_report_export_task(str(job.id))
def zrevrange(self, *args, **kwargs):
return []
job.refresh_from_db()
self.assertEqual(job.status, ReportExportJob.Status.FAILED)
self.assertEqual(job.error_message, "boom")
notify.assert_called_once()
self.assertEqual(notify.call_args.args[1]["type"], "report_export_failed")
def hget(self, *args, **kwargs):
return None
def test_cleanup_expires_and_removes_files(self):
job = ReportExportJob.objects.create(
requesting_user=self.owner,
workspace=self.workspace,
export_type=ReportExportJob.ExportType.EXCEL,
status=ReportExportJob.Status.COMPLETED,
filters={},
expires_at=timezone.now() - timezone.timedelta(days=1),
)
file_name = f"reports/exports/{job.id}-old.xlsx"
job.file.save(file_name, ContentFile(b"old-data"), save=False)
job.save(update_fields=["file", "updated_at"])
def zrem(self, *args, **kwargs):
return 1
removed = cleanup_expired_report_exports_task()
job.refresh_from_db()
def hdel(self, *args, **kwargs):
return 1
def zcard(self, *args, **kwargs):
return 0
def smembers(self, *args, **kwargs):
return set()
def srem(self, *args, **kwargs):
return 1
@pytest.fixture()
def fake_redis(monkeypatch):
redis = FakeRedis()
monkeypatch.setattr(notification_store, "redis_client", redis)
return redis
@pytest.fixture()
def owner(db):
return User.objects.create_user(mobile="09129990001", password="secret123", first_name="Owner", last_name="User")
@pytest.fixture()
def teammate(db):
return User.objects.create_user(mobile="09129990002", password="secret123", first_name="Team", last_name="Mate")
@pytest.fixture()
def workspace(owner, teammate):
workspace = Workspace.objects.create(name="Exports", owner=owner)
workspace.memberships.create(user=teammate, role="member", is_active=True)
return workspace
@pytest.fixture()
def time_entry(workspace, owner):
return TimeEntry.objects.create(
workspace=workspace,
user=owner,
description="Export row",
start_time="2026-04-12T08:00:00+03:30",
end_time="2026-04-12T10:00:00+03:30",
duration=timedelta(hours=2),
is_billable=True,
hourly_rate=Decimal("15.00"),
currency="USD",
)
@pytest.fixture()
def teammate_entry(workspace, teammate):
return TimeEntry.objects.create(
workspace=workspace,
user=teammate,
description="Team row",
start_time="2026-04-13T08:00:00+03:30",
end_time="2026-04-13T09:00:00+03:30",
duration=timedelta(hours=1),
is_billable=False,
currency="USD",
)
def test_generate_export_creates_file_and_marks_job_complete(fake_redis, workspace, owner, time_entry):
job = ReportExportJob.objects.create(
requesting_user=owner,
workspace=workspace,
export_type=ReportExportJob.ExportType.EXCEL,
filters={
"workspace": str(workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": str(owner.id),
"client": None,
"project": None,
"tags": [],
"language": "en",
},
)
generate_report_export_task(str(job.id))
job.refresh_from_db()
assert job.status == ReportExportJob.Status.COMPLETED
assert bool(job.file)
assert default_storage.exists(job.file.name)
def test_generate_excel_export_adds_per_user_sheets_for_all_users_scope(
fake_redis,
workspace,
owner,
teammate,
time_entry,
teammate_entry,
):
job = ReportExportJob.objects.create(
requesting_user=owner,
workspace=workspace,
export_type=ReportExportJob.ExportType.EXCEL,
filters={
"workspace": str(workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": None,
"client": None,
"project": None,
"tags": [],
"language": "en",
},
)
generate_report_export_task(str(job.id))
job.refresh_from_db()
workbook = load_workbook(BytesIO(job.file.read()))
assert workbook.sheetnames[0] == "Overall Report"
assert any("Owner User" in sheet for sheet in workbook.sheetnames[1:])
assert any("Team Mate" in sheet for sheet in workbook.sheetnames[1:])
assert len(workbook.sheetnames) == 3
def test_generate_excel_export_includes_daily_rate_column_and_split_user_meta(
fake_redis,
workspace,
owner,
time_entry,
):
job = ReportExportJob.objects.create(
requesting_user=owner,
workspace=workspace,
export_type=ReportExportJob.ExportType.EXCEL,
filters={
"workspace": str(workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": str(owner.id),
"client": None,
"project": None,
"tags": [],
"language": "en",
},
)
generate_report_export_task(str(job.id))
job.refresh_from_db()
workbook = load_workbook(BytesIO(job.file.read()))
worksheet = workbook.active
values = list(worksheet.iter_rows(values_only=True))
assert any(row[:2] == ("User", "Owner User") for row in values if row)
assert any(row[:2] == ("Mobile", "09129990001") for row in values if row)
daily_header = next(row[:6] for row in values if row and row[0] == "Date")
assert daily_header == (
"Date",
"Billable hours",
"Non-billable hours",
"Total hours",
"Hourly rate",
"Income",
)
daily_row = next(row[:6] for row in values if row and row[0] == "2026/04/12")
assert daily_row[4] == "15 USD"
def test_generate_pdf_export_supports_persian_locale(fake_redis, workspace, owner, time_entry):
job = ReportExportJob.objects.create(
requesting_user=owner,
workspace=workspace,
export_type=ReportExportJob.ExportType.PDF,
filters={
"workspace": str(workspace.id),
"period": "this_month",
"from_date": "2026-04-01",
"to_date": "2026-04-30",
"user": str(owner.id),
"client": None,
"project": None,
"tags": [],
"language": "fa",
},
)
generate_report_export_task(str(job.id))
job.refresh_from_db()
assert job.status == ReportExportJob.Status.COMPLETED
assert job.file.read(4) == b"%PDF"
def test_cleanup_expires_and_removes_files(fake_redis, workspace, owner):
job = ReportExportJob.objects.create(
requesting_user=owner,
workspace=workspace,
export_type=ReportExportJob.ExportType.EXCEL,
status=ReportExportJob.Status.COMPLETED,
filters={},
expires_at=timezone.now() - timezone.timedelta(days=1),
)
file_name = f"reports/exports/{job.id}-old.xlsx"
job.file.save(file_name, ContentFile(b"old-data"), save=False)
job.save(update_fields=["file", "updated_at"])
removed = cleanup_expired_report_exports_task()
job.refresh_from_db()
assert removed == 1
assert job.status == ReportExportJob.Status.EXPIRED
assert not default_storage.exists(file_name)
self.assertEqual(removed, 1)
self.assertEqual(job.status, ReportExportJob.Status.EXPIRED)
self.assertFalse(default_storage.exists(file_name))