Python API Reference

Full reference for all functions, classes, and types in the mcpwatch package.

instrument()

The main SDK entry point. Wraps an MCP server to automatically capture events.

def instrument(
    server: T,
    *,
    api_key: str,
    endpoint: str = "https://api.mcpwatch.dev",
    debug: bool = False,
    sample_rate: float = 1.0,
    max_batch_size: int = 50,
    flush_interval: float = 1.0,
    on_quota_warning: Callable[[QuotaInfo], None] | None = None,
) -> T

Parameters:

  • server — An MCP Server instance from the mcp package (positional)
  • api_key — MCPWatch API key (keyword-only, required). If empty, instrumentation is skipped.
  • endpoint — Ingestion endpoint URL
  • debug — Enable debug logging via the mcpwatch logger
  • sample_rate — Probability (0.0–1.0) that each handler invocation is captured
  • max_batch_size — Max events per batch before an immediate flush
  • flush_interval — Seconds between periodic background flushes
  • on_quota_warning — Called when quota status is “warning” or “exceeded”
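
The sample_rate semantics can be illustrated with a minimal sketch. The helper name should_capture is hypothetical and is not part of mcpwatch; it shows one common way to make an independent, per-invocation probabilistic decision, and the SDK's internals may differ.

```python
from __future__ import annotations

import random


def should_capture(sample_rate: float, rng: random.Random | None = None) -> bool:
    """Hypothetical helper: decide whether a single invocation is captured."""
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError("sample_rate must be between 0.0 and 1.0")
    # Fall back to the module-level generator when no seeded one is supplied.
    source = rng if rng is not None else random
    # random() returns a float in [0.0, 1.0), so a rate of 1.0 always
    # captures and a rate of 0.0 never captures.
    return source.random() < sample_rate
```

Under this model, sample_rate=0.5 captures roughly half of all invocations over a long run, but any individual invocation is either fully captured or skipped.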

Returns: The same server instance with wrapped tool() and resource() methods.

Example:

import os
from mcp.server import Server
from mcpwatch import instrument

server = instrument(
    Server("my-server"),
    api_key=os.environ["MCPWATCH_API_KEY"],
    debug=True,
    sample_rate=0.5,
)

MCPWatchClient

Low-level async HTTP client for sending event batches. Most users should use instrument() instead.

Constructor

def __init__(
    self,
    api_key: str,
    endpoint: str = "https://api.mcpwatch.dev",
    debug: bool = False,
    on_quota_warning: Callable[[QuotaInfo], None] | None = None,
)

Methods

send_batch(events)

Sends a batch of events to the ingestion API. Retries once on 5xx errors.

async def send_batch(self, events: list[McpWatchEvent]) -> SendResult
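
The retry rule can be sketched as follows. This is a synchronous illustration only, not the client's actual code: the function send_with_single_retry and the post callable (which returns an HTTP status code) are hypothetical names, and the 1-second default delay is taken from the Batching behavior section.

```python
import time
from typing import Callable


def send_with_single_retry(
    post: Callable[[], int],
    retry_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical sketch: call post(), retrying once if it returns a 5xx status."""
    status = post()
    if 500 <= status < 600:
        # Server-side failure: wait, then make exactly one more attempt.
        sleep(retry_delay)
        status = post()
    # Any other status (including 4xx) is returned without a retry.
    return status
```

The sleep parameter is injectable so the behavior can be exercised in tests without actually waiting.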

close()

Closes the underlying httpx client.

async def close(self) -> None
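
Because both send_batch() and close() are coroutines, a try/finally block is one way to make sure the client is closed even if sending raises. The wrapper below is a hypothetical helper, not part of mcpwatch; it relies only on the two methods documented above and accepts any object that provides them.

```python
import asyncio
from typing import Any


async def send_and_close(client: Any, events: list[Any]) -> Any:
    """Hypothetical helper: send one batch, then always close the client."""
    try:
        return await client.send_batch(events)
    finally:
        # close() is a coroutine, so it must be awaited.
        await client.close()


# Possible usage, assuming `client` is an MCPWatchClient and `events`
# is a list of McpWatchEvent objects:
#     result = asyncio.run(send_and_close(client, events))
```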

quota_status

Returns the most recent quota information, or None.

@property
def quota_status(self) -> QuotaInfo | None

detect_transport_type()

Utility function to detect the transport type of an MCP server.

def detect_transport_type(server: Any) -> str

Returns: One of "stdio", "sse", "streamable-http", "websocket", or "unknown".


Types

MCPWatchConfig

class MCPWatchConfig(BaseModel):
    api_key: str
    endpoint: str = "https://api.mcpwatch.dev"
    debug: bool = False
    sample_rate: float = 1.0
    max_batch_size: int = 50
    flush_interval: float = 1.0
    on_quota_warning: Callable[[QuotaInfo], None] | None = None

EventType

class EventType(str, Enum):
    TOOL_CALL = "tool_call"
    RESOURCE_READ = "resource_read"
    PROMPT_GET = "prompt_get"
    INITIALIZE = "initialize"
    CLOSE = "close"
    NOTIFICATION = "notification"
    ERROR = "error"
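
Since EventType mixes in str, its members behave like plain strings in comparisons and JSON serialization. The snippet below redeclares the enum exactly as listed above so that it runs on its own; in real code it would presumably be imported from mcpwatch instead.

```python
import json
from enum import Enum


class EventType(str, Enum):
    TOOL_CALL = "tool_call"
    RESOURCE_READ = "resource_read"
    PROMPT_GET = "prompt_get"
    INITIALIZE = "initialize"
    CLOSE = "close"
    NOTIFICATION = "notification"
    ERROR = "error"


# Members compare equal to their plain string values.
same = EventType.TOOL_CALL == "tool_call"

# Looking a member up by value returns the existing member object.
member = EventType("resource_read")

# .value is the string used on the wire; json.dumps treats the member
# as an ordinary string because it is a str subclass.
wire = EventType.ERROR.value
encoded = json.dumps({"event_type": EventType.TOOL_CALL})
```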

McpWatchEvent

class McpWatchEvent(BaseModel):
    event_id: str
    trace_id: str
    span_id: str
    parent_span_id: str | None = None
    event_type: EventType
    event_name: str
    started_at: str                          # ISO 8601
    ended_at: str | None = None
    duration_ms: float = 0.0
    mcp_method: str                          # e.g., "tools/call"
    mcp_protocol_version: str = "2025-11-25"
    request_params: dict[str, Any] = {}
    response_content: dict[str, Any] = {}
    is_error: bool = False
    error_code: int | None = None
    error_message: str | None = None
    transport_type: str = ""
    server_name: str = ""
    server_version: str = ""
    client_name: str = ""
    client_version: str = ""
    attributes: dict[str, str] = {}
    sdk_name: str = "mcpwatch-python"
    sdk_version: str = "1.0.0"
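
The timing fields are related: started_at and ended_at are ISO 8601 strings, and duration_ms is a float number of milliseconds. The following is a minimal sketch of how the three could be derived from two datetime values. The helper timing_fields is hypothetical, and the assumption that duration_ms equals the difference between the two timestamps is not confirmed by this reference.

```python
from datetime import datetime, timedelta, timezone


def timing_fields(started: datetime, ended: datetime) -> dict[str, object]:
    """Hypothetical helper: derive an event's timing fields from two datetimes."""
    return {
        "started_at": started.isoformat(),
        "ended_at": ended.isoformat(),
        # Assumption: duration is the elapsed time, in milliseconds.
        "duration_ms": (ended - started).total_seconds() * 1000.0,
    }


start = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fields = timing_fields(start, start + timedelta(milliseconds=250))
# fields["started_at"] is "2025-01-01T12:00:00+00:00"
# fields["duration_ms"] is 250.0
```

Using timezone-aware datetimes keeps the UTC offset in the serialized strings, which avoids ambiguity when events from different hosts are compared.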

QuotaInfo

class QuotaInfo(BaseModel):
    limit: int
    remaining: int
    status: str                              # "ok", "warning", "exceeded", "hard_limit"
    reset: str                               # ISO 8601
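
A callback passed as on_quota_warning receives a QuotaInfo. The sketch below shows one way such a callback might summarize it. QuotaInfo is redeclared here as a dataclass stand-in so the example runs without mcpwatch installed (the real class is a pydantic model), describe_quota is a hypothetical helper, and treating limit - remaining as the number of events used is an assumption.

```python
from dataclasses import dataclass


@dataclass
class QuotaInfo:
    """Stand-in mirroring the documented fields of the real model."""
    limit: int
    remaining: int
    status: str   # "ok", "warning", "exceeded", "hard_limit"
    reset: str    # ISO 8601


def describe_quota(info: QuotaInfo) -> str:
    """Hypothetical formatter for use inside an on_quota_warning callback."""
    used = info.limit - info.remaining  # assumption: used = limit - remaining
    percent = 100.0 * used / info.limit if info.limit else 0.0
    return (
        f"quota {info.status}: {used}/{info.limit} used "
        f"({percent:.0f}%), resets {info.reset}"
    )


def on_quota_warning(info: QuotaInfo) -> None:
    # A real callback might log or page someone; this one just prints.
    print(describe_quota(info))
```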

Automatically captured operations

Operations captured by instrument():

Server Method       Event Type     MCP Method      Captured Data
------------------  -------------  --------------  ----------------------------------------------
@server.tool()      tool_call      tools/call      Tool name, arguments, result, duration, errors
@server.resource()  resource_read  resources/read  Resource name, params, response, duration
server.run()        initialize     initialize      Server name, transport type
atexit              close          close           Server name

Batching behavior

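  • Events are queued in memory, with at most 1000 pending at a time
  • A flush is triggered when a batch is full (max_batch_size), when the timer fires (flush_interval), or at process exit
  • A send that fails with a 5xx response is retried once after 1 second; if the retry also fails, the batch is dropped. 4xx responses are not retried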
  • HTTP timeout: 10 seconds per request
  • Ingestion endpoint: POST /v1/events
