Files
nimbusflow/backend/api/app.py

574 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FastAPI application for NimbusFlow backend.
Provides REST API endpoints for the frontend Blazor application.
"""
from __future__ import annotations
from pathlib import Path
from typing import List, Optional, Union
from datetime import datetime, date
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
# Import the existing backend modules
from backend.db import DatabaseConnection
from backend.repositories import (
MemberRepository,
ClassificationRepository,
ServiceRepository,
ServiceTypeRepository,
ScheduleRepository,
ServiceAvailabilityRepository,
)
from backend.services.scheduling_service import SchedulingService
from backend.models.dataclasses import (
Member as DbMember,
Classification as DbClassification,
Service as DbService,
ServiceType as DbServiceType,
Schedule as DbSchedule,
)
# Initialize FastAPI app
app = FastAPI(title="NimbusFlow API", version="1.0.0")
# Add CORS middleware to allow frontend access
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5059",
"https://localhost:5059"
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Database path
DB_PATH = Path(__file__).parent.parent / "db" / "sqlite" / "database.db"
# Configure SQLite to allow threading
import sqlite3
sqlite3.threadsafety = 3
# Custom DatabaseConnection for FastAPI that handles threading
class FastAPIDatabaseConnection(DatabaseConnection):
"""DatabaseConnection that allows cross-thread usage for FastAPI."""
def __init__(
self,
db_path: Union[str, Path],
*,
timeout: float = 5.0,
detect_types: int = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
) -> None:
# Call parent constructor but modify connection to allow threading
self._conn: sqlite3.Connection = sqlite3.connect(
str(db_path),
timeout=timeout,
detect_types=detect_types,
check_same_thread=False # Allow cross-thread usage
)
# ``Row`` makes column access dictionarylike and preserves order.
self._conn.row_factory = sqlite3.Row
self._cursor: sqlite3.Cursor = self._conn.cursor()
# Pydantic models for API requests/responses
class Member(BaseModel):
memberId: int = Field(alias="MemberId")
firstName: str = Field(alias="FirstName")
lastName: str = Field(alias="LastName")
email: Optional[str] = Field(default=None, alias="Email")
phoneNumber: Optional[str] = Field(default=None, alias="PhoneNumber")
classificationId: Optional[int] = Field(default=None, alias="ClassificationId")
classificationName: Optional[str] = Field(default=None, alias="ClassificationName")
notes: Optional[str] = Field(default=None, alias="Notes")
isActive: int = Field(default=1, alias="IsActive")
lastScheduledAt: Optional[datetime] = Field(default=None, alias="LastScheduledAt")
lastAcceptedAt: Optional[datetime] = Field(default=None, alias="LastAcceptedAt")
lastDeclinedAt: Optional[datetime] = Field(default=None, alias="LastDeclinedAt")
declineStreak: int = Field(default=0, alias="DeclineStreak")
class Config:
populate_by_name = True
class MemberCreate(BaseModel):
firstName: str = Field(alias="FirstName")
lastName: str = Field(alias="LastName")
email: Optional[str] = Field(default=None, alias="Email")
phoneNumber: Optional[str] = Field(default=None, alias="PhoneNumber")
classificationId: Optional[int] = Field(default=None, alias="ClassificationId")
notes: Optional[str] = Field(default=None, alias="Notes")
isActive: int = Field(default=1, alias="IsActive")
class Config:
populate_by_name = True
class Classification(BaseModel):
classificationId: int = Field(alias="ClassificationId")
classificationName: str = Field(alias="ClassificationName")
class Config:
populate_by_name = True
class Service(BaseModel):
serviceId: int = Field(alias="ServiceId")
serviceTypeId: int = Field(alias="ServiceTypeId")
serviceDate: date = Field(alias="ServiceDate")
serviceTypeName: Optional[str] = Field(default=None, alias="ServiceTypeName")
class Config:
populate_by_name = True
class ServiceCreate(BaseModel):
serviceTypeId: int = Field(alias="ServiceTypeId")
serviceDate: date = Field(alias="ServiceDate")
class Config:
populate_by_name = True
class ServiceType(BaseModel):
serviceTypeId: int = Field(alias="ServiceTypeId")
typeName: str = Field(alias="TypeName")
class Config:
populate_by_name = True
class Schedule(BaseModel):
scheduleId: int = Field(alias="ScheduleId")
serviceId: int = Field(alias="ServiceId")
memberId: int = Field(alias="MemberId")
status: str = Field(alias="Status")
scheduledAt: datetime = Field(alias="ScheduledAt")
acceptedAt: Optional[datetime] = Field(default=None, alias="AcceptedAt")
declinedAt: Optional[datetime] = Field(default=None, alias="DeclinedAt")
expiresAt: Optional[datetime] = Field(default=None, alias="ExpiresAt")
declineReason: Optional[str] = Field(default=None, alias="DeclineReason")
member: Optional["Member"] = Field(default=None, alias="Member")
service: Optional["Service"] = Field(default=None, alias="Service")
class Config:
populate_by_name = True
class ScheduleNextRequest(BaseModel):
serviceId: int
classificationIds: List[int]
class DeclineRequest(BaseModel):
reason: Optional[str] = None
# Context manager for database operations
from contextlib import contextmanager
@contextmanager
def get_db_context():
"""Context manager to handle database connections properly."""
db = DatabaseConnection(DB_PATH)
try:
with db:
yield db
finally:
pass # Context manager handles cleanup
# Dependency to get database connection and repositories
def get_repositories():
# Create a new database connection for each request to avoid thread safety issues
db = FastAPIDatabaseConnection(DB_PATH)
return {
"db": db,
"member_repo": MemberRepository(db),
"classification_repo": ClassificationRepository(db),
"service_repo": ServiceRepository(db),
"service_type_repo": ServiceTypeRepository(db),
"schedule_repo": ScheduleRepository(db),
"availability_repo": ServiceAvailabilityRepository(db),
}
def get_scheduling_service(repos: dict = Depends(get_repositories)):
return SchedulingService(
classification_repo=repos["classification_repo"],
member_repo=repos["member_repo"],
service_repo=repos["service_repo"],
availability_repo=repos["availability_repo"],
schedule_repo=repos["schedule_repo"],
)
# Helper functions to convert between DB and API models
def db_member_to_api(db_member: DbMember, classification_name: str = None) -> Member:
return Member(
MemberId=db_member.MemberId,
FirstName=db_member.FirstName,
LastName=db_member.LastName,
Email=db_member.Email,
PhoneNumber=db_member.PhoneNumber,
ClassificationId=db_member.ClassificationId,
ClassificationName=classification_name,
Notes=db_member.Notes,
IsActive=db_member.IsActive,
LastScheduledAt=db_member.LastScheduledAt,
LastAcceptedAt=db_member.LastAcceptedAt,
LastDeclinedAt=db_member.LastDeclinedAt,
DeclineStreak=db_member.DeclineStreak,
)
def api_member_to_db(api_member: Member) -> DbMember:
return DbMember(
MemberId=api_member.memberId,
FirstName=api_member.firstName,
LastName=api_member.lastName,
Email=api_member.email,
PhoneNumber=api_member.phoneNumber,
ClassificationId=api_member.classificationId,
Notes=api_member.notes,
IsActive=api_member.isActive,
LastScheduledAt=api_member.lastScheduledAt,
LastAcceptedAt=api_member.lastAcceptedAt,
LastDeclinedAt=api_member.lastDeclinedAt,
DeclineStreak=api_member.declineStreak,
)
def db_classification_to_api(db_classification: DbClassification) -> Classification:
return Classification(
ClassificationId=db_classification.ClassificationId,
ClassificationName=db_classification.ClassificationName,
)
def db_service_to_api(db_service: DbService, service_type: DbServiceType = None) -> Service:
service_dict = {
"ServiceId": db_service.ServiceId,
"ServiceTypeId": db_service.ServiceTypeId,
"ServiceDate": db_service.ServiceDate,
}
# Add service type name if provided
if service_type:
service_dict["ServiceTypeName"] = service_type.TypeName
return Service(**service_dict)
def db_service_type_to_api(db_service_type: DbServiceType) -> ServiceType:
return ServiceType(
ServiceTypeId=db_service_type.ServiceTypeId,
TypeName=db_service_type.TypeName,
)
def db_schedule_to_api(db_schedule: DbSchedule, member: DbMember = None, service: DbService = None) -> Schedule:
schedule_dict = {
"ScheduleId": db_schedule.ScheduleId,
"ServiceId": db_schedule.ServiceId,
"MemberId": db_schedule.MemberId,
"Status": db_schedule.Status,
"ScheduledAt": db_schedule.ScheduledAt,
"AcceptedAt": db_schedule.AcceptedAt,
"DeclinedAt": db_schedule.DeclinedAt,
"ExpiresAt": db_schedule.ExpiresAt,
"DeclineReason": db_schedule.DeclineReason,
}
# Add nested member data if provided
if member:
schedule_dict["Member"] = db_member_to_api(member)
# Add nested service data if provided
if service:
schedule_dict["Service"] = db_service_to_api(service)
return Schedule(**schedule_dict)
# API Endpoints
# Member endpoints
@app.get("/api/members", response_model=List[Member])
async def get_members(repos: dict = Depends(get_repositories)):
db_members = repos["member_repo"].list_all()
result = []
for member in db_members:
# Fetch classification name if classification ID exists
classification_name = None
if member.ClassificationId:
classification = repos["classification_repo"].get_by_id(member.ClassificationId)
if classification:
classification_name = classification.ClassificationName
result.append(db_member_to_api(member, classification_name))
return result
@app.get("/api/members/{member_id}", response_model=Member)
async def get_member(member_id: int, repos: dict = Depends(get_repositories)):
db_member = repos["member_repo"].get_by_id(member_id)
if not db_member:
raise HTTPException(status_code=404, detail="Member not found")
return db_member_to_api(db_member)
@app.post("/api/members", response_model=Member)
async def create_member(member_data: MemberCreate, repos: dict = Depends(get_repositories)):
db_member = repos["member_repo"].create(
first_name=member_data.firstName,
last_name=member_data.lastName,
email=member_data.email,
phone_number=member_data.phoneNumber,
classification_id=member_data.classificationId,
notes=member_data.notes,
is_active=member_data.isActive,
)
return db_member_to_api(db_member)
@app.put("/api/members/{member_id}", response_model=Member)
async def update_member(member_id: int, member_data: Member, repos: dict = Depends(get_repositories)):
existing_member = repos["member_repo"].get_by_id(member_id)
if not existing_member:
raise HTTPException(status_code=404, detail="Member not found")
# Use the base repository _update method
updates = {
"FirstName": member_data.firstName,
"LastName": member_data.lastName,
"Email": member_data.email,
"PhoneNumber": member_data.phoneNumber,
"ClassificationId": member_data.classificationId,
"Notes": member_data.notes,
"IsActive": member_data.isActive,
}
repos["member_repo"]._update("Members", "MemberId", member_id, updates)
# Return the updated member
updated_member = repos["member_repo"].get_by_id(member_id)
return db_member_to_api(updated_member)
@app.delete("/api/members/{member_id}")
async def delete_member(member_id: int, repos: dict = Depends(get_repositories)):
existing_member = repos["member_repo"].get_by_id(member_id)
if not existing_member:
raise HTTPException(status_code=404, detail="Member not found")
repos["member_repo"]._delete("Members", "MemberId", member_id)
return {"message": "Member deleted successfully"}
@app.get("/api/members/{member_id}/schedules", response_model=List[Schedule])
async def get_member_schedules(member_id: int, repos: dict = Depends(get_repositories)):
existing_member = repos["member_repo"].get_by_id(member_id)
if not existing_member:
raise HTTPException(status_code=404, detail="Member not found")
# Get all schedules and filter by member ID (since there's no specific method)
all_schedules = repos["schedule_repo"].list_all()
member_schedules = [s for s in all_schedules if s.MemberId == member_id]
return [db_schedule_to_api(schedule) for schedule in member_schedules]
# Classification endpoints
@app.get("/api/classifications", response_model=List[Classification])
async def get_classifications(repos: dict = Depends(get_repositories)):
db_classifications = repos["classification_repo"].list_all()
return [db_classification_to_api(classification) for classification in db_classifications]
# Service endpoints
@app.get("/api/services", response_model=List[Service])
async def get_services(repos: dict = Depends(get_repositories)):
db_services = repos["service_repo"].list_all()
return [db_service_to_api(service) for service in db_services]
@app.get("/api/services/{service_id}", response_model=Service)
async def get_service(service_id: int, repos: dict = Depends(get_repositories)):
db_service = repos["service_repo"].get_by_id(service_id)
if not db_service:
raise HTTPException(status_code=404, detail="Service not found")
return db_service_to_api(db_service)
@app.post("/api/services", response_model=Service)
async def create_service(service_data: ServiceCreate, repos: dict = Depends(get_repositories)):
db_service = repos["service_repo"].create(
service_type_id=service_data.serviceTypeId,
service_date=service_data.serviceDate,
)
return db_service_to_api(db_service)
# Service Type endpoints
@app.get("/api/service-types", response_model=List[ServiceType])
async def get_service_types(repos: dict = Depends(get_repositories)):
db_service_types = repos["service_type_repo"].list_all()
return [db_service_type_to_api(service_type) for service_type in db_service_types]
# Schedule endpoints
@app.get("/api/schedules", response_model=List[Schedule])
async def get_schedules(repos: dict = Depends(get_repositories)):
db_schedules = repos["schedule_repo"].list_all()
result = []
for schedule in db_schedules:
# Fetch related member and service data
member = repos["member_repo"].get_by_id(schedule.MemberId)
service = repos["service_repo"].get_by_id(schedule.ServiceId)
# Fetch service type if service exists
service_with_type = None
if service:
service_type = repos["service_type_repo"].get_by_id(service.ServiceTypeId)
service_with_type = db_service_to_api(service, service_type)
# Convert to API format with nested data
schedule_dict = {
"ScheduleId": schedule.ScheduleId,
"ServiceId": schedule.ServiceId,
"MemberId": schedule.MemberId,
"Status": schedule.Status,
"ScheduledAt": schedule.ScheduledAt,
"AcceptedAt": schedule.AcceptedAt,
"DeclinedAt": schedule.DeclinedAt,
"ExpiresAt": schedule.ExpiresAt,
"DeclineReason": schedule.DeclineReason,
}
if member:
schedule_dict["Member"] = db_member_to_api(member)
if service_with_type:
schedule_dict["Service"] = service_with_type
result.append(Schedule(**schedule_dict))
return result
@app.get("/api/schedules/{schedule_id}", response_model=Schedule)
async def get_schedule(schedule_id: int, repos: dict = Depends(get_repositories)):
db_schedule = repos["schedule_repo"].get_by_id(schedule_id)
if not db_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
# Fetch related member and service data
member = repos["member_repo"].get_by_id(db_schedule.MemberId)
service = repos["service_repo"].get_by_id(db_schedule.ServiceId)
# Fetch service type if service exists
service_with_type = None
if service:
service_type = repos["service_type_repo"].get_by_id(service.ServiceTypeId)
service_with_type = db_service_to_api(service, service_type)
# Convert to API format with nested data
schedule_dict = {
"ScheduleId": db_schedule.ScheduleId,
"ServiceId": db_schedule.ServiceId,
"MemberId": db_schedule.MemberId,
"Status": db_schedule.Status,
"ScheduledAt": db_schedule.ScheduledAt,
"AcceptedAt": db_schedule.AcceptedAt,
"DeclinedAt": db_schedule.DeclinedAt,
"ExpiresAt": db_schedule.ExpiresAt,
"DeclineReason": db_schedule.DeclineReason,
}
if member:
schedule_dict["Member"] = db_member_to_api(member)
if service_with_type:
schedule_dict["Service"] = service_with_type
return Schedule(**schedule_dict)
@app.post("/api/schedules/{schedule_id}/accept", response_model=Schedule)
async def accept_schedule(schedule_id: int, repos: dict = Depends(get_repositories)):
db_schedule = repos["schedule_repo"].get_by_id(schedule_id)
if not db_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
repos["schedule_repo"].mark_accepted(schedule_id)
# Return the updated schedule
updated_schedule = repos["schedule_repo"].get_by_id(schedule_id)
return db_schedule_to_api(updated_schedule)
@app.post("/api/schedules/{schedule_id}/decline", response_model=Schedule)
async def decline_schedule(
schedule_id: int,
decline_data: DeclineRequest,
repos: dict = Depends(get_repositories),
scheduling_service: SchedulingService = Depends(get_scheduling_service)
):
db_schedule = repos["schedule_repo"].get_by_id(schedule_id)
if not db_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
# Use the scheduling service to decline (handles the business logic)
scheduling_service.decline_service_for_user(
member_id=db_schedule.MemberId,
service_id=db_schedule.ServiceId,
reason=decline_data.reason,
)
# Return the updated schedule
updated_schedule = repos["schedule_repo"].get_by_id(schedule_id)
return db_schedule_to_api(updated_schedule)
@app.delete("/api/schedules/{schedule_id}")
async def remove_schedule(schedule_id: int, repos: dict = Depends(get_repositories)):
existing_schedule = repos["schedule_repo"].get_by_id(schedule_id)
if not existing_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
repos["schedule_repo"]._delete("Schedules", "ScheduleId", schedule_id)
return {"message": "Schedule removed successfully"}
@app.post("/api/schedules/schedule-next", response_model=Optional[Schedule])
async def schedule_next_member(
request: ScheduleNextRequest,
scheduling_service: SchedulingService = Depends(get_scheduling_service),
repos: dict = Depends(get_repositories)
):
result = scheduling_service.schedule_next_member(
classification_ids=request.classificationIds,
service_id=request.serviceId,
only_active=True,
exclude_member_ids=set(),
)
if not result:
raise HTTPException(status_code=404, detail="No eligible member found")
# result is a tuple: (schedule_id, first_name, last_name, member_id)
schedule_id = result[0]
db_schedule = repos["schedule_repo"].get_by_id(schedule_id)
return db_schedule_to_api(db_schedule)
# Health check endpoint
@app.get("/api/health")
async def health_check():
return {"status": "healthy", "message": "NimbusFlow API is running"}