# Job Queue ## Changelog * 2024-08-12: Add [the "Better" section](#Better) to demonstrate general and "`JSONB` column approach"-specific improvements you can possibly make. * 2024-08-06: Initial version <details> <summary>Code from tweets</summary> <img src="https://hackmd.io/_uploads/HkCG851qR.png" /> <img src="https://hackmd.io/_uploads/r1yr8qycA.jpg" /> </details> A few observations first: 1. `enum Job` has two variants with the same contents: * `ProcessMetricsUpload(JobDetails)` * `SendVerificationEmail(JobDetails)` Sometimes it totally makes sense to do that, other times you'll do just fine with a simple ```rust= struct Job { kind: JobKind, } enum JobKind { ProcessMetricsUpload, SendVerificationEmail, } ``` NB: This "kind-based" approach won't work if you want to store different data depending on the kind: ```rust= enum Job { ProcessMetricsUpload { metric_name: String }, SendVerificationEmail { email_address: EmailAddress }, } ``` Based on the code I've seen, it feels like `kind` would work well enough, but I won't make any assumptions to cover all possible use cases. 2. `struct Database` doesn't do much, it mostly copies the interface of `sqlx`. So one could say it's a level of inderection without a useful abstraction to leverage: even `query_one` and `query` methods are dependent on `sqlx` and `Postgres` functionality. If no abstraction is needed (i.e. you're not trying to follow "clean architecture" rules), you can just pass and `.clone()` `sqlx::PgPool` everywhere: ```rust= struct Queue { pg_pool: sqlx::PgPool, // Other fields... } ``` Now, based on observations above, we have three options available to us if we want to use compile-time type checking via `sqlx::query!` and `sqlx::query_as!` macros. ## 1. Kind-based approach ```rust= #[derive(sqlx::Type)] #[sqlx(type_name = "job_type", rename_all = "snake_case")] pub enum JobType { ProcessMetricsUpload, SendVerificationEmail, } // Notice how we don't need to derive anything here // (or implement any trait, for that matter). struct Job { id: String, r#type: JobType, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, } async fn get_available_jobs(&self) -> Result<Vec<Job>> { sqlx::query_as!( Job, r#" SELECT id , type AS "r#type: JobType" , locked_at , created_at , updated_at FROM job_queue "#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database) } ``` The `AS "r#type: JobType"` annotation is important! It helps `sqlx` to map `job_type` (a type stored directly in Postgres) to our `enum JobType`. Why do need need to explicitly repeat `JobType` here? `sqlx` doesn't assume there is only one possible way to convert Postgres's `job_type` into some arbitrary Rust type (there could be many!), so we nudge it a little. ## 2. Enum-based approach More type safety, nice! We will need an intermediate struct to store raw Postgres data, which we can easily convert into our "domain-level" (sorry) struct. ```rust= //////////////////////////////////////////////////////////////////////////////// // `job.rs` enum Job { ProcessMetricsUpload(JobDetails), SendVerificationEmail(JobDetails), } struct JobDetails { id: String, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, } //////////////////////////////////////////////////////////////////////////////// // `queue.rs` async fn get_available_jobs(&self) -> Result<Vec<Job>> { let jobs = sqlx::query_as!( QueryJob, r#" SELECT id , type AS "r#type: QueryJobType" , locked_at , created_at , updated_at FROM job_queue "#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database)?; Ok(jobs .into_iter() .map(Job::from) // Implementation below. .collect()) } // I usually name such struct with `Query` prefix // to signal that this struct is used only for // `SELECT`-ing data from Postgres and nothing else. struct QueryJob { id: String, r#type: QueryJobType, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, } #[derive(sqlx::Type)] #[sqlx(type_name = "job_type", rename_all = "snake_case")] enum QueryJobType { ProcessMetricsUpload, SendVerificationEmail, } // With this, we can write `query_job.into()` to get a `Job`. // Or, alternatively, `Job::from(query_job)`. impl From<QueryJob> for Job { fn from(j: QueryJob) -> Job { let details = JobDetails { id: j.id, locked_at: j.locked_at, created_at: j.created_at, updated_at: j.updated_at, }; match j.r#type { QueryJobType::ProcessMetricsUpload => Job::ProcessMetricsUpload(details), QueryJobType::SendVerificationEmail => Job::SendVerificationEmail(details), } } } ``` ## 3. Advanced enums I think it's fair to assume some day different types of jobs will need different data. Without coming up with some crazy schemas (e.g. using a table per type), we can store all the data in a single table. On the spot, I can think of two ways: ### Many columns Store common data in standalone columns ```psql= id VARCHAR(64) NOT NULL , locked_at TIMESTAMPTZ , created_at TIMESTAMPTZ , updated_at TIMESTAMPTZ ``` and store type-related data in optional columns for each type: ```psql= -- Metrics upload. , metrics_upload__metric_name TEXT , metrics_upload__uploaded_at TIMESTAMPTZ -- Verification emails. , verification_email__email_address TEXT , verification_email__sent_at TIMESTAMPTZ ``` Now you can declare `QueryJob` with all the columns: ```rust= //////////////////////////////////////////////////////////////////////////////// // `job.rs` // Let's say some type-specific fields are required // (e.g. `metric_name` or `email_address`). enum Job { ProcessMetricsUpload { details: JobDetails, metric_name: String, uploaded_at: Option<DateTime<Utc>>, }, SendVerificationEmail { details: JobDetails, email_address: String, sent_at: Option<DateTime<Utc>>, }, } struct JobDetails { id: String, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, } //////////////////////////////////////////////////////////////////////////////// // `queue.rs` struct QueryJob { id: String, r#type: QueryJobType, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, // Metrics upload. metric_name: Option<String>, // Optional due to Postgres column being `NULL`-able. metric_uploaded_at: Option<DateTime<Utc>>, // Verification emails. verification_email_address: Option<String>, // Optional due to Postgres column being `NULL`-able. verification_email_sent_at: Option<DateTime<Utc>>, } #[derive(sqlx::Type)] #[sqlx(type_name = "job_type", rename_all = "snake_case")] enum QueryJobType { ProcessMetricsUpload, SendVerificationEmail, } async fn get_available_jobs(&self) -> Result<Vec<Job>> { let jobs = sqlx::query_as!( QueryJob, r#" SELECT id , type AS "r#type: QueryJobType" , locked_at , created_at , updated_at , metrics_upload__metric_name AS metric_name , metrics_upload__uploaded_at AS metric_uploaded_at , verification_email__email_address AS verification_email_address , verification_email__sent_at AS verification_email_sent_at FROM job_queue "#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database)?; let jobs = jobs .into_iter() .map(Job::try_from) // See: https://doc.rust-lang.org/rust-by-example/error/iter_result.html#fail-the-entire-operation-with-collect .collect::<Result<Vec<Job>>()?; Ok(jobs) } impl TryFrom<QueryJob> for Job { type Error = crate::Error; fn try_from(j: QueryJob) -> Result<Job, crate::Error> { let details = JobDetails { id: j.id, locked_at: j.locked_at, created_at: j.created_at, updated_at: j.updated_at, }; match j.r#type { QueryJobType::ProcessMetricsUpload => { // Manually process every required field here. // // `Error::JobIntergrity` is a new error variant // to help you debug errors when they occur. // Be specific so that future you won't spend // more than one or two minutes fixing possible issue. let metric_name = j .metric_name .ok_or(crate::Error::JobIntegrity { missing_field: "metric_name", })?; Ok(Job::ProcessMetricsUpload { details, metric_name, metric_uploaded_at: j.metric_uploaded_at, }) } QueryJobType::SendVerificationEmail => { let verification_email_address = j .verification_email_address .ok_or(crate::Error::JobIntegrity { missing_field: "verification_email_address", })? Ok(Job::SendVerificationEmail { details, verification_email_address, verification_email_sent_at: j.verification_email_sent_at, }) } } } } ``` ### Single JSON(B) column Store common data in standalone columns ```psql= id VARCHAR(64) NOT NULL , locked_at TIMESTAMPTZ , created_at TIMESTAMPTZ , updated_at TIMESTAMPTZ ``` and store type-related data in a single JSONB column: ```psql= , data JSONB (NOT NULL?) ``` ```rust= //////////////////////////////////////////////////////////////////////////////// // `job.rs` enum Job { ProcessMetricsUpload { details: JobDetails, metric_name: String, uploaded_at: Option<DateTime<Utc>>, }, SendVerificationEmail { details: JobDetails, email_address: String, sent_at: Option<DateTime<Utc>>, }, } struct JobDetails { id: String, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, } //////////////////////////////////////////////////////////////////////////////// // `queue.rs` struct QueryJob { id: String, r#type: QueryJobType, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, // Type-specific data. data: serde_json::Value, } #[derive(sqlx::Type)] #[sqlx(type_name = "job_type", rename_all = "snake_case")] enum QueryJobType { ProcessMetricsUpload, SendVerificationEmail, } async fn get_available_jobs(&self) -> Result<Vec<Job>> { let jobs = sqlx::query_as!( QueryJob, r#" SELECT id , type AS "r#type: QueryJobType" , locked_at , created_at , updated_at , data FROM job_queue "#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database)?; let jobs = jobs .into_iter() .map(Job::try_from) // See: https://doc.rust-lang.org/rust-by-example/error/iter_result.html#fail-the-entire-operation-with-collect .collect::<Result<Vec<Job>>()?; Ok(jobs) } impl TryFrom<QueryJob> for Job { type Error = crate::Error; fn try_from(j: QueryJob) -> Result<Job, crate::Error> { let details = JobDetails { id: j.id, locked_at: j.locked_at, created_at: j.created_at, updated_at: j.updated_at, }; match j.r#type { QueryJobType::ProcessMetricsUpload => { #[derive(serde::Deserialize)] struct ProcessMetricsUploadJobData { metric_name: String, metric_uploaded_at: Option<DateTime<Utc>>, } let data: ProcessMetricsUploadJobData = serde_json::from_value(&j.data)?; Ok(Job::ProcessMetricsUpload { details, metric_name: data.metric_name, metric_uploaded_at: data.metric_uploaded_at, }) } QueryJobType::SendVerificationEmail => { #[derive(serde::Deserialize)] struct SendVerificationEmailJobData { email_address: String, email_sent_at: Option<DateTime<Utc>>, } let data: SendVerificationEmailJobData = serde_json::from_value(&j.data)?; Ok(Job::SendVerificationEmail { details, verification_email_address: data.verification_email_address, verification_email_sent_at: data.verification_email_sent_at, }) } } } } ``` There are a few things you can improve here, but I omitted them for brevity. Let me know if you're interested in it and I'll create a separate HackMD doc. #### Better The example above is quite suboptimal in a few ways: 1. For common fields, we have to duplicate `details: JobDetails` for each enum variant. It prevents us from easily mutating common fields, because we need to match on each variant. 2. Since Rust doesn't treat enum variants as separate types ([yet!](https://github.com/rust-lang/rfcs/pull/2593)), we can't add additional functionality to specific variants (i.e. `impl Job::SendVerificationEmail`). 3. We didn't define clear boundary between "domain" types and "storage" types: even though we have inline structs like `ProcessMetricsUploadJobData` and `SendVerificationEmailJobData`, it's not clear how we can achieve `INSERT`-s. Let's see how we can mitigate these flaws. ##### Common fields and separate functionality We can create a "root" struct with common fields and store type-specific fields in the `JobPayload` enum: ```rust= struct Job { id: String, status: JobStatus, locked_at: Option<DateTime<Utc>>, created_at: Option<DateTime<Utc>>, updated_at: Option<DateTime<Utc>>, payload: JobPayload, } enum JobStatus { // Your statuses here... } enum JobPayload { ProcessMetricsUpload(job_payload::ProcessMetricsUpload), SendVerificationEmail(job_payload::SendVerificationEmail), } mod job_payload { struct ProcessMetricsUpload { storage_id: String, path: String, full_path: String, } struct SendVerificationEmail { email: String, verification_code: String, } } ``` Since our `JobPayload` enum simply wraps payload-specific structs, we can do easily do this for common fields: ```rust= impl Job { fn lock(&mut self) { self.locked_at = Some(Utc::now()); } } ``` And also **this!** for payload-specific fields: ```rust= mod job_payload { impl ProcessMetricsUpload { fn replace_storage_id(&mut self, new_id: String) -> String { std::mem::replace(&mut self.storage_id, new_id) } // Any method you may need... } } impl Job { fn process_metrics_upload_mut(&mut self) -> Option<&mut job_payload::ProcessMetricsUpload> { match self.payload { JobPayload::ProcessMetricsUpload(payload) => Some(&mut payload), _ => None, } } } // Usage fn process_job(job: Job) { if let Some(metrics) = job.process_metrics_upload_mut() { let old_id = metrics.replace_storage_id("123".into()); tracing::debug!(?old_id, "Changed storage id"); } } ``` ##### Clear mapping between "domain" and "storage" types Now that we have `job_payload::ProcessMetricsUpload` and `job_payload::SendVerificationEmail` as separate types, we can significantly simplify conversion from `QueryJob` into `Job`: ```rust= impl TryFrom<QueryJob> for Job { type Error = crate::Error; fn try_from(j: QueryJob) -> Result<Job, crate::Error> { // NB: The careful reader will notice that // we don't really have to match on `r#type`, // but I will omit the explanation for now // to make it more digestible. let payload = match j.r#type { QueryJobType::ProcessMetricsUpload => { JobPayload::ProcessMetricsUpload( serde_json::from_value(&j.payload)? ) } QueryJobType::SendVerificationEmail => { JobPayload::SendVerificationEmail( serde_json::from_value(&j.payload)? ) } }; Ok(Job { id: j.id, locked_at: j.locked_at, created_at: j.created_at, updated_at: j.updated_at, payload, }) } } ``` Each variant of `JobPayload` contains only type-specific data and we store `payload JSON(B)` column with exactly this data and nothing more or less, so conversion is really straightforward: `serde_json::Value` -> `JobPayload`. A small nit: I don't like to perform "heavy" operations (like parsing JSON-s) inside the `try_from` method, since it hides the cost of such operation for the reader. We can just replace it with ```rust= fn build_job(query_job: QueryJob) -> Result<Job, crate::Error> { // Same implementation as in `try_from`. } ``` (A thing of preference, I guess) Another small nit: I always keep `impl TryFrom<QueryThing> for Thing` close to `QueryThing` (and not close to `Thing`), since `Thing` shouldn't really care about `QueryThing` at all. The same applies to `impl TryFrom<Thing> for QueryThing` (I keep it near `QueryThing`). Back to the code, we have one downside with this simplified conversion: any change to the `JobPayload` type should be considered breaking and the compiler doesn't have any means to detect such changes when calling `serde_json::from_value`. For that, we'll need a separate `QueryJobPayload` type with these goals: 1. Changes to `JobPayload` don't immediately break conversion from `serde_json::Value`, because our conversion will look like this: `serde_json::Value` (raw, Postgres level) -> `QueryJobPayload` (intermediate, "expected from Postgres" level) -> `JobPayload` (domain level). 2. `QueryJobPayload` only reflects how payload is expected to be stored in Postgres and may differ from `JobPayload` in any way until it's convertible. 3. Track breaking changes with `QueryJobPayload` exclusively, without having to worry about the rest of the application breaking. ```rust= struct QueryJob { id: String, r#type: QueryJobType, // We don't really need this anymore! payload: serde_json::Value, // We can leverage clever tricks to use `QueryJobPayload` instead of `serde_json::Value` here, // but I won't in order to keep the code simpler and easier to understand. } // One-to-one representation of the shape of data we expect from the JSONB column. enum QueryJobPayload { ProcessMetricsUpload { storage_id: String, } } fn build_job(query_job: QueryJob) -> Result<Job, crate::Error> { // `query_job.payload: serde_json::Value` should already know which type of the job this is, // so we can deserialize from JSON into the `QueryJobPayload` enum without looking at `r#type`. let query_payload: QueryJobPayload = serde_json::from_value(&query_job.payload)?; Ok(Job { id: query_job.id, payload: query_payload.into(), }) } // Any breaking change to `QueryJobPayload` will break this `impl` // and you'll know whether you need to adjust anything // in the surrounding code. impl From<QueryJobPayload> for JobPayload { fn from(p: QueryJobPayload) -> JobPayload { match p { // ... } } } ``` With this, we can do `INSERT`-s the following way: ```rust= async fn insert_job(pg_pool: &sqlx::PgPool, job: Job) -> Result<()> { let query_payload: QueryJobPayload = job.payload.into(); let payload = serde_json::to_value(&query_payload)?; sqlx::query!( r#" INSERT INTO job_queue( id, locked_at, payload ) VALUES ($1, $2, $3), "#, job.id, job.locked_at, query_payload, ) .execute(pg_pool) .await?; Ok(()) } #[derive(serde::Serialize)] enum QueryJobPayload { ProcessMetricsUpload { storage_id: String, } } // Any breaking change to `QueryJobPayload` will break the implementation here. impl From<JobPayload> for QueryJobPayload { // Implementation... } ``` ### Bonus: better error handling Suppose one of many jobs has its data corrupted. What will `get_available_jobs` return? ```rust= async fn get_available_jobs(&self) -> Result<Vec<Job>> { ... } ``` That's right, an error. Kinda lame that one single job results in throwing away all other jobs. Let's change the signature a bit: ```rust= async fn get_available_jobs(&self) -> Result< Vec<Result<Job, crate::Error>>, crate::Error, > { ... } ``` Now, we have outer `Result` which can return an error when something really unexpected happens (like Postgres is unavailable). If we managed to `SELECT` raw data (meaning rows represented by `QueryJob`), then we can return a vector of results where each result represents possible failure of converting that one respective job. ```rust= async fn get_available_jobs(&self) -> Result< Vec<Result<Job, crate::Error>>, crate::Error, > { let jobs = sqlx::query_as!( QueryJob, r#"..."#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database)?; Ok(jobs .into_iter() .map(Job::try_from) .collect()) } // Usage async fn complex_routine() { let jobs: Vec<Job> = get_available_jobs() .await .inspect_err(|error| tracing::error!(?error, "Failed to get available jobs from Postgres") )? // Now, filter out all errors and leave only // successfully deserialized jobs. .into_iter() .filter_map(|result| match result { Ok(job) => Some(job), Err(error) => { tracing::error!(?error, "Failed to deserialize job from Postgres"); None } // Or use `.ok()`: // result // .inspect_err(|error| tracing::error!(?error, "Failed to deserialize job from Postgres")) // .ok() }) .collect(); } ``` Admittedly, it's a bit more hustle. Why should we even do that? I can think of at least one reason: if we consider the application in layers, "lower level" layers should not make decisions for "higher" ones. For instance, `get_available_jobs` should not throw out valid jobs if one of them is corrupted - only "higher level" layer should decide what to do with such jobs. In case we consider `get_available_jobs` a "higher level" layer, we can filter out and log corrupted jobs before returning them: ```rust= async fn get_available_jobs(&self) -> Result<Vec<Job>> { let jobs = sqlx::query_as!( QueryJob, r#"..."#, ) .fetch_all(&self.pg_pool) .await .map_err(Error::Database)?; Ok(jobs .into_iter() .filter_map(|job| match job.into() { Ok(job) => Some(job), Err(error) => { tracing::error!(?error, "Failed to deserialize job from Postgres"); None } }) .collect()) } ```