QuperSync Domain Layer — Entities
Domain Entities
Domain entities in QuperSync are pure Python dataclasses that represent the core data structures being synchronized. They contain no infrastructure dependencies — no SQLAlchemy, no Redshift drivers. This keeps the domain layer testable and portable.
DDD Principle
Domain entities represent business concepts, not database tables. The mapping between entities and database tables is handled by the infrastructure layer's repository implementations.
Entity Definitions
TableStats Entity
domain/entities/table_stats.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class TableStats:
"""
Represents metadata and health metrics for a Redshift table.
Sourced from svv_table_info and related system views.
"""
schema: str
table_name: str
table_id: int
tbl_rows: Optional[int] # Estimated row count
size_mb: float # Table size in megabytes
unsorted_pct: Optional[float] # % rows not in sort key order
pct_skew_across_slices: float # Distribution skew percentage
diststyle: str # EVEN | KEY | ALL | AUTO
sort_key1: Optional[str] # Primary sort key column
sortkey_num: int # Number of sort keys
encoded: str # Y/N — any compressed columns?
pct_rows_marked_for_deletion: float # Ghost row percentage
last_vacuum_date: Optional[datetime]
vacuum_type: Optional[str] # FULL | SORT ONLY | DELETE ONLY
synced_at: datetime = None # Set by sync job
@staticmethod
def field_types() -> dict:
return {
"schema": str,
"table_name": str,
"tbl_rows": int,
"size_mb": float,
...
}QueryHistory Entity
domain/entities/query_history.py
@dataclass
class QueryHistory:
"""
Represents a historical Redshift query execution record.
Sourced from sys_query_history (7-day rolling window).
"""
query_id: int
query_hash: str # MD5 of normalized query text
user_name: str
application_name: str
database_name: str
status: str # success | failed | cancelled
query_type: str # SELECT | INSERT | UPDATE | DELETE | DDL
execution_time_us: int # Microseconds
queue_time_us: int # WLM queue wait in microseconds
compile_time_us: int # Query compilation time
rows_returned: int
bytes_scanned: int
spilled_blocks: int # Disk spill blocks (8KB each)
start_time: datetime
end_time: datetime
error_message: Optional[str]
synced_at: datetime = None
@property
def execution_seconds(self) -> float:
return self.execution_time_us / 1_000_000
@property
def spill_gb(self) -> float:
return (self.spilled_blocks * 8.0) / (1024 * 1024)VacuumHistory Entity
domain/entities/vacuum_history.py
@dataclass
class VacuumHistory:
"""
Records of vacuum operations on Redshift tables.
Sourced from stl_vacuum.
"""
table_id: int
table_name: str
schema: str
vacuum_type: str # FULL | SORT ONLY | DELETE ONLY | RECLUSTER
status: str # Initiated | Started | Finished | Failed
start_time: datetime
end_time: Optional[datetime]
reclaimable_rows: Optional[int]
impacted_blocks: Optional[int] # Each block = 1MB reclaimable
@property
def reclaimable_gb(self) -> Optional[float]:
if self.impacted_blocks is None:
return None
return self.impacted_blocks / 1024.0NodeDetails Entity
domain/entities/node_details.py
@dataclass
class NodeDetails:
"""
Hardware specifications for each Redshift cluster node.
Sourced from stv_slices + stv_node_storage_capacity.
"""
node_id: int
node_type: str # dc1.large | dc2.8xlarge | ra3.4xlarge | etc.
slice_count: int # Number of data slices on this node
is_nvme: bool # NVMe SSD storage
capacity_mb: int # Total storage capacity
used_mb: int # Currently used storage
@property
def storage_utilization_pct(self) -> float:
return (1.0 * self.used_mb / self.capacity_mb) * 100 if self.capacity_mb > 0 else 0.0
@property
def capacity_gb(self) -> float:
return self.capacity_mb / 1024.0Base Entity
domain/entities/base.py
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional
@dataclass
class BaseEntity:
    """Common base for sync entities: carries the sync timestamp and the
    generic dict / type-mapping helpers used by the persistence layer."""

    # Populated by the sync job; defaulted at export time if still unset.
    # (Was mis-annotated as plain `datetime` while defaulting to None.)
    synced_at: Optional[datetime] = None

    def to_dict(self) -> dict:
        """Convert entity to a dict suitable for database insertion.

        A missing synced_at is defaulted to the current UTC time in the
        *returned dict only* — the entity itself is not mutated.
        """
        d = asdict(self)
        if self.synced_at is None:
            # NOTE(review): utcnow() returns a naive datetime and is deprecated
            # from Python 3.12 — confirm whether the target column expects
            # naive UTC before switching to datetime.now(timezone.utc).
            d['synced_at'] = datetime.utcnow()
        return d

    @classmethod
    def field_types(cls) -> dict[str, type]:
        """Returns field name → Python type mapping for transformation.

        NOTE(review): `field.type` is the raw annotation; under
        `from __future__ import annotations` (PEP 563) it is a *string*,
        not a class object — callers should not assume a real type here.
        """
        return {
            field.name: field.type
            for field in cls.__dataclass_fields__.values()
        }