feat(logs): add workspace activity log api

This commit is contained in:
2026-04-28 16:42:37 +03:30
parent c8a118788b
commit 71924ce6fb
32 changed files with 1118 additions and 122 deletions

297
apps/logs/services/query.py Normal file
View File

@@ -0,0 +1,297 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, time
from django.db.models import Q, QuerySet, TextField
from django.db.models.functions import Cast
from django.utils import timezone
from django.utils.dateparse import parse_date, parse_datetime
from auditlog.models import LogEntry
from apps.logs.services.constants import (
EVENT_ACTIVATE,
EVENT_ARCHIVE,
EVENT_CREATE,
EVENT_DEACTIVATE,
EVENT_DELETE,
EVENT_RESTORE,
EVENT_UNARCHIVE,
EVENT_UPDATE,
LOG_EVENTS,
LOG_SECTIONS,
SECTION_BY_MODEL_LABEL,
SECTION_REPORT_EXPORTS,
SECTION_WORKSPACE_MEMBERS,
)
@dataclass(frozen=True)
class WorkspaceLogFilters:
workspace_id: str
section: str | None = None
actor_id: str | None = None
event: str | None = None
search: str | None = None
from_value: datetime | None = None
to_value: datetime | None = None
ordering: str = "-timestamp"
def get_log_model_label(entry: LogEntry) -> str | None:
if not entry.content_type_id:
return None
return f"{entry.content_type.app_label}.{entry.content_type.model}"
def get_log_section(entry: LogEntry) -> str | None:
additional_data = entry.additional_data or {}
section = additional_data.get("section")
if section in LOG_SECTIONS:
return section
return SECTION_BY_MODEL_LABEL.get(get_log_model_label(entry) or "")
def get_log_workspace_id(entry: LogEntry) -> str | None:
additional_data = entry.additional_data or {}
workspace_id = additional_data.get("workspace_id")
return str(workspace_id) if workspace_id else None
def normalize_log_event(entry: LogEntry) -> str:
changes = entry.changes or {}
is_deleted = changes.get("is_deleted")
if isinstance(is_deleted, (list, tuple)) and len(is_deleted) >= 2:
if is_deleted[0] in {False, "False", "false", None, "None"} and is_deleted[1] in {True, "True", "true"}:
return EVENT_DELETE
if is_deleted[0] in {True, "True", "true"} and is_deleted[1] in {False, "False", "false", None, "None"}:
return EVENT_RESTORE
is_archived = changes.get("is_archived")
if isinstance(is_archived, (list, tuple)) and len(is_archived) >= 2:
if is_archived[0] in {False, "False", "false", None, "None"} and is_archived[1] in {True, "True", "true"}:
return EVENT_ARCHIVE
if is_archived[0] in {True, "True", "true"} and is_archived[1] in {False, "False", "false", None, "None"}:
return EVENT_UNARCHIVE
is_active = changes.get("is_active")
if isinstance(is_active, (list, tuple)) and len(is_active) >= 2:
if is_active[0] in {False, "False", "false", None, "None"} and is_active[1] in {True, "True", "true"}:
return EVENT_ACTIVATE
if is_active[0] in {True, "True", "true"} and is_active[1] in {False, "False", "false", None, "None"}:
return EVENT_DEACTIVATE
if entry.action == LogEntry.Action.CREATE:
return EVENT_CREATE
return EVENT_UPDATE
def _stringify_change_value(value) -> str | None:
if value in (None, "", "None"):
return None
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, list):
return ", ".join(str(item) for item in value if item not in (None, ""))
return str(value)
def get_log_change_rows(entry: LogEntry) -> list[dict]:
changes = entry.changes or {}
display_changes = {}
try:
display_changes = entry.changes_display_dict or {}
except Exception:
display_changes = {}
rows = []
for field_name, raw_value in changes.items():
if isinstance(raw_value, dict) and raw_value.get("type") == "m2m":
operation = raw_value.get("operation", "update")
objects = [str(item) for item in raw_value.get("objects", [])]
rows.append(
{
"field": field_name,
"label": field_name.replace("_", " ").title(),
"change_type": "m2m",
"operation": operation,
"old_value": None,
"new_value": ", ".join(objects) if objects else None,
"summary": f"{operation.title()}: {', '.join(objects)}" if objects else operation.title(),
}
)
continue
old_value = None
new_value = None
display_value = display_changes.get(field_name.replace("_", " ").title()) or display_changes.get(field_name)
if isinstance(display_value, (list, tuple)) and len(display_value) >= 2:
old_value = _stringify_change_value(display_value[0])
new_value = _stringify_change_value(display_value[1])
elif isinstance(raw_value, (list, tuple)) and len(raw_value) >= 2:
old_value = _stringify_change_value(raw_value[0])
new_value = _stringify_change_value(raw_value[1])
rows.append(
{
"field": field_name,
"label": field_name.replace("_", " ").title(),
"change_type": "field",
"operation": "replace",
"old_value": old_value,
"new_value": new_value,
"summary": f"{field_name}: {old_value or '-'} -> {new_value or '-'}",
}
)
return rows
def get_log_preview_changes(entry: LogEntry, limit: int = 3) -> list[dict]:
return [
{
"field": row["field"],
"label": row["label"],
"summary": row["summary"],
}
for row in get_log_change_rows(entry)[:limit]
]
def is_visible_workspace_log(entry: LogEntry) -> bool:
if entry.actor_id is None:
return False
section = get_log_section(entry)
if section not in LOG_SECTIONS:
return False
additional_data = entry.additional_data or {}
if section == SECTION_REPORT_EXPORTS and entry.actor_id is None:
return False
if (
section == SECTION_WORKSPACE_MEMBERS
and normalize_log_event(entry) == EVENT_CREATE
and additional_data.get("canonical_owner_membership") is True
):
return False
return True
def filter_workspace_logs(queryset: QuerySet, filters: WorkspaceLogFilters) -> QuerySet:
queryset = queryset.filter(additional_data__workspace_id=filters.workspace_id)
if filters.section:
queryset = queryset.filter(additional_data__section=filters.section)
if filters.actor_id:
queryset = queryset.filter(actor_id=filters.actor_id)
if filters.from_value:
queryset = queryset.filter(timestamp__gte=filters.from_value)
if filters.to_value:
queryset = queryset.filter(timestamp__lte=filters.to_value)
queryset = queryset.annotate(changes_json_text=Cast("changes", TextField()))
if filters.search:
queryset = queryset.filter(
Q(object_repr__icontains=filters.search)
| Q(changes_text__icontains=filters.search)
| Q(changes_json_text__icontains=filters.search)
| Q(actor__first_name__icontains=filters.search)
| Q(actor__last_name__icontains=filters.search)
| Q(actor__mobile__icontains=filters.search)
| Q(additional_data__target_label__icontains=filters.search)
)
queryset = queryset.order_by(filters.ordering)
if filters.event:
matching_ids = [
entry.id
for entry in queryset
if normalize_log_event(entry) == filters.event and is_visible_workspace_log(entry)
]
return queryset.filter(id__in=matching_ids)
matching_ids = [entry.id for entry in queryset if is_visible_workspace_log(entry)]
return queryset.filter(id__in=matching_ids)
def build_log_actor_payload(entry: LogEntry) -> dict | None:
actor = entry.actor
if not actor:
return None
full_name = actor.full_name if getattr(actor, "full_name", "").strip() else actor.mobile
return {
"id": str(actor.id),
"full_name": full_name,
"mobile": actor.mobile,
"profile_picture": actor.profile_picture.url if getattr(actor, "profile_picture", None) else None,
}
def build_log_target_payload(entry: LogEntry) -> dict:
additional_data = entry.additional_data or {}
return {
"id": str(additional_data.get("target_id") or entry.object_pk),
"name": additional_data.get("target_label") or entry.object_repr,
"section": get_log_section(entry),
"workspace_id": get_log_workspace_id(entry),
}
def serialize_workspace_log_list_item(entry: LogEntry) -> dict:
rows = get_log_change_rows(entry)
return {
"id": entry.id,
"timestamp": timezone.localtime(entry.timestamp).isoformat(),
"section": get_log_section(entry),
"model": get_log_model_label(entry),
"event": normalize_log_event(entry),
"audit_action": entry.get_action_display(),
"actor": build_log_actor_payload(entry),
"target": build_log_target_payload(entry),
"changed_fields": [row["field"] for row in rows],
"preview_changes": get_log_preview_changes(entry),
}
def serialize_workspace_log_detail(entry: LogEntry) -> dict:
payload = serialize_workspace_log_list_item(entry)
payload.update(
{
"remote_addr": entry.remote_addr,
"changes": get_log_change_rows(entry),
"raw_changes": entry.changes or {},
"serialized_snapshot": entry.serialized_data,
"additional_data": entry.additional_data or {},
}
)
return payload
def parse_filter_datetime(value: str | None, *, is_end: bool = False) -> datetime | None:
if not value:
return None
parsed_datetime = parse_datetime(value)
if parsed_datetime is not None:
if timezone.is_naive(parsed_datetime):
return timezone.make_aware(parsed_datetime, timezone.get_current_timezone())
return parsed_datetime
parsed_date = parse_date(value)
if parsed_date is None:
return None
boundary = time.max if is_end else time.min
return timezone.make_aware(
datetime.combine(parsed_date, boundary),
timezone.get_current_timezone(),
)