Developing Custom Transformations in Kafka Connect

Custom transformations in Kafka Connect let you manipulate records as they pass through a connector, for example to replace, filter, or enrich messages. The full code, along with other examples, is available in the repository.

To define a custom transformation, implement the org.apache.kafka.connect.transforms.Transformation interface, whose main method is apply(R record). In our example, we replace the original message with a new message that contains a randomly generated id and a name parameter equal to the pageid field of the original message. If the original message has no value schema, name falls back to a default value.

Below is the implementation of the MessageTransformation class:

package com.uuidable.transformation;

import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;

public class MessageTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String ID_FIELD = "id";
    private static final String NAME = "name";
    private static final String PURPOSE = "message model";
    private static final String NAME_DEFAULT_VALUE = "default_name";
    private static final String PAGEID_KEY = "pageid";

    // Schema of the replacement value; it is the same for every record, so it is built once.
    private static final Schema TRANSFORMED_SCHEMA = SchemaBuilder.struct()
            .field(ID_FIELD, Schema.STRING_SCHEMA)
            .field(NAME, Schema.STRING_SCHEMA)
            .build();

    @Override
    public R apply(R record) {
        // Records without a value schema get the default name;
        // otherwise the name is read from the "pageid" field of the Struct value.
        String name = NAME_DEFAULT_VALUE;
        if (record.valueSchema() != null) {
            var value = Requirements.requireStruct(record.value(), PURPOSE);
            name = value.get(PAGEID_KEY).toString();
        }

        var transformed = new Struct(TRANSFORMED_SCHEMA)
                .put(ID_FIELD, UUID.randomUUID().toString())
                .put(NAME, name);

        // The generated id becomes the record key; the new Struct replaces the value.
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                Schema.STRING_SCHEMA,
                transformed.get(ID_FIELD),
                TRANSFORMED_SCHEMA,
                transformed,
                record.timestamp()
        );
    }

    @Override
    public ConfigDef config() {
        // This transformation defines no settings of its own.
        return new ConfigDef();
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to configure.
    }
}

This implementation lives in the kafka-connect-transformation module. To apply the transformation, the connector configuration should contain:

transforms=MessageTransformation
transforms.MessageTransformation.type=com.uuidable.transformation.MessageTransformation
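
If the transformation should run only for some records, one of Kafka Connect's built-in predicates (available since Kafka 2.6) can be attached in the same configuration. The fragment below is a sketch rather than part of the repository: the alias IsPageviews and the topic pattern pageviews.* are assumptions chosen for illustration.

```properties
transforms=MessageTransformation
transforms.MessageTransformation.type=com.uuidable.transformation.MessageTransformation
# Apply the transformation only when the predicate below matches
transforms.MessageTransformation.predicate=IsPageviews

predicates=IsPageviews
predicates.IsPageviews.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Hypothetical topic pattern (a Java regular expression)
predicates.IsPageviews.pattern=pageviews.*
```

Records from topics that do not match the pattern pass through unchanged.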

This is a basic example illustrating the concepts of transformation in Kafka Connect. For further development, this transformation can be extended with custom predicates and transformation-specific configurations.
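
As one possible starting point for a transformation-specific setting: Connect passes configure() the properties defined under the transformation's prefix, with that prefix stripped. A hypothetical name.default key could therefore replace the hard-coded default name. The sketch below uses plain Java only; the key name and the NameDefaultSetting helper are assumptions, not part of the repository, and a fuller version would also declare the key in the ConfigDef returned by config().

```java
import java.util.Map;

/** Hypothetical helper: resolves the fallback name from the map passed to configure(). */
final class NameDefaultSetting {

    // Assumed key; in a connector config it would be written as
    // transforms.MessageTransformation.name.default
    static final String NAME_DEFAULT_CONFIG = "name.default";

    // Same fallback value the transformation currently hard-codes.
    static final String NAME_DEFAULT_VALUE = "default_name";

    private NameDefaultSetting() {
    }

    /** Returns the configured name, or the built-in default if the key is missing or blank. */
    static String resolve(Map<String, ?> configs) {
        Object raw = configs.get(NAME_DEFAULT_CONFIG);
        if (raw == null) {
            return NAME_DEFAULT_VALUE;
        }
        String value = raw.toString().trim();
        return value.isEmpty() ? NAME_DEFAULT_VALUE : value;
    }
}
```

Inside MessageTransformation, configure() could store the result of NameDefaultSetting.resolve(configs) in a field, and apply() could use that field instead of the constant.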
