Compare commits

...

2 Commits

Author SHA1 Message Date
94900b19f7 feat(cli): improve member design and usability 2025-08-28 16:21:39 -04:00
954abb704e feat(backend): add database versioning 2025-08-28 10:04:56 -04:00
11 changed files with 669 additions and 708 deletions

View File

@@ -1,42 +0,0 @@
████████████████████████████████████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████████████████████████
█████████████████████████████████████████████▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░░░▒▒▒▒▒▓▓████████████████████████████
███████████████████████████████████▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░░░░▒▒▒████████████████████
████████████████████████████▓▒▒▒▒▒▒▒▒▓▓████████████████████████████████▓▒▒▒▒▒▒░░░░░▒▓███████████████
██████████████████████▓▒▒▒▒▓██████████████████████████████████████████████████▒▒▒▒░░░░░▒████████████
█████████████████▓▒▒▓████████████████████████████████████████████████████████████▓▒▒▒░░░░▒██████████
█████████████▓▓▓████████████████████████████████████████████████████████████████████▒▒▒░░░░▒████████
██████████▓██████████████████████████████████████████████████████████████████████████▓▒▒░░░░▒███████
███████████████████████████████████████▓▒▒▒▒▒▒▒▒▒▒▒███████████████████████████████████▓▒░░░░░▒██████
████████████████████████▓▓▓▓█████████▓▒▒▒▒▒░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒█████████████████████████▒▒░░░░▒▓█████
███████████████████▓▒▒▒▒░░░░░░▒▓▓▒▒▒▒▒▒▒▒░░░░░░░░░░░░░░░░░░░░░▒▒██████████████████████▓▒░░░░░▒▒█████
█████████████████▓▒▒▒░░░░░░░░░░░░▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒█████████████████████▒░░░░░▒▒▓█████
████████████████▒▒▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒███████████████████▒░░░░░▒▒▒▓█████
███████████████▓▒▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒████████████████▓▒░░░░░░▒▒▒▒██████
███████████████▒▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒▒██████████████▒░░░░░░░▒▒▒▒▒███████
█████████████▓▒▒▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒░░░░░░░░░░░░▒▒▒▒▒▓███████▓▒░░░░░░░░░▒▒▒▒▒▓████████
███████████▒▒▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒▒░░░░░░░░░░░░▒▒▒▒▓▓▒░░░░░░░░░░░░▒▒▒▒▒▒██████████
██████████▓▒░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░▒▒▒░░░░░░░░░░░░▒▒▒▒░░░░░░░░░░▒▒▒▒▒▒▒▒▓███████████
█████████▒▒▒▒░░░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒░░░░░░░░░░░░░░░░▒▒▒▒░░░░░░░░░░░▒▒▒▒░░░░░░▒▒▒▒▒▒▒▒▒▓██████████████
███████▒▒▒▒░░░░░░░░░░▒▒▒▒▒░░░░░░░░▒▒▒░░░░░░░░░░░░░░▒▒▒▒░░░░░▒░░░░▒▒▒▒▒░▒▒▒▒▒▒▒▒▒▒▒▒▓████████████████
██████▒▒▒▒░░░░░░░░░░▒▒▒░░░░░░░░░░░░░░▒▒░░░░░░░░░░░▒▒▒▒░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓████████████████████
█████▒▒▒▒░░░░░░░░░▒▒▒░░░░░░░░░░░░░░░░░▒▒░░░░░░░░░▒▒▒▒▒░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓████████████████████████
█████▒▒▒▒░░░░░▒▒░▒▒▒▒░░░░░░░▒▒░░░░░░░░▒▒░░░░░░░░░░▒▒▒▒░░░░▒▒▒▒▒▒▒▒▒▒▒▒▓▓████████████████████████████
█████▒▒▒▒░░░░░▒▒░▒▒▒▒░░░░░▒▒░░░░░░░░░░▒▒▒░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒██████████████████████████████
█████▒▒▒▒░░░░▒▒▒░▒▒▒▒░░░░▒▒░░░░░░░░░░░▒▒░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓██████████████████████████████
█████▓▒▒▒▒▒░░▒▒▒▒▒▒▒▒░░░░▒▒░░░░░░░░░░▒▒▒░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒███████████████████████████████
██████▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░░▒▒▒▒░░░░░░▒▒▒▒▒▒▒░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓████████████████████████████████
████████▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓██████████████████████████████████
██████████▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒███████████████████████████████████████
███████████▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒███████████████████████████████████████
███████████▓▒▒▒▒▒▒▒▒▒▒▒░░░░░░░░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒████████████████████████████████████████
█████████████▓▒▒▒▒▒▒▒▒▒░░░░░░░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒█████████████████████████████████████████
███████████████████▓▒▒▒▒░░░░░░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▓██████████████████████████████████████████
████████████████████▓▒▒▒▒░░░░░░░░░░░▒▒▒▒▒▒▒▒▒▒▓▒▒▒▒▒▒▒▓█████████████████████████████████████████████
█████████████████████▓▒▒▒▒▒▒░░░▒▒▒▒▒▒▒▒▒▒▒▓█████████████████████████████████████████████████████████
███████████████████████▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒██████████████████████████████████████████████████████████████
███████████████████████████▓▒▒▒▒▒▒▓█████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████████████████████████
████████████████████████████████████████████████████████████████████████████████████████████████████

View File

@@ -2,7 +2,10 @@
Base CLI class and utilities for NimbusFlow CLI.
"""
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
from backend.db.connection import DatabaseConnection
from backend.repositories import (
MemberRepository,
@@ -14,6 +17,19 @@ from backend.repositories import (
)
from backend.services.scheduling_service import SchedulingService
# Import Colors from interactive module for consistent styling
try:
from .interactive import Colors
except ImportError:
# Fallback colors if interactive module not available
class Colors:
RESET = '\033[0m'
SUCCESS = '\033[1m\033[92m'
WARNING = '\033[1m\033[93m'
ERROR = '\033[1m\033[91m'
CYAN = '\033[96m'
DIM = '\033[2m'
class CLIError(Exception):
"""Custom exception for CLI-specific errors."""
@@ -21,17 +37,105 @@ class CLIError(Exception):
class NimbusFlowCLI:
"""Main CLI application class."""
"""Main CLI application class with database versioning."""
def __init__(self, db_path: str = "database6_accepts_and_declines.db"):
"""Initialize CLI with database connection."""
self.db_path = Path(__file__).parent.parent / db_path
if not self.db_path.exists():
raise CLIError(f"Database not found: {self.db_path}")
def __init__(self, db_path: str = "database.db", create_version: bool = True):
"""Initialize CLI with database connection, always using most recent version."""
self.db_dir = Path(__file__).parent.parent / "db" / "sqlite"
self.base_db_path = self.db_dir / db_path
# Always find and use the most recent database version
self.db_path = self._get_most_recent_database()
if create_version:
# Create a new version based on the most recent one
self.db_path = self._create_versioned_database()
self.db = DatabaseConnection(self.db_path)
self._init_repositories()
def _get_most_recent_database(self) -> Path:
"""Get the most recent database version, or base database if no versions exist."""
versions = self.list_database_versions()
if versions:
# Return the most recent versioned database
most_recent = versions[0] # Already sorted newest first
return most_recent
else:
# No versions exist, use base database
if not self.base_db_path.exists():
raise CLIError(f"Base database not found: {self.base_db_path}")
return self.base_db_path
def _create_versioned_database(self) -> Path:
"""Create a versioned copy from the most recent database."""
source_db = self.db_path # Use the most recent database as source
# Generate timestamp-based version
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
version = self._get_next_version_number()
versioned_name = f"database_v{version}_{timestamp}.db"
versioned_path = self.db_dir / versioned_name
# Copy the most recent database to create the versioned copy
shutil.copy2(source_db, versioned_path)
print(f"{Colors.SUCCESS}Created versioned database:{Colors.RESET} {Colors.CYAN}{versioned_name}{Colors.RESET}")
print(f"{Colors.DIM}Based on: {source_db.name}{Colors.RESET}")
return versioned_path
def _get_next_version_number(self) -> int:
"""Get the next version number by checking existing versioned databases."""
version_pattern = "database_v*_*.db"
existing_versions = list(self.db_dir.glob(version_pattern))
if not existing_versions:
return 1
# Extract version numbers from existing files
versions = []
for db_file in existing_versions:
try:
# Parse version from filename like "database_v123_20250828_143022.db"
parts = db_file.stem.split('_')
if len(parts) >= 2 and parts[1].startswith('v'):
version_num = int(parts[1][1:]) # Remove 'v' prefix
versions.append(version_num)
except (ValueError, IndexError):
continue
return max(versions) + 1 if versions else 1
def list_database_versions(self) -> list[Path]:
"""List all versioned databases in chronological order."""
version_pattern = "database_v*_*.db"
versioned_dbs = list(self.db_dir.glob(version_pattern))
# Sort by modification time (newest first)
return sorted(versioned_dbs, key=lambda x: x.stat().st_mtime, reverse=True)
def cleanup_old_versions(self, keep_latest: int = 5) -> int:
"""Clean up old database versions, keeping only the latest N versions."""
versions = self.list_database_versions()
if len(versions) <= keep_latest:
return 0
versions_to_delete = versions[keep_latest:]
deleted_count = 0
for db_path in versions_to_delete:
try:
db_path.unlink()
deleted_count += 1
print(f"{Colors.DIM}Deleted old version: {db_path.name}{Colors.RESET}")
except OSError as e:
print(f"{Colors.WARNING}⚠️ Could not delete {db_path.name}: {e}{Colors.RESET}")
return deleted_count
def _init_repositories(self):
"""Initialize all repository instances."""
self.member_repo = MemberRepository(self.db)

View File

@@ -5,7 +5,7 @@ CLI command modules.
from .members import cmd_members_list, cmd_members_show, setup_members_parser
from .schedules import (
cmd_schedules_list, cmd_schedules_show, cmd_schedules_accept,
cmd_schedules_decline, cmd_schedules_schedule, setup_schedules_parser
cmd_schedules_decline, cmd_schedules_remove, cmd_schedules_schedule, setup_schedules_parser
)
from .services import cmd_services_list, setup_services_parser
@@ -14,7 +14,7 @@ __all__ = [
"cmd_members_list", "cmd_members_show", "setup_members_parser",
# Schedule commands
"cmd_schedules_list", "cmd_schedules_show", "cmd_schedules_accept",
"cmd_schedules_decline", "cmd_schedules_schedule", "setup_schedules_parser",
"cmd_schedules_decline", "cmd_schedules_remove", "cmd_schedules_schedule", "setup_schedules_parser",
# Service commands
"cmd_services_list", "setup_services_parser",
]

View File

@@ -21,6 +21,17 @@ def cmd_schedules_list(cli: "NimbusFlowCLI", args) -> None:
if args.service_id:
schedules = [s for s in schedules if s.ServiceId == args.service_id]
if hasattr(args, 'date') and args.date:
# Filter schedules by date - find services for that date first
try:
from datetime import date as date_type
target_date = date_type.fromisoformat(args.date)
services_on_date = [s.ServiceId for s in cli.service_repo.list_all() if s.ServiceDate == target_date]
schedules = [s for s in schedules if s.ServiceId in services_on_date]
except ValueError:
print(f"{TableColors.ERROR}Invalid date format '{args.date}'. Use YYYY-MM-DD{TableColors.RESET}")
return
if args.status:
try:
status_enum = ScheduleStatus.from_raw(args.status.lower())
@@ -128,15 +139,18 @@ def cmd_schedules_accept(cli: "NimbusFlowCLI", args) -> None:
service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}
# Show available services for the date
print(f"\n📅 Services available on {args.date}:")
print("-" * 40)
print(f"\n{TableColors.HEADER}Services available on {args.date}{TableColors.RESET}")
print(f"{TableColors.BORDER}" * 50 + f"{TableColors.RESET}")
print()
for i, service in enumerate(services_on_date, 1):
type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
print(f"{i}. {type_name} (Service ID: {service.ServiceId})")
print(f" {TableColors.CYAN}{i}.{TableColors.RESET} {type_name}")
print()
# Let user select service
try:
choice = input(f"\nSelect service (1-{len(services_on_date)}): ").strip()
print(f"\n{TableColors.INPUT_BOX}┌─ Select service (1-{len(services_on_date)}) ─┐{TableColors.RESET}")
choice = input(f"{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice or not choice.isdigit():
print("❌ Invalid selection")
return
@@ -167,72 +181,321 @@ def cmd_schedules_accept(cli: "NimbusFlowCLI", args) -> None:
member_map = {m.MemberId: m for m in cli.member_repo.list_all()}
# Show available members to accept
print(f"\n👥 Members scheduled for {service_type_map.get(selected_service.ServiceTypeId, 'Unknown')} on {args.date}:")
print("-" * 60)
print(f"\n{TableColors.HEADER}Members scheduled for {service_type_map.get(selected_service.ServiceTypeId, 'Unknown')} on {args.date}:{TableColors.RESET}")
print(f"{TableColors.BORDER}{'' * 70}{TableColors.RESET}")
for i, schedule in enumerate(pending_schedules, 1):
member = member_map.get(schedule.MemberId)
if member:
print(f"{i}. {member.FirstName} {member.LastName} (Schedule ID: {schedule.ScheduleId})")
print(f"{TableColors.CYAN}{i:2d}.{TableColors.RESET} {TableColors.BOLD}{member.FirstName} {member.LastName}{TableColors.RESET} {TableColors.DIM}(Schedule ID: {schedule.ScheduleId}){TableColors.RESET}")
else:
print(f"{i}. Unknown Member (ID: {schedule.MemberId}) (Schedule ID: {schedule.ScheduleId})")
print(f"{TableColors.CYAN}{i:2d}.{TableColors.RESET} {TableColors.DIM}Unknown Member (ID: {schedule.MemberId}) (Schedule ID: {schedule.ScheduleId}){TableColors.RESET}")
# Let user select multiple members
print(f"\n{TableColors.SUCCESS}Multiple Selection Options:{TableColors.RESET}")
print(f"{TableColors.CYAN}Single:{TableColors.RESET} Enter a number (e.g., {TableColors.YELLOW}3{TableColors.RESET})")
print(f"{TableColors.CYAN}Multiple:{TableColors.RESET} Enter numbers separated by commas (e.g., {TableColors.YELLOW}1,3,5{TableColors.RESET})")
print(f"{TableColors.CYAN}Range:{TableColors.RESET} Enter a range (e.g., {TableColors.YELLOW}1-4{TableColors.RESET})")
print(f"{TableColors.CYAN}All:{TableColors.RESET} Enter {TableColors.YELLOW}all{TableColors.RESET} to select everyone")
# Let user select member
try:
choice = input(f"\nSelect member to accept (1-{len(pending_schedules)}): ").strip()
if not choice or not choice.isdigit():
print("❌ Invalid selection")
choice = input(f"\n{TableColors.INPUT_BOX}┌─ Select members to accept ─┐{TableColors.RESET}\n{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice:
print(f"{TableColors.ERROR}❌ No selection made{TableColors.RESET}")
return
member_index = int(choice) - 1
if member_index < 0 or member_index >= len(pending_schedules):
print("❌ Invalid selection")
selected_indices = []
if choice.lower() == 'all':
selected_indices = list(range(len(pending_schedules)))
elif '-' in choice and ',' not in choice:
# Handle range (e.g., "1-4")
try:
start, end = choice.split('-')
start_idx = int(start.strip()) - 1
end_idx = int(end.strip()) - 1
if start_idx < 0 or end_idx >= len(pending_schedules) or start_idx > end_idx:
print(f"{TableColors.ERROR}❌ Invalid range{TableColors.RESET}")
return
selected_indices = list(range(start_idx, end_idx + 1))
except (ValueError, IndexError):
print(f"{TableColors.ERROR}❌ Invalid range format{TableColors.RESET}")
return
else:
# Handle single number or comma-separated list
try:
numbers = [int(x.strip()) for x in choice.split(',')]
for num in numbers:
idx = num - 1
if idx < 0 or idx >= len(pending_schedules):
print(f"{TableColors.ERROR}❌ Invalid selection: {num}{TableColors.RESET}")
return
if idx not in selected_indices:
selected_indices.append(idx)
except ValueError:
print(f"{TableColors.ERROR}❌ Invalid input format{TableColors.RESET}")
return
if not selected_indices:
print(f"{TableColors.ERROR}❌ No valid selections{TableColors.RESET}")
return
selected_schedule = pending_schedules[member_index]
schedules_to_accept = [pending_schedules[i] for i in selected_indices]
except (KeyboardInterrupt, EOFError):
print("\n🛑 Cancelled")
print(f"\n{TableColors.WARNING}🛑 Cancelled{TableColors.RESET}")
return
# Accept the selected schedule
schedule_to_accept = selected_schedule
# Direct mode with schedule ID
# Direct mode with schedule ID (single schedule)
elif hasattr(args, 'schedule_id') and args.schedule_id:
schedule_to_accept = cli.schedule_repo.get_by_id(args.schedule_id)
if not schedule_to_accept:
print(f"❌ Schedule with ID {args.schedule_id} not found")
print(f"{TableColors.ERROR}❌ Schedule with ID {args.schedule_id} not found{TableColors.RESET}")
return
schedules_to_accept = [schedule_to_accept]
else:
print("❌ Either --date or schedule_id must be provided")
print(f"{TableColors.ERROR}❌ Either --date or schedule_id must be provided{TableColors.RESET}")
return
# Common validation and acceptance logic
if schedule_to_accept.Status == ScheduleStatus.ACCEPTED.value:
print(f"⚠️ Schedule {schedule_to_accept.ScheduleId} is already accepted")
# Process multiple schedules
successful_accepts = []
failed_accepts = []
print(f"\n{TableColors.SUCCESS}Processing {len(schedules_to_accept)} schedule(s)...{TableColors.RESET}")
for schedule in schedules_to_accept:
# Validation for each schedule
if schedule.Status == ScheduleStatus.ACCEPTED.value:
failed_accepts.append((schedule, f"already accepted"))
continue
if schedule.Status == ScheduleStatus.DECLINED.value:
failed_accepts.append((schedule, f"previously declined"))
continue
# Get member and service info for display
member = cli.member_repo.get_by_id(schedule.MemberId)
service = cli.service_repo.get_by_id(schedule.ServiceId)
service_type = None
if service:
service_type = cli.service_type_repo.get_by_id(service.ServiceTypeId)
try:
# Mark the schedule as accepted
cli.schedule_repo.mark_accepted(schedule.ScheduleId)
# Update member's acceptance timestamp
cli.member_repo.set_last_accepted(schedule.MemberId)
successful_accepts.append((schedule, member, service, service_type))
except Exception as e:
failed_accepts.append((schedule, f"database error: {e}"))
# Display results
if successful_accepts:
print(f"\n{TableColors.SUCCESS}✅ Successfully accepted {len(successful_accepts)} schedule(s):{TableColors.RESET}")
for schedule, member, service, service_type in successful_accepts:
member_name = f"{member.FirstName} {member.LastName}" if member else f"Member ID {schedule.MemberId}"
service_info = f"{service_type.TypeName} on {service.ServiceDate}" if service and service_type else f"Service ID {schedule.ServiceId}"
print(f" {TableColors.CYAN}{TableColors.RESET} {TableColors.BOLD}{member_name}{TableColors.RESET} - {service_info}")
if failed_accepts:
print(f"\n{TableColors.WARNING}⚠️ Could not accept {len(failed_accepts)} schedule(s):{TableColors.RESET}")
for schedule, reason in failed_accepts:
member = cli.member_repo.get_by_id(schedule.MemberId)
member_name = f"{member.FirstName} {member.LastName}" if member else f"Member ID {schedule.MemberId}"
print(f" {TableColors.YELLOW}{TableColors.RESET} {TableColors.BOLD}{member_name}{TableColors.RESET} - {reason}")
if not successful_accepts and not failed_accepts:
print(f"{TableColors.DIM}No schedules processed{TableColors.RESET}")
def cmd_schedules_remove(cli: "NimbusFlowCLI", args) -> None:
"""Remove scheduled members and move them back to the front of the queue."""
# Interactive mode with date parameter
if hasattr(args, 'date') and args.date:
try:
target_date = date.fromisoformat(args.date)
except ValueError:
print(f"{TableColors.ERROR}❌ Invalid date format '{args.date}'. Use YYYY-MM-DD{TableColors.RESET}")
return
# Find services for the specified date
all_services = cli.service_repo.list_all()
services_on_date = [s for s in all_services if s.ServiceDate == target_date]
if not services_on_date:
print(f"{TableColors.ERROR}❌ No services found for {args.date}{TableColors.RESET}")
return
# Get service types for display
service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}
# Show available services for the date
print(f"\n{TableColors.HEADER}Services available on {args.date}:{TableColors.RESET}")
print(f"{TableColors.BORDER}{'' * 40}{TableColors.RESET}")
for i, service in enumerate(services_on_date, 1):
type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
print(f"{TableColors.CYAN}{i}.{TableColors.RESET} {type_name} {TableColors.DIM}(Service ID: {service.ServiceId}){TableColors.RESET}")
# Let user select service
try:
choice = input(f"\n{TableColors.INPUT_BOX}┌─ Select service (1-{len(services_on_date)}) ─┐{TableColors.RESET}\n{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice or not choice.isdigit():
print(f"{TableColors.ERROR}❌ Invalid selection{TableColors.RESET}")
return
service_index = int(choice) - 1
if service_index < 0 or service_index >= len(services_on_date):
print(f"{TableColors.ERROR}❌ Invalid selection{TableColors.RESET}")
return
selected_service = services_on_date[service_index]
except (KeyboardInterrupt, EOFError):
print(f"\n{TableColors.WARNING}🛑 Cancelled{TableColors.RESET}")
return
# Find all schedules for this service (not just pending)
all_schedules = cli.schedule_repo.list_all()
service_schedules = [
s for s in all_schedules
if s.ServiceId == selected_service.ServiceId
]
if not service_schedules:
service_type_name = service_type_map.get(selected_service.ServiceTypeId, "Unknown")
print(f"{TableColors.WARNING}⚠️ No schedules found for {service_type_name} on {args.date}{TableColors.RESET}")
return
# Get member info for display
member_map = {m.MemberId: m for m in cli.member_repo.list_all()}
# Show available members to remove
print(f"\n{TableColors.HEADER}Members scheduled for {service_type_map.get(selected_service.ServiceTypeId, 'Unknown')} on {args.date}:{TableColors.RESET}")
print(f"{TableColors.BORDER}{'' * 70}{TableColors.RESET}")
for i, schedule in enumerate(service_schedules, 1):
member = member_map.get(schedule.MemberId)
status_color = TableColors.SUCCESS if schedule.Status == "accepted" else TableColors.WARNING if schedule.Status == "pending" else TableColors.ERROR
status_display = f"{status_color}{schedule.Status.upper()}{TableColors.RESET}"
if member:
print(f"{TableColors.CYAN}{i:2d}.{TableColors.RESET} {TableColors.BOLD}{member.FirstName} {member.LastName}{TableColors.RESET} {TableColors.DIM}({status_display}, ID: {schedule.ScheduleId}){TableColors.RESET}")
else:
print(f"{TableColors.CYAN}{i:2d}.{TableColors.RESET} {TableColors.DIM}Unknown Member (ID: {schedule.MemberId}) ({status_display}, ID: {schedule.ScheduleId}){TableColors.RESET}")
# Let user select multiple members to remove
print(f"\n{TableColors.SUCCESS}Multiple Selection Options:{TableColors.RESET}")
print(f"{TableColors.CYAN}Single:{TableColors.RESET} Enter a number (e.g., {TableColors.YELLOW}3{TableColors.RESET})")
print(f"{TableColors.CYAN}Multiple:{TableColors.RESET} Enter numbers separated by commas (e.g., {TableColors.YELLOW}1,3,5{TableColors.RESET})")
print(f"{TableColors.CYAN}Range:{TableColors.RESET} Enter a range (e.g., {TableColors.YELLOW}1-4{TableColors.RESET})")
print(f"{TableColors.CYAN}All:{TableColors.RESET} Enter {TableColors.YELLOW}all{TableColors.RESET} to remove everyone")
try:
choice = input(f"\n{TableColors.INPUT_BOX}┌─ Select members to remove ─┐{TableColors.RESET}\n{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice:
print(f"{TableColors.ERROR}❌ No selection made{TableColors.RESET}")
return
selected_indices = []
if choice.lower() == 'all':
selected_indices = list(range(len(service_schedules)))
elif '-' in choice and ',' not in choice:
# Handle range (e.g., "1-4")
try:
start, end = choice.split('-')
start_idx = int(start.strip()) - 1
end_idx = int(end.strip()) - 1
if start_idx < 0 or end_idx >= len(service_schedules) or start_idx > end_idx:
print(f"{TableColors.ERROR}❌ Invalid range{TableColors.RESET}")
return
selected_indices = list(range(start_idx, end_idx + 1))
except (ValueError, IndexError):
print(f"{TableColors.ERROR}❌ Invalid range format{TableColors.RESET}")
return
else:
# Handle single number or comma-separated list
try:
numbers = [int(x.strip()) for x in choice.split(',')]
for num in numbers:
idx = num - 1
if idx < 0 or idx >= len(service_schedules):
print(f"{TableColors.ERROR}❌ Invalid selection: {num}{TableColors.RESET}")
return
if idx not in selected_indices:
selected_indices.append(idx)
except ValueError:
print(f"{TableColors.ERROR}❌ Invalid input format{TableColors.RESET}")
return
if not selected_indices:
print(f"{TableColors.ERROR}❌ No valid selections{TableColors.RESET}")
return
schedules_to_remove = [service_schedules[i] for i in selected_indices]
except (KeyboardInterrupt, EOFError):
print(f"\n{TableColors.WARNING}🛑 Cancelled{TableColors.RESET}")
return
# Direct mode with schedule ID (single schedule)
elif hasattr(args, 'schedule_id') and args.schedule_id:
schedule_to_remove = cli.schedule_repo.get_by_id(args.schedule_id)
if not schedule_to_remove:
print(f"{TableColors.ERROR}❌ Schedule with ID {args.schedule_id} not found{TableColors.RESET}")
return
schedules_to_remove = [schedule_to_remove]
else:
print(f"{TableColors.ERROR}❌ Either --date or schedule_id must be provided{TableColors.RESET}")
return
if schedule_to_accept.Status == ScheduleStatus.DECLINED.value:
print(f"⚠️ Schedule {schedule_to_accept.ScheduleId} was previously declined")
return
# Process schedule removals
successful_removals = []
failed_removals = []
# Get member and service info for display
member = cli.member_repo.get_by_id(schedule_to_accept.MemberId)
service = cli.service_repo.get_by_id(schedule_to_accept.ServiceId)
service_type = None
if service:
service_type = cli.service_type_repo.get_by_id(service.ServiceTypeId)
print(f"\n{TableColors.WARNING}Processing {len(schedules_to_remove)} schedule removal(s)...{TableColors.RESET}")
# Mark the schedule as accepted
cli.schedule_repo.mark_accepted(schedule_to_accept.ScheduleId)
for schedule in schedules_to_remove:
# Get member and service info for display
member = cli.member_repo.get_by_id(schedule.MemberId)
service = cli.service_repo.get_by_id(schedule.ServiceId)
service_type = None
if service:
service_type = cli.service_type_repo.get_by_id(service.ServiceTypeId)
try:
# Delete the schedule
was_deleted = cli.schedule_repo.delete_schedule(schedule.ScheduleId)
if was_deleted:
# Move member back to front of round robin queue
cli.member_repo.reset_to_queue_front(schedule.MemberId)
successful_removals.append((schedule, member, service, service_type))
else:
failed_removals.append((schedule, "schedule not found"))
except Exception as e:
failed_removals.append((schedule, f"database error: {e}"))
# Update member's acceptance timestamp
cli.member_repo.set_last_accepted(schedule_to_accept.MemberId)
# Display results
if successful_removals:
print(f"\n{TableColors.SUCCESS}✅ Successfully removed {len(successful_removals)} schedule(s):{TableColors.RESET}")
print(f"{TableColors.SUCCESS} Members moved to front of round robin queue:{TableColors.RESET}")
for schedule, member, service, service_type in successful_removals:
member_name = f"{member.FirstName} {member.LastName}" if member else f"Member ID {schedule.MemberId}"
service_info = f"{service_type.TypeName} on {service.ServiceDate}" if service and service_type else f"Service ID {schedule.ServiceId}"
print(f" {TableColors.CYAN}{TableColors.RESET} {TableColors.BOLD}{member_name}{TableColors.RESET} - {service_info}")
print(f"✅ Schedule {schedule_to_accept.ScheduleId} accepted successfully!")
if member and service and service_type:
print(f" Member: {member.FirstName} {member.LastName}")
print(f" Service: {service_type.TypeName} on {service.ServiceDate}")
if failed_removals:
print(f"\n{TableColors.ERROR}❌ Could not remove {len(failed_removals)} schedule(s):{TableColors.RESET}")
for schedule, reason in failed_removals:
member = cli.member_repo.get_by_id(schedule.MemberId)
member_name = f"{member.FirstName} {member.LastName}" if member else f"Member ID {schedule.MemberId}"
print(f" {TableColors.YELLOW}{TableColors.RESET} {TableColors.BOLD}{member_name}{TableColors.RESET} - {reason}")
if not successful_removals and not failed_removals:
print(f"{TableColors.DIM}No schedules processed{TableColors.RESET}")
def cmd_schedules_decline(cli: "NimbusFlowCLI", args) -> None:
@@ -257,15 +520,18 @@ def cmd_schedules_decline(cli: "NimbusFlowCLI", args) -> None:
service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}
# Show available services for the date
print(f"\n📅 Services available on {args.date}:")
print("-" * 40)
print(f"\n{TableColors.HEADER}Services available on {args.date}{TableColors.RESET}")
print(f"{TableColors.BORDER}" * 50 + f"{TableColors.RESET}")
print()
for i, service in enumerate(services_on_date, 1):
type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
print(f"{i}. {type_name} (Service ID: {service.ServiceId})")
print(f" {TableColors.CYAN}{i}.{TableColors.RESET} {type_name}")
print()
# Let user select service
try:
choice = input(f"\nSelect service (1-{len(services_on_date)}): ").strip()
print(f"\n{TableColors.INPUT_BOX}┌─ Select service (1-{len(services_on_date)}) ─┐{TableColors.RESET}")
choice = input(f"{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice or not choice.isdigit():
print("❌ Invalid selection")
return
@@ -405,15 +671,18 @@ def cmd_schedules_schedule(cli: "NimbusFlowCLI", args) -> None:
service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}
# Show available services for the date
print(f"\n📅 Services available on {args.date}:")
print("-" * 50)
print(f"\n{TableColors.HEADER}Services available on {args.date}{TableColors.RESET}")
print(f"{TableColors.BORDER}" * 50 + f"{TableColors.RESET}")
print()
for i, svc in enumerate(services_on_date, 1):
type_name = service_type_map.get(svc.ServiceTypeId, "Unknown")
print(f"{i}. {type_name} (Service ID: {svc.ServiceId})")
print(f" {TableColors.CYAN}{i}.{TableColors.RESET} {type_name}")
print()
# Let user select service
try:
choice = input(f"\nSelect service (1-{len(services_on_date)}): ").strip()
print(f"\n{TableColors.INPUT_BOX}┌─ Select service (1-{len(services_on_date)}) ─┐{TableColors.RESET}")
choice = input(f"{TableColors.INPUT_BOX}└─> {TableColors.RESET}").strip()
if not choice or not choice.isdigit():
print("❌ Invalid selection")
return
@@ -577,6 +846,11 @@ def setup_schedules_parser(subparsers) -> None:
schedules_decline_parser.add_argument("--date", type=str, help="Interactive mode: select service and member by date (YYYY-MM-DD)")
schedules_decline_parser.add_argument("--reason", type=str, help="Reason for declining")
# schedules remove
schedules_remove_parser = schedules_subparsers.add_parser("remove", help="Remove scheduled members and move them to front of queue")
schedules_remove_parser.add_argument("schedule_id", type=int, nargs="?", help="Schedule ID to remove (optional if using --date)")
schedules_remove_parser.add_argument("--date", type=str, help="Interactive mode: select service and members by date (YYYY-MM-DD)")
# schedules schedule
schedules_schedule_parser = schedules_subparsers.add_parser("schedule", help="Schedule next member for a service (cycles through eligible members)")
schedules_schedule_parser.add_argument("service_id", type=int, nargs="?", help="Service ID to schedule for (optional if using --date)")

View File

@@ -3,6 +3,7 @@ Interactive CLI interface for NimbusFlow.
"""
import sys
import time
from pathlib import Path
from typing import TYPE_CHECKING
@@ -11,7 +12,7 @@ if TYPE_CHECKING:
from .commands import (
cmd_members_list, cmd_members_show,
cmd_schedules_list, cmd_schedules_show, cmd_schedules_accept, cmd_schedules_decline, cmd_schedules_schedule,
cmd_schedules_list, cmd_schedules_show, cmd_schedules_accept, cmd_schedules_decline, cmd_schedules_remove, cmd_schedules_schedule,
cmd_services_list,
)
@@ -35,6 +36,13 @@ class Colors:
ERROR = '\033[1m\033[91m' # Bold Red
WARNING = '\033[1m\033[93m' # Bold Yellow
INPUT_BOX = '\033[90m' # Grey
# Gold shimmer colors
GOLD_DARK = '\033[38;5;130m' # Dark gold
GOLD_MEDIUM = '\033[38;5;178m' # Medium gold
GOLD_BRIGHT = '\033[38;5;220m' # Bright gold
GOLD_SHINE = '\033[1m\033[38;5;226m' # Bright shining gold
GOLD_WHITE = '\033[1m\033[97m' # Bright white for peak shine
def create_input_box(prompt: str, width: int = 60) -> str:
@@ -61,27 +69,77 @@ def clear_screen():
print("\033[2J\033[H")
def get_shimmer_color(position: int, shimmer_center: int, shimmer_width: int = 8) -> str:
"""Get the appropriate shimmer color based on distance from shimmer center."""
distance = abs(position - shimmer_center)
if distance == 0:
return Colors.GOLD_WHITE
elif distance == 1:
return Colors.GOLD_SHINE
elif distance <= 3:
return Colors.GOLD_BRIGHT
elif distance <= 5:
return Colors.GOLD_MEDIUM
elif distance <= shimmer_width:
return Colors.GOLD_DARK
else:
return Colors.GOLD_DARK
def animate_nimbusflow_text() -> None:
"""Animate the NimbusFlow ASCII text and frame with a gold shimmer effect."""
# Complete welcome screen lines including borders
welcome_lines = [
"╔════════════════════════════════════════════════════════════════════════════════════════════╗",
"║ ║",
"║ ███╗ ██╗██╗███╗ ███╗██████╗ ██╗ ██╗███████╗ ███████╗██╗ ██████╗ ██╗ ██╗ ║",
"║ ████╗ ██║██║████╗ ████║██╔══██╗██║ ██║██╔════╝ ██╔════╝██║ ██╔═══██╗██║ ██║ ║",
"║ ██╔██╗ ██║██║██╔████╔██║██████╔╝██║ ██║███████╗ █████╗ ██║ ██║ ██║██║ █╗ ██║ ║",
"║ ██║╚██╗██║██║██║╚██╔╝██║██╔══██╗██║ ██║╚════██║ ██╔══╝ ██║ ██║ ██║██║███╗██║ ║",
"║ ██║ ╚████║██║██║ ╚═╝ ██║██████╔╝╚██████╔╝███████║ ██║ ███████╗╚██████╔╝╚███╔███╔╝ ║",
"║ ╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ ║",
"║ ║",
"╚════════════════════════════════════════════════════════════════════════════════════════════╝"
]
# Calculate max width for animation
max_width = max(len(line) for line in welcome_lines)
# Animation parameters
shimmer_width = 12
total_steps = max_width + shimmer_width * 2
step_delay = 0.025 # Seconds between frames (even faster animation)
# Animate the shimmer effect
for step in range(total_steps):
shimmer_center = step - shimmer_width
# Move cursor up to overwrite previous frame (10 lines total)
if step > 0:
print(f"\033[{len(welcome_lines)}A", end="")
for line in welcome_lines:
for i, char in enumerate(line):
if char.isspace():
print(char, end="")
else:
color = get_shimmer_color(i, shimmer_center, shimmer_width)
print(f"{color}{char}{Colors.RESET}", end="")
print() # New line after each row
# Add a small delay for animation
time.sleep(step_delay)
def display_welcome():
"""Display welcome screen."""
"""Display welcome screen with animated shimmer effect."""
print("\033[2J\033[H") # Clear screen and move cursor to top
# NimbusFlow branding
welcome_text = """
╔════════════════════════════════════════════════════════════════════════════════════════════╗
║ ║
║ ███╗ ██╗██╗███╗ ███╗██████╗ ██╗ ██╗███████╗ ███████╗██╗ ██████╗ ██╗ ██╗ ║
║ ████╗ ██║██║████╗ ████║██╔══██╗██║ ██║██╔════╝ ██╔════╝██║ ██╔═══██╗██║ ██║ ║
║ ██╔██╗ ██║██║██╔████╔██║██████╔╝██║ ██║███████╗ █████╗ ██║ ██║ ██║██║ █╗ ██║ ║
║ ██║╚██╗██║██║██║╚██╔╝██║██╔══██╗██║ ██║╚════██║ ██╔══╝ ██║ ██║ ██║██║███╗██║ ║
║ ██║ ╚████║██║██║ ╚═╝ ██║██████╔╝╚██████╔╝███████║ ██║ ███████╗╚██████╔╝╚███╔███╔╝ ║
║ ╚═╝ ╚═══╝╚═╝╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚══════╝ ╚═╝ ╚══════╝ ╚═════╝ ╚══╝╚══╝ ║
║ ║
║ 🎵 Scheduling System 🎵 ║
╚════════════════════════════════════════════════════════════════════════════════════════════╝
"""
print(welcome_text)
print() # Add some top padding
animate_nimbusflow_text()
print()
@@ -115,15 +173,12 @@ def display_schedules_menu():
print(f"\n{Colors.HEADER}Schedules{Colors.RESET}")
print(f"{Colors.GREY}" * 50 + f"{Colors.RESET}")
print()
print(f" {Colors.CYAN}1.{Colors.RESET} List all schedules")
print(f" {Colors.CYAN}2.{Colors.RESET} List pending schedules")
print(f" {Colors.CYAN}3.{Colors.RESET} List accepted schedules")
print(f" {Colors.CYAN}4.{Colors.RESET} List declined schedules")
print(f" {Colors.CYAN}5.{Colors.RESET} Show schedule details")
print(f" {Colors.CYAN}6.{Colors.RESET} {Colors.GREEN}Accept a schedule{Colors.RESET}")
print(f" {Colors.CYAN}7.{Colors.RESET} {Colors.RED}Decline a schedule{Colors.RESET}")
print(f" {Colors.CYAN}8.{Colors.RESET} {Colors.YELLOW}Schedule next member for service{Colors.RESET}")
print(f" {Colors.CYAN}9.{Colors.RESET} {Colors.DIM}Back to main menu{Colors.RESET}")
print(f" {Colors.CYAN}1.{Colors.RESET} Browse schedules")
print(f" {Colors.CYAN}2.{Colors.RESET} {Colors.GREEN}Accept a schedule{Colors.RESET}")
print(f" {Colors.CYAN}3.{Colors.RESET} {Colors.RED}Decline a schedule{Colors.RESET}")
print(f" {Colors.CYAN}4.{Colors.RESET} {Colors.ERROR}Remove scheduled members{Colors.RESET}")
print(f" {Colors.CYAN}5.{Colors.RESET} {Colors.YELLOW}Schedule next member for service{Colors.RESET}")
print(f" {Colors.CYAN}6.{Colors.RESET} {Colors.DIM}Back to main menu{Colors.RESET}")
print()
@@ -193,6 +248,25 @@ def get_date_input(prompt: str = "Enter date (YYYY-MM-DD)") -> str:
return ""
def get_date_input_optional(prompt: str = "Enter date (YYYY-MM-DD)") -> str:
"""Get optional date input from user (allows empty input)."""
while True:
try:
print(create_simple_input_box(prompt))
date_str = input(f"{Colors.INPUT_BOX}└─> {Colors.RESET}").strip()
if not date_str:
return "" # Allow empty input
# Basic date format validation
if len(date_str) == 10 and date_str.count('-') == 2:
parts = date_str.split('-')
if len(parts[0]) == 4 and len(parts[1]) == 2 and len(parts[2]) == 2:
return date_str
print(f"{Colors.ERROR}Please use format YYYY-MM-DD (e.g., 2025-09-07){Colors.RESET}")
except (KeyboardInterrupt, EOFError):
print(f"\n{Colors.WARNING}Operation cancelled{Colors.RESET}")
return ""
class MockArgs:
"""Mock args object for interactive commands."""
def __init__(self, **kwargs):
@@ -248,73 +322,105 @@ def handle_schedules_menu(cli: "NimbusFlowCLI"):
while True:
clear_screen()
display_schedules_menu()
choice = get_user_choice(9)
choice = get_user_choice(6)
if choice == 1: # List all schedules
clear_screen()
print(f"{Colors.SUCCESS}Listing all schedules...{Colors.RESET}\n")
cmd_schedules_list(cli, MockArgs(service_id=None, status=None))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 2: # List pending schedules
clear_screen()
print(f"{Colors.WARNING}Listing pending schedules...{Colors.RESET}\n")
cmd_schedules_list(cli, MockArgs(service_id=None, status="pending"))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 3: # List accepted schedules
clear_screen()
print(f"{Colors.SUCCESS}Listing accepted schedules...{Colors.RESET}\n")
cmd_schedules_list(cli, MockArgs(service_id=None, status="accepted"))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 4: # List declined schedules
clear_screen()
print(f"{Colors.ERROR}Listing declined schedules...{Colors.RESET}\n")
cmd_schedules_list(cli, MockArgs(service_id=None, status="declined"))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 5: # Show schedule details
clear_screen()
schedule_id = get_text_input("Enter schedule ID", True)
if schedule_id.isdigit():
# Get date filter
date = get_date_input_optional("Enter date to filter schedules (or press Enter to skip)")
if not date:
clear_screen()
print(f"{Colors.SUCCESS}Showing details for schedule {schedule_id}...{Colors.RESET}\n")
cmd_schedules_show(cli, MockArgs(schedule_id=int(schedule_id)))
cmd_schedules_list(cli, MockArgs(service_id=None, status=None))
else:
print(f"{Colors.ERROR}Invalid schedule ID{Colors.RESET}")
# Find services for the date
try:
from datetime import date as date_type
target_date = date_type.fromisoformat(date)
all_services = cli.service_repo.list_all()
services_on_date = [s for s in all_services if s.ServiceDate == target_date]
except ValueError:
clear_screen()
print(f"{Colors.ERROR}Invalid date format. Please use YYYY-MM-DD format.{Colors.RESET}")
services_on_date = []
if not services_on_date:
clear_screen()
print(f"{Colors.ERROR}No services found for {date}{Colors.RESET}")
else:
clear_screen()
# Show available services for selection
service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}
print(f"\n{Colors.HEADER}Services available on {date}{Colors.RESET}")
print(f"{Colors.GREY}" * 50 + f"{Colors.RESET}")
print()
for i, service in enumerate(services_on_date, 1):
type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
print(f" {Colors.CYAN}{i}.{Colors.RESET} {type_name}")
print()
# Get service selection
try:
print(create_simple_input_box(f"Select service (1-{len(services_on_date)}) or press Enter to show all"))
choice_input = input(f"{Colors.INPUT_BOX}└─> {Colors.RESET}").strip()
if not choice_input:
# Empty input - show all services for this date
clear_screen()
cmd_schedules_list(cli, MockArgs(service_id=None, status=None, date=date))
elif not choice_input.isdigit():
print(f"{Colors.ERROR}Invalid selection{Colors.RESET}")
else:
service_choice = int(choice_input)
if service_choice < 1 or service_choice > len(services_on_date):
print(f"{Colors.ERROR}Please enter a number between 1 and {len(services_on_date)}{Colors.RESET}")
else:
clear_screen()
selected_service = services_on_date[service_choice - 1]
cmd_schedules_list(cli, MockArgs(service_id=selected_service.ServiceId, status=None))
except (KeyboardInterrupt, EOFError):
print(f"\n{Colors.WARNING}Operation cancelled{Colors.RESET}")
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 6: # Accept schedule
elif choice == 2: # Accept schedule
clear_screen()
date = get_date_input("Enter date for interactive accept")
if date:
clear_screen()
print(f"{Colors.SUCCESS}Starting interactive accept for {date}...{Colors.RESET}\n")
cmd_schedules_accept(cli, MockArgs(date=date, schedule_id=None))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 7: # Decline schedule
elif choice == 3: # Decline schedule
clear_screen()
date = get_date_input("Enter date for interactive decline")
if date:
clear_screen()
reason = get_text_input("Enter decline reason (optional)", False)
clear_screen()
print(f"{Colors.ERROR}Starting interactive decline for {date}...{Colors.RESET}\n")
cmd_schedules_decline(cli, MockArgs(date=date, schedule_id=None, reason=reason))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 8: # Schedule next member
elif choice == 4: # Remove scheduled members
clear_screen()
date = get_date_input("Enter date to remove schedules for")
if date:
clear_screen()
cmd_schedules_remove(cli, MockArgs(date=date, schedule_id=None))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 5: # Schedule next member
clear_screen()
date = get_date_input("Enter date to schedule for")
if date:
clear_screen()
print(f"{Colors.WARNING}Starting scheduling for {date}...{Colors.RESET}\n")
cmd_schedules_schedule(cli, MockArgs(service_id=None, date=date, classifications=None))
input(f"\n{Colors.DIM}Press Enter to continue...{Colors.RESET}")
elif choice == 9: # Back to main menu
elif choice == 6: # Back to main menu
break
@@ -355,7 +461,7 @@ def run_interactive_mode(cli: "NimbusFlowCLI"):
display_welcome()
print(f"{Colors.HEADER}Welcome to the NimbusFlow Interactive CLI{Colors.RESET}")
print(f"{Colors.DIM}Navigate through menus to manage your choir scheduling system.{Colors.RESET}")
print(f"{Colors.DIM}Navigate through menus to manage your scheduling system.{Colors.RESET}")
print()
input(f"{Colors.DIM}Press Enter to continue...{Colors.RESET}")

View File

@@ -11,7 +11,7 @@ from .commands import (
cmd_members_list, cmd_members_show, setup_members_parser,
# Schedule commands
cmd_schedules_list, cmd_schedules_show, cmd_schedules_accept,
cmd_schedules_decline, cmd_schedules_schedule, setup_schedules_parser,
cmd_schedules_decline, cmd_schedules_remove, cmd_schedules_schedule, setup_schedules_parser,
# Service commands
cmd_services_list, setup_services_parser,
)
@@ -44,16 +44,30 @@ def main():
if not args.command:
# Launch interactive mode when no command is provided
try:
cli = NimbusFlowCLI()
cli = NimbusFlowCLI(create_version=True) # Always create versioned DB for interactive mode
# Show versioning info with colors
from .base import Colors
versions = cli.list_database_versions()
if len(versions) > 1:
print(f"{Colors.CYAN}Database versions available:{Colors.RESET} {Colors.SUCCESS}{len(versions)}{Colors.RESET}")
print(f"{Colors.CYAN}Using:{Colors.RESET} {Colors.CYAN}{cli.db_path.name}{Colors.RESET}")
# Auto-cleanup if too many versions
if len(versions) > 10:
deleted = cli.cleanup_old_versions(keep_latest=5)
if deleted > 0:
print(f"{Colors.WARNING}Cleaned up {deleted} old database versions{Colors.RESET}")
run_interactive_mode(cli)
except CLIError as e:
print(f"❌ Error: {e}")
print(f"{Colors.ERROR}❌ Error: {e}{Colors.RESET}")
return 1
except KeyboardInterrupt:
print("\n🛑 Interrupted by user")
print(f"\n{Colors.WARNING}🛑 Interrupted by user{Colors.RESET}")
return 1
except Exception as e:
print(f"❌ Unexpected error: {e}")
print(f"{Colors.ERROR}❌ Unexpected error: {e}{Colors.RESET}")
return 1
finally:
if 'cli' in locals():
@@ -61,7 +75,11 @@ def main():
return
try:
cli = NimbusFlowCLI()
cli = NimbusFlowCLI(create_version=False) # Don't version for regular CLI commands
# Show which database is being used for regular commands
from .base import Colors
print(f"{Colors.CYAN}Using database:{Colors.RESET} {Colors.CYAN}{cli.db_path.name}{Colors.RESET}")
# Route commands to their respective handlers
if args.command == "members":
@@ -70,7 +88,7 @@ def main():
elif args.members_action == "show":
cmd_members_show(cli, args)
else:
print("❌ Unknown members action. Use 'list' or 'show'")
print(f"{Colors.ERROR}❌ Unknown members action. Use 'list' or 'show'{Colors.RESET}")
elif args.command == "schedules":
if args.schedules_action == "list":
@@ -81,28 +99,30 @@ def main():
cmd_schedules_accept(cli, args)
elif args.schedules_action == "decline":
cmd_schedules_decline(cli, args)
elif args.schedules_action == "remove":
cmd_schedules_remove(cli, args)
elif args.schedules_action == "schedule":
cmd_schedules_schedule(cli, args)
else:
print("❌ Unknown schedules action. Use 'list', 'show', 'accept', 'decline', or 'schedule'")
print(f"{Colors.ERROR}❌ Unknown schedules action. Use 'list', 'show', 'accept', 'decline', 'remove', or 'schedule'{Colors.RESET}")
elif args.command == "services":
if args.services_action == "list":
cmd_services_list(cli, args)
else:
print("❌ Unknown services action. Use 'list'")
print(f"{Colors.ERROR}❌ Unknown services action. Use 'list'{Colors.RESET}")
else:
print(f"❌ Unknown command: {args.command}")
print(f"{Colors.ERROR}❌ Unknown command: {args.command}{Colors.RESET}")
except CLIError as e:
print(f"❌ Error: {e}")
print(f"{Colors.ERROR}❌ Error: {e}{Colors.RESET}")
return 1
except KeyboardInterrupt:
print("\n🛑 Interrupted by user")
print(f"\n{Colors.WARNING}🛑 Interrupted by user{Colors.RESET}")
return 1
except Exception as e:
print(f"❌ Unexpected error: {e}")
print(f"{Colors.ERROR}❌ Unexpected error: {e}{Colors.RESET}")
return 1
finally:
if 'cli' in locals():

View File

@@ -25,6 +25,7 @@ class TableColors:
ERROR = '\033[1m\033[91m' # Bold Red
WARNING = '\033[1m\033[93m' # Bold Yellow
BORDER = '\033[90m' # Grey
INPUT_BOX = '\033[90m' # Grey (for input styling)
def format_member_row(member, classification_name: Optional[str] = None) -> str:

View File

@@ -1,524 +0,0 @@
import datetime as dt
from typing import Optional, Tuple, List
from backend.db.connection import DatabaseConnection
from backend.models import (
Classification,
Member,
ServiceType,
Service,
ServiceAvailability,
Schedule,
AcceptedLog,
DeclineLog,
ScheduledLog,
)
class Repository:
"""
Highlevel dataaccess layer.
Responsibilities
----------------
* CRUD helpers for the core tables.
* Roundrobin queue that respects:
- Members.LastAcceptedAt (fair order)
- Members.LastDeclinedAt (oneday cooloff)
* “Reservation” handling using the **Schedules** table
(pending → accepted → declined).
* Audit logging (AcceptedLog, DeclineLog, ScheduledLog).
"""
def __init__(self, db: DatabaseConnection):
self.db = db
# -----------------------------------------------------------------
# CRUD helpers they now return model objects (or IDs)
# -----------------------------------------------------------------
# -----------------------------------------------------------------
# CREATE
# -----------------------------------------------------------------
def create_classification(self, classification_name: str) -> Classification:
"""Insert a new classification and return the saved model."""
classification = Classification(
ClassificationId=-1, # placeholder will be replaced by DB
ClassificationName=classification_name,
)
# Build INSERT statement from the dataclass dict (skip PK)
data = classification.to_dict()
data.pop("ClassificationId") # AUTOINCREMENT column
cols = ", ".join(data.keys())
placeholders = ", ".join("?" for _ in data)
sql = f"INSERT INTO Classifications ({cols}) VALUES ({placeholders})"
self.db.execute(sql, tuple(data.values()))
classification.ClassificationId = self.db.lastrowid
return classification
def create_member(
self,
first_name: str,
last_name: str,
email: Optional[str] = None,
phone_number: Optional[str] = None,
classification_id: Optional[int] = None,
notes: Optional[str] = None,
is_active: int = 1,
) -> Member:
"""Insert a new member and return the saved model."""
member = Member(
MemberId=-1,
FirstName=first_name,
LastName=last_name,
Email=email,
PhoneNumber=phone_number,
ClassificationId=classification_id,
Notes=notes,
IsActive=is_active,
LastAcceptedAt=None,
LastDeclinedAt=None,
)
data = member.to_dict()
data.pop("MemberId") # let SQLite fill the PK
cols = ", ".join(data.keys())
placeholders = ", ".join("?" for _ in data)
sql = f"INSERT INTO Members ({cols}) VALUES ({placeholders})"
self.db.execute(sql, tuple(data.values()))
member.MemberId = self.db.lastrowid
return member
def create_service_type(self, type_name: str) -> ServiceType:
"""Insert a new service type."""
st = ServiceType(ServiceTypeId=-1, TypeName=type_name)
data = st.to_dict()
data.pop("ServiceTypeId")
cols = ", ".join(data.keys())
placeholders = ", ".join("?" for _ in data)
sql = f"INSERT INTO ServiceTypes ({cols}) VALUES ({placeholders})"
self.db.execute(sql, tuple(data.values()))
st.ServiceTypeId = self.db.lastrowid
return st
def create_service(self, service_type_id: int, service_date: dt.date) -> Service:
"""Insert a new service row (date + type)."""
sv = Service(ServiceId=-1, ServiceTypeId=service_type_id, ServiceDate=service_date)
data = sv.to_dict()
data.pop("ServiceId")
cols = ", ".join(data.keys())
placeholders = ", ".join("?" for _ in data)
sql = f"INSERT INTO Services ({cols}) VALUES ({placeholders})"
self.db.execute(sql, tuple(data.values()))
sv.ServiceId = self.db.lastrowid
return sv
def create_service_availability(self, member_id: int, service_type_id: int) -> ServiceAvailability:
"""Link a member to a service type (availability matrix)."""
sa = ServiceAvailability(
ServiceAvailabilityId=-1,
MemberId=member_id,
ServiceTypeId=service_type_id,
)
data = sa.to_dict()
data.pop("ServiceAvailabilityId")
cols = ", ".join(data.keys())
placeholders = ", ".join("?" for _ in data)
sql = f"INSERT INTO ServiceAvailability ({cols}) VALUES ({placeholders})"
self.db.execute(sql, tuple(data.values()))
sa.ServiceAvailabilityId = self.db.lastrowid
return sa
# -----------------------------------------------------------------
# READ return **lists of models**
# -----------------------------------------------------------------
def get_all_classifications(self) -> List[Classification]:
rows = self.db.fetchall("SELECT * FROM Classifications")
return [Classification.from_row(r) for r in rows]
def get_all_members(self) -> List[Member]:
rows = self.db.fetchall("SELECT * FROM Members")
return [Member.from_row(r) for r in rows]
def get_all_service_types(self) -> List[ServiceType]:
rows = self.db.fetchall("SELECT * FROM ServiceTypes")
return [ServiceType.from_row(r) for r in rows]
def get_all_services(self) -> List[Service]:
rows = self.db.fetchall("SELECT * FROM Services")
return [Service.from_row(r) for r in rows]
def get_all_service_availability(self) -> List[ServiceAvailability]:
rows = self.db.fetchall("SELECT * FROM ServiceAvailability")
return [ServiceAvailability.from_row(r) for r in rows]
# -----------------------------------------------------------------
# INTERNAL helpers used by the queue logic
# -----------------------------------------------------------------
def _lookup_classification(self, name: str) -> int:
"""Return ClassificationId for a given name; raise if missing."""
row = self.db.fetchone(
"SELECT ClassificationId FROM Classifications WHERE ClassificationName = ?",
(name,),
)
if row is None:
raise ValueError(f'Classification "{name}" does not exist')
return row["ClassificationId"]
def _ensure_service(self, service_date: dt.date) -> int:
"""
Return a ServiceId for ``service_date``.
If the row does not exist we create a generic Service row
(using the first ServiceType as a default).
"""
row = self.db.fetchone(
"SELECT ServiceId FROM Services WHERE ServiceDate = ?", (service_date,)
)
if row:
return row["ServiceId"]
default_type = self.db.fetchone(
"SELECT ServiceTypeId FROM ServiceTypes LIMIT 1"
)
if not default_type:
raise RuntimeError(
"No ServiceTypes defined cannot create a Service row"
)
self.db.execute(
"INSERT INTO Services (ServiceTypeId, ServiceDate) VALUES (?,?)",
(default_type["ServiceTypeId"], service_date),
)
return self.db.lastrowid
def has_schedule_for_service(
self,
member_id: int,
service_id: int,
status: str,
include_expired: bool = False,
) -> bool:
"""
Return True if the member has a schedule row for the given ``service_id``
with the specified ``status``.
For ``status='pending'`` the default behaviour is to ignore rows whose
``ExpiresAt`` timestamp is already in the past (they are not actionable).
Set ``include_expired=True`` if you deliberately want to see *any* pending
row regardless of its expiration.
Parameters
----------
member_id : int
The member we are inspecting.
service_id : int
The service we are interested in.
status : str
One of the schedule statuses (e.g. ``'accepted'`` or ``'pending'``).
include_expired : bool, optional
When checking for pending rows, ignore the expiration guard if set to
``True``. Defaults to ``False`` (i.e. only nonexpired pending rows
count).
Returns
-------
bool
True if a matching row exists, otherwise False.
"""
sql = """
SELECT 1
FROM Schedules
WHERE MemberId = ?
AND ServiceId = ?
AND Status = ?
"""
args = [member_id, service_id, status]
# Guard against expired pending rows unless the caller explicitly wants them.
if not include_expired and status == "pending":
sql += " AND ExpiresAt > CURRENT_TIMESTAMP"
sql += " LIMIT 1"
row = self.db.fetchone(sql, tuple(args))
return row is not None
def schedule_next_member(
self,
classification_id: int,
service_id: int,
only_active: bool = True,
) -> Optional[Tuple[int, str, str, int]]:
"""
Choose the next member for ``service_id`` while respecting ServiceAvailability.
Ordering (highlevel):
1⃣ 5day decline boost only if DeclineStreak < 2.
2⃣ Oldest LastAcceptedAt (roundrobin).
3⃣ Oldest LastScheduledAt (tiebreaker).
Skipped if any of the following is true:
• Member lacks a ServiceAvailability row for the ServiceType of ``service_id``.
• Member already has an *accepted* schedule for this service.
• Member already has a *pending* schedule for this service.
• Member already has a *declined* schedule for this service.
"""
# -----------------------------------------------------------------
# 0⃣ Resolve ServiceTypeId (and ServiceDate) from the Services table.
# -----------------------------------------------------------------
svc_row = self.db.fetchone(
"SELECT ServiceTypeId, ServiceDate FROM Services WHERE ServiceId = ?",
(service_id,),
)
if not svc_row:
# No such service nothing to schedule.
return None
service_type_id = svc_row["ServiceTypeId"]
# If you need the actual calendar date later you can use:
# service_date = dt.datetime.strptime(svc_row["ServiceDate"], "%Y-%m-%d").date()
# -----------------------------------------------------------------
# 1⃣ Pull the candidate queue, ordered per the existing rules.
# -----------------------------------------------------------------
BOOST_SECONDS = 5 * 24 * 60 * 60 # 5 days
now_iso = dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
sql = f"""
SELECT
MemberId,
FirstName,
LastName,
LastAcceptedAt,
LastScheduledAt,
LastDeclinedAt,
DeclineStreak
FROM Members
WHERE ClassificationId = ?
{"AND IsActive = 1" if only_active else ""}
ORDER BY
/* ① 5day boost (only when streak < 2) */
CASE
WHEN DeclineStreak < 2
AND LastDeclinedAt IS NOT NULL
AND julianday(?) - julianday(LastDeclinedAt) <= (? / 86400.0)
THEN 0 -- boosted to the front
ELSE 1
END,
/* ② Roundrobin: oldest acceptance first */
COALESCE(LastAcceptedAt, '1970-01-01') ASC,
/* ③ Tiebreaker: oldest offer first */
COALESCE(LastScheduledAt, '1970-01-01') ASC
"""
queue = self.db.fetchall(sql, (classification_id, now_iso, BOOST_SECONDS))
# -----------------------------------------------------------------
# 2⃣ Walk the ordered queue and apply availability + status constraints.
# -----------------------------------------------------------------
for member in queue:
member_id = member["MemberId"]
# ----- Availability check -------------------------------------------------
# Skip members that do NOT have a row in ServiceAvailability for this
# ServiceType.
avail_ok = self.db.fetchone(
"""
SELECT 1
FROM ServiceAvailability
WHERE MemberId = ?
AND ServiceTypeId = ?
LIMIT 1
""",
(member_id, service_type_id),
)
if not avail_ok:
continue # Not eligible for this service type.
# ----- Status constraints (all by service_id) ----------------------------
# a) Already *accepted* for this service?
if self.has_schedule_for_service(member_id, service_id, status="accepted"):
continue
# b) Existing *pending* reservation for this service?
if self.has_schedule_for_service(member_id, service_id, status="pending"):
continue
# c) Already *declined* this service?
if self.has_schedule_for_service(member_id, service_id, status="declined"):
continue
# -------------------------------------------------------------
# SUCCESS create a pending schedule (minimal columns).
# -------------------------------------------------------------
self.db.execute(
"""
INSERT INTO Schedules
(ServiceId, MemberId, Status)
VALUES
(?,?,?)
""",
(service_id, member_id, "pending"),
)
schedule_id = self.db.lastrowid
# -------------------------------------------------------------
# Update the member's LastScheduledAt so the roundrobin stays fair.
# -------------------------------------------------------------
self.db.execute(
"""
UPDATE Members
SET LastScheduledAt = CURRENT_TIMESTAMP
WHERE MemberId = ?
""",
(member_id,),
)
# -------------------------------------------------------------
# Audit log historic record (no ScheduleId column any more).
# -------------------------------------------------------------
self.db.execute(
"""
INSERT INTO ScheduledLog (MemberId, ServiceId)
VALUES (?,?)
""",
(member_id, service_id),
)
# -------------------------------------------------------------
# Return the useful bits to the caller.
# -------------------------------------------------------------
return (
member_id,
member["FirstName"],
member["LastName"],
schedule_id,
)
# -----------------------------------------------------------------
# No eligible member found.
# -----------------------------------------------------------------
return None
# -----------------------------------------------------------------
# ACCEPT / DECLINE workflow (operates on the schedule row)
# -----------------------------------------------------------------
def accept_schedule(self, schedule_id: int) -> None:
"""
Convert a *pending* schedule into a real assignment.
- Updates the schedule row (status → accepted, timestamp).
- Writes an entry into ``AcceptedLog``.
- Updates ``Members.LastAcceptedAt`` (advances roundrobin) and clears any cooloff.
"""
# Load the pending schedule raise if it does not exist or is not pending
sched = self.db.fetchone(
"""
SELECT ScheduleId, ServiceId, MemberId
FROM Schedules
WHERE ScheduleId = ?
AND Status = 'pending'
""",
(schedule_id,),
)
if not sched:
raise ValueError("Schedule not found or not pending")
service_id = sched["ServiceId"]
member_id = sched["MemberId"]
# 1⃣ Mark the schedule as accepted
self.db.execute(
"""
UPDATE Schedules
SET Status = 'accepted',
AcceptedAt = CURRENT_TIMESTAMP,
ExpiresAt = CURRENT_TIMESTAMP -- no longer expires
WHERE ScheduleId = ?
""",
(schedule_id,),
)
# 2⃣ Audit log
self.db.execute(
"""
INSERT INTO AcceptedLog (MemberId, ServiceId)
VALUES (?,?)
""",
(member_id, service_id),
)
# 3⃣ Advance roundrobin for the member
self.db.execute(
"""
UPDATE Members
SET LastAcceptedAt = CURRENT_TIMESTAMP,
LastDeclinedAt = NULL -- a successful accept clears any cooloff
WHERE MemberId = ?
""",
(member_id,),
)
def decline_schedule(
self, schedule_id: int, reason: Optional[str] = None
) -> None:
"""
Record that the member declined the offered slot.
Effects
-------
* Inserts a row into ``DeclineLog`` (with the service day).
* Updates ``Members.LastDeclinedAt`` this implements the oneday cooloff.
* Marks the schedule row as ``declined`` (so it can be offered to someone else).
"""
# Load the pending schedule raise if not found / not pending
sched = self.db.fetchone(
"""
SELECT ScheduleId, ServiceId, MemberId
FROM Schedules
WHERE ScheduleId = ?
AND Status = 'pending'
""",
(schedule_id,),
)
if not sched:
raise ValueError("Schedule not found or not pending")
service_id = sched["ServiceId"]
member_id = sched["MemberId"]
# Need the service *day* for the oneday cooloff
svc = self.db.fetchone(
"SELECT ServiceDate FROM Services WHERE ServiceId = ?", (service_id,)
)
if not svc:
raise RuntimeError("Service row vanished while processing decline")
service_day = svc["ServiceDate"] # stored as TEXT 'YYYYMMDD'
# 1⃣ Insert into DeclineLog
self.db.execute(
"""
INSERT INTO DeclineLog (MemberId, ServiceId, DeclineDate, Reason)
VALUES (?,?,?,?)
""",
(member_id, service_id, service_day, reason),
)
# 2⃣ Update the member's cooloff day
self.db.execute(
"""
UPDATE Members
SET LastDeclinedAt = ?
WHERE MemberId = ?
""",
(service_day, member_id),
)
# 3⃣ Mark the schedule row as declined
self.db.execute(
"""
UPDATE Schedules
SET Status = 'declined',
DeclinedAt = CURRENT_TIMESTAMP,
DeclineReason = ?
WHERE ScheduleId = ?
""",
(reason, schedule_id),
)

View File

@@ -169,7 +169,7 @@ if __name__ == "__main__":
from backend.repositories import MemberRepository, ScheduleRepository, ServiceRepository, ServiceAvailabilityRepository
from backend.services.scheduling_service import SchedulingService
DB_PATH = Path(__file__).parent / "database6_accepts_and_declines3.db"
DB_PATH = Path(__file__).parent / "db" / "sqlite" / "database.db"
# Initialise DB connection (adjust DSN as needed)
db = DatabaseConnection(DB_PATH)

View File

@@ -234,4 +234,20 @@ class MemberRepository(BaseRepository[MemberModel]):
DeclineStreak = COALESCE(DeclineStreak, 0) + 1
WHERE {self._PK} = ?
"""
self.db.execute(sql, (decline_date, member_id))
self.db.execute(sql, (decline_date, member_id))
def reset_to_queue_front(self, member_id: int) -> None:
"""
Reset member timestamps to move them to the front of the round robin queue.
This sets LastScheduledAt and LastAcceptedAt to far past values, effectively
making them the highest priority for scheduling.
"""
sql = f"""
UPDATE {self._TABLE}
SET LastScheduledAt = '1970-01-01 00:00:00',
LastAcceptedAt = '1970-01-01 00:00:00',
LastDeclinedAt = NULL,
DeclineStreak = 0
WHERE {self._PK} = ?
"""
self.db.execute(sql, (member_id,))

View File

@@ -257,8 +257,14 @@ class ScheduleRepository(BaseRepository[ScheduleModel]):
"""
rows = self.db.fetchall(sql, (service_id, ScheduleStatus.PENDING.value))
return [ScheduleModel.from_row(r) for r in rows]
def delete(self, schedule_id: int) -> None:
"""Harddelete a schedule row (use with caution)."""
def delete_schedule(self, schedule_id: int) -> bool:
"""
Delete a schedule by ID.
Returns:
bool: True if a schedule was deleted, False if not found
"""
sql = f"DELETE FROM {self._TABLE} WHERE {self._PK} = ?"
self.db.execute(sql, (schedule_id,))
cursor = self.db.execute(sql, (schedule_id,))
return cursor.rowcount > 0