QuperSync Domain Layer

Repositories

The repository pattern abstracts data access behind interfaces. The domain layer defines abstract repository classes (ABCs) that specify which data operations are needed, and the infrastructure layer provides concrete implementations backed by real database drivers. Domain services depend only on the abstract interfaces, never on the implementations.

Abstract Repository Interface

domain/repositories/target.py
from abc import ABC, abstractmethod
from typing import List, Optional
from datetime import datetime
from domain.entities import CostUsageRecord

class CostUsageRepository(ABC):
    @abstractmethod
    async def upsert(self, record: CostUsageRecord) -> None:
        """Insert or update a single cost usage record."""
        ...

    @abstractmethod
    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        """
        Batch upsert. Returns the count of rows inserted/updated.
        Must execute as a single transaction — roll back entire batch on failure.
        """
        ...

    @abstractmethod
    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        """
        Returns the last successful sync watermark for the given table.
        Returns None if the table has never been synced.
        """
        ...

    @abstractmethod
    async def update_sync_watermark(
        self, table_name: str, sync_time: datetime
    ) -> None:
        """
        Persists the sync watermark for incremental sync state.
        Called only after a fully successful sync run.
        """
        ...
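
One practical consequence of declaring the interface as an ABC is that Python refuses to instantiate a subclass that leaves any abstract method unimplemented. The sketch below illustrates this with a trimmed, hypothetical interface (`Repo` and `IncompleteRepo` are illustrative names, not part of the codebase):

```python
from abc import ABC, abstractmethod


class Repo(ABC):
    """Trimmed stand-in for an abstract repository with two required methods."""

    @abstractmethod
    async def upsert(self, record): ...

    @abstractmethod
    async def upsert_batch(self, records): ...


class IncompleteRepo(Repo):
    """Implements only one of the two abstract methods."""

    async def upsert_batch(self, records):
        return len(records)


# Instantiation fails because upsert() is still abstract.
try:
    IncompleteRepo()
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

The check happens when the object is constructed, not when the class is defined, so a missing method typically surfaces at application startup where the concrete repository is created.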

Source Repository Interface

domain/repositories/source.py
from abc import ABC, abstractmethod
from typing import Iterator, List
from datetime import datetime

class RedshiftSourceRepository(ABC):
    @abstractmethod
    def extract_full(self, table_name: str, batch_size: int) -> Iterator[List[dict]]:
        """
        Full extraction — yields batches of rows as dicts.
        No watermark filtering applied.
        """
        ...

    @abstractmethod
    def extract_incremental(
        self,
        table_name: str,
        watermark_column: str,
        watermark_value: datetime,
        batch_size: int,
    ) -> Iterator[List[dict]]:
        """
        Incremental extraction — yields batches of rows where
        watermark_column > watermark_value.
        """
        ...
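
To make the `Iterator[List[dict]]` contract concrete, here is a minimal sketch of an in-memory source that follows the same two method signatures. `iter_batches` and `InMemorySourceRepository` are hypothetical helpers for illustration; a real implementation would stream rows from Redshift rather than from a Python list.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, Iterator, List


def iter_batches(rows: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield lists of at most batch_size rows; the final batch may be shorter."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    batch: List[dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


class InMemorySourceRepository:
    """Hypothetical stand-in that mirrors the extract_* methods without Redshift."""

    def __init__(self, tables: Dict[str, List[dict]]) -> None:
        self._tables = tables

    def extract_full(self, table_name: str, batch_size: int) -> Iterator[List[dict]]:
        # No watermark filtering: every row is yielded.
        return iter_batches(self._tables.get(table_name, []), batch_size)

    def extract_incremental(
        self,
        table_name: str,
        watermark_column: str,
        watermark_value: datetime,
        batch_size: int,
    ) -> Iterator[List[dict]]:
        # Strictly greater than the watermark, matching the interface docstring.
        newer = (
            row
            for row in self._tables.get(table_name, [])
            if row[watermark_column] > watermark_value
        )
        return iter_batches(newer, batch_size)


rows = [
    {"id": day, "updated_at": datetime(2024, 1, day, tzinfo=timezone.utc)}
    for day in range(1, 6)
]
source = InMemorySourceRepository({"cost_usage": rows})

full_batches = list(source.extract_full("cost_usage", batch_size=2))
incremental_batches = list(
    source.extract_incremental(
        "cost_usage",
        "updated_at",
        datetime(2024, 1, 3, tzinfo=timezone.utc),
        batch_size=2,
    )
)
```

Because the comparison is strict, a row whose watermark column equals the stored watermark is not extracted again.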

PostgreSQL Implementation

The infrastructure layer provides the concrete PostgreSQL implementation of CostUsageRepository:

infrastructure/postgres/repository.py
from typing import List, Optional
from datetime import datetime
import psycopg2
from domain.repositories.target import CostUsageRepository
from domain.entities import CostUsageRecord

class PostgresCostUsageRepository(CostUsageRepository):

    def __init__(self, connection):
        self._conn = connection

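    async def upsert(self, record: CostUsageRecord) -> None:
        # Required by the abstract interface; without it the class
        # cannot be instantiated. Delegates to the batch path.
        await self.upsert_batch([record])

    async def upsert_batch(self, records: List[CostUsageRecord]) -> int:
        sql = """
            INSERT INTO cost_usage (id, account_id, service, effective_cost, usage_date)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                effective_cost = EXCLUDED.effective_cost,
                usage_date = EXCLUDED.usage_date
        """
        values = [
            (r.id, r.account_id, r.service, r.effective_cost, r.usage_date)
            for r in records
        ]
        # Note: psycopg2 is a blocking driver, so these calls block the
        # event loop while they run.
        try:
            with self._conn.cursor() as cur:
                cur.executemany(sql, values)
            # Single commit for the whole batch (assumes autocommit is off,
            # which is the psycopg2 default).
            self._conn.commit()
        except Exception:
            # Roll back the entire batch, as the interface contract requires.
            self._conn.rollback()
            raise
        return len(records)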

    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        with self._conn.cursor() as cur:
            cur.execute(
                "SELECT last_sync_time FROM qupersync_state WHERE table_name = %s",
                (table_name,)
            )
            row = cur.fetchone()
            return row[0] if row else None

    async def update_sync_watermark(
        self, table_name: str, sync_time: datetime
    ) -> None:
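        try:
            with self._conn.cursor() as cur:
                # updated_at is set explicitly on conflict because the column
                # default only applies when a row is first inserted.
                cur.execute("""
                    INSERT INTO qupersync_state (table_name, last_sync_time)
                    VALUES (%s, %s)
                    ON CONFLICT (table_name) DO UPDATE
                    SET last_sync_time = EXCLUDED.last_sync_time,
                        updated_at = NOW()
                """, (table_name, sync_time))
            self._conn.commit()
        except Exception:
            # Leave the previous watermark in place if the write fails.
            self._conn.rollback()
            raise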

Watermark State Table

The sync watermark is persisted in a qupersync_state table in PostgreSQL. This table tracks the last successful sync time per source table:

qupersync_state — DDL
CREATE TABLE qupersync_state (
    table_name      VARCHAR(255) PRIMARY KEY,
    last_sync_time  TIMESTAMP WITH TIME ZONE NOT NULL,
    updated_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
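
The watermark's role in a sync run can be sketched as follows: read the stored value, choose full or incremental extraction, upsert each batch, and advance the watermark only if every batch succeeded. Everything here is a hypothetical illustration: `StubSource`, `StubTarget`, and `run_sync` are not part of the codebase, the stubs pass plain dicts rather than `CostUsageRecord` objects for brevity, and using the run's start time as the new watermark is an assumption.

```python
import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional


class StubSource:
    """Hypothetical source that serves rows from a list instead of Redshift."""

    def __init__(self, rows: List[dict]) -> None:
        self.rows = rows

    def extract_full(self, table_name, batch_size):
        for i in range(0, len(self.rows), batch_size):
            yield self.rows[i:i + batch_size]

    def extract_incremental(self, table_name, watermark_column, watermark_value, batch_size):
        newer = [r for r in self.rows if r[watermark_column] > watermark_value]
        for i in range(0, len(newer), batch_size):
            yield newer[i:i + batch_size]


class StubTarget:
    """Hypothetical target that keeps rows and watermarks in dicts."""

    def __init__(self) -> None:
        self.rows: Dict[int, dict] = {}
        self.watermarks: Dict[str, datetime] = {}

    async def upsert_batch(self, records: List[dict]) -> int:
        for record in records:
            self.rows[record["id"]] = record
        return len(records)

    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        return self.watermarks.get(table_name)

    async def update_sync_watermark(self, table_name: str, sync_time: datetime) -> None:
        self.watermarks[table_name] = sync_time


async def run_sync(source, target, table_name, watermark_column, batch_size, now) -> int:
    last_sync = await target.get_last_sync_time(table_name)
    if last_sync is None:
        # Never synced: no watermark row exists yet, so extract everything.
        batches = source.extract_full(table_name, batch_size)
    else:
        batches = source.extract_incremental(
            table_name, watermark_column, last_sync, batch_size
        )
    total = 0
    for batch in batches:
        total += await target.upsert_batch(batch)
    # Reached only when every batch succeeded; an exception above
    # leaves the previous watermark untouched.
    await target.update_sync_watermark(table_name, now)
    return total


def utc(day: int) -> datetime:
    return datetime(2024, 1, day, tzinfo=timezone.utc)


source = StubSource([{"id": i, "updated_at": utc(i)} for i in (1, 2, 3)])
target = StubTarget()

first = asyncio.run(run_sync(source, target, "cost_usage", "updated_at", 2, utc(10)))
source.rows.append({"id": 4, "updated_at": utc(11)})
second = asyncio.run(run_sync(source, target, "cost_usage", "updated_at", 2, utc(12)))
```

In this sketch the first run performs a full extraction and the second run picks up only the row changed after the stored watermark.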

Dependency Inversion

Domain services reference only CostUsageRepository (the abstract interface), never PostgresCostUsageRepository (the concrete class). The concrete implementation is injected at application startup. As a result, domain services can be unit tested against a simple in-memory repository, with no PostgreSQL connection required.
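
As a rough illustration of that testing benefit, the sketch below injects an in-memory repository into a service that knows only the abstract type. All names are hypothetical: `SyncStateRepository` is a trimmed stand-in for the watermark half of CostUsageRepository so the example stays self-contained, and `SyncFreshnessService` is an invented domain service, not one from the codebase.

```python
import asyncio
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


class SyncStateRepository(ABC):
    """Trimmed stand-in for the abstract interface (watermark methods only)."""

    @abstractmethod
    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]: ...

    @abstractmethod
    async def update_sync_watermark(self, table_name: str, sync_time: datetime) -> None: ...


class InMemorySyncStateRepository(SyncStateRepository):
    """Test double: stores watermarks in a dict, no database needed."""

    def __init__(self) -> None:
        self._watermarks: Dict[str, datetime] = {}

    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        return self._watermarks.get(table_name)

    async def update_sync_watermark(self, table_name: str, sync_time: datetime) -> None:
        self._watermarks[table_name] = sync_time


class SyncFreshnessService:
    """Hypothetical domain service; it depends only on the abstract type."""

    def __init__(self, repository: SyncStateRepository) -> None:
        self._repository = repository

    async def is_stale(self, table_name: str, max_age: timedelta, now: datetime) -> bool:
        last_sync = await self._repository.get_last_sync_time(table_name)
        # A table that was never synced counts as stale.
        return last_sync is None or now - last_sync > max_age


async def demo():
    repo = InMemorySyncStateRepository()
    service = SyncFreshnessService(repo)  # the injection point
    now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)

    never_synced = await service.is_stale("cost_usage", timedelta(hours=24), now)
    await repo.update_sync_watermark("cost_usage", now - timedelta(hours=1))
    after_sync = await service.is_stale("cost_usage", timedelta(hours=24), now)
    return never_synced, after_sync


never_synced, after_sync = asyncio.run(demo())
```

In production wiring, the same constructor would receive the PostgreSQL-backed repository instead; the service code would not change.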