# Queue Overload - Observability
## Navigation
1. [Problem](https://hackmd.io/@jwdunne/HJXyhNY4h)
2. [Observability](https://hackmd.io/@jwdunne/S1pJ1CgHn)
3. [Testing](https://hackmd.io/@jwdunne/H1zKkAeSn)
4. [Throughput optimisation opportunities](https://hackmd.io/@jwdunne/H1h2k0xH3)
5. [Backpressure](https://hackmd.io/@jwdunne/B1WZeCeBh)
6. [Load shedding](https://hackmd.io/@jwdunne/BJB4MReH2)
7. [Autoscaling](https://hackmd.io/@jwdunne/Bkw_zAxHn)
## Solution
There are a few ways to calculate overload.
One is to set a maximum queue capacity - which Laravel does not support out of the box. This is the "nuclear option", where all jobs are dropped after we reach that limit. The same mechanisms, however, still apply and we would autoscale, apply backpressure and implement load shedding before that point (i.e where overload is anything above 80% of the queue capacity).
We may need to do this if this continues to be a problem.
For now, we should consider $\lambda / c\mu \geq 1$ to be a state of overload, with $\lambda/c\mu \geq 0.8$ being a particular danger zone.
## Interface
We should use a Gateway to pull and persist these numbers:
```php
interface QueueLoadGateway
{
public function recordQueued(string $queue): void;
public function recordCompletion(string $queue): void;
public function get(): QueueLoad;
}
```
`QueueLoad` would be a value object that encapsulates the metrics and the calculations on those metrics:
```php
final class QueueLoad
{
public function __construct(
public readonly float $threshold,
public readonly int $workers,
public readonly float $queuedPerSecond,
public readonly float $completedPerSecond
) {}
public function ratio(): float;
public function overloading(): bool;
public function overloaded(): bool;
}
```
`$threshold` must satisfy $0 < threshold < 1$.
`QueueLoad::ratio` will be sent to CloudWatch for auto-scaling whenever a job is queued or completed. The latency from doing this may incur a performance penalty so we should configure the queue and events to dispatch after the response is to sent to the user.
It makes sense to use this for both `QueueLoad::overloading()` and `QueueLoad::overloading()` too:
```php
public function ratio(): float
{
if ($this->queuedPerSecond === 0) {
return 0;
}
if ($this->completedPerSecond === 0) {
return 1;
}
return $this->queuedPerSecond / ($this->workers * $this->completedPerSecond);
}
```
Since there is always the possibility for zeroes on either side, we default to $0$ if there are no jobs queued in the past 5 minutes and default to $1$ if there are queued jobs but no jobs completed in the past 5 minutes (a non-zero queued rate with a zero completed rate is a dangerous place to be).
This design does not consider monitoring separate queues but this may be desirable: if the forms or urgent queue gets backed up, this is a more serious situation.
## Job queued rate
We should build a first-class, isolated mechanism to record queued and completion rates, decoupled from a DB-driven queue. These calculations will continue to work even if we switch to a Redis or SQS-backed queue.
We will use a Redis stream to store the job queued log. Redis streams can autogenerate the ID of the stream element and they are monotonically incrementing in two parts: an auto incrementing integer and a millisecond part. This allows us to query stream by time range.
To add to the stream, we use the [`XADD`](https://redis.io/commands/xadd/) command:
```redis
XADD jobs:queued MINID <5 mins ago in ms> * queue
```
The `*` tells Redis to automatically generate an ID as described earlier. `XADD` also allows us to automatically trim the stream in one command (using `MINID <ms>`), which means we do not need to periodically purge items.
Redis' `XRANGE` command allows us to [query based on time](https://redis.io/commands/xrange/#incomplete-ids), in milliseconds, which means we can select 5 minutes of data:
```redis
XRANGE jobs:queued <5 mins ago in ms> +
```
We can then count them and then divide by 300 to find jobs queued per second.
## Job completion rate
As with job queued rate, we will use a Redis stream to store the job completion log. The operations will be exactly the same.
```redis
XADD jobs:completed MINID <10 mins ago in ms> * queue
XRANGE jobs:completed <5 mins ago in ms> +
```
## Workers
We need to know the number of worker tasks and containers per worker. To calculate the actual jobs completed rate across all workers
We will need to retrieve this number from:
- Environment variables, for the number of containers per worker variable
- AWS, for the number of worker nodes (which allows us to account for autoscaling and if a worker has been killed)
We should pass the containers per worker variable to the worker containers as an environment variable in Terraform.
We will need to use the `DescribeServices` action:
```php
$ecs->describeServices([
'cluster' => $clusterArn,
'services' => [$serviceName]
]);
```
We then use the `runningCount` property on the returned service and multiple it with the containers per worker variable.
During deployment, this number could temporarily rise, which would affect the calculated ratio. This, however, would be transient and would not erroneously trigger overload states. In terms of autoscaling, we can configure scale up and down cool downs that smooth past any fluctuations during deployment.
This will require us to pass the cluster ARN and service name as an environment variable to the workers.
We also need a dedicated IAM for describing the cluster:
```terraform
resource "aws_iam_user" "ecs_read_services" {
name = "LeadfloECS-ReadServices-${var.env_name}"
}
resource "aws_iam_user_policy" "ecs_read_services" {
name = "${var.env_name}-read-services"
user = aws_iam_user.ecs_read_services.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"ecs:DescribeServices"
]
Resource = "*"
}
]
})
}
resource "aws_iam_access_key" "ecs_read_services" {
user = aws_iam_user.ecs_read_services.name
}
```
_Note: instead of creating separate IAMs for each purpose, we should experiment with task roles, which do the same thing but means we do not need to lug IAM secrets everywhere_
This will need to be added to AWS secrets manager and then passed to the containers.
This should be provided in a separate `WorkerCountGateway` so that we can integration test. The `QueueLoadGateway` would depend on the `WorkerCountGateway` to fetch the number of workers:
```php
interface WorkerCountGateway
{
public function get(): int;
}
```
The `WorkerCountGateway` should cache the response for 2 minutes since this could be an expensive API call that could slow down the HTTP API container when jobs are queued and slow the job completion rate.
## The "overloading" state
The `overloading` method tells us if we're approaching overload. This would allow us to start frontline mechanisms early. We would take the job queued rate by the job completion rate and test if it meets a configurable threshold:
```php
public function overloading(): bool
{
return $this->ratio() >= $this->threshold && !$this->overloaded();
}
```
An invariant is that if `QueueLoad::overloaded()` is true, `QueueLoad::overloading()` is not true. This is so that we can distinguish between overloading and overloaded states. Theoretically, being `overloaded` means that `overloading` is true but, practically, it creates complexity for consumers.
## The "overloaded" state
The `overloaded` method tells us if we're now in a state of overload. This means that the job queued rate is greater than or equal to the job completion rate or:
```php
public function overloaded(): bool
{
return $this->ratio() >= 1;
}
```
## Monitoring job queued and completion rates
We would configure 2 gauges each for job queued and job completion rates. These can be emitted every 5 minutes or when a job is queued and when a job is completed (perhaps the latter due to the irony of increasing job queued rate as a solution to dealing with overload).
## Alerting on overloading and overloaded queues
We should set up 2 alerts:
1. Queue overloading—which is an early warning signal that things are about to get hot
2. Queued overloaded—which tells us that we're now in overload
These alerts make it clear why the system has introduced backpressure and load shedding measures. Otherwise, these measures could silently affect other metrics.
## Implementation
- Implement `QueueLoad` value object
- Pass workers per container as an environment variable
- Experiment with ECS task roles for providing permissions
- Create an IAM for reading ECS services and pass as secret
- Define and implement `WorkerCountGateway`
- Define and implement `QueueLoadGateway`
- Implement a gauge for job completion rate and job queued rate
- Implement an alert for queue overloading
- Implement an alert for queue overloaded