init
Some checks failed
CI/CD / Backend & Frontend Checks (push) Has been cancelled
CI/CD / Deploy to Production (push) Has been cancelled

This commit is contained in:
2026-05-18 11:34:07 +03:30
commit 7a8ddeabed
279 changed files with 37390 additions and 0 deletions

View File

@@ -0,0 +1,724 @@
import json
import shutil
import tempfile
import uuid
from datetime import timedelta
from unittest import mock
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, override_settings
from django.utils import timezone
import jwt
from users.models import User, Major, University
class UsersAPIIntegrationTests(TestCase):
password = "Sup3rSecure!123"
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.major_cs, _ = Major.objects.get_or_create(
code="CS", defaults={"name": "Computer Science"}
)
cls.major_gil, _ = Major.objects.get_or_create(
code="GIL_CS", defaults={"name": "Gilan Computer Science"}
)
cls.university_ut, _ = University.objects.get_or_create(
code="UT", defaults={"name": "University of Tehran"}
)
cls.university_gilan, _ = University.objects.get_or_create(
code="GILAN", defaults={"name": "Gilan University"}
)
def setUp(self):
super().setUp()
patchers = [
mock.patch("users.tasks.send_verification_email.delay"),
mock.patch("users.signals.send_verification_email.delay"),
mock.patch("users.tasks.send_password_reset_email.delay"),
]
(
self.mock_send_verification_task,
self.mock_signal_verification_task,
self.mock_password_reset_task,
) = [patcher.start() for patcher in patchers]
for patcher in patchers:
self.addCleanup(patcher.stop)
# Helper utilities -----------------------------------------------------
def _numeric_student_id(self) -> str:
return str(uuid.uuid4().int)[-10:]
def _resolve_major(self, value):
if value is None:
return None
if isinstance(value, Major):
return value
return Major.objects.filter(code=value).first()
def _resolve_university(self, value):
if value is None:
return None
if isinstance(value, University):
return value
return University.objects.filter(code=value).first()
def _create_user(self, **overrides) -> User:
unique = uuid.uuid4().hex[:8]
defaults = {
"username": f"user_{unique}",
"email": f"{unique}@example.com",
"student_id": self._numeric_student_id(),
"first_name": "Test",
"last_name": "User",
"year_of_study": 2,
"major": self.major_cs,
"university": self.university_ut,
}
defaults.update(overrides)
if isinstance(defaults.get("major"), str):
defaults["major"] = self._resolve_major(defaults["major"])
if isinstance(defaults.get("university"), str):
defaults["university"] = self._resolve_university(defaults["university"])
password = defaults.pop("password", self.password)
return User.objects.create_user(password=password, **defaults)
def _auth_headers(self, token: str) -> dict:
return {"HTTP_AUTHORIZATION": f"Bearer {token}"}
def _login_and_get_tokens(self, user: User, password: str | None = None) -> dict:
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": password or self.password}),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
return response.json()
def _refresh_token_value(self, user: User | None = None, **overrides) -> str:
now = timezone.now()
payload = {
"type": "refresh",
"exp": now + timedelta(minutes=5),
"iat": now,
}
if user is not None:
payload["user_id"] = user.id
payload.update(overrides)
return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
# Registration ---------------------------------------------------------
def test_register_creates_user_and_enqueues_signal(self):
# Arrange
payload = {
"username": "integration_user",
"email": "integration@example.com",
"password": "RegisterPass!9",
"student_id": "2023123456",
"first_name": "Integration",
"last_name": "Tester",
"university": self.university_ut.code,
"major": self.major_cs.code,
"year_of_study": 3,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 201)
self.assertTrue(User.objects.filter(email=payload["email"]).exists())
self.assertTrue(self.mock_signal_verification_task.called)
def test_register_rejects_short_student_id(self):
# Arrange
payload = {
"username": "short_id",
"email": "short@example.com",
"password": "RegisterPass!9",
"student_id": "123456789", # 9 digits
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_username(self):
# Arrange
existing = self._create_user(username="duplicate")
payload = {
"username": existing.username,
"email": "someone@example.com",
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_email(self):
# Arrange
existing = self._create_user(email="duplicate@example.com")
payload = {
"username": "newuser",
"email": existing.email,
"password": "RegisterPass!9",
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
def test_register_rejects_duplicate_student_id_in_same_university(self):
# Arrange
student_id = "2023012345"
self._create_user(student_id=student_id, university=self.university_gilan)
payload = {
"username": "dupstudent",
"email": "dupstudent@example.com",
"password": "RegisterPass!9",
"student_id": student_id,
"university": self.university_gilan.code,
}
# Act
response = self.client.post(
"/api/auth/register", data=json.dumps(payload), content_type="application/json"
)
# Assert
self.assertEqual(response.status_code, 400)
# Login & Refresh ------------------------------------------------------
def test_login_returns_tokens_for_verified_user(self):
# Arrange
user = self._create_user()
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
body = response.json()
self.assertIn("access_token", body)
self.assertIn("refresh_token", body)
def test_login_rejects_unverified_user(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_login_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
# Act
response = self.client.post(
"/api/auth/login",
data=json.dumps({"email": user.email, "password": self.password}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_returns_tokens(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["refresh_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
refreshed = response.json()
self.assertIn("access_token", refreshed)
self.assertIn("refresh_token", refreshed)
def test_refresh_rejects_non_refresh_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": tokens["access_token"]}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_missing_user_id(self):
# Arrange
token = self._refresh_token_value()
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_unverified_user(self):
# Arrange
user = self._create_user()
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_inactive_user(self):
# Arrange
user = self._create_user(is_email_verified=True, is_active=False)
token = self._refresh_token_value(user=user)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_expired_token(self):
# Arrange
user = self._create_user(is_email_verified=True)
token = self._refresh_token_value(
user=user,
exp=timezone.now() - timedelta(minutes=1),
)
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
def test_refresh_rejects_invalid_token_string(self):
# Arrange
token = "not-a-valid-token"
# Act
response = self.client.post(
"/api/auth/refresh",
data=json.dumps({"refresh_token": token}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 401)
# Email verification ---------------------------------------------------
def test_verify_email_marks_user_verified(self):
# Arrange
user = self._create_user()
token = str(user.email_verification_token)
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertTrue(user.is_email_verified)
def test_verify_email_rejects_unknown_token(self):
# Arrange
token = uuid.uuid4()
# Act
response = self.client.get(f"/api/auth/verify-email/{token}")
# Assert
self.assertEqual(response.status_code, 404)
def test_resend_verification_rejects_unknown_email(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(f"/api/auth/resend-verification?email={payload['email']}")
# Assert
self.assertEqual(response.status_code, 404)
# Profiles -------------------------------------------------------------
def test_get_profile_returns_schema_fields(self):
# Arrange
user = self._create_user(major=self.major_cs, university=self.university_gilan)
user.is_email_verified = True
user.save(update_fields=["is_email_verified"])
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get("/api/auth/profile", **self._auth_headers(tokens["access_token"]))
# Assert
self.assertEqual(response.status_code, 200)
profile = response.json()
self.assertEqual(profile["major"], user.get_major_display())
self.assertEqual(profile["university"], user.get_university_display())
def test_get_profile_requires_authentication(self):
# Arrange
# No token supplied.
# Act
response = self.client.get("/api/auth/profile")
# Assert
self.assertEqual(response.status_code, 401)
def test_update_profile_persists_changes(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
payload = {"bio": "Updated bio", "year_of_study": 4}
# Act
response = self.client.put(
"/api/auth/profile",
data=json.dumps(payload),
content_type="application/json",
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertEqual(user.bio, payload["bio"])
self.assertEqual(user.year_of_study, payload["year_of_study"])
@override_settings(MEDIA_URL="/media/", MEDIA_ROOT=tempfile.gettempdir())
def test_upload_profile_picture_succeeds(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
image = SimpleUploadedFile(
"avatar.png", b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR", content_type="image/png"
)
# Act
response = self.client.post(
"/api/auth/profile/picture", {"file": image}, **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
profile = self.client.get(
"/api/auth/profile", **self._auth_headers(tokens["access_token"])
).json()
self.assertIn("profile_pictures", profile["profile_picture"])
def test_upload_profile_picture_requires_file(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_invalid_type(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
text_file = SimpleUploadedFile("doc.txt", b"text", content_type="text/plain")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": text_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_upload_profile_picture_rejects_large_files(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
large_content = b"x" * (5 * 1024 * 1024 + 1)
large_file = SimpleUploadedFile("large.png", large_content, content_type="image/png")
# Act
response = self.client.post(
"/api/auth/profile/picture",
{"file": large_file},
**self._auth_headers(tokens["access_token"]),
)
# Assert
self.assertEqual(response.status_code, 400)
def test_delete_profile_picture_removes_file(self):
# Arrange
temp_media = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_media, ignore_errors=True))
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
with override_settings(MEDIA_ROOT=temp_media, MEDIA_URL="/media/"):
image = SimpleUploadedFile("avatar.png", b"data", content_type="image/png")
self.client.post(
"/api/auth/profile/picture",
{"file": image},
**self._auth_headers(tokens["access_token"]),
)
# Act
response = self.client.delete(
"/api/auth/profile/picture", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertFalse(bool(user.profile_picture))
# Password reset ------------------------------------------------------
def test_request_password_reset_enqueues_email(self):
# Arrange
user = self._create_user()
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps({"email": user.email}),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNotNone(user.password_reset_token)
self.mock_password_reset_task.assert_called_once()
def test_request_password_reset_unknown_email_returns_error(self):
# Arrange
payload = {"email": "missing@example.com"}
# Act
response = self.client.post(
"/api/auth/request-password-reset",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_updates_credentials(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
payload = {"token": str(user.password_reset_token), "new_password": "BrandNewPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 200)
user.refresh_from_db()
self.assertIsNone(user.password_reset_token)
self.assertTrue(user.check_password(payload["new_password"]))
def test_reset_password_confirm_rejects_expired_token(self):
# Arrange
user = self._create_user()
user.set_password_reset_token()
user.password_reset_token_expires_at = timezone.now() - timedelta(minutes=1)
user.save(update_fields=["password_reset_token_expires_at"])
payload = {"token": str(user.password_reset_token), "new_password": "New!!!Pass"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
def test_reset_password_confirm_rejects_unknown_token(self):
# Arrange
payload = {"token": str(uuid.uuid4()), "new_password": "AnotherPass!9"}
# Act
response = self.client.post(
"/api/auth/reset-password-confirm",
data=json.dumps(payload),
content_type="application/json",
)
# Assert
self.assertEqual(response.status_code, 400)
# Admin utilities -----------------------------------------------------
def test_list_deleted_users_requires_privileged_user(self):
# Arrange
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_list_deleted_users_returns_payload_for_staff(self):
# Arrange
deleted = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.get(
"/api/auth/users/deleted", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertTrue(any(item["id"] == deleted.id for item in payload))
def test_restore_user_requires_privileged_user(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
user = self._create_user(is_email_verified=True)
tokens = self._login_and_get_tokens(user)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 403)
def test_restore_user_restores_record_for_staff(self):
# Arrange
target = self._create_user(is_deleted=True, deleted_at=timezone.now())
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
f"/api/auth/users/{target.id}/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 200)
target.refresh_from_db()
self.assertFalse(target.is_deleted)
def test_restore_user_missing_returns_error(self):
# Arrange
staff = self._create_user(is_email_verified=True, is_staff=True)
tokens = self._login_and_get_tokens(staff)
# Act
response = self.client.post(
"/api/auth/users/999/restore", **self._auth_headers(tokens["access_token"])
)
# Assert
self.assertEqual(response.status_code, 400)
# Username checks ------------------------------------------------------
def test_check_username_reports_existing(self):
# Arrange
user = self._create_user()
# Act
response = self.client.get("/api/auth/check-username", {"username": user.username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertTrue(response.json()["exists"])
def test_check_username_reports_availability(self):
# Arrange
username = "available_user"
# Act
response = self.client.get("/api/auth/check-username", {"username": username})
# Assert
self.assertEqual(response.status_code, 200)
self.assertFalse(response.json()["exists"])