Custom transformations in Kafka Connect offer a powerful approach to manipulate data within connectors, allowing for tasks such as replacement, filtering, or enrichment of messages. The full code, along with other examples, is accessible in the repository.
To define a custom transformation, implement the org.apache.kafka.connect.transforms.Transformation interface, whose main method is apply(R record). In our example, we replace the original message with a new message that contains a randomly generated id and a name parameter equal to the pageid field of the original message.
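Before looking at the Connect-specific code, the mapping itself can be sketched in plain Java. This is only an illustration under assumptions: a Map stands in for the record value, the class name MessageMappingSketch and the sample field values are made up, and the fallback to a default name mirrors the default_name constant used in the actual class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative stand-in only: no Kafka Connect types are used here.
public class MessageMappingSketch {

    static final String NAME_DEFAULT_VALUE = "default_name";

    // Builds the replacement message: a fresh random id plus a name taken from pageid.
    static Map<String, String> toMessage(Map<String, ?> original) {
        Object pageId = original == null ? null : original.get("pageid");
        Map<String, String> message = new LinkedHashMap<>();
        message.put("id", UUID.randomUUID().toString());
        // Fall back to the default name when pageid is absent.
        message.put("name", pageId == null ? NAME_DEFAULT_VALUE : pageId.toString());
        return message;
    }

    public static void main(String[] args) {
        // Hypothetical input; only the pageid field matters for the mapping.
        Map<String, Object> original = Map.of("userid", "User_1", "pageid", "Page_42");
        // The id differs on every run; the name is taken from pageid.
        System.out.println(toMessage(original));
    }
}
```

All other fields of the original message are dropped, which is what "replace" means in this example.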
Below is the implementation of the MessageTransformation class:
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;

public class MessageTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String ID_FIELD = "id";
    private static final String NAME = "name";
    private static final String PURPOSE = "message model";
    private static final String NAME_DEFAULT_VALUE = "default_name";
    private static final String PAGEID_KEY = "pageid";

    // The output schema is identical for every record, so it is built once.
    private static final Schema TRANSFORMED_SCHEMA = SchemaBuilder.struct()
            .field(ID_FIELD, Schema.STRING_SCHEMA)
            .field(NAME, Schema.STRING_SCHEMA)
            .build();

    @Override
    public R apply(R record) {
        var transformed = new Struct(TRANSFORMED_SCHEMA)
                .put(ID_FIELD, UUID.randomUUID().toString())
                .put(NAME, resolveName(record));
        // The generated id doubles as the key of the new record.
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                Schema.STRING_SCHEMA,
                transformed.get(ID_FIELD),
                TRANSFORMED_SCHEMA,
                transformed,
                record.timestamp()
        );
    }

    // Schemaless records get the default name; records with a schema use pageid.
    private String resolveName(R record) {
        if (record.valueSchema() == null) {
            return NAME_DEFAULT_VALUE;
        }
        var value = Requirements.requireStruct(record.value(), PURPOSE);
        var pageId = value.get(PAGEID_KEY);
        // Guard against a null pageid, which would otherwise cause a NullPointerException.
        return pageId == null ? NAME_DEFAULT_VALUE : pageId.toString();
    }

    @Override
    public ConfigDef config() {
        // This transformation takes no configuration options.
        return new ConfigDef();
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> map) {
        // Nothing to configure.
    }
}
This implementation is located in the kafka-connect-transformation module. To apply this transformation, the connector config should contain:
transforms=MessageTransformation
transforms.MessageTransformation.type=com.uuidable.transformation.MessageTransformation
This is a basic example illustrating the concepts of transformation in Kafka Connect. For further development, this transformation can be extended with custom predicates and transformation-specific configurations.
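As one possible next step, a predicate can restrict the transformation to records from certain topics. The fragment below is a sketch that uses the built-in TopicNameMatches predicate (available since Apache Kafka 2.6) rather than a custom one; the predicate alias IsPageviews and the topic pattern are assumptions chosen for illustration.

```properties
transforms=MessageTransformation
transforms.MessageTransformation.type=com.uuidable.transformation.MessageTransformation
# Apply the transformation only when the predicate below matches (alias is hypothetical).
transforms.MessageTransformation.predicate=IsPageviews

predicates=IsPageviews
predicates.IsPageviews.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical topic pattern; adjust to the topics the connector actually handles.
predicates.IsPageviews.pattern=pageviews.*
```

Records from topics that do not match the pattern pass through the connector unchanged.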