Files
qlockify-backend-deployment/apps/reports/services/aggregation.py

611 lines
23 KiB
Python

from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, replace
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Iterable
import jdatetime
from django.contrib.auth import get_user_model
from django.db.models import Prefetch, QuerySet
from django.utils import timezone
from django.utils.dateparse import parse_date
from rest_framework import serializers
from apps.clients.models import Client
from apps.projects.models import Project
from apps.tags.models import Tag
from apps.time_entries.models import TimeEntry
from apps.workspaces.models import Workspace
from apps.workspaces.services import WORKSPACE_VIEW, get_workspace_role, has_workspace_capability
# Active user model, resolved through Django's get_user_model().
User = get_user_model()

# Identifiers of the report periods accepted by the report filters.
PERIOD_THIS_WEEK = "this_week"
PERIOD_THIS_MONTH = "this_month"
PERIOD_THIS_YEAR = "this_year"
PERIOD_HALF_YEAR_FIRST = "half_year_first"
PERIOD_HALF_YEAR_SECOND = "half_year_second"
# Caller-supplied range; requires explicit from/to dates.
PERIOD_CUSTOM = "period"

# Every accepted period value; the filter serializer builds its choices from it.
ALLOWED_PERIODS = {
    PERIOD_THIS_WEEK,
    PERIOD_THIS_MONTH,
    PERIOD_THIS_YEAR,
    PERIOD_HALF_YEAR_FIRST,
    PERIOD_HALF_YEAR_SECOND,
    PERIOD_CUSTOM,
}
def _start_of_week(local_date: date) -> date:
days_since_sunday = (local_date.weekday() + 1) % 7
return local_date - timedelta(days=days_since_sunday)
def _start_of_persian_week(local_date: date) -> date:
days_since_saturday = (local_date.weekday() + 2) % 7
return local_date - timedelta(days=days_since_saturday)
def _localize_datetime(value: datetime) -> datetime:
    """Return *value* converted with ``timezone.localtime``.

    A naive input is first made aware using the current timezone.
    """
    aware = (
        timezone.make_aware(value, timezone.get_current_timezone())
        if timezone.is_naive(value)
        else value
    )
    return timezone.localtime(aware)
def _user_display(user) -> str:
full_name = getattr(user, "full_name", "").strip()
if full_name and full_name != "Anonymous":
return full_name
return getattr(user, "mobile", str(user))
def _format_duration_seconds(total_seconds: int) -> str:
total_seconds = max(int(total_seconds), 0)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _money_map() -> defaultdict[str, Decimal]:
return defaultdict(lambda: Decimal("0.00"))
def _serialize_money_totals(values: dict[str, Decimal]) -> list[dict]:
return [
{"currency": currency, "amount": f"{amount.quantize(Decimal('0.01'))}"}
for currency, amount in sorted(values.items())
if amount is not None
]
def _add_income(bucket: defaultdict[str, Decimal], entry: TimeEntry):
    """Add the income earned by *entry* to *bucket*, keyed by currency.

    Nothing is added unless the entry is billable, has a truthy hourly rate
    and has a positive duration. The income is ``hourly_rate * hours``
    quantized to two decimals; a missing currency is booked under ``"USD"``.
    """
    if not (entry.is_billable and entry.hourly_rate):
        return
    seconds = get_entry_duration_seconds(entry)
    if seconds <= 0:
        return
    gross = Decimal(entry.hourly_rate) * Decimal(seconds) / Decimal(3600)
    currency = entry.currency or "USD"
    bucket[currency] += gross.quantize(Decimal("0.01"))
def get_entry_duration_seconds(entry: TimeEntry) -> int:
    """Return the entry's length in whole seconds, never negative.

    The stored ``duration`` wins when present; otherwise the length is
    derived from ``end_time - start_time``. An entry with neither a
    duration nor an end time counts as 0 seconds.
    """
    if entry.duration is not None:
        elapsed = entry.duration
    elif entry.end_time:
        elapsed = entry.end_time - entry.start_time
    else:
        return 0
    return max(int(elapsed.total_seconds()), 0)
def _parse_id_list(values: Iterable[str]) -> list[str]:
return [str(value).strip() for value in values if str(value).strip()]
@dataclass
class ReportFilters:
    """Resolved report parameters passed to the report builders in this module."""

    # Workspace whose time entries are reported.
    workspace: Workspace
    # One of the PERIOD_* identifiers.
    period: str
    # First and last day of the reporting window; both days are included
    # when the entry queryset is built.
    from_date: date
    to_date: date
    # Optional narrowing filters; None means "do not filter on this".
    user_id: str | None
    client_id: str | None
    project_id: str | None
    # Tag ids to filter on; an empty list disables tag filtering.
    tag_ids: list[str]
    # The user who requested the report.
    actor: object
    # Set by load_report_filters to True when the actor's workspace role is
    # "owner" or "admin".
    is_workspace_scope: bool
    # "en" or "fa"; "fa" switches period and bucket handling to jdatetime dates.
    language: str
class ReportFilterSerializer(serializers.Serializer):
    """Validates the raw report filter values before they are resolved.

    Only the shape and type of each value is checked here.
    """

    workspace = serializers.UUIDField()
    # sorted() gives the choices a stable order; ALLOWED_PERIODS is a set.
    period = serializers.ChoiceField(choices=sorted(ALLOWED_PERIODS))
    from_date = serializers.DateField(required=False, allow_null=True)
    to_date = serializers.DateField(required=False, allow_null=True)
    user = serializers.UUIDField(required=False, allow_null=True)
    client = serializers.UUIDField(required=False, allow_null=True)
    project = serializers.UUIDField(required=False, allow_null=True)
    # May be omitted or empty, but is not declared nullable.
    tags = serializers.ListField(
        child=serializers.UUIDField(),
        required=False,
        allow_empty=True,
    )
    language = serializers.ChoiceField(choices=("en", "fa"), required=False, default="en")
def _resolve_period_bounds(period: str, from_date: date | None, to_date: date | None, *, language: str) -> tuple[date, date]:
    """Translate a period identifier into inclusive Gregorian ``(start, end)`` dates.

    With ``language == "fa"`` the named periods follow jdatetime dates
    (weeks start on Saturday) and are converted back to Gregorian. Any other
    language uses Gregorian periods with weeks starting on Sunday. The
    custom period is handled the same way for every language.

    Raises:
        serializers.ValidationError: if the custom range is incomplete,
            reversed or longer than 31 days, or the period is unknown.
    """
    today = timezone.localdate()

    if language == "fa":
        jalali_today = jdatetime.date.fromgregorian(date=today)
        jalali_year = jalali_today.year

        def jalali_year_start() -> date:
            return jdatetime.date(jalali_year, 1, 1).togregorian()

        def jalali_year_end() -> date:
            # Month 12 ends on day 30 in a leap year and on day 29 otherwise.
            last_day = 30 if jdatetime.date(jalali_year, 1, 1).isleap() else 29
            return jdatetime.date(jalali_year, 12, last_day).togregorian()

        if period == PERIOD_THIS_WEEK:
            week_start = _start_of_persian_week(today)
            return week_start, week_start + timedelta(days=6)
        if period == PERIOD_THIS_MONTH:
            month_start = jdatetime.date(jalali_year, jalali_today.month, 1)
            if jalali_today.month == 12:
                following = jdatetime.date(jalali_year + 1, 1, 1)
            else:
                following = jdatetime.date(jalali_year, jalali_today.month + 1, 1)
            # The day before the next month starts is the last day of this month.
            return month_start.togregorian(), following.togregorian() - timedelta(days=1)
        if period == PERIOD_THIS_YEAR:
            return jalali_year_start(), jalali_year_end()
        if period == PERIOD_HALF_YEAR_FIRST:
            return jalali_year_start(), jdatetime.date(jalali_year, 6, 31).togregorian()
        if period == PERIOD_HALF_YEAR_SECOND:
            return jdatetime.date(jalali_year, 7, 1).togregorian(), jalali_year_end()
        # PERIOD_CUSTOM (and anything unknown) falls through to the shared handling.

    year = today.year
    if period == PERIOD_THIS_WEEK:
        week_start = _start_of_week(today)
        return week_start, week_start + timedelta(days=6)
    if period == PERIOD_THIS_MONTH:
        month_start = today.replace(day=1)
        if month_start.month == 12:
            following = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            following = month_start.replace(month=month_start.month + 1, day=1)
        return month_start, following - timedelta(days=1)
    if period == PERIOD_THIS_YEAR:
        return date(year, 1, 1), date(year, 12, 31)
    if period == PERIOD_HALF_YEAR_FIRST:
        return date(year, 1, 1), date(year, 6, 30)
    if period == PERIOD_HALF_YEAR_SECOND:
        return date(year, 7, 1), date(year, 12, 31)
    if period != PERIOD_CUSTOM:
        raise serializers.ValidationError("Unsupported report period.")

    if not from_date or not to_date:
        raise serializers.ValidationError("Custom period requires from_date and to_date.")
    if from_date > to_date:
        raise serializers.ValidationError("from_date cannot be after to_date.")
    # Both ends are inclusive, so a 30-day difference spans 31 calendar days.
    if (to_date - from_date).days > 30:
        raise serializers.ValidationError("Custom period cannot exceed 31 days.")
    return from_date, to_date
def load_report_filters(actor, raw_data) -> ReportFilters:
    """Validate raw report parameters and resolve them into ``ReportFilters``.

    Args:
        actor: The user requesting the report.
        raw_data: A mapping of raw parameters. It may be a multi-value
            mapping exposing ``getlist`` or a plain dict. ``from`` / ``to``
            are accepted as aliases of ``from_date`` / ``to_date``.

    Returns:
        The resolved filters. Actors whose workspace role is not owner/admin
        are always restricted to their own entries.

    Raises:
        serializers.ValidationError: on malformed input, an unknown
            workspace, missing workspace access, a request for another
            user's report without owner/admin role, or a client / project /
            tag that does not belong to the workspace.
    """
    # On a multi-value mapping, get() yields only the last value of a repeated
    # key, so every submitted tag has to be read through getlist().
    if hasattr(raw_data, "getlist"):
        raw_tags = raw_data.getlist("tags")
    else:
        raw_tags = raw_data.get("tags")
    if not raw_tags:
        # The serializer's list field is not nullable; "no tags" must be [].
        raw_tags = []
    elif not isinstance(raw_tags, (list, tuple)):
        raw_tags = [raw_tags]

    normalized = {
        "workspace": raw_data.get("workspace"),
        "period": raw_data.get("period", PERIOD_THIS_MONTH),
        "from_date": raw_data.get("from_date") or raw_data.get("from"),
        "to_date": raw_data.get("to_date") or raw_data.get("to"),
        "user": raw_data.get("user"),
        "client": raw_data.get("client"),
        "project": raw_data.get("project"),
        "tags": list(raw_tags),
        "language": raw_data.get("language", "en"),
    }
    serializer = ReportFilterSerializer(data=normalized)
    serializer.is_valid(raise_exception=True)
    validated = serializer.validated_data

    workspace = Workspace.objects.filter(id=validated["workspace"]).first()
    if not workspace:
        raise serializers.ValidationError("Workspace not found.")
    if not has_workspace_capability(actor, workspace, WORKSPACE_VIEW):
        raise serializers.ValidationError("You do not have access to this workspace.")

    language = validated.get("language", "en")
    from_date, to_date = _resolve_period_bounds(
        validated["period"],
        validated.get("from_date"),
        validated.get("to_date"),
        language=language,
    )

    role = get_workspace_role(actor, workspace)
    is_workspace_scope = role in {"owner", "admin"}
    requested_user_id = str(validated["user"]) if validated.get("user") else None
    if requested_user_id and not is_workspace_scope and requested_user_id != str(actor.id):
        raise serializers.ValidationError("You cannot view another user's report.")
    # Without owner/admin role the report is always pinned to the actor.
    user_id = requested_user_id if is_workspace_scope else str(actor.id)

    client_id = str(validated["client"]) if validated.get("client") else None
    project_id = str(validated["project"]) if validated.get("project") else None
    # De-duplicate (keeping order) so a repeated id cannot trip the count
    # comparison below.
    tag_ids = list(dict.fromkeys(str(tag_id) for tag_id in validated.get("tags", [])))

    if client_id and not Client.objects.filter(id=client_id, workspace=workspace).exists():
        raise serializers.ValidationError("Client does not belong to this workspace.")
    if project_id and not Project.objects.filter(id=project_id, workspace=workspace).exists():
        raise serializers.ValidationError("Project does not belong to this workspace.")
    if tag_ids:
        existing_tag_ids = set(Tag.objects.filter(id__in=tag_ids, workspace=workspace).values_list("id", flat=True))
        if len(existing_tag_ids) != len(tag_ids):
            raise serializers.ValidationError("One or more tags do not belong to this workspace.")

    return ReportFilters(
        workspace=workspace,
        period=validated["period"],
        from_date=from_date,
        to_date=to_date,
        user_id=user_id,
        client_id=client_id,
        project_id=project_id,
        tag_ids=tag_ids,
        actor=actor,
        is_workspace_scope=is_workspace_scope,
        language=language,
    )
def _base_queryset(filters: ReportFilters) -> QuerySet[TimeEntry]:
    """Build the time-entry queryset shared by every report.

    Selects non-deleted entries of the workspace whose ``start_time`` falls
    within ``[from_date 00:00, to_date + 1 day 00:00)`` in the current
    timezone, newest first, then applies the optional user / client /
    project / tag filters. Non-deleted tags are prefetched.
    """
    tz = timezone.get_current_timezone()
    window_start = timezone.make_aware(datetime.combine(filters.from_date, time.min), tz)
    # Exclusive upper bound: midnight after the last requested day.
    window_end = timezone.make_aware(datetime.combine(filters.to_date + timedelta(days=1), time.min), tz)

    live_tags = Prefetch("tags", queryset=Tag.objects.filter(is_deleted=False))
    entries = TimeEntry.objects.filter(
        workspace=filters.workspace,
        is_deleted=False,
        start_time__gte=window_start,
        start_time__lt=window_end,
    )
    entries = entries.select_related("project", "project__client", "workspace", "user")
    entries = entries.prefetch_related(live_tags)
    entries = entries.order_by("-start_time", "-created_at").distinct()

    optional_lookups = (
        ("user_id", filters.user_id),
        ("project__client_id", filters.client_id),
        ("project_id", filters.project_id),
        ("tags__id__in", filters.tag_ids),
    )
    for lookup, wanted in optional_lookups:
        if wanted:
            entries = entries.filter(**{lookup: wanted})
    return entries.distinct()
def _summary_from_entries(entries: list[TimeEntry]) -> dict:
    """Aggregate total, billable and non-billable time plus income for *entries*.

    Returns a dict with the raw second counts, their ``HH:MM:SS`` renderings
    and the per-currency income totals.
    """
    seconds_by_billable = {True: 0, False: 0}
    income = _money_map()
    for entry in entries:
        seconds_by_billable[bool(entry.is_billable)] += get_entry_duration_seconds(entry)
        _add_income(income, entry)

    billable = seconds_by_billable[True]
    non_billable = seconds_by_billable[False]
    total = billable + non_billable
    return {
        "total_seconds": total,
        "billable_seconds": billable,
        "non_billable_seconds": non_billable,
        "total_duration": _format_duration_seconds(total),
        "billable_duration": _format_duration_seconds(billable),
        "non_billable_duration": _format_duration_seconds(non_billable),
        "income_totals": _serialize_money_totals(income),
    }
def _entry_payload(entry: TimeEntry) -> dict:
    """Serialize one time entry for the day-details report.

    Start and end times are localized and emitted as ISO strings; the
    project (with its optional client) is ``None`` when the entry has no
    project.
    """
    project = entry.project
    project_payload = None
    if project:
        client_payload = None
        if project.client:
            client_payload = {"id": str(project.client_id), "name": project.client.name}
        project_payload = {
            "id": str(entry.project_id),
            "name": project.name,
            "client": client_payload,
        }

    started = _localize_datetime(entry.start_time)
    ended = _localize_datetime(entry.end_time) if entry.end_time else None
    seconds = get_entry_duration_seconds(entry)

    income = _money_map()
    _add_income(income, entry)

    tag_payloads = []
    for tag in entry.tags.all():
        tag_payloads.append({"id": str(tag.id), "name": tag.name, "color": tag.color})

    rate = entry.hourly_rate
    return {
        "id": str(entry.id),
        "description": entry.description,
        "user": {
            "id": str(entry.user_id),
            "name": _user_display(entry.user),
            "mobile": entry.user.mobile,
        },
        "project": project_payload,
        "tags": tag_payloads,
        "start_time": started.isoformat(),
        "end_time": ended.isoformat() if ended else None,
        "duration_seconds": seconds,
        "duration": _format_duration_seconds(seconds),
        "is_billable": entry.is_billable,
        "hourly_rate": None if rate is None else str(rate),
        "currency": entry.currency,
        "income_totals": _serialize_money_totals(income),
    }
def _bucket_label(filters: ReportFilters, bucket_date: date) -> str:
    """Return the display label of a chart bucket.

    Week, month and custom periods are labelled by ISO date. The remaining
    periods are labelled ``YYYY-MM``, taken from the jdatetime date when the
    language is ``"fa"`` and from the Gregorian date otherwise.
    """
    daily_periods = {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}
    if filters.period in daily_periods:
        return bucket_date.isoformat()
    if filters.language != "fa":
        return bucket_date.strftime("%Y-%m")
    jalali = jdatetime.date.fromgregorian(date=bucket_date)
    return f"{jalali.year:04d}-{jalali.month:02d}"
def _bucket_key(filters: ReportFilters, local_dt: datetime) -> tuple[str, date]:
    """Return ``(bucket id, bucket date)`` for an entry starting at *local_dt*.

    Week, month and custom periods bucket per day. The remaining periods
    bucket per month: by jdatetime year-month (paired with the entry's own
    date) for ``"fa"``, and by the first day of the Gregorian month
    otherwise.
    """
    day = local_dt.date()
    if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}:
        return day.isoformat(), day
    if filters.language != "fa":
        month_start = day.replace(day=1)
        return month_start.strftime("%Y-%m"), month_start
    jalali = jdatetime.date.fromgregorian(date=day)
    return f"{jalali.year:04d}-{jalali.month:02d}", day
def build_chart_report(actor, raw_filters) -> dict:
    """Build the chart report: scope, overall summary and time per bucket.

    Buckets are produced only for periods that contain at least one entry
    and are returned sorted by bucket key.
    """
    filters = load_report_filters(actor, raw_filters)
    entries = list(_base_queryset(filters))
    summary = _summary_from_entries(entries)

    seconds_per_bucket: dict[str, int] = {}
    label_per_bucket: dict[str, str] = {}
    for entry in entries:
        key, bucket_date = _bucket_key(filters, _localize_datetime(entry.start_time))
        if key not in seconds_per_bucket:
            # The label is taken from the first entry seen for the bucket.
            seconds_per_bucket[key] = 0
            label_per_bucket[key] = _bucket_label(filters, bucket_date)
        seconds_per_bucket[key] += get_entry_duration_seconds(entry)

    serialized_buckets = [
        {
            "bucket_key": key,
            "bucket_label": label_per_bucket[key],
            "total_seconds": seconds_per_bucket[key],
            "total_duration": _format_duration_seconds(seconds_per_bucket[key]),
        }
        for key in sorted(seconds_per_bucket)
    ]
    return {
        "scope": _scope_payload(filters),
        "summary": summary,
        "buckets": serialized_buckets,
    }
def _scope_payload(filters: ReportFilters) -> dict:
    """Describe what a report covers: workspace, period, user and filters.

    ``user`` is ``None`` when no user filter is set or the user cannot be
    found. ``is_workspace_scope`` is true only for a workspace-scope report
    without a user filter.
    """
    target = User.objects.filter(id=filters.user_id).first() if filters.user_id else None
    user_payload = None
    if target:
        user_payload = {
            "id": str(target.id),
            "name": _user_display(target),
            "mobile": target.mobile,
        }

    workspace = filters.workspace
    applied_filters = {
        "client_id": filters.client_id,
        "project_id": filters.project_id,
        "tag_ids": filters.tag_ids,
    }
    return {
        "workspace": {"id": str(workspace.id), "name": workspace.name},
        "period": filters.period,
        "from_date": filters.from_date.isoformat(),
        "to_date": filters.to_date.isoformat(),
        "user": user_payload,
        "is_workspace_scope": filters.is_workspace_scope and not filters.user_id,
        "filters": applied_filters,
    }
def _table_report_payload(filters: ReportFilters, entries: list[TimeEntry]) -> dict:
    """Assemble the table report: scope, summary, daily rows and breakdowns."""
    summary = _summary_from_entries(entries)
    payload = {
        "scope": _scope_payload(filters),
        "summary": summary,
        "days": _group_daily(entries),
    }
    # One breakdown per grouping, in the order they appear in the response.
    for kind in ("clients", "projects", "tags"):
        payload[kind] = _build_breakdown(entries, kind)
    return payload
def _group_daily(entries: list[TimeEntry]) -> list[dict]:
    """Aggregate entries per localized start day, oldest day first.

    Each row carries the ISO date, the billable / non-billable / total
    seconds, their ``HH:MM:SS`` renderings and the per-currency income.
    """
    per_day: dict[str, dict] = {}
    for entry in entries:
        iso_day = _localize_datetime(entry.start_time).date().isoformat()
        stats = per_day.get(iso_day)
        if stats is None:
            stats = per_day[iso_day] = {"billable": 0, "non_billable": 0, "income": _money_map()}
        seconds = get_entry_duration_seconds(entry)
        stats["billable" if entry.is_billable else "non_billable"] += seconds
        _add_income(stats["income"], entry)

    rows = []
    for iso_day in sorted(per_day):
        stats = per_day[iso_day]
        billable = stats["billable"]
        non_billable = stats["non_billable"]
        # Every entry lands in exactly one of the two counters.
        total = billable + non_billable
        rows.append(
            {
                "date": iso_day,
                "billable_seconds": billable,
                "non_billable_seconds": non_billable,
                "total_seconds": total,
                "billable_duration": _format_duration_seconds(billable),
                "non_billable_duration": _format_duration_seconds(non_billable),
                "total_duration": _format_duration_seconds(total),
                "income_totals": _serialize_money_totals(stats["income"]),
            }
        )
    return rows
def _accumulate_breakdown_entry(data: dict[str, dict], item_id: str, item_name: str, entry: TimeEntry) -> None:
    """Add one entry's duration and income to the breakdown row *item_id*."""
    bucket = data.get(item_id)
    if bucket is None:
        bucket = data[item_id] = {
            "id": item_id,
            "name": item_name,
            "billable_seconds": 0,
            "non_billable_seconds": 0,
            "total_seconds": 0,
            "income": _money_map(),
        }
    duration_seconds = get_entry_duration_seconds(entry)
    bucket["total_seconds"] += duration_seconds
    if entry.is_billable:
        bucket["billable_seconds"] += duration_seconds
    else:
        bucket["non_billable_seconds"] += duration_seconds
    _add_income(bucket["income"], entry)


def _build_breakdown(entries: list[TimeEntry], kind: str) -> list[dict]:
    """Aggregate time and income per client, project or tag.

    Args:
        entries: The time entries to aggregate.
        kind: ``"clients"`` or ``"projects"``; any other value groups by
            tag.

    Returns:
        One row per client / project / tag, sorted case-insensitively by
        name. Entries without the relevant relation are skipped. An entry
        with several tags contributes its full duration and income to each
        of them, so tag rows can add up to more than the overall summary.
    """
    data: dict[str, dict] = {}
    for entry in entries:
        if kind == "clients":
            if not entry.project or not entry.project.client:
                continue
            targets = [(str(entry.project.client_id), entry.project.client.name)]
        elif kind == "projects":
            if not entry.project:
                continue
            targets = [(str(entry.project_id), entry.project.name)]
        else:
            # An entry without tags yields no targets, so no separate
            # existence check is needed before iterating.
            targets = [(str(tag.id), tag.name) for tag in entry.tags.all()]
        for item_id, item_name in targets:
            _accumulate_breakdown_entry(data, item_id, item_name, entry)

    return [
        {
            "id": bucket["id"],
            "name": bucket["name"],
            "billable_seconds": bucket["billable_seconds"],
            "non_billable_seconds": bucket["non_billable_seconds"],
            "total_seconds": bucket["total_seconds"],
            "billable_duration": _format_duration_seconds(bucket["billable_seconds"]),
            "non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]),
            "total_duration": _format_duration_seconds(bucket["total_seconds"]),
            "income_totals": _serialize_money_totals(bucket["income"]),
        }
        for bucket in sorted(data.values(), key=lambda row: row["name"].lower())
    ]
def build_table_report(actor, raw_filters) -> dict:
    """Build the table report for the entries matching the caller's filters."""
    resolved = load_report_filters(actor, raw_filters)
    matching_entries = list(_base_queryset(resolved))
    return _table_report_payload(resolved, matching_entries)
def build_user_scoped_table_reports(actor, raw_filters) -> list[dict]:
    """Build one table report per user for a workspace-wide request.

    Returns an empty list unless the request is workspace-scoped and has no
    user filter. Reports are ordered case-insensitively by user display
    name, and only users with at least one matching entry appear.
    """
    filters = load_report_filters(actor, raw_filters)
    if filters.user_id or not filters.is_workspace_scope:
        return []

    entries_by_user: dict[str, list[TimeEntry]] = {}
    for entry in list(_base_queryset(filters)):
        owner_id = str(entry.user_id)
        if owner_id not in entries_by_user:
            entries_by_user[owner_id] = []
        entries_by_user[owner_id].append(entry)

    def display_name(pair) -> str:
        # Every entry in a group has the same user; the first one names it.
        return _user_display(pair[1][0].user).lower()

    return [
        _table_report_payload(replace(filters, user_id=owner_id), owned_entries)
        for owner_id, owned_entries in sorted(entries_by_user.items(), key=display_name)
    ]
def build_day_details_report(actor, raw_filters) -> dict:
    """Build the detailed entry list for a single day inside the report period.

    Args:
        actor: The user requesting the report.
        raw_filters: Raw report parameters plus the requested day under
            ``day`` (or ``date``).

    Returns:
        A dict with the report scope, the ISO day, the day's summary and
        the serialized entries.

    Raises:
        serializers.ValidationError: if the filters are invalid, the day is
            missing or not a valid date, or the day lies outside the
            resolved period.
    """
    filters = load_report_filters(actor, raw_filters)
    day_value = raw_filters.get("day") or raw_filters.get("date")
    try:
        target_day = parse_date(day_value) if day_value else None
    except (TypeError, ValueError):
        # parse_date raises on well-formed but impossible dates such as
        # "2024-02-30"; report that as a validation problem, not a crash.
        target_day = None
    if not target_day:
        raise serializers.ValidationError("A valid day is required.")
    if not (filters.from_date <= target_day <= filters.to_date):
        raise serializers.ValidationError("Requested day is outside the filtered period.")

    # Query only the requested day instead of loading the whole period and
    # discarding most of it in Python.
    day_filters = replace(filters, from_date=target_day, to_date=target_day)
    entries = [
        entry
        for entry in _base_queryset(day_filters)
        # Guard retained so the result is still decided by the localized
        # start date, exactly as before the query was narrowed.
        if _localize_datetime(entry.start_time).date() == target_day
    ]
    summary = _summary_from_entries(entries)
    return {
        # The scope still describes the full period the caller asked for.
        "scope": _scope_payload(filters),
        "day": target_day.isoformat(),
        "summary": summary,
        "entries": [_entry_payload(entry) for entry in entries],
    }