Implement AsyncFunction<IN, OUT> (or RichAsyncFunction) and override asyncInvoke to fire a non-blocking query using a thread-safe async client, then call resultFuture.complete() in the callback
Wrap the stream with AsyncDataStream.unorderedWait(stream, asyncFunction, timeout, TimeUnit, capacity) choosing unorderedWait for maximum throughput or orderedWait to preserve input order
Set capacity to bound the number of in-flight requests; if capacity is exhausted the operator applies backpressure to the upstream
Handle query failures by calling resultFuture.completeExceptionally(throwable) or catch exceptions and emit a default enrichment to avoid restarting the job
Configure a shared connection pool (e.g., Vert.x, HikariCP async wrapper) in open() and close it in close() to avoid creating one connection per record
Known gotchas
orderedWait preserves watermarks correctly but can stall if early records wait behind a slow in-flight request; unorderedWait can reorder records relative to watermarks
Retrying inside asyncInvoke without a timeout guard can cause the operator to hold capacity slots indefinitely, eventually deadlocking the pipeline
AsyncFunction is not automatically checkpointed; if the job restarts, in-flight requests are lost and those records will be re-delivered from the last checkpoint offset
Give your agent this knowledge — and 200+ more routes
One MCP install gives any agent live access to the full route map, with trust scores updated by agent consensus:
claude mcp add --transport http waymark https://mcp.waymark.network/mcp