Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Kotlin Coroutines

Kotlin Coroutines

Jussi Pohjolainen

March 28, 2024
Tweet

More Decks by Jussi Pohjolainen

Other Decks in Technology

Transcript

  1. Process and Thread • Process • A process is an

    instance of a program running on a computer. • It is a self-contained execution environment and typically has its own memory space. • Processes are independent of each other • Thread • A thread is a smaller sequence of programmed instructions within a process • Threads within the same process share the same memory space but operate independently • Multithreading allows a process to perform multiple tasks concurrently
  2. What is a Thread? • A thread is a single

    sequential flow of execution of tasks of a process in an operating system (OS). • OS threads are at the core of Java’s concurrency model • Java threads are just a wrapper at OS thread • JVM must work with the underlying OS to create and manage that thread, which is quite expensive because the JVM must communicate with the OS back and forth throughout the thread’s lifetime • This switching is an expensive operation, which makes threads expensive
  3. Java public class Main { public static void main(final String[]

    args) { for(int i=0; i<10; i!++) { generateThread().start(); } } public static Thread generateThread() { Thread t = new Thread(() -> { for(int i=0; i<10; i!++) { System.out.println(Thread.currentThread().getName() + " i = " + i); try { Thread.sleep(1000); } catch(InterruptedException e) { e.printStackTrace(); } } }); return t; } }
  4. Thread 1 / CPU Thread 2 / CPU Thread 3

    / CPU Java supports multithreading at OS Level. True parallel execution
  5. Multithreaded environment THREAD 1 i = 1 i = 2

    i = 3 i = 4 i = 5 i = 6 i = 7 i = 8 THREAD 2 i = 1 i = 2 i = 3 i = 4 i = 5 i = 6 i = 7 i = 8
  6. One threaded environment THREAD 1 method1 i = 1 i

    = 2 i = 3 i = 4 THREAD 1 method2 i = 1 i = 2 i = 3 i = 4
  7. Kotlin import kotlin.concurrent.thread fun main() { val t1 = Thread({

    println("hello") }) t1.start() val t2 = Thread() { println("hello") } t2.start() Thread { println("hello") }.start() thread { println("hello") } }
  8. Kotlin import kotlin.concurrent.thread fun main() { repeat(10) { generateThread() }

    } fun generateThread() = thread { for(i in 1!..10) { println("${Thread.currentThread().name} i = $i") Thread.sleep(1000); } }
  9. Kotlin coroutine • Kotlin coroutines are entirely managed by the

    language • Do not require communication with the underlying OS to be created and managed • Coroutines are lightweight and can be launched millions at once • Java 19 introduced the concept of virtual threads. Virtual threads are lightweight threads that are managed by the JVM rather than the OS
  10. Kotlin Coroutine • Coroutines are not bound to any particular

    thread • Coroutines can be • Creating async tasks only one thread (like js) – giving illusion of simultaneous execution. • Creating async tasks using thread pool • Dispatchers.IO, Dispatchers.Default • Default thread pool amount is the number of CPU cores • Runtime.getRuntime().availableProcessors()
  11. Two java threads import kotlin.concurrent.thread fun main() { thread {

    repeat(10) { println(it) Thread.sleep(1000) } } thread { repeat(10) { println(it) Thread.sleep(1000) } } } App creates two threads and when threads have finished app ends
  12. package org.example import kotlinx.coroutines.* fun main() { println("Thread name: ${Thread.currentThread().name}")

    runBlocking { launch { repeat(10) { println("Thread name: ${Thread.currentThread().name}: $it") delay(1000) } } launch { repeat(10) { println("Thread name: ${Thread.currentThread().name}: $it") delay(1000) } } } println("Thread name: ${Thread.currentThread().name}") } Only one thread is used! Runblocking scope is for main method or testing, it waits that all coroutines are done and then ends the app
  13. Two coroutines fun main() { runBlocking { launch { repeat(10)

    { println("A ${Thread.currentThread().name}: $it") delay(1000) } } launch { repeat(10) { println("B ${Thread.currentThread().name}: $it") delay(1000) } } } } It is now synchronous
  14. Two coroutines fun main() { runBlocking { launch(Dispatchers.Default) { repeat(10)

    { println("A ${Thread.currentThread().name}: $it") delay(1000) } } launch(Dispatchers.Default) { repeat(10) { println("B ${Thread.currentThread().name}: $it") delay(1000) } } } } Dispatchers.Default => CPU intensive work, creates thread pool
  15. Two coroutines fun main() { runBlocking { launch(Dispatchers.Default) { repeat(100)

    { println("A ${Thread.currentThread().name}: $it") delay(1000) } } launch(Dispatchers.Default) { repeat(100) { println("B ${Thread.currentThread().name}: $it") delay(1000) } } } } It will do some multitasking now
  16. Job and cancel fun main() { runBlocking { val job1

    : Job = launch(Dispatchers.Default) { repeat(10) { println("A ${Thread.currentThread().name}: $it") delay(1000) } } job1.cancel() !// join() } } It can cancel/wait the coroutine
  17. Job and join fun main() { runBlocking { val jobs

    : List<Job> = List(10) { launch(Dispatchers.Default) { repeat(10) { println("${Thread.currentThread().name}: $it") delay(200) } } } jobs.forEach { it.join() } !// Wait for all jobs to complete println("done") } } Waits until the job is done
  18. async fun main() { runBlocking { val coroutine: Deferred<Int> =

    async(Dispatchers.Default) { var sum = 0 repeat(10) { sum += it delay(100) } sum } val result : Int = coroutine.await() println(result) } } In lambda, the last expression value in the block is return value Will wait until it receives the result Will hold the sum
  19. awaitAll fun main() { runBlocking { val coroutine1: Deferred<Int> =

    async(Dispatchers.Default) { delay(100) 1 } val coroutine2: Deferred<Int> = async(Dispatchers.Default) { delay(300) 2 } val list : List<Deferred<Int!>> = listOf(coroutine1, coroutine2) val results : List<Int> = list.awaitAll() println(results) } } Waits for all to be complete
  20. async fun main() { runBlocking { val coroutines : List<Deferred<Int!>>

    = List(100) { async(Dispatchers.Default) { delay(2000) (0!..10).random() } } val results : List<Int> = coroutines.awaitAll() println(results) } } Uses thread pool Will wait until it receives the result
  21. Coroutine Context • A CoroutineContext is a set of data

    that governs coroutine behavior. It typically includes: • Job -> can control lifecycle, like cancel • Dispatcher -> what thread? • CoroutineName -> optional name for debugging
  22. Suspend Function • A suspend function is a function in

    Kotlin that • can pause its execution without blocking the underlying threa • Allows other tasks to run concurrently. • Suspend function usually has a suspend point. • When a suspend function reaches a suspension point • it saves its state and later resumes execution once the operation is complete • all without halting the progress of the thread it's running on.
  23. Suspend Point? • A suspend point is any function that

    suspends execution and allows other coroutines to run in the meantime. Common suspend points include: • delay • withContext(!..) • await() • calling another suspend function. • Rule: A suspend function must always be called inside a coroutine context • runBlocking • launch or async • Another suspend function
  24. suspend function package org.example import kotlinx.coroutines.* import kotlin.concurrent.thread fun doSomething():

    String { delay(1000) return "Hello after delay" } fun main() { runBlocking { val r = doSomething() } } Suspend function 'suspend fun delay(timeMillis: Long): Unit' should be called only from a coroutine or another suspend function.
  25. suspend function package org.example import kotlinx.coroutines.* import kotlin.concurrent.thread suspend fun

    doSomething(): String { delay(1000) !// Suspend point (coroutine pauses and resumes after 1 second) return "Hello after delay" } fun main() { runBlocking { val r = doSomething() println(r) } } suspend tells the compiler that this function may pause execution at certain points (delay, withContext, network calls, etc.) and resume later. These points are not allowed in normal functions.
  26. import java.nio.file.Files import java.nio.file.Path import kotlin.concurrent.thread fun readFile(path: Path, callback:

    (String) -> Unit) { thread { val content = Files.readString(path) callback(content) } } fun writeFile(path: Path, content: String, callback: () -> Unit) { thread { Files.writeString(path, content) callback() } } fun copyFile(source: Path, destination: Path, callback: (Boolean) -> Unit) { readFile(source) { content -> writeFile(destination, content) { callback(true) } } } fun main() { copyFile(Path.of("build.gradle.kts"), Path.of("copy.txt")) { result -> println("Copy success: $result") } "// Keep JVM alive long enough for threads to finish (only for demo) Thread.sleep(1000) } Callback hell
  27. package org.example import java.nio.file.Files import java.nio.file.Path import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext

    import kotlinx.coroutines.* suspend fun readFile(path: Path): String { return withContext(Dispatchers.IO) { Files.readString(path) } } suspend fun writeFile(path: Path, content: String) { return withContext(Dispatchers.IO) { Files.writeString(path, content) } } suspend fun copyFile(source: Path, destination: Path): Boolean { try { val content = readFile(source) writeFile(destination, content) return true } catch (e: Exception) { return false } } fun main() { runBlocking { val result = copyFile(Path.of("build.gradle.kts"), Path.of("copy.txt")) println("Copy success: $result") } } Easier to read, exception handling
  28. Copying at the same time fun main() { runBlocking {

    val paths : List<String> = listOf("a.txt", "b.txt", "c.txt") val defs : List<Deferred<Boolean!>> = paths.mapIndexed { index, file -> async { copyFile(Path.of(file), Path.of("copy$index.txt")) } } val results = defs.awaitAll() println("Results: $results") } } Several copying at once
  29. Android • in an Android app, the point of having

    suspend functions like copyFile(!!...) is to offload heavy or blocking work off the main (UI) thread. • The main thread in Android handles UI rendering, touch events, animations, etc. • If you do blocking operations (like Files.readString(...)) on the main thread, Android will throw NetworkOnMainThreadException or ANR (Application Not Responding). • By wrapping file I/O in withContext(Dispatchers.IO), you:
  30. Android @Composable fun CopyButton() { val scope = rememberCoroutineScope() Button(onClick

    = { scope.launch { val result = copyFile( Path.of("/data/data/org.example/files/source.txt"), Path.of("/data/data/org.example/files/copy.txt") ) println("Copy result: $result") } }) { Text("Copy File") } } Creates coroutine context where coroutine is cancelled if Button leaves the UI
  31. suspend function and withContext package org.example import kotlinx.coroutines.* suspend fun

    fetchData(id: Int): String { println("2: " + Thread.currentThread().name) val value = withContext(Dispatchers.IO) { println("3: " + Thread.currentThread().name) delay(1000) !// Simulating some background work "Data from task $id on ${Thread.currentThread().name}" } return value } fun main() = runBlocking { println("1: " + Thread.currentThread().name) val deferredList : List<Deferred<String!>> = listOf( async { fetchData(1) }, async { fetchData(2) } ) val results = deferredList.awaitAll() println(results) !// Prints all results after both coroutines complete }
  32. package org.example import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse import java.net.URI

    import java.util.concurrent.CompletableFuture fun main() { !// Create the HttpClient val httpClient: HttpClient = HttpClient.newHttpClient() !// Build the HttpRequest for the Chuck Norris API val httpRequest: HttpRequest = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() .build() !// Send the asynchronous request and process the response val futureResponse: CompletableFuture<HttpResponse<String!>> = httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofString()) futureResponse .thenApply { response: HttpResponse<String> -> val responseBody: String = response.body() !// Extract response body responseBody } .thenAccept { responseBody: String -> println("Response: $responseBody") !// Print the result } .join() !// Wait for completion }
  33. fun main() { !// Create the HttpClient val client =

    HttpClient.newHttpClient() !// Build the HttpRequest for the Chuck Norris API val request = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() !// Use the GET method .build() !// Send the asynchronous request and process the response var completableFuture = client.sendAsync (request, HttpResponse.BodyHandlers.ofString()) println("Start") completableFuture.thenApply { it.body() !// let's get the body of the http response }.thenAccept { println(it) !// and then print it }.join() } callbacks
  34. fun fetchChuckNorrisJoke(): CompletableFuture<String> { val client = HttpClient.newHttpClient() val request

    = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() .build() !// Starts to fetch immediately val httpResponse : CompletableFuture<HttpResponse<String!>> = client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) val body : CompletableFuture<String> = httpResponse.thenApply { resp : HttpResponse<String> - > resp.body() } return body } fun main() { val futureJoke : CompletableFuture<String> = fetchChuckNorrisJoke() println("START") val future : CompletableFuture<Void> = futureJoke.thenAccept { response : String -> println("Received response: $response") } println("STOP") future.join() }
  35. fun fetchChuckNorrisJoke(): CompletableFuture<String> { val client = HttpClient.newHttpClient() val request

    = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() .build() return client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .thenApply { it.body() } } fun main() { fetchChuckNorrisJoke().thenAccept { println("Received response: $it") }.join() !// Wait for the asynchronous operation to complete } Helper method Callback
  36. !// Declares a function that supports suspension. !// Effectively, this

    makes the fetchChuckNorrisJoke() function suspend until the HTTP request completes, !// allowing other coroutines to run in the meantime. suspend fun fetchChuckNorrisJoke(): String { !// suspendCancellableCoroutine: Converts a callback-based async operation into a suspend function. !// The coroutine suspends until continuation.resume(result) is called. val value = suspendCancellableCoroutine { continuation -> val client = HttpClient.newHttpClient() val request = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() .build() client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .thenApply { it.body() }.thenAccept { !// Resumes the suspended coroutine when the HTTP request completes. !// Passes the joke (response body) as the function’s return value. continuation.resume(it) } } return value }
  37. fun main() { runBlocking { try { val joke =

    fetchChuckNorrisJoke() println("Received joke: $joke") } catch (e: Exception) { println("Failed to fetch joke: ${e.message}") } } }
  38. fun main() { runBlocking { val jokeDeferred1 = async {

    fetchChuckNorrisJoke() } val jokeDeferred2 = async { fetchChuckNorrisJoke() } !// Wait for both operations to complete and get their results. val joke1 = jokeDeferred1.await() val joke2 = jokeDeferred2.await() !// Print the results. println("Joke 1: $joke1") println("Joke 2: $joke2") } }
  39. Turning sync to async • Last example had already threading

    and callbacks • With suspendCancellableCoroutine it is possible to turn the function into a suspend function • If function does not have threading built in, you can use withContext
  40. package org.example import kotlinx.coroutines.* import java.net.http.HttpClient import java.net.http.HttpRequest import java.net.http.HttpResponse

    import java.net.URI suspend fun fetchRandomJoke(): String { val client = HttpClient.newHttpClient() val request = HttpRequest.newBuilder() .uri(URI.create("https:!//api.chucknorris.io/jokes/random")) .GET() .build() return withContext(Dispatchers.IO) { println(Thread.currentThread().name) val response = client.send(request, HttpResponse.BodyHandlers.ofString()) if(response.statusCode() !!= 200) { throw RuntimeException("Failed to fetch joke, status code: ${response.statusCode()}") } response.body() } } fun main() { runBlocking { println(Thread.currentThread().name) val deferredJoke : Deferred<String> = async { fetchRandomJoke() } val joke = deferredJoke.await() println(joke) val deferredList = List(10) { async { fetchRandomJoke() } } val jokes = deferredList.awaitAll() println(jokes) } }