Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
Kafka Partitioning Algorithm
Search
Sponsored
·
SiteGround - Reliable hosting with speed, security, and support you can count on.
→
Gordon Diggs
September 02, 2016
Technology
150
0
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
Kafka Partitioning Algorithm
A look at the default partitioning algorithm that Kafka uses
Gordon Diggs
September 02, 2016
More Decks by Gordon Diggs
See All by Gordon Diggs
John Coltrane: Lessons in Leadership
gordondiggs
2
310
The Customer Gap
gordondiggs
1
120
Picking Records with JavaScript and a Button
gordondiggs
0
88
Supbutton
gordondiggs
0
88
Rayons
gordondiggs
0
94
Sous Vide
gordondiggs
0
110
Dev Events & Internal Tools at Paperless Post
gordondiggs
0
130
The Joys and Pains of Working With an Old Codebase
gordondiggs
0
150
The Joys and Pains of Working with an Old Codebase
gordondiggs
1
2.4k
Other Decks in Technology
See All in Technology
Microsoft Build Keynoteふりかえり
tomokusaba
0
120
AAIFに入ってみた ~内から見えるコミュニティ動向~
sato4
0
140
[モダンアプリ勉強会]今更聞けないGit/GitHub入門
tsukuboshi
0
360
DevOps Agentで始めるAWS運用 〜フロンティアエージェントが変える運用の現場〜
nyankotaro
1
380
NAB Show 2026 動画技術関連レポート / NAB Show 2026 Report
cyberagentdevelopers
PRO
0
160
2026 TECHFRESH 畢業分享會 - AI-Native 重塑軟體工程與虛擬講師
line_developers_tw
PRO
0
710
Disciplined Vibes: Scaling AI-Assisted Engineering
sheharyar
0
130
AWSシリコン最前線 〜AI時代のチップ選択を読み解く〜
htokoyo
2
400
就職⽀援サービスにおけるキャリアアドバイザーのシフトスケジューリング
recruitengineers
PRO
1
130
「エンジニア進化論」2028年の開発完全自動化、エンジニアはどう進化するか
cyberagentdevelopers
PRO
4
4.4k
Claude Code の Sandbox 機能を Anthropic Sandbox Runtime(srt) で試そう!/lets-play-anthropic-sandbox-runtime
tomoki10
1
520
チームで進めるAI駆動アジャイル×ウォーターフォール
kumaiu
0
150
Featured
See All Featured
A Tale of Four Properties
chriscoyier
163
24k
How GitHub (no longer) Works
holman
316
150k
The Invisible Side of Design
smashingmag
302
52k
Breaking role norms: Why Content Design is so much more than writing copy - Taylor Woolridge
uxyall
0
310
16th Malabo Montpellier Forum Presentation
akademiya2063
PRO
0
140
Ten Tips & Tricks for a 🌱 transition
stuffmc
0
130
A better future with KSS
kneath
240
18k
Agile Actions for Facilitating Distributed Teams - ADO2019
mkilby
0
200
Evolving SEO for Evolving Search Engines
ryanjones
0
210
Keith and Marios Guide to Fast Websites
keithpitt
413
23k
Navigating the Design Leadership Dip - Product Design Week Design Leaders+ Conference 2024
apolaine
1
340
Leadership Guide Workshop - DevTernity 2021
reverentgeek
1
300
Transcript
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning Algorithm CC LnL 20160902
Kafka Partitioning • Can be keyed • Round robin (mostly)
by default
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
Utils.toPositive
public static int toPositive(int number) { return number & 0x7fffffff;
}
0111 1111 1111 1111 1111 1111 1111 1111
0000 0000 0000 0000 0000 0000 0000 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0000 0000 0000 0000 0000 0000 0000 1010
Two’s Complement
0000 0000 0000 0000 0000 0000 0000 1010 1111 1111
1111 1111 1111 1111 1111 0101 1111 1111 1111 1111 1111 1111 1111 1010
1111 1111 1111 1111 1111 1111 1111 1010 & 0111
1111 1111 1111 1111 1111 1111 1111 = 0111 1111 1111 1111 1111 1111 1111 1010
None