Infrastructure Layer
The infrastructure layer provides concrete implementations of the repository interfaces defined in the domain layer. It handles all database interactions, connection management, and external system integrations.
Database Adapters
Redshift Adapter
The RedshiftAdapter connects to Amazon Redshift using the official redshift-connector library. Since this library is synchronous, all queries are executed in a thread pool via anyio.to_thread.run_sync():
import redshift_connector
import anyio
from typing import Any
class RedshiftAdapter:
def __init__(self, config: RedshiftConfig):
self.config = config
async def execute_query(self, sql: str) -> list[dict[str, Any]]:
"""Execute a SQL query and return results as list of dicts."""
return await anyio.to_thread.run_sync(
lambda: self._execute_sync(sql)
)
def _execute_sync(self, sql: str) -> list[dict]:
conn = None
try:
conn = redshift_connector.connect(
host=self.config.host,
port=self.config.port,
database=self.config.database,
user=self.config.user,
password=self.config.password,
ssl=True,
timeout=30
)
cursor = conn.cursor()
cursor.execute(sql)
if cursor.description is None:
return []
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]
finally:
if conn:
conn.close()PostgreSQL Repository (Target)
The PostgreSQL repository uses SQLAlchemy async for all write operations. The asyncpg driver provides high-throughput async inserts:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.dialects.postgresql import insert
class PostgresRepository:
def __init__(self, connection_url: str):
# asyncpg driver for async PostgreSQL
self.engine = create_async_engine(
connection_url.replace("postgresql://", "postgresql+asyncpg://"),
pool_size=10,
max_overflow=20,
pool_pre_ping=True # Validate connections before use
)
async def upsert_batch(
self,
model, # SQLAlchemy model class
rows: list[dict],
unique_keys: list[str],
batch_size: int = 1000
) -> int:
total = 0
async with AsyncSession(self.engine) as session:
for i in range(0, len(rows), batch_size):
batch = rows[i:i + batch_size]
stmt = insert(model).values(batch)
stmt = stmt.on_conflict_do_update(
index_elements=unique_keys,
set_={col: stmt.excluded[col]
for col in rows[0].keys()
if col not in unique_keys}
)
result = await session.execute(stmt)
total += result.rowcount
await session.commit()
return totalSQLAlchemy Models
PostgreSQL schema models define the target table structure. All tables are in the quper schema:
from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TableStatsModel(Base):
__tablename__ = "table_stats"
__table_args__ = {"schema": "quper"}
id = Column(Integer, primary_key=True, autoincrement=True)
schema = Column(String, nullable=False)
table_name = Column(String, nullable=False)
table_id = Column(Integer, nullable=False)
tbl_rows = Column(Integer)
size_mb = Column(Float)
unsorted_pct = Column(Float)
pct_skew_across_slices = Column(Float)
diststyle = Column(String)
sort_key1 = Column(String)
encoded = Column(String)
pct_rows_marked_for_deletion = Column(Float)
last_vacuum_date = Column(DateTime)
synced_at = Column(DateTime)
__table_args__ = (
# Unique constraint for upsert conflict resolution
UniqueConstraint("schema", "table_name", name="uq_table_stats"),
{"schema": "quper"}
)Alembic Migrations
Database schema changes are managed via Alembic migrations. The migration history is tracked in the alembic/versions/ directory:
# Apply all pending migrations
alembic upgrade head
# Create a new migration (after model changes)
alembic revision --autogenerate -m "add_query_history_table"
# Check current revision
alembic current
# Downgrade one revision
alembic downgrade -1
Connection Configuration
from pydantic import BaseSettings
class RedshiftConfig(BaseSettings):
    """Redshift connection settings, loaded from REDSHIFT_* env vars."""

    host: str
    port: int = 5439  # Redshift's standard port
    database: str
    user: str
    password: str

    class Config:
        env_prefix = "REDSHIFT_"
class PostgresConfig(BaseSettings):
host: str = "localhost"
port: int = 5432
database: str = "quper"
user: str
password: str
@property
def connection_url(self) -> str:
return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
class Config:
env_prefix = "POSTGRES_"Connection Pooling
The engine keeps pool_size=10 base connections and allows up to max_overflow=20 additional connections during peak load. pool_pre_ping=True validates each connection before use, so stale connections left by network interruptions are replaced gracefully.