QuperSyncInfrastructureDatabase Adapters

Infrastructure Layer

The infrastructure layer provides concrete implementations of the repository interfaces defined in the domain layer. It handles all database interactions, connection management, and external system integrations.

Database Adapters

Redshift Adapter

The RedshiftAdapter connects to Amazon Redshift using the official redshift-connector library. Since this library is synchronous, all queries are executed in a thread pool via anyio.to_thread.run_sync():

infrastructure/redshift/adapter.py
import redshift_connector
import anyio
from typing import Any

class RedshiftAdapter:
    def __init__(self, config: RedshiftConfig):
        self.config = config

    async def execute_query(self, sql: str) -> list[dict[str, Any]]:
        """Execute a SQL query and return results as list of dicts."""
        return await anyio.to_thread.run_sync(
            lambda: self._execute_sync(sql)
        )

    def _execute_sync(self, sql: str) -> list[dict]:
        conn = None
        try:
            conn = redshift_connector.connect(
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
                user=self.config.user,
                password=self.config.password,
                ssl=True,
                timeout=30
            )
            cursor = conn.cursor()
            cursor.execute(sql)
            if cursor.description is None:
                return []
            columns = [desc[0] for desc in cursor.description]
            rows = cursor.fetchall()
            return [dict(zip(columns, row)) for row in rows]
        finally:
            if conn:
                conn.close()

PostgreSQL Repository (Target)

The PostgreSQL repository uses SQLAlchemy async for all write operations. The asyncpg driver provides high-throughput async inserts:

infrastructure/postgres/repository.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.dialects.postgresql import insert

class PostgresRepository:
    def __init__(self, connection_url: str):
        # asyncpg driver for async PostgreSQL
        self.engine = create_async_engine(
            connection_url.replace("postgresql://", "postgresql+asyncpg://"),
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True   # Validate connections before use
        )

    async def upsert_batch(
        self,
        model,              # SQLAlchemy model class
        rows: list[dict],
        unique_keys: list[str],
        batch_size: int = 1000
    ) -> int:
        total = 0
        async with AsyncSession(self.engine) as session:
            for i in range(0, len(rows), batch_size):
                batch = rows[i:i + batch_size]
                stmt = insert(model).values(batch)
                stmt = stmt.on_conflict_do_update(
                    index_elements=unique_keys,
                    set_={col: stmt.excluded[col]
                          for col in rows[0].keys()
                          if col not in unique_keys}
                )
                result = await session.execute(stmt)
                total += result.rowcount
            await session.commit()
        return total

SQLAlchemy Models

PostgreSQL schema models define the target table structure. All tables are in the quper schema:

infrastructure/postgres/models.py
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TableStatsModel(Base):
    __tablename__ = "table_stats"
    __table_args__ = {"schema": "quper"}

    id = Column(Integer, primary_key=True, autoincrement=True)
    schema = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    table_id = Column(Integer, nullable=False)
    tbl_rows = Column(Integer)
    size_mb = Column(Float)
    unsorted_pct = Column(Float)
    pct_skew_across_slices = Column(Float)
    diststyle = Column(String)
    sort_key1 = Column(String)
    encoded = Column(String)
    pct_rows_marked_for_deletion = Column(Float)
    last_vacuum_date = Column(DateTime)
    synced_at = Column(DateTime)

    __table_args__ = (
        # Unique constraint for upsert conflict resolution
        UniqueConstraint("schema", "table_name", name="uq_table_stats"),
        {"schema": "quper"}
    )

Alembic Migrations

Database schema changes are managed via Alembic migrations. The migration history is tracked in the alembic/versions/ directory:

Run Migrations
# Apply all pending migrations
alembic upgrade head

# Create a new migration (after model changes)
alembic revision --autogenerate -m "add_query_history_table"

# Check current revision
alembic current

# Downgrade one revision
alembic downgrade -1

Connection Configuration

infrastructure/config.py
from pydantic import BaseSettings

class RedshiftConfig(BaseSettings):
    host: str
    port: int = 5439
    database: str
    user: str
    password: str

    class Config:
        env_prefix = "REDSHIFT_"

class PostgresConfig(BaseSettings):
    host: str = "localhost"
    port: int = 5432
    database: str = "quper"
    user: str
    password: str

    @property
    def connection_url(self) -> str:
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"

    class Config:
        env_prefix = "POSTGRES_"

Connection Pooling

The SQLAlchemy async engine maintains a connection pool with pool_size=10 base connections and max_overflow=20 additional connections during peak load. pool_pre_ping=True validates connections before use to handle network interruptions gracefully.