Upgrade to Pro — share decks privately, control downloads, hide ads and more …

pubsub with concurrent

pubsub with concurrent

takashabe

May 27, 2019
Tweet

More Decks by takashabe

Other Decks in Programming

Transcript

  1. whoami • Takashi Abe (@takashabe) • גࣜձࣾαΠόʔΤʔδΣϯτ ΞυςΫελδΦ • ޿ࠂ഑৴ϓϩμΫτΛGoͰ࡞͍ͬͯ·͢

    • Goྺ3೥͘Β͍ • ޷͖ͳඪ४ύοέʔδ͸ `go` ύοέʔδͰ͢ • Macͷ`g`Ωʔ͕յΕ͍ͯͯGo͕ଧͪͮΒͯ͘ࠔ͍ͬͯ·͢
  2. Cloud Pub/Sub is Կ • GCPͷpub/subαʔϏε • Subscriberͷ࣮૷ʹ͍͔ͭ͘ύλʔϯ͕͋Δ • Pull

    • worker͕pub/sub APIΛcallͯ͠ϝοηʔδΛ ϑΣον͢Δ • Push • worker͕HTTPαʔόͱͯ͠ಈ࡞͢Δ • pub/sub͕ϝοηʔδΛPOSTϦΫΤετ͢Δ
  3. Cloud Pub/Sub is Կ • GCPͷpub/subαʔϏε • Subscriberͷ࣮૷ʹ͍͔ͭ͘ύλʔϯ͕͋Δ • Pull

    • worker͕pub/sub APIΛcallͯ͠ϝοηʔδΛ ϑΣον͢Δ • Push • worker͕HTTPαʔόͱͯ͠ಈ࡞͢Δ • pub/sub͕ϝοηʔδΛPOSTϦΫΤετ͢Δ
  4. Receive Settingͷ࠷దԽ https://github.com/golang/dep/blob/master/docs/assets/DigbyShadows.png ΑΓҾ༻ • σϑΥϧτ • MaxOutstandingMessages: 1000 •

    NumGoroutines: 1 →1 worker͕େྔͷϝοηʔδΛอ࣋ͯ͠͠·͏ type ReceiveSettings struct { ... MaxOutstandingMessages int // ϝϞϦ಺ʹஔ͍͓ͯ͘ϝοηʔδ਺ NumGoroutines int // ϝοηʔδΛॲཧ͢Δworker goroutine }
  5. golang.org/x/sync/singleflight https://play.golang.org/p/h_lCoqJDp9s package main import ( "fmt" "sync" "time" "golang.org/x/sync/singleflight"

    ) func main() { var ( sg singleflight.Group wg sync.WaitGroup ) for i := 0; i < 10; i++ { wg.Add(1) go func() { // "key" ͝ͱʹಉ࣮࣌ߦΛߜΔ now, _, shared := sg.Do("key", func() (interface{}, error) { time.Sleep(time.Millisecond * 100) return time.Now(), nil }) fmt.Printf("%v: shared: %t\n", now, shared) wg.Done() }() } wg.Wait() }
  6. golang.org/x/sync/singleflight 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1

    +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true 2009-11-10 23:00:00.1 +0000 UTC m=+0.100000001: shared: true time.Now() ͷ݁Ռ͕ἧ͍ͬͯΔ
  7. golang.org/x/time/rate https://play.golang.org/p/HsnbmPfUZ_A package main import ( "context" "fmt" "sync" "time"

    "golang.org/x/time/rate" ) func main() { limit := rate.Every(time.Second / 1) // τʔΫϯͷཷ·Δ଎౓ limiter := rate.NewLimiter(limit, 1) // limitΛݩʹlimiterΛੜ੒ɻୈ2Ҿ਺͸τʔΫϯͷ࠷େஷଂྔ(burst) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 10; i++ { limiter.Wait(context.Background()) fmt.Println(time.Now()) } wg.Done() }() wg.Wait() }
  8. golang.org/x/time/rate 2009-11-10 23:00:00 +0000 UTC m=+0.000000001 2009-11-10 23:00:01 +0000 UTC

    m=+1.000000001 2009-11-10 23:00:02 +0000 UTC m=+2.000000001 2009-11-10 23:00:03 +0000 UTC m=+3.000000001 2009-11-10 23:00:04 +0000 UTC m=+4.000000001 2009-11-10 23:00:05 +0000 UTC m=+5.000000001 2009-11-10 23:00:06 +0000 UTC m=+6.000000001 2009-11-10 23:00:07 +0000 UTC m=+7.000000001 2009-11-10 23:00:08 +0000 UTC m=+8.000000001 2009-11-10 23:00:09 +0000 UTC m=+9.000000001 େମ1ඵͣͭॲཧ͞Ε͍ͯΔ