12345678910111213141516171819202122232425262728 |
- package org.example;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
- public class CustomizePartitioner implements Partitioner {
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- Integer keyHash = (key == null) ? null : key.hashCode();
- if (keyHash == null) {
- // key为null,使用默认的Partitioner进行分配
- return 0;
- } else {
- // 根据哈希值对Partition数取模,获得应该分配到的Partition编号
- int numPartitions = cluster.availablePartitionsForTopic(topic).size();
- return Math.abs(keyHash % numPartitions);
- }
- }
- @Override
- public void close() {
- }
- @Override
- public void configure(Map<String, ?> configs) {
- }
- }
|