Upgrade to Pro
— share decks privately, control downloads, hide ads and more …
Speaker Deck
Features
Speaker Deck
PRO
Sign in
Sign up for free
Search
Search
pubsub with concurrent
Search
Sponsored
·
SiteGround - Reliable hosting with speed, security, and support you can count on.
→
takashabe
May 27, 2019
Programming
960
1
Share
Embed
Copy iframe code
Copy JS code
Copy link
Start on current slide
pubsub with concurrent
https://gounconference.connpass.com/event/129090/
の発表資料です。
takashabe
May 27, 2019
More Decks by takashabe
See All by takashabe
より良いターミナルでの生活を求めて
takashabe
0
72
OpenCensusでcustom context propagationとexporterを書いた話 / OpenCensus with custom context propagation and exporter
takashabe
0
1.8k
社内ISUCONを開催した話
takashabe
0
1.7k
ISUCON大反省会
takashabe
0
2k
gitのブランチ戦略
takashabe
8
6.1k
サルでもわかるgit
takashabe
0
1.6k
playで複数DBする
takashabe
0
1.6k
MySQLで高トラフィックに立ち向かう
takashabe
0
1.8k
GitHubの良さ
takashabe
2
2.3k
Other Decks in Programming
See All in Programming
Datadog × OpenTelemetry 入門と実践のあいだ
kn_to_maxpno
1
180
コンテキストの使い捨てをやめる — ビジネスルール駆動開発と miko —
ioki
0
230
代数的データ型って何が嬉しいの? #frontend_phpcon_do
kajitack
8
3.8k
Vite+ Unified Toolchain for the Web
naokihaba
0
340
なぜ型を書くのか? TSKaigi2026で改めて考える #tskaigi_smarthr
kajitack
0
140
jQueryをバージョンアップする前に使いたいjQuery Migrate
matsuo_atsushi
0
580
生成AI時代にこそ効くGo | Why Go Works in the Age of Generative AI
mom0tomo
8
3.3k
PHPで使える日時の表現と、その知り方 #frontend_phpcon_do
o0h
PRO
0
260
Go1.27で導入されるジェネリクスメソッドでできること
mackee
0
170
メソッドのジェネリクスでGoの夢は広がるか? / Kyoto.go #65
utgwkk
3
920
TAKTでAI駆動開発の品質を設計する
j5ik2o
7
1.5k
過去最大のMCPアップデート! 2026-07-28 RC版の謎に迫る
licux
6
390
Featured
See All Featured
Future Trends and Review - Lecture 12 - Web Technologies (1019888BNR)
signer
PRO
0
3.6k
The Language of Interfaces
destraynor
162
27k
The Psychology of Web Performance [Beyond Tellerrand 2023]
tammyeverts
49
3.5k
Data-driven link building: lessons from a $708K investment (BrightonSEO talk)
szymonslowik
1
1.1k
The Pragmatic Product Professional
lauravandoore
37
7.3k
How to Ace a Technical Interview
jacobian
281
24k
Helping Users Find Their Own Way: Creating Modern Search Experiences
danielanewman
31
3.2k
Why Our Code Smells
bkeepers
PRO
340
58k
Bioeconomy Workshop: Dr. Julius Ecuru, Opportunities for a Bioeconomy in West Africa
akademiya2063
PRO
1
150
KATA
mclloyd
PRO
35
15k
Automating Front-end Workflow
addyosmani
1370
210k
Practical Orchestrator
shlominoach
191
11k
Transcript
Pub/Sub with concurrent Go(Un)ConferenceʢGo͋Μ͜ʣLTେձ 6kg @takashabe
whoami • Takashi Abe (@takashabe) • גࣜձࣾαΠόʔΤʔδΣϯτ ΞυςΫελδΦ • ࠂ৴ϓϩμΫτΛGoͰ࡞͍ͬͯ·͢
• Goྺ3͘Β͍ • ͖ͳඪ४ύοέʔδ `go` ύοέʔδͰ͢ • Macͷ`g`Ωʔ͕յΕ͍ͯͯGo͕ଧͪͮΒͯ͘ࠔ͍ͬͯ·͢
Cloud Pub/Sub Λͬͯಘͨ Subscriber WorkerͷݟΛհ
Cloud Pub/Sub is Կ • GCPͷpub/subαʔϏε • Subscriberͷ࣮ʹ͍͔ͭ͘ύλʔϯ͕͋Δ • Pull
• worker͕pub/sub APIΛcallͯ͠ϝοηʔδΛ ϑΣον͢Δ • Push • worker͕HTTPαʔόͱͯ͠ಈ࡞͢Δ • pub/sub͕ϝοηʔδΛPOSTϦΫΤετ͢Δ
Cloud Pub/Sub is Կ • GCPͷpub/subαʔϏε • Subscriberͷ࣮ʹ͍͔ͭ͘ύλʔϯ͕͋Δ • Pull
• worker͕pub/sub APIΛcallͯ͠ϝοηʔδΛ ϑΣον͢Δ • Push • worker͕HTTPαʔόͱͯ͠ಈ࡞͢Δ • pub/sub͕ϝοηʔδΛPOSTϦΫΤετ͢Δ
Cloud Pub/Sub is Կ https://speakerdeck.com/hlts2/sub-worker-framework- implementation?slide=6 ΑΓҾ༻
ࣄྫ 1 1ϝοηʔδͷॲཧʹ͕͔͔࣌ؒͬͯ٧·Δ
1ϝοηʔδͷॲཧʹ͕͔͔࣌ؒͬͯ٧·Δ • 1ϝοηʔδͨΓͷॲཧ͕͍࣌ؒ • ͔ͭಛఆͷSubscriber worker͕େྔͷϝοηʔδΛॲ ཧ͠Α͏ͱͯ͠͠·͏
• Receive Settingͷ࠷దԽ • goroutineͰฒྻॲཧ
Receive Settingͷ࠷దԽ https://github.com/golang/dep/blob/master/docs/assets/DigbyShadows.png ΑΓҾ༻ • σϑΥϧτ • MaxOutstandingMessages: 1000 •
NumGoroutines: 1 →1 worker͕େྔͷϝοηʔδΛอ࣋ͯ͠͠·͏ type ReceiveSettings struct { ... MaxOutstandingMessages int // ϝϞϦʹஔ͍͓ͯ͘ϝοηʔδ NumGoroutines int // ϝοηʔδΛॲཧ͢Δworker goroutine }
goroutineͰฒྻॲཧ • IOόϯυͳॲཧͳͲɺฒྻԽग़དྷΔͱ͜ΖΛͻͨ͢ ΒฒྻԽ
ࣄྫ2 ͋ΔଐੑΛχΞϦΞϧλΠϜͰҰճ͚ͩߋ৽ ͍ͨ͠
͋ΔଐੑΛχΞϦΞϧλΠϜͰҰճ͚ͩߋ৽͍ͨ͠ • ࠂ͕ݟΒΕͨ࣌ɺͦͷࠂʹඥͮ͘ଐੑΛߋ৽ • ݫີʹຖճߋ৽͢Δඞཁͳ͍ • 1͝ͱʹߋ৽͢ΔΑ͏ʹͨ͠ • σʔλετΞͷ࠷ऴߋ৽࣌Ͱ੍ޚ •
ॲཧதʹෳϦΫΤετ͕͋Δͱແବͳߋ৽͕Δ
golang.org/x/sync/singleflight Ͱॏෳߋ৽Λආ͚Δ
golang.org/x/sync/singleflight https://play.golang.org/p/h_lCoqJDp9s package main import ( "fmt" "sync" "time" "golang.org/x/sync/singleflight"
) func main() { var ( sg singleflight.Group wg sync.WaitGroup ) for i := 0; i < 10; i++ { wg.Add(1) go func() { // "key" ͝ͱʹಉ࣮࣌ߦΛߜΔ now, _, shared := sg.Do("key", func() (interface{}, error) { time.Sleep(time.Millisecond * 100) return time.Now(), nil }) fmt.Printf("%v: shared: %t\n", now, shared) wg.Done() }() } wg.Wait() }
golang.org/x/sync/singleflight 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1
+0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true time.Now() ͷ݁Ռ͕ἧ͍ͬͯΔ
ࣄྫ3 όʔετੑ͕͋ΔSubscriberͷAPIίʔϧͰ όοΫΤϯυ͕ࢮ͵
όʔετੑ͕͋ΔSubscriberͷAPIίʔϧͰόοΫΤϯυ͕ࢮ͵ • ϝοηʔδΛड͚ͯόοΫΤϯυͷgRPCαʔόʹϦΫ ΤετΛૹΔαʔϏε • Publishྔ͕֎෦αʔϏεґଘ • ීஈ3,000RPSఔ • ͨ·ʹ20,000RPS͘Β͍
golang.org/x/time/rate ͰBackpressureΛ࣮͢Δ
golang.org/x/time/rate https://play.golang.org/p/HsnbmPfUZ_A package main import ( "context" "fmt" "sync" "time"
"golang.org/x/time/rate" ) func main() { limit := rate.Every(time.Second / 1) // τʔΫϯͷཷ·Δ limiter := rate.NewLimiter(limit, 1) // limitΛݩʹlimiterΛੜɻୈ2ҾτʔΫϯͷ࠷େஷଂྔ(burst) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 10; i++ { limiter.Wait(context.Background()) fmt.Println(time.Now()) } wg.Done() }() wg.Wait() }
golang.org/x/time/rate 2009-11-10 23:00:00 +0000 UTC m=+0.000000001 2009-11-10 23:00:01 +0000 UTC
m=+1.000000001 2009-11-10 23:00:02 +0000 UTC m=+2.000000001 2009-11-10 23:00:03 +0000 UTC m=+3.000000001 2009-11-10 23:00:04 +0000 UTC m=+4.000000001 2009-11-10 23:00:05 +0000 UTC m=+5.000000001 2009-11-10 23:00:06 +0000 UTC m=+6.000000001 2009-11-10 23:00:07 +0000 UTC m=+7.000000001 2009-11-10 23:00:08 +0000 UTC m=+8.000000001 2009-11-10 23:00:09 +0000 UTC m=+9.000000001 େମ1ඵͣͭॲཧ͞Ε͍ͯΔ
·ͱΊ • Cloud Pub/SubΛGoͰͬͯૺ۰ͨ͠ඇಉظॲཧܥͷ ࣄྫʹ͍ͭͯհͨ͠ • Pub/SubʹݶͬͨͰͳ͍ • `golang.org/x/…` ύοέʔδsyncΛ͡Ίɺศརͳ
ͷ͕ଟ͍ͷͰͨ·ʹ८ճ͢Δͱָ͍͠ • golang.org/x/net/netutil ͱ͔͖