Create a StreamTableEnvironment from a StreamExecutionEnvironment and register an Iceberg catalog (REST, Hive, or Hadoop) with it
Define the Kafka source table with CREATE TABLE using the 'kafka' connector, specifying topic, properties.bootstrap.servers, format (json or avro), and scan.startup.mode (for example earliest-offset or latest-offset)
Define the Iceberg sink table with CREATE TABLE using the 'iceberg' connector and the properties catalog-name, catalog-type, catalog-database, and catalog-table
Write a streaming INSERT INTO iceberg_table SELECT ... FROM kafka_table with optional transformations, filtering, or aggregations in the SQL body
Pass the INSERT statement to tableEnv.executeSql() to submit the streaming job; Flink compiles the SQL plan and runs it as a continuous streaming job
Enable checkpointing on the underlying StreamExecutionEnvironment before creating the TableEnvironment; the Iceberg sink commits only when a checkpoint completes, so without checkpointing no data is ever committed to the table
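The steps above can be sketched roughly as follows. This is a minimal illustration, not a reference implementation: it uses PyFlink's `execute_sql`, the Python counterpart of `tableEnv.executeSql()`, and apart from the table names `kafka_table` and `iceberg_table` every column, topic, address, catalog name, and warehouse path is a hypothetical placeholder. It also assumes the Kafka and Iceberg connector JARs are already on the Flink classpath.

```python
# Sketch only: column names, topic, addresses, and paths are placeholders.

KAFKA_SOURCE_DDL = """
CREATE TABLE kafka_table (
  user_id STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'flink-iceberg-demo',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
)
"""

ICEBERG_SINK_DDL = """
CREATE TABLE iceberg_table (
  user_id STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'demo_catalog',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/warehouse',
  'catalog-database' = 'demo_db',
  'catalog-table' = 'events'
)
"""

INSERT_SQL = """
INSERT INTO iceberg_table
SELECT user_id, amount, event_time
FROM kafka_table
WHERE amount > 0
"""


def pipeline_statements():
    """Return the statements in the order they must be executed."""
    return [KAFKA_SOURCE_DDL.strip(), ICEBERG_SINK_DDL.strip(), INSERT_SQL.strip()]


def build_table_env(checkpoint_interval_ms=60_000):
    """Create a table environment with checkpointing enabled first."""
    # Imported lazily so the SQL helpers above work without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.enable_checkpointing(checkpoint_interval_ms)  # the Iceberg sink commits per checkpoint
    return StreamTableEnvironment.create(env)


def submit(table_env):
    """Run each statement through execute_sql; the final INSERT submits the job."""
    return [table_env.execute_sql(stmt) for stmt in pipeline_statements()]
```

With PyFlink installed, `submit(build_table_env())` would run the two DDL statements and then submit the INSERT as a streaming job. `submit` accepts any object exposing `execute_sql`, which keeps the statement ordering easy to check without a cluster.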
Known gotchas
Flink SQL streaming jobs require explicit watermark declarations (WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND) in the source table DDL for event-time windowing to work correctly
The Iceberg Flink sink commits data files when a checkpoint completes, not continuously; shorter checkpoint intervals improve data freshness at the cost of higher checkpoint overhead and more, smaller data files
Flink SQL catalog registrations are session-scoped; re-register the catalog every time a new TableEnvironment is created, or use a persistent catalog backed by a metastore
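As a rough illustration of the watermark and catalog gotchas, the sketch below declares the watermark in the source DDL and re-registers the catalog on each fresh table environment. The watermark clause is the one quoted above; the catalog name, warehouse path, columns, topic, and broker address are hypothetical, and `start_session` is an illustrative helper rather than part of any Flink API.

```python
# Sketch only: catalog name, warehouse path, topic, and columns are placeholders.

WATERMARK_CLAUSE = "WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"

# Source table whose event_time column becomes an event-time attribute
# because of the WATERMARK clause in the schema.
SOURCE_DDL_WITH_WATERMARK = f"""
CREATE TABLE kafka_table (
  user_id STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3),
  {WATERMARK_CLAUSE}
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
)
""".strip()

# Catalog registration lives only in the current session, so it has to be
# issued again for every new table environment.
ICEBERG_CATALOG_DDL = """
CREATE CATALOG demo_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/warehouse'
)
""".strip()


def start_session(table_env):
    """Register the catalog and the watermarked source on a fresh environment."""
    table_env.execute_sql(ICEBERG_CATALOG_DDL)
    table_env.execute_sql(SOURCE_DDL_WITH_WATERMARK)
    return table_env
```

Calling `start_session` once per newly created table environment keeps the re-registration in one place. A Hive or REST catalog would change only the catalog properties, not the call pattern.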