QuperSync Sync Engine

Scheduling

QuperSync uses APScheduler with an AsyncIOScheduler backend to manage sync job timing. The scheduler runs in the same asyncio event loop as the FastAPI server, so IO-bound sync operations (such as database queries) yield control while they wait and do not block API request handling.

Scheduler Configuration

| Parameter | Value | Description |
| --- | --- | --- |
| scheduler_type | AsyncIOScheduler | Non-blocking scheduler that integrates with the Python asyncio event loop |
| default_interval | 15 minutes (configurable) | Default sync frequency for registered jobs. Individual jobs can override it; sync_redshift_tables, for example, runs every 30 minutes. |
| misfire_grace_time | 60 seconds | A run that starts up to 60 seconds late still executes. If a job is overdue by more than 60 seconds, it is skipped for that cycle rather than run late. |
| max_instances | 1 per job | Prevents concurrent runs of the same sync job. If a run is still in progress when the next trigger fires, the new trigger is skipped. |
| job_store | memory | Jobs are stored in memory only. They are re-registered on each server restart from the job definitions in code. |
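
The way misfire_grace_time and max_instances combine can be sketched as a small decision function. This is only an illustrative model of the behaviour described in the table, not APScheduler's internal code; `should_run` and its parameters are hypothetical names chosen for the example.

```python
from datetime import datetime, timedelta


def should_run(scheduled_at: datetime, now: datetime,
               running_instances: int,
               misfire_grace_time: int = 60,
               max_instances: int = 1) -> bool:
    """Return True if a trigger should start a run (illustrative model only)."""
    lateness = (now - scheduled_at).total_seconds()
    # Overdue by more than the grace time: skip this cycle.
    if lateness > misfire_grace_time:
        return False
    # A previous run is still in progress: skip to avoid overlapping runs.
    if running_instances >= max_instances:
        return False
    return True


due = datetime(2024, 1, 1, 12, 0, 0)
on_time = should_run(due, due + timedelta(seconds=5), running_instances=0)
too_late = should_run(due, due + timedelta(seconds=90), running_instances=0)
overlapping = should_run(due, due + timedelta(seconds=5), running_instances=1)
```

Under this model, only the first call starts a run: the second is past the 60-second grace window, and the third would overlap a run that is still in progress.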

Scheduler Initialization

scheduler/setup.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from interface.scheduler.jobs import sync_cost_usage, sync_redshift_tables

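# Module-level scheduler instance shared by the application.
scheduler = AsyncIOScheduler()

def setup_scheduler():
    """Register the sync jobs and start the scheduler."""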
    scheduler.add_job(
        func=sync_cost_usage,
        trigger=IntervalTrigger(minutes=15),
        id="sync_cost_usage",
        name="Sync Cost Usage Records",
        max_instances=1,
        misfire_grace_time=60,
        replace_existing=True,
    )

    scheduler.add_job(
        func=sync_redshift_tables,
        trigger=IntervalTrigger(minutes=30),
        id="sync_redshift_tables",
        name="Sync Redshift Table Metadata",
        max_instances=1,
        misfire_grace_time=60,
        replace_existing=True,
    )

    # Start only after all jobs are registered; the scheduler runs on the asyncio event loop.
    scheduler.start()

Scheduler Management API

The interface layer exposes REST endpoints for runtime job management:

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /scheduler/jobs | List all registered jobs with their next scheduled run time and status |
| POST | /scheduler/jobs/{job_id}/pause | Pause a job; it will not run on its schedule until resumed |
| POST | /scheduler/jobs/{job_id}/resume | Resume a previously paused job |
| POST | /scheduler/jobs/{job_id}/trigger | Execute the job immediately, outside of its normal schedule (useful for manual syncs) |
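
As a rough client-side sketch, the endpoints above could be called with the Python standard library. The base URL is an assumption, as is the absence of authentication; `job_action_request` and `list_jobs_request` are hypothetical helper names, and the response format is not specified here, so the example only builds the requests.

```python
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # assumption: adjust to your deployment


def job_action_request(job_id: str, action: str) -> Request:
    """Build a POST request for pause, resume, or trigger on one job."""
    if action not in {"pause", "resume", "trigger"}:
        raise ValueError(f"unsupported action: {action}")
    return Request(f"{BASE_URL}/scheduler/jobs/{job_id}/{action}", method="POST")


def list_jobs_request() -> Request:
    """Build the GET request that lists all registered jobs."""
    return Request(f"{BASE_URL}/scheduler/jobs", method="GET")


# Manual sync of the cost usage job, outside its normal schedule.
req = job_action_request("sync_cost_usage", "trigger")
# To actually send it against a running server:
#   from urllib.request import urlopen
#   urlopen(req)
```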

Async Event Loop

All scheduled jobs run in the same async event loop as the FastAPI server. IO-bound operations (Redshift queries, PostgreSQL upserts) use async-compatible drivers where available, or are wrapped with asyncio.to_thread() for synchronous drivers. This ensures database waits do not block API request handling.
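
A minimal sketch of the asyncio.to_thread() pattern, assuming a synchronous driver call: `blocking_query` stands in for that call and `heartbeat` stands in for API request handling on the same loop. Both names are hypothetical and not part of QuperSync.

```python
import asyncio
import time


def blocking_query() -> str:
    """Stand-in for a synchronous driver call (hypothetical)."""
    time.sleep(0.2)
    return "rows"


async def heartbeat(ticks: list) -> None:
    """Stand-in for API request handling on the same event loop."""
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread, so the loop keeps serving heartbeat().
    result = await asyncio.to_thread(blocking_query)
    ticks_during_query = len(ticks)
    await hb
    return result, ticks, ticks_during_query


result, ticks, ticks_during_query = asyncio.run(main())
```

If `blocking_query()` were called directly inside `main()`, the heartbeat task would not get a chance to run until the query had returned.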

Job State Persistence

Job state (last run time, next run time, paused/active status) is stored in memory only and is lost on server restart, so a job paused through the API is re-registered as active after a restart. The sync watermark (which determines how far back each sync reads) is persisted in the PostgreSQL qupersync_state table and survives restarts. After a restart, jobs therefore resume from the watermark rather than resyncing from the beginning of time.
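
The watermark pattern can be sketched as an upsert keyed by job. Only the table name qupersync_state comes from this document; the column names (`job_id`, `watermark`) are assumptions, and the example uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# SQLite stand-in for the PostgreSQL qupersync_state table (columns are assumed).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE qupersync_state (job_id TEXT PRIMARY KEY, watermark TEXT NOT NULL)"
)


def save_watermark(job_id: str, watermark: datetime) -> None:
    """Insert or update the watermark for one job after a successful sync."""
    conn.execute(
        "INSERT INTO qupersync_state (job_id, watermark) VALUES (?, ?) "
        "ON CONFLICT (job_id) DO UPDATE SET watermark = excluded.watermark",
        (job_id, watermark.isoformat()),
    )
    conn.commit()


def load_watermark(job_id: str) -> Optional[datetime]:
    """Return the stored watermark, or None if the job has never synced."""
    row = conn.execute(
        "SELECT watermark FROM qupersync_state WHERE job_id = ?", (job_id,)
    ).fetchone()
    return datetime.fromisoformat(row[0]) if row else None


first = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
later = datetime(2024, 1, 1, 12, 15, tzinfo=timezone.utc)
save_watermark("sync_cost_usage", first)
save_watermark("sync_cost_usage", later)  # second run overwrites the first value
resume_from = load_watermark("sync_cost_usage")
```

On startup, a job would call something like `load_watermark()` and sync only records newer than the returned value, falling back to a full sync when it returns None.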