Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Kafka Partitioning Algorithm
Search
Gordon Diggs
September 02, 2016
Technology
0
140
Kafka Partitioning Algorithm
A look at the default partitioning algorithm that Kafka uses
Gordon Diggs
September 02, 2016
Tweet
Share
More Decks by Gordon Diggs
See All by Gordon Diggs
John Coltrane: Lessons in Leadership
gordondiggs
2
290
The Customer Gap
gordondiggs
1
110
Picking Records with JavaScript and a Button
gordondiggs
0
80
Supbutton
gordondiggs
0
64
Rayons
gordondiggs
0
66
Sous Vide
gordondiggs
0
88
Dev Events & Internal Tools at Paperless Post
gordondiggs
0
120
The Joys and Pains of Working With an Old Codebase
gordondiggs
0
150
The Joys and Pains of Working with an Old Codebase
gordondiggs
1
2.4k
Other Decks in Technology
See All in Technology
Everything Claude Code を眺める
oikon48
13
8.5k
Postman v12 で変わる API開発ワークフロー (Postman v12 アップデート) / New API development workflow with Postman v12
yokawasa
0
150
大規模ECサイトのあるバッチのパフォーマンスを改善するために僕たちのチームがしてきたこと
panda_program
1
260
コンテキスト・ハーネスエンジニアリングの現在
hirosatogamo
PRO
6
610
It’s “Time” to use Temporal
sajikix
3
230
今のWordPress の制作手法ってなにがあんねん?(改) / What’s the Deal with WordPress Development These Days?
tbshiki
0
520
Keycloak を使った SSO で CockroachDB にログインする / CockroachDB SSO with Keycloak
kota2and3kan
0
170
OpenClaw を Amazon Lightsail で動かす理由
uechishingo
0
230
Google系サービスで文字起こしから勝手にカレンダーを埋めるエージェントを作った話
risatube
0
200
プラットフォームエンジニアリングはAI時代の開発者をどう救うのか
jacopen
8
4k
2026年もソフトウェアサプライチェーンのリスクに立ち向かうために / Product Security Square #3
flatt_security
1
700
モジュラモノリス導入から4年間の総括:アーキテクチャと組織の相互作用について / Architecture and Organizational Interaction
nazonohito51
3
820
Featured
See All Featured
The Anti-SEO Checklist Checklist. Pubcon Cyber Week
ryanjones
0
95
The Hidden Cost of Media on the Web [PixelPalooza 2025]
tammyeverts
2
250
Why Our Code Smells
bkeepers
PRO
340
58k
Neural Spatial Audio Processing for Sound Field Analysis and Control
skoyamalab
0
220
How Fast Is Fast Enough? [PerfNow 2025]
tammyeverts
3
500
Typedesign – Prime Four
hannesfritz
42
3k
GitHub's CSS Performance
jonrohan
1032
470k
Breaking role norms: Why Content Design is so much more than writing copy - Taylor Woolridge
uxyall
0
210
30 Presentation Tips
portentint
PRO
1
260
Dealing with People You Can't Stand - Big Design 2015
cassininazir
367
27k
Designing Experiences People Love
moore
143
24k
A better future with KSS
kneath
240
18k
Transcript
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning • Can be keyed • Round robin (mostly)
by default
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
Utils.toPositive
public static int toPositive(int number) { return number & 0x7fffffff;
}
0111 1111 1111 1111 1111 1111 1111 1111
0000 0000 0000 0000 0000 0000 0000 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0000 0000 0000 0000 0000 0000 0000 1010
Two’s Complement
0000 0000 0000 0000 0000 0000 0000 1010 1111 1111
1111 1111 1111 1111 1111 0101 1111 1111 1111 1111 1111 1111 1111 1010
1111 1111 1111 1111 1111 1111 1111 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0111 1111 1111 1111 1111 1111 1111 1010
None