QuperSync Workflow
This page documents the complete data synchronization lifecycle in QuperSync — from scheduler trigger to confirmed PostgreSQL write.
High-Level Flow
┌────────────────────────────────────────────────────────────┐
│ APScheduler (AsyncIO) │
│ │
│ Interval Job: every 300s (configurable via SYNC_INTERVAL) │
│ ↓ │
│ SyncOrchestrator.run_all_syncs() │
│ ↓ │
│ For each sync_job in registry: │
│ ┌────────────────────────────────────────────────────┐ │
│ │ 1. Extract: RedshiftAdapter.execute_query() │ │
│ │ → SQL query against Redshift system views │ │
│ │ → Returns List[Dict] rows │ │
│ │ │ │
│ │ 2. Transform: DataTransformer.transform() │ │
│ │ → Type coercion, null handling, renaming │ │
│ │ → Returns List[Entity] │ │
│ │ │ │
│ │ 3. Load: PostgresRepository.upsert_batch() │ │
│ │ → Upsert to PostgreSQL (ON CONFLICT DO UPDATE) │ │
│ │ → Batch size: SYNC_BATCH_SIZE rows │ │
│ │ │ │
│ │ 4. Log: SyncLogRepository.create() │ │
│ │ → Record sync timestamp, row count, duration │ │
│ └────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────┘

Scheduler Configuration
QuperSync uses APScheduler with the AsyncIO scheduler backend, which integrates natively with FastAPI's async event loop:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
import pytz

scheduler = AsyncIOScheduler(timezone=pytz.timezone("US/Pacific"))

# Register sync job
scheduler.add_job(
    func=sync_orchestrator.run_all_syncs,
    trigger=IntervalTrigger(seconds=SYNC_INTERVAL_SECONDS),
    id="qupersync_main",
    name="Redshift to PostgreSQL Sync",
    replace_existing=True,
    max_instances=1,        # Prevent concurrent runs
    misfire_grace_time=60   # Allow 60s grace if job misfires
)

# Start with FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.shutdown(wait=False)

Extract Phase
The extract phase runs SQL queries against Redshift system views. The RedshiftAdapter executes each query with the synchronous redshift-connector driver, wrapped in anyio.to_thread.run_sync() so the blocking call runs in a worker thread instead of stalling the async event loop:
import redshift_connector
import anyio

class RedshiftAdapter:
    def __init__(self, config: RedshiftConfig):
        self.config = config

    async def execute_query(self, sql: str) -> list[dict]:
        # Run the synchronous Redshift query in a worker thread
        return await anyio.to_thread.run_sync(self._execute_sync, sql)

    def _execute_sync(self, sql: str) -> list[dict]:
        conn = redshift_connector.connect(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user=self.config.user,
            password=self.config.password,
        )
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
        finally:
            # Close the connection even if the query fails
            conn.close()
        return [dict(zip(columns, row)) for row in rows]

What Data Gets Synced
QuperSync maintains a registry of sync jobs, each targeting a specific Redshift system view and mapping to a PostgreSQL table:
| Sync Job | Source (Redshift) | Target (PostgreSQL) |
|---|---|---|
| Table Stats Sync | svv_table_info | quper.table_stats |
| Vacuum History Sync | stl_vacuum | quper.vacuum_history |
| Query History Sync | sys_query_history (7d) | quper.query_history |
| Alert Events Sync | stl_alert_event_log (7d) | quper.alert_events |
| Node Details Sync | stv_node_storage_capacity | quper.node_details |
| Column Stats Sync | svv_columns + pg_attribute | quper.column_stats |
| Scan Stats Sync | stl_scan | quper.table_scan_stats |
| Application Sync | svl_user_activity | quper.applications |
Transform Phase
The transform phase converts raw Redshift query results into typed domain entities:
class DataTransformer:
    def transform(
        self,
        rows: list[dict],
        entity_class: type[BaseEntity]
    ) -> list[BaseEntity]:
        entities = []
        for row in rows:
            try:
                # Coerce types, handle nulls, rename fields
                cleaned = {
                    field: self._coerce(row.get(field), field_type)
                    for field, field_type
                    in entity_class.field_types().items()
                }
                entities.append(entity_class(**cleaned))
            except (ValueError, TypeError) as e:
                # Log and skip bad rows; don't fail the entire batch
                logger.warning(f"Row transform failed: {e}, row={row}")
                continue
        return entities

Load Phase — Upsert Strategy
All data is written using PostgreSQL upserts (INSERT ... ON CONFLICT DO UPDATE). This ensures idempotency — running the sync multiple times produces the same result:
from sqlalchemy.dialects.postgresql import insert

async def upsert_batch(
    self,
    entities: list[BaseEntity],
    session: AsyncSession
) -> int:
    if not entities:
        return 0

    # Convert entities to dicts for bulk insert
    rows = [entity.to_dict() for entity in entities]

    # Batch by SYNC_BATCH_SIZE to avoid memory issues
    total_upserted = 0
    for i in range(0, len(rows), SYNC_BATCH_SIZE):
        batch = rows[i:i + SYNC_BATCH_SIZE]
        stmt = insert(self.model).values(batch)
        stmt = stmt.on_conflict_do_update(
            index_elements=self.unique_keys,  # e.g., ["table_name", "schema"]
            set_={
                col: stmt.excluded[col]
                for col in self.update_columns  # All non-key columns
            }
        )
        result = await session.execute(stmt)
        total_upserted += result.rowcount
    await session.commit()
    return total_upserted

Sync Log Recording
Every sync run is logged to quper.sync_logs for observability and debugging:
# Sync log entry schema
{
    "sync_job_id": "table_stats_sync",
    "started_at": "2024-01-15T10:00:00Z",
    "completed_at": "2024-01-15T10:00:04.231Z",
    "duration_ms": 4231,
    "rows_extracted": 1247,
    "rows_upserted": 1247,
    "rows_skipped": 0,
    "status": "success",
    "error": null
}

Error Recovery
QuperSync is designed for resilience:
- Row-level fault tolerance — Transformation errors on individual rows are logged and skipped; the batch continues
- Job isolation — Each sync job runs independently; a failure in one job doesn't affect others
- Automatic retry — APScheduler reruns missed jobs within the misfire_grace_time window
- Connection pooling — SQLAlchemy connection pool handles transient database disconnections
- Transaction rollback — Failed batches are rolled back; previous successful batches are preserved
Large Table Handling
Large source tables are extracted in pages using LIMIT/OFFSET rather than loading all rows at once. The SYNC_BATCH_SIZE environment variable controls the page size (default: 1000 rows).