How to configure a consumer with Quarkus and Kafka to lose fewer messages

To choose the right configuration for your application, you need to understand how these settings influence both its performance and the number of messages it can lose.

How to define a consumer in Quarkus

The simplest way to define a consumer in Quarkus:

@Incoming("example-in")
public void receive(Record<Integer, String> record) {
    // Business logic goes here; the key is record.key() and the payload is record.value().
}

Properties that influence the number of lost messages

The following properties can influence the number of lost messages:
mp.messaging.incoming.example-in.commit-strategy – commit strategy
mp.messaging.incoming.example-in.enable.auto.commit – enables auto commit
mp.messaging.incoming.example-in.auto.commit.interval.ms – interval between auto commit actions
mp.messaging.incoming.example-in.max.poll.records – maximum number of records returned in a single call to poll()
mp.messaging.incoming.example-in.max-queue-size-factor – multiplier factor that determines the maximum number of records queued for processing, using `max.poll.records` * `max-queue-size-factor`
Given these settings, if an instance stops, it can lose every message that was polled and already committed but not yet processed. The maximum size of that in-memory queue is mp.messaging.incoming.example-in.max.poll.records multiplied by mp.messaging.incoming.example-in.max-queue-size-factor, so these two properties bound the potential loss.
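To make the queue bound concrete, here is a minimal sketch in plain Java that reads the two properties and multiplies them. The class `QueueBound`, its method names and the sample values 100 and 3 are made up for illustration and are not part of Quarkus. The fallback values 500 and 2 are what I understand the defaults to be, so treat them as an assumption and check your connector version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustrative helper, not part of Quarkus or SmallRye.
final class QueueBound {

    // Upper bound on records buffered in memory:
    // max.poll.records * max-queue-size-factor.
    static int maxQueuedRecords(Properties config, String channel) {
        String prefix = "mp.messaging.incoming." + channel + ".";
        // Fallbacks 500 and 2 are assumed defaults, not verified here.
        int maxPollRecords = Integer.parseInt(
                config.getProperty(prefix + "max.poll.records", "500"));
        int factor = Integer.parseInt(
                config.getProperty(prefix + "max-queue-size-factor", "2"));
        return maxPollRecords * factor;
    }

    // Parses text written in application.properties syntax.
    static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return config;
    }

    public static void main(String[] args) {
        // Example values only; the same two lines would go in application.properties.
        String applicationProperties =
                "mp.messaging.incoming.example-in.max.poll.records=100\n"
              + "mp.messaging.incoming.example-in.max-queue-size-factor=3\n";
        // Prints 300: at most 300 records can be waiting in memory.
        System.out.println(maxQueuedRecords(parse(applicationProperties), "example-in"));
    }
}
```

Lowering either value shrinks the number of records that can be sitting in memory when an instance stops, at the cost of more frequent polling.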
But queue size is not the only thing to configure for your application. The commit strategy should be reviewed as well.

Quarkus commit strategies

Ignore strategy

If `enable.auto.commit` is true, the default strategy is `ignore`; otherwise it is `throttled`. The `ignore` strategy disregards acknowledgments: offsets are committed through the Kafka auto commit at every auto commit interval, regardless of whether the messages have been processed. Example:
If the instance is stopped, the number of lost messages can be as high as mp.messaging.incoming.example-in.max.poll.records * mp.messaging.incoming.example-in.max-queue-size-factor

Throttled strategy

Suitable for asynchronous processing. This strategy keeps track of received messages and commits the offset that follows the latest ACKed message in sequence. It commits periodically, at the interval defined by `auto.commit.interval.ms` (default: 5000).
It performs well because commits happen at intervals rather than per message, and your business logic decides when a message is marked as acknowledged.
Acknowledgment can be automatic or manual in your code.
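The phrase "latest ACKed message in sequence" can be illustrated with a small model in plain Java. This is only a sketch of the idea, not the connector's actual implementation. The class `ThrottledOffsetTracker` and its methods are hypothetical names, and offsets are assumed to be consecutive within a single partition.

```java
import java.util.TreeSet;

// Illustrative model of the throttled idea; not SmallRye's implementation.
final class ThrottledOffsetTracker {
    // ACKed offsets that cannot be committed yet because of a gap before them.
    private final TreeSet<Long> acked = new TreeSet<>();
    // First offset that has not been ACKed in sequence; this is what gets committed.
    private long nextToCommit;

    ThrottledOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Called by business logic once a record has been fully processed.
    void ack(long offset) {
        if (offset >= nextToCommit) {
            acked.add(offset);
        }
    }

    // Called periodically (the role played by auto.commit.interval.ms).
    long committableOffset() {
        // Advance only across a gap-free run of ACKed offsets.
        while (acked.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        ThrottledOffsetTracker tracker = new ThrottledOffsetTracker(0);
        tracker.ack(0);
        tracker.ack(1);
        tracker.ack(3); // offset 2 is still being processed
        System.out.println(tracker.committableOffset()); // prints 2
        tracker.ack(2);
        System.out.println(tracker.committableOffset()); // prints 4
    }
}
```

Because the committed position never jumps over an unacknowledged record, a restart redelivers the in-flight record instead of skipping it.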

Latest strategy

This strategy commits the offset of each message as it is acknowledged. The extra commits can reduce the performance of your application, but fewer messages are redelivered as duplicates after a restart.
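The cost of committing per message can be seen in a rough model. The sketch below is plain Java and only illustrates the idea. The class `LatestCommitModel` is a hypothetical name. It assumes a commit is issued only when an ACK moves the committed position forward, and it simplifies by storing the acknowledged offset itself.

```java
// Illustrative model only; not SmallRye's implementation.
final class LatestCommitModel {
    private long committed = -1; // highest offset committed so far (simplified)
    private int commitCalls;     // number of commit requests issued

    // Each ACK triggers a commit if it moves the committed position forward.
    void ack(long offset) {
        if (offset > committed) {
            committed = offset;
            commitCalls++;
        }
    }

    long committed() {
        return committed;
    }

    int commitCalls() {
        return commitCalls;
    }

    public static void main(String[] args) {
        LatestCommitModel model = new LatestCommitModel();
        for (long offset = 0; offset < 1000; offset++) {
            model.ack(offset);
        }
        // Prints 1000: one commit request per acknowledged message.
        System.out.println(model.commitCalls());
    }
}
```

In this model 1000 messages produce 1000 commit requests, whereas an interval-based strategy would send only a handful over the same period. That is the performance trade-off described above.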

Conclusion

The throttled strategy is the best choice for losing fewer messages with Kafka. Additionally, you need to set suitable values for the properties from ‘Kafka connector commit strategies’ to limit the number of polled messages held in memory.
