QuperSync Domain Layer

Domain Services

Domain services coordinate entities and repositories to implement the core sync workflows. They contain business logic that doesn't belong to a single entity — orchestration, decision making, and cross-entity validation. Domain services never import from the infrastructure layer; they work exclusively with abstract repository interfaces.
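As a rough illustration of what an abstract repository interface could look like here, the sketch below defines the two repositories as abstract base classes with Python's `abc` module. The class and method names are taken from the calls made by the services on this page; the signatures, parameter names, and return types are assumptions, not the project's actual definitions.

```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterator, Optional


class RedshiftSourceRepository(ABC):
    """Read side. Shape inferred from how the services call it (assumed)."""

    @abstractmethod
    def extract_incremental(
        self,
        table_name: str,
        watermark_column: str,
        since: Optional[datetime],
        batch_size: int = 1000,
    ) -> Iterator[list[dict[str, Any]]]:
        """Yield batches of rows changed after `since`."""

    @abstractmethod
    async def table_exists(self, table_name: str) -> bool:
        """Report whether the source table is present."""


class CostUsageRepository(ABC):
    """Write side. Also tracks the per-table sync watermark (assumed)."""

    @abstractmethod
    async def get_last_sync_time(self, table_name: str) -> Optional[datetime]:
        """Return the stored watermark, or None if the table was never synced."""

    @abstractmethod
    async def upsert_batch(self, entities: list[Any]) -> int:
        """Write one batch and return the number of rows upserted."""

    @abstractmethod
    async def update_sync_watermark(self, table_name: str, value: datetime) -> None:
        """Persist the new watermark for the table."""
```

Because these classes are abstract, they cannot be instantiated directly; the infrastructure layer would supply concrete subclasses, and the domain services only ever see the base types.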

SyncService

SyncService is the primary orchestrator of the Extract-Transform-Load cycle for a single sync job.

domain/services/sync_service.py
import logging
from dataclasses import dataclass
from datetime import datetime, timezone

# Assumed module path for the entity; adjust to wherever CostUsageRecord lives.
from domain.entities.cost_usage import CostUsageRecord
from domain.repositories.source import RedshiftSourceRepository
from domain.repositories.target import CostUsageRepository

logger = logging.getLogger(__name__)

@dataclass
class SyncResult:
    rows_extracted: int
    rows_upserted: int
    errors: int
    success: bool

class SyncService:
    def __init__(
        self,
        source: RedshiftSourceRepository,
        target: CostUsageRepository,
    ):
        self.source = source
        self.target = target

    async def run_incremental_sync(
        self, table_name: str, watermark_column: str
    ) -> SyncResult:
        last_sync = await self.target.get_last_sync_time(table_name)

        extracted = 0
        upserted = 0
        errors = 0

        # Assumes extract_incremental returns a regular (synchronous) iterator of batches.
        for batch in self.source.extract_incremental(
            table_name, watermark_column, last_sync, batch_size=1000
        ):
            # Count rows as extracted even if the write below fails.
            extracted += len(batch)
            try:
                entities = [CostUsageRecord.from_dict(row) for row in batch]
                upserted += await self.target.upsert_batch(entities)
            except Exception:
                errors += 1
                # Log with traceback and continue to the next batch
                logger.exception("Batch failed for table %s", table_name)

        # Advance the watermark only after a fully clean run (UTC assumed).
        if errors == 0:
            await self.target.update_sync_watermark(
                table_name, datetime.now(timezone.utc)
            )

        return SyncResult(extracted, upserted, errors, errors == 0)

WatermarkService

WatermarkService manages incremental sync state — it determines whether the next sync run should be full or incremental, and tracks the high-water mark for each table.

| Decision | Condition | Action |
|---|---|---|
| Full sync | No watermark exists in qupersync_state for this table | Run full extraction (no WHERE clause) |
| Full sync | More than 24 hours since the last full sync | Run full extraction to catch any backfills or corrections in the source |
| Incremental sync | Watermark exists and was updated less than 24 hours ago | Run incremental extraction filtered by watermark |
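The decision rules above can be sketched as a small pure function. This is only an illustration: the names `SyncMode`, `decide_sync_mode`, and `FULL_SYNC_INTERVAL` are hypothetical, and the sketch assumes the time of the last full sync is tracked separately from the watermark. Anything not caught by the two full-sync rules is treated as incremental, which includes the boundary case of exactly 24 hours.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional

# Threshold taken from the table; the constant name is hypothetical.
FULL_SYNC_INTERVAL = timedelta(hours=24)


class SyncMode(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"


def decide_sync_mode(
    watermark: Optional[datetime],
    last_full_sync: Optional[datetime],
    now: datetime,
) -> SyncMode:
    """Pick full or incremental extraction for one table."""
    # Rule 1: no watermark recorded for this table yet.
    if watermark is None:
        return SyncMode.FULL
    # Rule 2: the last full sync is unknown or more than 24 hours old.
    if last_full_sync is None or now - last_full_sync > FULL_SYNC_INTERVAL:
        return SyncMode.FULL
    # Rule 3: a watermark exists and a full sync ran recently enough.
    return SyncMode.INCREMENTAL


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
first_run = decide_sync_mode(None, None, now)
stale = decide_sync_mode(now - timedelta(hours=1), now - timedelta(hours=25), now)
fresh = decide_sync_mode(now - timedelta(hours=1), now - timedelta(hours=2), now)
```

Passing `now` in as an argument, rather than reading the clock inside the function, keeps the decision deterministic and easy to unit test.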

ValidationService

ValidationService runs pre-flight checks before a sync job begins, so configuration errors are caught before any data is moved.

domain/services/validation.py
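from dataclasses import dataclass

from domain.repositories.source import RedshiftSourceRepository
from domain.repositories.target import CostUsageRepository

@dataclass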
class ValidationResult:
    passed: bool
    checks: list[dict]  # [{name, passed, message}]

class ValidationService:
    async def validate_sync_config(
        self,
        source: RedshiftSourceRepository,
        target: CostUsageRepository,
        table_name: str,
    ) -> ValidationResult:
        checks = []

        # Check 1: Source table exists
        exists = await source.table_exists(table_name)
        checks.append({
            "name": "source_table_exists",
            "passed": exists,
            "message": f"Table {table_name} {'found' if exists else 'NOT FOUND'} in Redshift"
        })

        # Check 2: Watermark column exists
        # Check 3: Target schema matches source schema
        # Check 4: Required columns present in source
        # ... additional checks

        return ValidationResult(
            passed=all(c["passed"] for c in checks),
            checks=checks
        )
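One way the second check (watermark column exists) might be written is sketched below, following the same `{name, passed, message}` shape as the first check. The `get_column_names` method, the `StubSource` class, and the table and column names are all hypothetical; the listing above does not show how the source repository exposes column metadata, and `validate_sync_config` would also need the watermark column passed in.

```python
import asyncio


class StubSource:
    """Stand-in for the source repository; `get_column_names` is hypothetical."""

    def __init__(self, columns: dict[str, list[str]]):
        self._columns = columns

    async def get_column_names(self, table_name: str) -> list[str]:
        return self._columns.get(table_name, [])


async def check_watermark_column(
    source, table_name: str, watermark_column: str
) -> dict:
    """Build one check entry reporting whether the watermark column is present."""
    columns = await source.get_column_names(table_name)
    present = watermark_column in columns
    return {
        "name": "watermark_column_exists",
        "passed": present,
        "message": (
            f"Column {watermark_column} "
            f"{'found' if present else 'NOT FOUND'} in {table_name}"
        ),
    }


# Hypothetical table and column names, for illustration only.
source = StubSource({"cost_usage": ["id", "updated_at"]})
ok = asyncio.run(check_watermark_column(source, "cost_usage", "updated_at"))
missing = asyncio.run(check_watermark_column(source, "cost_usage", "modified_at"))
```

Each check returns a plain dictionary, so the result can be appended to `checks` and folded into the overall `passed` flag by the existing `all(...)` expression.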

Sync Statistics Tracking

Each SyncResult returned by SyncService includes:

  • rows_extracted — Total rows read from Redshift across all batches
  • rows_upserted — Total rows successfully written to PostgreSQL
  • errors — Number of batches that failed (after retries)
  • success — True only if errors == 0 (watermark is updated)

Sync results are written to the application log, and the result of the most recent run is available via the GET /scheduler/jobs API endpoint.
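To make the four fields concrete, here is a minimal sketch of rendering a SyncResult as a single log line. The `summarize` function, the key=value format, and the table name are illustrative assumptions; the actual log format is not shown in this document.

```python
from dataclasses import dataclass


@dataclass
class SyncResult:
    rows_extracted: int
    rows_upserted: int
    errors: int
    success: bool


def summarize(table_name: str, result: SyncResult) -> str:
    """Render a SyncResult as one key=value log line (format is illustrative)."""
    status = "ok" if result.success else "failed"
    return (
        f"sync table={table_name} status={status} "
        f"rows_extracted={result.rows_extracted} "
        f"rows_upserted={result.rows_upserted} "
        f"failed_batches={result.errors}"
    )


# A run where one of three 1000-row batches failed to write.
partial = SyncResult(rows_extracted=3000, rows_upserted=2000, errors=1, success=False)
line = summarize("cost_usage", partial)
```

In a case like this, `success` is False, so the watermark is left where it was and the next incremental run reads from the previous watermark again.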

Testability by Design

Because domain services depend only on abstract repository interfaces and never import from the infrastructure layer, they can be tested with simple mock repositories that implement those interfaces using in-memory dictionaries or lists, with no Redshift or PostgreSQL connection. The full sync workflow can then be exercised in a unit test that runs in milliseconds.
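A minimal sketch of such a test setup follows. `InMemorySource` and `InMemoryTarget` are hypothetical fakes, the table and column names are made up, and `run_incremental_sync` here is a condensed stand-in for the SyncService method that skips entity mapping so the example stays self-contained.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class SyncResult:
    rows_extracted: int
    rows_upserted: int
    errors: int
    success: bool


class InMemorySource:
    """Fake source: serves pre-loaded rows in fixed-size batches."""

    def __init__(self, rows):
        self.rows = rows

    def extract_incremental(self, table_name, watermark_column, since, batch_size=1000):
        for i in range(0, len(self.rows), batch_size):
            yield self.rows[i:i + batch_size]


class InMemoryTarget:
    """Fake target: keeps rows and watermarks in plain dictionaries."""

    def __init__(self):
        self.rows = {}
        self.watermarks = {}

    async def get_last_sync_time(self, table_name):
        return self.watermarks.get(table_name)

    async def upsert_batch(self, entities):
        for entity in entities:
            self.rows[entity["id"]] = entity
        return len(entities)

    async def update_sync_watermark(self, table_name, value):
        self.watermarks[table_name] = value


async def run_incremental_sync(source, target, table_name, watermark_column):
    """Condensed stand-in for SyncService.run_incremental_sync (no entity mapping)."""
    last_sync = await target.get_last_sync_time(table_name)
    extracted = upserted = errors = 0
    for batch in source.extract_incremental(
        table_name, watermark_column, last_sync, batch_size=2
    ):
        extracted += len(batch)
        try:
            upserted += await target.upsert_batch(batch)
        except Exception:
            errors += 1
    if errors == 0:
        await target.update_sync_watermark(table_name, datetime.now(timezone.utc))
    return SyncResult(extracted, upserted, errors, errors == 0)


source = InMemorySource([{"id": n, "cost": n * 1.5} for n in range(5)])
target = InMemoryTarget()
result = asyncio.run(run_incremental_sync(source, target, "cost_usage", "updated_at"))
```

With five rows and a batch size of two, the fake source yields three batches. A test can then assert on `result` and on the contents of `target.rows` and `target.watermarks` without touching a database.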