from __future__ import annotations from collections import defaultdict from collections.abc import Iterable from dataclasses import dataclass, replace from datetime import date, datetime, time, timedelta from decimal import ROUND_DOWN, Decimal import jdatetime from django.contrib.auth import get_user_model from django.db.models import Prefetch, QuerySet from django.utils import timezone from django.utils.dateparse import parse_date from rest_framework import serializers from apps.clients.models import Client from apps.projects.models import Project from apps.projects.services.access import user_has_project_access from apps.tags.models import Tag from apps.time_entries.models import TimeEntry from apps.workspaces.models import Workspace from apps.workspaces.models import WorkspaceUserRate from apps.workspaces.services import WORKSPACE_VIEW, get_workspace_role, has_workspace_capability User = get_user_model() PERIOD_THIS_WEEK = "this_week" PERIOD_THIS_MONTH = "this_month" PERIOD_THIS_YEAR = "this_year" PERIOD_HALF_YEAR_FIRST = "half_year_first" PERIOD_HALF_YEAR_SECOND = "half_year_second" PERIOD_CUSTOM = "period" ALLOWED_PERIODS = { PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_THIS_YEAR, PERIOD_HALF_YEAR_FIRST, PERIOD_HALF_YEAR_SECOND, PERIOD_CUSTOM, } UNCATEGORIZED_IDS = { "clients": "__uncategorized_client__", "projects": "__uncategorized_project__", "tags": "__uncategorized_tag__", } UNCATEGORIZED_LABELS = { "en": { "clients": "No client", "projects": "No project", "tags": "No tag", }, "fa": { "clients": "بدون مشتری", "projects": "بدون پروژه", "tags": "بدون تگ", }, } def _start_of_week(local_date: date) -> date: days_since_sunday = (local_date.weekday() + 1) % 7 return local_date - timedelta(days=days_since_sunday) def _start_of_persian_week(local_date: date) -> date: days_since_saturday = (local_date.weekday() + 2) % 7 return local_date - timedelta(days=days_since_saturday) def _localize_datetime(value: datetime) -> datetime: if timezone.is_naive(value): value = timezone.make_aware(value, timezone.get_current_timezone()) return timezone.localtime(value) def _user_display(user) -> str: full_name = getattr(user, "full_name", "").strip() if full_name and full_name != "Anonymous": return full_name return getattr(user, "mobile", str(user)) def _format_duration_seconds(total_seconds: int) -> str: total_seconds = max(int(total_seconds), 0) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def _money_map() -> defaultdict[str, Decimal]: return defaultdict(lambda: Decimal("0.00")) def _serialize_money_totals(values: dict[str, Decimal]) -> list[dict]: return [ {"currency": currency, "amount": f"{amount.quantize(Decimal('0.01'))}"} for currency, amount in sorted(values.items()) if amount is not None ] def _serialize_rate(amount: Decimal | None, currency: str | None) -> dict | None: if amount is None: return None return { "amount": f"{Decimal(amount).quantize(Decimal('0.01'))}", "currency": currency or "USD", } def _serialize_distinct_rates_from_rows(rate_rows: list[dict]) -> list[dict]: unique_rates: set[tuple[str, str]] = set() for row in rate_rows: unique_rates.add((row["amount"], row["currency"])) return [ {"amount": amount, "currency": currency} for amount, currency in sorted(unique_rates, key=lambda item: (item[1], Decimal(item[0]))) ] def _serialize_rate_periods(entries: list[TimeEntry]) -> list[dict]: sorted_entries = sorted(entries, key=lambda entry: (entry.start_time, entry.end_time or entry.start_time, entry.id)) periods: list[dict] = [] current: dict | None = None for entry in sorted_entries: if not entry.hourly_rate or not entry.start_time: continue amount = f"{Decimal(entry.hourly_rate).quantize(Decimal('0.01'))}" currency = entry.currency or "USD" start_date = _localize_datetime(entry.start_time).date() end_source = entry.end_time or entry.start_time end_date = _localize_datetime(end_source).date() if ( current and current["amount"] == amount and current["currency"] == currency ): if end_date > current["to_date"]: current["to_date"] = end_date continue if current: periods.append( { "amount": current["amount"], "currency": current["currency"], "from_date": current["from_date"].isoformat(), "to_date": current["to_date"].isoformat(), } ) current = { "amount": amount, "currency": currency, "from_date": start_date, "to_date": end_date, } if current: periods.append( { "amount": current["amount"], "currency": current["currency"], "from_date": current["from_date"].isoformat(), "to_date": current["to_date"].isoformat(), } ) return periods def _serialize_current_rate_rows(*, user, workspace: Workspace) -> list[dict]: workspace_rate = ( WorkspaceUserRate.objects.filter( workspace=workspace, user=user, is_active=True, is_deleted=False, ) .order_by("-effective_from", "-updated_at") .first() ) if not workspace_rate or not workspace_rate.effective_from: return [] return [ { "amount": f"{Decimal(workspace_rate.hourly_rate).quantize(Decimal('0.01'))}", "currency": workspace_rate.currency or "USD", "from_date": _localize_datetime(workspace_rate.effective_from).date().isoformat(), "to_date": None, "is_current": True, } ] def _merge_rate_history_rows(history_rows: list[dict], current_rows: list[dict]) -> list[dict]: merged = [dict(row) for row in history_rows] latest_indexes = { (row["amount"], row["currency"]): index for index, row in enumerate(merged) } for row in current_rows: key = (row["amount"], row["currency"]) index = latest_indexes.get(key) if index is not None: merged[index]["to_date"] = None continue merged.append(dict(row)) latest_indexes[key] = len(merged) - 1 return sorted( merged, key=lambda item: ( item["from_date"], item["currency"], Decimal(item["amount"]), item.get("to_date") or "9999-12-31", ), ) def _uncategorized_label(kind: str, language: str) -> str: resolved_language = language if language in UNCATEGORIZED_LABELS else "en" return UNCATEGORIZED_LABELS[resolved_language][kind] def _share_bucket(bucket_id: str, name: str) -> dict: return { "id": bucket_id, "name": name, "seconds": Decimal("0"), "income": _money_map(), } def _entry_income_payload(entry: TimeEntry) -> tuple[str, Decimal] | None: if not entry.is_billable or not entry.hourly_rate: return None duration_seconds = get_entry_duration_seconds(entry) if duration_seconds <= 0: return None hourly_rate = Decimal(entry.hourly_rate) income = (hourly_rate * Decimal(duration_seconds) / Decimal(3600)).quantize(Decimal("0.01")) return entry.currency or "USD", income def _add_money(bucket: defaultdict[str, Decimal], currency: str, amount: Decimal) -> None: bucket[currency] += amount def _breakdown_targets(entry: TimeEntry, kind: str, language: str) -> list[tuple[str, str]]: if kind == "clients": if entry.project and entry.project.client: return [(str(entry.project.client_id), entry.project.client.name)] return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))] if kind == "projects": if entry.project: return [(str(entry.project_id), entry.project.name)] return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))] tags = list(entry.tags.all()) if tags: return [(str(tag.id), tag.name) for tag in tags] return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))] def _accumulate_breakdown_shares(entries: list[TimeEntry], kind: str, *, language: str) -> dict[str, dict]: shares: dict[str, dict] = {} for entry in entries: if not entry.is_billable: continue duration_seconds = get_entry_duration_seconds(entry) if duration_seconds <= 0: continue targets = _breakdown_targets(entry, kind, language) divisor = Decimal(len(targets)) if kind == "tags" and targets else Decimal("1") income_payload = _entry_income_payload(entry) for bucket_id, bucket_name in targets: bucket = shares.setdefault(bucket_id, _share_bucket(bucket_id, bucket_name)) bucket["seconds"] += Decimal(duration_seconds) / divisor if income_payload: currency, amount = income_payload _add_money(bucket["income"], currency, amount / divisor) return shares def _allocate_percentage_rows(items: list[dict], total_value: Decimal) -> list[dict]: if total_value <= 0 or not items: return [] working_rows: list[dict] = [] assigned_total = 0 for item in items: value = Decimal(item["value"]) raw_percentage = (value * Decimal("100") / total_value) if value > 0 else Decimal("0") floored_percentage = int(raw_percentage.quantize(Decimal("1"), rounding=ROUND_DOWN)) assigned_total += floored_percentage working_rows.append( { "id": item["id"], "name": item["name"], "value": value, "percentage": floored_percentage, "remainder": raw_percentage - Decimal(floored_percentage), } ) remaining_points = max(0, 100 - assigned_total) for row in sorted( working_rows, key=lambda item: (-item["remainder"], -item["value"], item["name"].lower(), item["id"]), )[:remaining_points]: row["percentage"] += 1 serialized = [ { "id": row["id"], "name": row["name"], "percentage": str(row["percentage"]), } for row in working_rows ] serialized.sort(key=lambda item: (-Decimal(item["percentage"]), item["name"].lower())) return serialized def _single_currency_amount(income_totals: list[dict]) -> tuple[str | None, Decimal] | None: non_zero_totals: list[tuple[str, Decimal]] = [] for item in income_totals: amount = Decimal(item["amount"]) if amount == 0: continue non_zero_totals.append((item["currency"], amount)) if not non_zero_totals: return None, Decimal("0") currencies = {currency for currency, _ in non_zero_totals} if len(currencies) != 1: return None currency = non_zero_totals[0][0] total_amount = sum((amount for _, amount in non_zero_totals), Decimal("0")) return currency, total_amount def _complete_percentage_rows( rows: list[dict], percentage_rows: list[dict], *, unavailable: bool = False, ) -> list[dict]: if unavailable: return [] existing_ids = {row["id"] for row in percentage_rows} completed = percentage_rows + [ {"id": row["id"], "name": row["name"], "percentage": "0"} for row in rows if row["id"] not in existing_ids ] completed.sort(key=lambda item: (-Decimal(item["percentage"]), item["name"].lower())) return completed def _serialize_time_percentage_rows(rows: list[dict], shares: dict[str, dict]) -> list[dict]: items = [ { "id": bucket["id"], "name": bucket["name"], "value": Decimal(bucket["seconds"]), } for bucket in shares.values() ] total_seconds = sum((item["value"] for item in items), Decimal("0")) return _complete_percentage_rows(rows, _allocate_percentage_rows(items, total_seconds)) def _serialize_income_percentage_rows(rows: list[dict], shares: dict[str, dict]) -> list[dict]: items: list[dict] = [] currencies: set[str] = set() for bucket in shares.values(): income_totals = _serialize_money_totals(bucket["income"]) currency_amount = _single_currency_amount(income_totals) if currency_amount is None: return [] currency, amount = currency_amount if currency: currencies.add(currency) items.append( { "id": bucket["id"], "name": bucket["name"], "value": amount, } ) if len(currencies) > 1: return [] total_income = sum((item["value"] for item in items), Decimal("0")) if total_income <= 0: return [] return _complete_percentage_rows(rows, _allocate_percentage_rows(items, total_income)) def _build_user_summary(user, entries: list[TimeEntry], *, language: str) -> dict: summary = _summary_from_entries(entries) historical_rate_rows = _serialize_rate_periods(entries) current_rate_rows = _serialize_current_rate_rows(user=user, workspace=entries[0].workspace) rate_rows = _merge_rate_history_rows(historical_rate_rows, current_rate_rows) project_rows = _build_breakdown(entries, "projects", language=language) client_rows = _build_breakdown(entries, "clients", language=language) tag_rows = _build_breakdown(entries, "tags", language=language) project_shares = _accumulate_breakdown_shares(entries, "projects", language=language) client_shares = _accumulate_breakdown_shares(entries, "clients", language=language) tag_shares = _accumulate_breakdown_shares(entries, "tags", language=language) return { "user": { "id": str(user.id), "name": _user_display(user), "mobile": user.mobile, }, "hourly_rates": _serialize_distinct_rates_from_rows(rate_rows), "rate_periods": rate_rows, "total_seconds": summary["billable_seconds"], "total_duration": summary["total_duration"], "billable_seconds": summary["billable_seconds"], "billable_duration": summary["billable_duration"], "non_billable_seconds": summary["non_billable_seconds"], "non_billable_duration": summary["non_billable_duration"], "income_totals": summary["income_totals"], "project_percentages": _serialize_time_percentage_rows(project_rows, project_shares), "client_percentages": _serialize_time_percentage_rows(client_rows, client_shares), "tag_percentages": _serialize_time_percentage_rows(tag_rows, tag_shares), "project_income_percentages": _serialize_income_percentage_rows(project_rows, project_shares), "client_income_percentages": _serialize_income_percentage_rows(client_rows, client_shares), "tag_income_percentages": _serialize_income_percentage_rows(tag_rows, tag_shares), } def _build_user_summaries(entries: list[TimeEntry], *, language: str) -> list[dict]: grouped: dict[str, list[TimeEntry]] = defaultdict(list) for entry in entries: grouped[str(entry.user_id)].append(entry) summaries = [ _build_user_summary(grouped_entries[0].user, grouped_entries, language=language) for grouped_entries in grouped.values() if grouped_entries ] summaries.sort(key=lambda item: item["user"]["name"].lower()) return summaries def _build_overall_percentage_payload( entries: list[TimeEntry], *, language: str, rows_by_kind: dict[str, list[dict]], ) -> dict: project_shares = _accumulate_breakdown_shares(entries, "projects", language=language) client_shares = _accumulate_breakdown_shares(entries, "clients", language=language) tag_shares = _accumulate_breakdown_shares(entries, "tags", language=language) return { "project_percentages": _serialize_time_percentage_rows(rows_by_kind["projects"], project_shares), "client_percentages": _serialize_time_percentage_rows(rows_by_kind["clients"], client_shares), "tag_percentages": _serialize_time_percentage_rows(rows_by_kind["tags"], tag_shares), "project_income_percentages": _serialize_income_percentage_rows(rows_by_kind["projects"], project_shares), "client_income_percentages": _serialize_income_percentage_rows(rows_by_kind["clients"], client_shares), "tag_income_percentages": _serialize_income_percentage_rows(rows_by_kind["tags"], tag_shares), } def _add_income(bucket: defaultdict[str, Decimal], entry: TimeEntry): if not entry.is_billable or not entry.hourly_rate: return duration_seconds = get_entry_duration_seconds(entry) if duration_seconds <= 0: return hourly_rate = Decimal(entry.hourly_rate) income = (hourly_rate * Decimal(duration_seconds) / Decimal(3600)).quantize(Decimal("0.01")) bucket[(entry.currency or "USD")] += income def get_entry_duration_seconds(entry: TimeEntry) -> int: if entry.duration is not None: return max(int(entry.duration.total_seconds()), 0) if entry.end_time: return max(int((entry.end_time - entry.start_time).total_seconds()), 0) return 0 def _parse_id_list(values: Iterable[str]) -> list[str]: return [str(value).strip() for value in values if str(value).strip()] @dataclass class ReportFilters: workspace: Workspace period: str from_date: date to_date: date user_id: str | None client_id: str | None project_id: str | None tag_ids: list[str] actor: object is_workspace_scope: bool language: str class ReportFilterSerializer(serializers.Serializer): workspace = serializers.UUIDField() period = serializers.ChoiceField(choices=sorted(ALLOWED_PERIODS)) from_date = serializers.DateField(required=False, allow_null=True) to_date = serializers.DateField(required=False, allow_null=True) user = serializers.UUIDField(required=False, allow_null=True) client = serializers.UUIDField(required=False, allow_null=True) project = serializers.UUIDField(required=False, allow_null=True) tags = serializers.ListField( child=serializers.UUIDField(), required=False, allow_empty=True, ) language = serializers.ChoiceField(choices=("en", "fa"), required=False, default="en") def _resolve_period_bounds( period: str, from_date: date | None, to_date: date | None, *, language: str, ) -> tuple[date, date]: today = timezone.localdate() if language == "fa": today_jalali = jdatetime.date.fromgregorian(date=today) if period == PERIOD_THIS_WEEK: start = _start_of_persian_week(today) return start, start + timedelta(days=6) if period == PERIOD_THIS_MONTH: start_j = jdatetime.date(today_jalali.year, today_jalali.month, 1) if today_jalali.month == 12: next_month_j = jdatetime.date(today_jalali.year + 1, 1, 1) else: next_month_j = jdatetime.date(today_jalali.year, today_jalali.month + 1, 1) return start_j.togregorian(), next_month_j.togregorian() - timedelta(days=1) if period == PERIOD_THIS_YEAR: start = jdatetime.date(today_jalali.year, 1, 1).togregorian() if jdatetime.date(today_jalali.year, 1, 1).isleap(): end = jdatetime.date(today_jalali.year, 12, 30).togregorian() else: end = jdatetime.date(today_jalali.year, 12, 29).togregorian() return start, end if period == PERIOD_HALF_YEAR_FIRST: start = jdatetime.date(today_jalali.year, 1, 1).togregorian() end = jdatetime.date(today_jalali.year, 6, 31).togregorian() return start, end if period == PERIOD_HALF_YEAR_SECOND: start = jdatetime.date(today_jalali.year, 7, 1).togregorian() if jdatetime.date(today_jalali.year, 1, 1).isleap(): end = jdatetime.date(today_jalali.year, 12, 30).togregorian() else: end = jdatetime.date(today_jalali.year, 12, 29).togregorian() return start, end if period == PERIOD_THIS_WEEK: start = _start_of_week(today) return start, start + timedelta(days=6) if period == PERIOD_THIS_MONTH: start = today.replace(day=1) if start.month == 12: next_month = start.replace(year=start.year + 1, month=1, day=1) else: next_month = start.replace(month=start.month + 1, day=1) return start, next_month - timedelta(days=1) if period == PERIOD_THIS_YEAR: return date(today.year, 1, 1), date(today.year, 12, 31) if period == PERIOD_HALF_YEAR_FIRST: return date(today.year, 1, 1), date(today.year, 6, 30) if period == PERIOD_HALF_YEAR_SECOND: return date(today.year, 7, 1), date(today.year, 12, 31) if period == PERIOD_CUSTOM: if not from_date or not to_date: raise serializers.ValidationError("Custom period requires from_date and to_date.") if from_date > to_date: raise serializers.ValidationError("from_date cannot be after to_date.") if (to_date - from_date).days > 30: raise serializers.ValidationError("Custom period cannot exceed 31 days.") return from_date, to_date raise serializers.ValidationError("Unsupported report period.") def load_report_filters(actor, raw_data) -> ReportFilters: normalized = { "workspace": raw_data.get("workspace"), "period": raw_data.get("period", PERIOD_THIS_MONTH), "from_date": raw_data.get("from_date") or raw_data.get("from"), "to_date": raw_data.get("to_date") or raw_data.get("to"), "user": raw_data.get("user"), "client": raw_data.get("client"), "project": raw_data.get("project"), "tags": ( raw_data.get("tags") or raw_data.getlist("tags") if hasattr(raw_data, "getlist") else raw_data.get("tags") ), "language": raw_data.get("language", "en"), } if normalized["tags"] and not isinstance(normalized["tags"], list): normalized["tags"] = [normalized["tags"]] serializer = ReportFilterSerializer(data=normalized) serializer.is_valid(raise_exception=True) validated = serializer.validated_data workspace = Workspace.objects.filter(id=validated["workspace"]).first() if not workspace: raise serializers.ValidationError("Workspace not found.") if not has_workspace_capability(actor, workspace, WORKSPACE_VIEW): raise serializers.ValidationError("You do not have access to this workspace.") from_date, to_date = _resolve_period_bounds( validated["period"], validated.get("from_date"), validated.get("to_date"), language=validated.get("language", "en"), ) role = get_workspace_role(actor, workspace) is_workspace_scope = role in {"owner", "admin"} requested_user_id = str(validated["user"]) if validated.get("user") else None if requested_user_id and not is_workspace_scope and requested_user_id != str(actor.id): raise serializers.ValidationError("You cannot view another user's report.") user_id = requested_user_id if is_workspace_scope else str(actor.id) client_id = str(validated["client"]) if validated.get("client") else None project_id = str(validated["project"]) if validated.get("project") else None tag_ids = [str(tag_id) for tag_id in validated.get("tags", [])] if client_id and not Client.objects.filter(id=client_id, workspace=workspace).exists(): raise serializers.ValidationError("Client does not belong to this workspace.") if project_id and not Project.objects.filter(id=project_id, workspace=workspace).exists(): raise serializers.ValidationError("Project does not belong to this workspace.") if project_id and not is_workspace_scope: project = Project.objects.filter(id=project_id, workspace=workspace).first() if project and not user_has_project_access(actor, project): raise serializers.ValidationError("Project does not belong to this workspace.") if tag_ids: existing_tag_ids = set(Tag.objects.filter(id__in=tag_ids, workspace=workspace).values_list("id", flat=True)) if len(existing_tag_ids) != len(tag_ids): raise serializers.ValidationError("One or more tags do not belong to this workspace.") return ReportFilters( workspace=workspace, period=validated["period"], from_date=from_date, to_date=to_date, user_id=user_id, client_id=client_id, project_id=project_id, tag_ids=tag_ids, actor=actor, is_workspace_scope=is_workspace_scope, language=validated.get("language", "en"), ) def _base_queryset(filters: ReportFilters) -> QuerySet[TimeEntry]: current_timezone = timezone.get_current_timezone() start_dt = timezone.make_aware( datetime.combine(filters.from_date, time.min), current_timezone, ) end_dt = timezone.make_aware( datetime.combine(filters.to_date + timedelta(days=1), time.min), current_timezone, ) queryset = ( TimeEntry.objects.filter( workspace=filters.workspace, is_deleted=False, start_time__gte=start_dt, start_time__lt=end_dt, ) .select_related("project", "project__client", "workspace", "user") .prefetch_related(Prefetch("tags", queryset=Tag.objects.filter(is_deleted=False))) .order_by("-start_time", "-created_at") .distinct() ) if filters.user_id: queryset = queryset.filter(user_id=filters.user_id) if filters.client_id: queryset = queryset.filter(project__client_id=filters.client_id) if filters.project_id: queryset = queryset.filter(project_id=filters.project_id) if filters.tag_ids: queryset = queryset.filter(tags__id__in=filters.tag_ids) return queryset.distinct() def _summary_from_entries(entries: list[TimeEntry]) -> dict: total_seconds = 0 billable_seconds = 0 non_billable_seconds = 0 income_by_currency = _money_map() for entry in entries: duration_seconds = get_entry_duration_seconds(entry) total_seconds += duration_seconds if entry.is_billable: billable_seconds += duration_seconds else: non_billable_seconds += duration_seconds _add_income(income_by_currency, entry) return { "total_seconds": total_seconds, "billable_seconds": billable_seconds, "non_billable_seconds": non_billable_seconds, "total_duration": _format_duration_seconds(total_seconds), "billable_duration": _format_duration_seconds(billable_seconds), "non_billable_duration": _format_duration_seconds(non_billable_seconds), "income_totals": _serialize_money_totals(income_by_currency), } def _entry_payload(entry: TimeEntry) -> dict: local_start = _localize_datetime(entry.start_time) local_end = _localize_datetime(entry.end_time) if entry.end_time else None duration_seconds = get_entry_duration_seconds(entry) income_by_currency = _money_map() _add_income(income_by_currency, entry) return { "id": str(entry.id), "description": entry.description, "user": { "id": str(entry.user_id), "name": _user_display(entry.user), "mobile": entry.user.mobile, }, "project": ( { "id": str(entry.project_id), "name": entry.project.name, "client": ( {"id": str(entry.project.client_id), "name": entry.project.client.name} if entry.project and entry.project.client else None ), } if entry.project else None ), "tags": [{"id": str(tag.id), "name": tag.name, "color": tag.color} for tag in entry.tags.all()], "start_time": local_start.isoformat(), "end_time": local_end.isoformat() if local_end else None, "duration_seconds": duration_seconds, "duration": _format_duration_seconds(duration_seconds), "is_billable": entry.is_billable, "hourly_rate": str(entry.hourly_rate) if entry.hourly_rate is not None else None, "currency": entry.currency, "income_totals": _serialize_money_totals(income_by_currency), } def _bucket_label(filters: ReportFilters, bucket_date: date) -> str: if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}: return bucket_date.isoformat() if filters.language == "fa": persian_date = jdatetime.date.fromgregorian(date=bucket_date) return f"{persian_date.year:04d}-{persian_date.month:02d}" return bucket_date.strftime("%Y-%m") def _bucket_key(filters: ReportFilters, local_dt: datetime) -> tuple[str, date]: if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}: bucket_date = local_dt.date() return bucket_date.isoformat(), bucket_date if filters.language == "fa": persian_date = jdatetime.date.fromgregorian(date=local_dt.date()) return f"{persian_date.year:04d}-{persian_date.month:02d}", local_dt.date() bucket_date = date(local_dt.year, local_dt.month, 1) return bucket_date.strftime("%Y-%m"), bucket_date def build_chart_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) entries = list(_base_queryset(filters)) summary = _summary_from_entries(entries) grouped_entries: dict[str | None, list[TimeEntry]] = defaultdict(list) if filters.is_workspace_scope and not filters.user_id: for entry in entries: grouped_entries[str(entry.user_id)].append(entry) else: grouped_entries[filters.user_id] = entries serialized_series = [] for _, series_entries in sorted( grouped_entries.items(), key=lambda item: _user_display(item[1][0].user).lower() if item[1] else "", ): if not series_entries: continue buckets: dict[str, dict] = {} for entry in series_entries: local_start = _localize_datetime(entry.start_time) bucket_id, bucket_date = _bucket_key(filters, local_start) bucket = buckets.setdefault( bucket_id, { "bucket_key": bucket_id, "bucket_label": _bucket_label(filters, bucket_date), "total_seconds": 0, "total_duration": "00:00:00", }, ) bucket["total_seconds"] += get_entry_duration_seconds(entry) serialized_buckets = [] for bucket in sorted(buckets.values(), key=lambda item: item["bucket_key"]): bucket["total_duration"] = _format_duration_seconds(bucket["total_seconds"]) serialized_buckets.append(bucket) user = series_entries[0].user serialized_series.append( { "user": { "id": str(user.id), "name": _user_display(user), "mobile": user.mobile, }, "buckets": serialized_buckets, } ) return { "scope": _scope_payload(filters), "summary": summary, "series": serialized_series, } def _scope_payload(filters: ReportFilters) -> dict: user_payload = None if filters.user_id: target_user = User.objects.filter(id=filters.user_id).first() if target_user: user_payload = { "id": str(target_user.id), "name": _user_display(target_user), "mobile": target_user.mobile, } return { "workspace": { "id": str(filters.workspace.id), "name": filters.workspace.name, "thumbnail_path": ( filters.workspace.thumbnail.path if getattr(filters.workspace, "thumbnail", None) else None ), }, "period": filters.period, "from_date": filters.from_date.isoformat(), "to_date": filters.to_date.isoformat(), "user": user_payload, "is_workspace_scope": filters.is_workspace_scope and not filters.user_id, "filters": { "client_id": filters.client_id, "project_id": filters.project_id, "tag_ids": filters.tag_ids, }, } def _table_report_payload( filters: ReportFilters, entries: list[TimeEntry], *, user_summary: dict | None = None, user_summaries: list[dict] | None = None, ) -> dict: summary = _summary_from_entries(entries) include_latest_rate = not (filters.is_workspace_scope and not filters.user_id) client_rows = _build_breakdown(entries, "clients", language=filters.language) project_rows = _build_breakdown(entries, "projects", language=filters.language) tag_rows = _build_breakdown(entries, "tags", language=filters.language) payload = { "scope": _scope_payload(filters), "summary": summary, "days": _group_daily(entries, include_latest_rate=include_latest_rate), "clients": client_rows, "projects": project_rows, "tags": tag_rows, } if filters.is_workspace_scope and not filters.user_id: payload.update( _build_overall_percentage_payload( entries, language=filters.language, rows_by_kind={ "clients": client_rows, "projects": project_rows, "tags": tag_rows, }, ) ) if user_summary is not None: payload["user_summary"] = user_summary if user_summaries is not None: payload["user_summaries"] = user_summaries return payload def _group_daily(entries: list[TimeEntry], *, include_latest_rate: bool) -> list[dict]: by_day: dict[str, dict] = {} for entry in entries: local_start = _localize_datetime(entry.start_time) day_key = local_start.date().isoformat() day_bucket = by_day.setdefault( day_key, { "date": day_key, "billable_seconds": 0, "non_billable_seconds": 0, "total_seconds": 0, "income": _money_map(), "latest_rate_amount": None, "latest_rate_currency": None, "latest_rate_timestamp": None, }, ) duration_seconds = get_entry_duration_seconds(entry) day_bucket["total_seconds"] += duration_seconds if entry.is_billable: day_bucket["billable_seconds"] += duration_seconds else: day_bucket["non_billable_seconds"] += duration_seconds _add_income(day_bucket["income"], entry) if ( include_latest_rate and entry.is_billable and entry.hourly_rate and ( day_bucket["latest_rate_timestamp"] is None or local_start >= day_bucket["latest_rate_timestamp"] ) ): day_bucket["latest_rate_amount"] = Decimal(entry.hourly_rate) day_bucket["latest_rate_currency"] = entry.currency or "USD" day_bucket["latest_rate_timestamp"] = local_start rows = [] for day_key in sorted(by_day.keys()): bucket = by_day[day_key] rows.append( { "date": bucket["date"], "billable_seconds": bucket["billable_seconds"], "non_billable_seconds": bucket["non_billable_seconds"], "total_seconds": bucket["total_seconds"], "billable_duration": _format_duration_seconds(bucket["billable_seconds"]), "non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]), "total_duration": _format_duration_seconds(bucket["total_seconds"]), "latest_hourly_rate": _serialize_rate( bucket["latest_rate_amount"], bucket["latest_rate_currency"], ) if include_latest_rate else None, "income_totals": _serialize_money_totals(bucket["income"]), } ) return rows def _build_breakdown(entries: list[TimeEntry], kind: str, *, language: str) -> list[dict]: data: dict[str, dict] = {} for entry in entries: duration_seconds = get_entry_duration_seconds(entry) for item_id, item_name in _breakdown_targets(entry, kind, language): bucket = data.setdefault( item_id, { "id": item_id, "name": item_name, "billable_seconds": 0, "non_billable_seconds": 0, "total_seconds": 0, "income": _money_map(), }, ) bucket["total_seconds"] += duration_seconds if entry.is_billable: bucket["billable_seconds"] += duration_seconds else: bucket["non_billable_seconds"] += duration_seconds _add_income(bucket["income"], entry) rows = [] for _item_id, bucket in sorted(data.items(), key=lambda item: item[1]["name"].lower()): rows.append( { "id": bucket["id"], "name": bucket["name"], "billable_seconds": bucket["billable_seconds"], "non_billable_seconds": bucket["non_billable_seconds"], "total_seconds": bucket["total_seconds"], "billable_duration": _format_duration_seconds(bucket["billable_seconds"]), "non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]), "total_duration": _format_duration_seconds(bucket["total_seconds"]), "income_totals": _serialize_money_totals(bucket["income"]), } ) return rows def build_table_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) entries = list(_base_queryset(filters)) if filters.is_workspace_scope and not filters.user_id: payload = _table_report_payload( filters, entries, user_summaries=_build_user_summaries(entries, language=filters.language), ) return payload user_summary = ( _build_user_summary(entries[0].user, entries, language=filters.language) if entries and filters.user_id else None ) return _table_report_payload(filters, entries, user_summary=user_summary) def build_user_summary_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) if not filters.user_id: raise serializers.ValidationError("A user is required.") entries = list(_base_queryset(filters)) user_summary = ( _build_user_summary(entries[0].user, entries, language=filters.language) if entries else None ) return _table_report_payload(filters, entries, user_summary=user_summary) def build_user_scoped_table_reports(actor, raw_filters) -> list[dict]: filters = load_report_filters(actor, raw_filters) if not (filters.is_workspace_scope and not filters.user_id): return [] entries = list(_base_queryset(filters)) grouped: dict[str, list[TimeEntry]] = {} for entry in entries: grouped.setdefault(str(entry.user_id), []).append(entry) sorted_groups = sorted( grouped.items(), key=lambda item: _user_display(item[1][0].user).lower(), ) reports: list[dict] = [] for user_id, user_entries in sorted_groups: user_filters = replace(filters, user_id=user_id) reports.append( _table_report_payload( user_filters, user_entries, user_summary=_build_user_summary( user_entries[0].user, user_entries, language=filters.language, ), ) ) return reports def build_day_details_report(actor, raw_filters) -> dict: filters = load_report_filters(actor, raw_filters) day_value = raw_filters.get("day") or raw_filters.get("date") target_day = parse_date(day_value) if day_value else None if not target_day: raise serializers.ValidationError("A valid day is required.") if target_day < filters.from_date or target_day > filters.to_date: raise serializers.ValidationError("Requested day is outside the filtered period.") entries = [ entry for entry in _base_queryset(filters) if _localize_datetime(entry.start_time).date() == target_day ] summary = _summary_from_entries(entries) return { "scope": _scope_payload(filters), "day": target_day.isoformat(), "summary": summary, "entries": [_entry_payload(entry) for entry in entries], }