在Ubuntu上配置Kafka消费者时,需要考虑多个方面,包括消费者组的ID、服务器地址、自动提交偏移量等。以下是一些关键步骤和配置参数:
Kafka消费者配置步骤
- 配置消费者属性:设置消费者组ID、服务器地址、自动提交偏移量等参数。
- 创建Kafka消费者实例:使用上述配置的属性创建一个Kafka消费者实例。
- 订阅主题:通过调用
subscribe
方法,订阅一个或多个主题。 - 拉取消息:调用
poll
方法从Kafka服务器拉取消息。 - 处理消息:对拉取到的消息进行业务处理。
- 手动提交偏移量(可选):根据需求选择手动提交偏移量,调用
commitSync
或commitAsync
方法。
Kafka消费者重要参数
fetch.min.bytes
:消费者一次拉取中拉取的最小数据量,默认值为1B。fetch.max.bytes
:消费者一次拉取中拉取的最大数据量,默认值为52428800B,即50MB。fetch.max.wait.ms
:指定Kafka的等待时间,默认值为500ms。max.partition.fetch.bytes
:配置从每个分区里返回给consumer的最大数据量。max.poll.records
:配置consumer在一次拉取请求中拉取的最大消息数,默认为500条。connections.max.idle.ms
:空连接超时限制。exclude.internal.topics
:指定Kafka中的内部主题是否可以向消费者公开,默认为true。receive.buffer.bytes
:设置socket接收消息缓冲区大小,默认值为65536B,即64KB。send.buffer.bytes
:设置socket发送消息缓冲区大小,默认值为131072B,即128KB。request.timeout.ms
:consumer等待请求响应的最长时间,默认为30000ms。metadata.max.age.ms
:元数据过期时间,默认300000ms,即5分钟。reconnect.backoff.ms
:尝试重新连接主机之前等待时间,默认50ms。retry.backoff.ms
:尝试重新发送失败的请求到指定主题分区之前的等待时间,默认100ms。isolation.level
:事务隔离级别,有“read_uncommitted”和“read_committed”两种,默认情况为“read_uncommitted”。
使用SpringBoot配置Kafka消费者的示例代码
@Component @Slf4j public class MessageReceiveListener { @KafkaListener(topics = "ifaas-test", containerFactory = "ifaasContainerFactory") public void receiveMessage1(ListconsumerRecords, Acknowledgment ack) { try { log.info("receiveMessage1 接收的kafka消息:" + consumerRecords.size()); ack.acknowledge(); } catch (Exception e) { log.error("kafka失败消息:{}", JSON.toJSONString(consumerRecords)); } } @Bean("ifaasContainerFactory") public ConcurrentKafkaListenerContainerFactory ifaasContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); Map props = staticConsumerProps(ifaasServers, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props)); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); factory.setBatchListener(true); factory.setConcurrency(1); return factory; } private Map staticConsumerProps(String servers, String groupId) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 350000); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 4000); return props; } }
以上信息提供了在Ubuntu上配置Kafka消费者的一些基本步骤和关键参数,以及使用SpringBoot框架的示例代码。