feat(reports): add localized workspace reports and exports

This commit is contained in:
2026-04-27 16:15:41 +03:30
parent fadf898486
commit e26263e93f
22 changed files with 2029 additions and 8 deletions

View File

@@ -0,0 +1,15 @@
from apps.reports.services.aggregation import (
build_chart_report,
build_day_details_report,
build_table_report,
build_user_scoped_table_reports,
load_report_filters,
)
__all__ = [
"load_report_filters",
"build_chart_report",
"build_table_report",
"build_user_scoped_table_reports",
"build_day_details_report",
]

View File

@@ -0,0 +1,604 @@
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, replace
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Iterable
import jdatetime
from django.contrib.auth import get_user_model
from django.db.models import Prefetch, QuerySet
from django.utils import timezone
from django.utils.dateparse import parse_date
from rest_framework import serializers
from apps.clients.models import Client
from apps.projects.models import Project
from apps.tags.models import Tag
from apps.time_entries.models import TimeEntry
from apps.workspaces.models import Workspace
from apps.workspaces.services import WORKSPACE_VIEW, get_workspace_role, has_workspace_capability
User = get_user_model()
PERIOD_THIS_WEEK = "this_week"
PERIOD_THIS_MONTH = "this_month"
PERIOD_THIS_YEAR = "this_year"
PERIOD_HALF_YEAR_FIRST = "half_year_first"
PERIOD_HALF_YEAR_SECOND = "half_year_second"
PERIOD_CUSTOM = "period"
ALLOWED_PERIODS = {
PERIOD_THIS_WEEK,
PERIOD_THIS_MONTH,
PERIOD_THIS_YEAR,
PERIOD_HALF_YEAR_FIRST,
PERIOD_HALF_YEAR_SECOND,
PERIOD_CUSTOM,
}
def _start_of_week(local_date: date) -> date:
days_since_sunday = (local_date.weekday() + 1) % 7
return local_date - timedelta(days=days_since_sunday)
def _start_of_persian_week(local_date: date) -> date:
days_since_saturday = (local_date.weekday() + 2) % 7
return local_date - timedelta(days=days_since_saturday)
def _localize_datetime(value: datetime) -> datetime:
if timezone.is_naive(value):
value = timezone.make_aware(value, timezone.get_current_timezone())
return timezone.localtime(value)
def _user_display(user) -> str:
full_name = getattr(user, "full_name", "").strip()
if full_name and full_name != "Anonymous":
return full_name
return getattr(user, "mobile", str(user))
def _format_duration_seconds(total_seconds: int) -> str:
total_seconds = max(int(total_seconds), 0)
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _money_map() -> defaultdict[str, Decimal]:
return defaultdict(lambda: Decimal("0.00"))
def _serialize_money_totals(values: dict[str, Decimal]) -> list[dict]:
return [
{"currency": currency, "amount": f"{amount.quantize(Decimal('0.01'))}"}
for currency, amount in sorted(values.items())
if amount is not None
]
def _add_income(bucket: defaultdict[str, Decimal], entry: TimeEntry):
if not entry.is_billable or not entry.hourly_rate:
return
duration_seconds = get_entry_duration_seconds(entry)
if duration_seconds <= 0:
return
hourly_rate = Decimal(entry.hourly_rate)
income = (hourly_rate * Decimal(duration_seconds) / Decimal(3600)).quantize(Decimal("0.01"))
bucket[(entry.currency or "USD")] += income
def get_entry_duration_seconds(entry: TimeEntry) -> int:
if entry.duration is not None:
return max(int(entry.duration.total_seconds()), 0)
if entry.end_time:
return max(int((entry.end_time - entry.start_time).total_seconds()), 0)
return 0
def _parse_id_list(values: Iterable[str]) -> list[str]:
return [str(value).strip() for value in values if str(value).strip()]
@dataclass
class ReportFilters:
workspace: Workspace
period: str
from_date: date
to_date: date
user_id: str | None
client_id: str | None
project_id: str | None
tag_ids: list[str]
actor: object
is_workspace_scope: bool
language: str
class ReportFilterSerializer(serializers.Serializer):
workspace = serializers.UUIDField()
period = serializers.ChoiceField(choices=sorted(ALLOWED_PERIODS))
from_date = serializers.DateField(required=False, allow_null=True)
to_date = serializers.DateField(required=False, allow_null=True)
user = serializers.UUIDField(required=False, allow_null=True)
client = serializers.UUIDField(required=False, allow_null=True)
project = serializers.UUIDField(required=False, allow_null=True)
tags = serializers.ListField(
child=serializers.UUIDField(),
required=False,
allow_empty=True,
)
language = serializers.ChoiceField(choices=("en", "fa"), required=False, default="en")
def _resolve_period_bounds(period: str, from_date: date | None, to_date: date | None, *, language: str) -> tuple[date, date]:
today = timezone.localdate()
if language == "fa":
today_jalali = jdatetime.date.fromgregorian(date=today)
if period == PERIOD_THIS_WEEK:
start = _start_of_persian_week(today)
return start, start + timedelta(days=6)
if period == PERIOD_THIS_MONTH:
start_j = jdatetime.date(today_jalali.year, today_jalali.month, 1)
if today_jalali.month == 12:
next_month_j = jdatetime.date(today_jalali.year + 1, 1, 1)
else:
next_month_j = jdatetime.date(today_jalali.year, today_jalali.month + 1, 1)
return start_j.togregorian(), next_month_j.togregorian() - timedelta(days=1)
if period == PERIOD_THIS_YEAR:
start = jdatetime.date(today_jalali.year, 1, 1).togregorian()
if jdatetime.date(today_jalali.year, 1, 1).isleap():
end = jdatetime.date(today_jalali.year, 12, 30).togregorian()
else:
end = jdatetime.date(today_jalali.year, 12, 29).togregorian()
return start, end
if period == PERIOD_HALF_YEAR_FIRST:
start = jdatetime.date(today_jalali.year, 1, 1).togregorian()
end = jdatetime.date(today_jalali.year, 6, 31).togregorian()
return start, end
if period == PERIOD_HALF_YEAR_SECOND:
start = jdatetime.date(today_jalali.year, 7, 1).togregorian()
if jdatetime.date(today_jalali.year, 1, 1).isleap():
end = jdatetime.date(today_jalali.year, 12, 30).togregorian()
else:
end = jdatetime.date(today_jalali.year, 12, 29).togregorian()
return start, end
if period == PERIOD_THIS_WEEK:
start = _start_of_week(today)
return start, start + timedelta(days=6)
if period == PERIOD_THIS_MONTH:
start = today.replace(day=1)
if start.month == 12:
next_month = start.replace(year=start.year + 1, month=1, day=1)
else:
next_month = start.replace(month=start.month + 1, day=1)
return start, next_month - timedelta(days=1)
if period == PERIOD_THIS_YEAR:
return date(today.year, 1, 1), date(today.year, 12, 31)
if period == PERIOD_HALF_YEAR_FIRST:
return date(today.year, 1, 1), date(today.year, 6, 30)
if period == PERIOD_HALF_YEAR_SECOND:
return date(today.year, 7, 1), date(today.year, 12, 31)
if period == PERIOD_CUSTOM:
if not from_date or not to_date:
raise serializers.ValidationError("Custom period requires from_date and to_date.")
if from_date > to_date:
raise serializers.ValidationError("from_date cannot be after to_date.")
if (to_date - from_date).days > 30:
raise serializers.ValidationError("Custom period cannot exceed 31 days.")
return from_date, to_date
raise serializers.ValidationError("Unsupported report period.")
def load_report_filters(actor, raw_data) -> ReportFilters:
normalized = {
"workspace": raw_data.get("workspace"),
"period": raw_data.get("period", PERIOD_THIS_MONTH),
"from_date": raw_data.get("from_date") or raw_data.get("from"),
"to_date": raw_data.get("to_date") or raw_data.get("to"),
"user": raw_data.get("user"),
"client": raw_data.get("client"),
"project": raw_data.get("project"),
"tags": raw_data.get("tags") or raw_data.getlist("tags") if hasattr(raw_data, "getlist") else raw_data.get("tags"),
"language": raw_data.get("language", "en"),
}
if normalized["tags"] and not isinstance(normalized["tags"], list):
normalized["tags"] = [normalized["tags"]]
serializer = ReportFilterSerializer(data=normalized)
serializer.is_valid(raise_exception=True)
validated = serializer.validated_data
workspace = Workspace.objects.filter(id=validated["workspace"]).first()
if not workspace:
raise serializers.ValidationError("Workspace not found.")
if not has_workspace_capability(actor, workspace, WORKSPACE_VIEW):
raise serializers.ValidationError("You do not have access to this workspace.")
from_date, to_date = _resolve_period_bounds(
validated["period"],
validated.get("from_date"),
validated.get("to_date"),
language=validated.get("language", "en"),
)
role = get_workspace_role(actor, workspace)
is_workspace_scope = role in {"owner", "admin"}
requested_user_id = str(validated["user"]) if validated.get("user") else None
if requested_user_id and not is_workspace_scope and requested_user_id != str(actor.id):
raise serializers.ValidationError("You cannot view another user's report.")
user_id = requested_user_id if is_workspace_scope else str(actor.id)
client_id = str(validated["client"]) if validated.get("client") else None
project_id = str(validated["project"]) if validated.get("project") else None
tag_ids = [str(tag_id) for tag_id in validated.get("tags", [])]
if client_id and not Client.objects.filter(id=client_id, workspace=workspace).exists():
raise serializers.ValidationError("Client does not belong to this workspace.")
if project_id and not Project.objects.filter(id=project_id, workspace=workspace).exists():
raise serializers.ValidationError("Project does not belong to this workspace.")
if tag_ids:
existing_tag_ids = set(Tag.objects.filter(id__in=tag_ids, workspace=workspace).values_list("id", flat=True))
if len(existing_tag_ids) != len(tag_ids):
raise serializers.ValidationError("One or more tags do not belong to this workspace.")
return ReportFilters(
workspace=workspace,
period=validated["period"],
from_date=from_date,
to_date=to_date,
user_id=user_id,
client_id=client_id,
project_id=project_id,
tag_ids=tag_ids,
actor=actor,
is_workspace_scope=is_workspace_scope,
language=validated.get("language", "en"),
)
def _base_queryset(filters: ReportFilters) -> QuerySet[TimeEntry]:
start_dt = timezone.make_aware(datetime.combine(filters.from_date, time.min), timezone.get_current_timezone())
end_dt = timezone.make_aware(datetime.combine(filters.to_date + timedelta(days=1), time.min), timezone.get_current_timezone())
queryset = (
TimeEntry.objects.filter(
workspace=filters.workspace,
is_deleted=False,
start_time__gte=start_dt,
start_time__lt=end_dt,
)
.select_related("project", "project__client", "workspace", "user")
.prefetch_related(Prefetch("tags", queryset=Tag.objects.filter(is_deleted=False)))
.order_by("-start_time", "-created_at")
.distinct()
)
if filters.user_id:
queryset = queryset.filter(user_id=filters.user_id)
if filters.client_id:
queryset = queryset.filter(project__client_id=filters.client_id)
if filters.project_id:
queryset = queryset.filter(project_id=filters.project_id)
if filters.tag_ids:
queryset = queryset.filter(tags__id__in=filters.tag_ids)
return queryset.distinct()
def _summary_from_entries(entries: list[TimeEntry]) -> dict:
total_seconds = 0
billable_seconds = 0
non_billable_seconds = 0
income_by_currency = _money_map()
for entry in entries:
duration_seconds = get_entry_duration_seconds(entry)
total_seconds += duration_seconds
if entry.is_billable:
billable_seconds += duration_seconds
else:
non_billable_seconds += duration_seconds
_add_income(income_by_currency, entry)
return {
"total_seconds": total_seconds,
"billable_seconds": billable_seconds,
"non_billable_seconds": non_billable_seconds,
"total_duration": _format_duration_seconds(total_seconds),
"billable_duration": _format_duration_seconds(billable_seconds),
"non_billable_duration": _format_duration_seconds(non_billable_seconds),
"income_totals": _serialize_money_totals(income_by_currency),
}
def _entry_payload(entry: TimeEntry) -> dict:
local_start = _localize_datetime(entry.start_time)
local_end = _localize_datetime(entry.end_time) if entry.end_time else None
duration_seconds = get_entry_duration_seconds(entry)
income_by_currency = _money_map()
_add_income(income_by_currency, entry)
return {
"id": str(entry.id),
"description": entry.description,
"user": {
"id": str(entry.user_id),
"name": _user_display(entry.user),
"mobile": entry.user.mobile,
},
"project": (
{
"id": str(entry.project_id),
"name": entry.project.name,
"client": (
{"id": str(entry.project.client_id), "name": entry.project.client.name}
if entry.project and entry.project.client
else None
),
}
if entry.project
else None
),
"tags": [{"id": str(tag.id), "name": tag.name, "color": tag.color} for tag in entry.tags.all()],
"start_time": local_start.isoformat(),
"end_time": local_end.isoformat() if local_end else None,
"duration_seconds": duration_seconds,
"duration": _format_duration_seconds(duration_seconds),
"is_billable": entry.is_billable,
"hourly_rate": str(entry.hourly_rate) if entry.hourly_rate is not None else None,
"currency": entry.currency,
"income_totals": _serialize_money_totals(income_by_currency),
}
def _bucket_label(filters: ReportFilters, bucket_date: date) -> str:
if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}:
return bucket_date.isoformat()
return bucket_date.strftime("%Y-%m")
def _bucket_key(filters: ReportFilters, local_dt: datetime) -> tuple[str, date]:
if filters.period in {PERIOD_THIS_WEEK, PERIOD_THIS_MONTH, PERIOD_CUSTOM}:
bucket_date = local_dt.date()
return bucket_date.isoformat(), bucket_date
bucket_date = date(local_dt.year, local_dt.month, 1)
return bucket_date.strftime("%Y-%m"), bucket_date
def build_chart_report(actor, raw_filters) -> dict:
filters = load_report_filters(actor, raw_filters)
entries = list(_base_queryset(filters))
summary = _summary_from_entries(entries)
buckets: dict[str, dict] = {}
for entry in entries:
local_start = _localize_datetime(entry.start_time)
bucket_id, bucket_date = _bucket_key(filters, local_start)
bucket = buckets.setdefault(
bucket_id,
{
"bucket_key": bucket_id,
"bucket_label": _bucket_label(filters, bucket_date),
"total_seconds": 0,
"total_duration": "00:00:00",
},
)
bucket["total_seconds"] += get_entry_duration_seconds(entry)
serialized_buckets = []
for bucket in sorted(buckets.values(), key=lambda item: item["bucket_key"]):
bucket["total_duration"] = _format_duration_seconds(bucket["total_seconds"])
serialized_buckets.append(bucket)
return {
"scope": _scope_payload(filters),
"summary": summary,
"buckets": serialized_buckets,
}
def _scope_payload(filters: ReportFilters) -> dict:
user_payload = None
if filters.user_id:
target_user = User.objects.filter(id=filters.user_id).first()
if target_user:
user_payload = {
"id": str(target_user.id),
"name": _user_display(target_user),
"mobile": target_user.mobile,
}
return {
"workspace": {"id": str(filters.workspace.id), "name": filters.workspace.name},
"period": filters.period,
"from_date": filters.from_date.isoformat(),
"to_date": filters.to_date.isoformat(),
"user": user_payload,
"is_workspace_scope": filters.is_workspace_scope and not filters.user_id,
"filters": {
"client_id": filters.client_id,
"project_id": filters.project_id,
"tag_ids": filters.tag_ids,
},
}
def _table_report_payload(filters: ReportFilters, entries: list[TimeEntry]) -> dict:
summary = _summary_from_entries(entries)
return {
"scope": _scope_payload(filters),
"summary": summary,
"days": _group_daily(entries),
"clients": _build_breakdown(entries, "clients"),
"projects": _build_breakdown(entries, "projects"),
"tags": _build_breakdown(entries, "tags"),
}
def _group_daily(entries: list[TimeEntry]) -> list[dict]:
by_day: dict[str, dict] = {}
for entry in entries:
local_start = _localize_datetime(entry.start_time)
day_key = local_start.date().isoformat()
day_bucket = by_day.setdefault(
day_key,
{
"date": day_key,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
duration_seconds = get_entry_duration_seconds(entry)
day_bucket["total_seconds"] += duration_seconds
if entry.is_billable:
day_bucket["billable_seconds"] += duration_seconds
else:
day_bucket["non_billable_seconds"] += duration_seconds
_add_income(day_bucket["income"], entry)
rows = []
for day_key in sorted(by_day.keys()):
bucket = by_day[day_key]
rows.append(
{
"date": bucket["date"],
"billable_seconds": bucket["billable_seconds"],
"non_billable_seconds": bucket["non_billable_seconds"],
"total_seconds": bucket["total_seconds"],
"billable_duration": _format_duration_seconds(bucket["billable_seconds"]),
"non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]),
"total_duration": _format_duration_seconds(bucket["total_seconds"]),
"income_totals": _serialize_money_totals(bucket["income"]),
}
)
return rows
def _build_breakdown(entries: list[TimeEntry], kind: str) -> list[dict]:
data: dict[str, dict] = {}
for entry in entries:
if kind == "clients":
if not entry.project or not entry.project.client:
continue
item_id = str(entry.project.client_id)
item_name = entry.project.client.name
elif kind == "projects":
if not entry.project:
continue
item_id = str(entry.project_id)
item_name = entry.project.name
else:
if not entry.tags.exists():
continue
for tag in entry.tags.all():
bucket = data.setdefault(
str(tag.id),
{
"id": str(tag.id),
"name": tag.name,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
duration_seconds = get_entry_duration_seconds(entry)
bucket["total_seconds"] += duration_seconds
if entry.is_billable:
bucket["billable_seconds"] += duration_seconds
else:
bucket["non_billable_seconds"] += duration_seconds
_add_income(bucket["income"], entry)
continue
bucket = data.setdefault(
item_id,
{
"id": item_id,
"name": item_name,
"billable_seconds": 0,
"non_billable_seconds": 0,
"total_seconds": 0,
"income": _money_map(),
},
)
duration_seconds = get_entry_duration_seconds(entry)
bucket["total_seconds"] += duration_seconds
if entry.is_billable:
bucket["billable_seconds"] += duration_seconds
else:
bucket["non_billable_seconds"] += duration_seconds
_add_income(bucket["income"], entry)
rows = []
for item_id, bucket in sorted(data.items(), key=lambda item: item[1]["name"].lower()):
rows.append(
{
"id": bucket["id"],
"name": bucket["name"],
"billable_seconds": bucket["billable_seconds"],
"non_billable_seconds": bucket["non_billable_seconds"],
"total_seconds": bucket["total_seconds"],
"billable_duration": _format_duration_seconds(bucket["billable_seconds"]),
"non_billable_duration": _format_duration_seconds(bucket["non_billable_seconds"]),
"total_duration": _format_duration_seconds(bucket["total_seconds"]),
"income_totals": _serialize_money_totals(bucket["income"]),
}
)
return rows
def build_table_report(actor, raw_filters) -> dict:
filters = load_report_filters(actor, raw_filters)
entries = list(_base_queryset(filters))
return _table_report_payload(filters, entries)
def build_user_scoped_table_reports(actor, raw_filters) -> list[dict]:
filters = load_report_filters(actor, raw_filters)
if not (filters.is_workspace_scope and not filters.user_id):
return []
entries = list(_base_queryset(filters))
grouped: dict[str, list[TimeEntry]] = {}
for entry in entries:
grouped.setdefault(str(entry.user_id), []).append(entry)
sorted_groups = sorted(
grouped.items(),
key=lambda item: _user_display(item[1][0].user).lower(),
)
reports: list[dict] = []
for user_id, user_entries in sorted_groups:
user_filters = replace(filters, user_id=user_id)
reports.append(_table_report_payload(user_filters, user_entries))
return reports
def build_day_details_report(actor, raw_filters) -> dict:
filters = load_report_filters(actor, raw_filters)
day_value = raw_filters.get("day") or raw_filters.get("date")
target_day = parse_date(day_value) if day_value else None
if not target_day:
raise serializers.ValidationError("A valid day is required.")
if target_day < filters.from_date or target_day > filters.to_date:
raise serializers.ValidationError("Requested day is outside the filtered period.")
entries = [
entry
for entry in _base_queryset(filters)
if _localize_datetime(entry.start_time).date() == target_day
]
summary = _summary_from_entries(entries)
return {
"scope": _scope_payload(filters),
"day": target_day.isoformat(),
"summary": summary,
"entries": [_entry_payload(entry) for entry in entries],
}

View File

@@ -0,0 +1,173 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from typing import Iterable
import jdatetime
from arabic_reshaper import reshape
from bidi.algorithm import get_display
PERSIAN_DIGITS = str.maketrans("0123456789", "۰۱۲۳۴۵۶۷۸۹")
ARABIC_RANGES = (
(0x0600, 0x06FF),
(0x0750, 0x077F),
(0x08A0, 0x08FF),
(0xFB50, 0xFDFF),
(0xFE70, 0xFEFF),
)
TRANSLATIONS = {
"en": {
"report_title": "Workspace Report",
"overall_sheet": "Overall Report",
"workspace": "Workspace",
"period": "Period",
"from_date": "From date",
"to_date": "To date",
"user": "User",
"all_users": "All users",
"generated_at": "Generated at",
"summary": "Summary",
"total_hours": "Total hours",
"billable_hours": "Billable hours",
"non_billable_hours": "Non-billable hours",
"income": "Income",
"daily_summary": "Daily Summary",
"clients": "Clients",
"projects": "Projects",
"tags": "Tags",
"date": "Date",
"name": "Name",
"total": "Total",
"no_data": "No data",
},
"fa": {
"report_title": "گزارش فضای کاری",
"overall_sheet": "گزارش کلی",
"workspace": "فضای کاری",
"period": "بازه",
"from_date": "از تاریخ",
"to_date": "تا تاریخ",
"user": "کاربر",
"all_users": "همه کاربران",
"generated_at": "تاریخ تولید",
"summary": "خلاصه",
"total_hours": "کل ساعات",
"billable_hours": "ساعات کاری",
"non_billable_hours": "ساعات غیر کاری",
"income": "درآمد",
"daily_summary": "خلاصه روزانه",
"clients": "مشتریان",
"projects": "پروژه‌ها",
"tags": "تگ‌ها",
"date": "تاریخ",
"name": "نام",
"total": "جمع",
"no_data": "بدون داده",
},
}
PERIOD_LABELS = {
"en": {
"this_week": "This week",
"this_month": "This month",
"this_year": "This year",
"half_year_first": "First half of year",
"half_year_second": "Second half of year",
"period": "Custom period",
},
"fa": {
"this_week": "این هفته",
"this_month": "این ماه",
"this_year": "امسال",
"half_year_first": "نیمه اول سال",
"half_year_second": "نیمه دوم سال",
"period": "بازه دلخواه",
},
}
@dataclass(frozen=True)
class ExportLocale:
language: str
is_rtl: bool
font_regular: str
font_bold: str
def t(self, key: str) -> str:
return TRANSLATIONS[self.language][key]
def period_label(self, period: str) -> str:
return PERIOD_LABELS[self.language].get(period, period)
def format_number(self, value: object, *, ascii_digits: bool = False) -> str:
text = str(value)
if self.language == "fa" and not ascii_digits:
return text.translate(PERSIAN_DIGITS)
return text
def format_date(self, value: date | str | None, *, ascii_digits: bool = False) -> str:
if value is None:
return "-"
if isinstance(value, str):
value = date.fromisoformat(value)
if self.language == "fa":
jalali = jdatetime.date.fromgregorian(date=value)
return self.format_number(jalali.strftime("%Y/%m/%d"), ascii_digits=ascii_digits)
return value.strftime("%Y/%m/%d")
def format_duration(self, value: str, *, ascii_digits: bool = False) -> str:
return self.format_number(value, ascii_digits=ascii_digits)
def format_money_label(self, income_totals: list[dict], *, ascii_digits: bool = False) -> str:
if not income_totals:
return "-"
parts = []
for item in income_totals:
parts.append(f"{self.format_number(item['amount'], ascii_digits=ascii_digits)} {item['currency']}")
return " | ".join(parts)
def shape(self, text: object) -> str:
raw = str(text)
if not any(start <= ord(char) <= end for char in raw for start, end in ARABIC_RANGES):
return raw
return get_display(reshape(raw))
def build_export_locale(language: str | None) -> ExportLocale:
resolved = language if language in {"en", "fa"} else "en"
assets_dir = Path(__file__).resolve().parent.parent / "assets" / "fonts"
return ExportLocale(
language=resolved,
is_rtl=resolved == "fa",
font_regular=str(assets_dir / "Vazirmatn-Regular.ttf"),
font_bold=str(assets_dir / "Vazirmatn-Bold.ttf"),
)
def user_label(user_payload: dict | None, locale: ExportLocale, *, ascii_digits: bool = False) -> str:
if not user_payload:
return locale.t("all_users")
mobile = locale.format_number(user_payload.get("mobile") or "", ascii_digits=ascii_digits)
if mobile:
return f"{user_payload['name']} - {mobile}"
return str(user_payload["name"])
def safe_sheet_title(title: str, used: Iterable[str]) -> str:
invalid = set('[]:*?/\\')
sanitized = "".join("-" if char in invalid else char for char in title).strip() or "Sheet"
base = sanitized[:31]
used_set = set(used)
if base not in used_set:
return base
index = 2
while True:
suffix = f"-{index}"
candidate = f"{sanitized[:31 - len(suffix)]}{suffix}"
if candidate not in used_set:
return candidate
index += 1

View File

@@ -0,0 +1,410 @@
from __future__ import annotations
import io
from datetime import datetime
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4, landscape
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
from apps.reports.services.export_i18n import ExportLocale, safe_sheet_title, user_label
HEADER_FILL = PatternFill("solid", fgColor="E7F2FF")
SECTION_FILL = PatternFill("solid", fgColor="F8FAFC")
BORDER = Border(
left=Side(style="thin", color="D0D7DE"),
right=Side(style="thin", color="D0D7DE"),
top=Side(style="thin", color="D0D7DE"),
bottom=Side(style="thin", color="D0D7DE"),
)
def _register_pdf_fonts(locale: ExportLocale) -> None:
registered = set(pdfmetrics.getRegisteredFontNames())
if "Vazirmatn" not in registered:
pdfmetrics.registerFont(TTFont("Vazirmatn", locale.font_regular))
if "Vazirmatn-Bold" not in registered:
pdfmetrics.registerFont(TTFont("Vazirmatn-Bold", locale.font_bold))
def _apply_cell_style(cell, *, bold: bool = False, fill=None, rtl: bool = False) -> None:
cell.font = Font(name="Calibri", bold=bold, size=11)
cell.border = BORDER
cell.alignment = Alignment(horizontal="right" if rtl else "left", vertical="center")
if fill is not None:
cell.fill = fill
def _autosize_columns(worksheet) -> None:
widths: dict[int, int] = {}
for row in worksheet.iter_rows():
for cell in row:
if cell.value is None:
continue
widths[cell.column] = max(widths.get(cell.column, 0), len(str(cell.value)))
for column_index, width in widths.items():
worksheet.column_dimensions[get_column_letter(column_index)].width = min(max(width + 4, 12), 30)
def _money_label(locale: ExportLocale, income_totals: list[dict]) -> str:
return locale.format_money_label(income_totals)
def _money_label_excel(locale: ExportLocale, income_totals: list[dict]) -> str:
return locale.format_money_label(income_totals, ascii_digits=True)
def _section_headers(locale: ExportLocale) -> list[str]:
headers = [
locale.t("name"),
locale.t("billable_hours"),
locale.t("non_billable_hours"),
locale.t("total_hours"),
locale.t("income"),
]
return list(reversed(headers)) if locale.is_rtl else headers
def _rtl_row(locale: ExportLocale, row: list[str]) -> list[str]:
return list(reversed(row)) if locale.is_rtl else row
def _append_meta_block(worksheet, *, locale: ExportLocale, report_data: dict) -> None:
scope = report_data["scope"]
summary = report_data["summary"]
worksheet.append([locale.t("report_title"), scope["workspace"]["name"]])
worksheet.append([locale.t("workspace"), scope["workspace"]["name"]])
worksheet.append([locale.t("period"), locale.period_label(scope["period"])])
worksheet.append([locale.t("from_date"), locale.format_date(scope["from_date"], ascii_digits=True)])
worksheet.append([locale.t("to_date"), locale.format_date(scope["to_date"], ascii_digits=True)])
worksheet.append([locale.t("user"), user_label(scope.get("user"), locale, ascii_digits=True)])
worksheet.append([locale.t("generated_at"), locale.format_date(datetime.now().date(), ascii_digits=True)])
worksheet.append([])
worksheet.append([locale.t("summary")])
worksheet.append([locale.t("total_hours"), locale.format_duration(summary["total_duration"], ascii_digits=True)])
worksheet.append([locale.t("billable_hours"), locale.format_duration(summary["billable_duration"], ascii_digits=True)])
worksheet.append([locale.t("non_billable_hours"), locale.format_duration(summary["non_billable_duration"], ascii_digits=True)])
worksheet.append([locale.t("income"), _money_label_excel(locale, summary["income_totals"])])
for row_index in range(1, worksheet.max_row + 1):
first_cell = worksheet.cell(row=row_index, column=1)
second_cell = worksheet.cell(row=row_index, column=2)
if row_index in {1, 9}:
_apply_cell_style(first_cell, bold=True, fill=HEADER_FILL, rtl=locale.is_rtl)
if second_cell.value:
_apply_cell_style(second_cell, bold=row_index == 1, fill=HEADER_FILL if row_index == 1 else None, rtl=locale.is_rtl)
elif first_cell.value:
_apply_cell_style(first_cell, bold=False, fill=SECTION_FILL if row_index == 8 else None, rtl=locale.is_rtl)
if second_cell.value:
_apply_cell_style(second_cell, rtl=locale.is_rtl)
def _append_daily_table(worksheet, *, locale: ExportLocale, report_data: dict) -> None:
worksheet.append([])
worksheet.append([locale.t("daily_summary")])
header_row = worksheet.max_row + 1
worksheet.append(
_rtl_row(
locale,
[
locale.t("date"),
locale.t("billable_hours"),
locale.t("non_billable_hours"),
locale.t("total_hours"),
locale.t("income"),
],
)
)
for cell in worksheet[header_row]:
_apply_cell_style(cell, bold=True, fill=HEADER_FILL, rtl=locale.is_rtl)
if not report_data["days"]:
worksheet.append([locale.t("no_data")])
_apply_cell_style(worksheet.cell(row=worksheet.max_row, column=1), rtl=locale.is_rtl)
return
for row in report_data["days"]:
worksheet.append(
_rtl_row(
locale,
[
locale.format_date(row["date"], ascii_digits=True),
locale.format_duration(row["billable_duration"], ascii_digits=True),
locale.format_duration(row["non_billable_duration"], ascii_digits=True),
locale.format_duration(row["total_duration"], ascii_digits=True),
_money_label_excel(locale, row["income_totals"]),
],
)
)
for cell in worksheet[worksheet.max_row]:
_apply_cell_style(cell, rtl=locale.is_rtl)
worksheet.append(
_rtl_row(
locale,
[
locale.t("total"),
locale.format_duration(report_data["summary"]["billable_duration"], ascii_digits=True),
locale.format_duration(report_data["summary"]["non_billable_duration"], ascii_digits=True),
locale.format_duration(report_data["summary"]["total_duration"], ascii_digits=True),
_money_label_excel(locale, report_data["summary"]["income_totals"]),
],
)
)
for cell in worksheet[worksheet.max_row]:
_apply_cell_style(cell, bold=True, fill=SECTION_FILL, rtl=locale.is_rtl)
def _append_breakdown_table(worksheet, *, locale: ExportLocale, title_key: str, rows: list[dict]) -> None:
worksheet.append([])
worksheet.append([locale.t(title_key)])
header_row = worksheet.max_row + 1
worksheet.append(_section_headers(locale))
for cell in worksheet[header_row]:
_apply_cell_style(cell, bold=True, fill=HEADER_FILL, rtl=locale.is_rtl)
if not rows:
worksheet.append([locale.t("no_data")])
_apply_cell_style(worksheet.cell(row=worksheet.max_row, column=1), rtl=locale.is_rtl)
return
for row in rows:
worksheet.append(
_rtl_row(
locale,
[
row["name"],
locale.format_duration(row["billable_duration"], ascii_digits=True),
locale.format_duration(row["non_billable_duration"], ascii_digits=True),
locale.format_duration(row["total_duration"], ascii_digits=True),
_money_label_excel(locale, row["income_totals"]),
],
)
)
for cell in worksheet[worksheet.max_row]:
_apply_cell_style(cell, rtl=locale.is_rtl)
def _render_excel_sheet(worksheet, *, locale: ExportLocale, report_data: dict) -> None:
if locale.is_rtl:
worksheet.sheet_view.rightToLeft = True
_append_meta_block(worksheet, locale=locale, report_data=report_data)
_append_daily_table(worksheet, locale=locale, report_data=report_data)
_append_breakdown_table(worksheet, locale=locale, title_key="clients", rows=report_data["clients"])
_append_breakdown_table(worksheet, locale=locale, title_key="projects", rows=report_data["projects"])
_append_breakdown_table(worksheet, locale=locale, title_key="tags", rows=report_data["tags"])
_autosize_columns(worksheet)
def build_excel_report(*, report_data: dict, locale: ExportLocale, per_user_reports: list[dict] | None = None) -> bytes:
workbook = Workbook()
overall_sheet = workbook.active
overall_sheet.title = safe_sheet_title(locale.t("overall_sheet"), [])
_render_excel_sheet(overall_sheet, locale=locale, report_data=report_data)
used_titles = {overall_sheet.title}
for user_report in per_user_reports or []:
user_title = safe_sheet_title(user_label(user_report["scope"].get("user"), locale, ascii_digits=True), used_titles)
worksheet = workbook.create_sheet(title=user_title)
_render_excel_sheet(worksheet, locale=locale, report_data=user_report)
used_titles.add(user_title)
buffer = io.BytesIO()
workbook.save(buffer)
return buffer.getvalue()
def _paragraph(text: str, style: ParagraphStyle, locale: ExportLocale) -> Paragraph:
return Paragraph(locale.shape(text), style)
def _styled_table(data: list[list[str]], *, locale: ExportLocale, column_widths: list[float]) -> Table:
shaped_data = [
[locale.shape(cell) if cell is not None else "" for cell in row]
for row in data
]
table = Table(shaped_data, colWidths=column_widths, repeatRows=1)
table.setStyle(
TableStyle(
[
("FONTNAME", (0, 0), (-1, 0), "Vazirmatn-Bold" if locale.language == "fa" else "Helvetica-Bold"),
("FONTNAME", (0, 1), (-1, -1), "Vazirmatn" if locale.language == "fa" else "Helvetica"),
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#E7F2FF")),
("TEXTCOLOR", (0, 0), (-1, -1), colors.HexColor("#0F172A")),
("GRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#CBD5E1")),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("ALIGN", (0, 0), (-1, -1), "RIGHT" if locale.is_rtl else "LEFT"),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F8FAFC")]),
("TOPPADDING", (0, 0), (-1, -1), 6),
("BOTTOMPADDING", (0, 0), (-1, -1), 6),
]
)
)
return table
def _report_table_rows(locale: ExportLocale, rows: list[dict]) -> list[list[str]]:
if not rows:
return [_rtl_row(locale, [locale.t("no_data"), "", "", "", ""])]
return [
_rtl_row(
locale,
[
locale.format_date(row.get("date")) if row.get("date") else row["name"],
locale.format_duration(row["billable_duration"]),
locale.format_duration(row["non_billable_duration"]),
locale.format_duration(row["total_duration"]),
_money_label(locale, row["income_totals"]),
],
)
for row in rows
]
def build_pdf_report(*, report_data: dict, locale: ExportLocale) -> bytes:
_register_pdf_fonts(locale)
font_regular = "Vazirmatn"
font_bold = "Vazirmatn-Bold"
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
"ReportTitle",
parent=styles["Heading1"],
fontName=font_bold,
fontSize=18,
leading=24,
alignment=2 if locale.is_rtl else 0,
textColor=colors.HexColor("#0F172A"),
)
section_style = ParagraphStyle(
"ReportSection",
parent=styles["Heading3"],
fontName=font_bold,
fontSize=12,
leading=16,
alignment=2 if locale.is_rtl else 0,
textColor=colors.HexColor("#0F172A"),
)
body_style = ParagraphStyle(
"ReportBody",
parent=styles["BodyText"],
fontName=font_regular,
fontSize=10,
leading=14,
alignment=2 if locale.is_rtl else 0,
textColor=colors.HexColor("#334155"),
)
buffer = io.BytesIO()
doc = SimpleDocTemplate(
buffer,
pagesize=landscape(A4),
leftMargin=14 * mm,
rightMargin=14 * mm,
topMargin=14 * mm,
bottomMargin=14 * mm,
)
scope = report_data["scope"]
summary = report_data["summary"]
story = [
_paragraph(f"{locale.t('report_title')} - {scope['workspace']['name']}", title_style, locale),
Spacer(1, 6 * mm),
]
meta_rows = [
[locale.t("workspace"), scope["workspace"]["name"]],
[locale.t("period"), locale.period_label(scope["period"])],
[locale.t("from_date"), locale.format_date(scope["from_date"])],
[locale.t("to_date"), locale.format_date(scope["to_date"])],
[locale.t("user"), user_label(scope.get("user"), locale)],
[locale.t("generated_at"), locale.format_date(datetime.now().date())],
]
if locale.is_rtl:
meta_rows = [_rtl_row(locale, row) for row in meta_rows]
meta_rows = [[locale.shape(cell) for cell in row] for row in meta_rows]
meta_table = Table(meta_rows, colWidths=[doc.width * 0.24, doc.width * 0.76])
meta_table.setStyle(
TableStyle(
[
("FONTNAME", (0, 0), (0, -1), font_bold),
("FONTNAME", (1, 0), (1, -1), font_regular),
("BACKGROUND", (0, 0), (0, -1), colors.HexColor("#F8FAFC")),
("BOX", (0, 0), (-1, -1), 0.5, colors.HexColor("#CBD5E1")),
("INNERGRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#CBD5E1")),
("ALIGN", (0, 0), (-1, -1), "RIGHT" if locale.is_rtl else "LEFT"),
("TOPPADDING", (0, 0), (-1, -1), 6),
("BOTTOMPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.extend([meta_table, Spacer(1, 5 * mm)])
summary_data = [
[locale.t("total_hours"), locale.format_duration(summary["total_duration"])],
[locale.t("billable_hours"), locale.format_duration(summary["billable_duration"])],
[locale.t("non_billable_hours"), locale.format_duration(summary["non_billable_duration"])],
[locale.t("income"), _money_label(locale, summary["income_totals"])],
]
if locale.is_rtl:
summary_data = [_rtl_row(locale, row) for row in summary_data]
summary_data = [[locale.shape(cell) for cell in row] for row in summary_data]
summary_table = Table(summary_data, colWidths=[doc.width * 0.38, doc.width * 0.62])
summary_table.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, -1), colors.HexColor("#EFF6FF")),
("BOX", (0, 0), (-1, -1), 0.5, colors.HexColor("#BFDBFE")),
("INNERGRID", (0, 0), (-1, -1), 0.5, colors.HexColor("#BFDBFE")),
("FONTNAME", (0, 0), (0, -1), font_bold),
("FONTNAME", (1, 0), (1, -1), font_regular),
("ALIGN", (0, 0), (-1, -1), "RIGHT" if locale.is_rtl else "LEFT"),
("TOPPADDING", (0, 0), (-1, -1), 6),
("BOTTOMPADDING", (0, 0), (-1, -1), 6),
]
)
)
story.extend([summary_table, Spacer(1, 6 * mm)])
sections = [
("daily_summary", report_data["days"], True),
("clients", report_data["clients"], False),
("projects", report_data["projects"], False),
("tags", report_data["tags"], False),
]
for title_key, rows, is_daily in sections:
story.append(_paragraph(locale.t(title_key), section_style, locale))
story.append(Spacer(1, 2 * mm))
header = _rtl_row(
locale,
[
locale.t("date") if is_daily else locale.t("name"),
locale.t("billable_hours"),
locale.t("non_billable_hours"),
locale.t("total_hours"),
locale.t("income"),
],
)
table = _styled_table(
[header, *_report_table_rows(locale, rows)],
locale=locale,
column_widths=[
doc.width * 0.26,
doc.width * 0.15,
doc.width * 0.17,
doc.width * 0.14,
doc.width * 0.28,
],
)
story.extend([table, Spacer(1, 5 * mm)])
doc.build(story)
return buffer.getvalue()