Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Kafka Partitioning Algorithm
Search
Gordon Diggs
September 02, 2016
Technology
150
0
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
Kafka Partitioning Algorithm
A look at the default partitioning algorithm that Kafka uses
Gordon Diggs
September 02, 2016
More Decks by Gordon Diggs
See All by Gordon Diggs
John Coltrane: Lessons in Leadership
gordondiggs
2
310
The Customer Gap
gordondiggs
1
120
Picking Records with JavaScript and a Button
gordondiggs
0
88
Supbutton
gordondiggs
0
88
Rayons
gordondiggs
0
94
Sous Vide
gordondiggs
0
110
Dev Events & Internal Tools at Paperless Post
gordondiggs
0
130
The Joys and Pains of Working With an Old Codebase
gordondiggs
0
150
The Joys and Pains of Working with an Old Codebase
gordondiggs
1
2.4k
Other Decks in Technology
See All in Technology
白金鉱業Meetup_Vol.24_「AIエージェントは分けるほど良い」は本当か? / Is it true that “the more you divide AI agents, the better”?
brainpadpr
1
260
小さくはじめるSLI/SLO ~育てながら組織に定着させる実践知~ / Starting Small with SLI/SLOs: Building Adoption Through Continuous Growth
nari_ex
2
1.4k
新しいVibe Codingと”自走”について
watany
5
290
RAG を使わないという選択肢
tatsutaka
1
150
AI Engineering Summit Tokyo 2026 AIの前に、やることがある 〜医療データ企業の4フェーズ〜
dtaniwaki
0
2.5k
失敗を経て、Harness Engineering で 大切にしたいことを考える / Learning from Failure: What Matters in Harness Engineering
bitkey
PRO
1
290
Dario Amodi『Policy on the AI Exponential』を理解する
nagatsu
0
210
2026 TECHFRESH 畢業分享會 - 開發日常大解密!從領域驅動到企業級上線
line_developers_tw
PRO
0
690
フロンティアAIのゲート化と地政学リスク
nagatsu
0
110
MCP Appsを作ってみよう
iwamot
PRO
4
480
How Timee Delivers Day 1 Production Ready LLM Features
tomoyks
0
110
生成 AI × MCP で切り拓く次世代 SRE!自律型運用への挑戦と開発者体験の進化
_awache
0
190
Featured
See All Featured
実際に使うSQLの書き方 徹底解説 / pgcon21j-tutorial
soudai
PRO
201
75k
Max Prin - Stacking Signals: How International SEO Comes Together (And Falls Apart)
techseoconnect
PRO
0
180
Darren the Foodie - Storyboard
khoart
PRO
3
3.4k
Speed Design
sergeychernyshev
33
1.8k
The Cult of Friendly URLs
andyhume
79
6.9k
Evolution of real-time – Irina Nazarova, EuRuKo, 2024
irinanazarova
9
1.4k
Digital Projects Gone Horribly Wrong (And the UX Pros Who Still Save the Day) - Dean Schuster
uxyall
0
1.7k
The Organizational Zoo: Understanding Human Behavior Agility Through Metaphoric Constructive Conversations (based on the works of Arthur Shelley, Ph.D)
kimpetersen
PRO
0
360
Bioeconomy Workshop: Dr. Julius Ecuru, Opportunities for a Bioeconomy in West Africa
akademiya2063
PRO
1
140
Navigating the Design Leadership Dip - Product Design Week Design Leaders+ Conference 2024
apolaine
1
340
Prompt Engineering for Job Search
mfonobong
0
340
How to train your dragon (web standard)
notwaldorf
97
6.7k
Transcript
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning • Can be keyed • Round robin (mostly)
by default
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
Utils.toPositive
public static int toPositive(int number) { return number & 0x7fffffff;
}
0111 1111 1111 1111 1111 1111 1111 1111
0000 0000 0000 0000 0000 0000 0000 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0000 0000 0000 0000 0000 0000 0000 1010
Two’s Complement
0000 0000 0000 0000 0000 0000 0000 1010 1111 1111
1111 1111 1111 1111 1111 0101 1111 1111 1111 1111 1111 1111 1111 1010
1111 1111 1111 1111 1111 1111 1111 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0111 1111 1111 1111 1111 1111 1111 1010
None