Define a MapStateDescriptor for the broadcast state and call env.fromSource(...).broadcast(descriptor) to create a BroadcastStream
Connect the keyed DataStream to the BroadcastStream using keyedStream.connect(broadcastStream) to produce a BroadcastConnectedStream
Implement a KeyedBroadcastProcessFunction and pass it to process(...) on the BroadcastConnectedStream, overriding processBroadcastElement to update broadcast state and processElement to read it for per-key enrichment
Use ctx.getBroadcastState(descriptor).put(key, value) inside processBroadcastElement and ctx.getBroadcastState(descriptor).get(key) inside processElement
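Register an event-time timer with ctx.timerService().registerEventTimeTimer(...) inside processElement if you need to expire per-key enrichment state, and clear that keyed state in onTimer; timers are scoped to the current key, so they cannot be registered from processBroadcastElement, and broadcast state is read-only in onTimer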
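The steps above can be sketched roughly as follows. This is a minimal, illustrative sketch rather than a reference implementation: the class name BroadcastEnrichment, the descriptor name "labels", the Tuple2<String, String> record shapes, the "lastLabel" keyed state and the 60-second retention are all assumptions made for the example. It needs the Flink streaming Java dependency on the classpath, and it takes the two input streams as parameters so that it does not depend on any particular source.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastEnrichment {

    // Hypothetical descriptor (device id -> label), shared by broadcast() and the function.
    static final MapStateDescriptor<String, String> LABELS =
            new MapStateDescriptor<>("labels", Types.STRING, Types.STRING);

    // Hypothetical retention period for the per-key state, in milliseconds.
    static final long TTL_MS = 60_000L;

    // events: (deviceId, payload); config: (deviceId, label). Both shapes are assumptions.
    static DataStream<String> enrich(
            DataStream<Tuple2<String, String>> events,
            DataStream<Tuple2<String, String>> config) {
        BroadcastStream<Tuple2<String, String>> broadcast = config.broadcast(LABELS);
        return events
                .keyBy(e -> e.f0)      // keyed side
                .connect(broadcast)    // BroadcastConnectedStream
                .process(new Enricher());
    }

    static class Enricher extends KeyedBroadcastProcessFunction<
            String, Tuple2<String, String>, Tuple2<String, String>, String> {

        // Keyed state holding the last label applied to the current key.
        private final ValueStateDescriptor<String> lastLabelDesc =
                new ValueStateDescriptor<>("lastLabel", Types.STRING);

        @Override
        public void processBroadcastElement(
                Tuple2<String, String> update, Context ctx, Collector<String> out)
                throws Exception {
            // The only place where broadcast state is writable.
            ctx.getBroadcastState(LABELS).put(update.f0, update.f1);
        }

        @Override
        public void processElement(
                Tuple2<String, String> event, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // Read-only view of the broadcast state; get returns null for a missing key.
            String label = ctx.getBroadcastState(LABELS).get(event.f0);
            ValueState<String> lastLabel = getRuntimeContext().getState(lastLabelDesc);
            if (label != null) {
                lastLabel.update(label);
            }
            out.collect(event.f0 + "," + event.f1 + "," + lastLabel.value());

            // timestamp() is null when the record carries no event-time timestamp.
            Long ts = ctx.timestamp();
            if (ts != null) {
                ctx.timerService().registerEventTimeTimer(ts + TTL_MS);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // Simplification: any timer for this key clears the state, even if a newer
            // event arrived since the timer was registered.
            getRuntimeContext().getState(lastLabelDesc).clear();
        }
    }
}
```

The event-time timer only fires if the keyed stream has timestamps and watermarks assigned upstream. A stricter expiry would also store the last-seen timestamp per key and compare it in onTimer before clearing.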
Known gotchas
Broadcast state is not partitioned: every parallel subtask receives every broadcast record and keeps its own copy, possibly seeing the records in a different order, so updates in processBroadcastElement must be deterministic and must not depend on arrival order
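Reading broadcast state from processElement is safe, but writing to it there is not possible: the ReadOnlyContext returns a ReadOnlyBroadcastState that exposes no put or remove methods, and objects obtained from it must not be mutated in place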
Broadcast state is kept in memory and every parallel subtask checkpoints its own copy, so large config maps increase checkpoint size roughly in proportion to parallelism; prefer compact lookup structures and bounded map sizes
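One way to keep broadcast updates independent of arrival order is to attach a version to each update and keep only the highest version per key. The plain-Java sketch below simulates two subtasks that receive the same updates in different orders. It is an illustration of the idea, not Flink code: the Update record, its version field and the replay helper are hypothetical, and the HashMap stands in for each subtask's copy of the broadcast state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderIndependentUpdate {

    // Hypothetical broadcast record carrying a monotonically increasing version.
    public record Update(String key, String value, long version) {}

    // Keep the entry with the highest version, so arrival order does not matter.
    public static void apply(Map<String, Update> state, Update u) {
        state.merge(u.key(), u,
                (current, incoming) -> incoming.version() > current.version() ? incoming : current);
    }

    // Replays a sequence of updates into a fresh map, like one subtask would.
    public static Map<String, Update> replay(List<Update> updates) {
        Map<String, Update> state = new HashMap<>();
        for (Update u : updates) {
            apply(state, u);
        }
        return state;
    }

    public static void main(String[] args) {
        Update v1 = new Update("device-1", "blue", 1);
        Update v2 = new Update("device-1", "green", 2);

        Map<String, Update> subtaskA = replay(List.of(v1, v2)); // in order
        Map<String, Update> subtaskB = replay(List.of(v2, v1)); // reordered

        System.out.println(subtaskA.equals(subtaskB));            // prints "true"
        System.out.println(subtaskA.get("device-1").value());     // prints "green"
    }
}
```

A plain last-write-wins put would leave the second subtask holding "blue" in this scenario, which is the kind of divergence the first gotcha warns about.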