Define a case class for the per-key state and a type for the output rows. Call flatMapGroupsWithState[StateType, OutputType](outputMode, timeoutConf)(stateFunc) on the KeyValueGroupedDataset returned by Dataset.groupByKey.
The state function receives (key, Iterator[InputRow], GroupState[StateType]). Update the state with state.update, set a timeout with state.setTimeoutDuration (processing time) or state.setTimeoutTimestamp (event time), and return an Iterator of zero or more output rows.
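Check state.hasTimedOut at the top of the function: when no new data arrives for a key within the timeout, the function is called with an empty iterator, and that is the point to emit a final row and call state.remove().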
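For PySpark (3.4+), use applyInPandasWithState(func, outputStructType, stateStructType, outputMode, timeoutConf) on a grouped DataFrame. The function receives (key: tuple, values: Iterator[pd.DataFrame], state: GroupState) and returns an Iterator[pd.DataFrame] of output rows; the state is read and written as a tuple matching stateStructType.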
Choose outputMode Update if you emit results incrementally as state changes, or Append if you emit each result once (typically on timeout). The output mode set on the sink must match the one passed here.
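The steps above can be sketched in Scala roughly as follows. This is a minimal illustration, not a reference implementation: the Event, RunningTotal, and UserSummary types, the updateUser function, the 10-minute timeout, and the use of the built-in rate source as a stand-in input are all assumptions made for the example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical input, state, and output types (not from the original text).
case class Event(userId: String, value: Long)
case class RunningTotal(count: Long, sum: Long)
case class UserSummary(userId: String, count: Long, sum: Long, expired: Boolean)

object StatefulSketch {
  // Called for each key that has new data in a micro-batch, and again
  // (with an empty iterator) when that key's timeout fires.
  def updateUser(
      userId: String,
      events: Iterator[Event],
      state: GroupState[RunningTotal]): Iterator[UserSummary] = {
    if (state.hasTimedOut) {
      // No data arrived within the timeout: emit a final row and drop the state.
      val last = state.get
      state.remove()
      Iterator(UserSummary(userId, last.count, last.sum, expired = true))
    } else {
      val previous = state.getOption.getOrElse(RunningTotal(0L, 0L))
      val updated = events.foldLeft(previous) { (acc, e) =>
        RunningTotal(acc.count + 1, acc.sum + e.value)
      }
      state.update(updated)
      // The timeout is set again on every call that keeps the state.
      state.setTimeoutDuration("10 minutes")
      Iterator(UserSummary(userId, updated.count, updated.sum, expired = false))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stateful-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._ // supplies Encoders for the case classes above

    // The rate source (columns: timestamp, value) stands in for a real stream.
    val events: Dataset[Event] = spark.readStream
      .format("rate").option("rowsPerSecond", "5").load()
      .select(($"value" % 3).cast("string").as("userId"), $"value")
      .as[Event]

    val summaries = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState[RunningTotal, UserSummary](
        OutputMode.Update(), GroupStateTimeout.ProcessingTimeTimeout)(updateUser)

    summaries.writeStream
      .outputMode("update") // matches OutputMode.Update() above
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Update mode fits here because a row is emitted every time a key's total changes. A variant that emits only in the hasTimedOut branch would pair more naturally with Append.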
Known gotchas
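State is serialized with encoders. In Scala, use a case class (or another type with an implicit Encoder in scope, for example via import spark.implicits._); in PySpark, the state tuple must match the stateStructType schema you pass in.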
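Processing-time timeouts have no strict upper bound: they are checked only when a micro-batch runs, so the trigger interval can delay them. Event-time timeouts require withWatermark on the input and fire only after the watermark advances past the timestamp given to setTimeoutTimestamp. In both cases the timeout is cleared each time the function runs for a key, so set it again on every call.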
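With flatMapGroupsWithState, state for a key lives until you call state.remove(), and nothing bounds how large it grows. Keys that never time out, or state objects that accumulate raw events, grow the state store without limit and eventually cause executor OOM.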
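A hedged sketch of how the gotchas above can be handled together: an event-time timeout behind a watermark, with compact state that is removed when the key goes idle. The Reading, DeviceState, and ClosedSession types, the closeWhenIdle function, the 30-second gap, the 10-second watermark delay, and the rate source used as input are assumptions for illustration only.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical types. The state holds two longs rather than raw events,
// so its size per key stays constant.
case class Reading(deviceId: String, eventTime: Timestamp, value: Double)
case class DeviceState(maxEventTimeMs: Long, count: Long)
case class ClosedSession(deviceId: String, count: Long)

object EventTimeExpirySketch {
  val gapMs: Long = 30 * 1000L // assumed idle gap before a key is closed

  def closeWhenIdle(
      deviceId: String,
      readings: Iterator[Reading],
      state: GroupState[DeviceState]): Iterator[ClosedSession] = {
    if (state.hasTimedOut) {
      // The watermark passed the timeout timestamp: emit once, then expire.
      val closed = state.get
      state.remove()
      Iterator(ClosedSession(deviceId, closed.count))
    } else {
      val previous = state.getOption.getOrElse(DeviceState(0L, 0L))
      val updated = readings.foldLeft(previous) { (acc, r) =>
        DeviceState(math.max(acc.maxEventTimeMs, r.eventTime.getTime), acc.count + 1)
      }
      state.update(updated)
      // Re-arm the timeout relative to the latest event time seen for this key.
      state.setTimeoutTimestamp(updated.maxEventTimeMs + gapMs)
      Iterator.empty // nothing is emitted until the key times out
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("event-time-expiry-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Rate source as a stand-in; value / 50 yields a new key roughly every
    // 10 seconds at 5 rows per second, so older keys go idle.
    val readings: Dataset[Reading] = spark.readStream
      .format("rate").option("rowsPerSecond", "5").load()
      .select(
        ($"value" / 50).cast("long").cast("string").as("deviceId"),
        $"timestamp".as("eventTime"),
        $"value".cast("double").as("value"))
      .as[Reading]
      .withWatermark("eventTime", "10 seconds") // required for EventTimeTimeout

    val closed = readings
      .groupByKey(_.deviceId)
      .flatMapGroupsWithState[DeviceState, ClosedSession](
        OutputMode.Append(), GroupStateTimeout.EventTimeTimeout)(closeWhenIdle)

    closed.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Because rows are emitted only in the timeout branch, each key produces at most one output row, which is why Append is used in this sketch. Output appears only after the watermark has moved past a key's timeout timestamp, so expect a delay of at least the gap plus the watermark delay.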