Data Pipeline
The QuperSync data pipeline moves records from Amazon Redshift to PostgreSQL in six sequential steps. Each step has a clearly defined input, output, and failure mode. The pipeline is designed to be resumable — a failure at any step does not corrupt previously synced data.
Pipeline Steps
Connection
redshift-connector establishes a connection to the Redshift cluster using credentials from environment variables. Connection parameters are loaded at job startup and pooled per sync job to avoid re-connecting on every batch.
Env vars: REDSHIFT_HOST, REDSHIFT_DB, REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_PORT
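A minimal sketch of the startup-time parameter loading, assuming the standard redshift_connector package; load_redshift_params is a hypothetical helper name, and the 5439 fallback is Redshift's default port:

```python
import os

def load_redshift_params() -> dict:
    """Read Redshift connection parameters from the environment.

    Raises KeyError if a required variable is missing, so a
    misconfigured job fails at startup rather than mid-sync.
    """
    return {
        "host": os.environ["REDSHIFT_HOST"],
        "database": os.environ["REDSHIFT_DB"],
        "user": os.environ["REDSHIFT_USER"],
        "password": os.environ["REDSHIFT_PASSWORD"],
        "port": int(os.environ.get("REDSHIFT_PORT", "5439")),
    }

# At job startup (sketch) the params would be passed to the driver once
# and the resulting connection reused for every batch in the sync job:
#   import redshift_connector
#   conn = redshift_connector.connect(**load_redshift_params())
```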
Extraction
The extraction query is executed on Redshift. For incremental syncs, the WHERE clause filters by the watermark column:
SELECT * FROM source_table WHERE updated_at > '2024-03-01 00:00:00'
Results are streamed in batches using cursor.fetchmany(batch_size) (default: 1,000 rows per batch) to bound memory usage.
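The batched streaming can be sketched as a generator over any DB-API cursor; stream_batches is a hypothetical helper name, not part of the driver:

```python
def stream_batches(cursor, batch_size: int = 1000):
    """Yield lists of rows from an open cursor, batch_size rows at a time.

    fetchmany() returns an empty sequence once the result set is
    exhausted, which terminates the loop. At most one batch is held
    in memory at a time.
    """
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows
```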
Type Mapping
Redshift column types are converted to Python-native types before entity creation:
| Redshift Type | Python Type |
|---|---|
| VARCHAR / TEXT | str |
| INTEGER / BIGINT | int |
| NUMERIC / DECIMAL | float |
| BOOLEAN | bool |
| TIMESTAMP / DATE | datetime |
| NULL (any type) | None |
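The mapping table above can be expressed as a conversion function; map_value and map_row are hypothetical names, and the DATE-to-midnight convention is an assumption about how the table's "datetime" target is realized:

```python
from datetime import date, datetime
from decimal import Decimal

def map_value(value):
    """Convert a single driver-returned value per the mapping table."""
    if value is None:
        return None                 # NULL (any type) -> None
    if isinstance(value, Decimal):
        return float(value)         # NUMERIC / DECIMAL -> float
    if isinstance(value, datetime):
        return value                # TIMESTAMP -> datetime
    if isinstance(value, date):     # checked after datetime (its subclass)
        return datetime(value.year, value.month, value.day)
    return value                    # str, int, bool pass through unchanged

def map_row(columns, raw_row):
    """Zip column names with converted values into a row dict."""
    return {col: map_value(v) for col, v in zip(columns, raw_row)}
```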
Entity Creation
Each mapped row dict is passed to the domain Entity class constructor. The entity validates required fields (raises ValidationError for missing required columns) and applies business rules (NULL coercion, value normalization). Invalid rows are logged and skipped — they do not abort the batch.
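A sketch of the validate-and-skip behavior; the Entity fields, the errors list (a stand-in for real logging), and build_entities are all hypothetical:

```python
class ValidationError(Exception):
    """Raised when a required column is missing or NULL."""

class Entity:
    # Hypothetical required fields; the real list is domain-specific.
    REQUIRED = ("pk",)

    def __init__(self, row: dict):
        missing = [c for c in self.REQUIRED if row.get(c) is None]
        if missing:
            raise ValidationError(f"missing required columns: {missing}")
        self.data = dict(row)

def build_entities(rows, errors):
    """Create entities from mapped rows; record and skip invalid rows.

    A ValidationError is appended to errors (real code would log it)
    and the loop continues -- one bad row never aborts the batch.
    """
    entities = []
    for row in rows:
        try:
            entities.append(Entity(row))
        except ValidationError as exc:
            errors.append(f"skipped row: {exc}")
    return entities
```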
Repository Upsert
The entity is passed to the PostgreSQL repository's upsert_batch() method, which generates and executes:
INSERT INTO target (pk, col1, col2) VALUES (%s, %s, %s), ... ON CONFLICT (pk) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
The entire batch is wrapped in a single transaction. If any row in the batch fails, the whole batch is rolled back and retried.
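How the statement above might be generated can be sketched as follows; build_upsert_sql is a hypothetical helper, and the %s placeholders are left for the driver to bind so values are never interpolated into the SQL string:

```python
def build_upsert_sql(table: str, pk: str, columns: list, n_rows: int) -> str:
    """Generate a batched INSERT ... ON CONFLICT ... DO UPDATE statement.

    Produces one (%s, ...) placeholder group per row; non-key columns
    are overwritten from EXCLUDED on conflict.
    """
    row_ph = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row_ph] * n_rows)
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES {values} "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )
```

In a real repository this string would be executed with the flattened row values inside one transaction, matching the rollback-and-retry behavior described above.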
Watermark Update
After all batches complete successfully, the WatermarkService updates the last_sync_time in the qupersync_state table. The next scheduled sync run will use this value as the starting point for incremental extraction. If any batch failed, the watermark is NOT updated — the next run will re-process from the previous watermark.
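The advance-only-on-full-success rule can be sketched as below; run_sync and the dict-backed state are hypothetical stand-ins for the WatermarkService and the qupersync_state table, and using the job start time as the new watermark is an assumption (the real service may track the max updated_at seen instead):

```python
from datetime import datetime, timezone

def run_sync(batches, upsert, state):
    """Upsert every batch, then advance last_sync_time.

    If any batch fails, the watermark is left untouched so the next
    run re-processes from the previous watermark.
    """
    started_at = datetime.now(timezone.utc)
    for batch in batches:
        try:
            upsert(batch)
        except Exception:
            return state["last_sync_time"]   # watermark NOT updated
    state["last_sync_time"] = started_at     # all batches succeeded
    return state["last_sync_time"]
```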
Pipeline Resumability