feat(backend): refactor mono repository

This commit is contained in:
2025-08-27 11:04:56 -04:00
parent d0dbba21fb
commit be1c729220
37 changed files with 2534 additions and 452 deletions

View File

@@ -0,0 +1,15 @@
from .classification import ClassificationRepository
from .member import MemberRepository
from .schedule import ScheduleRepository
from .service import ServiceRepository
from .service_availability import ServiceAvailabilityRepository
from .service_type import ServiceTypeRepository
__all__ = [
"ClassificationRepository"
"MemberRepository",
"ScheduleRepository",
"ServiceRepository",
"ServiceAvailabilityRepository",
"ServiceTypeRepository"
]

View File

@@ -0,0 +1,101 @@
# myapp/repositories/classification.py
# ------------------------------------------------------------
# Persistence layer for the ``Classification`` lookup table.
# ------------------------------------------------------------
from __future__ import annotations
from typing import List, Optional
from ..db import BaseRepository
from ..models import Classification as ClassificationModel
class ClassificationRepository(BaseRepository[ClassificationModel]):
"""
Simple CRUD + lookup helpers for the ``Classifications`` table.
Typical rows look like:
ClassificationId | ClassificationName
------------------------------------
1 | Baritone
2 | Tenor
3 | Alto / Mezzo
4 | Soprano
"""
# ------------------------------------------------------------------
# Tablelevel constants change them here if the schema ever changes.
# ------------------------------------------------------------------
_TABLE = "Classifications"
_PK = "ClassificationId"
# ------------------------------------------------------------------
# Basic CRUD operations
# ------------------------------------------------------------------
def create(self, name: str) -> ClassificationModel:
"""
Insert a new classification row and return the populated model.
Parameters
----------
name: str
Humanreadable name (e.g. “Baritone”, “Tenor”, …).
Returns
-------
ClassificationModel
Instance with the newly assigned ``ClassificationId``.
"""
classification = ClassificationModel(ClassificationId=-1, ClassificationName=name)
return self._insert(self._TABLE, classification, self._PK)
def get_by_id(self, classification_id: int) -> Optional[ClassificationModel]:
"""
Retrieve a single classification by primary key.
"""
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (classification_id,))
return ClassificationModel.from_row(row) if row else None
def find_by_name(self, name: str) -> Optional[ClassificationModel]:
"""
Look up a classification by its exact name.
"""
sql = f"SELECT * FROM {self._TABLE} WHERE ClassificationName = ?"
row = self.db.fetchone(sql, (name,))
return ClassificationModel.from_row(row) if row else None
# ------------------------------------------------------------------
# Convenience queries
# ------------------------------------------------------------------
def list_all(self) -> List[ClassificationModel]:
"""
Return every classification row, ordered alphabetically.
"""
sql = f"SELECT * FROM {self._TABLE} ORDER BY ClassificationName ASC"
rows = self.db.fetchall(sql)
return [ClassificationModel.from_row(r) for r in rows]
def ensure_exists(self, name: str) -> ClassificationModel:
"""
Idempotent helper used by higherlevel services:
* If a classification with ``name`` already exists, return it.
* Otherwise create a new row and return the freshly inserted model.
"""
existing = self.find_by_name(name)
if existing:
return existing
return self.create(name)
# ------------------------------------------------------------------
# Optional delete (use with care other tables may have FK constraints)
# ------------------------------------------------------------------
def delete(self, classification_id: int) -> None:
"""
Harddelete a classification row. In practice youll rarely need
this because classifications tend to be static reference data.
"""
sql = f"DELETE FROM {self._TABLE} WHERE {self._PK} = ?"
self.db.execute(sql, (classification_id,))

View File

@@ -0,0 +1,237 @@
# myapp/repositories/member.py
# ------------------------------------------------------------
# Repository that encapsulates all persistence concerns for the
# ``Member`` model. It builds on the generic ``BaseRepository`` that
# knows how to INSERT and SELECT rows.
# ------------------------------------------------------------
from __future__ import annotations
import datetime as _dt
from typing import List, Sequence, Optional
from ..db import BaseRepository, DatabaseConnection
from ..models import Member as MemberModel
class MemberRepository(BaseRepository[MemberModel]):
"""
Highlevel dataaccess object for ``Member`` rows.
Only *persistence* logic lives here any business rules (e.g. roundrobin
scheduling) should be implemented in a service layer that composes this
repository with others.
"""
# ------------------------------------------------------------------
# Tablelevel constants keep them in one place so a rename is easy.
# ------------------------------------------------------------------
_TABLE = "Members"
_PK = "MemberId"
# ------------------------------------------------------------------
# CRUD helpers
# ------------------------------------------------------------------
def create(
self,
first_name: str,
last_name: str,
*,
email: Optional[str] = None,
phone_number: Optional[str] = None,
classification_id: Optional[int] = None,
notes: Optional[str] = None,
is_active: int = 1,
) -> MemberModel:
"""
Insert a new member row and return the fullypopulated ``Member`` instance.
"""
member = MemberModel(
MemberId=-1, # placeholder will be overwritten
FirstName=first_name,
LastName=last_name,
Email=email,
PhoneNumber=phone_number,
ClassificationId=classification_id,
Notes=notes,
IsActive=is_active,
LastScheduledAt=None,
LastAcceptedAt=None,
LastDeclinedAt=None,
DeclineStreak=0,
)
return self._insert(self._TABLE, member, self._PK)
def get_by_id(self, member_id: int) -> Optional[MemberModel]:
"""
Return a single ``Member`` identified by ``member_id`` or ``None`` if it
does not exist.
"""
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (member_id,))
return MemberModel.from_row(row) if row else None
def list_all(self) -> List[MemberModel]:
"""Convenient wrapper around ``BaseRepository._select_all``."""
return self._select_all(self._TABLE, MemberModel)
# ------------------------------------------------------------------
# Query helpers that are specific to the domain
# ------------------------------------------------------------------
def get_by_classification_ids(
self, classification_ids: Sequence[int]
) -> List[MemberModel]:
"""
Return all members whose ``ClassificationId`` is in the supplied
collection. Empty input yields an empty list (no DB roundtrip).
"""
if not classification_ids:
return []
placeholders = ",".join("?" for _ in classification_ids)
sql = (
f"SELECT * FROM {self._TABLE} "
f"WHERE ClassificationId IN ({placeholders})"
)
rows = self.db.fetchall(sql, tuple(classification_ids))
return [MemberModel.from_row(r) for r in rows]
def get_active(self) -> List[MemberModel]:
"""All members with ``IsActive = 1``."""
sql = f"SELECT * FROM {self._TABLE} WHERE IsActive = 1"
rows = self.db.fetchall(sql)
return [MemberModel.from_row(r) for r in rows]
# ------------------------------------------------------------------
# Helper used by the scheduling service builds the roundrobin queue.
# ------------------------------------------------------------------
def candidate_queue(
self,
classification_ids: Sequence[int],
*,
only_active: bool = True,
boost_seconds: int = 172_800, # 2 days in seconds
) -> List[MemberModel]:
"""
Return members ordered for the roundrobin scheduler.
Ordering follows the exact SQL logic required by the test suite:
1⃣ Boost members whose ``DeclineStreak`` <2 **and**
``LastDeclinedAt`` is within ``boost_seconds`` of *now*.
Those rows get a leading ``0`` in the ``CASE`` expression;
all others get ``1``.
2⃣ After the boost, order by ``LastAcceptedAt`` (oldest first,
``NULL`` → farpast sentinel).
3⃣ Finally break ties with ``LastScheduledAt`` (oldest first,
same ``NULL`` handling).
Parameters
----------
classification_ids:
Restrict the queue to members belonging to one of these
classifications.
only_active:
If ``True`` (default) filter out rows where ``IsActive != 1``.
boost_seconds:
Number of seconds that count as “recently declined”.
The default is **2days** (172800s).
Returns
-------
List[MemberModel]
Ordered list ready for the scheduling service.
"""
# ------------------------------------------------------------------
# Build the dynamic WHERE clause.
# ------------------------------------------------------------------
where_clauses: List[str] = []
params: List[Any] = []
if classification_ids:
placeholders = ",".join("?" for _ in classification_ids)
where_clauses.append(f"ClassificationId IN ({placeholders})")
params.extend(classification_ids)
if only_active:
where_clauses.append("IsActive = 1")
where_sql = " AND ".join(where_clauses)
if where_sql:
where_sql = "WHERE " + where_sql
# ------------------------------------------------------------------
# Current UTC timestamp in a format SQLites julianday() understands.
# ``%Y-%m-%d %H:%M:%S`` no fractional seconds.
# ------------------------------------------------------------------
now_iso = _dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
# ------------------------------------------------------------------
# Full query note the threelevel ORDER BY.
# ------------------------------------------------------------------
sql = f"""
SELECT *
FROM {self._TABLE}
{where_sql}
ORDER BY
CASE
WHEN DeclineStreak < 2
AND LastDeclinedAt IS NOT NULL
AND julianday(?) - julianday(LastDeclinedAt) <= (? / 86400.0)
THEN 0
ELSE 1
END,
COALESCE(LastAcceptedAt, '1970-01-01') ASC,
COALESCE(LastScheduledAt, '1970-01-01') ASC
"""
# ``now_iso`` and ``boost_seconds`` are the two extra bind variables.
exec_params = tuple(params) + (now_iso, boost_seconds)
rows = self.db.fetchall(sql, exec_params)
return [MemberModel.from_row(r) for r in rows]
# ------------------------------------------------------------------
# Miscellaneous update helpers (optional add as needed)
# ------------------------------------------------------------------
def touch_last_scheduled(self, member_id: int) -> None:
"""
Update ``LastScheduledAt`` to the current UTC timestamp.
Used by the scheduling service after a schedule row is created.
"""
sql = f"""
UPDATE {self._TABLE}
SET LastScheduledAt = strftime('%Y-%m-%d %H:%M:%f', 'now')
WHERE {self._PK} = ?
"""
self.db.execute(sql, (member_id,))
def set_last_accepted(self, member_id: int) -> None:
"""
Record a successful acceptance clears any cooloff.
"""
sql = f"""
UPDATE {self._TABLE}
SET LastAcceptedAt = strftime('%Y-%m-%d %H:%M:%f', 'now'),
LastDeclinedAt = NULL,
DeclineStreak = 0
WHERE {self._PK} = ?
"""
self.db.execute(sql, (member_id,))
def set_last_declined(self, member_id: int, decline_date: str) -> None:
"""
Record a decline ``decline_date`` should be an ISOformatted date
(e.g. ``'2025-08-22'``). This implements the oneday cooloff rule
and bumps the ``DeclineStreak`` counter.
"""
sql = f"""
UPDATE {self._TABLE}
SET
LastDeclinedAt = ?,
DeclineStreak = COALESCE(DeclineStreak, 0) + 1
WHERE {self._PK} = ?
"""
self.db.execute(sql, (decline_date, member_id))

View File

@@ -0,0 +1,264 @@
# myapp/repositories/schedule.py
# ------------------------------------------------------------
# Persistence layer for the ``Schedule`` model.
# ------------------------------------------------------------
from __future__ import annotations
from typing import Any, List, Optional, Sequence
from ..db import BaseRepository
from ..models import Schedule as ScheduleModel
from ..models import ScheduleStatus
class ScheduleRepository(BaseRepository[ScheduleModel]):
"""Dataaccess object for the ``Schedules`` table."""
# ------------------------------------------------------------------
# Tablelevel constants change them in one place if the schema evolves.
# ------------------------------------------------------------------
_TABLE = "Schedules"
_PK = "ScheduleId"
# ------------------------------------------------------------------
# CRUD helpers
# ------------------------------------------------------------------
def create(
self,
*,
service_id: int,
member_id: int,
status: ScheduleStatus = ScheduleStatus.PENDING,
reason: Optional[str] = None,
scheduled_at: Optional[Any] = None,
expires_at: Optional[Any] = None,
) -> ScheduleModel:
"""
Insert a brandnew schedule row.
Parameters
----------
service_id, member_id : int
FK references.
status : ScheduleStatus
Desired initial status (PENDING, DECLINED, …).
reason : str | None
Stored in ``DeclineReason`` when the status is ``DECLINED``.
scheduled_at, expires_at : datetimecompatible | None
``scheduled_at`` defaults to SQLites ``CURRENT_TIMESTAMP``.
"""
schedule = ScheduleModel(
ScheduleId=-1, # placeholder will be replaced
ServiceId=service_id,
MemberId=member_id,
Status=status.value,
ScheduledAt=scheduled_at or "CURRENT_TIMESTAMP",
AcceptedAt=None,
DeclinedAt=None,
ExpiresAt=expires_at,
DeclineReason=reason if status == ScheduleStatus.DECLINED else None,
)
return self._insert(self._TABLE, schedule, self._PK)
def get_by_id(self, schedule_id: int) -> Optional[ScheduleModel]:
"""Fetch a schedule by its primary key."""
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (schedule_id,))
return ScheduleModel.from_row(row) if row else None
def list_all(self) -> List[ScheduleModel]:
"""Return every schedule row."""
return self._select_all(self._TABLE, ScheduleModel)
# ------------------------------------------------------------------
# Helper used by the SchedulingService to locate an existing row.
# ------------------------------------------------------------------
def get_one(self, *, member_id: int, service_id: int) -> Optional[ScheduleModel]:
"""
Return the *first* schedule (any status) for the supplied
``member_id`` / ``service_id`` pair, or ``None`` if none exists.
"""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
LIMIT 1
"""
row = self.db.fetchone(sql, (member_id, service_id))
return ScheduleModel.from_row(row) if row else None
# ------------------------------------------------------------------
# Generic statuschange helper (used for “decline” and similar ops).
# ------------------------------------------------------------------
def update_status(
self,
*,
schedule_id: int,
new_status: ScheduleStatus,
reason: Optional[str] = None,
) -> None:
"""
Switch a schedules status and optionally store a reason.
If ``new_status`` is ``DECLINED`` the ``DeclineReason`` column is
populated; otherwise it is cleared.
"""
# Build the SET clause dynamically we only touch the columns we need.
set_clause = "Status = ?, DeclinedAt = NULL, DeclineReason = NULL"
params: list[Any] = [new_status.value]
if new_status == ScheduleStatus.DECLINED:
set_clause = "Status = ?, DeclinedAt = ?, DeclineReason = ?"
params.extend(["CURRENT_TIMESTAMP", reason])
params.append(schedule_id) # WHERE clause param
sql = f"""
UPDATE {self._TABLE}
SET {set_clause}
WHERE {self._PK} = ?
"""
self.db.execute(sql, tuple(params))
# ------------------------------------------------------------------
# Query helpers used by the scheduling service
# ------------------------------------------------------------------
def has_any(
self,
member_id: int,
service_id: int,
statuses: Sequence[ScheduleStatus],
) -> bool:
"""True if a schedule exists for the pair with any of the given statuses."""
if not statuses:
return False
placeholders = ",".join("?" for _ in statuses)
sql = f"""
SELECT 1
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status IN ({placeholders})
LIMIT 1
"""
params = (member_id, service_id, *[s.value for s in statuses])
row = self.db.fetchone(sql, params)
return row is not None
def is_available(self, member_id: int, service_id: int) -> bool:
"""
Cooldown rule: a member is unavailable if they have accepted a
schedule for the same service within the last ``COOLDOWN_DAYS``.
"""
# Latest acceptance timestamp (if any)
sql_latest = f"""
SELECT MAX(AcceptedAt) AS last_accept
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status = ?
"""
row = self.db.fetchone(
sql_latest,
(member_id, service_id, ScheduleStatus.ACCEPTED.value),
)
last_accept: Optional[str] = row["last_accept"] if row else None
if not last_accept:
return True # never accepted → free to schedule
COOLDOWN_DAYS = 1
sql_cooldown = f"""
SELECT 1
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceId = ?
AND Status = ?
AND DATE(AcceptedAt) >= DATE('now', '-{COOLDOWN_DAYS} day')
LIMIT 1
"""
row = self.db.fetchone(
sql_cooldown,
(member_id, service_id, ScheduleStatus.ACCEPTED.value),
)
return row is None # None → outside the cooldown window
# ------------------------------------------------------------------
# Statustransition helpers (accept / decline) kept for completeness.
# ------------------------------------------------------------------
def mark_accepted(
self,
schedule_id: int,
accepted_at: Optional[Any] = None,
) -> None:
sql = f"""
UPDATE {self._TABLE}
SET Status = ?,
AcceptedAt = ?,
DeclinedAt = NULL,
DeclineReason = NULL
WHERE {self._PK} = ?
"""
ts = accepted_at or "CURRENT_TIMESTAMP"
self.db.execute(sql, (ScheduleStatus.ACCEPTED.value, ts, schedule_id))
def mark_declined(
self,
schedule_id: int,
declined_at: Optional[Any] = None,
decline_reason: Optional[str] = None,
) -> None:
sql = f"""
UPDATE {self._TABLE}
SET Status = ?,
DeclinedAt = ?,
DeclineReason = ?
WHERE {self._PK} = ?
"""
ts = declined_at or "CURRENT_TIMESTAMP"
self.db.execute(sql, (ScheduleStatus.DECLINED.value, ts, decline_reason, schedule_id))
# ------------------------------------------------------------------
# Sameday helper used by the scheduling service
# ------------------------------------------------------------------
def has_schedule_on_date(self, member_id: int, service_date: str) -> bool:
"""
Return ``True`` if *any* schedule (regardless of status) exists for
``member_id`` on the calendar day ``service_date`` (format YYYYMMDD).
This abstracts the “a member can only be scheduled once per day”
rule so the service layer does not need to know the underlying
table layout.
"""
sql = f"""
SELECT 1
FROM {self._TABLE} AS s
JOIN Services AS sv ON s.ServiceId = sv.ServiceId
WHERE s.MemberId = ?
AND sv.ServiceDate = ?
LIMIT 1
"""
row = self.db.fetchone(sql, (member_id, service_date))
return row is not None
# ------------------------------------------------------------------
# Miscellaneous convenience queries
# ------------------------------------------------------------------
def get_pending_for_service(self, service_id: int) -> List[ScheduleModel]:
"""All PENDING schedules for a given service."""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE ServiceId = ?
AND Status = ?
"""
rows = self.db.fetchall(sql, (service_id, ScheduleStatus.PENDING.value))
return [ScheduleModel.from_row(r) for r in rows]
def delete(self, schedule_id: int) -> None:
"""Harddelete a schedule row (use with caution)."""
sql = f"DELETE FROM {self._TABLE} WHERE {self._PK} = ?"
self.db.execute(sql, (schedule_id,))

View File

@@ -0,0 +1,105 @@
# myapp/repositories/service.py
# ------------------------------------------------------------
# Persistence layer for Servicerelated models.
# ------------------------------------------------------------
from __future__ import annotations
from datetime import date, datetime
from typing import List, Optional, Sequence, Any
from ..db import BaseRepository
from ..models import Service as ServiceModel
# ----------------------------------------------------------------------
# ServiceRepository handles the ``Services`` table
# ----------------------------------------------------------------------
class ServiceRepository(BaseRepository[ServiceModel]):
"""
CRUD + query helpers for the ``Services`` table.
Business rules (e.g. “do not schedule past services”) belong in a
service layer that composes this repository with the others.
"""
_TABLE = "Services"
_PK = "ServiceId"
# ------------------------------
# Basic CRUD
# ------------------------------
def create(
self,
service_type_id: int,
service_date: date,
) -> ServiceModel:
"""
Insert a new service row.
``service_date`` can be a ``datetime.date`` or an ISO8601 string.
"""
svc = ServiceModel(
ServiceId=-1, # placeholder will be overwritten
ServiceTypeId=service_type_id,
ServiceDate=service_date,
)
return self._insert(self._TABLE, svc, self._PK)
def get_by_id(self, service_id: int) -> Optional[ServiceModel]:
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (service_id,))
return ServiceModel.from_row(row) if row else None
def list_all(self) -> List[ServiceModel]:
return self._select_all(self._TABLE, ServiceModel)
# ------------------------------
# Domainspecific queries
# ------------------------------
def upcoming(self, after: Optional[date] = None, limit: int = 100) -> List[ServiceModel]:
"""
Return services that occur on or after ``after`` (defaults to today).
Results are ordered chronologically.
"""
after_date = after or date.today()
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE ServiceDate >= ?
ORDER BY ServiceDate ASC
LIMIT ?
"""
rows = self.db.fetchall(sql, (after_date.isoformat(), limit))
return [ServiceModel.from_row(r) for r in rows]
def by_type(self, service_type_ids: Sequence[int]) -> List[ServiceModel]:
"""
Fetch all services whose ``ServiceTypeId`` is in the supplied list.
Empty input → empty list (no DB roundtrip).
"""
if not service_type_ids:
return []
placeholders = ",".join("?" for _ in service_type_ids)
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE ServiceTypeId IN ({placeholders})
ORDER BY ServiceDate ASC
"""
rows = self.db.fetchall(sql, tuple(service_type_ids))
return [ServiceModel.from_row(r) for r in rows]
# ------------------------------
# Update helpers (optional)
# ------------------------------
def reschedule(self, service_id: int, new_date: date) -> None:
"""
Change the ``ServiceDate`` of an existing service.
"""
sql = f"""
UPDATE {self._TABLE}
SET ServiceDate = ?
WHERE {self._PK} = ?
"""
self.db.execute(sql, (new_date.isoformat(), service_id))

View File

@@ -0,0 +1,158 @@
# myapp/repositories/service_availability.py
# ------------------------------------------------------------
# Persistence layer for the ServiceAvailability table.
# ------------------------------------------------------------
from __future__ import annotations
from typing import List, Optional, Sequence, Any
from ..db import BaseRepository
from ..models import ServiceAvailability as ServiceAvailabilityModel
class ServiceAvailabilityRepository(BaseRepository[ServiceAvailabilityModel]):
"""
CRUD + query helpers for the ``ServiceAvailability`` table.
The table records which members are allowed to receive which
servicetype slots (e.g. “9AM”, “11AM”, “6PM”). All SQL is
parameterised to stay safe from injection attacks.
"""
# ------------------------------------------------------------------
# Tablelevel constants change them in one place if the schema evolves.
# ------------------------------------------------------------------
_TABLE = "ServiceAvailability"
_PK = "ServiceAvailabilityId"
# ------------------------------------------------------------------
# Basic CRUD helpers
# ------------------------------------------------------------------
def create(
self,
member_id: int,
service_type_id: int,
) -> ServiceAvailabilityModel:
"""
Insert a new availability row.
The ``UNIQUE (MemberId, ServiceTypeId)`` constraint guarantees
idempotency if the pair already exists SQLite will raise an
``IntegrityError``. To make the operation truly idempotent we
first check for an existing row and return it unchanged.
"""
existing = self.get(member_id, service_type_id)
if existing:
return existing
avail = ServiceAvailabilityModel(
ServiceAvailabilityId=-1, # placeholder will be overwritten
MemberId=member_id,
ServiceTypeId=service_type_id,
)
return self._insert(self._TABLE, avail, self._PK)
def get(
self,
member_id: int,
service_type_id: int,
) -> Optional[ServiceAvailabilityModel]:
"""
Retrieve a single availability record for the given member /
servicetype pair, or ``None`` if it does not exist.
"""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceTypeId = ?
"""
row = self.db.fetchone(sql, (member_id, service_type_id))
return ServiceAvailabilityModel.from_row(row) if row else None
def delete(self, availability_id: int) -> None:
"""
Harddelete an availability row by its primary key.
Use with care most callers will prefer ``revoke`` (by member &
service type) which is a bit more expressive.
"""
sql = f"DELETE FROM {self._TABLE} WHERE {self._PK} = ?"
self.db.execute(sql, (availability_id,))
# ------------------------------------------------------------------
# Convenience “grant / revoke” helpers (the most common ops)
# ------------------------------------------------------------------
def grant(self, member_id: int, service_type_id: int) -> ServiceAvailabilityModel:
"""
Public API to give a member permission for a particular service slot.
Internally delegates to ``create`` which already handles the
idempotentcheck.
"""
return self.create(member_id, service_type_id)
def revoke(self, member_id: int, service_type_id: int) -> None:
"""
Remove a members permission for a particular service slot.
"""
sql = f"""
DELETE FROM {self._TABLE}
WHERE MemberId = ?
AND ServiceTypeId = ?
"""
self.db.execute(sql, (member_id, service_type_id))
# ------------------------------------------------------------------
# Query helpers used by the scheduling service
# ------------------------------------------------------------------
def list_by_member(self, member_id: int) -> List[ServiceAvailabilityModel]:
"""
Return every ``ServiceAvailability`` row that belongs to the given
member. Handy for building a members personal “available slots”
view.
"""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE MemberId = ?
"""
rows = self.db.fetchall(sql, (member_id,))
return [ServiceAvailabilityModel.from_row(r) for r in rows]
def list_by_service_type(self, service_type_id: int) -> List[ServiceAvailabilityModel]:
"""
Return all members that are allowed to receive the given service type.
"""
sql = f"""
SELECT *
FROM {self._TABLE}
WHERE ServiceTypeId = ?
"""
rows = self.db.fetchall(sql, (service_type_id,))
return [ServiceAvailabilityModel.from_row(r) for r in rows]
def list_all(self) -> List[ServiceAvailabilityModel]:
"""
Return every row in the table useful for admin dashboards or
bulkexport scripts.
"""
return self._select_all(self._TABLE, ServiceAvailabilityModel)
# ------------------------------------------------------------------
# Helper for the roundrobin scheduler
# ------------------------------------------------------------------
def members_for_type(self, service_type_id: int) -> List[int]:
"""
Return a flat list of ``MemberId`` values that are eligible for the
supplied ``service_type_id``. The scheduling service can then
intersect this list with the pool of members that have the correct
classification, activity flag, etc.
"""
sql = f"""
SELECT MemberId
FROM {self._TABLE}
WHERE ServiceTypeId = ?
"""
rows = self.db.fetchall(sql, (service_type_id,))
# ``rows`` is a sequence of sqlite3.Row objects; each row acts like a dict.
return [row["MemberId"] for row in rows]

View File

@@ -0,0 +1,95 @@
# myapp/repositories/service_type.py
# ------------------------------------------------------------
# Persistence layer for the ``ServiceTypes`` table.
# ------------------------------------------------------------
from __future__ import annotations
from typing import List, Optional
from ..db import BaseRepository
from ..models import ServiceType as ServiceTypeModel
class ServiceTypeRepository(BaseRepository[ServiceTypeModel]):
"""
CRUDstyle helper for the ``ServiceTypes`` lookup table.
* Each row stores a humanreadable label (e.g. "9AM").
* The repository does **not** enforce any particular naming scheme
that kind of validation belongs in a higherlevel service layer if you
need it.
"""
# ------------------------------------------------------------------
# Tablelevel constants change them in one place if the schema evolves.
# ------------------------------------------------------------------
_TABLE = "ServiceTypes"
_PK = "ServiceTypeId"
# ------------------------------------------------------------------
# Basic CRUD operations
# ------------------------------------------------------------------
def create(self, type_name: str) -> ServiceTypeModel:
"""
Insert a new servicetype row and return the populated model.
Parameters
----------
type_name: str
Humanreadable identifier for the slot (e.g. "9AM").
Returns
-------
ServiceTypeModel
Instance with the freshly assigned primarykey.
"""
st = ServiceTypeModel(ServiceTypeId=-1, TypeName=type_name)
return self._insert(self._TABLE, st, self._PK)
def get_by_id(self, type_id: int) -> Optional[ServiceTypeModel]:
"""Fetch a single ServiceType by its primary key."""
sql = f"SELECT * FROM {self._TABLE} WHERE {self._PK} = ?"
row = self.db.fetchone(sql, (type_id,))
return ServiceTypeModel.from_row(row) if row else None
# ------------------------------------------------------------------
# Convenience lookups
# ------------------------------------------------------------------
def find_by_name(self, name: str) -> Optional[ServiceTypeModel]:
"""
Return the ServiceType whose ``TypeName`` matches ``name``.
Useful for turning a userprovided slot label into its integer id.
"""
sql = f"SELECT * FROM {self._TABLE} WHERE TypeName = ?"
row = self.db.fetchone(sql, (name,))
return ServiceTypeModel.from_row(row) if row else None
def list_all(self) -> List[ServiceTypeModel]:
"""Return every ServiceType row, ordered alphabetically by name."""
sql = f"SELECT * FROM {self._TABLE} ORDER BY TypeName ASC"
rows = self.db.fetchall(sql)
return [ServiceTypeModel.from_row(r) for r in rows]
# ------------------------------------------------------------------
# Optional helper bulkensure a set of expected slots exists
# ------------------------------------------------------------------
def ensure_slots(self, slot_names: List[str]) -> List[ServiceTypeModel]:
"""
Given a list of desired slot labels (e.g. ["9AM","11AM","6PM"]),
insert any that are missing and return the complete set of
ServiceTypeModel objects.
This is handy during application bootstrap or migrations.
"""
existing = {st.TypeName: st for st in self.list_all()}
result: List[ServiceTypeModel] = []
for name in slot_names:
if name in existing:
result.append(existing[name])
else:
# Insert the missing slot and add it to the result list.
result.append(self.create(name))
return result