from __future__ import annotations from django.contrib.auth import get_user_model from django.db.models import Q, QuerySet from django.utils import timezone from rest_framework.exceptions import PermissionDenied, ValidationError from apps.projects.models import Project, ProjectAccess, ProjectUserRate from apps.workspaces.models import Workspace, WorkspaceMembership, WorkspaceUserRate from apps.workspaces.services import PROJECTS_EDIT, get_workspace_role, has_workspace_capability User = get_user_model() PROJECT_ACCESS_MANAGED_ROLES = { WorkspaceMembership.Role.MEMBER, WorkspaceMembership.Role.GUEST, } PROJECT_ACCESS_IMPLICIT_ROLES = { WorkspaceMembership.Role.OWNER, WorkspaceMembership.Role.ADMIN, } def user_has_implicit_project_access(user, workspace: Workspace) -> bool: return get_workspace_role(user, workspace) in PROJECT_ACCESS_IMPLICIT_ROLES def user_has_project_access(user, project: Project) -> bool: if not user or not getattr(user, "is_authenticated", False): return False if user_has_implicit_project_access(user, project.workspace): return True return ProjectAccess.objects.filter(project=project, user=user).exists() def filter_projects_for_user(user, queryset: QuerySet[Project] | None = None) -> QuerySet[Project]: if queryset is None: queryset = Project.objects.all() if not user or not getattr(user, "is_authenticated", False): return queryset.none() return queryset.filter( Q(workspace__owner=user) | Q( workspace__memberships__user=user, workspace__memberships__is_active=True, workspace__memberships__role__in=PROJECT_ACCESS_IMPLICIT_ROLES, ) | Q( workspace__memberships__user=user, workspace__memberships__is_active=True, workspace__memberships__role__in=PROJECT_ACCESS_MANAGED_ROLES, access_memberships__user=user, ) ).distinct() def ensure_project_access(user, project: Project, *, message: str = "Selected project is unavailable.") -> None: if not user_has_project_access(user, project): raise ValidationError({"project_id": message}) def ensure_workspace_project_access(user, workspace: Workspace) -> None: if not has_workspace_capability(user, workspace, PROJECTS_EDIT): raise PermissionDenied("You do not have permission to manage project access in this workspace.") def get_access_managed_membership(workspace: Workspace, user_id: str) -> WorkspaceMembership: membership = WorkspaceMembership.objects.filter( workspace=workspace, user_id=user_id, is_active=True, is_deleted=False, ).select_related("user").first() if not membership: raise ValidationError({"user": "Selected user is not an active member of this workspace."}) if membership.role not in PROJECT_ACCESS_MANAGED_ROLES: raise ValidationError({"user": "Owners and admins have implicit access to all projects."}) return membership def serialize_rate(rate) -> dict | None: if not rate: return None return { "id": str(rate.id), "hourly_rate": str(rate.hourly_rate), "currency": rate.currency, "effective_from": rate.effective_from.isoformat() if rate.effective_from else None, } def build_project_access_item(*, project: Project, has_access: bool, workspace_rate, project_rate) -> dict: return { "id": str(project.id), "name": project.name, "description": project.description, "color": project.color, "is_archived": project.is_archived, "client": ( {"id": str(project.client_id), "name": project.client.name} if project.client_id and project.client else None ), "has_access": has_access, "workspace_rate": serialize_rate(workspace_rate), "project_rate": serialize_rate(project_rate), } def build_project_access_items(*, workspace: Workspace, target_user) -> list[dict]: explicit_access_ids = { str(project_id) for project_id in ProjectAccess.objects.filter( project__workspace=workspace, user=target_user, ).values_list("project_id", flat=True) } workspace_rate = ( WorkspaceUserRate.objects.filter( workspace=workspace, user=target_user, is_deleted=False, ) .order_by("-effective_from", "-updated_at") .first() ) project_rates: dict[str, ProjectUserRate] = {} for rate in ( ProjectUserRate.objects.filter( project__workspace=workspace, user=target_user, is_active=True, is_deleted=False, ) .select_related("project") .order_by("project_id", "-effective_from", "-updated_at") ): project_rates.setdefault(str(rate.project_id), rate) projects = ( Project.objects.filter(workspace=workspace, is_deleted=False) .select_related("client") .order_by("client__name", "name") ) return [ build_project_access_item( project=project, has_access=str(project.id) in explicit_access_ids, workspace_rate=workspace_rate, project_rate=project_rates.get(str(project.id)), ) for project in projects ] def grant_project_accesses(*, actor, workspace: Workspace, target_user, project_ids: list[str]) -> int: ensure_workspace_project_access(actor, workspace) get_access_managed_membership(workspace, str(target_user.id)) projects = list(Project.objects.filter(workspace=workspace, id__in=project_ids, is_deleted=False)) if len(projects) != len(set(project_ids)): raise ValidationError({"project_ids": "One or more selected projects do not belong to this workspace."}) changed = 0 for project in projects: access, created, restored = ProjectAccess.get_or_restore(project=project, user=target_user) if created or restored: access.is_active = True access.updated_at = timezone.now() access.save(update_fields=["is_active", "updated_at"]) changed += 1 return changed def revoke_project_accesses(*, actor, workspace: Workspace, target_user, project_ids: list[str]) -> int: ensure_workspace_project_access(actor, workspace) get_access_managed_membership(workspace, str(target_user.id)) accesses = list( ProjectAccess.objects.filter( project__workspace=workspace, user=target_user, project_id__in=project_ids, ).select_related("project") ) changed = 0 for access in accesses: access.delete() changed += 1 return changed