diff --git a/apps/logs/tests/test_views.py b/apps/logs/tests/test_views.py index ec75d4f..16864a2 100644 --- a/apps/logs/tests/test_views.py +++ b/apps/logs/tests/test_views.py @@ -1,181 +1,161 @@ -from __future__ import annotations - -from datetime import timedelta - -import pytest from auditlog.models import LogEntry from rest_framework_simplejwt.tokens import AccessToken -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.reports.models import ReportExportJob from apps.users.models import User from apps.workspaces.models import Workspace, WorkspaceMembership -@pytest.fixture() -def api_client(): - return APIClient() +class WorkspaceLogViewTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.owner = cls._user(1) + cls.admin = cls._user(2) + cls.member = cls._user(3) + cls.outsider = cls._user(4) + cls.workspace = Workspace.objects.create( + name="Logs WS", + description="", + owner=cls.owner, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.admin, + role=WorkspaceMembership.Role.ADMIN, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) -def _user(index: int) -> User: - return User.objects.create_user( - mobile=f"093355500{index:02d}", - password="secret123", - first_name=f"Log{index}", - last_name="User", - ) + @staticmethod + def _user(index): + return User.objects.create_user( + mobile=f"093355500{index:02d}", + password="secret123", + first_name=f"Log{index}", + last_name="User", + ) + @staticmethod + def _auth_headers(user): + token = str(AccessToken.for_user(user)) + return {"HTTP_AUTHORIZATION": f"Bearer {token}"} -@pytest.fixture() -def owner(db): - return _user(1) + def _create_tag(self, user, *, name="Audit Tag"): + return self.client.post( + "/api/tags/", + { + "workspace_id": str(self.workspace.id), + "name": name, + "color": "#123456", + }, + format="json", + **self._auth_headers(user), + ) + def test_owner_and_admin_can_list_workspace_logs(self): + create_response = self._create_tag(self.owner) + self.assertEqual(create_response.status_code, 201) -@pytest.fixture() -def admin(db): - return _user(2) + owner_response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}", + **self._auth_headers(self.owner), + ) + admin_response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}", + **self._auth_headers(self.admin), + ) + self.assertEqual(owner_response.status_code, 200) + self.assertEqual(admin_response.status_code, 200) + self.assertEqual(owner_response.data["items"][0]["section"], "tags") -@pytest.fixture() -def member(db): - return _user(3) + def test_member_and_non_member_cannot_list_workspace_logs(self): + self._create_tag(self.owner) + member_response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}", + **self._auth_headers(self.member), + ) + outsider_response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}", + **self._auth_headers(self.outsider), + ) -@pytest.fixture() -def outsider(db): - return _user(4) + self.assertEqual(member_response.status_code, 403) + self.assertEqual(outsider_response.status_code, 403) + def test_jwt_authenticated_writes_capture_actor_and_workspace_metadata(self): + response = self._create_tag(self.owner, name="JWT Tag") + self.assertEqual(response.status_code, 201) -@pytest.fixture() -def workspace(owner, admin, member): - workspace = Workspace.objects.create(name="Logs WS", description="", owner=owner) - WorkspaceMembership.objects.create( - workspace=workspace, - user=admin, - role=WorkspaceMembership.Role.ADMIN, - is_active=True, - ) - WorkspaceMembership.objects.create( - workspace=workspace, - user=member, - role=WorkspaceMembership.Role.MEMBER, - is_active=True, - ) - return workspace + log_entry = LogEntry.objects.filter(content_type__app_label="tags").latest( + "timestamp" + ) + self.assertEqual(log_entry.actor_id, self.owner.id) + self.assertEqual(log_entry.additional_data["workspace_id"], str(self.workspace.id)) + self.assertEqual(log_entry.additional_data["section"], "tags") -def _auth_headers(user: User) -> dict: - token = str(AccessToken.for_user(user)) - return {"HTTP_AUTHORIZATION": f"Bearer {token}"} + def test_logs_support_section_filter_and_detail(self): + tag_response = self._create_tag(self.owner, name="Filtered Tag") + self.assertEqual(tag_response.status_code, 201) + list_response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}§ion=tags", + **self._auth_headers(self.owner), + ) -def _create_tag(client: APIClient, user: User, workspace: Workspace, *, name="Audit Tag"): - return client.post( - "/api/tags/", - {"workspace_id": str(workspace.id), "name": name, "color": "#123456"}, - format="json", - **_auth_headers(user), - ) + self.assertEqual(list_response.status_code, 200) + self.assertTrue(list_response.data["items"]) + log_id = list_response.data["items"][0]["id"] + detail_response = self.client.get( + f"/api/logs/{log_id}/", + **self._auth_headers(self.owner), + ) -@pytest.mark.django_db -def test_owner_and_admin_can_list_workspace_logs(api_client, owner, admin, workspace): - create_response = _create_tag(api_client, owner, workspace) - assert create_response.status_code == 201 + self.assertEqual(detail_response.status_code, 200) + self.assertEqual(detail_response.data["target"]["name"], "Filtered Tag") + self.assertTrue(detail_response.data["changes"]) - owner_response = api_client.get( - f"/api/logs/?workspace={workspace.id}", - **_auth_headers(owner), - ) - admin_response = api_client.get( - f"/api/logs/?workspace={workspace.id}", - **_auth_headers(admin), - ) + def test_soft_delete_and_actorless_background_logs_are_filtered(self): + create_response = self._create_tag(self.owner, name="Delete Me") + self.assertEqual(create_response.status_code, 201) + tag_id = create_response.data["id"] - assert owner_response.status_code == 200 - assert admin_response.status_code == 200 - assert owner_response.data["items"][0]["section"] == "tags" + delete_response = self.client.delete( + f"/api/tags/{tag_id}/", + **self._auth_headers(self.owner), + ) + self.assertEqual(delete_response.status_code, 204) + ReportExportJob.objects.create( + requesting_user=self.owner, + workspace=self.workspace, + export_type=ReportExportJob.ExportType.PDF, + filters={"workspace": str(self.workspace.id)}, + status=ReportExportJob.Status.PENDING, + ) -@pytest.mark.django_db -def test_member_and_non_member_cannot_list_workspace_logs(api_client, owner, member, outsider, workspace): - _create_tag(api_client, owner, workspace) - - member_response = api_client.get( - f"/api/logs/?workspace={workspace.id}", - **_auth_headers(member), - ) - outsider_response = api_client.get( - f"/api/logs/?workspace={workspace.id}", - **_auth_headers(outsider), - ) - - assert member_response.status_code == 403 - assert outsider_response.status_code == 403 - - -@pytest.mark.django_db -def test_jwt_authenticated_writes_capture_actor_and_workspace_metadata(api_client, owner, workspace): - response = _create_tag(api_client, owner, workspace, name="JWT Tag") - assert response.status_code == 201 - - log_entry = LogEntry.objects.filter(content_type__app_label="tags").latest("timestamp") - - assert log_entry.actor_id == owner.id - assert log_entry.additional_data["workspace_id"] == str(workspace.id) - assert log_entry.additional_data["section"] == "tags" - - -@pytest.mark.django_db -def test_logs_support_section_filter_and_detail(api_client, owner, workspace): - tag_response = _create_tag(api_client, owner, workspace, name="Filtered Tag") - assert tag_response.status_code == 201 - - list_response = api_client.get( - f"/api/logs/?workspace={workspace.id}§ion=tags", - **_auth_headers(owner), - ) - - assert list_response.status_code == 200 - assert list_response.data["items"] - log_id = list_response.data["items"][0]["id"] - - detail_response = api_client.get( - f"/api/logs/{log_id}/", - **_auth_headers(owner), - ) - - assert detail_response.status_code == 200 - assert detail_response.data["target"]["name"] == "Filtered Tag" - assert detail_response.data["changes"] - - -@pytest.mark.django_db -def test_soft_delete_and_actorless_background_logs_are_filtered(api_client, owner, workspace): - create_response = _create_tag(api_client, owner, workspace, name="Delete Me") - assert create_response.status_code == 201 - tag_id = create_response.data["id"] - - delete_response = api_client.delete( - f"/api/tags/{tag_id}/", - **_auth_headers(owner), - ) - assert delete_response.status_code == 204 - - ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.PDF, - filters={"workspace": str(workspace.id)}, - status=ReportExportJob.Status.PENDING, - ) - - response = api_client.get( - f"/api/logs/?workspace={workspace.id}&event=delete", - **_auth_headers(owner), - ) - - assert response.status_code == 200 - assert any(item["event"] == "delete" and item["section"] == "tags" for item in response.data["items"]) - assert all(item["section"] != "report_exports" for item in response.data["items"]) + response = self.client.get( + f"/api/logs/?workspace={self.workspace.id}&event=delete", + **self._auth_headers(self.owner), + ) + self.assertEqual(response.status_code, 200) + self.assertTrue( + any( + item["event"] == "delete" and item["section"] == "tags" + for item in response.data["items"] + ) + ) + self.assertTrue( + all(item["section"] != "report_exports" for item in response.data["items"]) + ) diff --git a/apps/notifications/tests/fakes.py b/apps/notifications/tests/fakes.py new file mode 100644 index 0000000..cd5769d --- /dev/null +++ b/apps/notifications/tests/fakes.py @@ -0,0 +1,126 @@ +import json +from collections import defaultdict + + +class FakePipeline: + def __init__(self, client): + self.client = client + self.operations = [] + + def __getattr__(self, name): + def wrapper(*args, **kwargs): + self.operations.append((name, args, kwargs)) + return self + + return wrapper + + def execute(self): + results = [] + for name, args, kwargs in self.operations: + results.append(getattr(self.client, name)(*args, **kwargs)) + self.operations.clear() + return results + + +class FakePubSub: + def __init__(self): + self.channels = [] + self.messages = [] + self.closed = False + + def subscribe(self, channel): + self.channels.append(channel) + + def unsubscribe(self, channel): + if channel in self.channels: + self.channels.remove(channel) + + def get_message(self, timeout=1.0): + if self.messages: + return self.messages.pop(0) + return None + + def close(self): + self.closed = True + + +class FakeRedis: + def __init__(self): + self.sorted_sets = defaultdict(dict) + self.hashes = defaultdict(dict) + self.sets = defaultdict(set) + self.published = [] + self.pubsub_instance = FakePubSub() + + def pipeline(self): + return FakePipeline(self) + + def zadd(self, key, mapping): + self.sorted_sets[key].update(mapping) + return len(mapping) + + def hset(self, key, field, value): + self.hashes[key][field] = value + return 1 + + def sadd(self, key, *members): + before = len(self.sets[key]) + self.sets[key].update(members) + return len(self.sets[key]) - before + + def zrevrange(self, key, start, stop): + items = sorted( + self.sorted_sets[key].items(), + key=lambda item: (item[1], item[0]), + reverse=True, + ) + if stop == -1: + return [member for member, _ in items[start:]] + return [member for member, _ in items[start : stop + 1]] + + def hget(self, key, field): + return self.hashes[key].get(field) + + def zrem(self, key, *members): + removed = 0 + for member in members: + if member in self.sorted_sets[key]: + del self.sorted_sets[key][member] + removed += 1 + return removed + + def hdel(self, key, *fields): + removed = 0 + for field in fields: + if field in self.hashes[key]: + del self.hashes[key][field] + removed += 1 + return removed + + def smembers(self, key): + return set(self.sets[key]) + + def srem(self, key, member): + if member in self.sets[key]: + self.sets[key].remove(member) + return 1 + return 0 + + def zrangebyscore(self, key, min_score, max_score): + lower = float("-inf") if min_score == "-inf" else float(min_score) + upper = float(max_score) + return [ + member + for member, score in self.sorted_sets[key].items() + if lower <= score <= upper + ] + + def zcard(self, key): + return len(self.sorted_sets[key]) + + def publish(self, channel, message): + self.published.append((channel, json.loads(message))) + return 1 + + def pubsub(self, ignore_subscribe_messages=True): + return self.pubsub_instance diff --git a/apps/notifications/tests/test_membership_events.py b/apps/notifications/tests/test_membership_events.py index 02c82de..d276eee 100644 --- a/apps/notifications/tests/test_membership_events.py +++ b/apps/notifications/tests/test_membership_events.py @@ -1,159 +1,137 @@ -import pytest +from django.test import TestCase from rest_framework.test import APIClient from apps.notifications.services import store as services from apps.notifications.services import RedisNotificationStore -from apps.notifications.tests.test_services import FakeRedis +from apps.notifications.tests.fakes import FakeRedis from apps.users.models import User from apps.workspaces.models import Workspace, WorkspaceMembership -@pytest.fixture() -def fake_redis(monkeypatch): - redis = FakeRedis() - monkeypatch.setattr(services, "redis_client", redis) - return redis +class WorkspaceMembershipNotificationTests(TestCase): + @classmethod + def setUpTestData(cls): + cls.owner = cls._create_user(1) + cls.member = cls._create_user(2) + @staticmethod + def _create_user(index): + return User.objects.create_user( + mobile=f"091200000{index:02d}", + password="secret123", + first_name=f"User{index}", + ) -@pytest.fixture() -def api_client(): - return APIClient() + def setUp(self): + self.client = APIClient() + self.fake_redis = FakeRedis() + self.original_redis_client = services.redis_client + services.redis_client = self.fake_redis + def tearDown(self): + services.redis_client = self.original_redis_client -def _create_user(index: int) -> User: - return User.objects.create_user( - mobile=f"091200000{index:02d}", - password="secret123", - first_name=f"User{index}", - ) + @staticmethod + def _notifications_for(user): + notifications, _ = RedisNotificationStore.list(str(user.id), paginate=False) + return notifications + def test_workspace_create_notifies_initial_members_not_owner(self): + self.client.force_authenticate(user=self.owner) -def _notifications_for(user): - notifications, _ = RedisNotificationStore.list( - str(user.id), - paginate=False, - ) - return notifications + response = self.client.post( + "/api/workspaces/", + { + "name": "Ops", + "description": "Workspace", + "members": [ + { + "user_id": str(self.member.id), + "role": WorkspaceMembership.Role.ADMIN, + } + ], + }, + format="json", + ) + self.assertEqual(response.status_code, 201) + self.assertEqual(self._notifications_for(self.owner), []) + member_notifications = self._notifications_for(self.member) + self.assertEqual(len(member_notifications), 1) + self.assertEqual(member_notifications[0]["type"], "workspace_membership_added") + self.assertEqual(member_notifications[0]["meta"]["workspace_name"], "Ops") + self.assertEqual( + member_notifications[0]["meta"]["new_role"], + WorkspaceMembership.Role.ADMIN, + ) -@pytest.fixture() -def owner(db): - return _create_user(1) + def test_workspace_membership_crud_emits_all_expected_events(self): + workspace = Workspace.objects.create(name="Design", description="", owner=self.owner) + self.client.force_authenticate(user=self.owner) + create_response = self.client.post( + "/api/workspace-memberships/", + { + "workspace": str(workspace.id), + "user": str(self.member.id), + "role": WorkspaceMembership.Role.MEMBER, + "is_active": True, + }, + format="json", + ) + self.assertEqual(create_response.status_code, 201) -@pytest.fixture() -def member(db): - return _create_user(2) + membership_id = create_response.data["id"] + notifications = self._notifications_for(self.member) + self.assertEqual( + [item["type"] for item in notifications], + ["workspace_membership_added"], + ) + role_response = self.client.patch( + f"/api/workspace-memberships/{membership_id}/", + {"role": WorkspaceMembership.Role.ADMIN}, + format="json", + ) + self.assertEqual(role_response.status_code, 200) -@pytest.fixture() -def another_member(db): - return _create_user(3) + deactivate_response = self.client.patch( + f"/api/workspace-memberships/{membership_id}/", + {"is_active": False}, + format="json", + ) + self.assertEqual(deactivate_response.status_code, 200) + remove_response = self.client.delete( + f"/api/workspace-memberships/{membership_id}/" + ) + self.assertEqual(remove_response.status_code, 204) -@pytest.fixture() -def third_member(db): - return _create_user(4) - - -@pytest.fixture() -def fourth_member(db): - return _create_user(5) - - -def test_workspace_create_notifies_initial_members_not_owner( - fake_redis, api_client, owner, member -): - api_client.force_authenticate(user=owner) - - response = api_client.post( - "/api/workspaces/", - { - "name": "Ops", - "description": "Workspace", - "members": [ - {"user_id": str(member.id), "role": WorkspaceMembership.Role.ADMIN} + notifications = self._notifications_for(self.member) + self.assertEqual( + [item["type"] for item in notifications], + [ + "workspace_membership_removed", + "workspace_membership_deactivated", + "workspace_membership_role_changed", + "workspace_membership_added", ], - }, - format="json", - ) + ) - assert response.status_code == 201 - owner_notifications = _notifications_for(owner) - member_notifications = _notifications_for(member) + def test_workspace_membership_update_skips_self_notifications(self): + workspace = Workspace.objects.create(name="Product", description="", owner=self.owner) + owner_membership = WorkspaceMembership.objects.get( + workspace=workspace, + user=self.owner, + is_deleted=False, + ) + self.client.force_authenticate(user=self.owner) - assert owner_notifications == [] - assert len(member_notifications) == 1 - assert member_notifications[0]["type"] == "workspace_membership_added" - assert member_notifications[0]["meta"]["workspace_name"] == "Ops" - assert member_notifications[0]["meta"]["new_role"] == WorkspaceMembership.Role.ADMIN - - -def test_workspace_membership_crud_emits_add_role_change_deactivate_and_remove( - fake_redis, api_client, owner, member -): - workspace = Workspace.objects.create(name="Design", description="", owner=owner) - api_client.force_authenticate(user=owner) - - create_response = api_client.post( - "/api/workspace-memberships/", - { - "workspace": str(workspace.id), - "user": str(member.id), - "role": WorkspaceMembership.Role.MEMBER, - "is_active": True, - }, - format="json", - ) - assert create_response.status_code == 201 - - membership_id = create_response.data["id"] - notifications = _notifications_for(member) - assert [item["type"] for item in notifications] == ["workspace_membership_added"] - - role_response = api_client.patch( - f"/api/workspace-memberships/{membership_id}/", - {"role": WorkspaceMembership.Role.ADMIN}, - format="json", - ) - assert role_response.status_code == 200 - - deactivate_response = api_client.patch( - f"/api/workspace-memberships/{membership_id}/", - {"is_active": False}, - format="json", - ) - assert deactivate_response.status_code == 200 - - remove_response = api_client.delete(f"/api/workspace-memberships/{membership_id}/") - assert remove_response.status_code == 204 - - notifications = _notifications_for(member) - assert [item["type"] for item in notifications] == [ - "workspace_membership_removed", - "workspace_membership_deactivated", - "workspace_membership_role_changed", - "workspace_membership_added", - ] - - -def test_workspace_membership_update_skips_self_notifications( - fake_redis, api_client, owner -): - workspace = Workspace.objects.create(name="Product", description="", owner=owner) - owner_membership = WorkspaceMembership.objects.get( - workspace=workspace, - user=owner, - is_deleted=False, - ) - api_client.force_authenticate(user=owner) - - response = api_client.patch( - f"/api/workspace-memberships/{owner_membership.id}/", - {"role": WorkspaceMembership.Role.OWNER}, - format="json", - ) - - assert response.status_code == 403 - assert _notifications_for(owner) == [] + response = self.client.patch( + f"/api/workspace-memberships/{owner_membership.id}/", + {"role": WorkspaceMembership.Role.OWNER}, + format="json", + ) + self.assertEqual(response.status_code, 403) + self.assertEqual(self._notifications_for(self.owner), []) diff --git a/apps/notifications/tests/test_services.py b/apps/notifications/tests/test_services.py index 647f733..e7d2565 100644 --- a/apps/notifications/tests/test_services.py +++ b/apps/notifications/tests/test_services.py @@ -1,200 +1,78 @@ -import json -from collections import defaultdict - -import pytest +from django.conf import settings +from django.test import TestCase from apps.notifications.services import store as services from apps.notifications.services import RedisNotificationStore +from apps.notifications.tests.fakes import FakeRedis -class FakePipeline: - def __init__(self, client): - self.client = client - self.operations = [] +class RedisNotificationStoreTests(TestCase): + def setUp(self): + self.fake_redis = FakeRedis() + self.original_redis_client = services.redis_client + services.redis_client = self.fake_redis - def __getattr__(self, name): - def wrapper(*args, **kwargs): - self.operations.append((name, args, kwargs)) - return self + def tearDown(self): + services.redis_client = self.original_redis_client - return wrapper + def test_add_publishes_notification_and_unread_count(self): + with self.settings(NOTIFICATIONS_ENABLED=True): + notification = RedisNotificationStore.add( + "user-1", + { + "title": "Build finished", + "message": "Your deploy completed.", + "level": "success", + }, + ) - def execute(self): - results = [] - for name, args, kwargs in self.operations: - results.append(getattr(self.client, name)(*args, **kwargs)) - self.operations.clear() - return results + self.assertEqual(notification["title"], "Build finished") + self.assertEqual(notification["message"], "Your deploy completed.") + self.assertEqual(notification["level"], "success") + self.assertEqual(len(self.fake_redis.published), 2) - -class FakePubSub: - def __init__(self): - self.channels = [] - self.messages = [] - self.closed = False - - def subscribe(self, channel): - self.channels.append(channel) - - def unsubscribe(self, channel): - if channel in self.channels: - self.channels.remove(channel) - - def get_message(self, timeout=1.0): - if self.messages: - return self.messages.pop(0) - return None - - def close(self): - self.closed = True - - -class FakeRedis: - def __init__(self): - self.sorted_sets = defaultdict(dict) - self.hashes = defaultdict(dict) - self.sets = defaultdict(set) - self.published = [] - self.pubsub_instance = FakePubSub() - - def pipeline(self): - return FakePipeline(self) - - def zadd(self, key, mapping): - self.sorted_sets[key].update(mapping) - return len(mapping) - - def hset(self, key, field, value): - self.hashes[key][field] = value - return 1 - - def sadd(self, key, *members): - before = len(self.sets[key]) - self.sets[key].update(members) - return len(self.sets[key]) - before - - def zrevrange(self, key, start, stop): - items = sorted( - self.sorted_sets[key].items(), - key=lambda item: (item[1], item[0]), - reverse=True, + channel, payload = self.fake_redis.published[0] + self.assertEqual( + channel, + f"{settings.NOTIFICATION_REDIS_CHANNEL_PREFIX}:user-1", ) - if stop == -1: - return [member for member, _ in items[start:]] - return [member for member, _ in items[start : stop + 1]] + self.assertEqual(payload["event"], "notification") + self.assertEqual(payload["data"]["notification"]["id"], notification["id"]) + self.assertEqual(payload["data"]["unread_count"], 1) - def hget(self, key, field): - return self.hashes[key].get(field) + def test_mark_seen_and_mark_all_seen_publish_sync_events(self): + with self.settings(NOTIFICATIONS_ENABLED=True): + first = RedisNotificationStore.add("user-2", {"title": "First"}) + RedisNotificationStore.add("user-2", {"title": "Second"}) + self.fake_redis.published.clear() - def zrem(self, key, *members): - removed = 0 - for member in members: - if member in self.sorted_sets[key]: - del self.sorted_sets[key][member] - removed += 1 - return removed + payload = RedisNotificationStore.mark_seen("user-2", first["id"]) - def hdel(self, key, *fields): - removed = 0 - for field in fields: - if field in self.hashes[key]: - del self.hashes[key][field] - removed += 1 - return removed + self.assertEqual(payload["notification_id"], first["id"]) + self.assertFalse(payload["deleted"]) + self.assertTrue(payload["notification"]["is_seen"]) + self.assertEqual(self.fake_redis.published[0][1]["event"], "notification_seen") - def smembers(self, key): - return set(self.sets[key]) + self.fake_redis.published.clear() + updated = RedisNotificationStore.mark_all_seen("user-2") - def srem(self, key, member): - if member in self.sets[key]: - self.sets[key].remove(member) - return 1 - return 0 + self.assertEqual(updated, 2) + self.assertEqual(self.fake_redis.published[0][1]["event"], "notification_mark_all_read") + self.assertEqual(self.fake_redis.published[1][1]["event"], "unread_count") + self.assertEqual(self.fake_redis.published[1][1]["data"]["unread_count"], 0) - def zrangebyscore(self, key, min_score, max_score): - lower = float("-inf") if min_score == "-inf" else float(min_score) - upper = float(max_score) - return [ - member - for member, score in self.sorted_sets[key].items() - if lower <= score <= upper - ] + def test_list_returns_total_count_and_filtered_notifications(self): + RedisNotificationStore.add("user-3", {"title": "General", "type": "general"}) + RedisNotificationStore.add("user-3", {"title": "Billing", "type": "billing"}) + RedisNotificationStore.add("user-3", {"title": "General 2", "type": "general"}) - def zcard(self, key): - return len(self.sorted_sets[key]) + notifications, total_count = RedisNotificationStore.list( + "user-3", + limit=1, + offset=0, + type_filter="general", + ) - def publish(self, channel, message): - self.published.append((channel, json.loads(message))) - return 1 - - def pubsub(self, ignore_subscribe_messages=True): - return self.pubsub_instance - - -@pytest.fixture() -def fake_redis(monkeypatch): - redis = FakeRedis() - monkeypatch.setattr(services, "redis_client", redis) - return redis - - -def test_add_publishes_notification_and_unread_count(fake_redis, settings): - settings.NOTIFICATIONS_ENABLED = True - - notification = RedisNotificationStore.add( - "user-1", - { - "title": "Build finished", - "message": "Your deploy completed.", - "level": "success", - }, - ) - - assert notification["title"] == "Build finished" - assert notification["message"] == "Your deploy completed." - assert notification["level"] == "success" - assert len(fake_redis.published) == 2 - channel, payload = fake_redis.published[0] - assert channel == f"{settings.NOTIFICATION_REDIS_CHANNEL_PREFIX}:user-1" - assert payload["event"] == "notification" - assert payload["data"]["notification"]["id"] == notification["id"] - assert payload["data"]["unread_count"] == 1 - - -def test_mark_seen_and_mark_all_seen_publish_sync_events(fake_redis, settings): - settings.NOTIFICATIONS_ENABLED = True - first = RedisNotificationStore.add("user-2", {"title": "First"}) - second = RedisNotificationStore.add("user-2", {"title": "Second"}) - fake_redis.published.clear() - - payload = RedisNotificationStore.mark_seen("user-2", first["id"]) - - assert payload["notification_id"] == first["id"] - assert payload["deleted"] is False - assert payload["notification"]["is_seen"] is True - assert fake_redis.published[0][1]["event"] == "notification_seen" - - fake_redis.published.clear() - updated = RedisNotificationStore.mark_all_seen("user-2") - - assert updated == 2 - assert fake_redis.published[0][1]["event"] == "notification_mark_all_read" - assert fake_redis.published[1][1]["event"] == "unread_count" - assert fake_redis.published[1][1]["data"]["unread_count"] == 0 - - -def test_list_returns_total_count_and_filtered_notifications(fake_redis): - RedisNotificationStore.add("user-3", {"title": "General", "type": "general"}) - RedisNotificationStore.add("user-3", {"title": "Billing", "type": "billing"}) - RedisNotificationStore.add("user-3", {"title": "General 2", "type": "general"}) - - notifications, total_count = RedisNotificationStore.list( - "user-3", - limit=1, - offset=0, - type_filter="general", - ) - - assert total_count == 2 - assert len(notifications) == 1 - assert notifications[0]["type"] == "general" + self.assertEqual(total_count, 2) + self.assertEqual(len(notifications), 1) + self.assertEqual(notifications[0]["type"], "general") diff --git a/apps/notifications/tests/test_views.py b/apps/notifications/tests/test_views.py index 35dc08e..fccf22b 100644 --- a/apps/notifications/tests/test_views.py +++ b/apps/notifications/tests/test_views.py @@ -1,166 +1,168 @@ import json import time from datetime import timedelta +from unittest.mock import patch -import pytest +from django.test import override_settings from django.utils import timezone -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.notifications.api import views from apps.notifications.services import store as services from apps.notifications.services import RedisNotificationStore -from apps.notifications.tests.test_services import FakePubSub, FakeRedis +from apps.notifications.tests.fakes import FakePubSub, FakeRedis from apps.users.models import User -@pytest.fixture() -def fake_redis(monkeypatch): - redis = FakeRedis() - monkeypatch.setattr(services, "redis_client", redis) - return redis +class NotificationViewTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.user = User.objects.create_user(mobile="09121111111", password="secret123") + cls.second_user = User.objects.create_user( + mobile="09122222222", + password="secret123", + ) + def setUp(self): + self.fake_redis = FakeRedis() + self.original_redis_client = services.redis_client + services.redis_client = self.fake_redis -@pytest.fixture() -def user(db): - return User.objects.create_user(mobile="09121111111", password="secret123") + def tearDown(self): + services.redis_client = self.original_redis_client + @staticmethod + def _read_sse_chunks(response, count): + iterator = iter(response.streaming_content) + chunks = [] + for _ in range(count): + chunk = next(iterator) + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8") + chunks.append(chunk) + response.close() + return chunks -@pytest.fixture() -def second_user(db): - return User.objects.create_user(mobile="09122222222", password="secret123") + @staticmethod + def _parse_sse_data(chunk): + for line in chunk.splitlines(): + if line.startswith("data: "): + return json.loads(line.removeprefix("data: ")) + raise AssertionError("SSE payload did not include data") + def test_stream_token_endpoint_returns_short_lived_token(self): + self.client.force_authenticate(user=self.user) -def _read_sse_chunks(response, count): - iterator = iter(response.streaming_content) - chunks = [] - for _ in range(count): - chunk = next(iterator) - if isinstance(chunk, bytes): - chunk = chunk.decode("utf-8") - chunks.append(chunk) - response.close() - return chunks + response = self.client.post("/api/notifications/stream-token/") + self.assertEqual(response.status_code, 200) + self.assertTrue(response.data["token"]) + self.assertGreater(response.data["expires_in"], 0) -def _parse_sse_data(chunk: str) -> dict: - for line in chunk.splitlines(): - if line.startswith("data: "): - return json.loads(line.removeprefix("data: ")) - raise AssertionError("SSE payload did not include data") + def test_stream_endpoint_rejects_missing_and_expired_token(self): + missing = self.client.get("/api/notifications/stream/") + self.assertEqual(missing.status_code, 401) + with override_settings(NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS=1): + token = views._issue_stream_token_for_user(str(self.user.id)) + time.sleep(1.1) + expired = self.client.get(f"/api/notifications/stream/?token={token}") -def test_stream_token_endpoint_returns_short_lived_token(user): - client = APIClient() - client.force_authenticate(user=user) + self.assertEqual(expired.status_code, 401) - response = client.post("/api/notifications/stream-token/") + def test_stream_endpoint_sends_only_current_users_notifications(self): + RedisNotificationStore.add(str(self.user.id), {"title": "For current user"}) + RedisNotificationStore.add(str(self.second_user.id), {"title": "For another user"}) + pubsub = FakePubSub() - assert response.status_code == 200 - assert response.data["token"] - assert response.data["expires_in"] > 0 + with patch.object( + RedisNotificationStore, + "get_pubsub", + classmethod(lambda cls: pubsub), + ): + token = views._issue_stream_token_for_user(str(self.user.id)) + response = self.client.get( + f"/api/notifications/stream/?token={token}", + HTTP_ACCEPT="text/event-stream", + ) + retry_line, connected_chunk = self._read_sse_chunks(response, 2) + self.assertEqual(response.status_code, 200) + self.assertTrue(retry_line.startswith("retry:")) + connected = self._parse_sse_data(connected_chunk) + self.assertEqual(connected["unread_count"], 1) + self.assertEqual( + [item["title"] for item in connected["notifications"]], + ["For current user"], + ) -def test_stream_endpoint_rejects_missing_and_expired_token(user, settings): - client = APIClient() + def test_stream_endpoint_emits_heartbeat(self): + pubsub = FakePubSub() + first_now = timezone.now() + tick_values = iter( + [ + first_now, + first_now, + first_now + timedelta(seconds=2), + first_now + timedelta(seconds=2), + first_now + timedelta(seconds=2), + first_now + timedelta(seconds=2), + ] + ) + last_tick = first_now + timedelta(seconds=2) - missing = client.get("/api/notifications/stream/") - assert missing.status_code == 401 + def fake_now(): + return next(tick_values, last_tick) - settings.NOTIFICATION_STREAM_TOKEN_LIFETIME_SECONDS = 1 - token = views._issue_stream_token_for_user(str(user.id)) - time.sleep(1.1) + with override_settings(NOTIFICATION_SSE_HEARTBEAT_SECONDS=1): + with patch.object( + RedisNotificationStore, + "get_pubsub", + classmethod(lambda cls: pubsub), + ): + with patch.object(views.timezone, "now", side_effect=fake_now): + view = views.NotificationStreamView() + stream = view._build_stream(str(self.user.id)) + chunks = [next(stream) for _ in range(4)] + stream.close() - expired = client.get(f"/api/notifications/stream/?token={token}") - assert expired.status_code == 401 + self.assertIn("event: ping", chunks[3]) + def test_notification_list_and_seen_endpoints_work(self): + notification = RedisNotificationStore.add( + str(self.user.id), + {"title": "Deploy succeeded", "type": "deploy"}, + ) + self.client.force_authenticate(user=self.user) -def test_stream_endpoint_sends_only_current_users_notifications( - fake_redis, user, second_user, monkeypatch -): - RedisNotificationStore.add(str(user.id), {"title": "For current user"}) - RedisNotificationStore.add(str(second_user.id), {"title": "For another user"}) - pubsub = FakePubSub() - monkeypatch.setattr(RedisNotificationStore, "get_pubsub", classmethod(lambda cls: pubsub)) - token = views._issue_stream_token_for_user(str(user.id)) + list_response = self.client.get("/api/notifications/list/?type=deploy") + self.assertEqual(list_response.status_code, 200) + self.assertEqual(list_response.data["count"], 1) + self.assertEqual(list_response.data["unread_count"], 1) + self.assertEqual( + list_response.data["notifications"][0]["title"], + "Deploy succeeded", + ) - client = APIClient() - response = client.get( - f"/api/notifications/stream/?token={token}", - HTTP_ACCEPT="text/event-stream", - ) - retry_line, connected_chunk = _read_sse_chunks(response, 2) + seen_response = self.client.post( + "/api/notifications/seen/", + {"id": notification["id"]}, + format="json", + ) + self.assertEqual(seen_response.status_code, 200) + self.assertTrue(seen_response.data["marked_read"]) + self.assertTrue(seen_response.data["notification"]["is_seen"]) - assert response.status_code == 200 - assert retry_line.startswith("retry:") - connected = _parse_sse_data(connected_chunk) - assert connected["unread_count"] == 1 - assert [item["title"] for item in connected["notifications"]] == ["For current user"] + def test_notification_delete_endpoint_removes_notification(self): + notification = RedisNotificationStore.add( + str(self.user.id), + {"title": "Delete me", "type": "deploy"}, + ) + self.client.force_authenticate(user=self.user) + response = self.client.delete(f"/api/notifications/{notification['id']}/") -def test_stream_endpoint_emits_heartbeat(fake_redis, user, settings, monkeypatch): - pubsub = FakePubSub() - monkeypatch.setattr(RedisNotificationStore, "get_pubsub", classmethod(lambda cls: pubsub)) - settings.NOTIFICATION_SSE_HEARTBEAT_SECONDS = 1 - - first_now = timezone.now() - tick_values = iter( - [ - first_now, - first_now, - first_now + timedelta(seconds=2), - first_now + timedelta(seconds=2), - first_now + timedelta(seconds=2), - first_now + timedelta(seconds=2), - ] - ) - last_tick = first_now + timedelta(seconds=2) - - def fake_now(): - return next(tick_values, last_tick) - - monkeypatch.setattr(views.timezone, "now", fake_now) - view = views.NotificationStreamView() - stream = view._build_stream(str(user.id)) - - chunks = [next(stream) for _ in range(4)] - stream.close() - - assert "event: ping" in chunks[3] - - -def test_notification_list_and_seen_endpoints_work(fake_redis, user): - notification = RedisNotificationStore.add( - str(user.id), - {"title": "Deploy succeeded", "type": "deploy"}, - ) - - client = APIClient() - client.force_authenticate(user=user) - - list_response = client.get("/api/notifications/list/?type=deploy") - assert list_response.status_code == 200 - assert list_response.data["count"] == 1 - assert list_response.data["unread_count"] == 1 - assert list_response.data["notifications"][0]["title"] == "Deploy succeeded" - - seen_response = client.post("/api/notifications/seen/", {"id": notification["id"]}, format="json") - assert seen_response.status_code == 200 - assert seen_response.data["marked_read"] is True - assert seen_response.data["notification"]["is_seen"] is True - - -def test_notification_delete_endpoint_removes_notification(fake_redis, user): - notification = RedisNotificationStore.add( - str(user.id), - {"title": "Delete me", "type": "deploy"}, - ) - - client = APIClient() - client.force_authenticate(user=user) - - response = client.delete(f"/api/notifications/{notification['id']}/") - - assert response.status_code == 200 - assert response.data["deleted"] is True - assert response.data["notification_id"] == notification["id"] - assert RedisNotificationStore.get(str(user.id), notification["id"]) is None + self.assertEqual(response.status_code, 200) + self.assertTrue(response.data["deleted"]) + self.assertEqual(response.data["notification_id"], notification["id"]) + self.assertIsNone(RedisNotificationStore.get(str(self.user.id), notification["id"])) diff --git a/apps/projects/tests/test_views.py b/apps/projects/tests/test_views.py index ac80b6d..a1968cc 100644 --- a/apps/projects/tests/test_views.py +++ b/apps/projects/tests/test_views.py @@ -1,5 +1,4 @@ -import pytest -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.clients.models import Client from apps.projects.models import Project @@ -7,69 +6,65 @@ from apps.users.models import User from apps.workspaces.models import Workspace, WorkspaceMembership -@pytest.fixture() -def api_client(): - return APIClient() +class ProjectViewTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.owner = User.objects.create_user( + mobile="09121110001", + password="secret123", + first_name="Owner", + ) + cls.workspace = Workspace.objects.create(name="Projects", owner=cls.owner) + cls.member = User.objects.create_user( + mobile="09121110002", + password="secret123", + first_name="Member", + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) + cls.first_client = Client.objects.create(workspace=cls.workspace, name="Acme") + cls.second_client = Client.objects.create(workspace=cls.workspace, name="Globex") + cls.third_client = Client.objects.create(workspace=cls.workspace, name="Initech") + Project.objects.create( + workspace=cls.workspace, + client=cls.first_client, + name="Alpha", + ) + Project.objects.create( + workspace=cls.workspace, + client=cls.second_client, + name="Beta", + ) + Project.objects.create( + workspace=cls.workspace, + client=cls.third_client, + name="Gamma", + ) + def test_project_list_supports_multi_client_filter(self): + self.client.force_authenticate(user=self.member) -@pytest.fixture() -def owner(db): - return User.objects.create_user(mobile="09121110001", password="secret123", first_name="Owner") + response = self.client.get( + "/api/projects/", + [ + ("workspace", str(self.workspace.id)), + ("clients", str(self.first_client.id)), + ("clients", str(self.second_client.id)), + ], + ) - -@pytest.fixture() -def workspace(owner): - return Workspace.objects.create(name="Projects", owner=owner) - - -@pytest.fixture() -def member(db, workspace): - user = User.objects.create_user(mobile="09121110002", password="secret123", first_name="Member") - WorkspaceMembership.objects.create( - workspace=workspace, - user=user, - role=WorkspaceMembership.Role.MEMBER, - is_active=True, - ) - return user - - -@pytest.fixture() -def clients(workspace): - first = Client.objects.create(workspace=workspace, name="Acme") - second = Client.objects.create(workspace=workspace, name="Globex") - third = Client.objects.create(workspace=workspace, name="Initech") - return first, second, third - - -@pytest.fixture() -def projects(workspace, clients): - first, second, third = clients - return [ - Project.objects.create(workspace=workspace, client=first, name="Alpha"), - Project.objects.create(workspace=workspace, client=second, name="Beta"), - Project.objects.create(workspace=workspace, client=third, name="Gamma"), - ] - - -def test_project_list_supports_multi_client_filter(api_client, member, workspace, clients, projects): - api_client.force_authenticate(user=member) - first, second, _ = clients - - response = api_client.get( - "/api/projects/", - [ - ("workspace", str(workspace.id)), - ("clients", str(first.id)), - ("clients", str(second.id)), - ], - ) - - assert response.status_code == 200 - items = ( - response.data - if isinstance(response.data, list) - else response.data.get("results") or response.data.get("items", []) - ) - result_ids = {str(item["client"]["id"]) for item in items} - assert result_ids == {str(first.id), str(second.id)} + self.assertEqual(response.status_code, 200) + items = ( + response.data + if isinstance(response.data, list) + else response.data.get("results") or response.data.get("items", []) + ) + result_ids = {str(item["client"]["id"]) for item in items} + self.assertEqual( + result_ids, + {str(self.first_client.id), str(self.second_client.id)}, + ) diff --git a/apps/reports/tests/test_tasks.py b/apps/reports/tests/test_tasks.py index e0b236d..9e1a3ad 100644 --- a/apps/reports/tests/test_tasks.py +++ b/apps/reports/tests/test_tasks.py @@ -1,264 +1,111 @@ -from datetime import timedelta -from decimal import Decimal -from io import BytesIO +from unittest.mock import patch -import pytest from django.core.files.base import ContentFile from django.core.files.storage import default_storage +from django.test import TestCase from django.utils import timezone -from openpyxl import load_workbook -from apps.notifications.services import store as notification_store from apps.reports.models import ReportExportJob -from apps.reports.tasks import cleanup_expired_report_exports_task, generate_report_export_task -from apps.time_entries.models import TimeEntry +from apps.reports.tasks import ( + cleanup_expired_report_exports_task, + generate_report_export_task, +) from apps.users.models import User from apps.workspaces.models import Workspace -class FakeRedis: - def pipeline(self): - return self +class ReportTaskTests(TestCase): + @classmethod + def setUpTestData(cls): + cls.owner = User.objects.create_user( + mobile="09129990001", + password="secret123", + first_name="Owner", + last_name="User", + ) + cls.workspace = Workspace.objects.create(name="Exports", owner=cls.owner) - def zadd(self, *args, **kwargs): - return self + def test_generate_excel_export_marks_job_complete_and_sends_notification(self): + job = ReportExportJob.objects.create( + requesting_user=self.owner, + workspace=self.workspace, + export_type=ReportExportJob.ExportType.EXCEL, + filters={ + "workspace": str(self.workspace.id), + "period": "this_month", + "from_date": "2026-04-01", + "to_date": "2026-04-30", + "user": str(self.owner.id), + "client": None, + "project": None, + "tags": [], + "language": "en", + }, + ) - def hset(self, *args, **kwargs): - return self + with patch("apps.reports.tasks.build_table_report", return_value={"scope": {}, "summary": {}, "days": [], "clients": [], "projects": [], "tags": []}) as build_table_report: + with patch("apps.reports.tasks.build_user_scoped_table_reports", return_value=[]) as build_user_reports: + with patch("apps.reports.tasks.build_excel_report", return_value=b"excel-content") as build_excel_report: + with patch("apps.reports.tasks.RedisNotificationStore.add") as notify: + generate_report_export_task(str(job.id)) - def sadd(self, *args, **kwargs): - return self + job.refresh_from_db() + self.assertEqual(job.status, ReportExportJob.Status.COMPLETED) + self.assertTrue(bool(job.file)) + self.assertTrue(default_storage.exists(job.file.name)) + build_table_report.assert_called_once() + build_user_reports.assert_called_once() + build_excel_report.assert_called_once() + notify.assert_called_once() + self.assertEqual(notify.call_args.args[0], str(self.owner.id)) + self.assertEqual(notify.call_args.args[1]["type"], "report_export_ready") - def execute(self): - return [] + def test_generate_pdf_export_failure_marks_job_failed_and_notifies(self): + job = ReportExportJob.objects.create( + requesting_user=self.owner, + workspace=self.workspace, + export_type=ReportExportJob.ExportType.PDF, + filters={ + "workspace": str(self.workspace.id), + "period": "this_month", + "from_date": "2026-04-01", + "to_date": "2026-04-30", + "user": str(self.owner.id), + "client": None, + "project": None, + "tags": [], + "language": "fa", + }, + ) - def publish(self, *args, **kwargs): - return None + with patch("apps.reports.tasks.build_table_report", return_value={"scope": {}, "summary": {}, "days": [], "clients": [], "projects": [], "tags": []}): + with patch("apps.reports.tasks.build_pdf_report", side_effect=RuntimeError("boom")): + with patch("apps.reports.tasks.RedisNotificationStore.add") as notify: + with self.assertRaises(RuntimeError): + generate_report_export_task(str(job.id)) - def zrevrange(self, *args, **kwargs): - return [] + job.refresh_from_db() + self.assertEqual(job.status, ReportExportJob.Status.FAILED) + self.assertEqual(job.error_message, "boom") + notify.assert_called_once() + self.assertEqual(notify.call_args.args[1]["type"], "report_export_failed") - def hget(self, *args, **kwargs): - return None + def test_cleanup_expires_and_removes_files(self): + job = ReportExportJob.objects.create( + requesting_user=self.owner, + workspace=self.workspace, + export_type=ReportExportJob.ExportType.EXCEL, + status=ReportExportJob.Status.COMPLETED, + filters={}, + expires_at=timezone.now() - timezone.timedelta(days=1), + ) + file_name = f"reports/exports/{job.id}-old.xlsx" + job.file.save(file_name, ContentFile(b"old-data"), save=False) + job.save(update_fields=["file", "updated_at"]) - def zrem(self, *args, **kwargs): - return 1 + removed = cleanup_expired_report_exports_task() + job.refresh_from_db() - def hdel(self, *args, **kwargs): - return 1 - - def zcard(self, *args, **kwargs): - return 0 - - def smembers(self, *args, **kwargs): - return set() - - def srem(self, *args, **kwargs): - return 1 - - -@pytest.fixture() -def fake_redis(monkeypatch): - redis = FakeRedis() - monkeypatch.setattr(notification_store, "redis_client", redis) - return redis - - -@pytest.fixture() -def owner(db): - return User.objects.create_user(mobile="09129990001", password="secret123", first_name="Owner", last_name="User") - - -@pytest.fixture() -def teammate(db): - return User.objects.create_user(mobile="09129990002", password="secret123", first_name="Team", last_name="Mate") - - -@pytest.fixture() -def workspace(owner, teammate): - workspace = Workspace.objects.create(name="Exports", owner=owner) - workspace.memberships.create(user=teammate, role="member", is_active=True) - return workspace - - -@pytest.fixture() -def time_entry(workspace, owner): - return TimeEntry.objects.create( - workspace=workspace, - user=owner, - description="Export row", - start_time="2026-04-12T08:00:00+03:30", - end_time="2026-04-12T10:00:00+03:30", - duration=timedelta(hours=2), - is_billable=True, - hourly_rate=Decimal("15.00"), - currency="USD", - ) - - -@pytest.fixture() -def teammate_entry(workspace, teammate): - return TimeEntry.objects.create( - workspace=workspace, - user=teammate, - description="Team row", - start_time="2026-04-13T08:00:00+03:30", - end_time="2026-04-13T09:00:00+03:30", - duration=timedelta(hours=1), - is_billable=False, - currency="USD", - ) - - -def test_generate_export_creates_file_and_marks_job_complete(fake_redis, workspace, owner, time_entry): - job = ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.EXCEL, - filters={ - "workspace": str(workspace.id), - "period": "this_month", - "from_date": "2026-04-01", - "to_date": "2026-04-30", - "user": str(owner.id), - "client": None, - "project": None, - "tags": [], - "language": "en", - }, - ) - - generate_report_export_task(str(job.id)) - job.refresh_from_db() - - assert job.status == ReportExportJob.Status.COMPLETED - assert bool(job.file) - assert default_storage.exists(job.file.name) - - -def test_generate_excel_export_adds_per_user_sheets_for_all_users_scope( - fake_redis, - workspace, - owner, - teammate, - time_entry, - teammate_entry, -): - job = ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.EXCEL, - filters={ - "workspace": str(workspace.id), - "period": "this_month", - "from_date": "2026-04-01", - "to_date": "2026-04-30", - "user": None, - "client": None, - "project": None, - "tags": [], - "language": "en", - }, - ) - - generate_report_export_task(str(job.id)) - job.refresh_from_db() - - workbook = load_workbook(BytesIO(job.file.read())) - assert workbook.sheetnames[0] == "Overall Report" - assert any("Owner User" in sheet for sheet in workbook.sheetnames[1:]) - assert any("Team Mate" in sheet for sheet in workbook.sheetnames[1:]) - assert len(workbook.sheetnames) == 3 - - -def test_generate_excel_export_includes_daily_rate_column_and_split_user_meta( - fake_redis, - workspace, - owner, - time_entry, -): - job = ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.EXCEL, - filters={ - "workspace": str(workspace.id), - "period": "this_month", - "from_date": "2026-04-01", - "to_date": "2026-04-30", - "user": str(owner.id), - "client": None, - "project": None, - "tags": [], - "language": "en", - }, - ) - - generate_report_export_task(str(job.id)) - job.refresh_from_db() - - workbook = load_workbook(BytesIO(job.file.read())) - worksheet = workbook.active - values = list(worksheet.iter_rows(values_only=True)) - - assert any(row[:2] == ("User", "Owner User") for row in values if row) - assert any(row[:2] == ("Mobile", "09129990001") for row in values if row) - - daily_header = next(row[:6] for row in values if row and row[0] == "Date") - assert daily_header == ( - "Date", - "Billable hours", - "Non-billable hours", - "Total hours", - "Hourly rate", - "Income", - ) - - daily_row = next(row[:6] for row in values if row and row[0] == "2026/04/12") - assert daily_row[4] == "15 USD" - - -def test_generate_pdf_export_supports_persian_locale(fake_redis, workspace, owner, time_entry): - job = ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.PDF, - filters={ - "workspace": str(workspace.id), - "period": "this_month", - "from_date": "2026-04-01", - "to_date": "2026-04-30", - "user": str(owner.id), - "client": None, - "project": None, - "tags": [], - "language": "fa", - }, - ) - - generate_report_export_task(str(job.id)) - job.refresh_from_db() - - assert job.status == ReportExportJob.Status.COMPLETED - assert job.file.read(4) == b"%PDF" - - -def test_cleanup_expires_and_removes_files(fake_redis, workspace, owner): - job = ReportExportJob.objects.create( - requesting_user=owner, - workspace=workspace, - export_type=ReportExportJob.ExportType.EXCEL, - status=ReportExportJob.Status.COMPLETED, - filters={}, - expires_at=timezone.now() - timezone.timedelta(days=1), - ) - file_name = f"reports/exports/{job.id}-old.xlsx" - job.file.save(file_name, ContentFile(b"old-data"), save=False) - job.save(update_fields=["file", "updated_at"]) - - removed = cleanup_expired_report_exports_task() - job.refresh_from_db() - - assert removed == 1 - assert job.status == ReportExportJob.Status.EXPIRED - assert not default_storage.exists(file_name) + self.assertEqual(removed, 1) + self.assertEqual(job.status, ReportExportJob.Status.EXPIRED) + self.assertFalse(default_storage.exists(file_name)) diff --git a/apps/reports/tests/test_views.py b/apps/reports/tests/test_views.py index 9825122..e9f7a05 100644 --- a/apps/reports/tests/test_views.py +++ b/apps/reports/tests/test_views.py @@ -1,8 +1,8 @@ from datetime import date, timedelta from decimal import Decimal +from unittest.mock import patch -import pytest -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.clients.models import Client from apps.projects.models import Project @@ -12,194 +12,199 @@ from apps.users.models import User from apps.workspaces.models import Workspace, WorkspaceMembership -@pytest.fixture() -def api_client(): - return APIClient() +class ReportViewTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.owner = User.objects.create_user( + mobile="09128880001", + password="secret123", + first_name="Owner", + ) + cls.admin = User.objects.create_user( + mobile="09128880002", + password="secret123", + first_name="Admin", + ) + cls.member = User.objects.create_user( + mobile="09128880003", + password="secret123", + first_name="Member", + ) + cls.workspace = Workspace.objects.create(name="Reports", owner=cls.owner) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.admin, + role=WorkspaceMembership.Role.ADMIN, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) + cls.client_obj = Client.objects.create(workspace=cls.workspace, name="Acme") + cls.project = Project.objects.create( + workspace=cls.workspace, + name="Website", + client=cls.client_obj, + ) + cls.tag = Tag.objects.create( + workspace=cls.workspace, + name="Design", + color="#ffffff", + ) + entry_owner = TimeEntry.objects.create( + workspace=cls.workspace, + user=cls.owner, + project=cls.project, + description="Owner work", + start_time="2026-04-10T08:00:00+03:30", + end_time="2026-04-10T10:00:00+03:30", + duration=timedelta(hours=2), + is_billable=True, + hourly_rate=Decimal("25.00"), + currency="USD", + ) + entry_owner.tags.add(cls.tag) -@pytest.fixture() -def owner(db): - return User.objects.create_user(mobile="09128880001", password="secret123", first_name="Owner") + entry_member = TimeEntry.objects.create( + workspace=cls.workspace, + user=cls.member, + project=cls.project, + description="Member work", + start_time="2026-04-11T09:00:00+03:30", + end_time="2026-04-11T10:00:00+03:30", + duration=timedelta(hours=1), + is_billable=False, + currency="USD", + ) + entry_member.tags.add(cls.tag) + def test_member_only_sees_own_chart_report(self): + self.client.force_authenticate(user=self.member) -@pytest.fixture() -def admin(db): - return User.objects.create_user(mobile="09128880002", password="secret123", first_name="Admin") + response = self.client.get( + "/api/reports/chart/", + {"workspace": str(self.workspace.id), "period": "this_month"}, + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["summary"]["total_duration"], "01:00:00") -@pytest.fixture() -def member(db): - return User.objects.create_user(mobile="09128880003", password="secret123", first_name="Member") + def test_admin_can_request_combined_table_report(self): + self.client.force_authenticate(user=self.admin) + response = self.client.get( + "/api/reports/table/", + {"workspace": str(self.workspace.id), "period": "this_month"}, + ) -@pytest.fixture() -def workspace(owner, admin, member): - workspace = Workspace.objects.create(name="Reports", owner=owner) - WorkspaceMembership.objects.create(workspace=workspace, user=admin, role=WorkspaceMembership.Role.ADMIN, is_active=True) - WorkspaceMembership.objects.create(workspace=workspace, user=member, role=WorkspaceMembership.Role.MEMBER, is_active=True) - return workspace + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["summary"]["total_duration"], "03:00:00") + self.assertEqual(len(response.data["days"]), 2) + self.assertIsNone(response.data["days"][0]["latest_hourly_rate"]) + self.assertIsNone(response.data["days"][1]["latest_hourly_rate"]) + def test_daily_rate_uses_latest_billable_entry_snapshot(self): + self.client.force_authenticate(user=self.owner) -@pytest.fixture() -def client(workspace): - return Client.objects.create(workspace=workspace, name="Acme") + TimeEntry.objects.create( + workspace=self.workspace, + user=self.owner, + project=self.project, + description="Morning work", + start_time="2026-04-15T08:00:00+03:30", + end_time="2026-04-15T09:00:00+03:30", + duration=timedelta(hours=1), + is_billable=True, + hourly_rate=Decimal("20.00"), + currency="USD", + ) + TimeEntry.objects.create( + workspace=self.workspace, + user=self.owner, + project=self.project, + description="Later work", + start_time="2026-04-15T13:00:00+03:30", + end_time="2026-04-15T15:00:00+03:30", + duration=timedelta(hours=2), + is_billable=True, + hourly_rate=Decimal("35.00"), + currency="USD", + ) + response = self.client.get( + "/api/reports/table/", + { + "workspace": str(self.workspace.id), + "period": "this_month", + "user": str(self.owner.id), + }, + ) -@pytest.fixture() -def project(workspace, client): - return Project.objects.create(workspace=workspace, name="Website", client=client) + self.assertEqual(response.status_code, 200) + target_day = next(day for day in response.data["days"] if day["date"] == "2026-04-15") + self.assertEqual( + target_day["latest_hourly_rate"], + {"amount": "35.00", "currency": "USD"}, + ) + def test_custom_period_longer_than_31_days_is_rejected(self): + self.client.force_authenticate(user=self.owner) -@pytest.fixture() -def tag(workspace): - return Tag.objects.create(workspace=workspace, name="Design", color="#ffffff") + response = self.client.get( + "/api/reports/chart/", + { + "workspace": str(self.workspace.id), + "period": "period", + "from_date": "2026-01-01", + "to_date": "2026-02-15", + }, + ) + self.assertEqual(response.status_code, 400) -@pytest.fixture() -def time_entries(workspace, owner, member, project, tag): - entry_owner = TimeEntry.objects.create( - workspace=workspace, - user=owner, - project=project, - description="Owner work", - start_time="2026-04-10T08:00:00+03:30", - end_time="2026-04-10T10:00:00+03:30", - duration=timedelta(hours=2), - is_billable=True, - hourly_rate=Decimal("25.00"), - currency="USD", - ) - entry_owner.tags.add(tag) - entry_member = TimeEntry.objects.create( - workspace=workspace, - user=member, - project=project, - description="Member work", - start_time="2026-04-11T09:00:00+03:30", - end_time="2026-04-11T10:00:00+03:30", - duration=timedelta(hours=1), - is_billable=False, - currency="USD", - ) - entry_member.tags.add(tag) - return [entry_owner, entry_member] + def test_persian_this_month_uses_jalali_month_bounds(self): + self.client.force_authenticate(user=self.owner) + with patch( + "apps.reports.services.aggregation.timezone.localdate", + return_value=date(2026, 4, 27), + ): + TimeEntry.objects.create( + workspace=self.workspace, + user=self.owner, + project=self.project, + description="Previous jalali month", + start_time="2026-04-20T08:00:00+03:30", + end_time="2026-04-20T09:00:00+03:30", + duration=timedelta(hours=1), + is_billable=False, + currency="USD", + ) + TimeEntry.objects.create( + workspace=self.workspace, + user=self.owner, + project=self.project, + description="Current jalali month", + start_time="2026-04-21T08:00:00+03:30", + end_time="2026-04-21T10:00:00+03:30", + duration=timedelta(hours=2), + is_billable=False, + currency="USD", + ) -def test_member_only_sees_own_chart_report(api_client, member, workspace, time_entries): - api_client.force_authenticate(user=member) + response = self.client.get( + "/api/reports/table/", + { + "workspace": str(self.workspace.id), + "period": "this_month", + "language": "fa", + }, + ) - response = api_client.get( - "/api/reports/chart/", - {"workspace": str(workspace.id), "period": "this_month"}, - ) - - assert response.status_code == 200 - assert response.data["summary"]["total_duration"] == "01:00:00" - - -def test_admin_can_request_combined_table_report(api_client, admin, workspace, time_entries): - api_client.force_authenticate(user=admin) - - response = api_client.get( - "/api/reports/table/", - {"workspace": str(workspace.id), "period": "this_month"}, - ) - - assert response.status_code == 200 - assert response.data["summary"]["total_duration"] == "03:00:00" - assert len(response.data["days"]) == 2 - assert response.data["days"][0]["latest_hourly_rate"] is None - assert response.data["days"][1]["latest_hourly_rate"] is None - - -def test_daily_rate_uses_latest_billable_entry_snapshot(api_client, owner, workspace, project): - api_client.force_authenticate(user=owner) - - TimeEntry.objects.create( - workspace=workspace, - user=owner, - project=project, - description="Morning work", - start_time="2026-04-15T08:00:00+03:30", - end_time="2026-04-15T09:00:00+03:30", - duration=timedelta(hours=1), - is_billable=True, - hourly_rate=Decimal("20.00"), - currency="USD", - ) - TimeEntry.objects.create( - workspace=workspace, - user=owner, - project=project, - description="Later work", - start_time="2026-04-15T13:00:00+03:30", - end_time="2026-04-15T15:00:00+03:30", - duration=timedelta(hours=2), - is_billable=True, - hourly_rate=Decimal("35.00"), - currency="USD", - ) - - response = api_client.get( - "/api/reports/table/", - {"workspace": str(workspace.id), "period": "this_month", "user": str(owner.id)}, - ) - - assert response.status_code == 200 - assert response.data["days"][0]["latest_hourly_rate"] == { - "amount": "35.00", - "currency": "USD", - } - - -def test_custom_period_longer_than_31_days_is_rejected(api_client, owner, workspace): - api_client.force_authenticate(user=owner) - - response = api_client.get( - "/api/reports/chart/", - { - "workspace": str(workspace.id), - "period": "period", - "from_date": "2026-01-01", - "to_date": "2026-02-15", - }, - ) - - assert response.status_code == 400 - - -def test_persian_this_month_uses_jalali_month_bounds(api_client, owner, workspace, project, monkeypatch): - api_client.force_authenticate(user=owner) - monkeypatch.setattr("apps.reports.services.aggregation.timezone.localdate", lambda: date(2026, 4, 27)) - - TimeEntry.objects.create( - workspace=workspace, - user=owner, - project=project, - description="Previous jalali month", - start_time="2026-04-20T08:00:00+03:30", - end_time="2026-04-20T09:00:00+03:30", - duration=timedelta(hours=1), - is_billable=False, - currency="USD", - ) - TimeEntry.objects.create( - workspace=workspace, - user=owner, - project=project, - description="Current jalali month", - start_time="2026-04-21T08:00:00+03:30", - end_time="2026-04-21T10:00:00+03:30", - duration=timedelta(hours=2), - is_billable=False, - currency="USD", - ) - - response = api_client.get( - "/api/reports/table/", - {"workspace": str(workspace.id), "period": "this_month", "language": "fa"}, - ) - - assert response.status_code == 200 - assert response.data["summary"]["total_duration"] == "02:00:00" - assert response.data["scope"]["from_date"] == "2026-04-21" + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["summary"]["total_duration"], "02:00:00") + self.assertEqual(response.data["scope"]["from_date"], "2026-04-21") diff --git a/apps/time_entries/tests/test_filters.py b/apps/time_entries/tests/test_filters.py index 719d495..5e91375 100644 --- a/apps/time_entries/tests/test_filters.py +++ b/apps/time_entries/tests/test_filters.py @@ -1,5 +1,7 @@ from datetime import datetime +from django.test import TestCase + from apps.clients.models import Client from apps.projects.models import Project from apps.tags.models import Tag @@ -12,78 +14,94 @@ from apps.workspaces.models import Workspace def make_aware(year, month, day, hour=9, minute=0, second=0): from django.utils import timezone - return timezone.make_aware(datetime(year, month, day, hour, minute, second), timezone.get_current_timezone()) - - -def test_time_entry_filter_supports_project_client_tags_and_custom_dates(db): - user = User.objects.create_user(mobile="09124444444", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - client_a = Client.objects.create(workspace=workspace, name="Client A") - client_b = Client.objects.create(workspace=workspace, name="Client B") - project_a = Project.objects.create(workspace=workspace, client=client_a, name="Project A") - project_b = Project.objects.create(workspace=workspace, client=client_b, name="Project B") - tag_backend = Tag.objects.create(workspace=workspace, name="Backend", color="#0EA5E9") - tag_ops = Tag.objects.create(workspace=workspace, name="Ops", color="#10B981") - - entry_a = TimeEntry.objects.create( - workspace=workspace, - user=user, - project=project_a, - description="Backend work", - start_time=make_aware(2026, 4, 10, 10, 0, 0), - end_time=make_aware(2026, 4, 10, 12, 0, 0), - ) - entry_a.tags.set([tag_backend]) - - entry_b = TimeEntry.objects.create( - workspace=workspace, - user=user, - project=project_b, - description="Ops work", - start_time=make_aware(2026, 4, 18, 14, 0, 0), - end_time=make_aware(2026, 4, 18, 15, 30, 0), - ) - entry_b.tags.set([tag_ops]) - - queryset = TimeEntry.objects.filter(workspace=workspace, is_deleted=False) - - filtered = TimeEntryFilter( - data={ - "workspace": str(workspace.id), - "project": str(project_a.id), - "client": str(client_a.id), - "tags": str(tag_backend.id), - "started_after": "2026-04-01", - "started_before": "2026-04-15", - }, - queryset=queryset, - ).qs - - assert list(filtered) == [entry_a] - - -def test_time_entry_filter_supports_status_values(db): - user = User.objects.create_user(mobile="09125555555", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - - ended_entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Ended entry", - start_time=make_aware(2026, 4, 24, 9, 0, 0), - end_time=make_aware(2026, 4, 24, 10, 0, 0), - ) - running_entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Running entry", - start_time=make_aware(2026, 4, 15, 9, 0, 0), + current_timezone = timezone.get_current_timezone() + return timezone.make_aware( + datetime(year, month, day, hour, minute, second), + current_timezone, ) - queryset = TimeEntry.objects.filter(workspace=workspace, is_deleted=False) - ended = TimeEntryFilter(data={"status": "ended"}, queryset=queryset).qs - running = TimeEntryFilter(data={"status": "running"}, queryset=queryset).qs +class TimeEntryFilterTests(TestCase): + def test_time_entry_filter_supports_project_client_tags_and_custom_dates(self): + user = User.objects.create_user(mobile="09124444444", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + client_a = Client.objects.create(workspace=workspace, name="Client A") + client_b = Client.objects.create(workspace=workspace, name="Client B") + project_a = Project.objects.create( + workspace=workspace, + client=client_a, + name="Project A", + ) + project_b = Project.objects.create( + workspace=workspace, + client=client_b, + name="Project B", + ) + tag_backend = Tag.objects.create( + workspace=workspace, + name="Backend", + color="#0EA5E9", + ) + tag_ops = Tag.objects.create(workspace=workspace, name="Ops", color="#10B981") - assert list(ended) == [ended_entry] - assert list(running) == [running_entry] + entry_a = TimeEntry.objects.create( + workspace=workspace, + user=user, + project=project_a, + description="Backend work", + start_time=make_aware(2026, 4, 10, 10, 0, 0), + end_time=make_aware(2026, 4, 10, 12, 0, 0), + ) + entry_a.tags.set([tag_backend]) + + entry_b = TimeEntry.objects.create( + workspace=workspace, + user=user, + project=project_b, + description="Ops work", + start_time=make_aware(2026, 4, 18, 14, 0, 0), + end_time=make_aware(2026, 4, 18, 15, 30, 0), + ) + entry_b.tags.set([tag_ops]) + + queryset = TimeEntry.objects.filter(workspace=workspace, is_deleted=False) + + filtered = TimeEntryFilter( + data={ + "workspace": str(workspace.id), + "project": str(project_a.id), + "client": str(client_a.id), + "tags": str(tag_backend.id), + "started_after": "2026-04-01", + "started_before": "2026-04-15", + }, + queryset=queryset, + ).qs + + self.assertEqual(list(filtered), [entry_a]) + + def test_time_entry_filter_supports_status_values(self): + user = User.objects.create_user(mobile="09125555555", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + + ended_entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Ended entry", + start_time=make_aware(2026, 4, 24, 9, 0, 0), + end_time=make_aware(2026, 4, 24, 10, 0, 0), + ) + running_entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Running entry", + start_time=make_aware(2026, 4, 15, 9, 0, 0), + ) + + queryset = TimeEntry.objects.filter(workspace=workspace, is_deleted=False) + + ended = TimeEntryFilter(data={"status": "ended"}, queryset=queryset).qs + running = TimeEntryFilter(data={"status": "running"}, queryset=queryset).qs + + self.assertEqual(list(ended), [ended_entry]) + self.assertEqual(list(running), [running_entry]) diff --git a/apps/time_entries/tests/test_serializers.py b/apps/time_entries/tests/test_serializers.py index 39f4a6d..0fcfe80 100644 --- a/apps/time_entries/tests/test_serializers.py +++ b/apps/time_entries/tests/test_serializers.py @@ -1,59 +1,66 @@ from datetime import datetime +from django.test import TestCase from django.utils import timezone -from apps.time_entries.api.serializers import TimeEntrySerializer -from apps.time_entries.models import TimeEntry from apps.projects.models import Project from apps.tags.models import Tag +from apps.time_entries.api.serializers import TimeEntrySerializer +from apps.time_entries.models import TimeEntry from apps.users.models import User from apps.workspaces.models import Workspace -def test_time_entry_serializer_keeps_seconds(db): - user = User.objects.create_user(mobile="09123333333", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - current_timezone = timezone.get_current_timezone() +class TimeEntrySerializerTests(TestCase): + def test_time_entry_serializer_keeps_seconds(self): + user = User.objects.create_user(mobile="09123333333", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + current_timezone = timezone.get_current_timezone() - start_time = timezone.make_aware(datetime(2026, 4, 23, 10, 15, 42), current_timezone) - end_time = timezone.make_aware(datetime(2026, 4, 23, 11, 0, 5), current_timezone) + start_time = timezone.make_aware( + datetime(2026, 4, 23, 10, 15, 42), + current_timezone, + ) + end_time = timezone.make_aware( + datetime(2026, 4, 23, 11, 0, 5), + current_timezone, + ) - entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - start_time=start_time, - end_time=end_time, - ) + entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + start_time=start_time, + end_time=end_time, + ) - data = TimeEntrySerializer(entry).data + data = TimeEntrySerializer(entry).data - assert data["start_time"] == start_time.strftime("%Y-%m-%d %H:%M:%S") - assert data["end_time"] == end_time.strftime("%Y-%m-%d %H:%M:%S") + self.assertEqual(data["start_time"], start_time.strftime("%Y-%m-%d %H:%M:%S")) + self.assertEqual(data["end_time"], end_time.strftime("%Y-%m-%d %H:%M:%S")) + def test_time_entry_serializer_includes_deleted_project_and_tags(self): + user = User.objects.create_user(mobile="09124444444", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + project = Project.objects.create(workspace=workspace, name="Legacy Project") + tag = Tag.objects.create(workspace=workspace, name="Legacy Tag", color="#334155") + project.delete() + tag.delete() -def test_time_entry_serializer_includes_deleted_project_and_tags(db): - user = User.objects.create_user(mobile="09124444444", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - project = Project.objects.create(workspace=workspace, name="Legacy Project") - tag = Tag.objects.create(workspace=workspace, name="Legacy Tag", color="#334155") - project.delete() - tag.delete() + entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + project=Project.all_objects.get(id=project.id), + description="Historical work", + start_time=timezone.now(), + end_time=timezone.now(), + ) + entry.tags.set([Tag.all_objects.get(id=tag.id)]) - entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - project=Project.all_objects.get(id=project.id), - description="Historical work", - start_time=timezone.now(), - end_time=timezone.now(), - ) - entry.tags.set([Tag.all_objects.get(id=tag.id)]) + data = TimeEntrySerializer(entry).data - data = TimeEntrySerializer(entry).data - - assert data["project"] == str(project.id) - assert data["project_details"]["name"] == "Legacy Project" - assert data["project_details"]["is_deleted"] is True - assert data["tags"] == [str(tag.id)] - assert data["tag_details"][0]["name"] == "Legacy Tag" - assert data["tag_details"][0]["is_deleted"] is True + self.assertEqual(data["project"], str(project.id)) + self.assertEqual(data["project_details"]["name"], "Legacy Project") + self.assertTrue(data["project_details"]["is_deleted"]) + self.assertEqual(data["tags"], [str(tag.id)]) + self.assertEqual(data["tag_details"][0]["name"], "Legacy Tag") + self.assertTrue(data["tag_details"][0]["is_deleted"]) diff --git a/apps/time_entries/tests/test_services.py b/apps/time_entries/tests/test_services.py index 89d1ad0..241ccc4 100644 --- a/apps/time_entries/tests/test_services.py +++ b/apps/time_entries/tests/test_services.py @@ -1,78 +1,87 @@ from datetime import timedelta -import pytest +from django.test import TestCase from django.utils import timezone from rest_framework.exceptions import ValidationError from apps.projects.models import Project from apps.tags.models import Tag -from apps.time_entries.services.time_entries import create_time_entry, stop_time_entry, update_time_entry +from apps.time_entries.services.time_entries import ( + create_time_entry, + stop_time_entry, + update_time_entry, +) from apps.users.models import User from apps.workspaces.models import Workspace -@pytest.fixture -def workspace_owner(db): - user = User.objects.create_user(mobile="09121111111", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - return user, workspace +class TimeEntryServiceTests(TestCase): + @classmethod + def setUpTestData(cls): + cls.user = User.objects.create_user(mobile="09121111111", password="secret123") + cls.workspace = Workspace.objects.create(name="Core", owner=cls.user) - -def test_create_time_entry_allows_only_one_running_timer_per_workspace(workspace_owner): - user, workspace = workspace_owner - - create_time_entry( - user=user, - workspace_id=workspace.id, - start_time=timezone.now(), - ) - - with pytest.raises(ValidationError): + def test_create_time_entry_allows_only_one_running_timer_per_workspace(self): create_time_entry( - user=user, - workspace_id=workspace.id, - start_time=timezone.now() + timedelta(minutes=5), + user=self.user, + workspace_id=self.workspace.id, + start_time=timezone.now(), ) + with self.assertRaises(ValidationError): + create_time_entry( + user=self.user, + workspace_id=self.workspace.id, + start_time=timezone.now() + timedelta(minutes=5), + ) -def test_stop_time_entry_sets_end_time_and_duration(workspace_owner): - user, workspace = workspace_owner - entry = create_time_entry( - user=user, - workspace_id=workspace.id, - start_time=timezone.now() - timedelta(hours=1), - ) + def test_stop_time_entry_sets_end_time_and_duration(self): + entry = create_time_entry( + user=self.user, + workspace_id=self.workspace.id, + start_time=timezone.now() - timedelta(hours=1), + ) - stopped_entry = stop_time_entry(entry, end_time=timezone.now()) + stopped_entry = stop_time_entry(entry, end_time=timezone.now()) - assert stopped_entry.end_time is not None - assert stopped_entry.duration is not None + self.assertIsNotNone(stopped_entry.end_time) + self.assertIsNotNone(stopped_entry.duration) + def test_update_time_entry_preserves_deleted_project_and_tags(self): + project = Project.objects.create(workspace=self.workspace, name="Deleted project") + tag = Tag.objects.create( + workspace=self.workspace, + name="Deleted tag", + color="#0f172a", + ) + entry = create_time_entry( + user=self.user, + workspace_id=self.workspace.id, + start_time=timezone.now() - timedelta(hours=1), + end_time=timezone.now(), + project=project, + tags=[tag], + description="Before delete", + ) -def test_update_time_entry_preserves_deleted_project_and_tags(workspace_owner): - user, workspace = workspace_owner - project = Project.objects.create(workspace=workspace, name="Deleted project") - tag = Tag.objects.create(workspace=workspace, name="Deleted tag", color="#0f172a") - entry = create_time_entry( - user=user, - workspace_id=workspace.id, - start_time=timezone.now() - timedelta(hours=1), - end_time=timezone.now(), - project=project, - tags=[tag], - description="Before delete", - ) + project.delete() + tag.delete() - project.delete() - tag.delete() + updated_entry = update_time_entry( + entry, + project=Project.all_objects.get(id=project.id), + tags=[Tag.all_objects.get(id=tag.id)], + description="After delete", + ) - updated_entry = update_time_entry( - entry, - project=Project.all_objects.get(id=project.id), - tags=[Tag.all_objects.get(id=tag.id)], - description="After delete", - ) - - assert updated_entry.description == "After delete" - assert updated_entry.project_id == project.id - assert list(Tag.all_objects.filter(time_entries=updated_entry).values_list("id", flat=True)) == [tag.id] + self.assertEqual(updated_entry.description, "After delete") + self.assertEqual(updated_entry.project_id, project.id) + self.assertEqual( + list( + Tag.all_objects.filter(time_entries=updated_entry).values_list( + "id", + flat=True, + ) + ), + [tag.id], + ) diff --git a/apps/time_entries/tests/test_views.py b/apps/time_entries/tests/test_views.py index 8daf746..c7c1b4a 100644 --- a/apps/time_entries/tests/test_views.py +++ b/apps/time_entries/tests/test_views.py @@ -1,7 +1,7 @@ from datetime import datetime from django.utils import timezone -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.tags.models import Tag from apps.time_entries.models import TimeEntry @@ -10,131 +10,132 @@ from apps.workspaces.models import Workspace def make_aware(year, month, day, hour=9, minute=0, second=0): - return timezone.make_aware(datetime(year, month, day, hour, minute, second), timezone.get_current_timezone()) - - -def test_time_entry_list_returns_grouped_payload_for_ended_entries(db): - user = User.objects.create_user(mobile="09126666666", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - - first_entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Morning work", - start_time=make_aware(2026, 4, 24, 9, 0, 0), - end_time=make_aware(2026, 4, 24, 10, 30, 0), - ) - TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Running work", - start_time=make_aware(2026, 4, 24, 11, 0, 0), + current_timezone = timezone.get_current_timezone() + return timezone.make_aware( + datetime(year, month, day, hour, minute, second), + current_timezone, ) - client = APIClient() - client.force_authenticate(user=user) - response = client.get( - "/api/time-entries/", - { - "workspace": str(workspace.id), - "status": "ended", - "limit": 10, - "offset": 0, - }, - ) +class TimeEntryViewTests(APITestCase): + def test_time_entry_list_returns_grouped_payload_for_ended_entries(self): + user = User.objects.create_user(mobile="09126666666", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) - assert response.status_code == 200 - assert response.data["current_page_items_count"] == 1 - assert response.data["has_more"] is False - assert len(response.data["groups"]) == 1 - assert len(response.data["groups"][0]["days"]) == 1 - assert response.data["groups"][0]["days"][0]["entries"][0]["id"] == str(first_entry.id) + first_entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Morning work", + start_time=make_aware(2026, 4, 24, 9, 0, 0), + end_time=make_aware(2026, 4, 24, 10, 30, 0), + ) + TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Running work", + start_time=make_aware(2026, 4, 24, 11, 0, 0), + ) + self.client.force_authenticate(user=user) + response = self.client.get( + "/api/time-entries/", + { + "workspace": str(workspace.id), + "status": "ended", + "limit": 10, + "offset": 0, + }, + ) -def test_time_entry_update_preserves_current_deleted_tags(db): - user = User.objects.create_user(mobile="09127777777", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - tag = Tag.objects.create(workspace=workspace, name="Legacy Tag", color="#475569") - entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Old", - start_time=make_aware(2026, 4, 24, 9, 0, 0), - end_time=make_aware(2026, 4, 24, 10, 30, 0), - ) - entry.tags.set([tag]) - tag.delete() + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["current_page_items_count"], 1) + self.assertFalse(response.data["has_more"]) + self.assertEqual(len(response.data["groups"]), 1) + self.assertEqual(len(response.data["groups"][0]["days"]), 1) + self.assertEqual( + response.data["groups"][0]["days"][0]["entries"][0]["id"], + str(first_entry.id), + ) - client = APIClient() - client.force_authenticate(user=user) + def test_time_entry_update_preserves_current_deleted_tags(self): + user = User.objects.create_user(mobile="09127777777", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + tag = Tag.objects.create(workspace=workspace, name="Legacy Tag", color="#475569") + entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Old", + start_time=make_aware(2026, 4, 24, 9, 0, 0), + end_time=make_aware(2026, 4, 24, 10, 30, 0), + ) + entry.tags.set([tag]) + tag.delete() - response = client.patch( - f"/api/time-entries/{entry.id}/", - { - "description": "Still editable", - "tags": [str(tag.id)], - }, - format="json", - ) + self.client.force_authenticate(user=user) + response = self.client.patch( + f"/api/time-entries/{entry.id}/", + { + "description": "Still editable", + "tags": [str(tag.id)], + }, + format="json", + ) - assert response.status_code == 200 - assert response.data["description"] == "Still editable" - assert response.data["tag_details"][0]["is_deleted"] is True + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["description"], "Still editable") + self.assertTrue(response.data["tag_details"][0]["is_deleted"]) + def test_time_entry_update_rejects_new_deleted_tag_attachment(self): + user = User.objects.create_user(mobile="09128888888", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + deleted_tag = Tag.objects.create( + workspace=workspace, + name="Deleted tag", + color="#475569", + ) + deleted_tag.delete() + entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Entry", + start_time=make_aware(2026, 4, 24, 9, 0, 0), + end_time=make_aware(2026, 4, 24, 10, 30, 0), + ) -def test_time_entry_update_rejects_new_deleted_tag_attachment(db): - user = User.objects.create_user(mobile="09128888888", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - deleted_tag = Tag.objects.create(workspace=workspace, name="Deleted tag", color="#475569") - deleted_tag.delete() - entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Entry", - start_time=make_aware(2026, 4, 24, 9, 0, 0), - end_time=make_aware(2026, 4, 24, 10, 30, 0), - ) + self.client.force_authenticate(user=user) + response = self.client.patch( + f"/api/time-entries/{entry.id}/", + {"tags": [str(deleted_tag.id)]}, + format="json", + ) - client = APIClient() - client.force_authenticate(user=user) + self.assertEqual(response.status_code, 400) + self.assertIn("unavailable", response.data["error"].lower()) - response = client.patch( - f"/api/time-entries/{entry.id}/", - { - "tags": [str(deleted_tag.id)], - }, - format="json", - ) + def test_time_entry_update_can_remove_current_deleted_tag(self): + user = User.objects.create_user(mobile="09129999999", password="secret123") + workspace = Workspace.objects.create(name="Core", owner=user) + deleted_tag = Tag.objects.create( + workspace=workspace, + name="Deleted tag", + color="#475569", + ) + entry = TimeEntry.objects.create( + workspace=workspace, + user=user, + description="Entry", + start_time=make_aware(2026, 4, 24, 9, 0, 0), + end_time=make_aware(2026, 4, 24, 10, 30, 0), + ) + entry.tags.set([deleted_tag]) + deleted_tag.delete() - assert response.status_code == 400 - assert "unavailable" in response.data["error"].lower() + self.client.force_authenticate(user=user) + response = self.client.patch( + f"/api/time-entries/{entry.id}/", + {"tags": []}, + format="json", + ) - -def test_time_entry_update_can_remove_current_deleted_tag(db): - user = User.objects.create_user(mobile="09129999999", password="secret123") - workspace = Workspace.objects.create(name="Core", owner=user) - deleted_tag = Tag.objects.create(workspace=workspace, name="Deleted tag", color="#475569") - entry = TimeEntry.objects.create( - workspace=workspace, - user=user, - description="Entry", - start_time=make_aware(2026, 4, 24, 9, 0, 0), - end_time=make_aware(2026, 4, 24, 10, 30, 0), - ) - entry.tags.set([deleted_tag]) - deleted_tag.delete() - - client = APIClient() - client.force_authenticate(user=user) - - response = client.patch( - f"/api/time-entries/{entry.id}/", - { - "tags": [], - }, - format="json", - ) - - assert response.status_code == 200 - assert response.data["tags"] == [] + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data["tags"], []) diff --git a/apps/users/tests/test_profile_picture_api.py b/apps/users/tests/test_profile_picture_api.py index 87a5663..4b9867b 100644 --- a/apps/users/tests/test_profile_picture_api.py +++ b/apps/users/tests/test_profile_picture_api.py @@ -1,18 +1,18 @@ from rest_framework import status -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.users.models import User -def test_profile_picture_delete_returns_profile_payload(db): - user = User.objects.create_user(mobile="09120000000", password="secret123") - client = APIClient() - client.force_authenticate(user=user) +class ProfilePictureApiTests(APITestCase): + def test_profile_picture_delete_returns_profile_payload(self): + user = User.objects.create_user(mobile="09120000000", password="secret123") + self.client.force_authenticate(user=user) - response = client.delete("/api/users/profile/picture/") + response = self.client.delete("/api/users/profile/picture/") - assert response.status_code == status.HTTP_200_OK - assert response.data["profile_picture"] is None + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIsNone(response.data["profile_picture"]) - user.refresh_from_db() - assert not user.profile_picture + user.refresh_from_db() + self.assertFalse(user.profile_picture) diff --git a/apps/users/tests/test_tasks.py b/apps/users/tests/test_tasks.py index 0827959..e4702c5 100644 --- a/apps/users/tests/test_tasks.py +++ b/apps/users/tests/test_tasks.py @@ -1,13 +1,65 @@ -from apps.users.tasks import send_verification_sms +from unittest.mock import Mock, patch + +from django.test import TestCase + +from apps.users.tasks import _send_sms, send_verification_sms -def test_send_verification_sms_skips_real_delivery_without_api_key(settings): - settings.SMS_APIKEY = "" +class UserTaskTests(TestCase): + def test_send_verification_sms_skips_real_delivery_without_api_key(self): + with self.settings(SMS_APIKEY=""): + result = send_verification_sms("09123456789", "12345") - result = send_verification_sms("09123456789", "12345") + self.assertEqual( + result, + { + "mobile": "09123456789", + "code": "12345", + "sent": False, + }, + ) - assert result == { - "mobile": "09123456789", - "code": "12345", - "sent": False, - } + @patch("apps.users.tasks._send_sms") + def test_send_verification_sms_calls_sender_when_api_key_exists(self, send_sms): + send_sms.return_value = Mock(status_code=200) + + with self.settings(SMS_APIKEY="configured-key"): + send_verification_sms("09123456789", "12345") + + send_sms.assert_called_once_with( + "09123456789", + 570574, + variables=[{"name": "OTP", "value": "12345"}], + ) + + @patch("apps.users.tasks._send_sms", return_value=None) + def test_send_verification_sms_raises_when_delivery_fails(self, send_sms): + with self.settings(SMS_APIKEY="configured-key"): + with self.assertRaises(Exception): + send_verification_sms("09123456789", "12345") + + send_sms.assert_called_once() + + @patch("apps.users.tasks.requests.post") + def test_send_sms_posts_verify_payload(self, requests_post): + response = Mock(status_code=200, text="ok") + response.json.return_value = {"status": "1"} + requests_post.return_value = response + + with self.settings(SMS_APIKEY="configured-key"): + result = _send_sms( + "09123456789", + 570574, + variables=[{"name": "OTP", "value": "12345"}], + ) + + self.assertEqual(result, response) + requests_post.assert_called_once() + self.assertEqual( + requests_post.call_args.kwargs["json"], + { + "mobile": "09123456789", + "templateId": 570574, + "parameters": [{"name": "OTP", "value": "12345"}], + }, + ) diff --git a/apps/workspaces/tests/test_capabilities.py b/apps/workspaces/tests/test_capabilities.py index 2876d49..a737f5b 100644 --- a/apps/workspaces/tests/test_capabilities.py +++ b/apps/workspaces/tests/test_capabilities.py @@ -1,8 +1,7 @@ from datetime import timedelta -import pytest from django.utils import timezone -from rest_framework.test import APIClient +from rest_framework.test import APITestCase from apps.clients.models import Client from apps.projects.models import Project @@ -11,326 +10,378 @@ from apps.users.models import User from apps.workspaces.models import Workspace, WorkspaceMembership -@pytest.fixture() -def api_client(): - return APIClient() +class WorkspaceCapabilityTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.owner = cls._user(1) + cls.admin = cls._user(2) + cls.member = cls._user(3) + cls.guest = cls._user(4) + cls.extra_owner = cls._user(5) + cls.workspace = Workspace.objects.create(name="Ops", description="", owner=cls.owner) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.admin, + role=WorkspaceMembership.Role.ADMIN, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.guest, + role=WorkspaceMembership.Role.GUEST, + is_active=True, + ) + cls.project = Project.objects.create( + workspace=cls.workspace, + name="Alpha", + description="", + ) -def _user(index: int) -> User: - return User.objects.create_user( - mobile=f"091255500{index:02d}", - password="secret123", - first_name=f"User{index}", - ) + @staticmethod + def _user(index): + return User.objects.create_user( + mobile=f"091255500{index:02d}", + password="secret123", + first_name=f"User{index}", + ) + def test_member_is_read_only_for_clients_and_projects(self): + client = Client.objects.create( + workspace=self.workspace, + name="Existing Client", + notes="", + ) + self.client.force_authenticate(user=self.member) -@pytest.fixture() -def owner(db): - return _user(1) + client_response = self.client.post( + "/api/clients/", + { + "workspace_id": str(self.workspace.id), + "name": "Acme", + "notes": "", + }, + format="json", + ) + update_client_response = self.client.patch( + f"/api/clients/{client.id}/", + {"name": "Updated"}, + format="json", + ) + delete_client_response = self.client.delete(f"/api/clients/{client.id}/") + project_response = self.client.post( + "/api/projects/", + { + "workspace": str(self.workspace.id), + "name": "Beta", + "description": "", + "client": None, + }, + format="json", + ) + update_project_response = self.client.patch( + f"/api/projects/{self.project.id}/", + {"description": "Blocked edit"}, + format="json", + ) + archive_project_response = self.client.post( + f"/api/projects/{self.project.id}/archive/" + ) + delete_project_response = self.client.delete(f"/api/projects/{self.project.id}/") + self.assertEqual(client_response.status_code, 403) + self.assertEqual(update_client_response.status_code, 403) + self.assertEqual(delete_client_response.status_code, 403) + self.assertEqual(project_response.status_code, 403) + self.assertEqual(update_project_response.status_code, 403) + self.assertEqual(archive_project_response.status_code, 403) + self.assertEqual(delete_project_response.status_code, 403) -@pytest.fixture() -def admin(db): - return _user(2) + def test_member_can_create_tags_and_manage_own_time_entries(self): + tag = Tag.objects.create( + workspace=self.workspace, + name="Existing", + color="#000000", + ) + self.client.force_authenticate(user=self.member) + create_tag_response = self.client.post( + "/api/tags/", + { + "workspace_id": str(self.workspace.id), + "name": "New Tag", + "color": "#ffffff", + }, + format="json", + ) + update_tag_response = self.client.patch( + f"/api/tags/{tag.id}/", + {"name": "Changed"}, + format="json", + ) + delete_tag_response = self.client.delete(f"/api/tags/{tag.id}/") -@pytest.fixture() -def member(db): - return _user(3) + now = timezone.now() + create_entry_response = self.client.post( + "/api/time-entries/", + { + "workspace_id": str(self.workspace.id), + "start_time": now.isoformat(), + "end_time": (now + timedelta(hours=1)).isoformat(), + "description": "Focus block", + }, + format="json", + ) + self.assertEqual(create_tag_response.status_code, 201) + self.assertEqual(update_tag_response.status_code, 403) + self.assertEqual(delete_tag_response.status_code, 403) + self.assertEqual(create_entry_response.status_code, 201) -@pytest.fixture() -def guest(db): - return _user(4) + entry_id = create_entry_response.data["id"] + update_entry_response = self.client.patch( + f"/api/time-entries/{entry_id}/", + {"description": "Updated focus block"}, + format="json", + ) + delete_entry_response = self.client.delete(f"/api/time-entries/{entry_id}/") + self.assertEqual(update_entry_response.status_code, 200) + self.assertEqual(delete_entry_response.status_code, 204) -@pytest.fixture() -def extra_owner(db): - return _user(5) + def test_guest_is_read_only_for_workspace_resources(self): + Client.objects.create(workspace=self.workspace, name="Visible Client", notes="") + Tag.objects.create(workspace=self.workspace, name="Visible Tag", color="#123456") + self.client.force_authenticate(user=self.guest) -@pytest.fixture() -def workspace(owner, admin, member, guest): - workspace = Workspace.objects.create(name="Ops", description="", owner=owner) - WorkspaceMembership.objects.create( - workspace=workspace, - user=admin, - role=WorkspaceMembership.Role.ADMIN, - is_active=True, - ) - WorkspaceMembership.objects.create( - workspace=workspace, - user=member, - role=WorkspaceMembership.Role.MEMBER, - is_active=True, - ) - WorkspaceMembership.objects.create( - workspace=workspace, - user=guest, - role=WorkspaceMembership.Role.GUEST, - is_active=True, - ) - return workspace + list_clients_response = self.client.get( + f"/api/clients/?workspace={self.workspace.id}" + ) + list_projects_response = self.client.get( + f"/api/projects/?workspace={self.workspace.id}" + ) + create_tag_response = self.client.post( + "/api/tags/", + { + "workspace_id": str(self.workspace.id), + "name": "Blocked", + "color": "#ffffff", + }, + format="json", + ) + create_entry_response = self.client.post( + "/api/time-entries/", + { + "workspace_id": str(self.workspace.id), + "start_time": timezone.now().isoformat(), + "description": "Blocked guest entry", + }, + format="json", + ) + edit_project_response = self.client.patch( + f"/api/projects/{self.project.id}/", + {"description": "Blocked"}, + format="json", + ) + self.assertEqual(list_clients_response.status_code, 200) + self.assertEqual(list_projects_response.status_code, 200) + self.assertEqual(create_tag_response.status_code, 403) + self.assertEqual(create_entry_response.status_code, 403) + self.assertEqual(edit_project_response.status_code, 403) -@pytest.fixture() -def project(workspace, owner, member): - return Project.objects.create(workspace=workspace, name="Alpha", description="") + def test_member_cannot_edit_project(self): + self.client.force_authenticate(user=self.member) + response = self.client.patch( + f"/api/projects/{self.project.id}/", + {"description": "Still blocked"}, + format="json", + ) -def test_member_is_read_only_for_clients_and_projects(api_client, member, workspace, project): - client = Client.objects.create(workspace=workspace, name="Existing Client", notes="") - api_client.force_authenticate(user=member) + self.assertEqual(response.status_code, 403) - client_response = api_client.post( - "/api/clients/", - {"workspace_id": str(workspace.id), "name": "Acme", "notes": ""}, - format="json", - ) - update_client_response = api_client.patch( - f"/api/clients/{client.id}/", - {"name": "Updated"}, - format="json", - ) - delete_client_response = api_client.delete(f"/api/clients/{client.id}/") - project_response = api_client.post( - "/api/projects/", - {"workspace": str(workspace.id), "name": "Beta", "description": "", "client": None}, - format="json", - ) - update_project_response = api_client.patch( - f"/api/projects/{project.id}/", - {"description": "Blocked edit"}, - format="json", - ) - archive_project_response = api_client.post(f"/api/projects/{project.id}/archive/") - delete_project_response = api_client.delete(f"/api/projects/{project.id}/") - assert client_response.status_code == 403 - assert update_client_response.status_code == 403 - assert delete_client_response.status_code == 403 - assert project_response.status_code == 403 - assert update_project_response.status_code == 403 - assert archive_project_response.status_code == 403 - assert delete_project_response.status_code == 403 + def test_member_can_list_workspace_members_with_restricted_user_fields(self): + self.client.force_authenticate(user=self.member) + response = self.client.get( + f"/api/workspace-memberships/?workspace={self.workspace.id}" + ) -def test_member_can_create_tags_and_manage_own_time_entries(api_client, owner, member, workspace): - tag = Tag.objects.create(workspace=workspace, name="Existing", color="#000000") - api_client.force_authenticate(user=member) + self.assertEqual(response.status_code, 200) + payload = ( + response.data.get("items", response.data) + if isinstance(response.data, dict) + else response.data + ) + self.assertGreaterEqual(len(payload), 1) + first_user = payload[0]["user"] + self.assertNotIn("mobile", first_user) + self.assertNotIn("email", first_user) - create_tag_response = api_client.post( - "/api/tags/", - {"workspace_id": str(workspace.id), "name": "New Tag", "color": "#ffffff"}, - format="json", - ) - update_tag_response = api_client.patch( - f"/api/tags/{tag.id}/", - {"name": "Changed"}, - format="json", - ) - delete_tag_response = api_client.delete(f"/api/tags/{tag.id}/") + def test_owner_can_list_workspace_members_with_full_user_fields(self): + self.client.force_authenticate(user=self.owner) - now = timezone.now() - create_entry_response = api_client.post( - "/api/time-entries/", - { - "workspace_id": str(workspace.id), - "start_time": now.isoformat(), - "end_time": (now + timedelta(hours=1)).isoformat(), - "description": "Focus block", - }, - format="json", - ) + response = self.client.get( + f"/api/workspace-memberships/?workspace={self.workspace.id}" + ) - assert create_tag_response.status_code == 201 - assert update_tag_response.status_code == 403 - assert delete_tag_response.status_code == 403 - assert create_entry_response.status_code == 201 + self.assertEqual(response.status_code, 200) + payload = ( + response.data.get("items", response.data) + if isinstance(response.data, dict) + else response.data + ) + self.assertGreaterEqual(len(payload), 1) + first_user = payload[0]["user"] + self.assertIn("mobile", first_user) - entry_id = create_entry_response.data["id"] - update_entry_response = api_client.patch( - f"/api/time-entries/{entry_id}/", - {"description": "Updated focus block"}, - format="json", - ) - delete_entry_response = api_client.delete(f"/api/time-entries/{entry_id}/") + def test_admin_cannot_change_owner_membership_but_canonical_owner_can(self): + extra_owner_membership = WorkspaceMembership.objects.create( + workspace=self.workspace, + user=self.extra_owner, + role=WorkspaceMembership.Role.OWNER, + is_active=True, + ) - assert update_entry_response.status_code == 200 - assert delete_entry_response.status_code == 204 + self.client.force_authenticate(user=self.admin) + admin_response = self.client.patch( + f"/api/workspace-memberships/{extra_owner_membership.id}/", + {"role": WorkspaceMembership.Role.ADMIN}, + format="json", + ) + self.client.force_authenticate(user=self.owner) + owner_response = self.client.patch( + f"/api/workspace-memberships/{extra_owner_membership.id}/", + {"role": WorkspaceMembership.Role.ADMIN}, + format="json", + ) -def test_guest_is_read_only_for_workspace_resources(api_client, owner, guest, workspace, project): - Client.objects.create(workspace=workspace, name="Visible Client", notes="") - Tag.objects.create(workspace=workspace, name="Visible Tag", color="#123456") + self.assertEqual(admin_response.status_code, 403) + self.assertEqual(owner_response.status_code, 200) - api_client.force_authenticate(user=guest) + def test_admin_cannot_add_or_change_admin_memberships(self): + admin_membership = WorkspaceMembership.objects.get( + workspace=self.workspace, + user=self.admin, + is_deleted=False, + ) - list_clients_response = api_client.get(f"/api/clients/?workspace={workspace.id}") - list_projects_response = api_client.get(f"/api/projects/?workspace={workspace.id}") - create_tag_response = api_client.post( - "/api/tags/", - {"workspace_id": str(workspace.id), "name": "Blocked", "color": "#ffffff"}, - format="json", - ) - create_entry_response = api_client.post( - "/api/time-entries/", - { - "workspace_id": str(workspace.id), - "start_time": timezone.now().isoformat(), - "description": "Blocked guest entry", - }, - format="json", - ) - edit_project_response = api_client.patch( - f"/api/projects/{project.id}/", - {"description": "Blocked"}, - format="json", - ) + self.client.force_authenticate(user=self.admin) + create_response = self.client.post( + "/api/workspace-memberships/", + { + "workspace": str(self.workspace.id), + "user": str(self.member.id), + "role": WorkspaceMembership.Role.ADMIN, + }, + format="json", + ) + update_response = self.client.patch( + f"/api/workspace-memberships/{admin_membership.id}/", + {"role": WorkspaceMembership.Role.MEMBER}, + format="json", + ) + delete_response = self.client.delete( + f"/api/workspace-memberships/{admin_membership.id}/" + ) - assert list_clients_response.status_code == 200 - assert list_projects_response.status_code == 200 - assert create_tag_response.status_code == 403 - assert create_entry_response.status_code == 403 - assert edit_project_response.status_code == 403 + self.assertEqual(create_response.status_code, 403) + self.assertEqual(update_response.status_code, 403) + self.assertEqual(delete_response.status_code, 403) + def test_admin_can_delete_only_owned_clients_tags_and_projects(self): + self.client.force_authenticate(user=self.owner) + owner_client_response = self.client.post( + "/api/clients/", + { + "workspace_id": str(self.workspace.id), + "name": "Owner Client", + "notes": "", + }, + format="json", + ) + owner_tag_response = self.client.post( + "/api/tags/", + { + "workspace_id": str(self.workspace.id), + "name": "Owner Tag", + "color": "#123456", + }, + format="json", + ) + owner_project_response = self.client.post( + "/api/projects/", + { + "workspace": str(self.workspace.id), + "name": "Owner Project", + "description": "", + "client": None, + }, + format="json", + ) -def test_member_cannot_edit_project(api_client, member, project): - api_client.force_authenticate(user=member) + self.client.force_authenticate(user=self.admin) + admin_client_response = self.client.post( + "/api/clients/", + { + "workspace_id": str(self.workspace.id), + "name": "Admin Client", + "notes": "", + }, + format="json", + ) + admin_tag_response = self.client.post( + "/api/tags/", + { + "workspace_id": str(self.workspace.id), + "name": "Admin Tag", + "color": "#654321", + }, + format="json", + ) + admin_project_response = self.client.post( + "/api/projects/", + { + "workspace": str(self.workspace.id), + "name": "Admin Project", + "description": "", + "client": None, + }, + format="json", + ) - response = api_client.patch( - f"/api/projects/{project.id}/", - {"description": "Still blocked"}, - format="json", - ) + delete_owner_client = self.client.delete( + f"/api/clients/{owner_client_response.data['id']}/" + ) + delete_owner_tag = self.client.delete( + f"/api/tags/{owner_tag_response.data['id']}/" + ) + delete_owner_project = self.client.delete( + f"/api/projects/{owner_project_response.data['id']}/" + ) - assert response.status_code == 403 + delete_admin_client = self.client.delete( + f"/api/clients/{admin_client_response.data['id']}/" + ) + delete_admin_tag = self.client.delete( + f"/api/tags/{admin_tag_response.data['id']}/" + ) + delete_admin_project = self.client.delete( + f"/api/projects/{admin_project_response.data['id']}/" + ) - -def test_member_can_list_workspace_members_with_restricted_user_fields(api_client, member, workspace): - api_client.force_authenticate(user=member) - - response = api_client.get(f"/api/workspace-memberships/?workspace={workspace.id}") - - assert response.status_code == 200 - payload = response.data.get("items", response.data) if isinstance(response.data, dict) else response.data - assert len(payload) >= 1 - first_user = payload[0]["user"] - assert "mobile" not in first_user - assert "email" not in first_user - - -def test_owner_can_list_workspace_members_with_full_user_fields(api_client, owner, workspace): - api_client.force_authenticate(user=owner) - - response = api_client.get(f"/api/workspace-memberships/?workspace={workspace.id}") - - assert response.status_code == 200 - payload = response.data.get("items", response.data) if isinstance(response.data, dict) else response.data - assert len(payload) >= 1 - first_user = payload[0]["user"] - assert "mobile" in first_user - - -def test_admin_cannot_change_owner_membership_but_canonical_owner_can( - api_client, owner, admin, extra_owner, workspace -): - extra_owner_membership = WorkspaceMembership.objects.create( - workspace=workspace, - user=extra_owner, - role=WorkspaceMembership.Role.OWNER, - is_active=True, - ) - - api_client.force_authenticate(user=admin) - admin_response = api_client.patch( - f"/api/workspace-memberships/{extra_owner_membership.id}/", - {"role": WorkspaceMembership.Role.ADMIN}, - format="json", - ) - - api_client.force_authenticate(user=owner) - owner_response = api_client.patch( - f"/api/workspace-memberships/{extra_owner_membership.id}/", - {"role": WorkspaceMembership.Role.ADMIN}, - format="json", - ) - - assert admin_response.status_code == 403 - assert owner_response.status_code == 200 - - -def test_admin_cannot_add_or_change_admin_memberships(api_client, owner, admin, member, workspace): - admin_membership = WorkspaceMembership.objects.get(workspace=workspace, user=admin, is_deleted=False) - - api_client.force_authenticate(user=admin) - create_response = api_client.post( - "/api/workspace-memberships/", - { - "workspace": str(workspace.id), - "user": str(member.id), - "role": WorkspaceMembership.Role.ADMIN, - }, - format="json", - ) - update_response = api_client.patch( - f"/api/workspace-memberships/{admin_membership.id}/", - {"role": WorkspaceMembership.Role.MEMBER}, - format="json", - ) - delete_response = api_client.delete(f"/api/workspace-memberships/{admin_membership.id}/") - - assert create_response.status_code == 403 - assert update_response.status_code == 403 - assert delete_response.status_code == 403 - - -def test_admin_can_delete_only_owned_clients_tags_and_projects(api_client, owner, admin, workspace): - api_client.force_authenticate(user=owner) - owner_client_response = api_client.post( - "/api/clients/", - {"workspace_id": str(workspace.id), "name": "Owner Client", "notes": ""}, - format="json", - ) - owner_tag_response = api_client.post( - "/api/tags/", - {"workspace_id": str(workspace.id), "name": "Owner Tag", "color": "#123456"}, - format="json", - ) - owner_project_response = api_client.post( - "/api/projects/", - {"workspace": str(workspace.id), "name": "Owner Project", "description": "", "client": None}, - format="json", - ) - - api_client.force_authenticate(user=admin) - admin_client_response = api_client.post( - "/api/clients/", - {"workspace_id": str(workspace.id), "name": "Admin Client", "notes": ""}, - format="json", - ) - admin_tag_response = api_client.post( - "/api/tags/", - {"workspace_id": str(workspace.id), "name": "Admin Tag", "color": "#654321"}, - format="json", - ) - admin_project_response = api_client.post( - "/api/projects/", - {"workspace": str(workspace.id), "name": "Admin Project", "description": "", "client": None}, - format="json", - ) - - delete_owner_client = api_client.delete(f"/api/clients/{owner_client_response.data['id']}/") - delete_owner_tag = api_client.delete(f"/api/tags/{owner_tag_response.data['id']}/") - delete_owner_project = api_client.delete(f"/api/projects/{owner_project_response.data['id']}/") - - delete_admin_client = api_client.delete(f"/api/clients/{admin_client_response.data['id']}/") - delete_admin_tag = api_client.delete(f"/api/tags/{admin_tag_response.data['id']}/") - delete_admin_project = api_client.delete(f"/api/projects/{admin_project_response.data['id']}/") - - assert delete_owner_client.status_code == 403 - assert delete_owner_tag.status_code == 403 - assert delete_owner_project.status_code in {403, 404} - - assert delete_admin_client.status_code == 204 - assert delete_admin_tag.status_code == 204 - assert delete_admin_project.status_code == 204 + self.assertEqual(delete_owner_client.status_code, 403) + self.assertEqual(delete_owner_tag.status_code, 403) + self.assertIn(delete_owner_project.status_code, {403, 404}) + self.assertEqual(delete_admin_client.status_code, 204) + self.assertEqual(delete_admin_tag.status_code, 204) + self.assertEqual(delete_admin_project.status_code, 204) diff --git a/apps/workspaces/tests/test_rates.py b/apps/workspaces/tests/test_rates.py index 59dbfd7..d17c033 100644 --- a/apps/workspaces/tests/test_rates.py +++ b/apps/workspaces/tests/test_rates.py @@ -1,128 +1,184 @@ from decimal import Decimal -import pytest -from rest_framework.test import APIClient +from django.test import TestCase +from rest_framework.test import APITestCase from apps.projects.models import Project from apps.time_entries.services.rates import resolve_rate from apps.users.models import User -from apps.workspaces.models import PriceUnit, Workspace, WorkspaceMembership, WorkspaceUserRate +from apps.workspaces.models import ( + PriceUnit, + Workspace, + WorkspaceMembership, + WorkspaceUserRate, +) +from apps.workspaces.services.rates import ( + update_workspace_user_rate, + upsert_workspace_user_rate, +) -@pytest.fixture() -def api_client(): - return APIClient() +class WorkspaceRateTests(APITestCase): + @classmethod + def setUpTestData(cls): + cls.owner = User.objects.create_user(mobile="09127770001", password="secret123") + cls.admin = User.objects.create_user(mobile="09127770002", password="secret123") + cls.member = User.objects.create_user(mobile="09127770003", password="secret123") + + cls.workspace = Workspace.objects.create(name="Rates", owner=cls.owner) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.admin, + role=WorkspaceMembership.Role.ADMIN, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=cls.workspace, + user=cls.member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) + cls.project = Project.objects.create(workspace=cls.workspace, name="Billing") + + PriceUnit.objects.create( + code="USD", + name="US Dollar", + local_name="Dollar", + symbol="$", + ) + PriceUnit.objects.create( + code="EUR", + name="Euro", + local_name="Euro", + symbol="EUR", + ) + + def test_resolve_rate_uses_workspace_user_rate(self): + WorkspaceUserRate.objects.create( + workspace=self.workspace, + user=self.member, + hourly_rate=Decimal("40.00"), + currency="EUR", + effective_from=self.project.created_at, + is_active=True, + ) + + hourly_rate, currency = resolve_rate(self.member, self.project) + + self.assertEqual(hourly_rate, Decimal("40.00")) + self.assertEqual(currency, "EUR") + + def test_resolve_rate_returns_none_when_workspace_rate_is_missing(self): + hourly_rate, currency = resolve_rate(self.member, self.project) + + self.assertIsNone(hourly_rate) + self.assertEqual(currency, "") + + def test_admin_can_manage_workspace_user_rates(self): + self.client.force_authenticate(user=self.admin) + + create_response = self.client.post( + "/api/workspace-user-rates/", + { + "workspace_id": str(self.workspace.id), + "user_id": str(self.member.id), + "hourly_rate": "35.50", + "currency": "USD", + }, + format="json", + ) + + self.assertEqual(create_response.status_code, 201) + rate_id = create_response.data["id"] + self.assertTrue( + WorkspaceUserRate.objects.filter(id=rate_id, is_deleted=False).exists() + ) + + update_response = self.client.patch( + f"/api/workspace-user-rates/{rate_id}/", + {"hourly_rate": "42.00"}, + format="json", + ) + self.assertEqual(update_response.status_code, 200) + self.assertEqual(update_response.data["hourly_rate"], "42.00") + + delete_response = self.client.delete(f"/api/workspace-user-rates/{rate_id}/") + self.assertEqual(delete_response.status_code, 204) + self.assertTrue(WorkspaceUserRate.all_objects.get(id=rate_id).is_deleted) + + def test_member_cannot_manage_rates(self): + self.client.force_authenticate(user=self.member) + + response = self.client.post( + "/api/workspace-user-rates/", + { + "workspace_id": str(self.workspace.id), + "user_id": str(self.member.id), + "hourly_rate": "25.00", + "currency": "USD", + }, + format="json", + ) + + self.assertEqual(response.status_code, 403) -@pytest.fixture() -def owner(db): - return User.objects.create_user(mobile="09127770001", password="secret123") +class WorkspaceRateServiceTests(TestCase): + @classmethod + def setUpTestData(cls): + cls.owner = User.objects.create_user(mobile="09127770011", password="secret123") + cls.member = User.objects.create_user(mobile="09127770012", password="secret123") + cls.workspace = Workspace.objects.create(name="Rate Services", owner=cls.owner) + def test_upsert_workspace_user_rate_creates_uppercase_currency_rate(self): + rate = upsert_workspace_user_rate( + self.workspace, + self.member.id, + Decimal("12.50"), + "usd", + ) -@pytest.fixture() -def admin(db): - return User.objects.create_user(mobile="09127770002", password="secret123") + self.assertEqual(rate.hourly_rate, Decimal("12.50")) + self.assertEqual(rate.currency, "USD") + self.assertTrue(rate.is_active) + def test_upsert_workspace_user_rate_updates_existing_inactive_rate(self): + rate = WorkspaceUserRate.objects.create( + workspace=self.workspace, + user=self.member, + hourly_rate=Decimal("10.00"), + currency="USD", + effective_from=self.workspace.created_at, + is_active=False, + ) -@pytest.fixture() -def member(db): - return User.objects.create_user(mobile="09127770003", password="secret123") + updated = upsert_workspace_user_rate( + self.workspace, + self.member.id, + Decimal("20.00"), + "eur", + ) + self.assertEqual(updated.id, rate.id) + self.assertEqual(updated.hourly_rate, Decimal("20.00")) + self.assertEqual(updated.currency, "EUR") + self.assertTrue(updated.is_active) -@pytest.fixture() -def workspace(owner, admin, member): - workspace = Workspace.objects.create(name="Rates", owner=owner) - WorkspaceMembership.objects.create(workspace=workspace, user=admin, role=WorkspaceMembership.Role.ADMIN, is_active=True) - WorkspaceMembership.objects.create(workspace=workspace, user=member, role=WorkspaceMembership.Role.MEMBER, is_active=True) - return workspace + def test_update_workspace_user_rate_updates_only_changed_fields(self): + rate = WorkspaceUserRate.objects.create( + workspace=self.workspace, + user=self.member, + hourly_rate=Decimal("10.00"), + currency="USD", + effective_from=self.workspace.created_at, + is_active=True, + ) + updated = update_workspace_user_rate( + rate, + hourly_rate=Decimal("15.00"), + currency="gbp", + ) -@pytest.fixture() -def project(workspace, owner, admin, member): - return Project.objects.create(workspace=workspace, name="Billing") - - -@pytest.fixture() -def price_units(db): - PriceUnit.objects.create(code="USD", name="US Dollar", local_name="دلار آمریکا", symbol="$") - PriceUnit.objects.create(code="EUR", name="Euro", local_name="یورو", symbol="€") - - -def test_resolve_rate_uses_workspace_user_rate(workspace, project, member): - WorkspaceUserRate.objects.create( - workspace=workspace, - user=member, - hourly_rate=Decimal("40.00"), - currency="EUR", - effective_from=project.created_at, - is_active=True, - ) - - hourly_rate, currency = resolve_rate(member, project) - - assert hourly_rate == Decimal("40.00") - assert currency == "EUR" - - -def test_resolve_rate_falls_back_to_workspace_user_rate(workspace, project, member): - WorkspaceUserRate.objects.create( - workspace=workspace, - user=member, - hourly_rate=Decimal("40.00"), - currency="EUR", - effective_from=project.created_at, - is_active=True, - ) - - hourly_rate, currency = resolve_rate(member, project) - - assert hourly_rate == Decimal("40.00") - assert currency == "EUR" - - -def test_admin_can_manage_workspace_user_rates(api_client, admin, member, workspace, price_units): - api_client.force_authenticate(user=admin) - - create_response = api_client.post( - "/api/workspace-user-rates/", - { - "workspace_id": str(workspace.id), - "user_id": str(member.id), - "hourly_rate": "35.50", - "currency": "USD", - }, - format="json", - ) - - assert create_response.status_code == 201 - rate_id = create_response.data["id"] - assert WorkspaceUserRate.objects.filter(id=rate_id, is_deleted=False).exists() - - update_response = api_client.patch( - f"/api/workspace-user-rates/{rate_id}/", - {"hourly_rate": "42.00"}, - format="json", - ) - assert update_response.status_code == 200 - assert update_response.data["hourly_rate"] == "42.00" - - delete_response = api_client.delete(f"/api/workspace-user-rates/{rate_id}/") - assert delete_response.status_code == 204 - assert WorkspaceUserRate.all_objects.get(id=rate_id).is_deleted is True - - -def test_member_cannot_manage_rates(api_client, member, workspace, price_units): - api_client.force_authenticate(user=member) - - workspace_response = api_client.post( - "/api/workspace-user-rates/", - { - "workspace_id": str(workspace.id), - "user_id": str(member.id), - "hourly_rate": "25.00", - "currency": "USD", - }, - format="json", - ) - - assert workspace_response.status_code == 403 + self.assertEqual(updated.hourly_rate, Decimal("15.00")) + self.assertEqual(updated.currency, "GBP")