Python API Reference
Full reference for all functions, classes, and types in the mcpwatch package.
instrument()
The main SDK entry point. Wraps an MCP server to automatically capture events.
def instrument(
    server: T,
    *,
    api_key: str,
    endpoint: str = "https://api.mcpwatch.dev",
    debug: bool = False,
    sample_rate: float = 1.0,
    max_batch_size: int = 50,
    flush_interval: float = 1.0,
    on_quota_warning: Callable[[QuotaInfo], None] | None = None,
) -> T
Parameters:
- server: An MCPServer instance from the mcp package (positional)
- api_key: MCPWatch API key (keyword-only, required). If empty, instrumentation is skipped.
- endpoint: Ingestion endpoint URL
- debug: Enable debug logging via the mcpwatch logger
- sample_rate: Probability (0.0–1.0) that each handler invocation is captured
- max_batch_size: Max events per batch before an immediate flush
- flush_interval: Seconds between periodic background flushes
- on_quota_warning: Called when quota status is "warning" or "exceeded"
Returns: The same server instance with wrapped tool() and resource() methods.
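One way to read sample_rate is as an independent random draw per handler invocation. The sketch below illustrates that reading only; the helper should_capture is hypothetical and not part of mcpwatch, and the SDK's actual sampling logic may differ.

```python
import random


def should_capture(sample_rate: float, rng: random.Random) -> bool:
    """Hypothetical helper: decide whether one handler invocation is captured."""
    # rng.random() returns a float in [0.0, 1.0), so a rate of 0.0 never
    # captures and a rate of 1.0 always captures.
    return rng.random() < sample_rate


rng = random.Random(42)  # seeded only to make the demo repeatable
captured = sum(should_capture(0.5, rng) for _ in range(10_000))
print(f"captured {captured} of 10000 invocations")
```

With sample_rate=0.5, roughly half of the invocations are kept; the exact count varies from run to run when the generator is not seeded.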
Example:
import os

from mcp.server import Server
from mcpwatch import instrument

server = instrument(
    Server("my-server"),
    api_key=os.environ["MCPWATCH_API_KEY"],
    debug=True,
    sample_rate=0.5,
)
MCPWatchClient
Low-level async HTTP client for sending event batches. Most users should use instrument() instead.
Constructor
def __init__(
    self,
    api_key: str,
    endpoint: str = "https://api.mcpwatch.dev",
    debug: bool = False,
    on_quota_warning: Callable[[QuotaInfo], None] | None = None,
)
Methods
send_batch(events)
Sends a batch of events to the ingestion API. Retries once on 5xx errors.
async def send_batch(self, events: list[McpWatchEvent]) -> SendResult
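The retry rule can be sketched without any network access. Everything in this block is hypothetical: post stands in for the HTTP call and returns a status code, and the 1-second default delay is taken from the batching notes in this reference. It is an illustration of the documented behavior, not the SDK's implementation.

```python
import asyncio
from typing import Awaitable, Callable


async def send_with_one_retry(
    post: Callable[[], Awaitable[int]],
    retry_delay: float = 1.0,
) -> bool:
    """Hypothetical sketch: True if the batch was accepted, False if dropped."""
    status = await post()
    if 500 <= status < 600:
        # Server-side failure: wait, then try exactly one more time.
        await asyncio.sleep(retry_delay)
        status = await post()
    # 4xx responses fall through without a retry.
    return 200 <= status < 300


async def demo() -> tuple[bool, int]:
    responses = iter([503, 200])  # first attempt fails, the retry succeeds
    calls = 0

    async def fake_post() -> int:
        nonlocal calls
        calls += 1
        return next(responses)

    ok = await send_with_one_retry(fake_post, retry_delay=0.0)
    return ok, calls


print(asyncio.run(demo()))  # (True, 2)
```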
close()
Closes the underlying httpx client.
async def close(self) -> None
quota_status
Returns the most recent quota information, or None.
@property
def quota_status(self) -> QuotaInfo | None
detect_transport_type()
Utility function to detect the transport type of an MCP server.
def detect_transport_type(server: Any) -> str
Returns: One of "stdio", "sse", "streamable-http", "websocket", or "unknown".
Types
MCPWatchConfig
class MCPWatchConfig(BaseModel):
    api_key: str
    endpoint: str = "https://api.mcpwatch.dev"
    debug: bool = False
    sample_rate: float = 1.0
    max_batch_size: int = 50
    flush_interval: float = 1.0
    on_quota_warning: Callable[[QuotaInfo], None] | None = None
EventType
class EventType(str, Enum):
    TOOL_CALL = "tool_call"
    RESOURCE_READ = "resource_read"
    PROMPT_GET = "prompt_get"
    INITIALIZE = "initialize"
    CLOSE = "close"
    NOTIFICATION = "notification"
    ERROR = "error"
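Because EventType subclasses both str and Enum, its members compare equal to their raw string values and can be looked up from those values; this is standard Python enum behavior. The block below re-declares a trimmed copy of the enum so that it runs without the package installed.

```python
from enum import Enum


class EventType(str, Enum):
    # Trimmed copy of the enum above, for illustration only.
    TOOL_CALL = "tool_call"
    RESOURCE_READ = "resource_read"


# Look up a member from the raw value found in a payload.
kind = EventType("tool_call")
print(kind is EventType.TOOL_CALL)  # True
print(kind == "tool_call")          # True, thanks to the str mixin
print(kind.value)                   # tool_call
```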
McpWatchEvent
class McpWatchEvent(BaseModel):
    event_id: str
    trace_id: str
    span_id: str
    parent_span_id: str | None = None
    event_type: EventType
    event_name: str
    started_at: str  # ISO 8601
    ended_at: str | None = None
    duration_ms: float = 0.0
    mcp_method: str  # e.g., "tools/call"
    mcp_protocol_version: str = "2025-11-25"
    request_params: dict[str, Any] = {}
    response_content: dict[str, Any] = {}
    is_error: bool = False
    error_code: int | None = None
    error_message: str | None = None
    transport_type: str = ""
    server_name: str = ""
    server_version: str = ""
    client_name: str = ""
    client_version: str = ""
    attributes: dict[str, str] = {}
    sdk_name: str = "mcpwatch-python"
    sdk_version: str = "1.0.0"
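The timing fields can be produced with the standard library. This sketch assumes that started_at and ended_at are UTC ISO 8601 strings and that duration_ms is the elapsed time between them; the helper name timing_fields is hypothetical, and this reference does not describe how the SDK itself computes these values.

```python
from datetime import datetime, timedelta, timezone


def timing_fields(start: datetime, end: datetime) -> dict[str, object]:
    """Hypothetical helper: build the three timing fields of an event."""
    return {
        "started_at": start.isoformat(),
        "ended_at": end.isoformat(),
        # timedelta -> seconds -> milliseconds
        "duration_ms": (end - start).total_seconds() * 1000.0,
    }


start = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
end = start + timedelta(milliseconds=250)
fields = timing_fields(start, end)
print(fields)
```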
QuotaInfo
class QuotaInfo(BaseModel):
    limit: int
    remaining: int
    status: str  # "ok", "warning", "exceeded", "hard_limit"
    reset: str  # ISO 8601
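A callback for on_quota_warning might look like the sketch below. It uses a stand-in dataclass with the same four fields as QuotaInfo so that it runs without the package; with the real SDK, the function would presumably be passed as on_quota_warning=log_quota. The function names and the message format are illustrative assumptions.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("quota-demo")


@dataclass
class QuotaInfoStandIn:
    # Mirrors the QuotaInfo fields listed above; not the real model.
    limit: int
    remaining: int
    status: str
    reset: str


def format_quota(info: QuotaInfoStandIn) -> str:
    """Build a human-readable summary of the quota state."""
    used = info.limit - info.remaining
    return f"quota {info.status}: {used}/{info.limit} used, resets {info.reset}"


def log_quota(info: QuotaInfoStandIn) -> None:
    """Callback shape: takes the quota object, returns nothing."""
    level = logging.ERROR if info.status == "exceeded" else logging.WARNING
    logger.log(level, format_quota(info))


sample = QuotaInfoStandIn(1000, 50, "warning", "2025-02-01T00:00:00Z")
log_quota(sample)
print(format_quota(sample))
```

Keeping the formatting separate from the logging call makes the message easy to test and leaves the callback itself with the None return type the signature expects.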
Automatically captured operations
Operations captured by instrument():
| Server Method | Event Type | MCP Method | Captured Data |
|---|---|---|---|
| @server.tool() | tool_call | tools/call | Tool name, arguments, result, duration, errors |
| @server.resource() | resource_read | resources/read | Resource name, params, response, duration |
| server.run() | initialize | initialize | Server name, transport type |
| atexit | close | close | Server name |
Batching behavior
- Events queue in memory (max 1000 pending)
- Flush triggers: batch full (max_batch_size), timer (flush_interval), or process exit
- Failed sends retry once after 1 second, then drop (5xx only; 4xx not retried)
- HTTP timeout: 10 seconds per request
- Ingestion endpoint: POST /v1/events
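The queueing rules above can be sketched as a small in-memory buffer. The class and method names are hypothetical, and dropping the oldest event when the queue is full is an assumption: this reference does not say which event is discarded at the 1000-event cap.

```python
from collections import deque


class EventBuffer:
    """Hypothetical sketch of a bounded queue that flushes in batches."""

    def __init__(self, max_batch_size: int = 50, max_pending: int = 1000) -> None:
        self.max_batch_size = max_batch_size
        # Assumption: a deque with maxlen discards the oldest item when full.
        self.pending: deque[dict] = deque(maxlen=max_pending)
        self.sent_batches: list[list[dict]] = []

    def add(self, event: dict) -> None:
        self.pending.append(event)
        if len(self.pending) >= self.max_batch_size:
            self.flush()  # "batch full" trigger

    def flush(self) -> None:
        # A timer or an exit hook would call this same method.
        while self.pending:
            size = min(self.max_batch_size, len(self.pending))
            batch = [self.pending.popleft() for _ in range(size)]
            self.sent_batches.append(batch)  # stand-in for the HTTP send


buffer = EventBuffer(max_batch_size=3)
for i in range(7):
    buffer.add({"event_id": str(i)})
print(len(buffer.sent_batches), len(buffer.pending))  # 2 1
buffer.flush()  # what a timer tick or process exit would trigger
print(len(buffer.sent_batches), len(buffer.pending))  # 3 0
```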