QuperSync Sync Engine

Data Pipeline

The QuperSync data pipeline moves records from Amazon Redshift to PostgreSQL in six sequential steps. Each step has a clearly defined input, output, and failure mode. The pipeline is designed to be resumable — a failure at any step does not corrupt previously synced data.

Pipeline Steps

1. Connection

redshift-connector establishes a connection to the Redshift cluster using credentials from environment variables. Connection parameters are loaded at job startup, and the connection is pooled per sync job so that batches do not reconnect each time.

Env vars: REDSHIFT_HOST, REDSHIFT_DB, REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_PORT
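
As a rough illustration of this step, the sketch below reads the five variables listed above and maps them to the keyword arguments of redshift_connector.connect(). The helper names load_redshift_params and connect_once are hypothetical, and treating all five variables as required is an assumption, not something the pipeline is confirmed to do.

```python
import os

# Environment variables named above, mapped to the keyword arguments
# accepted by redshift_connector.connect().
ENV_TO_KWARG = {
    "REDSHIFT_HOST": "host",
    "REDSHIFT_DB": "database",
    "REDSHIFT_USER": "user",
    "REDSHIFT_PASSWORD": "password",
    "REDSHIFT_PORT": "port",
}


def load_redshift_params(env=os.environ):
    """Collect connection parameters once, failing fast if any are missing."""
    missing = [name for name in ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    params = {kwarg: env[name] for name, kwarg in ENV_TO_KWARG.items()}
    params["port"] = int(params["port"])  # the port is passed as an integer
    return params


def connect_once(env=os.environ):
    """Open one connection intended to be reused for every batch of a job."""
    # Imported lazily so the parameter loading can be tested without the driver.
    import redshift_connector

    return redshift_connector.connect(**load_redshift_params(env))
```

Calling connect_once() at job startup and handing the resulting connection to each batch is one simple way to avoid reconnecting per batch.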

2. Extraction

The extraction query is executed on Redshift. For incremental syncs, the WHERE clause filters by the watermark column:

SELECT * FROM source_table WHERE updated_at > '2024-03-01 00:00:00'

Results are streamed in batches using cursor.fetchmany(batch_size) (default 1,000 rows per batch) to bound memory usage.
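
The batching loop described above can be sketched with any DB-API cursor. In the example below, sqlite3 stands in for Redshift so the code runs without a cluster; the iter_batches helper is a hypothetical name, and the watermark is passed as a query parameter rather than embedded in the SQL string, which is an assumption about how the filter is built.

```python
import sqlite3


def iter_batches(cursor, batch_size=1000):
    """Yield lists of rows from an already-executed DB-API cursor."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return  # cursor exhausted
        yield rows


# Stand-in source table: 2,500 rows newer than the watermark.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?)",
    [(i, "2024-03-02 00:00:00") for i in range(2500)],
)

cur = conn.cursor()
# sqlite3 uses "?" placeholders; other drivers may use a different style.
cur.execute(
    "SELECT * FROM source_table WHERE updated_at > ?",
    ("2024-03-01 00:00:00",),
)
batch_sizes = [len(batch) for batch in iter_batches(cur)]
print(batch_sizes)  # [1000, 1000, 500]
```

Only one batch is held in memory at a time, which is what bounds memory usage regardless of how many rows match the watermark filter.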

3. Type Mapping

Redshift column types are converted to Python-native types before entity creation:

| Redshift Type | Python Type |
| --- | --- |
| VARCHAR / TEXT | str |
| INTEGER / BIGINT | int |
| NUMERIC / DECIMAL | float |
| BOOLEAN | bool |
| TIMESTAMP / DATE | datetime |
| NULL (any type) | None |
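
A minimal sketch of how the conversions in the table could be applied to the values a driver returns. The function names to_python_native and map_row are hypothetical, and the sketch assumes the driver delivers NUMERIC values as decimal.Decimal and DATE values as datetime.date. Note that converting Decimal to float can lose precision for values with many digits.

```python
from datetime import date, datetime, time
from decimal import Decimal


def to_python_native(value):
    """Normalize one driver value according to the mapping table."""
    if value is None:
        return None  # NULL of any type
    if isinstance(value, Decimal):
        return float(value)  # NUMERIC / DECIMAL
    if isinstance(value, datetime):
        return value  # TIMESTAMP (checked first: datetime subclasses date)
    if isinstance(value, date):
        return datetime.combine(value, time.min)  # DATE becomes midnight
    return value  # str, int, and bool pass through unchanged


def map_row(column_names, row):
    """Build the row dict that is handed to entity creation."""
    return {name: to_python_native(v) for name, v in zip(column_names, row)}
```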

4. Entity Creation

Each mapped row dict is passed to the domain Entity class constructor. The entity validates required fields (raises ValidationError for missing required columns) and applies business rules (NULL coercion, value normalization). Invalid rows are logged and skipped — they do not abort the batch.
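
The validate-and-skip behaviour can be illustrated as follows. CustomerEntity, its fields, its from_row constructor, and the specific normalization rules are all hypothetical; only ValidationError and the log-and-skip policy come from the description above.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("qupersync.entities")


class ValidationError(Exception):
    """Raised when a row is missing a required column."""


@dataclass
class CustomerEntity:  # hypothetical domain entity
    id: int
    email: str
    name: str = ""

    @classmethod
    def from_row(cls, row):
        missing = [c for c in ("id", "email") if row.get(c) is None]
        if missing:
            raise ValidationError(f"missing required columns: {missing}")
        return cls(
            id=row["id"],
            email=row["email"].strip().lower(),  # value normalization
            name=row.get("name") or "",  # NULL coercion
        )


def build_entities(rows):
    """Create entities for a batch, logging and skipping invalid rows."""
    entities = []
    for row in rows:
        try:
            entities.append(CustomerEntity.from_row(row))
        except ValidationError as exc:
            logger.warning("skipping invalid row %r: %s", row, exc)
    return entities
```

Catching the error per row, rather than around the whole loop, is what keeps one bad row from aborting the batch.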

5. Repository Upsert

The validated entities in each batch are passed to the PostgreSQL repository's upsert_batch() method, which generates and executes:

INSERT INTO target (pk, col1, col2) VALUES (%s, %s, %s), ... ON CONFLICT (pk) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2

The entire batch is wrapped in a single transaction. If any row in the batch fails, the whole batch is rolled back and retried.
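
One way the multi-row statement above might be generated is sketched below. build_upsert_sql and flatten_params are hypothetical helpers, not the repository's confirmed internals, and the sketch assumes table and column names come from trusted configuration, since identifiers are interpolated directly into the SQL.

```python
def build_upsert_sql(table, pk, columns, row_count):
    """Build a multi-row INSERT ... ON CONFLICT statement with %s placeholders."""
    all_columns = [pk, *columns]
    # One "(%s, %s, ...)" group per row in the batch.
    row_placeholder = "(" + ", ".join(["%s"] * len(all_columns)) + ")"
    values = ", ".join([row_placeholder] * row_count)
    # Every non-key column is overwritten with the incoming value.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns)
    return (
        f"INSERT INTO {table} ({', '.join(all_columns)}) VALUES {values} "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )


def flatten_params(rows):
    """Flatten row tuples into the single parameter list the statement expects."""
    return [value for row in rows for value in row]


sql = build_upsert_sql("target", "pk", ["col1", "col2"], row_count=2)
params = flatten_params([(1, "a", "x"), (2, "b", "y")])
```

Executing the statement with cursor.execute(sql, params) inside one transaction gives the all-or-nothing batch behaviour described above.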

6. Watermark Update

After all batches complete successfully, the WatermarkService updates the last_sync_time in the qupersync_state table. The next scheduled sync run will use this value as the starting point for incremental extraction. If any batch failed, the watermark is NOT updated — the next run will re-process from the previous watermark.
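
The gating rule can be sketched as below. InMemoryWatermarkStore is a hypothetical stand-in for the qupersync_state table, and run_sync is an illustrative name. How the new watermark value is chosen is not specified here, so the sketch takes it as a parameter.

```python
from datetime import datetime


class InMemoryWatermarkStore:
    """Hypothetical stand-in for the qupersync_state table."""

    def __init__(self, last_sync_time):
        self.last_sync_time = last_sync_time


def run_sync(store, batches, upsert_batch, new_watermark):
    """Process every batch; advance the watermark only if all succeed."""
    try:
        for batch in batches:
            upsert_batch(batch)
    except Exception:
        # A failed batch leaves the watermark untouched, so the next run
        # re-processes from the previous value.
        return False
    store.last_sync_time = new_watermark
    return True
```

Because the assignment happens only after the loop finishes, a failure in any batch leaves last_sync_time at its previous value.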

Pipeline Resumability

The watermark is only advanced after a fully successful sync. If a run fails partway through, the next run starts from the same watermark — it will re-process some rows that were already synced, but the upsert pattern makes this harmless. No data is lost or duplicated.
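
To see why replaying already-synced rows is harmless, the sketch below applies the same batch twice with an upsert. sqlite3 (3.24 or newer, which supports ON CONFLICT ... DO UPDATE) is used as a stand-in for PostgreSQL so the example is self-contained; the table and values are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (pk INTEGER PRIMARY KEY, col1 TEXT, col2 TEXT)")

# Same upsert shape as the PostgreSQL statement, with sqlite3 placeholders.
UPSERT = (
    "INSERT INTO target (pk, col1, col2) VALUES (?, ?, ?) "
    "ON CONFLICT (pk) DO UPDATE SET col1 = excluded.col1, col2 = excluded.col2"
)
batch = [(1, "a", "x"), (2, "b", "y")]


def apply_batch(rows):
    # One transaction per batch: commit on success, roll back on error.
    with conn:
        conn.executemany(UPSERT, rows)


apply_batch(batch)  # first run, before a simulated failure later in the sync
apply_batch(batch)  # next run re-processes the same rows from the old watermark

rows = conn.execute("SELECT pk, col1, col2 FROM target ORDER BY pk").fetchall()
print(rows)  # [(1, 'a', 'x'), (2, 'b', 'y')]
```

The second application updates the existing rows in place instead of inserting new ones, so the table ends up in the same state as after a single run.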