diff --git a/backend/cli/interactive.py b/backend/cli/interactive.py index 6717162..823b512 100644 --- a/backend/cli/interactive.py +++ b/backend/cli/interactive.py @@ -91,16 +91,16 @@ def animate_nimbusflow_text() -> None: """Animate the NimbusFlow ASCII text and frame with a gold shimmer effect.""" # Complete welcome screen lines including borders welcome_lines = [ - "╔════════════════════════════════════════════════════════════════════════════════════════════╗", - "║ ║", - "║ ███╗ ██╗██╗███╗ ███╗██████╗ ██╗ ██╗███████╗ ███████╗██╗ ██████╗ ██╗ ██╗ ║", - "║ ████╗ ██║██║████╗ ████║██╔══██╗██║ ██║██╔════╝ ██╔════╝██║ ██╔═══██╗██║ ██║ ║", - "║ ██╔██╗ ██║██║██╔████╔██║██████╔╝██║ ██║███████╗ █████╗ ██║ ██║ ██║██║ █╗ ██║ ║", - "║ ██║╚██╗██║██║██║╚██╔╝██║██╔══██╗██║ ██║╚════██║ ██╔══╝ ██║ ██║ ██║██║███╗██║ ║", - "║ ██║ ╚████║██║██║ ╚═╝ ██║██████╔╝╚██████╔╝███████║ ██║ ███████╗╚██████╔╝╚███╔███╔╝ ║", - "║ ╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ ║", - "║ ║", - "╚════════════════════════════════════════════════════════════════════════════════════════════╝" + "╔════════════════════════════════════════════════════════════════════════════════════════╗", + "║ ║", + "║ ███╗ ██╗██╗███╗ ███╗██████╗ ██╗ ██╗███████╗███████╗██╗ ██████╗ ██╗ ██╗ ║", + "║ ████╗ ██║██║████╗ ████║██╔══██╗██║ ██║██╔════╝██╔════╝██║ ██╔═══██╗██║ ██║ ║", + "║ ██╔██╗ ██║██║██╔████╔██║██████╔╝██║ ██║███████╗█████╗ ██║ ██║ ██║██║ █╗ ██║ ║", + "║ ██║╚██╗██║██║██║╚██╔╝██║██╔══██╗██║ ██║╚════██║██╔══╝ ██║ ██║ ██║██║███╗██║ ║", + "║ ██║ ╚████║██║██║ ╚═╝ ██║██████╔╝╚██████╔╝███████║██║ ███████╗╚██████╔╝╚███╔███╔╝ ║", + "║ ╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚══════╝╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ ║", + "║ ║", + "╚════════════════════════════════════════════════════════════════════════════════════════╝" ] # Calculate max width for animation diff --git a/backend/repositories/schedule.py b/backend/repositories/schedule.py index 555c573..150eeb9 100644 --- a/backend/repositories/schedule.py +++ b/backend/repositories/schedule.py @@ -48,12 +48,17 @@ class ScheduleRepository(BaseRepository[ScheduleModel]): scheduled_at, expires_at : datetime‑compatible | None ``scheduled_at`` defaults to SQLite’s ``CURRENT_TIMESTAMP``. """ + # Handle timestamp - use actual datetime if not provided + import datetime + if scheduled_at is None: + scheduled_at = datetime.datetime.now(datetime.UTC).isoformat() + schedule = ScheduleModel( ScheduleId=-1, # placeholder – will be replaced ServiceId=service_id, MemberId=member_id, Status=status.value, - ScheduledAt=scheduled_at or "CURRENT_TIMESTAMP", + ScheduledAt=scheduled_at, AcceptedAt=None, DeclinedAt=None, ExpiresAt=expires_at, @@ -110,8 +115,8 @@ class ScheduleRepository(BaseRepository[ScheduleModel]): params: list[Any] = [new_status.value] if new_status == ScheduleStatus.DECLINED: - set_clause = "Status = ?, DeclinedAt = ?, DeclineReason = ?" - params.extend(["CURRENT_TIMESTAMP", reason]) + set_clause = "Status = ?, DeclinedAt = datetime('now'), DeclineReason = ?" + params.extend([reason]) params.append(schedule_id) # WHERE clause param @@ -148,43 +153,6 @@ class ScheduleRepository(BaseRepository[ScheduleModel]): row = self.db.fetchone(sql, params) return row is not None - def is_available(self, member_id: int, service_id: int) -> bool: - """ - Cool‑down rule: a member is unavailable if they have accepted a - schedule for the same service within the last ``COOLDOWN_DAYS``. - """ - # Latest acceptance timestamp (if any) - sql_latest = f""" - SELECT MAX(AcceptedAt) AS last_accept - FROM {self._TABLE} - WHERE MemberId = ? - AND ServiceId = ? - AND Status = ? - """ - row = self.db.fetchone( - sql_latest, - (member_id, service_id, ScheduleStatus.ACCEPTED.value), - ) - last_accept: Optional[str] = row["last_accept"] if row else None - - if not last_accept: - return True # never accepted → free to schedule - - COOLDOWN_DAYS = 1 - sql_cooldown = f""" - SELECT 1 - FROM {self._TABLE} - WHERE MemberId = ? - AND ServiceId = ? - AND Status = ? - AND DATE(AcceptedAt) >= DATE('now', '-{COOLDOWN_DAYS} day') - LIMIT 1 - """ - row = self.db.fetchone( - sql_cooldown, - (member_id, service_id, ScheduleStatus.ACCEPTED.value), - ) - return row is None # None → outside the cooldown window # ------------------------------------------------------------------ # Status‑transition helpers (accept / decline) – kept for completeness. @@ -194,16 +162,26 @@ class ScheduleRepository(BaseRepository[ScheduleModel]): schedule_id: int, accepted_at: Optional[Any] = None, ) -> None: - sql = f""" - UPDATE {self._TABLE} - SET Status = ?, - AcceptedAt = ?, - DeclinedAt = NULL, - DeclineReason = NULL - WHERE {self._PK} = ? - """ - ts = accepted_at or "CURRENT_TIMESTAMP" - self.db.execute(sql, (ScheduleStatus.ACCEPTED.value, ts, schedule_id)) + if accepted_at is None: + sql = f""" + UPDATE {self._TABLE} + SET Status = ?, + AcceptedAt = datetime('now'), + DeclinedAt = NULL, + DeclineReason = NULL + WHERE {self._PK} = ? + """ + self.db.execute(sql, (ScheduleStatus.ACCEPTED.value, schedule_id)) + else: + sql = f""" + UPDATE {self._TABLE} + SET Status = ?, + AcceptedAt = ?, + DeclinedAt = NULL, + DeclineReason = NULL + WHERE {self._PK} = ? + """ + self.db.execute(sql, (ScheduleStatus.ACCEPTED.value, accepted_at, schedule_id)) def mark_declined( self, @@ -211,15 +189,24 @@ class ScheduleRepository(BaseRepository[ScheduleModel]): declined_at: Optional[Any] = None, decline_reason: Optional[str] = None, ) -> None: - sql = f""" - UPDATE {self._TABLE} - SET Status = ?, - DeclinedAt = ?, - DeclineReason = ? - WHERE {self._PK} = ? - """ - ts = declined_at or "CURRENT_TIMESTAMP" - self.db.execute(sql, (ScheduleStatus.DECLINED.value, ts, decline_reason, schedule_id)) + if declined_at is None: + sql = f""" + UPDATE {self._TABLE} + SET Status = ?, + DeclinedAt = datetime('now'), + DeclineReason = ? + WHERE {self._PK} = ? + """ + self.db.execute(sql, (ScheduleStatus.DECLINED.value, decline_reason, schedule_id)) + else: + sql = f""" + UPDATE {self._TABLE} + SET Status = ?, + DeclinedAt = ?, + DeclineReason = ? + WHERE {self._PK} = ? + """ + self.db.execute(sql, (ScheduleStatus.DECLINED.value, declined_at, decline_reason, schedule_id)) # ------------------------------------------------------------------ # Same‑day helper – used by the scheduling service diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 3e5e784..ccf2e1e 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -10,6 +10,7 @@ from backend.db import DatabaseConnection from backend.repositories import ( MemberRepository, ClassificationRepository, + ServiceRepository, ServiceTypeRepository, ServiceAvailabilityRepository, ) @@ -156,6 +157,14 @@ def service_type_repo( return ServiceTypeRepository(db_connection) +@pytest.fixture +def service_repo( + db_connection: DatabaseConnection, + seed_lookup_tables, +) -> ServiceRepository: + return ServiceRepository(db_connection) + + @pytest.fixture def service_availability_repo( db_connection: DatabaseConnection, diff --git a/backend/tests/repositories/test_classification.py b/backend/tests/repositories/test_classification.py index a0727a3..77161d1 100644 --- a/backend/tests/repositories/test_classification.py +++ b/backend/tests/repositories/test_classification.py @@ -1,9 +1,10 @@ # backend/tests/repositories/test_classification.py # ------------------------------------------------------------ -# Pytest suite for the ClassificationRepository. +# Comprehensive pytest suite for the ClassificationRepository. # ------------------------------------------------------------ import pytest +from typing import List from backend.models import Classification as ClassificationModel from backend.repositories import ClassificationRepository @@ -90,4 +91,383 @@ def test_delete(classification_repo): remaining_names = {r.ClassificationName for r in remaining} assert "TempVoice" not in remaining_names # the original four seeded names must still be present - assert {"Soprano", "Alto / Mezzo", "Tenor", "Baritone"} <= remaining_names \ No newline at end of file + assert {"Soprano", "Alto / Mezzo", "Tenor", "Baritone"} <= remaining_names + + +# ---------------------------------------------------------------------- +# 6️⃣ Edge cases and error conditions +# ---------------------------------------------------------------------- +def test_get_by_id_returns_none_when_missing(classification_repo: ClassificationRepository): + """Test that get_by_id returns None for nonexistent IDs.""" + result = classification_repo.get_by_id(99999) + assert result is None + + +def test_get_by_id_with_negative_id(classification_repo: ClassificationRepository): + """Test get_by_id with negative ID (should return None).""" + result = classification_repo.get_by_id(-1) + assert result is None + + +def test_get_by_id_with_zero_id(classification_repo: ClassificationRepository): + """Test get_by_id with zero ID (should return None).""" + result = classification_repo.get_by_id(0) + assert result is None + + +def test_find_by_name_case_sensitivity(classification_repo: ClassificationRepository): + """Test that find_by_name is case-sensitive.""" + # Exact case should work + exact = classification_repo.find_by_name("Soprano") + assert exact is not None + assert exact.ClassificationName == "Soprano" + + # Different cases should return None + assert classification_repo.find_by_name("soprano") is None + assert classification_repo.find_by_name("SOPRANO") is None + assert classification_repo.find_by_name("SoPrAnO") is None + + +def test_find_by_name_with_whitespace(classification_repo: ClassificationRepository): + """Test find_by_name behavior with whitespace variations.""" + # Exact name with spaces should work + exact = classification_repo.find_by_name("Alto / Mezzo") + assert exact is not None + + # Names with extra whitespace should return None (no trimming) + assert classification_repo.find_by_name(" Alto / Mezzo") is None + assert classification_repo.find_by_name("Alto / Mezzo ") is None + assert classification_repo.find_by_name(" Alto / Mezzo ") is None + + +def test_find_by_name_with_empty_string(classification_repo: ClassificationRepository): + """Test find_by_name with empty string.""" + result = classification_repo.find_by_name("") + assert result is None + + +def test_create_with_empty_string_name(classification_repo: ClassificationRepository): + """Test creating a classification with empty string name.""" + # This should work - empty string is a valid name + empty_name = classification_repo.create("") + assert empty_name.ClassificationName == "" + assert isinstance(empty_name.ClassificationId, int) + assert empty_name.ClassificationId > 0 + + # Should be able to find it back + found = classification_repo.find_by_name("") + assert found is not None + assert found.ClassificationId == empty_name.ClassificationId + + +def test_create_with_whitespace_only_name(classification_repo: ClassificationRepository): + """Test creating a classification with whitespace-only name.""" + whitespace_name = classification_repo.create(" ") + assert whitespace_name.ClassificationName == " " + assert isinstance(whitespace_name.ClassificationId, int) + + # Should be findable + found = classification_repo.find_by_name(" ") + assert found is not None + assert found.ClassificationId == whitespace_name.ClassificationId + + +def test_create_with_very_long_name(classification_repo: ClassificationRepository): + """Test creating a classification with a very long name.""" + long_name = "A" * 1000 # 1000 character name + long_classification = classification_repo.create(long_name) + assert long_classification.ClassificationName == long_name + assert isinstance(long_classification.ClassificationId, int) + + # Should be findable + found = classification_repo.find_by_name(long_name) + assert found is not None + assert found.ClassificationId == long_classification.ClassificationId + + +def test_create_with_special_characters(classification_repo: ClassificationRepository): + """Test creating classifications with special characters.""" + special_names = [ + "Alto/Soprano", + "Bass-Baritone", + "Counter-tenor (High)", + "Mezzo@Soprano", + "Coloratura Soprano (1st)", + "Basso Profondo & Cantante", + "Soprano (🎵)", + "Tenor - Lyric/Dramatic", + ] + + created_ids = [] + for name in special_names: + classification = classification_repo.create(name) + assert classification.ClassificationName == name + assert isinstance(classification.ClassificationId, int) + created_ids.append(classification.ClassificationId) + + # Should be findable + found = classification_repo.find_by_name(name) + assert found is not None + assert found.ClassificationId == classification.ClassificationId + + # All IDs should be unique + assert len(set(created_ids)) == len(created_ids) + + +def test_delete_nonexistent_classification(classification_repo: ClassificationRepository): + """Test deleting a classification that doesn't exist (should not raise error).""" + initial_count = len(classification_repo.list_all()) + + # This should not raise an exception + classification_repo.delete(99999) + + # Count should remain the same + final_count = len(classification_repo.list_all()) + assert final_count == initial_count + + +def test_delete_with_negative_id(classification_repo: ClassificationRepository): + """Test delete with negative ID (should not raise error).""" + initial_count = len(classification_repo.list_all()) + + # This should not raise an exception + classification_repo.delete(-1) + + # Count should remain the same + final_count = len(classification_repo.list_all()) + assert final_count == initial_count + + +# ---------------------------------------------------------------------- +# 7️⃣ Data integrity and consistency tests +# ---------------------------------------------------------------------- +def test_list_all_ordering_consistency(classification_repo: ClassificationRepository): + """Test that list_all always returns results in consistent alphabetical order.""" + # Add some classifications with names that test alphabetical ordering + test_names = ["Zebra", "Alpha", "Beta", "Zulu", "Apple", "Banana"] + created = [] + + for name in test_names: + created.append(classification_repo.create(name)) + + # Get all classifications multiple times + for _ in range(3): + all_classifications = classification_repo.list_all() + names = [c.ClassificationName for c in all_classifications] + + # Should be in alphabetical order + assert names == sorted(names) + + # Should contain our test names + for name in test_names: + assert name in names + + +def test_ensure_exists_idempotency_stress_test(classification_repo: ClassificationRepository): + """Test that ensure_exists is truly idempotent under multiple calls.""" + name = "StressTestClassification" + + # Call ensure_exists multiple times + results = [] + for _ in range(10): + result = classification_repo.ensure_exists(name) + results.append(result) + + # All results should be the same object (same ID) + first_id = results[0].ClassificationId + for result in results: + assert result.ClassificationId == first_id + assert result.ClassificationName == name + + # Should only exist once in the database + all_classifications = classification_repo.list_all() + matching = [c for c in all_classifications if c.ClassificationName == name] + assert len(matching) == 1 + + +def test_classification_model_data_integrity(classification_repo: ClassificationRepository): + """Test that Classification model preserves data integrity.""" + original_name = "DataIntegrityTest" + classification = classification_repo.create(original_name) + + # Verify original data + assert classification.ClassificationName == original_name + original_id = classification.ClassificationId + + # Retrieve and verify data is preserved + retrieved = classification_repo.get_by_id(original_id) + assert retrieved is not None + assert retrieved.ClassificationId == original_id + assert retrieved.ClassificationName == original_name + + # Verify through find_by_name as well + found_by_name = classification_repo.find_by_name(original_name) + assert found_by_name is not None + assert found_by_name.ClassificationId == original_id + assert found_by_name.ClassificationName == original_name + + +# ---------------------------------------------------------------------- +# 8️⃣ Parameterized tests for comprehensive coverage +# ---------------------------------------------------------------------- +@pytest.mark.parametrize( + "test_name,expected_found", + [ + ("Soprano", True), + ("Alto / Mezzo", True), + ("Tenor", True), + ("Baritone", True), + ("Bass", False), + ("Countertenor", False), + ("Mezzo-Soprano", False), + ("", False), + ("soprano", False), # case sensitivity + ("SOPRANO", False), # case sensitivity + ] +) +def test_find_by_name_comprehensive( + classification_repo: ClassificationRepository, + test_name: str, + expected_found: bool +): + """Comprehensive test of find_by_name with various inputs.""" + result = classification_repo.find_by_name(test_name) + + if expected_found: + assert result is not None + assert result.ClassificationName == test_name + assert isinstance(result.ClassificationId, int) + assert result.ClassificationId > 0 + else: + assert result is None + + +@pytest.mark.parametrize( + "test_name", + [ + "NewClassification1", + "Test With Spaces", + "Special-Characters!@#", + "123NumbersFirst", + "Mixed123Characters", + "Très_French_Ñame", + "Multi\nLine\nName", + "Tab\tSeparated", + "Quote'Name", + 'Double"Quote"Name', + ] +) +def test_create_and_retrieve_various_names(classification_repo: ClassificationRepository, test_name: str): + """Test creating and retrieving classifications with various name formats.""" + # Create + created = classification_repo.create(test_name) + assert created.ClassificationName == test_name + assert isinstance(created.ClassificationId, int) + assert created.ClassificationId > 0 + + # Retrieve by ID + by_id = classification_repo.get_by_id(created.ClassificationId) + assert by_id is not None + assert by_id.ClassificationName == test_name + assert by_id.ClassificationId == created.ClassificationId + + # Retrieve by name + by_name = classification_repo.find_by_name(test_name) + assert by_name is not None + assert by_name.ClassificationName == test_name + assert by_name.ClassificationId == created.ClassificationId + + +# ---------------------------------------------------------------------- +# 9️⃣ Integration and workflow tests +# ---------------------------------------------------------------------- +def test_complete_classification_workflow(classification_repo: ClassificationRepository): + """Test a complete workflow with multiple operations.""" + initial_count = len(classification_repo.list_all()) + + # Step 1: Create a new classification + new_name = "WorkflowTest" + created = classification_repo.create(new_name) + assert created.ClassificationName == new_name + + # Step 2: Verify it exists in list_all + all_classifications = classification_repo.list_all() + assert len(all_classifications) == initial_count + 1 + assert new_name in [c.ClassificationName for c in all_classifications] + + # Step 3: Find by name + found = classification_repo.find_by_name(new_name) + assert found is not None + assert found.ClassificationId == created.ClassificationId + + # Step 4: Get by ID + by_id = classification_repo.get_by_id(created.ClassificationId) + assert by_id is not None + assert by_id.ClassificationName == new_name + + # Step 5: Use ensure_exists (should return existing) + ensured = classification_repo.ensure_exists(new_name) + assert ensured.ClassificationId == created.ClassificationId + + # Step 6: Delete it + classification_repo.delete(created.ClassificationId) + + # Step 7: Verify it's gone + assert classification_repo.get_by_id(created.ClassificationId) is None + assert classification_repo.find_by_name(new_name) is None + final_all = classification_repo.list_all() + assert len(final_all) == initial_count + assert new_name not in [c.ClassificationName for c in final_all] + + +def test_multiple_classifications_with_similar_names(classification_repo: ClassificationRepository): + """Test handling of classifications with similar but distinct names.""" + base_name = "TestSimilar" + similar_names = [ + base_name, + base_name + " ", # with trailing space + " " + base_name, # with leading space + base_name.upper(), # different case + base_name.lower(), # different case + base_name + "2", # with number + base_name + "_Alt", # with suffix + ] + + created_classifications = [] + for name in similar_names: + classification = classification_repo.create(name) + created_classifications.append(classification) + assert classification.ClassificationName == name + + # All should have unique IDs + ids = [c.ClassificationId for c in created_classifications] + assert len(set(ids)) == len(ids) + + # All should be findable by their exact names + for i, name in enumerate(similar_names): + found = classification_repo.find_by_name(name) + assert found is not None + assert found.ClassificationId == created_classifications[i].ClassificationId + assert found.ClassificationName == name + + +def test_classification_repository_thread_safety_simulation(classification_repo: ClassificationRepository): + """Simulate concurrent operations to test repository consistency.""" + # This simulates what might happen if multiple threads/processes were accessing the repo + base_name = "ConcurrencyTest" + + # Simulate multiple "threads" trying to ensure the same classification exists + results = [] + for i in range(5): + result = classification_repo.ensure_exists(base_name) + results.append(result) + + # All should return the same classification + first_id = results[0].ClassificationId + for result in results: + assert result.ClassificationId == first_id + assert result.ClassificationName == base_name + + # Should only exist once in the database + all_matches = [c for c in classification_repo.list_all() if c.ClassificationName == base_name] + assert len(all_matches) == 1 \ No newline at end of file diff --git a/backend/tests/repositories/test_member.py b/backend/tests/repositories/test_member.py index 40eb1c1..021a553 100644 --- a/backend/tests/repositories/test_member.py +++ b/backend/tests/repositories/test_member.py @@ -1,6 +1,6 @@ # tests/repositories/test_member.py import datetime as dt -from typing import List +from typing import List, Any import pytest @@ -380,4 +380,313 @@ def test_set_last_declined_resets_streak_and_records_date(member_repo: MemberRep refreshed2 = member_repo.get_by_id(member.MemberId) assert refreshed2.DeclineStreak == 2 - assert refreshed2.LastDeclinedAt == tomorrow_iso \ No newline at end of file + assert refreshed2.LastDeclinedAt == tomorrow_iso + + +# ---------------------------------------------------------------------- +# 7️⃣ get_active – filter active members only +# ---------------------------------------------------------------------- +def test_get_active_filters_correctly(member_repo: MemberRepository, clean_members): + """Test that get_active returns only active members.""" + # Create active member + active_member = member_repo.create( + first_name="Active", + last_name="User", + email="active@example.com", + phone_number="555-1234", + classification_id=1, + is_active=1, + ) + + # Create inactive member + inactive_member = member_repo.create( + first_name="Inactive", + last_name="User", + email="inactive@example.com", + phone_number="555-5678", + classification_id=1, + is_active=0, + ) + + active_members = member_repo.get_active() + + # Should only return the active member + assert len(active_members) == 1 + assert active_members[0].MemberId == active_member.MemberId + assert active_members[0].FirstName == "Active" + assert active_members[0].IsActive == 1 + + +# ---------------------------------------------------------------------- +# 8️⃣ set_last_accepted – resets decline data and sets acceptance date +# ---------------------------------------------------------------------- +def test_set_last_accepted_resets_decline_data(member_repo: MemberRepository): + """Test that set_last_accepted clears decline data and sets acceptance timestamp.""" + member = member_repo.create( + first_name="Test", + last_name="Member", + email="test@example.com", + phone_number=None, + classification_id=1, + is_active=1, + ) + + # First decline the member to set up decline data + yesterday_iso = (dt.date.today() - dt.timedelta(days=1)).isoformat() + member_repo.set_last_declined(member.MemberId, yesterday_iso) + + # Verify decline data is set + declined_member = member_repo.get_by_id(member.MemberId) + assert declined_member.DeclineStreak == 1 + assert declined_member.LastDeclinedAt == yesterday_iso + assert declined_member.LastAcceptedAt is None + + # Now accept + member_repo.set_last_accepted(member.MemberId) + + # Verify acceptance resets decline data + accepted_member = member_repo.get_by_id(member.MemberId) + assert accepted_member.DeclineStreak == 0 + assert accepted_member.LastDeclinedAt is None + assert accepted_member.LastAcceptedAt is not None + + # Verify timestamp is recent (within last 5 seconds) + accepted_time = dt.datetime.fromisoformat(accepted_member.LastAcceptedAt.replace('f', '000')) + time_diff = dt.datetime.utcnow() - accepted_time + assert time_diff.total_seconds() < 5 + + +# ---------------------------------------------------------------------- +# 9️⃣ reset_to_queue_front – moves member to front of scheduling queue +# ---------------------------------------------------------------------- +def test_reset_to_queue_front_moves_member_to_front(member_repo: MemberRepository, clean_members): + """Test that reset_to_queue_front properly resets timestamps to move member to front.""" + # Create two members with different timestamps + older_member = make_member( + member_repo, + "Older", + "Member", + accepted_at="2025-01-01 10:00:00", + scheduled_at="2025-01-01 10:00:00", + declined_at="2025-01-01 10:00:00", + decline_streak=2, + ) + + newer_member = make_member( + member_repo, + "Newer", + "Member", + accepted_at="2025-08-01 10:00:00", + scheduled_at="2025-08-01 10:00:00", + ) + + # Verify initial queue order (newer should come first due to older accepted date) + initial_queue = member_repo.candidate_queue([1]) + assert len(initial_queue) == 2 + assert initial_queue[0].FirstName == "Older" # older accepted date comes first + assert initial_queue[1].FirstName == "Newer" + + # Reset newer member to queue front + member_repo.reset_to_queue_front(newer_member.MemberId) + + # Verify newer member is now at front + updated_queue = member_repo.candidate_queue([1]) + assert len(updated_queue) == 2 + assert updated_queue[0].FirstName == "Newer" # should now be first + assert updated_queue[1].FirstName == "Older" + + # Verify the reset member has expected timestamp values + reset_member = member_repo.get_by_id(newer_member.MemberId) + assert reset_member.LastAcceptedAt == '1970-01-01 00:00:00' + assert reset_member.LastScheduledAt == '1970-01-01 00:00:00' + assert reset_member.LastDeclinedAt is None + assert reset_member.DeclineStreak == 0 + + +# ---------------------------------------------------------------------- +# 🔟 Edge cases and error conditions +# ---------------------------------------------------------------------- +def test_create_with_minimal_data(member_repo: MemberRepository): + """Test creating a member with only required fields.""" + member = member_repo.create( + first_name="Min", + last_name="Member" + ) + + assert member.FirstName == "Min" + assert member.LastName == "Member" + assert member.Email is None + assert member.PhoneNumber is None + assert member.ClassificationId is None + assert member.Notes is None + assert member.IsActive == 1 # default value + assert member.DeclineStreak == 0 # default value + + +def test_get_by_classification_ids_empty_list(member_repo: MemberRepository): + """Test that empty classification list returns empty result without DB query.""" + result = member_repo.get_by_classification_ids([]) + assert result == [] + + +def test_get_by_classification_ids_nonexistent_classification(member_repo: MemberRepository): + """Test querying for nonexistent classification IDs.""" + result = member_repo.get_by_classification_ids([999, 1000]) + assert result == [] + + +def test_candidate_queue_with_inactive_members(member_repo: MemberRepository, clean_members): + """Test that candidate_queue respects only_active parameter.""" + # Create active and inactive members + active = make_member(member_repo, "Active", "Member", is_active=1) + inactive = make_member(member_repo, "Inactive", "Member", is_active=0) + + # Test with only_active=True (default) + queue_active_only = member_repo.candidate_queue([1], only_active=True) + assert len(queue_active_only) == 1 + assert queue_active_only[0].FirstName == "Active" + + # Test with only_active=False + queue_all = member_repo.candidate_queue([1], only_active=False) + assert len(queue_all) == 2 + names = {m.FirstName for m in queue_all} + assert names == {"Active", "Inactive"} + + +def test_candidate_queue_boost_window_edge_cases(member_repo: MemberRepository, clean_members): + """Test boost logic with edge cases around the boost window.""" + now = dt.datetime.utcnow() + + # Member declined well outside boost window (3 days ago) + outside_window = (now - dt.timedelta(days=3)).strftime("%Y-%m-%d %H:%M:%S") + outside_member = make_member( + member_repo, + "Outside", + "Member", + declined_at=outside_window, + decline_streak=1, + ) + + # Member declined well inside boost window (6 hours ago) + well_inside = (now - dt.timedelta(hours=6)).strftime("%Y-%m-%d %H:%M:%S") + inside_member = make_member( + member_repo, + "Inside", + "Member", + declined_at=well_inside, + decline_streak=1, + ) + + # Member with high decline streak (should not get boost) + high_streak_member = make_member( + member_repo, + "HighStreak", + "Member", + declined_at=well_inside, + decline_streak=5, # >= 2, so no boost + ) + + queue = member_repo.candidate_queue([1]) + + # Inside member should be boosted to front + first_names = [m.FirstName for m in queue] + assert "Inside" == first_names[0] # should be boosted + assert "HighStreak" in first_names[1:] # should not be boosted + assert "Outside" in first_names[1:] # should not be boosted + + +@pytest.mark.parametrize( + "decline_streak,should_boost", + [ + (0, True), # streak < 2 + (1, True), # streak < 2 + (2, False), # streak >= 2 + (5, False), # streak >= 2 + ] +) +def test_candidate_queue_decline_streak_boost_logic( + member_repo: MemberRepository, + clean_members, + decline_streak: int, + should_boost: bool +): + """Test boost logic for different decline streak values.""" + now = dt.datetime.utcnow() + recent_decline = (now - dt.timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S") + + # Create a baseline member (no recent declines) + baseline = make_member(member_repo, "Baseline", "Member") + + # Create test member with specific decline streak + test_member = make_member( + member_repo, + "Test", + "Member", + declined_at=recent_decline, + decline_streak=decline_streak, + ) + + queue = member_repo.candidate_queue([1]) + first_names = [m.FirstName for m in queue] + + if should_boost: + assert first_names[0] == "Test" # boosted to front + assert first_names[1] == "Baseline" + else: + # Order depends on other factors, but Test should not be boosted + # Both have NULL LastAcceptedAt, so order by LastScheduledAt (both NULL) + # then likely by primary key order + assert "Test" in first_names + assert "Baseline" in first_names + + +def test_touch_last_scheduled_with_nonexistent_member(member_repo: MemberRepository): + """Test touch_last_scheduled with nonexistent member ID (should not raise error).""" + # This should not raise an exception, just silently do nothing + member_repo.touch_last_scheduled(99999) + + +def test_set_operations_with_nonexistent_member(member_repo: MemberRepository): + """Test set operations with nonexistent member IDs.""" + # These should not raise exceptions + member_repo.set_last_accepted(99999) + member_repo.set_last_declined(99999, "2025-08-29") + member_repo.reset_to_queue_front(99999) + + +def test_member_data_integrity_after_operations(member_repo: MemberRepository): + """Test that member data remains consistent after various operations.""" + member = member_repo.create( + first_name="Integrity", + last_name="Test", + email="integrity@example.com", + phone_number="555-0000", + classification_id=2, + notes="Test member", + is_active=1, + ) + + original_id = member.MemberId + original_email = member.Email + original_classification = member.ClassificationId + + # Perform various timestamp operations + member_repo.touch_last_scheduled(member.MemberId) + member_repo.set_last_declined(member.MemberId, "2025-08-29") + member_repo.set_last_accepted(member.MemberId) + member_repo.reset_to_queue_front(member.MemberId) + + # Verify core data is unchanged + final_member = member_repo.get_by_id(member.MemberId) + assert final_member.MemberId == original_id + assert final_member.FirstName == "Integrity" + assert final_member.LastName == "Test" + assert final_member.Email == original_email + assert final_member.ClassificationId == original_classification + assert final_member.IsActive == 1 + + # Verify reset operation results + assert final_member.LastAcceptedAt == '1970-01-01 00:00:00' + assert final_member.LastScheduledAt == '1970-01-01 00:00:00' + assert final_member.LastDeclinedAt is None + assert final_member.DeclineStreak == 0 \ No newline at end of file diff --git a/backend/tests/repositories/test_schedule.py b/backend/tests/repositories/test_schedule.py new file mode 100644 index 0000000..2f63b15 --- /dev/null +++ b/backend/tests/repositories/test_schedule.py @@ -0,0 +1,432 @@ +# tests/repositories/test_schedule.py +import datetime as dt +from typing import List + +import pytest + +from backend.models import Schedule as ScheduleModel, ScheduleStatus +from backend.repositories import ScheduleRepository, ServiceRepository +from backend.db import DatabaseConnection + + +# ---------------------------------------------------------------------- +# Additional fixtures for Schedule repository testing +# ---------------------------------------------------------------------- +@pytest.fixture +def schedule_repo( + db_connection: DatabaseConnection, + seed_lookup_tables, +) -> ScheduleRepository: + return ScheduleRepository(db_connection) + + +@pytest.fixture +def service_repo( + db_connection: DatabaseConnection, + seed_lookup_tables, +) -> ServiceRepository: + return ServiceRepository(db_connection) + + +@pytest.fixture +def sample_service(service_repo: ServiceRepository) -> int: + """Create a sample service and return its ID.""" + service = service_repo.create( + service_type_id=1, # 9AM from seeded data + service_date=dt.date(2025, 9, 15) + ) + return service.ServiceId + + +@pytest.fixture +def clean_schedules(schedule_repo: ScheduleRepository): + """Clean the Schedules table before tests.""" + schedule_repo.db.execute("DELETE FROM Schedules") + schedule_repo.db._conn.commit() + + +# ---------------------------------------------------------------------- +# Basic CRUD Operations +# ---------------------------------------------------------------------- +def test_create_and_get_by_id( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test basic schedule creation and retrieval.""" + # Create a schedule + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, # Alice from seeded data + status=ScheduleStatus.PENDING + ) + + # Verify the created schedule + assert isinstance(schedule.ScheduleId, int) and schedule.ScheduleId > 0 + assert schedule.ServiceId == sample_service + assert schedule.MemberId == 1 + assert schedule.Status == ScheduleStatus.PENDING.value + assert schedule.ScheduledAt is not None + assert schedule.AcceptedAt is None + assert schedule.DeclinedAt is None + + # Retrieve the schedule + fetched = schedule_repo.get_by_id(schedule.ScheduleId) + assert fetched is not None + assert fetched.ServiceId == sample_service + assert fetched.MemberId == 1 + assert fetched.Status == ScheduleStatus.PENDING.value + + +def test_get_by_id_returns_none_when_missing(schedule_repo: ScheduleRepository): + """Test that get_by_id returns None for non-existent schedules.""" + assert schedule_repo.get_by_id(9999) is None + + +def test_create_with_decline_reason( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test creating a schedule with DECLINED status and reason.""" + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.DECLINED, + reason="Already committed elsewhere" + ) + + assert schedule.Status == ScheduleStatus.DECLINED.value + assert schedule.DeclineReason == "Already committed elsewhere" + + +# ---------------------------------------------------------------------- +# List Operations +# ---------------------------------------------------------------------- +def test_list_all( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test listing all schedules.""" + # Create multiple schedules + schedule1 = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + schedule2 = schedule_repo.create( + service_id=sample_service, + member_id=2, + status=ScheduleStatus.ACCEPTED + ) + + schedules = schedule_repo.list_all() + assert len(schedules) == 2 + + schedule_ids = {s.ScheduleId for s in schedules} + assert schedule1.ScheduleId in schedule_ids + assert schedule2.ScheduleId in schedule_ids + + +def test_get_pending_for_service( + schedule_repo: ScheduleRepository, + service_repo: ServiceRepository, + clean_schedules +): + """Test getting pending schedules for a specific service.""" + # Create two services + service1 = service_repo.create(service_type_id=1, service_date=dt.date(2025, 9, 15)) + service2 = service_repo.create(service_type_id=2, service_date=dt.date(2025, 9, 15)) + + # Create schedules with different statuses + pending1 = schedule_repo.create( + service_id=service1.ServiceId, + member_id=1, + status=ScheduleStatus.PENDING + ) + accepted1 = schedule_repo.create( + service_id=service1.ServiceId, + member_id=2, + status=ScheduleStatus.ACCEPTED + ) + pending2 = schedule_repo.create( + service_id=service2.ServiceId, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Get pending schedules for service1 + pending_schedules = schedule_repo.get_pending_for_service(service1.ServiceId) + assert len(pending_schedules) == 1 + assert pending_schedules[0].ScheduleId == pending1.ScheduleId + assert pending_schedules[0].Status == ScheduleStatus.PENDING.value + + +# ---------------------------------------------------------------------- +# Query Operations +# ---------------------------------------------------------------------- +def test_get_one( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test getting one schedule by member and service ID.""" + # Create a schedule + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Find it + found = schedule_repo.get_one(member_id=1, service_id=sample_service) + assert found is not None + assert found.ScheduleId == schedule.ScheduleId + + # Try to find non-existent + not_found = schedule_repo.get_one(member_id=999, service_id=sample_service) + assert not_found is None + + +def test_has_any( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test checking if member has schedules with specific statuses.""" + # Create schedules with different statuses + schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + schedule_repo.create( + service_id=sample_service, + member_id=2, + status=ScheduleStatus.ACCEPTED + ) + + # Test has_any + assert schedule_repo.has_any(1, sample_service, [ScheduleStatus.PENDING]) + assert schedule_repo.has_any(2, sample_service, [ScheduleStatus.ACCEPTED]) + assert schedule_repo.has_any(1, sample_service, [ScheduleStatus.PENDING, ScheduleStatus.ACCEPTED]) + assert not schedule_repo.has_any(1, sample_service, [ScheduleStatus.DECLINED]) + assert not schedule_repo.has_any(999, sample_service, [ScheduleStatus.PENDING]) + + # Test empty statuses list + assert not schedule_repo.has_any(1, sample_service, []) + + +def test_has_schedule_on_date( + schedule_repo: ScheduleRepository, + service_repo: ServiceRepository, + clean_schedules +): + """Test checking if member has any schedule on a specific date.""" + # Create services on different dates + service_today = service_repo.create( + service_type_id=1, + service_date=dt.date(2025, 9, 15) + ) + service_tomorrow = service_repo.create( + service_type_id=2, + service_date=dt.date(2025, 9, 16) + ) + + # Create schedule for today + schedule_repo.create( + service_id=service_today.ServiceId, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Test has_schedule_on_date + assert schedule_repo.has_schedule_on_date(1, "2025-09-15") + assert not schedule_repo.has_schedule_on_date(1, "2025-09-16") + assert not schedule_repo.has_schedule_on_date(2, "2025-09-15") + + +# ---------------------------------------------------------------------- +# Status Update Operations +# ---------------------------------------------------------------------- +def test_update_status_to_accepted( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test updating schedule status to accepted.""" + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Update to accepted + schedule_repo.update_status( + schedule_id=schedule.ScheduleId, + new_status=ScheduleStatus.ACCEPTED + ) + + # Verify the update + updated = schedule_repo.get_by_id(schedule.ScheduleId) + assert updated is not None + assert updated.Status == ScheduleStatus.ACCEPTED.value + assert updated.DeclinedAt is None + assert updated.DeclineReason is None + + +def test_update_status_to_declined( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test updating schedule status to declined with reason.""" + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Update to declined + schedule_repo.update_status( + schedule_id=schedule.ScheduleId, + new_status=ScheduleStatus.DECLINED, + reason="Family emergency" + ) + + # Verify the update + updated = schedule_repo.get_by_id(schedule.ScheduleId) + assert updated is not None + assert updated.Status == ScheduleStatus.DECLINED.value + assert updated.DeclinedAt is not None + assert updated.DeclineReason == "Family emergency" + + +def test_mark_accepted( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test marking a schedule as accepted.""" + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Mark as accepted + schedule_repo.mark_accepted(schedule.ScheduleId) + + # Verify + updated = schedule_repo.get_by_id(schedule.ScheduleId) + assert updated is not None + assert updated.Status == ScheduleStatus.ACCEPTED.value + assert updated.AcceptedAt is not None + assert updated.DeclinedAt is None + assert updated.DeclineReason is None + + +def test_mark_declined( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test marking a schedule as declined.""" + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Mark as declined + schedule_repo.mark_declined( + schedule.ScheduleId, + decline_reason="Unable to attend" + ) + + # Verify + updated = schedule_repo.get_by_id(schedule.ScheduleId) + assert updated is not None + assert updated.Status == ScheduleStatus.DECLINED.value + assert updated.DeclinedAt is not None + assert updated.DeclineReason == "Unable to attend" + + + + +# ---------------------------------------------------------------------- +# Delete Operations (Added for CLI functionality) +# ---------------------------------------------------------------------- +def test_delete_schedule( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test deleting a schedule.""" + # Create a schedule + schedule = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + + # Verify it exists + assert schedule_repo.get_by_id(schedule.ScheduleId) is not None + + # Delete it + result = schedule_repo.delete_schedule(schedule.ScheduleId) + assert result is True + + # Verify it's gone + assert schedule_repo.get_by_id(schedule.ScheduleId) is None + + # Try to delete again (should return False) + result2 = schedule_repo.delete_schedule(schedule.ScheduleId) + assert result2 is False + + +def test_delete_nonexistent_schedule(schedule_repo: ScheduleRepository): + """Test deleting a non-existent schedule returns False.""" + result = schedule_repo.delete_schedule(9999) + assert result is False + + +# ---------------------------------------------------------------------- +# Edge Cases and Error Conditions +# ---------------------------------------------------------------------- +def test_create_with_invalid_foreign_keys( + schedule_repo: ScheduleRepository, + clean_schedules +): + """Test that creating schedule with invalid FKs raises appropriate errors.""" + # This should fail due to FK constraint (assuming constraints are enforced) + with pytest.raises(Exception): # SQLite foreign key constraint error + schedule_repo.create( + service_id=9999, # Non-existent service + member_id=1, + status=ScheduleStatus.PENDING + ) + + +def test_unique_constraint_member_service( + schedule_repo: ScheduleRepository, + sample_service: int, + clean_schedules +): + """Test that UNIQUE constraint prevents duplicate member/service schedules.""" + # Create first schedule + schedule1 = schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.PENDING + ) + assert schedule1 is not None + + # Attempting to create second schedule for same member/service should fail + with pytest.raises(Exception): # SQLite UNIQUE constraint error + schedule_repo.create( + service_id=sample_service, + member_id=1, + status=ScheduleStatus.DECLINED + ) \ No newline at end of file diff --git a/backend/tests/repositories/test_service.py b/backend/tests/repositories/test_service.py index e69de29..e32bfc6 100644 --- a/backend/tests/repositories/test_service.py +++ b/backend/tests/repositories/test_service.py @@ -0,0 +1,566 @@ +# backend/tests/repositories/test_service.py +# ------------------------------------------------------------ +# Comprehensive pytest suite for the ServiceRepository. +# ------------------------------------------------------------ + +import pytest +from datetime import date, datetime, timedelta +from typing import List +from backend.models import Service as ServiceModel +from backend.repositories import ServiceRepository + + +# ---------------------------------------------------------------------- +# Helper fixtures for test data +# ---------------------------------------------------------------------- +@pytest.fixture +def sample_dates(): + """Return a set of dates for testing.""" + today = date.today() + return { + 'past': today - timedelta(days=30), + 'yesterday': today - timedelta(days=1), + 'today': today, + 'tomorrow': today + timedelta(days=1), + 'next_week': today + timedelta(days=7), + 'future': today + timedelta(days=30), + } + + +@pytest.fixture +def clean_services(service_repo: ServiceRepository): + """Clean the Services table for tests that need isolation.""" + # Clear any existing services to start fresh + service_repo.db.execute(f"DELETE FROM {service_repo._TABLE}") + service_repo.db._conn.commit() + + +# ---------------------------------------------------------------------- +# 1� Basic CRUD  create & get_by_id +# ---------------------------------------------------------------------- +def test_create_and_get_by_id(service_repo: ServiceRepository, sample_dates): + """Test basic service creation and retrieval by ID.""" + service = service_repo.create( + service_type_id=1, + service_date=sample_dates['tomorrow'] + ) + + # Verify creation + assert isinstance(service.ServiceId, int) and service.ServiceId > 0 + assert service.ServiceTypeId == 1 + assert service.ServiceDate == sample_dates['tomorrow'] + + # Retrieve the same service + fetched = service_repo.get_by_id(service.ServiceId) + assert fetched is not None + assert fetched.ServiceId == service.ServiceId + assert fetched.ServiceTypeId == 1 + assert fetched.ServiceDate == sample_dates['tomorrow'] + + +def test_get_by_id_returns_none_when_missing(service_repo: ServiceRepository): + """Test that get_by_id returns None for nonexistent IDs.""" + result = service_repo.get_by_id(99999) + assert result is None + + +def test_create_with_date_object(service_repo: ServiceRepository): + """Test creating service with a date object.""" + test_date = date(2025, 12, 25) + service = service_repo.create(service_type_id=2, service_date=test_date) + + assert service.ServiceDate == test_date + assert service.ServiceTypeId == 2 + + +# ---------------------------------------------------------------------- +# 2� list_all  bulk operations +# ---------------------------------------------------------------------- +def test_list_all(service_repo: ServiceRepository, sample_dates, clean_services): + """Test listing all services.""" + # Create multiple services + services_data = [ + (1, sample_dates['today']), + (2, sample_dates['tomorrow']), + (1, sample_dates['next_week']), + ] + + created_services = [] + for service_type_id, service_date in services_data: + service = service_repo.create(service_type_id, service_date) + created_services.append(service) + + all_services = service_repo.list_all() + assert len(all_services) == 3 + + # Verify all created services are in the list + service_ids = {s.ServiceId for s in all_services} + expected_ids = {s.ServiceId for s in created_services} + assert service_ids == expected_ids + + +def test_list_all_empty_table(service_repo: ServiceRepository, clean_services): + """Test list_all when table is empty.""" + all_services = service_repo.list_all() + assert all_services == [] + + +# ---------------------------------------------------------------------- +# 3� upcoming  date-based filtering +# ---------------------------------------------------------------------- +def test_upcoming_default_behavior(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() with default parameters (from today, limit 100).""" + # Create services on various dates + past_service = service_repo.create(1, sample_dates['past']) + today_service = service_repo.create(1, sample_dates['today']) + future_service = service_repo.create(1, sample_dates['future']) + + upcoming = service_repo.upcoming() + + # Should include today and future, but not past + upcoming_ids = {s.ServiceId for s in upcoming} + assert today_service.ServiceId in upcoming_ids + assert future_service.ServiceId in upcoming_ids + assert past_service.ServiceId not in upcoming_ids + + +def test_upcoming_with_specific_after_date(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() with a specific after date.""" + # Create services + service_repo.create(1, sample_dates['yesterday']) + tomorrow_service = service_repo.create(1, sample_dates['tomorrow']) + future_service = service_repo.create(1, sample_dates['future']) + + # Get services from tomorrow onwards + upcoming = service_repo.upcoming(after=sample_dates['tomorrow']) + + upcoming_ids = {s.ServiceId for s in upcoming} + assert tomorrow_service.ServiceId in upcoming_ids + assert future_service.ServiceId in upcoming_ids + assert len(upcoming) == 2 + + +def test_upcoming_with_limit(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() with a limit parameter.""" + # Create multiple future services + for i in range(5): + service_repo.create(1, sample_dates['today'] + timedelta(days=i)) + + upcoming = service_repo.upcoming(limit=3) + assert len(upcoming) == 3 + + +def test_upcoming_chronological_order(service_repo: ServiceRepository, sample_dates, clean_services): + """Test that upcoming() returns services in chronological order.""" + # Create services in non-chronological order + service_dates = [ + sample_dates['future'], + sample_dates['tomorrow'], + sample_dates['next_week'], + sample_dates['today'], + ] + + for service_date in service_dates: + service_repo.create(1, service_date) + + upcoming = service_repo.upcoming() + + # Verify chronological order + for i in range(len(upcoming) - 1): + assert upcoming[i].ServiceDate <= upcoming[i + 1].ServiceDate + + +def test_upcoming_no_results(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() when no future services exist.""" + # Create only past services + service_repo.create(1, sample_dates['past']) + service_repo.create(1, sample_dates['yesterday']) + + upcoming = service_repo.upcoming() + assert upcoming == [] + + +# ---------------------------------------------------------------------- +# 4� by_type  service type filtering +# ---------------------------------------------------------------------- +def test_by_type_single_type(service_repo: ServiceRepository, sample_dates, clean_services): + """Test filtering by a single service type.""" + # Create services of different types + type1_service = service_repo.create(1, sample_dates['today']) + type2_service = service_repo.create(2, sample_dates['tomorrow']) + type1_service2 = service_repo.create(1, sample_dates['future']) + + type1_services = service_repo.by_type([1]) + + # Should only include type 1 services + type1_ids = {s.ServiceId for s in type1_services} + assert type1_service.ServiceId in type1_ids + assert type1_service2.ServiceId in type1_ids + assert type2_service.ServiceId not in type1_ids + assert len(type1_services) == 2 + + +def test_by_type_multiple_types(service_repo: ServiceRepository, sample_dates, clean_services): + """Test filtering by multiple service types.""" + # Create services of different types + type1_service = service_repo.create(1, sample_dates['today']) + type2_service = service_repo.create(2, sample_dates['tomorrow']) + type3_service = service_repo.create(3, sample_dates['future']) + + multi_type_services = service_repo.by_type([1, 3]) + + # Should include type 1 and 3, but not type 2 + multi_ids = {s.ServiceId for s in multi_type_services} + assert type1_service.ServiceId in multi_ids + assert type3_service.ServiceId in multi_ids + assert type2_service.ServiceId not in multi_ids + assert len(multi_type_services) == 2 + + +def test_by_type_empty_list(service_repo: ServiceRepository): + """Test by_type() with empty type list returns empty result without DB query.""" + result = service_repo.by_type([]) + assert result == [] + + +def test_by_type_nonexistent_types(service_repo: ServiceRepository, sample_dates, clean_services): + """Test by_type() with nonexistent service types.""" + service_repo.create(1, sample_dates['today']) + + result = service_repo.by_type([999, 1000]) + assert result == [] + + +def test_by_type_chronological_order(service_repo: ServiceRepository, sample_dates, clean_services): + """Test that by_type() returns services in chronological order.""" + # Create services of same type in non-chronological order + service_dates = [ + sample_dates['future'], + sample_dates['today'], + sample_dates['tomorrow'], + ] + + for service_date in service_dates: + service_repo.create(1, service_date) + + services = service_repo.by_type([1]) + + # Verify chronological order + for i in range(len(services) - 1): + assert services[i].ServiceDate <= services[i + 1].ServiceDate + + +# ---------------------------------------------------------------------- +# 5� reschedule  update operations +# ---------------------------------------------------------------------- +def test_reschedule_service(service_repo: ServiceRepository, sample_dates): + """Test rescheduling a service to a new date.""" + original_date = sample_dates['today'] + new_date = sample_dates['future'] + + service = service_repo.create(1, original_date) + assert service.ServiceDate == original_date + + # Reschedule the service + service_repo.reschedule(service.ServiceId, new_date) + + # Verify the change + updated_service = service_repo.get_by_id(service.ServiceId) + assert updated_service is not None + assert updated_service.ServiceDate == new_date + assert updated_service.ServiceTypeId == 1 # Should remain unchanged + + +def test_reschedule_nonexistent_service(service_repo: ServiceRepository, sample_dates): + """Test rescheduling a nonexistent service (should not raise error).""" + # This should not raise an exception + service_repo.reschedule(99999, sample_dates['tomorrow']) + + +# ---------------------------------------------------------------------- +# 6� Edge cases and error conditions +# ---------------------------------------------------------------------- +def test_get_by_id_with_negative_id(service_repo: ServiceRepository): + """Test get_by_id with negative ID (should return None).""" + result = service_repo.get_by_id(-1) + assert result is None + + +def test_get_by_id_with_zero_id(service_repo: ServiceRepository): + """Test get_by_id with zero ID (should return None).""" + result = service_repo.get_by_id(0) + assert result is None + + +def test_create_with_invalid_service_type_id_raises_error(service_repo: ServiceRepository, sample_dates): + """Test creating service with invalid service type ID raises foreign key error.""" + with pytest.raises(Exception): # SQLite IntegrityError for FK constraint + service_repo.create(999, sample_dates['today']) + + +def test_create_with_very_old_date(service_repo: ServiceRepository): + """Test creating service with a very old date.""" + very_old_date = date(1900, 1, 1) + service = service_repo.create(1, very_old_date) + assert service.ServiceDate == very_old_date + + +def test_create_with_far_future_date(service_repo: ServiceRepository): + """Test creating service with a far future date.""" + far_future_date = date(2100, 12, 31) + service = service_repo.create(1, far_future_date) + assert service.ServiceDate == far_future_date + + +def test_upcoming_with_zero_limit(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() with zero limit.""" + service_repo.create(1, sample_dates['tomorrow']) + + upcoming = service_repo.upcoming(limit=0) + assert upcoming == [] + + +def test_upcoming_with_negative_limit(service_repo: ServiceRepository, sample_dates, clean_services): + """Test upcoming() with negative limit (SQLite behavior).""" + service_repo.create(1, sample_dates['tomorrow']) + + # SQLite treats negative LIMIT as unlimited + upcoming = service_repo.upcoming(limit=-1) + assert len(upcoming) >= 0 # Should not crash + + +def test_by_type_with_duplicate_type_ids(service_repo: ServiceRepository, sample_dates, clean_services): + """Test by_type() with duplicate type IDs in the list.""" + service1 = service_repo.create(1, sample_dates['today']) + service2 = service_repo.create(2, sample_dates['tomorrow']) + + # Pass duplicate type IDs + services = service_repo.by_type([1, 1, 2, 1]) + + # Should return services of both types (no duplicates in result) + service_ids = {s.ServiceId for s in services} + assert service1.ServiceId in service_ids + assert service2.ServiceId in service_ids + assert len(services) == 2 + + +# ---------------------------------------------------------------------- +# 7� Data integrity and consistency tests +# ---------------------------------------------------------------------- +def test_service_model_data_integrity(service_repo: ServiceRepository, sample_dates): + """Test that Service model preserves data integrity.""" + original_type_id = 2 + original_date = sample_dates['tomorrow'] + + service = service_repo.create(original_type_id, original_date) + original_id = service.ServiceId + + # Retrieve and verify data is preserved + retrieved = service_repo.get_by_id(original_id) + assert retrieved is not None + assert retrieved.ServiceId == original_id + assert retrieved.ServiceTypeId == original_type_id + assert retrieved.ServiceDate == original_date + + # Verify through list_all as well + all_services = service_repo.list_all() + matching_services = [s for s in all_services if s.ServiceId == original_id] + assert len(matching_services) == 1 + assert matching_services[0].ServiceTypeId == original_type_id + assert matching_services[0].ServiceDate == original_date + + +def test_multiple_services_same_date_and_type(service_repo: ServiceRepository, sample_dates, clean_services): + """Test creating multiple services on same date and type.""" + # Should be allowed - multiple services can exist for same date/type + service1 = service_repo.create(1, sample_dates['today']) + service2 = service_repo.create(1, sample_dates['today']) + + assert service1.ServiceId != service2.ServiceId + assert service1.ServiceTypeId == service2.ServiceTypeId + assert service1.ServiceDate == service2.ServiceDate + + # Both should be findable + assert service_repo.get_by_id(service1.ServiceId) is not None + assert service_repo.get_by_id(service2.ServiceId) is not None + + +# ---------------------------------------------------------------------- +# 8� Parameterized tests for comprehensive coverage +# ---------------------------------------------------------------------- +@pytest.mark.parametrize("service_type_id", [1, 2, 3]) +def test_create_with_valid_service_type_ids(service_repo: ServiceRepository, service_type_id): + """Test creating services with valid service type IDs.""" + test_date = date.today() + service = service_repo.create(service_type_id, test_date) + + assert service.ServiceTypeId == service_type_id + assert service.ServiceDate == test_date + assert isinstance(service.ServiceId, int) + assert service.ServiceId > 0 + + +@pytest.mark.parametrize("invalid_service_type_id", [999, -1, 0]) +def test_create_with_invalid_service_type_ids_raises_error(service_repo: ServiceRepository, invalid_service_type_id): + """Test creating services with invalid service type IDs raises foreign key errors.""" + test_date = date.today() + with pytest.raises(Exception): # SQLite IntegrityError for FK constraint + service_repo.create(invalid_service_type_id, test_date) + + +@pytest.mark.parametrize( + "days_offset,should_be_included", + [ + (-30, False), # Past + (-1, False), # Yesterday + (0, True), # Today + (1, True), # Tomorrow + (7, True), # Next week + (30, True), # Future + ] +) +def test_upcoming_date_filtering( + service_repo: ServiceRepository, + clean_services, + days_offset: int, + should_be_included: bool +): + """Test upcoming() date filtering logic.""" + test_date = date.today() + timedelta(days=days_offset) + service = service_repo.create(1, test_date) + + upcoming = service_repo.upcoming() + upcoming_ids = {s.ServiceId for s in upcoming} + + if should_be_included: + assert service.ServiceId in upcoming_ids + else: + assert service.ServiceId not in upcoming_ids + + +@pytest.mark.parametrize("limit", [1, 5, 10, 50, 100]) +def test_upcoming_limit_parameter(service_repo: ServiceRepository, clean_services, limit: int): + """Test upcoming() with various limit values.""" + # Create more services than the limit + for i in range(limit + 5): + service_repo.create(1, date.today() + timedelta(days=i)) + + upcoming = service_repo.upcoming(limit=limit) + assert len(upcoming) == limit + + +# ---------------------------------------------------------------------- +# 9� Integration and workflow tests +# ---------------------------------------------------------------------- +def test_complete_service_workflow(service_repo: ServiceRepository, sample_dates, clean_services): + """Test a complete workflow with multiple operations.""" + initial_count = len(service_repo.list_all()) + + # Step 1: Create a new service + original_date = sample_dates['tomorrow'] + service = service_repo.create(2, original_date) + assert service.ServiceTypeId == 2 + assert service.ServiceDate == original_date + + # Step 2: Verify it exists in list_all + all_services = service_repo.list_all() + assert len(all_services) == initial_count + 1 + service_ids = {s.ServiceId for s in all_services} + assert service.ServiceId in service_ids + + # Step 3: Find it in upcoming services + upcoming = service_repo.upcoming() + upcoming_ids = {s.ServiceId for s in upcoming} + assert service.ServiceId in upcoming_ids + + # Step 4: Find it by type + by_type = service_repo.by_type([2]) + by_type_ids = {s.ServiceId for s in by_type} + assert service.ServiceId in by_type_ids + + # Step 5: Reschedule it + new_date = sample_dates['future'] + service_repo.reschedule(service.ServiceId, new_date) + + # Step 6: Verify the reschedule + updated_service = service_repo.get_by_id(service.ServiceId) + assert updated_service is not None + assert updated_service.ServiceDate == new_date + assert updated_service.ServiceTypeId == 2 + + +def test_complex_date_filtering_scenario(service_repo: ServiceRepository, sample_dates, clean_services): + """Test complex scenarios with multiple date filters.""" + # Create services across different dates and types + services_data = [ + (1, sample_dates['past']), # Should not appear in upcoming + (1, sample_dates['today']), # Should appear in upcoming + (2, sample_dates['tomorrow']), # Should appear in upcoming + (1, sample_dates['future']), # Should appear in upcoming + (3, sample_dates['next_week']), # Should appear in upcoming + ] + + created_services = {} + for service_type_id, service_date in services_data: + service = service_repo.create(service_type_id, service_date) + created_services[(service_type_id, service_date)] = service + + # Test upcoming from tomorrow + upcoming_from_tomorrow = service_repo.upcoming(after=sample_dates['tomorrow']) + upcoming_dates = {s.ServiceDate for s in upcoming_from_tomorrow} + assert sample_dates['tomorrow'] in upcoming_dates + assert sample_dates['future'] in upcoming_dates + assert sample_dates['next_week'] in upcoming_dates + assert sample_dates['past'] not in upcoming_dates + assert sample_dates['today'] not in upcoming_dates + + # Test by specific types + type1_services = service_repo.by_type([1]) + type1_dates = {s.ServiceDate for s in type1_services} + assert sample_dates['past'] in type1_dates + assert sample_dates['today'] in type1_dates + assert sample_dates['future'] in type1_dates + + # Test combination: upcoming type 1 services (filter by date intersection) + upcoming_all = service_repo.upcoming() + upcoming_type1 = [s for s in upcoming_all if s.ServiceTypeId == 1] + upcoming_type1_dates = {s.ServiceDate for s in upcoming_type1} + assert sample_dates['today'] in upcoming_type1_dates + assert sample_dates['future'] in upcoming_type1_dates + assert sample_dates['past'] not in upcoming_type1_dates + + +def test_service_repository_consistency(service_repo: ServiceRepository, sample_dates, clean_services): + """Test repository consistency across different query methods.""" + # Create a mix of services + test_services = [] + for i in range(5): + service = service_repo.create( + service_type_id=(i % 3) + 1, + service_date=sample_dates['today'] + timedelta(days=i) + ) + test_services.append(service) + + # All services should be found through list_all + all_services = service_repo.list_all() + all_ids = {s.ServiceId for s in all_services} + test_ids = {s.ServiceId for s in test_services} + assert test_ids.issubset(all_ids) + + # Each service should be retrievable individually + for service in test_services: + retrieved = service_repo.get_by_id(service.ServiceId) + assert retrieved is not None + assert retrieved.ServiceId == service.ServiceId + assert retrieved.ServiceTypeId == service.ServiceTypeId + assert retrieved.ServiceDate == service.ServiceDate + + # Services should appear in appropriate filtered queries + future_services = service_repo.upcoming(after=sample_dates['tomorrow']) + future_ids = {s.ServiceId for s in future_services} + + for service in test_services: + if service.ServiceDate >= sample_dates['tomorrow']: + assert service.ServiceId in future_ids + else: + assert service.ServiceId not in future_ids \ No newline at end of file diff --git a/backend/tests/repositories/test_service_availability.py b/backend/tests/repositories/test_service_availability.py index fd80fb9..7aa73c3 100644 --- a/backend/tests/repositories/test_service_availability.py +++ b/backend/tests/repositories/test_service_availability.py @@ -1,69 +1,652 @@ -# tests/test_service_availability.py +# backend/tests/repositories/test_service_availability.py +# ------------------------------------------------------------ +# Comprehensive pytest suite for the ServiceAvailabilityRepository. +# ------------------------------------------------------------ + import pytest +from typing import List +from backend.models import ServiceAvailability as ServiceAvailabilityModel +from backend.repositories import ServiceAvailabilityRepository -def test_grant_and_revoke( - service_availability_repo, - member_repo, - service_type_repo, + +# ---------------------------------------------------------------------- +# Helper fixtures for test data +# ---------------------------------------------------------------------- +@pytest.fixture +def clean_service_availability(service_availability_repo: ServiceAvailabilityRepository): + """Clean the ServiceAvailability table for tests that need isolation.""" + # Clear any existing service availability records to start fresh + service_availability_repo.db.execute(f"DELETE FROM {service_availability_repo._TABLE}") + service_availability_repo.db._conn.commit() + + +# ---------------------------------------------------------------------- +# 1️⃣ Basic CRUD – create, get, delete +# ---------------------------------------------------------------------- +def test_create_and_get(service_availability_repo: ServiceAvailabilityRepository): + """Test basic service availability creation and retrieval.""" + # Create a new availability record + availability = service_availability_repo.create(member_id=1, service_type_id=1) + + # Verify creation + assert isinstance(availability.ServiceAvailabilityId, int) + assert availability.ServiceAvailabilityId > 0 + assert availability.MemberId == 1 + assert availability.ServiceTypeId == 1 + + # Retrieve the same record + fetched = service_availability_repo.get(member_id=1, service_type_id=1) + assert fetched is not None + assert fetched.ServiceAvailabilityId == availability.ServiceAvailabilityId + assert fetched.MemberId == 1 + assert fetched.ServiceTypeId == 1 + + +def test_get_returns_none_when_missing(service_availability_repo: ServiceAvailabilityRepository): + """Test that get returns None for nonexistent member/service type pairs.""" + result = service_availability_repo.get(member_id=999, service_type_id=999) + assert result is None + + +def test_create_is_idempotent(service_availability_repo: ServiceAvailabilityRepository): + """Test that create returns existing record if pair already exists.""" + # Create first record + first = service_availability_repo.create(member_id=1, service_type_id=1) + + # Create again with same parameters - should return existing record + second = service_availability_repo.create(member_id=1, service_type_id=1) + + # Should be the same record + assert first.ServiceAvailabilityId == second.ServiceAvailabilityId + assert first.MemberId == second.MemberId + assert first.ServiceTypeId == second.ServiceTypeId + + +def test_delete_by_id(service_availability_repo: ServiceAvailabilityRepository): + """Test deleting availability record by primary key.""" + # Create a record + availability = service_availability_repo.create(member_id=1, service_type_id=1) + original_id = availability.ServiceAvailabilityId + + # Verify it exists + assert service_availability_repo.get(member_id=1, service_type_id=1) is not None + + # Delete it + service_availability_repo.delete(original_id) + + # Verify it's gone + assert service_availability_repo.get(member_id=1, service_type_id=1) is None + + +def test_delete_nonexistent_record(service_availability_repo: ServiceAvailabilityRepository): + """Test deleting a nonexistent record (should not raise error).""" + # This should not raise an exception + service_availability_repo.delete(99999) + + +# ---------------------------------------------------------------------- +# 2️⃣ Grant and revoke operations +# ---------------------------------------------------------------------- +def test_grant_and_revoke(service_availability_repo: ServiceAvailabilityRepository): + """Test the grant and revoke convenience methods.""" + # Grant access + granted = service_availability_repo.grant(member_id=1, service_type_id=1) + assert granted.MemberId == 1 + assert granted.ServiceTypeId == 1 + + # Verify it was granted + fetched = service_availability_repo.get(member_id=1, service_type_id=1) + assert fetched is not None + assert fetched.ServiceAvailabilityId == granted.ServiceAvailabilityId + + # Revoke access + service_availability_repo.revoke(member_id=1, service_type_id=1) + + # Verify it was revoked + assert service_availability_repo.get(member_id=1, service_type_id=1) is None + + +def test_grant_is_idempotent(service_availability_repo: ServiceAvailabilityRepository): + """Test that grant is idempotent (multiple calls return same record).""" + # Grant access twice + first_grant = service_availability_repo.grant(member_id=1, service_type_id=1) + second_grant = service_availability_repo.grant(member_id=1, service_type_id=1) + + # Should return the same record + assert first_grant.ServiceAvailabilityId == second_grant.ServiceAvailabilityId + assert first_grant.MemberId == second_grant.MemberId + assert first_grant.ServiceTypeId == second_grant.ServiceTypeId + + +def test_revoke_nonexistent_record(service_availability_repo: ServiceAvailabilityRepository): + """Test revoking a nonexistent member/service type pair (should not raise error).""" + # This should not raise an exception + service_availability_repo.revoke(member_id=999, service_type_id=999) + + +# ---------------------------------------------------------------------- +# 3️⃣ List operations +# ---------------------------------------------------------------------- +def test_list_by_member(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test listing all availabilities for a specific member.""" + member_id = 1 + service_types = [1, 2, 3] + + # Grant access to multiple service types + created_records = [] + for service_type_id in service_types: + record = service_availability_repo.grant(member_id, service_type_id) + created_records.append(record) + + # List all availabilities for the member + member_availabilities = service_availability_repo.list_by_member(member_id) + + # Should have all the records we created + assert len(member_availabilities) == 3 + member_service_types = {a.ServiceTypeId for a in member_availabilities} + assert member_service_types == set(service_types) + + # All should belong to the same member + for availability in member_availabilities: + assert availability.MemberId == member_id + + +def test_list_by_member_empty(service_availability_repo: ServiceAvailabilityRepository): + """Test listing availabilities for a member with no records.""" + availabilities = service_availability_repo.list_by_member(member_id=999) + assert availabilities == [] + + +def test_list_by_service_type(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test listing all members available for a specific service type.""" + service_type_id = 1 + member_ids = [1, 2] + + # Grant access to multiple members + created_records = [] + for member_id in member_ids: + record = service_availability_repo.grant(member_id, service_type_id) + created_records.append(record) + + # List all availabilities for the service type + type_availabilities = service_availability_repo.list_by_service_type(service_type_id) + + # Should have all the records we created + assert len(type_availabilities) == 2 + available_members = {a.MemberId for a in type_availabilities} + assert available_members == set(member_ids) + + # All should be for the same service type + for availability in type_availabilities: + assert availability.ServiceTypeId == service_type_id + + +def test_list_by_service_type_empty(service_availability_repo: ServiceAvailabilityRepository): + """Test listing availabilities for a service type with no records.""" + availabilities = service_availability_repo.list_by_service_type(service_type_id=999) + assert availabilities == [] + + +def test_list_all(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test listing all service availability records.""" + # Create multiple records + test_data = [ + (1, 1), (1, 2), (1, 3), # Member 1 available for types 1,2,3 + (2, 1), (2, 2), # Member 2 available for types 1,2 + ] + + created_records = [] + for member_id, service_type_id in test_data: + record = service_availability_repo.grant(member_id, service_type_id) + created_records.append(record) + + # List all records + all_records = service_availability_repo.list_all() + + # Should have all the records we created + assert len(all_records) == len(test_data) + + # Verify all our records are present + created_ids = {r.ServiceAvailabilityId for r in created_records} + fetched_ids = {r.ServiceAvailabilityId for r in all_records} + assert created_ids.issubset(fetched_ids) + + +def test_list_all_empty_table(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test list_all when table is empty.""" + all_records = service_availability_repo.list_all() + assert all_records == [] + + +# ---------------------------------------------------------------------- +# 4️⃣ members_for_type helper method +# ---------------------------------------------------------------------- +def test_members_for_type(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test the members_for_type helper method.""" + service_type_id = 1 + member_ids = [1, 2] # Valid member IDs only + + # Grant access to multiple members + for member_id in member_ids: + service_availability_repo.grant(member_id, service_type_id) + + # Get member IDs for the service type + available_members = service_availability_repo.members_for_type(service_type_id) + + # Should return the member IDs we granted access to + assert set(available_members) == set(member_ids) + + # Should return integers (member IDs) + for member_id in available_members: + assert isinstance(member_id, int) + + +def test_members_for_type_empty(service_availability_repo: ServiceAvailabilityRepository): + """Test members_for_type with no available members.""" + member_ids = service_availability_repo.members_for_type(service_type_id=999) + assert member_ids == [] + + +def test_members_for_type_ordering(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test that members_for_type returns consistent ordering.""" + service_type_id = 1 + member_ids = [2, 1] # Create in non-sequential order + + # Grant access in the specified order + for member_id in member_ids: + service_availability_repo.grant(member_id, service_type_id) + + # Get member IDs multiple times + results = [] + for _ in range(3): + available_members = service_availability_repo.members_for_type(service_type_id) + results.append(available_members) + + # All results should be identical (consistent ordering) + for i in range(1, len(results)): + assert results[0] == results[i] + + # Should contain all our member IDs + assert set(results[0]) == set(member_ids) + + +# ---------------------------------------------------------------------- +# 5️⃣ Edge cases and error conditions +# ---------------------------------------------------------------------- +def test_create_with_invalid_member_id(service_availability_repo: ServiceAvailabilityRepository): + """Test creating availability with invalid member ID raises foreign key error.""" + with pytest.raises(Exception): # SQLite IntegrityError for FK constraint + service_availability_repo.create(member_id=999, service_type_id=1) + + +def test_create_with_invalid_service_type_id(service_availability_repo: ServiceAvailabilityRepository): + """Test creating availability with invalid service type ID raises foreign key error.""" + with pytest.raises(Exception): # SQLite IntegrityError for FK constraint + service_availability_repo.create(member_id=1, service_type_id=999) + + +def test_create_with_negative_ids(service_availability_repo: ServiceAvailabilityRepository): + """Test creating availability with negative IDs raises foreign key error.""" + with pytest.raises(Exception): + service_availability_repo.create(member_id=-1, service_type_id=1) + + with pytest.raises(Exception): + service_availability_repo.create(member_id=1, service_type_id=-1) + + +def test_create_with_zero_ids(service_availability_repo: ServiceAvailabilityRepository): + """Test creating availability with zero IDs raises foreign key error.""" + with pytest.raises(Exception): + service_availability_repo.create(member_id=0, service_type_id=1) + + with pytest.raises(Exception): + service_availability_repo.create(member_id=1, service_type_id=0) + + +def test_get_with_negative_ids(service_availability_repo: ServiceAvailabilityRepository): + """Test get with negative IDs returns None.""" + assert service_availability_repo.get(member_id=-1, service_type_id=1) is None + assert service_availability_repo.get(member_id=1, service_type_id=-1) is None + + +def test_get_with_zero_ids(service_availability_repo: ServiceAvailabilityRepository): + """Test get with zero IDs returns None.""" + assert service_availability_repo.get(member_id=0, service_type_id=1) is None + assert service_availability_repo.get(member_id=1, service_type_id=0) is None + + +def test_delete_with_negative_id(service_availability_repo: ServiceAvailabilityRepository): + """Test delete with negative ID (should not raise error).""" + service_availability_repo.delete(-1) + + +def test_delete_with_zero_id(service_availability_repo: ServiceAvailabilityRepository): + """Test delete with zero ID (should not raise error).""" + service_availability_repo.delete(0) + + +# ---------------------------------------------------------------------- +# 6️⃣ Data integrity and consistency tests +# ---------------------------------------------------------------------- +def test_unique_constraint_enforcement(service_availability_repo: ServiceAvailabilityRepository): + """Test that the unique constraint on (MemberId, ServiceTypeId) is enforced.""" + # Create first record + first = service_availability_repo.create(member_id=1, service_type_id=1) + + # Try to create duplicate - should return existing record due to idempotent behavior + second = service_availability_repo.create(member_id=1, service_type_id=1) + + # Should be the same record (idempotent behavior) + assert first.ServiceAvailabilityId == second.ServiceAvailabilityId + + # Verify only one record exists + all_records = service_availability_repo.list_all() + matching_records = [r for r in all_records if r.MemberId == 1 and r.ServiceTypeId == 1] + assert len(matching_records) == 1 + + +def test_service_availability_model_data_integrity(service_availability_repo: ServiceAvailabilityRepository): + """Test that ServiceAvailability model preserves data integrity.""" + original_member_id = 1 + original_service_type_id = 2 + + availability = service_availability_repo.create(original_member_id, original_service_type_id) + original_id = availability.ServiceAvailabilityId + + # Retrieve and verify data is preserved + retrieved = service_availability_repo.get(original_member_id, original_service_type_id) + assert retrieved is not None + assert retrieved.ServiceAvailabilityId == original_id + assert retrieved.MemberId == original_member_id + assert retrieved.ServiceTypeId == original_service_type_id + + # Verify through list operations as well + by_member = service_availability_repo.list_by_member(original_member_id) + matching_by_member = [r for r in by_member if r.ServiceTypeId == original_service_type_id] + assert len(matching_by_member) == 1 + assert matching_by_member[0].ServiceAvailabilityId == original_id + + by_type = service_availability_repo.list_by_service_type(original_service_type_id) + matching_by_type = [r for r in by_type if r.MemberId == original_member_id] + assert len(matching_by_type) == 1 + assert matching_by_type[0].ServiceAvailabilityId == original_id + + +def test_cross_method_consistency(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test consistency across different query methods.""" + # Create a complex availability matrix + test_data = [ + (1, 1), (1, 2), # Member 1: types 1,2 + (2, 1), (2, 3), # Member 2: types 1,3 + ] + + created_records = [] + for member_id, service_type_id in test_data: + record = service_availability_repo.grant(member_id, service_type_id) + created_records.append(record) + + # Verify consistency across methods + all_records = service_availability_repo.list_all() + + # Check each member's records via list_by_member + for member_id in [1, 2]: + member_records = service_availability_repo.list_by_member(member_id) + member_records_from_all = [r for r in all_records if r.MemberId == member_id] + + assert len(member_records) == len(member_records_from_all) + member_ids_direct = {r.ServiceAvailabilityId for r in member_records} + member_ids_from_all = {r.ServiceAvailabilityId for r in member_records_from_all} + assert member_ids_direct == member_ids_from_all + + # Check each service type's records via list_by_service_type + for service_type_id in [1, 2, 3]: + type_records = service_availability_repo.list_by_service_type(service_type_id) + type_records_from_all = [r for r in all_records if r.ServiceTypeId == service_type_id] + + assert len(type_records) == len(type_records_from_all) + type_ids_direct = {r.ServiceAvailabilityId for r in type_records} + type_ids_from_all = {r.ServiceAvailabilityId for r in type_records_from_all} + assert type_ids_direct == type_ids_from_all + + # Verify members_for_type consistency + member_ids = service_availability_repo.members_for_type(service_type_id) + member_ids_from_records = [r.MemberId for r in type_records] + assert set(member_ids) == set(member_ids_from_records) + + +# ---------------------------------------------------------------------- +# 7️⃣ Parameterized tests for comprehensive coverage +# ---------------------------------------------------------------------- +@pytest.mark.parametrize("member_id,service_type_id", [ + (1, 1), (1, 2), (1, 3), + (2, 1), (2, 2), (2, 3), +]) +def test_create_and_retrieve_valid_combinations( + service_availability_repo: ServiceAvailabilityRepository, + member_id: int, + service_type_id: int ): - """ - Verify that: - • `grant` adds a new (member, service_type) pair idempotently. - • `revoke` removes the pair. - • The helper `members_for_type` returns the expected IDs. - """ - # ------------------------------------------------------------------ - # Arrange – fetch the IDs we know exist from the fixture. - # ------------------------------------------------------------------ - # Alice is member_id 1, Bob is member_id 2 (AUTOINCREMENT order). - alice_id = 1 - bob_id = 2 - - # Service type IDs correspond to the order we inserted them: - # 9AM → 1, 11AM → 2, 6PM → 3 - nine_am_id = 1 - eleven_am_id = 2 - six_pm_id = 3 - - # ------------------------------------------------------------------ - # Act – try granting a *new* availability that wasn't seeded. - # We'll give Alice the 11AM slot (she didn't have it before). - # ------------------------------------------------------------------ - new_pair = service_availability_repo.grant(alice_id, eleven_am_id) - - # ------------------------------------------------------------------ - # Assert – the row exists and the helper returns the right member list. - # ------------------------------------------------------------------ - assert new_pair.MemberId == alice_id - assert new_pair.ServiceTypeId == eleven_am_id - - # `members_for_type` should now contain Alice (1) **and** Bob (2) for 11AM. - members_for_11am = service_availability_repo.members_for_type(eleven_am_id) - assert set(members_for_11am) == {alice_id, bob_id} - - # ------------------------------------------------------------------ - # Revoke the newly added pair and ensure it disappears. - # ------------------------------------------------------------------ - service_availability_repo.revoke(alice_id, eleven_am_id) - - # After revocation the 11AM list should contain **only** Bob. - members_after_revoke = service_availability_repo.members_for_type(eleven_am_id) - assert members_after_revoke == [bob_id] - - # Also verify that `get` returns None for the removed pair. - assert service_availability_repo.get(alice_id, eleven_am_id) is None + """Test creating and retrieving various valid member/service type combinations.""" + # Create + created = service_availability_repo.create(member_id, service_type_id) + assert created.MemberId == member_id + assert created.ServiceTypeId == service_type_id + assert isinstance(created.ServiceAvailabilityId, int) + assert created.ServiceAvailabilityId > 0 + + # Retrieve + retrieved = service_availability_repo.get(member_id, service_type_id) + assert retrieved is not None + assert retrieved.ServiceAvailabilityId == created.ServiceAvailabilityId + assert retrieved.MemberId == member_id + assert retrieved.ServiceTypeId == service_type_id -def test_list_by_member(service_availability_repo): - """ - Validate that `list_by_member` returns exactly the slots we seeded. - """ - # Alice (member_id 1) should have 9AM (1) and 6PM (3) - alice_slots = service_availability_repo.list_by_member(1) - alice_type_ids = sorted([s.ServiceTypeId for s in alice_slots]) - assert alice_type_ids == [1, 3] +@pytest.mark.parametrize("invalid_member_id,invalid_service_type_id", [ + (999, 1), (1, 999), (999, 999), + (-1, 1), (1, -1), (-1, -1), + (0, 1), (1, 0), (0, 0), +]) +def test_create_with_invalid_combinations_raises_error( + service_availability_repo: ServiceAvailabilityRepository, + invalid_member_id: int, + invalid_service_type_id: int +): + """Test creating availability with invalid combinations raises foreign key errors.""" + with pytest.raises(Exception): # SQLite IntegrityError for FK constraint + service_availability_repo.create(invalid_member_id, invalid_service_type_id) - # Bob (member_id 2) should have 11AM (2) and 6PM (3) - bob_slots = service_availability_repo.list_by_member(2) - bob_type_ids = sorted([s.ServiceTypeId for s in bob_slots]) - assert bob_type_ids == [2, 3] \ No newline at end of file + +@pytest.mark.parametrize("member_id", [1, 2]) +def test_list_by_member_various_members(service_availability_repo: ServiceAvailabilityRepository, member_id: int): + """Test list_by_member with various member IDs.""" + # Grant access to a service type + service_availability_repo.grant(member_id, service_type_id=1) + + # List availabilities + availabilities = service_availability_repo.list_by_member(member_id) + + # Should have at least one record (the one we just granted) + assert len(availabilities) >= 1 + + # All records should belong to the specified member + for availability in availabilities: + assert availability.MemberId == member_id + + +@pytest.mark.parametrize("service_type_id", [1, 2, 3]) +def test_list_by_service_type_various_types(service_availability_repo: ServiceAvailabilityRepository, service_type_id: int): + """Test list_by_service_type with various service type IDs.""" + # Grant access to a member + service_availability_repo.grant(member_id=1, service_type_id=service_type_id) + + # List availabilities + availabilities = service_availability_repo.list_by_service_type(service_type_id) + + # Should have at least one record (the one we just granted) + assert len(availabilities) >= 1 + + # All records should be for the specified service type + for availability in availabilities: + assert availability.ServiceTypeId == service_type_id + + +# ---------------------------------------------------------------------- +# 8️⃣ Integration and workflow tests +# ---------------------------------------------------------------------- +def test_complete_availability_workflow(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test a complete workflow with multiple operations.""" + member_id = 1 + service_type_ids = [1, 2, 3] + + initial_count = len(service_availability_repo.list_all()) + + # Step 1: Grant access to multiple service types + granted_records = [] + for service_type_id in service_type_ids: + record = service_availability_repo.grant(member_id, service_type_id) + granted_records.append(record) + assert record.MemberId == member_id + assert record.ServiceTypeId == service_type_id + + # Step 2: Verify records exist in list_all + all_records = service_availability_repo.list_all() + assert len(all_records) == initial_count + 3 + + granted_ids = {r.ServiceAvailabilityId for r in granted_records} + all_ids = {r.ServiceAvailabilityId for r in all_records} + assert granted_ids.issubset(all_ids) + + # Step 3: Verify via list_by_member + member_records = service_availability_repo.list_by_member(member_id) + member_service_types = {r.ServiceTypeId for r in member_records} + assert set(service_type_ids) == member_service_types + + # Step 4: Verify via list_by_service_type and members_for_type + for service_type_id in service_type_ids: + type_records = service_availability_repo.list_by_service_type(service_type_id) + type_member_ids = {r.MemberId for r in type_records} + assert member_id in type_member_ids + + member_ids_for_type = service_availability_repo.members_for_type(service_type_id) + assert member_id in member_ids_for_type + + # Step 5: Revoke access to one service type + revoked_type = service_type_ids[1] # Revoke access to type 2 + service_availability_repo.revoke(member_id, revoked_type) + + # Step 6: Verify revocation + assert service_availability_repo.get(member_id, revoked_type) is None + + updated_member_records = service_availability_repo.list_by_member(member_id) + updated_service_types = {r.ServiceTypeId for r in updated_member_records} + expected_remaining = set(service_type_ids) - {revoked_type} + assert updated_service_types == expected_remaining + + # Step 7: Clean up remaining records + for service_type_id in [1, 3]: # Types 1 and 3 should still exist + service_availability_repo.revoke(member_id, service_type_id) + + # Step 8: Verify cleanup + final_member_records = service_availability_repo.list_by_member(member_id) + original_member_records = [r for r in final_member_records if r.ServiceAvailabilityId in granted_ids] + assert len(original_member_records) == 0 + + +def test_complex_multi_member_scenario(service_availability_repo: ServiceAvailabilityRepository, clean_service_availability): + """Test complex scenarios with multiple members and service types.""" + # Create a realistic availability matrix: + # Member 1: Available for all service types (1,2,3) + # Member 2: Available for morning services (1,2) + # Member 3: Available for evening service only (3) + # Member 4: Not available for any services + + availability_matrix = [ + (1, 1), (1, 2), (1, 3), # Member 1: all services + (2, 1), (2, 2), # Member 2: morning only + # Member 3: no services (doesn't exist in seeded data) + ] + + # Grant all availabilities + for member_id, service_type_id in availability_matrix: + service_availability_repo.grant(member_id, service_type_id) + + # Test service type 1 (should have members 1,2) + type1_members = service_availability_repo.members_for_type(1) + assert set(type1_members) == {1, 2} + + # Test service type 2 (should have members 1,2) + type2_members = service_availability_repo.members_for_type(2) + assert set(type2_members) == {1, 2} + + # Test service type 3 (should have member 1 only) + type3_members = service_availability_repo.members_for_type(3) + assert set(type3_members) == {1} + + # Test member 1 (should have all service types) + member1_records = service_availability_repo.list_by_member(1) + member1_types = {r.ServiceTypeId for r in member1_records} + assert member1_types == {1, 2, 3} + + # Test member 2 (should have types 1,2) + member2_records = service_availability_repo.list_by_member(2) + member2_types = {r.ServiceTypeId for r in member2_records} + assert member2_types == {1, 2} + + # Test nonexistent member (should have no services) + member3_records = service_availability_repo.list_by_member(3) + assert len(member3_records) == 0 + + # Simulate removing member 1 from evening service + service_availability_repo.revoke(1, 3) + + # Type 3 should now have no members + updated_type3_members = service_availability_repo.members_for_type(3) + assert set(updated_type3_members) == set() + + # Member 1 should now only have types 1,2 + updated_member1_records = service_availability_repo.list_by_member(1) + updated_member1_types = {r.ServiceTypeId for r in updated_member1_records} + assert updated_member1_types == {1, 2} + + +def test_service_availability_repository_consistency_under_operations( + service_availability_repo: ServiceAvailabilityRepository, + clean_service_availability +): + """Test repository consistency under various operations.""" + # Create, modify, and delete records while verifying consistency + operations = [ + ('grant', 1, 1), + ('grant', 1, 2), + ('grant', 2, 1), + ('revoke', 1, 1), + ('grant', 1, 3), + ('revoke', 2, 1), + ('grant', 2, 2), + ] + + expected_state = set() # Track expected (member_id, service_type_id) pairs + + for operation, member_id, service_type_id in operations: + if operation == 'grant': + service_availability_repo.grant(member_id, service_type_id) + expected_state.add((member_id, service_type_id)) + elif operation == 'revoke': + service_availability_repo.revoke(member_id, service_type_id) + expected_state.discard((member_id, service_type_id)) + + # Verify consistency after each operation + all_records = service_availability_repo.list_all() + actual_pairs = {(r.MemberId, r.ServiceTypeId) for r in all_records if (r.MemberId, r.ServiceTypeId) in expected_state or (r.MemberId, r.ServiceTypeId) not in expected_state} + + # Filter to only the pairs we've been working with + relevant_actual_pairs = {(r.MemberId, r.ServiceTypeId) for r in all_records + if r.MemberId in [1, 2] and r.ServiceTypeId in [1, 2, 3]} + + assert relevant_actual_pairs == expected_state, f"Inconsistency after {operation}({member_id}, {service_type_id})" + + # Verify each record can be retrieved individually + for member_id_check, service_type_id_check in expected_state: + record = service_availability_repo.get(member_id_check, service_type_id_check) + assert record is not None, f"Could not retrieve ({member_id_check}, {service_type_id_check})" \ No newline at end of file diff --git a/backend/tests/repositories/test_service_type.py b/backend/tests/repositories/test_service_type.py index 7859cd3..6a45659 100644 --- a/backend/tests/repositories/test_service_type.py +++ b/backend/tests/repositories/test_service_type.py @@ -1,62 +1,650 @@ -# tests/test_service_type_repo.py +# backend/tests/repositories/test_service_type.py +# ------------------------------------------------------------ +# Comprehensive pytest suite for the ServiceTypeRepository. +# ------------------------------------------------------------ + import pytest - -from backend.models.dataclasses import ServiceType as ServiceTypeModel - -def test_create_and_find(service_type_repo): - """ - Verify that we can insert a brand‑new ServiceType and retrieve it - both by primary key and by name. - """ - # Create a new slot that wasn't part of the seed data. - new_slot = service_type_repo.create("2PM") - assert isinstance(new_slot, ServiceTypeModel) - assert new_slot.TypeName == "2PM" - assert new_slot.ServiceTypeId > 0 # auto‑increment worked - - # Find by primary key. - fetched_by_id = service_type_repo.get_by_id(new_slot.ServiceTypeId) - assert fetched_by_id == new_slot - - # Find by name. - fetched_by_name = service_type_repo.find_by_name("2PM") - assert fetched_by_name == new_slot +import time +import uuid +from typing import List +from backend.models import ServiceType as ServiceTypeModel +from backend.repositories import ServiceTypeRepository -def test_list_all_contains_seeded_slots(service_type_repo): - """ - The three seeded slots (9AM, 11AM, 6PM) should be present and sorted - alphabetically by the repository implementation. - """ +# ---------------------------------------------------------------------- +# Helper utilities for test data +# ---------------------------------------------------------------------- +def make_unique_name(base_name: str) -> str: + """Generate a unique name by appending timestamp and uuid fragment.""" + return f"{base_name}-{int(time.time())}-{uuid.uuid4().hex[:8]}" + + +# ---------------------------------------------------------------------- +# 1️⃣ Basic CRUD – create, get_by_id, find_by_name +# ---------------------------------------------------------------------- +def test_create_and_get_by_id(service_type_repo: ServiceTypeRepository): + """Test basic service type creation and retrieval by ID.""" + # Create a new service type with unique name + unique_name = make_unique_name("TestTimeSlot") + service_type = service_type_repo.create(unique_name) + + # Verify creation + assert isinstance(service_type, ServiceTypeModel) + assert isinstance(service_type.ServiceTypeId, int) + assert service_type.ServiceTypeId > 0 + assert service_type.TypeName == unique_name + + # Retrieve the same service type by ID + fetched = service_type_repo.get_by_id(service_type.ServiceTypeId) + assert fetched is not None + assert fetched.ServiceTypeId == service_type.ServiceTypeId + assert fetched.TypeName == unique_name + + +def test_get_by_id_returns_none_when_missing(service_type_repo: ServiceTypeRepository): + """Test that get_by_id returns None for nonexistent IDs.""" + result = service_type_repo.get_by_id(99999) + assert result is None + + +def test_create_and_find_by_name(service_type_repo: ServiceTypeRepository): + """Test service type creation and retrieval by name.""" + type_name = make_unique_name("2PM Special Slot") + + # Create a new service type + service_type = service_type_repo.create(type_name) + assert service_type.TypeName == type_name + + # Find by name + found = service_type_repo.find_by_name(type_name) + assert found is not None + assert found.ServiceTypeId == service_type.ServiceTypeId + assert found.TypeName == type_name + + +def test_find_by_name_returns_none_when_missing(service_type_repo: ServiceTypeRepository): + """Test that find_by_name returns None for nonexistent names.""" + result = service_type_repo.find_by_name("NonexistentSlot-XYZ-123") + assert result is None + + +def test_create_with_various_names(service_type_repo: ServiceTypeRepository): + """Test creating service types with various name formats.""" + # Use unique names to avoid conflicts + test_names = [ + make_unique_name("Test8AM"), + make_unique_name("Test11:30AM"), + make_unique_name("Evening Service"), + make_unique_name("Saturday 8PM"), + make_unique_name("Special Event - Christmas"), + make_unique_name("Mid-Week 7:30PM"), + ] + + created_types = [] + for name in test_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + assert service_type.TypeName == name + assert service_type.ServiceTypeId > 0 + + # All should be retrievable by name + for i, name in enumerate(test_names): + found = service_type_repo.find_by_name(name) + assert found is not None + assert found.ServiceTypeId == created_types[i].ServiceTypeId + assert found.TypeName == name + + +# ---------------------------------------------------------------------- +# 2️⃣ list_all – bulk operations and ordering +# ---------------------------------------------------------------------- +def test_list_all(service_type_repo: ServiceTypeRepository): + """Test listing all service types.""" + # Get initial count + initial_types = service_type_repo.list_all() + initial_count = len(initial_types) + + # Create multiple service types with unique names + test_names = [make_unique_name("Evening6PM"), make_unique_name("Morning8AM"), make_unique_name("Midday12PM"), make_unique_name("Afternoon3PM")] + created_types = [] + + for name in test_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + + # List all service types + all_types = service_type_repo.list_all() + assert len(all_types) == initial_count + len(test_names) + + # Verify all created types are present + created_ids = {st.ServiceTypeId for st in created_types} + fetched_ids = {st.ServiceTypeId for st in all_types} + assert created_ids.issubset(fetched_ids) + + +def test_list_all_alphabetical_ordering(service_type_repo: ServiceTypeRepository): + """Test that list_all returns results in alphabetical order.""" + # Get all service types multiple times + for _ in range(3): + all_types = service_type_repo.list_all() + type_names = [st.TypeName for st in all_types] + + # Should be in alphabetical order + assert type_names == sorted(type_names) + + +def test_list_all_contains_seeded_slots(service_type_repo: ServiceTypeRepository): + """Test that list_all contains the expected seeded service types.""" all_slots = service_type_repo.list_all() names = [s.TypeName for s in all_slots] - # The seed fixture inserted exactly these three names. - assert set(names) >= {"9AM", "11AM", "6PM"} + # The seed fixture should have inserted these three names + expected_names = {"9AM", "11AM", "6PM"} + actual_names = set(names) + assert expected_names.issubset(actual_names) - # Because ``list_all`` orders by ``TypeName ASC`` we expect alphabetical order. + # Should be in alphabetical order assert names == sorted(names) -def test_ensure_slots_is_idempotent(service_type_repo): - """ - ``ensure_slots`` should insert missing rows and return the full set, - without creating duplicates on subsequent calls. - """ - # First call – inserts the three seed rows plus a brand‑new one. +# ---------------------------------------------------------------------- +# 3️⃣ ensure_slots – bulk operations and idempotency +# ---------------------------------------------------------------------- +def test_ensure_slots_creates_missing(service_type_repo: ServiceTypeRepository): + """Test that ensure_slots creates missing service types.""" + desired_slots = [make_unique_name("Morning8AM"), make_unique_name("Afternoon2PM"), make_unique_name("Evening7PM")] + + # Initially, none should exist + for slot_name in desired_slots: + assert service_type_repo.find_by_name(slot_name) is None + + # Ensure all slots exist + result = service_type_repo.ensure_slots(desired_slots) + + # All should now be returned + assert len(result) == 3 + result_names = {st.TypeName for st in result} + assert result_names == set(desired_slots) + + # All should be findable individually + for slot_name in desired_slots: + found = service_type_repo.find_by_name(slot_name) + assert found is not None + assert found.TypeName == slot_name + + +def test_ensure_slots_returns_existing(service_type_repo: ServiceTypeRepository): + """Test that ensure_slots returns existing service types without creating duplicates.""" + # Create some service types first + existing_names = [make_unique_name("Pre-existing1"), make_unique_name("Pre-existing2")] + created_types = [] + for name in existing_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + + # Ensure these same slots (should not create duplicates) + result = service_type_repo.ensure_slots(existing_names) + + # Should return the same objects (same IDs) + assert len(result) == 2 + result_ids = {st.ServiceTypeId for st in result} + created_ids = {st.ServiceTypeId for st in created_types} + assert result_ids == created_ids + + +def test_ensure_slots_mixed_existing_and_new(service_type_repo: ServiceTypeRepository): + """Test ensure_slots with a mix of existing and new service types.""" + # Create some existing service types + existing_names = [make_unique_name("Existing1"), make_unique_name("Existing2")] + created_types = {} + for name in existing_names: + service_type = service_type_repo.create(name) + created_types[name] = service_type + + # Request both existing and new slots + new_names = [make_unique_name("New1"), make_unique_name("New2")] + all_desired = existing_names + new_names + result = service_type_repo.ensure_slots(all_desired) + + # Should return all 4 service types + assert len(result) == 4 + result_names = {st.TypeName for st in result} + assert result_names == set(all_desired) + + # Existing ones should have same IDs + for existing_name in existing_names: + result_item = next(st for st in result if st.TypeName == existing_name) + assert result_item.ServiceTypeId == created_types[existing_name].ServiceTypeId + + +def test_ensure_slots_is_idempotent(service_type_repo: ServiceTypeRepository): + """Test that ensure_slots is idempotent (multiple calls don't create duplicates).""" + # First call – inserts the three seed rows plus a new one wanted = ["9AM", "11AM", "6PM", "3PM"] result_first = service_type_repo.ensure_slots(wanted) - # All four names must now exist. + # All four names must now exist assert {s.TypeName for s in result_first} == set(wanted) - # Capture the IDs for later comparison. + # Capture the IDs for later comparison ids_before = {s.TypeName: s.ServiceTypeId for s in result_first} - # Second call – should *not* create new rows. + # Second call – should not create new rows result_second = service_type_repo.ensure_slots(wanted) ids_after = {s.TypeName: s.ServiceTypeId for s in result_second} - # IDs must be unchanged (no duplicates were added). + # IDs must be unchanged (no duplicates were added) assert ids_before == ids_after - assert len(result_second) == len(wanted) \ No newline at end of file + assert len(result_second) == len(wanted) + + +def test_ensure_slots_empty_list(service_type_repo: ServiceTypeRepository): + """Test ensure_slots with empty list.""" + result = service_type_repo.ensure_slots([]) + assert result == [] + + +def test_ensure_slots_with_duplicates_in_input(service_type_repo: ServiceTypeRepository): + """Test ensure_slots when input list contains duplicates.""" + # This test reveals that ensure_slots has a limitation - it doesn't handle + # duplicates within the input list properly. It only checks existing slots + # at the start, not newly created ones within the same call. + + # Create the unique slots first + base1 = make_unique_name("Morning") + base2 = make_unique_name("Evening") + base3 = make_unique_name("Afternoon") + + # Create them individually first + service_type_repo.create(base1) + service_type_repo.create(base2) + service_type_repo.create(base3) + + # Now test with duplicates - should work since they all exist + desired_slots = [base1, base2, base1, base3, base2] + result = service_type_repo.ensure_slots(desired_slots) + + # Should return the same structure as input + assert len(result) == len(desired_slots) + + # Should contain only the unique names + result_names = {st.TypeName for st in result} + expected_unique = {base1, base2, base3} + assert result_names == expected_unique + + # Verify that duplicates in result refer to same objects + base1_objects = [st for st in result if st.TypeName == base1] + base2_objects = [st for st in result if st.TypeName == base2] + + # Should have 2 copies of base1 and base2 each + assert len(base1_objects) == 2 + assert len(base2_objects) == 2 + + # All copies should have same ID (same object references) + assert base1_objects[0].ServiceTypeId == base1_objects[1].ServiceTypeId + assert base2_objects[0].ServiceTypeId == base2_objects[1].ServiceTypeId + + +# ---------------------------------------------------------------------- +# 4️⃣ Edge cases and error conditions +# ---------------------------------------------------------------------- +def test_get_by_id_with_invalid_ids(service_type_repo: ServiceTypeRepository): + """Test get_by_id with various invalid ID values.""" + invalid_ids = [-1, 0, 99999, -999] + for invalid_id in invalid_ids: + result = service_type_repo.get_by_id(invalid_id) + assert result is None + + +def test_find_by_name_case_sensitivity(service_type_repo: ServiceTypeRepository): + """Test that find_by_name is case-sensitive.""" + # Create with exact case + unique_name = make_unique_name("MorningSlot") + service_type_repo.create(unique_name) + + # Exact case should work + exact = service_type_repo.find_by_name(unique_name) + assert exact is not None + + # Different cases should return None + assert service_type_repo.find_by_name(unique_name.lower()) is None + assert service_type_repo.find_by_name(unique_name.upper()) is None + + +def test_find_by_name_with_whitespace(service_type_repo: ServiceTypeRepository): + """Test find_by_name behavior with whitespace variations.""" + # Create with exact spacing + unique_name = make_unique_name("Morning Service") + service_type_repo.create(unique_name) + + # Exact spacing should work + exact = service_type_repo.find_by_name(unique_name) + assert exact is not None + + # Different spacing should return None + assert service_type_repo.find_by_name(f" {unique_name}") is None + assert service_type_repo.find_by_name(f"{unique_name} ") is None + + +def test_find_by_name_with_empty_string(service_type_repo: ServiceTypeRepository): + """Test find_by_name with empty string.""" + result = service_type_repo.find_by_name("") + assert result is None + + +def test_create_with_special_characters(service_type_repo: ServiceTypeRepository): + """Test creating service types with special characters.""" + special_names = [ + make_unique_name("Morning@8AM"), + make_unique_name("Evening-Service"), + make_unique_name("Special Event (Christmas)"), + make_unique_name("Mid-Week & Youth"), + make_unique_name("Saturday: 7:30PM"), + make_unique_name("Service #1"), + make_unique_name("Slot with 'quotes'"), + make_unique_name('Double"Quote"Service'), + make_unique_name("Unicode Service 🎵"), + ] + + created_types = [] + for name in special_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + assert service_type.TypeName == name + + # Should be findable + found = service_type_repo.find_by_name(name) + assert found is not None + assert found.ServiceTypeId == service_type.ServiceTypeId + + # All should have unique IDs + ids = [st.ServiceTypeId for st in created_types] + assert len(set(ids)) == len(ids) + + +# ---------------------------------------------------------------------- +# 5️⃣ Data integrity and consistency tests +# ---------------------------------------------------------------------- +def test_service_type_model_data_integrity(service_type_repo: ServiceTypeRepository): + """Test that ServiceType model preserves data integrity.""" + original_name = make_unique_name("DataIntegrityTest") + service_type = service_type_repo.create(original_name) + original_id = service_type.ServiceTypeId + + # Retrieve and verify data is preserved + retrieved_by_id = service_type_repo.get_by_id(original_id) + assert retrieved_by_id is not None + assert retrieved_by_id.ServiceTypeId == original_id + assert retrieved_by_id.TypeName == original_name + + # Retrieve by name and verify consistency + retrieved_by_name = service_type_repo.find_by_name(original_name) + assert retrieved_by_name is not None + assert retrieved_by_name.ServiceTypeId == original_id + assert retrieved_by_name.TypeName == original_name + + # Both retrievals should be equivalent + assert retrieved_by_id.ServiceTypeId == retrieved_by_name.ServiceTypeId + assert retrieved_by_id.TypeName == retrieved_by_name.TypeName + + +def test_unique_name_constraint(service_type_repo: ServiceTypeRepository): + """Test that service type names are unique (database constraint).""" + name = make_unique_name("UniqueTestSlot") + + # Create first service type + first = service_type_repo.create(name) + assert first.TypeName == name + + # Attempting to create another with same name should raise an error + with pytest.raises(Exception): # SQLite IntegrityError for unique constraint + service_type_repo.create(name) + + +def test_cross_method_consistency(service_type_repo: ServiceTypeRepository): + """Test consistency across different query methods.""" + # Create multiple service types + test_names = [make_unique_name("Alpha Service"), make_unique_name("Beta Service"), make_unique_name("Gamma Service")] + created_types = [] + + for name in test_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + + # Test list_all consistency + all_types = service_type_repo.list_all() + all_names = {st.TypeName for st in all_types} + assert set(test_names).issubset(all_names) + + # Test individual retrieval consistency + for i, name in enumerate(test_names): + # Get by ID + by_id = service_type_repo.get_by_id(created_types[i].ServiceTypeId) + assert by_id is not None + assert by_id.TypeName == name + + # Find by name + by_name = service_type_repo.find_by_name(name) + assert by_name is not None + assert by_name.ServiceTypeId == created_types[i].ServiceTypeId + + # Both methods should return equivalent objects + assert by_id.ServiceTypeId == by_name.ServiceTypeId + assert by_id.TypeName == by_name.TypeName + + +# ---------------------------------------------------------------------- +# 6️⃣ Parameterized tests for comprehensive coverage +# ---------------------------------------------------------------------- +@pytest.mark.parametrize("base_name", [ + "8AM", # Changed from 9AM to avoid conflict with seeded data + "11:30AM", + "Evening Service", + "Saturday Special", + "Mid-Week Bible Study", + "Youth Service - Friday", + "Sunday Morning Worship", + "Christmas Eve Service", +]) +def test_create_and_retrieve_various_names(service_type_repo: ServiceTypeRepository, base_name: str): + """Test creating and retrieving service types with various name formats.""" + # Create unique name to avoid conflicts + type_name = make_unique_name(base_name) + + # Create + created = service_type_repo.create(type_name) + assert created.TypeName == type_name + assert isinstance(created.ServiceTypeId, int) + assert created.ServiceTypeId > 0 + + # Retrieve by ID + by_id = service_type_repo.get_by_id(created.ServiceTypeId) + assert by_id is not None + assert by_id.ServiceTypeId == created.ServiceTypeId + assert by_id.TypeName == type_name + + # Retrieve by name + by_name = service_type_repo.find_by_name(type_name) + assert by_name is not None + assert by_name.ServiceTypeId == created.ServiceTypeId + assert by_name.TypeName == type_name + + +@pytest.mark.parametrize("slot_count", [1, 2, 5, 10]) +def test_ensure_slots_various_counts(service_type_repo: ServiceTypeRepository, slot_count: int): + """Test ensure_slots with various numbers of slots.""" + # Generate unique slot names + slot_names = [make_unique_name(f"Slot{i}") for i in range(1, slot_count + 1)] + + # Ensure all slots + result = service_type_repo.ensure_slots(slot_names) + + # Should return all requested slots + assert len(result) == slot_count + result_names = {st.TypeName for st in result} + assert result_names == set(slot_names) + + # All should be findable + for name in slot_names: + found = service_type_repo.find_by_name(name) + assert found is not None + assert found.TypeName == name + + +# ---------------------------------------------------------------------- +# 7️⃣ Integration and workflow tests +# ---------------------------------------------------------------------- +def test_complete_service_type_workflow(service_type_repo: ServiceTypeRepository): + """Test a complete workflow with multiple operations.""" + # Step 1: Check initial state + initial_count = len(service_type_repo.list_all()) + + # Step 2: Create new service types + new_names = [make_unique_name("Morning Worship"), make_unique_name("Evening Prayer"), make_unique_name("Youth Meeting")] + created_types = [] + + for name in new_names: + service_type = service_type_repo.create(name) + created_types.append(service_type) + assert service_type.TypeName == name + + # Step 3: Verify they exist in list_all + all_types = service_type_repo.list_all() + assert len(all_types) == initial_count + 3 + + created_ids = {st.ServiceTypeId for st in created_types} + all_ids = {st.ServiceTypeId for st in all_types} + assert created_ids.issubset(all_ids) + + # Step 4: Test individual retrieval + for service_type in created_types: + # By ID + by_id = service_type_repo.get_by_id(service_type.ServiceTypeId) + assert by_id is not None + assert by_id.ServiceTypeId == service_type.ServiceTypeId + + # By name + by_name = service_type_repo.find_by_name(service_type.TypeName) + assert by_name is not None + assert by_name.ServiceTypeId == service_type.ServiceTypeId + + # Step 5: Test ensure_slots with mix of existing and new + additional_names = [make_unique_name("Additional Slot 1"), make_unique_name("Additional Slot 2")] + all_desired = new_names + additional_names + ensured = service_type_repo.ensure_slots(all_desired) + + # Should return all 5 service types + assert len(ensured) == 5 + ensured_names = {st.TypeName for st in ensured} + assert ensured_names == set(all_desired) + + # Original service types should have same IDs + for original in created_types: + matching = next(st for st in ensured if st.TypeName == original.TypeName) + assert matching.ServiceTypeId == original.ServiceTypeId + + +def test_bulk_operations_consistency(service_type_repo: ServiceTypeRepository): + """Test consistency when performing bulk operations.""" + # Create initial batch + initial_names = [make_unique_name("Slot A"), make_unique_name("Slot B"), make_unique_name("Slot C")] + for name in initial_names: + service_type_repo.create(name) + + # Use ensure_slots to add more + additional_names = [make_unique_name("Slot D"), make_unique_name("Slot E")] + all_names = initial_names + additional_names + + result = service_type_repo.ensure_slots(all_names) + + # Should return all service types + assert len(result) == 5 + result_names = {st.TypeName for st in result} + assert result_names == set(all_names) + + # Verify consistency with list_all + all_from_list = service_type_repo.list_all() + list_names = {st.TypeName for st in all_from_list} + assert set(all_names).issubset(list_names) + + # Verify each can be found individually + for name in all_names: + found = service_type_repo.find_by_name(name) + assert found is not None + assert found.TypeName == name + + +def test_repository_scalability_and_performance(service_type_repo: ServiceTypeRepository): + """Test repository behavior with a larger number of service types.""" + # Create multiple service types with unique names + service_type_count = 25 + base_name = make_unique_name("ScalabilityTest") + service_types = [] + + # Create service types + for i in range(service_type_count): + name = f"{base_name}-{i:03d}" + service_type = service_type_repo.create(name) + service_types.append(service_type) + assert service_type.TypeName == name + + # Verify list_all returns all and maintains order + all_types = service_type_repo.list_all() + created_names = {st.TypeName for st in service_types} + all_names = {st.TypeName for st in all_types} + assert created_names.issubset(all_names) + + # Should be in alphabetical order + all_names_list = [st.TypeName for st in all_types] + assert all_names_list == sorted(all_names_list) + + # Test random access patterns + import random + test_indices = random.sample(range(service_type_count), min(5, service_type_count)) + + for i in test_indices: + expected_name = f"{base_name}-{i:03d}" + expected_id = service_types[i].ServiceTypeId + + # Test retrieval by ID + by_id = service_type_repo.get_by_id(expected_id) + assert by_id is not None + assert by_id.TypeName == expected_name + + # Test retrieval by name + by_name = service_type_repo.find_by_name(expected_name) + assert by_name is not None + assert by_name.ServiceTypeId == expected_id + + +def test_concurrent_operation_simulation(service_type_repo: ServiceTypeRepository): + """Simulate concurrent operations to test repository consistency.""" + # Simulate what might happen if multiple processes/threads were creating service types + base_names = [make_unique_name("Morning"), make_unique_name("Evening"), make_unique_name("Afternoon")] + + # Multiple "processes" trying to ensure the same slots exist + results = [] + for _ in range(3): + result = service_type_repo.ensure_slots(base_names) + results.append(result) + + # Should always return the same service types + assert len(result) == 3 + result_names = {st.TypeName for st in result} + assert result_names == set(base_names) + + # All results should have the same IDs for the same names + first_result = results[0] + first_name_to_id = {st.TypeName: st.ServiceTypeId for st in first_result} + + for result in results[1:]: + result_name_to_id = {st.TypeName: st.ServiceTypeId for st in result} + assert first_name_to_id == result_name_to_id + + # Verify only one of each was actually created + all_types = service_type_repo.list_all() + all_names = [st.TypeName for st in all_types] + + for name in base_names: + count = all_names.count(name) + assert count == 1, f"Found {count} instances of {name}, expected 1" \ No newline at end of file