# Kotlin Coroutines and Structured Concurrency Tutorial ## Table of Contents 1. [Introduction](#Introduction) 2. [Coroutines Fundamentals](#Coroutines-Fundamentals) 3. [Structured Concurrency](#Structured-Concurrency) 4. [Workflow and Data Flow](#Workflow-and-Data-Flow) 5. [Real-World Examples](#Real-World-Examples) 6. [Best Practices](#Best-Practices) 7. [Common Patterns](#Common-Patterns) ## Introduction Kotlin Coroutines provide a powerful way to handle asynchronous programming with a sequential coding style. They enable you to write non-blocking code that looks and feels like synchronous code, making it easier to reason about and maintain. ### Key Benefits - **Lightweight**: Coroutines are much lighter than threads - **Sequential Code**: Write asynchronous code that looks synchronous - **Structured Concurrency**: Automatic lifecycle management and error handling - **Cancellation**: Built-in cancellation support - **Exception Handling**: Proper exception propagation ## Coroutines Fundamentals ### What are Coroutines? Coroutines are lightweight threads that can be suspended and resumed without blocking the underlying thread. They're perfect for I/O operations, network calls, and other asynchronous tasks. ```kotlin import kotlinx.coroutines.* suspend fun fetchUserData(userId: String): User { delay(1000) // Simulates network call return User(userId, "John Doe") } fun main() = runBlocking { val user = fetchUserData("123") println("User: ${user.name}") } ``` ### Coroutine Builders #### 1. runBlocking Used to bridge blocking and non-blocking code, typically in main functions or tests. ```kotlin fun main() = runBlocking { println("Starting...") delay(1000) println("Done!") } ``` #### 2. launch Fire-and-forget coroutine that returns a Job. ```kotlin fun main() = runBlocking { val job = launch { delay(1000) println("Background task completed") } println("Main thread continues...") job.join() // Wait for completion } ``` #### 3. async Returns a Deferred that can be awaited for a result. ```kotlin fun main() = runBlocking { val deferred = async { delay(1000) "Result from async" } println("Waiting for result...") val result = deferred.await() println(result) } ``` ### Coroutine Context and Dispatchers Dispatchers determine which thread or thread pool the coroutine runs on. ```kotlin import kotlinx.coroutines.* fun main() = runBlocking { // Main dispatcher (UI thread in Android) launch(Dispatchers.Main) { // Update UI } // IO dispatcher for network/disk operations launch(Dispatchers.IO) { // Network call or file I/O } // Default dispatcher for CPU-intensive work launch(Dispatchers.Default) { // Heavy computation } // Unconfined dispatcher launch(Dispatchers.Unconfined) { // Runs in caller thread until first suspension } } ``` ## Structured Concurrency Structured concurrency ensures that coroutines are properly managed within a defined scope, providing automatic cancellation and error handling. ### Coroutine Scope Every coroutine runs within a scope that defines its lifecycle. ```kotlin class UserRepository { private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) fun fetchUsers(): Flow<List<User>> = flow { while (true) { val users = loadUsersFromApi() emit(users) delay(30000) // Refresh every 30 seconds } }.flowOn(Dispatchers.IO) fun cleanup() { scope.cancel() // Cancels all child coroutines } } ``` ### Supervision #### SupervisorJob Failures in child coroutines don't cancel the parent or siblings. ```kotlin fun main() = runBlocking { val supervisor = SupervisorJob() val scope = CoroutineScope(Dispatchers.Default + supervisor) scope.launch { // This failure won't cancel other children throw Exception("Child 1 failed") } scope.launch { delay(2000) println("Child 2 completed successfully") } delay(3000) } ``` #### supervisorScope Creates a supervised scope for handling failures. ```kotlin suspend fun processData() = supervisorScope { val job1 = launch { processChunk1() } val job2 = launch { processChunk2() } val job3 = launch { processChunk3() } // If any job fails, others continue joinAll(job1, job2, job3) } ``` ## Workflow and Data Flow ### Sequential vs Concurrent Execution #### Sequential Execution ``` Time → ┌─────────┐┌─────────┐┌─────────┐ │ Task A ││ Task B ││ Task C │ └─────────┘└─────────┘└─────────┘ ``` ```kotlin suspend fun sequentialExecution() { val resultA = fetchDataA() // 1 second val resultB = fetchDataB() // 1 second val resultC = fetchDataC() // 1 second // Total: 3 seconds } ``` #### Concurrent Execution ``` Time → ┌─────────┐ │ Task A │ ├─────────┤ │ Task B │ ├─────────┤ │ Task C │ └─────────┘ ``` ```kotlin suspend fun concurrentExecution() = coroutineScope { val deferredA = async { fetchDataA() } val deferredB = async { fetchDataB() } val deferredC = async { fetchDataC() } val resultA = deferredA.await() val resultB = deferredB.await() val resultC = deferredC.await() // Total: 1 second (all run in parallel) } ``` ### Flow - Reactive Streams Flow represents a stream of data that can be processed asynchronously. #### Data Flow Pipeline ``` Data Source → Transform → Filter → Collect ↓ ↓ ↓ ↓ Network map { } filter { } collect { } ``` ```kotlin fun userDataFlow(): Flow<User> = flow { val userIds = listOf("1", "2", "3", "4", "5") userIds.forEach { id -> emit(fetchUser(id)) delay(100) // Simulate rate limiting } } suspend fun processUsers() { userDataFlow() .filter { it.isActive } .map { it.toDisplayModel() } .flowOn(Dispatchers.IO) .collect { user -> println("Processing: ${user.name}") } } ``` ### Error Handling Flow ``` ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ Parent │───▶│ Child │───▶│ Grandchild │ │ Scope │ │ Coroutine │ │ Coroutine │ └─────────────┘ └──────────────┘ └─────────────┘ ▲ │ │ │ ▼ ▼ └───── Exception ────┘ [Cancelled] Propagation ``` ## Real-World Examples ### 1. Android Network Repository ```kotlin class WeatherRepository( private val apiService: WeatherApiService, private val database: WeatherDatabase ) { private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) fun getWeatherUpdates(city: String): Flow<Weather> = flow { // Emit cached data first val cached = database.getWeather(city) if (cached != null) { emit(cached) } // Then fetch fresh data try { val fresh = apiService.getWeather(city) database.saveWeather(fresh) emit(fresh) } catch (e: Exception) { if (cached == null) throw e // Keep using cached data if network fails } }.flowOn(Dispatchers.IO) suspend fun refreshWeather(city: String) = withContext(Dispatchers.IO) { try { val weather = apiService.getWeather(city) database.saveWeather(weather) } catch (e: NetworkException) { // Handle network errors throw WeatherFetchException("Failed to refresh weather", e) } } } ``` ### 2. Parallel Data Processing ```kotlin class DataProcessor { suspend fun processLargeDataset(data: List<DataItem>): List<ProcessedItem> = coroutineScope { // Split data into chunks for parallel processing val chunks = data.chunked(100) chunks.map { chunk -> async(Dispatchers.Default) { chunk.map { item -> processItem(item) // CPU-intensive operation } } }.awaitAll().flatten() } private suspend fun processItem(item: DataItem): ProcessedItem { // Simulate heavy computation delay(10) return ProcessedItem( id = item.id, result = item.value * 2, timestamp = System.currentTimeMillis() ) } } ``` ### 3. Rate-Limited API Client ```kotlin class RateLimitedApiClient { private val semaphore = Semaphore(5) // Max 5 concurrent requests private val rateLimiter = Channel<Unit>(capacity = 10) init { // Fill rate limiter with permits every second CoroutineScope(Dispatchers.Default).launch { while (true) { repeat(10) { rateLimiter.trySend(Unit) } delay(1000) } } } suspend fun makeRequest(url: String): ApiResponse = semaphore.withPermit { rateLimiter.receive() // Wait for rate limit permit withContext(Dispatchers.IO) { // Make actual HTTP request httpClient.get(url) } } } ``` ### 4. Event-Driven System ```kotlin class EventProcessor { private val eventChannel = Channel<Event>(Channel.UNLIMITED) private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) init { // Start event processing scope.launch { eventChannel.consumeAsFlow() .buffer(capacity = 100) .collect { event -> processEvent(event) } } } fun sendEvent(event: Event) { eventChannel.trySend(event) } private suspend fun processEvent(event: Event) { when (event) { is UserLoginEvent -> handleUserLogin(event) is OrderCreatedEvent -> handleOrderCreated(event) is PaymentProcessedEvent -> handlePaymentProcessed(event) } } } ``` ### 5. WebSocket Connection Manager ```kotlin class WebSocketManager { private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val messageFlow = MutableSharedFlow<String>() private var connection: WebSocket? = null fun connect(url: String): Flow<ConnectionState> = flow { emit(ConnectionState.Connecting) try { connection = WebSocket.connect(url) { message -> messageFlow.tryEmit(message) } emit(ConnectionState.Connected) // Keep connection alive while (connection?.isActive == true) { delay(30000) connection?.ping() } } catch (e: Exception) { emit(ConnectionState.Error(e)) } finally { emit(ConnectionState.Disconnected) } }.flowOn(Dispatchers.IO) fun sendMessage(message: String) { scope.launch { connection?.send(message) } } fun observeMessages(): Flow<String> = messageFlow.asSharedFlow() fun disconnect() { scope.cancel() connection?.close() } } ``` ### 6. Background Job Scheduler ```kotlin class JobScheduler { private val jobQueue = Channel<Job>(Channel.UNLIMITED) private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) init { // Start job processing workers repeat(3) { workerId -> scope.launch { for (job in jobQueue) { try { processJob(job, workerId) } catch (e: Exception) { handleJobFailure(job, e) } } } } } fun scheduleJob(job: Job) { jobQueue.trySend(job) } private suspend fun processJob(job: Job, workerId: Int) { println("Worker $workerId processing job ${job.id}") when (job.type) { JobType.EMAIL -> sendEmail(job.data) JobType.IMAGE_PROCESSING -> processImage(job.data) JobType.REPORT_GENERATION -> generateReport(job.data) } println("Worker $workerId completed job ${job.id}") } } ``` ## Best Practices ### 1. Use Structured Concurrency ```kotlin // ✅ Good: Use coroutineScope for structured concurrency suspend fun loadUserProfile(userId: String) = coroutineScope { val userDeferred = async { userService.getUser(userId) } val postsDeferred = async { postService.getUserPosts(userId) } val friendsDeferred = async { friendService.getUserFriends(userId) } UserProfile( user = userDeferred.await(), posts = postsDeferred.await(), friends = friendsDeferred.await() ) } // ❌ Bad: GlobalScope can lead to memory leaks fun loadUserProfileBad(userId: String) { GlobalScope.launch { // This coroutine may outlive the calling component } } ``` ### 2. Handle Cancellation Properly ```kotlin suspend fun longRunningTask() { repeat(1000) { i -> ensureActive() // Check if cancelled // Do some work processItem(i) delay(100) // Cancellation point } } ``` ### 3. Use Appropriate Dispatchers ```kotlin class UserService { // ✅ Good: Use IO dispatcher for network calls suspend fun fetchUser(id: String): User = withContext(Dispatchers.IO) { apiClient.getUser(id) } // ✅ Good: Use Default dispatcher for CPU work suspend fun processUserData(data: List<User>): ProcessedData = withContext(Dispatchers.Default) { data.map { heavyComputation(it) } } } ``` ### 4. Prefer Flow for Streams ```kotlin // ✅ Good: Use Flow for data streams fun observeUsers(): Flow<List<User>> = userDao.observeAll() .map { users -> users.sortedBy { it.name } } .flowOn(Dispatchers.Default) // ❌ Bad: Don't use callbacks for streams fun observeUsersBad(callback: (List<User>) -> Unit) { // Callback-based approach is harder to manage } ``` ## Common Patterns ### 1. Resource Management ```kotlin class DatabaseConnection : Closeable { suspend fun useConnection() = coroutineScope { use { connection -> // Connection is automatically closed when scope exits val result = connection.query("SELECT * FROM users") result.process() } } override fun close() { // Cleanup resources } } ``` ### 2. Timeout Handling ```kotlin suspend fun fetchDataWithTimeout(): String = withTimeout(5000) { // Operation must complete within 5 seconds apiService.fetchData() } suspend fun fetchDataWithTimeoutOrNull(): String? = withTimeoutOrNull(5000) { // Returns null if timeout occurs apiService.fetchData() } ``` ### 3. Exception Handling ```kotlin suspend fun robustDataFetch(): Result<Data> = try { val data = withContext(Dispatchers.IO) { apiService.fetchData() } Result.success(data) } catch (e: Exception) { when (e) { is NetworkException -> Result.failure(e) is TimeoutException -> Result.failure(e) else -> throw e // Re-throw unexpected exceptions } } ``` ### 4. Combining Multiple Sources ```kotlin fun combinedUserData(userId: String): Flow<UserData> = combine( userRepository.getUser(userId), preferencesRepository.getPreferences(userId), activityRepository.getActivity(userId) ) { user, preferences, activity -> UserData(user, preferences, activity) } ``` ## Conclusion Kotlin Coroutines and Structured Concurrency provide a powerful foundation for asynchronous programming. By following these patterns and best practices, you can write efficient, maintainable, and robust concurrent code. Key takeaways: - Always use structured concurrency with proper scopes - Choose appropriate dispatchers for different types of work - Handle cancellation and errors gracefully - Use Flow for reactive data streams - Prefer coroutines over callbacks and threads Remember: Coroutines make asynchronous code look synchronous while maintaining all the benefits of non-blocking operations.