CustomizePartitioner.java 902 B

12345678910111213141516171819202122232425262728
  1. package org.example;
  2. import org.apache.kafka.clients.producer.Partitioner;
  3. import org.apache.kafka.common.Cluster;
  4. import java.util.Map;
  5. public class CustomizePartitioner implements Partitioner {
  6. @Override
  7. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  8. Integer keyHash = (key == null) ? null : key.hashCode();
  9. if (keyHash == null) {
  10. // key为null,使用默认的Partitioner进行分配
  11. return 0;
  12. } else {
  13. // 根据哈希值对Partition数取模,获得应该分配到的Partition编号
  14. int numPartitions = cluster.availablePartitionsForTopic(topic).size();
  15. return Math.abs(keyHash % numPartitions);
  16. }
  17. }
  18. @Override
  19. public void close() {
  20. }
  21. @Override
  22. public void configure(Map<String, ?> configs) {
  23. }
  24. }