Python Usage Guide
This guide covers how the MCPWatch Python SDK instruments your MCP server and what gets captured.
Instrumenting your server
The instrument() function is the primary API. It wraps your server’s decorator methods to automatically capture events:
import os
from mcp.server import Server
from mcpwatch import instrument
server = instrument(
Server("my-server"),
api_key=os.environ["MCPWATCH_API_KEY"],
endpoint="https://api.mcpwatch.dev",
debug=True,
)
After instrumenting, define tools and resources as you normally would:
@server.tool("search_files")
async def search_files(query: str) -> str:
results = await search_index(query)
return json.dumps(results)
@server.resource("config://app")
async def get_config() -> dict:
return {"contents": [{"uri": "config://app", "text": read_config()}]}
Each handler invocation is automatically captured as an event with timing, arguments, response, and error data.
What gets captured
The SDK wraps these server methods:
| Method | Event Type | Captured Data |
|---|---|---|
@server.tool() | tool_call | Tool name, arguments, result, duration, errors |
@server.resource() | resource_read | Resource name, params, response, duration |
server.run() | initialize | Server name, transport type |
| atexit handler | close | Server name |
Client information (name, version) is extracted from the server’s request context on the first tool or resource call.
Sampling
Control the percentage of events captured with the sample_rate option:
server = instrument(
Server("my-server"),
api_key=os.environ["MCPWATCH_API_KEY"],
sample_rate=0.1, # Capture 10% of events
)
Sampling is applied per handler invocation. A sample_rate of 0.5 means each tool call has a 50% chance of being captured. Errors that occur within a sampled call are still captured.
Quota management
MCPWatch tracks your event quota and provides feedback through response headers. Register a callback to be notified when your quota is running low:
server = instrument(
Server("my-server"),
api_key=os.environ["MCPWATCH_API_KEY"],
on_quota_warning=lambda info: print(
f"Quota {info.status}: {info.remaining}/{info.limit} remaining"
),
)
When your quota hits the hard limit (HTTP 429), the SDK automatically pauses ingestion and resumes after the Retry-After period. Events generated during the pause are dropped.
Transport detection
The SDK detects your transport type by inspecting the server object and its class hierarchy. Detected types:
- stdio —
StdioServerTransport - sse —
SSEServerTransport - streamable-http —
StreamableHTTPServerTransport - websocket —
WebSocketServerTransport - unknown — Fallback when no match is found
The detect_transport_type() function is also available as a public export if you need it independently.
Graceful shutdown
The SDK registers an atexit handler that emits a close event and flushes remaining events when the process exits. This works automatically — no explicit shutdown call is needed.
For the initialize event, it is emitted when server.run() is called.
Error handling
The SDK is designed to never interfere with your server’s operation:
- If event delivery fails, it retries once after 1 second, then drops the event
- If the event queue exceeds 1000 pending events, the oldest are dropped
- Network errors are caught and logged (with
debug=True) but never raised - HTTP request timeout is 10 seconds
- Only 5xx errors are retried; 4xx errors are not retried
Async event loop handling
The SDK’s background flush loop runs as an asyncio task. It handles two scenarios:
- Event loop already running (typical for MCP servers): the flush loop starts immediately
- No event loop yet: the flush loop starts lazily when the first event is added
The atexit handler creates a fresh event loop with asyncio.run() for the final flush.
Next steps
- API Reference — Full function and type documentation
- TypeScript SDK — Using MCPWatch with TypeScript servers
Last updated — MCPWatch Documentation