Files
nimbusflow/backend/database/repository.py

525 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime as dt
from typing import Optional, Tuple, List
from .connection import DatabaseConnection
from .models import (
Classification,
Member,
ServiceType,
Service,
ServiceAvailability,
Schedule,
AcceptedLog,
DeclineLog,
ScheduledLog,
)
class Repository:
    """
    High-level data-access layer.

    Responsibilities
    ----------------
    * CRUD helpers for the core tables.
    * Round-robin queue that respects:
        - Members.LastAcceptedAt   (fair order)
        - Members.LastDeclinedAt   (one-day cool-off)
    * "Reservation" handling using the **Schedules** table
      (pending → accepted → declined).
    * Audit logging (AcceptedLog, DeclineLog, ScheduledLog).
    """

    def __init__(self, db: DatabaseConnection) -> None:
        # Keep the connection wrapper; the methods of this class send their
        # SQL through ``self.db`` (execute / fetchone / fetchall / lastrowid).
        self.db = db
# -----------------------------------------------------------------
# CRUD helpers - they now return model objects (or IDs)
# -----------------------------------------------------------------
# -----------------------------------------------------------------
# CREATE
# -----------------------------------------------------------------
def create_classification(self, classification_name: str) -> Classification:
    """Store a new classification and return the populated model.

    The model is first built with a throw-away primary key (``-1``); after
    the INSERT the key is replaced with ``self.db.lastrowid``.
    """
    new_row = Classification(
        ClassificationId=-1,  # temporary value, overwritten after the INSERT
        ClassificationName=classification_name,
    )

    # Column list comes from the model's dict form, minus the primary key.
    fields = new_row.to_dict()
    fields.pop("ClassificationId")

    column_list = ", ".join(fields)
    marks = ", ".join(["?"] * len(fields))
    self.db.execute(
        f"INSERT INTO Classifications ({column_list}) VALUES ({marks})",
        tuple(fields.values()),
    )

    new_row.ClassificationId = self.db.lastrowid
    return new_row
def create_member(
    self,
    first_name: str,
    last_name: str,
    email: Optional[str] = None,
    phone_number: Optional[str] = None,
    classification_id: Optional[int] = None,
    notes: Optional[str] = None,
    is_active: int = 1,
) -> Member:
    """Store a new member and return the populated model.

    ``LastAcceptedAt`` and ``LastDeclinedAt`` start out as ``None``.  The
    model's ``MemberId`` is a placeholder (``-1``) until the INSERT has run,
    after which it is set from ``self.db.lastrowid``.
    """
    record = Member(
        MemberId=-1,
        FirstName=first_name,
        LastName=last_name,
        Email=email,
        PhoneNumber=phone_number,
        ClassificationId=classification_id,
        Notes=notes,
        IsActive=is_active,
        LastAcceptedAt=None,
        LastDeclinedAt=None,
    )

    # Every model field except the primary key becomes an INSERT column.
    payload = record.to_dict()
    payload.pop("MemberId")

    column_list = ", ".join(payload)
    marks = ", ".join(["?"] * len(payload))
    insert_sql = f"INSERT INTO Members ({column_list}) VALUES ({marks})"
    self.db.execute(insert_sql, tuple(payload.values()))

    record.MemberId = self.db.lastrowid
    return record
def create_service_type(self, type_name: str) -> ServiceType:
    """Store a new service type and return the populated model.

    ``ServiceTypeId`` is left out of the INSERT and filled in afterwards from
    ``self.db.lastrowid``.
    """
    service_type = ServiceType(ServiceTypeId=-1, TypeName=type_name)

    fields = service_type.to_dict()
    fields.pop("ServiceTypeId")

    column_list = ", ".join(fields)
    marks = ", ".join(["?"] * len(fields))
    self.db.execute(
        f"INSERT INTO ServiceTypes ({column_list}) VALUES ({marks})",
        tuple(fields.values()),
    )

    service_type.ServiceTypeId = self.db.lastrowid
    return service_type
def create_service(self, service_type_id: int, service_date: dt.date) -> Service:
    """Store a new service (type + date) and return the populated model.

    ``ServiceId`` is left out of the INSERT and filled in afterwards from
    ``self.db.lastrowid``.
    """
    service = Service(
        ServiceId=-1,
        ServiceTypeId=service_type_id,
        ServiceDate=service_date,
    )

    fields = service.to_dict()
    fields.pop("ServiceId")

    column_list = ", ".join(fields)
    marks = ", ".join(["?"] * len(fields))
    insert_sql = f"INSERT INTO Services ({column_list}) VALUES ({marks})"
    self.db.execute(insert_sql, tuple(fields.values()))

    service.ServiceId = self.db.lastrowid
    return service
def create_service_availability(self, member_id: int, service_type_id: int) -> ServiceAvailability:
    """Record that a member is available for a service type.

    Inserts one ``ServiceAvailability`` row and returns the model with its
    ``ServiceAvailabilityId`` taken from ``self.db.lastrowid``.
    """
    link = ServiceAvailability(
        ServiceAvailabilityId=-1,
        MemberId=member_id,
        ServiceTypeId=service_type_id,
    )

    fields = link.to_dict()
    fields.pop("ServiceAvailabilityId")

    column_list = ", ".join(fields)
    marks = ", ".join(["?"] * len(fields))
    self.db.execute(
        f"INSERT INTO ServiceAvailability ({column_list}) VALUES ({marks})",
        tuple(fields.values()),
    )

    link.ServiceAvailabilityId = self.db.lastrowid
    return link
# -----------------------------------------------------------------
# READ - return **lists of models**
# -----------------------------------------------------------------
def get_all_classifications(self) -> List[Classification]:
    """Return every ``Classifications`` row converted via ``Classification.from_row``."""
    fetched = self.db.fetchall("SELECT * FROM Classifications")
    return list(map(Classification.from_row, fetched))
def get_all_members(self) -> List[Member]:
    """Return every ``Members`` row converted via ``Member.from_row``."""
    fetched = self.db.fetchall("SELECT * FROM Members")
    return list(map(Member.from_row, fetched))
def get_all_service_types(self) -> List[ServiceType]:
    """Return every ``ServiceTypes`` row converted via ``ServiceType.from_row``."""
    fetched = self.db.fetchall("SELECT * FROM ServiceTypes")
    return list(map(ServiceType.from_row, fetched))
def get_all_services(self) -> List[Service]:
    """Return every ``Services`` row converted via ``Service.from_row``."""
    fetched = self.db.fetchall("SELECT * FROM Services")
    return list(map(Service.from_row, fetched))
def get_all_service_availability(self) -> List[ServiceAvailability]:
    """Return every ``ServiceAvailability`` row converted via ``ServiceAvailability.from_row``."""
    fetched = self.db.fetchall("SELECT * FROM ServiceAvailability")
    return list(map(ServiceAvailability.from_row, fetched))
# -----------------------------------------------------------------
# INTERNAL helpers used by the queue logic
# -----------------------------------------------------------------
def _lookup_classification(self, name: str) -> int:
    """Translate a classification name into its ``ClassificationId``.

    Raises
    ------
    ValueError
        If no ``Classifications`` row has that ``ClassificationName``.
    """
    query = "SELECT ClassificationId FROM Classifications WHERE ClassificationName = ?"
    match = self.db.fetchone(query, (name,))
    if match is not None:
        return match["ClassificationId"]
    raise ValueError(f'Classification "{name}" does not exist')
def _ensure_service(self, service_date: dt.date) -> int:
    """
    Return a ServiceId for ``service_date``, creating the row if needed.

    If a ``Services`` row with that ``ServiceDate`` already exists its id is
    returned unchanged.  Otherwise a generic row is inserted using the
    ServiceType with the lowest ``ServiceTypeId`` as the default, and the
    new row id (``self.db.lastrowid``) is returned.

    Raises
    ------
    RuntimeError
        If a row must be created but the ``ServiceTypes`` table is empty.
    """
    existing = self.db.fetchone(
        "SELECT ServiceId FROM Services WHERE ServiceDate = ?", (service_date,)
    )
    if existing is not None:
        return existing["ServiceId"]

    # ORDER BY makes "the first ServiceType" well defined; a bare LIMIT 1
    # returns whichever row the engine happens to visit first.
    default_type = self.db.fetchone(
        "SELECT ServiceTypeId FROM ServiceTypes ORDER BY ServiceTypeId LIMIT 1"
    )
    if default_type is None:
        raise RuntimeError(
            "No ServiceTypes defined cannot create a Service row"
        )

    self.db.execute(
        "INSERT INTO Services (ServiceTypeId, ServiceDate) VALUES (?,?)",
        (default_type["ServiceTypeId"], service_date),
    )
    return self.db.lastrowid
def has_schedule_for_service(
    self,
    member_id: int,
    service_id: int,
    status: str,
    include_expired: bool = False,
) -> bool:
    """
    Return True if the member has a schedule row for ``service_id`` with
    the given ``status``.

    For ``status='pending'`` the default is to ignore rows whose
    ``ExpiresAt`` timestamp is already in the past (they are no longer
    actionable).  A pending row whose ``ExpiresAt`` is NULL has no expiry
    and therefore still counts.  Pass ``include_expired=True`` to match any
    pending row regardless of its expiration.

    Parameters
    ----------
    member_id : int
        The member being inspected.
    service_id : int
        The service of interest.
    status : str
        Schedule status to look for (e.g. ``'accepted'`` or ``'pending'``).
    include_expired : bool, optional
        Only meaningful for ``status='pending'``: when ``True`` the
        expiration guard is skipped.  Defaults to ``False``.

    Returns
    -------
    bool
        True if a matching row exists, otherwise False.
    """
    sql = """
        SELECT 1
        FROM Schedules
        WHERE MemberId = ?
          AND ServiceId = ?
          AND Status = ?
    """
    if status == "pending" and not include_expired:
        # ``NULL > CURRENT_TIMESTAMP`` evaluates to NULL, so without the
        # IS NULL branch a pending row that never received an expiry would
        # be treated as expired and silently ignored.
        sql += " AND (ExpiresAt IS NULL OR ExpiresAt > CURRENT_TIMESTAMP)"
    sql += " LIMIT 1"

    return self.db.fetchone(sql, (member_id, service_id, status)) is not None
def schedule_next_member(
    self,
    classification_id: int,
    service_id: int,
    only_active: bool = True,
) -> Optional[Tuple[int, str, str, int]]:
    """
    Offer ``service_id`` to the next eligible member of a classification.

    Candidates are all ``Members`` rows with the given ``ClassificationId``
    (restricted to ``IsActive = 1`` unless ``only_active`` is False),
    ordered by:

    1. Decline boost: members with ``DeclineStreak < 2`` whose
       ``LastDeclinedAt`` is at most 5 days before "now" (UTC) come first.
       NOTE(review): the comparison has no lower bound, so a
       ``LastDeclinedAt`` later than "now" also qualifies for the boost.
    2. Oldest ``LastAcceptedAt`` (round-robin); NULL sorts as 1970-01-01.
    3. Oldest ``LastScheduledAt`` (tie-breaker); NULL sorts as 1970-01-01.

    A candidate is skipped when:

    * it has no ``ServiceAvailability`` row for the service's ServiceType;
    * it already has an *accepted*, *pending* or *declined* schedule for
      this service (as reported by ``has_schedule_for_service``).

    For the first candidate that passes, a ``pending`` row is inserted into
    ``Schedules``, the member's ``LastScheduledAt`` is set to the current
    timestamp, and a ``ScheduledLog`` row is written.

    Returns
    -------
    tuple or None
        ``(MemberId, FirstName, LastName, ScheduleId)`` for the member that
        was offered the slot, or ``None`` if the service does not exist or
        no candidate is eligible.
    """
    # --- 0. Resolve the ServiceType of the requested service --------------
    svc_row = self.db.fetchone(
        "SELECT ServiceTypeId, ServiceDate FROM Services WHERE ServiceId = ?",
        (service_id,),
    )
    if not svc_row:
        # Unknown service: nothing to schedule.
        return None
    service_type_id = svc_row["ServiceTypeId"]

    # --- 1. Build the ordered candidate queue -----------------------------
    BOOST_SECONDS = 5 * 24 * 60 * 60  # 5 days
    # Timezone-aware replacement for the deprecated ``datetime.utcnow()``;
    # the formatted UTC string is identical.
    now_iso = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    sql = f"""
        SELECT
            MemberId,
            FirstName,
            LastName,
            LastAcceptedAt,
            LastScheduledAt,
            LastDeclinedAt,
            DeclineStreak
        FROM Members
        WHERE ClassificationId = ?
        {"AND IsActive = 1" if only_active else ""}
        ORDER BY
            /* (1) 5-day boost (only when streak < 2) */
            CASE
                WHEN DeclineStreak < 2
                     AND LastDeclinedAt IS NOT NULL
                     AND julianday(?) - julianday(LastDeclinedAt) <= (? / 86400.0)
                THEN 0   -- boosted to the front
                ELSE 1
            END,
            /* (2) Round-robin: oldest acceptance first */
            COALESCE(LastAcceptedAt, '1970-01-01') ASC,
            /* (3) Tie-breaker: oldest offer first */
            COALESCE(LastScheduledAt, '1970-01-01') ASC
    """
    queue = self.db.fetchall(sql, (classification_id, now_iso, BOOST_SECONDS))

    # --- 2. Walk the queue, applying availability and status constraints --
    for member in queue:
        member_id = member["MemberId"]

        # Must be available for this service's type.
        avail_ok = self.db.fetchone(
            """
            SELECT 1
            FROM ServiceAvailability
            WHERE MemberId = ?
              AND ServiceTypeId = ?
            LIMIT 1
            """,
            (member_id, service_type_id),
        )
        if not avail_ok:
            continue

        # Must not already hold any schedule for this service.  The checks
        # run in the same order as before and stop at the first hit.
        if any(
            self.has_schedule_for_service(member_id, service_id, status=state)
            for state in ("accepted", "pending", "declined")
        ):
            continue

        # Eligible: create the pending schedule (minimal columns).
        self.db.execute(
            """
            INSERT INTO Schedules
                (ServiceId, MemberId, Status)
            VALUES
                (?,?,?)
            """,
            (service_id, member_id, "pending"),
        )
        schedule_id = self.db.lastrowid

        # Record the offer time; it feeds the tie-breaker in the ordering.
        self.db.execute(
            """
            UPDATE Members
            SET LastScheduledAt = CURRENT_TIMESTAMP
            WHERE MemberId = ?
            """,
            (member_id,),
        )

        # Audit trail of the offer.
        self.db.execute(
            """
            INSERT INTO ScheduledLog (MemberId, ServiceId)
            VALUES (?,?)
            """,
            (member_id, service_id),
        )

        return (
            member_id,
            member["FirstName"],
            member["LastName"],
            schedule_id,
        )

    # Queue exhausted without an eligible member.
    return None
# -----------------------------------------------------------------
# ACCEPT / DECLINE workflow (operates on the schedule row)
# -----------------------------------------------------------------
def accept_schedule(self, schedule_id: int) -> None:
    """
    Turn a *pending* schedule into an accepted assignment.

    Steps, in order:

    1. ``Schedules``: status becomes ``accepted``; ``AcceptedAt`` and
       ``ExpiresAt`` are both set to the current timestamp.
    2. ``AcceptedLog``: one row with the member and service ids.
    3. ``Members``: ``LastAcceptedAt`` is set to the current timestamp and
       ``LastDeclinedAt`` is reset to NULL.

    Raises
    ------
    ValueError
        If no schedule with ``schedule_id`` exists in status ``pending``.
    """
    pending = self.db.fetchone(
        """
        SELECT ScheduleId, ServiceId, MemberId
        FROM Schedules
        WHERE ScheduleId = ?
          AND Status = 'pending'
        """,
        (schedule_id,),
    )
    if not pending:
        raise ValueError("Schedule not found or not pending")

    accepted_service, accepting_member = pending["ServiceId"], pending["MemberId"]

    # The three writes, listed in the order they must run.
    writes = (
        (
            """
            UPDATE Schedules
            SET Status = 'accepted',
                AcceptedAt = CURRENT_TIMESTAMP,
                ExpiresAt = CURRENT_TIMESTAMP   -- no longer expires
            WHERE ScheduleId = ?
            """,
            (schedule_id,),
        ),
        (
            """
            INSERT INTO AcceptedLog (MemberId, ServiceId)
            VALUES (?,?)
            """,
            (accepting_member, accepted_service),
        ),
        (
            """
            UPDATE Members
            SET LastAcceptedAt = CURRENT_TIMESTAMP,
                LastDeclinedAt = NULL      -- a successful accept clears any cool-off
            WHERE MemberId = ?
            """,
            (accepting_member,),
        ),
    )
    for statement, params in writes:
        self.db.execute(statement, params)
def decline_schedule(
    self, schedule_id: int, reason: Optional[str] = None
) -> None:
    """
    Record that the member turned down a *pending* schedule.

    Steps, in order:

    1. ``DeclineLog``: one row holding the member, the service, the
       service's ``ServiceDate`` (as ``DeclineDate``) and ``reason``.
    2. ``Members``: ``LastDeclinedAt`` is set to that same ``ServiceDate``.
    3. ``Schedules``: status becomes ``declined``, ``DeclinedAt`` is set to
       the current timestamp and ``DeclineReason`` to ``reason``.

    Raises
    ------
    ValueError
        If no schedule with ``schedule_id`` exists in status ``pending``.
    RuntimeError
        If the schedule's service cannot be found in ``Services``.
    """
    pending = self.db.fetchone(
        """
        SELECT ScheduleId, ServiceId, MemberId
        FROM Schedules
        WHERE ScheduleId = ?
          AND Status = 'pending'
        """,
        (schedule_id,),
    )
    if not pending:
        raise ValueError("Schedule not found or not pending")

    declined_service, declining_member = pending["ServiceId"], pending["MemberId"]

    # The service's date is what gets stored as the member's decline day.
    service_row = self.db.fetchone(
        "SELECT ServiceDate FROM Services WHERE ServiceId = ?", (declined_service,)
    )
    if not service_row:
        raise RuntimeError("Service row vanished while processing decline")
    service_day = service_row["ServiceDate"]

    # The three writes, listed in the order they must run.
    writes = (
        (
            """
            INSERT INTO DeclineLog (MemberId, ServiceId, DeclineDate, Reason)
            VALUES (?,?,?,?)
            """,
            (declining_member, declined_service, service_day, reason),
        ),
        (
            """
            UPDATE Members
            SET LastDeclinedAt = ?
            WHERE MemberId = ?
            """,
            (service_day, declining_member),
        ),
        (
            """
            UPDATE Schedules
            SET Status = 'declined',
                DeclinedAt = CURRENT_TIMESTAMP,
                DeclineReason = ?
            WHERE ScheduleId = ?
            """,
            (reason, schedule_id),
        ),
    )
    for statement, params in writes:
        self.db.execute(statement, params)