Vidura Workflow

This page describes the complete request lifecycle for a query submitted to Vidura — from user input to structured analytics response.

Step-by-Step Flow

Step 1 — API Request

The Vidura web interface sends a POST /ask request with the user's natural language prompt:

```http
POST /ask
{
  "prompt": "Which tables need vacuuming most urgently?"
}
```

Step 2 — LangChain Agent Invocation

FastAPI calls `ask_agent(prompt)`, which invokes the LangChain agent with the full conversation context:

```python
async def ask_agent(prompt: str) -> str:
    # Use the async invocation variant so the FastAPI event loop is not blocked
    result = await agent.ainvoke({
        "messages": [
            {"role": "user", "content": prompt}
        ]
    })
    # Extract the final text response from the agent output
    return result["messages"][-1].content
```

Step 3 — Tool Selection

The GPT-4o-mini model analyzes the prompt and determines which tools to call. For the vacuum query above, it would select:

  1. vacuum_stats() — Get top tables by unsorted percentage and reclaimable space
  2. table_info() — Enrich results with row counts and last vacuum timestamps
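Tool selection is driven entirely by the metadata the model is shown: the tool names and their descriptions. A minimal sketch of that idea (the tool names and descriptions come from this page; `build_tool_descriptions` is a hypothetical helper, not part of the actual codebase):

```python
def vacuum_stats() -> str:
    """Get top tables by unsorted percentage and reclaimable space."""
    ...

def table_info() -> str:
    """Enrich results with row counts and last vacuum timestamps."""
    ...

def build_tool_descriptions(tools) -> list[dict]:
    # The model chooses tools based only on this metadata,
    # so each docstring should describe *when* to use the tool.
    return [
        {"name": fn.__name__, "description": fn.__doc__}
        for fn in tools
    ]

descriptions = build_tool_descriptions([vacuum_stats, table_info])
```

This is why well-written tool docstrings matter: a vague description makes the model pick the wrong tool for prompts like the vacuum query above.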

Tool Chaining

The LangChain agent can call multiple tools in sequence, using the output of one tool to inform the next query. For example, after getting the top tables from vacuum_stats(), it might call table_storage_summary() to add storage context.
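The chaining behaviour, reduced to its essence, is that one tool's string output becomes the next tool's input. A toy simulation (these stand-in bodies are illustrative only; the real tools query Redshift):

```python
def vacuum_stats() -> str:
    # Toy stand-in returning a comma-separated ranked table list
    return "public.orders,analytics.events"

def table_storage_summary(tables: str) -> str:
    # Second tool consumes the first tool's output to add storage context
    return " | ".join(f"{t}: storage ok" for t in tables.split(","))

# What the agent effectively does when it chains the two calls:
top_tables = vacuum_stats()
summary = table_storage_summary(top_tables)
```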

Step 4 — Tool Execution

Each tool executes its SQL query against Redshift and returns formatted results:

vacuum_stats() Execution
```python
def vacuum_stats() -> str:
    conn = get_redshift_connection()
    try:
        cur = conn.cursor()
        # "schema" and "table" are reserved words, so they must be quoted
        cur.execute("""
            SELECT
                t."schema", t."table",
                t.unsorted_pct,
                v.reclaimable_rows,
                v.reclaimable_space_gb,
                v.last_vacuum_date
            FROM svv_table_info t
            LEFT JOIN (
                SELECT tbl,
                       MAX(reclaimable_rows) AS reclaimable_rows,
                       MAX(impacted_blocks) / 1024.0 AS reclaimable_space_gb,
                       MAX(endtime)::DATE AS last_vacuum_date
                FROM stl_vacuum
                GROUP BY tbl
            ) v ON t.table_id = v.tbl
            WHERE t.unsorted_pct > 5
            ORDER BY t.unsorted_pct DESC
            LIMIT 5
        """)
        rows = cur.fetchall()
        return format_as_markdown_table(rows, cur.description)
    finally:
        conn.close()
```
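The helper `format_as_markdown_table` is referenced but not shown on this page. A minimal sketch of what it might look like (the implementation below is an assumption; only the name and call signature appear in the source):

```python
def format_as_markdown_table(rows, description) -> str:
    """Render DB-API result rows as a GitHub-flavored markdown table.

    `description` is the DB-API cursor.description sequence; the first
    element of each entry is the column name.
    """
    headers = [col[0] for col in description]
    lines = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join("---" for _ in headers) + " |",
    ]
    for row in rows:
        lines.append("| " + " | ".join(str(v) for v in row) + " |")
    return "\n".join(lines)
```

Returning markdown directly keeps the tool output in the same format the agent emits, so tables pass through to the frontend unchanged.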

Step 5 — Response Synthesis

The agent synthesizes tool outputs into a structured markdown response following the FinOps persona guidelines:

Example Agent Response
```markdown
## 🔧 Vacuum Priority Analysis

Based on current cluster state, here are the tables requiring immediate vacuum attention:

### Tables by Urgency Score

| Table | Unsorted % | Est. Reclaimable | Last Vacuum |
|-------|-----------|------------------|-------------|
| public.orders | 42.3% | 18.4 GB | 14 days ago |
| analytics.events | 38.1% | 12.2 GB | 21 days ago |
| reporting.sessions | 27.9% | 8.7 GB | 9 days ago |

### 🎯 Recommended Actions

**Immediate (Today):**
1. `VACUUM public.orders BOOST;` — 42% unsorted, 18GB recoverable
   - Estimated duration: ~45 min
   - Scheduling recommendation: Weekday off-peak (2–5 AM UTC)

**This Week:**
2. `VACUUM analytics.events;` — 38% unsorted, 12GB recoverable
```

Step 6 — Response Delivery

The FastAPI handler returns the markdown string as JSON, which the web frontend renders using react-markdown with GitHub Flavored Markdown support.
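The JSON shaping on the return path is straightforward; a minimal sketch (the `answer` field name and `to_response_payload` helper are assumptions for illustration, not the actual handler code):

```python
import json

def to_response_payload(markdown: str) -> str:
    # The frontend reads this key and renders the markdown
    # string with react-markdown (field name assumed here).
    return json.dumps({"answer": markdown})
```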

Agent Decision Flow

ReAct Loop (Reason + Act)
```text
User Prompt: "Which tables need vacuuming most urgently?"

[Reasoning]
  → Vacuum urgency depends on: unsorted %, reclaimable space, last vacuum date
  → I should call vacuum_stats() first for a ranked list

[Action]
  → Call: vacuum_stats()

[Observation]
  → Got top 5 tables with unsorted_pct, reclaimable_space_gb, last_vacuum_date

[Reasoning]
  → I have enough data to give a prioritized recommendation
  → I should also check if any have active vacuum jobs running

[Action]
  → Call: long_running_queries()
  → Filter for queries matching VACUUM pattern

[Observation]
  → No active vacuum jobs found

[Final Response]
  → Synthesize both results into actionable markdown response
```
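The trace above follows the standard ReAct shape: call the model, run any requested tool, feed the observation back, and stop when the model produces a final answer. A toy driver loop capturing that shape (this is a simplified sketch, not LangChain's internals; `toy_model` stands in for GPT-4o-mini):

```python
def react_loop(model, tools: dict, prompt: str) -> str:
    """Minimal ReAct driver: alternate model calls and tool execution
    until the model returns a final answer instead of a tool request."""
    messages = [{"role": "user", "content": prompt}]
    while True:
        step = model(messages)  # returns {"tool": name} or {"final": text}
        if "final" in step:
            return step["final"]
        observation = tools[step["tool"]]()
        messages.append({"role": "tool", "content": observation})

# Toy model: request vacuum_stats once, then finish
def toy_model(messages):
    if any(m["role"] == "tool" for m in messages):
        return {"final": "Vacuum public.orders first."}
    return {"tool": "vacuum_stats"}

answer = react_loop(
    toy_model,
    {"vacuum_stats": lambda: "orders 42% unsorted"},
    "Which tables need vacuuming most urgently?",
)
```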

Query Parameter Substitution

CID Athena queries use Python's string.Template for safe parameter substitution:

SQL Template Substitution
```python
from string import Template

# SQL template with ${variable} syntax (Python's string.Template format)
sql_template = Template("""
    SELECT line_item_usage_account_id,
           SUM(line_item_blended_cost) AS total_cost
    FROM ${cur2_database}.${cur2_table_name}
    WHERE line_item_usage_start_date
          BETWEEN DATE '${start_date}' AND DATE '${end_date}'
    GROUP BY 1
    ORDER BY 2 DESC
""")

# Safe substitution (unknown keys left as-is, no KeyError)
params = {
    'cur2_database': 'cid_cur',
    'cur2_table_name': 'cur2',
    'start_date': '2024-01-01',
    'end_date': '2024-01-31',
}
final_sql = sql_template.safe_substitute(params)
```
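The difference between `safe_substitute` and `substitute` is worth seeing directly: with a partial parameter set, `safe_substitute` leaves the unknown placeholder in place, while `substitute` raises `KeyError`. A small standalone demonstration:

```python
from string import Template

tmpl = Template("SELECT * FROM ${cur2_database}.${cur2_table_name}")

# safe_substitute leaves unknown placeholders untouched...
partial = tmpl.safe_substitute({"cur2_database": "cid_cur"})

# ...while substitute raises KeyError for the missing key
try:
    tmpl.substitute({"cur2_database": "cid_cur"})
    missing = None
except KeyError as e:
    missing = e.args[0]
```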

Error Handling

Each tool wraps its execution in try/except and returns a structured error string instead of raising, so the agent can handle failures gracefully:

```python
import psycopg2

def some_tool(param: str) -> str:
    try:
        conn = get_redshift_connection()
        # ... execute query
        return format_result(rows)
    except psycopg2.OperationalError as e:
        return f"❌ Database connection error: {e}"
    except Exception as e:
        return f"❌ Unexpected error: {e}"
```
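Since every tool needs the same boundary, the pattern can be factored into a decorator so individual tools stay focused on their query logic. A sketch under that assumption (`tool_error_boundary` is a hypothetical helper, not part of the actual codebase):

```python
import functools

def tool_error_boundary(fn):
    """Wrap a tool so failures become structured error strings
    the agent can read, rather than raised exceptions."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            return f"❌ Unexpected error in {fn.__name__}: {e}"
    return wrapper

@tool_error_boundary
def flaky_tool() -> str:
    raise RuntimeError("connection reset")
```

Because the error string flows back as an ordinary tool observation, the agent can mention the failure in its response or try an alternative tool instead of crashing the request.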