""" FastAPI application for NimbusFlow backend. Provides REST API endpoints for the frontend Blazor application. """ from __future__ import annotations from pathlib import Path from typing import List, Optional, Union from datetime import datetime, date from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field # Import the existing backend modules from backend.db import DatabaseConnection from backend.repositories import ( MemberRepository, ClassificationRepository, ServiceRepository, ServiceTypeRepository, ScheduleRepository, ServiceAvailabilityRepository, ) from backend.services.scheduling_service import SchedulingService from backend.models.dataclasses import ( Member as DbMember, Classification as DbClassification, Service as DbService, ServiceType as DbServiceType, Schedule as DbSchedule, ) # Initialize FastAPI app app = FastAPI(title="NimbusFlow API", version="1.0.0") # Add CORS middleware to allow frontend access app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:5059", "https://localhost:5059" ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Database path DB_PATH = Path(__file__).parent.parent / "db" / "sqlite" / "database.db" # Configure SQLite to allow threading import sqlite3 sqlite3.threadsafety = 3 # Custom DatabaseConnection for FastAPI that handles threading class FastAPIDatabaseConnection(DatabaseConnection): """DatabaseConnection that allows cross-thread usage for FastAPI.""" def __init__( self, db_path: Union[str, Path], *, timeout: float = 5.0, detect_types: int = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, ) -> None: # Call parent constructor but modify connection to allow threading self._conn: sqlite3.Connection = sqlite3.connect( str(db_path), timeout=timeout, detect_types=detect_types, check_same_thread=False # Allow cross-thread usage ) # ``Row`` makes column access dictionary‑like and preserves order. self._conn.row_factory = sqlite3.Row self._cursor: sqlite3.Cursor = self._conn.cursor() # Pydantic models for API requests/responses class Member(BaseModel): memberId: int = Field(alias="MemberId") firstName: str = Field(alias="FirstName") lastName: str = Field(alias="LastName") email: Optional[str] = Field(default=None, alias="Email") phoneNumber: Optional[str] = Field(default=None, alias="PhoneNumber") classificationId: Optional[int] = Field(default=None, alias="ClassificationId") notes: Optional[str] = Field(default=None, alias="Notes") isActive: int = Field(default=1, alias="IsActive") lastScheduledAt: Optional[datetime] = Field(default=None, alias="LastScheduledAt") lastAcceptedAt: Optional[datetime] = Field(default=None, alias="LastAcceptedAt") lastDeclinedAt: Optional[datetime] = Field(default=None, alias="LastDeclinedAt") declineStreak: int = Field(default=0, alias="DeclineStreak") class Config: populate_by_name = True class MemberCreate(BaseModel): firstName: str = Field(alias="FirstName") lastName: str = Field(alias="LastName") email: Optional[str] = Field(default=None, alias="Email") phoneNumber: Optional[str] = Field(default=None, alias="PhoneNumber") classificationId: Optional[int] = Field(default=None, alias="ClassificationId") notes: Optional[str] = Field(default=None, alias="Notes") isActive: int = Field(default=1, alias="IsActive") class Config: populate_by_name = True class Classification(BaseModel): classificationId: int = Field(alias="ClassificationId") classificationName: str = Field(alias="ClassificationName") class Config: populate_by_name = True class Service(BaseModel): serviceId: int = Field(alias="ServiceId") serviceTypeId: int = Field(alias="ServiceTypeId") serviceDate: date = Field(alias="ServiceDate") class Config: populate_by_name = True class ServiceCreate(BaseModel): serviceTypeId: int = Field(alias="ServiceTypeId") serviceDate: date = Field(alias="ServiceDate") class Config: populate_by_name = True class ServiceType(BaseModel): serviceTypeId: int = Field(alias="ServiceTypeId") typeName: str = Field(alias="TypeName") class Config: populate_by_name = True class Schedule(BaseModel): scheduleId: int = Field(alias="ScheduleId") serviceId: int = Field(alias="ServiceId") memberId: int = Field(alias="MemberId") status: str = Field(alias="Status") scheduledAt: datetime = Field(alias="ScheduledAt") acceptedAt: Optional[datetime] = Field(default=None, alias="AcceptedAt") declinedAt: Optional[datetime] = Field(default=None, alias="DeclinedAt") expiresAt: Optional[datetime] = Field(default=None, alias="ExpiresAt") declineReason: Optional[str] = Field(default=None, alias="DeclineReason") class Config: populate_by_name = True class ScheduleNextRequest(BaseModel): serviceId: int classificationIds: List[int] class DeclineRequest(BaseModel): reason: Optional[str] = None # Context manager for database operations from contextlib import contextmanager @contextmanager def get_db_context(): """Context manager to handle database connections properly.""" db = DatabaseConnection(DB_PATH) try: with db: yield db finally: pass # Context manager handles cleanup # Dependency to get database connection and repositories def get_repositories(): # Create a new database connection for each request to avoid thread safety issues db = FastAPIDatabaseConnection(DB_PATH) return { "db": db, "member_repo": MemberRepository(db), "classification_repo": ClassificationRepository(db), "service_repo": ServiceRepository(db), "service_type_repo": ServiceTypeRepository(db), "schedule_repo": ScheduleRepository(db), "availability_repo": ServiceAvailabilityRepository(db), } def get_scheduling_service(repos: dict = Depends(get_repositories)): return SchedulingService( classification_repo=repos["classification_repo"], member_repo=repos["member_repo"], service_repo=repos["service_repo"], availability_repo=repos["availability_repo"], schedule_repo=repos["schedule_repo"], ) # Helper functions to convert between DB and API models def db_member_to_api(db_member: DbMember) -> Member: return Member( MemberId=db_member.MemberId, FirstName=db_member.FirstName, LastName=db_member.LastName, Email=db_member.Email, PhoneNumber=db_member.PhoneNumber, ClassificationId=db_member.ClassificationId, Notes=db_member.Notes, IsActive=db_member.IsActive, LastScheduledAt=db_member.LastScheduledAt, LastAcceptedAt=db_member.LastAcceptedAt, LastDeclinedAt=db_member.LastDeclinedAt, DeclineStreak=db_member.DeclineStreak, ) def api_member_to_db(api_member: Member) -> DbMember: return DbMember( MemberId=api_member.memberId, FirstName=api_member.firstName, LastName=api_member.lastName, Email=api_member.email, PhoneNumber=api_member.phoneNumber, ClassificationId=api_member.classificationId, Notes=api_member.notes, IsActive=api_member.isActive, LastScheduledAt=api_member.lastScheduledAt, LastAcceptedAt=api_member.lastAcceptedAt, LastDeclinedAt=api_member.lastDeclinedAt, DeclineStreak=api_member.declineStreak, ) def db_classification_to_api(db_classification: DbClassification) -> Classification: return Classification( ClassificationId=db_classification.ClassificationId, ClassificationName=db_classification.ClassificationName, ) def db_service_to_api(db_service: DbService) -> Service: return Service( ServiceId=db_service.ServiceId, ServiceTypeId=db_service.ServiceTypeId, ServiceDate=db_service.ServiceDate, ) def db_service_type_to_api(db_service_type: DbServiceType) -> ServiceType: return ServiceType( ServiceTypeId=db_service_type.ServiceTypeId, TypeName=db_service_type.TypeName, ) def db_schedule_to_api(db_schedule: DbSchedule) -> Schedule: return Schedule( ScheduleId=db_schedule.ScheduleId, ServiceId=db_schedule.ServiceId, MemberId=db_schedule.MemberId, Status=db_schedule.Status, ScheduledAt=db_schedule.ScheduledAt, AcceptedAt=db_schedule.AcceptedAt, DeclinedAt=db_schedule.DeclinedAt, ExpiresAt=db_schedule.ExpiresAt, DeclineReason=db_schedule.DeclineReason, ) # API Endpoints # Member endpoints @app.get("/api/members", response_model=List[Member]) async def get_members(repos: dict = Depends(get_repositories)): db_members = repos["member_repo"].list_all() return [db_member_to_api(member) for member in db_members] @app.get("/api/members/{member_id}", response_model=Member) async def get_member(member_id: int, repos: dict = Depends(get_repositories)): db_member = repos["member_repo"].get_by_id(member_id) if not db_member: raise HTTPException(status_code=404, detail="Member not found") return db_member_to_api(db_member) @app.post("/api/members", response_model=Member) async def create_member(member_data: MemberCreate, repos: dict = Depends(get_repositories)): db_member = repos["member_repo"].create( first_name=member_data.firstName, last_name=member_data.lastName, email=member_data.email, phone_number=member_data.phoneNumber, classification_id=member_data.classificationId, notes=member_data.notes, is_active=member_data.isActive, ) return db_member_to_api(db_member) @app.put("/api/members/{member_id}", response_model=Member) async def update_member(member_id: int, member_data: Member, repos: dict = Depends(get_repositories)): existing_member = repos["member_repo"].get_by_id(member_id) if not existing_member: raise HTTPException(status_code=404, detail="Member not found") # Use the base repository _update method updates = { "FirstName": member_data.firstName, "LastName": member_data.lastName, "Email": member_data.email, "PhoneNumber": member_data.phoneNumber, "ClassificationId": member_data.classificationId, "Notes": member_data.notes, "IsActive": member_data.isActive, } repos["member_repo"]._update("Members", "MemberId", member_id, updates) # Return the updated member updated_member = repos["member_repo"].get_by_id(member_id) return db_member_to_api(updated_member) @app.delete("/api/members/{member_id}") async def delete_member(member_id: int, repos: dict = Depends(get_repositories)): existing_member = repos["member_repo"].get_by_id(member_id) if not existing_member: raise HTTPException(status_code=404, detail="Member not found") repos["member_repo"]._delete("Members", "MemberId", member_id) return {"message": "Member deleted successfully"} @app.get("/api/members/{member_id}/schedules", response_model=List[Schedule]) async def get_member_schedules(member_id: int, repos: dict = Depends(get_repositories)): existing_member = repos["member_repo"].get_by_id(member_id) if not existing_member: raise HTTPException(status_code=404, detail="Member not found") # Get all schedules and filter by member ID (since there's no specific method) all_schedules = repos["schedule_repo"].list_all() member_schedules = [s for s in all_schedules if s.MemberId == member_id] return [db_schedule_to_api(schedule) for schedule in member_schedules] # Classification endpoints @app.get("/api/classifications", response_model=List[Classification]) async def get_classifications(repos: dict = Depends(get_repositories)): db_classifications = repos["classification_repo"].list_all() return [db_classification_to_api(classification) for classification in db_classifications] # Service endpoints @app.get("/api/services", response_model=List[Service]) async def get_services(repos: dict = Depends(get_repositories)): db_services = repos["service_repo"].list_all() return [db_service_to_api(service) for service in db_services] @app.get("/api/services/{service_id}", response_model=Service) async def get_service(service_id: int, repos: dict = Depends(get_repositories)): db_service = repos["service_repo"].get_by_id(service_id) if not db_service: raise HTTPException(status_code=404, detail="Service not found") return db_service_to_api(db_service) @app.post("/api/services", response_model=Service) async def create_service(service_data: ServiceCreate, repos: dict = Depends(get_repositories)): db_service = repos["service_repo"].create( service_type_id=service_data.serviceTypeId, service_date=service_data.serviceDate, ) return db_service_to_api(db_service) # Service Type endpoints @app.get("/api/service-types", response_model=List[ServiceType]) async def get_service_types(repos: dict = Depends(get_repositories)): db_service_types = repos["service_type_repo"].list_all() return [db_service_type_to_api(service_type) for service_type in db_service_types] # Schedule endpoints @app.get("/api/schedules", response_model=List[Schedule]) async def get_schedules(repos: dict = Depends(get_repositories)): db_schedules = repos["schedule_repo"].list_all() return [db_schedule_to_api(schedule) for schedule in db_schedules] @app.get("/api/schedules/{schedule_id}", response_model=Schedule) async def get_schedule(schedule_id: int, repos: dict = Depends(get_repositories)): db_schedule = repos["schedule_repo"].get_by_id(schedule_id) if not db_schedule: raise HTTPException(status_code=404, detail="Schedule not found") return db_schedule_to_api(db_schedule) @app.post("/api/schedules/{schedule_id}/accept", response_model=Schedule) async def accept_schedule(schedule_id: int, repos: dict = Depends(get_repositories)): db_schedule = repos["schedule_repo"].get_by_id(schedule_id) if not db_schedule: raise HTTPException(status_code=404, detail="Schedule not found") repos["schedule_repo"].mark_accepted(schedule_id) # Return the updated schedule updated_schedule = repos["schedule_repo"].get_by_id(schedule_id) return db_schedule_to_api(updated_schedule) @app.post("/api/schedules/{schedule_id}/decline", response_model=Schedule) async def decline_schedule( schedule_id: int, decline_data: DeclineRequest, repos: dict = Depends(get_repositories), scheduling_service: SchedulingService = Depends(get_scheduling_service) ): db_schedule = repos["schedule_repo"].get_by_id(schedule_id) if not db_schedule: raise HTTPException(status_code=404, detail="Schedule not found") # Use the scheduling service to decline (handles the business logic) scheduling_service.decline_service_for_user( member_id=db_schedule.MemberId, service_id=db_schedule.ServiceId, reason=decline_data.reason, ) # Return the updated schedule updated_schedule = repos["schedule_repo"].get_by_id(schedule_id) return db_schedule_to_api(updated_schedule) @app.delete("/api/schedules/{schedule_id}") async def remove_schedule(schedule_id: int, repos: dict = Depends(get_repositories)): existing_schedule = repos["schedule_repo"].get_by_id(schedule_id) if not existing_schedule: raise HTTPException(status_code=404, detail="Schedule not found") repos["schedule_repo"]._delete("Schedules", "ScheduleId", schedule_id) return {"message": "Schedule removed successfully"} @app.post("/api/schedules/schedule-next", response_model=Optional[Schedule]) async def schedule_next_member( request: ScheduleNextRequest, scheduling_service: SchedulingService = Depends(get_scheduling_service), repos: dict = Depends(get_repositories) ): result = scheduling_service.schedule_next_member( classification_ids=request.classificationIds, service_id=request.serviceId, only_active=True, exclude_member_ids=set(), ) if not result: raise HTTPException(status_code=404, detail="No eligible member found") # result is a tuple: (schedule_id, first_name, last_name, member_id) schedule_id = result[0] db_schedule = repos["schedule_repo"].get_by_id(schedule_id) return db_schedule_to_api(db_schedule) # Health check endpoint @app.get("/api/health") async def health_check(): return {"status": "healthy", "message": "NimbusFlow API is running"}