{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# D2 -- Building an MCP Server: Wrapping Custom Tools for Any MCP Client\n",
    "\n",
    "> Part of *Building with Claude -- A Practitioner's Guide to the Anthropic API*  \n",
    "> Based on Anthropic's \"Building with the Claude API\" course (Coursera) and public API documentation.  \n",
    "> Not affiliated with or endorsed by Anthropic.\n",
    "\n",
    "## What you will build\n",
    "\n",
    "This notebook covers MCP from the **server** perspective:\n",
    "\n",
    "1. Walk through `mcp_data_server.py` section by section -- the design decisions behind each tool, resource, and prompt\n",
    "2. Verify the server works independently before connecting Claude\n",
    "3. Add a fourth tool (`get_query_performance`) to the server live\n",
    "4. Run the server and connect to it to confirm the new tool is discovered\n",
    "5. Run `data_monitor_cli.py` -- the complete CLI application from D2\n",
    "\n",
    "**D1 pre-reading**: This notebook assumes you have read D1 and understand `ClientSession`, `list_tools()`, and the MCP execution loop.\n",
    "\n",
    "**Files used:**  \n",
    "`notebooks/mcp_data_server.py` -- the server we build and extend  \n",
    "`scripts/data_monitor_cli.py`  -- the CLI app we validate  \n",
    "`data/warehouse_usage.csv` + `data/pipeline_jobs.csv` -- datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 1 -- Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import inspect\n",
    "import os\n",
    "import sys\n",
    "from pathlib import Path\n",
    "\n",
    "import anthropic\n",
    "import nest_asyncio\n",
    "from dotenv import load_dotenv\n",
    "from mcp import ClientSession, StdioServerParameters\n",
    "from mcp.client.stdio import stdio_client\n",
    "\n",
    "nest_asyncio.apply()\n",
    "load_dotenv(Path.cwd().parent / \".env\")\n",
    "assert os.environ.get(\"ANTHROPIC_API_KEY\"), \"Set ANTHROPIC_API_KEY in .env\"\n",
    "\n",
    "anthropic_client = anthropic.Anthropic()\n",
    "MODEL = \"claude-sonnet-4-5\"\n",
    "\n",
    "SERVER_SCRIPT = Path.cwd() / \"mcp_data_server.py\"\n",
    "assert SERVER_SCRIPT.exists(), f\"Server not found: {SERVER_SCRIPT}\"\n",
    "\n",
    "SERVER_PARAMS = StdioServerParameters(\n",
    "    command=sys.executable,\n",
    "    args=[str(SERVER_SCRIPT)],\n",
    ")\n",
    "\n",
    "def run(coro):\n",
    "    return asyncio.get_event_loop().run_until_complete(coro)\n",
    "\n",
    "async def _with_session(fn):\n",
    "    async with stdio_client(SERVER_PARAMS) as (read, write):\n",
    "        async with ClientSession(read, write) as session:\n",
    "            await session.initialize()\n",
    "            return await fn(session)\n",
    "\n",
    "def mcp_run(fn):\n",
    "    return run(_with_session(fn))\n",
    "\n",
    "print(\"Setup complete.\")\n",
    "print(f\"Server: {SERVER_SCRIPT}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 2 -- Server structure walkthrough\n",
    "\n",
    "Before running the server, read its source to understand how FastMCP works.\n",
    "\n",
    "**Three key patterns in `mcp_data_server.py`:**\n",
    "\n",
    "1. **`@mcp.tool()` decorator**: FastMCP reads the function name, docstring, and type hints to generate the tool schema automatically. The first paragraph of the docstring becomes the `description` field; parameter annotations become the `inputSchema`.\n",
    "\n",
    "2. **`@mcp.resource(\"uri\")` decorator**: The URI becomes the resource address. Clients call `session.read_resource(uri)` to fetch the content. Resources are read, not called -- they do not appear in the tool use loop.\n",
    "\n",
    "3. **`@mcp.prompt()` decorator**: The function's parameters become the prompt arguments. FastMCP renders the template by calling the function with the client-supplied arguments."
   ]
  },
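  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make pattern 1 above concrete, here is a standard-library sketch of the idea: derive a tool's name, description, and a JSON-Schema-style `inputSchema` from a function's signature and docstring. It is an illustration under simplifying assumptions (only `str`, `int`, `float`, and `bool` annotations; a parameter without a default is required), not FastMCP's implementation, which builds the schema with pydantic and supports far more types. `describe_tool` and `_JSON_TYPES` are hypothetical names.\n",
    "\n",
    "```python\n",
    "import inspect\n",
    "\n",
    "# Hypothetical mapping from simple Python annotations to JSON Schema types\n",
    "_JSON_TYPES = {str: 'string', int: 'integer', float: 'number', bool: 'boolean'}\n",
    "\n",
    "\n",
    "def describe_tool(fn):\n",
    "    \"\"\"Build an MCP-style tool descriptor from a plain function (simplified sketch).\"\"\"\n",
    "    properties, required = {}, []\n",
    "    for name, param in inspect.signature(fn).parameters.items():\n",
    "        prop = {'type': _JSON_TYPES.get(param.annotation, 'string')}\n",
    "        if param.default is inspect.Parameter.empty:\n",
    "            required.append(name)  # no default, so the client must supply it\n",
    "        else:\n",
    "            prop['default'] = param.default\n",
    "        properties[name] = prop\n",
    "    return {\n",
    "        'name': fn.__name__,\n",
    "        'description': inspect.getdoc(fn) or '',\n",
    "        'inputSchema': {'type': 'object', 'properties': properties, 'required': required},\n",
    "    }\n",
    "\n",
    "\n",
    "def get_query_performance(warehouse_name: str, days: int = 14) -> str:\n",
    "    \"\"\"Return average execution and queue time trends for a named warehouse.\"\"\"\n",
    "    return ''  # body irrelevant here; only the signature and docstring are read\n",
    "\n",
    "\n",
    "schema = describe_tool(get_query_performance)\n",
    "print(schema['inputSchema'])\n",
    "```\n",
    "\n",
    "The descriptor FastMCP actually produces may carry extra pydantic metadata (such as `title` fields), so treat `session.list_tools()` output as the ground truth rather than this exact shape."
   ]
  },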
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the server source and print the structure overview\n",
    "source = SERVER_SCRIPT.read_text()\n",
    "\n",
    "import re\n",
    "\n",
    "# Extract all decorator + function signature lines\n",
    "print(\"mcp_data_server.py -- structure overview\\n\")\n",
    "lines = source.splitlines()\n",
    "for i, line in enumerate(lines):\n",
    "    if re.match(r'^@mcp\\.(tool|resource|prompt)', line):\n",
    "        print(f\"  Line {i+1:>3}: {line}\")\n",
    "        # Print the following def line\n",
    "        if i+1 < len(lines):\n",
    "            print(f\"           {lines[i+1]}\")\n",
    "        print()"
   ]
  },
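  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The regex scan above assumes each decorator sits on a single line directly above its `def`. A sturdier sketch uses the standard-library `ast` module to find every function decorated with `@mcp.tool(...)`, `@mcp.resource(...)`, or `@mcp.prompt(...)`, whatever the spacing, comments, or decorator stacking. It assumes the FastMCP instance is named `mcp`, as in `mcp_data_server.py`; `find_mcp_handlers`, `demo_source`, `example_tool`, and `example_runbook` are hypothetical names.\n",
    "\n",
    "```python\n",
    "import ast\n",
    "\n",
    "HANDLER_KINDS = ('tool', 'resource', 'prompt')\n",
    "\n",
    "\n",
    "def find_mcp_handlers(source):\n",
    "    \"\"\"Return (kind, function_name, def_line) for each @mcp.<kind>(...) handler.\"\"\"\n",
    "    handlers = []\n",
    "    for node in ast.walk(ast.parse(source)):\n",
    "        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):\n",
    "            continue\n",
    "        for dec in node.decorator_list:\n",
    "            # @mcp.tool() is a Call whose .func is the attribute mcp.tool\n",
    "            target = dec.func if isinstance(dec, ast.Call) else dec\n",
    "            if (isinstance(target, ast.Attribute)\n",
    "                    and isinstance(target.value, ast.Name)\n",
    "                    and target.value.id == 'mcp'\n",
    "                    and target.attr in HANDLER_KINDS):\n",
    "                handlers.append((target.attr, node.name, node.lineno))\n",
    "    return sorted(handlers, key=lambda h: h[2])  # ast.walk is breadth-first, so sort by line\n",
    "\n",
    "\n",
    "demo_source = '''\n",
    "@mcp.tool()\n",
    "def example_tool(days: int = 7) -> str:\n",
    "    pass\n",
    "\n",
    "@mcp.resource(\"data://warehouse-runbook\")\n",
    "def example_runbook() -> str:\n",
    "    pass\n",
    "'''\n",
    "\n",
    "for kind, name, line in find_mcp_handlers(demo_source):\n",
    "    print(f'{kind:<9} {name} (def on line {line})')\n",
    "```\n",
    "\n",
    "In the notebook, pass the `source` string read in the cell above: `find_mcp_handlers(source)`."
   ]
  },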
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The FastMCP server is just a standard Python module.\n# Load it here to inspect the tool functions without launching a subprocess:\nimport importlib.util\n\nspec = importlib.util.spec_from_file_location(\"mcp_data_server\", SERVER_SCRIPT)\nsrv_module = importlib.util.module_from_spec(spec)\nspec.loader.exec_module(srv_module)\n\n# Show the docstring of each tool function\nfor name in [\"get_warehouse_summary\", \"get_warehouse_spend_range\", \"get_failing_jobs\", \"check_job_sla\"]:\n    fn = getattr(srv_module, name)\n    first_line = inspect.getdoc(fn).splitlines()[0]\n    sig = inspect.signature(fn)\n    print(f\"Tool: {name}{sig}\")\n    print(f\"  Desc: {first_line}\")\n    print()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 3 -- Test tools directly (without Claude)\n",
    "\n",
    "Before connecting Claude, test the server's tool functions directly by calling the Python\n",
    "functions from the loaded module. This is the unit-test layer: verify each function\n",
    "returns the expected output format for a known input.\n",
    "\n",
    "This isolates tool logic bugs from agent reasoning bugs."
   ]
  },
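  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The direct-call checks in this section can also be packaged as a repeatable unit test. The sketch below uses the standard-library `unittest` module against a hypothetical stand-in, `fake_check_job_sla`, so that it runs without the server or its CSV files; in practice you would test `srv_module.check_job_sla` itself. The tests pin down the output contract (a known job yields a report, an unknown job yields a 'not found' message naming valid jobs) independently of any agent reasoning.\n",
    "\n",
    "```python\n",
    "import unittest\n",
    "\n",
    "# Hypothetical stand-in for srv_module.check_job_sla; the report text is a placeholder\n",
    "_KNOWN_JOBS = {'dbt_fct_subscriptions': 'placeholder SLA report'}\n",
    "\n",
    "\n",
    "def fake_check_job_sla(job_name: str) -> str:\n",
    "    if job_name not in _KNOWN_JOBS:\n",
    "        return f'Job not found: {job_name!r}. Available: {\", \".join(sorted(_KNOWN_JOBS))}'\n",
    "    return f'{job_name}: {_KNOWN_JOBS[job_name]}'\n",
    "\n",
    "\n",
    "class CheckJobSlaContract(unittest.TestCase):\n",
    "    def test_known_job_returns_report(self):\n",
    "        result = fake_check_job_sla('dbt_fct_subscriptions')\n",
    "        self.assertIn('dbt_fct_subscriptions', result)\n",
    "        self.assertNotIn('not found', result)\n",
    "\n",
    "    def test_unknown_job_names_alternatives(self):\n",
    "        result = fake_check_job_sla('nonexistent_job')\n",
    "        self.assertIn('not found', result)\n",
    "        # Listing valid names gives the model something to retry with\n",
    "        self.assertIn('dbt_fct_subscriptions', result)\n",
    "\n",
    "\n",
    "# Load the TestCase explicitly so this also works inside a notebook kernel\n",
    "suite = unittest.defaultTestLoader.loadTestsFromTestCase(CheckJobSlaContract)\n",
    "outcome = unittest.TextTestRunner(verbosity=1).run(suite)\n",
    "print('contract holds' if outcome.wasSuccessful() else 'contract broken')\n",
    "```"
   ]
  },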
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 1: get_warehouse_summary -- spot-check format and content\n",
    "print(\"Direct function call: get_warehouse_summary(days=7)\")\n",
    "print(\"-\" * 60)\n",
    "result = srv_module.get_warehouse_summary(days=7)\n",
    "print(result)\n",
    "print()\n",
    "\n",
    "# Verify: result should mention WH_BI_M prominently (it has the Tableau extract anomaly)\n",
    "assert \"WH_BI_M\" in result, \"Expected WH_BI_M in summary\"\n",
    "print(\"Assertion passed: WH_BI_M appears in summary.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 2: check_job_sla -- valid job name vs invalid\n",
    "print(\"Direct function call: check_job_sla('dbt_fct_subscriptions')\")\n",
    "print(\"-\" * 60)\n",
    "print(srv_module.check_job_sla(\"dbt_fct_subscriptions\"))\n",
    "\n",
    "print()\n",
    "print(\"Direct function call: check_job_sla('nonexistent_job')\")\n",
    "print(\"-\" * 60)\n",
    "result = srv_module.check_job_sla(\"nonexistent_job\")\n",
    "print(result)\n",
    "assert \"not found\" in result, \"Expected error message for unknown job\"\n",
    "print(\"Assertion passed: informative error for unknown job.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Test 3: verify MCP session also delivers the same content\n",
    "# (confirms FastMCP wraps the function correctly)\n",
    "async def test_via_session(session):\n",
    "    r = await session.call_tool(\"get_warehouse_summary\", {\"days\": 7})\n",
    "    return r.content[0].text\n",
    "\n",
    "session_result = mcp_run(test_via_session)\n",
    "assert session_result == result or \"WH_BI_M\" in session_result, \\\n",
    "    \"MCP session result differs unexpectedly from direct call\"\n",
    "print(\"Assertion passed: MCP session and direct call return consistent output.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 4 -- Add a new tool: get_query_performance\n\nOne of MCP's key benefits: adding a tool to the server makes it immediately available\nto all connected clients -- no client code changes required.\n\nWe add `get_query_performance(warehouse_name, days)` by creating a new file\n`mcp_data_server_extended.py` \u2014 a copy of the base server with the new tool appended.\nThis keeps `mcp_data_server.py` untouched (safe to reset: just delete the `_extended` copy).\nThen we connect a fresh session to the extended server and confirm `list_tools()` returns five tools."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create mcp_data_server_extended.py by copying the base server and appending the new tool.\n# mcp_data_server.py is never modified -- delete mcp_data_server_extended.py to reset.\nNEW_TOOL_CODE = '''\n\n@mcp.tool()\ndef get_query_performance(warehouse_name: str, days: int = 14) -> str:\n    \"\"\"Return average execution and queue time trends for a named warehouse.\n\n    Call this when investigating query slowdowns, queue buildup, or execution time\n    regression. Use alongside get_warehouse_summary to correlate performance\n    degradation with credit cost increases.\n\n    Args:\n        warehouse_name: Exact warehouse name (e.g. WH_BI_M).\n        days: Number of days to look back (default 14).\n    \"\"\"\n    from datetime import datetime, timedelta\n\n    rows = [r for r in USAGE_ROWS if r[\"warehouse_name\"] == warehouse_name]\n    if not rows:\n        all_wh = sorted({r[\"warehouse_name\"] for r in USAGE_ROWS})\n        return f\"Warehouse not found: {warehouse_name!r}. 
Available: {\\', \\'.join(all_wh)}\"\n\n    all_dates = sorted({r[\"date\"] for r in USAGE_ROWS})\n    # days-1 so \"last N days\" returns exactly N rows (>= cutoff with timedelta(days=N) gives N+1)\n    cutoff = (\n        datetime.strptime(all_dates[-1], \"%Y-%m-%d\") - timedelta(days=days - 1)\n    ).strftime(\"%Y-%m-%d\")\n    recent = sorted([r for r in rows if r[\"date\"] >= cutoff], key=lambda x: x[\"date\"])\n    if not recent:\n        return f\"No data for {warehouse_name} in the last {days} days (since {cutoff}).\"\n\n    lines = [\n        f\"Query performance -- {warehouse_name} -- last {days} days\",\n        \"\",\n        f\"{{'Date':<12}} {{'Queries':>8}} {{'Avg Queue (s)':>14}} {{'Avg Exec (s)':>13}}\",\n        \"-\" * 52,\n    ]\n    for r in recent:\n        lines.append(\n            f\"{{r['date']:<12}} {{int(r['query_count']):>8}} \"\n            f\"{{float(r['avg_queue_time_s']):>14.2f}} \"\n            f\"{{float(r['avg_execution_time_s']):>13.2f}}\"\n        )\n    avg_q = sum(float(r[\"avg_queue_time_s\"]) for r in recent) / len(recent)\n    avg_e = sum(float(r[\"avg_execution_time_s\"]) for r in recent) / len(recent)\n    lines += [\"-\" * 52, f\"{{'Period average':<12}} {{''  :>8}} {{avg_q:>14.2f}} {{avg_e:>13.2f}}\"]\n    return \"\\n\".join(lines)\n'''\n\nEXTENDED_SCRIPT = SERVER_SCRIPT.parent / \"mcp_data_server_extended.py\"\n\noriginal_source = SERVER_SCRIPT.read_text()\n\n# Insert before the entry-point block\ninsert_before = \"\\n# ==========================================================================\"\ninsert_before += \"\\n# Entry point\"\nnew_source = original_source.replace(insert_before, NEW_TOOL_CODE + insert_before)\n\nEXTENDED_SCRIPT.write_text(new_source)\nprint(f\"Created {EXTENDED_SCRIPT.name} ({len(new_source)} bytes)\")\nprint(f\"mcp_data_server.py is unchanged.\")\n\n# Server params for the extended server\nfrom mcp import StdioServerParameters as _SRP\nEXTENDED_SERVER_PARAMS = StdioServerParameters(\n    
command=sys.executable,\n    args=[str(EXTENDED_SCRIPT)],\n)\n\nasync def _with_extended_session(fn):\n    async with stdio_client(EXTENDED_SERVER_PARAMS) as (read, write):\n        async with ClientSession(read, write) as session:\n            await session.initialize()\n            return await fn(session)\n\ndef mcp_run_extended(fn):\n    return run(_with_extended_session(fn))\n"
   ]
  },
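  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `days - 1` in the cutoff is an easy off-by-one to get wrong, so here is the arithmetic on a small synthetic date list (made-up dates, not the warehouse data). With an inclusive `>=` comparison, subtracting `timedelta(days=N)` from the latest date keeps N + 1 calendar days, while subtracting `N - 1` keeps exactly N. Comparing ISO `YYYY-MM-DD` strings is safe because they sort lexicographically in date order. `count_since` is a hypothetical helper.\n",
    "\n",
    "```python\n",
    "from datetime import date, timedelta\n",
    "\n",
    "# Synthetic daily dates 2025-08-01 .. 2025-08-20 (20 days), as ISO strings\n",
    "dates = [(date(2025, 8, 1) + timedelta(days=i)).isoformat() for i in range(20)]\n",
    "latest = date.fromisoformat(dates[-1])\n",
    "\n",
    "\n",
    "def count_since(delta_days):\n",
    "    \"\"\"Count dates >= latest - delta_days, using the same string comparison as the tool.\"\"\"\n",
    "    cutoff = (latest - timedelta(days=delta_days)).isoformat()\n",
    "    return sum(1 for d in dates if d >= cutoff)\n",
    "\n",
    "\n",
    "N = 14\n",
    "print(count_since(N))      # 15: timedelta(days=N) keeps N + 1 days\n",
    "print(count_since(N - 1))  # 14: timedelta(days=N - 1) keeps exactly N days\n",
    "```"
   ]
  },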
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect a fresh session and confirm the tool is now discoverable\nasync def verify_new_tool(session):\n    result = await session.list_tools()\n    tool_names = [t.name for t in result.tools]\n    print(f\"Tools now available ({len(tool_names)}): {tool_names}\")\n    assert \"get_query_performance\" in tool_names, \"New tool not discovered!\"\n    print(\"Assertion passed: get_query_performance is discoverable via list_tools().\")\n\n    # Call it to verify it works\n    r = await session.call_tool(\"get_query_performance\", {\"warehouse_name\": \"WH_BI_M\", \"days\": 14})\n    print()\n    print(r.content[0].text[:600])\n    return True\n\n\nmcp_run_extended(verify_new_tool)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 5 -- Claude with the extended server\n",
    "\n",
    "The same execution loop from D1 works unchanged with the extended server.\n",
    "Claude now discovers four tools and can decide to call `get_query_performance`\n",
    "when the question calls for it."
   ]
  },
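  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before spending API calls, the loop's control flow can be checked offline. The sketch below replaces the Anthropic client and the MCP session with hypothetical stubs (`FakeModel`, `fake_call_tool`) built from `types.SimpleNamespace`, so it exercises only the branching on `stop_reason`: run the requested tools on `tool_use`, return the text on `end_turn`, and stop on any other reason rather than re-sending the same request. It is synchronous for brevity, whereas the real loop awaits `session.call_tool`.\n",
    "\n",
    "```python\n",
    "from types import SimpleNamespace as NS\n",
    "\n",
    "\n",
    "class FakeModel:\n",
    "    \"\"\"Hypothetical stub: requests one tool call, then answers.\"\"\"\n",
    "    def __init__(self):\n",
    "        self.calls = 0\n",
    "\n",
    "    def create(self, messages):\n",
    "        self.calls += 1\n",
    "        if self.calls == 1:\n",
    "            block = NS(type='tool_use', id='toolu_1', name='get_query_performance',\n",
    "                       input={'warehouse_name': 'WH_BI_M', 'days': 14})\n",
    "            return NS(stop_reason='tool_use', content=[block])\n",
    "        return NS(stop_reason='end_turn', content=[NS(type='text', text='(stub answer)')])\n",
    "\n",
    "\n",
    "def fake_call_tool(name, args):\n",
    "    \"\"\"Hypothetical stand-in for session.call_tool(); returns plain text.\"\"\"\n",
    "    return f'{name} called with {args}'\n",
    "\n",
    "\n",
    "def run_loop(model, question, max_turns=10):\n",
    "    messages = [{'role': 'user', 'content': question}]\n",
    "    for _ in range(max_turns):\n",
    "        response = model.create(messages)\n",
    "        if response.stop_reason == 'end_turn':\n",
    "            return next((b.text for b in response.content if b.type == 'text'), '')\n",
    "        if response.stop_reason != 'tool_use':\n",
    "            return f'(stopped: {response.stop_reason})'  # e.g. max_tokens\n",
    "        messages.append({'role': 'assistant', 'content': response.content})\n",
    "        messages.append({'role': 'user', 'content': [\n",
    "            {'type': 'tool_result', 'tool_use_id': b.id,\n",
    "             'content': fake_call_tool(b.name, b.input)}\n",
    "            for b in response.content if b.type == 'tool_use'\n",
    "        ]})\n",
    "    return '(max turns)'\n",
    "\n",
    "\n",
    "model = FakeModel()\n",
    "print(run_loop(model, 'Is WH_BI_M queueing?'))\n",
    "print('model calls:', model.calls)\n",
    "```"
   ]
  },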
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def run_with_mcp(question: str, verbose: bool = True) -> str:\n    \"\"\"Execute loop identical to D1 -- works with any number of tools.\"\"\"\n    async with stdio_client(EXTENDED_SERVER_PARAMS) as (read, write):\n        async with ClientSession(read, write) as session:\n            await session.initialize()\n            tools_result = await session.list_tools()\n            anthropic_tools = [\n                {\"name\": t.name, \"description\": t.description or \"\",\n                 \"input_schema\": t.inputSchema}\n                for t in tools_result.tools\n            ]\n            if verbose:\n                print(f\"Discovered {len(anthropic_tools)} tools from server.\")\n\n            messages = [{\"role\": \"user\", \"content\": question}]\n            for turn in range(1, 11):\n                response = anthropic_client.messages.create(\n                    model=MODEL, max_tokens=4096,\n                    tools=anthropic_tools, messages=messages,\n                )\n                if verbose:\n                    calls = [b.name for b in response.content if b.type == \"tool_use\"]\n                    print(f\"  Turn {turn}: {response.stop_reason}\",\n                          f\", tools={calls}\" if calls else \"\")\n\n                if response.stop_reason == \"end_turn\":\n                    return next((b.text for b in response.content if b.type == \"text\"), \"\")\n\n                if response.stop_reason == \"tool_use\":\n                    messages.append({\"role\": \"assistant\", \"content\": response.content})\n                    results = []\n                    for block in response.content:\n                        if block.type == \"tool_use\":\n                            r = await session.call_tool(block.name, block.input)\n                            results.append({\n                                \"type\": \"tool_result\",\n                                \"tool_use_id\": block.id,\n                           
     \"content\": r.content[0].text,\n                            })\n                    messages.append({\"role\": \"user\", \"content\": results})\n    return \"(max turns)\"\n\n\nprint(\"run_with_mcp() defined.\")"
   ]
  },
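  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`run_with_mcp()` forwards only `r.content[0].text`, which suffices here because every tool on this server returns a single string. For servers whose tools may return several content blocks or report failures, a more defensive conversion is sketched below: it joins all text blocks and passes the MCP result's `isError` flag through as the Messages API `is_error` field. The `SimpleNamespace` objects merely imitate the shape of an MCP `CallToolResult` so the example runs without a server; `to_tool_result` is a hypothetical helper.\n",
    "\n",
    "```python\n",
    "from types import SimpleNamespace as NS\n",
    "\n",
    "\n",
    "def to_tool_result(tool_use_id, call_result):\n",
    "    \"\"\"Convert an MCP CallToolResult-like object into an Anthropic tool_result block.\"\"\"\n",
    "    texts = [c.text for c in call_result.content if getattr(c, 'type', None) == 'text']\n",
    "    block = {\n",
    "        'type': 'tool_result',\n",
    "        'tool_use_id': tool_use_id,\n",
    "        # Fall back to a placeholder so the block is never silently empty\n",
    "        'content': '\\n'.join(texts) or '(tool returned no text content)',\n",
    "    }\n",
    "    if getattr(call_result, 'isError', False):\n",
    "        block['is_error'] = True  # tells Claude the call failed, so it can adjust\n",
    "    return block\n",
    "\n",
    "\n",
    "# Imitations of MCP results: a two-block success and a failure\n",
    "ok = NS(content=[NS(type='text', text='line 1'), NS(type='text', text='line 2')], isError=False)\n",
    "failed = NS(content=[NS(type='text', text='Warehouse not found')], isError=True)\n",
    "print(to_tool_result('toolu_1', ok))\n",
    "print(to_tool_result('toolu_2', failed))\n",
    "```\n",
    "\n",
    "Inside `run_with_mcp()`, the `results.append({...})` call would become `results.append(to_tool_result(block.id, r))`."
   ]
  },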
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Question that should trigger get_query_performance (the new tool)\n",
    "q = (\n",
    "    \"Is WH_BI_M experiencing queue buildup in the last 2 weeks? \"\n",
    "    \"How does the queue time compare with its credit spend trend?\"\n",
    ")\n",
    "print(f\"Question: {q}\")\n",
    "print(\"-\" * 70)\n",
    "answer = run(run_with_mcp(q))\n",
    "print()\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 6 -- The CLI application\n",
    "\n",
    "`scripts/data_monitor_cli.py` is the standalone version of the execution loop above.\n",
    "It accepts a question from the command line, connects to the MCP server, runs Claude,\n",
    "and prints the answer -- no notebook needed.\n",
    "\n",
    "This is the D2 capstone: a reusable CLI tool backed by a reusable MCP server."
   ]
  },
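  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cells below drive the CLI with the flags `--list-tools`, `--verbose`, `--prompt`, `--warehouse`, and `--date-range`, and read agent progress from stderr and the answer from stdout. As an illustration of that interface only (not the contents of `data_monitor_cli.py`, which this notebook does not show), an `argparse` front end could look like this; `build_parser` and `log` are hypothetical names.\n",
    "\n",
    "```python\n",
    "import argparse\n",
    "import sys\n",
    "\n",
    "\n",
    "def build_parser():\n",
    "    parser = argparse.ArgumentParser(description='Ask Claude about warehouse and pipeline health via MCP.')\n",
    "    parser.add_argument('question', nargs='?', help='free-text question for the agent')\n",
    "    parser.add_argument('--list-tools', action='store_true', help='list server tools and resources, then exit')\n",
    "    parser.add_argument('--verbose', action='store_true', help='print agent turns to stderr')\n",
    "    parser.add_argument('--prompt', help='name of a server-side prompt, e.g. incident_analysis')\n",
    "    parser.add_argument('--warehouse', help='prompt argument: warehouse name')\n",
    "    parser.add_argument('--date-range', help='prompt argument: date range text')\n",
    "    return parser\n",
    "\n",
    "\n",
    "def log(*parts):\n",
    "    # Progress goes to stderr so stdout carries only the final answer (easy to pipe)\n",
    "    print(*parts, file=sys.stderr)\n",
    "\n",
    "\n",
    "args = build_parser().parse_args([\n",
    "    '--prompt', 'incident_analysis', '--warehouse', 'WH_BI_M',\n",
    "    '--date-range', '2025-07-20 to 2025-08-10', '--verbose',\n",
    "])\n",
    "if args.verbose:\n",
    "    log(f'prompt={args.prompt} warehouse={args.warehouse} date_range={args.date_range}')\n",
    "```\n",
    "\n",
    "Note that argparse exposes `--date-range` as the attribute `args.date_range` and `--list-tools` as `args.list_tools`."
   ]
  },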
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import subprocess\n",
    "\n",
    "cli_path = Path.cwd().parent / \"scripts\" / \"data_monitor_cli.py\"\n",
    "assert cli_path.exists(), f\"CLI not found: {cli_path}\"\n",
    "\n",
    "# List available tools and resources\n",
    "print(\"CLI: --list-tools output\")\n",
    "print(\"=\" * 60)\n",
    "result = subprocess.run(\n",
    "    [sys.executable, str(cli_path), \"--list-tools\"],\n",
    "    capture_output=True, text=True\n",
    ")\n",
    "print(result.stdout)\n",
    "if result.returncode != 0:\n",
    "    print(\"STDERR:\", result.stderr[:300])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run a direct question via CLI (verbose mode shows agent turns)\n",
    "print(\"CLI: direct question with --verbose\")\n",
    "print(\"=\" * 60)\n",
    "result = subprocess.run(\n",
    "    [\n",
    "        sys.executable, str(cli_path),\n",
    "        \"--verbose\",\n",
    "        \"Which job has the most SLA compliance issues in the last 90 days?\",\n",
    "    ],\n",
    "    capture_output=True, text=True,\n",
    ")\n",
    "if result.stderr:\n",
    "    print(\"Agent progress (stderr):\")\n",
    "    print(result.stderr.strip())\n",
    "    print()\n",
    "print(\"Answer:\")\n",
    "print(result.stdout.strip())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run the incident_analysis server-side prompt via CLI\n",
    "print(\"CLI: --prompt incident_analysis\")\n",
    "print(\"=\" * 60)\n",
    "result = subprocess.run(\n",
    "    [\n",
    "        sys.executable, str(cli_path),\n",
    "        \"--prompt\", \"incident_analysis\",\n",
    "        \"--warehouse\", \"WH_BI_M\",\n",
    "        \"--date-range\", \"2025-07-20 to 2025-08-10\",\n",
    "        \"--verbose\",\n",
    "    ],\n",
    "    capture_output=True, text=True,\n",
    ")\n",
    "if result.stderr:\n",
    "    print(result.stderr.strip())\n",
    "    print()\n",
    "print(result.stdout.strip())"
   ]
  },
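  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `--prompt incident_analysis` path relies on pattern 3 from Section 2: the server renders a prompt by calling the decorated function with the arguments the client supplies (a client triggers this with `session.get_prompt(name, arguments)`). The mechanics can be imitated without a server by keeping a registry of prompt functions and rendering one with an argument dict. The registry, `prompt`, `render_prompt`, and this `incident_analysis` body (including its parameter names) are hypothetical; the real prompt lives in `mcp_data_server.py`.\n",
    "\n",
    "```python\n",
    "import inspect\n",
    "\n",
    "PROMPTS = {}\n",
    "\n",
    "\n",
    "def prompt(fn):\n",
    "    \"\"\"Hypothetical stand-in for @mcp.prompt(): register fn under its own name.\"\"\"\n",
    "    PROMPTS[fn.__name__] = fn\n",
    "    return fn\n",
    "\n",
    "\n",
    "@prompt\n",
    "def incident_analysis(warehouse: str, date_range: str) -> str:\n",
    "    # Hypothetical template text\n",
    "    return (\n",
    "        f'Investigate the incident on {warehouse} during {date_range}. '\n",
    "        'Check spend, failing jobs, and SLA status, then summarize likely causes.'\n",
    "    )\n",
    "\n",
    "\n",
    "def render_prompt(name, arguments):\n",
    "    fn = PROMPTS[name]\n",
    "    # Validate before calling so a missing argument gives a clear error\n",
    "    missing = [p for p in inspect.signature(fn).parameters if p not in arguments]\n",
    "    if missing:\n",
    "        raise ValueError(f'missing prompt arguments: {missing}')\n",
    "    return fn(**arguments)\n",
    "\n",
    "\n",
    "text = render_prompt('incident_analysis',\n",
    "                     {'warehouse': 'WH_BI_M', 'date_range': '2025-07-20 to 2025-08-10'})\n",
    "print(text)\n",
    "```\n",
    "\n",
    "MCP prompt arguments are sent as strings, which is why every parameter in this sketch is a `str`."
   ]
  },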
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 7 -- Practitioner Lab\n",
    "\n",
    "### Exercise 1: Add a resource for the data quality runbook\n",
    "\n",
    "The server exposes `data://warehouse-runbook` but not the data quality runbook. Add:\n",
    "\n",
    "```python\n",
    "@mcp.resource(\"data://quality-runbook\")\n",
    "def get_quality_runbook() -> str:\n",
    "    \"\"\"The data quality monitoring runbook.\"\"\"\n",
    "    return (DATA_DIR / \"runbook_data_quality.md\").read_text()\n",
    "```\n",
    "\n",
    "Then connect a session and verify `list_resources()` returns three resources.\n",
    "\n",
    "### Exercise 2: SLA summary tool\n",
    "\n",
    "Add a `get_sla_summary()` tool (no parameters) that returns the SLA status for all\n",
    "eight jobs in one call. This is the \"dashboard view\" that avoids calling `check_job_sla`\n",
    "eight times separately. Consider how the description should distinguish this from\n",
    "`check_job_sla`.\n",
    "\n",
    "### Exercise 3: HTTP/SSE transport\n",
    "\n",
    "Run the server in HTTP/SSE mode from a terminal:\n",
    "\n",
    "```bash\n",
    "# In one terminal:\n",
    "python notebooks/mcp_data_server.py --transport sse --port 8765\n",
    "\n",
    "# In another, update SERVER_PARAMS and use sse_client instead of stdio_client:\n",
    "from mcp.client.sse import sse_client\n",
    "async with sse_client(\"http://localhost:8765/sse\") as (read, write):\n",
    "    ...\n",
    "```\n",
    "\n",
    "Note: FastMCP does not currently accept `--transport` as a CLI flag; call\n",
    "`mcp.run(transport=\"sse\", port=8765)` in a modified copy of the server.\n",
    "\n",
    "### Exercise 4: Multi-turn conversation agent\n",
    "\n",
    "Extend `run_with_mcp()` to maintain conversation history across multiple questions\n",
    "in one session (append user and assistant messages to the same `messages` list).\n",
    "This allows follow-up questions like:\n",
    "- \"Which warehouse costs the most?\"\n",
    "- \"Is there a correlated pipeline failure for that warehouse?\"\n",
    "- \"What does the runbook say about this incident pattern?\""
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.11.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}