Lock in $30 Savings on PRO—Offer Ends Soon! ⏳

サーバーサイドでのKotlin Coroutines

サーバーサイドでのKotlin Coroutines

2022年2月10日(木) 「Server-Side Kotlin Meetup vol.1」の登壇資料です。

Takehata Naoto

February 10, 2022
Tweet

More Decks by Takehata Naoto

Other Decks in Programming

Transcript

  1. 概要 竹端 尚人 主にバックエンドエンジニア 株式会社justInCaseTechnlogies 技術顧問 アスクル株式会社 技術顧問 Twitter: @n_takehata

    • 2006.04 公務員 • 2007.12 SES • 2014.04 株式会社アプリボット(Kotlinを始める) • 2020.06 株式会社ZOZOテクノロジーズ • 2020.12 フリーランス(現在)
  2. 登壇、執筆 • CEDEC 2018、2019登壇 • Software Design 2019年2月号〜4月号で短期連載 「サーバーサイド開発の品質を向上させる Java→Kotlin

    移行のススメ」執筆 • 2021年4月 書籍「Kotlin サーバーサイドプログラミング 実践開発」を出版
  3. coroutineScope { launch { for (i in 1..5) { println("Hello

    A $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { println("Hello B $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { println("Hello C $i thread:${Thread.currentThread()}") } } }
  4. Hello A 1 thread:Thread[main,5,main] Hello A 2 thread:Thread[main,5,main] Hello A

    3 thread:Thread[main,5,main] Hello A 4 thread:Thread[main,5,main] Hello A 5 thread:Thread[main,5,main] Hello B 1 thread:Thread[main,5,main] Hello B 2 thread:Thread[main,5,main] ・・・ Hello C 1 thread:Thread[main,5,main] Hello C 2 thread:Thread[main,5,main] ・・・ Aの1〜5→Bの1〜5→Cの1〜5 の順番で出力される
  5. runBlocking { launch { for (i in 1..5) { delay(1)

    // 中断 println("Hello A $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { delay(1) // 中断 println("Hello B $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { delay(1) // 中断 println("Hello C $i thread:${Thread.currentThread()}") } } }
  6. Hello A 1 thread:Thread[main,5,main] Hello B 1 thread:Thread[main,5,main] Hello C

    1 thread:Thread[main,5,main] Hello A 2 thread:Thread[main,5,main] Hello B 2 thread:Thread[main,5,main] Hello C 2 thread:Thread[main,5,main] Hello A 3 thread:Thread[main,5,main] Hello B 3 thread:Thread[main,5,main] ・・・ Aの1→Bの1→Cの1→Aの2→Bの2・・・ と出力される
  7. runBlocking { launch { for (i in 1..5) { delay(1)

    // 中断 println("Hello A $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { delay(1) // 中断 println("Hello B $i thread:${Thread.currentThread()}") } } launch { for (i in 1..5) { delay(1) // 中断 println("Hello C $i thread:${Thread.currentThread()}") } } } ①中断して次のlaunchへ ②中断して次のlaunchへ ③中断して次のlaunchへ ④復帰して出力 ⑥復帰して出力 ⑧復帰して出力 ⑤中断して次のlaunchへ ⑦中断して次のlaunchへ
  8. val webClient = WebClient.builder().build() coroutineScope { println("${LocalDateTime.now()} start coroutines. thread:${Thread.currentThread()}")

    val deferreds = listOf( async { println("${LocalDateTime.now()} start Google. thread:${Thread.currentThread()}") val response = webClient.get().uri("https://www.google.com/") .accept(MediaType.TEXT_HTML) .retrieve() .awaitBody<String>() println("${LocalDateTime.now()} end Google. thread:${Thread.currentThread()}") Regex("""<title>.*</title>""").find(response)?.value }, async { println("${LocalDateTime.now()} start Yahoo. thread:${Thread.currentThread()}") val response = webClient.get().uri("https://www.yahoo.co.jp/") // 省略・・・ }, async { println("${LocalDateTime.now()} start Bing. thread:${Thread.currentThread()}") val response = webClient.get().uri("https://www.bing.com/") // 省略・・・ } ) println(deferreds.awaitAll()) } ①WebClientを生成 ②URLを設定してGETを実行 ③HTMLからtitleを正規表現で取得 ④全ての実行が終わったら awaitAllで取得して出力
  9. 2022-02-08T08:30:46.381236 start coroutines. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:46.387398 start Google. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:46.978801 start

    Yahoo. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:46.983323 start Bing. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:47.210427 end Yahoo. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:47.239416 end Google. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:30:47.308506 end Bing. thread:Thread[reactor-http-nio-2,5,main] [<title>Google</title>, <title>Yahoo! JAPAN</title>, <title>Bing</title>] • 各asyncブロックがstartし、順に終了している • webClientの実行のところで中断されているのがわかる
  10. coroutineScope { val deferreds = listOf( async { println("${LocalDateTime.now()} start

    Google. thread:${Thread.currentThread()}") val response = restTemplate.getForObject<String>("https://www.google.com/") println("${LocalDateTime.now()} end Google. thread:${Thread.currentThread()}") Regex("""<title>.*</title>""").find(response)?.value }, RestTemplateを使うとブロッキングになってしまうので注意
  11. @GRpcService class GreeterGrpcService : GreeterGrpcKt.GreeterCoroutineImplBase() { override suspend fun hello(request:

    HelloRequest) = HelloResponse.newBuilder() .setText("Hello ${request.name}") .build() } nameをリクエストで受け取って、メッセージを作って textとして返すだけの処理
  12. val channel = ManagedChannelBuilder.forAddress("localhost", 6565) .usePlaintext() .build() val stub =

    GreeterGrpcKt.GreeterCoroutineStub(channel) coroutineScope { println("${LocalDateTime.now()} start coroutines. thread:${Thread.currentThread()}") val deferreds = listOf( async { val request = HelloRequest.newBuilder().setName("Kotlin").build() println("${LocalDateTime.now()} start Kotlin thread:${Thread.currentThread()}") val response = stub.hello(request) println("${LocalDateTime.now()} end Kotlin thread:${Thread.currentThread()}") response.text }, async { val request = HelloRequest.newBuilder().setName("Java").build() // 省略・・・ }, async { val request = HelloRequest.newBuilder().setName("Scala").build() // 省略・・・ } ) println(deferreds.awaitAll()) } ①gRPCのStubを生成 ②リクエストを実行 ③レスポンスからtextを取得 ④全ての実行が終わったら awaitAllで取得して出力
  13. 2022-02-08T08:35:20.178803 start coroutines. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:35:20.186422 start Kotlin thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:35:20.207198 start

    Java thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:35:20.212305 start Scala thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:35:20.422334 end Java thread:Thread[grpc-default-executor-0,5,main] 2022-02-08T08:35:20.422439 end Scala thread:Thread[grpc-default-executor-1,5,main] 2022-02-08T08:35:20.422513 end Kotlin thread:Thread[grpc-default-executor-2,5,main] [Hello Kotlin, Hello Java, Hello scala] • 各asyncブロックがstartし、順に終了している • 実行スレッドはgrpc-default-executorに切り替わる(grpc-kotlinが依存しているgrpc-javaの影響)
  14. CREATE TABLE user(id int PRIMARY KEY, name varchar(32), age int);

    INSERT INTO user VALUES(1, "Kotlin", 10), (2, "Java", 20), (3, "Scala", 8);
  15. coroutineScope { println("${LocalDateTime.now()} start coroutines. thread:${Thread.currentThread()}") val deferreds = listOf(

    async { println("${LocalDateTime.now()} start Kotlin thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(1) println("${LocalDateTime.now()} end Kotlin thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Java thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(2) println("${LocalDateTime.now()} end Java thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Scala thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(3) println("${LocalDateTime.now()} end Scala thread:${Thread.currentThread()}") result?.name } ) println(deferreds.awaitAll()) } ①MyBatisのMapperで主キー検索を実行 ②実行結果からnameを取得 ③全ての実行が終わったら awaitAllで取得して出力
  16. 2022-02-08T08:39:13.124363 start coroutines. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.130277 start Kotlin thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.618934 end

    Kotlin thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.620109 start Java thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.624218 end Java thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.624440 start Scala thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:39:13.627268 end Scala thread:Thread[reactor-http-nio-2,5,main] [Kotlin, Java, Scala] • 各asyncブロックが直列で実行されている • JDBCはデータベースへのアクセス中も中断してくれないため
  17. withContext(Dispatchers.IO) { val deferreds = listOf( async { println("${LocalDateTime.now()} start

    Kotlin thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(1) println("${LocalDateTime.now()} end Kotlin thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Java thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(2) println("${LocalDateTime.now()} end Java thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Scala thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(3) println("${LocalDateTime.now()} end Scala thread:${Thread.currentThread()}") result?.name } ) println(deferreds.awaitAll()) }
  18. withContext(Dispatchers.IO) { val deferreds = listOf( async { println("${LocalDateTime.now()} start

    Kotlin thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(1) println("${LocalDateTime.now()} end Kotlin thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Java thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(2) println("${LocalDateTime.now()} end Java thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Scala thread:${Thread.currentThread()}") val result = userMapper.selectByPrimaryKey(3) println("${LocalDateTime.now()} end Scala thread:${Thread.currentThread()}") result?.name } ) println(deferreds.awaitAll()) }
  19. 2022-02-08T08:43:16.225768 start coroutines. thread:Thread[DefaultDispatcher-worker-1,5,main] 2022-02-08T08:43:16.228337 start Kotlin thread:Thread[DefaultDispatcher-worker-3,5,main] 2022-02-08T08:43:16.232434 start

    Scala thread:Thread[DefaultDispatcher-worker-4,5,main] 2022-02-08T08:43:16.232335 start Java thread:Thread[DefaultDispatcher-worker-2,5,main] 2022-02-08T08:43:16.729098 end Kotlin thread:Thread[DefaultDispatcher-worker-3,5,main] 2022-02-08T08:43:16.734883 end Scala thread:Thread[DefaultDispatcher-worker-4,5,main] 2022-02-08T08:43:16.738588 end Java thread:Thread[DefaultDispatcher-worker-2,5,main] [Kotlin, Java, Scala] Contextを切り替えて各asyncブロックが並列で実行されている
  20. interface UserRepository : CoroutineCrudRepository<User, Int> { } データクラスとRepositoryの作成 data class

    User( @Id val id: Int, val name: String, val age: Int ) • テーブルに構造に紐づいたデータクラスを作成 • CoroutineCrudRepositoryを実装したRepositoryを作成(これで各 種アクセスの関数が使える )
  21. /** * Retrieves an entity by its id. * *

    @param id must not be null. * @return [Mono] emitting the entity with the given id or empty if none found. * @throws IllegalArgumentException in case the given id is null. */ suspend fun findById(id: ID): T? /** * Returns whether an entity with the given id exists. * * @param id must not be null. * @return true if an entity with the given id exists, false otherwise. * @throws IllegalArgumentException in case the given id is null. */ suspend fun existsById(id: ID): Boolean /** * Deletes the entity with the given id. * * @param id must not be null. * @throws IllegalArgumentException in case the given id is null. */ suspend fun deleteById(id: ID)
  22. coroutineScope { println("${LocalDateTime.now()} start coroutines. thread:${Thread.currentThread()}") val deferreds = listOf(

    async { println("${LocalDateTime.now()} start Kotlin thread:${Thread.currentThread()}") val result = userRepository.findById(1) println("${LocalDateTime.now()} end Kotlin thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Java thread:${Thread.currentThread()}") val result = userRepository.findById(2) println("${LocalDateTime.now()} end Java thread:${Thread.currentThread()}") result?.name }, async { println("${LocalDateTime.now()} start Scala thread:${Thread.currentThread()}") val result = userRepository.findById(3) println("${LocalDateTime.now()} end Scala thread:${Thread.currentThread()}") result?.name } ) println(deferreds.awaitAll()) } ①Repositoryで主キー検索を実行 ②実行結果からnameを取得 ③全ての実行が終わったら awaitAllで取得して出力
  23. 2022-02-08T08:46:28.149196 start coroutines. thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:46:28.156509 start Kotlin thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:46:28.448620 start

    Java thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:46:28.504981 start Scala thread:Thread[reactor-http-nio-2,5,main] 2022-02-08T08:46:28.941121 end Kotlin thread:Thread[reactor-tcp-nio-2,5,main] 2022-02-08T08:46:28.982010 end Java thread:Thread[reactor-tcp-nio-2,5,main] 2022-02-08T08:46:29.017775 end Scala thread:Thread[reactor-tcp-nio-2,5,main] [Kotlin, Java, Scala] • 各asyncブロックがstartし、順に終了している • 実行スレッドはR2DBCの関数実行後はreactor-tcp-nioに切り替わる
  24. interface UserRepository : CoroutineCrudRepository<User, Int> { suspend fun findByName(name: String):

    List<User> @Query( """ SELECT * FROM user ORDER BY age LIMIT 1 """ ) suspend fun findMostYoung(): User } • シンプルなクエリは、 findByNameなど機能とカラム名の命名規則で作れる • @Queryを使うことで自由にクエリを定義することもできる • これもノンブロッキングになる