Files
guilan-ace-backend/apps/users/tests/integration/test_users.py
Amirhossein Khalili 88b793ed9f
Some checks failed
Backend CI/CD / test (push) Has been cancelled
Backend CI/CD / deploy (push) Has been cancelled
initial commit
2026-05-19 20:53:08 +03:30

725 lines
24 KiB
Python

import json
import shutil
import tempfile
import uuid
from datetime import timedelta
from unittest import mock
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, override_settings
from django.utils import timezone
import jwt
from apps.users.models import User, Major, University
class UsersAPIIntegrationTests(TestCase):
password = "Sup3rSecure!123"
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.major_cs, _ = Major.objects.get_or_create(
code="CS", defaults={"name": "Computer Science"}
)
cls.major_gil, _ = Major.objects.get_or_create(
code="GIL_CS", defaults={"name": "Gilan Computer Science"}
)
cls.university_ut, _ = University.objects.get_or_create(
code="UT", defaults={"name": "University of Tehran"}
)
cls.university_gilan, _ = University.objects.get_or_create(
code="GILAN", defaults={"name": "Gilan University"}
)
def setUp(self):
super().setUp()
patchers = [
mock.patch("apps.users.tasks.send_verification_email.delay"),
mock.patch("apps.users.signals.send_verification_email.delay"),
mock.patch("apps.users.tasks.send_password_reset_email.delay"),
]
(
self.mock_send_verification_task,
self.mock_signal_verification_task,
self.mock_password_reset_task,
) = [patcher.start() for patcher in patchers]
for patcher in patchers:
self.addCleanup(patcher.stop)
# Helper utilities -----------------------------------------------------
def _numeric_student_id(self) -> str:
return str(uuid.uuid4().int)[-10:]
def _resolve_major(self, value):
if value is None:
return None
if isinstance(value, Major):
return value
return Major.objects.filter(code=value).first()
def _resolve_university(self, value):
if value is None:
return None
if isinstance(value, University):
return value
return University.objects.filter(code=value).first()
def _create_user(self, **overrides) -> User:
unique = uuid.uuid4().hex[:8]
defaults = {
"username": f"user_{unique}",
"email": f"{unique}@example.com",
"student_id": self._numeric_student_id(),
"first_name": "Test",
"last_name": "User",
"year_of_study": 2,
"major": self.major_cs,
"university": self.university_ut,
}
defaults.update(overrides)
if isinstance(defaults.get("major"), str):
defaults["major"] = self._resolve_major(defaults["major"])
if isinstance(defaults.get("university"), str):
defaults["university"] = self._resolve_university(defaults["university"])
password = defaults.pop("password", self.password)
return User.objects.create_user(password=password, **defaults)
def _auth_headers(self, token: str) -> dict:
return {"HTTP_AUTHORIZATION": f"Bearer {token}"}
def _login_and_get_tokens(self, user: User, password: str | None = None) -> dict:
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": password or self.password}),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
return response.json()
def _refresh_token_value(self, user: User | None = None, **overrides) -> str:
now = timezone.now()
payload = {
"type": "refresh",
"exp": now + timedelta(minutes=5),
"iat": now,
}
if user is not None:
payload["user_id"] = user.id
payload.update(overrides)
return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
# Registration ---------------------------------------------------------
def test_register_creates_user_and_enqueues_signal(self):
# Arrange
payload = {
"username": "integration_user",
"email": "integration@example.com",
"password": "RegisterPass!9",
"student_id": "2023123456",
"first_name": "Integration",
"last_name": "Tester",
"university": self.university_ut.code,
"major": self.major_cs.code,
"year_of_study": 3,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 201)
self.assertTrue(User.objects.filter(email=payload["email"]).exists())
self.assertTrue(self.mock_signal_verification_task.called)
def test_register_rejects_short_student_id(self):
# Arrange
payload = {
"username": "short_id",
"email": "short@example.com",
"password": "RegisterPass!9",
"student_id": "123456789", # 9 digits
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_username(self):
# Arrange
existing = self._create_user(username="duplicate")
payload = {
"username": existing.username,
"email": "someone@example.com",
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_email(self):
# Arrange
existing = self._create_user(email="duplicate@example.com")
payload = {
"username": "newuser",
"email": existing.email,
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_student_id_in_same_university(self):
# Arrange
student_id = "2023012345"
self._create_user(student_id=student_id, university=self.university_gilan)
payload = {
"username": "dupstudent",
"email": "dupstudent@example.com",
"password": "RegisterPass!9",
"student_id": student_id,
"university": self.university_gilan.code,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
# Login & Refresh ------------------------------------------------------
def test_login_returns_tokens_for_verified_user(self):
# Arrange
user = self._create_user()
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
body = response.json()
self.assertIn("access_token", body)
self.assertIn("refresh_token", body)
def test_login_rejects_unverified_user(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_login_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_returns_tokens(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["refresh_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
refreshed = response.json()
self.assertIn("access_token", refreshed)
self.assertIn("refresh_token", refreshed)
def test_refresh_rejects_non_refresh_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["access_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_missing_user_id(self):
# Arrange
token = self._refresh_token_value()
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_unverified_user(self):
# Arrange
user = self._create_user()
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_expired_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
token = self._refresh_token_value(
user=user,
exp=timezone.now() - timedelta(minutes=1),
)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_invalid_token_string(self):
# Arrange
token = "not-a-valid-token"
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
# Email verification ---------------------------------------------------
def test_verify_email_marks_user_verified(self):
# Arrange
user = self._create_user()
token = str(user.email_verification_token)
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertTrue(user.is_email_verified)
def test_verify_email_rejects_unknown_token(self):
# Arrange
token = uuid.uuid4()
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 404)
def test_resend_verification_rejects_unknown_email(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(f"/api/auth/resend-verification?email={payload['email']}")
# Assert
self.assertEqual(response.status_code, 404)
# Profiles -------------------------------------------------------------
def test_get_profile_returns_schema_fields(self):
# Arrange
user = self._create_user(major=self.major_cs, university=self.university_gilan)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get("/api/auth/profile", **self._auth_headers(tokens["access_token"]))
# Assert
self.assertEqual(response.status_code, 200)
profile = response.json()
self.assertEqual(profile["major"], user.get_major_display())
self.assertEqual(profile["university"], user.get_university_display())
def test_get_profile_requires_authentication(self):
# Arrange
# No token supplied.
# Act
response = self.client.get("/api/auth/profile")
# Assert
self.assertEqual(response.status_code, 401)
def test_update_profile_persists_changes(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
payload = {"bio": "Updated bio", "year_of_study": 4}
# Act
response = self.client.put(
"/api/auth/profile",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertEqual(user.bio, payload["bio"])
self.assertEqual(user.year_of_study, payload["year_of_study"])
@override_settings(MEDIA_URL="/media/", MEDIA_ROOT=tempfile.gettempdir())
def test_upload_profile_picture_succeeds(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
image = SimpleUploadedFile(
"avatar.png", b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR", content_type="image/png"
)
# Act
response = self.client.post(
"/api/auth/profile/picture", {"file": image}, **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
profile = self.client.get(
"/api/auth/profile", **self._auth_headers(tokens["access_token"])
).json()
self.assertIn("profile_pictures", profile["profile_picture"])
def test_upload_profile_picture_requires_file(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_invalid_type(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
text_file = SimpleUploadedFile("doc.txt", b"text", content_type="text/plain")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": text_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_large_files(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
large_content = b"x" * (5 * 1024 * 1024 + 1)
large_file = SimpleUploadedFile("large.png", large_content, content_type="image/png")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": large_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_delete_profile_picture_removes_file(self):
# Arrange
temp_media = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_media, ignore_errors=True))
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
with override_settings(MEDIA_ROOT=temp_media, MEDIA_URL="/media/"):
image = SimpleUploadedFile("avatar.png", b"data", content_type="image/png")
self.client.post(
"/api/auth/profile/picture",
{"file": image},
**self._auth_headers(tokens["access_token"]),
)
# Act
response = self.client.delete(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertFalse(bool(user.profile_picture))
# Password reset ------------------------------------------------------
def test_request_password_reset_enqueues_email(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps({"email": user.email}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNotNone(user.password_reset_token)
self.mock_password_reset_task.assert_called_once()
def test_request_password_reset_unknown_email_returns_error(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_updates_credentials(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
payload = {"token": str(user.password_reset_token), "new_password": "BrandNewPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNone(user.password_reset_token)
self.assertTrue(user.check_password(payload["new_password"]))
def test_reset_password_confirm_rejects_expired_token(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
user.password_reset_token_expires_at = timezone.now() - timedelta(minutes=1)
user.save(update_fields=["password_reset_token_expires_at"])
payload = {"token": str(user.password_reset_token), "new_password": "New!!!Pass"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_rejects_unknown_token(self):
# Arrange
payload = {"token": str(uuid.uuid4()), "new_password": "AnotherPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
# Admin utilities -----------------------------------------------------
def test_list_deleted_users_requires_privileged_user(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_list_deleted_users_returns_payload_for_staff(self):
# Arrange
deleted = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertTrue(any(item["id"] == deleted.id for item in payload))
def test_restore_user_requires_privileged_user(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_restore_user_restores_record_for_staff(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
target.refresh_from_db()
self.assertFalse(target.is_deleted)
def test_restore_user_missing_returns_error(self):
# Arrange
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
"/api/auth/users/999/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
# Username checks ------------------------------------------------------
def test_check_username_reports_existing(self):
# Arrange
user = self._create_user()
# Act
response = self.client.get("/api/auth/check-username", {"username": user.username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertTrue(response.json()["exists"])
def test_check_username_reports_availability(self):
# Arrange
username = "available_user"
# Act
response = self.client.get("/api/auth/check-username", {"username": username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertFalse(response.json()["exists"])