initial commit
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled

This commit is contained in:
2026-05-19 20:53:08 +03:30
commit 88b793ed9f
169 changed files with 16763 additions and 0 deletions

View File

View File

View File

@@ -0,0 +1,724 @@
import json
import shutil
import tempfile
import uuid
from datetime import timedelta
from unittest import mock
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, override_settings
from django.utils import timezone
import jwt
from apps.users.models import User, Major, University
class UsersAPIIntegrationTests(TestCase):
password = "Sup3rSecure!123"
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.major_cs, _ = Major.objects.get_or_create(
code="CS", defaults={"name": "Computer Science"}
)
cls.major_gil, _ = Major.objects.get_or_create(
code="GIL_CS", defaults={"name": "Gilan Computer Science"}
)
cls.university_ut, _ = University.objects.get_or_create(
code="UT", defaults={"name": "University of Tehran"}
)
cls.university_gilan, _ = University.objects.get_or_create(
code="GILAN", defaults={"name": "Gilan University"}
)
def setUp(self):
super().setUp()
patchers = [
mock.patch("apps.users.tasks.send_verification_email.delay"),
mock.patch("apps.users.signals.send_verification_email.delay"),
mock.patch("apps.users.tasks.send_password_reset_email.delay"),
]
(
self.mock_send_verification_task,
self.mock_signal_verification_task,
self.mock_password_reset_task,
) = [patcher.start() for patcher in patchers]
for patcher in patchers:
self.addCleanup(patcher.stop)
# Helper utilities -----------------------------------------------------
def _numeric_student_id(self) -> str:
return str(uuid.uuid4().int)[-10:]
def _resolve_major(self, value):
if value is None:
return None
if isinstance(value, Major):
return value
return Major.objects.filter(code=value).first()
def _resolve_university(self, value):
if value is None:
return None
if isinstance(value, University):
return value
return University.objects.filter(code=value).first()
def _create_user(self, **overrides) -> User:
unique = uuid.uuid4().hex[:8]
defaults = {
"username": f"user_{unique}",
"email": f"{unique}@example.com",
"student_id": self._numeric_student_id(),
"first_name": "Test",
"last_name": "User",
"year_of_study": 2,
"major": self.major_cs,
"university": self.university_ut,
}
defaults.update(overrides)
if isinstance(defaults.get("major"), str):
defaults["major"] = self._resolve_major(defaults["major"])
if isinstance(defaults.get("university"), str):
defaults["university"] = self._resolve_university(defaults["university"])
password = defaults.pop("password", self.password)
return User.objects.create_user(password=password, **defaults)
def _auth_headers(self, token: str) -> dict:
return {"HTTP_AUTHORIZATION": f"Bearer {token}"}
def _login_and_get_tokens(self, user: User, password: str | None = None) -> dict:
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": password or self.password}),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
return response.json()
def _refresh_token_value(self, user: User | None = None, **overrides) -> str:
now = timezone.now()
payload = {
"type": "refresh",
"exp": now + timedelta(minutes=5),
"iat": now,
}
if user is not None:
payload["user_id"] = user.id
payload.update(overrides)
return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
# Registration ---------------------------------------------------------
def test_register_creates_user_and_enqueues_signal(self):
# Arrange
payload = {
"username": "integration_user",
"email": "integration@example.com",
"password": "RegisterPass!9",
"student_id": "2023123456",
"first_name": "Integration",
"last_name": "Tester",
"university": self.university_ut.code,
"major": self.major_cs.code,
"year_of_study": 3,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 201)
self.assertTrue(User.objects.filter(email=payload["email"]).exists())
self.assertTrue(self.mock_signal_verification_task.called)
def test_register_rejects_short_student_id(self):
# Arrange
payload = {
"username": "short_id",
"email": "short@example.com",
"password": "RegisterPass!9",
"student_id": "123456789", # 9 digits
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_username(self):
# Arrange
existing = self._create_user(username="duplicate")
payload = {
"username": existing.username,
"email": "someone@example.com",
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_email(self):
# Arrange
existing = self._create_user(email="duplicate@example.com")
payload = {
"username": "newuser",
"email": existing.email,
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_student_id_in_same_university(self):
# Arrange
student_id = "2023012345"
self._create_user(student_id=student_id, university=self.university_gilan)
payload = {
"username": "dupstudent",
"email": "dupstudent@example.com",
"password": "RegisterPass!9",
"student_id": student_id,
"university": self.university_gilan.code,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
# Login & Refresh ------------------------------------------------------
def test_login_returns_tokens_for_verified_user(self):
# Arrange
user = self._create_user()
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
body = response.json()
self.assertIn("access_token", body)
self.assertIn("refresh_token", body)
def test_login_rejects_unverified_user(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_login_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_returns_tokens(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["refresh_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
refreshed = response.json()
self.assertIn("access_token", refreshed)
self.assertIn("refresh_token", refreshed)
def test_refresh_rejects_non_refresh_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["access_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_missing_user_id(self):
# Arrange
token = self._refresh_token_value()
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_unverified_user(self):
# Arrange
user = self._create_user()
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_expired_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
token = self._refresh_token_value(
user=user,
exp=timezone.now() - timedelta(minutes=1),
)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_invalid_token_string(self):
# Arrange
token = "not-a-valid-token"
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
# Email verification ---------------------------------------------------
def test_verify_email_marks_user_verified(self):
# Arrange
user = self._create_user()
token = str(user.email_verification_token)
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertTrue(user.is_email_verified)
def test_verify_email_rejects_unknown_token(self):
# Arrange
token = uuid.uuid4()
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 404)
def test_resend_verification_rejects_unknown_email(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(f"/api/auth/resend-verification?email={payload['email']}")
# Assert
self.assertEqual(response.status_code, 404)
# Profiles -------------------------------------------------------------
def test_get_profile_returns_schema_fields(self):
# Arrange
user = self._create_user(major=self.major_cs, university=self.university_gilan)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get("/api/auth/profile", **self._auth_headers(tokens["access_token"]))
# Assert
self.assertEqual(response.status_code, 200)
profile = response.json()
self.assertEqual(profile["major"], user.get_major_display())
self.assertEqual(profile["university"], user.get_university_display())
def test_get_profile_requires_authentication(self):
# Arrange
# No token supplied.
# Act
response = self.client.get("/api/auth/profile")
# Assert
self.assertEqual(response.status_code, 401)
def test_update_profile_persists_changes(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
payload = {"bio": "Updated bio", "year_of_study": 4}
# Act
response = self.client.put(
"/api/auth/profile",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertEqual(user.bio, payload["bio"])
self.assertEqual(user.year_of_study, payload["year_of_study"])
@override_settings(MEDIA_URL="/media/", MEDIA_ROOT=tempfile.gettempdir())
def test_upload_profile_picture_succeeds(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
image = SimpleUploadedFile(
"avatar.png", b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR", content_type="image/png"
)
# Act
response = self.client.post(
"/api/auth/profile/picture", {"file": image}, **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
profile = self.client.get(
"/api/auth/profile", **self._auth_headers(tokens["access_token"])
).json()
self.assertIn("profile_pictures", profile["profile_picture"])
def test_upload_profile_picture_requires_file(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_invalid_type(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
text_file = SimpleUploadedFile("doc.txt", b"text", content_type="text/plain")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": text_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_large_files(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
large_content = b"x" * (5 * 1024 * 1024 + 1)
large_file = SimpleUploadedFile("large.png", large_content, content_type="image/png")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": large_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_delete_profile_picture_removes_file(self):
# Arrange
temp_media = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_media, ignore_errors=True))
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
with override_settings(MEDIA_ROOT=temp_media, MEDIA_URL="/media/"):
image = SimpleUploadedFile("avatar.png", b"data", content_type="image/png")
self.client.post(
"/api/auth/profile/picture",
{"file": image},
**self._auth_headers(tokens["access_token"]),
)
# Act
response = self.client.delete(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertFalse(bool(user.profile_picture))
# Password reset ------------------------------------------------------
def test_request_password_reset_enqueues_email(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps({"email": user.email}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNotNone(user.password_reset_token)
self.mock_password_reset_task.assert_called_once()
def test_request_password_reset_unknown_email_returns_error(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_updates_credentials(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
payload = {"token": str(user.password_reset_token), "new_password": "BrandNewPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNone(user.password_reset_token)
self.assertTrue(user.check_password(payload["new_password"]))
def test_reset_password_confirm_rejects_expired_token(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
user.password_reset_token_expires_at = timezone.now() - timedelta(minutes=1)
user.save(update_fields=["password_reset_token_expires_at"])
payload = {"token": str(user.password_reset_token), "new_password": "New!!!Pass"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_rejects_unknown_token(self):
# Arrange
payload = {"token": str(uuid.uuid4()), "new_password": "AnotherPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
# Admin utilities -----------------------------------------------------
def test_list_deleted_users_requires_privileged_user(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_list_deleted_users_returns_payload_for_staff(self):
# Arrange
deleted = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertTrue(any(item["id"] == deleted.id for item in payload))
def test_restore_user_requires_privileged_user(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_restore_user_restores_record_for_staff(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
target.refresh_from_db()
self.assertFalse(target.is_deleted)
def test_restore_user_missing_returns_error(self):
# Arrange
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
"/api/auth/users/999/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
# Username checks ------------------------------------------------------
def test_check_username_reports_existing(self):
# Arrange
user = self._create_user()
# Act
response = self.client.get("/api/auth/check-username", {"username": user.username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertTrue(response.json()["exists"])
def test_check_username_reports_availability(self):
# Arrange
username = "available_user"
# Act
response = self.client.get("/api/auth/check-username", {"username": username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertFalse(response.json()["exists"])

View File

View File

@@ -0,0 +1,400 @@
import uuid
from datetime import timedelta
from unittest import mock
from django.db.models.signals import post_save
from django.test import SimpleTestCase, TestCase, override_settings
from django.utils import timezone
from import_export.widgets import BooleanWidget
from apps.users.models import User, Major, University
from apps.users.resources import UserResource
from apps.users.signals import send_verification_email_on_registration
from apps.users.tasks import (
send_email_verified_success,
send_password_reset_email,
send_verification_email,
)
class UserFactoryMixin:
def _ensure_reference_objects(self):
if not hasattr(self, "_default_major"):
self._default_major, _ = Major.objects.get_or_create(
code="CS",
defaults={"name": "Computer Science"},
)
self._default_university, _ = University.objects.get_or_create(
code="UT",
defaults={"name": "University of Tehran"},
)
def _resolve_major(self, value):
if value is None:
return None
if isinstance(value, Major):
return value
obj, _ = Major.objects.get_or_create(code=value, defaults={"name": value})
return obj
def _resolve_university(self, value):
if value is None:
return None
if isinstance(value, University):
return value
obj, _ = University.objects.get_or_create(code=value, defaults={"name": value})
return obj
def create_user(self, **extra_fields):
self._ensure_reference_objects()
unique = uuid.uuid4().hex
data = {
"email": f"user_{unique}@example.com",
"username": f"user_{unique[:10]}",
"first_name": "Test",
"last_name": "User",
}
password = extra_fields.pop("password", "StrongPass!123")
major = extra_fields.pop("major", self._default_major)
university = extra_fields.pop("university", self._default_university)
if isinstance(major, str):
major = self._resolve_major(major)
if isinstance(university, str):
university = self._resolve_university(university)
data.update(extra_fields)
data.setdefault("major", major)
data.setdefault("university", university)
return User.objects.create_user(password=password, **data)
class UserModelTests(UserFactoryMixin, TestCase):
def setUp(self):
super().setUp()
patcher = mock.patch("apps.users.signals.send_verification_email.delay")
patcher.start()
self.addCleanup(patcher.stop)
def test_str_returns_full_name_with_email(self):
# Arrange
user = self.create_user(first_name="Ada", last_name="Lovelace")
# Act
result = str(user)
# Assert
expected = f"{user.get_full_name()} ({user.email})"
self.assertEqual(result, expected)
def test_get_full_name_handles_missing_names(self):
# Arrange
user = self.create_user(first_name="Grace", last_name="")
# Act
result = user.get_full_name()
# Assert
self.assertEqual(result, "Grace")
def test_regenerate_verification_token_generates_new_value(self):
# Arrange
user = self.create_user()
original_token = user.email_verification_token
# Act
user.regenerate_verification_token()
# Assert
self.assertNotEqual(user.email_verification_token, original_token)
def test_set_password_reset_token_assigns_future_expiry(self):
# Arrange
user = self.create_user()
frozen = timezone.now()
# Act
with mock.patch("apps.users.models.timezone.now", return_value=frozen):
user.set_password_reset_token()
# Assert
self.assertIsNotNone(user.password_reset_token)
self.assertEqual(
user.password_reset_token_expires_at,
frozen + timedelta(hours=1),
)
def test_save_triggers_verified_task_on_state_change(self):
# Arrange
user = self.create_user()
# Act
with mock.patch("apps.users.tasks.send_email_verified_success.delay") as mock_delay:
user.is_email_verified = True
user.save()
# Assert
mock_delay.assert_called_once_with(user.id)
def test_save_skips_task_when_already_verified(self):
# Arrange
user = self.create_user(is_email_verified=True)
# Act
with mock.patch("apps.users.tasks.send_email_verified_success.delay") as mock_delay:
user.bio = "Updated bio"
user.save()
# Assert
mock_delay.assert_not_called()
class UserSignalTests(TestCase):
def setUp(self):
super().setUp()
post_save.disconnect(send_verification_email_on_registration, sender=User)
self.addCleanup(
post_save.connect,
send_verification_email_on_registration,
User,
False,
)
@override_settings(FRONTEND_ROOT="https://frontend.example/")
@mock.patch("apps.users.signals.send_verification_email.delay")
@mock.patch("apps.users.signals.uuid.uuid4")
def test_signal_sets_username_timestamp_and_dispatches_email(
self,
mock_uuid,
mock_delay,
):
# Arrange
fake_uuid = uuid.UUID("12345678-1234-5678-1234-567812345678")
mock_uuid.return_value = fake_uuid
fake_now = timezone.now()
user = User.objects.create(
email="new.user@example.com",
username="",
password="pass",
is_email_verified=False,
)
# Act
with mock.patch("apps.users.signals.timezone.now", return_value=fake_now):
send_verification_email_on_registration(User, user, created=True)
# Assert
user.refresh_from_db()
self.assertEqual(user.username, str(fake_uuid)[:10])
self.assertEqual(user.email_verification_sent_at, fake_now)
expected_url = (
f"https://frontend.example/verify-email/{user.email_verification_token}"
)
mock_delay.assert_called_once_with(user.id, expected_url)
@override_settings(FRONTEND_ROOT="https://frontend.example/")
@mock.patch("apps.users.signals.send_verification_email.delay")
def test_signal_preserves_existing_username(self, mock_delay):
# Arrange
fake_now = timezone.now()
user = User.objects.create(
email="existing@example.com",
username="existing_name",
password="pass",
is_email_verified=False,
)
# Act
with mock.patch("apps.users.signals.timezone.now", return_value=fake_now):
send_verification_email_on_registration(User, user, created=True)
# Assert
user.refresh_from_db()
self.assertEqual(user.username, "existing_name")
self.assertEqual(user.email_verification_sent_at, fake_now)
mock_delay.assert_called_once()
@mock.patch("apps.users.signals.send_verification_email.delay")
def test_signal_skips_when_user_already_verified(self, mock_delay):
# Arrange
user = User.objects.create(
email="verified@example.com",
username="verified_user",
password="pass",
is_email_verified=True,
)
# Act
send_verification_email_on_registration(User, user, created=True)
# Assert
self.assertIsNone(user.email_verification_sent_at)
mock_delay.assert_not_called()
@mock.patch("apps.users.signals.send_verification_email.delay")
def test_signal_skips_when_email_missing(self, mock_delay):
# Arrange
user = User.objects.create(
email="",
username="no_email",
password="pass",
is_email_verified=False,
)
# Act
send_verification_email_on_registration(User, user, created=True)
# Assert
self.assertIsNone(user.email_verification_sent_at)
mock_delay.assert_not_called()
@mock.patch("apps.users.signals.send_verification_email.delay")
def test_signal_ignores_updates_to_existing_users(self, mock_delay):
# Arrange
user = User.objects.create(
email="existing-update@example.com",
username="existing_update",
password="pass",
is_email_verified=False,
)
# Act
send_verification_email_on_registration(User, user, created=False)
# Assert
self.assertIsNone(user.email_verification_sent_at)
mock_delay.assert_not_called()
class UserTaskTests(UserFactoryMixin, TestCase):
def setUp(self):
super().setUp()
patcher = mock.patch("apps.users.signals.send_verification_email.delay")
patcher.start()
self.addCleanup(patcher.stop)
@override_settings(DEFAULT_FROM_EMAIL="no-reply@example.com")
@mock.patch("apps.users.tasks.send_mail")
@mock.patch("apps.users.tasks.render_to_string", return_value="<p>Hi</p>")
def test_send_verification_email_task_sends_expected_payload(
self,
mock_render,
mock_send_mail,
):
# Arrange
user = self.create_user()
verification_url = "https://example.com/verify"
# Act
result = send_verification_email.run(user.id, verification_url)
# Assert
self.assertEqual(result, f"Verification email sent to {user.email}")
mock_render.assert_called_once_with(
"emails/verification_email.html",
{"user": user, "verification_url": verification_url},
)
kwargs = mock_send_mail.call_args.kwargs
self.assertEqual(kwargs["recipient_list"], [user.email])
self.assertEqual(kwargs["from_email"], "no-reply@example.com")
self.assertEqual(kwargs["message"], "Hi")
@override_settings(DEFAULT_FROM_EMAIL="support@example.com")
@mock.patch("apps.users.tasks.send_mail")
@mock.patch("apps.users.tasks.render_to_string", return_value="<p>Reset</p>")
def test_send_password_reset_email_task_uses_reset_template(
self,
mock_render,
mock_send_mail,
):
# Arrange
user = self.create_user()
reset_url = "https://example.com/reset"
# Act
result = send_password_reset_email.run(user.id, reset_url)
# Assert
self.assertEqual(result, f"Password reset email sent to {user.email}")
mock_render.assert_called_once_with(
"emails/password_reset_email.html",
{"user": user, "reset_url": reset_url},
)
kwargs = mock_send_mail.call_args.kwargs
self.assertEqual(kwargs["recipient_list"], [user.email])
self.assertEqual(kwargs["from_email"], "support@example.com")
self.assertEqual(kwargs["message"], "Reset")
@override_settings(
DEFAULT_FROM_EMAIL="success@example.com",
FRONTEND_ROOT="https://frontend.example/",
)
@mock.patch("apps.users.tasks.send_mail")
@mock.patch("apps.users.tasks.render_to_string", return_value="<p>Success</p>")
def test_send_email_verified_success_task_renders_success_template(
self,
mock_render,
mock_send_mail,
):
# Arrange
user = self.create_user()
# Act
result = send_email_verified_success.run(user.id)
# Assert
self.assertEqual(result, f"verified success email sent to {user.email}")
mock_render.assert_called_once_with(
"emails/verification_success.html",
{"user": user, "home_url": "https://frontend.example/"},
)
kwargs = mock_send_mail.call_args.kwargs
self.assertEqual(kwargs["recipient_list"], [user.email])
self.assertEqual(kwargs["from_email"], "success@example.com")
self.assertEqual(kwargs["message"], "Success")
def test_send_verification_email_task_retries_on_lookup_error(self):
# Arrange
retry_patch = mock.patch.object(
send_verification_email,
"retry",
side_effect=RuntimeError("retry"),
)
# Act / Assert
with mock.patch(
"apps.users.tasks.User.objects.get",
side_effect=ValueError("missing"),
), retry_patch as mock_retry:
with self.assertRaises(RuntimeError):
send_verification_email.run(999, "https://example.com/verify")
self.assertEqual(mock_retry.call_args.kwargs.get("countdown"), 60)
self.assertIsInstance(mock_retry.call_args.kwargs.get("exc"), ValueError)
class UserResourceTests(SimpleTestCase):
def test_boolean_fields_use_boolean_widget(self):
# Arrange
resource = UserResource()
# Act
widgets = [
resource.fields["is_staff"].widget,
resource.fields["is_superuser"].widget,
resource.fields["is_email_verified"].widget,
]
# Assert
for widget in widgets:
self.assertIsInstance(widget, BooleanWidget)
def test_field_order_matches_meta_definition(self):
# Arrange
resource = UserResource()
# Act
field_names = tuple(resource.fields.keys())
# Assert
self.assertEqual(resource._meta.export_order, resource._meta.fields)
self.assertSetEqual(set(field_names), set(resource._meta.fields))