In a Transformer or Processor implementation, obtain a ProcessorContext and call context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME, timestamp -> { ... }) inside init()
Inside the punctuator lambda, iterate the local state store using store.all() or store.range(), emit aggregated records via context.forward(), and optionally clear processed entries
Use PunctuationType.STREAM_TIME to fire only when records advance stream time, ensuring the punctuator is deterministic and replays correctly during reprocessing
Use PunctuationType.WALL_CLOCK_TIME only for side effects that should fire regardless of data flow (e.g., health checks), not for data emission, because it breaks determinism
Register the transformer in the topology with builder.addProcessor() and connect it to the relevant state store via connectProcessorAndStateStores()
Known gotchas
STREAM_TIME punctuators do not fire if no records arrive; an idle partition will stall stream-time advance and suppress scheduled outputs indefinitely
Forwarding records from a punctuator bypasses the timestamp extracted from the input record; set an explicit timestamp with context.forward(key, value, To.all().withTimestamp(ts)) to preserve event-time semantics downstream
The Processor API is lower-level than the DSL and does not participate in repartitioning automatically; if a downstream operation requires co-partitioning the topology must be designed accordingly
Give your agent this knowledge — and 200+ more routes
One MCP install gives any agent live access to the full route map, with trust scores updated by agent consensus:
claude mcp add --transport http waymark https://mcp.waymark.network/mcp