from datetime import timedelta

from django.utils import timezone
from rest_framework import status
from rest_framework.viewsets import ModelViewSet
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.filters import SearchFilter
from django_filters.rest_framework import DjangoFilterBackend

from core.paginations.limit_offset import CustomLimitOffsetPagination
from apps.time_entries.models import TimeEntry
from apps.time_entries.api.serializers import (
    TimeEntrySerializer,
    TimeEntryCreateSerializer,
    TimeEntryUpdateSerializer,
    TimeEntryStopSerializer
)
from apps.time_entries.api.filters import TimeEntryFilter
from apps.time_entries.services.time_entries import (
    create_time_entry,
    update_time_entry,
    stop_time_entry
)
from apps.workspaces.services import TIME_ENTRIES_MANAGE_OWN, has_workspace_capability


class TimeEntryViewSet(ModelViewSet):
    """
    API endpoints for managing time entries.

    The list endpoint returns entries grouped by week and day; create, update,
    stop and destroy delegate to the time-entry services or soft-delete the row.
    """

    pagination_class = CustomLimitOffsetPagination
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, SearchFilter]
    filterset_class = TimeEntryFilter
    search_fields = ["description", "project__name", "project__client__name", "tags__name"]

    # Single source for the 403 body returned by update/stop/destroy.
    _MANAGE_DENIED_DETAIL = "You do not have permission to manage time entries in this workspace."

    @staticmethod
    def _timedelta_to_ms(delta):
        """
        Convert a timedelta to whole milliseconds, truncating toward zero.

        Uses integer timedelta arithmetic instead of ``total_seconds() * 1000``
        so exact values are not lost to float rounding (1.001 s must be 1001 ms,
        not 1000 ms).
        """
        whole_ms = abs(delta) // timedelta(milliseconds=1)
        return -whole_ms if delta < timedelta(0) else whole_ms

    @staticmethod
    def _serialize_duration_ms(entry):
        """
        Return the duration of ``entry`` in milliseconds.

        Prefers the stored ``duration``; an entry without ``end_time`` counts
        as 0; otherwise the span ``end_time - start_time`` is used, clamped so
        it is never negative.
        """
        if entry.duration is not None:
            return TimeEntryViewSet._timedelta_to_ms(entry.duration)
        if entry.end_time is None:
            return 0
        elapsed = max(entry.end_time - entry.start_time, timedelta(0))
        return TimeEntryViewSet._timedelta_to_ms(elapsed)

    @staticmethod
    def _get_week_start(local_dt):
        """Return the date of the Sunday on or before ``local_dt``."""
        # weekday() is Monday=0 .. Sunday=6; shift so that Sunday becomes 0.
        days_since_sunday = (local_dt.weekday() + 1) % 7
        return local_dt.date() - timedelta(days=days_since_sunday)

    def _build_grouped_entries(self, entries):
        """
        Group ``entries`` into week buckets, each holding day buckets.

        Weeks and days appear in the order they are first encountered in
        ``entries``. Every bucket carries a ``total_ms`` sum; day buckets hold
        the serialized entries.
        """
        entries = list(entries)
        serialized_entries = TimeEntrySerializer(
            entries,
            many=True,
            context=self.get_serializer_context(),
        ).data

        weeks = []
        weeks_by_key = {}
        # A calendar date belongs to exactly one week, so the day key alone
        # identifies its bucket.
        days_by_key = {}

        # Pair each entry with its serialized form by position rather than by
        # id, so the lookup does not depend on how the serializer renders ids.
        for entry, serialized in zip(entries, serialized_entries):
            local_start = timezone.localtime(entry.start_time)
            week_start = self._get_week_start(local_start)
            week_key = week_start.isoformat()
            day_key = local_start.date().isoformat()
            duration_ms = self._serialize_duration_ms(entry)

            week_payload = weeks_by_key.get(week_key)
            if week_payload is None:
                week_payload = {
                    "key": week_key,
                    "week_start": week_key,
                    "week_end": (week_start + timedelta(days=6)).isoformat(),
                    "total_ms": 0,
                    "days": [],
                }
                weeks_by_key[week_key] = week_payload
                weeks.append(week_payload)

            day_payload = days_by_key.get(day_key)
            if day_payload is None:
                day_payload = {
                    "key": day_key,
                    "date": day_key,
                    "total_ms": 0,
                    "entries": [],
                }
                days_by_key[day_key] = day_payload
                week_payload["days"].append(day_payload)

            week_payload["total_ms"] += duration_ms
            day_payload["total_ms"] += duration_ms
            day_payload["entries"].append(serialized)

        return weeks

    def _manage_denied_response(self, request, entry):
        """
        Return a 403 response if the user lacks TIME_ENTRIES_MANAGE_OWN in the
        entry's workspace, otherwise ``None``.
        """
        if has_workspace_capability(request.user, entry.workspace, TIME_ENTRIES_MANAGE_OWN):
            return None
        return Response(
            {"detail": self._MANAGE_DENIED_DETAIL},
            status=status.HTTP_403_FORBIDDEN,
        )

    def get_queryset(self):
        """
        Users can only interact with their own time entries within workspaces
        where they hold an active membership.
        """
        if getattr(self, "swagger_fake_view", False) or not self.request.user.is_authenticated:
            return TimeEntry.objects.none()

        user = self.request.user
        return (
            TimeEntry.objects.filter(
                user=user,
                workspace__memberships__user=user,
                workspace__memberships__is_active=True,
                is_deleted=False,
            )
            .distinct()
            .select_related("project", "project__client", "workspace", "user")
            .prefetch_related("tags")
            .order_by("-start_time", "-created_at")
        )

    def list(self, request, *args, **kwargs):
        """
        Return one page of entries grouped by week and day, together with
        limit/offset paging metadata.
        """
        queryset = self.filter_queryset(self.get_queryset())
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request, view=self)

        if page is None:
            # The paginator declined to paginate (no limit in effect); serve
            # the full result set instead of failing on len(None).
            page = list(queryset)
            limit, offset, total = None, 0, len(page)
        else:
            limit, offset, total = paginator.limit, paginator.offset, paginator.count

        current_items_count = len(page)
        consumed = offset + current_items_count
        has_more = consumed < total

        return Response(
            {
                "items_per_page": limit,
                "current_page_items_count": current_items_count,
                "total_items": total,
                "offset": offset,
                "next_offset": consumed if has_more else None,
                "has_more": has_more,
                "groups": self._build_grouped_entries(page),
            }
        )

    def get_serializer_class(self):
        """Pick the input serializer for the current action; default to the read serializer."""
        serializers_by_action = {
            "create": TimeEntryCreateSerializer,
            "update": TimeEntryUpdateSerializer,
            "partial_update": TimeEntryUpdateSerializer,
            "stop": TimeEntryStopSerializer,
        }
        return serializers_by_action.get(self.action, TimeEntrySerializer)

    def create(self, request, *args, **kwargs):
        """Validate the payload, create the entry via the service, and return it with 201."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Work on a copy so the serializer's validated data is left intact and
        # the call does not rely on argument evaluation order.
        payload = dict(serializer.validated_data)
        workspace_id = payload.pop("workspace_id")
        entry = create_time_entry(
            user=request.user,
            workspace_id=workspace_id,
            **payload
        )

        output_serializer = TimeEntrySerializer(entry, context=self.get_serializer_context())
        return Response(output_serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """Validate and apply a full or partial update through the update service."""
        partial = kwargs.pop("partial", False)
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        serializer = self.get_serializer(entry, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        updated_entry = update_time_entry(
            entry=entry,
            **serializer.validated_data
        )

        output_serializer = TimeEntrySerializer(updated_entry, context=self.get_serializer_context())
        return Response(output_serializer.data, status=status.HTTP_200_OK)

    @action(detail=True, methods=["post"])
    def stop(self, request, pk=None):
        """
        Dedicated endpoint to stop an actively running timer.
        """
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        end_time = serializer.validated_data.get("end_time")
        stopped_entry = stop_time_entry(entry, end_time=end_time)

        output_serializer = TimeEntrySerializer(stopped_entry, context=self.get_serializer_context())
        return Response(output_serializer.data, status=status.HTTP_200_OK)

    def destroy(self, request, *args, **kwargs):
        """
        Soft deletes the time entry.
        """
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        entry.is_deleted = True
        entry.save(update_fields=["is_deleted", "updated_at"])
        return Response(status=status.HTTP_204_NO_CONTENT)