Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that you must consume all data returned from each call to poll(Duration) before any subsequent calls, or before closing the consumer.
E.g. the following commits roughly every 10 s, once per call to `poll`; it doesn't automagically commit every 5 s in the background.
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "5000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        // The 5 s interval has long expired by the time we poll again,
        // so the commit happens on the next poll(), i.e. about every 10 s.
        Thread.sleep(10_000);
    }
Just a note: I am not claiming it works correctly in every case, only that there is a clear and documented way for the client to know when to commit, and that it works as expected in a simple scenario.
By calling `poll()` again. It doesn't commit the offsets of the records returned from `poll()` until the auto-commit interval has expired AND you call `poll()` again.
At least this is what the javadoc says quite clearly: https://kafka.apache.org/39/javadoc/org/apache/kafka/clients...
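To make the "interval expired AND you call poll again" rule concrete, here is a toy model of that behaviour in plain Java. It is only a sketch of the rule as described above, not Kafka's actual implementation: the class `AutoCommitModel`, its fields, and the fake clock passed in as `nowMs` are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT Kafka's code. All names here are hypothetical.
class AutoCommitModel {
    private final long autoCommitIntervalMs;
    private long nextCommitDeadlineMs;
    private long position = 0;   // offset of the next record to hand out
    private long committed = 0;  // last committed offset
    final List<Long> commitTimesMs = new ArrayList<>();

    AutoCommitModel(long autoCommitIntervalMs, long nowMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextCommitDeadlineMs = nowMs + autoCommitIntervalMs;
    }

    // Models poll(): a commit can only happen inside this call,
    // and only if the auto-commit interval has already expired.
    long poll(long nowMs, int batchSize) {
        if (nowMs >= nextCommitDeadlineMs) {
            committed = position; // covers records returned by earlier polls
            commitTimesMs.add(nowMs);
            nextCommitDeadlineMs = nowMs + autoCommitIntervalMs;
        }
        position += batchSize;    // hand out the next batch
        return position;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitModel consumer = new AutoCommitModel(5_000, 0);
        long now = 0;
        for (int i = 0; i < 4; i++) {
            consumer.poll(now, 10);
            now += 10_000; // stands in for Thread.sleep(10_000)
        }
        // prints: commit times (ms): [10000, 20000, 30000]
        System.out.println("commit times (ms): " + consumer.commitTimesMs);
        // prints: committed offset: 30
        System.out.println("committed offset: " + consumer.committed());
    }
}
```

With a 5 s interval and a poll every 10 s, the model commits at 10 s, 20 s and 30 s, never at 5 s or 15 s. It also shows why you have to finish processing a batch before polling again: the commit made inside a `poll()` covers everything the earlier polls handed out, whether or not you actually processed it.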