# Swift Concurrency Part 1

---

# [Swift Concurrency Manifesto](https://gist.github.com/yxztj/7744e97eaf8031d673338027d89eea76)

----

## Callback hell

> Commonly known as the pyramid of doom (`Hadouken`)

```swift=
func processImageData1(completionBlock: (_ result: Image) -> Void) {
    loadWebResource("dataprofile.txt") { dataResource in
        loadWebResource("imagedata.dat") { imageResource in
            decodeImage(dataResource, imageResource) { imageTmp in
                dewarpAndCleanupImage(imageTmp) { imageResult in
                    completionBlock(imageResult)
                }
            }
        }
    }
}
```

---

# Term

----

## async/await

----

## Continuations

----

## Task

* Unstructured Task
* cancellation
* Attached/Detached Task
* `Task {}`
* `Task.detached {}`

----

## TaskGroup

* Parent/Child Task
* Structured Task
* async let
* TaskGroup

----

## `Actor`

* `actor`
* Sendable
* @unchecked
* @Sendable
* isolated/nonisolated

----

## `GlobalActor`

* `@globalActor`
* `@MainActor`

----

## TaskLocal

----

## AsyncSequence

----

## AsyncStream

----

## Job-related

* Executor
* SerialExecutor
* UnownedSerialExecutor
* UnownedJob

---

### [Concurrency vs. Parallelism](http://tutorials.jenkov.com/java-concurrency/concurrency-vs-parallelism.html)

{%youtube Y1pgpn2gOSg %}

----

### Concurrency

Making progress on more than one task seemingly at the same time.

----

### Concurrency

![](https://i.imgur.com/0vwXzPm.png)

----

### Parallel Execution

Making progress on more than one task at the exact same time.

----

### Parallel Execution

![](https://i.imgur.com/OSWtQZS.png)

---

# async/await

----

## Declaration

```swift=
func download(_ url: URL) async throws -> Data {}
func decode(_ data: Data) async throws -> UIImage {}
```

----

## Typical usage

> Inside an async func

```swift=
func your_function() async throws -> UIImage {
    let data = try await download(url)
    let image = try await decode(data)
    return image
}
```

----

## Usage inside a sync func

> Via `Task`

```swift=
func your_function_task() -> Task<UIImage, Error> {
    let task = Task {
        let data = try await download(url)
        let image = try await decode(data)
        // image ...
        return image
    }
    return task
}
```

----

## [get async throws](https://github.com/apple/swift-evolution/blob/main/proposals/0310-effectful-readonly-properties.md)

```swift=
protocol RemoteData {
    var data: Data { get async throws }
}

struct Resource: RemoteData {
    let url: URL
    var data: Data {
        get async throws {
            try await URLSession.shared.data(from: url).0
        }
    }
}
```

----

## closure

``` swift
() async -> Void
```

----

## Implicitly async closure

``` swift
// () async -> Int
let closure = { await getInt() } // implicitly async
```

----

## Impact on existing APIs

```swift=
// old api
func doSomething(completionHandler: ((String) -> Void)? = nil) { ... }

// new api
func doSomething() async -> String { ... }
```

----

### Impact on existing APIs (rule)

> In a `synchronous` context, the `synchronous` overload is preferred by default
> In an `asynchronous` context, the `asynchronous` overload is preferred by default

----

### Impact on existing APIs (rule)

```swift=
func xxx() {
    // old api
    // doSomething(completionHandler: nil)
    doSomething()
}

func xxx() async {
    // new api
    let str: String = await doSomething()
}
```

----

## Implicit conversion

```swift=
var syncNonThrowing: () -> Void = {}
var asyncNonThrowing: () async -> Void = {}

asyncNonThrowing = syncNonThrowing
```

----

### A synchronous function can satisfy an async requirement

```swift=
protocol Asynchronous {
    func f() async
}

struct S2: Asynchronous {
    func f() { } // okay, synchronous function satisfying async requirement
}
```

----

## Test

``` swift
class AsyncTests: XCTestCase {
    func testCallback() throws {
        let expectation = XCTestExpectation(description: "download completion")
        download(url) { result, error in
            XCTAssertEqual(result, Data())
            expectation.fulfill()
        }
        wait(for: [expectation], timeout: 5.0)
    }

    func testAsync() async throws {
        let result = try await download(url)
        XCTAssertEqual(result, Data())
    }
}
```

----

## Limitations

> `await` cannot be used inside `defer`

```swift=
defer {
    await xxx() // error
}
```

----

## Limitations

> An `@autoclosure` async closure parameter is only allowed on an async func

```swift=
// error
func computeArgumentLater<T>(_ fn: @escaping @autoclosure () async -> T) { }

// OK
func computeArgumentLater<T>(_ fn: @escaping @autoclosure () async -> T) async { }
```

----

## Limitations

A synchronous function
cannot call an asynchronous function directly

---

### Synchronous vs. asynchronous functions

----

## Synchronous function

> sync func

```swift=
func xxx() {}
```

----

## Asynchronous function

> async func

```swift=
func xxx() async {}
```

----

## Synchronous function

* Runs on a single thread for its whole execution
* Call stack

----

## Synchronous function

```swift=
func a() {
    // in main thread
    b()
    // in main thread
}

func b() {
    // in main thread
}
```

----

## Asynchronous function

* May switch between threads during execution
* Independent storage

----

## Asynchronous function

```swift=
func a() async {
    // in thread 1
    await b()
    // in thread 2
    await c()
    // in thread 3
}

func b() async {
    // in thread 4
}

func c() async {
    // in thread 5
}
```

----

## Suspension point

* Occurs only at `explicit operations`
* Potential suspension point
* `await`
* Suspension breaks `atomicity`
* Lets the async function give up its `thread`

----

## Suspension point

```swift=
func a() async {
    // in thread 1
    await b() // suspension point
    // in thread 2
    await c() // suspension point
    // in thread 3
}

func b() async {
    // in thread 4
    // return: suspension point
}

func c() async {
    // in thread 5
    // return: suspension point
}
```

---

## CheckedContinuation

----

```swift=
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public func withCheckedContinuation<T>(
    function: String = #function,
    _ body: (CheckedContinuation<T, Never>) -> Void
) async -> T

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct CheckedContinuation<T, E> where E : Error {}
```

----

### [Converting closure-based code into async/await in Swift](https://www.andyibanez.com/posts/converting-closure-based-code-into-async-await-in-swift/)

----

## async code

```swift=
func downloadMetadata(for id: Int) async throws -> ImageMetadata {
    let metadataUrl = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part1/\(id).json")!
    let metadataRequest = URLRequest(url: metadataUrl)
    let (data, metadataResponse) = try await URLSession.shared.data(for: metadataRequest)
    guard (metadataResponse as? HTTPURLResponse)?.statusCode == 200 else {
        throw ImageDownloadError.invalidMetadata
    }
    return try JSONDecoder().decode(ImageMetadata.self, from: data)
}
```

----

## callback code

```swift=
let metadataUrl = URL(string: "https://www.andyibanez.com/fairesepages.github.io/tutorials/async-await/part1/\(imageNumber).json")!
let metadataTask = URLSession.shared.dataTask(with: metadataUrl) { data, response, error in
    guard let data = data,
          let metadata = try? JSONDecoder().decode(ImageMetadata.self, from: data),
          (response as? HTTPURLResponse)?.statusCode == 200 else {
        completionHandler(nil, ImageDownloadError.invalidMetadata)
        return
    }
    let detailedImage = DetailedImage(image: image, metadata: metadata)
    completionHandler(detailedImage, nil)
}
metadataTask.resume()
```

----

## Continuations code

```swift=
guard let data = data,
      let metadata = try? JSONDecoder().decode(ImageMetadata.self, from: data),
      (response as? HTTPURLResponse)?.statusCode == 200 else {
    completionHandler(nil, ImageDownloadError.invalidMetadata)
    return
}
let detailedImage = DetailedImage(image: image, metadata: metadata)
completionHandler(detailedImage, nil)
```

----

## Explicit continuations

* `withCheckedContinuation`
* `CheckedContinuation<T, Never>`
* `withCheckedThrowingContinuation`
* `CheckedContinuation<T, Error>`

----

## Wrapping callback code with a Continuation

```swift=
func downloadImageAndMetadata(imageNumber: Int) async throws -> DetailedImage {
    return try await withCheckedThrowingContinuation({
        (continuation: CheckedContinuation<DetailedImage, Error>) in
        downloadImageAndMetadata(imageNumber: imageNumber) { image, error in
            if let image = image {
                continuation.resume(returning: image)
            } else {
                continuation.resume(throwing: error!)
            }
        }
    })
}
```

----

## Notes

* Make sure the continuation is resumed exactly once, with either a value or an error
* `continuation.resume(returning: value)`
* `continuation.resume(throwing: error)`
* Resuming more than once is undefined behavior (`CheckedContinuation` traps at run time when it detects this)
* Never resuming leaves the task suspended forever

----

## Rx VS Continuation

----

## Rx

```swift=
public var data: Observable<Data> {
    return Observable.create { observer -> Disposable in
        self.dataRequest.responseData { (res) in
            do {
                try observer.onNext(res.result.get())
                observer.onCompleted()
            } catch {
                observer.onError(error)
            }
        }
        return Disposables.create {
            self.dataRequest.cancel()
        }
    }
}
```

```swift=
data.subscribe(onNext: { data in
    // ...
})
```

----

## Continuation

```swift=
public var data: Data {
    get async throws {
        return try await withCheckedThrowingContinuation { continuation in
            self.dataRequest.responseData { (res) in
                do {
                    let value = try res.result.get()
                    continuation.resume(returning: value)
                } catch {
                    continuation.resume(throwing: error)
                }
            }
        }
    }
}
```

```swift=
let value = try await data
```

----

## Demo

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/vt5r2gbheff3xh3d63xepdo2gm/embedded/"> </iframe>

---

## UnsafeContinuation

```swift=
@frozen struct UnsafeContinuation<T, E> where E : Error

func withUnsafeContinuation<T>(_ fn: (UnsafeContinuation<T, Never>) -> Void) async -> T
```

----

Has the same interface as `CheckedContinuation`
Can be swapped in directly
No run-time checks

---

## Task

----

```swift=
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@frozen public struct Task<Success, Failure> : Sendable where Failure : Error {
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension Task where Failure == Never {
    public init(
        priority: TaskPriority? = nil,
        operation: @escaping @Sendable () async -> Success
    )
}
```

----

A unit of asynchronous work.

----

When you create an instance of `Task`, you provide a closure that contains the work for that task to perform.

```swift=
let task = Task {
    return 1
}
```

----

Tasks can start running immediately after creation;

----

you don't explicitly start or schedule them.

----

After creating a task, you use the instance to interact with it, for example to wait for it to complete or to cancel it.

----

Waiting for it to complete

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/lvpgzooinvfrtegfg2pagllyuu/embedded/"> </iframe>

```swift=
await task.value // or try await task.value
await task.result
```

----

Canceling it

```swift=
task.cancel()
```

----

It's not a programming error to discard a reference to a task without waiting for that task to finish or canceling it.

----

A task runs regardless of whether you keep a reference to it.

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/2u2ayn7s5ffqznql5tm54lobmy/embedded/"> </iframe>

----

However, if you discard the reference to a task, you give up the ability to wait for that task's result or cancel the task.

----

Only code that's running as part of the task can interact with that task. To interact with the current task, you call one of the static methods on `Task`.

```swift=
Task {
    Task.isCancelled
    Task.currentPriority
    try Task.checkCancellation()
    await Task.yield()
    try await Task.sleep(nanoseconds: 1_000_000_000) // takes a UInt64; this is 1 second
}
```

---

# Task Cancellation

----

Tasks include a shared mechanism for indicating cancellation, but not a shared implementation for how to handle cancellation.

----

Depending on the work you're doing in the task, the correct way to stop that work varies.

----

Likewise, it's the responsibility of the code running as part of the task to check for cancellation whenever stopping is appropriate.
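
As a minimal sketch of that responsibility (the names `countSteps(max:)` and `runCancellationDemo()` and the step sizes are made up for illustration, they are not from the slides), the loop below checks `Task.isCancelled` on every iteration and returns the work done so far once cancellation is observed:

```swift
// Hypothetical helper for illustration: performs up to `max` steps,
// checking for cancellation before each one.
func countSteps(max: Int) async -> Int {
    var done = 0
    while done < max {
        // Cooperative check: nothing stops the loop unless the code looks.
        if Task.isCancelled { break }
        done += 1
        // Stand-in for real work. `Task.sleep` throws once the task is cancelled;
        // `try?` discards that error so the check above decides when to stop.
        try? await Task.sleep(nanoseconds: 10_000_000)
    }
    return done
}

// Runs the helper in an unstructured task and cancels it right away.
func runCancellationDemo() async -> Int {
    let task = Task { await countSteps(max: 1_000) }
    task.cancel()            // only sets the flag; the loop has to react to it
    return await task.value  // steps completed before cancellation was seen
}
```

Calling `await runCancellationDemo()` should return a number far below 1,000, because the loop stops at its first check after `cancel()`. If the `Task.isCancelled` check were removed, the task would still run every step even though it had been cancelled.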

----

In a long task that includes multiple pieces, you might need to check for cancellation at several points, and handle cancellation differently at each point.

```swift=
Task {
    while !Task.isCancelled {
        // long-running work
    }
}
```

----

If you only need to throw an error to stop the work, call the `Task.checkCancellation()` function to check for cancellation.

```swift=
Task {
    try Task.checkCancellation()
}
```

----

Other responses to cancellation include returning the work completed so far, returning an empty result, or returning `nil`.

```swift=
Task {
    if Task.isCancelled {
        return nil
    }
    // return ...
}
```

----

Cancellation is a purely Boolean state; there's no way to include additional information like the reason for cancellation.

----

This reflects the fact that a task can be canceled for many reasons, and additional reasons can accrue during the cancellation process.

----

## Demo

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/vgn4s5cbf5h5jd6me4fhpgsokq/embedded/"> </iframe>

---

### Job

----

A task's execution can be seen as a series of periods where the task ran. Each such period ends at a suspension point or the completion of the task.

----

These periods of execution are represented by instances of `PartialAsyncTask`. Unless you're implementing a custom executor, you don't directly interact with partial tasks.
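
To make the idea of execution periods concrete, here is a small sketch (the function name `threePeriods()` is made up for illustration). Each `await` marks a potential suspension point, so the body can be split into at most three periods. Whether the task really suspends at `Task.yield()` is decided by the runtime.

```swift
// Hypothetical example: the comments mark where one execution period
// can end and the next one can begin.
func threePeriods() async -> [String] {
    var log = ["period 1"]   // runs from the start up to the first await

    await Task.yield()       // potential suspension point: period 1 may end here
    log.append("period 2")   // resumes here, possibly on another thread

    await Task.yield()       // potential suspension point: period 2 may end here
    log.append("period 3")

    return log               // the final period ends when the work completes
}
```

The entries are always appended in the same order. Suspension changes when and where each part runs, not the order of statements within one task.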

----

## UnownedJob

`PartialAsyncTask` has since been renamed `UnownedJob`

---

## [Unstructured Concurrency](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID643)

Unlike tasks that are part of a task group, an unstructured task doesn’t have a parent task.

----

### Attached Task

To create an unstructured task that runs on the current actor, call the `Task.init(priority:operation:)` initializer.

```swift=
Task {}
```

----

### Detached Task

To create an unstructured task that’s not part of the current actor, known more specifically as a detached task, call the `Task.detached(priority:operation:)` static method.

```swift=
Task.detached {}
```

---

## Task summary

----

A Task starts running on its own once it is created

----

`cancel` can only stop

* ~~Tasks that have not started yet~~
* Tasks that support a cancellation flow

----

You have to handle a Task's cancellation flow yourself

----

A single Task can be split into multiple UnownedJobs

---

## Actor Model

{%youtube ELwEdb_pD0k %}

----

{%youtube 7erJ1DV_Tlo %}

----

## Mailbox

----

### Has multiple addresses

----

### Address != ID

----

### No ID

----

### Allowed operations

* Create `child actors`
* Send messages
* Decide how to handle the next message
* Process messages in FIFO order

----

### ISOLATED

* Owns private `state`
* Shares no memory

----

## Implementations differ across languages

---

## Swift Actor

```swift=
actor MyActor {}
```

----

### protocol Actor

```swift=
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public protocol Actor : AnyObject, Sendable {
    nonisolated var unownedExecutor: UnownedSerialExecutor { get }
}
```

----

### UnownedSerialExecutor

```swift=
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@frozen public struct UnownedSerialExecutor {
    @inlinable public init<E>(ordinary executor: E) where E : SerialExecutor
}
```

----

### SerialExecutor

```swift=
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public protocol SerialExecutor : Executor {
    func enqueue(_ job: UnownedJob)
    func asUnownedSerialExecutor() -> UnownedSerialExecutor
}
```

----

### UnownedJob

``` swift=
@available(SwiftStdlib 5.1, *)
@frozen
public struct UnownedJob: Sendable {
    private var context: Builtin.Job

    @_alwaysEmitIntoClient
    @inlinable
    public func _runSynchronously(
        on executor: UnownedSerialExecutor
    ) {
        _swiftJobRun(self, executor)
    }
}
```

----

### UnownedJob

``` swift=
@available(SwiftStdlib 5.1, *)
@_silgen_name("swift_job_run")
@usableFromInline
internal func _swiftJobRun(
    _ job: UnownedJob,
    _ executor: UnownedSerialExecutor
) -> ()
```

---

## Actor reentrancy

* An actor's `isolated functions` are reentrant.
* When an actor's `isolated function` suspends, reentrancy allows other work to run on the actor before the original isolated function resumes. This is called `interleaving`.

----

### Actor reentrancy

Expected: before before after after

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/okf3jkkjzvcetmpl4dxig25ulq/embedded/"> </iframe>

----

### Actor reentrancy (expected)

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/vrel7oeb3bg4blm2o55kkcb3qu/embedded/"> </iframe>

----

### Conclusion (pitfall)

An async func may end up enqueued on the `actor's queue`

---

## Actor conjectures

----

### Thread

* Does it own exactly one thread?
* Can it switch threads at any time?
* async/await

----

### Thread

```swift=
import Foundation

func action() async {
    print("test")
}

actor Tester1 {
    func action() {
        print("test")
    }
}

actor Tester2 {
    func action() {
        print("test")
    }
}

Task.detached {
    await action()
    await Tester1().action()
    await Tester2().action()
    exit(0)
}

RunLoop.main.run()
```

----

### Thread

![](https://i.imgur.com/bSsuKfu.png)

----

### Thread

![](https://i.imgur.com/iJlBOwi.png)
![](https://i.imgur.com/eTr6axU.png)
![](https://i.imgur.com/9fHkKv5.png)

----

### Thread

`Thread 2 Queue : com.apple.root.default-qos.cooperative (concurrent)`

---

### Queue

> Message Queue?

----

### Queue

> Task Queue?

----

### Queue

> Job(UnownedJob) Queue

----

### Executor

* `UnownedSerialExecutor`
* Runs Jobs on a thread in FIFO order

---

## Question No.1

```swift=
import Foundation

func process(_ text: String) {
    print("\(Date()) \(text) 1")
    sleep(2)
    print("\(Date()) \(text) 2")
    sleep(2)
    print("\(Date()) \(text) 3")
}

actor Test {
    var counter = 0

    func ver1_1() async {
        print("\(Date()) \(#function) start")
        let a: Task<Void, Never> = Task {
            process("a")
        }
        let b: Task<Void, Never> = Task {
            process("b")
        }
        sleep(2)
        let (_, _) = await (a.value, b.value)
        print("\(Date()) end")
    }

    func ver1_2() async {
        print("\(Date()) \(#function) start")
        let a: Task<Void, Never> = Task {
            _ = counter
            process("a")
        }
        let b: Task<Void, Never> = Task {
            _ = counter
            process("b")
        }
        sleep(2)
        let (_, _) = await (a.value, b.value)
        print("\(Date()) end")
    }
}

@MainActor
func main() async {
    let test = Test()
    await test.ver1_1()
    print("")
    await test.ver1_2()
    print("")
    exit(EXIT_SUCCESS)
}

Task.detached {
    await main()
}

RunLoop.main.run()
```

----

## Question No.2

```swift=
import Foundation

func process(_ text: String) {
    print("\(Date()) \(text) 1")
    sleep(2)
    print("\(Date()) \(text) 2")
    sleep(2)
    print("\(Date()) \(text) 3")
}

func processAsync() async {
    let text = "a"
    process(text)
}

func processAsync1(_ text: String) async {
    process(text)
}

func processAsync2(_ text: String) async {
    process(text)
}

@MainActor
func processMain(_ text: String) async {
    process(text)
}

@MainActor
func ver2_1() async {
    print("\(Date()) \(#function) start")
    let a: Task<Void, Never> = Task {
        process("a")
    }
    let b: Task<Void, Never> = Task {
        process("b")
    }
    sleep(2)
    let (_, _) = await (a.value, b.value)
    print("\(Date()) end")
}

@MainActor
func ver2_2() async {
    print("\(Date()) \(#function) start")
    let a: Task<Void, Never> = Task {
        await processAsync()
    }
    let b: Task<Void, Never> = Task {
        await processAsync()
    }
    sleep(2)
    let (_, _) = await (a.value, b.value)
    print("\(Date()) end")
}

@MainActor
func ver2_3() async {
    print("\(Date()) \(#function) start")
    let a: Task<Void, Never> = Task {
        await processAsync1("a")
        await processAsync1("b")
    }
    let b: Task<Void, Never> = Task {
        await processAsync2("c")
        await processAsync2("d")
    }
    sleep(2)
    let (_, _) = await (a.value, b.value)
    print("\(Date()) end")
}

actor T {
    func processAsync1(_ text: String) {
        process(text)
    }
}

let t1 = T()
let t2 = T()

@MainActor
func ver2_4() async {
    print("\(Date()) \(#function) start")
    let a: Task<Void, Never> = Task {
        await t1.processAsync1("a")
    }
    let b: Task<Void, Never> = Task {
        await t2.processAsync1("b")
    }
    sleep(2)
    let (_, _) = await (a.value, b.value)
    print("\(Date()) end")
}

@MainActor
func main() async {
    // await ver2_1();print("")
    // await ver2_2();print("")
    await ver2_3()
    print("")
    // await ver2_4();print("")
    exit(EXIT_SUCCESS)
}

Task.detached {
    await main()
}

RunLoop.main.run()
```

---

## Pitfall No.1

A Task with/without a mutable var

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/xx5n6mvxv5arbkr3hkl6fwk4o4/embedded/"> </iframe>

----

## Pitfall No.2

An async func with/without parameters

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/pqq74s2cdzbm5htskkbeiu7wq4/embedded/"> </iframe>

---

### Q & A

---

## ??? No.1

----

### [0296-async-await.md](https://github.com/apple/swift-evolution/blob/main/proposals/0296-async-await.md)

> However, many asynchronous functions are not just asynchronous: they’re also associated with specific actors

![](https://i.imgur.com/rAkq5tO.png)

----

#### An `async function` is associated with a `specific actor`

```swift=
// actor SpecificActor {
func xxx() async {}
//}

// @SpecificActor
func xxx() async {}
```

----

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/77nint5bwrc7dbzpup3hkuykka/embedded/"> </iframe>

----

### Conclusion

Can a plain async func carry over the previous actor?

---

## ??? No.2

----

### [swift.org](https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html#ID643)

> To create an unstructured task that runs on the current actor, call the `Task.init(priority:operation:)` initializer

![](https://i.imgur.com/OIt0649.png)

----

#### Runs on the `current actor`

```swift=
func xxx() { // is actor?
    Task {
    }
}
```

----

<iframe width="100%" height="300" frameborder="0" src="https://swiftfiddle.com/ux5wjo4mpfcxrfynxes4fvvsae/embedded/"> </iframe>

---

### Reference

* [SE 296](https://github.com/apple/swift-evolution/blob/main/proposals/0296-async-await.md)
* [SE 306](https://github.com/apple/swift-evolution/blob/main/proposals/0306-actors.md)
* [Task](https://developer.apple.com/documentation/swift/task)
* [Continuation](https://developer.apple.com/documentation/swift/checkedcontinuation)
* ...