# Job Queue
## Changelog
* 2024-08-12: Add [the "Better" section](#Better), demonstrating general improvements and ones specific to the "`JSONB` column" approach.
* 2024-08-06: Initial version
<details>
<summary>Code from tweets</summary>
<img src="https://hackmd.io/_uploads/HkCG851qR.png" />
<img src="https://hackmd.io/_uploads/r1yr8qycA.jpg" />
</details>
A few observations first:
1. `enum Job` has two variants with the same contents:
* `ProcessMetricsUpload(JobDetails)`
* `SendVerificationEmail(JobDetails)`
Sometimes it totally makes sense to do that, other times you'll do just fine with a simple
```rust=
struct Job {
kind: JobKind,
}
enum JobKind {
ProcessMetricsUpload,
SendVerificationEmail,
}
```
NB: This "kind-based" approach won't work if you want to store different data depending on the kind:
```rust=
enum Job {
ProcessMetricsUpload { metric_name: String },
SendVerificationEmail { email_address: EmailAddress },
}
```
Based on the code I've seen, it feels like `kind` would work well enough, but I won't assume that, so the options below cover all the use cases.
2. `struct Database` doesn't do much: it mostly copies the interface of `sqlx`. So one could say it's a level of indirection without a useful abstraction to leverage: even the `query_one` and `query` methods depend on `sqlx` and `Postgres` functionality.
If no abstraction is needed (i.e. you're not trying to follow "clean architecture" rules), you can just pass around and `.clone()` an `sqlx::PgPool` everywhere (`PgPool` is internally reference-counted, so cloning it is cheap):
```rust=
struct Queue {
pg_pool: sqlx::PgPool,
// Other fields...
}
```
Now, based on the observations above, we have three options if we want to use compile-time type checking via the `sqlx::query!` and `sqlx::query_as!` macros.
## 1. Kind-based approach
```rust=
#[derive(sqlx::Type)]
#[sqlx(type_name = "job_type", rename_all = "snake_case")]
pub enum JobType {
ProcessMetricsUpload,
SendVerificationEmail,
}
// Notice how we don't need to derive anything here
// (or implement any trait, for that matter).
struct Job {
id: String,
r#type: JobType,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
}
// Somewhere in `impl Queue`:
async fn get_available_jobs(&self) -> Result<Vec<Job>> {
sqlx::query_as!(
Job,
r#"
SELECT id
, type AS "r#type: JobType"
, locked_at
, created_at
, updated_at
FROM job_queue
"#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)
}
```
The `AS "r#type: JobType"` annotation is important! It helps `sqlx` to map `job_type` (a type stored directly in Postgres) to our `enum JobType`. Why do need need to explicitly repeat `JobType` here? `sqlx` doesn't assume there is only one possible way to convert Postgres's `job_type` into some arbitrary Rust type (there could be many!), so we nudge it a little.
## 2. Enum-based approach
More type safety, nice! We will need an intermediate struct to store raw Postgres data, which we can easily convert into our "domain-level" (sorry) struct.
```rust=
////////////////////////////////////////////////////////////////////////////////
// `job.rs`
enum Job {
ProcessMetricsUpload(JobDetails),
SendVerificationEmail(JobDetails),
}
struct JobDetails {
id: String,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
}
////////////////////////////////////////////////////////////////////////////////
// `queue.rs`
async fn get_available_jobs(&self) -> Result<Vec<Job>> {
let jobs = sqlx::query_as!(
QueryJob,
r#"
SELECT id
, type AS "r#type: QueryJobType"
, locked_at
, created_at
, updated_at
FROM job_queue
"#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)?;
Ok(jobs
.into_iter()
.map(Job::from) // Implementation below.
.collect())
}
// I usually name such structs with a `Query` prefix
// to signal that this struct is used only for
// `SELECT`-ing data from Postgres and nothing else.
struct QueryJob {
id: String,
r#type: QueryJobType,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
}
#[derive(sqlx::Type)]
#[sqlx(type_name = "job_type", rename_all = "snake_case")]
enum QueryJobType {
ProcessMetricsUpload,
SendVerificationEmail,
}
// With this, we can write `query_job.into()` to get a `Job`.
// Or, alternatively, `Job::from(query_job)`.
impl From<QueryJob> for Job {
fn from(j: QueryJob) -> Job {
let details = JobDetails {
id: j.id,
locked_at: j.locked_at,
created_at: j.created_at,
updated_at: j.updated_at,
};
match j.r#type {
QueryJobType::ProcessMetricsUpload => Job::ProcessMetricsUpload(details),
QueryJobType::SendVerificationEmail => Job::SendVerificationEmail(details),
}
}
}
```
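For context, here's a hedged sketch of how the fetched jobs might be dispatched; `run_once`, `process_metrics_upload`, and `send_verification_email` are hypothetical names, not part of the original code:
```rust=
// Hypothetical dispatch loop over the fetched jobs.
async fn run_once(queue: &Queue) -> Result<()> {
    for job in queue.get_available_jobs().await? {
        match job {
            Job::ProcessMetricsUpload(details) => process_metrics_upload(details).await?,
            Job::SendVerificationEmail(details) => send_verification_email(details).await?,
        }
    }
    Ok(())
}
```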
## 3. Advanced enums
I think it's fair to assume that some day different types of jobs will need different data. Without coming up with crazy schemas (e.g. a table per type), we can store all the data in a single table. Off the top of my head, I can think of two ways:
### Many columns
Store common data in standalone columns
```psql=
id VARCHAR(64) NOT NULL
, locked_at TIMESTAMPTZ
, created_at TIMESTAMPTZ
, updated_at TIMESTAMPTZ
```
and store type-related data in optional columns for each type:
```psql=
-- Metrics upload.
, metrics_upload__metric_name TEXT
, metrics_upload__uploaded_at TIMESTAMPTZ
-- Verification emails.
, verification_email__email_address TEXT
, verification_email__sent_at TIMESTAMPTZ
```
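If you want Postgres itself to enforce that each type carries its required fields (rather than relying only on the runtime `Error::JobIntegrity` check further below), one option is a `CHECK` constraint. A sketch, assuming the table is `job_queue` and the discriminator column is `type`:
```psql=
-- Hypothetical constraint: required fields must be present for their job type.
ALTER TABLE job_queue ADD CONSTRAINT job_queue_required_fields CHECK (
      (type <> 'process_metrics_upload'  OR metrics_upload__metric_name IS NOT NULL)
  AND (type <> 'send_verification_email' OR verification_email__email_address IS NOT NULL)
);
```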
Now you can declare `QueryJob` with all the columns:
```rust=
////////////////////////////////////////////////////////////////////////////////
// `job.rs`
// Let's say some type-specific fields are required
// (e.g. `metric_name` or `email_address`).
enum Job {
ProcessMetricsUpload {
details: JobDetails,
metric_name: String,
uploaded_at: Option<DateTime<Utc>>,
},
SendVerificationEmail {
details: JobDetails,
email_address: String,
sent_at: Option<DateTime<Utc>>,
},
}
struct JobDetails {
id: String,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
}
////////////////////////////////////////////////////////////////////////////////
// `queue.rs`
struct QueryJob {
id: String,
r#type: QueryJobType,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
// Metrics upload.
metric_name: Option<String>, // Optional due to Postgres column being `NULL`-able.
metric_uploaded_at: Option<DateTime<Utc>>,
// Verification emails.
verification_email_address: Option<String>, // Optional due to Postgres column being `NULL`-able.
verification_email_sent_at: Option<DateTime<Utc>>,
}
#[derive(sqlx::Type)]
#[sqlx(type_name = "job_type", rename_all = "snake_case")]
enum QueryJobType {
ProcessMetricsUpload,
SendVerificationEmail,
}
async fn get_available_jobs(&self) -> Result<Vec<Job>> {
let jobs = sqlx::query_as!(
QueryJob,
r#"
SELECT id
, type AS "r#type: QueryJobType"
, locked_at
, created_at
, updated_at
, metrics_upload__metric_name AS metric_name
, metrics_upload__uploaded_at AS metric_uploaded_at
, verification_email__email_address AS verification_email_address
, verification_email__sent_at AS verification_email_sent_at
FROM job_queue
"#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)?;
let jobs = jobs
.into_iter()
.map(Job::try_from)
// See: https://doc.rust-lang.org/rust-by-example/error/iter_result.html#fail-the-entire-operation-with-collect
        .collect::<Result<Vec<Job>>>()?;
Ok(jobs)
}
impl TryFrom<QueryJob> for Job {
type Error = crate::Error;
fn try_from(j: QueryJob) -> Result<Job, crate::Error> {
let details = JobDetails {
id: j.id,
locked_at: j.locked_at,
created_at: j.created_at,
updated_at: j.updated_at,
};
match j.r#type {
QueryJobType::ProcessMetricsUpload => {
// Manually process every required field here.
//
                // `Error::JobIntegrity` is a new error variant
                // to help you debug errors when they occur.
                // Be specific so that future you won't spend
                // more than one or two minutes fixing a possible issue.
let metric_name = j
.metric_name
.ok_or(crate::Error::JobIntegrity {
missing_field: "metric_name",
})?;
                Ok(Job::ProcessMetricsUpload {
                    details,
                    metric_name,
                    uploaded_at: j.metric_uploaded_at,
                })
}
QueryJobType::SendVerificationEmail => {
                let email_address = j
                    .verification_email_address
                    .ok_or(crate::Error::JobIntegrity {
                        missing_field: "verification_email_address",
                    })?;
                Ok(Job::SendVerificationEmail {
                    details,
                    email_address,
                    sent_at: j.verification_email_sent_at,
                })
}
}
}
}
```
### Single JSON(B) column
Store common data in standalone columns
```psql=
id VARCHAR(64) NOT NULL
, locked_at TIMESTAMPTZ
, created_at TIMESTAMPTZ
, updated_at TIMESTAMPTZ
```
and store type-related data in a single JSONB column:
```psql=
, data JSONB -- probably NOT NULL, if every job type always carries a payload
```
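To make the expected shape of `data` concrete, here are hypothetical payloads (values are made up; field names match the inline `*JobData` structs in the `TryFrom` implementation below):
```rust=
// What `data` might hold for each job type.
let metrics_data = serde_json::json!({
    "metric_name": "cpu_usage",
    "metric_uploaded_at": null,
});
let email_data = serde_json::json!({
    "email_address": "user@example.com",
    "email_sent_at": null,
});
```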
```rust=
////////////////////////////////////////////////////////////////////////////////
// `job.rs`
enum Job {
ProcessMetricsUpload {
details: JobDetails,
metric_name: String,
uploaded_at: Option<DateTime<Utc>>,
},
SendVerificationEmail {
details: JobDetails,
email_address: String,
sent_at: Option<DateTime<Utc>>,
},
}
struct JobDetails {
id: String,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
}
////////////////////////////////////////////////////////////////////////////////
// `queue.rs`
struct QueryJob {
id: String,
r#type: QueryJobType,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
// Type-specific data.
data: serde_json::Value,
}
#[derive(sqlx::Type)]
#[sqlx(type_name = "job_type", rename_all = "snake_case")]
enum QueryJobType {
ProcessMetricsUpload,
SendVerificationEmail,
}
async fn get_available_jobs(&self) -> Result<Vec<Job>> {
let jobs = sqlx::query_as!(
QueryJob,
r#"
SELECT id
, type AS "r#type: QueryJobType"
, locked_at
, created_at
, updated_at
, data
FROM job_queue
"#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)?;
let jobs = jobs
.into_iter()
.map(Job::try_from)
// See: https://doc.rust-lang.org/rust-by-example/error/iter_result.html#fail-the-entire-operation-with-collect
        .collect::<Result<Vec<Job>>>()?;
Ok(jobs)
}
impl TryFrom<QueryJob> for Job {
type Error = crate::Error;
fn try_from(j: QueryJob) -> Result<Job, crate::Error> {
let details = JobDetails {
id: j.id,
locked_at: j.locked_at,
created_at: j.created_at,
updated_at: j.updated_at,
};
match j.r#type {
QueryJobType::ProcessMetricsUpload => {
#[derive(serde::Deserialize)]
struct ProcessMetricsUploadJobData {
metric_name: String,
metric_uploaded_at: Option<DateTime<Utc>>,
}
                // NB: the `?` assumes `crate::Error: From<serde_json::Error>`.
                let data: ProcessMetricsUploadJobData = serde_json::from_value(j.data)?;
                Ok(Job::ProcessMetricsUpload {
                    details,
                    metric_name: data.metric_name,
                    uploaded_at: data.metric_uploaded_at,
                })
}
QueryJobType::SendVerificationEmail => {
#[derive(serde::Deserialize)]
struct SendVerificationEmailJobData {
email_address: String,
email_sent_at: Option<DateTime<Utc>>,
}
                let data: SendVerificationEmailJobData = serde_json::from_value(j.data)?;
                Ok(Job::SendVerificationEmail {
                    details,
                    email_address: data.email_address,
                    sent_at: data.email_sent_at,
                })
}
}
}
}
```
There are a few things you can improve here, which I originally omitted for brevity. They're now covered in [the "Better" section](#Better) right below.
#### Better
The example above is quite suboptimal in a few ways:
1. For common fields, we have to duplicate `details: JobDetails` for each enum variant. It prevents us from easily mutating common fields, because we need to match on each variant.
2. Since Rust doesn't treat enum variants as separate types ([yet!](https://github.com/rust-lang/rfcs/pull/2593)), we can't add additional functionality to specific variants (i.e. `impl Job::SendVerificationEmail`).
3. We didn't define a clear boundary between "domain" types and "storage" types: even though we have inline structs like `ProcessMetricsUploadJobData` and `SendVerificationEmailJobData`, it's not clear how to implement `INSERT`s.
Let's see how we can mitigate these flaws.
##### Common fields and separate functionality
We can create a "root" struct with common fields and store type-specific fields in the `JobPayload` enum:
```rust=
struct Job {
id: String,
status: JobStatus,
locked_at: Option<DateTime<Utc>>,
created_at: Option<DateTime<Utc>>,
updated_at: Option<DateTime<Utc>>,
payload: JobPayload,
}
enum JobStatus {
// Your statuses here...
}
enum JobPayload {
ProcessMetricsUpload(job_payload::ProcessMetricsUpload),
SendVerificationEmail(job_payload::SendVerificationEmail),
}
mod job_payload {
    // `pub` is required: the parent module can't see private items of a child module.
    pub struct ProcessMetricsUpload {
        pub storage_id: String,
        pub path: String,
        pub full_path: String,
    }
    pub struct SendVerificationEmail {
        pub email: String,
        pub verification_code: String,
    }
}
```
Since our `JobPayload` enum simply wraps payload-specific structs, we can easily do this for common fields:
```rust=
impl Job {
fn lock(&mut self) {
self.locked_at = Some(Utc::now());
}
}
```
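Note that `lock` only mutates the in-memory struct; persisting the change is a separate query. A minimal sketch, assuming the `job_queue` table from earlier (the function name is mine):
```rust=
// Hypothetical persistence step to run after `Job::lock`.
async fn persist_lock(pg_pool: &sqlx::PgPool, job: &Job) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE job_queue SET locked_at = $1 WHERE id = $2",
        job.locked_at,
        job.id,
    )
    .execute(pg_pool)
    .await?;
    Ok(())
}
```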
And we can also do **this** for payload-specific fields:
```rust=
mod job_payload {
    impl ProcessMetricsUpload {
        pub fn replace_storage_id(&mut self, new_id: String) -> String {
            std::mem::replace(&mut self.storage_id, new_id)
}
// Any method you may need...
}
}
impl Job {
fn process_metrics_upload_mut(&mut self) -> Option<&mut job_payload::ProcessMetricsUpload> {
        match &mut self.payload {
            JobPayload::ProcessMetricsUpload(payload) => Some(payload),
_ => None,
}
}
}
// Usage
fn process_job(mut job: Job) {
if let Some(metrics) = job.process_metrics_upload_mut() {
let old_id = metrics.replace_storage_id("123".into());
tracing::debug!(?old_id, "Changed storage id");
}
}
```
##### Clear mapping between "domain" and "storage" types
Now that we have `job_payload::ProcessMetricsUpload` and `job_payload::SendVerificationEmail` as separate types, we can significantly simplify conversion from `QueryJob` into `Job`:
```rust=
impl TryFrom<QueryJob> for Job {
type Error = crate::Error;
fn try_from(j: QueryJob) -> Result<Job, crate::Error> {
// NB: The careful reader will notice that
// we don't really have to match on `r#type`,
// but I will omit the explanation for now
// to make it more digestible.
        // (Assumes the `job_payload::*` structs derive `serde::Deserialize`.)
        let payload = match j.r#type {
QueryJobType::ProcessMetricsUpload => {
JobPayload::ProcessMetricsUpload(
                    serde_json::from_value(j.payload)?
)
}
QueryJobType::SendVerificationEmail => {
JobPayload::SendVerificationEmail(
                    serde_json::from_value(j.payload)?
)
}
};
        Ok(Job {
            id: j.id,
            status: j.status, // Assuming `QueryJob` also `SELECT`-s a `status` column.
            locked_at: j.locked_at,
            created_at: j.created_at,
            updated_at: j.updated_at,
            payload,
        })
}
}
```
Each variant of `JobPayload` contains only type-specific data, and the `payload JSON(B)` column stores exactly this data, nothing more and nothing less, so the conversion is really straightforward: `serde_json::Value` -> `JobPayload`.
A small nit: I don't like to perform "heavy" operations (like parsing JSON) inside the `try_from` method, since it hides the cost of the operation from the reader. We can just replace it with a plain function:
```rust=
fn build_job(query_job: QueryJob) -> Result<Job, crate::Error> {
// Same implementation as in `try_from`.
}
```
(A matter of preference, I guess.)
Another small nit: I always keep `impl TryFrom<QueryThing> for Thing` close to `QueryThing` (and not close to `Thing`), since `Thing` shouldn't really care about `QueryThing` at all. The same applies to `impl TryFrom<Thing> for QueryThing` (I keep it near `QueryThing`).
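In sketch form, that layout looks like this (all bodies elided):
```rust=
// queue.rs — storage-facing types live next to their conversions.
struct QueryJob { /* ... */ }
impl TryFrom<QueryJob> for Job { /* ... */ }   // The `SELECT` direction.
impl TryFrom<Job> for QueryJob { /* ... */ }   // The `INSERT` direction.

// job.rs — `Job` knows nothing about `QueryJob`.
struct Job { /* ... */ }
```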
Back to the code: this simplified conversion has one downside. Any change to the `JobPayload` type must be considered breaking (it changes the JSON shape we store), and the compiler has no means to detect such changes when calling `serde_json::from_value`. For that, we'll need a separate `QueryJobPayload` type with these goals:
1. Changes to `JobPayload` don't immediately break conversion from `serde_json::Value`, because our conversion will look like this: `serde_json::Value` (raw, Postgres level) -> `QueryJobPayload` (intermediate, "expected from Postgres" level) -> `JobPayload` (domain level).
2. `QueryJobPayload` only reflects how the payload is expected to be stored in Postgres and may differ from `JobPayload` in any way, as long as it remains convertible.
3. Track breaking changes with `QueryJobPayload` exclusively, without having to worry about the rest of the application breaking.
```rust=
struct QueryJob {
    id: String,
    r#type: QueryJobType, // We don't really need this anymore!
    // We could leverage clever tricks to use `QueryJobPayload` here
    // instead of `serde_json::Value`, but I won't, in order to keep
    // the code simpler and easier to understand.
    payload: serde_json::Value,
}
// One-to-one representation of the shape of data we expect from the JSONB column.
#[derive(serde::Deserialize)]
enum QueryJobPayload {
    ProcessMetricsUpload {
        storage_id: String,
    },
    // ...the other variants.
}
fn build_job(query_job: QueryJob) -> Result<Job, crate::Error> {
    // `query_job.payload` already encodes which type of job this is
    // (serde's default enum representation embeds the variant name; see below),
    // so we can deserialize into the `QueryJobPayload` enum without looking at `r#type`.
    let query_payload: QueryJobPayload = serde_json::from_value(query_job.payload)?;
    Ok(Job {
        id: query_job.id,
        payload: query_payload.into(),
        // ...the remaining fields, elided here just like in `QueryJob` above.
    })
}
// Any breaking change to `QueryJobPayload` will break this `impl`
// and you'll know whether you need to adjust anything
// in the surrounding code.
impl From<QueryJobPayload> for JobPayload {
fn from(p: QueryJobPayload) -> JobPayload {
match p {
// ...
}
}
}
```
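That deserialization works because serde's default, externally tagged enum representation embeds the variant name in the JSON itself. For illustration (made-up value):
```rust=
// An externally tagged `payload` value, as stored in the JSONB column.
let payload = serde_json::json!({
    "ProcessMetricsUpload": { "storage_id": "st_123" }
});
```
If you prefer a different shape (say, an explicit `"type"` field inside the object), serde's `#[serde(tag = "...")]` attribute supports that.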
With this, we can do `INSERT`s the following way:
```rust=
async fn insert_job(pg_pool: &sqlx::PgPool, job: Job) -> Result<()> {
let query_payload: QueryJobPayload = job.payload.into();
let payload = serde_json::to_value(&query_payload)?;
sqlx::query!(
r#"
INSERT INTO job_queue(
id, locked_at, payload
)
        VALUES ($1, $2, $3)
"#,
job.id,
job.locked_at,
        payload,
)
.execute(pg_pool)
.await?;
Ok(())
}
// NB: `QueryJobPayload` now also needs `serde::Serialize`
// (on top of the `serde::Deserialize` we used for `SELECT`-s).
#[derive(serde::Serialize, serde::Deserialize)]
enum QueryJobPayload {
    ProcessMetricsUpload {
        storage_id: String,
    },
    // ...the other variants.
}
// Any breaking change to `QueryJobPayload` will break the implementation here.
impl From<JobPayload> for QueryJobPayload {
    // Implementation sketched right below.
}
```
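For completeness, a sketch of that elided conversion, sticking to the single variant shown above:
```rust=
impl From<JobPayload> for QueryJobPayload {
    fn from(p: JobPayload) -> QueryJobPayload {
        match p {
            JobPayload::ProcessMetricsUpload(p) => QueryJobPayload::ProcessMetricsUpload {
                storage_id: p.storage_id,
            },
            // The remaining variants convert the same way.
            _ => unimplemented!(),
        }
    }
}
```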
### Bonus: better error handling
Suppose one of many jobs has its data corrupted. What will `get_available_jobs` return?
```rust=
async fn get_available_jobs(&self) -> Result<Vec<Job>> { ... }
```
That's right, an error. It's kind of lame that a single corrupted job results in throwing away all the other jobs.
Let's change the signature a bit:
```rust=
async fn get_available_jobs(&self) -> Result<
Vec<Result<Job, crate::Error>>,
crate::Error,
>
{ ... }
```
Now we have an outer `Result`, which can return an error when something really unexpected happens (like Postgres being unavailable). If we managed to `SELECT` the raw data (the rows represented by `QueryJob`), we can return a vector of results, where each result represents the possible failure of converting that one respective job.
```rust=
async fn get_available_jobs(&self) -> Result<
Vec<Result<Job, crate::Error>>,
crate::Error,
> {
let jobs = sqlx::query_as!(
QueryJob,
r#"..."#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)?;
Ok(jobs
.into_iter()
.map(Job::try_from)
.collect())
}
// Usage
async fn complex_routine() -> Result<(), crate::Error> {
    let jobs: Vec<Job> = get_available_jobs()
.await
.inspect_err(|error|
tracing::error!(?error, "Failed to get available jobs from Postgres")
)?
// Now, filter out all errors and leave only
// successfully deserialized jobs.
.into_iter()
.filter_map(|result| match result {
Ok(job) => Some(job),
Err(error) => {
tracing::error!(?error, "Failed to deserialize job from Postgres");
None
}
// Or use `.ok()`:
// result
// .inspect_err(|error| tracing::error!(?error, "Failed to deserialize job from Postgres"))
// .ok()
})
        .collect();
    // ...do something with `jobs`...
    Ok(())
}
```
Admittedly, it's a bit more hassle. Why should we even do that?
I can think of at least one reason: if we consider the application in layers, "lower level" layers should not make decisions for "higher" ones. For instance, `get_available_jobs` should not throw out valid jobs just because one of them is corrupted; only a "higher level" layer should decide what to do with such jobs.
If we consider `get_available_jobs` itself a "higher level" layer, we can filter out and log corrupted jobs before returning them:
```rust=
async fn get_available_jobs(&self) -> Result<Vec<Job>> {
let jobs = sqlx::query_as!(
QueryJob,
r#"..."#,
)
.fetch_all(&self.pg_pool)
.await
.map_err(Error::Database)?;
Ok(jobs
.into_iter()
        .filter_map(|job| match Job::try_from(job) {
Ok(job) => Some(job),
Err(error) => {
tracing::error!(?error, "Failed to deserialize job from Postgres");
None
}
})
.collect())
}
```
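Alternatively, if the caller wants to keep the errors around (say, to mark the corrupted rows in Postgres), it can split the results instead of dropping them. A std-only sketch, where `results` is the `Vec<Result<Job, crate::Error>>` returned by the nested-`Result` version:
```rust=
// Split successes from failures without discarding the errors.
// (`unwrap_err` needs `Job: Debug`; derive it or pattern-match instead.)
let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
let jobs: Vec<Job> = oks.into_iter().map(Result::unwrap).collect();
let errors: Vec<crate::Error> = errs.into_iter().map(Result::unwrap_err).collect();
```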