--- lang: ja-jp breaks: true --- # MagicOnion の gRPC 通信方式 2021-10-04 `Grpc.Core.CallInvoker.BlockingUnaryCall`メソッド以外は何らかの実装が行われている。 ### `MagicOnionClientBase`クラス `Grpc.Core.CallInvoker.AsyncUnaryCall`が使用されている。 ### `StreamingHubClientBase<TStreamingHub, TReceiver>`クラス `Grpc.Core.CallInvoker.AsyncDuplexStreamingCall`が使用されている。 ### `DynamicClientBuilder`クラス `DefineMethods`メソッド内で、`il` により `AsyncServerStreamingCall`、`AsyncClientStreamingCall`、`AsyncDuplexStreamingCall`メソッドが呼び出されている。 ## `AsyncUnaryCall` Unary RPCは、通常の関数呼び出しのように、クライアントがサーバーに1つのリクエストを送信し、1つのレスポンスを返します。 :::info **同一**チャネル・**同一**インターフェイス内の`Unary`メソッドはそれぞれ(同一・不同にかかわらず)独立して非同期で動作する。 ※ただし、`Unary`メソッドはステートレスであり、実装されている`ServiceBase`クラスはメソッドの呼び出し毎にインスタンス化される。 ::: ## `AsyncClientStreamingCall` > gRPC / MagicOnion 入門 (6) - Client Streaming 通信 > https://blog.xin9le.net/entry/2017/06/26/182646 クライアント・ストリーミングRPCでは、クライアントが一連のメッセージを書き込み、それをサーバーに送信しますが、これも提供されたストリームを使用します。クライアントは、メッセージの書き込みが終わると、サーバーがメッセージを読み込んで応答を返すのを待ちます。ここでもgRPCは、個々のRPCコール内でのメッセージの順序付けを保証しています。 インターフェイス ```csharp= public interface IMyFirstService : IService<IMyFirstService> { Task<ClientStreamingResult<string, string>> SplitUpload(); } ``` サーバ側 ```csharp= public async Task<ClientStreamingResult<string, string>> SplitUpload() { try { //--- クライアント側が WriteAsync するたびに呼び出される //--- CompleteAsync されるまでメッセージを受信し続ける var streaming = this.GetClientStreamingContext<string, string>(); string sum = ""; await streaming.ForEachAsync(x => { Console.WriteLine($"Received = {x}"); sum += x + ";"; }); //--- 結果を返す return streaming.Result(sum); } catch (Exception ex) { Console.WriteLine(ex.Message); throw; } finally { Console.WriteLine("SplitUpload() finally"); } } ``` クライアント側 ```csharp= static async Task CrientStream( IMyFirstService client ) { //--- WriteAsync するとサーバー側の ForEachAsync が動く var streaming = await client.SplitUpload(); foreach (var x in Enumerable.Range(1, 4)) { await streaming.RequestStream.WriteAsync(x.ToString()); } //--- 完了通知 //--- これによりサーバー側の ForEachAsync が終了する await streaming.RequestStream.CompleteAsync(); //--- サーバーからの結果を取得 var response = await streaming.ResponseAsync; Console.WriteLine($"Response = {response}"); } ``` ## `AsyncServerStreamingCall` > gRPC / MagicOnion 入門 (5) - Server Streaming 通信 > https://blog.xin9le.net/entry/2017/06/19/210145 サーバーストリーミングRPCでは、クライアントがサーバーにリクエストを送信し、一連のメッセージを読み返すストリームを取得します。gRPCは、個々のRPCコール内でのメッセージの順序付けを保証します。 インターフェイス ```csharp= public interface IMyFirstService : IService<IMyFirstService> { Task<ServerStreamingResult<string>> OnReceive(); // push 配信によるメッセージ受信 } ``` サーバ側 ```csharp= public async Task<ServerStreamingResult<string>> OnReceive() { //--- WriteAsync するたびにレスポンスが返る var streaming = this.GetServerStreamingContext<string>(); foreach (var x in Enumerable.Range(1, 100)) { await streaming.WriteAsync(x.ToString()); } //--- 完了信号を返す return streaming.Result(); } ``` クライアント側 ```csharp= //--- サーバーが WriteAsync すると ForEachAsync が動く //--- サーバーから完了信号が送られると ForEachAsync が終了する var streaming = await client.OnReceive(); await streaming.ResponseStream.ForEachAsync(async x => Console.WriteLine("Result : [" + x + "]") ); ``` ## `AsyncDuplexStreamingCall` > gRPC / MagicOnion 入門 (7) - Duplex Streaming 通信 > https://blog.xin9le.net/entry/2017/07/03/180734 > Core concepts, architecture and lifecycle > https://grpc.io/docs/what-is-grpc/core-concepts/ > 双方向ストリーミングRPCは、双方が読み書き可能なストリームを使用して一連のメッセージを送信するものです。2つのストリームは独立して動作するため、クライアントとサーバーは好きな順番で読み書きすることができます。例えば、サーバーはクライアントのメッセージをすべて受信するのを待ってから応答を書き込むこともできますし、メッセージを交互に読んでから書き込むこともできますし、その他の読み書きの組み合わせも可能です。**各ストリーム内のメッセージの順序は保持されます。** :::warning 「`各ストリーム内のメッセージの順序は保持されます。`」という事は、つまりクライアント側からサーバ側への異なる内容の複数の送信をそれぞれ独立して非同期で処理出来ないという事。 ::: インターフェイス ```csharp= public interface IMyFirstService : IService<IMyFirstService> { Task<DuplexStreamingResult<string, string>> DuplexSample(); } ``` サーバ側 ```csharp= public async Task<DuplexStreamingResult<string, string>> DuplexSample() { try { var streaming = this.GetDuplexStreamingContext<string, string>(); var task = streaming.ForEachAsync(async x => { //--- クライアントから送信された値が偶数だったら 2 倍にして返してみたり Console.WriteLine($"Received : {x}"); //if (x % 2 == 0) { await streaming.WriteAsync(x + " " + x); } }); //--- サーバー側から任意のタイミングで送信してみたり await Task.Delay(100); // テキトーにずらしたり await streaming.WriteAsync("123"); await streaming.WriteAsync("456"); //--- メッセージの受信がすべて終わるまで待つ await task; //--- サーバーからの送信が完了したことを通知 return streaming.Result(); } catch (Exception ex) { Console.WriteLine(ex.Message); throw; } finally { Console.WriteLine("DuplexSample() finally"); } } ``` クライアント側 ```csharp= { var streaming = await client.DuplexSample(); var task = streaming.ResponseStream.ForEachAsync(async x => Console.WriteLine("DuplexSample : [" + x + "]") ); //--- WriteAsync でサーバー側にメッセージを送信 //--- CompleteAsync で送信完了を通知 foreach (var x in Enumerable.Range(0, 5)) { await streaming.RequestStream.WriteAsync(x.ToString()); } Console.WriteLine("streaming.RequestStream.CompleteAsync() の実行を待機します。"); Console.ReadLine(); await streaming.RequestStream.CompleteAsync(); //--- メッセージの受信完了を待機 await task; } ``` ## `StreamingHubBase` :::warning 内部で、gRPCの `AsyncDuplexStreamingCall` を呼び出すことで、SignalR のHubと同様の機能を実現した、`MagicOnion`の独自機能。 ※`AsyncDuplexStreamingCall`が使用されているため、`.NET Framework`では利用出来ない。 ::: インターフェイス ```csharp= public interface ITimerHub : IStreamingHub< ITimerHub, ITimerHubReceiver > { Task SetAsync( TimeSpan interval, string userid ); Task Stop(); } ``` ```csharp= public interface ITimerHubReceiver { void OnTick(string message); } ``` サーバ側 ```csharp= public class TimerHub : StreamingHubBase< ITimerHub, ITimerHubReceiver >, ITimerHub { private Task _timerLoopTask; private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private TimeSpan _interval = TimeSpan.FromSeconds(1); private IGroup _group; private Guid guidValue = Guid.NewGuid(); public async Task SetAsync( TimeSpan interval, string userid ) { if (_timerLoopTask != null) { throw new InvalidOperationException("The timer has been already started."); } string connectionId = ""; connectionId = ConnectionId.ToString(); _group = await this.Group.AddAsync(connectionId); _interval = interval; _timerLoopTask = Task.Run(async () => { while (_cancellationTokenSource.IsCancellationRequested == false) { await Task.Delay(_interval, _cancellationTokenSource.Token); //IIdentity identity = Context.GetPrincipal().Identity; string strGUIX = guidValue.ToString(); string strUserId = userid; string strName = "aaaaaaaaaa"; string message = $"ServerID={strGUIX}; UserId={strUserId}; Name={strName}"; Console.WriteLine(message); BroadcastToSelf(_group).OnTick(message); } Console.WriteLine("SetAsync completed."); }); } public async Task Stop() { Console.WriteLine("TimerHub Stop()"); _cancellationTokenSource.Cancel(); } protected override ValueTask OnDisconnected() { Console.WriteLine("TimerHub OnDisconnected()"); _cancellationTokenSource.Cancel(); return base.OnDisconnected(); } } ``` クライアント側 ```csharp= class Program : ITimerHubReceiver { static async Task Main(string[] args) { ・・・ ITimerHub timerHubClient = await program.ProcessStreamingHub(channel, option); Console.ReadLine(); //await timerHubClient.Stop(); await timerHubClient.DisposeAsync(); await timerHubClient.WaitForDisconnect(); ・・・ } private async Task<ITimerHub> ProcessStreamingHub( GrpcChannel channel, MessagePackSerializerOptions option ) { try { ITimerHub timerHubClient = await StreamingHubClient.ConnectAsync< ITimerHub, ITimerHubReceiver >( channel : channel, receiver : this, serializerOptions : option ); await timerHubClient.SetAsync( interval: TimeSpan.FromSeconds(1), userid : Guid.NewGuid().ToString() ); await Task.Yield(); // NOTE: Release the gRPC's worker thread here. return timerHubClient; } catch (Exception ex) { Console.WriteLine(ex); throw; } finally { Console.WriteLine("ProcessStreamingHub finally."); } } void ITimerHubReceiver.OnTick(string message) { Console.WriteLine(message + " [" + Thread.CurrentThread.ManagedThreadId + "]"); } } ``` :::info `timerHubClient.DisposeAsync()` を実行すると、サーバ側の `OnDisconnected()` が呼び出される。 ::: ###### tags: `MagicOnion` `gRPC` `通信方式` `ASP.NET Core`