---
lang: ja-jp
breaks: true
---

# "InvalidOperationException: Reading is already in progress." occurs in `ServerStreaming` on an `ASP.NET Core` `gRPC` server

2022-02-06

## Occurs when the client sends several consecutive requests to a `gRPC` `ServerStreaming` server

I am fairly sure this did not happen before... Did I introduce a bug?

```=
Reading is already in progress.
--------
   at System.IO.Pipelines.ThrowHelper.ThrowInvalidOperationException_AlreadyReading()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadSingleMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
--------
System.InvalidOperationException: Reading is already in progress.
   at System.IO.Pipelines.ThrowHelper.ThrowInvalidOperationException_AlreadyReading()
   at System.IO.Pipelines.Pipe.GetReadResult(ReadResult& result)
   at System.IO.Pipelines.Pipe.GetReadAsyncResult()
   at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.Http2MessageBody.ReadAsync(CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
   at Grpc.AspNetCore.Server.Internal.PipeExtensions.ReadSingleMessageAsync[T](PipeReader input, HttpContextServerCallContext serverCallContext, Func`2 deserializer)
   at Grpc.AspNetCore.Server.Internal.CallHandlers.ServerStreamingServerCallHandler`3.HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
```

## Searching the web

> Getting "System.InvalidOperationException: Reading is already in progress" With BiDirectional Streaming #658
> https://github.com/grpc/grpc-dotnet/issues/658

## It seems to occur when the service method completes before the client signals completion

The following code reproduces the error.

:::warning
The error message is reported for `ServerStreaming`, but the root cause was in `DuplexStreaming`. Because both calls used the same client channel, the failure apparently surfaced as a `ServerStreaming` error.
:::

#### Service side

```csharp=
public async Task<DuplexStreamingResult<byte[], byte[]>> CPsRes()
{
    DuplexStreamingContext<byte[], byte[]> streaming
        = base.GetDuplexStreamingContext<byte[], byte[]>();

    // The receive task is started but never awaited, so this method
    // returns before the client has signaled completion.
    Task tsk = ReceiveRequests(streaming);

    DuplexStreamingResult<byte[], byte[]> result = streaming.Result();
    return result;
}

ConcurrentQueue<byte[]> m_queRequests = new();

public async Task ReceiveRequests(
    DuplexStreamingContext<byte[], byte[]> streaming
)
{
    using (Task tsk = streaming.MyForEachAsync(async req =>
    {
        if (this.m_queRequests != null)
        {
            this.m_queRequests.Enqueue(req);
        }
    }))
    {
        await tsk
            .ConfigureAwait(false);
    }
}
```

#### Client side

```csharp=
static async Task CPsResReciverAsync(
    IMyFirstService callback_async,
    int number
)
{
    DuplexStreamingResult<byte[]/* res */, byte[]> stream;
    stream = await callback_async.CPsRes()
        .ConfigureAwait(false);
    try
    {
        using (Task tsk = stream.ResponseStream.MyForEachAsync(async res =>
        {
            byte[] bytes = await CPsRes(res, number)
                .ConfigureAwait(false);
            await stream.RequestStream.WriteAsync(bytes)
                .ConfigureAwait(false);
        }))
        {
            // Wait for the server side to complete.
            // Note that CompleteAsync() is never called on the request stream.
            await tsk.ConfigureAwait(false);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
    finally
    {
        stream.Dispose();
    }
}

static async Task<byte[]> CPsRes(byte[] res, int number)
{
    //Console.WriteLine($"[{number}] {res}");
    byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x00 };
    return bytes;
}
```

## The following works correctly

On the service side, the method now awaits the receive task, so it does not return until the client has signaled completion.

:::info
The key point is to make sure the client always calls `stream.RequestStream.CompleteAsync()`!!
:::

#### Service side

```csharp=
public async Task<DuplexStreamingResult<byte[], byte[]>> CPsRes()
{
    DuplexStreamingContext<byte[], byte[]> streaming
        = base.GetDuplexStreamingContext<byte[], byte[]>();

    Task tsk = ReceiveRequests(streaming);

    // Wait for the client to signal completion.
    await tsk.ConfigureAwait(false);

    DuplexStreamingResult<byte[], byte[]> result = streaming.Result();
    return result;
}

ConcurrentQueue<byte[]> m_queRequests = new();

public async Task ReceiveRequests(
    DuplexStreamingContext<byte[], byte[]> streaming
)
{
    using (Task tsk = streaming.MyForEachAsync(async req =>
    {
        if (this.m_queRequests != null)
        {
            this.m_queRequests.Enqueue(req);
        }
    }))
    {
        await tsk
            .ConfigureAwait(false);
    }
}
```

#### Client side

```csharp=
static async Task CPsResReciverAsync(
    IMyFirstService callback_async,
    int number
)
{
    DuplexStreamingResult<byte[]/* res */, byte[]> stream;
    stream = await callback_async.CPsRes()
        .ConfigureAwait(false);
    CountdownEvent countdownEvent = new CountdownEvent(1);
    try
    {
        Task _ = Task.Run(async () =>
        {
            countdownEvent.Wait();
            // Notify the server that sending is finished via `CompleteAsync`.
            await stream.RequestStream.CompleteAsync()
                .ConfigureAwait(false);
        });

        using (Task tsk = stream.ResponseStream.MyForEachAsync(async res =>
        {
            byte[] bytes = await CPsRes(res, number)
                .ConfigureAwait(false);
            await stream.RequestStream.WriteAsync(bytes)
                .ConfigureAwait(false);
        }))
        {
            // Done: release the task that calls CompleteAsync.
            countdownEvent.Signal();

            // Wait for the server side to complete.
            await tsk.ConfigureAwait(false);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
    finally
    {
        countdownEvent?.Dispose();
        stream.Dispose();
    }
}

static async Task<byte[]> CPsRes(byte[] res, int number)
{
    //Console.WriteLine($"[{number}] {res}");
    byte[] bytes = new byte[] { 0x00, 0x00, 0x00, 0x00 };
    return bytes;
}
```

###### tags: `gRPC` `ASP.NET Core` `ServerStreaming` `InvalidOperationException: Reading is already in progress.` `MagicOnion` `C#`
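
The completion handshake described in this note can also be sketched without gRPC. The block below is only an illustration under stated assumptions, not MagicOnion code: a `System.Threading.Channels` channel stands in for the request stream, `ChannelWriter.Complete()` stands in for `RequestStream.CompleteAsync()`, and the names `CompletionHandshakeSketch`, `ServiceAsync`, `ClientAsync`, and `RunAsync` are hypothetical. It shows the "service" awaiting its receive loop until the "client" signals completion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the duplex call; no gRPC or MagicOnion involved.
public static class CompletionHandshakeSketch
{
    // "Service" side: drain requests and return only after the writer completes.
    public static async Task<int> ServiceAsync(ChannelReader<byte[]> requests)
    {
        var received = new ConcurrentQueue<byte[]>();

        // ReadAllAsync finishes only once the writer has called Complete(),
        // which plays the role of RequestStream.CompleteAsync() here.
        await foreach (byte[] req in requests.ReadAllAsync())
        {
            received.Enqueue(req);
        }

        return received.Count;
    }

    // "Client" side: send some requests, then always signal completion.
    public static async Task ClientAsync(ChannelWriter<byte[]> requests, int count)
    {
        try
        {
            for (int i = 0; i < count; i++)
            {
                await requests.WriteAsync(new byte[] { 0x00, 0x00, 0x00, 0x00 })
                    .ConfigureAwait(false);
            }
        }
        finally
        {
            // Without this call, ServiceAsync would never finish.
            requests.Complete();
        }
    }

    // Wires both sides together and returns how many requests the service saw.
    public static async Task<int> RunAsync(int count)
    {
        Channel<byte[]> channel = Channel.CreateUnbounded<byte[]>();

        Task<int> service = ServiceAsync(channel.Reader);
        await ClientAsync(channel.Writer, count).ConfigureAwait(false);

        // Await the service, mirroring the awaited receive task in the fixed version.
        return await service.ConfigureAwait(false);
    }
}
```

Awaiting `CompletionHandshakeSketch.RunAsync(3)` returns the number of requests the service side received. Putting `Complete()` in a `finally` block is one way to make sure completion is signaled even when a write fails. Whether that fits a real duplex call depends on when the client actually stops writing.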