{
 "nbformat": 4,
 "nbformat_minor": 5,
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.11.0"
  }
 },
 "cells": [
  {
   "cell_type": "markdown",
   "id": "cell-0",
   "metadata": {},
   "source": [
    "# C2 - Custom Tools: Building a Multi-Tool Data Platform Agent\n",
    "\n",
    "Companion notebook for article **C2** in *Building with Claude - A Practitioner's Guide to the Anthropic API*.\n",
    "\n",
    "**Attribution.** Concepts adapted from Anthropic's \"Building with the Claude API\" course (Coursera) and public API documentation at [docs.anthropic.com](https://docs.anthropic.com). All code below is original work (c) 2026 DataMy. Not affiliated with Anthropic.\n",
    "\n",
    "---\n",
    "\n",
    "## What you'll build in this notebook\n",
    "\n",
    "A four-tool data platform monitoring agent over two datasets (`warehouse_usage.csv` and `pipeline_jobs.csv`):\n",
    "\n",
    "1. **Tool suite definition** -- four tools with precise descriptions; anatomy of a well-designed tool schema.\n",
    "2. **Parallel tool calls** -- demonstrate Claude emitting two `tool_use` blocks in one response; verify the loop handles both.\n",
    "3. **Tool chaining** -- a multi-step diagnostic session where each tool call informs the next.\n",
    "4. **Side-effect tool** -- `send_alert` writes to a session log; inspect the alert log at session end.\n",
    "5. **Full monitoring agent** -- end-to-end: detect issue, diagnose, alert, report.\n",
    "\n",
    "**Prerequisites:**\n",
    "- `pip install -r ../requirements.txt`\n",
    "- A `.env` file with `ANTHROPIC_API_KEY` set\n",
    "- Datasets: `python ../scripts/generate_data.py` (creates `warehouse_usage.csv` and `pipeline_jobs.csv`)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-1",
   "metadata": {},
   "source": [
    "## Section 1 - Setup\n",
    "\n",
    "Load both datasets and define the data access helpers that back the four tools.\n",
    "The tools are pure Python functions over pandas DataFrames -- no external API calls required.\n",
    "The tools return formatted strings on purpose: output that carries context (baselines,\n",
    "anomaly flags, SLA status) gives Claude more to reason with than raw numbers do."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-2",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pathlib import Path\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "import anthropic\n",
    "import pandas as pd\n",
    "from dotenv import load_dotenv\n",
    "\n",
    "from llm_client import ClaudeClient\n",
    "\n",
    "load_dotenv(\"../.env\")\n",
    "\n",
    "DATA_DIR = Path(\"..\") / \"data\"\n",
    "for fname in (\"warehouse_usage.csv\", \"pipeline_jobs.csv\"):\n",
    "    assert (DATA_DIR / fname).exists(), f\"Missing: {DATA_DIR / fname}. Run python ../scripts/generate_data.py\"\n",
    "\n",
    "usage_df = pd.read_csv(DATA_DIR / \"warehouse_usage.csv\", parse_dates=[\"date\"])\n",
    "jobs_df  = pd.read_csv(DATA_DIR / \"pipeline_jobs.csv\",\n",
    "                        parse_dates=[\"started_at\", \"finished_at\"])\n",
    "\n",
    "print(f\"warehouse_usage : {len(usage_df):,} rows  ({usage_df.date.min().date()} to {usage_df.date.max().date()})\")\n",
    "print(f\"pipeline_jobs   : {len(jobs_df):,}  rows  ({jobs_df.started_at.min().date()} to {jobs_df.started_at.max().date()})\")\n",
    "print(f\"Job names       : {sorted(jobs_df.job_name.unique())}\")\n",
    "\n",
    "client = anthropic.Anthropic()\n",
    "cc = ClaudeClient()\n",
    "MODEL = cc.default_model\n",
    "\n",
    "# Alert log -- side-effect tool writes here\n",
    "ALERT_LOG: list[dict] = []"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-3",
   "metadata": {},
   "source": [
    "## Section 2 - Tool implementations and schemas\n",
    "\n",
    "Each tool is a Python function that returns a formatted string. The function is the\n",
    "implementation; the schema dict is the interface Claude reads.\n",
    "\n",
    "Notice how the tool descriptions answer three questions:\n",
    "1. **When** to call this tool (trigger phrases, use cases)\n",
    "2. **What** it returns (format, fields, units)\n",
    "3. **How** to distinguish it from related tools"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# ---- Tool implementations ------------------------------------------------\n",
    "\n",
    "def impl_get_job_status(job_name: str, days: int = 7, status_filter: str = \"\") -> str:\n",
    "    \"\"\"Return recent run history for a job, with the window's success rate.\"\"\"\n",
    "    cutoff = jobs_df.started_at.max() - timedelta(days=days - 1)\n",
    "    subset = jobs_df[jobs_df.started_at >= cutoff].sort_values(\"started_at\")\n",
    "    if job_name != \"*\":\n",
    "        subset = subset[subset.job_name == job_name]\n",
    "    if subset.empty:\n",
    "        return f\"No runs found for job='{job_name}' in the last {days} days\"\n",
    "    # Success rate over the whole window, computed before any status filter is applied\n",
    "    total = len(subset)\n",
    "    success = (subset.status == \"success\").sum()\n",
    "    if status_filter:\n",
    "        subset = subset[subset.status == status_filter]\n",
    "        if subset.empty:\n",
    "            return f\"No '{status_filter}' runs found for job='{job_name}' in the last {days} days\"\n",
    "    lines = [f\"Job: {job_name}  |  window: last {days} days  |  runs: {total}  |  success rate: {success/total*100:.0f}%\", \"\"]\n",
    "    cols = [\"started_at\", \"status\", \"duration_s\", \"credits_used\", \"rows_processed\", \"error_message\"]\n",
    "    if job_name == \"*\":\n",
    "        cols = [\"job_name\"] + cols  # the all-jobs view needs the job name on every row\n",
    "    lines.append(subset[cols].tail(10).to_string(index=False))\n",
    "    return \"\\n\".join(lines)\n",
    "\n",
    "\n",
    "def impl_get_warehouse_spend(warehouse_name: str, start_date: str, end_date: str) -> str:\n",
    "    \"\"\"Return daily credits for a warehouse with baseline annotation.\"\"\"\n",
    "    sub = usage_df[\n",
    "        (usage_df.warehouse_name == warehouse_name) &\n",
    "        (usage_df.date >= pd.Timestamp(start_date)) &\n",
    "        (usage_df.date <= pd.Timestamp(end_date))\n",
    "    ].copy()\n",
    "    if sub.empty:\n",
    "        return f\"No data for {warehouse_name} between {start_date} and {end_date}\"\n",
    "    # Baseline is the median over every date in the dataset for this warehouse\n",
    "    baseline = usage_df[usage_df.warehouse_name == warehouse_name].credits_used.median()\n",
    "    sub[\"vs_baseline\"] = (sub.credits_used / baseline - 1).map(\"{:+.0%}\".format)\n",
    "    sub[\"flag\"] = sub.credits_used.apply(lambda c: \"ANOMALY\" if c > baseline * 1.25 else \"\")\n",
    "    header = f\"{warehouse_name} daily credits ({start_date} to {end_date})  |  full-history median baseline: {baseline:.2f}\"\n",
    "    table  = sub[[\"date\", \"credits_used\", \"vs_baseline\", \"flag\", \"query_count\"]].to_string(index=False)\n",
    "    return header + \"\\n\\n\" + table\n",
    "\n",
    "\n",
    "def impl_check_data_freshness(job_name: str) -> str:\n",
    "    \"\"\"Return time-since-last-success and SLA status for a job.\"\"\"\n",
    "    successful = jobs_df[(jobs_df.job_name == job_name) & (jobs_df.status == \"success\")]\n",
    "    if successful.empty:\n",
    "        return f\"No successful runs found for '{job_name}' in the dataset.\"\n",
    "    last_ok   = successful.finished_at.max()\n",
    "    # The dataset is static, so its latest start time stands in for \"now\"\n",
    "    dataset_end = jobs_df.started_at.max()\n",
    "    hours_ago = max((dataset_end - last_ok).total_seconds() / 3600, 0.0)\n",
    "    sla_hours = 26  # daily jobs SLA\n",
    "    sla_status = \"OK\" if hours_ago <= sla_hours else f\"BREACHED (>{sla_hours}h since last success)\"\n",
    "    return (\n",
    "        f\"Job: {job_name}\\n\"\n",
    "        f\"Last successful run : {last_ok.isoformat(timespec='seconds')}\\n\"\n",
    "        f\"Hours since success : {hours_ago:.1f}h\\n\"\n",
    "        f\"SLA status ({sla_hours}h)    : {sla_status}\"\n",
    "    )\n",
    "\n",
    "\n",
    "def impl_send_alert(severity: str, title: str, message: str) -> str:\n",
    "    \"\"\"Record an alert. Severity: info | warning | critical.\"\"\"\n",
    "    level = severity.strip().lower()\n",
    "    if level not in {\"info\", \"warning\", \"critical\"}:\n",
    "        return f\"ERROR: severity must be info | warning | critical, got '{severity}'\"\n",
    "    entry = {\n",
    "        \"timestamp\": datetime.now().isoformat(timespec=\"seconds\"),\n",
    "        \"severity\":  level.upper(),\n",
    "        \"title\":     title,\n",
    "        \"message\":   message,\n",
    "    }\n",
    "    ALERT_LOG.append(entry)\n",
    "    return f\"Alert recorded [{level.upper()}]: {title}\"\n",
    "\n",
    "\n",
    "# ---- Tool schemas (what Claude reads) ------------------------------------\n",
    "\n",
    "TOOLS = [\n",
    "    {\n",
    "        \"name\": \"get_job_status\",\n",
    "        \"description\": (\n",
    "            \"Look up the recent run history for a pipeline job. \"\n",
    "            \"Call this when asked about pipeline health, recent failures, job duration, \"\n",
    "            \"or whether a specific job ran successfully. \"\n",
    "            \"Use job_name='*' to see all jobs. \"\n",
    "            \"For cross-dataset questions (e.g. correlating failures with warehouse cost), \"\n",
    "            \"call this alongside get_warehouse_spend in the same turn.\"\n",
    "        ),\n",
    "        \"input_schema\": {\n",
    "            \"type\": \"object\",\n",
    "            \"properties\": {\n",
    "                \"job_name\": {\"type\": \"string\", \"description\": \"Exact job name, or '*' for all jobs.\"},\n",
    "                \"days\":     {\"type\": \"integer\", \"description\": \"How many days back to look. Default 7.\"},\n",
    "                \"status_filter\": {\"type\": \"string\", \"description\": \"Filter to: success | failed | skipped | running. Empty string = all.\"},\n",
    "            },\n",
    "            \"required\": [\"job_name\"],\n",
    "        },\n",
    "    },\n",
    "    {\n",
    "        \"name\": \"get_warehouse_spend\",\n",
    "        \"description\": (\n",
    "            \"Return daily Snowflake credit consumption for one warehouse over a date range. \"\n",
    "            \"Use for cost trend analysis, anomaly detection, or correlating spend with pipeline events. \"\n",
    "            \"Warehouse names: WH_ELT_M, WH_ELT_L, WH_AD_HOC_S, WH_BI_M, WH_REVERSE_S, WH_EMBEDDED_M, WH_DS_L.\"\n",
    "        ),\n",
    "        \"input_schema\": {\n",
    "            \"type\": \"object\",\n",
    "            \"properties\": {\n",
    "                \"warehouse_name\": {\"type\": \"string\", \"description\": \"Exact warehouse name.\"},\n",
    "                \"start_date\":     {\"type\": \"string\", \"description\": \"ISO date, e.g. 2025-07-01.\"},\n",
    "                \"end_date\":       {\"type\": \"string\", \"description\": \"ISO date, e.g. 2025-07-31.\"},\n",
    "            },\n",
    "            \"required\": [\"warehouse_name\", \"start_date\", \"end_date\"],\n",
    "        },\n",
    "    },\n",
    "    {\n",
    "        \"name\": \"check_data_freshness\",\n",
    "        \"description\": (\n",
    "            \"Return the time elapsed since a job's last successful run and whether it is within SLA. \"\n",
    "            \"SLA is 26 hours for all daily jobs. \"\n",
    "            \"Use this when asked 'is this table up to date?', 'when did X last succeed?', \"\n",
    "            \"or to confirm a job is not silently stale after an incident.\"\n",
    "        ),\n",
    "        \"input_schema\": {\n",
    "            \"type\": \"object\",\n",
    "            \"properties\": {\n",
    "                \"job_name\": {\"type\": \"string\", \"description\": \"Exact job name.\"},\n",
    "            },\n",
    "            \"required\": [\"job_name\"],\n",
    "        },\n",
    "    },\n",
    "    {\n",
    "        \"name\": \"send_alert\",\n",
    "        \"description\": (\n",
    "            \"Record an alert to the incident log. IMPORTANT: this action is logged and cannot be undone. \"\n",
    "            \"Use 'info' for observations, 'warning' for degraded-but-not-broken conditions, \"\n",
    "            \"'critical' only for incidents requiring immediate human attention. \"\n",
    "            \"Do not call this for hypothetical or exploratory analysis.\"\n",
    "        ),\n",
    "        \"input_schema\": {\n",
    "            \"type\": \"object\",\n",
    "            \"properties\": {\n",
    "                \"severity\": {\"type\": \"string\", \"enum\": [\"info\", \"warning\", \"critical\"], \"description\": \"info | warning | critical\"},\n",
    "                \"title\":    {\"type\": \"string\", \"description\": \"Short incident title (< 80 chars).\"},\n",
    "                \"message\":  {\"type\": \"string\", \"description\": \"Detailed description of the issue and recommended action.\"},\n",
    "            },\n",
    "            \"required\": [\"severity\", \"title\", \"message\"],\n",
    "        },\n",
    "    },\n",
    "]\n",
    "\n",
    "TOOL_DISPATCH = {\n",
    "    \"get_job_status\":       lambda i: impl_get_job_status(i[\"job_name\"], i.get(\"days\", 7), i.get(\"status_filter\", \"\")),\n",
    "    \"get_warehouse_spend\":  lambda i: impl_get_warehouse_spend(i[\"warehouse_name\"], i[\"start_date\"], i[\"end_date\"]),\n",
    "    \"check_data_freshness\": lambda i: impl_check_data_freshness(i[\"job_name\"]),\n",
    "    \"send_alert\":           lambda i: impl_send_alert(i[\"severity\"], i[\"title\"], i[\"message\"]),\n",
    "}\n",
    "\n",
    "print(f\"Tool suite ready: {[t['name'] for t in TOOLS]}\")\n",
    "\n",
    "# Quick smoke test of each tool function directly\n",
    "print(\"\\n-- get_job_status smoke test --\")\n",
    "print(impl_get_job_status(\"dbt_fct_subscriptions\", days=5)[:300])\n",
    "print(\"\\n-- check_data_freshness smoke test --\")\n",
    "print(impl_check_data_freshness(\"dbt_fct_subscriptions\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-5",
   "metadata": {},
   "source": [
    "## Section 3 - The execution loop (multi-tool aware)\n",
    "\n",
    "The loop from C1 already handles parallel tool calls correctly: it collects ALL `tool_use`\n",
    "blocks from a response, executes each, and returns all results in a single user message.\n",
    "This section makes the parallel-call behaviour explicit by logging the turn structure."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-6",
   "metadata": {},
   "outputs": [],
   "source": [
    "MONITORING_SYSTEM = (\n",
    "    \"You are a data platform monitoring assistant for Acme SaaS Co. \"\n",
    "    \"You have access to pipeline job history and warehouse cost data. \"\n",
    "    \"When diagnosing issues: (1) gather data first, (2) identify anomalies, \"\n",
    "    \"(3) check data freshness if relevant, (4) send an alert only for genuine incidents. \"\n",
    "    \"Be specific: cite job names, dates, credit figures, and error messages.\"\n",
    ")\n",
    "\n",
    "\n",
    "def run_monitoring_agent(\n",
    "    question: str,\n",
    "    *,\n",
    "    max_turns: int = 12,\n",
    "    verbose: bool = True,\n",
    ") -> str:\n",
    "    \"\"\"Run the four-tool monitoring agent and return Claude's final answer.\"\"\"\n",
    "    messages = [{\"role\": \"user\", \"content\": question}]\n",
    "\n",
    "    for turn in range(1, max_turns + 1):\n",
    "        response = client.messages.create(\n",
    "            model=MODEL,\n",
    "            max_tokens=4096,\n",
    "            system=MONITORING_SYSTEM,\n",
    "            tools=TOOLS,\n",
    "            messages=messages,\n",
    "        )\n",
    "\n",
    "        if response.stop_reason == \"end_turn\":\n",
    "            if verbose:\n",
    "                print(f\"[turn {turn}] end_turn\")\n",
    "            for block in response.content:\n",
    "                if block.type == \"text\":\n",
    "                    return block.text\n",
    "            return \"(no text in final response)\"\n",
    "\n",
    "        if response.stop_reason == \"tool_use\":\n",
    "            tool_use_blocks = [b for b in response.content if b.type == \"tool_use\"]\n",
    "            if verbose:\n",
    "                calls = \", \".join(f\"{b.name}(...)\" for b in tool_use_blocks)\n",
    "                print(f\"[turn {turn}] tool_use x{len(tool_use_blocks)}: {calls}\")\n",
    "\n",
    "            messages.append({\"role\": \"assistant\", \"content\": response.content})\n",
    "            tool_results = []\n",
    "            for block in tool_use_blocks:\n",
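    "                handler = TOOL_DISPATCH.get(block.name)\n",
    "                try:\n",
    "                    if handler is None:\n",
    "                        raise ValueError(f\"unknown tool '{block.name}'\")\n",
    "                    result, is_error = handler(block.input), False\n",
    "                except Exception as exc:\n",
    "                    # Report the failure to Claude instead of crashing the loop\n",
    "                    result, is_error = f\"ERROR: {type(exc).__name__}: {exc}\", True\n",
    "                if verbose:\n",
    "                    print(f\"  -> {block.name}: {str(result)[:100].replace(chr(10), ' | ')} ...\")\n",
    "                tool_results.append({\n",
    "                    \"type\":        \"tool_result\",\n",
    "                    \"tool_use_id\": block.id,\n",
    "                    \"content\":     str(result),\n",
    "                    \"is_error\":    is_error,\n",
    "                })\n",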
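    "            # All results for this turn in ONE user message\n",
    "            messages.append({\"role\": \"user\", \"content\": tool_results})\n",
    "            continue\n",
    "\n",
    "        # Any other stop_reason (e.g. max_tokens) would re-send identical messages, so fail fast\n",
    "        raise RuntimeError(f\"Unexpected stop_reason: {response.stop_reason!r}\")\n",
    "\n",
    "    raise RuntimeError(f\"max_turns={max_turns} reached without end_turn\")"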
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-7",
   "metadata": {},
   "source": [
    "## Section 4 - Parallel tool calls in action\n",
    "\n",
    "Ask a question that requires two independent data sources: pipeline job failures AND warehouse\n",
    "cost data for the same date window. Claude should emit two `tool_use` blocks in one response\n",
    "turn. Watch the turn log: `[turn 1] tool_use x2: get_job_status(...), get_warehouse_spend(...)`\n",
    "\n",
    "If Claude calls the tools sequentially instead, that is also valid: whether it parallelises\n",
    "depends on context and model version. The loop handles both cases correctly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-8",
   "metadata": {},
   "outputs": [],
   "source": [
    "parallel_q = (\n",
    "    \"Check whether the pipeline failures on and around 2025-07-15 correlate with \"\n",
    "    \"any cost anomaly on WH_ELT_M. Look at both datasets for the week of July 14-20.\"\n",
    ")\n",
    "\n",
    "print(f\"Q: {parallel_q}\\n\")\n",
    "answer = run_monitoring_agent(parallel_q)\n",
    "print()\n",
    "print(\"=== Agent answer ===\")\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-9",
   "metadata": {},
   "source": [
    "## Section 5 - Tool chaining: freshness check after failure\n",
    "\n",
    "A question that requires sequential tool calls: check which jobs failed recently, then\n",
    "verify whether affected tables are still within SLA. Claude cannot know which job to check\n",
    "freshness for until it has the failure data -- so these calls must be sequential, not parallel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-10",
   "metadata": {},
   "outputs": [],
   "source": [
    "chaining_q = (\n",
    "    \"Which dbt jobs failed in the last 90 days? For the most frequently failing job, \"\n",
    "    \"check whether its table is currently within the 26-hour SLA.\"\n",
    ")\n",
    "\n",
    "print(f\"Q: {chaining_q}\\n\")\n",
    "answer = run_monitoring_agent(chaining_q)\n",
    "print()\n",
    "print(\"=== Agent answer ===\")\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-11",
   "metadata": {},
   "source": [
    "## Section 6 - Side-effect tool: triggering an alert\n",
    "\n",
    "Ask the agent to run a full incident check and alert if it finds something warranting attention.\n",
    "After this cell, `ALERT_LOG` will contain whatever alerts the agent decided to send.\n",
    "\n",
    "Note the difference from previous cells: here the agent is not just answering a question --\n",
    "it is *taking an action* (writing to the alert log). The verbose turn log shows `send_alert`\n",
    "appearing alongside (or after) the data-gathering calls."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-12",
   "metadata": {},
   "outputs": [],
   "source": [
    "alert_q = (\n",
    "    \"Run a morning health check for the data platform. Check for: \"\n",
    "    \"(1) any failed jobs in the last 48 hours, \"\n",
    "    \"(2) any SLA breaches for the core dbt models (fct_subscriptions, fct_mrr, fct_events), \"\n",
    "    \"(3) any warehouse spend anomalies on WH_BI_M or WH_ELT_L in the last 7 days. \"\n",
    "    \"Send an alert for any issues you find, then summarise what you checked.\"\n",
    ")\n",
    "\n",
    "print(f\"Q: {alert_q}\\n\")\n",
    "answer = run_monitoring_agent(alert_q)\n",
    "print()\n",
    "print(\"=== Agent answer ===\")\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-13",
   "metadata": {},
   "source": [
    "## Section 7 - Inspect the alert log\n",
    "\n",
    "The alert log is the agent's action audit trail. Every `send_alert` call is recorded here\n",
    "regardless of what the agent said in its text response. In a production system this would be\n",
    "your incident ticket queue or PagerDuty integration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-14",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(f\"Total alerts recorded this session: {len(ALERT_LOG)}\")\n",
    "print()\n",
    "for i, alert in enumerate(ALERT_LOG, 1):\n",
    "    print(f\"Alert {i}:\")\n",
    "    print(f\"  [{alert['severity']}] {alert['title']}\")\n",
    "    print(f\"  Time    : {alert['timestamp']}\")\n",
    "    print(f\"  Message : {alert['message'][:200]}\")\n",
    "    print()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cell-15",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Note: this notebook routes all model calls through a standalone anthropic client\n# (client.messages.create), not through cc.complete(). cc.records will be empty;\n# use response.usage on individual responses to inspect per-call token counts.\ncc.print_summary()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cell-16",
   "metadata": {},
   "source": [
    "## Section 8 - Practitioner Lab\n",
    "\n",
    "Open-ended extension. No reference solution.\n",
    "\n",
    "**Goal:** add a `get_incident_history` tool and demonstrate multi-turn escalation.\n",
    "\n",
    "**Background:** the current agent can detect and alert on individual incidents. A mature\n",
    "monitoring agent should also understand *patterns* -- e.g., \"this warehouse had three cost\n",
    "spikes in six weeks; that is a recurring issue, not a one-off.\"\n",
    "\n",
    "**Task A -- implement the tool:**\n",
    "```python\n",
    "def impl_get_incident_history(warehouse_or_job: str, days: int = 90) -> str:\n",
    "    \"\"\"\n",
    "    Return a summary of all anomalous periods for a warehouse or job over the past N days.\n",
    "    For warehouses: flag days where credits exceeded 1.25x the 30-day rolling median.\n",
    "    For jobs: flag all failed/skipped runs.\n",
    "    Group consecutive anomalous days into 'incidents' with start date, end date, severity.\n",
    "    \"\"\"\n",
    "    ...\n",
    "```\n",
    "\n",
    "Add the corresponding schema to `TOOLS` and `TOOL_DISPATCH`.\n",
    "\n",
    "**Task B -- test the escalation pattern:**\n",
    "Ask the agent: \"Has WH_BI_M had recurring cost anomalies this quarter? If there have been\n",
    "two or more distinct incidents, escalate to critical severity.\"\n",
    "\n",
    "The agent should: call `get_incident_history` to count distinct anomaly windows, then\n",
    "conditionally call `send_alert` with the appropriate severity.\n",
    "\n",
    "**Stretch:** implement a `list_all_jobs` tool that takes no arguments and returns the full\n",
    "list of job names and their warehouses. Ask the agent \"give me a health dashboard for all\n",
    "jobs\" -- it should call `list_all_jobs` first, then fan out to `check_data_freshness` for\n",
    "each job. Watch how many parallel calls Claude makes in the fan-out turn.\n",
    "\n",
    "Why this matters: the difference between a demo agent and a production agent is usually\n",
    "not the model -- it is the tool suite. An agent that can only see individual data points\n",
    "cannot reason about trends. An agent with a history tool can. The escalation logic (two or\n",
    "more incidents -> critical) is the kind of business rule that belongs in the system prompt\n",
    "or tool output, not restated ad hoc in each user question.\n",
    "\n",
    "---\n",
    "\n",
    "*Companion article: C2 - Custom Tools: Building a Multi-Tool Data Platform Agent.*\n",
    "*Next notebook: D1_mcp_client.ipynb*"
   ]
  }
 ]
}