Python Usage Guide

This guide covers how the MCPWatch Python SDK instruments your MCP server and what gets captured.

Instrumenting your server

The instrument() function is the primary API. It wraps your server’s decorator methods to automatically capture events:

import os
from mcp.server import Server
from mcpwatch import instrument

server = instrument(
    Server("my-server"),
    api_key=os.environ["MCPWATCH_API_KEY"],
    endpoint="https://api.mcpwatch.dev",
    debug=True,
)

After instrumenting, define tools and resources as you normally would:

import json

# search_index() and read_config() stand in for your own application code.
@server.tool("search_files")
async def search_files(query: str) -> str:
    results = await search_index(query)
    return json.dumps(results)

@server.resource("config://app")
async def get_config() -> dict:
    return {"contents": [{"uri": "config://app", "text": read_config()}]}

Each handler invocation is automatically captured as an event with timing, arguments, response, and error data.
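
To illustrate what "wrapping a decorator" can look like, here is a minimal sketch. It is not the SDK's actual implementation: the capture() decorator and the captured_events list are hypothetical names used only for this example, and the real event format may differ.

```python
import asyncio
import functools
import time

# Hypothetical in-memory sink standing in for the SDK's event queue.
captured_events = []


def capture(event_type, name):
    """Hypothetical decorator that records timing, arguments, result, and errors."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            event = {
                "type": event_type,
                "name": name,
                "arguments": {"args": args, "kwargs": kwargs},
            }
            try:
                result = await handler(*args, **kwargs)
                event["result"] = result
                return result
            except Exception as exc:
                event["error"] = repr(exc)
                raise  # the original error still reaches the caller
            finally:
                # Runs on success and on failure, so every call gets a duration.
                event["duration_ms"] = (time.perf_counter() - start) * 1000
                captured_events.append(event)
        return wrapper
    return decorator


@capture("tool_call", "search_files")
async def search_files(query: str) -> str:
    return f"results for {query}"


asyncio.run(search_files(query="report"))
```

The handler's return value and exceptions pass through unchanged; the wrapper only observes them.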

What gets captured

The SDK wraps these server methods:

Method               Event Type       Captured Data
@server.tool()       tool_call        Tool name, arguments, result, duration, errors
@server.resource()   resource_read    Resource name, params, response, duration
server.run()         initialize       Server name, transport type
atexit handler       close            Server name

Client information (name, version) is extracted from the server’s request context on the first tool or resource call.

Sampling

Control the percentage of events captured with the sample_rate option:

server = instrument(
    Server("my-server"),
    api_key=os.environ["MCPWATCH_API_KEY"],
    sample_rate=0.1,  # Capture 10% of events
)

Sampling is applied per handler invocation. A sample_rate of 0.5 means each tool call has a 50% chance of being captured. Errors that occur within a sampled call are still captured.
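
Per-invocation sampling can be pictured as one random draw per call. The sketch below assumes a simple uniform draw compared against sample_rate; should_capture() is a hypothetical helper, not an SDK function, and the SDK's internal logic may differ.

```python
import random


def should_capture(sample_rate: float, rng: random.Random) -> bool:
    """Decide once per handler invocation whether to record it."""
    # rng.random() is uniform on [0.0, 1.0), so 1.0 always captures
    # and 0.0 never does.
    return rng.random() < sample_rate


rng = random.Random(42)  # seeded only to make this sketch repeatable
decisions = [should_capture(0.1, rng) for _ in range(10_000)]
captured_fraction = sum(decisions) / len(decisions)
print(f"captured about {captured_fraction:.1%} of calls")
```

Because each call is decided independently, the captured share converges on sample_rate only over many invocations; a short run can deviate noticeably.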

Quota management

MCPWatch tracks your event quota and provides feedback through response headers. Register a callback to be notified when your quota is running low:

server = instrument(
    Server("my-server"),
    api_key=os.environ["MCPWATCH_API_KEY"],
    on_quota_warning=lambda info: print(
        f"Quota {info.status}: {info.remaining}/{info.limit} remaining"
    ),
)

When your quota hits the hard limit (HTTP 429), the SDK automatically pauses ingestion and resumes after the Retry-After period. Events generated during the pause are dropped.
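
The pause-and-resume behavior can be sketched as a small gate that is consulted before an event is queued. IngestionGate is a hypothetical class for illustration only. It assumes Retry-After carries a number of seconds, and the 60-second fallback is an arbitrary choice for this example.

```python
import time


class IngestionGate:
    """Hypothetical helper: reports whether events should be accepted or dropped."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock          # injectable so the sketch can be tested
        self._paused_until = 0.0

    def on_response(self, status_code: int, headers: dict) -> None:
        if status_code == 429:
            # Assumption: Retry-After is a delay in seconds, not an HTTP date.
            retry_after = float(headers.get("Retry-After", "60"))
            self._paused_until = self._clock() + retry_after

    def accepts_events(self) -> bool:
        return self._clock() >= self._paused_until


# Drive the gate with a fake clock instead of sleeping.
now = [0.0]
gate = IngestionGate(clock=lambda: now[0])
gate.on_response(429, {"Retry-After": "30"})
paused = gate.accepts_events()   # still inside the pause window
now[0] = 31.0
resumed = gate.accepts_events()  # the Retry-After period has elapsed
```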

Transport detection

The SDK detects your transport type by inspecting the server object and its class hierarchy. Detected types:

  • stdio: StdioServerTransport
  • sse: SSEServerTransport
  • streamable-http: StreamableHTTPServerTransport
  • websocket: WebSocketServerTransport
  • unknown: fallback when no match is found

The detect_transport_type() function is also available as a public export if you need it independently.
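
One way to match on a class hierarchy is to walk the object's method resolution order and compare class names. The sketch below shows that idea only: guess_transport() is a hypothetical function, and this guide does not show the signature or internals of the real detect_transport_type().

```python
# Class names and labels taken from the list above.
TRANSPORT_CLASS_NAMES = {
    "StdioServerTransport": "stdio",
    "SSEServerTransport": "sse",
    "StreamableHTTPServerTransport": "streamable-http",
    "WebSocketServerTransport": "websocket",
}


def guess_transport(obj) -> str:
    """Hypothetical helper: match any class in the object's hierarchy by name."""
    for cls in type(obj).__mro__:
        label = TRANSPORT_CLASS_NAMES.get(cls.__name__)
        if label:
            return label
    return "unknown"


# Stand-in classes so the sketch runs without any transport library installed.
class StdioServerTransport:
    pass


class CustomStdio(StdioServerTransport):
    pass


print(guess_transport(CustomStdio()))  # subclass still resolves via its parent
print(guess_transport(object()))
```

Matching on names rather than isinstance() avoids importing every transport module, at the cost of false matches if an unrelated class reuses one of the names.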

Graceful shutdown

The SDK registers an atexit handler that emits a close event and flushes remaining events when the process exits. This works automatically — no explicit shutdown call is needed.

The initialize event, by contrast, is emitted when server.run() is called.

Error handling

The SDK is designed to never interfere with your server’s operation:

  • If event delivery fails, it retries once after 1 second, then drops the event
  • If the event queue exceeds 1000 pending events, the oldest are dropped
  • Network errors are caught and logged (with debug=True) but never raised
  • HTTP request timeout is 10 seconds
  • Only 5xx errors are retried; 4xx errors are not retried
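
The rules above can be sketched with a bounded deque and a small retry loop. This illustrates the stated policy and is not the SDK's code: deliver(), should_retry(), and the send callable are hypothetical, and treating network errors as "drop without retry" is an assumption made here.

```python
import asyncio
from collections import deque

MAX_PENDING = 1000
# Appending to a full deque with maxlen silently discards the oldest item.
pending = deque(maxlen=MAX_PENDING)


def should_retry(status_code: int, attempt: int) -> bool:
    """Retry only 5xx responses, and only after the first attempt."""
    return 500 <= status_code < 600 and attempt == 0


async def deliver(event, send, retry_delay=1.0) -> bool:
    """send is a hypothetical coroutine function returning an HTTP status code."""
    for attempt in range(2):
        try:
            status = await asyncio.wait_for(send(event), timeout=10)
        except (OSError, asyncio.TimeoutError):
            return False  # swallow network errors; never raise into the server
        if status < 400:
            return True
        if not should_retry(status, attempt):
            return False  # 4xx, or a second 5xx: drop the event
        await asyncio.sleep(retry_delay)
    return False


for i in range(1005):
    pending.append({"id": i})
print(len(pending), pending[0])  # the five oldest events were dropped
```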

Async event loop handling

The SDK’s background flush loop runs as an asyncio task. It handles two scenarios:

  • Event loop already running (typical for MCP servers): the flush loop starts immediately
  • No event loop yet: the flush loop starts lazily when the first event is added

The atexit handler creates a fresh event loop with asyncio.run() for the final flush.
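
The two start-up scenarios and the exit-time flush can be sketched as follows. Flusher is a hypothetical class for illustration, its flush() only moves events between lists instead of sending them, and the real SDK's structure may differ.

```python
import asyncio
import atexit


class Flusher:
    """Hypothetical flush loop that starts eagerly or lazily."""

    def __init__(self):
        self.pending = []
        self.sent = []
        self._task = None
        self._start_if_loop_running()  # scenario 1: a loop is already running

    def _start_if_loop_running(self):
        if self._task is not None:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return  # no running loop yet; try again when an event is added
        self._task = loop.create_task(self._flush_forever())

    def add(self, event):
        self.pending.append(event)
        self._start_if_loop_running()  # scenario 2: lazy start

    async def flush(self):
        # Stand-in for an HTTP upload of the batch.
        self.sent.extend(self.pending)
        self.pending.clear()

    async def _flush_forever(self, interval=1.0):
        while True:
            await asyncio.sleep(interval)
            await self.flush()

    def final_flush(self):
        # At exit no loop is running, so a fresh one is created for the last batch.
        asyncio.run(self.flush())


flusher = Flusher()                  # created outside any loop: no task yet
flusher.add({"type": "tool_call"})   # still no loop, so the event just queues
atexit.register(flusher.final_flush)
flusher.final_flush()                # called directly here to show the effect
```

asyncio.get_running_loop() raises RuntimeError when called outside a running loop, which is what lets the constructor distinguish the two scenarios.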
