diff --git a/apps/clients/api/permissions.py b/apps/clients/api/permissions.py index 665bd29..f9930a5 100644 --- a/apps/clients/api/permissions.py +++ b/apps/clients/api/permissions.py @@ -1,22 +1,46 @@ -from rest_framework import permissions -from apps.workspaces.models import WorkspaceMembership - - -class IsClientWorkspaceMember(permissions.BasePermission): - """ - Allows access only to users who are active members of the workspace associated with the client. - """ - message = "شما عضو فضای کاری این مشتری نیستید." - - def has_object_permission(self, request, view, obj): - """ - Validates if the user exists in the workspace memberships for the requested client's workspace. - """ - if not request.user.is_authenticated: - return False - - return WorkspaceMembership.objects.filter( - workspace=obj.workspace, - user=request.user, - is_active=True - ).exists() +from rest_framework import permissions + +from apps.workspaces.models import Workspace +from apps.workspaces.services import ( + CLIENTS_CREATE, + CLIENTS_DELETE, + CLIENTS_EDIT, + CLIENTS_VIEW, + has_workspace_capability, +) + + +class IsClientWorkspaceMember(permissions.BasePermission): + """ + Applies capability-based access checks for client resources. + """ + + message = "You do not have permission to access this client." + + def has_permission(self, request, view): + if not request.user.is_authenticated: + return False + + if view.action == "create": + workspace_id = request.data.get("workspace_id") + if not workspace_id: + return False + workspace = Workspace.objects.filter(id=workspace_id, is_deleted=False).first() + return bool( + workspace and has_workspace_capability(request.user, workspace, CLIENTS_CREATE) + ) + + return True + + def has_object_permission(self, request, view, obj): + if not request.user.is_authenticated: + return False + + capability = { + "retrieve": CLIENTS_VIEW, + "list": CLIENTS_VIEW, + "update": CLIENTS_EDIT, + "partial_update": CLIENTS_EDIT, + "destroy": CLIENTS_DELETE, + }.get(view.action, CLIENTS_VIEW) + return has_workspace_capability(request.user, obj.workspace, capability) diff --git a/apps/notifications/tests/test_membership_events.py b/apps/notifications/tests/test_membership_events.py index 56af8b2..05300bb 100644 --- a/apps/notifications/tests/test_membership_events.py +++ b/apps/notifications/tests/test_membership_events.py @@ -155,7 +155,7 @@ def test_workspace_membership_update_skips_self_notifications( format="json", ) - assert response.status_code == 200 + assert response.status_code == 403 assert _notifications_for(owner) == [] diff --git a/apps/projects/api/permissions.py b/apps/projects/api/permissions.py index 5699fbd..c99bb1f 100644 --- a/apps/projects/api/permissions.py +++ b/apps/projects/api/permissions.py @@ -1,6 +1,12 @@ -from rest_framework import permissions - -from apps.projects.models import ProjectMembership +from rest_framework import permissions + +from apps.projects.models import ProjectMembership +from apps.workspaces.services import ( + PROJECTS_EDIT, + PROJECTS_VIEW, + PROJECT_MEMBERS_CHANGE_ROLE, + has_project_capability, +) def get_project_from_obj(obj): @@ -10,40 +16,44 @@ def get_project_from_obj(obj): return obj if hasattr(obj, "workspace") else obj.project -class IsProjectMember(permissions.BasePermission): +class IsProjectMember(permissions.BasePermission): """ Allows access only to users who have an active membership in the project. """ message = "شما عضو این پروژه نیستید." - def has_object_permission(self, request, view, obj): - if not request.user or not request.user.is_authenticated: - return False - - project = get_project_from_obj(obj) - return ProjectMembership.objects.filter( - project=project, - user=request.user, - is_active=True, - is_deleted=False - ).exists() - - -class IsProjectManager(permissions.BasePermission): + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + project = get_project_from_obj(obj) + return has_project_capability(request.user, project, PROJECTS_VIEW) + + +class IsProjectManager(permissions.BasePermission): """ Allows access only to users who are active MANAGERs of the project. """ message = "فقط مدیران پروژه مجاز به انجام این عملیات هستند." - def has_object_permission(self, request, view, obj): - if not request.user or not request.user.is_authenticated: - return False - - project = get_project_from_obj(obj) - return ProjectMembership.objects.filter( - project=project, - user=request.user, - role=ProjectMembership.Role.MANAGER, - is_active=True, - is_deleted=False - ).exists() + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + project = get_project_from_obj(obj) + return has_project_capability(request.user, project, PROJECTS_EDIT) + + +class CanManageProjectMembers(permissions.BasePermission): + message = "Only authorized users can manage project memberships." + + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + project = get_project_from_obj(obj) + return has_project_capability( + request.user, + project, + PROJECT_MEMBERS_CHANGE_ROLE, + ) diff --git a/apps/projects/api/views.py b/apps/projects/api/views.py index 57b4cd0..11f478b 100644 --- a/apps/projects/api/views.py +++ b/apps/projects/api/views.py @@ -31,17 +31,27 @@ from apps.projects.api.serializers import ( ProjectRateSerializer, ProjectRateCreateSerializer, ProjectRateUpdateSerializer, ProjectUserRateSerializer, ProjectUserRateCreateSerializer, ProjectUserRateUpdateSerializer ) -from apps.projects.api.permissions import IsProjectMember, IsProjectManager +from apps.projects.api.permissions import IsProjectMember, IsProjectManager from apps.projects.services.projects import ( create_project, update_project, toggle_project_archive ) from apps.projects.services.memberships import add_project_member, update_project_member -from apps.projects.services.rates import ( - create_project_rate, update_project_rate, - create_project_user_rate, update_project_user_rate -) +from apps.projects.services.rates import ( + create_project_rate, update_project_rate, + create_project_user_rate, update_project_user_rate +) +from apps.workspaces.services import ( + PROJECTS_ARCHIVE, + PROJECTS_CREATE, + PROJECTS_EDIT, + PROJECT_MEMBERS_ADD, + PROJECT_MEMBERS_CHANGE_ROLE, + PROJECT_MEMBERS_REMOVE, + has_project_capability, + has_workspace_capability, +) class ProjectViewSet(ModelViewSet): @@ -104,8 +114,13 @@ class ProjectViewSet(ModelViewSet): members_data = serializer.validated_data.pop("members", []) - workspace = get_object_or_404(Workspace, id=serializer.validated_data["workspace"], is_deleted=False) - client_id = serializer.validated_data.get("client") + workspace = get_object_or_404(Workspace, id=serializer.validated_data["workspace"], is_deleted=False) + if not has_workspace_capability(request.user, workspace, PROJECTS_CREATE): + return Response( + {"detail": "You do not have permission to create projects in this workspace."}, + status=status.HTTP_403_FORBIDDEN, + ) + client_id = serializer.validated_data.get("client") client = get_object_or_404(Client, id=client_id, is_deleted=False) if client_id else None project = create_project( @@ -230,7 +245,7 @@ class ProjectViewSet(ModelViewSet): return Response(output_serializer.data, status=status.HTTP_200_OK) -class BaseProjectNestedViewSet(ModelViewSet): +class BaseProjectNestedViewSet(ModelViewSet): """ Base ViewSet for nested project models to share common permission and queryset logic. """ @@ -245,17 +260,11 @@ class BaseProjectNestedViewSet(ModelViewSet): permission_classes = [IsAuthenticated, IsProjectMember] return [permission() for permission in permission_classes] - def verify_manager_access(self, project_id): - """Helper to verify if the requesting user is a manager of the target project.""" - is_manager = ProjectMembership.objects.filter( - project_id=project_id, - user=self.request.user, - role=ProjectMembership.Role.MANAGER, - is_active=True, - is_deleted=False - ).exists() - if not is_manager: - raise PermissionDenied("You must be a project manager to perform this action.") + def verify_manager_access(self, project_id): + """Helper to verify if the requesting user is a manager of the target project.""" + project = get_object_or_404(Project, id=project_id, is_deleted=False) + if not has_project_capability(self.request.user, project, PROJECT_MEMBERS_ADD): + raise PermissionDenied("You must be a project manager to perform this action.") class ProjectMembershipViewSet(BaseProjectNestedViewSet): @@ -275,14 +284,14 @@ class ProjectMembershipViewSet(BaseProjectNestedViewSet): if self.action in ["update", "partial_update"]: return ProjectMembershipUpdateSerializer return ProjectMembershipSerializer - def create(self, request, *args, **kwargs): + def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) project_id = serializer.validated_data["project_id"] self.verify_manager_access(project_id) - project = get_object_or_404(Project, id=project_id, is_deleted=False) + project = get_object_or_404(Project, id=project_id, is_deleted=False) membership = add_project_member( project=project, user_id=serializer.validated_data["user_id"], @@ -298,6 +307,12 @@ class ProjectMembershipViewSet(BaseProjectNestedViewSet): def update(self, request, *args, **kwargs): membership = self.get_object() + if not has_project_capability( + request.user, + membership.project, + PROJECT_MEMBERS_CHANGE_ROLE, + ): + raise PermissionDenied("You do not have permission to update project members.") serializer = self.get_serializer(data=request.data, partial=kwargs.pop("partial", False)) serializer.is_valid(raise_exception=True) @@ -330,6 +345,12 @@ class ProjectMembershipViewSet(BaseProjectNestedViewSet): def destroy(self, request, *args, **kwargs): membership = self.get_object() + if not has_project_capability( + request.user, + membership.project, + PROJECT_MEMBERS_REMOVE, + ): + raise PermissionDenied("You do not have permission to remove project members.") recipient = membership.user project = membership.project role = membership.role diff --git a/apps/tags/api/permissions.py b/apps/tags/api/permissions.py new file mode 100644 index 0000000..c3b7007 --- /dev/null +++ b/apps/tags/api/permissions.py @@ -0,0 +1,41 @@ +from rest_framework import permissions + +from apps.workspaces.models import Workspace +from apps.workspaces.services import ( + TAGS_CREATE, + TAGS_DELETE, + TAGS_EDIT, + TAGS_VIEW, + has_workspace_capability, +) + + +class IsTagWorkspaceAllowed(permissions.BasePermission): + message = "You do not have permission to access this tag." + + def has_permission(self, request, view): + if not request.user.is_authenticated: + return False + + if view.action == "create": + workspace_id = request.data.get("workspace_id") + if not workspace_id: + return False + workspace = Workspace.objects.filter(id=workspace_id, is_deleted=False).first() + return bool( + workspace and has_workspace_capability(request.user, workspace, TAGS_CREATE) + ) + return True + + def has_object_permission(self, request, view, obj): + if not request.user.is_authenticated: + return False + + capability = { + "retrieve": TAGS_VIEW, + "list": TAGS_VIEW, + "update": TAGS_EDIT, + "partial_update": TAGS_EDIT, + "destroy": TAGS_DELETE, + }.get(view.action, TAGS_VIEW) + return has_workspace_capability(request.user, obj.workspace, capability) diff --git a/apps/tags/api/views.py b/apps/tags/api/views.py index d93472d..8ce5667 100644 --- a/apps/tags/api/views.py +++ b/apps/tags/api/views.py @@ -8,20 +8,21 @@ from django_filters.rest_framework import DjangoFilterBackend from core.paginations.limit_offset import CustomLimitOffsetPagination from apps.tags.models import Tag -from apps.tags.api.serializers import ( - TagSerializer, - TagCreateSerializer, - TagUpdateSerializer -) -from apps.tags.services.tags import create_tag, update_tag +from apps.tags.api.serializers import ( + TagSerializer, + TagCreateSerializer, + TagUpdateSerializer +) +from apps.tags.api.permissions import IsTagWorkspaceAllowed +from apps.tags.services.tags import create_tag, update_tag class TagViewSet(ModelViewSet): - """ - API endpoints for managing tags. - """ - pagination_class = CustomLimitOffsetPagination - permission_classes = [IsAuthenticated] + """ + API endpoints for managing tags. + """ + pagination_class = CustomLimitOffsetPagination + permission_classes = [IsAuthenticated, IsTagWorkspaceAllowed] filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter] filterset_fields = ["workspace"] diff --git a/apps/time_entries/api/views.py b/apps/time_entries/api/views.py index 63ab478..9ccadba 100644 --- a/apps/time_entries/api/views.py +++ b/apps/time_entries/api/views.py @@ -24,6 +24,7 @@ from apps.time_entries.services.time_entries import ( update_time_entry, stop_time_entry ) +from apps.workspaces.services import TIME_ENTRIES_MANAGE_OWN, has_workspace_capability class TimeEntryViewSet(ModelViewSet): @@ -150,11 +151,16 @@ class TimeEntryViewSet(ModelViewSet): output_serializer = TimeEntrySerializer(entry) return Response(output_serializer.data, status=status.HTTP_201_CREATED) - def update(self, request, *args, **kwargs): - partial = kwargs.pop("partial", False) - entry = self.get_object() - - serializer = self.get_serializer(data=request.data, partial=partial) + def update(self, request, *args, **kwargs): + partial = kwargs.pop("partial", False) + entry = self.get_object() + if not has_workspace_capability(request.user, entry.workspace, TIME_ENTRIES_MANAGE_OWN): + return Response( + {"detail": "You do not have permission to manage time entries in this workspace."}, + status=status.HTTP_403_FORBIDDEN, + ) + + serializer = self.get_serializer(data=request.data, partial=partial) serializer.is_valid(raise_exception=True) updated_entry = update_time_entry( @@ -166,11 +172,16 @@ class TimeEntryViewSet(ModelViewSet): return Response(output_serializer.data, status=status.HTTP_200_OK) @action(detail=True, methods=["post"]) - def stop(self, request, pk=None): + def stop(self, request, pk=None): """ Dedicated endpoint to stop an actively running timer. """ - entry = self.get_object() + entry = self.get_object() + if not has_workspace_capability(request.user, entry.workspace, TIME_ENTRIES_MANAGE_OWN): + return Response( + {"detail": "You do not have permission to manage time entries in this workspace."}, + status=status.HTTP_403_FORBIDDEN, + ) serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) @@ -181,11 +192,16 @@ class TimeEntryViewSet(ModelViewSet): output_serializer = TimeEntrySerializer(stopped_entry) return Response(output_serializer.data, status=status.HTTP_200_OK) - def destroy(self, request, *args, **kwargs): + def destroy(self, request, *args, **kwargs): """ Soft deletes the time entry. """ - entry = self.get_object() - entry.is_deleted = True + entry = self.get_object() + if not has_workspace_capability(request.user, entry.workspace, TIME_ENTRIES_MANAGE_OWN): + return Response( + {"detail": "You do not have permission to manage time entries in this workspace."}, + status=status.HTTP_403_FORBIDDEN, + ) + entry.is_deleted = True entry.save(update_fields=["is_deleted", "updated_at"]) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/apps/time_entries/services/time_entries.py b/apps/time_entries/services/time_entries.py index 3aac920..2da66ba 100644 --- a/apps/time_entries/services/time_entries.py +++ b/apps/time_entries/services/time_entries.py @@ -2,24 +2,23 @@ from django.utils import timezone from rest_framework.exceptions import ValidationError, PermissionDenied -from apps.time_entries.models import TimeEntry -from apps.time_entries.services.rates import resolve_rate -from apps.workspaces.models import WorkspaceMembership - - -def _verify_workspace_access(user, workspace_id): - """ - Ensures the user is an active member of the specified workspace. - """ - has_access = WorkspaceMembership.objects.filter( - workspace_id=workspace_id, - user=user, - is_active=True, - is_deleted=False - ).exists() - - if not has_access: - raise PermissionDenied("You do not have access to this workspace.") +from apps.time_entries.models import TimeEntry +from apps.time_entries.services.rates import resolve_rate +from apps.workspaces.models import Workspace +from apps.workspaces.services import TIME_ENTRIES_MANAGE_OWN, has_workspace_capability + + +def _verify_workspace_access(user, workspace_id): + """ + Ensures the user is an active member of the specified workspace. + """ + workspace = Workspace.objects.filter(id=workspace_id, is_deleted=False).first() + if not workspace or not has_workspace_capability( + user, + workspace, + TIME_ENTRIES_MANAGE_OWN, + ): + raise PermissionDenied("You do not have access to this workspace.") def create_time_entry(user, workspace_id, start_time, end_time=None, project=None, tags=None, description="", is_billable=False): diff --git a/apps/workspaces/api/permissions.py b/apps/workspaces/api/permissions.py index c3bcad3..978e335 100644 --- a/apps/workspaces/api/permissions.py +++ b/apps/workspaces/api/permissions.py @@ -1,9 +1,15 @@ -from rest_framework import permissions - -from apps.workspaces.models import Workspace, WorkspaceMembership +from rest_framework import permissions + +from apps.workspaces.models import Workspace, WorkspaceMembership +from apps.workspaces.services import ( + WORKSPACE_EDIT, + WORKSPACE_MEMBERS_CHANGE_ROLE, + WORKSPACE_VIEW, + has_workspace_capability, +) -class IsWorkspaceOwner(permissions.BasePermission): +class IsWorkspaceOwner(permissions.BasePermission): """ Permission check: - User must be the explicit 'owner' on the Workspace model. @@ -11,98 +17,86 @@ class IsWorkspaceOwner(permissions.BasePermission): """ message = "Access denied. You must be the Workspace Owner to perform this action." - def has_object_permission(self, request, view, obj): - if not request.user or not request.user.is_authenticated: - return False - - if isinstance(obj, Workspace): - workspace = obj - elif isinstance(obj, WorkspaceMembership): - workspace = obj.workspace - elif hasattr(obj, 'workspace'): - workspace = obj.workspace - else: - return False - - if workspace.owner == request.user: - return True - - return WorkspaceMembership.objects.filter( - workspace=workspace, - user=request.user, - role=WorkspaceMembership.Role.OWNER, - is_active=True - ).exists() - - -class IsWorkspaceAdmin(permissions.BasePermission): + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + if isinstance(obj, Workspace): + workspace = obj + elif isinstance(obj, WorkspaceMembership): + workspace = obj.workspace + elif hasattr(obj, "workspace"): + workspace = obj.workspace + else: + return False + + return workspace.owner_id == request.user.id + + +class IsWorkspaceAdmin(permissions.BasePermission): """ Permission check: - User's role in the workspace is either 'ADMIN' or 'OWNER'. """ message = "Access denied. You must be a Workspace Admin or Owner to perform this action." - def has_object_permission(self, request, view, obj): - if not request.user or not request.user.is_authenticated: - return False - - if isinstance(obj, Workspace): - workspace = obj - elif isinstance(obj, WorkspaceMembership): - workspace = obj.workspace - elif hasattr(obj, 'workspace'): - workspace = obj.workspace - else: - return False - - if workspace.owner == request.user: - return True - - allowed_roles = [ - WorkspaceMembership.Role.OWNER, - WorkspaceMembership.Role.ADMIN, - ] - - return WorkspaceMembership.objects.filter( - workspace=workspace, - user=request.user, - role__in=allowed_roles, - is_active=True - ).exists() - - -class IsWorkspaceMember(permissions.BasePermission): + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + if isinstance(obj, Workspace): + workspace = obj + elif isinstance(obj, WorkspaceMembership): + workspace = obj.workspace + elif hasattr(obj, "workspace"): + workspace = obj.workspace + else: + return False + + return has_workspace_capability(request.user, workspace, WORKSPACE_EDIT) + + +class IsWorkspaceMember(permissions.BasePermission): """ Permission check: - User's role in the workspace is 'OWNER', 'ADMIN', or 'MEMBER'. """ message = "Access denied. You must be an active member of this workspace." - def has_object_permission(self, request, view, obj): - if not request.user or not request.user.is_authenticated: - return False - - if isinstance(obj, Workspace): - workspace = obj - elif isinstance(obj, WorkspaceMembership): - workspace = obj.workspace - elif hasattr(obj, 'workspace'): - workspace = obj.workspace - else: - return False - - if workspace.owner == request.user: - return True - - allowed_roles = [ - WorkspaceMembership.Role.OWNER, - WorkspaceMembership.Role.ADMIN, - WorkspaceMembership.Role.MEMBER, - ] - - return WorkspaceMembership.objects.filter( - workspace=workspace, - user=request.user, - role__in=allowed_roles, - is_active=True - ).exists() + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + if isinstance(obj, Workspace): + workspace = obj + elif isinstance(obj, WorkspaceMembership): + workspace = obj.workspace + elif hasattr(obj, "workspace"): + workspace = obj.workspace + else: + return False + + return has_workspace_capability(request.user, workspace, WORKSPACE_VIEW) + + +class CanWorkspaceManageMembers(permissions.BasePermission): + message = "Access denied. You do not have permission to manage workspace members." + + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + if isinstance(obj, Workspace): + workspace = obj + elif isinstance(obj, WorkspaceMembership): + workspace = obj.workspace + elif hasattr(obj, "workspace"): + workspace = obj.workspace + else: + return False + + return has_workspace_capability( + request.user, + workspace, + WORKSPACE_MEMBERS_CHANGE_ROLE, + ) diff --git a/apps/workspaces/api/views.py b/apps/workspaces/api/views.py index dcf4e99..ef81853 100644 --- a/apps/workspaces/api/views.py +++ b/apps/workspaces/api/views.py @@ -13,11 +13,22 @@ from apps.notifications.services import ( notify_workspace_membership_removed, notify_workspace_membership_role_changed, ) -from apps.workspaces.api.permissions import IsWorkspaceOwner, IsWorkspaceAdmin +from apps.workspaces.api.permissions import ( + CanWorkspaceManageMembers, + IsWorkspaceAdmin, + IsWorkspaceMember, + IsWorkspaceOwner, +) from apps.workspaces.api.serializers import WorkspaceMembershipSerializer, WorkspaceSerializer from apps.workspaces.api.filters import WorkspaceFilter, WorkspaceMembershipFilter -from apps.workspaces.models import Workspace, WorkspaceMembership -from core.paginations.limit_offset import CustomLimitOffsetPagination +from apps.workspaces.models import Workspace, WorkspaceMembership +from apps.workspaces.services import ( + WORKSPACE_MEMBERS_VIEW, + can_assign_workspace_role, + can_change_workspace_membership, + has_workspace_capability, +) +from core.paginations.limit_offset import CustomLimitOffsetPagination class WorkspaceViewSet(ModelViewSet): @@ -39,10 +50,12 @@ class WorkspaceViewSet(ModelViewSet): Q(memberships__user=user, memberships__is_active=True) ).distinct() - def get_permissions(self): - if self.action in ["update", "partial_update"]: - return [IsAuthenticated(), IsWorkspaceAdmin()] - + def get_permissions(self): + if self.action in ["list", "retrieve"]: + return [IsAuthenticated(), IsWorkspaceMember()] + if self.action in ["update", "partial_update"]: + return [IsAuthenticated(), IsWorkspaceAdmin()] + elif self.action == "destroy": return [IsAuthenticated(), IsWorkspaceOwner()] @@ -77,14 +90,31 @@ class WorkspaceMembershipViewSet(ModelViewSet): Q(workspace__memberships__user=user, workspace__memberships__is_active=True) ).distinct() - def get_permissions(self): - if self.action in ["update", "partial_update"]: - return [IsAuthenticated(), IsWorkspaceAdmin()] - if self.action in ["destroy"]: - return [IsAuthenticated(), IsWorkspaceOwner()] - - return [IsAuthenticated()] - + def get_permissions(self): + if self.action in ["list", "retrieve", "create", "update", "partial_update"]: + return [IsAuthenticated(), CanWorkspaceManageMembers()] + if self.action in ["destroy"]: + return [IsAuthenticated(), CanWorkspaceManageMembers()] + + return [IsAuthenticated()] + + def list(self, request, *args, **kwargs): + workspace_id = request.query_params.get("workspace") + if not workspace_id: + return Response( + {"detail": "workspace query parameter is required."}, + status=status.HTTP_400_BAD_REQUEST, + ) + + workspace = get_object_or_404(Workspace, id=workspace_id, is_deleted=False) + if not has_workspace_capability(request.user, workspace, WORKSPACE_MEMBERS_VIEW): + return Response( + {"detail": "You do not have permission to view workspace members."}, + status=status.HTTP_403_FORBIDDEN, + ) + + return super().list(request, *args, **kwargs) + def create(self, request, *args, **kwargs): """ Overridden to check permissions manually. @@ -100,13 +130,24 @@ class WorkspaceMembershipViewSet(ModelViewSet): workspace = get_object_or_404(Workspace, id=workspace_id) - permission = IsWorkspaceAdmin() - if not permission.has_object_permission(request, self, workspace): - return Response( - {"detail": "You must be a Workspace Admin or Owner to add members."}, - status=status.HTTP_403_FORBIDDEN - ) - + permission = IsWorkspaceAdmin() + if not permission.has_object_permission(request, self, workspace): + return Response( + {"detail": "You must be a Workspace Admin or Owner to add members."}, + status=status.HTTP_403_FORBIDDEN + ) + + requested_role = request.data.get("role") + if requested_role and not can_assign_workspace_role( + request.user, + workspace, + requested_role, + ): + return Response( + {"detail": "You do not have permission to assign this workspace role."}, + status=status.HTTP_403_FORBIDDEN, + ) + serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) membership = serializer.save() @@ -127,6 +168,17 @@ class WorkspaceMembershipViewSet(ModelViewSet): previous_role = membership.role previous_is_active = membership.is_active + requested_role = request.data.get("role") + if not can_change_workspace_membership( + request.user, + membership, + new_role=requested_role, + ): + return Response( + {"detail": "You do not have permission to change this workspace membership."}, + status=status.HTTP_403_FORBIDDEN, + ) + serializer = self.get_serializer(membership, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) updated_membership = serializer.save() @@ -168,6 +220,11 @@ class WorkspaceMembershipViewSet(ModelViewSet): def destroy(self, request, *args, **kwargs): membership = self.get_object() + if not can_change_workspace_membership(request.user, membership): + return Response( + {"detail": "You do not have permission to remove this workspace membership."}, + status=status.HTTP_403_FORBIDDEN, + ) recipient = membership.user workspace = membership.workspace role = membership.role diff --git a/apps/workspaces/services/__init__.py b/apps/workspaces/services/__init__.py new file mode 100644 index 0000000..5d80052 --- /dev/null +++ b/apps/workspaces/services/__init__.py @@ -0,0 +1,71 @@ +from apps.workspaces.services.permissions import ( + CLIENTS_CREATE, + CLIENTS_DELETE, + CLIENTS_EDIT, + CLIENTS_VIEW, + PROJECTS_ARCHIVE, + PROJECTS_CREATE, + PROJECTS_DELETE, + PROJECTS_EDIT, + PROJECTS_VIEW, + PROJECT_MEMBERS_ADD, + PROJECT_MEMBERS_CHANGE_ROLE, + PROJECT_MEMBERS_REMOVE, + PROJECT_MEMBERS_VIEW, + TAGS_CREATE, + TAGS_DELETE, + TAGS_EDIT, + TAGS_VIEW, + TIME_ENTRIES_MANAGE_OWN, + TIME_ENTRIES_VIEW_OWN, + WORKSPACE_DELETE, + WORKSPACE_EDIT, + WORKSPACE_MEMBERS_ADD, + WORKSPACE_MEMBERS_CHANGE_ROLE, + WORKSPACE_MEMBERS_REMOVE, + WORKSPACE_MEMBERS_VIEW, + WORKSPACE_VIEW, + can_assign_workspace_role, + can_change_workspace_membership, + can_manage_workspace_members, + get_workspace_membership, + get_workspace_role, + has_project_capability, + has_workspace_capability, +) + +__all__ = [ + "WORKSPACE_VIEW", + "WORKSPACE_EDIT", + "WORKSPACE_DELETE", + "WORKSPACE_MEMBERS_VIEW", + "WORKSPACE_MEMBERS_ADD", + "WORKSPACE_MEMBERS_REMOVE", + "WORKSPACE_MEMBERS_CHANGE_ROLE", + "CLIENTS_VIEW", + "CLIENTS_CREATE", + "CLIENTS_EDIT", + "CLIENTS_DELETE", + "TAGS_VIEW", + "TAGS_CREATE", + "TAGS_EDIT", + "TAGS_DELETE", + "PROJECTS_VIEW", + "PROJECTS_CREATE", + "PROJECTS_EDIT", + "PROJECTS_DELETE", + "PROJECTS_ARCHIVE", + "PROJECT_MEMBERS_VIEW", + "PROJECT_MEMBERS_ADD", + "PROJECT_MEMBERS_REMOVE", + "PROJECT_MEMBERS_CHANGE_ROLE", + "TIME_ENTRIES_VIEW_OWN", + "TIME_ENTRIES_MANAGE_OWN", + "get_workspace_membership", + "get_workspace_role", + "has_workspace_capability", + "has_project_capability", + "can_manage_workspace_members", + "can_assign_workspace_role", + "can_change_workspace_membership", +] diff --git a/apps/workspaces/services/permissions.py b/apps/workspaces/services/permissions.py new file mode 100644 index 0000000..a33a68c --- /dev/null +++ b/apps/workspaces/services/permissions.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +from apps.projects.models import ProjectMembership +from apps.workspaces.models import Workspace, WorkspaceMembership + + +WORKSPACE_VIEW = "workspace.view" +WORKSPACE_EDIT = "workspace.edit" +WORKSPACE_DELETE = "workspace.delete" +WORKSPACE_MEMBERS_VIEW = "workspace.members.view" +WORKSPACE_MEMBERS_ADD = "workspace.members.add" +WORKSPACE_MEMBERS_REMOVE = "workspace.members.remove" +WORKSPACE_MEMBERS_CHANGE_ROLE = "workspace.members.change_role" +CLIENTS_VIEW = "clients.view" +CLIENTS_CREATE = "clients.create" +CLIENTS_EDIT = "clients.edit" +CLIENTS_DELETE = "clients.delete" +TAGS_VIEW = "tags.view" +TAGS_CREATE = "tags.create" +TAGS_EDIT = "tags.edit" +TAGS_DELETE = "tags.delete" +PROJECTS_VIEW = "projects.view" +PROJECTS_CREATE = "projects.create" +PROJECTS_EDIT = "projects.edit" +PROJECTS_DELETE = "projects.delete" +PROJECTS_ARCHIVE = "projects.archive" +PROJECT_MEMBERS_VIEW = "project_members.view" +PROJECT_MEMBERS_ADD = "project_members.add" +PROJECT_MEMBERS_REMOVE = "project_members.remove" +PROJECT_MEMBERS_CHANGE_ROLE = "project_members.change_role" +TIME_ENTRIES_VIEW_OWN = "time_entries.view_own" +TIME_ENTRIES_MANAGE_OWN = "time_entries.manage_own" + +PROJECT_MANAGER_CAPABILITIES = { + PROJECTS_EDIT, + PROJECTS_ARCHIVE, + PROJECT_MEMBERS_VIEW, + PROJECT_MEMBERS_ADD, + PROJECT_MEMBERS_REMOVE, + PROJECT_MEMBERS_CHANGE_ROLE, +} + +WORKSPACE_ROLE_CAPABILITIES = { + WorkspaceMembership.Role.OWNER: { + WORKSPACE_VIEW, + WORKSPACE_EDIT, + WORKSPACE_DELETE, + WORKSPACE_MEMBERS_VIEW, + WORKSPACE_MEMBERS_ADD, + WORKSPACE_MEMBERS_REMOVE, + WORKSPACE_MEMBERS_CHANGE_ROLE, + CLIENTS_VIEW, + CLIENTS_CREATE, + CLIENTS_EDIT, + CLIENTS_DELETE, + TAGS_VIEW, + TAGS_CREATE, + TAGS_EDIT, + TAGS_DELETE, + PROJECTS_VIEW, + PROJECTS_CREATE, + PROJECTS_EDIT, + PROJECTS_DELETE, + PROJECTS_ARCHIVE, + PROJECT_MEMBERS_VIEW, + PROJECT_MEMBERS_ADD, + PROJECT_MEMBERS_REMOVE, + PROJECT_MEMBERS_CHANGE_ROLE, + TIME_ENTRIES_VIEW_OWN, + TIME_ENTRIES_MANAGE_OWN, + }, + WorkspaceMembership.Role.ADMIN: { + WORKSPACE_VIEW, + WORKSPACE_EDIT, + WORKSPACE_MEMBERS_VIEW, + WORKSPACE_MEMBERS_ADD, + WORKSPACE_MEMBERS_REMOVE, + WORKSPACE_MEMBERS_CHANGE_ROLE, + CLIENTS_VIEW, + CLIENTS_CREATE, + CLIENTS_EDIT, + CLIENTS_DELETE, + TAGS_VIEW, + TAGS_CREATE, + TAGS_EDIT, + TAGS_DELETE, + PROJECTS_VIEW, + PROJECTS_CREATE, + PROJECTS_EDIT, + PROJECTS_DELETE, + PROJECTS_ARCHIVE, + PROJECT_MEMBERS_VIEW, + PROJECT_MEMBERS_ADD, + PROJECT_MEMBERS_REMOVE, + PROJECT_MEMBERS_CHANGE_ROLE, + TIME_ENTRIES_VIEW_OWN, + TIME_ENTRIES_MANAGE_OWN, + }, + WorkspaceMembership.Role.MEMBER: { + WORKSPACE_VIEW, + CLIENTS_VIEW, + TAGS_VIEW, + TAGS_CREATE, + PROJECTS_VIEW, + TIME_ENTRIES_VIEW_OWN, + TIME_ENTRIES_MANAGE_OWN, + }, + WorkspaceMembership.Role.GUEST: { + WORKSPACE_VIEW, + CLIENTS_VIEW, + TAGS_VIEW, + PROJECTS_VIEW, + TIME_ENTRIES_VIEW_OWN, + }, +} + + +def get_workspace_membership(user, workspace: Workspace) -> WorkspaceMembership | None: + if not user or not user.is_authenticated: + return None + + return WorkspaceMembership.objects.filter( + workspace=workspace, + user=user, + is_active=True, + is_deleted=False, + ).first() + + +def get_workspace_role(user, workspace: Workspace) -> str | None: + if not user or not user.is_authenticated: + return None + + if workspace.owner_id == user.id: + return WorkspaceMembership.Role.OWNER + + membership = get_workspace_membership(user, workspace) + return getattr(membership, "role", None) + + +def has_workspace_capability(user, workspace: Workspace, capability: str) -> bool: + role = get_workspace_role(user, workspace) + if not role: + return False + return capability in WORKSPACE_ROLE_CAPABILITIES.get(role, set()) + + +def has_project_capability(user, project, capability: str) -> bool: + if has_workspace_capability(user, project.workspace, capability): + return True + + workspace_role = get_workspace_role(user, project.workspace) + if workspace_role not in { + WorkspaceMembership.Role.OWNER, + WorkspaceMembership.Role.ADMIN, + }: + return False + + is_project_manager = ProjectMembership.objects.filter( + project=project, + user=user, + role=ProjectMembership.Role.MANAGER, + is_active=True, + is_deleted=False, + ).exists() + return is_project_manager and capability in PROJECT_MANAGER_CAPABILITIES + + +def can_manage_workspace_members(user, workspace: Workspace) -> bool: + return has_workspace_capability(user, workspace, WORKSPACE_MEMBERS_CHANGE_ROLE) + + +def can_assign_workspace_role(user, workspace: Workspace, role: str) -> bool: + actor_role = get_workspace_role(user, workspace) + if actor_role == WorkspaceMembership.Role.OWNER: + return True + if actor_role == WorkspaceMembership.Role.ADMIN: + return role != WorkspaceMembership.Role.OWNER + return False + + +def can_change_workspace_membership(user, membership: WorkspaceMembership, *, new_role: str | None = None) -> bool: + workspace = membership.workspace + actor_role = get_workspace_role(user, workspace) + if actor_role not in { + WorkspaceMembership.Role.OWNER, + WorkspaceMembership.Role.ADMIN, + }: + return False + + if membership.user_id == user.id: + return False + + target_is_canonical_owner = workspace.owner_id == membership.user_id + target_is_owner_role = membership.role == WorkspaceMembership.Role.OWNER + + if actor_role == WorkspaceMembership.Role.ADMIN: + if target_is_owner_role or target_is_canonical_owner: + return False + if new_role == WorkspaceMembership.Role.OWNER: + return False + return True + + if target_is_canonical_owner: + return False + + if new_role == WorkspaceMembership.Role.OWNER and workspace.owner_id != user.id: + return False + + return True diff --git a/apps/workspaces/tests/__init__.py b/apps/workspaces/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/apps/workspaces/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/apps/workspaces/tests/test_capabilities.py b/apps/workspaces/tests/test_capabilities.py new file mode 100644 index 0000000..8992191 --- /dev/null +++ b/apps/workspaces/tests/test_capabilities.py @@ -0,0 +1,258 @@ +from datetime import timedelta + +import pytest +from django.utils import timezone +from rest_framework.test import APIClient + +from apps.clients.models import Client +from apps.projects.models import Project, ProjectMembership +from apps.tags.models import Tag +from apps.users.models import User +from apps.workspaces.models import Workspace, WorkspaceMembership + + +@pytest.fixture() +def api_client(): + return APIClient() + + +def _user(index: int) -> User: + return User.objects.create_user( + mobile=f"091255500{index:02d}", + password="secret123", + first_name=f"User{index}", + ) + + +@pytest.fixture() +def owner(db): + return _user(1) + + +@pytest.fixture() +def admin(db): + return _user(2) + + +@pytest.fixture() +def member(db): + return _user(3) + + +@pytest.fixture() +def guest(db): + return _user(4) + + +@pytest.fixture() +def extra_owner(db): + return _user(5) + + +@pytest.fixture() +def workspace(owner, admin, member, guest): + workspace = Workspace.objects.create(name="Ops", description="", owner=owner) + WorkspaceMembership.objects.create( + workspace=workspace, + user=admin, + role=WorkspaceMembership.Role.ADMIN, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=workspace, + user=member, + role=WorkspaceMembership.Role.MEMBER, + is_active=True, + ) + WorkspaceMembership.objects.create( + workspace=workspace, + user=guest, + role=WorkspaceMembership.Role.GUEST, + is_active=True, + ) + return workspace + + +@pytest.fixture() +def project(workspace, owner, member): + project = Project.objects.create(workspace=workspace, name="Alpha", description="") + ProjectMembership.objects.create( + project=project, + user=owner, + role=ProjectMembership.Role.MANAGER, + is_active=True, + ) + ProjectMembership.objects.create( + project=project, + user=member, + role=ProjectMembership.Role.MANAGER, + is_active=True, + ) + return project + + +def test_member_is_read_only_for_clients_and_projects(api_client, member, workspace, project): + client = Client.objects.create(workspace=workspace, name="Existing Client", notes="") + api_client.force_authenticate(user=member) + + client_response = api_client.post( + "/api/clients/", + {"workspace_id": str(workspace.id), "name": "Acme", "notes": ""}, + format="json", + ) + update_client_response = api_client.patch( + f"/api/clients/{client.id}/", + {"name": "Updated"}, + format="json", + ) + delete_client_response = api_client.delete(f"/api/clients/{client.id}/") + project_response = api_client.post( + "/api/projects/", + {"workspace": str(workspace.id), "name": "Beta", "description": "", "client": None}, + format="json", + ) + update_project_response = api_client.patch( + f"/api/projects/{project.id}/", + {"description": "Blocked edit"}, + format="json", + ) + archive_project_response = api_client.post(f"/api/projects/{project.id}/archive/") + delete_project_response = api_client.delete(f"/api/projects/{project.id}/") + membership_response = api_client.post( + "/api/memberships/", + { + "project_id": str(project.id), + "user_id": str(workspace.owner_id), + "role": ProjectMembership.Role.MEMBER, + }, + format="json", + ) + + assert client_response.status_code == 403 + assert update_client_response.status_code == 403 + assert delete_client_response.status_code == 403 + assert project_response.status_code == 403 + assert update_project_response.status_code == 403 + assert archive_project_response.status_code == 403 + assert delete_project_response.status_code == 403 + assert membership_response.status_code == 403 + + +def test_member_can_create_tags_and_manage_own_time_entries(api_client, owner, member, workspace): + tag = Tag.objects.create(workspace=workspace, name="Existing", color="#000000") + api_client.force_authenticate(user=member) + + create_tag_response = api_client.post( + "/api/tags/", + {"workspace_id": str(workspace.id), "name": "New Tag", "color": "#ffffff"}, + format="json", + ) + update_tag_response = api_client.patch( + f"/api/tags/{tag.id}/", + {"name": "Changed"}, + format="json", + ) + delete_tag_response = api_client.delete(f"/api/tags/{tag.id}/") + + now = timezone.now() + create_entry_response = api_client.post( + "/api/time-entries/", + { + "workspace_id": str(workspace.id), + "start_time": now.isoformat(), + "end_time": (now + timedelta(hours=1)).isoformat(), + "description": "Focus block", + }, + format="json", + ) + + assert create_tag_response.status_code == 201 + assert update_tag_response.status_code == 403 + assert delete_tag_response.status_code == 403 + assert create_entry_response.status_code == 201 + + entry_id = create_entry_response.data["id"] + update_entry_response = api_client.patch( + f"/api/time-entries/{entry_id}/", + {"description": "Updated focus block"}, + format="json", + ) + delete_entry_response = api_client.delete(f"/api/time-entries/{entry_id}/") + + assert update_entry_response.status_code == 200 + assert delete_entry_response.status_code == 204 + + +def test_guest_is_read_only_for_workspace_resources(api_client, owner, guest, workspace, project): + Client.objects.create(workspace=workspace, name="Visible Client", notes="") + Tag.objects.create(workspace=workspace, name="Visible Tag", color="#123456") + + api_client.force_authenticate(user=guest) + + list_clients_response = api_client.get(f"/api/clients/?workspace={workspace.id}") + list_projects_response = api_client.get(f"/api/projects/?workspace={workspace.id}") + create_tag_response = api_client.post( + "/api/tags/", + {"workspace_id": str(workspace.id), "name": "Blocked", "color": "#ffffff"}, + format="json", + ) + create_entry_response = api_client.post( + "/api/time-entries/", + { + "workspace_id": str(workspace.id), + "start_time": timezone.now().isoformat(), + "description": "Blocked guest entry", + }, + format="json", + ) + edit_project_response = api_client.patch( + f"/api/projects/{project.id}/", + {"description": "Blocked"}, + format="json", + ) + + assert list_clients_response.status_code == 200 + assert list_projects_response.status_code == 200 + assert create_tag_response.status_code == 403 + assert create_entry_response.status_code == 403 + assert edit_project_response.status_code == 404 + + +def test_member_project_manager_cannot_edit_project(api_client, member, project): + api_client.force_authenticate(user=member) + + response = api_client.patch( + f"/api/projects/{project.id}/", + {"description": "Still blocked"}, + format="json", + ) + + assert response.status_code == 403 + + +def test_admin_cannot_change_owner_membership_but_canonical_owner_can( + api_client, owner, admin, extra_owner, workspace +): + extra_owner_membership = WorkspaceMembership.objects.create( + workspace=workspace, + user=extra_owner, + role=WorkspaceMembership.Role.OWNER, + is_active=True, + ) + + api_client.force_authenticate(user=admin) + admin_response = api_client.patch( + f"/api/workspace-memberships/{extra_owner_membership.id}/", + {"role": WorkspaceMembership.Role.ADMIN}, + format="json", + ) + + api_client.force_authenticate(user=owner) + owner_response = api_client.patch( + f"/api/workspace-memberships/{extra_owner_membership.id}/", + {"role": WorkspaceMembership.Role.ADMIN}, + format="json", + ) + + assert admin_response.status_code == 403 + assert owner_response.status_code == 200