# `平行處理與非同步`
[TOC]
使用 ORID 的方式分享讀後心得
* 「Objective」指了解客觀事實的問句
* 在這個章節上,你看到了什麼?還記得哪些段落?
* 「Reflective」喚起情緒與感受的問句
* 這章有哪些部份是讓你印象深刻的嗎?
* 讀完這個章節的感受是什麼?
* 「Interpretive」尋找解釋前述感受的問句
* 為什麼書中的這些部份讓你印象深刻?
* 為什麼會帶給你這樣的感受(感動/驚訝/難過/開心)?
* 引發你想到了什麼經驗?重要的意義是什麼?
* 「Decisional」找出決議和行動的問句
* 這個章節有帶給我們可以改變或應用的地方嗎?
* 接下來你的行動/計劃會是什麼?
詳細內容可以參考這個 -> [焦點討論法 (ORID)](http://kojenchieh.pixnet.net/blog/post/391843868-%E7%84%A6%E9%BB%9E%E8%A8%8E%E8%AB%96%E6%B3%95-(orid))
主持人輪流做,負責發會議通知、訂會議室,時間是每週二下午五點。
在參與的過程中讓由主持人先進行分享,在由其他人分享自己共筆中所記錄到遺漏的部份,並提出問題
用開放式的討論:
* 不要怕自己問蠢問題
* 不要用批評或攻擊的
## Task Programming
### Max
.net framework 4.0 推出了**TAP(Task-based asynchronous pattern)**, 讓我們可以使用Task明確定義要切分到另一條thread執行的工作.
在4.0之前只能自行操控**Thread、APM (Asynchronous Programming Model) 或 EAP (Event-based Asynchronous Pattern)** 等方法去達成非同步的目的。
目前微軟官方建議直接使用TAP+後來的Async Await, 不要再專案中撰寫APM 或 EAP
[TAP 相關簡介](https://docs.microsoft.com/en-us/dotnet/standard/asynchronous-programming-patterns/task-based-asynchronous-pattern-tap)
#### Task 操作介紹:
``` CSharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication3
{
/// <summary>
/// 示範 Task
/// tip: 關鍵字: wait系列, await, result, getResult 要等, 其他都不用
/// </summary>
class Program
{
static void Main(string[] args)
{
//Task.Start() => 不等
Task slowTask = new Task(() => RunSlowly("【7】我是 SlowTask 我等 1 秒"));
slowTask.Start();
//Task.Factory.StartNew() => 不等
Task.Factory.StartNew(() => RunQuickly("【3】我是 QuickTask 我等 0.3 秒"));
Console.WriteLine("【1】我是主程式 我不用等 slowTask 跟 quickTask 我會最先跑");
//【Run → Result】-------------------------------------------------------------------------------
//Task.Run() => 不等
var mediumTask = Task.Run<string>(() => { return GetReturn("【4】我是 MediumTask 我等 0.6 秒"); });
Console.WriteLine("【2】並不是 Task.Run 阻塞主程式");
//Task.Result => 要等
Console.WriteLine(mediumTask.Result);
Console.WriteLine("【5】而是 MediumTask.Result 讓主程式被迫等待");
//------------------------------------------------------------------------------------------------
//【Start → Wait】-------------------------------------------------------------------------------
//Task.Start() => 不等
Task dbTask = new Task(() => GetDataFromDb("【8】花了 2 秒鐘將資料從 DB 取回"));
dbTask.Start();
Console.WriteLine("【6】並不是 Task.Start 阻塞主程式");
//Task.Wait() => 要等
dbTask.Wait();
Console.WriteLine("【9】而是 Task.Wait() 讓主程式被迫等待");
//------------------------------------------------------------------------------------------------
Console.Read();
}
static void RunQuickly(string str)
{
Thread.Sleep(300);
Console.WriteLine(str);
}
static void RunSlowly(string str)
{
Thread.Sleep(1000);
Console.WriteLine(str);
}
static string GetReturn(string str)
{
Thread.Sleep(600);
return str;
}
static void GetDataFromDb(string str)
{
Thread.Sleep(2000);
Console.WriteLine(str);
}
}
}
```
##### 總結:
1. **Two ways of using tasks**
- `Task.Factory.StartNew() creates and starts a Task`
- `new Task(() => { ... }) creates a task; use Start() to fire it`
2. **Tasks take an optional 'object' argument**
- `Task.Factory.StartNew(x => { foo(x) }, arg);``
3. **To return values, use `Task<T>` instead of Task**
- To get the return value. use t.Result (this waits until task is complete)
4. **Use Task.CurrentId to identify individual tasks.**
#### **Cancel Task**
.net framework 4.0之後, 可指定Cancel token 到 task中, 即可透過兩種方法取消Task:
1. 檢查到 `ct.IsCancellationRequested == true;`之後, 直接從delegate return, 但這樣Task的狀態會直接切至RanToCompletion, 代表已完成
2. 檢查到 `ct.IsCancellationRequested == true;`之後, 執行`ct.ThrowIfCancellationRequested();`, 此時task的狀態就會明確的切換至Canceled
Task的狀態如下
![](https://i.imgur.com/A0HOfoD.png)
* 補充: 可透過`CancellationTokenSource.CreateLinkedTokenSource`將多個CancellationTokenSource串聯成一個_linkingRegistrations, 此時只要任一個Token cancel即會中斷task.
``` CSharp
private static void CompositeCancelationToken()
{
// it's possible to create a 'composite' cancelation source that involves several tokens
var planned = new CancellationTokenSource();
var preventative = new CancellationTokenSource();
var emergency = new CancellationTokenSource();
// make a token source that is linked on their tokens
var paranoid = CancellationTokenSource.CreateLinkedTokenSource(
planned.Token, preventative.Token, emergency.Token);
Task.Factory.StartNew(() =>
{
int i = 0;
while (true)
{
paranoid.Token.ThrowIfCancellationRequested();
Console.Write($"{i++}\t");
Thread.Sleep(100);
}
}, paranoid.Token);
paranoid.Token.Register(() => Console.WriteLine("Cancelation requested"));
Console.ReadKey();
// use any of the aforementioned token soures
//emergency.Cancel();
preventative.Cancel();
}
```
* CancellationToken.WaitHandle.WaitOne(int millisecondsTimeout)
- 執行此指令時, task會暫停特定秒數並等待有無cancel的指令, 如果超過時間未收到則繼續往下執行, 如果未傳入時間則會無限等待至被cancel為止才往下。
``` CSharp
static void Main(string[] args)
{
// we've already seen the classic Thread.Sleep
var cts = new CancellationTokenSource();
var token = cts.Token;
var t = new Task(() =>
{
Console.WriteLine("You have 5 seconds to disarm this bomb by pressing a key");
bool canceled = token.WaitHandle.WaitOne(5000);
Console.WriteLine(canceled ? "Bomb disarmed." : "BOOM!!!!");
}, token);
t.Start();
// unlike sleep and waitone
// thread does not give up its turn
// avoiding a context switch
//SpinWait.SpinUntil(() => false);
//Console.WriteLine("Are you still here?");
Console.ReadKey();
cts.Cancel();
Console.WriteLine("Main program done, press any key.");
Console.ReadKey();
}
```
#### Task相關的等待機制
- `Task.Wait();` => 直到task完成才往下
- `Task.Wait(int millisecondsTimeout)` => 等待Task特定秒數, 如果未完成就直接往下執行
- `Task.WaitAll(arams System.Threading.Tasks.Task[] tasks); ` => 等待所有傳入的task都完成才往下
- `Task.WaitAny (System.Threading.Tasks.Task[] tasks);` => 等待任一task完成及往下執行
#### Task的Excetpion Handling
可透過 `catch (AggregateException ae)`取得所有task的exception, 並Foreach逐筆處理
``` CSharp
try
{
Task.WaitAll(t, t2);
}
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
Console.WriteLine($"Exception {e.GetType()} from {e.Source}.");
}
}
```
亦可透過`AggregateException.Handle(Func<Exception,bool> predicate)`處理, 在func裡面如果return false的話, 則會在throw到外層一次
## Concurrent Collections
### Allen
#### Thread safe collections
.NET Framework 4 引進了 System.Collections.Concurrent 命名空間,其中包含數個兼具安全執行緒與調整能力的集合類別。 多個執行緒可以安全且有效率地新增或移除這些集合中的項目,而不需要利用使用者程式碼進行額外同步處理。 當您撰寫新的程式碼時,只要多個執行緒將同時寫入集合,就使用並行集合類別。
* ConcurrentDictionary
* ConcurrentQueue
* ConcurrentStack
* ConcurrentBag
* BlockingCollection (生產/消費者模式)
#### ConcurrentCollection Performance compare
![](https://i.imgur.com/1AHzgRy.png)
```charper=
static ConcurrentBag<int> bag = new ConcurrentBag<int>();
static ConcurrentStack<int> stack = new ConcurrentStack<int>();
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
static ConcurrentDictionary<int, int> dictionary = new ConcurrentDictionary<int, int>();
private static void addStack(int obj) { stack.Push(obj); }
private static void addQueue(int obj) { queue.Enqueue(obj); }
private static void addBag(int obj) { bag.Add(obj); }
private static void addDictionary(int obj) { dictionary.TryAdd(obj, obj); }
private static void popStack(int obj) { stack.TryPop(out var _); }
private static void dequeueQueue(int obj) { queue.TryDequeue(out var _); }
private static void takeBag(int obj) { bag.TryTake(out var _); }
private static void removeDictionary(int obj) { dictionary.TryRemove(new KeyValuePair<int, int>(obj, obj)); }
static void Main()
{
ConcurrentBag<int> bag = new ConcurrentBag<int>();
ConcurrentStack<int> stack = new ConcurrentStack<int>();
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
run(addBag);
run(addStack);
run(addQueue);
run(addDictionary);
run(takeBag);
run(popStack);
run(dequeueQueue);
run(removeDictionary);
Console.ReadLine();
}
private static void run(Action<int> action)
{
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 10000000, new ParallelOptions() { MaxDegreeOfParallelism = 8 }, action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
```
#### BlockingCollection (生產/消費者模式)
提供安全執行緒集合適用的封鎖和界限容量
* **Add** 方法用於向集合添加元素。
* **Take** 方法用於從集合中獲取元素。當集合為空時,Take 方法將阻塞,直到獲取到新元素。
* **CompleteAdding** 方法標記集合為完成狀態,此後不能再向集合中添加元素,調用 Add 將拋出 System.InvalidOperationException 異常。
* 調用 **CompleteAdding** 方法將使阻塞狀態的 Take 方法拋出 System.InvalidOperationException 異常。
* 實例化 BlockingCollection\<T> 時,可以傳入 boundedCapacity 參數,設置集合的上限,集合中元素到達上限後,Add 方法將阻塞。
* **TryAdd** 方法在集合滿時,不會阻塞,而是直接返回 false,並且丟棄要插入的元素。
* **TryTake** 方法在集合為空時不會阻塞,而是會返回 false。
* 當有多個線程 Take 時,將形成一個 Take 隊列,依次獲取到元素。
#### 生產/消費者模式範例:
```charper=
void Main()
{
var producer = Task.Run(() => Producer()); //改成TPL的方式
var consumer = Task.Run(() => Consumer());
}
private static BlockingCollection<int> data = new BlockingCollection<int>(); //共享資料
private static void Producer() //生產者
{
for (int ctr = 0; ctr < 20; ctr++)
{
Thread.Sleep(100);
data.Add(ctr);
}
}
private static void Consumer() //消費者
{
foreach (var item in data.GetConsumingEnumerable())
{
Console.WriteLine(item);
Thread.Sleep(100);
}
}
```
## Task Coordination
### Jill
Task 有一些協調的方法 可以讓它們彼此間合作
#### Continuations
- 如何讓多個 Tasks 之間,依照特定的順序執行任務
- 例如某些 Task 可能仰賴於前一個 Task 的結果
- 方法:
- **ContinueWith** : 接在指定的 Task 後面執行 (1 個 Task 跟在 另一個 Task 後面執行)
- 可以使用 **TaskContinuationOptions** 條件式決定是否要接續執行
- NotOnFaulted : 前項在完成時的 Status 屬性為 Faulted,則擲回未處理的例外狀況。 這個選項對多工接續而言無效
- NotOnRanToCompletion : 當前項執行完成時,不應該排程接續工作。 如果前項在完成時的 Status 屬性為 RanToCompletion,則這個選項對多工接續而言無效
- OnlyOnFaulted : 只有在前項擲回未處理的例外狀況時,才應排程接續工作
- OnlyOnRanToCompletion : 指定只有在接續的前項徹底執行後,才應該排定接續
```
task.ContinueWith(t => {...}, TaskContinuationOptions.option )
```
- **ContinueWhenAll** : 當所有 Task 都執行完畢後,才接著執行 (one to many)
```
Task.Factory.ContinueWhenAll(new[] {task, task2},
task =>
{
...
})
```
- **ContinueWhenAny** : 當有一個 Task 執行完畢後,就直接接著執行 (one to any)
```
Task.Factory.ContinueWhenAny(new[] {task, task2},
task =>
{
...
})
```
- 補充:
要留意有些 Continuation 可能永遠不會發生
#### Child Tasks
- 在某個 Task 裡,再創建 Task
- 預設: Child Task 會與 Parent Task 各別單獨執行
- Detached:
- 創一個 Task 在一個 Task 裡,寫法一樣
- 行為模式跟一般創建 Task 是一樣的,沒有 關聯
- Attached:
- **TaskCreationOptions.AttachedToParent**
- Waiting on parent => waiting on child complete
#### Barrier
- 允許多項工作在**多個階段**中以**平行方式**來合作處理某個演算法
- 屬性:
- CurrentPhaseNumber : 取得屏障目前階段的編號
- ParticipantCount : 取得在屏障中的參與者總數
- ParticipantsRemaining : 取得在目前階段中尚未發出訊號的屏障中參與者數目
- 方法:
- SignalAndWait : 發出訊號,表示參與者已到達屏障,並且在等候所有其他參與者到達屏障
- AddParticipant(s) : 通知 Barrier,表示要增加其他參與者
- RemoveParticipant(s) : 通知 Barrier,表示會減少參與者
#### CountdownEvent
- 指定一個數量,當數量被扣為 0 時,才可執行後續任務
- 永遠都是在 減少 count
- 屬性:
- CurrentCount : 取得設定事件時需要的剩餘訊號次數
- InitialCount : 取得設定事件一開始時所需要的訊號次數
- IsSet : 目前計數是否已達到零
- 方法:
- Signal : 註冊訊號,並遞減 CurrentCount 的值
- Wait : 封鎖目前的執行緒,直到 Count 為0
![](https://i.imgur.com/K1MefKv.png)
- 與 Barrier 的不同之處:
- 可以放置 Signal 在想要 change counter的地方
- 而且在別的地方再放置 Wait 等待 counter 變為 0
- 當 counter 為0時,在 Wait 的後面繼續執行想做的事
- 與 ContinueWhenAll 的不同之處:
- ContinueWhenAll 需要全部的 Task 都執行完畢才會執行
- CountdownEvent 則是 wait 到 counter 為 0 則直接執行
#### ManualResetEventSlim / AutoResetEvent
**[ManualResetEventSlim]**
- 表示執行緒同步處理事件,收到訊號時,必須手動重設。 此類別是 ManualResetEvent 的輕量型替代方案
- 方法:
- Set : 將事件的狀態設定為已收到訊號,讓正在等候該事件的一或多個執行緒繼續執行
- Wait : 封鎖目前的執行緒,直到收到訊號為止
**[AutoResetEvent]**
- 與 ManualResetEventSlim 的不同
- 必需要定義 initial state in the constructor
```
var evt = new AutoResetEvent(false);
```
- Wait 改成 WaitOne
- WaitOne 會等到有 Set 後才能繼續執行,但是 AutoResetEvent 會自動將狀態切回 false, 而 ManualResetEventSlim 則會一直保持狀態為 True
**[補充]**
- 如果是簡單的例子,其實可以直接使用 Continuations 提到的方法就行
#### SemaphoreSlim
- 可以同時 遞增 跟 遞減 count
- 限制可同時存取一項資源或資源集區的執行緒數目
想像有一個工具箱,一開始被放置了 n 把工具,整個工具箱最多可以放置 m 把工具,每個工人需要在工作之前從工具箱取走至少 1 把工具才能進行作業,直到工具箱沒有工具時,之後的工人就必須等待,等到有工人完成作業將工具歸還到工具箱,才能繼續從工具箱取走工具進行作業,而總工具數量也不是一成不變的,可以視整體的工作情況進行增減,但要注意工具箱的可容納上限
一個工具箱:一個 SemaphoreSlim 實例
一開始被放置的 n 把工具:SemaphoreSlim 建構式的 initialCount
最多可以放置 m 把工具:SemaphoreSlim 建構式的 maxCount,預設值為 Int32.MaxValue。
```
var semaphore = new SemaphoreSlim(initialCount, maxCount);
```
工人:執行程式的 Thread
取走工具:SemaphoreSlim 的 Wait() 方法
歸還工具:SemaphoreSlim 的 Release() 方法
#### 總結 (Synchronization Primitives)
- Have a counter
- Execute N threads at a time
- Other threads are unbloked until state changes
Barrier :
- 有多個 stages 而且是有順序性,當所有的 threads 都要完成同個階段後的任務,才能一起進入下一個階段
- SignalAndWait()
CoundownEvent :
- Signal and Wait separate
- Wait until signal level reaches 0 , then unblock
ManualResetEventSlim / AutoResetEvent :
- Count 只有 1
- AutoResetEvent reset count after waiting
SemaphoreSlime :
- counter decreased by Wait() and increased by Release(N)
- have a maximum
## Parallel Loops
### Terence
#### Parallel Invoke/For/ForEach
* Parallel Invoke: Executes each of the provided actions, possibly in parallel.
* Parallel For: Executes a for loop in which iterations may run in parallel.
* Parallel ForEach: Executes a foreach operation in which iterations may run in parallel.
#### Breaking, Cancellations and Exceptions
In parallel for、foreach loop use ParallelLoopState's Stop/Break method to control flow.
* ParallelLoopState.Break(): 在完成當前的動作之後,不再執行後續的動作,但在當前動作開始之前**已經在執行**的工作,則必須完成。
* ParallelLoopState.Stop(): 不會再創建新的線程執行,而且當前“已經在執行”的工作也會被中止。
透過 CancellationTokenSource 來控制迴圈的行為,要將平行呼叫包含在 try catch 區塊裡
```C#
try
{
parallel method
}
catch (OperationCanceledException ce)
{
當平行迴圈發現CancellationToken Cancel時,就會拋出 OperationCanceledException
}
catch (AggregateException ae)
{
ae.Handle(e =>
{
Console.WriteLine(e.Message);
return true;
});
}
```
使用ParallelLoopState.IsExceptional來判斷迴圈是否有Exception,並作相對應的處理(寫Log之類)
補充: AggregateException 用來將多個失敗合併成單一可擲回的例外狀況物件。 它廣泛用於工作 平行程式庫 (TPL) 和 平行 LINQ (PLINQ)
#### Thread Local Storage
```C#
Parallel.For(1, 1001, x =>
{
Console.Write($"[{x}] t={Task.CurrentId}\t");
Interlocked.Add(ref sum, x); // concurrent access to sum from all these threads is inefficient
// all tasks can add up their respective values, then add sum to total sum
});
Parallel.For(1, 1001,
() => 0, // initialize local state, show code completion for next arg
(x, state, tls) =>
{
//Console.WriteLine($"{Task.CurrentId}");
tls += x;
Console.WriteLine($"Task {Task.CurrentId} has sum {tls}");
return tls;
},
partialSum =>
{
Console.WriteLine($"Partial value of task {Task.CurrentId} is {partialSum}");
Interlocked.Add(ref sum, partialSum);
}
);
Console.WriteLine($"Sum of 1..100 = {sum}");
```
#### Partitioning
The Partitioner class is used to make parallel executions more chunky. If you have a lot of very small tasks to run in parallel the overhead of invoking delegates for each may be prohibitive. By using Partitioner, you can rearrange the workload into chunks and have each parallel invocation work on a slightly larger set.
```C#
public void SquareEachValue()
{
const int count = 100000;
var values = Enumerable.Range(0, count);
var results = new int[count];
Parallel.ForEach(values, x => { results[x] = (int)Math.Pow(x, 2); });
}
public void SquareEachValueChunked()
{
const int count = 100000;
var results = new int[count];
var part = Partitioner.Create(0, count, 10000); // rangeSize = size of each subrange
Parallel.ForEach(part, range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
results[i] = (int)Math.Pow(i, 2);
}
});
}
```
## Data Sharing and Synchronization
### Wayne
#### atomic operation(原子操作)
- 多個執行緒取得同一個全域性資源時,能夠確保所有其他的執行緒都不在同一時刻取得相同的資源
#### lock
- 最一般的鎖,資源使用完自動釋放
#### Interlock operation
- 為多重執行緒共用的變數提供原子操作
#### spin lock
- 沒有獲得鎖的執行緒不會Blocking,會無窮迴圈嘗試直到獲取鎖為止
- 特性
1.比較浪費CPU資源
2.用於短暫間格,防止重複取得資源
#### reader-writer-lock
- 一般版
啟動reader-lock之後,即無法異動資料,等到全部的reader-lock都是釋放後才能異動
- 升級版
可以在reader-lock內使用write-lock,此時即可異動資料,只是全部的執行續變成都要排隊
- 特性
1.用於處理資料異動
2.support lock recursion
#### mutex
- 沒有獲得鎖的執行緒會Blocking,直到獲取鎖為止
- 特性
1.用於比較久的間格
2.可以同時間監控多個鎖
3.local、global
## Parallel LINQ
### Yusheng
#### AsParallel AND AsOrdered
```C#
private static void Demo()
{
const int count = 50;
var items = Enumerable.Range(1, count).ToArray();
var cubes = items.AsParallel()
// .AsOrdered()
.Select(x =>
{
var val = x * x * x;
Console.WriteLine($"{val} ({Task.CurrentId})\t");
return val;
});
foreach (var i in cubes.ToArray())
Console.Write($"{i} ({Task.CurrentId?.ToString()?? "main"})\t");
Console.WriteLine();
}
```
![](https://i.imgur.com/GIb2fEq.png)
- AsOrder會維持原本集合的順序
#### Cancellation
```C#
static void Demo()
{
var cts = new CancellationTokenSource();
var items = Enumerable.Range(1, 20);
var results = items.AsParallel()
.WithCancellation(cts.Token)
.Select(i =>
{
double result = Math.Log10(i);
Console.WriteLine($"i = {i}, tid = {Task.CurrentId}");
//if (result == 1) cts.Cancel();
if (result == 1) throw new InvalidOperationException();
return result;
});
foreach (var c in results)
{
//if (c == 1) cts.Cancel();
//if (c == 1) throw new InvalidOperationException();
Console.WriteLine($"result = {c}, tid = {Task.CurrentId?.ToString() ?? "main"}");
}
}
```
![](https://i.imgur.com/o675uny.png)
- throw exception 不會把其他thread停下來,cancel會
#### MergeOptions
```C#
public static void Demo()
{
var numbers = Enumerable.Range(1, 20).ToArray();
var results = numbers.AsParallel()
.WithMergeOptions(ParallelMergeOptions.FullyBuffered)
.Select(x =>
{
var result = Math.Log10(x);
Console.WriteLine($"Produced {result}");
return result;
});
foreach (var result in results)
{
Console.WriteLine($"Consumed {result}");
}
}
```
- FullyBuffered = all results produced before they are consumed
- NotBuffered = each result can be consumed right after it's produced
- Default = AutoBuffered = buffer the number of results selected by the runtime
- AsOrdered也會有FullyBuffered的效果
#### Aggregate
```C#
public static void Demo()
{
var sum = ParallelEnumerable.Range(1, 1000)
.Aggregate(
0, // initial seed for calculations
(partialSum, i) => partialSum += i, // per task
(total, subtotal) => total += subtotal, // combine all tasks
i => i); // final result processing
Console.WriteLine($"Sum is {sum}");
}
```
## Async/Await
### Kevin
#### 語法糖
用async, await來簡化promise跟ContinueWith
``` javascript
const p = new Promise((resolve) => {
if (true) {
resolve(`直接執行 Promise`)
}
})
function PromiseFn(String) {
return new Promise((resolve, reject) => {
setTimeout(function () {
if (String.length >= 5) {
resolve(`Promise 成功, ${String} 共 ${String.length} 個字 `)
} else {
reject('Promise 失敗')
}
}, 2000);
});
}
PromiseFn('Ryder')
.then((res) => {
console.log(res) // Promise 成功, Ryder 共 5 個字
return PromiseFn('youtube')
}).then((res) => {
console.log(res) // Promise 成功, youtube 共 7 個字
return p
}).then((res) => {
console.log(res) // 直接執行 Promise
})
----------------------------------------------------------------------
const p = new Promise((resolve) => {
if (true) {
resolve(`直接執行 Promise`)
}
})
function PromiseFn(String) {
return new Promise((resolve, reject) => {
setTimeout(function () {
if (String.length >= 5) {
resolve(`Promise 成功, ${String} 共 ${String.length} 個字 `)
} else {
reject('Promise 失敗')
}
}, 2000);
});
}
async function usePromise() {
const data1 = await PromiseFn('Ryder');
console.log(data1) // Promise 成功, Ryder 共 5 個字
const data2 = await PromiseFn('youtube');
console.log(data2) // Promise 成功, youtube 共 7 個字
const data3 = await p
console.log(data3); // 直接執行 Promise
}
usePromise();
```
``` C#
public Task<int> CalculateValueAsync()
{
return Task.Factory.StartNew(() =>
{
Thread.Sleep(5000);
return 123;
});
}
var calculation = CalculateValueAsync();
calculation.ContinueWith(t =>
{
LblResult.Text = t.Result.ToString();
}, TaskScheduler.FromCurrentSynchronizationContext());
-------------------------------------------------------------
public async Task<int> CalculateValueAsync()
{
return Task.Factory.StartNew(() =>
{
Thread.Sleep(5000);
return 123;
});
}
int value = await CalculateValueAsync();
LblResult.Text = value.ToString();
```
#### 非同步執行的順序
![](https://i.imgur.com/JL729fa.png)
[非同步執行順序](https://docs.microsoft.com/zh-tw/dotnet/csharp/programming-guide/concepts/async/task-asynchronous-programming-model)
#### state machine
![](https://i.imgur.com/UsentT7.png)
![](https://i.imgur.com/FdJEjPG.png)
[狀態機](https://www.cnblogs.com/yaopengfei/p/12848795.html)
## Async/Await
## Min
### WhenAll VS WaitAll
```C#
public class InterlockedOperations
{
public static async Task<Test> TestMethod1()
{
await Task.Delay(1000);
Console.WriteLine($"execute : {DateTime.Now}"); ;
return new Test(){Id = 1};
}
public static async Task<Test> TestMethod2()
{
await Task.Delay(1000);
Console.WriteLine($"execute : {DateTime.Now}"); ;
return new Test() { Id = 2 };
}
public static async Task<Test> TestMethod3()
{
await Task.Delay(1000);
Console.WriteLine($"execute : {DateTime.Now}"); ;
return new Test() { Id = 3 };
}
public static async Task Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
Test[] result = await Task.WhenAll(TestMethod1(), TestMethod2(), TestMethod3());
Console.WriteLine(result[0].Id);
sw.Stop();
Console.WriteLine(sw.Elapsed.TotalMilliseconds);
sw.Restart();
// wait all will block main thread
var taskArray = new Task[] { TestMethod1() , TestMethod2(), TestMethod3() };
Task.WaitAll(taskArray);
var obj1 = ((Task<Test>)taskArray[0]).Result;
Console.WriteLine(obj1.Id);
sw.Stop();
Console.WriteLine(sw.Elapsed.TotalMilliseconds);
sw.Restart();
//easy to trouble shooting, performance suck
var t1 = await TestMethod1();
var t2 = await TestMethod2();
var t3 = await TestMethod3();
Console.WriteLine(t1.Id);
sw.Stop();
Console.WriteLine(sw.Elapsed.TotalMilliseconds);
//非同步轉同步
Task<Test> t4 = TestMethod1();
Test taskResult = t4.Result;
var t5 = TestMethod1();
var awaiterResult = t5.GetAwaiter().GetResult();
}
}
```
Task.WaitAll 阻塞當前線程,WhenAll會再起另一個thread去等待不阻塞當前線程
![](https://i.imgur.com/Rkf8WAI.png)
[SynchronizationContext](https://blog.opasschang.com/understand-csharp-asyn/)
WhenAny的一種情境…
```C#
Task.WhenAny(httpDownLoad, ftpDownLoad);
```
可平行跑,但又只需要一個結果時,就能用WhenAny
### Async Initialization
await有擴散性,await就得變async Task...
A => B => C => D (都是同步,相安無事)
其中一個改async...所有用到他的人都要await
Solution A:
<font color=#FF6600>async Task</font> A =><font color=#FF6600>async Task</font> B => <font color=#FF6600>async Task</font> C => async Task D
Solution B:
A => B => C(<font color=#FF6600>D.result把非同步變同步</font>) => async Task D
基於上述的擴散性,建構子async, 因為建構子不能return Task...建構子只能return該物件
so...建構子裡不能直接await async task
但如果想做到Construct裡呼叫async task,可以參考下面的作法
#### New object, and trigger async init method
```C#
namespace DataSharingAndSynchronization
{
public class InterlockedOperations
{
public static async Task Main(string[] args)
{
Foo foo = new Foo();
await foo.InitAsync();
}
}
public class Foo
{
public Task<Foo> task;
public Foo()
{
}
public async Task<Foo> InitAsync()
{
await Task.Delay(1000);
return this;
}
}
}
```
#### Async Factory Method
```C#
namespace DataSharingAndSynchronization
{
public class InterlockedOperations
{
public static async Task Main(string[] args)
{
Foo foo = await Foo.CreateInitAsync();
}
}
public class Foo
{
private Foo()
{
}
private async Task<Foo> InitAsync()
{
await Task.Delay(1000);
return this;
}
public static Task<Foo> CreateInitAsync()
{
var foo = new Foo();
return foo.InitAsync();
}
}
}
```
#### Async Initilization Pattern
```C#
namespace DataSharingAndSynchronization
{
public interface IAsyncInit
{
public Task InitTask { get; }
}
public class InterlockedOperations
{
public static async Task Main(string[] args)
{
var demo = new Demo();
if (demo is IAsyncInit ai)
await demo.InitTask;
var demo2 = new Demo2(demo);
await demo2.InitTask;
}
}
public class Demo2 : IAsyncInit
{
public Task InitTask { get; }
private readonly Demo _demo;
public Demo2(Demo demo)
{
this._demo = demo;
InitTask = InitAsync();
}
public async Task InitAsync()
{
if (_demo is IAsyncInit ai)
await _demo.InitAsync();
await Task.Delay(1000);
}
}
public class Demo : IAsyncInit
{
public Demo()
{
InitTask = InitAsync();
}
public async Task InitAsync()
{
await Task.Delay(1000);
}
public Task InitTask { get; }
}
}
```
#### Async Lazy Initilization
```C#
public class Stuff
{
private static int value;
private readonly Lazy<int> NormalLazy
= new Lazy<int>(() =>
{
return value++;
});
private readonly Lazy<Task<int>> AutoIncValue
= new Lazy<Task<int>>(async () =>
{
await Task.Delay(1000);
return value++;
});
private readonly Lazy<Task<int>> AutoIncValue2
= new Lazy<Task<int>>(() => Task.Run(async () =>
{
await Task.Delay(1000);
return value++;
}));
// Nito.AsyncEx
public AsyncLazy<int> AutoIncValue3
= new AsyncLazy<int>(async () =>
{
await Task.Delay(1000);
return value++;
});
public async Task<int> UserValue()
{
int value2 = await AutoIncValue.Value;
return value;
}
}
```
### ValueTask
Task是一種class...大量的async await會造成GC的壓力
有些東西c#會做cache去減輕GC壓力
因此ValueTask因此而生…ValueTask用的是struct不是ref type
1. Task<TResult> 類別
所有的類別(**class**)都是一種參考型別 (Reference Type),這意味著,當你在執行一個標示 async 的非同步方法時,若該方法會透過 Task<TResult> 物件立即回應一個值,你就必須先在** Heap 記憶體中先將 Task 物件保存,然後再將該物件的記憶體位址參考放入 Stack 記憶體中**,而最後保存在變數中的資料其實是該物件的記憶體位址參考。因此,你要透過 Task 物件取得執行結果時,**會有一點點點點的額外開銷**(overhead)。
2. ValueTask<TResult> 結構
所有的結構(**struct**)都是一種實質型別 (Value Type),這意味著,當你在執行一個標示 async 的非同步方法時,若該方法會透過 ValueTask<TResult> 物件立即回應一個值,由於是實質型別的關係,這個 **ValueTask<TResult> 物件會直接儲存在 Stack 記憶體中進行操作。簡單來說,就是記憶體操作的效率比較好**!
But Task<TResult>在某些狀況下有cache,效能會比ValueTask更好
1. Task
2. Task<bool>
3. Task<int>, int = -1~9
```C#
public class InterlockedOperations
{
public class InterlockedOperations
{
public static async Task Main(string[] args)
{
BenchmarkRunner.Run<TaskAndValueTaskComparsion>();
}
}
[ShortRunJob]
public class TaskAndValueTaskComparsion
{
[Benchmark]
public ValueTask<int> RunValueTaskWithNew()
{
return new ValueTask<int>(1);
}
[Benchmark]
public Task<int> RunTaskFromResult()
{
return Task.FromResult(1);
}
}
}
```
看大神的[文章](https://blog.miniasp.com/post/2022/05/30/Understanding-Task-and-ValueTask)比較快
建議還是使用Task...要用ValueTask記得要做測試,確保效能真的有提昇,畢竟平行的世界,很多事和我們想的不一樣
注意事項:
* 非同步 => 同步 => 非同步…Request會直接hang住…也不會噴exception
* await是語法糖…exception會做拆包,能看到application的exception
* .Result不是語法糖,會execute一個task,excpetion會是AggregationException
{"metaMigratedAt":"2023-06-17T06:12:40.349Z","metaMigratedFrom":"YAML","title":"`平行處理與非同步`","breaks":true,"contributors":"[{\"id\":\"c0782b8a-7192-4ce3-9c0b-3bd7ea99403d\",\"add\":5895,\"del\":1425},{\"id\":\"19f60c0d-b0a6-47e5-aa1c-9bc551af5404\",\"add\":10221,\"del\":4889},{\"id\":\"8e5092b3-9c45-4e3c-a9ba-e0473bb94ea4\",\"add\":3276,\"del\":775},{\"id\":\"9a8297a3-e755-4bdb-84f3-282752333c4e\",\"add\":708,\"del\":141},{\"id\":\"4583e633-767a-4ce0-a9f4-fadca4482a91\",\"add\":3165,\"del\":352},{\"id\":\"f34520ad-a52e-45b9-9429-22d32484f107\",\"add\":4408,\"del\":517},{\"id\":\"9c21ccc7-ec01-488f-adf3-8f148e003880\",\"add\":3144,\"del\":147},{\"id\":\"97cb78c0-d8e2-43ff-9212-c37e8215594c\",\"add\":7500,\"del\":154}]"}