Files
qlockify-backend-deployment/core/services/cache.py

125 lines
3.6 KiB
Python

import hashlib
import json
from collections.abc import Callable, Mapping
from typing import Any
from django.core.cache import cache
CACHE_NAMESPACE_REPORTS = "reports"
CACHE_NAMESPACE_WORKSPACE_MEMBERSHIPS = "workspace-memberships"
CACHE_NAMESPACE_WORKSPACE_RATES = "workspace-rates"
CACHE_NAMESPACE_PRICE_UNITS = "price-units"
_CACHE_VERSION_TTL_SECONDS = 60 * 60 * 24 * 30
def _stringify_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "true" if value else "false"
return str(value)
def normalize_query_params(params: Any) -> dict[str, list[str]]:
    """Return a deterministic ``{key: sorted list of strings}`` view of *params*.

    Objects exposing a ``lists()`` method are read through it, other
    ``Mapping`` instances through ``items()``; anything else yields an
    empty dict. Keys are converted with ``str()`` and the result is ordered
    by key. Within a list or tuple value, ``None`` entries are dropped; a
    scalar value (including ``None``) becomes a one-element list.
    """
    if hasattr(params, "lists"):
        pairs = params.lists()
    elif isinstance(params, Mapping):
        pairs = params.items()
    else:
        pairs = ()

    collected: dict[str, list[str]] = {}
    for raw_key, raw_value in pairs:
        if isinstance(raw_value, (list, tuple)):
            strings = sorted(
                _stringify_value(entry) for entry in raw_value if entry is not None
            )
        else:
            strings = [_stringify_value(raw_value)]
        collected[str(raw_key)] = strings

    # Rebuild in key order so serialising the result is stable.
    return {name: collected[name] for name in sorted(collected)}
def get_namespace_version(namespace: str, workspace_id: str | None = None) -> int:
    """Return the current version number of a cache namespace.

    The version is stored under ``cache-version:<namespace>:<scope>``, where
    the scope is *workspace_id* or ``"global"`` when no workspace is given.
    A missing counter is initialised to ``1`` with a timeout of
    ``_CACHE_VERSION_TTL_SECONDS``.

    Args:
        namespace: Name of the cache namespace.
        workspace_id: Optional workspace the version is scoped to.

    Returns:
        The stored version as an ``int`` (``1`` when none existed).
    """
    scope = workspace_id or "global"
    cache_key = f"cache-version:{namespace}:{scope}"
    version = cache.get(cache_key)
    if version is not None:
        return int(version)

    # add() writes only when the key is absent. An unconditional set() here
    # could overwrite a version that bump_namespace_version stored between
    # the read above and this write, silently discarding an invalidation.
    if cache.add(cache_key, 1, timeout=_CACHE_VERSION_TTL_SECONDS):
        return 1

    # Another writer created the counter first; report its value instead.
    version = cache.get(cache_key)
    return 1 if version is None else int(version)
def bump_namespace_version(namespace: str, workspace_id: str | None = None) -> int:
    """Advance the version counter of a cache namespace and return the new value.

    The counter lives under ``cache-version:<namespace>:<scope>``, with the
    scope being *workspace_id* or ``"global"``. A missing counter is written
    as ``2``. An existing one is incremented with ``cache.incr``; if that
    raises ``ValueError``, the previously read value plus one is written
    back with a timeout of ``_CACHE_VERSION_TTL_SECONDS``.
    """
    scope = workspace_id or "global"
    version_key = f"cache-version:{namespace}:{scope}"
    current = cache.get(version_key)

    if current is None:
        bumped = 2
    else:
        try:
            return int(cache.incr(version_key))
        except ValueError:
            # incr could not be applied; fall back to an explicit write.
            bumped = int(current) + 1

    cache.set(version_key, bumped, timeout=_CACHE_VERSION_TTL_SECONDS)
    return bumped
def build_cache_key(
    namespace: str,
    *,
    resource: str | None = None,
    user_id: Any = None,
    workspace_id: Any = None,
    params: Any = None,
    extra_versions: Mapping[str, int] | None = None,
) -> str:
    """Build a colon-separated cache key for a namespaced, versioned payload.

    The key is made of, in order: the namespace, ``resource:<resource>``
    (``default`` when falsy), ``v<namespace version>``, ``user:<user_id>``
    (``anon`` when falsy), ``workspace:<workspace_id>`` (``global`` when
    falsy), one ``<name>:v<version>`` segment per entry of *extra_versions*
    sorted by name, and an MD5 hex digest of the normalised *params*.

    Args:
        namespace: Cache namespace; also selects the version counter.
        resource: Optional resource label within the namespace.
        user_id: Optional user the payload is specific to.
        workspace_id: Optional workspace; scopes the namespace version too.
        params: Query parameters, normalised with ``normalize_query_params``.
        extra_versions: Additional named version numbers to embed in the key.

    Returns:
        The assembled cache key.
    """
    normalized_params = normalize_query_params(params or {})
    params_json = json.dumps(normalized_params, sort_keys=True, separators=(",", ":"))
    # MD5 is only a compact fingerprint of the parameters here. Declaring it
    # as non-security use keeps the call working on FIPS-restricted builds;
    # the resulting digest is unchanged.
    params_hash = hashlib.md5(
        params_json.encode("utf-8"), usedforsecurity=False
    ).hexdigest()

    namespace_version = get_namespace_version(
        namespace, str(workspace_id) if workspace_id else None
    )

    segments = [
        namespace,
        f"resource:{resource or 'default'}",
        f"v{namespace_version}",
        f"user:{user_id or 'anon'}",
        f"workspace:{workspace_id or 'global'}",
    ]
    if extra_versions:
        # Sorted so the key does not depend on the mapping's insertion order.
        segments.extend(
            f"{key}:v{value}" for key, value in sorted(extra_versions.items())
        )
    segments.append(params_hash)
    return ":".join(segments)
def get_or_set_cache_payload(
    namespace: str,
    *,
    ttl_seconds: int,
    builder: Callable[[], Any],
    resource: str | None = None,
    user_id: Any = None,
    workspace_id: Any = None,
    params: Any = None,
    extra_versions: Mapping[str, int] | None = None,
) -> Any:
    """Return the cached payload for the given key parts, building it on a miss.

    The cache key is derived with ``build_cache_key`` from *namespace*,
    *resource*, *user_id*, *workspace_id*, *params* and *extra_versions*.
    When the cache returns ``None`` for that key, *builder* is called with
    no arguments and its result is stored for *ttl_seconds* before being
    returned. A stored ``None`` is indistinguishable from a miss here, so a
    builder that returns ``None`` runs again on every call.
    """
    key = build_cache_key(
        namespace,
        resource=resource,
        user_id=user_id,
        workspace_id=workspace_id,
        params=params,
        extra_versions=extra_versions,
    )
    cached = cache.get(key)
    if cached is None:
        cached = builder()
        cache.set(key, cached, timeout=ttl_seconds)
    return cached