Files
nimbusflow/backend/db/connection.py

186 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Thin convenience layer over the builtin ``sqlite3`` module.
Why we need a wrapper
---------------------
* The repository (`repository.py`) expects the following public API:
  - ``execute`` - run an INSERT/UPDATE/DELETE.
  - ``fetchone`` / ``fetchall`` - run a SELECT and get the result(s).
  - ``lastrowid`` - primary key of the most recent INSERT.
  - ``close`` - close the DB connection.
* A wrapper lets us:
  • Set a sensible ``row_factory`` (``sqlite3.Row``) so column names are
    accessible as ``row["ColumnName"]``.
  • Centralise ``commit``/``rollback`` handling.
  • Provide type hints and a context-manager interface
    (``with DatabaseConnection(...):``) which is handy for tests.
* No external dependencies - everything stays pure Python/SQLite.
The implementation below is deliberately tiny: it only does what the
application needs while remaining easy to extend later (e.g. add
connection pooling or logging).
"""
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Any, Iterable, Tuple, List, Optional, Union
class DatabaseConnection:
    """
    Simple wrapper around a SQLite connection.

    Core behaviour
    --------------
    * ``row_factory`` is set to :class:`sqlite3.Row`, so callers can use
      ``row["col_name"]`` or index the row by position.
    * Every successful ``execute`` / ``executescript`` call is committed
      immediately. If the statement raises, the pending transaction is
      rolled back before the error is re-raised.
    * ``fetchone`` / ``fetchall`` only read; they never commit.
    * ``execute`` returns the cursor so callers can inspect
      ``cursor.lastrowid`` if they need the auto-generated PK.
    * A single cursor is shared by ``execute``, ``fetchone`` and
      ``fetchall``; the cursor returned by ``execute`` is therefore reused
      by the next call on this object.
    * Implements the context-manager protocol (``with ... as db:``):
      commit on a clean exit, roll back on an exception, always close.
    * ``close`` is idempotent.

    Public API
    ----------
    - execute(sql, params=None)  -> sqlite3.Cursor
    - fetchone(sql, params=None) -> Optional[sqlite3.Row]
    - fetchall(sql, params=None) -> List[sqlite3.Row]
    - executescript(script)      -> None
    - lastrowid                  -> Optional[int]
    - close()
    """

    # -----------------------------------------------------------------
    # Construction / context-manager protocol
    # -----------------------------------------------------------------
    def __init__(
        self,
        db_path: Union[str, Path],
        *,
        timeout: float = 5.0,
        detect_types: int = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
    ) -> None:
        """
        Open the database and prepare the shared cursor.

        Parameters
        ----------
        db_path
            Path to the SQLite file. ``":memory:"`` works for tests.
        timeout
            Seconds to wait for a lock before raising
            ``sqlite3.OperationalError``.
        detect_types
            Type-detection flags forwarded to ``sqlite3.connect``; the
            default enables both declared-type and column-name parsing.
        """
        self._conn: sqlite3.Connection = sqlite3.connect(
            str(db_path), timeout=timeout, detect_types=detect_types
        )
        # ``Row`` allows access by column name as well as by index.
        self._conn.row_factory = sqlite3.Row
        self._cursor: sqlite3.Cursor = self._conn.cursor()
        # Tracks whether ``close`` already ran so it can be called twice
        # (e.g. once by ``__exit__`` and once explicitly by the caller).
        self._closed: bool = False

    def __enter__(self) -> "DatabaseConnection":
        """Allow ``with DatabaseConnection(...) as db:`` usage."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        """
        Commit on a normal exit, roll back otherwise, and always close.

        Returns ``None`` so that any exception raised inside the ``with``
        body propagates to the caller.
        """
        try:
            if exc_type is None:
                self._conn.commit()
            else:
                # Something went wrong - discard uncommitted work.
                self._conn.rollback()
        finally:
            # Release the connection even if commit/rollback itself fails.
            self.close()
        # ``None`` means "don't suppress exceptions".
        return None

    # -----------------------------------------------------------------
    # Low-level helpers used by the repository
    # -----------------------------------------------------------------
    @property
    def cursor(self) -> sqlite3.Cursor:
        """Expose the shared underlying cursor."""
        return self._cursor

    @property
    def lastrowid(self) -> Optional[int]:
        """
        Row id of the most recent successful ``INSERT`` on the shared cursor.

        ``None`` until an ``INSERT`` has been executed through this object.
        """
        return self._cursor.lastrowid

    def _run(
        self, sql: str, params: Optional[Tuple[Any, ...]]
    ) -> sqlite3.Cursor:
        """Execute *sql* on the shared cursor, binding *params* if given."""
        if params is None:
            return self._cursor.execute(sql)
        return self._cursor.execute(sql, params)

    # -----------------------------------------------------------------
    # Public API - the methods used throughout the code base
    # -----------------------------------------------------------------
    def execute(
        self, sql: str, params: Optional[Tuple[Any, ...]] = None
    ) -> sqlite3.Cursor:
        """
        Run an INSERT/UPDATE/DELETE statement and commit immediately.

        Returns the shared ``sqlite3.Cursor`` so callers can inspect
        ``lastrowid``, ``rowcount`` etc.

        Raises
        ------
        sqlite3.Error
            Re-raised unchanged after the pending transaction has been
            rolled back, so a failed statement never leaves the connection
            inside a half-finished transaction.
        """
        try:
            cursor = self._run(sql, params)
            self._conn.commit()
        except sqlite3.Error:
            self._conn.rollback()
            raise
        return cursor

    def fetchone(
        self, sql: str, params: Optional[Tuple[Any, ...]] = None
    ) -> Optional[sqlite3.Row]:
        """
        Execute a SELECT and return its first row.

        Returns ``None`` when the result set is empty.
        """
        return self._run(sql, params).fetchone()

    def fetchall(
        self, sql: str, params: Optional[Tuple[Any, ...]] = None
    ) -> List[sqlite3.Row]:
        """
        Execute a SELECT and return **all** rows as a list.

        The rows are ``sqlite3.Row`` instances, addressable by column name
        or by index.
        """
        return self._run(sql, params).fetchall()

    def executescript(self, script: str) -> None:
        """
        Run a multi-statement SQL script and commit.

        Thin wrapper over ``sqlite3.Connection.executescript``; on any
        exception the connection is rolled back and the error re-raised.
        """
        try:
            self._conn.executescript(script)
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise

    def close(self) -> None:
        """
        Close the cursor and the underlying SQLite connection.

        Safe to call more than once; calls after the first are no-ops.
        """
        if self._closed:
            return
        self._closed = True
        # The cursor is closed explicitly for clarity; the connection is
        # closed even if closing the cursor fails.
        try:
            self._cursor.close()
        finally:
            self._conn.close()