# ReminderService ### Problem Statement: Currently, we have a stable Actors framework in Dapr. As a part of it, we have Reminders that can be created. In Dapr Workflow, we are using these Reminders to invoke Workflow/Activities. Thus, to Scale out Workflow instances, we need to scale out these reminders’ creation/deletion. Currently, reminder creation/deletion is not scalable. Also, Current Reminder Partition is not working as expected for concurrent reminders creation/deletion in capacity of around consistently 1k+ creation/deletion requests. Ref: Workflow errors out on more than 2 app instances · Issue #6896 · dapr/dapr (github.com) ### Scope Of This Design: This design aims to solve the Reminder Creation/Deletion issue in broader way i.e. by taking Reminders out of the purview of different sidecar instances and centralizing the operations in the way of a new Control Plane Service - Reminder Service. So, Reminder will come out of scope of Actors and Actors can use Reminders, just like any other system, like pubsub or user App directly may be able to use Reminders. ### Evolution of Design: The thought process behind design is simple that we take out responsibility of Reminder Lifecycle from Dapr Sidecar to a central Service. ![p1.png](https://hackmd.io/_uploads/BJEaxuzQT.png) Which would essentially mean the following: ![p1.png](https://hackmd.io/_uploads/S1plWufm6.png) Going further, to Scale out Reminder Service, we need separate buckets for them to work on, for which we use the concept of Token Bucket. ![p1.png](https://hackmd.io/_uploads/S1aLEtfQp.png) The ReminderDb used here can be of any type: SQL or NoSQL, as we keep key-value pairs only to maintain different entities. **Before we go any further,** we need to understand the underlying Token Concept used in this design. ### Token Concept: Intent is to create some buckets of tokens or Ids to be distributed. These buckets can then be allocated to Sidecar instances and Reminder Service instances, for Reminder creation and execution. As a very first step, we create an internal nomenclature for app-id. This helps us to become agnostic of app-id in our keys and implementation. #### Table 0 | App-Id | TokenBucket Prefix | | -------- | -------- | | orderapp | B | | payment-app | AE | With these prefixes in place, we assign a token bucket for each sidecar instance. #### Now, what is a Token Bucket? A token bucket means that the tokens will be allotted in a particular range only. So, if a token bucket range is, let’s say, 100001 to 20000, then we will start assigning token 100001, then 100002, then 100003 and so on, till 200000 - for new reminders creation. Thus, these tokens will be used in assigning keys for new Reminders creation. So, if a sidecar instance is allocated a token bucket and if that sidecar instance reaches the exhaustion point of tokens inside this token bucket, then a new token bucket will be assigned to it with an increased range. So, let’s say, if there are 3 sidecar instances and ranges 1 - 100000, 100001 - 200000 and 200001 - 300000 are already assigned for these 3 sidecar instances and 1 sidecar with range 100001 - 200000 runs out of tokens (in code, approaches running out of tokens, let’s say 95% are consumed already), then a new token bucket with range 300001 - 400000 will be created and this sidecar instance be provided with this token bucket, so that it starts using new token bucket, once previous one exhausts. In case, there are already more token buckets created, as compared to number of sidecar instances - these will be distributed in round robin way, when the system comes up. #### Table 1 | Token Bucket | Token Range | | -------- | -------- | | B1 | 1 - 100000 | | B2 | 100001 - 200000 | | B3 | 200001 - 300000 | | B4 | 300001 - 400000 | | AE1 | 1 - 100000 | These buckets will be assigned for sidecar instances and for Reminders Service. Let’s say there are 3 Sidecar instances for orderapp (B) and 1 Sidecar instance for payment-app (AE) ; and 2 Reminders Service instances. So, assignments can be like this: #### Table 2 | Token Bucket | Sidecar Instance IP | Reminders Service Instance IP | | -------- | -------- | -------- | | B1 | B sidecar Instance IP1 | Reminder Service Instance IP1 | | B2 | B sidecar Instance 2 | Reminder Service Instance IP2 | | B3 | B sidecar Instance IP3 | Reminder Service Instance IP1 | | B4 | B sidecar Instance IP1 | Reminder Service Instance IP2 | | AE1 | AE sidecar Instance IP1 | Reminder Service Instance IP2 | In addition, to the token, we also need to ensure that what happens, if we run out of All possible token buckets (due to int32 range) i.e. if we approach **2147483647** or int32 limit. (It gave us **21474** token buckets in a single int32 range iteration) So, to ensure that we are able to iterate again and again through our int32 range, we also use a random 8-digit vector for 1 iteration of int32 range. So, table 1 would look finally something like this: #### Table 1 | Token Bucket | Token Range | Vector| | -------- | -------- |-------- | | B1 | 1 - 100000 | drftghrq| | B2 | 100001 - 200000 |drftghrq | | B3 | 200001 - 300000 | drftghrq| | B4 | 300001 - 400000 |drftghrq | | AE1 | 1 - 100000 | drftghrq| Thus, Id of a Reminder will be formed like: `reminder||<tokenBucket-prefix>||<token>||<8digit-vector> ` If app-id is orderapp and token is 200143, so reminder id would be `“reminder||B||200143||drftghrq” ` ![p1.png](https://hackmd.io/_uploads/SJkVEOG7a.png) ### Sequence Of Events for Sidecar Registration and Reminder Operations * At the very beginning, Reminders Service instances elect a leader. Here, by default kubernetesKubernetes leader election can be used in the first version. In future version, we can start supporting self-hosted as well by using an optional third partythird-party library. We can decide on which third party library to be used. * In Parallel, Sidecar instances start connecting with any of the Reminders Service through ConnectSidecar gRPC stream. They also call RegisterSidecar in the beginning, specifying their app-id and address details. * If Sidecars don’t have any other message to send on ConnectSidecar gRPC stream, then they will keep sending an empty message on ConnectSidecar, every 2 seconds (can be made configurable). This will be used to understand the health of sidecar by Reminder Service and to understand, if Sidecar is down or facing issues. * Information about all available sidecar instances is propagated to Reminder Service leader instance. We can use something like https://stackoverflow.com/a/64053472/18848251 * This leader queries db for key `tokenBucket||consumed`. And, allocates sidecar instances and Reminder Service Instances for these buckets. That is it forms Table 2. The rows of this Table are stored in db against keys in form of `tokenBucket||allocations||<tokenBucket_id>`. ```Key: tokenBucket||consumed Value: { "tokenBuckets":[ {"id":"B1", "rangeFrom":"1", "rangeTo":"100000", "vector":"drftghrq"}, {"id":"B2", "rangeFrom":"100001", "rangeTo":"200000", "vector":"drftghrq"}, {"id":"B3", "rangeFrom":"200001", "rangeTo":"300000", "vector":"drftghrq"}, {"id":"B4", "rangeFrom":"300001", "rangeTo":"400000", "vector":"drftghrq"} ] } ``` ``` Key: tokenBucket||allocations||B1 Value: {"sidecarInstance":"123.9.8.45", "remSerInstance":"134.9.8.82"} Key: tokenBucket||allocations||B2 Value: {"sidecarInstance":"123.9.8.46", "remSerInstance":"134.9.8.83"} Key: tokenBucket||allocations||B3 Value: {"sidecarInstance":"123.9.8.47", "remSerInstance":"134.9.8.82"} Key: tokenBucket||allocations||B4 Value: {"sidecarInstance":"123.9.8.45", "remSerInstance":"134.9.8.83"} ``` * In case consumed number of buckets is less than number of sidecar instances, it will create and store more tokenBuckets in `tokenBucket||consumed` and get allocations done and entered in `tokenBucket||allocations||<tokenBucket_id>`. * Once this allocation is done by leader, it is propagated to all other ReminderService instances and to all sidecar instances. * Each sidecar instance will now establish ConnectSidecar gRPC stream with all Reminder Service instances which are common to it for any token bucket. And, it can call corresponding Reminder Service Instance to Create Reminder. For Delete/Get, it would be better to call correct Reminder Service instance as per the token number, present in reminder id. * Once a Reminder Service Instance knows about its allocation, it starts to enquire ReminderDB for reminder keys, which it has to cater to. It also knows that till what token it has to enquire: ``` Key: tokenBucket||B1||consumedTill Value: "204368" ``` * There will be cases when some reminder is deleted and those keys are not present in database, so for those keys, no data will be found and they will not be catered to. * **Assumption** is that each sidecar will be load-balanced for an Activity/task, which is creating Reminders. Hence, in such a case, it should not happen that only 1 sidecar instance is placing request to Create Reminders. * What happens when there is a sub-system which uses Reminders but load balances in a different way than proposed Reminders? In that case, a Reminder can be basically required by some other sidecar instance, outside its allocated token range. So, in that case, it can basically subscribe to a reminder. (Defined below in API Specification in details) #### Q: What happens when a DaprSidecar instance goes down? ![p1.png](https://hackmd.io/_uploads/HJ3iHOzXp.png) #### Q: What happens when a ReminderService instance goes down? ![p1.png](https://hackmd.io/_uploads/r1-k8dz7p.png) #### Q: How does CreateReminder work? ![p1.png](https://hackmd.io/_uploads/rJmzI_M7a.png) #### Q: How does Execution of Reminders work? ![p1.png](https://hackmd.io/_uploads/BJ_EIuGQp.png) ## APIs ``` // ReminderService is used to register sidecar, maintain health check and // execute reminder operations service ReminderService { // Used by sidecars to Register themselves with ReminderService rpc RegisterSidecar(RegisterSidecarRequest) returns RegisterSidecarResponse {} // Maintains a bi-directional stream so that sidecar is able to register, // let Reminder Service know about its liveness // In addition, response ReminderServiceStream can be used to ExecuteReminders rpc ConnectSidecar(stream SidecarStream) returns (stream ReminderServiceStream) {} // CreateReminder to be used for Creation of Reminder rpc CreateReminder(CreateReminderRequest) returns CreateReminderResponse {} // DeleteReminder to be used for Deletion of Reminder rpc DeleteReminder(DeletereminderRequest) returns DeleteReminderResponse {} // GetReminder to be used for fetching a reminder rpc GetReminder(GetReminderRequest) returns GetReminderResponse{} // Subscribereminder can be used for subscribing to specific reminders rpc SubscribeReminder(SubscribeReminderRequest) returns SubscribeReminderResponse {} } // RegisterSidecarRequest to be used by Sidecar to get themselves registered with // ReminderService message RegisterSidecarRequest { string app_id = 1; string address = 2; } // RegisterSidecarRespoonse is used as a response by ReminderService, letting // sidecar know its token bucket prefix message RegisterSidecarResponse { string token_bucket_prefix =1; } // SidecarStream is the message used by sidecar for streaming any executeReminder // response. If there is no message to be sent, it will keep sending an Empty // message per few seconds. This will help ReminderService in understanding // what all sidecars are alive. message SidecarStream { oneof sidecar_message { ExecuteReminderResponse execute_reminder_res = 1; google.protobuf.Empty empty_msg = 2; } } // ReminderServiceStream is the message used by ReminderService to call // ExecuteReminder or to let sidecar know token buckets allocation or any // update in token bucket allocation // First, TokenBucketsUpdate message would imply that ReminderService is now ready // to take Reminder operations request, like Create/Delete etc. message ReminderServiceStream { oneof reminder_message { ExecuteReminderRequest execute_reminder_req = 1; TokenBucketsUpdate token_buckets_update = 2; } } // TokenBucketsUpdate contains All the information for All Token Buckets i.e. // to what sidecar instance and what reminder service instance, which token bucket // belongs message TokenBucketsUpdate { repeated TokenBucketInstanceSync instance_sync = 1; } // This is the single row representing one token buckets information of allocation message TokenBucketInstanceSync { string token_bucket_id = 1; string sidecar_instance = 2; string rem_ser_instance = 3; } // Reminder message contains all the information for a reminder message Reminder string reminder_name = 1; google.protobuf.Timestamp next_execution_time = 2; string initial_delay = 3; string schedule = 4; google.protobuf.Timestamp ttl = 4 // metadata here can be used to store any key value pair, including any data // reference ReminderMetadata metadata = 5; } // Optional key-value pair, which can be passed with Reminder. // Can be used to store any data reference as well. message ReminderMetadata { string key = 1; string value = 2; } message CreateReminderRequest { Reminder reminder = 1; } message CreateReminderResponse { string reminder_id = 1; } message DeleteReminderRequest { string reminder_id = 1; } message DeleteReminderResponse { } message GetReminderRequest { string reminder_id =1; } message GetReminderResponse { Reminder reminder = 1; } // Send the list of reminder_ids to be subscribed message SubscribeReminderRequest { repeated string reminder_list = 1; } // Returns back the updated reminder_ids message SubscribeReminderResponse { repeated string reminder_id = 1; } message ExecuteReminderRequest { Reminder reminder = 1; } message ExecuteReminderResponse { string reminder_id = 1; bool success = 2; } ``` ### What does **SubscribeReminder** do? There can be scenarios where some sub-system is using Reminders but that sub-system may be load balancing in a different way than the proposed Reminders service. 1 classic example is of existing Actor sub-system. It will use Reminders but the way Actors load balance, across many sidecar instances, will be very different than proposed Reminder system does. Thus, there will be scenarios where-in a sidecar instance may end up having some ActorId which has a reminder with id as `reminder||B||301234||drftghrq` , whereas this sidecar instance will not be belonging to this tockenBucket i.e. to B4 in this case (300001 - 400000). So, it will call SubscribeReminder API with a list of all such reminders to any ReminderService instance, to which this sidecar instance is connected, and that ReminderService instance will ask corresponding ReminderService to stop taking care of this reminder and once the corresponding reminder service acknowledges that it has stopped taking care of this reminder, the current ReminderService will enlist it under the Active Token Bucket belonging to this App Sidecar instance. Here, it also changes the Id of this Reminder, as per the allocated Token Bucket of current ReminderService; And, this new id will be propagated to sidecar instance to let them update this new reminder Id in their system. ## Internal Working of ReminderService ### Reminder Creation And Execution At the time of Reminder Creation, **reminder is persisted in ReminderDB as a key value pair**, where key is reminder_id and value is reminder value. ``` key: “reminder||B||200143||drftghrq” value: { “reminder_name”: “reminder_orderid_QSDF”, “next_execution_time”: “ 2023-19-28-04-58-46 ”, “initial_delay”: “10s”, “schedule”: “<TBM>”, “metadata”: { [“key”: “K1”, “value”: “V1”], [“key”: “K2”, “value”: “V2”], } } ``` **All reminders catered by a ReminderService instance are stored in-memory as well** and a map is created like below table. This in-memory map is required to be able to sort on the basis of upcoming execution time. <table> <tr> <th> Token Bucket </th> <th colspan="2"> Next Execution Time (Sorted List) </th> </tr> <tr> <td rowspan="3"> B1 </td> <td> 2023-19-28-04-58-46 </td> <td> ```json { "reminderId": "reminder||B||143||drftghrq", "reminderValue": { “reminder_name”: “run-activity” " next_execution_time ": "2023-19-28-04-58-46", “initial_delay”: “10s”, “schedule”: “”, "metadata": "" } }, { "reminderId": "reminder||B||98765||drftghrq", "reminderValue": { “reminder_name”: “dummy-reminder” " next_execution_time ": "2023-19-28-04-58-46", “initial_delay”: “10s”, “schedule”: “”, "metadata": "" } } ``` </td> </tr> <tr> <td> 2023-19-28-04-58-47 </td> <td> ```json { "reminderId": "reminder||B||9287||drftghrq", "reminderValue": { “reminder_name”: “run-activity” " next_execution_time ": "2023-19-28-04-58-47", “initial_delay”: “10s”, “schedule”: “”, "metadata": "" } } ``` </td> </tr> <tr> <td> 2023-19-28-04-58-50 </td> <td> ```json { "reminderId": "reminder||B||200287||drftghrq", "reminderValue": { “reminder_name”: “rem-w” " next_execution_time ": "2023-19-28-04-58-50", “initial_delay”: “10s”, “schedule”: “”, "metadata": "" } } ``` </td> </tr> <tr> <td> B4 </td> <td> 2023-19-28-04-58-46 </td> <td> ```json { "reminderId": "reminder||B||345673||drftghrq", "reminderValue": { “reminder_name”: “run-activity” " next_execution_time ": "2023-19-28-04-58-46", “initial_delay”: “10s”, “schedule”: “”, "metadata": "" } } ``` </td> </tr> </table> Thus, for every second, it knows upfront what all reminders to be executed for what token bucket i.e. for what sidecar instance and thus, it can call ExecuteReminder per second for corresponding Reminders. We don’t update “consumedTill” with each CreateReminder call, rather we update it in a cron based fashion, like every 2 seconds. And, when we need to fetch keys at the time of start-up, we fetch 1k or 2k more keys. Like, if consumedTill is 103456, so we will fetch from 100001 to 104456. ### At Startup At the time of startup, a leader election happens among ReminderService instances which decides for TokenBucket Allocation and each ReminderService instance reads all reminders (as per its allocated token buckets) from database. Once, this is done, ReminderService becomes ready to start catering to Reminders operation, like Creation/Deletion etc. ### What Happens At Rebalance Of Any Sub-System, having Different Rebalance Criteria #### 1st Approach: Let’s assume that any other sub-system, like Actor sub-system, gets rebalanced and some Actor Id ActorIdXYZ starts to be catered by some other sidecar, let’s say it was earlier created under sidecar instance 1 but now it is catered by sidecar instance 2. Now, sidecar instance 2 knows token bucket allocation and it reads reminder id required by this ActorIdXYZ, which let’s assume has reminder id as `reminder||B||200287||drftghrq`. Let’s assume that this sidecar instance 2 caters to token range 100001 – 200000 and 500001 - 600000. So, it knows that 200287 is not currently received by it. So, it can call SubscribeReminder at ReminderService and subscribe to this reminder. ReminderService instances will internally shift this reminder from the current Reminder instance catering to 200287 to another ReminderSidecar instance catering to any token bucket, catering to Sidecar instance 2. This shift happens in their memory only and database key-value pair of this reminder doesn’t need to be changed. Here, while shifting special care is taken to confirm that no Active ExecuteReminder call is in progress at the original ReminderService instance end. If that is in progress, it waits for its response and only then it de-allocates from its in-memory and moves ahead to shift to another Reminder Service instance. Here, Id of this Reminder is updated, as per the allocated Token Bucket of current ReminderService; And, this new id will be propagated to sidecar instance to let them update this new reminder Id in their system. #### 2nd Approach: We keep calling Reminders as per assigned TokenBuckets only. ActorId can be set in reminder metadata. When a reminder is executed on any sidecar instance, it fetches actorId for it and calculates sidecar hosting this actor Id as per its own consistent hashing mechanism and sends reminder to be executed there. ### Scale Out Recommendations We will create some benchmarks which would be an indicator representing load factor and that would be able to indicate to users, if they need to scale out. ### Memory Implications of Keeping Reminders In-Memory On a quick Back of the Envelope calculation, if we limit Metadata of Reminder to 128 bytes, a reminder data would consume around 256 bytes. That would mean that there can be around 4 million reminders in-memory in 1 GB memory. ### Upgrade Path From reminders v1 to v2 This version is not backward compatible. Customers will need to chose among v1 or v2. For existing data, we will need to create some migration scripts for existing reminders. ## Road Ahead I want to highlight few things which can be improved upon in future versions: ### Phase 2 of ReminderService (Handling Actors as well): Instead of letting Placement Service handle Actors, and keeping rebalancing them, I like the idea of handling Actors in this new Service itself, based on Dapr Actors v2 (github.com) The same principles can be used to handle Actors in this service. Only the schema will change like this: ``` key: "hosts" value: { "hosts":[ {"address":"128.9.9.43", "state": "ON"}, {"address":"128.9.9.44", "state": "OFF"}, {"address":"128.9.9.45", "state": "ON"} ] } ``` ``` key: <actorId>||host value: <host_address> ``` ``` key: <actorId||reminders> value: { "reminders":[ "reminder||B||200143||drftghrq", "reminder||B||200156||drftghrq", "reminder||B||206543||drftghrq", "reminder||B||208145||drftghrq", "reminder||B||209149||drftghrq", ] } ``` In this case, new ReminderService will be responsible for chosing a random host for a inactive/deactivated Actor. On deciding the host, it can read its reminders and change keys for them internally, i.e. in-memory and in ReminderDB. It doesn’t need to send this information of updated key to Dapr Sidecar. This would essentially mean that ScheduleReminder API won’t be required any more. Though, we can keep that, just for a feature kind of thing. ### What happens if some Token Bucket becomes Hot? As per the data points that we gather from PoC and running this service, we can set some benchmarks and accordingly design an algo deducing the weight of load. If it reaches a particular threshold, we can raise the request to divide the Hot Token Bucket and keep doing that until the load is back to normal. So, a token bucket range would be divided from 100000 to 50k and so on. The divided Token Bucket will be allocated to different Reminder Service instances. ### Token Bucket Garbage Collector There would be scenarios when some token buckets reminders would get deleted altogether. In some future version, a GC process can be there which can deduce that some token bucket has no active reminder and thus make this token bucket available for re-use. # Appendix 1: Different kinds of entries in ReminderDB: ```Key: tokenBucket||consumed Value: { "tokenBuckets":[ {"id":"B1", "rangeFrom":"1", "rangeTo":"100000", "vector":"drftghrq"}, {"id":"B2", "rangeFrom":"100001", "rangeTo":"200000", "vector":"drftghrq"}, {"id":"B3", "rangeFrom":"200001", "rangeTo":"300000", "vector":"drftghrq"}, {"id":"B4", "rangeFrom":"300001", "rangeTo":"400000", "vector":"drftghrq"} ] } ``` ``` Key: tokenBucket||allocations||B1 Value: {"sidecarInstance":"123.9.8.45", "remSerInstance":"134.9.8.82"} Key: tokenBucket||allocations||B2 Value: {"sidecarInstance":"123.9.8.46", "remSerInstance":"134.9.8.83"} Key: tokenBucket||allocations||B3 Value: {"sidecarInstance":"123.9.8.47", "remSerInstance":"134.9.8.82"} Key: tokenBucket||allocations||B4 Value: {"sidecarInstance":"123.9.8.45", "remSerInstance":"134.9.8.83"} ``` ``` Key: tokenBucket||B1||consumedTill Value: "204368" ```