117.info
人生若只如初见

Ubuntu Kafka配置中的消费者设置

在Ubuntu上配置Kafka消费者时,需要考虑多个方面,包括消费者组的ID、服务器地址、自动提交偏移量等。以下是一些关键步骤和配置参数:

Kafka消费者配置步骤

  1. 配置消费者属性:设置消费者组ID、服务器地址、自动提交偏移量等参数。
  2. 创建Kafka消费者实例:使用上述配置的属性创建一个Kafka消费者实例。
  3. 订阅主题:通过调用subscribe方法,订阅一个或多个主题。
  4. 拉取消息:调用poll方法从Kafka服务器拉取消息。
  5. 处理消息:对拉取到的消息进行业务处理。
  6. 手动提交偏移量(可选):根据需求选择手动提交偏移量,调用commitSynccommitAsync方法。

Kafka消费者重要参数

  • fetch.min.bytes:消费者一次拉取中拉取的最小数据量,默认值为1B。
  • fetch.max.bytes:消费者一次拉取中拉取的最大数据量,默认值为52428800B,即50MB。
  • fetch.max.wait.ms:指定Kafka的等待时间,默认值为500ms。
  • max.partition.fetch.bytes:配置从每个分区里返回给consumer的最大数据量。
  • max.poll.records:配置consumer在一次拉取请求中拉取的最大消息数,默认为500条。
  • connections.max.idle.ms:空连接超时限制。
  • exclude.internal.topics:指定Kafka中的内部主题是否可以向消费者公开,默认为true。
  • receive.buffer.bytes:设置socket接收消息缓冲区大小,默认值为65536B,即64KB。
  • send.buffer.bytes:设置socket发送消息缓冲区大小,默认值为131072B,即128KB。
  • request.timeout.ms:consumer等待请求响应的最长时间,默认为30000ms。
  • metadata.max.age.ms:元数据过期时间,默认300000ms,即5分钟。
  • reconnect.backoff.ms:尝试重新连接主机之前等待时间,默认50ms。
  • retry.backoff.ms:尝试重新发送失败的请求到指定主题分区之前的等待时间,默认100ms。
  • isolation.level:事务隔离级别,有“read_uncommitted”和“read_committed”两种,默认情况为“read_uncommitted”。

使用SpringBoot配置Kafka消费者的示例代码

@Component
@Slf4j
public class MessageReceiveListener {
    @KafkaListener(topics = "ifaas-test", containerFactory = "ifaasContainerFactory")
    public void receiveMessage1(List consumerRecords, Acknowledgment ack) {
        try {
            log.info("receiveMessage1 接收的kafka消息:" + consumerRecords.size());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("kafka失败消息:{}", JSON.toJSONString(consumerRecords));
        }
    }

    @Bean("ifaasContainerFactory")
    public ConcurrentKafkaListenerContainerFactory ifaasContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map props = staticConsumerProps(ifaasServers, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(1);
        return factory;
    }

    private Map staticConsumerProps(String servers, String groupId) {
        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 350000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 4000);
        return props;
    }
}

以上信息提供了在Ubuntu上配置Kafka消费者的一些基本步骤和关键参数,以及使用SpringBoot框架的示例代码。

未经允许不得转载 » 本文链接:https://www.117.info/ask/fe964AzsNAwFXBVc.html

推荐文章

  • Ubuntu Python安装后如何验证

    在Ubuntu系统中安装Python后,可以通过以下几种方法来验证安装是否成功:
    检查Python版本 打开终端(Terminal)。
    输入 python --version 或 python3 ...

  • filezilla连接ubuntu服务器方法

    要使用FileZilla连接到Ubuntu服务器,请按照以下步骤操作:
    1. 安装FileZilla Server 在Ubuntu服务器上安装FileZilla Server。可以通过以下命令下载并安装:...

  • ubuntu如何安装openssl

    在Ubuntu上安装OpenSSL非常简单,因为它是大多数Linux发行版的默认软件包之一。以下是在Ubuntu上安装OpenSSL的步骤: 打开终端(快捷键:Ctrl + Alt + T)。 首先...

  • Ubuntu Swagger如何处理错误

    在Ubuntu上使用Swagger时,可能会遇到一些常见错误。以下是一些解决方案: TypeError: Failed to execute ‘fetch‘ on ‘Window‘: Request with GET/HEAD meth...

  • dmesg中的驱动信息怎么查看

    dmesg(display message或者driver message)是Linux下用于显示内核启动信息和运行时状态的命令行工具。它可以帮助你查看驱动程序加载和运行的相关信息。要查看d...

  • HDFS在大数据处理中的优势是什么

    HDFS(Hadoop Distributed File System)在大数据处理中具有多方面的优势,主要包括以下几点:
    高可靠性 数据冗余: HDFS通过将数据块复制到多个节点来实现...

  • ubuntu deluser如何配置防火墙规则

    在Ubuntu系统中,deluser命令用于删除用户账户。如果你想在删除用户的同时配置防火墙规则,以阻止该用户的所有网络访问,你可以使用ufw(Uncomplicated Firewall...

  • Python机器学习在Ubuntu如何部署

    在Ubuntu上部署Python机器学习项目通常涉及以下几个步骤: 操作系统环境准备: 更新操作系统: sudo apt update && sudo apt upgrade -y 安装基础工具: sudo ap...