QuperSync › Domain Layer › Entities

Domain Entities

Domain entities in QuperSync are pure Python dataclasses that represent the core data structures being synchronized. They contain no infrastructure dependencies — no SQLAlchemy, no Redshift drivers. This keeps the domain layer testable and portable.

DDD Principle

Domain entities represent business concepts, not database tables. The mapping between entities and database tables is handled by the infrastructure layer's repository implementations.

Entity Definitions

TableStats Entity

domain/entities/table_stats.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class TableStats:
    """
    Represents metadata and health metrics for a Redshift table.
    Sourced from svv_table_info and related system views.

    ``synced_at`` stays ``None`` until the sync job assigns a timestamp.
    """
    schema: str
    table_name: str
    table_id: int
    tbl_rows: Optional[int]             # Estimated row count
    size_mb: float                      # Table size in megabytes
    unsorted_pct: Optional[float]       # % rows not in sort key order
    pct_skew_across_slices: float       # Distribution skew percentage
    diststyle: str                      # EVEN | KEY | ALL | AUTO
    sort_key1: Optional[str]            # Primary sort key column
    sortkey_num: int                    # Number of sort keys
    encoded: str                        # Y/N — any compressed columns?
    pct_rows_marked_for_deletion: float # Ghost row percentage
    last_vacuum_date: Optional[datetime]
    vacuum_type: Optional[str]          # FULL | SORT ONLY | DELETE ONLY
    synced_at: Optional[datetime] = None  # Set by sync job

    @staticmethod
    def field_types() -> dict:
        """
        Return the field name → base Python type mapping for this entity.

        Optional fields map to their underlying type (``tbl_rows`` maps to
        ``int``, not ``Optional[int]``).
        """
        return {
            "schema": str,
            "table_name": str,
            # NOTE(review): table_id is assumed to belong in this mapping — confirm.
            "table_id": int,
            "tbl_rows": int,
            "size_mb": float,
            "unsorted_pct": float,
            "pct_skew_across_slices": float,
            "diststyle": str,
            "sort_key1": str,
            "sortkey_num": int,
            "encoded": str,
            "pct_rows_marked_for_deletion": float,
            "last_vacuum_date": datetime,
            "vacuum_type": str,
            "synced_at": datetime,
        }

QueryHistory Entity

domain/entities/query_history.py
@dataclass
class QueryHistory:
    """
    Represents a historical Redshift query execution record.
    Sourced from sys_query_history (7-day rolling window).

    ``synced_at`` stays ``None`` until a sync timestamp is assigned.
    """
    query_id: int
    query_hash: str           # MD5 of normalized query text
    user_name: str
    application_name: str
    database_name: str
    status: str               # success | failed | cancelled
    query_type: str           # SELECT | INSERT | UPDATE | DELETE | DDL
    execution_time_us: int    # Microseconds
    queue_time_us: int        # WLM queue wait in microseconds
    compile_time_us: int      # Query compilation time
    rows_returned: int
    bytes_scanned: int
    spilled_blocks: int       # Disk spill blocks (8KB each)
    start_time: datetime
    end_time: datetime
    error_message: Optional[str]
    synced_at: Optional[datetime] = None

    @property
    def execution_seconds(self) -> float:
        """Execution time converted from microseconds to seconds."""
        return self.execution_time_us / 1_000_000

    @property
    def spill_gb(self) -> float:
        """Disk spill in GB: blocks × 8 KB, divided by 1024² KB per GB."""
        return (self.spilled_blocks * 8.0) / (1024 * 1024)

VacuumHistory Entity

domain/entities/vacuum_history.py
@dataclass
class VacuumHistory:
    """
    A single vacuum operation on a Redshift table, as recorded in stl_vacuum.
    """
    table_id: int
    table_name: str
    schema: str
    vacuum_type: str          # One of: FULL, SORT ONLY, DELETE ONLY, RECLUSTER
    status: str               # One of: Initiated, Started, Finished, Failed
    start_time: datetime
    end_time: Optional[datetime]
    reclaimable_rows: Optional[int]
    impacted_blocks: Optional[int]  # 1MB reclaimable per block

    @property
    def reclaimable_gb(self) -> Optional[float]:
        """Reclaimable space in GB (blocks / 1024); None when the block count is missing."""
        blocks = self.impacted_blocks
        return None if blocks is None else blocks / 1024.0

NodeDetails Entity

domain/entities/node_details.py
@dataclass
class NodeDetails:
    """
    Per-node hardware specification for a Redshift cluster.
    Sourced from stv_slices + stv_node_storage_capacity.
    """
    node_id: int
    node_type: str            # e.g. dc1.large, dc2.8xlarge, ra3.4xlarge
    slice_count: int          # Data slices hosted on this node
    is_nvme: bool             # True for NVMe SSD storage
    capacity_mb: int          # Total storage capacity, in MB
    used_mb: int              # Storage currently in use, in MB

    @property
    def storage_utilization_pct(self) -> float:
        """Used storage as a percentage of capacity; 0.0 when capacity is not positive."""
        if self.capacity_mb > 0:
            used_fraction = 1.0 * self.used_mb / self.capacity_mb
            return used_fraction * 100
        return 0.0

    @property
    def capacity_gb(self) -> float:
        """Total capacity converted from MB to GB."""
        megabytes = self.capacity_mb
        return megabytes / 1024.0

Base Entity

domain/entities/base.py
from dataclasses import asdict, dataclass, fields
from datetime import datetime, timezone
from typing import Optional

@dataclass
class BaseEntity:
    """
    Base dataclass carrying the sync timestamp plus dict-conversion and
    field-type helpers.
    """
    synced_at: Optional[datetime] = None  # None until a sync timestamp is assigned

    def to_dict(self) -> dict:
        """
        Convert entity to a dict suitable for database insertion.

        When ``synced_at`` is unset, the returned dict carries the current
        UTC time as a naive datetime; the entity itself is not modified.
        """
        d = asdict(self)
        if self.synced_at is None:
            # datetime.utcnow() is deprecated since Python 3.12; this yields
            # the same naive-UTC value without the deprecation warning.
            d['synced_at'] = datetime.now(timezone.utc).replace(tzinfo=None)
        return d

    @classmethod
    def field_types(cls) -> dict[str, type]:
        """Returns field name → declared type mapping for transformation."""
        # fields() is the public API and skips ClassVar/InitVar pseudo-fields,
        # which __dataclass_fields__ would include.
        return {field.name: field.type for field in fields(cls)}