Files
nimbusflow/backend/tests/repositories/test_schedule.py

444 lines
14 KiB
Python

# tests/repositories/test_schedule.py
import datetime as dt
from typing import List
import pytest
from backend.models import Schedule as ScheduleModel, ScheduleStatus
from backend.repositories import ScheduleRepository, ServiceRepository
from backend.db import DatabaseConnection
# ----------------------------------------------------------------------
# Additional fixtures for Schedule repository testing
# ----------------------------------------------------------------------
@pytest.fixture
def schedule_repo(
db_connection: DatabaseConnection,
seed_lookup_tables,
) -> ScheduleRepository:
return ScheduleRepository(db_connection)
@pytest.fixture
def service_repo(
db_connection: DatabaseConnection,
seed_lookup_tables,
) -> ServiceRepository:
return ServiceRepository(db_connection)
@pytest.fixture
def sample_service(service_repo: ServiceRepository) -> int:
"""Create a sample service and return its ID."""
service = service_repo.create(
service_type_id=1, # 9AM from seeded data
service_date=dt.date(2025, 9, 15)
)
return service.ServiceId
@pytest.fixture
def clean_schedules(schedule_repo: ScheduleRepository):
"""Clean the Schedules table before tests."""
schedule_repo.db.execute("DELETE FROM Schedules")
schedule_repo.db._conn.commit()
# ----------------------------------------------------------------------
# Basic CRUD Operations
# ----------------------------------------------------------------------
def test_create_and_get_by_id(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test basic schedule creation and retrieval."""
# Create a schedule
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1, # Alice from seeded data
status=ScheduleStatus.PENDING
)
# Verify the created schedule
assert isinstance(schedule.ScheduleId, int) and schedule.ScheduleId > 0
assert schedule.ServiceId == sample_service
assert schedule.MemberId == 1
assert schedule.Status == ScheduleStatus.PENDING.value
assert schedule.ScheduledAt is not None
assert schedule.AcceptedAt is None
assert schedule.DeclinedAt is None
# Retrieve the schedule
fetched = schedule_repo.get_by_id(schedule.ScheduleId)
assert fetched is not None
assert fetched.ServiceId == sample_service
assert fetched.MemberId == 1
assert fetched.Status == ScheduleStatus.PENDING.value
def test_get_by_id_returns_none_when_missing(schedule_repo: ScheduleRepository):
"""Test that get_by_id returns None for non-existent schedules."""
assert schedule_repo.get_by_id(9999) is None
def test_create_with_decline_reason(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test creating a schedule with DECLINED status and reason."""
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.DECLINED,
reason="Already committed elsewhere"
)
assert schedule.Status == ScheduleStatus.DECLINED.value
assert schedule.DeclineReason == "Already committed elsewhere"
# ----------------------------------------------------------------------
# List Operations
# ----------------------------------------------------------------------
def test_list_all(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test listing all schedules."""
# Create multiple schedules
schedule1 = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
schedule2 = schedule_repo.create(
service_id=sample_service,
member_id=2,
status=ScheduleStatus.ACCEPTED
)
schedules = schedule_repo.list_all()
assert len(schedules) == 2
schedule_ids = {s.ScheduleId for s in schedules}
assert schedule1.ScheduleId in schedule_ids
assert schedule2.ScheduleId in schedule_ids
def test_get_pending_for_service(
schedule_repo: ScheduleRepository,
service_repo: ServiceRepository,
clean_schedules
):
"""Test getting pending schedules for a specific service."""
# Create two services
service1 = service_repo.create(service_type_id=1, service_date=dt.date(2025, 9, 15))
service2 = service_repo.create(service_type_id=2, service_date=dt.date(2025, 9, 15))
# Create schedules with different statuses
pending1 = schedule_repo.create(
service_id=service1.ServiceId,
member_id=1,
status=ScheduleStatus.PENDING
)
accepted1 = schedule_repo.create(
service_id=service1.ServiceId,
member_id=2,
status=ScheduleStatus.ACCEPTED
)
pending2 = schedule_repo.create(
service_id=service2.ServiceId,
member_id=1,
status=ScheduleStatus.PENDING
)
# Get pending schedules for service1
pending_schedules = schedule_repo.get_pending_for_service(service1.ServiceId)
assert len(pending_schedules) == 1
assert pending_schedules[0].ScheduleId == pending1.ScheduleId
assert pending_schedules[0].Status == ScheduleStatus.PENDING.value
# ----------------------------------------------------------------------
# Query Operations
# ----------------------------------------------------------------------
def test_get_one(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test getting one schedule by member and service ID."""
# Create a schedule
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Find it
found = schedule_repo.get_one(member_id=1, service_id=sample_service)
assert found is not None
assert found.ScheduleId == schedule.ScheduleId
# Try to find non-existent
not_found = schedule_repo.get_one(member_id=999, service_id=sample_service)
assert not_found is None
def test_has_any(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test checking if member has schedules with specific statuses."""
# Create schedules with different statuses
schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
schedule_repo.create(
service_id=sample_service,
member_id=2,
status=ScheduleStatus.ACCEPTED
)
# Test has_any
assert schedule_repo.has_any(1, sample_service, [ScheduleStatus.PENDING])
assert schedule_repo.has_any(2, sample_service, [ScheduleStatus.ACCEPTED])
assert schedule_repo.has_any(1, sample_service, [ScheduleStatus.PENDING, ScheduleStatus.ACCEPTED])
assert not schedule_repo.has_any(1, sample_service, [ScheduleStatus.DECLINED])
assert not schedule_repo.has_any(999, sample_service, [ScheduleStatus.PENDING])
# Test empty statuses list
assert not schedule_repo.has_any(1, sample_service, [])
def test_has_schedule_on_date(
schedule_repo: ScheduleRepository,
service_repo: ServiceRepository,
clean_schedules
):
"""Test checking if member has any active schedule on a specific date."""
# Create services on different dates
service_today_9am = service_repo.create(
service_type_id=1,
service_date=dt.date(2025, 9, 15)
)
service_today_11am = service_repo.create(
service_type_id=2,
service_date=dt.date(2025, 9, 15)
)
service_tomorrow = service_repo.create(
service_type_id=2,
service_date=dt.date(2025, 9, 16)
)
# Create pending schedule for today
schedule_repo.create(
service_id=service_today_9am.ServiceId,
member_id=1,
status=ScheduleStatus.PENDING
)
# Create declined schedule for today (should not block)
schedule_repo.create(
service_id=service_today_11am.ServiceId,
member_id=2,
status=ScheduleStatus.DECLINED
)
# Test has_schedule_on_date
assert schedule_repo.has_schedule_on_date(1, "2025-09-15") # pending schedule blocks
assert not schedule_repo.has_schedule_on_date(2, "2025-09-15") # declined schedule doesn't block
assert not schedule_repo.has_schedule_on_date(1, "2025-09-16") # different date
assert not schedule_repo.has_schedule_on_date(3, "2025-09-15") # different member
# ----------------------------------------------------------------------
# Status Update Operations
# ----------------------------------------------------------------------
def test_update_status_to_accepted(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test updating schedule status to accepted."""
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Update to accepted
schedule_repo.update_status(
schedule_id=schedule.ScheduleId,
new_status=ScheduleStatus.ACCEPTED
)
# Verify the update
updated = schedule_repo.get_by_id(schedule.ScheduleId)
assert updated is not None
assert updated.Status == ScheduleStatus.ACCEPTED.value
assert updated.DeclinedAt is None
assert updated.DeclineReason is None
def test_update_status_to_declined(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test updating schedule status to declined with reason."""
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Update to declined
schedule_repo.update_status(
schedule_id=schedule.ScheduleId,
new_status=ScheduleStatus.DECLINED,
reason="Family emergency"
)
# Verify the update
updated = schedule_repo.get_by_id(schedule.ScheduleId)
assert updated is not None
assert updated.Status == ScheduleStatus.DECLINED.value
assert updated.DeclinedAt is not None
assert updated.DeclineReason == "Family emergency"
def test_mark_accepted(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test marking a schedule as accepted."""
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Mark as accepted
schedule_repo.mark_accepted(schedule.ScheduleId)
# Verify
updated = schedule_repo.get_by_id(schedule.ScheduleId)
assert updated is not None
assert updated.Status == ScheduleStatus.ACCEPTED.value
assert updated.AcceptedAt is not None
assert updated.DeclinedAt is None
assert updated.DeclineReason is None
def test_mark_declined(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test marking a schedule as declined."""
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Mark as declined
schedule_repo.mark_declined(
schedule.ScheduleId,
decline_reason="Unable to attend"
)
# Verify
updated = schedule_repo.get_by_id(schedule.ScheduleId)
assert updated is not None
assert updated.Status == ScheduleStatus.DECLINED.value
assert updated.DeclinedAt is not None
assert updated.DeclineReason == "Unable to attend"
# ----------------------------------------------------------------------
# Delete Operations (Added for CLI functionality)
# ----------------------------------------------------------------------
def test_delete_schedule(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test deleting a schedule."""
# Create a schedule
schedule = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
# Verify it exists
assert schedule_repo.get_by_id(schedule.ScheduleId) is not None
# Delete it
result = schedule_repo.delete_schedule(schedule.ScheduleId)
assert result is True
# Verify it's gone
assert schedule_repo.get_by_id(schedule.ScheduleId) is None
# Try to delete again (should return False)
result2 = schedule_repo.delete_schedule(schedule.ScheduleId)
assert result2 is False
def test_delete_nonexistent_schedule(schedule_repo: ScheduleRepository):
"""Test deleting a non-existent schedule returns False."""
result = schedule_repo.delete_schedule(9999)
assert result is False
# ----------------------------------------------------------------------
# Edge Cases and Error Conditions
# ----------------------------------------------------------------------
def test_create_with_invalid_foreign_keys(
schedule_repo: ScheduleRepository,
clean_schedules
):
"""Test that creating schedule with invalid FKs raises appropriate errors."""
# This should fail due to FK constraint (assuming constraints are enforced)
with pytest.raises(Exception): # SQLite foreign key constraint error
schedule_repo.create(
service_id=9999, # Non-existent service
member_id=1,
status=ScheduleStatus.PENDING
)
def test_unique_constraint_member_service(
schedule_repo: ScheduleRepository,
sample_service: int,
clean_schedules
):
"""Test that UNIQUE constraint prevents duplicate member/service schedules."""
# Create first schedule
schedule1 = schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.PENDING
)
assert schedule1 is not None
# Attempting to create second schedule for same member/service should fail
with pytest.raises(Exception): # SQLite UNIQUE constraint error
schedule_repo.create(
service_id=sample_service,
member_id=1,
status=ScheduleStatus.DECLINED
)