# Azure Event Grid 筆記 --- ## 介紹 Azure Event Grid 是一個訊息佇列服務,開發者可以很簡易的建立一個事件驅動的系統。Azure 提供的多種服務當中也採用了 Event Grid 作為事件觸發的處理架構。 --- ### Azure Event Grid 類型 --- - System topics: 可針對 Azure Event 做自動化處理 ![image](https://gist.github.com/assets/75846914/7d6e255e-1d01-4bbd-a2b9-933044d84b1b) --- - Topics: 你可以使用主題來發布和訂閱你的應用程式事件 ![image](https://gist.github.com/assets/75846914/8b5d4d45-796e-4bd9-b865-b1d71c366423) --- - Namespaces: 它可以啟用 MQTT broker 功能,並在大規模下進行拉取和推送交付。 ![image](https://gist.github.com/assets/75846914/756bac9c-16b7-4e7e-a1b5-01f6c00831e2) Note: - [Azure Event Grid:叡揚部落格](https://www.gss.com.tw/blog/azure-event-grid) - [Azure Messaging: When to use What and Why? Post 2 | by Joseph Masengesho | Slalom Technology | Medium](https://medium.com/slalom-technology/azure-messaging-when-to-use-what-and-why-post-2-81d164cc838e) --- # EventGrid Topics ![image](https://gist.github.com/assets/75846914/acfdb072-f801-47db-9252-24086220177d) --- ## Webhook 驗證 - Header 帶有 WebHook-Allowed-Origin 和 WebHook-Allowed-Rate 回應 HTTP Status Code 200 - 驗證API呼叫完成,呼叫 WebHook-Request-Callback。 --- ### 第一種驗證方法 ```csharp [7-9|13-14] [HttpOptions] public async Task<IActionResult> Options() { using (var reader = new StreamReader(Request.Body, Encoding.UTF8)) { // Retrieve the validation header fields var webhookRequestOrigin = HttpContext.Request.Headers["WebHook-Request-Origin"].FirstOrDefault(); var webhookRequestCallback = HttpContext.Request.Headers["WebHook-Request-Callback"]; var webhookRequestRate = HttpContext.Request.Headers["WebHook-Request-Rate"]; // Respond with the appropriate origin and allowed rate to // confirm acceptance of incoming notications HttpContext.Response.Headers.Add("WebHook-Allowed-Rate", "*"); HttpContext.Response.Headers.Add("WebHook-Allowed-Origin", webhookRequestOrigin); } return Ok(); } ``` --- ### APIM 驗證 Webhook 作法 ```xml [12-21] <inbound> <base /> <!– Get the WebHook-Request-Origin –> <set-variable value="@(context.Request.Headers.GetValueOrDefault("WebHook-Request-Origin"))" name="webhookRequestOrigin" /> <!– Return the response with the allowed origin and allowed rate to confirm the subscription. –> <return-response> <set-status code="200" reason="OK" /> <set-header name="WebHook-Allowed-Origin" exists-action="override"> <value>@((string)context.Variables["webhookRequestOrigin"])</value> </set-header> <set-header name="WebHook-Allowed-Rate" exists-action="override"> <value>*</value> </set-header> <set-body /> </return-response> </inbound> ``` Note: * [WebHooks with Azure Event Grid and CloudEvents v1.0 – Made of Strings](https://madeofstrings.com/2020/01/21/webhooks-with-azure-event-grid-and-cloudevents-v1-0/) --- ### 第二種 WebHook-Request-Callback 驗證 ![image](https://gist.github.com/assets/75846914/d006fb9e-59ae-4a23-a8cc-3a5e6e21ac92) --- ## 採雷 驗證前會試 `AwaitingManualAction` 狀態,5 分鐘會還原資源或刪除資源。 ![image](https://gist.github.com/assets/75846914/c3b0d923-9b09-4ac3-9a63-f7fdc3deee71) --- ## 發送事件 ```bash [|2] curl --location 'https://eventgridtest.japaneast-1.eventgrid.azure.net/api/events' \ --header 'aeg-sas-key: {{key}}' \ --header 'ce-id: 111131' \ --header 'ce-source: foo' \ --header 'ce-type: maintenanceRequested' \ --header 'ce-specversion: 1.0' \ --header 'Content-Type: application/json' \ --data '{ "source": "foo", "id": "111131", "type": "maintenanceRequested", "subject": "myapp/vehicles/diggers", "time": "2018-10-30T21:03:07+00:00", "data": { "make": "Contoso", "model": "Small Digger" }, "specversion": "1.0" }' ``` --- ## Azure EventGrid 發送到 Worker 訊息 ```bash curl -X 'POST' 'https://webhook.site/9d116996-bc2c-44e5-8f66-f0a5fb0b5017' \ -H 'connection: close' \ -H 'host: webhook.site' \ -H 'content-length: 287' \ -H 'aeg-event-type: Notification' \ -H 'aeg-metadata-version: 1' \ -H 'aeg-data-version: ' \ -H 'aeg-delivery-count: 0' \ -H 'aeg-subscription-name: TEST2' \ -H 'webhook-request-origin: eventgrid.azure.net' \ -H 'content-type: application/cloudevents+json; charset=utf-8' \ -d $'{"id":"111131","source":"foo","type":"maintenanceRequested","specversion":"1.0","data":{"source":"foo","id":"111131","type":"maintenanceRequested","subject":"myapp/vehicles/diggers","time":"2018-10-30T21:03:07+00:00","data":{"make":"Contoso","model":"Small Digger"},"specversion":"1.0"}}' ``` --- # EventGrid Namespaces --- ## 發送事件 ```csharp [1-4|7|10-11|15-19] var topicEndpoint = Util.GetPassword("AzureTopicEndpoint"); // Should be in the form: https://namespace01.eastus-1.eventgrid.azure.net. var topicKey = Util.GetPassword("AzureTopicKey"); var topicName = Util.GetPassword("AzureTopicName"); var subscription = Util.GetPassword("AzureSubscription"); // Construct the client using an Endpoint for a namespace as well as the access key var client = new EventGridClient(new Uri(topicEndpoint), new AzureKeyCredential(topicKey)); // Publish a single CloudEvent using a custom TestModel for the event data. var @ev = new CloudEvent("employee_source", "type", new TestModel { Name = "Bob", Age = 1999 }); await client.PublishCloudEventAsync(topicName, ev); // //// Publish a batch of CloudEvents. // await client.PublishCloudEventsAsync( topicName, new[] { new CloudEvent("employee_source", "type", new TestModel { Name = "Tom", Age = 55 }), new CloudEvent("employee_source22", "type", new TestModel { Name = "Alice", Age = 25 })}).Dump(); Console.WriteLine("Three events have been published to the topic. Press any key to end the application."); public class TestModel { public string Name { get; set; } public int Age { get; set; } } ``` https://share.linqpad.net/b76q9dhq.linq --- ## 接收事件 ```csharp [9|71|50|92] var topicEndpoint = Util.GetPassword("AzureTopicEndpoint"); // Should be in the form: https://namespace01.eastus-1.eventgrid.azure.net. var topicKey = Util.GetPassword("AzureTopicKey"); var topicName = Util.GetPassword("AzureTopicName"); var subscription = Util.GetPassword("AzureSubscription"); // Construct the client using an Endpoint for a namespace as well as the access key var client = new EventGridClient(new Uri(topicEndpoint), new AzureKeyCredential(topicKey)); int i = 0; ReceiveResult r = await client.ReceiveCloudEventsAsync(topicName, subscription, 20); // handle received messages. Define these variables on the top. var toRelease = new List<string>(); var toAcknowledge = new List<string>(); var toReject = new List<string>(); // Iterate through the results and collect the lock tokens for events we want to release/acknowledge/result foreach (ReceiveDetails detail in r.Value) { CloudEvent @event = detail.Event; BrokerProperties brokerProperties = detail.BrokerProperties; Console.WriteLine(@event.Data.ToString()); // The lock token is used to acknowledge, reject or release the event Console.WriteLine(brokerProperties.LockToken); Console.WriteLine(); // If the event is from the "employee_source" and the name is "Bob", we are not able to acknowledge it yet, so we release it if (@event.Source == "employee_source" && @event.Data.ToObjectFromJson<TestModel>().Name == "Bob") { toRelease.Add(brokerProperties.LockToken); } // acknowledge other employee_source events else if (@event.Source == "employee_source") { toAcknowledge.Add(brokerProperties.LockToken); } // reject all other events else { toReject.Add(brokerProperties.LockToken); } } // Release/acknowledge/reject the events if (toRelease.Count > 0) { ReleaseResult releaseResult = await client.ReleaseCloudEventsAsync(topicName, subscription, new ReleaseOptions(toRelease)); // Inspect the Release result Console.WriteLine($"Failed count for Release: {releaseResult.FailedLockTokens.Count}"); foreach (FailedLockToken failedLockToken in releaseResult.FailedLockTokens) { Console.WriteLine($"Lock Token: {failedLockToken.LockToken}"); Console.WriteLine($"Error Code: {failedLockToken.Error}"); Console.WriteLine($"Error Description: {failedLockToken.ToString}"); } Console.WriteLine($"Success count for Release: {releaseResult.SucceededLockTokens.Count}"); foreach (string lockToken in releaseResult.SucceededLockTokens) { Console.WriteLine($"Lock Token: {lockToken}"); } Console.WriteLine(); } if (toAcknowledge.Count > 0) { AcknowledgeResult acknowledgeResult = await client.AcknowledgeCloudEventsAsync(topicName, subscription, new AcknowledgeOptions(toAcknowledge)); // Inspect the Acknowledge result Console.WriteLine($"Failed count for Acknowledge: {acknowledgeResult.FailedLockTokens.Count}"); foreach (FailedLockToken failedLockToken in acknowledgeResult.FailedLockTokens) { Console.WriteLine($"Lock Token: {failedLockToken.LockToken}"); Console.WriteLine($"Error Code: {failedLockToken.Error}"); Console.WriteLine($"Error Description: {failedLockToken.ToString}"); } Console.WriteLine($"Success count for Acknowledge: {acknowledgeResult.SucceededLockTokens.Count}"); foreach (string lockToken in acknowledgeResult.SucceededLockTokens) { Console.WriteLine($"Lock Token: {lockToken}"); } Console.WriteLine(); } if (toReject.Count > 0) { RejectResult rejectResult = await client.RejectCloudEventsAsync(topicName, subscription, new RejectOptions(toReject)); // Inspect the Reject result Console.WriteLine($"Failed count for Reject: {rejectResult.FailedLockTokens.Count}"); foreach (FailedLockToken failedLockToken in rejectResult.FailedLockTokens) { Console.WriteLine($"Lock Token: {failedLockToken.LockToken}"); Console.WriteLine($"Error Code: {failedLockToken.Error}"); Console.WriteLine($"Error Description: {failedLockToken.ToString}"); } Console.WriteLine($"Success count for Reject: {rejectResult.SucceededLockTokens.Count}"); foreach (string lockToken in rejectResult.SucceededLockTokens) { Console.WriteLine($"Lock Token: {lockToken}"); } Console.WriteLine(); } Console.WriteLine("Received Response"); Console.WriteLine("-----------------"); public class TestModel { public string Name { get; set; } public int Age { get; set; } } ``` https://share.linqpad.net/e5kdvmeu.linq Note: - 接收事件: ReceiveCloudEventsAsync - 確認(完成)事件: AcknowledgeCloudEventsAsync - 釋放事件: ReleaseCloudEventsAsync 釋放事件以使它們可用於重新傳遞。與確認事件類似 - 拒絕事件: RejectCloudEventsAsync 拒絕您的消費者應用程式無法處理的事件。拒絕事件的條件包括無法解析的格式錯誤的事件或處理事件的應用程式出現問題。 --- ## 備註 - Pull delivery 事件一分鐘沒收到 Ack 會重發事件,一分鐘後做 Ack 會發生錯誤,會跑出 Ack Token 過期。 - 事件預設 Retry 10 次,保留7天。 - Release 會重做事件,也是有 Retry 10 次限制。 Note: - Pull delivery 事件一分鐘沒收到 Ack 會重發事件,一分鐘後做 Ack 會發生錯誤,會跑出 Ack Token 過期。這個特性確保了系統在一定時間內收到事件的確認。如果超過這個時間還沒有收到確認,系統會認為事件處理失敗,並重新發送事件。 - 事件預設 Retry 10 次,保留 7 天。這些特性確保了即使事件處理失敗,系統也會嘗試重新處理事件。如果超過重試次數或保留時間,事件將被丟棄。 - Release 會重做事件,也是有 Retry 10 次限制。這個特性確保了即使在 Release 過程中發生錯誤,系統也會嘗試重新處理事件。 --- ## 為什麼同一個方法可以定義給不同型別變數? ![image](https://gist.github.com/assets/75846914/f814f38d-9f1a-43f4-aa25-495a1a64bd2a) ![image](https://gist.github.com/assets/75846914/ff167008-9f64-49da-8333-0330faa07c4e) --- ## 定義隱含轉換的轉型 ![image](https://gist.github.com/assets/75846914/96ca15ab-d67d-4b1b-9c8d-c4ac395890e8) [C# 的 explicit 與 implicit 關鍵字 | Ron 2.0](https://ronsun.github.io/content/20170924-explicit-implicit-keywords.html) --- ## EventGrid Namespace 建立事件&訂閱 --- ### Package **Azure.ResourceManager.EventGrid v1.1.0-beta.4** --- ### 使用前置作業 有關權限的設定: 1. 在 App registration 裡註冊 EventGridNamespace App(EventGridRegistration),建立ClientId、ClientSecret 2. 在EventGridNamespace App給予EventGridRegistration EventGrid Contributor的權限 --- ### 建立和刪除 Topics ```csharp string tenantId = Util.GetPassword("AzureTenantId"); string clientId = Util.GetPassword("AzureClientId"); string clientSecret = Util.GetPassword("AzureClientSecret"); var subscriptionId = Util.GetPassword("AzureSubscriptionId"); var resourceGroupName = Util.GetPassword("AzureResourceGroupName"); var namespaceName = "EventGridNamespace"; // Create a new ArmClient with your credentials ClientSecretCredential credential = new ClientSecretCredential(tenantId, clientId, clientSecret); ArmClient client = new ArmClient(credential, subscriptionId); // Define the topic resource ID ResourceIdentifier eventGridNamespaceResourceId = EventGridNamespaceResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName); EventGridNamespaceResource eventGridNamespace = client.GetEventGridNamespaceResource(eventGridNamespaceResourceId); // get the collection of this NamespaceTopicResource NamespaceTopicCollection collection = eventGridNamespace.GetNamespaceTopics(); // invoke the operation string topicName = "examplenamespacetopic2"; NamespaceTopicData data = new NamespaceTopicData() { PublisherType = PublisherType.Custom, InputSchema = EventInputSchema.CloudEventSchemaV10, EventRetentionInDays = 1, }; ArmOperation<NamespaceTopicResource> lro = await collection.CreateOrUpdateAsync(WaitUntil.Completed, topicName, data); NamespaceTopicResource result = lro.Value; // the variable result is a resource, you could call other operations on this instance as well // but just for demo, we get its data from this resource instance NamespaceTopicData resourceData = result.Data; // for demo we just print out the id Console.WriteLine($"Succeeded on id: {resourceData.Id}"); Console.WriteLine("等待 20 秒做刪除動作...."); await Task.Delay(20000); Console.WriteLine($"開始做刪除 Topics: {topicName}"); var topicIdentity = NamespaceTopicResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName, topicName); client.GetNamespaceTopicResource(topicIdentity); var dr = result.Delete(WaitUntil.Completed).Dump(); Console.WriteLine($"Delete Topicss: HasCompleted: {dr.HasCompleted}"); ``` --- ### 建立和刪除 Subscription ```csharp // Authenticate string tenantId = Util.GetPassword("AzureTenantId"); string clientId = Util.GetPassword("AzureClientId"); string clientSecret = Util.GetPassword("AzureClientSecret"); var subscriptionId = Util.GetPassword("AzureSubscriptionId"); var resourceGroupName = Util.GetPassword("AzureResourceGroupName"); var namespaceName = "EventGridNamespace"; var topicName = "test"; ClientSecretCredential credential = new ClientSecretCredential(tenantId, clientId, clientSecret); ArmClient client = new ArmClient(credential, subscriptionId); ResourceIdentifier namespaceTopicResourceId = NamespaceTopicResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName, topicName); NamespaceTopicResource namespaceTopic = client.GetNamespaceTopicResource(namespaceTopicResourceId); namespaceTopic.Dump(); NamespaceTopicEventSubscriptionCollection collection = namespaceTopic.GetNamespaceTopicEventSubscriptions(); NamespaceTopicEventSubscriptionData data = new NamespaceTopicEventSubscriptionData() { DeliveryConfiguration = new DeliveryConfiguration() { DeliveryMode = DeliveryMode.Queue, Queue = new QueueInfo() { ReceiveLockDurationInSeconds = 60, MaxDeliveryCount = 4, EventTimeToLive = XmlConvert.ToTimeSpan("P1D"), }, }, EventDeliverySchema = DeliverySchema.CloudEventSchemaV10, }; var eventSubscriptionName = "eventSubscriptionName2"; collection.CreateOrUpdate(WaitUntil.Completed, eventSubscriptionName, data).Dump(); Console.WriteLine("等待 20 秒做刪除動作...."); await Task.Delay(20000); Console.WriteLine($"開始做刪除 Topics: {topicName}"); ResourceIdentifier namespaceTopicEventSubscriptionResourceId = NamespaceTopicEventSubscriptionResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName, topicName, eventSubscriptionName); NamespaceTopicEventSubscriptionResource namespaceTopicEventSubscription = client.GetNamespaceTopicEventSubscriptionResource(namespaceTopicEventSubscriptionResourceId); await namespaceTopicEventSubscription.DeleteAsync(WaitUntil.Completed); ``` --- End Note: 其他 ## Azure Message Queue 差異 [azureservicebus - Message bus vs. Service bus vs. Event hub vs Event grid - Stack Overflow](https://stackoverflow.com/questions/57740782/message-bus-vs-service-bus-vs-event-hub-vs-event-grid) ## API 觸發事件處理 * [Azure 事件方格 命名空間主題的訂閱者作業 - Azure Event Grid | Microsoft Learn](https://learn.microsoft.com/zh-tw/azure/event-grid/subscriber-operations) https://learn.microsoft.com/zh-tw/azure/event-grid/authenticate-with-access-keys-shared-access-signatures https://learn.microsoft.com/en-us/azure/event-grid/namespace-push-delivery-overview https://learn.microsoft.com/en-us/azure/event-grid/choose-right-tier