Files
nimbusflow/backend/tests/repositories/test_service.py

566 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# backend/tests/repositories/test_service.py
# ------------------------------------------------------------
# Comprehensive pytest suite for the ServiceRepository.
# ------------------------------------------------------------
import pytest
from datetime import date, datetime, timedelta
from typing import List
from backend.models import Service as ServiceModel
from backend.repositories import ServiceRepository
# ----------------------------------------------------------------------
# Helper fixtures for test data
# ----------------------------------------------------------------------
@pytest.fixture
def sample_dates():
    """Provide named calendar dates positioned relative to the current day.

    Returns:
        dict: maps the labels 'past', 'yesterday', 'today', 'tomorrow',
        'next_week' and 'future' to ``date`` objects lying -30, -1, 0, +1,
        +7 and +30 days from ``date.today()`` respectively.
    """
    anchor = date.today()
    # Day offsets from the anchor, listed oldest to newest.
    day_offsets = (
        ('past', -30),
        ('yesterday', -1),
        ('today', 0),
        ('tomorrow', 1),
        ('next_week', 7),
        ('future', 30),
    )
    return {label: anchor + timedelta(days=shift) for label, shift in day_offsets}
@pytest.fixture
def clean_services(service_repo: ServiceRepository):
    """Empty the services table so the requesting test starts with no rows.

    Runs a ``DELETE`` against the table named by ``service_repo._TABLE`` and
    then commits on the repository's underlying connection object.
    """
    database = service_repo.db
    wipe_statement = f"DELETE FROM {service_repo._TABLE}"
    database.execute(wipe_statement)
    # Commit through the connection held by the database wrapper.
    database._conn.commit()
# ----------------------------------------------------------------------
# 1<0F> Basic CRUD  create & get_by_id
# ----------------------------------------------------------------------
def test_create_and_get_by_id(service_repo: ServiceRepository, sample_dates):
    """Create a service, then read it back by its generated ID."""
    target_date = sample_dates['tomorrow']
    created = service_repo.create(service_type_id=1, service_date=target_date)

    # The new record carries a positive integer ID and echoes its inputs.
    assert isinstance(created.ServiceId, int)
    assert created.ServiceId > 0
    assert created.ServiceTypeId == 1
    assert created.ServiceDate == target_date

    # A lookup by that ID yields the same field values.
    loaded = service_repo.get_by_id(created.ServiceId)
    assert loaded is not None
    assert loaded.ServiceId == created.ServiceId
    assert loaded.ServiceTypeId == 1
    assert loaded.ServiceDate == target_date
def test_get_by_id_returns_none_when_missing(service_repo: ServiceRepository):
    """Looking up an ID that is not in the table yields ``None``."""
    missing_id = 99999
    assert service_repo.get_by_id(missing_id) is None
def test_create_with_date_object(service_repo: ServiceRepository):
    """A service created from a plain ``date`` keeps that date and its type."""
    fixed_day = date(2025, 12, 25)
    created = service_repo.create(service_type_id=2, service_date=fixed_day)

    assert created.ServiceDate == fixed_day
    assert created.ServiceTypeId == 2
# ----------------------------------------------------------------------
# 2<0F> list_all  bulk operations
# ----------------------------------------------------------------------
def test_list_all(service_repo: ServiceRepository, sample_dates, clean_services):
    """list_all() returns exactly the services created in this test."""
    # (service type ID, service date) pairs to insert, in insertion order.
    specs = (
        (1, sample_dates['today']),
        (2, sample_dates['tomorrow']),
        (1, sample_dates['next_week']),
    )
    inserted = [service_repo.create(type_id, when) for type_id, when in specs]

    listed = service_repo.list_all()
    assert len(listed) == 3

    # Compare by ID set: every inserted row is listed and nothing else is.
    listed_ids = {row.ServiceId for row in listed}
    inserted_ids = {row.ServiceId for row in inserted}
    assert listed_ids == inserted_ids
def test_list_all_empty_table(service_repo: ServiceRepository, clean_services):
    """list_all() yields an empty list once the table has been cleared."""
    assert service_repo.list_all() == []
# ----------------------------------------------------------------------
# 3<0F> upcoming  date-based filtering
# ----------------------------------------------------------------------
def test_upcoming_default_behavior(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming() called without arguments keeps today's and later services."""
    # One service per label, inserted oldest first.
    by_label = {
        label: service_repo.create(1, sample_dates[label])
        for label in ('past', 'today', 'future')
    }

    found_ids = {svc.ServiceId for svc in service_repo.upcoming()}

    # Today and the future qualify; the past service must be filtered out.
    assert by_label['today'].ServiceId in found_ids
    assert by_label['future'].ServiceId in found_ids
    assert by_label['past'].ServiceId not in found_ids
def test_upcoming_with_specific_after_date(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming(after=...) returns the services dated on or after that day."""
    cutoff = sample_dates['tomorrow']

    # Dated before the cutoff, so it is expected to be left out.
    service_repo.create(1, sample_dates['yesterday'])
    on_cutoff = service_repo.create(1, cutoff)
    beyond_cutoff = service_repo.create(1, sample_dates['future'])

    result = service_repo.upcoming(after=cutoff)
    result_ids = {svc.ServiceId for svc in result}

    assert on_cutoff.ServiceId in result_ids
    assert beyond_cutoff.ServiceId in result_ids
    assert len(result) == 2
def test_upcoming_with_limit(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming(limit=3) returns three rows when five services qualify."""
    # Five services on consecutive days, beginning today.
    for day_offset in range(5):
        service_repo.create(1, sample_dates['today'] + timedelta(days=day_offset))

    limited = service_repo.upcoming(limit=3)
    assert len(limited) == 3
def test_upcoming_chronological_order(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming() returns every qualifying service in ascending date order.

    Four services dated today or later are inserted out of order into a
    freshly cleared table. The result must contain all four and their dates
    must be non-decreasing.
    """
    # Inserted deliberately out of date order so ordering is the
    # repository's responsibility, not an artifact of insertion order.
    service_dates = [
        sample_dates['future'],
        sample_dates['tomorrow'],
        sample_dates['next_week'],
        sample_dates['today'],
    ]
    for service_date in service_dates:
        service_repo.create(1, service_date)

    returned_dates = [s.ServiceDate for s in service_repo.upcoming()]

    # Guard against a vacuous pass: an empty or truncated result would
    # satisfy an ordering check trivially.
    assert len(returned_dates) == len(service_dates)
    assert returned_dates == sorted(returned_dates)
def test_upcoming_no_results(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming() is empty when every stored service predates today."""
    # Both stored services lie strictly before today.
    for label in ('past', 'yesterday'):
        service_repo.create(1, sample_dates[label])

    assert service_repo.upcoming() == []
# ----------------------------------------------------------------------
# 4<0F> by_type  service type filtering
# ----------------------------------------------------------------------
def test_by_type_single_type(service_repo: ServiceRepository, sample_dates, clean_services):
    """by_type([1]) returns the type-1 services and omits other types."""
    first_match = service_repo.create(1, sample_dates['today'])
    other_type = service_repo.create(2, sample_dates['tomorrow'])
    second_match = service_repo.create(1, sample_dates['future'])

    matches = service_repo.by_type([1])
    match_ids = {svc.ServiceId for svc in matches}

    # Both type-1 rows are present, the type-2 row is absent.
    assert {first_match.ServiceId, second_match.ServiceId} <= match_ids
    assert other_type.ServiceId not in match_ids
    assert len(matches) == 2
def test_by_type_multiple_types(service_repo: ServiceRepository, sample_dates, clean_services):
    """by_type([1, 3]) returns services of either type and skips type 2."""
    # One service for each of the types 1, 2 and 3.
    wanted_a = service_repo.create(1, sample_dates['today'])
    unwanted = service_repo.create(2, sample_dates['tomorrow'])
    wanted_b = service_repo.create(3, sample_dates['future'])

    selected = service_repo.by_type([1, 3])
    selected_ids = {svc.ServiceId for svc in selected}

    assert {wanted_a.ServiceId, wanted_b.ServiceId} <= selected_ids
    assert unwanted.ServiceId not in selected_ids
    assert len(selected) == 2
def test_by_type_empty_list(service_repo: ServiceRepository):
    """by_type() given no type IDs returns an empty list.

    NOTE(review): only the return value is checked here; whether the
    repository skips the database round-trip is not observable from this
    test.
    """
    assert service_repo.by_type([]) == []
def test_by_type_nonexistent_types(service_repo: ServiceRepository, sample_dates, clean_services):
    """by_type() returns nothing for type IDs that no stored service uses."""
    # The only stored service is type 1, which is not among the requested IDs.
    service_repo.create(1, sample_dates['today'])

    assert service_repo.by_type([999, 1000]) == []
def test_by_type_chronological_order(service_repo: ServiceRepository, sample_dates, clean_services):
    """by_type() returns every matching service in ascending date order.

    Three type-1 services are inserted out of date order into a freshly
    cleared table. The result must contain all three and their dates must
    be non-decreasing.
    """
    # Same type throughout, inserted out of date order on purpose.
    service_dates = [
        sample_dates['future'],
        sample_dates['today'],
        sample_dates['tomorrow'],
    ]
    for service_date in service_dates:
        service_repo.create(1, service_date)

    returned_dates = [s.ServiceDate for s in service_repo.by_type([1])]

    # Guard against a vacuous pass: an empty or truncated result would
    # satisfy an ordering check trivially.
    assert len(returned_dates) == len(service_dates)
    assert returned_dates == sorted(returned_dates)
# ----------------------------------------------------------------------
# 5<0F> reschedule  update operations
# ----------------------------------------------------------------------
def test_reschedule_service(service_repo: ServiceRepository, sample_dates):
    """reschedule() moves a service to a new date and leaves its type alone."""
    before, after = sample_dates['today'], sample_dates['future']

    booked = service_repo.create(1, before)
    assert booked.ServiceDate == before

    service_repo.reschedule(booked.ServiceId, after)

    # Re-read through the repository instead of inspecting the object
    # returned by create().
    reloaded = service_repo.get_by_id(booked.ServiceId)
    assert reloaded is not None
    assert reloaded.ServiceDate == after
    assert reloaded.ServiceTypeId == 1  # the type is not touched by a reschedule
def test_reschedule_nonexistent_service(service_repo: ServiceRepository, sample_dates):
    """Rescheduling an ID that does not exist neither raises nor adds a row."""
    missing_id = 99999

    # Must complete without raising.
    service_repo.reschedule(missing_id, sample_dates['tomorrow'])

    # An update aimed at a missing row must not bring that row into existence.
    assert service_repo.get_by_id(missing_id) is None
# ----------------------------------------------------------------------
# 6<0F> Edge cases and error conditions
# ----------------------------------------------------------------------
def test_get_by_id_with_negative_id(service_repo: ServiceRepository):
    """get_by_id(-1) finds nothing and returns ``None``."""
    negative_id = -1
    assert service_repo.get_by_id(negative_id) is None
def test_get_by_id_with_zero_id(service_repo: ServiceRepository):
    """get_by_id(0) finds nothing and returns ``None``."""
    zero_id = 0
    assert service_repo.get_by_id(zero_id) is None
def test_create_with_invalid_service_type_id_raises_error(service_repo: ServiceRepository, sample_dates):
    """create() raises when given a service type ID of 999."""
    unknown_type_id = 999
    # NOTE(review): presumably a foreign-key IntegrityError from SQLite; the
    # broad Exception match also accepts unrelated failures - confirm the
    # concrete type and narrow it.
    with pytest.raises(Exception):
        service_repo.create(unknown_type_id, sample_dates['today'])
def test_create_with_very_old_date(service_repo: ServiceRepository):
    """A service dated 1900-01-01 is stored with that date intact."""
    long_ago = date(1900, 1, 1)
    created = service_repo.create(1, long_ago)
    assert created.ServiceDate == long_ago
def test_create_with_far_future_date(service_repo: ServiceRepository):
    """A service dated 2100-12-31 is stored with that date intact."""
    distant_day = date(2100, 12, 31)
    created = service_repo.create(1, distant_day)
    assert created.ServiceDate == distant_day
def test_upcoming_with_zero_limit(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming(limit=0) returns an empty list even when a service qualifies."""
    # A service dated tomorrow exists, so emptiness comes from the limit.
    service_repo.create(1, sample_dates['tomorrow'])

    assert service_repo.upcoming(limit=0) == []
def test_upcoming_with_negative_limit(service_repo: ServiceRepository, sample_dates, clean_services):
    """upcoming(limit=-1) completes and returns nothing but the stored row.

    SQLite itself treats a negative LIMIT as "no upper bound".
    NOTE(review): whether the repository forwards the value unchanged or
    clamps it is not visible from this test, so the assertions pin only
    what must hold either way.
    """
    created = service_repo.create(1, sample_dates['tomorrow'])

    upcoming = service_repo.upcoming(limit=-1)

    # The table was cleared beforehand, so the only row that may legitimately
    # come back is the one created above.
    assert len(upcoming) <= 1
    assert all(s.ServiceId == created.ServiceId for s in upcoming)
def test_by_type_with_duplicate_type_ids(service_repo: ServiceRepository, sample_dates, clean_services):
    """Repeated IDs in the by_type() argument do not duplicate result rows."""
    type_one = service_repo.create(1, sample_dates['today'])
    type_two = service_repo.create(2, sample_dates['tomorrow'])

    requested = [1, 1, 2, 1]  # type 1 is listed three times on purpose
    rows = service_repo.by_type(requested)
    row_ids = {row.ServiceId for row in rows}

    assert {type_one.ServiceId, type_two.ServiceId} <= row_ids
    # Exactly two rows: one per stored service, regardless of the repeats.
    assert len(rows) == 2
# ----------------------------------------------------------------------
# 7<0F> Data integrity and consistency tests
# ----------------------------------------------------------------------
def test_service_model_data_integrity(service_repo: ServiceRepository, sample_dates):
    """A created service reads back unchanged via get_by_id() and list_all()."""
    type_id, when = 2, sample_dates['tomorrow']
    new_id = service_repo.create(type_id, when).ServiceId

    # Direct lookup returns the same three field values.
    fetched = service_repo.get_by_id(new_id)
    assert fetched is not None
    assert fetched.ServiceId == new_id
    assert fetched.ServiceTypeId == type_id
    assert fetched.ServiceDate == when

    # The same row appears exactly once in the full listing.
    hits = [row for row in service_repo.list_all() if row.ServiceId == new_id]
    assert len(hits) == 1
    only_hit = hits[0]
    assert only_hit.ServiceTypeId == type_id
    assert only_hit.ServiceDate == when
def test_multiple_services_same_date_and_type(service_repo: ServiceRepository, sample_dates, clean_services):
    """Two services may share a date and type; each gets its own ID."""
    shared_day = sample_dates['today']
    first = service_repo.create(1, shared_day)
    second = service_repo.create(1, shared_day)

    # Distinct identities, identical payload.
    assert first.ServiceId != second.ServiceId
    assert first.ServiceTypeId == second.ServiceTypeId
    assert first.ServiceDate == second.ServiceDate

    # Each one can be looked up on its own.
    for twin in (first, second):
        assert service_repo.get_by_id(twin.ServiceId) is not None
# ----------------------------------------------------------------------
# 8<0F> Parameterized tests for comprehensive coverage
# ----------------------------------------------------------------------
@pytest.mark.parametrize("service_type_id", [1, 2, 3])
def test_create_with_valid_service_type_ids(service_repo: ServiceRepository, service_type_id):
    """create() succeeds for each of the parametrized service type IDs."""
    today = date.today()
    created = service_repo.create(service_type_id, today)

    # Inputs are echoed back and a positive integer ID is assigned.
    assert created.ServiceTypeId == service_type_id
    assert created.ServiceDate == today
    assert isinstance(created.ServiceId, int)
    assert created.ServiceId > 0
@pytest.mark.parametrize("invalid_service_type_id", [999, -1, 0])
def test_create_with_invalid_service_type_ids_raises_error(service_repo: ServiceRepository, invalid_service_type_id):
    """create() raises for each of the parametrized invalid type IDs."""
    today = date.today()
    # NOTE(review): presumably a foreign-key IntegrityError from SQLite; the
    # broad Exception match also accepts unrelated failures - confirm the
    # concrete type and narrow it.
    with pytest.raises(Exception):
        service_repo.create(invalid_service_type_id, today)
@pytest.mark.parametrize(
    "days_offset,should_be_included",
    [
        (-30, False),  # Past
        (-1, False),  # Yesterday
        (0, True),  # Today
        (1, True),  # Tomorrow
        (7, True),  # Next week
        (30, True),  # Future
    ],
)
def test_upcoming_date_filtering(
    service_repo: ServiceRepository,
    clean_services,
    days_offset: int,
    should_be_included: bool,
):
    """upcoming() includes a service only if it is dated today or later.

    Args:
        days_offset: days added to ``date.today()`` to date the service.
        should_be_included: whether upcoming() is expected to return it.
    """
    service_day = date.today() + timedelta(days=days_offset)
    created = service_repo.create(1, service_day)

    listed_ids = {svc.ServiceId for svc in service_repo.upcoming()}

    # Membership must match the expectation carried by the parameter.
    assert (created.ServiceId in listed_ids) == should_be_included
@pytest.mark.parametrize("limit", [1, 5, 10, 50, 100])
def test_upcoming_limit_parameter(service_repo: ServiceRepository, clean_services, limit: int):
    """upcoming(limit=n) returns exactly n rows when more than n qualify."""
    # Five more services than the limit, one per day starting today.
    surplus = 5
    for day_offset in range(limit + surplus):
        service_repo.create(1, date.today() + timedelta(days=day_offset))

    capped = service_repo.upcoming(limit=limit)
    assert len(capped) == limit
# ----------------------------------------------------------------------
# 9<0F> Integration and workflow tests
# ----------------------------------------------------------------------
def test_complete_service_workflow(service_repo: ServiceRepository, sample_dates, clean_services):
    """Walk one service through create, list, filter, reschedule and re-read."""
    rows_before = len(service_repo.list_all())

    # Create a type-2 service dated tomorrow.
    first_date = sample_dates['tomorrow']
    booking = service_repo.create(2, first_date)
    assert booking.ServiceTypeId == 2
    assert booking.ServiceDate == first_date
    booking_id = booking.ServiceId

    # list_all() grows by one row and contains the new service.
    everything = service_repo.list_all()
    assert len(everything) == rows_before + 1
    assert booking_id in {row.ServiceId for row in everything}

    # upcoming() reports it.
    assert booking_id in {row.ServiceId for row in service_repo.upcoming()}

    # by_type() reports it under type 2.
    assert booking_id in {row.ServiceId for row in service_repo.by_type([2])}

    # Move it to a later date.
    moved_to = sample_dates['future']
    service_repo.reschedule(booking_id, moved_to)

    # The stored row now carries the new date and the original type.
    after_move = service_repo.get_by_id(booking_id)
    assert after_move is not None
    assert after_move.ServiceDate == moved_to
    assert after_move.ServiceTypeId == 2
def test_complex_date_filtering_scenario(service_repo: ServiceRepository, sample_dates, clean_services):
    """Check upcoming() and by_type() against a mixed set of dates and types.

    Five services spanning three types and five distinct dates are stored
    in a freshly cleared table. The test then checks upcoming() with an
    explicit ``after`` date, by_type() for type 1, and a manual
    type-1 filter applied to the default upcoming() result.
    """
    # (service type ID, service date) pairs to store.
    services_data = [
        (1, sample_dates['past']),  # Should not appear in upcoming
        (1, sample_dates['today']),  # Should appear in upcoming
        (2, sample_dates['tomorrow']),  # Should appear in upcoming
        (1, sample_dates['future']),  # Should appear in upcoming
        (3, sample_dates['next_week']),  # Should appear in upcoming
    ]
    # The returned objects are not kept: every assertion below works on the
    # dates that the query methods hand back.
    for service_type_id, service_date in services_data:
        service_repo.create(service_type_id, service_date)

    # upcoming() from tomorrow: tomorrow and later are in, earlier is out.
    upcoming_from_tomorrow = service_repo.upcoming(after=sample_dates['tomorrow'])
    upcoming_dates = {s.ServiceDate for s in upcoming_from_tomorrow}
    assert sample_dates['tomorrow'] in upcoming_dates
    assert sample_dates['future'] in upcoming_dates
    assert sample_dates['next_week'] in upcoming_dates
    assert sample_dates['past'] not in upcoming_dates
    assert sample_dates['today'] not in upcoming_dates

    # by_type([1]) ignores dates, so the past type-1 service is included.
    type1_services = service_repo.by_type([1])
    type1_dates = {s.ServiceDate for s in type1_services}
    assert sample_dates['past'] in type1_dates
    assert sample_dates['today'] in type1_dates
    assert sample_dates['future'] in type1_dates

    # Combination: default upcoming() narrowed to type 1 on the test side.
    upcoming_all = service_repo.upcoming()
    upcoming_type1_dates = {s.ServiceDate for s in upcoming_all if s.ServiceTypeId == 1}
    assert sample_dates['today'] in upcoming_type1_dates
    assert sample_dates['future'] in upcoming_type1_dates
    assert sample_dates['past'] not in upcoming_type1_dates
def test_service_repository_consistency(service_repo: ServiceRepository, sample_dates, clean_services):
    """list_all(), get_by_id() and upcoming() agree on the same stored rows."""
    # Five services on consecutive days from today, cycling types 1, 2, 3.
    seeded = [
        service_repo.create(
            service_type_id=(index % 3) + 1,
            service_date=sample_dates['today'] + timedelta(days=index),
        )
        for index in range(5)
    ]

    # Every seeded row shows up in the full listing.
    listed_ids = {row.ServiceId for row in service_repo.list_all()}
    seeded_ids = {row.ServiceId for row in seeded}
    assert seeded_ids.issubset(listed_ids)

    # Every seeded row can be fetched individually with identical fields.
    for expected in seeded:
        actual = service_repo.get_by_id(expected.ServiceId)
        assert actual is not None
        assert actual.ServiceId == expected.ServiceId
        assert actual.ServiceTypeId == expected.ServiceTypeId
        assert actual.ServiceDate == expected.ServiceDate

    # upcoming(after=tomorrow) contains a seeded row exactly when that row
    # is dated tomorrow or later.
    cutoff = sample_dates['tomorrow']
    later_ids = {row.ServiceId for row in service_repo.upcoming(after=cutoff)}
    for expected in seeded:
        is_on_or_after_cutoff = expected.ServiceDate >= cutoff
        assert (expected.ServiceId in later_ids) == is_on_or_after_cutoff