Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Go並行処理パターン

sutetotanuki
May 15, 2017
5k

 Go並行処理パターン

https://classmethod.connpass.com/event/55140/ の発表資料

Goの並行処理に必要な幾つかの機能と、それをつかったサンプルをご紹介

sutetotanuki

May 15, 2017
Tweet

More Decks by sutetotanuki

Transcript

  1. ଴ͪΛղফ ฒߦ ॲཧ ॲཧ *0଴ͪ *0଴ͪ ॲཧ ॲཧ *0଴ͪ *0଴ͪ

    *0଴ͪ *0଴ͪ *0଴ͪ *0଴ͪ ଴ͪ࣌ؒΛͳ͘͠ ϨεϙϯεΛૣ͘͢Δ 
  2. func onExit() { fmt.Println("exit!") } func main() { defer onExit()

    fmt.Println("end") } > go run main.go end exit! EFGFS ؔ਺ऴྃ࣌ʹ࣮ߦ͢Δؔ਺Λొ࿥͢Δ
  3. func main() { arr := []int{1,2,3} for _, i :=

    range arr { fmt.Println(i) } } > go run main.go 1 2 3 SBOHF ഑ྻ΍$IBOOFM͔Βஞ࣍తʹ஋ΛऔΓग़͢
  4. ࢖͍ํ func foo() { time.Sleep(1 * time.Second) fmt.Println("2") } func

    main() { fmt.Println("1") // go キーワードの後に関数呼び出しを行うとGoroutineが // 生成され並行に実行される go foo() fmt.Println("3") time.Sleep(2 * time.Second) } 27 > go run main.go 1 3 2
  5. ࢖͍ํ // 送信する値の型を指定しChannelを生成 ch := make(chan int) // 受信(Channelから <-演算子で取り出す)

    val := <- ch // 変数に代入しなくてもいい <- ch // 送信(Channelに向かって <-演算子で送信) ch <- 1 // 閉じる close(ch) 30
  6. ࢖͍ํ func main() { ch := make(chan int) go func()

    { // 送信 ch <- 1 }() // 受信 i := <-ch fmt.Println(i) } 31 > go run main.go 1
  7. ࢖͍ํ func main() { ch := make(chan int) go func()

    { // 送信 ch <- 1 }() // 受信 i := <-ch fmt.Println(i) } 32 DI 
  8. GPSSBOHFΛ࢖ͬͯ ഑ྻͷΑ͏ʹϧʔϓͰ͖Δ func main() { ch := make(chan int) go

    func() { ch <- 1 ch <- 2 close(ch) }() // Channelがcloseされるまでループする for i := range ch { fmt.Println(i) } } 33 > go run main.go 1 2
  9. #VGGFSFE$IBOOFM // bufferの数を指定するとbuffer有りのChannelになる ch := make(chan int, 2) ch <-

    1 ch <- 1 ch <- 1 // bufferの数を超えて書き込もうとするとブロック 34
  10. DMPTF func main() { ch := make(chan int, 1) ch

    <- 1 close(ch) // ch <- 1 CloseされたChannelには送信できない // Closeされても読み出せる fmt.Println(<-ch) // 呼び出せる値がなくなった場合は // ゼロ値(intの場合0、stringの場合"")が返ってくる fmt.Println(<-ch) fmt.Println(<-ch) } 35 > go run main.go 1 0 0
  11. select { case v := <-ch1: // Ch1 から受信 case

    <-ch2: // 受信した場合でも代入式は必須じゃない case ch3 <- 1: // 送信することも可能 default: // 全てのcaseで送受信不能な場合 }  ࢖͍ํ
  12. dataCh1 := make(chan int) dataCh2 := make(chan int) closeCh :=

    make(chan struct{}) go func() { for { // selectは一度処理すると抜けてしまうのでforループで囲む fmt.Println("select start") select { case d := <-dataCh1: fmt.Println(">>> 1:", d) time.Sleep(1 * time.Second) fmt.Println("<<< 1:", d) case d := <-dataCh2: fmt.Println(">>> 2:", d) time.Sleep(1 * time.Second) fmt.Println("<<< 2:", d) case <-closeCh: fmt.Println("close") return } } }() fmt.Println("Send 1 => dataCh1") dataCh1 <- 1 fmt.Println("Send 2 => dataCh1") dataCh1 <- 2 fmt.Println("Send 1 => dataCh2") dataCh2 <- 1 fmt.Println("Send true => closeCh") closeCh <- struct{}{}  > go run main.go Send 1 => dataCh1 select start >>> 1: 1 Send 2 => dataCh1 <<< 1: 1 select start >>> 1: 2 Send 1 => dataCh2 <<< 1: 2 select start >>> 2: 1 Send true => closeCh <<< 2: 1 select start close
  13. func main() { wg := new(sync.WaitGroup) for i := 0;

    i < 10; i++ { // 待つ数を登録する。1回のループにつきGoroutineを // 1つ生成するので1を追加していく wg.Add(1) go func(i int) { // deferキーワードでこの関数を抜けるときに // wg.Done()が呼ばれるように登録する defer wg.Done() time.Sleep(1 * time.Second) fmt.Println(i) }(i) } // wg.Addで追加した数の合計数wg.Done()が呼ばれるまで待機 fmt.Println("** wait **") wg.Wait() fmt.Println("** done **") }  > go run main.go ** wait ** 0 8 6 2 1 3 4 5 7 9 ** done ** ࢖͍ํ
  14. // 終了のお知らせを伝えるためのChannel wait := make(chan struct{}) go func() { time.Sleep(1

    * time.Second) // 処理が終了したらwaitに値を送信 wait <- struct{}{} }() // 値が書き込まれるまでブロック <-wait 
  15. func producer(out chan<- int, v int) { for i :=

    0; i < 10; i++ { out <- v time.Sleep(time.Duration(v) * time.Second) } } func consumer() { ch := make(chan int) go producer(ch, 1) go producer(ch, 2) // producerが送信した値を集約して出力 for i := range ch { fmt.Println(i) } } 
  16. func consumer(ch <-chan int, wg *sync.WaitGroup) { wg.Add(1) defer wg.Done()

    for i := range ch { fmt.Println(i) } } func producer(size int) { queue := make(chan int) wg := new(sync.WaitGroup) go consumer(queue, wg) go consumer(queue, wg) for i := 0; i < size; i++ { queue <- i } close(queue) wg.Wait() } 
  17. func Generator(n int) chan int { ch := make(chan int)

    go func() { for i := 0; i < n; i++ { // 与えられた数まで順番にChannelに書き込んで行く ch <- i } close(ch) }() return ch } func main() { for i := range Generator(10000) { fmt.Printf("%d", i) } } 
  18. func generator(n int) <-chan int { out := make(chan int)

    go func() { defer close(out) for i := 0; i < n; i++ { out <- i } }() return out } func double(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for i := range in { out <- i * 2 } }() return out } func print(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := generator(10) ch2 := double(ch1) print(ch2) }  > go run main.go 0 2 4 6 8 10 12 14 16 18
  19. ୳ࡧϦετʜ ૉ਺Ϧετ 9·Ͱͷ஋Λ୳ࡧϦετ
 ʹೖΕΔ ୳ࡧϦετʜ ૉ਺Ϧετ ୳ࡧϦετͷઌ಄Λ ૉ਺ϦετʹҠ͢ ୳ࡧϦετʜ ૉ਺Ϧετ

    ୳ࡧϦετ͔ΒҠͨ͠ ૉ਺ͷഒ਺Λ࡟আ͢Δ ୳ࡧϦετͷઌ಄Λ ૉ਺ϦετʹҠ͢ ୳ࡧϦετʜ ૉ਺Ϧετ Ҏ߱܁Γฦ͠ʜ
  20. // Send the sequence 2, 3, 4, ... to channel

    'ch'. func Generate(ch chan<- int) { for i := 2; ; i++ { ch <- i // Send 'i' to channel 'ch'. } } // Copy the values from channel 'in' to channel 'out', // removing those divisible by 'prime'. func Filter(in <-chan int, out chan<- int, prime int) { for { i := <-in // Receive value from 'in'. if i%prime != 0 { out <- i // Send 'i' to 'out'. } } } // The prime sieve: Daisy-chain Filter processes. func main() { ch := make(chan int) // Create a new channel. go Generate(ch) // Launch Generate goroutine. for i := 0; i < 10; i++ { prime := <-ch fmt.Println(prime) ch1 := make(chan int) go Filter(ch, ch1, prime) ch = ch1 } } 62
  21. (FOFSBUF ૉ਺ͷ 'JMUFS ૉ਺ͷ 'JMUFS    ʜ */

    */ 065     ʜ ૉ਺ͷ 'JMUFS 065 */    ʜ ૉ਺͕ݟ͔ͭΔຖʹ 'JMUFS͕૿͍͑ͯ͘ ϑΟϧλ͞Εͳ͔ͬͨ ࠷ॳͷ਺͕ૉ਺ QSJNF 
  22. func Generate(ch chan<- int) { for i := 2; ;

    i++ { ch <- i // 2から順に最初のFilterに送信していきます } } 64
  23. // in => 前のFilter(Generator)から数値を渡されるChannel // out => 次のFilterに数値を渡すChannel // prime

    => Filterに設定される素数 func Filter(in <-chan int, out chan<- int, prime int) { for { i := <-in // Filter(Generator)から数値が渡ってくる if i%prime != 0 { out <- i // 自身に渡された素数で割り切れないもの次のFilterに送る } } } 65
  24. func main() { ch := make(chan int) go Generator(ch) //

    素数を10個見つけるまでループ for i := 0; i < 10; i++ {
 // 待機リストの最初の数値を素数として受信 prime := <-ch fmt.Println(prime) // FilterのoutになるChannelを用意 ch1 := make(chan int) go Filter(ch, ch1, prime) // 次のフィルタのinになるChannel変数に // 今回のontのChannelを代入する ch = ch1 } } 66
  25. func fetch(url string) int { res, _ := http.Get(url) return

    res.StatusCode } func crawl(urls []string) { m := make(map[int]int) for _, url := range urls { status := fetch(url) m[status]++ } } ~/w/g/s/g/c/g/p/crawler ››› go build; time ./crawler -n 100 ./crawler -n 100 0.08s user 0.12s system 0% cpu 30.554 total ฒߦॲཧͳ͠ 
  26. func fetch(url string) int { // レスポンスが返るまでブロック res, _ :=

    http.Get(url) return res.StatusCode } ͜ͷϒϩοΫ͕ ϘτϧωοΫ 
  27. func crawl(urls []string) { m := make(map[int]int) // statusを受信するChannel ch

    := make(chan int) for _, url := range urls { go func() { // ここが並行処理 ch <- fetch(url) }() } // urlsの数だけchから読み取る for range urls { m[<-ch]++ } } ~/w/g/s/g/c/g/p/crawler ››› go build; time ./crawler -n 100⏎ ./crawler -n 100 0.04s user 0.03s system 20% cpu 0.338 total ඵ͔Β NT ·Ͱ୹ॖ 
  28. func crawl(urls []string) { m := make(map[int]int) // statusを受信するChannel ch

    := make(chan int) for _, url := range urls { go func() { // ここが並行処理 ch <- fetch(url) }() } // urlsの数だけchから読み取る for range urls { m[<-ch]++ } } ௒͑Δ͘Β͍Ͱ Τϥʔ͕සൃ .BD#PPL"JS ~/w/g/s/g/c/g/p/crawler ››› go build; time ./crawler -n 500 panic: Get http://localhost:65067: dial tcp [::1]:65067: getsockopt: connection refused 
  29. func crawl(urls []string) { m := make(map[int]int) ch := make(chan

    int) // Buffered Channelの指定回数以上送信したら // 受信されるまで送信がブロックされるという特性を利用します sem := make(chan struct{}, 100) go func() { for _, url := range urls { // 100回まで送信するとブロック sem <- struct {}{} go func() { ch <- fetch(url) // 処理が終了すればChannelから受信しバッファを開けることで //同時並行数を開放していく <-sem }() } }() for range urls { m[<-ch]++ } } ~/w/g/s/g/c/g/p/crawler ››› go build; time ./crawler -n 500 ./crawler -n 500 0.17s user 0.14s system 16% cpu 1.895 total ஗͘ͳ͕ͬͨ ਖ਼ৗऴྃ͢ΔΑ͏ʹ 
  30. // Signalを受け取るChannelを宣言 sigCh := make(chan os.Signal, 1) // 受け取るSignalを登録 signal.Notify(sigCh,

    syscall.SIGINT, syscall.SIGUSR1) go func() { for { select { // Signalを待ち受け case sig := <-sigCh: switch sig { case syscall.SIGINT: fmt.Println("SIGINT") os.Exit(0) case syscall.SIGUSR1: fmt.Println("SIGUSR") } } } }() 
  31. func main() { // 3秒毎にChannelに書き込まれる tick := time.Tick(3 * time.Second)

    // 15秒後にChannelに書き込まれる timeout := time.After(15 * time.Second) for { select { case <-tick: fmt.Println("tick") case <-timeout: fmt.Println("timeout") return } } 83
  32. // ジョブ構造体 type Job struct { // 書き込むbyte配列 b []byte

    // 書き込むFileの開始位置 pos int64 } 86
  33. type Worker struct { f *os.File // Fileポインター wg *sync.WaitGroup

    // 処理を待ち合わせるためのWaitGroup queue chan *Job // Jobを受信するChannel stopCh chan bool // 全てのJobが消化されWorkerを終了させたい時のChannel } // 慣例的にNew${構造体名}のメソッドを用意してインスタンスを返します func NewWorker(f *os.File, wg *sync.WaitGroup, queue chan *Job, stopCh chan bool) Worker { return Worker{ f: f, wg: wg, queue: queue, stopCh: stopCh, } } 87
  34. func (w *Worker) Start() { w.wg.Add(1) go func() { defer

    w.wg.Done() for { select { case j := <-w.queue: // queueを受け取ったらその情報でFileに書き込み w.f.WriteAt(j.b, j.pos) case <-w.stopCh: // stopChに値が書き込まれたら関数を抜けて処理を終了します // select内は同時に走らないので最後に受け取ったqueueの処理が終わってから // このセクションが実行される事が保証される return } } }() } 88
  35. func parallelWrite(lines []string) { wg := new(sync.WaitGroup) f, _ :=

    os.Create("test.txt") defer f.Close() stopCh := make(chan bool) queue := make(chan *Job) const workers = 10 // Workerを指定回数スタートさせる for i := 0; i < workers; i++ { worker := NewWorker(f, wg, queue, stopCh) worker.Start() } pos := int64(0) for _, line := range lines { lineBytes := []byte(line + "\n") // Queueを送信。全てのWorkerが処理中なら書き込めずにここでブロック queue <- &Job{ b: lineBytes, pos: pos, } // 書き込まれたbyte数分書き込み位置をすすめる pos += int64(len(lineBytes)) } // workerを終了させるためにstopChに値を書き込んでいく for i := 0; i < workers; i++ { stopCh <- true } wg.Wait() }