Domain Services
Domain services coordinate entities and repositories to implement the core sync workflows. They contain business logic that doesn't belong to a single entity — orchestration, decision making, and cross-entity validation. Domain services never import from the infrastructure layer; they work exclusively with abstract repository interfaces.
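Because domain services see only abstractions, the repository contracts can be written as structural interfaces. The sketch below shows one way to do that with `typing.Protocol`. It assumes that the method names match the calls made by SyncService and ValidationService in this section. The parameter names (such as `since`) and all type hints are illustrative guesses, not the project's actual signatures. `InMemoryTarget` is a hypothetical fake that satisfies the target contract without a database.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Iterator, Protocol


# Assumed contracts: method names mirror the calls made by the domain
# services; argument names and types are illustrative guesses.
class RedshiftSourceRepository(Protocol):
    async def table_exists(self, table_name: str) -> bool: ...

    def extract_incremental(
        self,
        table_name: str,
        watermark_column: str,
        since: datetime | None,
        batch_size: int,
    ) -> Iterator[list[dict[str, Any]]]: ...


class CostUsageRepository(Protocol):
    async def get_last_sync_time(self, table_name: str) -> datetime | None: ...

    async def upsert_batch(self, records: list[Any]) -> int: ...

    async def update_sync_watermark(self, table_name: str, at: datetime) -> None: ...


# Hypothetical in-memory fake: it satisfies CostUsageRepository structurally,
# with no inheritance and no database connection.
class InMemoryTarget:
    def __init__(self) -> None:
        self.rows: list[Any] = []
        self.watermarks: dict[str, datetime] = {}

    async def get_last_sync_time(self, table_name: str) -> datetime | None:
        return self.watermarks.get(table_name)

    async def upsert_batch(self, records: list[Any]) -> int:
        # Simplified: appends instead of merging on a key.
        self.rows.extend(records)
        return len(records)

    async def update_sync_watermark(self, table_name: str, at: datetime) -> None:
        self.watermarks[table_name] = at
```

Because a Protocol is matched structurally, the fake never has to import the real repository classes. That keeps tests of the domain layer free of infrastructure imports as well.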
SyncService
SyncService is the primary orchestrator of the Extract-Transform-Load cycle for a single sync job.
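```python
import logging
from dataclasses import dataclass
from datetime import datetime, timezone

from domain.entities.cost_usage import CostUsageRecord  # assumed module path
from domain.repositories.source import RedshiftSourceRepository
from domain.repositories.target import CostUsageRepository

logger = logging.getLogger(__name__)


@dataclass
class SyncResult:
    rows_extracted: int
    rows_upserted: int
    errors: int
    success: bool


class SyncService:
    def __init__(
        self,
        source: RedshiftSourceRepository,
        target: CostUsageRepository,
    ):
        self.source = source
        self.target = target

    async def run_incremental_sync(
        self, table_name: str, watermark_column: str
    ) -> SyncResult:
        last_sync = await self.target.get_last_sync_time(table_name)
        # Take the new watermark before extracting, so rows modified while
        # this run is in progress are picked up by the next run.
        sync_started_at = datetime.now(timezone.utc)
        extracted = 0
        upserted = 0
        errors = 0

        for batch in self.source.extract_incremental(
            table_name, watermark_column, last_sync, batch_size=1000
        ):
            # Rows count as extracted even if the batch later fails to load.
            extracted += len(batch)
            try:
                entities = [CostUsageRecord.from_dict(row) for row in batch]
                upserted += await self.target.upsert_batch(entities)
            except Exception:
                # Log and continue with the next batch.
                errors += 1
                logger.exception("Batch failed while syncing %s", table_name)

        # Advance the watermark only if every batch succeeded; otherwise the
        # next run re-reads from the previous watermark.
        success = errors == 0
        if success:
            await self.target.update_sync_watermark(table_name, sync_started_at)

        return SyncResult(
            rows_extracted=extracted,
            rows_upserted=upserted,
            errors=errors,
            success=success,
        )
```

WatermarkService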
WatermarkService manages incremental sync state — it determines whether the next sync run should be full or incremental, and tracks the high-water mark for each table.
| Decision | Condition | Action |
|---|---|---|
| Full sync | No watermark exists in qupersync_state for this table | Run full extraction (no WHERE clause) |
| Full sync | More than 24 hours since the last full sync | Run full extraction to catch any backfills or corrections in the source |
| Incremental sync | Watermark exists and was updated less than 24 hours ago | Run incremental extraction filtered by watermark |
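The decision table can be read as an ordered rule list. The sketch below assumes that the rows are evaluated top to bottom and that the first match wins. It also assumes that the sync state stores the time of the last full sync separately from the watermark, since the second row depends on it. `decide_sync_mode` and `SyncMode` are hypothetical names; WatermarkService's real interface is not shown in this section.

```python
from __future__ import annotations

from datetime import datetime, timedelta
from enum import Enum

# The 24-hour threshold from the decision table.
FULL_SYNC_INTERVAL = timedelta(hours=24)


class SyncMode(Enum):
    FULL = "full"
    INCREMENTAL = "incremental"


def decide_sync_mode(
    watermark: datetime | None,
    last_full_sync: datetime | None,
    now: datetime,
) -> SyncMode:
    """Evaluate the decision table top to bottom; the first matching row wins."""
    # Row 1: no watermark stored for this table yet, so there is nothing to filter on.
    if watermark is None:
        return SyncMode.FULL
    # Row 2: periodic full refresh to catch backfills and source corrections.
    # Treating a missing full-sync timestamp as overdue is an assumption.
    if last_full_sync is None or now - last_full_sync > FULL_SYNC_INTERVAL:
        return SyncMode.FULL
    # Row 3: otherwise, extract only rows past the watermark.
    return SyncMode.INCREMENTAL
```

Under this ordering, the incremental row covers every case the two full-sync rows do not. An hourly schedule would therefore run incrementally, except for one full pass roughly every 24 hours.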
ValidationService
ValidationService runs pre-flight checks before a sync job begins, so configuration errors surface before any data is moved.
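Each check produces a record of the form {name, passed, message}. The schema-match and required-columns checks can be written as pure functions over column-name sets, which lets them be tested without a Redshift connection. The sketch below assumes that the caller has already fetched the source and target column names; how those names are fetched is not shown. `CheckResult`, `check_required_columns`, and `check_schema_match` are hypothetical names. The sketch also assumes that "schema matches" means identical column names.

```python
from typing import TypedDict


class CheckResult(TypedDict):
    """Shape of one entry in ValidationResult.checks."""

    name: str
    passed: bool
    message: str


def check_required_columns(source_columns: set[str], required: set[str]) -> CheckResult:
    # Hypothetical Check 4: every required column must exist in the source table.
    missing = sorted(required - source_columns)
    return {
        "name": "required_columns_present",
        "passed": not missing,
        "message": (
            "All required columns present"
            if not missing
            else f"Missing columns in source: {', '.join(missing)}"
        ),
    }


def check_schema_match(source_columns: set[str], target_columns: set[str]) -> CheckResult:
    # Hypothetical Check 3: compares column names only; a stricter version
    # would also compare data types.
    only_source = sorted(source_columns - target_columns)
    only_target = sorted(target_columns - source_columns)
    passed = not only_source and not only_target
    if passed:
        message = "Source and target columns match"
    else:
        message = (
            f"Only in source: {', '.join(only_source) or '-'}; "
            f"only in target: {', '.join(only_target) or '-'}"
        )
    return {"name": "target_schema_matches", "passed": passed, "message": message}
```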
```python
from dataclasses import dataclass

from domain.repositories.source import RedshiftSourceRepository
from domain.repositories.target import CostUsageRepository


@dataclass
class ValidationResult:
    passed: bool
    checks: list[dict]  # each entry: {"name": str, "passed": bool, "message": str}


class ValidationService:
    async def validate_sync_config(
        self,
        source: RedshiftSourceRepository,
        target: CostUsageRepository,
        table_name: str,
    ) -> ValidationResult:
        checks: list[dict] = []

        # Check 1: Source table exists
        exists = await source.table_exists(table_name)
        checks.append({
            "name": "source_table_exists",
            "passed": exists,
            "message": f"Table {table_name} {'found' if exists else 'NOT FOUND'} in Redshift",
        })

        # Check 2: Watermark column exists
        # Check 3: Target schema matches source schema
        # Check 4: Required columns present in source
        # ... additional checks

        # The configuration is valid only if every check passed.
        return ValidationResult(
            passed=all(c["passed"] for c in checks),
            checks=checks,
        )
```

Sync Statistics Tracking
Each SyncResult returned by SyncService includes:
- rows_extracted: Total rows read from Redshift across all batches
- rows_upserted: Total rows successfully written to PostgreSQL
- errors: Number of batches that failed (after retries)
- success: True only if errors == 0; the sync watermark is advanced only in this case

Sync results are written to the application log. The result of the most recent run is also available from the GET /scheduler/jobs API endpoint.
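The counting rules above can be checked without a database. The sketch below is a minimal synchronous version of the SyncService batch loop. It assumes that rows arrive as an iterable of dicts and that a failing load raises an exception from a `write` callable returning the number of rows written. `batched`, `sync_batches`, and `write` are hypothetical names. The sketch counts rows as extracted before the write is attempted, and it counts a failed batch once as an error, whatever its size.

```python
import logging
from dataclasses import dataclass
from itertools import islice
from typing import Callable, Iterable, Iterator

logger = logging.getLogger(__name__)


@dataclass
class SyncResult:  # mirrors the SyncResult dataclass shown earlier
    rows_extracted: int
    rows_upserted: int
    errors: int
    success: bool


def batched(rows: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Yield lists of at most `size` rows, like batch_size=1000 in SyncService."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch


def sync_batches(
    rows: Iterable[dict],
    write: Callable[[list[dict]], int],
    batch_size: int = 1000,
) -> SyncResult:
    extracted = upserted = errors = 0
    for batch in batched(rows, batch_size):
        extracted += len(batch)  # read from the source, whether or not the write succeeds
        try:
            upserted += write(batch)
        except Exception:
            errors += 1  # one failed batch counts once, regardless of its size
            logger.exception("Batch of %d rows failed", len(batch))
    return SyncResult(extracted, upserted, errors, success=errors == 0)
```

With 2,500 rows and a writer that fails only on the second batch, the result is 2,500 rows extracted, 1,500 rows upserted, one error, and success set to False. On Python 3.12 and later, `itertools.batched` could replace the helper, although it yields tuples instead of lists.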
Testability by Design