在Ubuntu上配置Kafka消费者涉及多个步骤,包括安装必要的软件、配置Kafka和Zookeeper、创建和订阅主题,以及编写和运行消费者代码。以下是一个详细的指南:
1. 安装Java和Zookeeper
首先,确保你的系统上安装了Java和Zookeeper。
安装Java
sudo apt update sudo apt install openjdk-8-jdk java -version
安装Zookeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz tar xvf zookeeper-3.4.6.tar.gz sudo mv zookeeper-3.4.6 /usr/local/zookeeper sudo cat > /usr/local/zookeeper/conf/zoo.cfg << EOF tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 EOF sudo /usr/local/zookeeper/bin/zkServer.sh start
2. 安装Kafka
接下来,下载并安装Kafka。
下载Kafka
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz tar -xzvf kafka_2.12-3.5.2.tgz sudo mv kafka_2.12-3.5.2 /usr/local/kafka sudo mkdir /tmp/kafka-logs
配置Kafka
编辑Kafka的配置文件server.properties
:
sudo nano /usr/local/kafka/config/server.properties
确保以下配置正确:
listeners=PLAINTEXT://your_server_ip:9092 zookeeper.connect=localhost:2181
启动Kafka
sudo /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
3. 创建和订阅主题
使用Kafka命令行工具创建一个主题并订阅它。
创建主题
sudo /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
订阅主题
sudo /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
4. 编写和运行消费者代码
你可以使用Python编写一个简单的Kafka消费者。
安装Kafka-Python
sudo pip3 install kafka-python
消费者代码示例 (consumer.py
)
from kafka import KafkaConsumer consumer = KafkaConsumer( 'test', bootstrap_servers='localhost:9092', auto_offset_reset='earliest', enable_auto_commit=False ) for msg in consumer: print(msg.value.decode('utf-8'))
运行消费者
python3 consumer.py
5. 高级配置
消费者属性
auto.offset.reset
: 控制消费者在没有有效偏移量或分区不存在时从哪里开始读取。enable.auto.commit
: 是否自动提交偏移量。max.poll.records
: 每次poll调用返回的最大记录数。fetch.min.bytes
: 每次fetch请求的最小字节数。max.partition.fetch.bytes
: 每个分区返回给消费者的最大字节数。session.timeout.ms
: 消费者发送心跳信号的时间间隔。
6. 开机自启配置
为了确保Kafka和Zookeeper在系统启动时自动启动,可以配置它们的开机自启。
Zookeeper服务配置
sudo nano /lib/systemd/system/zookeeper.service
添加内容:
[Unit] Description=Zookeeper service After=network.target [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/kafka/bin" User=root Group=root ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
Kafka服务配置
sudo nano /lib/systemd/system/kafka.service
添加内容:
[Unit] Description=Apache Kafka server (broker) After=zookeeper.service [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/kafka/bin" User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
刷新配置并启动服务
sudo systemctl daemon-reload sudo systemctl enable zookeeper sudo systemctl enable kafka sudo systemctl start zookeeper sudo systemctl start kafka
通过以上步骤,你可以在Ubuntu上成功配置Kafka消费者。根据具体需求,你可以进一步调整消费者配置以满足实际应用场景。