298 lines
10 KiB
Python
298 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, time
|
|
|
|
from django.db.models import Q, QuerySet, TextField
|
|
from django.db.models.functions import Cast
|
|
from django.utils import timezone
|
|
from django.utils.dateparse import parse_date, parse_datetime
|
|
|
|
from auditlog.models import LogEntry
|
|
|
|
from apps.logs.services.constants import (
|
|
EVENT_ACTIVATE,
|
|
EVENT_ARCHIVE,
|
|
EVENT_CREATE,
|
|
EVENT_DEACTIVATE,
|
|
EVENT_DELETE,
|
|
EVENT_RESTORE,
|
|
EVENT_UNARCHIVE,
|
|
EVENT_UPDATE,
|
|
LOG_EVENTS,
|
|
LOG_SECTIONS,
|
|
SECTION_BY_MODEL_LABEL,
|
|
SECTION_REPORT_EXPORTS,
|
|
SECTION_WORKSPACE_MEMBERS,
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class WorkspaceLogFilters:
|
|
workspace_id: str
|
|
section: str | None = None
|
|
actor_id: str | None = None
|
|
event: str | None = None
|
|
search: str | None = None
|
|
from_value: datetime | None = None
|
|
to_value: datetime | None = None
|
|
ordering: str = "-timestamp"
|
|
|
|
|
|
def get_log_model_label(entry: LogEntry) -> str | None:
|
|
if not entry.content_type_id:
|
|
return None
|
|
return f"{entry.content_type.app_label}.{entry.content_type.model}"
|
|
|
|
|
|
def get_log_section(entry: LogEntry) -> str | None:
|
|
additional_data = entry.additional_data or {}
|
|
section = additional_data.get("section")
|
|
if section in LOG_SECTIONS:
|
|
return section
|
|
return SECTION_BY_MODEL_LABEL.get(get_log_model_label(entry) or "")
|
|
|
|
|
|
def get_log_workspace_id(entry: LogEntry) -> str | None:
|
|
additional_data = entry.additional_data or {}
|
|
workspace_id = additional_data.get("workspace_id")
|
|
return str(workspace_id) if workspace_id else None
|
|
|
|
|
|
def normalize_log_event(entry: LogEntry) -> str:
|
|
changes = entry.changes or {}
|
|
|
|
is_deleted = changes.get("is_deleted")
|
|
if isinstance(is_deleted, (list, tuple)) and len(is_deleted) >= 2:
|
|
if is_deleted[0] in {False, "False", "false", None, "None"} and is_deleted[1] in {True, "True", "true"}:
|
|
return EVENT_DELETE
|
|
if is_deleted[0] in {True, "True", "true"} and is_deleted[1] in {False, "False", "false", None, "None"}:
|
|
return EVENT_RESTORE
|
|
|
|
is_archived = changes.get("is_archived")
|
|
if isinstance(is_archived, (list, tuple)) and len(is_archived) >= 2:
|
|
if is_archived[0] in {False, "False", "false", None, "None"} and is_archived[1] in {True, "True", "true"}:
|
|
return EVENT_ARCHIVE
|
|
if is_archived[0] in {True, "True", "true"} and is_archived[1] in {False, "False", "false", None, "None"}:
|
|
return EVENT_UNARCHIVE
|
|
|
|
is_active = changes.get("is_active")
|
|
if isinstance(is_active, (list, tuple)) and len(is_active) >= 2:
|
|
if is_active[0] in {False, "False", "false", None, "None"} and is_active[1] in {True, "True", "true"}:
|
|
return EVENT_ACTIVATE
|
|
if is_active[0] in {True, "True", "true"} and is_active[1] in {False, "False", "false", None, "None"}:
|
|
return EVENT_DEACTIVATE
|
|
|
|
if entry.action == LogEntry.Action.CREATE:
|
|
return EVENT_CREATE
|
|
return EVENT_UPDATE
|
|
|
|
|
|
def _stringify_change_value(value) -> str | None:
|
|
if value in (None, "", "None"):
|
|
return None
|
|
if isinstance(value, bool):
|
|
return "true" if value else "false"
|
|
if isinstance(value, list):
|
|
return ", ".join(str(item) for item in value if item not in (None, ""))
|
|
return str(value)
|
|
|
|
|
|
def get_log_change_rows(entry: LogEntry) -> list[dict]:
|
|
changes = entry.changes or {}
|
|
display_changes = {}
|
|
try:
|
|
display_changes = entry.changes_display_dict or {}
|
|
except Exception:
|
|
display_changes = {}
|
|
|
|
rows = []
|
|
for field_name, raw_value in changes.items():
|
|
if isinstance(raw_value, dict) and raw_value.get("type") == "m2m":
|
|
operation = raw_value.get("operation", "update")
|
|
objects = [str(item) for item in raw_value.get("objects", [])]
|
|
rows.append(
|
|
{
|
|
"field": field_name,
|
|
"label": field_name.replace("_", " ").title(),
|
|
"change_type": "m2m",
|
|
"operation": operation,
|
|
"old_value": None,
|
|
"new_value": ", ".join(objects) if objects else None,
|
|
"summary": f"{operation.title()}: {', '.join(objects)}" if objects else operation.title(),
|
|
}
|
|
)
|
|
continue
|
|
|
|
old_value = None
|
|
new_value = None
|
|
display_value = display_changes.get(field_name.replace("_", " ").title()) or display_changes.get(field_name)
|
|
if isinstance(display_value, (list, tuple)) and len(display_value) >= 2:
|
|
old_value = _stringify_change_value(display_value[0])
|
|
new_value = _stringify_change_value(display_value[1])
|
|
elif isinstance(raw_value, (list, tuple)) and len(raw_value) >= 2:
|
|
old_value = _stringify_change_value(raw_value[0])
|
|
new_value = _stringify_change_value(raw_value[1])
|
|
|
|
rows.append(
|
|
{
|
|
"field": field_name,
|
|
"label": field_name.replace("_", " ").title(),
|
|
"change_type": "field",
|
|
"operation": "replace",
|
|
"old_value": old_value,
|
|
"new_value": new_value,
|
|
"summary": f"{field_name}: {old_value or '-'} -> {new_value or '-'}",
|
|
}
|
|
)
|
|
return rows
|
|
|
|
|
|
def get_log_preview_changes(entry: LogEntry, limit: int = 3) -> list[dict]:
|
|
return [
|
|
{
|
|
"field": row["field"],
|
|
"label": row["label"],
|
|
"summary": row["summary"],
|
|
}
|
|
for row in get_log_change_rows(entry)[:limit]
|
|
]
|
|
|
|
|
|
def is_visible_workspace_log(entry: LogEntry) -> bool:
|
|
if entry.actor_id is None:
|
|
return False
|
|
|
|
section = get_log_section(entry)
|
|
if section not in LOG_SECTIONS:
|
|
return False
|
|
|
|
additional_data = entry.additional_data or {}
|
|
if section == SECTION_REPORT_EXPORTS and entry.actor_id is None:
|
|
return False
|
|
|
|
if (
|
|
section == SECTION_WORKSPACE_MEMBERS
|
|
and normalize_log_event(entry) == EVENT_CREATE
|
|
and additional_data.get("canonical_owner_membership") is True
|
|
):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def filter_workspace_logs(queryset: QuerySet, filters: WorkspaceLogFilters) -> QuerySet:
|
|
queryset = queryset.filter(additional_data__workspace_id=filters.workspace_id)
|
|
|
|
if filters.section:
|
|
queryset = queryset.filter(additional_data__section=filters.section)
|
|
|
|
if filters.actor_id:
|
|
queryset = queryset.filter(actor_id=filters.actor_id)
|
|
|
|
if filters.from_value:
|
|
queryset = queryset.filter(timestamp__gte=filters.from_value)
|
|
|
|
if filters.to_value:
|
|
queryset = queryset.filter(timestamp__lte=filters.to_value)
|
|
|
|
queryset = queryset.annotate(changes_json_text=Cast("changes", TextField()))
|
|
|
|
if filters.search:
|
|
queryset = queryset.filter(
|
|
Q(object_repr__icontains=filters.search)
|
|
| Q(changes_text__icontains=filters.search)
|
|
| Q(changes_json_text__icontains=filters.search)
|
|
| Q(actor__first_name__icontains=filters.search)
|
|
| Q(actor__last_name__icontains=filters.search)
|
|
| Q(actor__mobile__icontains=filters.search)
|
|
| Q(additional_data__target_label__icontains=filters.search)
|
|
)
|
|
|
|
queryset = queryset.order_by(filters.ordering)
|
|
if filters.event:
|
|
matching_ids = [
|
|
entry.id
|
|
for entry in queryset
|
|
if normalize_log_event(entry) == filters.event and is_visible_workspace_log(entry)
|
|
]
|
|
return queryset.filter(id__in=matching_ids)
|
|
|
|
matching_ids = [entry.id for entry in queryset if is_visible_workspace_log(entry)]
|
|
return queryset.filter(id__in=matching_ids)
|
|
|
|
|
|
def build_log_actor_payload(entry: LogEntry) -> dict | None:
|
|
actor = entry.actor
|
|
if not actor:
|
|
return None
|
|
full_name = actor.full_name if getattr(actor, "full_name", "").strip() else actor.mobile
|
|
return {
|
|
"id": str(actor.id),
|
|
"full_name": full_name,
|
|
"mobile": actor.mobile,
|
|
"profile_picture": actor.profile_picture.url if getattr(actor, "profile_picture", None) else None,
|
|
}
|
|
|
|
|
|
def build_log_target_payload(entry: LogEntry) -> dict:
|
|
additional_data = entry.additional_data or {}
|
|
return {
|
|
"id": str(additional_data.get("target_id") or entry.object_pk),
|
|
"name": additional_data.get("target_label") or entry.object_repr,
|
|
"section": get_log_section(entry),
|
|
"workspace_id": get_log_workspace_id(entry),
|
|
}
|
|
|
|
|
|
def serialize_workspace_log_list_item(entry: LogEntry) -> dict:
|
|
rows = get_log_change_rows(entry)
|
|
return {
|
|
"id": entry.id,
|
|
"timestamp": timezone.localtime(entry.timestamp).isoformat(),
|
|
"section": get_log_section(entry),
|
|
"model": get_log_model_label(entry),
|
|
"event": normalize_log_event(entry),
|
|
"audit_action": entry.get_action_display(),
|
|
"actor": build_log_actor_payload(entry),
|
|
"target": build_log_target_payload(entry),
|
|
"changed_fields": [row["field"] for row in rows],
|
|
"preview_changes": get_log_preview_changes(entry),
|
|
}
|
|
|
|
|
|
def serialize_workspace_log_detail(entry: LogEntry) -> dict:
|
|
payload = serialize_workspace_log_list_item(entry)
|
|
payload.update(
|
|
{
|
|
"remote_addr": entry.remote_addr,
|
|
"changes": get_log_change_rows(entry),
|
|
"raw_changes": entry.changes or {},
|
|
"serialized_snapshot": entry.serialized_data,
|
|
"additional_data": entry.additional_data or {},
|
|
}
|
|
)
|
|
return payload
|
|
|
|
|
|
def parse_filter_datetime(value: str | None, *, is_end: bool = False) -> datetime | None:
|
|
if not value:
|
|
return None
|
|
|
|
parsed_datetime = parse_datetime(value)
|
|
if parsed_datetime is not None:
|
|
if timezone.is_naive(parsed_datetime):
|
|
return timezone.make_aware(parsed_datetime, timezone.get_current_timezone())
|
|
return parsed_datetime
|
|
|
|
parsed_date = parse_date(value)
|
|
if parsed_date is None:
|
|
return None
|
|
|
|
boundary = time.max if is_end else time.min
|
|
return timezone.make_aware(
|
|
datetime.combine(parsed_date, boundary),
|
|
timezone.get_current_timezone(),
|
|
)
|
|
|