Files
qlockify-backend-deployment/apps/time_entries/api/views.py
Amirhossein Khalili b5ddcb76aa
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
fix(timezone): fix timer clock-skew
2026-05-26 12:59:49 +03:30

236 lines
9.1 KiB
Python

from datetime import timedelta
from django.utils import timezone
from rest_framework import status
from rest_framework.viewsets import ModelViewSet
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework.filters import SearchFilter
from django_filters.rest_framework import DjangoFilterBackend
from core.paginations.limit_offset import CustomLimitOffsetPagination
from apps.time_entries.models import TimeEntry
from apps.time_entries.api.serializers import (
TimeEntrySerializer,
TimeEntryCreateSerializer,
TimeEntryUpdateSerializer,
TimeEntryStopSerializer
)
from apps.time_entries.api.filters import TimeEntryFilter
from apps.time_entries.services.time_entries import (
create_time_entry,
update_time_entry,
stop_time_entry
)
from apps.workspaces.services import TIME_ENTRIES_MANAGE_OWN, has_workspace_capability
class TimeEntryViewSet(ModelViewSet):
    """
    API endpoints for managing time entries.

    Every response that serializes entries passes a ``server_now`` value in
    the serializer context, and the list/debug endpoints also return the
    server clock (``server_now`` / ``server_now_ms``) in the payload.
    """

    pagination_class = CustomLimitOffsetPagination
    permission_classes = [IsAuthenticated]
    filter_backends = [DjangoFilterBackend, SearchFilter]
    filterset_class = TimeEntryFilter
    search_fields = ["description", "project__name", "project__client__name", "tags__name"]

    # Shared 403 payload text for the update / stop / destroy capability check.
    _MANAGE_DENIED_DETAIL = "You do not have permission to manage time entries in this workspace."

    @staticmethod
    def _epoch_ms(value):
        """
        Convert a datetime to whole milliseconds since the Unix epoch.

        Naive values are first made aware in Django's current timezone.
        """
        if timezone.is_naive(value):
            value = timezone.make_aware(value, timezone.get_current_timezone())
        # A whole-second timestamp is an exactly representable float, so the
        # millisecond part is added with integer arithmetic. Multiplying the
        # fractional timestamp by 1000 and truncating can land 1 ms low.
        whole_seconds = int(value.replace(microsecond=0).timestamp())
        return whole_seconds * 1000 + value.microsecond // 1000

    def _serializer_context(self, *, server_now=None):
        """
        Return the default serializer context extended with ``server_now``.

        ``server_now`` defaults to the current time when not supplied.
        """
        context = self.get_serializer_context()
        context["server_now"] = server_now if server_now is not None else timezone.now()
        return context

    @staticmethod
    def _serialize_duration_ms(entry):
        """
        Return the entry's duration in whole milliseconds.

        Uses the stored ``duration`` when present; otherwise derives it from
        ``end_time - start_time`` clamped at zero. An entry with neither a
        duration nor an end time yields 0.
        """
        if entry.duration is not None:
            elapsed = entry.duration
        elif entry.end_time is not None:
            elapsed = max(entry.end_time - entry.start_time, timedelta(0))
        else:
            return 0

        # Integer timedelta division is exact; going through
        # ``total_seconds() * 1000`` can truncate e.g. 1.001 s to 1000 ms.
        one_ms = timedelta(milliseconds=1)
        if elapsed < timedelta(0):
            # Keep truncation toward zero for a negative stored duration.
            return -((-elapsed) // one_ms)
        return elapsed // one_ms

    @staticmethod
    def _get_week_start(local_dt):
        """
        Return the date of the Sunday that starts the week containing ``local_dt``.
        """
        # weekday() is Monday=0 .. Sunday=6; shift so that Sunday maps to 0.
        days_since_sunday = (local_dt.weekday() + 1) % 7
        return (local_dt - timedelta(days=days_since_sunday)).date()

    def _build_grouped_entries(self, entries, *, server_now):
        """
        Group serialized entries into weeks, and days within each week.

        Entries are bucketed by their ``start_time`` converted with
        ``timezone.localtime``. Weeks and days appear in the order they are
        first encountered in ``entries``; each carries a ``total_ms`` sum of
        the durations of the entries placed in it.
        """
        serialized_entries = TimeEntrySerializer(
            entries,
            many=True,
            context=self._serializer_context(server_now=server_now),
        ).data
        # Normalise ids with str() on both sides so the lookup works whether
        # the serializer emits the id as a string or as a raw value.
        serialized_by_id = {str(item["id"]): item for item in serialized_entries}

        weeks = []
        weeks_by_key = {}
        # A calendar date belongs to exactly one week, so the date key alone
        # identifies the day bucket.
        days_by_key = {}

        for entry in entries:
            local_start = timezone.localtime(entry.start_time)
            week_start = self._get_week_start(local_start)
            week_key = week_start.isoformat()
            day_key = local_start.date().isoformat()
            duration_ms = self._serialize_duration_ms(entry)

            week_payload = weeks_by_key.get(week_key)
            if week_payload is None:
                week_payload = {
                    "key": week_key,
                    "week_start": week_key,
                    "week_end": (week_start + timedelta(days=6)).isoformat(),
                    "total_ms": 0,
                    "days": [],
                }
                weeks_by_key[week_key] = week_payload
                weeks.append(week_payload)
            week_payload["total_ms"] += duration_ms

            day_payload = days_by_key.get(day_key)
            if day_payload is None:
                day_payload = {
                    "key": day_key,
                    "date": day_key,
                    "total_ms": 0,
                    "entries": [],
                }
                days_by_key[day_key] = day_payload
                week_payload["days"].append(day_payload)
            day_payload["total_ms"] += duration_ms
            day_payload["entries"].append(serialized_by_id[str(entry.id)])

        return weeks

    def get_queryset(self):
        """
        Users can only interact with their own time entries within workspaces
        where they hold an active membership.
        """
        if getattr(self, "swagger_fake_view", False) or not self.request.user.is_authenticated:
            return TimeEntry.objects.none()

        user = self.request.user
        return (
            TimeEntry.objects.filter(
                user=user,
                workspace__memberships__user=user,
                workspace__memberships__is_active=True,
                is_deleted=False,
            )
            .distinct()
            .select_related("project", "project__client", "workspace", "user")
            .prefetch_related("tags")
            .order_by("-start_time", "-created_at")
        )

    def list(self, request, *args, **kwargs):
        """
        Return one page of entries grouped by week and day.

        The payload carries the pagination counters, the server clock
        (``server_now`` / ``server_now_ms``) and the grouped entries under
        ``groups``.
        """
        queryset = self.filter_queryset(self.get_queryset())
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request, view=self)
        server_now = timezone.now()

        if page is None:
            # The paginator declined to paginate (no limit configured).
            # Return everything, as DRF's stock list() does, instead of
            # failing on len(None).
            page = list(queryset)
            limit = None
            offset = 0
            total_items = len(page)
        else:
            limit = paginator.limit
            offset = paginator.offset
            total_items = paginator.count

        current_items_count = len(page)
        has_more = (offset + current_items_count) < total_items
        return Response(
            {
                "items_per_page": limit,
                "current_page_items_count": current_items_count,
                "total_items": total_items,
                "offset": offset,
                "next_offset": offset + current_items_count if has_more else None,
                "has_more": has_more,
                "server_now_ms": self._epoch_ms(server_now),
                "server_now": server_now.isoformat(),
                "groups": self._build_grouped_entries(page, server_now=server_now),
            }
        )

    @action(detail=False, methods=["get"], url_path="debug-time")
    def debug_time(self, request):
        """
        Return the server's current time as epoch milliseconds and ISO 8601.
        """
        server_now = timezone.now()
        return Response(
            {
                "server_now_ms": self._epoch_ms(server_now),
                "server_now": server_now.isoformat(),
            }
        )

    def get_serializer_class(self):
        """
        Pick the input serializer for the current action.

        Falls back to ``TimeEntrySerializer`` for any action without a
        dedicated serializer.
        """
        serializers_by_action = {
            "create": TimeEntryCreateSerializer,
            "update": TimeEntryUpdateSerializer,
            "partial_update": TimeEntryUpdateSerializer,
            "stop": TimeEntryStopSerializer,
        }
        return serializers_by_action.get(self.action, TimeEntrySerializer)

    def _manage_denied_response(self, request, entry):
        """
        Return a 403 response if the requester may not manage ``entry``.

        Returns ``None`` when the ``TIME_ENTRIES_MANAGE_OWN`` capability check
        passes for the entry's workspace.
        """
        if has_workspace_capability(request.user, entry.workspace, TIME_ENTRIES_MANAGE_OWN):
            return None
        return Response(
            {"detail": self._MANAGE_DENIED_DETAIL},
            status=status.HTTP_403_FORBIDDEN,
        )

    def create(self, request, *args, **kwargs):
        """
        Validate the payload and create a time entry for the requesting user.

        Responds with the created entry serialized by ``TimeEntrySerializer``
        and HTTP 201.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Work on a copy so workspace_id is split off explicitly instead of
        # mutating validated_data in the middle of the call expression.
        payload = dict(serializer.validated_data)
        workspace_id = payload.pop("workspace_id")
        entry = create_time_entry(
            user=request.user,
            workspace_id=workspace_id,
            **payload,
        )

        output_serializer = TimeEntrySerializer(entry, context=self._serializer_context())
        return Response(output_serializer.data, status=status.HTTP_201_CREATED)

    def update(self, request, *args, **kwargs):
        """
        Update a time entry; a ``partial`` keyword argument enables partial updates.

        Responds with HTTP 403 when the capability check fails, otherwise with
        the updated entry and HTTP 200.
        """
        partial = kwargs.pop("partial", False)
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        serializer = self.get_serializer(entry, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        updated_entry = update_time_entry(
            entry=entry,
            **serializer.validated_data,
        )

        output_serializer = TimeEntrySerializer(updated_entry, context=self._serializer_context())
        return Response(output_serializer.data, status=status.HTTP_200_OK)

    @action(detail=True, methods=["post"])
    def stop(self, request, pk=None):
        """
        Dedicated endpoint to stop an actively running timer.

        An optional ``end_time`` from the validated payload is forwarded to
        ``stop_time_entry``. Responds with HTTP 403 when the capability check
        fails, otherwise with the stopped entry and HTTP 200.
        """
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        end_time = serializer.validated_data.get("end_time")
        stopped_entry = stop_time_entry(entry, end_time=end_time)

        output_serializer = TimeEntrySerializer(stopped_entry, context=self._serializer_context())
        return Response(output_serializer.data, status=status.HTTP_200_OK)

    def destroy(self, request, *args, **kwargs):
        """
        Soft deletes the time entry.

        Sets ``is_deleted`` instead of removing the row. Responds with HTTP 403
        when the capability check fails, otherwise with HTTP 204.
        """
        entry = self.get_object()

        denied = self._manage_denied_response(request, entry)
        if denied is not None:
            return denied

        entry.is_deleted = True
        entry.save(update_fields=["is_deleted", "updated_at"])
        return Response(status=status.HTTP_204_NO_CONTENT)