# Vidura Workflow

This page describes the complete request lifecycle for a query submitted to Vidura — from user input to structured analytics response.

## Step-by-Step Flow

### Step 1 — API Request
The Vidura web interface sends a `POST /ask` request with the user's natural-language prompt:

```http
POST /ask
{
  "prompt": "Which tables need vacuuming most urgently?"
}
```

### Step 2 — LangChain Agent Invocation
FastAPI calls `ask_agent(prompt)`, which invokes the LangChain agent with the full conversation context:

```python
async def ask_agent(prompt: str) -> str:
    result = agent.invoke({
        "messages": [
            {"role": "user", "content": prompt}
        ]
    })
    # Extract final text response from agent output
    return result["messages"][-1].content
```

### Step 3 — Tool Selection
The GPT-4o-mini model analyzes the prompt and determines which tools to call. For the vacuum query above, it would select:
- `vacuum_stats()` — Get top tables by unsorted percentage and reclaimable space
- `table_info()` — Enrich results with row counts and last-vacuum timestamps
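The mapping from a model-selected tool name to an executable function can be sketched as a plain dispatch table. LangChain manages this binding internally, so the registry and stub results below are hypothetical and purely illustrative:

```python
# Hypothetical dispatch table: LangChain binds tools for real, but the
# mechanics reduce to looking up a callable by the name the model chose.
def vacuum_stats() -> str:
    # Stub standing in for the real Redshift query
    return "public.orders | 42.3% unsorted | 18.4 GB reclaimable"

def table_info() -> str:
    # Stub standing in for the real Redshift query
    return "public.orders | 9,400,000 rows | last vacuum 14 days ago"

TOOL_REGISTRY = {
    "vacuum_stats": vacuum_stats,
    "table_info": table_info,
}

def run_tool(name: str) -> str:
    """Execute the tool the model selected, or report an unknown name."""
    tool = TOOL_REGISTRY.get(name)
    if tool is None:
        return f"❌ Unknown tool: {name}"
    return tool()
```

An unrecognized name returns an error string rather than raising, matching the error-handling convention used by Vidura's tools.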
#### Tool Chaining

The agent can chain tools when one result motivates another: after calling `vacuum_stats()`, it might call `table_storage_summary()` to add storage context.

### Step 4 — Tool Execution
Each tool executes its SQL query against Redshift and returns formatted results:

```python
def vacuum_stats() -> str:
    conn = get_redshift_connection()
    cur = conn.cursor()
    # "schema" and "table" are reserved words, so they must be quoted;
    # svv_table_info exposes the unsorted percentage as "unsorted"
    cur.execute("""
        SELECT
            t."schema", t."table",
            t.unsorted AS unsorted_pct,
            v.reclaimable_rows,
            v.reclaimable_space_gb,
            v.last_vacuum_date
        FROM svv_table_info t
        LEFT JOIN (
            SELECT tbl, reclaimable_rows,
                   impacted_blocks / 1024.0 AS reclaimable_space_gb,
                   MAX(endtime)::DATE AS last_vacuum_date
            FROM stl_vacuum
            GROUP BY tbl, reclaimable_rows, impacted_blocks
        ) v ON t.table_id = v.tbl
        WHERE t.unsorted > 5
        ORDER BY t.unsorted DESC
        LIMIT 5
    """)
    rows = cur.fetchall()
    result = format_as_markdown_table(rows, cur.description)
    cur.close()
    conn.close()
    return result
```

### Step 5 — Response Synthesis
The agent synthesizes tool outputs into a structured markdown response following the FinOps persona guidelines:

```markdown
## 🔧 Vacuum Priority Analysis

Based on current cluster state, here are the tables requiring immediate vacuum attention:

### Tables by Urgency Score

| Table | Unsorted % | Est. Reclaimable | Last Vacuum |
|-------|------------|------------------|-------------|
| public.orders | 42.3% | 18.4 GB | 14 days ago |
| analytics.events | 38.1% | 12.2 GB | 21 days ago |
| reporting.sessions | 27.9% | 8.7 GB | 9 days ago |

### 🎯 Recommended Actions

**Immediate (Today):**

1. `VACUUM public.orders BOOST;` — 42% unsorted, 18 GB recoverable
   - Estimated duration: ~45 min
   - Scheduling recommendation: weekday off-peak (2–5 AM UTC)

**This Week:**

2. `VACUUM analytics.events;` — 38% unsorted, 12 GB recoverable
```

### Step 6 — Response Delivery
The FastAPI handler returns the markdown string as JSON, which the web frontend renders using `react-markdown` with GitHub Flavored Markdown support.
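The JSON envelope can be sketched with the standard library; note that the `response` field name here is an assumption, as the real API schema may differ:

```python
import json

def to_json_payload(markdown: str) -> str:
    # ensure_ascii=False keeps emoji such as 🔧 and 🎯 as literal UTF-8
    # instead of \uXXXX escapes; react-markdown renders either form.
    return json.dumps({"response": markdown}, ensure_ascii=False)
```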
## Agent Decision Flow

```text
User Prompt: "Which tables need vacuuming most urgently?"

[Reasoning]
→ Vacuum urgency depends on: unsorted %, reclaimable space, last vacuum date
→ I should call vacuum_stats() first for a ranked list

[Action]
→ Call: vacuum_stats()

[Observation]
→ Got top 5 tables with unsorted_pct, reclaimable_space_gb, last_vacuum_date

[Reasoning]
→ I have enough data to give a prioritized recommendation
→ I should also check if any have active vacuum jobs running

[Action]
→ Call: long_running_queries()
→ Filter for queries matching VACUUM pattern

[Observation]
→ No active vacuum jobs found

[Final Response]
→ Synthesize both results into an actionable markdown response
```

## Query Parameter Substitution
CID Athena queries use Python's `string.Template` for safe parameter substitution:

```python
from string import Template

# SQL template with ${variable} syntax (Python's string.Template format)
sql_template = Template("""
    SELECT line_item_usage_account_id,
           SUM(line_item_blended_cost) AS total_cost
    FROM ${cur2_database}.${cur2_table_name}
    WHERE line_item_usage_start_date
          BETWEEN DATE '${start_date}' AND DATE '${end_date}'
    GROUP BY 1
    ORDER BY 2 DESC
""")

params = {
    'cur2_database': 'cid_cur',
    'cur2_table_name': 'cur2',
    'start_date': '2024-01-01',
    'end_date': '2024-01-31',
}

# safe_substitute leaves unknown ${keys} in place instead of raising
# KeyError, so a partially filled template still produces valid text.
final_sql = sql_template.safe_substitute(params)
```

## Error Handling
Each tool wraps its execution in try/except and returns a structured error string instead of raising, so the agent can handle failures gracefully:
```python
import psycopg2

def some_tool(param: str) -> str:
    try:
        conn = get_redshift_connection()
        # ... execute query and collect rows
        return format_result(rows)
    except psycopg2.OperationalError as e:
        return f"❌ Database connection error: {str(e)}"
    except Exception as e:
        return f"❌ Unexpected error: {str(e)}"
```
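Because failures come back as strings rather than exceptions, callers can branch on the ❌ marker. A minimal sketch, with helper names that are hypothetical and not part of Vidura:

```python
def is_tool_error(result: str) -> bool:
    # Tools signal failure with a "❌"-prefixed string instead of raising,
    # so callers (or the agent's own reasoning) can branch on this marker.
    return result.startswith("❌")

def with_retry(tool, attempts: int = 2):
    """Hypothetical helper: retry a flaky tool a bounded number of times."""
    result = tool()
    for _ in range(attempts - 1):
        if not is_tool_error(result):
            break
        result = tool()
    return result
```

This keeps transient connection errors out of the agent's final answer while still surfacing persistent failures as readable text.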