ソースを参照

kafka batch send concurrent cluster

admin 1 年間 前
コミット
697de296fd

+ 62 - 0
kafkaSpringCluster/pom.xml

@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>kafkaSpringCluster</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.2.RELEASE</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.78</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 28 - 0
kafkaSpringCluster/src/main/java/org/example/CustomizePartitioner.java

@@ -0,0 +1,28 @@
+package org.example;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import java.util.Map;
+
+public class CustomizePartitioner implements Partitioner {
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        Integer keyHash = (key == null) ? null : key.hashCode();
+        if (keyHash == null) {
+            // key为null,使用默认的Partitioner进行分配
+            return 0;
+        } else {
+            // 根据哈希值对Partition数取模,获得应该分配到的Partition编号
+            int numPartitions = cluster.availablePartitionsForTopic(topic).size();
+            return Math.abs(keyHash % numPartitions);
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+}

+ 77 - 0
kafkaSpringCluster/src/main/java/org/example/KafkaConsumer.java

@@ -0,0 +1,77 @@
+package org.example;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+@Component
+public class KafkaConsumer {
+    // 消费监听
+//    @KafkaListener(topics = {"topic1"})
+//    public void onMessage1(ConsumerRecord<?, ?> record){
+//        // 消费的哪个topic、partition的消息,打印出消息内容
+//        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
+//    }
+//
+//    /**
+//     * @Title 指定topic、partition、offset消费
+//     * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
+//     * @Date 2020/3/22 13:38
+//     * @Param [record]
+//     * @return void
+//     **/
+//    @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
+//            @TopicPartition(topic = "topic1", partitions = { "0" }),
+//            @TopicPartition(topic = "topic2", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "8"))
+//    })
+//    public void onMessage2(ConsumerRecord<?, ?> record) {
+//        System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
+//    }
+
+//    // 批量,在配置中设置:
+//    // # 设置批量消费
+//    //spring.kafka.listener.type=batch
+//    //# 批量消费每次最多消费多少条消息
+//    //spring.kafka.consumer.max-poll-records=5
+//    // 注意:修改后其他consumer不能正常接收消息
+//    @KafkaListener(topics = {"topic1"})
+//    public void listen1(List<String> data) {
+//        System.out.println("收到"+ data.size() + "条消息:");
+//        System.out.println(data);
+//    }
+//
+//    // 或者用ConsumerRecord类接收
+//    @KafkaListener(topics = {"topic1"})
+//    public void listen2(List<ConsumerRecord<String, Object>> records) {
+//        System.out.println("收到"+ records.size() + "条消息:");
+//        System.out.println(records);
+//    }
+
+    // 使用注解方式获取消息头、消息体,则也是使用List来接收
+
+    @KafkaListener(topics = {"topic5"})
+    public void listen2(@Payload List<String> data,
+                        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
+                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
+                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
+                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> tss) {
+        System.out.println("收到"+ data.size() + "条消息:");
+        System.out.println("data: "+data);
+        System.out.println("topics: "+topics);
+        System.out.println("topics: "+partitions);
+        System.out.println("keys: "+keys);
+        System.out.println("timestamp: "+tss);
+    }
+}

+ 20 - 0
kafkaSpringCluster/src/main/java/org/example/KafkaInitialConfiguration.java

@@ -0,0 +1,20 @@
+package org.example;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class KafkaInitialConfiguration {
+    // 创建一个名为testtopic的Topic并设置分区数为8,分区副本数为2
+    @Bean
+    public NewTopic initialTopic() {
+        return new NewTopic("topic5",8, (short) 2 );
+    }
+    // 如果要修改分区数,只需修改配置值重启项目即可
+    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
+    @Bean
+    public NewTopic updateTopic() {
+        return new NewTopic("testtopic",10, (short) 2 );
+    }
+}

+ 77 - 0
kafkaSpringCluster/src/main/java/org/example/KafkaProducer.java

@@ -0,0 +1,77 @@
+package org.example;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class KafkaProducer {
+    @Autowired
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    // 简单发送消息
+    @GetMapping("/kafka/normal/{message}")
+    public void sendMessage1(@PathVariable("message") String normalMessage) {
+        kafkaTemplate.send("topic1", normalMessage);
+    }
+
+    // 回调
+    @GetMapping("/kafka/callbackOne/{message}")
+    public void sendMessage2(@PathVariable("message") String callbackMessage) {
+        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
+            // 消息发送到的topic
+            String topic = success.getRecordMetadata().topic();
+            // 消息发送到的分区
+            int partition = success.getRecordMetadata().partition();
+            // 消息在分区内的offset
+            long offset = success.getRecordMetadata().offset();
+            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
+        }, failure -> {
+            System.out.println("发送消息失败:" + failure.getMessage());
+        });
+    }
+
+    // 另一种回调
+    @GetMapping("/kafka/callbackTwo/{message}")
+    public void sendMessage3(@PathVariable("message") String callbackMessage) {
+        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
+            @Override
+            public void onFailure(Throwable ex) {
+                System.out.println("发送消息失败:"+ex.getMessage());
+            }
+
+            @Override
+            public void onSuccess(SendResult<String, Object> result) {
+                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
+            }
+        });
+    }
+
+    // 有误
+////    @Transactional
+//    @GetMapping("/kafka/transaction")
+//    public void sendMessage7(){
+//        // 声明事务:后面报错消息不会发出去
+//        kafkaTemplate.executeInTransaction(operations -> {
+//            operations.send("topic1","test executeInTransaction");
+//            throw new RuntimeException("fail");
+//        });
+//        // 不声明事务:后面报错但前面消息已经发送成功了
+//        kafkaTemplate.send("topic1","test executeInTransaction");
+//        throw new RuntimeException("fail");
+//    }
+
+    // 发送消息
+    @GetMapping("/test")
+    public void test() {
+        for (int i = 0; i < 23; i++) {
+            kafkaTemplate.send("topic5", String.valueOf(i) ,"message-" + i);
+        }
+    }
+}

+ 11 - 0
kafkaSpringCluster/src/main/java/org/example/Main.java

@@ -0,0 +1,11 @@
+package org.example;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Main {
+    public static void main(String[] args){
+        SpringApplication.run(Main.class, args);
+    }
+}

+ 54 - 0
kafkaSpringCluster/src/main/resources/application.properties

@@ -0,0 +1,54 @@
+###########?Kafka???###########
+spring.kafka.bootstrap-servers=localhost:9091,localhost:9092,localhost:9093
+###########??????????###########
+# ????
+spring.kafka.producer.retries=2
+# ????:??????????????????ack??(??0?1?all/-1)
+spring.kafka.producer.acks=-1
+# ????
+spring.kafka.producer.batch-size=16384
+# ????
+spring.kafka.producer.properties.linger.ms=0
+# ???????????batch-size??????linger.ms?,???????????kafka
+# linger.ms?0??????????????kafka,???batch-size??????
+
+# ????????
+spring.kafka.producer.buffer-memory = 33554432
+# Kafka????????????
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+# ??????
+# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
+###########??????????###########
+# ??????ID
+spring.kafka.consumer.properties.group.id=defaultConsumerGroup
+# ??????offset
+spring.kafka.consumer.enable-auto-commit=true
+# ??offset??(??????????offset)
+spring.kafka.consumer.auto.commit.interval.ms=1000
+# ?kafka?????offset?offset??????????offset
+# earliest:?????????offset;
+# latest:?????????offset(???????????);
+# none:??????????????offset,?????;
+spring.kafka.consumer.auto-offset-reset=latest
+# ????????(??????consumer??????,????rebalance??)
+spring.kafka.consumer.properties.session.timeout.ms=120000
+# ????????
+spring.kafka.consumer.properties.request.timeout.ms=180000
+# Kafka????????????
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+# ??????topic????????????(??)
+spring.kafka.listener.missing-topics-fatal=false
+# ??????
+# spring.kafka.listener.type=batch
+# ???????????????
+# spring.kafka.consumer.max-poll-records=50
+# ??????
+spring.kafka.producer.properties.partitioner.class=org.example.CustomizePartitioner
+# ??????
+spring.kafka.listener.type=batch
+# ???????????????
+spring.kafka.consumer.max-poll-records=5
+# ??
+spring.kafka.listener.concurrency=3