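Create a Flink SQL table with 'connector' = 'upsert-kafka' plus the 'topic', 'properties.bootstrap.servers', 'key.format', and 'value.format' options (Flink option keys are lowercase); the PRIMARY KEY ... NOT ENFORCED clause is required, and its columns are serialized as the Kafka message key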
Set 'key.format' = 'json' and 'value.format' = 'json' (or 'avro-confluent' with its Schema Registry options) to match the format that downstream Kafka consumers expect
Ensure the upstream query emits a changelog stream with upsert semantics on the primary key (e.g., a GROUP BY aggregation or a deduplicated stream); the upsert-kafka connector writes +I and +U changelog rows as regular key-value records and -D rows as tombstones (null value), and does not write -U rows
Set 'properties.compression.type' = 'lz4' in the table options (it is passed through to the Kafka producer) and enable log compaction on the target topic (topic-level cleanup.policy=compact) so that, once compaction runs, the topic retains only the latest value per key
Read the topic back through a second Flink SQL table that uses the same connector, primary key, and formats to verify that the changelog is maintained correctly and that the topic can serve as a table-style source (e.g., a Kafka Streams KTable)
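The steps above can be sketched in Flink SQL roughly as follows. This is a minimal sketch, not a definitive setup: the table names (orders, user_totals, user_totals_check), the topic names, the column layout, and the localhost:9092 broker address are all hypothetical, and the user-totals topic is assumed to have been created with cleanup.policy=compact outside of Flink.

```sql
-- Hypothetical append-only source of order events (all names are assumptions).
CREATE TABLE orders (
  user_id      STRING,
  amount_cents BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'orders-reader',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Upsert sink: the PRIMARY KEY column is serialized as the Kafka message key.
CREATE TABLE user_totals (
  user_id     STRING,
  order_count BIGINT,
  total_cents BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user-totals',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.compression.type' = 'lz4',  -- forwarded to the Kafka producer
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The GROUP BY aggregation produces an updating changelog keyed on user_id,
-- so each new order rewrites the latest totals for that user.
INSERT INTO user_totals
SELECT user_id, COUNT(*) AS order_count, SUM(amount_cents) AS total_cents
FROM orders
GROUP BY user_id;

-- Second table over the same topic, used only to read the changelog back.
CREATE TABLE user_totals_check (
  user_id     STRING,
  order_count BIGINT,
  total_cents BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user-totals',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Should converge to one current row per user_id.
SELECT * FROM user_totals_check;
```

The check table repeats the sink's schema, key, and formats on purpose: if any of them differ, the read-back no longer tests the same key-value contract that downstream consumers will see.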
Known gotchas
The upsert-kafka connector treats every row as an upsert on its primary key; if the upstream query emits only INSERT rows (no updates or deletes) and every event must be retained, use the standard kafka connector instead, because on a compacted topic later rows with the same key replace earlier ones
Flink writes tombstone records (null value) for DELETE changelog entries; if downstream consumers do not handle null values, they may throw NullPointerExceptions or silently skip deletions
Schema evolution on the key format is especially dangerous with upsert-kafka: once the key schema changes, the same logical key serializes to different bytes, so records written under the old key are never overwritten or tombstoned and linger in the compacted log as phantom entries
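For the insert-only case in the first gotcha, a standard kafka sink table might look like the sketch below. The table name order_events, the topic name, the columns, and the broker address are hypothetical; the topic is assumed to use the default delete-based retention rather than compaction, so rows that share a key are all kept.

```sql
-- Append-only sink using the standard kafka connector (names are assumptions).
-- No PRIMARY KEY is declared; 'key.fields' selects the columns for the message key.
CREATE TABLE order_events (
  user_id      STRING,
  amount_cents BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'key.fields' = 'user_id',
  'value.format' = 'json'
);

-- Two events for the same user: both are written as separate records.
INSERT INTO order_events VALUES ('u1', 1250), ('u1', 300);
```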