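Add the spark-sql-kafka-0-10 connector dependency built for your Spark version and Scala binary version (e.g., org.apache.spark:spark-sql-kafka-0-10_2.12 at the same version number as Spark; verify the correct artifact coordinates for your Spark version). The Kafka client library is pulled in transitively by the connector.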
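Create a streaming DataFrame using spark.readStream.format('kafka').option('kafka.bootstrap.servers', '...').option('subscribe', 'topic-name').option('startingOffsets', 'latest').load(). Set startingOffsets to 'earliest' to read the topic from the beginning, or to 'latest' (the default for streaming queries) to read only records that arrive after the query starts.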
The resulting DataFrame has columns: key, value (both binary), topic, partition, offset, timestamp, timestampType. Cast value to string or deserialize as needed.
Apply transformations (parsing, filtering, aggregation) on the streaming DataFrame.
Write results with df.writeStream.format(...).option('checkpointLocation', '...').start() to begin consumption.
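The steps above can be sketched in PySpark roughly as follows. This is a minimal illustration, not a definitive implementation: the helper names (kafka_source_options, read_kafka_stream, start_console_sink), the console sink, and the output column names key_str and value_str are assumptions made for the example, and the spark argument is expected to be an existing SparkSession with the connector on its classpath.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build the option map for the Kafka source (step 2)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_kafka_stream(spark, options):
    """Load the stream and cast the binary key/value columns to strings (step 3).

    `spark` is assumed to be a SparkSession; the aliases key_str and
    value_str are illustrative names, not part of the connector.
    """
    raw = spark.readStream.format("kafka").options(**options).load()
    return raw.selectExpr(
        "CAST(key AS STRING) AS key_str",
        "CAST(value AS STRING) AS value_str",
    )


def start_console_sink(df, checkpoint_location):
    """Start the query (step 5); nothing is consumed until start() is called.

    The console sink is used here only because it needs no external system.
    """
    return (
        df.writeStream.format("console")
        .option("checkpointLocation", checkpoint_location)
        .start()
    )
```

With a live session, usage would look like start_console_sink(read_kafka_stream(spark, kafka_source_options('localhost:9092', 'topic-name', 'earliest')), '/tmp/checkpoint').awaitTermination(), where the broker address and checkpoint path are placeholders. Any transformations from step 4 would be applied to the DataFrame between the read and the write.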
Known gotchas
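The Kafka connector dependency must match the Spark version exactly, and its Scala suffix (e.g., _2.12) must match the Scala binary version of your Spark build; a mismatch typically fails at runtime with ClassNotFoundException or a similar linkage error such as NoSuchMethodError.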
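Kafka credentials (SSL, SASL) are passed via kafka.* prefixed options (for example kafka.security.protocol and kafka.sasl.jaas.config); do not hard-code secrets. Load them at startup from a secret store, environment variables, or external config.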
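startingOffsets='earliest' on a large topic replays all historical data on a fresh start (once a checkpoint exists, the query resumes from the checkpointed offsets and this option is ignored); for precise control, pass a JSON offset map such as {"topic-name":{"0":23,"1":-2}}, where -2 means earliest and -1 means latest for that partition.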
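The three gotchas above can be handled with small helpers like the ones below. This is a hedged sketch: the function names and the environment variable names KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical, and SASL_SSL with the PLAIN mechanism is only one possible security setup. When passing explicit offsets, Spark generally expects every partition of the subscribed topic to be listed.

```python
import json
import os


def connector_coordinate(spark_version, scala_binary_version):
    """Derive the connector's Maven coordinate from the Spark/Scala versions.

    Pass the versions of the Spark build you actually run; the result is
    suitable for --packages or spark.jars.packages.
    """
    return (
        f"org.apache.spark:spark-sql-kafka-0-10_{scala_binary_version}"
        f":{spark_version}"
    )


def starting_offsets_json(topic, partition_offsets):
    """Serialize {partition: offset} into the JSON form startingOffsets accepts.

    In this JSON, -2 means earliest and -1 means latest for a partition.
    """
    per_partition = {str(p): int(o) for p, o in partition_offsets.items()}
    return json.dumps({topic: per_partition})


def sasl_options_from_env(env=os.environ):
    """Build kafka.* security options from environment variables.

    KAFKA_USERNAME and KAFKA_PASSWORD are hypothetical variable names;
    a KeyError is raised if either is missing, so a misconfigured job
    fails before it connects.
    """
    username = env["KAFKA_USERNAME"]
    password = env["KAFKA_PASSWORD"]
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }
```

The dictionaries returned here can be merged into the reader with .options(**opts), and the JSON string can be passed as the startingOffsets option in place of 'earliest' or 'latest'.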