subscriptions
Server-side subscriptions/listen support (2026-07-28, SEP-2575).
On the 2026-07-28 wire there is no standing GET stream: a client opts in to
server events by sending a subscriptions/listen request whose response IS
the stream. This module provides the two pieces a server needs:
SubscriptionBus: the pluggable fan-out seam. The bus carries typedServerEventvalues, not wire notifications - the listen handler owns subscription-id stamping and per-stream filtering, so a custom bus (e.g. backed by Redis pub/sub for multi-replica deployments) never sees JSON-RPC. The in-process default isInMemorySubscriptionBus.ListenHandler: the request handler that servessubscriptions/listen.MCPServerregisters one automatically; lowlevelServerusers pass an instance ason_subscriptions_listen=.
Per the spec, the handler acknowledges first (the ack is the first frame on
the stream), tags every frame with the listen request's JSON-RPC id under
_meta["io.modelcontextprotocol/subscriptionId"], and never delivers an
event kind the client did not request. Delivery is fire-and-forget with no
replay: a dropped stream is not resumable - clients re-listen and refetch.
SUBSCRIPTION_ID_META_KEY
module-attribute
SUBSCRIPTION_ID_META_KEY = (
"io.modelcontextprotocol/subscriptionId"
)
The _meta key carrying the subscription id on every listen-stream frame.
The value is the subscriptions/listen request's JSON-RPC id, verbatim.
ToolsListChanged
dataclass
The server's tool list changed.
Source code in src/mcp/server/subscriptions.py
62 63 64 | |
PromptsListChanged
dataclass
The server's prompt list changed.
Source code in src/mcp/server/subscriptions.py
67 68 69 | |
ResourcesListChanged
dataclass
The server's resource list changed.
Source code in src/mcp/server/subscriptions.py
72 73 74 | |
ResourceUpdated
dataclass
The resource at uri changed and may need to be read again.
Source code in src/mcp/server/subscriptions.py
77 78 79 80 81 | |
ServerEvent
module-attribute
ServerEvent = (
ToolsListChanged
| PromptsListChanged
| ResourcesListChanged
| ResourceUpdated
)
An event a server publishes for delivery to listen subscribers.
SubscriptionBus
Bases: Protocol
Fan-out seam between event publishers and open listen streams.
Implement this over an external pub/sub backend (Redis, NATS, ...) to fan
events out across replicas: publish forwards the event to the backend,
and each replica's bus invokes its local listeners for events arriving
from the backend. The same instance can be shared across servers.
publish is async so backend implementations can do network I/O.
subscribe is synchronous local registration. Listeners are synchronous,
must not raise, and are invoked on the server's event loop.
Source code in src/mcp/server/subscriptions.py
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | |
publish
async
publish(event: ServerEvent) -> None
Deliver event to every subscribed listener.
Source code in src/mcp/server/subscriptions.py
101 102 103 | |
subscribe
subscribe(
listener: Callable[[ServerEvent], None],
) -> Callable[[], None]
Register listener and return an idempotent unsubscribe callable.
Source code in src/mcp/server/subscriptions.py
105 106 107 | |
InMemorySubscriptionBus
In-process SubscriptionBus: synchronous fan-out to listeners in subscription order.
Source code in src/mcp/server/subscriptions.py
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | |
publish
async
publish(event: ServerEvent) -> None
Deliver event to every subscribed listener.
A raising listener is logged and skipped: one bad listener must not starve the others or fail the publishing handler. Ends with a checkpoint so a burst of publishes from one task lets listen streams drain between events instead of overflowing their buffers unread.
Source code in src/mcp/server/subscriptions.py
118 119 120 121 122 123 124 125 126 127 128 129 130 131 | |
subscribe
subscribe(
listener: Callable[[ServerEvent], None],
) -> Callable[[], None]
Register listener and return an idempotent unsubscribe callable.
Source code in src/mcp/server/subscriptions.py
133 134 135 136 137 138 139 140 141 | |
ListenHandler
Serves subscriptions/listen: one call is one subscription stream.
Register on a lowlevel Server via on_subscriptions_listen= (or
add_request_handler); MCPServer does so automatically. Each call
acknowledges the honored filter first, then forwards matching bus events
onto the request's response stream until the client disconnects (which
cancels the handler; the stream just ends, per the spec's abrupt-close
contract) or close ends all streams gracefully.
Requires a transport that can stream a request's response (streamable HTTP's SSE mode).
max_subscriptions bounds concurrent streams (further listen requests are
rejected with INTERNAL_ERROR, before the ack). max_buffered_events
bounds each stream's event backlog: a stream whose client has stopped
reading is ended at the cap (the client re-listens and refetches - there
is no replay, so ending the stream loses nothing the backlog wasn't
already losing).
Source code in src/mcp/server/subscriptions.py
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 | |
__call__
async
__call__(
ctx: ServerRequestContext[Any, Any],
params: SubscriptionsListenRequestParams,
) -> SubscriptionsListenResult
Serve one listen stream.
Source code in src/mcp/server/subscriptions.py
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 | |
close
close() -> None
Initiate graceful closure of every open listen stream.
Each stream then drains its buffered events and sends its
SubscriptionsListenResult (stamped with the subscription id) as the
final frame from its own handler task - the spec's graceful closure
flow, telling clients the stream ended deliberately rather than
dropping. This method only initiates that; it does not wait for the
streams to finish flushing.
Source code in src/mcp/server/subscriptions.py
285 286 287 288 289 290 291 292 293 294 295 296 | |