feat(reports): add uncategorized dual-share exports

This commit is contained in:
2026-05-21 19:10:33 +03:30
parent e234eac26d
commit 8d2f876c82
5 changed files with 838 additions and 319 deletions

View File

@@ -1,10 +1,10 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterable
from dataclasses import dataclass, replace
from datetime import date, datetime, time, timedelta
from decimal import Decimal, ROUND_HALF_UP
from typing import Iterable
from decimal import ROUND_DOWN, Decimal
import jdatetime
from django.contrib.auth import get_user_model
@@ -39,6 +39,25 @@ ALLOWED_PERIODS = {
PERIOD_CUSTOM,
}
UNCATEGORIZED_IDS = {
"clients": "__uncategorized_client__",
"projects": "__uncategorized_project__",
"tags": "__uncategorized_tag__",
}
UNCATEGORIZED_LABELS = {
"en": {
"clients": "No client",
"projects": "No project",
"tags": "No tag",
},
"fa": {
"clients": "\u0628\u062f\u0648\u0646 \u0645\u0634\u062a\u0631\u06cc",
"projects": "\u0628\u062f\u0648\u0646 \u067e\u0631\u0648\u0698\u0647",
"tags": "\u0628\u062f\u0648\u0646 \u062a\u06af",
},
}
def _start_of_week(local_date: date) -> date:
days_since_sunday = (local_date.weekday() + 1) % 7
@@ -157,62 +176,209 @@ def _serialize_rate_periods(entries: list[TimeEntry]) -> list[dict]:
return periods
def _serialize_percentage_rows(shares: dict[str, dict], total_seconds: int) -> list[dict]:
if total_seconds <= 0:
return []
rows = []
for bucket in shares.values():
percentage = (
Decimal(bucket["seconds"]) * Decimal("100") / Decimal(total_seconds)
).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
rows.append(
{
"id": bucket["id"],
"name": bucket["name"],
"percentage": f"{percentage}",
}
)
rows.sort(key=lambda item: (-Decimal(item["percentage"]), item["name"].lower()))
return rows
def _uncategorized_label(kind: str, language: str) -> str:
resolved_language = language if language in UNCATEGORIZED_LABELS else "en"
return UNCATEGORIZED_LABELS[resolved_language][kind]
def _build_user_summary(user, entries: list[TimeEntry]) -> dict:
summary = _summary_from_entries(entries)
project_shares: dict[str, dict] = {}
client_shares: dict[str, dict] = {}
tag_shares: dict[str, dict] = {}
def _share_bucket(bucket_id: str, name: str) -> dict:
return {
"id": bucket_id,
"name": name,
"seconds": Decimal("0"),
"income": _money_map(),
}
total_seconds = summary["billable_seconds"]
def _entry_income_payload(entry: TimeEntry) -> tuple[str, Decimal] | None:
if not entry.is_billable or not entry.hourly_rate:
return None
duration_seconds = get_entry_duration_seconds(entry)
if duration_seconds <= 0:
return None
hourly_rate = Decimal(entry.hourly_rate)
income = (hourly_rate * Decimal(duration_seconds) / Decimal(3600)).quantize(Decimal("0.01"))
return entry.currency or "USD", income
def _add_money(bucket: defaultdict[str, Decimal], currency: str, amount: Decimal) -> None:
bucket[currency] += amount
def _breakdown_targets(entry: TimeEntry, kind: str, language: str) -> list[tuple[str, str]]:
if kind == "clients":
if entry.project and entry.project.client:
return [(str(entry.project.client_id), entry.project.client.name)]
return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))]
if kind == "projects":
if entry.project:
return [(str(entry.project_id), entry.project.name)]
return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))]
tags = list(entry.tags.all())
if tags:
return [(str(tag.id), tag.name) for tag in tags]
return [(UNCATEGORIZED_IDS[kind], _uncategorized_label(kind, language))]
def _accumulate_breakdown_shares(entries: list[TimeEntry], kind: str, *, language: str) -> dict[str, dict]:
shares: dict[str, dict] = {}
for entry in entries:
if not entry.is_billable:
continue
duration_seconds = get_entry_duration_seconds(entry)
if duration_seconds <= 0:
continue
if entry.project_id:
project_bucket = project_shares.setdefault(
str(entry.project_id),
{"id": str(entry.project_id), "name": entry.project.name, "seconds": 0},
)
project_bucket["seconds"] += duration_seconds
targets = _breakdown_targets(entry, kind, language)
divisor = Decimal(len(targets)) if kind == "tags" and targets else Decimal("1")
income_payload = _entry_income_payload(entry)
if entry.project and entry.project.client_id:
client_bucket = client_shares.setdefault(
str(entry.project.client_id),
{"id": str(entry.project.client_id), "name": entry.project.client.name, "seconds": 0},
)
client_bucket["seconds"] += duration_seconds
for bucket_id, bucket_name in targets:
bucket = shares.setdefault(bucket_id, _share_bucket(bucket_id, bucket_name))
bucket["seconds"] += Decimal(duration_seconds) / divisor
if income_payload:
currency, amount = income_payload
_add_money(bucket["income"], currency, amount / divisor)
tags = list(entry.tags.all())
if tags:
allocated_seconds = Decimal(duration_seconds) / Decimal(len(tags))
for tag in tags:
tag_bucket = tag_shares.setdefault(
str(tag.id),
{"id": str(tag.id), "name": tag.name, "seconds": Decimal("0")},
)
tag_bucket["seconds"] += allocated_seconds
return shares
def _allocate_percentage_rows(items: list[dict], total_value: Decimal) -> list[dict]:
if total_value <= 0 or not items:
return []
working_rows: list[dict] = []
assigned_total = 0
for item in items:
value = Decimal(item["value"])
raw_percentage = (value * Decimal("100") / total_value) if value > 0 else Decimal("0")
floored_percentage = int(raw_percentage.quantize(Decimal("1"), rounding=ROUND_DOWN))
assigned_total += floored_percentage
working_rows.append(
{
"id": item["id"],
"name": item["name"],
"value": value,
"percentage": floored_percentage,
"remainder": raw_percentage - Decimal(floored_percentage),
}
)
remaining_points = max(0, 100 - assigned_total)
for row in sorted(
working_rows,
key=lambda item: (-item["remainder"], -item["value"], item["name"].lower(), item["id"]),
)[:remaining_points]:
row["percentage"] += 1
serialized = [
{
"id": row["id"],
"name": row["name"],
"percentage": str(row["percentage"]),
}
for row in working_rows
]
serialized.sort(key=lambda item: (-Decimal(item["percentage"]), item["name"].lower()))
return serialized
def _single_currency_amount(income_totals: list[dict]) -> tuple[str | None, Decimal] | None:
non_zero_totals: list[tuple[str, Decimal]] = []
for item in income_totals:
amount = Decimal(item["amount"])
if amount == 0:
continue
non_zero_totals.append((item["currency"], amount))
if not non_zero_totals:
return None, Decimal("0")
currencies = {currency for currency, _ in non_zero_totals}
if len(currencies) != 1:
return None
currency = non_zero_totals[0][0]
total_amount = sum((amount for _, amount in non_zero_totals), Decimal("0"))
return currency, total_amount
def _complete_percentage_rows(
rows: list[dict],
percentage_rows: list[dict],
*,
unavailable: bool = False,
) -> list[dict]:
if unavailable:
return []
existing_ids = {row["id"] for row in percentage_rows}
completed = percentage_rows + [
{"id": row["id"], "name": row["name"], "percentage": "0"}
for row in rows
if row["id"] not in existing_ids
]
completed.sort(key=lambda item: (-Decimal(item["percentage"]), item["name"].lower()))
return completed
def _serialize_time_percentage_rows(rows: list[dict], shares: dict[str, dict]) -> list[dict]:
items = [
{
"id": bucket["id"],
"name": bucket["name"],
"value": Decimal(bucket["seconds"]),
}
for bucket in shares.values()
]
total_seconds = sum((item["value"] for item in items), Decimal("0"))
return _complete_percentage_rows(rows, _allocate_percentage_rows(items, total_seconds))
def _serialize_income_percentage_rows(rows: list[dict], shares: dict[str, dict]) -> list[dict]:
items: list[dict] = []
currencies: set[str] = set()
for bucket in shares.values():
income_totals = _serialize_money_totals(bucket["income"])
currency_amount = _single_currency_amount(income_totals)
if currency_amount is None:
return []
currency, amount = currency_amount
if currency:
currencies.add(currency)
items.append(
{
"id": bucket["id"],
"name": bucket["name"],
"value": amount,
}
)
if len(currencies) > 1:
return []
total_income = sum((item["value"] for item in items), Decimal("0"))
if total_income <= 0:
return []
return _complete_percentage_rows(rows, _allocate_percentage_rows(items, total_income))
def _build_user_summary(user, entries: list[TimeEntry], *, language: str) -> dict:
summary = _summary_from_entries(entries)
project_rows = _build_breakdown(entries, "projects", language=language)
client_rows = _build_breakdown(entries, "clients", language=language)
tag_rows = _build_breakdown(entries, "tags", language=language)
project_shares = _accumulate_breakdown_shares(entries, "projects", language=language)
client_shares = _accumulate_breakdown_shares(entries, "clients", language=language)
tag_shares = _accumulate_breakdown_shares(entries, "tags", language=language)
return {
"user": {
@@ -222,71 +388,53 @@ def _build_user_summary(user, entries: list[TimeEntry]) -> dict:
},
"hourly_rates": _serialize_distinct_rates(entries),
"rate_periods": _serialize_rate_periods(entries),
"total_seconds": total_seconds,
"total_seconds": summary["billable_seconds"],
"total_duration": summary["total_duration"],
"billable_seconds": summary["billable_seconds"],
"billable_duration": summary["billable_duration"],
"non_billable_seconds": summary["non_billable_seconds"],
"non_billable_duration": summary["non_billable_duration"],
"income_totals": summary["income_totals"],
"project_percentages": _serialize_percentage_rows(project_shares, total_seconds),
"client_percentages": _serialize_percentage_rows(client_shares, total_seconds),
"tag_percentages": _serialize_percentage_rows(tag_shares, total_seconds),
"project_percentages": _serialize_time_percentage_rows(project_rows, project_shares),
"client_percentages": _serialize_time_percentage_rows(client_rows, client_shares),
"tag_percentages": _serialize_time_percentage_rows(tag_rows, tag_shares),
"project_income_percentages": _serialize_income_percentage_rows(project_rows, project_shares),
"client_income_percentages": _serialize_income_percentage_rows(client_rows, client_shares),
"tag_income_percentages": _serialize_income_percentage_rows(tag_rows, tag_shares),
}
def _build_user_summaries(entries: list[TimeEntry]) -> list[dict]:
def _build_user_summaries(entries: list[TimeEntry], *, language: str) -> list[dict]:
grouped: dict[str, list[TimeEntry]] = defaultdict(list)
for entry in entries:
grouped[str(entry.user_id)].append(entry)
summaries = [_build_user_summary(grouped_entries[0].user, grouped_entries) for grouped_entries in grouped.values() if grouped_entries]
summaries = [
_build_user_summary(grouped_entries[0].user, grouped_entries, language=language)
for grouped_entries in grouped.values()
if grouped_entries
]
summaries.sort(key=lambda item: item["user"]["name"].lower())
return summaries
def _build_overall_percentage_payload(entries: list[TimeEntry]) -> dict:
project_shares: dict[str, dict] = {}
client_shares: dict[str, dict] = {}
tag_shares: dict[str, dict] = {}
total_seconds = 0
for entry in entries:
if not entry.is_billable:
continue
duration_seconds = get_entry_duration_seconds(entry)
if duration_seconds <= 0:
continue
total_seconds += duration_seconds
if entry.project_id:
project_bucket = project_shares.setdefault(
str(entry.project_id),
{"id": str(entry.project_id), "name": entry.project.name, "seconds": 0},
)
project_bucket["seconds"] += duration_seconds
if entry.project and entry.project.client_id:
client_bucket = client_shares.setdefault(
str(entry.project.client_id),
{"id": str(entry.project.client_id), "name": entry.project.client.name, "seconds": 0},
)
client_bucket["seconds"] += duration_seconds
tags = list(entry.tags.all())
if tags:
allocated_seconds = Decimal(duration_seconds) / Decimal(len(tags))
for tag in tags:
tag_bucket = tag_shares.setdefault(
str(tag.id),
{"id": str(tag.id), "name": tag.name, "seconds": Decimal("0")},
)
tag_bucket["seconds"] += allocated_seconds
def _build_overall_percentage_payload(
entries: list[TimeEntry],
*,
language: str,
rows_by_kind: dict[str, list[dict]],
) -> dict:
project_shares = _accumulate_breakdown_shares(entries, "projects", language=language)
client_shares = _accumulate_breakdown_shares(entries, "clients", language=language)
tag_shares = _accumulate_breakdown_shares(entries, "tags", language=language)
return {
"project_percentages": _serialize_percentage_rows(project_shares, total_seconds),
"client_percentages": _serialize_percentage_rows(client_shares, total_seconds),
"tag_percentages": _serialize_percentage_rows(tag_shares, total_seconds),
"project_percentages": _serialize_time_percentage_rows(rows_by_kind["projects"], project_shares),
"client_percentages": _serialize_time_percentage_rows(rows_by_kind["clients"], client_shares),
"tag_percentages": _serialize_time_percentage_rows(rows_by_kind["tags"], tag_shares),
"project_income_percentages": _serialize_income_percentage_rows(rows_by_kind["projects"], project_shares),
"client_income_percentages": _serialize_income_percentage_rows(rows_by_kind["clients"], client_shares),
"tag_income_percentages": _serialize_income_percentage_rows(rows_by_kind["tags"], tag_shares),
}
@@ -344,7 +492,13 @@ class ReportFilterSerializer(serializers.Serializer):
language = serializers.ChoiceField(choices=("en", "fa"), required=False, default="en")
def _resolve_period_bounds(period: str, from_date: date | None, to_date: date | None, *, language: str) -> tuple[date, date]:
def _resolve_period_bounds(
period: str,
from_date: date | None,
to_date: date | None,
*,
language: str,
) -> tuple[date, date]:
today = timezone.localdate()
if language == "fa":
today_jalali = jdatetime.date.fromgregorian(date=today)
@@ -412,7 +566,11 @@ def load_report_filters(actor, raw_data) -> ReportFilters:
"user": raw_data.get("user"),
"client": raw_data.get("client"),
"project": raw_data.get("project"),
"tags": raw_data.get("tags") or raw_data.getlist("tags") if hasattr(raw_data, "getlist") else raw_data.get("tags"),
"tags": (
raw_data.get("tags") or raw_data.getlist("tags")
if hasattr(raw_data, "getlist")
else raw_data.get("tags")
),
"language": raw_data.get("language", "en"),
}
if normalized["tags"] and not isinstance(normalized["tags"], list):
@@ -476,8 +634,15 @@ def load_report_filters(actor, raw_data) -> ReportFilters:
def _base_queryset(filters: ReportFilters) -> QuerySet[TimeEntry]:
start_dt = timezone.make_aware(datetime.combine(filters.from_date, time.min), timezone.get_current_timezone())
end_dt = timezone.make_aware(datetime.combine(filters.to_date + timedelta(days=1), time.min), timezone.get_current_timezone())
current_timezone = timezone.get_current_timezone()
start_dt = timezone.make_aware(
datetime.combine(filters.from_date, time.min),
current_timezone,
)
end_dt = timezone.make_aware(
datetime.combine(filters.to_date + timedelta(days=1), time.min),
current_timezone,
)
queryset = (
TimeEntry.objects.filter(
@@ -660,7 +825,11 @@ def _scope_payload(filters: ReportFilters) -> dict:
"workspace": {
"id": str(filters.workspace.id),
"name": filters.workspace.name,
"thumbnail_path": filters.workspace.thumbnail.path if getattr(filters.workspace, "thumbnail", None) else None,
"thumbnail_path": (
filters.workspace.thumbnail.path
if getattr(filters.workspace, "thumbnail", None)
else None
),
},
"period": filters.period,
"from_date": filters.from_date.isoformat(),
@@ -684,16 +853,29 @@ def _table_report_payload(
) -> dict:
summary = _summary_from_entries(entries)
include_latest_rate = not (filters.is_workspace_scope and not filters.user_id)
client_rows = _build_breakdown(entries, "clients", language=filters.language)
project_rows = _build_breakdown(entries, "projects", language=filters.language)
tag_rows = _build_breakdown(entries, "tags", language=filters.language)
payload = {
"scope": _scope_payload(filters),
"summary": summary,
"days": _group_daily(entries, include_latest_rate=include_latest_rate),
"clients": _build_breakdown(entries, "clients"),
"projects": _build_breakdown(entries, "projects"),
"tags": _build_breakdown(entries, "tags"),
"clients": client_rows,
"projects": project_rows,
"tags": tag_rows,
}
if filters.is_workspace_scope and not filters.user_id:
payload.update(_build_overall_percentage_payload(entries))
payload.update(
_build_overall_percentage_payload(
entries,
language=filters.language,
rows_by_kind={
"clients": client_rows,
"projects": project_rows,
"tags": tag_rows,
},
)
)
if user_summary is not None:
payload["user_summary"] = user_summary
if user_summaries is not None:
@@ -761,64 +943,31 @@ def _group_daily(entries: list[TimeEntry], *, include_latest_rate: bool) -> list
return rows
def _build_breakdown(entries: list[TimeEntry], kind: str) -> list[dict]:
def _build_breakdown(entries: list[TimeEntry], kind: str, *, language: str) -> list[dict]:
data: dict[str, dict] = {}
for entry in entries:
if kind == "clients":
if not entry.project or not entry.project.client:
continue
item_id = str(entry.project.client_id)
item_name = entry.project.client.name
elif kind == "projects":
if not entry.project:
continue
item_id = str(entry.project_id)
item_name = entry.project.name
else:
if not entry.tags.exists():
continue
for tag in entry.tags.all():
bucket = data.setdefault(
str(tag.id),
{
"id": str(tag.id),
"name": tag.name,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
duration_seconds = get_entry_duration_seconds(entry)
bucket["total_seconds"] += duration_seconds
if entry.is_billable:
bucket["billable_seconds"] += duration_seconds
else:
bucket["non_billable_seconds"] += duration_seconds
_add_income(bucket["income"], entry)
continue
bucket = data.setdefault(
item_id,
{
"id": item_id,
"name": item_name,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
duration_seconds = get_entry_duration_seconds(entry)
bucket["total_seconds"] += duration_seconds
if entry.is_billable:
bucket["billable_seconds"] += duration_seconds
else:
bucket["non_billable_seconds"] += duration_seconds
_add_income(bucket["income"], entry)
for item_id, item_name in _breakdown_targets(entry, kind, language):
bucket = data.setdefault(
item_id,
{
"id": item_id,
"name": item_name,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
bucket["total_seconds"] += duration_seconds
if entry.is_billable:
bucket["billable_seconds"] += duration_seconds
else:
bucket["non_billable_seconds"] += duration_seconds
_add_income(bucket["income"], entry)
rows = []
for item_id, bucket in sorted(data.items(), key=lambda item: item[1]["name"].lower()):
for _item_id, bucket in sorted(data.items(), key=lambda item: item[1]["name"].lower()):
rows.append(
{
"id": bucket["id"],
@@ -839,8 +988,16 @@ def build_table_report(actor, raw_filters) -> dict:
filters = load_report_filters(actor, raw_filters)
entries = list(_base_queryset(filters))
if filters.is_workspace_scope and not filters.user_id:
return _table_report_payload(filters, entries, user_summaries=_build_user_summaries(entries))
user_summary = _build_user_summary(entries[0].user, entries) if entries and filters.user_id else None
return _table_report_payload(
filters,
entries,
user_summaries=_build_user_summaries(entries, language=filters.language),
)
user_summary = (
_build_user_summary(entries[0].user, entries, language=filters.language)
if entries and filters.user_id
else None
)
return _table_report_payload(filters, entries, user_summary=user_summary)
@@ -866,7 +1023,11 @@ def build_user_scoped_table_reports(actor, raw_filters) -> list[dict]:
_table_report_payload(
user_filters,
user_entries,
user_summary=_build_user_summary(user_entries[0].user, user_entries),
user_summary=_build_user_summary(
user_entries[0].user,
user_entries,
language=filters.language,
),
)
)
return reports