Kafka Consumer Optimization and Configuration Details [email protected]

Custom Properties and Execution Factory

public KafkaListenerContainerFactory<?> batchFactory(){
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.COUNT_TIME);
        factory.getContainerProperties().setAckCount(50);
        factory.getContainerProperties().setAckTime(2000);
        factory.setConcurrency(concurrency);
        Return factory;
    }

    Private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        Props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        Props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, consumerRequestTimeoutMs);
        Props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        Props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        Props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        Props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        Props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaLogMessageDeSer.class);
        Props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        Props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//number of each batch
        Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) is considered to be a lock after the docking time
        Return props;
    }

Parameter details

BOOTSTRAP_SERVERS_CONFIG    kafka ip+port
REQUEST_TIMEOUT_MS_CONFIG    请求超时时间
ENABLE_AUTO_COMMIT_CONFIG     是否自动提交
AUTO_COMMIT_INTERVAL_MS_CONFIG 自动提交间隔时间
SESSION_TIMEOUT_MS_CONFIG      session失效时间
KEY_DESERIALIZER_CLASS_CONFIG  key序列化工具类
VALUE_DESERIALIZER_CLASS_CONFIG   value序列化工具类
GROUP_ID_CONFIG                   消费组名称(id)
MAX_POLL_RECORDS_CONFIG            每次拉去最大数量(要配合   factory.setBatchListener(true);使用)
AUTO_OFFSET_RESET_CONFIG            默认offset位置(当不存在offset时从哪里开始读取)
MAX_POLL_INTERVAL_MS_CONFIG         最大poll数据时间间隔

Automatically submit and manually submit configuration

1, auto-submit ENABLE_AUTO_COMMIT_CONFIG Set to true

AUTO_COMMIT_INTERVAL_MS_CONFIG Define the time interval, how often to submit offset

1 Automatically submit ENABLE_AUTO_COMMIT_CONFIG set to false

Because spring-kafka is used, when submitting manually, it will use spring's submitting strategy to submit offset

by custom strategy.

factory.getContainerProperties().setAckMode() 来定义使用的方式
AbstractMessageListenerContainer.AckMode 中定义了7中策略给我们使用
RECORD 每次拉去处理完成后进行提交
BATCH 每次拉去完成之后都进行提交
TIME 指定时间间隔提交
COUNT 拉去到一定数量进行提交
COUNT_TIME 指定时间或达到数量后进行提交
下面两种比较特殊(需要用户自己使用ack进行提交)

MANUAL(用户处理后手动提交)
MANUAL_IMMEDIATE(用户拉取后立即手动提交)

未续