package org.example; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class CustomizePartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { Integer keyHash = (key == null) ? null : key.hashCode(); if (keyHash == null) { // key为null,使用默认的Partitioner进行分配 return 0; } else { // 根据哈希值对Partition数取模,获得应该分配到的Partition编号 int numPartitions = cluster.availablePartitionsForTopic(topic).size(); return Math.abs(keyHash % numPartitions); } } @Override public void close() { } @Override public void configure(Map configs) { } }