"""Filtering, classification, and serialization helpers for workspace audit logs."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, time

from django.db.models import Q, QuerySet, TextField
from django.db.models.functions import Cast
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_datetime

from auditlog.models import LogEntry

from apps.logs.services.constants import (
    EVENT_ACTIVATE,
    EVENT_ARCHIVE,
    EVENT_CREATE,
    EVENT_DEACTIVATE,
    EVENT_DELETE,
    EVENT_RESTORE,
    EVENT_UNARCHIVE,
    EVENT_UPDATE,
    LOG_EVENTS,
    LOG_SECTIONS,
    SECTION_BY_MODEL_LABEL,
    SECTION_REPORT_EXPORTS,
    SECTION_WORKSPACE_MEMBERS,
)

# Representations of "off" / "on" that may appear in a recorded boolean change.
# Tuples (not sets) so that an unhashable value simply fails the membership
# test instead of raising TypeError.
_FALSY_CHANGE_VALUES = (False, "False", "false", None, "None")
_TRUTHY_CHANGE_VALUES = (True, "True", "true")

# (changes key, event when the flag turns on, event when it turns off).
# Order matters: the first flag with a recognised transition wins.
_FLAG_TRANSITION_EVENTS = (
    ("is_deleted", EVENT_DELETE, EVENT_RESTORE),
    ("is_archived", EVENT_ARCHIVE, EVENT_UNARCHIVE),
    ("is_active", EVENT_ACTIVATE, EVENT_DEACTIVATE),
)


@dataclass(frozen=True)
class WorkspaceLogFilters:
    """Immutable bundle of the filter options accepted by ``filter_workspace_logs``."""

    workspace_id: str
    section: str | None = None
    actor_id: str | None = None
    event: str | None = None
    search: str | None = None
    from_value: datetime | None = None
    to_value: datetime | None = None
    # Passed straight to ``QuerySet.order_by``.
    ordering: str = "-timestamp"


def get_log_model_label(entry: LogEntry) -> str | None:
    """Return ``"<app_label>.<model>"`` for the entry, or ``None`` without a content type."""
    if not entry.content_type_id:
        return None
    return f"{entry.content_type.app_label}.{entry.content_type.model}"


def get_log_section(entry: LogEntry) -> str | None:
    """Return the section of an entry.

    An explicit ``additional_data["section"]`` wins when it is one of
    ``LOG_SECTIONS``; otherwise the section is looked up from the entry's
    model label in ``SECTION_BY_MODEL_LABEL``.
    """
    additional_data = entry.additional_data or {}
    section = additional_data.get("section")
    if section in LOG_SECTIONS:
        return section
    return SECTION_BY_MODEL_LABEL.get(get_log_model_label(entry) or "")


def get_log_workspace_id(entry: LogEntry) -> str | None:
    """Return ``additional_data["workspace_id"]`` as a string, or ``None`` if missing/empty."""
    additional_data = entry.additional_data or {}
    workspace_id = additional_data.get("workspace_id")
    return str(workspace_id) if workspace_id else None


def _flag_transition(change: object) -> bool | None:
    """Classify an ``[old, new]`` boolean change.

    Returns ``True`` for off -> on, ``False`` for on -> off, and ``None`` when
    ``change`` is not a two-item sequence or is not a recognised transition.
    """
    if not isinstance(change, (list, tuple)) or len(change) < 2:
        return None
    before, after = change[0], change[1]
    if before in _FALSY_CHANGE_VALUES and after in _TRUTHY_CHANGE_VALUES:
        return True
    if before in _TRUTHY_CHANGE_VALUES and after in _FALSY_CHANGE_VALUES:
        return False
    return None


def normalize_log_event(entry: LogEntry) -> str:
    """Map an entry to one of the ``EVENT_*`` constants.

    Flag transitions recorded in ``entry.changes`` take precedence, checked in
    the order ``is_deleted``, ``is_archived``, ``is_active``. Without one, a
    ``CREATE`` action yields ``EVENT_CREATE`` and anything else ``EVENT_UPDATE``.
    """
    # NOTE(review): an entry whose action is LogEntry.Action.DELETE falls
    # through to EVENT_UPDATE here; confirm hard deletes are not expected.
    changes = entry.changes or {}
    for field_name, on_event, off_event in _FLAG_TRANSITION_EVENTS:
        turned_on = _flag_transition(changes.get(field_name))
        if turned_on is True:
            return on_event
        if turned_on is False:
            return off_event
    if entry.action == LogEntry.Action.CREATE:
        return EVENT_CREATE
    return EVENT_UPDATE


def _stringify_change_value(value) -> str | None:
    """Render one side of a change as display text, or ``None`` when empty.

    Booleans become ``"true"``/``"false"``; lists are comma-joined with empty
    items skipped; everything else goes through ``str``.
    """
    if value in (None, "", "None"):
        return None
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, list):
        joined = ", ".join(str(item) for item in value if item not in (None, ""))
        # A list with no usable items is as empty as "" — report it the same way.
        return joined or None
    return str(value)


def get_log_change_rows(entry: LogEntry) -> list[dict]:
    """Return one display row per changed field of ``entry``.

    Many-to-many changes (a dict with ``"type": "m2m"``) produce a row with
    ``change_type="m2m"``; every other change produces a ``"field"`` row whose
    old/new values prefer ``entry.changes_display_dict`` and fall back to the
    raw ``[old, new]`` pair.
    """
    changes = entry.changes or {}
    try:
        display_changes = entry.changes_display_dict or {}
    except Exception:
        # Deliberately best-effort: the display dict is only a nicer rendering,
        # so any failure while building it falls back to the raw values.
        display_changes = {}

    rows = []
    for field_name, raw_value in changes.items():
        label = field_name.replace("_", " ").title()

        if isinstance(raw_value, dict) and raw_value.get("type") == "m2m":
            operation = raw_value.get("operation", "update")
            objects = [str(item) for item in raw_value.get("objects", [])]
            rows.append(
                {
                    "field": field_name,
                    "label": label,
                    "change_type": "m2m",
                    "operation": operation,
                    "old_value": None,
                    "new_value": ", ".join(objects) if objects else None,
                    "summary": f"{operation.title()}: {', '.join(objects)}" if objects else operation.title(),
                }
            )
            continue

        old_value = None
        new_value = None
        # The display dict may be keyed by the title-cased label or the raw name.
        display_value = display_changes.get(label) or display_changes.get(field_name)
        if isinstance(display_value, (list, tuple)) and len(display_value) >= 2:
            old_value = _stringify_change_value(display_value[0])
            new_value = _stringify_change_value(display_value[1])
        elif isinstance(raw_value, (list, tuple)) and len(raw_value) >= 2:
            old_value = _stringify_change_value(raw_value[0])
            new_value = _stringify_change_value(raw_value[1])

        rows.append(
            {
                "field": field_name,
                "label": label,
                "change_type": "field",
                "operation": "replace",
                "old_value": old_value,
                "new_value": new_value,
                "summary": f"{field_name}: {old_value or '-'} -> {new_value or '-'}",
            }
        )
    return rows


def _preview_from_rows(rows: list[dict], limit: int) -> list[dict]:
    """Project the first ``limit`` change rows down to field/label/summary."""
    return [
        {
            "field": row["field"],
            "label": row["label"],
            "summary": row["summary"],
        }
        for row in rows[:limit]
    ]


def get_log_preview_changes(entry: LogEntry, limit: int = 3) -> list[dict]:
    """Return a short preview (at most ``limit`` rows) of the entry's changes."""
    return _preview_from_rows(get_log_change_rows(entry), limit)


def is_visible_workspace_log(entry: LogEntry) -> bool:
    """Decide whether an entry may appear in the workspace log.

    Hidden entries are: those without an actor, those whose section is not in
    ``LOG_SECTIONS``, and workspace-member creations flagged with
    ``additional_data["canonical_owner_membership"] is True``.
    """
    if entry.actor_id is None:
        return False

    section = get_log_section(entry)
    if section not in LOG_SECTIONS:
        return False

    additional_data = entry.additional_data or {}
    if (
        section == SECTION_WORKSPACE_MEMBERS
        and normalize_log_event(entry) == EVENT_CREATE
        and additional_data.get("canonical_owner_membership") is True
    ):
        return False
    return True


def filter_workspace_logs(queryset: QuerySet, filters: WorkspaceLogFilters) -> QuerySet:
    """Apply ``filters`` to ``queryset`` and drop entries that are not visible.

    Database-level filters are applied first. The event filter and the
    visibility rules are evaluated in Python, so the intermediate queryset is
    iterated once and the result is re-filtered by the surviving ids. The
    returned queryset carries the ``changes_json_text`` annotation.
    """
    queryset = queryset.filter(additional_data__workspace_id=filters.workspace_id)

    # NOTE(review): this matches only entries that store the section in
    # additional_data, whereas get_log_section() can also derive it from the
    # model label — confirm that difference is intended.
    if filters.section:
        queryset = queryset.filter(additional_data__section=filters.section)
    if filters.actor_id:
        queryset = queryset.filter(actor_id=filters.actor_id)
    if filters.from_value:
        queryset = queryset.filter(timestamp__gte=filters.from_value)
    if filters.to_value:
        queryset = queryset.filter(timestamp__lte=filters.to_value)

    # The changes column is cast to text so it can be searched with icontains.
    queryset = queryset.annotate(changes_json_text=Cast("changes", TextField()))

    if filters.search:
        queryset = queryset.filter(
            Q(object_repr__icontains=filters.search)
            | Q(changes_text__icontains=filters.search)
            | Q(changes_json_text__icontains=filters.search)
            | Q(actor__first_name__icontains=filters.search)
            | Q(actor__last_name__icontains=filters.search)
            | Q(actor__mobile__icontains=filters.search)
            | Q(additional_data__target_label__icontains=filters.search)
        )

    # NOTE(review): ``ordering`` is not validated here; presumably the caller
    # restricts it to an allow-list — verify before exposing it to clients.
    queryset = queryset.order_by(filters.ordering)

    event = filters.event
    matching_ids = [
        entry.id
        for entry in queryset
        if (not event or normalize_log_event(entry) == event) and is_visible_workspace_log(entry)
    ]
    return queryset.filter(id__in=matching_ids)


def build_log_actor_payload(entry: LogEntry) -> dict | None:
    """Serialize the entry's actor, or return ``None`` when there is none.

    ``full_name`` falls back to the actor's mobile when the name is missing,
    ``None``, or blank.
    """
    actor = entry.actor
    if not actor:
        return None

    # ``or ""`` guards against the attribute existing with a None value, which
    # would otherwise fail on ``.strip()``.
    raw_full_name = getattr(actor, "full_name", None) or ""
    full_name = raw_full_name if raw_full_name.strip() else actor.mobile

    return {
        "id": str(actor.id),
        "full_name": full_name,
        "mobile": actor.mobile,
        "profile_picture": actor.profile_picture.url if getattr(actor, "profile_picture", None) else None,
    }


def build_log_target_payload(entry: LogEntry) -> dict:
    """Serialize the object the entry refers to.

    ``additional_data`` values (``target_id``, ``target_label``) take priority
    over the entry's own ``object_pk`` / ``object_repr``.
    """
    additional_data = entry.additional_data or {}
    return {
        "id": str(additional_data.get("target_id") or entry.object_pk),
        "name": additional_data.get("target_label") or entry.object_repr,
        "section": get_log_section(entry),
        "workspace_id": get_log_workspace_id(entry),
    }


def _build_list_item_payload(entry: LogEntry, rows: list[dict]) -> dict:
    """Build the list-item payload from change rows that were already computed."""
    return {
        "id": entry.id,
        "timestamp": timezone.localtime(entry.timestamp).isoformat(),
        "section": get_log_section(entry),
        "model": get_log_model_label(entry),
        "event": normalize_log_event(entry),
        "audit_action": entry.get_action_display(),
        "actor": build_log_actor_payload(entry),
        "target": build_log_target_payload(entry),
        "changed_fields": [row["field"] for row in rows],
        # 3 is the default preview size of get_log_preview_changes().
        "preview_changes": _preview_from_rows(rows, 3),
    }


def serialize_workspace_log_list_item(entry: LogEntry) -> dict:
    """Serialize an entry for list views (summary fields plus a change preview)."""
    return _build_list_item_payload(entry, get_log_change_rows(entry))


def serialize_workspace_log_detail(entry: LogEntry) -> dict:
    """Serialize an entry for the detail view.

    Contains everything from the list-item payload plus the remote address,
    all change rows, the raw changes, the serialized snapshot, and
    ``additional_data``.
    """
    # Change rows are computed once and shared by the summary and detail parts.
    rows = get_log_change_rows(entry)
    payload = _build_list_item_payload(entry, rows)
    payload.update(
        {
            "remote_addr": entry.remote_addr,
            "changes": rows,
            "raw_changes": entry.changes or {},
            "serialized_snapshot": entry.serialized_data,
            "additional_data": entry.additional_data or {},
        }
    )
    return payload


def parse_filter_datetime(value: str | None, *, is_end: bool = False) -> datetime | None:
    """Parse a date or datetime filter string into an aware ``datetime``.

    A date-only value expands to the start of that day, or to the end of the
    day when ``is_end`` is true. A naive datetime is made aware in the current
    timezone; an aware one is returned as parsed.

    Returns ``None`` when ``value`` is empty, unparseable, or well-formatted
    but not a real date/time (for example ``2024-02-30``).
    """
    if not value:
        return None

    current_tz = timezone.get_current_timezone()

    # Dates are tried first: parse_datetime() may accept a date-only string as
    # midnight, which would silently ignore ``is_end``.
    try:
        parsed_date = parse_date(value)
    except ValueError:
        # Well-formatted but impossible date.
        return None
    if parsed_date is not None:
        boundary = time.max if is_end else time.min
        return timezone.make_aware(datetime.combine(parsed_date, boundary), current_tz)

    try:
        parsed_datetime = parse_datetime(value)
    except ValueError:
        # Well-formatted but impossible datetime.
        return None
    if parsed_datetime is None:
        return None
    if timezone.is_naive(parsed_datetime):
        return timezone.make_aware(parsed_datetime, current_tz)
    return parsed_datetime