changed 3 years ago
Published Linked with GitHub

Swift Concurrency Part 1


Swift Concurrency Manifesto


Callback hell

俗稱 金字塔(波動拳)

func processImageData1(completionBlock: (result: Image) -> Void) { loadWebResource("dataprofile.txt") { dataResource in loadWebResource("imagedata.dat") { imageResource in decodeImage(dataResource, imageResource) { imageTmp in dewarpAndCleanupImage(imageTmp) { imageResult in completionBlock(imageResult) } } } } }

Term


async/await


Continuations


Task

  • Unstructured Task
  • cancellation
  • Atached/Detached Task
    • Task {}
    • Task.detached {}

TaskGroup

  • Parent/Child Task
  • Structured Task
    • async let
    • TaskGroup

Actor

  • actor
  • Sendable
    • @unchecked
    • @Sendable
  • isolated/nonisolated

GlobalActor

  • @globalActor
    • @MainActor

TaskLocal


AsyncSequence


AsyncStream


Job 相關

  • Excutor
  • SerialExecutor
  • UnownedSerialExecutor
  • UnownedJob

Concurrency vs. Parallelism

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


Concurrency

Making progress on more than one task seemingly at the same time.


Concurrency


Parallel Execution

Making progress on more than one task at the exact same time.


Parallel Execution


async/await


宣告

func download(_ url: URL) async throws -> Data {} func decode(_ data: Data) async throws -> UIImage {}

一般使用方式

在 async func 內使用

func your_function() async throws -> UIImage { let data = try await download(url) let image = try await decode(data) return image }

在 sync func 內使用

透過 Task

func your_function_task() -> Task<UIImage, Error> { let task = Task { let data = try await download(url) let image = try await decode(data) // image ... return image } return taks }

get async throws

protocol RemoteData { var data: Data { get async throws } } struct Resouce: RemoteData { let url: URL var data: Data { get async throws { try await URLSession.shared.data(from: url).0 } } }

closure

() async -> Void

隱式 closure

// () async -> Int
let closure = { await getInt() } // implicitly async

對現有 api 的衝擊

// old api func doSomething(completionHandler: ((String) -> Void)? = nil) { ... } // new api func doSomething() async -> String { ... }

對現有 api 的衝擊(rule)

如果在 同步 預設優先呼叫 同步

如果在 非同步 預設優先呼叫 非同步


對現有 api 的衝擊(rule)

func xxx() { // old api // doSomething(completionHandler: nil) doSomething() } func xxx() async { // new api let str: String = await doSomething() }

自動隱式轉換

var syncNonThrowing: () -> Void = {} var asyncNonThrowing: () async -> Void = {} asyncNonThrowing = syncNonThrowing

可以用同步函數替代非同步

protocol Asynchronous { func f() async } struct S2: Asynchronous { func f() { } // okay, synchronous function satisfying async requirement }

Test

class AsyncTests: XCTestCase {
    func testCallback() throws {
        let expectation = XCTestExpectation (description: "download completion")
        download(url) { result, error in
            XCTAssertEqual(result, Data())
            expectation.fulfill()
        }
        wait(for: [expectation], timeout: 5.0)
    }

    func testAsync() async throws {
        let result = try await download(url)
        XCTAssertEqual(result, Data())
    }
}

限制

不能在 defer 內使用 await

defer { await xxx() // x }

限制

@autoclosure sync closure 只能放在 async func

// x func computeArgumentLater<T> (_ fn: @escaping @autoclosure () async -> T) { } // o func computeArgumentLater<T> (_ fn: @escaping @autoclosure () async -> T) async { }

限制

同步函數 不能呼叫 異步函數


同步函數 vs 異步函數


同步函數

sync func

func xxx() {}

異步函數

async func

func xxx() async {}

同步函數

  • 執行過程中,只能在同一個 thread 執行
  • call stack

同步函數

func a() { // in main thread b() // in main thread } func b() { // in main thread }

異步函數

  • 執行過程中,可在任意 thread 切換執行
  • 獨立存儲

異步函數

func a() async { // in thread 1 await b() // in thread 2 await c() // in thread 3 } func b() async { // in thread 4 } func c() async { // in thread 5 }

暫停點(Suspension point)

  • 只會出現在 顯試操作
  • 潛在暫停點
    • await
  • 暫停會中斷 原子性
  • 讓異步函數離開其 thread

暫停點

func a() async { // in thread 1 await b() // 暫停點 // in thread 2 await c() // 暫停點 // in thread 3 } func b() async { // in thread 4 // return 暫停點 } func c() async { // in thread 5 // return 暫停點 }

CheckedContinuation


@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public func withCheckedContinuation<T>( function: String = #function, _ body: (CheckedContinuation<T, Never>) -> Void ) async -> T @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public struct CheckedContinuation<T, E> where E : Error {}

Converting closure-based code into async/await in Swift


async code

func downloadMetadata(for id: Int) async throws -> ImageMetadata { let metadataUrl = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part1/\(id).json")! let metadataRequest = URLRequest(url: metadataUrl) let (data, metadataResponse) = try await URLSession.shared.data(for: metadataRequest) guard (metadataResponse as? HTTPURLResponse)?.statusCode == 200 else { throw ImageDownloadError.invalidMetadata } return try JSONDecoder().decode(ImageMetadata.self, from: data) }

callback code

let metadataUrl = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part1/\(imageNumber).json")! let metadataTask = URLSession.shared.dataTask(with: metadataUrl) { data, response, error in guard let data = data, let metadata = try? JSONDecoder().decode(ImageMetadata.self, from: data), (response as? HTTPURLResponse)?.statusCode == 200 else { completionHandler(nil, ImageDownloadError.invalidMetadata) return } let detailedImage = DetailedImage(image: image, metadata: metadata) completionHandler(detailedImage, nil) } metadataTask.resume()

Continuations code

guard let data = data, let metadata = try? JSONDecoder().decode(ImageMetadata.self, from: data), (response as? HTTPURLResponse)?.statusCode == 200 else { completionHandler(nil, ImageDownloadError.invalidMetadata) return } let detailedImage = DetailedImage(image: image, metadata: metadata) completionHandler(detailedImage, nil)

Explicit continuations

  • withCheckedContinuation
    • CheckedContinuation<T, Never>
  • withCheckedThrowingContinuation
    • CheckedContinuation<T, Error>

使用 Continuation 封裝 callback code

func downloadImageAndMetadata(imageNumber: Int) async throws -> DetailedImage { return try await withCheckedThrowingContinuation({ (continuation: CheckedContinuation<DetailedImage, Error>) in downloadImageAndMetadata(imageNumber: imageNumber) { image, error in if let image = image { continuation.resume(returning: image) } else { continuation.resume(throwing: error!) } } }) }

注意事項

  • 請確保 value/error 有被呼叫過一次
    • continuation.resume(returning: value)
    • continuation.resume(throwing: error)
  • 呼叫兩次以上屬於未定義行為
  • 未呼叫的話會讓 task 永遠停住

Rx VS Continuation


Rx

public var data: Observable<Data> { return Observable.create { observer -> Disposable in self.dataRequest.responseData { (res) in do { try observer.onNext(res.result.get()) observer.onCompleted() } catch { observer.onError(error) } } return Disposables.create { self.dataRequest.cancel() } } }
data.subscribe(onNext: { data in // ... })

Continuation

public var data: Data { get async throws { return try await withCheckedThrowingContinuation { continuation in self.dataRequest.responseData { (res) in do { let value = try res.result.get() continuation.resume(returning: value) } catch { continuation.resume(throwing: error) } } } } }
let value = try await data

Demo


UnsafeContinuation

@frozen struct UnsafeContinuation<T, E> where E : Error func withUnsafeContinuation<T>(_ fn: (UnsafeContinuation<T, Never>) -> Void) async -> T

CheckedContinuation 擁有一樣的介面

可直接切換

無 run time check


Task


@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @frozen public struct Task<Success, Failure> : Sendable where Failure : Error { } @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension Task where Failure == Never { public init( priority: TaskPriority? = nil, operation: @escaping @Sendable () async -> Success ) }

A unit of asynchronous work.
一個非同步工作的單元


When you create an instance of Task,
you provide a closure that contains the work for that task to perform.
當你建立 Task 實體時,你需要提供 closure 以供 task 執行

let task = Task { return 1 }

Tasks can start running immediately after creation;
任務可以在創建後立即開始運行


you don't explicitly start or schedule them.
您不用明確啟動或安排它們。


After creating a task, you use the instance to interact with it
for example, to wait for it to complete or to cancel it.
創建任務後,您使用實例與其交互
例如,等待它完成或取消它。


等待它完成

await task.value // or try await task.value await task.result

取消它

task.cancel()

It's not a programming error to discard a reference to a task
without waiting for that task to finish or canceling it.
丟棄對任務的引用不是編程錯誤
無需等待該任務完成或取消它。


A task runs regardless of whether you keep a reference to it.
無論您是否保留對它的引用,任務都會運行。


However, if you discard the reference to a task,
you give up the ability
to wait for that task's result or cancel the task.
但是,如果您放棄持有 task
你會失去等待該任務的結果或取消該任務之能力


Only code that's running as part of the task can interact with that task.
To interact with the current task,
you call one of the static methods on Task.
只有運行在 task 內的 code
可以用 static method當前 task 互動

Task { Task.isCancelled Task.currentPriority try Task.checkCancellation() await Task.yield() try await Task.sleep(nanoseconds: UInt64) }

Task Cancellation


Tasks include a shared mechanism for indicating cancellation,
but not a shared implementation for how to handle cancellation.
任務包括指示取消的共享機制,但不包括如何處理取消的共享實現。


Depending on the work you're doing in the task,
the correct way to stop that work varies.
根據您在任務中所做的工作,停止該工作的正確方法會有所不同。


Likewise,
it's the responsibility of the code running as part of the task
to check for cancellation whenever stopping is appropriate.
同樣,作為任務的一部分運行的代碼有責任在適當的停止時檢查取消。


In a long-task that includes multiple pieces,
you might need to check for cancellation at several points,
and handle cancellation differently at each point.
在包含多個部分的長任務中,您可能需要在多個點檢查取消,並在每個點以不同的方式處理取消。

Task { while !Task.isCancelled { // Long run } }

If you only need to throw an error to stop the work,
call the Task.checkCancellation() function to check for cancellation.

如果只需要拋出一個錯誤來停止工作,調用Task.checkCancellation()函數來檢查是否取消。

Task { try Task.checkCancellation() }

Other responses to cancellation include
returning the work completed so far, returning an empty result, or returning nil.

對取消的其他響應包括返回到目前為止已完成的工作、返回空結果或返回“nil”。

Task { if Task.isCancelled { return nil } // return ... }

Cancellation is a purely Boolean state;
there's no way to include additional information
like the reason for cancellation.
取消是一個純粹的 bool 狀態; 無法包含其他信息,例如取消原因。


This reflects the fact that a task can be canceled for many reasons,
and additional reasons can accrue during the cancellation process.
這反映了一個任務可以由於多種原因被取消的事實,並且在取消過程中可能會產生其他原因。


Demo


Job


A task's execution can be seen as a series of periods where the task ran.
Each such period ends at a suspension point or the
completion of the task.


運行中 task 可以被視為一連串的執行週期(series of periods)
這些週期的結束點位於

  • task 暫停點
  • 完成 task

These periods of execution are represented by instances of PartialAsyncTask.
Unless you're implementing a custom executor,
you don't directly interact with partial tasks.

這些執行週期可以 PartialAsyncTask 的實例呈現
除非你要實作 custom executor
否則你不必直接跟它做互動


UnownedJob

PartialAsyncTask 已改名為 UnownedJob


Unstructured Concurrency

Unlike tasks that are part of a task group, an unstructured task doesn’t have a parent task.


Attached Task

To create an unstructured task that runs on the current actor, call the Task.init(priority:operation:) initializer.

Task {}

Detached Task

To create an unstructured task that’s not part of the current actor, known more specifically as a detached task, call the Task.detached(priority:operation:) class method.

Task.detached {}

Task 小結


Task 建立後會自行執行


cancel 只能停止

  • 尚未開始的 Task
  • 有支援取消流程的 Task

Task 的 cancel 流程必須自行掌控


一個 Task 可以拆分成多個 UnownedJob


Actor Model

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


信箱


擁有復數地址


地址 != ID


沒有 ID


允許操作

  • 建立 子 actor
  • 傳送訊息(message)
  • 決定如何處理下一條訊息
    • FIFO 處理訊息

ISOLATED

  • 擁有私有狀態
    • 不共享記憶體

各語言實作方式不同


Swift Actor

actor MyActor {}

protocol Actor

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public protocol Actor : AnyObject, Sendable { nonisolated var unownedExecutor: UnownedSerialExecutor { get } }

UnownedSerialExecutor

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @frozen public struct UnownedSerialExecutor { @inlinable public init<E>(ordinary executor: E) where E : SerialExecutor }

SerialExecutor

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) public protocol SerialExecutor : Executor { func enqueue(_ job: UnownedJob) func asUnownedSerialExecutor() -> UnownedSerialExecutor }

UnownedJob

@available(SwiftStdlib 5.1, *) @frozen public struct UnownedJob: Sendable { private var context: Builtin.Job @_alwaysEmitIntoClient @inlinable public func _runSynchronously( on executor: UnownedSerialExecutor ) { _swiftJobRun(self, executor) } }

UnownedJob

@available(SwiftStdlib 5.1, *) @_silgen_name("swift_job_run") @usableFromInline internal func _swiftJobRun( _ job: UnownedJob, _ executor: UnownedSerialExecutor ) -> ()

Actor reentrancy

  • Actor 的 isolated functions 是可重入的(reentrant).
  • 當 Actor 的 isolated functions 暫停時, 重入性會允許在原 isolated function 恢復之前,並在 actor 先執行其他工作,稱之 interleaving.

Actor reentrancy

預期是 before before after after


Actor reentrancy(預期)


結論(坑)

async func 有機會排進 actor's queue


Actor 猜想


Thread

  • 擁有唯一一個 Thread?
  • 擁有可隨時切換 Thread?
    • async/await

Thread

func action() async { print("test") } actor Tester1 { func action() { print("test") } } actor Tester2 { func action() { print("test") } } Task.detached { await action() await Tester1().action() await Tester2().action() exit(0) } RunLoop.main.run()

Thread


Thread




Thread

Thread 2 Queue : com.apple.root.default-qos.cooperative (concurrent)


Queue

Message Queue?


Queue

Task Queue?


Queue

Job(UnownedJob) Queue


Executor

  • UnownedSerialExecutor
  • 以 FIFO 方式,在 Thread 執行 Job

問題 No.1

import Foundation func process(_ text: String) { print("\(Date()) \(text) 1") sleep(2) print("\(Date()) \(text) 2") sleep(2) print("\(Date()) \(text) 3") } actor Test { var counter = 0 func ver1_1() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { process("a") } let b: Task<Void, Never> = Task { process("b") } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } func ver1_2() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { _ = counter process("a") } let b: Task<Void, Never> = Task { _ = counter process("b") } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } } @MainActor func main() async { let test = Test() await test.ver1_1();print("") await test.ver1_2();print("") exit(EXIT_SUCCESS) } Task.detached { await main() } RunLoop.main.run()

問題 No.2

import Foundation func process(_ text: String) { print("\(Date()) \(text) 1") sleep(2) print("\(Date()) \(text) 2") sleep(2) print("\(Date()) \(text) 3") } func processAsync() async { let text = "a" process(text) } func processAsync1(_ text: String) async { process(text) } func processAsync2(_ text: String) async { process(text) } @MainActor func processMain(_ text: String) async { process(text) } @MainActor func ver2_1() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { process("a") } let b: Task<Void, Never> = Task { process("b") } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } @MainActor func ver2_2() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { await processAsync() } let b: Task<Void, Never> = Task { await processAsync() } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } @MainActor func ver2_3() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { await processAsync1("a") await processAsync1("b") } let b: Task<Void, Never> = Task { await processAsync2("c") await processAsync2("d") } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } actor T { func processAsync1(_ text: String) { process(text) } } let t1 = T() let t2 = T() @MainActor func ver2_4() async { print("\(Date()) \(#function) start") let a: Task<Void, Never> = Task { await t1.processAsync1("a") } let b: Task<Void, Never> = Task { await t2.processAsync1("b") } sleep(2) let (_, _) = await (a.value, b.value) print("\(Date()) end") } @MainActor func main() async { // await ver2_1();print("") // await ver2_2();print("") await ver2_3();print("") // await ver2_4();print("") exit(EXIT_SUCCESS) } Task.detached { await main() } RunLoop.main.run()

坑 No.1

Task 有/無 mutable var


坑 No.2

async func 有/無 參數


Q & A


??? No.1


0296-async-await.md

However, many asynchronous functions are not just asynchronous: they’re also associated with specific actors

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


async function特定 actor 相關

// actor SpecificActor { func xxx() async {} //} // @SpecificActor func xxx() async {}


結論

一般 async func 有可能會延續上一個 actor嗎?


??? NO.2


swift.org

To create an unstructured task that runs on the current actor, call the Task.init(priority:operation:) initializer

Image Not Showing Possible Reasons
  • The image file may be corrupted
  • The server hosting the image is unavailable
  • The image path is incorrect
  • The image format is not supported
Learn More →


跑在 目前的 actor

func xxx() { // is actor? Task { } }


Reference

Select a repo