Files
nimbusflow/backend/cli.py

701 lines
28 KiB
Python
Executable File

#!/usr/bin/env python3
"""
NimbusFlow CLI Tool
-------------------
Command-line interface for managing the scheduling system backend.
Usage (run from nimbusflow directory):
python -m backend.cli members list [--active] [--classification NAME]
python -m backend.cli members show <member_id>
python -m backend.cli schedules list [--service-id N] [--status STATUS]
python -m backend.cli schedules show <schedule_id>
python -m backend.cli schedules accept <schedule_id>
python -m backend.cli schedules accept --date YYYY-MM-DD
python -m backend.cli schedules decline <schedule_id> [--reason "reason"]
python -m backend.cli schedules decline --date YYYY-MM-DD [--reason "reason"]
python -m backend.cli services list [--date YYYY-MM-DD] [--upcoming] [--limit N]
"""
import argparse
import sys
from pathlib import Path
from typing import Optional, List, Any
from datetime import date, datetime
# Note: This CLI should be run as: python -m backend.cli
# This ensures proper module resolution without sys.path manipulation
from backend.db import DatabaseConnection
from backend.repositories import (
MemberRepository,
ClassificationRepository,
ServiceRepository,
ServiceAvailabilityRepository,
ScheduleRepository,
ServiceTypeRepository
)
from backend.models.enums import ScheduleStatus
class CLIError(Exception):
    """Error raised for failures that are specific to the command-line tool."""
class NimbusFlowCLI:
    """Application object holding the database handle and its repositories."""

    def __init__(self, db_path: str = "database6_accepts_and_declines.db"):
        """Open the database and create one repository per table wrapper.

        ``db_path`` is joined onto the directory containing this module.

        Raises:
            CLIError: if nothing exists at the resulting path.
        """
        resolved = Path(__file__).parent / db_path
        self.db_path = resolved
        if resolved.exists():
            self.db = DatabaseConnection(resolved)
            self._init_repositories()
        else:
            raise CLIError(f"Database not found: {resolved}")

    def _init_repositories(self):
        """Create every repository, all sharing the single connection."""
        connection = self.db
        self.member_repo = MemberRepository(connection)
        self.classification_repo = ClassificationRepository(connection)
        self.service_repo = ServiceRepository(connection)
        self.availability_repo = ServiceAvailabilityRepository(connection)
        self.schedule_repo = ScheduleRepository(connection)
        self.service_type_repo = ServiceTypeRepository(connection)

    def close(self):
        """Close the database connection if one was ever opened."""
        try:
            connection = self.db
        except AttributeError:
            # __init__ failed before the connection was created.
            return
        connection.close()
def format_member_row(member, classification_name: Optional[str] = None) -> str:
    """Format one member as a row of the members table.

    Args:
        member: Object exposing ``MemberId``, ``FirstName``, ``LastName``,
            ``IsActive`` and ``Email``.
        classification_name: Display name of the member's classification;
            shown as "N/A" when missing or empty.

    Returns:
        A single " | "-separated line matching the header printed by
        ``cmd_members_list``.
    """
    # Both branches used to be empty strings, leaving the Active column blank.
    # "Yes"/"No" matches the wording used in the member detail view.
    active = "Yes" if member.IsActive else "No"
    classification = classification_name or "N/A"
    return (
        f"{member.MemberId:3d} | {member.FirstName:<12} | {member.LastName:<15} | "
        f"{classification:<12} | {active:^6} | {member.Email or 'N/A'}"
    )
def format_schedule_row(schedule, member_name: str = "", service_info: str = "") -> str:
    """Format one schedule as a row of the schedules table.

    Args:
        schedule: Object exposing ``ScheduleId``, ``Status`` and ``ScheduledAt``.
        member_name: Display name of the scheduled member.
        service_info: Short description of the service.

    Returns:
        A single " | "-separated line: id, status marker plus status text,
        member, service, and the scheduling timestamp.
    """
    # The markers were all empty strings, which made the lookup a no-op and
    # left the status cell one column narrower than the "Status" header.
    unknown_symbol = "❓"
    status_symbols = {
        ScheduleStatus.PENDING: "⏳",
        ScheduleStatus.ACCEPTED: "✅",
        ScheduleStatus.DECLINED: "❌",
    }
    try:
        status_symbol = status_symbols.get(
            ScheduleStatus.from_raw(schedule.Status), unknown_symbol
        )
    except ValueError:
        # cmd_schedules_list treats from_raw as raising ValueError on an
        # unrecognised value; one odd row should not abort the whole listing.
        status_symbol = unknown_symbol

    # ScheduledAt may be a datetime object or a string straight from the DB.
    scheduled_at = schedule.ScheduledAt
    if not scheduled_at:
        scheduled_date = "N/A"
    elif isinstance(scheduled_at, str):
        try:
            # fromisoformat() on older Pythons rejects a trailing "Z".
            parsed = datetime.fromisoformat(scheduled_at.replace("Z", "+00:00"))
            scheduled_date = parsed.strftime("%Y-%m-%d %H:%M")
        except (ValueError, AttributeError):
            # Not ISO-8601: show the stored text unchanged.
            scheduled_date = str(scheduled_at)
    else:
        scheduled_date = scheduled_at.strftime("%Y-%m-%d %H:%M")

    return (
        f"{schedule.ScheduleId:3d} | {status_symbol} {schedule.Status:<8} | "
        f"{member_name:<20} | {service_info:<15} | {scheduled_date}"
    )
def cmd_members_list(cli: "NimbusFlowCLI", args) -> None:
    """Print a table of members, optionally filtered.

    Args:
        cli: Application object providing ``classification_repo`` and
            ``member_repo``.
        args: Parsed arguments; reads ``classification`` (case-insensitive
            classification name) and ``active`` (only active members).

    The two filters combine: ``--classification X --active`` lists only the
    active members of X. Previously ``--active`` was silently ignored whenever
    a classification was given.
    """
    print("Listing members...")

    # Classifications are needed both for the name filter and for display.
    classifications = cli.classification_repo.list_all()
    classification_map = {c.ClassificationId: c.ClassificationName for c in classifications}

    if args.classification:
        wanted = args.classification.lower()
        classification_id = next(
            (c.ClassificationId for c in classifications
             if c.ClassificationName.lower() == wanted),
            None,
        )
        if classification_id is None:
            print(f"❌ Classification '{args.classification}' not found")
            return
        members = cli.member_repo.get_by_classification_ids([classification_id])
        if args.active:
            members = [m for m in members if m.IsActive]
    elif args.active:
        members = cli.member_repo.get_active()
    else:
        members = cli.member_repo.list_all()

    if not members:
        print("No members found.")
        return

    print(f"\n{'ID':<3} | {'First Name':<12} | {'Last Name':<15} | {'Classification':<12} | {'Active':<6} | {'Email'}")
    print("-" * 80)
    for member in members:
        print(format_member_row(member, classification_map.get(member.ClassificationId)))
    print(f"\nTotal: {len(members)} members")
def cmd_members_show(cli: "NimbusFlowCLI", args) -> None:
    """Print the detail view for the member identified by ``args.member_id``.

    Prints an error line and stops when the member does not exist.
    """
    member = cli.member_repo.get_by_id(args.member_id)
    if not member:
        print(f"❌ Member with ID {args.member_id} not found")
        return

    # The classification is only looked up when the member references one.
    classification = (
        cli.classification_repo.get_by_id(member.ClassificationId)
        if member.ClassificationId
        else None
    )

    # Contact and status section.
    print(f"\n📋 Member Details (ID: {member.MemberId})")
    print("-" * 50)
    print(f"Name: {member.FirstName} {member.LastName}")
    print(f"Email: {member.Email or 'N/A'}")
    print(f"Phone: {member.PhoneNumber or 'N/A'}")
    print(f"Classification: {classification.ClassificationName if classification else 'N/A'}")
    print(f"Active: {'Yes' if member.IsActive else 'No'}")
    print(f"Notes: {member.Notes or 'N/A'}")

    # Scheduling history section.
    print(f"\n⏰ Schedule History:")
    print(f"Last Scheduled: {member.LastScheduledAt or 'Never'}")
    print(f"Last Accepted: {member.LastAcceptedAt or 'Never'}")
    print(f"Last Declined: {member.LastDeclinedAt or 'Never'}")
    print(f"Decline Streak: {member.DeclineStreak}")
def cmd_schedules_list(cli: "NimbusFlowCLI", args) -> None:
    """Print a table of schedules, optionally narrowed by service and status.

    Reads ``args.service_id`` and ``args.status``; a filter is skipped when its
    value is falsy. An unrecognised status prints an error and ends the command.
    """
    print("Listing schedules...")
    rows = cli.schedule_repo.list_all()

    wanted_service = args.service_id
    if wanted_service:
        rows = [row for row in rows if row.ServiceId == wanted_service]

    if args.status:
        try:
            wanted_status = ScheduleStatus.from_raw(args.status.lower()).value
            rows = [row for row in rows if row.Status == wanted_status]
        except ValueError:
            print(f"❌ Invalid status '{args.status}'. Valid options: pending, accepted, declined")
            return

    if not rows:
        print("No schedules found.")
        return

    # Lookup tables that turn foreign keys into readable labels.
    member_names = {m.MemberId: f"{m.FirstName} {m.LastName}" for m in cli.member_repo.list_all()}
    type_names = {t.ServiceTypeId: t.TypeName for t in cli.service_type_repo.list_all()}
    service_labels = {
        svc.ServiceId: f"{type_names.get(svc.ServiceTypeId, 'Unknown')} {svc.ServiceDate}"
        for svc in cli.service_repo.list_all()
    }

    print(f"\n{'ID':<3} | {'Status':<10} | {'Member':<20} | {'Service':<15} | {'Scheduled'}")
    print("-" * 80)
    for row in rows:
        # Fall back to the raw ID when the referenced record is missing.
        who = member_names.get(row.MemberId, f"ID:{row.MemberId}")
        what = service_labels.get(row.ServiceId, f"ID:{row.ServiceId}")
        print(format_schedule_row(row, who, what))
    print(f"\nTotal: {len(rows)} schedules")
def cmd_schedules_show(cli: "NimbusFlowCLI", args) -> None:
    """Print the detail view for the schedule identified by ``args.schedule_id``.

    Prints an error line and stops when the schedule does not exist. Missing
    member, service or service-type records degrade to placeholder text.
    """
    schedule = cli.schedule_repo.get_by_id(args.schedule_id)
    if not schedule:
        print(f"❌ Schedule with ID {args.schedule_id} not found")
        return

    member = cli.member_repo.get_by_id(schedule.MemberId)
    service = cli.service_repo.get_by_id(schedule.ServiceId)
    # The service type can only be resolved through an existing service.
    service_type = cli.service_type_repo.get_by_id(service.ServiceTypeId) if service else None

    if member:
        member_line = f"Member: {member.FirstName} {member.LastName} (ID: {member.MemberId})"
    else:
        member_line = f"Member ID: {schedule.MemberId}"

    print(f"\n📅 Schedule Details (ID: {schedule.ScheduleId})")
    print("-" * 50)
    print(member_line)
    print(f"Service: {service_type.TypeName if service_type else 'Unknown'} on {service.ServiceDate if service else 'Unknown'}")
    print(f"Status: {schedule.Status.upper()}")
    print(f"Scheduled At: {schedule.ScheduledAt}")
    print(f"Accepted At: {schedule.AcceptedAt or 'N/A'}")
    print(f"Declined At: {schedule.DeclinedAt or 'N/A'}")
    print(f"Expires At: {schedule.ExpiresAt or 'N/A'}")
    if schedule.DeclineReason:
        print(f"Decline Reason: {schedule.DeclineReason}")
def _prompt_for_choice(prompt: str, count: int) -> Optional[int]:
    """Ask for a 1-based menu choice and return it as a 0-based index.

    Returns None, after printing the reason, when the answer is not a number
    in ``1..count`` or the user cancels with Ctrl-C / end-of-input.
    """
    try:
        answer = input(prompt).strip()
    except (KeyboardInterrupt, EOFError):
        print("\n🛑 Cancelled")
        return None
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such
    # as "²" that int() rejects, which used to surface as an uncaught ValueError.
    if not answer.isdecimal():
        print("❌ Invalid selection")
        return None
    index = int(answer) - 1
    if not 0 <= index < count:
        print("❌ Invalid selection")
        return None
    return index


def _pick_pending_schedule(cli, date_text: str, action: str):
    """Interactively choose a service on ``date_text``, then a pending schedule.

    ``action`` ("accept" or "decline") only affects the prompt wording.
    Returns the chosen schedule, or None when nothing was selected; in that
    case the explanation has already been printed.
    """
    try:
        target_date = date.fromisoformat(date_text)
    except ValueError:
        print(f"❌ Invalid date format '{date_text}'. Use YYYY-MM-DD")
        return None

    services_on_date = [s for s in cli.service_repo.list_all() if s.ServiceDate == target_date]
    if not services_on_date:
        print(f"❌ No services found for {date_text}")
        return None

    service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}

    print(f"\n📅 Services available on {date_text}:")
    print("-" * 40)
    for number, service in enumerate(services_on_date, 1):
        type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
        print(f"{number}. {type_name} (Service ID: {service.ServiceId})")

    service_index = _prompt_for_choice(
        f"\nSelect service (1-{len(services_on_date)}): ", len(services_on_date)
    )
    if service_index is None:
        return None
    selected_service = services_on_date[service_index]
    selected_type_name = service_type_map.get(selected_service.ServiceTypeId, "Unknown")

    # Only pending schedules can still be accepted or declined.
    pending_schedules = [
        s for s in cli.schedule_repo.list_all()
        if s.ServiceId == selected_service.ServiceId and s.Status == ScheduleStatus.PENDING.value
    ]
    if not pending_schedules:
        print(f"❌ No pending schedules found for {selected_type_name} on {date_text}")
        return None

    member_map = {m.MemberId: m for m in cli.member_repo.list_all()}

    print(f"\n👥 Members scheduled for {selected_type_name} on {date_text}:")
    print("-" * 60)
    for number, schedule in enumerate(pending_schedules, 1):
        member = member_map.get(schedule.MemberId)
        if member:
            print(f"{number}. {member.FirstName} {member.LastName} (Schedule ID: {schedule.ScheduleId})")
        else:
            print(f"{number}. Unknown Member (ID: {schedule.MemberId}) (Schedule ID: {schedule.ScheduleId})")

    member_index = _prompt_for_choice(
        f"\nSelect member to {action} (1-{len(pending_schedules)}): ", len(pending_schedules)
    )
    if member_index is None:
        return None
    return pending_schedules[member_index]


def _load_schedule_context(cli, schedule):
    """Return the (member, service, service_type) a schedule refers to; any may be None."""
    member = cli.member_repo.get_by_id(schedule.MemberId)
    service = cli.service_repo.get_by_id(schedule.ServiceId)
    service_type = cli.service_type_repo.get_by_id(service.ServiceTypeId) if service else None
    return member, service, service_type


def _print_schedule_context(member, service, service_type) -> None:
    """Print the member and service lines of a confirmation, if all are known."""
    if member and service and service_type:
        print(f" Member: {member.FirstName} {member.LastName}")
        print(f" Service: {service_type.TypeName} on {service.ServiceDate}")


def cmd_schedules_accept(cli: "NimbusFlowCLI", args) -> None:
    """Accept a scheduled position.

    With ``args.date`` set, the schedule is chosen interactively from the
    pending schedules of a service on that date; otherwise ``args.schedule_id``
    identifies it directly. A schedule that is already accepted or declined is
    reported and left unchanged.
    """
    if getattr(args, "date", None):
        schedule_to_accept = _pick_pending_schedule(cli, args.date, "accept")
        if schedule_to_accept is None:
            return
    elif getattr(args, "schedule_id", None) is not None:
        schedule_to_accept = cli.schedule_repo.get_by_id(args.schedule_id)
        if not schedule_to_accept:
            print(f"❌ Schedule with ID {args.schedule_id} not found")
            return
    else:
        print("❌ Either --date or schedule_id must be provided")
        return

    if schedule_to_accept.Status == ScheduleStatus.ACCEPTED.value:
        print(f"⚠️ Schedule {schedule_to_accept.ScheduleId} is already accepted")
        return
    if schedule_to_accept.Status == ScheduleStatus.DECLINED.value:
        print(f"⚠️ Schedule {schedule_to_accept.ScheduleId} was previously declined")
        return

    member, service, service_type = _load_schedule_context(cli, schedule_to_accept)

    cli.schedule_repo.mark_accepted(schedule_to_accept.ScheduleId)
    cli.member_repo.set_last_accepted(schedule_to_accept.MemberId)

    print(f"✅ Schedule {schedule_to_accept.ScheduleId} accepted successfully!")
    _print_schedule_context(member, service, service_type)


def cmd_schedules_decline(cli: "NimbusFlowCLI", args) -> None:
    """Decline a scheduled position.

    With ``args.date`` set, the schedule is chosen interactively and, unless
    ``args.reason`` was given, an optional reason is prompted for; otherwise
    ``args.schedule_id`` identifies the schedule directly. A schedule that is
    already declined or accepted is reported and left unchanged.
    """
    if getattr(args, "date", None):
        schedule_to_decline = _pick_pending_schedule(cli, args.date, "decline")
        if schedule_to_decline is None:
            return
        decline_reason = getattr(args, "reason", None) or None
        if not decline_reason:
            try:
                typed = input("\nEnter decline reason (optional, press Enter to skip): ").strip()
            except (KeyboardInterrupt, EOFError):
                print("\n🛑 Cancelled")
                return
            decline_reason = typed or None
    elif getattr(args, "schedule_id", None) is not None:
        schedule_to_decline = cli.schedule_repo.get_by_id(args.schedule_id)
        if not schedule_to_decline:
            print(f"❌ Schedule with ID {args.schedule_id} not found")
            return
        decline_reason = getattr(args, "reason", None)
    else:
        print("❌ Either --date or schedule_id must be provided")
        return

    if schedule_to_decline.Status == ScheduleStatus.DECLINED.value:
        print(f"⚠️ Schedule {schedule_to_decline.ScheduleId} is already declined")
        return
    if schedule_to_decline.Status == ScheduleStatus.ACCEPTED.value:
        print(f"⚠️ Schedule {schedule_to_decline.ScheduleId} was previously accepted")
        return

    member, service, service_type = _load_schedule_context(cli, schedule_to_decline)

    cli.schedule_repo.mark_declined(schedule_to_decline.ScheduleId, decline_reason=decline_reason)
    # The member's decline timestamp is recorded as the service date, so it
    # can only be written when the service record exists.
    if service:
        cli.member_repo.set_last_declined(schedule_to_decline.MemberId, str(service.ServiceDate))

    print(f"❌ Schedule {schedule_to_decline.ScheduleId} declined successfully!")
    _print_schedule_context(member, service, service_type)
    if decline_reason:
        print(f" Reason: {decline_reason}")
def cmd_services_list(cli: "NimbusFlowCLI", args) -> None:
    """Print a table of services with per-status schedule counts.

    Args:
        cli: Application object providing ``service_repo``,
            ``service_type_repo`` and ``schedule_repo``.
        args: Parsed arguments; reads ``upcoming``, ``date`` (YYYY-MM-DD) and
            ``limit``. ``upcoming`` takes precedence over ``date``.

    An unparsable date prints an error and ends the command.
    """
    print("Listing services...")

    if args.upcoming:
        services = cli.service_repo.upcoming(limit=args.limit or 50)
    elif args.date:
        try:
            target_date = date.fromisoformat(args.date)
        except ValueError:
            print(f"❌ Invalid date format '{args.date}'. Use YYYY-MM-DD")
            return
        services = [s for s in cli.service_repo.list_all() if s.ServiceDate == target_date]
    else:
        services = cli.service_repo.list_all()

    if args.limit:
        services = services[:args.limit]

    if not services:
        print("No services found.")
        return

    service_type_map = {st.ServiceTypeId: st.TypeName for st in cli.service_type_repo.list_all()}

    # Tally schedules per service and status in one pass. The schedule list
    # used to be re-fetched and re-scanned once for every service shown.
    shown_ids = {service.ServiceId for service in services}
    tallies = {}
    for schedule in cli.schedule_repo.list_all():
        if schedule.ServiceId not in shown_ids:
            continue
        per_status = tallies.setdefault(schedule.ServiceId, {})
        per_status[schedule.Status] = per_status.get(schedule.Status, 0) + 1

    print(f"\n{'ID':<3} | {'Date':<12} | {'Service Type':<12} | {'Total':<5} | {'Pending':<7} | {'Accepted':<8} | {'Declined'}")
    print("-" * 85)

    # Undated services sort last instead of raising TypeError on None-vs-date
    # comparison; the row formatting below already renders them as "N/A".
    for service in sorted(services, key=lambda s: (s.ServiceDate is None, s.ServiceDate)):
        type_name = service_type_map.get(service.ServiceTypeId, "Unknown")
        per_status = tallies.get(service.ServiceId, {})
        # The total counts every schedule, including unrecognised statuses.
        total = sum(per_status.values())
        pending = per_status.get(ScheduleStatus.PENDING.value, 0)
        accepted = per_status.get(ScheduleStatus.ACCEPTED.value, 0)
        declined = per_status.get(ScheduleStatus.DECLINED.value, 0)
        date_str = str(service.ServiceDate) if service.ServiceDate else "N/A"
        print(f"{service.ServiceId:<3} | {date_str:<12} | {type_name:<12} | "
              f"{total:<5} | {pending:<7} | {accepted:<8} | {declined}")

    print(f"\nTotal: {len(services)} services")
def setup_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the ``members``, ``schedules`` and ``services`` commands."""
    parser = argparse.ArgumentParser(
        description="NimbusFlow CLI - Manage the scheduling system",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    commands = parser.add_subparsers(dest="command", help="Available commands")

    def add_group(name, group_help, action_dest, actions_help):
        """Register a top-level command and return its action sub-parsers."""
        group = commands.add_parser(name, help=group_help)
        return group.add_subparsers(dest=action_dest, help=actions_help)

    # members: list / show
    member_actions = add_group("members", "Manage members", "members_action", "Members actions")
    listing = member_actions.add_parser("list", help="List members")
    listing.add_argument("--active", action="store_true", help="Show only active members")
    listing.add_argument("--classification", type=str, help="Filter by classification name")
    detail = member_actions.add_parser("show", help="Show member details")
    detail.add_argument("member_id", type=int, help="Member ID to show")

    # schedules: list / show / accept / decline
    schedule_actions = add_group("schedules", "Manage schedules", "schedules_action", "Schedule actions")
    listing = schedule_actions.add_parser("list", help="List schedules")
    listing.add_argument("--service-id", type=int, help="Filter by service ID")
    listing.add_argument("--status", type=str, help="Filter by status (pending/accepted/declined)")
    detail = schedule_actions.add_parser("show", help="Show schedule details")
    detail.add_argument("schedule_id", type=int, help="Schedule ID to show")

    accept = schedule_actions.add_parser("accept", help="Accept a scheduled position")
    accept.add_argument("schedule_id", type=int, nargs="?", help="Schedule ID to accept (optional if using --date)")
    accept.add_argument("--date", type=str, help="Interactive mode: select service and member by date (YYYY-MM-DD)")

    decline = schedule_actions.add_parser("decline", help="Decline a scheduled position")
    decline.add_argument("schedule_id", type=int, nargs="?", help="Schedule ID to decline (optional if using --date)")
    decline.add_argument("--date", type=str, help="Interactive mode: select service and member by date (YYYY-MM-DD)")
    decline.add_argument("--reason", type=str, help="Reason for declining")

    # services: list
    service_actions = add_group("services", "Manage services", "services_action", "Services actions")
    listing = service_actions.add_parser("list", help="List services")
    listing.add_argument("--date", type=str, help="Filter by specific date (YYYY-MM-DD)")
    listing.add_argument("--upcoming", action="store_true", help="Show only upcoming services")
    listing.add_argument("--limit", type=int, help="Limit number of results")

    return parser
def main():
    """Parse the command line, run the requested command, and return an exit status.

    Returns:
        0 on success or when only the help text was shown; 1 when the command
        or action is unknown, the database cannot be opened, the user
        interrupts, or an unexpected error occurs.
    """
    parser = setup_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 0

    # command -> (attribute holding the action, action handlers, error text)
    routes = {
        "members": (
            "members_action",
            {"list": cmd_members_list, "show": cmd_members_show},
            "❌ Unknown members action. Use 'list' or 'show'",
        ),
        "schedules": (
            "schedules_action",
            {
                "list": cmd_schedules_list,
                "show": cmd_schedules_show,
                "accept": cmd_schedules_accept,
                "decline": cmd_schedules_decline,
            },
            "❌ Unknown schedules action. Use 'list', 'show', 'accept', or 'decline'",
        ),
        "services": (
            "services_action",
            {"list": cmd_services_list},
            "❌ Unknown services action. Use 'list'",
        ),
    }

    # Resolve the handler before opening the database, so a missing action is
    # reported as such and with a failing exit status (it used to exit 0).
    route = routes.get(args.command)
    if route is None:
        print(f"❌ Unknown command: {args.command}")
        return 1
    action_attr, handlers, unknown_action_message = route
    handler = handlers.get(getattr(args, action_attr, None))
    if handler is None:
        print(unknown_action_message)
        return 1

    cli = None
    try:
        cli = NimbusFlowCLI()
        handler(cli, args)
    except CLIError as e:
        print(f"❌ Error: {e}")
        return 1
    except KeyboardInterrupt:
        print("\n🛑 Interrupted by user")
        return 1
    except Exception as e:
        # Top-level boundary: report the error instead of showing a traceback.
        print(f"❌ Unexpected error: {e}")
        return 1
    finally:
        # cli stays None when the constructor itself failed.
        if cli is not None:
            cli.close()
    return 0


if __name__ == "__main__":
    sys.exit(main())