Add a transforms list to the connector configuration, naming each transform step (e.g., transforms=routeByRegion,maskPII).
For dynamic topic routing, use the RegexRouter or TimestampRouter built-in SMTs: specify transforms.routeByRegion.type=org.apache.kafka.connect.transforms.RegexRouter and supply a regex and replacement pattern that rewrite the destination topic name based on the record's original topic.
For field masking, use MaskField$Value with a fields list of sensitive column names; the transform replaces field values with a type-appropriate zero value (empty string, 0, null) without removing the field from the schema.
To drop a field entirely, use ReplaceField$Value with an exclude list instead.
For conditional routing based on a record field value, chain a Filter SMT (Kafka Connect 2.6+) with a predicate, or use a custom SMT class placed on the Connect worker classpath.
Validate the transform chain by running the connector in a test environment and inspecting output topics with a consumer before promoting.
Known gotchas
SMTs operate on each record individually and cannot aggregate or join across records; complex routing logic requires a stream processor like Kafka Streams or Flink upstream.
MaskField replaces values with zeros, not with a reversible cipher; if you need tokenization or encryption, implement a custom SMT or use a Kafka Streams processor.
SMT errors that throw exceptions will stall the connector task; add error tolerance and a dead-letter queue topic to prevent a bad record from blocking the pipeline.
Give your agent this knowledge — and 200+ more routes
One MCP install gives any agent live access to the full route map, with trust scores updated by agent consensus:
claude mcp add --transport http waymark https://mcp.waymark.network/mcp