272 lines
9.1 KiB
Python
272 lines
9.1 KiB
Python
from apps.notifications.services.store import RedisNotificationStore
|
|
|
|
|
|
def _actor_name(actor) -> str:
|
|
full_name = getattr(actor, "full_name", "").strip()
|
|
if full_name and full_name != "Anonymous":
|
|
return full_name
|
|
return getattr(actor, "mobile", str(actor))
|
|
|
|
|
|
def _role_label(role_enum, role_value: str) -> str:
|
|
try:
|
|
return str(role_enum(role_value).label).lower()
|
|
except Exception:
|
|
pass
|
|
with_labels = getattr(role_enum, "labels", None)
|
|
if isinstance(with_labels, dict):
|
|
return str(with_labels.get(role_value, role_value)).lower()
|
|
return str(role_value).replace("_", " ").lower()
|
|
|
|
|
|
def _should_skip(actor, recipient) -> bool:
|
|
return not recipient or str(actor.id) == str(recipient.id)
|
|
|
|
|
|
def _notify_user(recipient, payload: dict) -> None:
|
|
RedisNotificationStore.add(str(recipient.id), payload)
|
|
|
|
|
|
def _workspace_action_url(workspace) -> str:
|
|
return f"/workspaces/{workspace.id}"
|
|
|
|
|
|
def _project_action_url(project) -> str:
|
|
return "/projects"
|
|
|
|
|
|
def notify_workspace_membership_added(*, actor, recipient, workspace, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
role_label = _role_label(recipient.workspace_memberships.model.Role, role)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "workspace_membership_added",
|
|
"title": "Added to workspace",
|
|
"message": (
|
|
f"{actor_display} added you to {workspace.name} as {role_label}."
|
|
),
|
|
"level": "info",
|
|
"action_url": _workspace_action_url(workspace),
|
|
"entity_type": "workspace",
|
|
"entity_id": str(workspace.id),
|
|
"meta": {
|
|
"workspace_id": str(workspace.id),
|
|
"workspace_name": workspace.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"new_role": role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_workspace_membership_role_changed(
|
|
*, actor, recipient, workspace, previous_role: str, new_role: str
|
|
) -> None:
|
|
if _should_skip(actor, recipient) or previous_role == new_role:
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
previous_role_label = _role_label(recipient.workspace_memberships.model.Role, previous_role)
|
|
new_role_label = _role_label(recipient.workspace_memberships.model.Role, new_role)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "workspace_membership_role_changed",
|
|
"title": "Workspace role changed",
|
|
"message": (
|
|
f"{actor_display} changed your role in {workspace.name} "
|
|
f"from {previous_role_label} to {new_role_label}."
|
|
),
|
|
"level": "info",
|
|
"action_url": _workspace_action_url(workspace),
|
|
"entity_type": "workspace",
|
|
"entity_id": str(workspace.id),
|
|
"meta": {
|
|
"workspace_id": str(workspace.id),
|
|
"workspace_name": workspace.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": previous_role,
|
|
"new_role": new_role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_workspace_membership_deactivated(*, actor, recipient, workspace, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "workspace_membership_deactivated",
|
|
"title": "Workspace access deactivated",
|
|
"message": f"{actor_display} deactivated your access to {workspace.name}.",
|
|
"level": "warning",
|
|
"action_url": _workspace_action_url(workspace),
|
|
"entity_type": "workspace",
|
|
"entity_id": str(workspace.id),
|
|
"meta": {
|
|
"workspace_id": str(workspace.id),
|
|
"workspace_name": workspace.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_workspace_membership_removed(*, actor, recipient, workspace, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "workspace_membership_removed",
|
|
"title": "Removed from workspace",
|
|
"message": f"{actor_display} removed you from {workspace.name}.",
|
|
"level": "warning",
|
|
"action_url": _workspace_action_url(workspace),
|
|
"entity_type": "workspace",
|
|
"entity_id": str(workspace.id),
|
|
"meta": {
|
|
"workspace_id": str(workspace.id),
|
|
"workspace_name": workspace.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_project_membership_added(*, actor, recipient, project, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
role_label = _role_label(recipient.project_memberships.model.Role, role)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "project_membership_added",
|
|
"title": "Added to project",
|
|
"message": f"{actor_display} added you to {project.name} as {role_label}.",
|
|
"level": "info",
|
|
"action_url": _project_action_url(project),
|
|
"entity_type": "project",
|
|
"entity_id": str(project.id),
|
|
"meta": {
|
|
"workspace_id": str(project.workspace_id),
|
|
"workspace_name": project.workspace.name,
|
|
"project_id": str(project.id),
|
|
"project_name": project.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"new_role": role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_project_membership_role_changed(
|
|
*, actor, recipient, project, previous_role: str, new_role: str
|
|
) -> None:
|
|
if _should_skip(actor, recipient) or previous_role == new_role:
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
previous_role_label = _role_label(recipient.project_memberships.model.Role, previous_role)
|
|
new_role_label = _role_label(recipient.project_memberships.model.Role, new_role)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "project_membership_role_changed",
|
|
"title": "Project role changed",
|
|
"message": (
|
|
f"{actor_display} changed your role in {project.name} "
|
|
f"from {previous_role_label} to {new_role_label}."
|
|
),
|
|
"level": "info",
|
|
"action_url": _project_action_url(project),
|
|
"entity_type": "project",
|
|
"entity_id": str(project.id),
|
|
"meta": {
|
|
"workspace_id": str(project.workspace_id),
|
|
"workspace_name": project.workspace.name,
|
|
"project_id": str(project.id),
|
|
"project_name": project.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": previous_role,
|
|
"new_role": new_role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_project_membership_deactivated(*, actor, recipient, project, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "project_membership_deactivated",
|
|
"title": "Project access deactivated",
|
|
"message": f"{actor_display} deactivated your access to {project.name}.",
|
|
"level": "warning",
|
|
"action_url": _project_action_url(project),
|
|
"entity_type": "project",
|
|
"entity_id": str(project.id),
|
|
"meta": {
|
|
"workspace_id": str(project.workspace_id),
|
|
"workspace_name": project.workspace.name,
|
|
"project_id": str(project.id),
|
|
"project_name": project.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": role,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def notify_project_membership_removed(*, actor, recipient, project, role: str) -> None:
|
|
if _should_skip(actor, recipient):
|
|
return
|
|
|
|
actor_display = _actor_name(actor)
|
|
_notify_user(
|
|
recipient,
|
|
{
|
|
"type": "project_membership_removed",
|
|
"title": "Removed from project",
|
|
"message": f"{actor_display} removed you from {project.name}.",
|
|
"level": "warning",
|
|
"action_url": _project_action_url(project),
|
|
"entity_type": "project",
|
|
"entity_id": str(project.id),
|
|
"meta": {
|
|
"workspace_id": str(project.workspace_id),
|
|
"workspace_name": project.workspace.name,
|
|
"project_id": str(project.id),
|
|
"project_name": project.name,
|
|
"actor_id": str(actor.id),
|
|
"actor_name": actor_display,
|
|
"previous_role": role,
|
|
},
|
|
},
|
|
)
|