from __future__ import annotations from collections import defaultdict from dataclasses import dataclass, replace from datetime import date, datetime, time, timedelta from decimal import Decimal from typing import Iterable import jdatetime from django.contrib.auth import get_user_model from django.db.models import Prefetch, QuerySet from django.utils import timezone from django.utils.dateparse import parse_date from rest_framework import serializers from apps.clients.models import Client from apps.projects.models import Project from apps.tags.models import Tag from apps.time_entries.models import TimeEntry from apps.workspaces.models import Workspace from apps.workspaces.services import WORKSPACE_VIEW, get_workspace_role, has_workspace_capability User = get_user_model() PERIOD_THIS_WEEK = "this_week" PERIOD_THIS_MONTH = "this_month" PERIOD_THIS_YEAR = "this_year" PERIOD_HALF_YEAR_FIRST = "half_year_first" PERIOD_HALF_YEAR_SECOND = "half_year_second" PERIOD_CUSTOM = "period" ALLOWED_PERIODS = { PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_THIS_YEAR, PERIOD_HALF_YEAR_FIRST, PERIOD_HALF_YEAR_SECOND, PERIOD_CUSTOM, } def _start_of_week(local_date: date) -> date: days_since_sunday = (local_date.weekday() + 1) % 7 return local_date - timedelta(days=days_since_sunday) def _start_of_persian_week(local_date: date) -> date: days_since_saturday = (local_date.weekday() + 2) % 7 return local_date - timedelta(days=days_since_saturday) def _localize_datetime(value: datetime) -> datetime: if timezone.is_naive(value): value = timezone.make_aware(value, timezone.get_current_timezone()) return timezone.localtime(value) def _user_display(user) -> str: full_name = getattr(user, "full_name", "").strip() if full_name and full_name != "Anonymous": return full_name return getattr(user, "mobile", str(user)) def _format_duration_seconds(total_seconds: int) -> str: total_seconds = max(int(total_seconds), 0) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def _money_map() -> defaultdict[str, Decimal]: return defaultdict(lambda: Decimal("0.00")) def _serialize_money_totals(values: dict[str, Decimal]) -> list[dict]: return [ {"currency": currency, "amount": f"{amount.quantize(Decimal('0.01'))}"} for currency, amount in sorted(values.items()) if amount is not None ] def _serialize_rate(amount: Decimal | None, currency: str | None) -> dict | None: if amount is None: return None return { "amount": f"{Decimal(amount).quantize(Decimal('0.01'))}", "currency": currency or "USD", } def _add_income(bucket: defaultdict[str, Decimal], entry: TimeEntry): if not entry.is_billable or not entry.hourly_rate: return duration_seconds = get_entry_duration_seconds(entry) if duration_seconds <= 0: return hourly_rate = Decimal(entry.hourly_rate) income = (hourly_rate * Decimal(duration_seconds) / Decimal(3600)).quantize(Decimal("0.01")) bucket[(entry.currency or "USD")] += income def get_entry_duration_seconds(entry: TimeEntry) -> int: if entry.duration is not None: return max(int(entry.duration.total_seconds()), 0) if entry.end_time: return max(int((entry.end_time - entry.start_time).total_seconds()), 0) return 0 def _parse_id_list(values: Iterable[str]) -> list[str]: return [str(value).strip() for value in values if str(value).strip()] @dataclass class ReportFilters: workspace: Workspace period: str from_date: date to_date: date user_id: str | None client_id: str | None project_id: str | None tag_ids: list[str] actor: object is_workspace_scope: bool language: str class ReportFilterSerializer(serializers.Serializer): workspace = serializers.UUIDField() period = serializers.ChoiceField(choices=sorted(ALLOWED_PERIODS)) from_date = serializers.DateField(required=False, allow_null=True) to_date = serializers.DateField(required=False, allow_null=True) user = serializers.UUIDField(required=False, allow_null=True) client = serializers.UUIDField(required=False, allow_null=True) project = serializers.UUIDField(required=False, allow_null=True) tags = serializers.ListField( child=serializers.UUIDField(), required=False, allow_empty=True, ) language = serializers.ChoiceField(choices=("en", "fa"), required=False, default="en") def _resolve_period_bounds(period: str, from_date: date | None, to_date: date | None, *, language: str) -> tuple[date, date]: today = timezone.localdate() if language == "fa": today_jalali = jdatetime.date.fromgregorian(date=today) if period == PERIOD_THIS_WEEK: start = _start_of_persian_week(today) return start, start + timedelta(days=6) if period == PERIOD_THIS_MONTH: start_j = jdatetime.date(today_jalali.year, today_jalali.month, 1) if today_jalali.month == 12: next_month_j = jdatetime.date(today_jalali.year + 1, 1, 1) else: next_month_j = jdatetime.date(today_jalali.year, today_jalali.month + 1, 1) return start_j.togregorian(), next_month_j.togregorian() - timedelta(days=1) if period == PERIOD_THIS_YEAR: start = jdatetime.date(today_jalali.year, 1, 1).togregorian() if jdatetime.date(today_jalali.year, 1, 1).isleap(): end = jdatetime.date(today_jalali.year, 12, 30).togregorian() else: end = jdatetime.date(today_jalali.year, 12, 29).togregorian() return start, end if period == PERIOD_HALF_YEAR_FIRST: start = jdatetime.date(today_jalali.year, 1, 1).togregorian() end = jdatetime.date(today_jalali.year, 6, 31).togregorian() return start, end if period == PERIOD_HALF_YEAR_SECOND: start = jdatetime.date(today_jalali.year, 7, 1).togregorian() if jdatetime.date(today_jalali.year, 1, 1).isleap(): end = jdatetime.date(today_jalali.year, 12, 30).togregorian() else: end = jdatetime.date(today_jalali.year, 12, 29).togregorian() return start, end if period == PERIOD_THIS_WEEK: start = _start_of_week(today) return start, start + timedelta(days=6) if period == PERIOD_THIS_MONTH: start = today.replace(day=1) if start.month == 12: next_month = start.replace(year=start.year + 1, month=1, day=1) else: next_month = start.replace(month=start.month + 1, day=1) return start, next_month - timedelta(days=1) if period == PERIOD_THIS_YEAR: return date(today.year, 1, 1), date(today.year, 12, 31) if period == PERIOD_HALF_YEAR_FIRST: return date(today.year, 1, 1), date(today.year, 6, 30) if period == PERIOD_HALF_YEAR_SECOND: return date(today.year, 7, 1), date(today.year, 12, 31) if period == PERIOD_CUSTOM: if not from_date or not to_date: raise serializers.ValidationError("Custom period requires from_date and to_date.") if from_date > to_date: raise serializers.ValidationError("from_date cannot be after to_date.") if (to_date - from_date).days > 30: raise serializers.ValidationError("Custom period cannot exceed 31 days.") return from_date, to_date raise serializers.ValidationError("Unsupported report period.") def load_report_filters(actor, raw_data) -> ReportFilters: normalized = { "workspace": raw_data.get("workspace"), "period": raw_data.get("period", PERIOD_THIS_MONTH), "from_date": raw_data.get("from_date") or raw_data.get("from"), "to_date": raw_data.get("to_date") or raw_data.get("to"), "user": raw_data.get("user"), "client": raw_data.get("client"), "project": raw_data.get("project"), "tags": raw_data.get("tags") or raw_data.getlist("tags") if hasattr(raw_data, "getlist") else raw_data.get("tags"), "language": raw_data.get("language", "en"), } if normalized["tags"] and not isinstance(normalized["tags"], list): normalized["tags"] = [normalized["tags"]] serializer = ReportFilterSerializer(data=normalized) serializer.is_valid(raise_exception=True) validated = serializer.validated_data workspace = Workspace.objects.filter(id=validated["workspace"]).first() if not workspace: raise serializers.ValidationError("Workspace not found.") if not has_workspace_capability(actor, workspace, WORKSPACE_VIEW): raise serializers.ValidationError("You do not have access to this workspace.") from_date, to_date = _resolve_period_bounds( validated["period"], validated.get("from_date"), validated.get("to_date"), language=validated.get("language", "en"), ) role = get_workspace_role(actor, workspace) is_workspace_scope = role in {"owner", "admin"} requested_user_id = str(validated["user"]) if validated.get("user") else None if requested_user_id and not is_workspace_scope and requested_user_id != str(actor.id): raise serializers.ValidationError("You cannot view another user's report.") user_id = requested_user_id if is_workspace_scope else str(actor.id) client_id = str(validated["client"]) if validated.get("client") else None project_id = str(validated["project"]) if validated.get("project") else None tag_ids = [str(tag_id) for tag_id in validated.get("tags", [])] if client_id and not Client.objects.filter(id=client_id, workspace=workspace).exists(): raise serializers.ValidationError("Client does not belong to this workspace.") if project_id and not Project.objects.filter(id=project_id, workspace=workspace).exists(): raise serializers.ValidationError("Project does not belong to this workspace.") if tag_ids: existing_tag_ids = set(Tag.objects.filter(id__in=tag_ids, workspace=workspace).values_list("id", flat=True)) if len(existing_tag_ids) != len(tag_ids): raise serializers.ValidationError("One or more tags do not belong to this workspace.") return ReportFilters( workspace=workspace, period=validated["period"], from_date=from_date, to_date=to_date, user_id=user_id, client_id=client_id, project_id=project_id, tag_ids=tag_ids, actor=actor, is_workspace_scope=is_workspace_scope, language=validated.get("language", "en"), ) def _base_queryset(filters: ReportFilters) -> QuerySet[TimeEntry]: start_dt = timezone.make_aware(datetime.combine(filters.from_date, time.min), timezone.get_current_timezone()) end_dt = timezone.make_aware(datetime.combine(filters.to_date + timedelta(days=1), time.min), timezone.get_current_timezone()) queryset = ( TimeEntry.objects.filter( workspace=filters.workspace, is_deleted=False, start_time__gte=start_dt, start_time__lt=end_dt, ) .select_related("project", "project__client", "workspace", "user") .prefetch_related(Prefetch("tags", queryset=Tag.objects.filter(is_deleted=False))) .order_by("-start_time", "-created_at") .distinct() ) if filters.user_id: queryset = queryset.filter(user_id=filters.user_id) if filters.client_id: queryset = queryset.filter(project__client_id=filters.client_id) if filters.project_id: queryset = queryset.filter(project_id=filters.project_id) if filters.tag_ids: queryset = queryset.filter(tags__id__in=filters.tag_ids) return queryset.distinct() def _summary_from_entries(entries: list[TimeEntry]) -> dict: total_seconds = 0 billable_seconds = 0 non_billable_seconds = 0 income_by_currency = _money_map() for entry in entries: duration_seconds = get_entry_duration_seconds(entry) total_seconds += duration_seconds if entry.is_billable: billable_seconds += duration_seconds else: non_billable_seconds += duration_seconds _add_income(income_by_currency, entry) return { "total_seconds": total_seconds, "billable_seconds": billable_seconds, "non_billable_seconds": non_billable_seconds, "total_duration": _format_duration_seconds(total_seconds), "billable_duration": _format_duration_seconds(billable_seconds), "non_billable_duration": _format_duration_seconds(non_billable_seconds), "income_totals": _serialize_money_totals(income_by_currency), } def _entry_payload(entry: TimeEntry) -> dict: local_start = _localize_datetime(entry.start_time) local_end = _localize_datetime(entry.end_time) if entry.end_time else None duration_seconds = get_entry_duration_seconds(entry) income_by_currency = _money_map() _add_income(income_by_currency, entry) return { "id": str(entry.id), "description": entry.description, "user": { "id": str(entry.user_id), "name": _user_display(entry.user), "mobile": entry.user.mobile, }, "project": ( { "id": str(entry.project_id), "name": entry.project.name, "client": ( {"id": str(entry.project.client_id), "name": entry.project.client.name} if entry.project and entry.project.client else None ), } if entry.project else None ), "tags": [{"id": str(tag.id), "name": tag.name, "color": tag.color} for tag in entry.tags.all()], "start_time": local_start.isoformat(), "end_time": local_end.isoformat() if local_end else None, "duration_seconds": duration_seconds, "duration": _format_duration_seconds(duration_seconds), "is_billable": entry.is_billable, "hourly_rate": str(entry.hourly_rate) if entry.hourly_rate is not None else None, "currency": entry.currency, "income_totals": _serialize_money_totals(income_by_currency), } def _bucket_label(filters: ReportFilters, bucket_date: date) -> str: if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}: return bucket_date.isoformat() if filters.language == "fa": persian_date = jdatetime.date.fromgregorian(date=bucket_date) return f"{persian_date.year:04d}-{persian_date.month:02d}" return bucket_date.strftime("%Y-%m") def _bucket_key(filters: ReportFilters, local_dt: datetime) -> tuple[str, date]: if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}: bucket_date = local_dt.date() return bucket_date.isoformat(), bucket_date if filters.language == "fa": persian_date = jdatetime.date.fromgregorian(date=local_dt.date()) return f"{persian_date.year:04d}-{persian_date.month:02d}", local_dt.date() bucket_date = date(local_dt.year, local_dt.month, 1) return bucket_date.strftime("%Y-%m"), bucket_date def build_chart_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) entries = list(_base_queryset(filters)) summary = _summary_from_entries(entries) grouped_entries: dict[str | None, list[TimeEntry]] = defaultdict(list) if filters.is_workspace_scope and not filters.user_id: for entry in entries: grouped_entries[str(entry.user_id)].append(entry) else: grouped_entries[filters.user_id] = entries serialized_series = [] for _, series_entries in sorted( grouped_entries.items(), key=lambda item: _user_display(item[1][0].user).lower() if item[1] else "", ): if not series_entries: continue buckets: dict[str, dict] = {} for entry in series_entries: local_start = _localize_datetime(entry.start_time) bucket_id, bucket_date = _bucket_key(filters, local_start) bucket = buckets.setdefault( bucket_id, { "bucket_key": bucket_id, "bucket_label": _bucket_label(filters, bucket_date), "total_seconds": 0, "total_duration": "00:00:00", }, ) bucket["total_seconds"] += get_entry_duration_seconds(entry) serialized_buckets = [] for bucket in sorted(buckets.values(), key=lambda item: item["bucket_key"]): bucket["total_duration"] = _format_duration_seconds(bucket["total_seconds"]) serialized_buckets.append(bucket) user = series_entries[0].user serialized_series.append( { "user": { "id": str(user.id), "name": _user_display(user), "mobile": user.mobile, }, "buckets": serialized_buckets, } ) return { "scope": _scope_payload(filters), "summary": summary, "series": serialized_series, } def _scope_payload(filters: ReportFilters) -> dict: user_payload = None if filters.user_id: target_user = User.objects.filter(id=filters.user_id).first() if target_user: user_payload = { "id": str(target_user.id), "name": _user_display(target_user), "mobile": target_user.mobile, } return { "workspace": { "id": str(filters.workspace.id), "name": filters.workspace.name, "thumbnail_path": filters.workspace.thumbnail.path if getattr(filters.workspace, "thumbnail", None) else None, }, "period": filters.period, "from_date": filters.from_date.isoformat(), "to_date": filters.to_date.isoformat(), "user": user_payload, "is_workspace_scope": filters.is_workspace_scope and not filters.user_id, "filters": { "client_id": filters.client_id, "project_id": filters.project_id, "tag_ids": filters.tag_ids, }, } def _table_report_payload(filters: ReportFilters, entries: list[TimeEntry]) -> dict: summary = _summary_from_entries(entries) include_latest_rate = not (filters.is_workspace_scope and not filters.user_id) return { "scope": _scope_payload(filters), "summary": summary, "days": _group_daily(entries, include_latest_rate=include_latest_rate), "clients": _build_breakdown(entries, "clients"), "projects": _build_breakdown(entries, "projects"), "tags": _build_breakdown(entries, "tags"), } def _group_daily(entries: list[TimeEntry], *, include_latest_rate: bool) -> list[dict]: by_day: dict[str, dict] = {} for entry in entries: local_start = _localize_datetime(entry.start_time) day_key = local_start.date().isoformat() day_bucket = by_day.setdefault( day_key, { "date": day_key, "billable_seconds": 0, "non_billable_seconds": 0, "total_seconds": 0, "income": _money_map(), "latest_rate_amount": None, "latest_rate_currency": None, "latest_rate_timestamp": None, }, ) duration_seconds = get_entry_duration_seconds(entry) day_bucket["total_seconds"] += duration_seconds if entry.is_billable: day_bucket["billable_seconds"] += duration_seconds else: day_bucket["non_billable_seconds"] += duration_seconds _add_income(day_bucket["income"], entry) if ( include_latest_rate and entry.is_billable and entry.hourly_rate and ( day_bucket["latest_rate_timestamp"] is None or local_start >= day_bucket["latest_rate_timestamp"] ) ): day_bucket["latest_rate_amount"] = Decimal(entry.hourly_rate) day_bucket["latest_rate_currency"] = entry.currency or "USD" day_bucket["latest_rate_timestamp"] = local_start rows = [] for day_key in sorted(by_day.keys()): bucket = by_day[day_key] rows.append( { "date": bucket["date"], "billable_seconds": bucket["billable_seconds"], "non_billable_seconds": bucket["non_billable_seconds"], "total_seconds": bucket["total_seconds"], "billable_duration": _format_duration_seconds(bucket["billable_seconds"]), "non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]), "total_duration": _format_duration_seconds(bucket["total_seconds"]), "latest_hourly_rate": _serialize_rate( bucket["latest_rate_amount"], bucket["latest_rate_currency"], ) if include_latest_rate else None, "income_totals": _serialize_money_totals(bucket["income"]), } ) return rows def _build_breakdown(entries: list[TimeEntry], kind: str) -> list[dict]: data: dict[str, dict] = {} for entry in entries: if kind == "clients": if not entry.project or not entry.project.client: continue item_id = str(entry.project.client_id) item_name = entry.project.client.name elif kind == "projects": if not entry.project: continue item_id = str(entry.project_id) item_name = entry.project.name else: if not entry.tags.exists(): continue for tag in entry.tags.all(): bucket = data.setdefault( str(tag.id), { "id": str(tag.id), "name": tag.name, "billable_seconds": 0, "non_billable_seconds": 0, "total_seconds": 0, "income": _money_map(), }, ) duration_seconds = get_entry_duration_seconds(entry) bucket["total_seconds"] += duration_seconds if entry.is_billable: bucket["billable_seconds"] += duration_seconds else: bucket["non_billable_seconds"] += duration_seconds _add_income(bucket["income"], entry) continue bucket = data.setdefault( item_id, { "id": item_id, "name": item_name, "billable_seconds": 0, "non_billable_seconds": 0, "total_seconds": 0, "income": _money_map(), }, ) duration_seconds = get_entry_duration_seconds(entry) bucket["total_seconds"] += duration_seconds if entry.is_billable: bucket["billable_seconds"] += duration_seconds else: bucket["non_billable_seconds"] += duration_seconds _add_income(bucket["income"], entry) rows = [] for item_id, bucket in sorted(data.items(), key=lambda item: item[1]["name"].lower()): rows.append( { "id": bucket["id"], "name": bucket["name"], "billable_seconds": bucket["billable_seconds"], "non_billable_seconds": bucket["non_billable_seconds"], "total_seconds": bucket["total_seconds"], "billable_duration": _format_duration_seconds(bucket["billable_seconds"]), "non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]), "total_duration": _format_duration_seconds(bucket["total_seconds"]), "income_totals": _serialize_money_totals(bucket["income"]), } ) return rows def build_table_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) entries = list(_base_queryset(filters)) return _table_report_payload(filters, entries) def build_user_scoped_table_reports(actor, raw_filters) -> list[dict]: filters = load_report_filters(actor, raw_filters) if not (filters.is_workspace_scope and not filters.user_id): return [] entries = list(_base_queryset(filters)) grouped: dict[str, list[TimeEntry]] = {} for entry in entries: grouped.setdefault(str(entry.user_id), []).append(entry) sorted_groups = sorted( grouped.items(), key=lambda item: _user_display(item[1][0].user).lower(), ) reports: list[dict] = [] for user_id, user_entries in sorted_groups: user_filters = replace(filters, user_id=user_id) reports.append(_table_report_payload(user_filters, user_entries)) return reports def build_day_details_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) day_value = raw_filters.get("day") or raw_filters.get("date") target_day = parse_date(day_value) if day_value else None if not target_day: raise serializers.ValidationError("A valid day is required.") if target_day < filters.from_date or target_day > filters.to_date: raise serializers.ValidationError("Requested day is outside the filtered period.") entries = [ entry for entry in _base_queryset(filters) if _localize_datetime(entry.start_time).date() == target_day ] summary = _summary_from_entries(entries) return { "scope": _scope_payload(filters), "day": target_day.isoformat(), "summary": summary, "entries": [_entry_payload(entry) for entry in entries], }