{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# D1 -- MCP Concepts: Connecting Claude to an MCP Server as a Client\n",
    "\n",
    "> Part of *Building with Claude -- A Practitioner's Guide to the Anthropic API*  \n",
    "> Based on Anthropic's \"Building with the Claude API\" course (Coursera) and public API documentation.  \n",
    "> Not affiliated with or endorsed by Anthropic.\n",
    "\n",
    "## What you will build\n",
    "\n",
    "This notebook demonstrates MCP from the **client** perspective:\n",
    "\n",
    "1. Connect to a local MCP server (`mcp_data_server.py`) over the stdio transport\n",
    "2. Discover tools, resources, and prompts dynamically -- no hardcoded schemas\n",
    "3. Call tools directly through the MCP session\n",
    "4. Read resources (reference documents, schema info)\n",
    "5. Retrieve and use prompt templates\n",
    "6. Run Claude with the server's tools via the same execution loop as C2\n",
    "\n",
    "The companion server (`mcp_data_server.py`) is in the same `notebooks/` folder. It exposes\n",
    "three tools, two resources, and one prompt over the Acme SaaS data platform datasets.\n",
    "\n",
    "**Prerequisite**: `pip install \"mcp>=1.0.0\" \"nest_asyncio>=1.5.0\"` (quote the specifiers so the shell does not treat `>` as a redirect)  \n",
    "**API key**: `ANTHROPIC_API_KEY` in your `.env` file"
   ]
  },
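  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 1 -- Setup\n",
    "\n",
    "MCP client code is async: every `session.*` call is a coroutine. Jupyter already runs its own\n",
    "asyncio event loop, so `asyncio.run()` and `run_until_complete()` normally raise `RuntimeError`\n",
    "when called from a cell. `nest_asyncio.apply()` patches the loop to allow nested use, which\n",
    "makes `asyncio.get_event_loop().run_until_complete()` work inside Jupyter.\n",
    "\n",
    "Install both packages first, quoting the specifiers so the shell does not treat `>` as a redirect:\n",
    "\n",
    "```\n",
    "pip install \"mcp>=1.0.0\" \"nest_asyncio>=1.5.0\"\n",
    "```\n",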
    "\n",
    "We also define two helpers that keep the demo cells concise:\n",
    "- `run(coro)` -- run an async coroutine from a sync Jupyter cell\n",
    "- `mcp_run(fn)` -- open a fresh server session, call `fn(session)`, close the session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import os\n",
    "import sys\n",
    "from pathlib import Path\n",
    "\n",
    "import anthropic\n",
    "import nest_asyncio\n",
    "from dotenv import load_dotenv\n",
    "from mcp import ClientSession, StdioServerParameters\n",
    "from mcp.client.stdio import stdio_client\n",
    "\n",
    "# Patch the running Jupyter event loop so run_until_complete() works here\n",
    "nest_asyncio.apply()\n",
    "\n",
    "load_dotenv(Path.cwd().parent / \".env\")\n",
    "assert os.environ.get(\"ANTHROPIC_API_KEY\"), \"Set ANTHROPIC_API_KEY in .env\"\n",
    "\n",
    "anthropic_client = anthropic.Anthropic()\n",
    "\n",
    "# Model used throughout this notebook\n",
    "MODEL = \"claude-sonnet-4-5\"\n",
    "\n",
    "# -----------------------------------------------------------------------\n",
    "# MCP server parameters\n",
    "# -----------------------------------------------------------------------\n",
    "# The server script lives alongside this notebook.\n",
    "# StdioServerParameters tells the MCP client how to launch it as a subprocess.\n",
    "SERVER_PARAMS = StdioServerParameters(\n",
    "    command=sys.executable,               # same Python interpreter as the notebook\n",
    "    args=[str(Path.cwd() / \"mcp_data_server.py\")],\n",
    ")\n",
    "\n",
    "# -----------------------------------------------------------------------\n",
    "# Async helpers\n",
    "# -----------------------------------------------------------------------\n",
    "def run(coro):\n",
    "    \"\"\"Run an async coroutine synchronously from a Jupyter cell.\"\"\"\n",
    "    return asyncio.get_event_loop().run_until_complete(coro)\n",
    "\n",
    "\n",
    "async def _with_session(fn):\n",
    "    \"\"\"Open a fresh MCP server session, run fn(session), close the session.\"\"\"\n",
    "    async with stdio_client(SERVER_PARAMS) as (read, write):\n",
    "        async with ClientSession(read, write) as session:\n",
    "            await session.initialize()\n",
    "            return await fn(session)\n",
    "\n",
    "\n",
    "def mcp_run(fn):\n",
    "    \"\"\"Synchronous wrapper: open MCP session, call fn(session), return result.\"\"\"\n",
    "    return run(_with_session(fn))\n",
    "\n",
    "\n",
    "print(\"Setup complete.\")\n",
    "print(f\"Server script : {SERVER_PARAMS.args[0]}\")\n",
    "print(f\"Model         : {MODEL}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 2 -- Connect and Initialize\n",
    "\n",
    "The MCP protocol opens with a handshake: the client sends `initialize`, the server returns\n",
    "its name, version, and the list of capability categories it supports (tools, resources,\n",
    "prompts, logging, etc.), and the client acknowledges with an `initialized` notification.\n",
    "\n",
    "The `stdio_client()` context manager launches the server as a subprocess and opens the\n",
    "transport. `ClientSession.initialize()` runs the handshake, returns the server's\n",
    "`InitializeResult`, and makes the session ready."
   ]
  },
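  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For orientation, the handshake can be pictured as three JSON-RPC messages. The sketch below is\n",
    "illustrative only: the SDK builds and sends these messages for you, and the concrete values\n",
    "(protocol version string, client and server names) are placeholders, not what\n",
    "`mcp_data_server.py` actually reports. `supported` is a hypothetical helper.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "# 1. Client -> server: the initialize request (all values are placeholders)\n",
    "initialize_request = {\n",
    "    \"jsonrpc\": \"2.0\",\n",
    "    \"id\": 1,\n",
    "    \"method\": \"initialize\",\n",
    "    \"params\": {\n",
    "        \"protocolVersion\": \"2024-11-05\",\n",
    "        \"capabilities\": {},\n",
    "        \"clientInfo\": {\"name\": \"example-client\", \"version\": \"0.1.0\"},\n",
    "    },\n",
    "}\n",
    "\n",
    "# 2. Server -> client: the result declares the server's identity and one key\n",
    "#    per supported capability category\n",
    "initialize_result = {\n",
    "    \"jsonrpc\": \"2.0\",\n",
    "    \"id\": 1,\n",
    "    \"result\": {\n",
    "        \"protocolVersion\": \"2024-11-05\",\n",
    "        \"capabilities\": {\"tools\": {}, \"resources\": {}, \"prompts\": {}},\n",
    "        \"serverInfo\": {\"name\": \"example-server\", \"version\": \"0.1.0\"},\n",
    "    },\n",
    "}\n",
    "\n",
    "# 3. Client -> server: a notification (no id) confirming the session is ready\n",
    "initialized_notification = {\"jsonrpc\": \"2.0\", \"method\": \"notifications/initialized\"}\n",
    "\n",
    "\n",
    "def supported(result: dict) -> list[str]:\n",
    "    \"\"\"Return the capability categories the server declared, sorted.\"\"\"\n",
    "    return sorted(result[\"result\"][\"capabilities\"])\n",
    "\n",
    "\n",
    "print(json.dumps(initialize_request, indent=2))\n",
    "print(\"Declared capabilities:\", supported(initialize_result))\n",
    "```\n",
    "\n",
    "The `capabilities` keys in the result are what the next cell checks for."
   ]
  },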
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def demo_connect():\n",
    "    \"\"\"Show the server's initialization response.\"\"\"\n",
    "    # initialize() returns the server's InitializeResult, so this demo opens its\n",
    "    # own session (instead of using mcp_run) to capture that return value.\n",
    "    async with stdio_client(SERVER_PARAMS) as (read, write):\n",
    "        async with ClientSession(read, write) as session:\n",
    "            init = await session.initialize()\n",
    "\n",
    "    info = init.serverInfo    # Implementation(name, version)\n",
    "    caps = init.capabilities  # ServerCapabilities object\n",
    "\n",
    "    print(\"MCP server connected successfully\")\n",
    "    print(f\"  Server name    : {info.name}\")\n",
    "    print(f\"  Server version : {info.version}\")\n",
    "    print(\"  Capabilities   :\")\n",
    "    print(f\"    tools     : {caps.tools is not None}\")\n",
    "    print(f\"    resources : {caps.resources is not None}\")\n",
    "    print(f\"    prompts   : {caps.prompts is not None}\")\n",
    "    print(f\"    logging   : {caps.logging is not None}\")\n",
    "    return True\n",
    "\n",
    "\n",
    "run(demo_connect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 3 -- Tool Discovery\n",
    "\n",
    "In C2, tool schemas were hardcoded in the client application. With MCP, the client\n",
    "discovers them from the server at runtime via `session.list_tools()`.\n",
    "\n",
    "What comes back is the same structure you would have written manually in C2 --\n",
    "name, description, and JSON Schema for inputs -- but served by the domain expert\n",
    "who built the server, not by your application code."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "\n",
    "async def demo_list_tools(session: ClientSession):\n",
    "    result = await session.list_tools()\n",
    "    print(f\"Server exposes {len(result.tools)} tool(s):\\n\")\n",
    "    for tool in result.tools:\n",
    "        print(f\"  Tool: {tool.name}\")\n",
    "        print(f\"  Desc: {tool.description}\")\n",
    "        props = tool.inputSchema.get(\"properties\", {})\n",
    "        required = tool.inputSchema.get(\"required\", [])\n",
    "        for param, schema in props.items():\n",
    "            req_flag = \" (required)\" if param in required else \" (optional)\"\n",
    "            default = f\", default={schema.get('default')!r}\" if \"default\" in schema else \"\"\n",
    "            print(f\"    param: {param} [{schema.get('type', '?')}]{req_flag}{default}\")\n",
    "        print()\n",
    "    return result.tools\n",
    "\n",
    "\n",
    "discovered_tools = mcp_run(demo_list_tools)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Translate to Anthropic's tool format -- this is all that's needed to use them with Claude\n",
    "anthropic_tools = [\n",
    "    {\n",
    "        \"name\": t.name,\n",
    "        \"description\": t.description or \"\",\n",
    "        \"input_schema\": t.inputSchema,\n",
    "    }\n",
    "    for t in discovered_tools\n",
    "]\n",
    "\n",
    "print(\"Anthropic-format tool list (ready to pass to messages.create):\")\n",
    "for t in anthropic_tools:\n",
    "    print(f\"  {t['name']}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 4 -- Calling Tools Directly\n",
    "\n",
    "Before wiring tools to Claude, call them directly through the session. This is the MCP\n",
    "equivalent of unit-testing tool functions in C2: verify the output format and content\n",
    "before attaching to an agent loop.\n",
    "\n",
    "`session.call_tool(name, arguments)` sends the call to the server and returns a\n",
    "`CallToolResult` with a `content` list of text (or image, or embedded resource) blocks\n",
    "and an `isError` flag that is set when the tool itself reported a failure."
   ]
  },
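  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The cells below read `result.content[0].text`, which assumes exactly one text block. A more\n",
    "defensive reader, sketched here with stand-in objects instead of a live session, joins all\n",
    "text blocks and surfaces the `isError` flag. `tool_result_text` is a hypothetical helper name,\n",
    "not part of the MCP SDK, and the stand-ins only assume the attribute names used below.\n",
    "\n",
    "```python\n",
    "from types import SimpleNamespace\n",
    "\n",
    "\n",
    "def tool_result_text(result) -> str:\n",
    "    \"\"\"Join the text blocks of a CallToolResult-like object.\n",
    "\n",
    "    Non-text blocks (images, embedded resources) are summarised by type\n",
    "    so that nothing is silently dropped.\n",
    "    \"\"\"\n",
    "    parts = []\n",
    "    for block in result.content:\n",
    "        if getattr(block, \"type\", None) == \"text\":\n",
    "            parts.append(block.text)\n",
    "        else:\n",
    "            parts.append(f\"[{getattr(block, 'type', 'unknown')} block omitted]\")\n",
    "    text = \"\\n\".join(parts) if parts else \"(empty result)\"\n",
    "    return f\"ERROR: {text}\" if getattr(result, \"isError\", False) else text\n",
    "\n",
    "\n",
    "# Stand-ins shaped like MCP results (assumption: real results expose the\n",
    "# same `content`, `type`, `text`, and `isError` attributes)\n",
    "ok = SimpleNamespace(\n",
    "    isError=False,\n",
    "    content=[\n",
    "        SimpleNamespace(type=\"text\", text=\"3 rows\"),\n",
    "        SimpleNamespace(type=\"image\", data=\"...\", mimeType=\"image/png\"),\n",
    "    ],\n",
    ")\n",
    "failed = SimpleNamespace(\n",
    "    isError=True,\n",
    "    content=[SimpleNamespace(type=\"text\", text=\"unknown job\")],\n",
    ")\n",
    "\n",
    "print(tool_result_text(ok))\n",
    "print(tool_result_text(failed))\n",
    "```\n",
    "\n",
    "With a live session, `tool_result_text(result)` could stand in for `result.content[0].text`."
   ]
  },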
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def demo_tools_direct(session: ClientSession):\n",
    "    print(\"=\" * 60)\n",
    "    print(\"Tool: get_warehouse_summary (days=14)\")\n",
    "    print(\"=\" * 60)\n",
    "    result = await session.call_tool(\"get_warehouse_summary\", {\"days\": 14})\n",
    "    print(result.content[0].text)\n",
    "\n",
    "    print()\n",
    "    print(\"=\" * 60)\n",
    "    print(\"Tool: get_failing_jobs (days=30)\")\n",
    "    print(\"=\" * 60)\n",
    "    result = await session.call_tool(\"get_failing_jobs\", {\"days\": 30})\n",
    "    print(result.content[0].text)\n",
    "\n",
    "    print()\n",
    "    print(\"=\" * 60)\n",
    "    print(\"Tool: check_job_sla (job_name='dbt_fct_subscriptions')\")\n",
    "    print(\"=\" * 60)\n",
    "    result = await session.call_tool(\n",
    "        \"check_job_sla\", {\"job_name\": \"dbt_fct_subscriptions\"}\n",
    "    )\n",
    "    print(result.content[0].text)\n",
    "\n",
    "\n",
    "mcp_run(demo_tools_direct)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 5 -- Resources\n",
    "\n",
    "Resources are the MCP primitive with no direct equivalent in C2. A resource is a piece of\n",
    "data exposed at a URI address -- think reference documents, schema definitions, runbooks.\n",
    "\n",
    "Resources are **read**, not **called**: use `session.list_resources()` to enumerate what is\n",
    "available, and `session.read_resource(uri)` to fetch the content.\n",
    "\n",
    "The content comes back in the `contents` list of the result. Each item has a `uri` and\n",
    "either a `text` field (text resources) or a `blob` field (binary resources)."
   ]
  },
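  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `text` / `blob` split can be handled with one small branch. The sketch below uses\n",
    "stand-in objects rather than a live session; `resource_bytes` is a hypothetical helper, the\n",
    "example URIs are made up, and it assumes binary content arrives as a base64-encoded string\n",
    "in `blob`.\n",
    "\n",
    "```python\n",
    "import base64\n",
    "from types import SimpleNamespace\n",
    "\n",
    "\n",
    "def resource_bytes(item) -> bytes:\n",
    "    \"\"\"Return the raw bytes of one resource contents item.\n",
    "\n",
    "    Text resources carry a `text` attribute; binary resources carry a\n",
    "    base64-encoded `blob` string.\n",
    "    \"\"\"\n",
    "    if getattr(item, \"text\", None) is not None:\n",
    "        return item.text.encode(\"utf-8\")\n",
    "    if getattr(item, \"blob\", None) is not None:\n",
    "        return base64.b64decode(item.blob)\n",
    "    raise ValueError(f\"Resource {item.uri} has neither text nor blob\")\n",
    "\n",
    "\n",
    "# Stand-in items; the URIs are invented for illustration\n",
    "text_item = SimpleNamespace(uri=\"data://example-text\", text=\"col_a,col_b\")\n",
    "blob_item = SimpleNamespace(\n",
    "    uri=\"data://example-binary\",\n",
    "    blob=base64.b64encode(bytes([0, 1, 2])).decode(\"ascii\"),\n",
    ")\n",
    "\n",
    "print(resource_bytes(text_item))\n",
    "print(resource_bytes(blob_item))\n",
    "```\n",
    "\n",
    "Both resources on the companion server are read as text in the next cell, so the `blob`\n",
    "branch only matters for servers that expose binary data."
   ]
  },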
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def demo_resources(session: ClientSession):\n",
    "    # List available resources\n",
    "    list_result = await session.list_resources()\n",
    "    print(f\"Server exposes {len(list_result.resources)} resource(s):\\n\")\n",
    "    for r in list_result.resources:\n",
    "        print(f\"  URI       : {r.uri}\")\n",
    "        print(f\"  Name      : {r.name}\")\n",
    "        print(f\"  MIME type : {r.mimeType or 'text/plain'}\")\n",
    "        print()\n",
    "\n",
    "    # Read the schema resource (shorter than the runbook)\n",
    "    schema_result = await session.read_resource(\"data://dataset-schema\")\n",
    "    schema_text = schema_result.contents[0].text\n",
    "    print(\"Resource content: data://dataset-schema\")\n",
    "    print(\"-\" * 50)\n",
    "    print(schema_text)\n",
    "\n",
    "    # Read the first ~300 chars of the runbook to confirm it loaded\n",
    "    runbook_result = await session.read_resource(\"data://warehouse-runbook\")\n",
    "    runbook_text = runbook_result.contents[0].text\n",
    "    print()\n",
    "    print(\"Resource: data://warehouse-runbook (first 300 chars)\")\n",
    "    print(\"-\" * 50)\n",
    "    print(runbook_text[:300], \"...\")\n",
    "\n",
    "    return runbook_text\n",
    "\n",
    "\n",
    "runbook_content = mcp_run(demo_resources)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 6 -- Prompts\n",
    "\n",
    "Prompts are parametrised message templates stored on the server. The server author writes\n",
    "and maintains the template; any client can retrieve it by name with arguments.\n",
    "\n",
    "`session.list_prompts()` enumerates available prompts.  \n",
    "`session.get_prompt(name, arguments={...})` retrieves the rendered template as a list of\n",
    "messages ready to pass to Claude."
   ]
  },
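  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A rendered prompt can carry several messages, each with a role. The sketch below shows one way\n",
    "to map them onto Anthropic message dicts; it uses a stand-in object with invented text instead\n",
    "of a live `get_prompt` call, and `to_anthropic_messages` is a hypothetical helper that handles\n",
    "text content only.\n",
    "\n",
    "```python\n",
    "from types import SimpleNamespace\n",
    "\n",
    "\n",
    "def to_anthropic_messages(prompt_result) -> list[dict]:\n",
    "    \"\"\"Convert GetPromptResult-like messages to Anthropic message dicts.\n",
    "\n",
    "    Non-text content raises so the gap is visible rather than skipped.\n",
    "    \"\"\"\n",
    "    converted = []\n",
    "    for message in prompt_result.messages:\n",
    "        if message.content.type != \"text\":\n",
    "            raise ValueError(f\"Unsupported content type: {message.content.type}\")\n",
    "        converted.append({\"role\": message.role, \"content\": message.content.text})\n",
    "    return converted\n",
    "\n",
    "\n",
    "# Stand-in for the object returned by session.get_prompt(...)\n",
    "fake_prompt = SimpleNamespace(\n",
    "    messages=[\n",
    "        SimpleNamespace(\n",
    "            role=\"user\",\n",
    "            content=SimpleNamespace(type=\"text\", text=\"Analyse WH_BI_M.\"),\n",
    "        ),\n",
    "    ]\n",
    ")\n",
    "\n",
    "print(to_anthropic_messages(fake_prompt))\n",
    "```\n",
    "\n",
    "The next cell takes the simpler route of reading only the first message's text."
   ]
  },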
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def demo_prompts(session: ClientSession):\n",
    "    # List available prompts\n",
    "    list_result = await session.list_prompts()\n",
    "    print(f\"Server exposes {len(list_result.prompts)} prompt(s):\\n\")\n",
    "    for p in list_result.prompts:\n",
    "        print(f\"  Name: {p.name}\")\n",
    "        print(f\"  Desc: {p.description}\")\n",
    "        if p.arguments:\n",
    "            for arg in p.arguments:\n",
    "                req = \" (required)\" if arg.required else \" (optional)\"\n",
    "                print(f\"    arg: {arg.name}{req} -- {arg.description}\")\n",
    "        print()\n",
    "\n",
    "    # Retrieve the incident_analysis prompt with concrete arguments\n",
    "    prompt_result = await session.get_prompt(\n",
    "        \"incident_analysis\",\n",
    "        arguments={\n",
    "            \"warehouse_name\": \"WH_BI_M\",\n",
    "            \"date_range\": \"2025-07-20 to 2025-08-10\",\n",
    "        },\n",
    "    )\n",
    "\n",
    "    prompt_text = prompt_result.messages[0].content.text\n",
    "    print(\"Rendered prompt (incident_analysis):\")\n",
    "    print(\"-\" * 60)\n",
    "    print(prompt_text)\n",
    "    return prompt_text\n",
    "\n",
    "\n",
    "rendered_prompt = mcp_run(demo_prompts)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 7 -- Claude with MCP Tools\n",
    "\n",
    "Now connect everything: the server's tools, Claude, and the execution loop from C2.\n",
    "\n",
    "The loop is structurally identical to C2. The only change is the execution backend:\n",
    "\n",
    "```python\n",
    "# C2: local function dispatch\n",
    "result = TOOL_DISPATCH[block.name](block.input)\n",
    "\n",
    "# D1: MCP server dispatch\n",
    "result = await session.call_tool(block.name, block.input)\n",
    "```\n",
    "\n",
    "Everything else -- appending tool_use blocks, returning tool_result messages, cycling\n",
    "until end_turn -- is unchanged."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def run_claude_with_mcp(question: str, verbose: bool = True) -> str:\n",
    "    \"\"\"Run Claude with tools sourced from the MCP server.\n",
    "\n",
    "    Opens a session, discovers tools, runs the execution loop, returns\n",
    "    Claude's final text response.\n",
    "    \"\"\"\n",
    "    async with stdio_client(SERVER_PARAMS) as (read, write):\n",
    "        async with ClientSession(read, write) as session:\n",
    "            await session.initialize()\n",
    "\n",
    "            # -- Discover tools from the server --\n",
    "            tools_result = await session.list_tools()\n",
    "            anthropic_tools = [\n",
    "                {\n",
    "                    \"name\": t.name,\n",
    "                    \"description\": t.description or \"\",\n",
    "                    \"input_schema\": t.inputSchema,\n",
    "                }\n",
    "                for t in tools_result.tools\n",
    "            ]\n",
    "            if verbose:\n",
    "                print(f\"Discovered {len(anthropic_tools)} tools from MCP server.\")\n",
    "\n",
    "            # -- Run execution loop --\n",
    "            messages = [{\"role\": \"user\", \"content\": question}]\n",
    "            turn = 0\n",
    "            while turn < 10:\n",
    "                turn += 1\n",
    "                response = anthropic_client.messages.create(\n",
    "                    model=MODEL,\n",
    "                    max_tokens=4096,\n",
    "                    tools=anthropic_tools,\n",
    "                    messages=messages,\n",
    "                )\n",
    "\n",
    "                if verbose:\n",
    "                    tool_calls = [\n",
    "                        b.name for b in response.content if b.type == \"tool_use\"\n",
    "                    ]\n",
    "                    print(f\"  Turn {turn}: stop_reason={response.stop_reason}\", end=\"\")\n",
    "                    if tool_calls:\n",
    "                        print(f\", tools={tool_calls}\")\n",
    "                    else:\n",
    "                        print()\n",
    "\n",
    "                if response.stop_reason == \"end_turn\":\n",
    "                    for block in response.content:\n",
    "                        if block.type == \"text\":\n",
    "                            return block.text\n",
    "                    return \"(no text block in final response)\"\n",
    "\n",
    "                if response.stop_reason == \"tool_use\":\n",
    "                    messages.append(\n",
    "                        {\"role\": \"assistant\", \"content\": response.content}\n",
    "                    )\n",
    "                    tool_results = []\n",
    "                    for block in response.content:\n",
    "                        if block.type == \"tool_use\":\n",
    "                            # Execute via MCP session -- not a local function\n",
    "                            mcp_result = await session.call_tool(\n",
    "                                block.name, block.input\n",
    "                            )\n",
    "                            # Join text blocks; tolerate empty or non-text content\n",
    "                            text = \"\\n\".join(\n",
    "                                c.text for c in mcp_result.content if c.type == \"text\"\n",
    "                            )\n",
    "                            tool_results.append({\n",
    "                                \"type\": \"tool_result\",\n",
    "                                \"tool_use_id\": block.id,\n",
    "                                \"content\": text or \"(no text content)\",\n",
    "                                # Forward tool failures so Claude can react to them\n",
    "                                \"is_error\": bool(mcp_result.isError),\n",
    "                            })\n",
    "                    messages.append({\"role\": \"user\", \"content\": tool_results})\n",
    "                    continue\n",
    "\n",
    "                # Any other stop_reason (e.g. max_tokens) would otherwise repeat\n",
    "                # the identical request on the next turn, so stop instead.\n",
    "                return f\"(stopped: stop_reason={response.stop_reason})\"\n",
    "\n",
    "            return \"(max_turns reached)\"\n",
    "\n",
    "\n",
    "print(\"run_claude_with_mcp() defined.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Demo 1: cost distribution question -- expects get_warehouse_summary\n",
    "q1 = \"Which warehouse has the highest credit spend over the last 30 days and by how much does it exceed the average?\"\n",
    "\n",
    "print(f\"Question: {q1}\")\n",
    "print(\"-\" * 70)\n",
    "answer = run(run_claude_with_mcp(q1))\n",
    "print()\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Demo 2: pipeline health -- expects get_failing_jobs + check_job_sla\n",
    "q2 = (\n",
    "    \"Which pipeline job has failed the most times in the last 90 days? \"\n",
    "    \"Is it currently within its SLA?\"\n",
    ")\n",
    "\n",
    "print(f\"Question: {q2}\")\n",
    "print(\"-\" * 70)\n",
    "answer = run(run_claude_with_mcp(q2))\n",
    "print()\n",
    "print(answer)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Demo 3: use the rendered incident_analysis prompt from the server\n",
    "# This is a server-defined workflow: Claude follows the steps in the prompt\n",
    "# and uses multiple tools in sequence.\n",
    "\n",
    "print(\"Running incident analysis using server-side prompt...\")\n",
    "print(\"-\" * 70)\n",
    "answer = run(run_claude_with_mcp(rendered_prompt))\n",
    "print()\n",
    "print(answer)"
   ]
  },
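  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The loop in `run_claude_with_mcp()` awaits each `call_tool` one after another, so when Claude\n",
    "returns several `tool_use` blocks in one turn they are still executed sequentially. One way to\n",
    "overlap them is `asyncio.gather`, sketched here against a fake session so that it runs without\n",
    "a server. `FakeSession` and `run_tool_calls` are hypothetical names, and whether a given MCP\n",
    "server actually benefits from concurrent requests is an assumption to verify.\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "from types import SimpleNamespace\n",
    "\n",
    "\n",
    "class FakeSession:\n",
    "    \"\"\"Stand-in for ClientSession: each call sleeps briefly, then echoes.\"\"\"\n",
    "\n",
    "    async def call_tool(self, name, arguments):\n",
    "        await asyncio.sleep(0.05)\n",
    "        return SimpleNamespace(\n",
    "            content=[SimpleNamespace(type=\"text\", text=f\"{name}({arguments})\")]\n",
    "        )\n",
    "\n",
    "\n",
    "async def run_tool_calls(session, blocks):\n",
    "    \"\"\"Execute all tool_use blocks concurrently; keep the original order.\"\"\"\n",
    "    results = await asyncio.gather(\n",
    "        *(session.call_tool(b.name, b.input) for b in blocks)\n",
    "    )\n",
    "    return [\n",
    "        {\"type\": \"tool_result\", \"tool_use_id\": b.id, \"content\": r.content[0].text}\n",
    "        for b, r in zip(blocks, results)\n",
    "    ]\n",
    "\n",
    "\n",
    "# Stand-ins for the tool_use blocks in a Claude response (ids are made up)\n",
    "blocks = [\n",
    "    SimpleNamespace(id=\"toolu_1\", name=\"get_failing_jobs\", input={\"days\": 30}),\n",
    "    SimpleNamespace(id=\"toolu_2\", name=\"get_warehouse_summary\", input={\"days\": 30}),\n",
    "]\n",
    "tool_results = asyncio.run(run_tool_calls(FakeSession(), blocks))\n",
    "print(tool_results)\n",
    "```\n",
    "\n",
    "Inside this notebook, the `run()` helper from Section 1 can be used in place of `asyncio.run()`."
   ]
  },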
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Section 8 -- Practitioner Lab\n",
    "\n",
    "### Exercise 1: Inject a resource into a Claude query\n",
    "\n",
    "Read the `data://warehouse-runbook` resource and pass its content to Claude as context\n",
    "for a question about the runbook's procedures. Compare the answer quality with and without\n",
    "the resource content.\n",
    "\n",
    "```python\n",
    "async def query_with_resource(session: ClientSession):\n",
    "    runbook = (await session.read_resource(\"data://warehouse-runbook\")).contents[0].text\n",
    "    messages = [{\n",
    "        \"role\": \"user\",\n",
    "        \"content\": (\n",
    "            f\"<runbook>\\n{runbook}\\n</runbook>\\n\\n\"\n",
    "            \"According to the runbook, what steps should be taken when a \"\n",
    "            \"warehouse credit spike is detected in the first 30 minutes?\"\n",
    "        )\n",
    "    }]\n",
    "    response = anthropic_client.messages.create(\n",
    "        model=MODEL, max_tokens=1024, messages=messages\n",
    "    )\n",
    "    return response.content[0].text\n",
    "\n",
    "print(mcp_run(query_with_resource))\n",
    "```\n",
    "\n",
    "### Exercise 2: Multi-step correlation with parallel tool calls\n",
    "\n",
    "Ask: *\"Are the pipeline failures around 2025-07-15 correlated with a warehouse cost\n",
    "increase?\"*  \n",
    "Claude may request `get_failing_jobs` and `get_warehouse_summary` in the same turn (one\n",
    "response, two `tool_use` blocks). Print the verbose turn log to check whether it did.\n",
    "\n",
    "### Exercise 3: Invalid job name error handling\n",
    "\n",
    "Call `check_job_sla` with a job name that does not exist in the dataset and observe\n",
    "the server's error response. Confirm that the error is returned as a `tool_result`\n",
    "content block (not a Python exception), and that Claude handles it gracefully in the\n",
    "next turn.\n",
    "\n",
    "### Exercise 4: Add a fourth tool to the server\n",
    "\n",
    "Add a `get_top_queries(warehouse_name, days)` tool to `mcp_data_server.py` that returns\n",
    "the days with the highest `avg_execution_time_s` for a named warehouse. Every `mcp_run()`\n",
    "call launches a fresh server subprocess, so no restart is needed: re-run the tool discovery\n",
    "cell and confirm `list_tools()` now returns four tools -- no changes needed to this notebook."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.11.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
