Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Kafka Partitioning Algorithm
Search
Gordon Diggs
September 02, 2016
Technology
0
140
Kafka Partitioning Algorithm
A look at the default partitioning algorithm that Kafka uses
Gordon Diggs
September 02, 2016
Tweet
Share
More Decks by Gordon Diggs
See All by Gordon Diggs
John Coltrane: Lessons in Leadership
gordondiggs
2
290
The Customer Gap
gordondiggs
1
110
Picking Records with JavaScript and a Button
gordondiggs
0
80
Supbutton
gordondiggs
0
64
Rayons
gordondiggs
0
66
Sous Vide
gordondiggs
0
88
Dev Events & Internal Tools at Paperless Post
gordondiggs
0
120
The Joys and Pains of Working With an Old Codebase
gordondiggs
0
150
The Joys and Pains of Working with an Old Codebase
gordondiggs
1
2.4k
Other Decks in Technology
See All in Technology
わからなくて良いなら、わからなきゃだめなの?
kotaoue
1
370
[JAWSDAYS2026][D8]その起票、愛が足りてますか?AWSサポートを味方につける、技術的「ラブレター」の書き方
hirosys_
3
190
Tebiki Engineering Team Deck
tebiki
0
27k
AIエージェント、 社内展開の前に知っておきたいこと
oracle4engineer
PRO
2
140
20260311 ビジネスSWG活動報告(デジタルアイデンティティ人材育成推進WG Ph2 活動報告会)
oidfj
0
340
ソフトバンク流!プラットフォームエンジニアリング実現へのアプローチ
sbtechnight
0
150
実践 Datadog MCP Server
nulabinc
PRO
2
230
AWSの資格って役に立つの?
tk3fftk
2
340
OCHaCafe S11 #2 コンテナ時代の次の一手:Wasm 最前線
oracle4engineer
PRO
2
140
us-east-1 に障害が起きた時に、 ap-northeast-1 にどんな影響があるか 説明できるようになろう!
miu_crescent
PRO
13
4.4k
情シスのための生成AI実践ガイド2026 / Generative AI Practical Guide for Business Technology 2026
glidenote
0
270
Oracle Cloud Infrastructure IaaS 新機能アップデート 2025/12 - 2026/2
oracle4engineer
PRO
0
150
Featured
See All Featured
How To Stay Up To Date on Web Technology
chriscoyier
790
250k
B2B Lead Gen: Tactics, Traps & Triumph
marketingsoph
0
77
[SF Ruby Conf 2025] Rails X
palkan
2
830
Dealing with People You Can't Stand - Big Design 2015
cassininazir
367
27k
The Success of Rails: Ensuring Growth for the Next 100 Years
eileencodes
47
8k
Are puppies a ranking factor?
jonoalderson
1
3.1k
Connecting the Dots Between Site Speed, User Experience & Your Business [WebExpo 2025]
tammyeverts
11
860
How to Align SEO within the Product Triangle To Get Buy-In & Support - #RIMC
aleyda
1
1.4k
Save Time (by Creating Custom Rails Generators)
garrettdimon
PRO
32
2.4k
Practical Tips for Bootstrapping Information Extraction Pipelines
honnibal
25
1.8k
Deep Space Network (abreviated)
tonyrice
0
92
Responsive Adventures: Dirty Tricks From The Dark Corners of Front-End
smashingmag
254
22k
Transcript
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning • Can be keyed • Round robin (mostly)
by default
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
Utils.toPositive
public static int toPositive(int number) { return number & 0x7fffffff;
}
0111 1111 1111 1111 1111 1111 1111 1111
0000 0000 0000 0000 0000 0000 0000 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0000 0000 0000 0000 0000 0000 0000 1010
Two’s Complement
0000 0000 0000 0000 0000 0000 0000 1010 1111 1111
1111 1111 1111 1111 1111 0101 1111 1111 1111 1111 1111 1111 1111 1010
1111 1111 1111 1111 1111 1111 1111 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0111 1111 1111 1111 1111 1111 1111 1010
None