
Swift Concurrency Part 2


Outline (1/2)

  • Unstructured Task
    • Task
    • Cancellation
      • Task Cancellation
      • Cancellation handlers
    • Task Priority
    • Attached Task
    • Executor
    • UnsafeCurrentTask

Outline (2/2)

  • Structured Task
    • TaskGroup
    • async let
    • Child Task

Task


The most basic unit of concurrency

Task {
    // do something...
}

Every async function runs inside a Task

// Task {
    await something()
// }

Task State

digraph Task {
    rankdir = LR;
	suspended -> running [label = "schedule"]
    running -> completed [label = "return/throw"]
    running -> suspended [label = "await"]
}

Task State (in practice)

await is a potential suspension point

digraph Task {
    rankdir = LR;
	suspended -> running [label = "schedule"]
    running -> completed [label = "return/throw"]
    running -> suspended [label = "await"]
    running -> running [label = "await"]
}

Synchronous function

func main() {
    all()
}

func all() {
    a1()
    a2()
    a3()
}
gantt
    title all
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Main Thread
    a2               :a2, after a1, 2s
    a3               :a3, after a2, 2s
    a1               :a1, 2000-01-01 00, 2s
    all              :all, 2000-01-01 00, 6s
    main             :main, 2000-01-01 00, 6s 

call stack


Async function 1

func all() async {
    a1()
    await a2()
    a3()
}

Async function 1-1

Execution may hop between arbitrary threads (decided by the executor)

  • Task: all
  • Job: a1 a2 a3
gantt
    title all
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Global Executor1
    a1               :a1, 2000-01-01 00, 2s
    
    section Global Executor2
    a2               :a2, after a1, 2s
    
    section Global Executor3
    a3               :a3, after a2, 2s

Async function 1-2

await is a potential suspension point, so it may also not suspend at all

  • Task: all
  • Job: all

gantt
    title all
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Global Executor
    all              :2000-01-01 00, 6s

Async function 2 (with actor)

actor A {
    let b = B()

    func all() async {
        a1()
        await b.a2()
        a3()
    }
}

actor B {
    func a2() {}
}

Each actor has its own executor

gantt
    title all
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section A
    a1               :a1, 2000-01-01 00, 2s
    a3               :a3, after a2, 2s
    
    section B
    a2               :a2, after a1, 2s

Task is Cooperative

Task Cancellation


Task Cancellation

Cancelling only marks the Task as cancelled

task.cancel() // task.isCancelled == true

Task {
    while !Task.isCancelled {
        // your work
    }
    return
}

func doSomething() async {
    while !Task.isCancelled {
        // your work
    }
    return
}

Task {
    try Task.checkCancellation()
}
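A minimal sketch of cooperative cancellation, covering both styles shown above. The function names `countUntilCancelled` and `runUntilCancelled` are hypothetical; the loops only stop because they check the cancellation flag themselves.

```swift
// Polling style: the loop ends once the task has been marked as cancelled.
func countUntilCancelled() async -> Int {
    var iterations = 0
    while !Task.isCancelled {
        iterations += 1
        await Task.yield() // let other tasks (including the canceller) run
    }
    return iterations
}

// Throwing style: checkCancellation() throws CancellationError once cancelled.
func runUntilCancelled() async throws {
    while true {
        try Task.checkCancellation()
        await Task.yield()
    }
}
```

Calling `cancel()` on a `Task` that runs either function ends it on the next check; without those checks, both loops would keep running.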

Recap from last time

cancel can only stop

  • a Task that has not started yet
  • a Task that implements a cancellation flow

Default Executor

By default

Tasks are scheduled on the default global concurrent executor


Executor


  • An executor is a service
    • It accepts submitted jobs and arranges for a thread to run each job
    • The system assumes an executor is reliable and never fails to run a job

  • An executor may be exclusive, meaning that jobs submitted to it at the same time never run concurrently
  • Jobs are not required to run in submission order
    • In practice, priority should take precedence over submission order

  • Swift provides a default executor implementation
actor SomeActor {}

  • Actors and global actors can replace that implementation
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public protocol Actor : AnyObject, Sendable {
    nonisolated var unownedExecutor: UnownedSerialExecutor { get }
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public protocol GlobalActor {
    // ...
    static var sharedUnownedExecutor: UnownedSerialExecutor { get }
}
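To make the "exclusive" property concrete, here is a small sketch that relies on the default serial executor every actor gets. `Counter` and `incrementConcurrently` are hypothetical names used only for illustration.

```swift
// Hypothetical actor: its default executor runs one job at a time,
// so the unsynchronized-looking `value += 1` is safe.
actor Counter {
    private var value = 0

    func increment() { value += 1 }
    func current() -> Int { value }
}

// Submits `times` increments from concurrently running child tasks.
func incrementConcurrently(times: Int) async -> Int {
    let counter = Counter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<times {
            group.addTask { await counter.increment() }
        }
        // Leaving the body waits for every child task.
    }
    return await counter.current()
}
```

Because the increments never overlap on the actor, `await incrementConcurrently(times: 1000)` should return 1000 even though the child tasks themselves run concurrently.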

Cancellation handlers


Plain Task cancel

You have to implement the cancellation flow yourself


TaskCancellationHandler

The cancel handler is called immediately when the Task is cancelled


func download(url: URL) async throws -> Data? {
    var urlSessionTask: URLSessionTask?

    return try await withTaskCancellationHandler {
        return try await withUnsafeThrowingContinuation { continuation in
            urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
                if let error = error {
                    // Ideally translate NSURLErrorCancelled to CancellationError here
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume(returning: data)
                }
            }
            urlSessionTask?.resume()
        }
    } onCancel: {
        urlSessionTask?.cancel() // runs immediately when cancelled
    }
}

Alamofire + Rx

public var data: Observable<Data> {
    return Observable.create { observer -> Disposable in
        self.dataRequest.responseData { (res) in
            do {
                try observer.onNext(res.result.get())
                observer.onCompleted()
            } catch {
                observer.onError(error)
            }
        }
        // Cancel the underlying request when the subscription is disposed
        return Disposables.create {
            self.dataRequest.cancel()
        }
    }
}

Alamofire + Continuation + TaskCancellationHandler

public var data: Data {
    get async throws {
        return try await withTaskCancellationHandler {
            return try await withCheckedThrowingContinuation { continuation in
                self.dataRequest.responseData { (res) in
                    do {
                        let value = try res.result.get()
                        continuation.resume(returning: value)
                    } catch {
                        continuation.resume(throwing: error)
                    }
                }
            }
        } onCancel: {
            self.dataRequest.cancel()
        }
    }
}
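The same pattern can be tried without Alamofire. The sketch below assumes a hypothetical callback-based `LegacyRequest` class standing in for a network request; it is not a real library type. It shows a continuation bridged with `withTaskCancellationHandler`, including the case where the task is already cancelled before the request starts.

```swift
import Foundation

// Hypothetical callback-based request (an assumption for this example).
final class LegacyRequest: @unchecked Sendable {
    private let lock = NSLock()
    private var completion: ((Result<String, Error>) -> Void)?
    private var isCancelled = false

    func start(completion: @escaping (Result<String, Error>) -> Void) {
        lock.lock()
        let alreadyCancelled = isCancelled
        if !alreadyCancelled { self.completion = completion }
        lock.unlock()
        if alreadyCancelled {
            completion(.failure(CancellationError()))
            return
        }
        // Pretend the response arrives after five seconds.
        DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
            self.finish(.success("payload"))
        }
    }

    func cancel() {
        lock.lock()
        isCancelled = true
        lock.unlock()
        finish(.failure(CancellationError()))
    }

    // Delivers only the first result, so the continuation resumes exactly once.
    private func finish(_ result: Result<String, Error>) {
        lock.lock()
        let callback = completion
        completion = nil
        lock.unlock()
        callback?(result)
    }
}

func fetch(using request: LegacyRequest) async throws -> String {
    try await withTaskCancellationHandler {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<String, Error>) in
            request.start { result in
                continuation.resume(with: result)
            }
        }
    } onCancel: {
        request.cancel() // runs as soon as the surrounding task is cancelled
    }
}
```

Cancelling the task that awaits `fetch(using:)` makes it throw `CancellationError` right away instead of waiting for the five-second response. The lock is there because `onCancel` can run on a different thread than the operation.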

Attached Task

Task {
    // ...
}

Context inheritance

  • Running inside a Task
  • Not running inside a Task

Running inside a Task

  • Inherits TaskPriority
func xxx() async {
    Task {
        // inherits the `TaskPriority` of xxx
    }
}

Running inside a Task

  • TaskLocal values
    • values are copied to the inner task
@TaskLocal static var traceID: TraceID?

print("traceID: \(traceID)") // traceID: nil
$traceID.withValue(1234) { // bind the value
    print("traceID: \(traceID)") // traceID: 1234
}
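A small sketch of how task-local values travel into new tasks. The `Trace` namespace and `observeTraceID` are hypothetical, and `Int` stands in for the slide's `TraceID` type. A `Task { }` created inside the binding copies the value, while `Task.detached` inherits nothing.

```swift
// Hypothetical namespace holding a task-local value.
enum Trace {
    @TaskLocal static var id: Int?
}

// Reads the task-local from an attached and a detached unstructured task.
func observeTraceID() async -> (attached: Int?, detached: Int?) {
    await Trace.$id.withValue(1234) {
        // Task { } copies the task-local values of the current task.
        let attached = await Task { Trace.id }.value
        // Task.detached starts from a clean context.
        let detached = await Task.detached { Trace.id }.value
        return (attached: attached, detached: detached)
    }
}
```

Under these assumptions `attached` is 1234 and `detached` is nil.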

Running inside a Task & on an Actor

actor One {
    var counter = 1 // mutable properties
    var notSendable = NotSendable() // non-sendable values

    func xxx() {
        Task {
            // Inherits the actor's execution context
            // This task is scheduled on the actor's executor
            // The closure becomes `actor-isolated`
            // It can access the actor's private (`actor-isolated`) state:
            //   mutable properties
            //   non-sendable values
            counter += 1
            notSendable.xxx()
        }
    }
}

Not running inside a Task

func xxx() { // Main Thread
    Task {
        // Consults the `runtime` and infers the most likely `TaskPriority`,
        // for example by asking for the `current thread priority`
        // Scheduled on the `global concurrent executor`
    }
}

Actor context propagation

The closure passed to Task init implicitly inherits the actor execution context and the isolation of the context in which the closure is formed

func notOnActor(_: @Sendable () async -> Void) { }

actor A {
    func f() {
        notOnActor {
            await g() // must call g asynchronously, because it's a @Sendable closure
        }
        Task {
            g() // okay to call g synchronously, even though it's @Sendable
        }
    }

    func g() { }
}
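A runnable variation on the idea above, using a hypothetical `Inbox` actor: the closure passed to `Task` inherits the actor's isolation, so it can mutate actor state without `await` and without writing `self.`.

```swift
// Hypothetical actor used only to illustrate isolation inheritance.
actor Inbox {
    private var messages: [String] = []

    // Synchronous actor method that starts an unstructured task.
    func post(_ text: String) -> Task<Int, Never> {
        Task {
            // No await and no explicit self: the closure is isolated to this actor.
            messages.append(text)
            return messages.count
        }
    }
}
```

From outside the actor, `await Inbox().post("hello").value` yields 1, the message count after the task has run on the actor's executor.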

Implicit "self"

  • Requiring explicit self is meant to remind developers that capturing self can create a reference cycle
  • The closure passed to Task init runs immediately and is only referenced internally, so explicit self
    • conveys no useful information
    • is not required

func acceptEscaping(_: @escaping () -> Void) { }

class C {
    var counter: Int = 0

    func f() {
        acceptEscaping {
            counter = counter + 1 // error: must use "self." because the closure escapes
        }
        Task {
            counter = counter + 1 // okay: implicit "self" is allowed here
        }
    }
}

TaskPriority

TaskPriority helps the Executor make scheduling decisions


List

  • high
  • medium
  • low
  • userInitiated
  • utility
  • background

Naming rationale

Avoid platform-specific terminology

Prefer more general terms


C#



Thread



Operation



GCD(OLD)



GCD(NEW)



How each priority is handled is left to

  • each platform
  • the specific executor

Priority inheritance

  • A child inherits its parent's priority
  • A detached task has nothing to inherit from

Task Priority == Executor Priority?

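  • A Task's priority does not have to match the priority of the Executor it runs on
  • Example
    • UI thread: a high-priority Executor
    • Any Task submitted to the UI thread runs at high priority while it executes there

Priority Escalation

In some situations the TaskPriority has to be raised
to avoid priority inversion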


Priority inversion

  • Disabling all interrupts to protect critical sections
  • Priority ceiling protocol
  • Priority inheritance
  • Random boosting
  • Avoid blocking

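Priority inheritance


  • If Task A is running on an Actor
  • and a higher-priority Task B has been enqueued on the same Actor,
    Task A may temporarily run at the same priority as Task B
  • This does not affect child tasks or the reported priority
  • The priority here is a property of the thread the Task runs on, not of the Task itself

  • If an Unstructured Task A has been created
  • and a higher-priority Task B is waiting for Task A to complete,
  • Task A's priority is permanently raised to that of Task B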

Structured Task

Parent - Child Task


Sequential Execute

func chopVegetables() async throws -> [Vegetable] { ... }
func marinateMeat() async -> Meat { ... }
func preheatOven(temperature: Double) async throws -> Oven { ... }

// ...

func makeDinner() async throws -> Meal {
    let veggies = try await chopVegetables()
    let meat = await marinateMeat()
    let oven = try await preheatOven(temperature: 350)

    let dish = Dish(ingredients: [veggies, meat])
    return try await oven.cook(dish, duration: .hours(3))
}

gantt
    title Dinner
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Kitchen
    Vegetables       :Vegetables, 2000-01-01 00, 2s
    Meat             :Meat, after Vegetables, 4s
    Oven             :Oven, after Meat, 6s
    Dish			 :Dish, after Oven, 1s
    Dinner           :after Dish, 3s

TaskGroup

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@inlinable public func withTaskGroup<ChildTaskResult, GroupResult>(
    of childTaskResultType: ChildTaskResult.Type,
    returning returnType: GroupResult.Type = GroupResult.self,
    body: (inout TaskGroup<ChildTaskResult>) async -> GroupResult
) async -> GroupResult

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@inlinable public func withThrowingTaskGroup<ChildTaskResult, GroupResult>(
    of childTaskResultType: ChildTaskResult.Type,
    returning returnType: GroupResult.Type = GroupResult.self,
    body: (inout ThrowingTaskGroup<ChildTaskResult, Error>) async throws -> GroupResult
) async rethrows -> GroupResult

Add Child Task

@frozen public struct TaskGroup<ChildTaskResult> {
    public mutating func addTask(
        priority: TaskPriority? = nil,
        operation: @escaping @Sendable () async -> ChildTaskResult
    )

    public mutating func addTaskUnlessCancelled(
        priority: TaskPriority? = nil,
        operation: @escaping @Sendable () async -> ChildTaskResult
    ) -> Bool
}

Add Child Task

  • When the Group is already cancelled
// addTaskUnlessCancelled -> false
// the new child task is not added

Add Child Task

  • When the Group is already cancelled
    • addTask can still add a new Child Task
      • because cancellation is cooperative: the new child simply starts out cancelled

Query

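  • Retrieval order
    • not the order in which tasks were added
    • tasks that finish first are returned first
for await value in group {
    // ...
}

// Wait for all child tasks
public mutating func waitForAll() async

// Whether any pending child tasks remain (true when none do)
public var isEmpty: Bool { get }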
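A short sketch of consuming a group with `for await`. The function name `squares(of:)` is hypothetical. Results arrive in completion order, so the returned array is not guaranteed to match the input order.

```swift
// Squares every number in its own child task and collects the results.
func squares(of numbers: [Int]) async -> [Int] {
    await withTaskGroup(of: Int.self, returning: [Int].self) { group in
        for n in numbers {
            group.addTask { n * n }
        }
        var results: [Int] = []
        // Yields each child result as soon as that child finishes.
        for await value in group {
            results.append(value)
        }
        return results
    }
}
```

Callers that need a stable order can sort the result or carry the index along with each value.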

TaskGroup cancel

// group.isCancelled == true
public func cancelAll()

public var isCancelled: Bool { get }
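The difference between `addTask` and `addTaskUnlessCancelled` on a cancelled group can be observed with a sketch like the following. `probeCancelledGroup` is a hypothetical helper written for this example.

```swift
// Cancels a group first, then tries both ways of adding a child task.
func probeCancelledGroup() async -> (addedUnlessCancelled: Bool, childSawCancellation: Bool) {
    await withTaskGroup(of: Bool.self) { group in
        group.cancelAll()

        // Refuses to add the child because the group is already cancelled.
        let added = group.addTaskUnlessCancelled { false }

        // Still adds the child; it starts out marked as cancelled.
        group.addTask { Task.isCancelled }

        var childSawCancellation = false
        for await flag in group {
            childSawCancellation = childSawCancellation || flag
        }
        return (addedUnlessCancelled: added, childSawCancellation: childSawCancellation)
    }
}
```

Under these assumptions the first field is false and the second is true: only the child added through `addTask` runs, and it observes `Task.isCancelled == true`.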

Concurrent Execute(TaskGroup)

func makeDinner() async throws -> Meal {
    // Prepare some variables to receive results from our concurrent child tasks
    var veggies: [Vegetable]?
    var meat: Meat?
    var oven: Oven?

    enum CookingStep {
        case veggies([Vegetable])
        case meat(Meat)
        case oven(Oven)
    }

    // Create a task group to scope the lifetime of our three child tasks
    try await withThrowingTaskGroup(of: CookingStep.self) { group in
        group.addTask {
            try await .veggies(chopVegetables())
        }
        group.addTask {
            await .meat(marinateMeat())
        }
        group.addTask {
            try await .oven(preheatOven(temperature: 350))
        }

        for try await finishedStep in group {
            switch finishedStep {
            case .veggies(let v): veggies = v
            case .meat(let m): meat = m
            case .oven(let o): oven = o
            }
        }
    }

    // If execution resumes normally after `withTaskGroup`, then we can assume
    // that all child tasks added to the group completed successfully. That means
    // we can confidently force-unwrap the variables containing the child task
    // results here.
    let dish = Dish(ingredients: [veggies!, meat!])
    return try await oven!.cook(dish, duration: .hours(3))
}

gantt
    title Dinner
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Kitchen
    Vegetables       :Vegetables, 2000-01-01 00, 2s
    Meat             :Meat, 2000-01-01 00, 4s
    Oven             :Oven, 2000-01-01 00, 6s
    
    Dish			 :Dish, after Oven, 1s
    
    Dinner           :after Dish, 3s

async let


let x = await xxx()
let y = await yyy()

async let x = xxx()
async let y = yyy()
await (x, y)
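A self-contained version of the comparison above. `loadNumber` and `sumConcurrently` are hypothetical names; each `async let` starts a child task immediately, and the values are only awaited where they are used.

```swift
// Hypothetical slow operation: returns `n` after the given delay.
func loadNumber(_ n: Int, afterMilliseconds delay: UInt64) async -> Int {
    try? await Task.sleep(nanoseconds: delay * 1_000_000)
    return n
}

func sumConcurrently() async -> Int {
    // Both child tasks start right away and run concurrently.
    async let x = loadNumber(1, afterMilliseconds: 200)
    async let y = loadNumber(2, afterMilliseconds: 200)
    // Suspends here until both results are available.
    return await x + y
}
```

Since the two delays overlap, the call should take roughly 200 ms rather than 400 ms, although exact timing depends on the scheduler.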

Concurrent Execute(async let)

func chopVegetables() async throws -> [Vegetable] { ... }
func marinateMeat() async -> Meat { ... }
func preheatOven(temperature: Double) async throws -> Oven { ... }

// ...

func makeDinner() async throws -> Meal {
    async let veggies = chopVegetables()
    async let meat = marinateMeat()
    async let oven = preheatOven(temperature: 350)

    // chopVegetables() can throw, so reading veggies needs try as well as await
    let dish = try await Dish(ingredients: [veggies, meat])
    return try await oven.cook(dish, duration: .hours(3))
}

gantt
    title Dinner
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Kitchen
    Vegetables       :Vegetables, 2000-01-01 00, 2s
    Meat             :Meat, 2000-01-01 00, 4s
    Dish			 :Dish, after Meat, 1s
    Oven             :Oven, 2000-01-01 00, 6s
    Dinner           :after Oven, 3s

Question

How long does go take to run?


Implicit await

func go() async -> String {
    async let f = fast() // 1s
    async let s = slow() // 3s
    // implicitly: cancels f
    // implicitly: cancels s
    // implicitly: await f
    // implicitly: await s
    return "nevermind..."
}
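The implicit cancel-then-await can be measured with a sketch like this one. `stubbornWait` is a hypothetical child that ignores cancellation, and the 0.3 s delay is an arbitrary choice to keep the example quick.

```swift
import Foundation

// Hypothetical child that ignores cancellation and always waits the full delay.
func stubbornWait(seconds: Double) async {
    await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
        DispatchQueue.global().asyncAfter(deadline: .now() + seconds) {
            continuation.resume()
        }
    }
}

func go() async -> String {
    async let pending: Void = stubbornWait(seconds: 0.3)
    // pending is never awaited explicitly; on scope exit it is
    // implicitly cancelled and then implicitly awaited.
    return "nevermind..."
}

// Returns how long go() took, in seconds.
func measureGo() async -> TimeInterval {
    let start = Date()
    _ = await go()
    return Date().timeIntervalSince(start)
}
```

Because the child does not react to cancellation, `go()` cannot return before the child finishes, so the measured time is at least about 0.3 s. A child built on `Task.sleep` would instead notice the cancellation and let `go()` return almost immediately.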

async let in closure

Cannot be captured by an @escaping closure

This does not appear to be supported at the moment



Error Propagation

Implicitly: cancel -> await -> discard the error

func go() async -> String {
    async let f = fire()
    // f.cancel()
    // try? await f
    return "go"
}

Cancellation

Once the Parent Task is cancelled, Child Tasks created afterwards are also marked as cancelled


Child Task


Unstructured tasks are not able to utilize some of the optimization techniques wrt.


The boundary of a Child Task's execution scope

Confined to the Parent Task


What "confined to the Parent Task" means

group.addTask { sleep(1); return 1 }
group.addTask { sleep(100); return 100 }
return try await group.next() // return 1
gantt
    title group
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Child
    child1               :c1, 2000-01-01 00, 1s
    child2               :c2, 2000-01-01 00, 10s
    
    section Parent
    Parent               :p1, 2000-01-01 00, 1s

The boundary of a Child Task's execution scope

The Parent Task must wait until every Child Task has finished


X (incorrect)

gantt
    title group
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Child
    child1               :c1, 2000-01-01 00, 1s
    child2               :c2, 2000-01-01 00, 10s
    
    section Parent
    Parent               :p1, 2000-01-01 00, 1s

O (correct)

gantt
    title group
    dateFormat YYYY-MM-DD ss
    axisFormat %S

    section Child
    child1               :c1, 2000-01-01 00, 1s
    child2               :c2, 2000-01-01 00, 10s
    
    section Parent
    Parent               :p1, 2000-01-01 00, 10s

Simple example

group.addTask { sleep(1); return 1 }
group.addTask { sleep(100); return 100 }
// group.waitForAll()
return try await group.next()

// Result
// sleeps for 100 seconds
// return 1
//
// the group waits for every child task to finish
// before it can return

Child Task boundary (Demo)


Error propagation

A Child Task can throw its error up to the Parent Task


Error propagation

  • The Parent Task handles the error
    • do catch
    • try rethrows the error
      • triggers cancel all
      • triggers completion (waits for all Child Tasks)
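The flow in the list above can be sketched as follows. `KitchenFire`, `Log` and `cook(log:)` are hypothetical names. One child throws, the parent rethrows with `try`, and the remaining child is cancelled and awaited before the group returns.

```swift
// Hypothetical error thrown by one child task.
struct KitchenFire: Error {}

// Hypothetical recorder so the sibling's cancellation can be observed afterwards.
actor Log {
    private(set) var siblingWasCancelled = false
    func markCancelled() { siblingWasCancelled = true }
}

func cook(log: Log) async throws {
    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask { throw KitchenFire() }
        group.addTask {
            do {
                try await Task.sleep(nanoseconds: 5_000_000_000)
            } catch {
                // Task.sleep threw because this child was cancelled.
                await log.markCancelled()
            }
        }
        // Rethrowing a child's error leaves the body, which cancels the
        // remaining children and waits for them before the group returns.
        for try await _ in group {}
    }
}
```

Calling `try await cook(log:)` throws `KitchenFire` almost immediately, and `siblingWasCancelled` is true by then. The sleeping child stops early only because `Task.sleep` supports cancellation.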

Error propagation (Demo)


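Very important

Cancelling a task only works where you have written a cancellation flow

Cancelling a task only works where you have written a cancellation flow

Cancelling a task only works where you have written a cancellation flow


A Task without a cancellation flow

cannot be cancelled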


async let

vs

TaskGroup


async let cannot be used over an array

for i in items.indices {
    group.addTask { (i, await f(items[i])) }
}

async let f0 = f(items[0])
async let f1 = f(items[1])
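A fuller sketch of the TaskGroup side of this comparison. `concurrentMap` is a hypothetical helper, not a standard library function. Each child returns its index together with its result, so the output keeps the input order even though children finish in any order.

```swift
// Applies `transform` to every element concurrently, preserving input order.
func concurrentMap(
    _ items: [Int],
    _ transform: @escaping @Sendable (Int) async -> Int
) async -> [Int] {
    await withTaskGroup(of: (Int, Int).self, returning: [Int].self) { group in
        for index in items.indices {
            let item = items[index]
            group.addTask { (index, await transform(item)) }
        }
        var results = items // placeholder storage with the right length
        for await (index, value) in group {
            results[index] = value
        }
        return results
    }
}
```

For example, `await concurrentMap([1, 2, 3]) { $0 * 10 }` gives `[10, 20, 30]`. With `async let`, the number of child tasks has to be fixed at compile time, which is why a group is used for collections.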

TaskGroup can deliver results in completion order


When the scope of an async let returns

the child task is cancelled implicitly


Reference


Acknowledgements

stevapple: proofreading, suggestions on the flow, and more
