QuperSync Domain Layer
Repositories
The repository pattern abstracts data access behind interfaces. The domain layer defines abstract repository classes (ABCs) that specify what data operations are needed. The infrastructure layer provides concrete implementations that use real database drivers. Domain services only depend on the abstract interfaces — never on the implementations.
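Because the interfaces are declared as ABCs, Python enforces the contract when an object is created. A subclass that leaves any `@abstractmethod` unimplemented cannot be instantiated. The minimal sketch below shows this. The class names (`ExampleRepository`, `IncompleteRepository`, `CompleteRepository`) and the `can_instantiate` helper are hypothetical illustrations, not part of QuperSync.

```python
from abc import ABC, abstractmethod
from typing import List


class ExampleRepository(ABC):
    """Hypothetical two-method interface standing in for a domain repository."""

    @abstractmethod
    async def upsert(self, record: dict) -> None:
        ...

    @abstractmethod
    async def upsert_batch(self, records: List[dict]) -> int:
        ...


class IncompleteRepository(ExampleRepository):
    # Implements only one of the two abstract methods, so it stays abstract.
    async def upsert_batch(self, records: List[dict]) -> int:
        return len(records)


class CompleteRepository(IncompleteRepository):
    # Supplying the remaining abstract method makes the class concrete.
    async def upsert(self, record: dict) -> None:
        await self.upsert_batch([record])


def can_instantiate(cls: type) -> bool:
    """Return True if cls can be constructed, False if the ABC rules forbid it."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(IncompleteRepository))  # False
print(can_instantiate(CompleteRepository))    # True
```

The same rule applies to the repositories in this section: a concrete class must implement every `@abstractmethod` declared on its interface before it can be constructed.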
Abstract Repository Interface
domain/repositories/target.py
from abc import ABC, abstractmethod
from typing import List, Optional
from datetime import datetime

from domain.entities import CostUsageRecord


class CostUsageRepository(ABC):
    @abstractmethod
    async def upsert(self, record: CostUsageRecord) -> None:
        """Insert or update a single cost usage record."""
        ...

    @abstractmethod
    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        """
        Batch upsert. Returns the count of rows inserted/updated.
        Must execute as a single transaction: roll back the entire batch on failure.
        """
        ...

    @abstractmethod
    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        """
        Returns the last successful sync watermark for the given table.
        Returns None if the table has never been synced.
        """
        ...

    @abstractmethod
    async def update_sync_watermark(
        self, table_name: str, sync_time: datetime
    ) -> None:
        """
        Persists the sync watermark for incremental sync state.
        Called only after a fully successful sync run.
        """
        ...
Source Repository Interface
domain/repositories/source.py
from abc import ABC, abstractmethod
from typing import Iterator, List
from datetime import datetime


class RedshiftSourceRepository(ABC):
    @abstractmethod
    def extract_full(self, table_name: str, batch_size: int) -> Iterator[List[dict]]:
        """
        Full extraction: yields batches of rows as dicts.
        No watermark filtering applied.
        """
        ...

    @abstractmethod
    def extract_incremental(
        self,
        table_name: str,
        watermark_column: str,
        watermark_value: datetime,
        batch_size: int,
    ) -> Iterator[List[dict]]:
        """
        Incremental extraction: yields batches of rows where
        watermark_column > watermark_value.
        """
        ...
PostgreSQL Implementation
The infrastructure layer provides the concrete PostgreSQL implementation of CostUsageRepository:
infrastructure/postgres/repository.py
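from typing import List, Optional
from datetime import datetime

import psycopg2.extensions

from domain.repositories.target import CostUsageRepository
from domain.entities import CostUsageRecord


class PostgresCostUsageRepository(CostUsageRepository):
    # psycopg2 is a blocking driver: these async methods block the event loop
    # while a query runs. An async driver (e.g. asyncpg) would avoid that.

    def __init__(self, connection: psycopg2.extensions.connection):
        self._conn = connection

    async def upsert(self, record: CostUsageRecord) -> None:
        # Required by the abstract interface; without it this class cannot be
        # instantiated. Delegates to the batch path.
        await self.upsert_batch([record])

    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        sql = """
            INSERT INTO cost_usage (id, account_id, service, effective_cost, usage_date)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                effective_cost = EXCLUDED.effective_cost,
                usage_date = EXCLUDED.usage_date
        """
        values = [
            (r.id, r.account_id, r.service, r.effective_cost, r.usage_date)
            for r in records
        ]
        # `with conn:` commits on success and rolls back on any exception,
        # so the whole batch is one transaction, as the interface requires.
        with self._conn:
            with self._conn.cursor() as cur:
                cur.executemany(sql, values)
        # Each statement inserts or updates exactly one row.
        return len(records)

    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        # Ending the transaction after the read avoids leaving the
        # connection "idle in transaction".
        with self._conn:
            with self._conn.cursor() as cur:
                cur.execute(
                    "SELECT last_sync_time FROM qupersync_state WHERE table_name = %s",
                    (table_name,),
                )
                row = cur.fetchone()
        return row[0] if row else None

    async def update_sync_watermark(
        self, table_name: str, sync_time: datetime
    ) -> None:
        # The updated_at column DEFAULT only fires on INSERT, so the
        # conflict branch sets it explicitly.
        with self._conn:
            with self._conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO qupersync_state (table_name, last_sync_time)
                    VALUES (%s, %s)
                    ON CONFLICT (table_name) DO UPDATE
                    SET last_sync_time = EXCLUDED.last_sync_time,
                        updated_at = NOW()
                """, (table_name, sync_time))
Watermark State Table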
The sync watermark is persisted in a qupersync_state table in PostgreSQL. This table tracks the last successful sync time per source table:
qupersync_state — DDL
CREATE TABLE qupersync_state (
    table_name     VARCHAR(255) PRIMARY KEY,
    last_sync_time TIMESTAMP WITH TIME ZONE NOT NULL,
    updated_at     TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Dependency Inversion
Domain services reference only CostUsageRepository (the abstract interface), never PostgresCostUsageRepository (the concrete class). The concrete implementation is injected at application startup via dependency injection, so domain services can be unit tested against a simple in-memory mock repository with no PostgreSQL connection required.
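The following is a self-contained sketch of such a unit test. It re-declares a copy of `CostUsageRepository` and a stand-in `CostUsageRecord` (fields assumed from the columns in the PostgreSQL upsert) so it runs without the `domain` package. `InMemoryCostUsageRepository`, `FailingCostUsageRepository`, and the `sync_records` service function are hypothetical names, not part of QuperSync. The test checks the interface rule that the watermark is persisted only after a fully successful run.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date, datetime, timezone
from typing import Dict, List, Optional


@dataclass(frozen=True)
class CostUsageRecord:
    # Stand-in for domain.entities.CostUsageRecord (fields assumed from the SQL).
    id: str
    account_id: str
    service: str
    effective_cost: float
    usage_date: date


class CostUsageRepository(ABC):
    # Copy of the domain interface so the sketch runs on its own.
    @abstractmethod
    async def upsert(self, record: CostUsageRecord) -> None: ...

    @abstractmethod
    async def upsert_batch(self, records: List[CostUsageRecord]) -> int: ...

    @abstractmethod
    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]: ...

    @abstractmethod
    async def update_sync_watermark(self, table_name: str, sync_time: datetime) -> None: ...


class InMemoryCostUsageRepository(CostUsageRepository):
    """Dict-backed fake with the same contract as the PostgreSQL repository."""

    def __init__(self) -> None:
        self.rows: Dict[str, CostUsageRecord] = {}
        self.watermarks: Dict[str, datetime] = {}

    async def upsert(self, record: CostUsageRecord) -> None:
        await self.upsert_batch([record])

    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        for r in records:
            self.rows[r.id] = r  # keyed by id, like ON CONFLICT (id)
        return len(records)

    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        return self.watermarks.get(table_name)

    async def update_sync_watermark(self, table_name: str, sync_time: datetime) -> None:
        self.watermarks[table_name] = sync_time


class FailingCostUsageRepository(InMemoryCostUsageRepository):
    # Simulates a database error during the batch write.
    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        raise RuntimeError("simulated write failure")


async def sync_records(
    repo: CostUsageRepository,
    table_name: str,
    records: List[CostUsageRecord],
    sync_time: datetime,
) -> int:
    """Hypothetical domain service that sees only the abstract repository.

    The watermark is advanced only after upsert_batch returns; if the
    write raises, update_sync_watermark is never reached.
    """
    written = await repo.upsert_batch(records)
    await repo.update_sync_watermark(table_name, sync_time)
    return written


async def test_watermark_advances_only_on_success() -> None:
    t = datetime(2024, 1, 2, tzinfo=timezone.utc)
    rec = CostUsageRecord("r1", "acct-1", "svc-a", 12.5, date(2024, 1, 1))

    repo = InMemoryCostUsageRepository()
    assert await repo.get_last_sync_time("cost_usage") is None  # never synced
    assert await sync_records(repo, "cost_usage", [rec], t) == 1
    assert await repo.get_last_sync_time("cost_usage") == t

    failing = FailingCostUsageRepository()
    try:
        await sync_records(failing, "cost_usage", [rec], t)
    except RuntimeError:
        pass
    assert await failing.get_last_sync_time("cost_usage") is None  # not advanced


asyncio.run(test_watermark_advances_only_on_success())
```

Because `sync_records` depends only on the abstract type, injecting `PostgresCostUsageRepository` at startup instead of the fake requires no change to the service code.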