To integrate Kafka with Spark in Java, follow these steps:
- Add the dependencies
First, make sure the Kafka and Spark dependencies are declared in your project's pom.xml. For a Maven project, add the following dependencies to pom.xml:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.0</version>
</dependency>
<!-- Kafka direct-stream integration, required by the Spark Streaming example below -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.0</version>
</dependency>
```
Note that you may need to change the version numbers to suit your project; the `_2.12` suffix on the Spark artifacts is the Scala version and must match your Spark build.
- Create a Kafka consumer and producer
Create a Kafka consumer and producer for writing data to and reading data from a Kafka topic. Here is a simple example:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) {
        // Consumer configuration
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the beginning of the topic for a new consumer group
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("test-topic"));

        // Producer configuration
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Produce ten records to the topic
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("test-topic", Integer.toString(i), Integer.toString(i * 2)));
        }
        producer.close();

        // Consume records in a loop and print them
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value()));
        }
    }
}
```
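The example assumes that `test-topic` already exists. If it does not, one way to create it from Java is with the `AdminClient` that ships in the same kafka-clients dependency; the following is a minimal sketch, with a single partition and a replication factor of 1 chosen only to suit a local single-broker setup:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopicExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Connect to the local broker
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Create "test-topic" with 1 partition and replication factor 1
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

Alternatively, the broker may be configured to auto-create topics, or you can create the topic with the kafka-topics.sh script before running the examples.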
- Create a Spark Streaming application
Create a Spark Streaming application that reads data from the Kafka topic and processes it as a DStream. The example below uses the spark-streaming-kafka-0-10 integration (KafkaUtils.createDirectStream) to subscribe to test-topic and run a simple word count over the record values:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SparkStreamingKafkaExample {
    public static void main(String[] args) throws InterruptedException {
        // Create the Spark configuration and contexts with a 1-second batch interval
        SparkConf conf = new SparkConf().setAppName("SparkStreamingKafkaExample").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        // Kafka consumer parameters for the direct stream
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-group");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Read data from the Kafka topic as a DStream
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Collections.singletonList("test-topic"), kafkaParams));

        // Count words in the record values
        JavaDStream<String> lines = kafkaStream.map(record -> record.value());
        JavaPairDStream<String, Integer> counts = lines
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Print the result of each batch
        counts.print();

        // Start the streaming context and wait for termination
        ssc.start();
        ssc.awaitTermination();
    }
}
```
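The stream above relies on the consumer's automatic offset commits. If you want to commit offsets only after each batch has been processed, the direct stream exposes the offset ranges of every batch; the fragment below is a minimal sketch of that pattern (it assumes enable.auto.commit is set to false in kafkaParams and would be placed before ssc.start() in the example above; HasOffsetRanges, CanCommitOffsets, and OffsetRange come from org.apache.spark.streaming.kafka010):

```java
// Commit offsets back to Kafka only after each batch's output has completed
kafkaStream.foreachRDD(rdd -> {
    // RDDs produced by the direct stream carry their Kafka offset ranges
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ... process the batch here ...

    // Asynchronously commit the processed offsets for this consumer group
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
});
```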
- Run the applications
First, start ZooKeeper and the Kafka broker (if they are not already running):
```bash
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
```
Then run the Kafka consumer/producer example and the Spark Streaming application. You should see the Kafka consumer print the records it reads from test-topic, while the Spark Streaming application consumes the same topic and outputs the processed results (the word counts) for each batch.