# Increase reliability by using Kafka in the Driver API
## Goals
- reduce database strain by cutting down transactions that write to many tables at once (e.g. TripAssignment) and frequent queries that repeatedly hit the same tables
- move towards small core driver data store that can be used efficiently for synchronous reads and writes, and support eventual consistency for other data
- reduce synchronous query response times by moving tangential tasks (e.g. sending notifications, updating WASEAT) into Kafka consumers
- fix NoMethodErrors which result from trying to use immutable records that are no longer current (related to race conditions)
- fix inconsistent data by using ordering guarantees
## Database strain
The database is currently under strain from two kinds of frequent queries: large ones that read and update many tables (e.g. TripAssignment), and smaller ones that run even more often and touch some of the same tables as the larger queries.
Identifying the exact part of the larger queries that is the problem is difficult, but making improvements to small frequent queries, e.g. CurrentTrip or UpdateOrder, will leave room for flexibility with the larger queries.
Actual problems include:
- reads that run for a long time, forcing the database to hold extra copies of rows that it could otherwise delete. This creates pressure on memory
- updates for the same rows blocking while they wait for each other and so building up a queue that stops later transactions from running.
The solutions for the other problems listed in this document do not directly address TripAssignment (which has previously been fragile), but will take pressure off the database so that TripAssignment is indirectly improved.
Kafka is involved in many of these improvements because it allows us to split database-intensive work into the part that must happen synchronously and the part that can be done later in a consumer process.
## Overly general data maintenance in driver endpoints
The current database suits the purpose of storing data without duplication (everything is normalised). Normalisation makes reading and writing a single logical record less efficient, because the data is spread across many tables.
Impact on driver app reads and writes:
1) When the driver app updates data via GraphQL, writes are done synchronously to tables and fields that are more heavily normalised than makes sense for the driver app's use of the data.
2) Before responding, the server reads many fields for updating the restaurant dashboard and customer app that are not part of the data the driver app needs.
3) The driver app does not make use of the response data from GraphQL updates; instead it makes a separate CurrentTrip query.
Compare the data used in the synchronous `UpdateOrder` call with the data actually used by the Driver app from the `CurrentTrip` query.
### UpdateOrder
Note how few of the fields are actually used to keep the data up to date vs how many are used for updating the restaurant dashboard.
| attribute | used by |
| -------- | -------- |
| user.remember_token, user.driver | looking up driver |
| order.albaik_identifier | looking up order |
| order.delivery | keep record of old delivery in memory, create new delivery with state `out_for_delivery` |
| order.preparation | broadcast to restaurant dashboard |
| preparation.branch, preparation.albaik_identifier (current preparation & old preparation) | broadcast to restaurant dashboard |
| driver.current_trip.id, driver.status (current prep & old prep) | broadcast to restaurant dashboard |
| preparation.order.albaik_identifier, preparation.pickup?, preparation.order.created_at, preparation.order_total_item_count, preparation.order_items, preparation.preparable.at_restaurant?, branch.can_reject_orders, preparation.delivery?, preparation.driver.working_on_delivery?(preparation.delivery), preparation.driver.location.present?, preparation.driver.id, preparation.arrived?, preparation.preparable_type | rendering preparation on dashboard |
| (new delivery).status | update new driver status to working on order|
| order.albaik_identifier, order.user.preferred_locale, order.user.notify, order.overall_order_status, order.delivery_status OR order.status | notify customer |
### Current Trip query
This data is what is actually used by the driver app and could be maintained in a denormalised structure that does not have to be updated very often (if at all). Most of it could be assembled once when a trip is assigned; only the delivery status changes in the normal workflow of an order.
```graphql
Order {
  # no change after assignment
  id
  albaikIdentifier
  total
  latitude
  longitude
  delivery { # changes
    createdAt
    status
  }
  branch { # no change without reassignment
    id
    name
    latitude
    longitude
  }
  customer { # no change
    firstName
    lastName
    telephone
  }
  items { # no change
    id
    name
    quantity
    total
  }
}
```
### Improvement: maintain separate data for the driver app
All of the data that is actually [used by the driver app](#Current-Trip-query) can be updated in a small store that is maintained separately from the updates of other data.
CurrentTrip will only read from this "core" driver store. The store can be denormalised to the structure that is useful for assembling the results of CurrentTrip query. It only needs fields that are useful for the CurrentTrip query.
Initially this store could just be one or more separate tables in the current Postgres database; later it could live in a different database or NoSQL store.
- every time a trip is assigned create driver trip row
- what causes delivery status to change?
- order.replace_delivery only called by TripAssignment.create! and UpdateOrderDeliveryService (UpdateOrder mutation - scan, completed, failed)
- Delivery.create! only called by TripAssignment
- UpdateOrder will only write to this store
- Kafka consumers will take care of updating non-driver app data
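As a sketch of what this denormalised store could hold (the Structs and the `build_current_trip` function are illustrative names, not from the codebase), the whole CurrentTrip shape can be assembled once at assignment time:

```ruby
# Illustrative stand-ins for the normalised records read at assignment time.
Order    = Struct.new(:id, :albaik_identifier, :total, :latitude, :longitude, keyword_init: true)
Branch   = Struct.new(:id, :name, :latitude, :longitude, keyword_init: true)
Customer = Struct.new(:first_name, :last_name, :telephone, keyword_init: true)
Item     = Struct.new(:id, :name, :quantity, :total, keyword_init: true)

# Assemble everything the CurrentTrip query needs into one denormalised row.
# Only delivery[:status] changes during a normal delivery; the rest is frozen
# at assignment time.
def build_current_trip(order:, branch:, customer:, items:, delivery_status:, delivery_created_at:)
  {
    order_id: order.id,
    albaik_identifier: order.albaik_identifier,
    total: order.total,
    latitude: order.latitude,
    longitude: order.longitude,
    delivery: { created_at: delivery_created_at, status: delivery_status },
    branch: branch.to_h,
    customer: customer.to_h,
    items: items.map(&:to_h)
  }
end
```

Answering CurrentTrip then becomes a single read of this one row, with no joins.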
#### Incremental additions to core driver data
We can start by creating the data needed by the app to handle a driver assignment. After we have the initial data, we can incrementally increase the scope of the data with changes during the delivery flow (driver arrival, then collecting, etc).
Driver status is not read from the server by the driver app, so initially the core driver data will exclude it. We may eventually want to build a snapshot of driver status data for the _server_ to use (e.g. for trip assignment). At that point it may make sense to add that to core driver data, or it might be that we want to store it somewhere else.
- what causes driver status to change?
- DriverStatus only updated by Drivers::UpdateDriverStatus.perform(driver, status)
- Drivers::UpdateDriverStatus.perform called from
- Drivers::UpdateOrderDeliveryStatus (scan, completed, failed)
- Drivers::UpdateDriverProfile (change driver location)
- ReprocessRejectedPreparableJob (mark driver available after order rejected)
- Driver#make_available
- Mutations::UpdateDriverStatus (any of unavailable .. returning order to restaurant)
- TripAssignment#update_previously_assigned_driver
#### Eventual consistency of other data
All of the data that is currently read by the [synchronous UpdateOrder query](#UpdateOrder) that is NOT [used by the driver app](#Current-Trip-query) must be eventually consistent. The restaurant dashboard still needs to show accurate information and the customer must still be notified of changes in their order.
This can be achieved by having consumers that update the "normal" data after we send an event to indicate that delivery status changes have arrived from the app.
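A minimal sketch of such a consumer, with the Kafka client wiring omitted and the payload shape, class name, and collaborator interfaces all assumed for illustration:

```ruby
# Hypothetical consumer that keeps non-driver data eventually consistent after
# a "delivery updated" event. Collaborators are injected so the handling logic
# can be exercised without a running Kafka cluster.
class DeliveryUpdatedHandler
  def initialize(dashboard:, notifier:)
    @dashboard = dashboard
    @notifier  = notifier
  end

  # event is the already-decoded payload of a "delivery updated" message;
  # because the payload is denormalised, no database queries are needed here
  def handle(event)
    @dashboard.broadcast(event.fetch("preparation"))
    @notifier.notify(event.fetch("customer"), event.fetch("delivery_status"))
  end
end
```

The synchronous driver path publishes one event and returns; this handler catches up at its own pace.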
## Slow synchronous performance example: UpdateOrder mutation
Skylight indicates that the GraphQL UpdateOrder mutation is called very frequently, and its performance (p50=200ms, p90=590ms) is therefore a problem.
UpdateOrder starts a complicated process:
1. Update delivery on order with new status
2. Update driver status
    2.1 if waiting at restaurant, update restaurant dashboard
    2.2 if accident, reassign the order or fail it
3. Close order in WASEAT if delivery is delivered
4. Record failure if delivery is failed
5. Send notification to customer
6. Return the order ID to the driver API
### Improvement: reduce synchronous work
At least steps 2.1, 3, and 5 can be moved to consumers without affecting the driver app.
This will reduce the time the connection between app and server stays open, and allow the asynchronous steps to happen in parallel (if desirable).
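A hedged sketch of the slimmed-down synchronous path, assuming a hypothetical producer interface and using a plain hash in place of the ActiveRecord models:

```ruby
# Illustrative slimmed-down service: only steps 1, 2 and 6 stay inline; one
# published event carries enough data for consumers to do steps 2.1, 3 and 5
# later. Class name and producer interface are assumptions.
class SlimUpdateOrderDeliveryStatus
  def initialize(producer:)
    @producer = producer
  end

  # delivery is a plain hash standing in for the ActiveRecord models
  def perform(delivery, new_status)
    delivery[:status] = new_status                 # step 1: update delivery
    delivery[:driver_status] = "working_on_order"  # step 2: update driver
    # steps 2.1, 3 and 5 become consumer work triggered by this one event
    @producer.publish("driver.delivery_updated", delivery)
    delivery[:order_id]                            # step 6: respond quickly
  end
end
```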
### Improvement: decoupling
Separating concerns makes business logic sense: e.g. an inability to communicate with WASEAT should not stop the driver from marking an order delivered.
### Long term: purely asynchronous connections
Should further reductions in response time be desired, the entire request could be made asynchronous. This would require a new interaction model in the driver app.
### UpdateOrder with Kafka
```mermaid
sequenceDiagram
participant DA as DriverApp
participant GQL as GraphQL
participant UDS as UpdateOrderDeliveryStatus
participant K as Kafka
participant CRD as RestaurantConsumer
participant CW as WaseatConsumer
participant CAN as CustomerNotificationConsumer
participant RD as RestaurantDashboard
participant CA as CustomerApp
participant W as Waseat
activate GQL
DA->>+GQL: updateOrder (scan/completed/failed)
activate UDS
GQL->>+UDS: perform
note over UDS: update delivery<br/>update driver
UDS->>+K: delivery updated
deactivate UDS
GQL->>+DA: order.id
deactivate GQL
activate CW
CW->>+K: fetch event
CW->>+W: close order
deactivate CW
activate CAN
CAN->>+K: fetch event
CAN-->>+CA: push notification ("WAITING_AT_RESTAURANT")
deactivate CAN
activate CRD
CRD->>+K: fetch event
CRD->>+RD: broadcast delivery update
deactivate CRD
```
## Race condition example: driver arrival
When the driver app communicates that the driver has arrived at the same time that the restaurant dashboard communicates that the team member is collecting the order:
* We encounter a NoMethodError
* the DB becomes out of sync
```mermaid
sequenceDiagram
participant Driver App
participant GraphQL
participant Serializer
participant PreparationStatusChannel
participant DB
participant /preparations
participant Restaurant dashboard
Driver App->>+GraphQL: updateOrder (arrived=1)
activate GraphQL
activate /preparations
Restaurant dashboard->>+/preparations: XHR PATCH preparation (collecting=1)
GraphQL->>+DB: load d0 (arrived=0, collecting=0)
/preparations->>+DB: load d0 (arrived=0, collecting=0)
GraphQL->>+DB: replace d0 with d1 (arrived=1, collecting=0)
/preparations->>+DB: replace d0 with d2 (arrived=0, collecting=1)
note right of DB: DB is now in <br/>an inconsistent state. <br/>Order should be <br/>arrived=1 AND collecting=1
GraphQL->>+Serializer: construct order JSON based on d1
GraphQL->>+PreparationStatusChannel: broadcast(d1.preparation)
note right of /preparations: No error here.<br/> HTML is assembled<br/> based on latest delivery/preparation.
/preparations-->>+Restaurant dashboard: preparation HTML based on d2
deactivate /preparations
PreparationStatusChannel--x+Restaurant dashboard: preparation HTML
note right of PreparationStatusChannel: NoMethodError occurs here. <br/>d2.preparation is up to date.
note left of GraphQL: error bubbles up and <br/>driver App does not get <br/> updated order JSON
GraphQL-->>+Driver App: error
deactivate GraphQL
```
### Improvement: Share denormalised data in events
Events will contain denormalised data. Rather than assembling an object graph from the database we will have all the data without having to run queries in consumers.
Because fewer queries serve data for more operations, database rows will be locked less frequently.
The event data will be internally consistent, e.g. we will not find that `delivery.preparation` is unexpectedly nil because the preparation data has `current = false`. This should prevent NoMethodErrors.
(we will have to take care to validate that processing an event does not create an invalid state)
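For illustration, such a payload might look like this (the field names are assumptions, not the actual schema):

```ruby
# Illustrative denormalised payload: the delivery and its preparation travel
# together, captured at publish time, so a consumer never re-queries and never
# sees a preparation that has since been replaced (current = false).
EVENT = {
  "type"        => "delivery.updated",
  "delivery"    => { "id" => 42, "status" => "out_for_delivery" },
  "preparation" => { "albaik_identifier" => "A-1001", "branch_id" => 7 },
  "customer"    => { "preferred_locale" => "ar", "notify" => true }
}.freeze

# Guard against applying an event that would create an invalid state
def consistent?(event)
  event.key?("delivery") && event.key?("preparation")
end
```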
### Long term: order guarantees
Once the conflicting events (driver arrival and order collected) are both in Kafka, we should be able to prevent race conditions entirely by processing them in order within a single Kafka partition, using the same partition key (probably driver ID).
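The reason a shared partition key gives ordering: Kafka assigns each message to a partition by hashing its key, so all events keyed by one driver ID land in the same partition and are consumed in publish order. A simplified stand-in for a client partitioner (real clients typically use murmur2 rather than CRC32, but the property is the same):

```ruby
require "zlib"

# Simplified partitioner: hash the key, take it modulo the partition count.
# Equal keys always map to the same partition, so events for one driver are
# totally ordered.
def partition_for(key, num_partitions)
  Zlib.crc32(key) % num_partitions
end

# Both conflicting events are keyed by the driver ID, so they land in the same
# partition and are consumed in the order they were published.
arrival_partition   = partition_for("driver-17", 12)
collected_partition = partition_for("driver-17", 12)
```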
## Work arising
Start by creating the Driver Data (see "future" boxes in Driver Data column below) and using it to answer `CurrentTrip` queries. Break this work up by starting with trip assignment and slowly building up the data.
```mermaid
sequenceDiagram
participant CA as Customer App
participant DA as Driver App
participant CP as Customer API
participant TA as TripAssignment
participant ND as Normal Data
participant DP as Driver API
participant RD as Restaurant Dash
participant RP as Restaurant API
participant DD as Driver Data
DA->>+CP: POST /sessions
note over DA: store token for <br/>driver API requests
DA->>+DP: UpdateDriverStatus(AVAILABLE)
DP->>+ND: create DriverStatus
CA-->>+CP: place order
CP->>+TA: enqueue
TA->>+ND: assign order
TA->>+DD: create Driver::CurrentTrip
note over TA: TODO: in "create driver data asynchronously<br/> on trip assignment" we will build<br/> a consumer to do this
TA-->>+DA: notify of driver assignment
TA-->>+RD: broadcast
DA->>+DP: fetch CurrentTrip
DP->>+DD: (when feature flag is on) answer from Driver::CurrentTrip
note over DD: DONE in <br/>"As a driver, after I am assigned<br/> an order I want to know about my trip soon"
note over DA: display current order
note over DA: Q: how does driver app<br/>work out that its status<br/>is DRIVING_TO_RESTAURANT?
DA->>+DP: UpdateDriverProfile(lat, lng)
note over DA: repeat every 45s<br/>until...
note over DA: driver presses<br/> "I have arrived"
DA->>+DP: UpdateDriverStatus(WAITING_AT_RESTAURANT)
note over DP: TODO update from Driver API<br/> in "As a driver when I arrive<br/> I want my driver app to show this"
DP->>+DD: Update Driver::CurrentTrip
DP->>+ND: replace DriverStatus
DP-->>+RD: broadcast
note over RD: Team member prints
RD->>+RP: collecting
RP->>+ND: update
RP->>+DD: Update Driver::CurrentTrip
note over DD: <br/>asynchronous update<br/> from consumer<br/>event originates:<br/>restaurant API<br/> TODO in "As a driver when the order is<br/> collected by the team member I want<br/> my driver app to show this"
RP-->>+DA: notify
RP-->>+CA: notify
DA->>+DP: fetch CurrentTrip
note over DA: driver scans QR code
DA->>+DP: UpdateOrder(OUT_FOR_DELIVERY)
DP->>+DD: update Driver::CurrentTrip
note over DD: future: <br/>synchronous update<br/> from Driver API TODO in "As a driver<br/> when I collect the order I want<br/> my driver app to show this"
DP->>ND: replace delivery
DP->>ND: replace DriverStatus
DP-->>+RD: broadcast
DA->>+DP: fetch CurrentTrip
note over DA: driver confirms delivery
DA->>+DP: UpdateOrder(DELIVERED)
note over DD: future: <br/>synchronous update<br/> from Driver API TODO in "As a driver <br/>when I deliver the order I want <br/> my driver app to show this"
DP->>ND: replace delivery
DP->>ND: replace DriverStatus
DP-->>+RD: broadcast
note over DP: also close in WASEAT
DP-->>+CA: notify
DA->>+DP: fetch CurrentTrip
```
### Plan/Progress
1) DONE Write data to new snapshot on initial trip assignment.
*assess data needed by making CurrentTrip read from an empty table and seeing what fields are missing*
2) DONE Read data for current trip from snapshot if flipper flag is turned on for driver.
*for drivers who have the feature flag turned on, this will be as good as the Driver core data that has been assembled*
*document in PR how people can turn on reading from snapshot for a particular driver*
3) DONE move the creation of driver data into a consumer. Send an event from `TripAssignment` instead of creating the data directly.
4) DONE Update driver core data asynchronously when the restaurant team member collects an order
- A restaurant team member will print the receipt from the restaurant dashboard and start collecting the order.
- This process calls `Restaurant::PreparationsController#update` which calls `Restaurant::UpdatePreparableStatus#perform`
- We must send an event from `Restaurant::UpdatePreparableStatus` (something like `delivery.ready_to_collect`)
- We must add a consumer to listen for the `ready_to_collect` event which will update the `delivery_status` on `Driver::CurrentTrip`.
5) DONE Other order updates:
*If initiated by driver API (e.g. driver arrival), update snapshot synchronously*
*If initiated outside of driver API, update rest of data synchronously and update driver snapshot asynchronously via Kafka*
6) Move other non-data work arising from Driver API into consumers for eventual consistency
e.g. for `UpdateOrder(out_for_delivery)` we need to change the UpdateOrder mutation
- DONE add event to out_for_delivery path of UpdateOrder
- behind a feature flag
- add consumer behaviour for restaurant dashboard
- ensure event has data necessary for broadcast
- "broadcast to dashboard" code is currently called directly from UpdateOrder mutation. Copy this into consumer
- also behind a feature flag
- turn dashboard broadcast code in UpdateOrder OFF when feature flag is true
- turn dashboard broadcast code in consumer ON when feature flag is true
- add consumer behaviour for customer notification (behind enableKafkaOutForDeliveryCustomerNotification)
- customer notification code is currently called directly from UpdateOrder mutation. Copy this into new consumer
- ensure event has data necessary for push notification
- feature flag
- turn customer notification in UpdateOrder mutation OFF when feature flag is true
- turn behaviour in consumer ON when feature flag is true
7) Move non-driver data work out of driver API too
We ultimately only want the Driver API to update driver data. We can update the data that is not in the core Driver::CurrentTrip table in a consumer instead of directly from the Driver API.
In the beginning we will have driver data that is not in the "core" Driver::CurrentTrip, for instance the Driver and DriverStatus tables. The rest of the system still needs that data, but it does not have to be updated by the Driver API.
e.g. for driver arrival (`UpdateDriverStatus(waiting_at_restaurant)`)
- currently we call `driver.change_status!`. This call can be moved into a different consumer.
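As a sketch of that last move (the event name, payload shape, and repository interface are all assumptions), the consumer would look roughly like:

```ruby
# Hypothetical consumer for a "driver.waiting_at_restaurant" event, taking
# over the driver.change_status! call from the synchronous Driver API path.
class DriverStatusConsumer
  def initialize(driver_repo:)
    @driver_repo = driver_repo
  end

  # event is the decoded payload; the Kafka subscription wiring is omitted
  def handle(event)
    driver = @driver_repo.find(event.fetch("driver_id"))
    driver.change_status!(event.fetch("status"))
  end
end
```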
## Maintaining focus
- ignore edge cases like branch reassignment
- don't attempt to simplify calculated driver status by using core driver snapshot (will be nice, but we don't need it now)
- add order updates incrementally, don't try to do them at once
- stick to the happy path
Having a feature flag for each consumer of each event will add up to many code paths. We can consider using the strategy pattern (see ["Avoiding Conditionals" here](https://martinfowler.com/articles/feature-toggles.html)) to refactor the current `UpdateOrderDeliveryStatus` code.
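A sketch of that refactoring (class names are illustrative): each flag is resolved once into a strategy object at the edge, and the mutation simply calls `strategy.call` instead of branching on the flag in several places.

```ruby
# Strategy that publishes an event for a consumer to broadcast later.
class KafkaBroadcast
  def initialize(producer)
    @producer = producer
  end

  def call(payload)
    @producer.publish("delivery.updated", payload)
  end
end

# Strategy that broadcasts directly, preserving today's behaviour.
class DirectBroadcast
  def initialize(channel)
    @channel = channel
  end

  def call(payload)
    @channel.broadcast(payload)
  end
end

# Resolve the flag once; everything downstream just calls strategy.call
def broadcast_strategy(kafka_enabled, producer:, channel:)
  kafka_enabled ? KafkaBroadcast.new(producer) : DirectBroadcast.new(channel)
end
```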
If the other GraphQL mutations are good candidates for offloading synchronous work, we can then follow the above pattern for those mutations too.