feat(permissions): centralize workspace role capability checks

This commit is contained in:
2026-04-25 18:48:50 +03:30
parent 5f9d413a57
commit f960ca8221
14 changed files with 925 additions and 222 deletions

View File

@@ -1,9 +1,15 @@
from rest_framework import permissions
from apps.workspaces.models import Workspace, WorkspaceMembership
from rest_framework import permissions
from apps.workspaces.models import Workspace, WorkspaceMembership
from apps.workspaces.services import (
WORKSPACE_EDIT,
WORKSPACE_MEMBERS_CHANGE_ROLE,
WORKSPACE_VIEW,
has_workspace_capability,
)
class IsWorkspaceOwner(permissions.BasePermission):
class IsWorkspaceOwner(permissions.BasePermission):
"""
Permission check:
- User must be the explicit 'owner' on the Workspace model.
@@ -11,98 +17,86 @@ class IsWorkspaceOwner(permissions.BasePermission):
"""
message = "Access denied. You must be the Workspace Owner to perform this action."
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, 'workspace'):
workspace = obj.workspace
else:
return False
if workspace.owner == request.user:
return True
return WorkspaceMembership.objects.filter(
workspace=workspace,
user=request.user,
role=WorkspaceMembership.Role.OWNER,
is_active=True
).exists()
class IsWorkspaceAdmin(permissions.BasePermission):
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, "workspace"):
workspace = obj.workspace
else:
return False
return workspace.owner_id == request.user.id
class IsWorkspaceAdmin(permissions.BasePermission):
"""
Permission check:
- User's role in the workspace is either 'ADMIN' or 'OWNER'.
"""
message = "Access denied. You must be a Workspace Admin or Owner to perform this action."
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, 'workspace'):
workspace = obj.workspace
else:
return False
if workspace.owner == request.user:
return True
allowed_roles = [
WorkspaceMembership.Role.OWNER,
WorkspaceMembership.Role.ADMIN,
]
return WorkspaceMembership.objects.filter(
workspace=workspace,
user=request.user,
role__in=allowed_roles,
is_active=True
).exists()
class IsWorkspaceMember(permissions.BasePermission):
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, "workspace"):
workspace = obj.workspace
else:
return False
return has_workspace_capability(request.user, workspace, WORKSPACE_EDIT)
class IsWorkspaceMember(permissions.BasePermission):
"""
Permission check:
- User's role in the workspace is 'OWNER', 'ADMIN', or 'MEMBER'.
"""
message = "Access denied. You must be an active member of this workspace."
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, 'workspace'):
workspace = obj.workspace
else:
return False
if workspace.owner == request.user:
return True
allowed_roles = [
WorkspaceMembership.Role.OWNER,
WorkspaceMembership.Role.ADMIN,
WorkspaceMembership.Role.MEMBER,
]
return WorkspaceMembership.objects.filter(
workspace=workspace,
user=request.user,
role__in=allowed_roles,
is_active=True
).exists()
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, "workspace"):
workspace = obj.workspace
else:
return False
return has_workspace_capability(request.user, workspace, WORKSPACE_VIEW)
class CanWorkspaceManageMembers(permissions.BasePermission):
message = "Access denied. You do not have permission to manage workspace members."
def has_object_permission(self, request, view, obj):
if not request.user or not request.user.is_authenticated:
return False
if isinstance(obj, Workspace):
workspace = obj
elif isinstance(obj, WorkspaceMembership):
workspace = obj.workspace
elif hasattr(obj, "workspace"):
workspace = obj.workspace
else:
return False
return has_workspace_capability(
request.user,
workspace,
WORKSPACE_MEMBERS_CHANGE_ROLE,
)

View File

@@ -13,11 +13,22 @@ from apps.notifications.services import (
notify_workspace_membership_removed,
notify_workspace_membership_role_changed,
)
from apps.workspaces.api.permissions import IsWorkspaceOwner, IsWorkspaceAdmin
from apps.workspaces.api.permissions import (
CanWorkspaceManageMembers,
IsWorkspaceAdmin,
IsWorkspaceMember,
IsWorkspaceOwner,
)
from apps.workspaces.api.serializers import WorkspaceMembershipSerializer, WorkspaceSerializer
from apps.workspaces.api.filters import WorkspaceFilter, WorkspaceMembershipFilter
from apps.workspaces.models import Workspace, WorkspaceMembership
from core.paginations.limit_offset import CustomLimitOffsetPagination
from apps.workspaces.models import Workspace, WorkspaceMembership
from apps.workspaces.services import (
WORKSPACE_MEMBERS_VIEW,
can_assign_workspace_role,
can_change_workspace_membership,
has_workspace_capability,
)
from core.paginations.limit_offset import CustomLimitOffsetPagination
class WorkspaceViewSet(ModelViewSet):
@@ -39,10 +50,12 @@ class WorkspaceViewSet(ModelViewSet):
Q(memberships__user=user, memberships__is_active=True)
).distinct()
def get_permissions(self):
if self.action in ["update", "partial_update"]:
return [IsAuthenticated(), IsWorkspaceAdmin()]
def get_permissions(self):
if self.action in ["list", "retrieve"]:
return [IsAuthenticated(), IsWorkspaceMember()]
if self.action in ["update", "partial_update"]:
return [IsAuthenticated(), IsWorkspaceAdmin()]
elif self.action == "destroy":
return [IsAuthenticated(), IsWorkspaceOwner()]
@@ -77,14 +90,31 @@ class WorkspaceMembershipViewSet(ModelViewSet):
Q(workspace__memberships__user=user, workspace__memberships__is_active=True)
).distinct()
def get_permissions(self):
if self.action in ["update", "partial_update"]:
return [IsAuthenticated(), IsWorkspaceAdmin()]
if self.action in ["destroy"]:
return [IsAuthenticated(), IsWorkspaceOwner()]
return [IsAuthenticated()]
def get_permissions(self):
if self.action in ["list", "retrieve", "create", "update", "partial_update"]:
return [IsAuthenticated(), CanWorkspaceManageMembers()]
if self.action in ["destroy"]:
return [IsAuthenticated(), CanWorkspaceManageMembers()]
return [IsAuthenticated()]
def list(self, request, *args, **kwargs):
workspace_id = request.query_params.get("workspace")
if not workspace_id:
return Response(
{"detail": "workspace query parameter is required."},
status=status.HTTP_400_BAD_REQUEST,
)
workspace = get_object_or_404(Workspace, id=workspace_id, is_deleted=False)
if not has_workspace_capability(request.user, workspace, WORKSPACE_MEMBERS_VIEW):
return Response(
{"detail": "You do not have permission to view workspace members."},
status=status.HTTP_403_FORBIDDEN,
)
return super().list(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
"""
Overridden to check permissions manually.
@@ -100,13 +130,24 @@ class WorkspaceMembershipViewSet(ModelViewSet):
workspace = get_object_or_404(Workspace, id=workspace_id)
permission = IsWorkspaceAdmin()
if not permission.has_object_permission(request, self, workspace):
return Response(
{"detail": "You must be a Workspace Admin or Owner to add members."},
status=status.HTTP_403_FORBIDDEN
)
permission = IsWorkspaceAdmin()
if not permission.has_object_permission(request, self, workspace):
return Response(
{"detail": "You must be a Workspace Admin or Owner to add members."},
status=status.HTTP_403_FORBIDDEN
)
requested_role = request.data.get("role")
if requested_role and not can_assign_workspace_role(
request.user,
workspace,
requested_role,
):
return Response(
{"detail": "You do not have permission to assign this workspace role."},
status=status.HTTP_403_FORBIDDEN,
)
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
membership = serializer.save()
@@ -127,6 +168,17 @@ class WorkspaceMembershipViewSet(ModelViewSet):
previous_role = membership.role
previous_is_active = membership.is_active
requested_role = request.data.get("role")
if not can_change_workspace_membership(
request.user,
membership,
new_role=requested_role,
):
return Response(
{"detail": "You do not have permission to change this workspace membership."},
status=status.HTTP_403_FORBIDDEN,
)
serializer = self.get_serializer(membership, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
updated_membership = serializer.save()
@@ -168,6 +220,11 @@ class WorkspaceMembershipViewSet(ModelViewSet):
def destroy(self, request, *args, **kwargs):
membership = self.get_object()
if not can_change_workspace_membership(request.user, membership):
return Response(
{"detail": "You do not have permission to remove this workspace membership."},
status=status.HTTP_403_FORBIDDEN,
)
recipient = membership.user
workspace = membership.workspace
role = membership.role