# Kotlin Coroutines and Structured Concurrency Tutorial
## Table of Contents
1. [Introduction](#Introduction)
2. [Coroutines Fundamentals](#Coroutines-Fundamentals)
3. [Structured Concurrency](#Structured-Concurrency)
4. [Workflow and Data Flow](#Workflow-and-Data-Flow)
5. [Real-World Examples](#Real-World-Examples)
6. [Best Practices](#Best-Practices)
7. [Common Patterns](#Common-Patterns)
## Introduction
Kotlin Coroutines provide a powerful way to handle asynchronous programming with a sequential coding style. They enable you to write non-blocking code that looks and feels like synchronous code, making it easier to reason about and maintain.
### Key Benefits
- **Lightweight**: Coroutines are much lighter than threads
- **Sequential Code**: Write asynchronous code that looks synchronous
- **Structured Concurrency**: Automatic lifecycle management and error handling
- **Cancellation**: Built-in cancellation support
- **Exception Handling**: Proper exception propagation
## Coroutines Fundamentals
### What are Coroutines?
Coroutines are lightweight threads that can be suspended and resumed without blocking the underlying thread. They're perfect for I/O operations, network calls, and other asynchronous tasks.
```kotlin
import kotlinx.coroutines.*
suspend fun fetchUserData(userId: String): User {
delay(1000) // Simulates network call
return User(userId, "John Doe")
}
fun main() = runBlocking {
val user = fetchUserData("123")
println("User: ${user.name}")
}
```
### Coroutine Builders
#### 1. runBlocking
Used to bridge blocking and non-blocking code, typically in main functions or tests.
```kotlin
fun main() = runBlocking {
println("Starting...")
delay(1000)
println("Done!")
}
```
#### 2. launch
Fire-and-forget coroutine that returns a Job.
```kotlin
fun main() = runBlocking {
val job = launch {
delay(1000)
println("Background task completed")
}
println("Main thread continues...")
job.join() // Wait for completion
}
```
#### 3. async
Returns a Deferred that can be awaited for a result.
```kotlin
fun main() = runBlocking {
val deferred = async {
delay(1000)
"Result from async"
}
println("Waiting for result...")
val result = deferred.await()
println(result)
}
```
### Coroutine Context and Dispatchers
Dispatchers determine which thread or thread pool the coroutine runs on.
```kotlin
import kotlinx.coroutines.*
fun main() = runBlocking {
// Main dispatcher (UI thread in Android)
launch(Dispatchers.Main) {
// Update UI
}
// IO dispatcher for network/disk operations
launch(Dispatchers.IO) {
// Network call or file I/O
}
// Default dispatcher for CPU-intensive work
launch(Dispatchers.Default) {
// Heavy computation
}
// Unconfined dispatcher
launch(Dispatchers.Unconfined) {
// Runs in caller thread until first suspension
}
}
```
## Structured Concurrency
Structured concurrency ensures that coroutines are properly managed within a defined scope, providing automatic cancellation and error handling.
### Coroutine Scope
Every coroutine runs within a scope that defines its lifecycle.
```kotlin
class UserRepository {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun fetchUsers(): Flow<List<User>> = flow {
while (true) {
val users = loadUsersFromApi()
emit(users)
delay(30000) // Refresh every 30 seconds
}
}.flowOn(Dispatchers.IO)
fun cleanup() {
scope.cancel() // Cancels all child coroutines
}
}
```
### Supervision
#### SupervisorJob
Failures in child coroutines don't cancel the parent or siblings.
```kotlin
fun main() = runBlocking {
val supervisor = SupervisorJob()
val scope = CoroutineScope(Dispatchers.Default + supervisor)
scope.launch {
// This failure won't cancel other children
throw Exception("Child 1 failed")
}
scope.launch {
delay(2000)
println("Child 2 completed successfully")
}
delay(3000)
}
```
#### supervisorScope
Creates a supervised scope for handling failures.
```kotlin
suspend fun processData() = supervisorScope {
val job1 = launch { processChunk1() }
val job2 = launch { processChunk2() }
val job3 = launch { processChunk3() }
// If any job fails, others continue
joinAll(job1, job2, job3)
}
```
## Workflow and Data Flow
### Sequential vs Concurrent Execution
#### Sequential Execution
```
Time →
┌─────────┐┌─────────┐┌─────────┐
│ Task A ││ Task B ││ Task C │
└─────────┘└─────────┘└─────────┘
```
```kotlin
suspend fun sequentialExecution() {
val resultA = fetchDataA() // 1 second
val resultB = fetchDataB() // 1 second
val resultC = fetchDataC() // 1 second
// Total: 3 seconds
}
```
#### Concurrent Execution
```
Time →
┌─────────┐
│ Task A │
├─────────┤
│ Task B │
├─────────┤
│ Task C │
└─────────┘
```
```kotlin
suspend fun concurrentExecution() = coroutineScope {
val deferredA = async { fetchDataA() }
val deferredB = async { fetchDataB() }
val deferredC = async { fetchDataC() }
val resultA = deferredA.await()
val resultB = deferredB.await()
val resultC = deferredC.await()
// Total: 1 second (all run in parallel)
}
```
### Flow - Reactive Streams
Flow represents a stream of data that can be processed asynchronously.
#### Data Flow Pipeline
```
Data Source → Transform → Filter → Collect
↓ ↓ ↓ ↓
Network map { } filter { } collect { }
```
```kotlin
fun userDataFlow(): Flow<User> = flow {
val userIds = listOf("1", "2", "3", "4", "5")
userIds.forEach { id ->
emit(fetchUser(id))
delay(100) // Simulate rate limiting
}
}
suspend fun processUsers() {
userDataFlow()
.filter { it.isActive }
.map { it.toDisplayModel() }
.flowOn(Dispatchers.IO)
.collect { user ->
println("Processing: ${user.name}")
}
}
```
### Error Handling Flow
```
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Parent │───▶│ Child │───▶│ Grandchild │
│ Scope │ │ Coroutine │ │ Coroutine │
└─────────────┘ └──────────────┘ └─────────────┘
▲ │ │
│ ▼ ▼
└───── Exception ────┘ [Cancelled]
Propagation
```
## Real-World Examples
### 1. Android Network Repository
```kotlin
class WeatherRepository(
private val apiService: WeatherApiService,
private val database: WeatherDatabase
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun getWeatherUpdates(city: String): Flow<Weather> = flow {
// Emit cached data first
val cached = database.getWeather(city)
if (cached != null) {
emit(cached)
}
// Then fetch fresh data
try {
val fresh = apiService.getWeather(city)
database.saveWeather(fresh)
emit(fresh)
} catch (e: Exception) {
if (cached == null) throw e
// Keep using cached data if network fails
}
}.flowOn(Dispatchers.IO)
suspend fun refreshWeather(city: String) = withContext(Dispatchers.IO) {
try {
val weather = apiService.getWeather(city)
database.saveWeather(weather)
} catch (e: NetworkException) {
// Handle network errors
throw WeatherFetchException("Failed to refresh weather", e)
}
}
}
```
### 2. Parallel Data Processing
```kotlin
class DataProcessor {
suspend fun processLargeDataset(data: List<DataItem>): List<ProcessedItem> =
coroutineScope {
// Split data into chunks for parallel processing
val chunks = data.chunked(100)
chunks.map { chunk ->
async(Dispatchers.Default) {
chunk.map { item ->
processItem(item) // CPU-intensive operation
}
}
}.awaitAll().flatten()
}
private suspend fun processItem(item: DataItem): ProcessedItem {
// Simulate heavy computation
delay(10)
return ProcessedItem(
id = item.id,
result = item.value * 2,
timestamp = System.currentTimeMillis()
)
}
}
```
### 3. Rate-Limited API Client
```kotlin
class RateLimitedApiClient {
private val semaphore = Semaphore(5) // Max 5 concurrent requests
private val rateLimiter = Channel<Unit>(capacity = 10)
init {
// Fill rate limiter with permits every second
CoroutineScope(Dispatchers.Default).launch {
while (true) {
repeat(10) { rateLimiter.trySend(Unit) }
delay(1000)
}
}
}
suspend fun makeRequest(url: String): ApiResponse =
semaphore.withPermit {
rateLimiter.receive() // Wait for rate limit permit
withContext(Dispatchers.IO) {
// Make actual HTTP request
httpClient.get(url)
}
}
}
```
### 4. Event-Driven System
```kotlin
class EventProcessor {
private val eventChannel = Channel<Event>(Channel.UNLIMITED)
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
init {
// Start event processing
scope.launch {
eventChannel.consumeAsFlow()
.buffer(capacity = 100)
.collect { event ->
processEvent(event)
}
}
}
fun sendEvent(event: Event) {
eventChannel.trySend(event)
}
private suspend fun processEvent(event: Event) {
when (event) {
is UserLoginEvent -> handleUserLogin(event)
is OrderCreatedEvent -> handleOrderCreated(event)
is PaymentProcessedEvent -> handlePaymentProcessed(event)
}
}
}
```
### 5. WebSocket Connection Manager
```kotlin
class WebSocketManager {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val messageFlow = MutableSharedFlow<String>()
private var connection: WebSocket? = null
fun connect(url: String): Flow<ConnectionState> = flow {
emit(ConnectionState.Connecting)
try {
connection = WebSocket.connect(url) { message ->
messageFlow.tryEmit(message)
}
emit(ConnectionState.Connected)
// Keep connection alive
while (connection?.isActive == true) {
delay(30000)
connection?.ping()
}
} catch (e: Exception) {
emit(ConnectionState.Error(e))
} finally {
emit(ConnectionState.Disconnected)
}
}.flowOn(Dispatchers.IO)
fun sendMessage(message: String) {
scope.launch {
connection?.send(message)
}
}
fun observeMessages(): Flow<String> = messageFlow.asSharedFlow()
fun disconnect() {
scope.cancel()
connection?.close()
}
}
```
### 6. Background Job Scheduler
```kotlin
class JobScheduler {
private val jobQueue = Channel<Job>(Channel.UNLIMITED)
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
init {
// Start job processing workers
repeat(3) { workerId ->
scope.launch {
for (job in jobQueue) {
try {
processJob(job, workerId)
} catch (e: Exception) {
handleJobFailure(job, e)
}
}
}
}
}
fun scheduleJob(job: Job) {
jobQueue.trySend(job)
}
private suspend fun processJob(job: Job, workerId: Int) {
println("Worker $workerId processing job ${job.id}")
when (job.type) {
JobType.EMAIL -> sendEmail(job.data)
JobType.IMAGE_PROCESSING -> processImage(job.data)
JobType.REPORT_GENERATION -> generateReport(job.data)
}
println("Worker $workerId completed job ${job.id}")
}
}
```
## Best Practices
### 1. Use Structured Concurrency
```kotlin
// ✅ Good: Use coroutineScope for structured concurrency
suspend fun loadUserProfile(userId: String) = coroutineScope {
val userDeferred = async { userService.getUser(userId) }
val postsDeferred = async { postService.getUserPosts(userId) }
val friendsDeferred = async { friendService.getUserFriends(userId) }
UserProfile(
user = userDeferred.await(),
posts = postsDeferred.await(),
friends = friendsDeferred.await()
)
}
// ❌ Bad: GlobalScope can lead to memory leaks
fun loadUserProfileBad(userId: String) {
GlobalScope.launch {
// This coroutine may outlive the calling component
}
}
```
### 2. Handle Cancellation Properly
```kotlin
suspend fun longRunningTask() {
repeat(1000) { i ->
ensureActive() // Check if cancelled
// Do some work
processItem(i)
delay(100) // Cancellation point
}
}
```
### 3. Use Appropriate Dispatchers
```kotlin
class UserService {
// ✅ Good: Use IO dispatcher for network calls
suspend fun fetchUser(id: String): User = withContext(Dispatchers.IO) {
apiClient.getUser(id)
}
// ✅ Good: Use Default dispatcher for CPU work
suspend fun processUserData(data: List<User>): ProcessedData =
withContext(Dispatchers.Default) {
data.map { heavyComputation(it) }
}
}
```
### 4. Prefer Flow for Streams
```kotlin
// ✅ Good: Use Flow for data streams
fun observeUsers(): Flow<List<User>> = userDao.observeAll()
.map { users -> users.sortedBy { it.name } }
.flowOn(Dispatchers.Default)
// ❌ Bad: Don't use callbacks for streams
fun observeUsersBad(callback: (List<User>) -> Unit) {
// Callback-based approach is harder to manage
}
```
## Common Patterns
### 1. Resource Management
```kotlin
class DatabaseConnection : Closeable {
suspend fun useConnection() = coroutineScope {
use { connection ->
// Connection is automatically closed when scope exits
val result = connection.query("SELECT * FROM users")
result.process()
}
}
override fun close() {
// Cleanup resources
}
}
```
### 2. Timeout Handling
```kotlin
suspend fun fetchDataWithTimeout(): String = withTimeout(5000) {
// Operation must complete within 5 seconds
apiService.fetchData()
}
suspend fun fetchDataWithTimeoutOrNull(): String? = withTimeoutOrNull(5000) {
// Returns null if timeout occurs
apiService.fetchData()
}
```
### 3. Exception Handling
```kotlin
suspend fun robustDataFetch(): Result<Data> = try {
val data = withContext(Dispatchers.IO) {
apiService.fetchData()
}
Result.success(data)
} catch (e: Exception) {
when (e) {
is NetworkException -> Result.failure(e)
is TimeoutException -> Result.failure(e)
else -> throw e // Re-throw unexpected exceptions
}
}
```
### 4. Combining Multiple Sources
```kotlin
fun combinedUserData(userId: String): Flow<UserData> =
combine(
userRepository.getUser(userId),
preferencesRepository.getPreferences(userId),
activityRepository.getActivity(userId)
) { user, preferences, activity ->
UserData(user, preferences, activity)
}
```
## Conclusion
Kotlin Coroutines and Structured Concurrency provide a powerful foundation for asynchronous programming. By following these patterns and best practices, you can write efficient, maintainable, and robust concurrent code.
Key takeaways:
- Always use structured concurrency with proper scopes
- Choose appropriate dispatchers for different types of work
- Handle cancellation and errors gracefully
- Use Flow for reactive data streams
- Prefer coroutines over callbacks and threads
Remember: Coroutines make asynchronous code look synchronous while maintaining all the benefits of non-blocking operations.