Files
nimbusflow/backend/repositories/schedule.py

270 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# myapp/repositories/schedule.py
# ------------------------------------------------------------
# Persistence layer for the ``Schedule`` model.
# ------------------------------------------------------------
from __future__ import annotations
from typing import Any, List, Optional, Sequence
from backend.db import BaseRepository
from backend.models import Schedule as ScheduleModel
from backend.models import ScheduleStatus
class ScheduleRepository(BaseRepository[ScheduleModel]):
"""Dataaccess object for the ``Schedules`` table."""
# ------------------------------------------------------------------
# Tablelevel constants change them in one place if the schema evolves.
# ------------------------------------------------------------------
_TABLE = "Schedules"
_PK = "ScheduleId"
# ------------------------------------------------------------------
# CRUD helpers
# ------------------------------------------------------------------
def create(
self,
*,
service_id: int,
member_id: int,
status: ScheduleStatus = ScheduleStatus.PENDING,
reason: Optional[str] = None,
scheduled_at: Optional[Any] = None,
expires_at: Optional[Any] = None,
) -> ScheduleModel:
"""
Insert a brandnew schedule row.
Parameters
----------
service_id, member_id : int
FK references.
status : ScheduleStatus
Desired initial status (PENDING, DECLINED, …).
reason : str | None
Stored in ``DeclineReason`` when the status is ``DECLINED``.
scheduled_at, expires_at : datetimecompatible | None
``scheduled_at`` defaults to SQLites ``CURRENT_TIMESTAMP``.
"""
schedule = ScheduleModel(
ScheduleId=-1, # placeholder will be replaced
ServiceId=service_id,
MemberId=member_id,
Status=status.value,
ScheduledAt=scheduled_at or "CURRENT_TIMESTAMP",
AcceptedAt=None,
DeclinedAt=None,
ExpiresAt=expires_at,
DeclineReason=reason if status == ScheduleStatus.DECLINED else None,
)
return self._insert(self._TABLE, schedule, self._PK)
def get_by_id(self, schedule_id: int) -> Optional[ScheduleModel]:
"""Fetch a schedule by its primary key."""
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (schedule_id,))
return ScheduleModel.from_row(row) if row else None
def list_all(self) -> List[ScheduleModel]:
"""Return every schedule row."""
return self._select_all(self._TABLE, ScheduleModel)
# ------------------------------------------------------------------
# Helper used by the SchedulingService to locate an existing row.
# ------------------------------------------------------------------
def get_one(self, *, member_id: int, service_id: int) -> Optional[ScheduleModel]:
"""
Return the *first* schedule (any status) for the supplied
``member_id`` / ``service_id`` pair, or ``None`` if none exists.
"""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
LIMIT 1
"""
row = self.db.fetchone(sql, (member_id, service_id))
return ScheduleModel.from_row(row) if row else None
# ------------------------------------------------------------------
# Generic statuschange helper (used for “decline” and similar ops).
# ------------------------------------------------------------------
def update_status(
self,
*,
schedule_id: int,
new_status: ScheduleStatus,
reason: Optional[str] = None,
) -> None:
"""
Switch a schedules status and optionally store a reason.
If ``new_status`` is ``DECLINED`` the ``DeclineReason`` column is
populated; otherwise it is cleared.
"""
# Build the SET clause dynamically we only touch the columns we need.
set_clause = "Status = ?, DeclinedAt = NULL, DeclineReason = NULL"
params: list[Any] = [new_status.value]
if new_status == ScheduleStatus.DECLINED:
set_clause = "Status = ?, DeclinedAt = ?, DeclineReason = ?"
params.extend(["CURRENT_TIMESTAMP", reason])
params.append(schedule_id) # WHERE clause param
sql = f"""
UPDATE {self._TABLE}
SET {set_clause}
WHERE {self._PK} = ?
"""
self.db.execute(sql, tuple(params))
# ------------------------------------------------------------------
# Query helpers used by the scheduling service
# ------------------------------------------------------------------
def has_any(
self,
member_id: int,
service_id: int,
statuses: Sequence[ScheduleStatus],
) -> bool:
"""True if a schedule exists for the pair with any of the given statuses."""
if not statuses:
return False
placeholders = ",".join("?" for _ in statuses)
sql = f"""
SELECT 1
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status IN ({placeholders})
LIMIT 1
"""
params = (member_id, service_id, *[s.value for s in statuses])
row = self.db.fetchone(sql, params)
return row is not None
def is_available(self, member_id: int, service_id: int) -> bool:
"""
Cooldown rule: a member is unavailable if they have accepted a
schedule for the same service within the last ``COOLDOWN_DAYS``.
"""
# Latest acceptance timestamp (if any)
sql_latest = f"""
SELECT MAX(AcceptedAt) AS last_accept
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status = ?
"""
row = self.db.fetchone(
sql_latest,
(member_id, service_id, ScheduleStatus.ACCEPTED.value),
)
last_accept: Optional[str] = row["last_accept"] if row else None
if not last_accept:
return True # never accepted → free to schedule
COOLDOWN_DAYS = 1
sql_cooldown = f"""
SELECT 1
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status = ?
AND DATE(AcceptedAt) >= DATE('now', '-{COOLDOWN_DAYS} day')
LIMIT 1
"""
row = self.db.fetchone(
sql_cooldown,
(member_id, service_id, ScheduleStatus.ACCEPTED.value),
)
return row is None # None → outside the cooldown window
# ------------------------------------------------------------------
# Statustransition helpers (accept / decline) kept for completeness.
# ------------------------------------------------------------------
def mark_accepted(
self,
schedule_id: int,
accepted_at: Optional[Any] = None,
) -> None:
sql = f"""
UPDATE {self._TABLE}
SET Status = ?,
AcceptedAt = ?,
DeclinedAt = NULL,
DeclineReason = NULL
WHERE {self._PK} = ?
"""
ts = accepted_at or "CURRENT_TIMESTAMP"
self.db.execute(sql, (ScheduleStatus.ACCEPTED.value, ts, schedule_id))
def mark_declined(
self,
schedule_id: int,
declined_at: Optional[Any] = None,
decline_reason: Optional[str] = None,
) -> None:
sql = f"""
UPDATE {self._TABLE}
SET Status = ?,
DeclinedAt = ?,
DeclineReason = ?
WHERE {self._PK} = ?
"""
ts = declined_at or "CURRENT_TIMESTAMP"
self.db.execute(sql, (ScheduleStatus.DECLINED.value, ts, decline_reason, schedule_id))
# ------------------------------------------------------------------
# Sameday helper used by the scheduling service
# ------------------------------------------------------------------
def has_schedule_on_date(self, member_id: int, service_date: str) -> bool:
"""
Return ``True`` if *any* schedule (regardless of status) exists for
``member_id`` on the calendar day ``service_date`` (format YYYYMMDD).
This abstracts the “a member can only be scheduled once per day”
rule so the service layer does not need to know the underlying
table layout.
"""
sql = f"""
SELECT 1
FROM {self._TABLE} AS s
JOIN Services AS sv ON s.ServiceId = sv.ServiceId
WHERE s.MemberId = ?
AND sv.ServiceDate = ?
LIMIT 1
"""
row = self.db.fetchone(sql, (member_id, service_date))
return row is not None
# ------------------------------------------------------------------
# Miscellaneous convenience queries
# ------------------------------------------------------------------
def get_pending_for_service(self, service_id: int) -> List[ScheduleModel]:
"""All PENDING schedules for a given service."""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE ServiceId = ?
AND Status = ?
"""
rows = self.db.fetchall(sql, (service_id, ScheduleStatus.PENDING.value))
return [ScheduleModel.from_row(r) for r in rows]
def delete_schedule(self, schedule_id: int) -> bool:
"""
Delete a schedule by ID.
Returns:
bool: True if a schedule was deleted, False if not found
"""
sql = f"DELETE FROM {self._TABLE} WHERE {self._PK} = ?"
cursor = self.db.execute(sql, (schedule_id,))
return cursor.rowcount > 0