
QuperSync Workflow

This page documents the complete data synchronization lifecycle in QuperSync — from scheduler trigger to confirmed PostgreSQL write.

High-Level Flow

Sync Pipeline
┌────────────────────────────────────────────────────────────┐
│                    APScheduler (AsyncIO)                     │
│                                                              │
│  Interval Job: every 300s (configurable via SYNC_INTERVAL)  │
│       ↓                                                      │
│  SyncOrchestrator.run_all_syncs()                           │
│       ↓                                                      │
│  For each sync_job in registry:                             │
│    ┌────────────────────────────────────────────────────┐   │
│    │ 1. Extract: RedshiftAdapter.execute_query()        │   │
│    │    → SQL query against Redshift system views       │   │
│    │    → Returns List[Dict] rows                       │   │
│    │                                                    │   │
│    │ 2. Transform: DataTransformer.transform()          │   │
│    │    → Type coercion, null handling, renaming        │   │
│    │    → Returns List[Entity]                          │   │
│    │                                                    │   │
│    │ 3. Load: PostgresRepository.upsert_batch()         │   │
│    │    → Upsert to PostgreSQL (ON CONFLICT DO UPDATE)  │   │
│    │    → Batch size: SYNC_BATCH_SIZE rows              │   │
│    │                                                    │   │
│    │ 4. Log: SyncLogRepository.create()                 │   │
│    │    → Record sync timestamp, row count, duration    │   │
│    └────────────────────────────────────────────────────┘   │
└────────────────────────────────────────────────────────────┘

Scheduler Configuration

QuperSync uses APScheduler with the AsyncIO scheduler backend, which integrates natively with FastAPI's async event loop:

Scheduler Setup
from contextlib import asynccontextmanager

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI
import pytz

scheduler = AsyncIOScheduler(timezone=pytz.timezone("US/Pacific"))

# Register sync job
scheduler.add_job(
    func=sync_orchestrator.run_all_syncs,
    trigger=IntervalTrigger(seconds=SYNC_INTERVAL_SECONDS),
    id="qupersync_main",
    name="Redshift to PostgreSQL Sync",
    replace_existing=True,
    max_instances=1,      # Prevent concurrent runs
    misfire_grace_time=60 # Allow 60s grace if job misfires
)

# Start with FastAPI lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.start()
    yield
    scheduler.shutdown(wait=False)
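
The tuning knobs referenced above are plain environment variables. A minimal sketch of how they might be read (the variable names come from the diagram; the defaults are assumptions, not confirmed deployment values):

```python
import os

# Assumed defaults; actual deployment values may differ.
SYNC_INTERVAL_SECONDS = int(os.environ.get("SYNC_INTERVAL", "300"))
SYNC_BATCH_SIZE = int(os.environ.get("SYNC_BATCH_SIZE", "1000"))
```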

Extract Phase

The extract phase runs SQL queries against Redshift system views. The RedshiftAdapter manages connection pooling and executes queries synchronously using redshift-connector, wrapped in anyio.to_thread.run_sync() to avoid blocking the async loop:

Redshift Extraction
import redshift_connector
import anyio

class RedshiftAdapter:
    def __init__(self, config: RedshiftConfig):
        self.config = config

    async def execute_query(self, sql: str) -> list[dict]:
        # Run the blocking Redshift query in a worker thread
        return await anyio.to_thread.run_sync(self._execute_sync, sql)

    def _execute_sync(self, sql: str) -> list[dict]:
        conn = redshift_connector.connect(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user=self.config.user,
            password=self.config.password,
        )
        try:
            cursor = conn.cursor()
            cursor.execute(sql)
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            return [dict(zip(columns, row)) for row in rows]
        finally:
            conn.close()  # always release the connection, even if the query fails

What Data Gets Synced

QuperSync maintains a registry of sync jobs, each targeting a specific Redshift system view and mapping to a PostgreSQL table:

Sync Job              Source (Redshift)            Target (PostgreSQL)
Table Stats Sync      svv_table_info               quper.table_stats
Vacuum History Sync   stl_vacuum                   quper.vacuum_history
Query History Sync    sys_query_history (7d)       quper.query_history
Alert Events Sync     stl_alert_event_log (7d)     quper.alert_events
Node Details Sync     stv_node_storage_capacity    quper.node_details
Column Stats Sync     svv_columns + pg_attribute   quper.column_stats
Scan Stats Sync       stl_scan                     quper.table_scan_stats
Application Sync      svl_user_activity            quper.applications
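
The registry itself can be as simple as a list of declarative job specs. A hypothetical sketch (the field names here are illustrative, not the actual QuperSync API):

```python
from dataclasses import dataclass

# Hypothetical registry shape; real job specs may carry more fields.
@dataclass(frozen=True)
class SyncJob:
    job_id: str
    source_sql: str          # query against a Redshift system view
    target_table: str        # fully qualified PostgreSQL table
    unique_keys: list        # columns used for ON CONFLICT

SYNC_REGISTRY = [
    SyncJob(
        job_id="table_stats_sync",
        source_sql="SELECT * FROM svv_table_info",
        target_table="quper.table_stats",
        unique_keys=["schema", "table_name"],
    ),
    SyncJob(
        job_id="vacuum_history_sync",
        source_sql="SELECT * FROM stl_vacuum",
        target_table="quper.vacuum_history",
        unique_keys=["xid", "table_id"],
    ),
]
```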

Transform Phase

The transform phase converts raw Redshift query results into typed domain entities:

Data Transformation
class DataTransformer:
    def transform(
        self,
        rows: list[dict],
        entity_class: type[BaseEntity]
    ) -> list[BaseEntity]:
        entities = []
        field_types = entity_class.field_types()
        for row in rows:
            try:
                # Coerce types and handle nulls for each declared field
                cleaned = {
                    field: self._coerce(row.get(field), field_type)
                    for field, field_type in field_types.items()
                }
                entities.append(entity_class(**cleaned))
            except (ValueError, TypeError) as e:
                # Log and skip bad rows; don't fail the entire batch
                logger.warning(f"Row transform failed: {e}, row={row}")
                continue
        return entities

Load Phase — Upsert Strategy

All data is written using PostgreSQL upserts (INSERT ... ON CONFLICT DO UPDATE). This ensures idempotency — running the sync multiple times produces the same result:

Upsert Pattern
from sqlalchemy.dialects.postgresql import insert

async def upsert_batch(
    self,
    entities: list[BaseEntity],
    session: AsyncSession
) -> int:
    if not entities:
        return 0

    # Convert entities to dicts for bulk insert
    rows = [entity.to_dict() for entity in entities]

    # Batch by SYNC_BATCH_SIZE to avoid memory issues
    total_upserted = 0
    for i in range(0, len(rows), SYNC_BATCH_SIZE):
        batch = rows[i:i + SYNC_BATCH_SIZE]
        stmt = insert(self.model).values(batch)
        stmt = stmt.on_conflict_do_update(
            index_elements=self.unique_keys,    # e.g., ["table_name", "schema"]
            set_={
                col: stmt.excluded[col]
                for col in self.update_columns  # All non-key columns
            }
        )
        result = await session.execute(stmt)
        total_upserted += result.rowcount

    await session.commit()
    return total_upserted
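
The idempotency guarantee is easy to see in miniature: an upsert keyed on the conflict columns converges to the same state no matter how many times it runs. A pure-Python illustration (not the real repository code):

```python
def toy_upsert(table: dict, rows: list, keys: list) -> dict:
    # Each row lands under its key tuple; re-running overwrites in place
    for row in rows:
        table[tuple(row[k] for k in keys)] = row
    return table

state: dict = {}
rows = [{"schema": "public", "table_name": "events", "size_mb": 120}]
toy_upsert(state, rows, ["schema", "table_name"])
toy_upsert(state, rows, ["schema", "table_name"])  # second run changes nothing
```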

Sync Log Recording

Every sync run is logged to quper.sync_logs for observability and debugging:

Sync Log Entry
# Sync log entry schema
{
    "sync_job_id": "table_stats_sync",
    "started_at": "2024-01-15T10:00:00Z",
    "completed_at": "2024-01-15T10:00:04.231Z",
    "duration_ms": 4231,
    "rows_extracted": 1247,
    "rows_upserted": 1247,
    "rows_skipped": 0,
    "status": "success",
    "error": null
}
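
A sketch of how such an entry might be assembled (a hypothetical helper; only the field names mirror the schema above):

```python
from datetime import datetime, timedelta

def build_sync_log(job_id: str, started_at: datetime, completed_at: datetime,
                   extracted: int, upserted: int, skipped: int,
                   error: str = None) -> dict:
    return {
        "sync_job_id": job_id,
        "started_at": started_at.isoformat(),
        "completed_at": completed_at.isoformat(),
        # duration is derived from the timestamps, not recorded separately
        "duration_ms": (completed_at - started_at) // timedelta(milliseconds=1),
        "rows_extracted": extracted,
        "rows_upserted": upserted,
        "rows_skipped": skipped,
        "status": "success" if error is None else "failure",
        "error": error,
    }
```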

Error Recovery

QuperSync is designed for resilience:

  • Row-level fault tolerance — Transformation errors on individual rows are logged and skipped; the batch continues
  • Job isolation — Each sync job runs independently; a failure in one job doesn't affect others
  • Automatic retry — APScheduler reruns missed jobs within the misfire_grace_time window
  • Connection pooling — SQLAlchemy connection pool handles transient database disconnections
  • Transaction rollback — Failed batches are rolled back; previous successful batches are preserved
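
The job-isolation point above can be sketched as a per-job try/except in the orchestrator loop (an assumed structure; the real run_all_syncs is not shown on this page):

```python
import logging

logger = logging.getLogger("qupersync")

async def run_all_syncs(jobs) -> dict:
    # One failing job is logged and skipped; the remaining jobs still run
    results = {}
    for job in jobs:
        try:
            results[job.job_id] = await job.run()
        except Exception:
            logger.exception("Sync job %s failed; continuing", job.job_id)
            results[job.job_id] = None
    return results
```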

Large Table Handling

For tables with millions of rows, the sync paginates extraction with LIMIT/OFFSET rather than loading all rows at once. The SYNC_BATCH_SIZE environment variable controls the page size (default: 1000 rows).
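
A sketch of what that pagination loop might look like, reusing the execute_query method from the extract phase (an assumed structure; in practice the base query should carry an ORDER BY so pages are stable):

```python
async def extract_paginated(adapter, base_sql: str, page_size: int = 1000) -> list:
    # Walk the result set one page at a time; a short page means we're done
    rows: list = []
    offset = 0
    while True:
        page = await adapter.execute_query(
            f"{base_sql} LIMIT {page_size} OFFSET {offset}"
        )
        rows.extend(page)
        if len(page) < page_size:
            break
        offset += page_size
    return rows
```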