Introduction
Kafka Connect is very flexible, but taking advantage of that flexibility requires understanding its internal structure and the different layers that can be customized. In this article, we will walk through the source code path for source connectors; the sink side is very similar, since all layers execute the same logic and differ only in the order in which they run.
Connectors
Let’s start with the code in org.apache.kafka.connect.runtime.AbstractWorkerSourceTask#execute:
if (toSend == null) {
    prepareToPollTask();

    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
    long start = time.milliseconds();
    toSend = poll();
    if (toSend != null) {
        recordPollReturned(toSend.size(), time.milliseconds() - start);
    }
}
As you can see, the worker calls poll, which in turn invokes the poll method of the task:
protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
        // Do nothing. Let the framework poll whenever it's ready.
        return null;
    }
}
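To see what sits on the other side of this call, here is a minimal sketch of a custom SourceTask. The class name, topic, and offset layout are hypothetical; a real task would read from an actual external system and track its position properly.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical task that emits a single record per poll cycle.
public class ExampleSourceTask extends SourceTask {

    private String topic;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Configuration produced by the connector's taskConfigs().
        this.topic = props.getOrDefault("topic", "example-topic");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // This is what the worker-level poll() shown above ends up calling.
        Map<String, String> sourcePartition = Collections.singletonMap("source", "example");
        Map<String, Long> sourceOffset = Collections.singletonMap("position", 0L);
        SourceRecord record = new SourceRecord(
                sourcePartition, sourceOffset, topic,
                Schema.STRING_SCHEMA, "hello from ExampleSourceTask");
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
        // Release any resources held by the task.
    }
}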
This is the first place that can be customized. Tasks are defined at the connector level: SourceTask is the task responsible for pulling records from an external system so that they can be stored in Kafka. To plug in a custom task implementation, override the org.apache.kafka.connect.connector.Connector#taskClass method so that it returns your task class.
Here, we have two important interfaces that provide the possibility of setting up connectors. It’s the first level of our investigation; connectors establish the link between Kafka and external systems, enabling data ingestion and egress. Kafka Connect offers a variety of pre-built connectors for popular systems like databases, cloud storage, and messaging platforms, in addition to the option of developing custom connectors.
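As a sketch of that connector level, here is a hypothetical SourceConnector that hands out the task above via taskClass. Names and configuration keys are illustrative only.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector that wires in ExampleSourceTask.
public class ExampleSourceConnector extends SourceConnector {

    private Map<String, String> props;

    @Override
    public void start(Map<String, String> props) {
        this.props = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        // The hook mentioned above: point the framework at the custom task.
        return ExampleSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Each returned map becomes the props passed to one task's start().
        return Collections.singletonList(props);
    }

    @Override
    public void stop() {
        // Nothing to clean up in this sketch.
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("topic", ConfigDef.Type.STRING, "example-topic",
                        ConfigDef.Importance.HIGH, "Target Kafka topic");
    }

    @Override
    public String version() {
        return "1.0";
    }
}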
Transformers
Let’s move forward. Further down in org.apache.kafka.connect.runtime.AbstractWorkerSourceTask#execute, we find the logic for sending the records:
if (!sendRecords()) {
    stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
Let’s navigate to its implementation, org.apache.kafka.connect.runtime.AbstractWorkerSourceTask#sendRecords – the most interesting part for us:
// ...
for (final SourceRecord preTransformRecord : toSend) {
    retryWithToleranceOperator.sourceRecord(preTransformRecord);
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    // ...
}
// ...
transformationChain serves as a wrapper around the list of transformations. To set up custom transformations, two interfaces can be implemented: org.apache.kafka.connect.transforms.Transformation and org.apache.kafka.connect.transforms.predicates.Predicate. Transformers come into play when you need to implement business logic, enrich data from external sources, or apply custom transformations.
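To make this concrete, here is a minimal sketch of a custom transformation that adds a static header to every record. The class name and configuration keys are hypothetical; a real transformation would add validation and sensible defaults.

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical transformation that adds a static header to every record in the chain.
public class AddStaticHeader<R extends ConnectRecord<R>> implements Transformation<R> {

    private String headerName;
    private String headerValue;

    @Override
    public void configure(Map<String, ?> configs) {
        this.headerName = (String) configs.get("header.name");
        this.headerValue = (String) configs.get("header.value");
    }

    @Override
    public R apply(R record) {
        // Invoked by transformationChain.apply() for every record.
        Headers updated = record.headers().duplicate().addString(headerName, headerValue);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp(), updated);
    }

    @Override
    public ConfigDef config() {
        // Declares the configuration parameters this transformation expects.
        return new ConfigDef()
                .define("header.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Name of the header to add")
                .define("header.value", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Value of the header to add");
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}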
Kafka Connect provides ready-to-use transformations and predicates located in org.apache.kafka.connect.transforms, shipped with the org.apache.kafka:connect-transforms dependency. The list of transformations to apply is passed as a connector configuration parameter, and each transformation declares its required configuration parameters; all of this functionality is defined by org.apache.kafka.connect.transforms.Transformation.
Transformers, as the name suggests, are responsible for transforming data within Kafka Connect. They operate on a per-record basis, enabling data enrichment, filtering, and restructuring before it is written to or after it is read from Kafka topics.
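Predicates make it possible to apply a transformation conditionally. Below is a minimal sketch of a custom predicate; the class name is hypothetical, and in a connector configuration it would be referenced through the predicates parameter and attached to a transformation via transforms.<name>.predicate.

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;

// Hypothetical predicate that matches tombstone records (records with a null value).
public class IsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {

    @Override
    public ConfigDef config() {
        // This predicate needs no configuration.
        return new ConfigDef();
    }

    @Override
    public boolean test(R record) {
        // The associated transformation is applied only when this returns true
        // (or skipped, if the predicate is negated in the configuration).
        return record.value() == null;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}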
Converters
Converters play a crucial role in Kafka Connect: they translate between Connect’s internal data representation and the serialized bytes written to and read from Kafka, which is how source and sink systems end up speaking a common format. Let’s review the code from org.apache.kafka.connect.runtime.AbstractWorkerSourceTask#convertTransformedRecord:
protected ProducerRecord<byte[], byte[]> convertTransformedRecord(SourceRecord record) {
    if (record == null) {
        return null;
    }

    RecordHeaders headers = retryWithToleranceOperator.execute(() -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverter.getClass());

    byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
            Stage.KEY_CONVERTER, keyConverter.getClass());

    byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
            Stage.VALUE_CONVERTER, valueConverter.getClass());

    if (retryWithToleranceOperator.failed()) {
        return null;
    }

    return new ProducerRecord<>(record.topic(), record.kafkaPartition(),
            ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
}
And the definitions of keyConverter and valueConverter:
private final Converter keyConverter;
private final Converter valueConverter;
A Converter is an interface that handles the translation between Kafka Connect data and byte[]. As we saw in the example, it is used when producing and consuming Kafka messages.
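As an illustration, here is a minimal sketch of a custom Converter that treats every payload as a UTF-8 string. The class name is hypothetical, and a production converter would also handle schemas and header conversion properly.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical converter: Connect data <-> UTF-8 encoded bytes.
public class Utf8StringConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Called through keyConverter/valueConverter in convertTransformedRecord().
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Used on the consuming side to turn bytes back into Connect data.
        if (value == null) {
            return SchemaAndValue.NULL;
        }
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, new String(value, StandardCharsets.UTF_8));
    }
}

The headers-aware fromConnectData overload seen in convertTransformedRecord has a default implementation that falls back to the variant shown here, and a converter like this is selected through the key.converter and value.converter worker or connector properties.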
Conclusion
We have reviewed the three most important layers to implement for custom connectors: connectors and their tasks, transformations, and converters. Each layer was designed for specific goals and comes with corresponding limitations, and reading the Kafka Connect source code gives a deeper understanding of how they fit together.