# Valinor Tech Design
## Notation Guide
This document uses Mermaid class diagrams to illustrate the database schema and relationships. Key notation:
- **`has_many(r_w_e)`** - `dependent: :restrict_with_exception` - Prevents deletion if associated records exist
- **`has_many(destroy)`** - `dependent: :destroy` - Cascading delete of associated records
- **`has_many(false)`** - `dependent: false` - No automatic deletion of associated records
- **`has_many(nullify)`** - `dependent: :nullify` - Foreign keys on associated records are set to `NULL` when the parent is deleted
- **`has_one(r_w_e)`** - Same as has_many but for singular associations
- **`belongs_to`** - Foreign key relationship
- **`<<joinTable>>`** - Indicates a pure join table between two entities
- **`Uuid?`** - Optional/nullable UUID field
- **`#attachment_name`** - ActiveStorage attachment (e.g., `#input_files` for uploaded files)
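For readers less familiar with Rails, the notation maps onto standard ActiveRecord declarations. A minimal sketch using relationships that appear in the diagrams below (the real model files contain much more than shown here):
```ruby
# Illustrative mapping of the diagram notation onto ActiveRecord declarations.
class Organization < ApplicationRecord
  # has_many(r_w_e): deletion is blocked while associated records exist
  has_many :datasources, dependent: :restrict_with_exception
end

class SchemaTable < ApplicationRecord
  # has_many(destroy): deleting the table also destroys its headers
  has_many :schema_headers, dependent: :destroy
end

class Dataset < ApplicationRecord
  # has_many(false): no dependent option, versions are not cleaned up automatically
  has_many :dataset_versions

  # belongs_to with a nullable foreign key (SchemaTable "0..1" in the diagrams)
  belongs_to :schema_table, optional: true
end

class DataUpload < ApplicationRecord
  # #input_files: an ActiveStorage attachment
  has_many_attached :input_files
end
```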
## Data Flow Overview
Valinor processes data through the following high-level workflow:
1. **Upload** - User uploads files via `DataUpload` containing multiple sheets
2. **Ingestion** - Each sheet becomes a `Dataset` with a `DatasetVersion`. An ingestion `Transformation` parses the file and creates `DataRow` records according to the `SchemaTable` definition
3. **Schema Transformation** - `DataFlow` orchestrates multi-step transformations:
- MGA schema → Noldor schema (first `SchemaMapping`)
- Noldor schema → Carrier schema (second `SchemaMapping`)
- Each transformation reads source `DataRow`s and writes transformed `DataRow`s to the output `DatasetVersion`
4. **Output Generation** - Final `Transformation` generates output files (CSV, JSON) from the transformed data
5. **Approval & Delivery** - Approved outputs are made available for download or delivery to carriers
Key concepts:
- **Schemas** define table structures with headers and data types
- **SchemaMappings** define field-level transformations between schemas using `HeaderMapping`s
- **OutputConfigs** specify which input tables are needed to generate a carrier's output file
- **DataFlows** are instances of OutputConfigs for specific reporting periods
### Data Flow Diagram
```mermaid
flowchart LR
Start([User<br/>uploads<br/>files])
subgraph DataUpload["<b>DataUpload Phase</b>"]
direction TB
Upload[DataUpload<br/>created with<br/>#input_files]
Parse[Parse file<br/>into sheets]
CreateDS[Create<br/>Dataset +<br/>DatasetVersion<br/>per sheet]
Match{Match to<br/>SchemaTable?}
Wait1[Wait for<br/>user match]
Upload --> Parse --> CreateDS --> Match
Match -->|No| Wait1 --> IngestT
Match -->|Yes| IngestT
subgraph Ingestion["<b>Ingestion Transformation</b>"]
direction TB
IngestT["<b>Transformation</b><br/><code>ingestion</code><br/>mapping: NULL"]
ParseR[Parse rows per<br/>SchemaTable]
CreateDR[Create DataRows<br/>with<br/>data_values_json]
IngestComplete[DataRows stored<br/>in ingested<br/>DatasetVersion]
IngestT --> ParseR --> CreateDR --> IngestComplete
end
end
subgraph Check["<b>DataFlow Readiness Check</b>"]
direction TB
EnsureDF{DataFlow exists<br/>for period?}
NeedsDS{Needs this<br/>dataset?}
AddDS[Add dataset to<br/>existing DataFlow]
NewDF[Create new<br/>DataFlow]
ReadyCheck{All required tables<br/>have approved<br/>ingestions?}
Wait2[Wait for more<br/>datasets/approvals]
TriggerDF[Mark DataFlow<br/>in_progress]
EnsureDF -->|Yes| NeedsDS
EnsureDF -->|No| NewDF
NeedsDS -->|Yes| AddDS
NeedsDS -->|No| Wait2
AddDS --> ReadyCheck
NewDF --> ReadyCheck
ReadyCheck -->|No| Wait2
ReadyCheck -->|Yes| TriggerDF
end
subgraph DataFlow["<b>DataFlow Phase</b>"]
direction TB
CreateDF[Create DataFlow<br/>+ output<br/>DatasetVersion]
subgraph Step1["<b>Step 1: Approval Check</b>"]
direction TB
S1["<b>DataFlowStep</b><br/><code>ingestions_approved</code>"]
S1Check{All<br/>approved?}
S1 --> S1Check
end
subgraph Step2["<b>Step 2: MGA → Noldor</b>"]
direction TB
S2["<b>DataFlowStep</b><br/><code>schema_transform</code>"]
T1["<b>Transformation</b><br/><code>schema_transform</code>"]
Read1["Read DataRows<br/><i>from primary table</i><br/><i>ingested version</i>"]
Apply1[Apply<br/>SchemaMapping<br/>HeaderMappings]
Write1["Write to<br/>DataFlow output<br/>DatasetVersion"]
S2 --> T1 --> Read1 --> Apply1 --> Write1
end
subgraph Step3["<b>Step 3: Noldor → Carrier</b>"]
direction TB
S3["<b>DataFlowStep</b><br/><code>schema_transform</code>"]
T2["<b>Transformation</b><br/><code>schema_transform</code>"]
Read2["Read DataRows<br/><i>from Step 2</i><br/><i>output version</i>"]
Apply2[Apply<br/>SchemaMapping<br/>HeaderMappings]
Write2["Write to<br/>same output<br/>DatasetVersion"]
S3 --> T2 --> Read2 --> Apply2 --> Write2
end
subgraph Step4["<b>Step 4: Generate Output</b>"]
direction TB
S4["<b>DataFlowStep</b><br/><code>output</code>"]
T3["<b>Transformation</b><br/><code>output</code><br/>mapping: inherited or NULL"]
Read3["Read final<br/>transformed<br/>DataRows"]
Generate[Generate<br/>CSV/JSON files]
Attach[Attach files to<br/>DatasetVersion]
S4 --> T3 --> Read3 --> Generate --> Attach
end
subgraph Step5["<b>Step 5: Completion & Delivery</b>"]
direction TB
S5["<b>DataFlowStep</b><br/><code>data_flow_completed</code>"]
PortalApprove{Portal<br/>Approved?}
WebhookSend{Send to<br/>Webhooks?}
DataClientSend{Send to<br/>Data Client?}
S5 --> PortalApprove
PortalApprove -->|Yes| WebhookSend
PortalApprove -->|Yes| DataClientSend
end
CreateDF --> Step1
S1Check -->|Yes| Step2
Step2 --> Step3
Step3 --> Step4
Step4 --> Step5
end
End1([Deliver to<br/>carrier])
End2([Revise and<br/>retry])
Start --> DataUpload
DataUpload --> Check
Check --> DataFlow
DataFlow --> End1
WebhookSend -->|Yes| End1
DataClientSend -->|Yes| End1
PortalApprove -->|No| End2
%% Styling
classDef uploadPhase fill:#e1f5e1,stroke:#333,stroke-width:2px
classDef ingestPhase fill:#fff3cd,stroke:#333,stroke-width:2px
classDef checkPhase fill:#ffe0b2,stroke:#333,stroke-width:2px
classDef dataFlowPhase fill:#cfe2ff,stroke:#333,stroke-width:2px
classDef outputPhase fill:#f8d7da,stroke:#333,stroke-width:2px
class DataUpload uploadPhase
class Ingestion ingestPhase
class Check checkPhase
class DataFlow,Step1,Step2,Step3 dataFlowPhase
class Step4,Step5 outputPhase
```
**Key Points:**
- **DataUpload Phase** (green): User uploads files, system creates Dataset + DatasetVersion for each sheet
- **Ingestion Phase** (yellow): `Transformation` (type: `ingestion`) parses raw file data into structured `DataRows` based on `SchemaTable` definition
- **Readiness Check** (orange): `DataFlow::Ensure` creates/reuses DataFlows; the flow only starts when ALL required input tables have approved ingestions meeting `dataset_count` requirements
- **DataFlow Phase** (blue): `DataFlow` orchestrates multiple `DataFlowStep`s, each containing `Transformation`s (type: `schema_transform`) that apply `SchemaMapping`s
- **Output Phase** (red): Final `Transformation` (type: `output`) generates export files; output can be delivered via Portal approval, Webhooks, or Data Client
**Transformation Types:**
- `ingestion`: Parses uploaded files → DataRows (in DataUpload context), `schema_mapping` is NULL
- `schema_transform`: Applies SchemaMapping rules to transform DataRows (in DataFlow context), `schema_mapping` is required
- `output`: Generates final export files from DataRows (in DataFlow context), `schema_mapping` is inherited from last schema_transform step or NULL
## Organization Design
```mermaid
classDiagram
direction LR
Organization --> Datasource : has_many(r_w_e)
Organization --> OrganizationMembership : has_many(r_w_e)
Organization --> Program : has_many(r_w_e)
Organization --> ProgramAgreement : has_many(r_w_e)
Organization --> User : has_many(r_w_e)
User <-- OrganizationMembership : belongs_to
class Organization{
Uuid id
String name
String key
Enum organization_type
Array~String~ output_formats
String? google_drive_url
Datetime? deleted_at
}
class Datasource{
Uuid id
Uuid organization_id
Enum datasource_type
Jsonb? config
Datetime? deleted_at
}
class User{
Uuid id
Uuid organization_id
Uuid? identity_provider_id
String auth0_uid
String first_name
String last_name
String email_address
Bool noldor_admin
Bool noldor_developer
Bool email_verified
Datetime? deleted_at
}
class OrganizationMembership{
Uuid id
Uuid organization_id
Uuid user_id
Enum role
Datetime? last_viewed_at
Datetime? deleted_at
}
class Program{
Uuid id
Uuid organization_id
String name
Datetime? deleted_at
}
class ProgramAgreement{
<<joinTable>>
Uuid id
Uuid organization_id
Uuid mga_program_id
Uuid carrier_program_id
Date effective_date
Date expiration_date
Bool auto_approve_output_files
Datetime? deleted_at
}
```
### Organization
:::info
Organization represents a customer (or Noldor) entity for the sake of segmenting data.
:::
- `organization_type` possible values:
- `noldor` [default], `carrier`, `mga`, `tpa`, `broker`, `other`
- `output_formats` has a default of `["json", "csv"]`
### Datasource
:::info
Datasource represents a source of data for an Organization (e.g. manual upload or an API integration).
:::
- `datasource_type` possible values:
- `manual_upload` [default]
### User
:::info
A user represents someone who can log into the system. Each user belongs to a primary organization.
:::
- `noldor_admin` has a default of `false`
- `noldor_developer` has a default of `false`
- `email_verified` has a default of `false`
### OrganizationMembership
:::info
User membership relationship to an organization. This grants users access and permissions to organizations beyond their primary organization.
:::
- `role` possible values:
- `admin` [default]
The `user_id` links to the User table, establishing which user has membership in the organization.
### Program
:::info
A Program is a logical grouping of Program Agreements that address a single market segment from the organization's perspective.
In the case of an MGA, a Program represents a market segment that the MGA is underwriting with one or more carriers / fronting companies / reinsurers providing the paper and capacity to serve it.
In the case of a carrier / fronting company / reinsurer, a Program represents a collection of MGAs' Programs that all have a comparable market segment and that the carrier / fronting company / reinsurer has a Program Agreement with to provide paper and capacity.
:::
Programs can be ordered by a combination of organization name and program name.
### ProgramAgreement
:::info
A ProgramAgreement is a contract between an MGA and a carrier / fronting company / reinsurer that authorizes the MGA to underwrite policies that are backed or partially backed by the carrier / fronting company / reinsurer.
This contract details the terms and guidelines the MGA is authorized to operate within as well as commission rates, etc.
:::
- `auto_approve_output_files` has a default of `false`
:::warning
**DEPRECATED**: `SchemaTablesProgramAgreement` is deprecated. Schema tables should now be linked to program agreements through output configs by setting them as input tables.
:::
## Schema Design
```mermaid
classDiagram
direction LR
Organization <-- Schema : belongs_to
Schema --> SchemaTable : has_many(r_w_e)
SchemaTable --> SchemaHeader : has_many(destroy)
SchemaTable --> SchemaTablesProgramAgreement : has_many(destroy)
`#seed_file` "0..1" ()-- SchemaTable : has_one
ProgramAgreement <-- SchemaTablesProgramAgreement : belongs_to
class Organization{
Uuid id
...
}
class Schema{
Uuid id
Uuid organization_id
Int version
Datetime? deleted_at
Datetime? published_at
}
class ProgramAgreement{
Uuid id
...
}
class SchemaTable{
Uuid id
Uuid schema_id
Uuid organization_id
String name
Enum dataset_type
String? sheet_name
String? header_spec
Int header_row_index
Datetime? deleted_at
}
class SchemaHeader{
Uuid id
Uuid schema_table_id
Uuid organization_id
String name
Enum data_type
Bool required
Bool unique
Jsonb? metadata
Int order
Text? description
Datetime? deleted_at
}
class SchemaTablesProgramAgreement{
<<joinTable>>
Uuid id
Uuid schema_table_id
Uuid program_agreement_id
Uuid organization_id
Datetime? deleted_at
}
```
### Schema
:::info
Schema represents the collective structure of an Organization's data
:::
- `version` has a default of `1`
### SchemaTable
:::info
SchemaTable represents a single model that makes up part of an Organization's data (e.g. a Policy).
:::
- `dataset_type` possible values:
- `policy` [default], `risk`, `claim`, `location`, `exposure`, `broker`, `transaction`, `other`
- `header_row_index` (a note in the code says this will go away and be replaced with `header_spec`) has a default of `1`
- `header_spec` is a String of integers separated by non-digit characters (e.g., `"3,4,5"`). These are the row numbers where the headers can be found.

The SchemaTable can optionally have an attached `seed_file` (ActiveStorage attachment). This file is used to build out the headers and provides an example of what the table looks like.
When `header_spec` lists multiple header rows, header names are generated by concatenating the vertical header strings together, separated by spaces.
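A minimal sketch of how a `header_spec` could be interpreted, assuming a parsed sheet represented as an array of rows (method names are illustrative, not taken from the codebase):
```ruby
# header_rows("2,3") => [2, 3]: rows 2 and 3 both contain header text.
def header_rows(header_spec)
  header_spec.scan(/\d+/).map(&:to_i)
end

# Join the vertical header fragments from each listed row with spaces.
def header_names(sheet_rows, header_spec)
  rows = header_rows(header_spec).map { |i| sheet_rows[i - 1] } # 1-based rows
  rows.transpose.map { |fragments| fragments.compact.join(" ").strip }
end

sheet = [
  ["Policy", "Premium", "Premium"],   # row 1
  ["Number", "Gross",   "Net"]        # row 2
]
header_names(sheet, "1,2")
# => ["Policy Number", "Premium Gross", "Premium Net"]
```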

### SchemaHeader
:::info
SchemaHeader represents a single column/attribute of an Object as part of an Organization's data (e.g. Policy.policy_number).
:::
- `data_type` possible values:
- `string` [default], `decimal`, `integer`, `boolean`, `date`, `percent`, `dollars`, `dollars_with_cents`, `datetime`, `float`, `time`, `big_decimal`
- `required` has a default of `false`
- `unique` has a default of `false`
- `order` has a default of `0`
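To illustrate what the `data_type` values imply when parsing cell values, here is a hypothetical coercion helper; the real ingestion validations are more involved, and the exact parsing rules shown are assumptions:
```ruby
require "bigdecimal"
require "date"

# Hypothetical coercion of a raw cell value according to SchemaHeader#data_type.
def coerce(value, data_type)
  case data_type
  when "string"                        then value.to_s
  when "integer"                       then Integer(value)
  when "decimal", "big_decimal"        then BigDecimal(value.to_s)
  when "float"                         then Float(value)
  when "boolean"                       then %w[true t yes y 1].include?(value.to_s.downcase)
  when "date"                          then Date.parse(value.to_s)
  when "datetime", "time"              then DateTime.parse(value.to_s)
  when "percent"                       then BigDecimal(value.to_s.delete("%"))
  when "dollars", "dollars_with_cents" then BigDecimal(value.to_s.delete("$,"))
  else value
  end
end

coerce("$1,234.56", "dollars_with_cents") # => 1234.56 (BigDecimal)
```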
### SchemaTablesProgramAgreement
:::warning
**DEPRECATED**: This model is being phased out. Link schema tables to program agreements through `OutputConfig` by setting them as input tables instead.
:::
:::info
SchemaTablesProgramAgreement is a join table that associates one or more schema tables with one or more program agreements.
:::
This is unique by `schema_table_id` and `program_agreement_id`.
## SchemaMapping Design
```mermaid
classDiagram
`Schema (Input)` <-- SchemaMapping : belongs_to
`Schema (Output)` <-- SchemaMapping : belongs_to
SchemaMapping --> Tag : has_many(destroy)
Tag <-- Tagging : belongs_to
HeaderMapping --> Tagging : has_many(destroy)
SchemaMapping --> HeaderMapping : has_many(destroy)
SchemaMapping --> Variable : has_many(destroy)
SchemaMapping --> HeaderGroup : has_many(destroy)
HeaderGroup --> HeaderGroupSchemaHeader : has_many(destroy)
SchemaHeader <-- HeaderGroupSchemaHeader : belongs_to
class `Schema (Input)`{
Uuid id
...
}
class `Schema (Output)`{
Uuid id
...
}
class SchemaMapping{
Uuid id
Uuid input_schema_id
Uuid output_schema_id
Uuid organization_id
Uuid? parent_id [SchemaMapping?]
Uuid? output_table_id [SchemaTable?]
Bool deprecated_by_output_config_migration
Bool exclude_rows
Bool remove_duplicate_output_rows
Bool split_input_rows_into_multiple_output_rows
Text? notes
Datetime? deleted_at
}
class HeaderMapping{
Uuid id
Uuid schema_mapping_id
Uuid organization_id
Uuid? input_header_id [SchemaHeader?]
Uuid? output_header_id [SchemaHeader?]
Uuid? input_variable_id [Variable?]
Uuid? output_variable_id [Variable?]
Enum input_type
Enum output_type
Enum source_type
String? input_value
String? output_value
Int order
Jsonb? config
Text? notes
Datetime? deleted_at
}
class Tagging{
<<joinTable>>
Uuid id
Uuid tag_id
String tagged_type
Uuid tagged_id
Uuid organization_id
Datetime? deleted_at
}
class Tag{
Uuid id
Uuid organization_id
Uuid schema_mapping_id
Uuid? condition_group_id
String name
String color
Datetime? deleted_at
}
class Variable{
Uuid id
Uuid schema_mapping_id
Uuid organization_id
String name
String key
Enum data_type
Text? default_value
Datetime? deleted_at
}
class HeaderGroup{
Uuid id
Uuid schema_mapping_id
Uuid organization_id
String label
Datetime? deleted_at
}
class HeaderGroupSchemaHeader{
<<joinTable>>
Uuid id
Uuid header_group_id
Uuid schema_header_id
Datetime? deleted_at
}
class SchemaHeader{
Uuid id
...
}
```
### SchemaMapping
:::info
SchemaMapping represents a mapping for transforming data between two schemas.
:::
- `deprecated_by_output_config_migration` has a default of `false`
- `exclude_rows` has a default of `false`
- `remove_duplicate_output_rows` has a default of `false`
- `split_input_rows_into_multiple_output_rows` has a default of `false`
Defines how to transform data from an input schema to an output schema. Contains header mappings, variables, and transformation logic.
Supports the following `ConditionGroup`'s:
- `has_conditions :exclude_row` - Creates `exclude_row_condition_group` for conditional row exclusion (works with the `exclude_rows` boolean column)
- `has_conditions :header_group` - Creates `header_group_condition_group` for conditional row splitting (works with `header_groups` and the `split_input_rows_into_multiple_output_rows` boolean column)
A schema mapping can inherit from another schema mapping with the `parent_id` column.
:::warning
If the optional `output_table` is present (mapping to carrier table) then no `HeaderMapping`'s should point to any other `SchemaTable`'s besides this `output_table`.
:::
:::info
The `OutputConfig`'s can share `SchemaMapping`'s, but within a `SchemaMapping` the `HeaderMapping`'s will NOT run unless the upstream header exists ONLY in the `OutputConfig`'s primary input table.
:::
### HeaderMapping
:::info
HeaderMapping represents the collection of rules for mapping one SchemaHeader to another.
:::
- `input_type` possible values:
- `schema_header` [default], `string`, `variable`
- `output_type` possible values:
- `schema_header` [default], `variable`
- `source_type` possible values:
- `user_defined` [default], `suggested`, `unmapped`, `duplicate`
- `order` has a default of `0`
- `store` has the accessors:
- `dsl`, `dsl_text`
Conditions can be applied to a HeaderMapping through:
- 0 or 1 `ConditionGroup`'s attached directly to the HeaderMapping itself.
- Its `tags` (`has_many` through `taggings`). Note that each `Tag` can also have 0 or 1 `ConditionGroup`'s.
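Taken together, the condition groups that apply to a `HeaderMapping` could be gathered as sketched below (association names such as `condition_group` are assumed for illustration, not confirmed from the codebase):
```ruby
# Gather the HeaderMapping's own optional ConditionGroup plus the
# ConditionGroups of any Tags applied through Taggings.
def condition_groups_for(header_mapping)
  groups = []
  groups << header_mapping.condition_group if header_mapping.condition_group
  header_mapping.tags.each do |tag|
    groups << tag.condition_group if tag.condition_group
  end
  groups
end
```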
### Tag
:::info
Tag header mappings with different tags that provide a highlight color and bring in conditions. Tags are scoped to schema mappings.
:::
- `color` has a default of `"#1a547a"`
- `schema_mapping_id` is NOT NULL - each tag belongs to a specific schema mapping
A Tag can have 0 or 1 `ConditionGroup`'s attached to it. You can see this in action on a `HeaderMapping` through the `Tagging` join table.
### Variable
:::info
Variable represents a named data store used during a transform
:::
- `data_type` possible values (no default):
- `string`, `decimal`, `integer`, `boolean`, `date`, `percent`, `dollars`, `dollars_with_cents`, `datetime`, `float`, `time`, `big_decimal`
The `key` is unique within the `SchemaMapping` and its parent `SchemaMapping` (one level up only).
### HeaderGroup
:::info
HeaderGroup represents a collection of schema headers within a SchemaMapping, for use in splitting an input row into several output rows.
:::
### HeaderGroupSchemaHeader
:::info
HeaderGroupSchemaHeader is a join table that associates one or more header groups with one or more schema headers.
:::
## OutputConfig Design
```mermaid
classDiagram
SchemaMapping <-- SchemaMappingStep : belongs_to
OutputConfig --> SchemaMappingStep : has_many(r_w_e)
OutputConfig --> SchemaTablesOutputConfig : has_many(destroy)
`SchemaTable (Input)` <-- SchemaTablesOutputConfig : belongs_to
`SchemaTable (Output)` <-- OutputConfig : belongs_to
ProgramAgreement <-- OutputConfig : belongs_to
note for `SchemaTable (Input)` "Input tables it expects<br>before generating output.<br><br>Must have one primary."
note for `SchemaTable (Output)` "Output table it generates in the end"
note for SchemaMapping "1\. MGA->Noldor mapping<br>2\. Noldor->Carrier mapping<br><br>Final mapping MUST match<br>expected output table"
class OutputConfig{
Uuid id
Uuid program_agreement_id
Uuid output_table_id [SchemaTable]
Uuid organization_id
String label
Enum reporting_cadence
Datetime? deleted_at
}
class ProgramAgreement{
Uuid id
...
}
class `SchemaTable (Input)`:::input{
Uuid id
...
}
class `SchemaTable (Output)`:::output{
Uuid id
...
}
class SchemaTablesOutputConfig{
<<joinTable>>
Uuid id
Uuid? output_config_id
Uuid? input_table_id [SchemaTable?]
Int dataset_count
Int dependency_order
Bool primary_table
Bool optional
Datetime? deleted_at
}
class SchemaMappingStep{
Uuid id
Uuid schema_mapping_id
String parent_type
Uuid parent_id
Uuid organization_id
Int order
Datetime? deleted_at
}
class SchemaMapping{
Uuid id
...
Uuid? output_table_id [SchemaTable?]
...
}
classDef input fill:#90EE90,stroke:#333
classDef output fill:#87CEFA,stroke:#333
```
### OutputConfig
:::info
Configuration that specifies the dependencies needed to generate an output file for a carrier.
:::
- `reporting_cadence` possible values:
- `monthly`
The OutputConfig can have multiple input `SchemaTable`'s but only one output `SchemaTable`.
:::warning
The final `output_table` SHOULD match the last `SchemaMapping` step's `output_table`. This is not enforced, but can be expected to be true.
:::
### SchemaTablesOutputConfig
:::info
Tables needed to generate an output file for a carrier
:::
- `dataset_count` has a default of `1`
- `dependency_order` has a default of `0`
- `primary_table` has a default of `false`
- `optional` has a default of `false`
:::warning
The `output_config_id` and `input_table_id` columns can be `NULL` in the database, although the model validates that they are present.
:::
### SchemaMappingStep
:::info
Single step in a series of steps in an output workflow
:::
- `parent_type` / `parent_id` is a polymorphic reference (this is how it maps to `OutputConfig`)
This behaves like a polymorphic join table between a `SchemaMapping` and the `OutputConfig`. These steps seem to track the `Transformation`'s.
## DataUpload Design
```mermaid
classDiagram
direction LR
Datasource <-- DataUpload : belongs_to
`#input_files` "1.." ()-- DataUpload : has_many
DataUpload --> Dataset : has_many(r_w_e)
SchemaTable "0..1" <-- Dataset : belongs_to
ProgramAgreement "0..1" <-- Dataset : belongs_to
Dataset --> DatasetVersion : has_many(false)
`#input_file` "1" ()-- DatasetVersion : has_one
note for SchemaTable "The <code>SchemaTable</code><br>is only set for<br>ingestion datasets"
class Datasource{
Uuid id
...
}
class DataUpload{
Uuid id
Uuid datasource_id
Uuid organization_id
Uuid? task_id
Date start_date
Date end_date
String? notes
Datetime? deleted_at
}
class Dataset{
Uuid id
Uuid data_upload_id
Uuid organization_id
Uuid? schema_table_id
Uuid? program_agreement_id
Uuid? data_flow_id
Enum dataset_type
Enum status
Bool approved
String filename
String? sheet_name
String? checksum
Int header_row_index
String? header_spec
Date start_date
Date end_date
Datetime? deleted_at
}
class DatasetVersion{
Uuid id
Uuid organization_id
Int version
Jsonb? input_headers
}
class SchemaTable{
Uuid id
...
}
class ProgramAgreement{
Uuid id
...
}
```
### DataUpload
:::info
DataUpload represents a top-level set of data to be ingested and transformed
:::
This record `has_many` attached files under `#input_files`.
The `DataUpload` can hold multiple files or one file with multiple sheets. Each sheet becomes a `Dataset` (with a `DatasetVersion`).
Once the files are uploaded and the primary table has been matched with its corresponding sheet, the `OutputConfig` is kicked off by generating a `DataFlow` for the supplied reporting period.
### Dataset
:::info
Dataset represents an individual table to be ingested and transformed
:::
- `dataset_type` possible values:
- `policy` [default], `risk`, `claim`, `location`, `exposure`, `broker`, `transaction`, `other`
- `status` possible values:
- `pending` [default], `in_progress`, `completed`, `failed`
- `approved` has a default of `false`
- `header_row_index` has a default of `1`
A Dataset can belong to either a `DataUpload` (for user-uploaded data) OR a `DataFlow` (for generated output data). This is a single sheet from a file (e.g., "risk", "policy", "claim" sheet).
### DatasetVersion
:::info
DatasetVersion represents one version of dataset (i.e. when a Dataset is updated with a new file, that creates a new version)
:::
- `version` has a default of `1`
This record `has_one` attached file under `#input_file`. It also has access to the `DataRow`'s and `Transformation`'s.
## Transformation Design
```mermaid
classDiagram
direction LR
DatasetVersion <-- Transformation : belongs_to
Transformation --> DataRow : has_many(false)
SchemaTable <-- DataRow : belongs_to
class Transformation{
Uuid id
Uuid organization_id
Uuid? schema_mapping_id
Uuid? retry_for_id
Uuid? data_flow_step_id
Enum transformation_type
Enum status
Enum approval_status
Datetime? approval_updated_at
Datetime? started_at
Datetime? ended_at
Bool recalculate_fields
Jsonb? metadata
}
class DatasetVersion{
Uuid id
...
}
class DataRow{
Uuid id
Uuid transformation_id
Uuid schema_table_id
Uuid organization_id
Uuid dataset_version_id
Uuid? batch_id
Enum approval_status
Int index
Int group
Jsonb data_values_json
}
class SchemaTable{
Uuid id
...
}
```
### Transformation
:::info
Transformation tracks the process of parsing a DatasetVersion file and creating child DataRow records from it for downstream use
:::
- `transformation_type` possible values:
- `ingestion` [default], `schema_transform`, `output`
- `status` possible values:
- `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
- `pending` [default], `approved`, `rejected`
- `recalculate_fields` has a default of `false` (field exists in database but not actively used in code)
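A hedged sketch of how the model's associations and enums could be declared, based on the diagram and value lists above (Rails 7 enum syntax; string-backed enums and the `:approval` prefix are assumptions, not confirmed from the codebase):
```ruby
class Transformation < ApplicationRecord
  belongs_to :dataset_version
  belongs_to :schema_mapping, optional: true   # NULL for ingestion
  belongs_to :data_flow_step, optional: true   # NULL outside a DataFlow

  has_many :data_rows                          # has_many(false)

  enum :transformation_type,
       { ingestion: "ingestion", schema_transform: "schema_transform", output: "output" },
       default: :ingestion

  enum :status,
       { pending: "pending", in_progress: "in_progress", completed: "completed",
         failed: "failed", canceled: "canceled" },
       default: :pending

  enum :approval_status,
       { pending: "pending", approved: "approved", rejected: "rejected" },
       prefix: :approval, default: :pending
end
```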
#### Transformation (Ingestion)
The user uploads a number of files in the `DataUpload` which creates a number of `Dataset`'s (and `DatasetVersion`'s) for each sheet.
For each of these `DatasetVersion`'s a `Transformation` (`transformation_type: :ingestion`) is kicked off. This will transform the sheet into its corresponding `Dataset#schema_table` as `DataRow`'s on the transformation.
See [Ingestion Pipeline](#ingestion-pipeline) for detailed processing steps.
#### Transformation (in DataFlow)
Within a `DataFlow` there will be at a minimum 5 steps (`DataFlowStep`). See [Schema Transform Pipeline](#schema-transform-pipeline) for detailed processing steps.
1. `step_type: :ingestions_approved` - this confirms all the ingested dataset versions have been approved
1. `step_type: :schema_transform` - the MGA -> Noldor schema mapping
- This contains at least one `Transformation` (`transformation_type: :schema_transform`) that writes its `DataRow`'s to the corresponding output `DatasetVersion` of the `DataFlow`
1. `step_type: :schema_transform` - the Noldor -> carrier schema mapping
- This also contains at least one `Transformation` that writes to the same `DatasetVersion`
1. `step_type: :output` - Generates the final output files (CSV, JSON, etc.)
- Contains `Transformation` (`transformation_type: :output`) that creates the exported files
1. `step_type: :data_flow_completed` - Marks the data flow as complete
### DataRow
:::info
DataRow represents one row of data. Data values are stored in the `data_values_json` JSONB column.
:::
- `approval_status` possible values:
- `pending` [default], `approved`, `rejected`
- `group` has a default of `-1`
- `data_values_json` has a default of `{}`
The system has fully transitioned to using `data_values_json` for storing data values. The structure is:
```json
{
"values": {
"schema_header_uuid_1": {
"value": "John Doe",
"mapped_from": "optional_source_info"
},
"schema_header_uuid_2": {
"value": "12345"
}
},
"multiple_values": {
"schema_header_uuid_1": [
{
"value": "Additional Value 1",
"mapped_from": "some_source"
},
{
"value": "Additional Value 2"
}
]
}
}
```
- Keys are `schema_header_id` (UUID strings)
- `values` contains the primary value for each header
- `multiple_values` contains additional values for headers that already have a value in `values` (used when splitting input rows)
- Each value object contains `value` (the actual data) and optionally `mapped_from` (tracking information)
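For example, a couple of illustrative helpers (not from the codebase) for reading this structure; `"hdr-1"` stands in for a schema header UUID:
```ruby
# Primary value for one schema header.
def primary_value(data_values_json, schema_header_id)
  data_values_json.dig("values", schema_header_id, "value")
end

# Primary value plus any additional values from multiple_values.
def all_values(data_values_json, schema_header_id)
  primary    = data_values_json.dig("values", schema_header_id)
  additional = data_values_json.dig("multiple_values", schema_header_id) || []
  [primary, *additional].compact.map { |entry| entry["value"] }
end

row = {
  "values" => { "hdr-1" => { "value" => "John Doe" } },
  "multiple_values" => { "hdr-1" => [{ "value" => "Jane Doe" }] }
}
primary_value(row, "hdr-1") # => "John Doe"
all_values(row, "hdr-1")    # => ["John Doe", "Jane Doe"]
```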
## DataFlow Design
```mermaid
classDiagram
OutputConfig <-- DataFlow : belongs_to
`Dataset (Upstream)` <-- DataFlowsDataset : belongs_to
DataFlow --> DataFlowsDataset : has_many(destroy)
DataFlow --> `Dataset (Output)` : has_one(r_w_e)
DataFlow --> DataFlowStep : has_many(destroy)
DataFlowStep --> Transformation : has_many(nullify)
SchemaMapping "0..1" <-- DataFlowStep : belongs_to
note for DataFlow "A copy of <code>OutputConfig</code><br>for each reporting period"
note for `Dataset (Upstream)` "This is the expected<br>list of input data sheets.<br><br>These were previously<br>ingested by a Transformation."
note for `Dataset (Output)` "This is the<br>expected<br>final output<br>sheet"
note for Transformation "Data rows<br>live here.<br>Accumulated<br>data rows<br>can be accessed<br>through<br><code>DatasetVersion</code>."
note for DataFlowStep "\- Ingestion step<br>\- Map MGA->Noldor<br>\- Map Noldor->Carrier<br>\- Output step<br>\- Completion step"
class DataFlow{
Uuid output_config_id
Uuid id
Uuid organization_id
Enum status
Enum approval_status
Bool from_migration
Datetime reporting_period_start_date
Datetime? deleted_at
}
class OutputConfig{
Uuid id
...
}
class DataFlowsDataset{
<<joinTable>>
Uuid id
Uuid data_flow_id
Uuid dataset_id
Uuid organization_id
Datetime? deleted_at
}
class `Dataset (Upstream)`:::input{
Uuid id
...
}
class `Dataset (Output)`:::output{
Uuid id
...
}
class SchemaMapping{
Uuid id
...
}
class DataFlowStep{
Uuid id
Uuid data_flow_id
Uuid organization_id
Uuid? input_table_id [SchemaTable?]
Uuid? schema_mapping_id [SchemaMapping?]
Uuid? previous_step_id [DataFlowStep?]
Uuid? migrated_transformation_id [Transformation?]
Int order
String title
Enum step_type
Enum status
Enum approval_status
Bool fetch_from_ingested_datasets
Datetime? started_at
Datetime? ended_at
Datetime? deleted_at
}
class Transformation{
Uuid id
...
}
classDef input fill:#90EE90,stroke:#333
classDef output fill:#87CEFA,stroke:#333
```
### DataFlow
:::info
Individual instance of the workflow for generating a specific output file, which keeps track of the progress that has been made and what needs to be done next.
:::
- `status` possible values:
- `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
- `pending` [default], `approved`, `rejected`
- `from_migration` has a default of `false`
### DataFlowsDataset
:::info
Join table for data flows and datasets
:::
### DataFlowStep
:::info
Single step in a data flow
:::
- `step_type` possible values (no default):
- `ingestions_approved`, `schema_transform`, `output`, `data_flow_completed`
- `status` possible values:
- `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
- `pending` [default], `approved`, `rejected`
- `fetch_from_ingested_datasets` has a default of `false`
## Data Processing Pipelines
This section details the processing pipelines for each transformation type. These pipelines describe how data flows through the system at runtime.
### Ingestion Pipeline
Ingestion transformations (`transformation_type: :ingestion`) parse uploaded files and create `DataRow` records. Unlike schema transformations, ingestion uses a simpler single-pass pipeline orchestrated by `IngestFileJob`.
#### Pipeline Overview
```
IngestFileJob (Sidekiq::Job)
│
▼
Transform::IngestFile.all
├── Create State object
├── Start transformation
├── IngestFile::ProcessBatch
│ ├── Parse spreadsheet file
│ ├── Validate headers match SchemaTable
│ ├── For each row: validate and insert DataRows
│ └── Compute dataset checksum
└── Transform::Finalize
├── Assign indexes (index = group)
└── Mark complete or auto-approve
```
#### Triggering
Ingestion transformations are created and enqueued during the `DataUpload::CreateDatasets` flow:
1. User uploads files via `DataUpload`
2. `DataUpload::FetchTableMatches` matches sheets to `SchemaTable`s
3. For each match, `DataUpload::EnsureDataset` creates `Dataset` + `DatasetVersion`
4. After all `DataFlow`s are ensured, `transformations.first_or_create!` creates the ingestion transformation
5. `Transformation#queue_transformation_job` enqueues `IngestFileJob`
#### Processing Steps
1. **File Parsing** (`IngestFile::Batch#parse_sheet`): Parses the spreadsheet file attached to the `DatasetVersion` using the `Dataset#parse_sheet` method.
2. **Validation** (`IngestFile::Validations`):
- Validates headers match `SchemaTable` headers (missing/extra headers create errors)
- Validates file is not a duplicate of previously uploaded files
- Validates data types for each cell value
- Validates required fields are present
3. **Row Processing** (`IngestFile::ProcessBatch#process_sheet`):
- Iterates each row using `each_row_with_ui_index` (1-based indexing matching spreadsheet)
- Skips empty rows (tracked and reported as info)
- Maps cell values to `SchemaHeader`s by name (supports aliases)
- Validates and normalizes values (UTF-8 encoding, special characters)
- Creates `RowPresenter` with `group = row_index` (1-based)
- Optionally excludes rows based on `SchemaTablesProgramAgreement` conditions
- Computes running MD5 checksum of row values
4. **Data Insertion**: `DataRow` records are bulk-inserted with:
- `index = -1` (placeholder, assigned in finalization)
- `group = row_index` (1-based, matches spreadsheet row number)
- `batch_id = nil` (ingestion doesn't use database Batch records)
- `data_values_json` containing the parsed values
5. **Finalization** (`Transform::Finalize`):
- For ingestion (no Batch records): `index` is set directly from `group` via `UPDATE data_rows SET index = group`
- Dataset checksum is saved for duplicate detection
- Transformation is marked complete and auto-approved if no errors
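A simplified sketch of the ingestion row loop described above; the real `IngestFile::ProcessBatch` also handles header aliases, encoding normalization, and exclusion conditions, and maps values to `SchemaHeader` IDs rather than header names:
```ruby
require "digest"

# Iterate the parsed sheet, skip empty rows, map cells to headers, and keep a
# running MD5 checksum of the row values (names here are illustrative).
def build_ingested_rows(sheet_rows, header_names)
  checksum = Digest::MD5.new
  rows = []

  sheet_rows.each.with_index(1) do |cells, row_index|              # 1-based, like the UI
    next if cells.all? { |c| c.nil? || c.to_s.strip.empty? }       # skip empty rows

    values = header_names.zip(cells).to_h                          # map cells to headers by name
    checksum.update(values.values.join("|"))                       # running dataset checksum

    rows << { index: -1,          # placeholder, assigned in finalization
              group: row_index,   # matches the spreadsheet row number
              data_values: values }
  end

  [rows, checksum.hexdigest]
end
```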
#### Relationship to DataFlow
Multiple ingestion transformations feed into a single `DataFlow`:
- An `OutputConfig` specifies required input tables via `SchemaTablesOutputConfig` (each with `dataset_count`)
- Each input table may have multiple datasets (e.g., 3 policy files for one reporting period)
- All ingestion transformations must be approved before the `DataFlow` can proceed
- The `DataFlow#ready?` method checks that all required tables have sufficient approved ingestions
- Upon approval, `Transformation::ApproveOrReject` triggers the first schema transform step
### Schema Transform Pipeline
Schema transformations (`transformation_type: :schema_transform`) process data through a multi-stage pipeline orchestrated by `TransformJob`. The pipeline handles joining data from multiple tables, grouping by compound keys, and processing in batches.
#### Pipeline Overview
```
TransformJob (Sidekiq::IterableJob)
│
▼
Stage 1: Transform::Schema::Start
├── Create State object
├── MakeBatches → creates Batch records
│ ├── Build "super rows" (joined/stitched data)
│ └── Partition into batches (by index or compound key)
├── Run validations
└── Compute aggregations
│
▼
Stage 2-3: For each Batch:
Transform::Schema::ProcessBatch
├── Iterate over groups (rows sharing compound key)
├── For each row: apply header mappings via DSL
├── Aggregate group values (first, last, sum, min, max)
├── Remove duplicates, generate Noldor IDs
└── Insert DataRows with index=-1, group=N
│
▼
Stage 4: Transform::Finalize
├── Assign sequential indexes across all batches
├── Save output JSON snapshot
└── Mark transformation complete or auto-approve
```
#### Stage 1: Building Super Rows
A "super row" (`Batch::SuperRow`) represents one or more `DataRow` IDs that should be processed together. The construction differs based on transformation context:
**MGA → Noldor (from ingestion):**
1. **Stitching** (`DatasetStitcher`): When `dataset_count > 1`, multiple uploaded datasets of the same table are vertically concatenated. Rows are assigned "stitched indexes" that maintain order across datasets (ordered by filename, then sheet_name).
2. **Joining** (`JoinDataRows`): Primary table rows are joined with child table rows based on `TableRelationship` configurations. Join strategies include:
- `left_join`: Returns all matching child rows per primary row, expanding the result set
- `left_join_distinct`: Returns distinct matching child rows
- `stacking`: Appends child table rows after primary rows (no key matching)
Join keys are defined in `SchemaRelationship` with `HeaderPair`s specifying which headers to match.
**Noldor → Carrier (to carrier):**
- Fetches data rows directly from the previous step's output `DatasetVersion`
- Filters to rows matching the input schema's tables
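A toy illustration of the three join strategies on plain hashes with a single join key; the real implementation joins `DataRow` records using `HeaderPair`-defined keys, and keeping unmatched primary rows is assumed here:
```ruby
# left_join: every matching child row per primary row, expanding the result set.
def left_join(primary_rows, child_rows, key)
  primary_rows.flat_map do |p|
    matches = child_rows.select { |c| c[key] == p[key] }
    matches.empty? ? [[p, nil]] : matches.map { |c| [p, c] }
  end
end

# left_join_distinct: same, but duplicate matching child rows are dropped.
def left_join_distinct(primary_rows, child_rows, key)
  primary_rows.flat_map do |p|
    matches = child_rows.select { |c| c[key] == p[key] }.uniq
    matches.empty? ? [[p, nil]] : matches.map { |c| [p, c] }
  end
end

# stacking: no key matching, child rows are simply appended after primary rows.
def stacking(primary_rows, child_rows)
  primary_rows.map { |p| [p, nil] } + child_rows.map { |c| [nil, c] }
end
```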
#### Stage 2: Batch Creation
Super rows are partitioned into `Batch` records stored in the database. Two strategies exist:
**Index-based batching** (`MakeIndexBatches`):
- Simple slicing: every ~500 super rows become one batch
- Used when no group-by aggregation is configured
**Compound key batching** (`MakeGroupByBatches`):
- Rows are grouped by compound key values (e.g., policy_number + effective_date)
- Compound keys are defined via `GroupBy::Aggregation` with `KeyPart`s
- All rows sharing a compound key are kept in the same batch
- Batches target ~500 rows but may be larger to keep groups intact
Batch `input_data` JSON stores:
```json
{
"initial": [{ "index": 1, "data_row_ids": ["uuid1", "uuid2"] }, ...],
"group_by": {
"aggregation_id": "uuid",
"compound_key": [{ "schema_header_id": "uuid" }, ...],
"input": [
{ "rows": [0, 1, 2], "compound_key": ["POL001", "2024-01-01"] },
{ "rows": [3, 4], "compound_key": ["POL002", "2024-01-01"] }
]
}
}
```
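A simplified sketch of compound-key batching: rows sharing a compound key always land in the same batch, and batches aim for roughly 500 rows (names and the exact target are illustrative, not the `MakeGroupByBatches` implementation):
```ruby
BATCH_TARGET = 500

def make_group_by_batches(super_rows, key_fields)
  # Group rows by their compound key values.
  groups = super_rows.group_by { |row| key_fields.map { |f| row[f] } }

  batches = [[]]
  groups.each_value do |rows_in_group|
    batches << [] if batches.last.size >= BATCH_TARGET
    batches.last.concat(rows_in_group)   # a group is never split across batches
  end
  batches.reject(&:empty?)
end
```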
#### Stage 3: Batch Processing
Each batch is processed by `Transform::Schema::ProcessBatch`:
1. **Group iteration** (`GroupBy`): Iterates over groups of rows. For non-grouped batches, each row is its own group.
2. **Row splitting** (optional): If `split_input_rows_into_multiple_output_rows` is enabled, a single input row can produce multiple output rows based on `HeaderGroup` configurations.
3. **Header mapping**: For each row, `ProcessHeaderMapping` applies the `SchemaMapping`'s `HeaderMapping`s:
- Creates a `Dsl::Lookup` context with access to input values, variables, and aggregations
- Evaluates DSL expressions to compute output values
- Checks tag conditions to determine which mappings apply
- Optionally excludes rows based on `exclude_row_condition_group`
4. **Group aggregation**: After processing all rows in a group, values are aggregated:
- `first`: Keep first non-blank value
- `last`: Keep last non-blank value
- `sum`: Sum all values (for numeric types)
- `min`/`max`: Keep minimum/maximum value
- Implicit aggregation: If no explicit aggregation, keeps last non-blank value and warns if values differ
5. **Duplicate detection** (`DuplicateRowTracker`): Removes duplicate output rows using Redis for cross-batch tracking.
6. **Noldor ID generation** (`NoldorIdPopulator`): Generates unique Noldor IDs for output rows.
7. **Data insertion**: `DataRow` records are bulk-inserted with:
- `index = -1` (placeholder, assigned in Stage 4)
- `group = N` (sequential within the batch, used for index assignment)
- `batch_id` linking back to the Batch record
- `data_values_json` containing the transformed values
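The aggregation strategies in step 4 above amount to a simple reduction over a group's values for one output header; a sketch (value typing and warning collection in the real code are richer):
```ruby
def aggregate(values, strategy)
  present = values.reject { |v| v.nil? || v.to_s.strip.empty? }   # ignore blanks
  case strategy
  when :first then present.first
  when :last  then present.last
  when :sum   then present.sum { |v| Float(v) }
  when :min   then present.min
  when :max   then present.max
  else # implicit aggregation: keep last non-blank value, warn if values differ
    warn "values differ within group" if present.uniq.size > 1
    present.last
  end
end

aggregate(["100", "", "250"], :sum)  # => 350.0
aggregate(["A", "A", "B"], nil)      # => "B" (with a warning)
```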
#### Stage 4: Finalization
`Transform::Finalize` completes the transformation:
1. **Index assignment** (`assign_row_indexes`): The `index` field is computed sequentially across all batches:
- Iterates batches in cursor order
- For each batch, finds the maximum `group` value
- Assigns sequential indexes: batch 1 groups 1-10 get indexes 1-10, batch 2 groups 1-5 get indexes 11-15, etc.
- Uses SQL UPDATE to set `data_rows.index` and `issues.index` based on `group`
:::info
The `group` field represents the processing order within a batch, while `index` represents the final sequential order across the entire transformation output.
:::
2. **Output snapshot**: Saves a JSON snapshot of the transformation output (`SnapshotJson::Save`)
3. **Completion**: Marks transformation as `completed` or `failed` based on error count
4. **Auto-approval**: If configured and no halting issues exist, automatically approves the transformation
5. **Redis cleanup**: Resets Redis keys used for duplicate tracking
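The index assignment in step 1 above amounts to offsetting each batch's `group` values by the number of rows already indexed in earlier batches; a sketch on plain hashes (the real code does this with SQL `UPDATE`s):
```ruby
def assign_row_indexes(batches)
  offset = 0
  batches.each do |batch|
    max_group = batch.map { |row| row[:group] }.max || 0
    batch.each { |row| row[:index] = offset + row[:group] }
    offset += max_group
  end
end

batches = [
  [{ group: 1 }, { group: 2 }],   # batch 1 -> indexes 1, 2
  [{ group: 1 }, { group: 2 }]    # batch 2 -> indexes 3, 4
]
assign_row_indexes(batches)
batches.flatten.map { |r| r[:index] } # => [1, 2, 3, 4]
```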
## DataFlow Readiness & Approval
This section describes how DataFlows transition through states and how outputs are approved and delivered.
### DataFlow Readiness
A DataFlow transitions to `in_progress` status when **all** of these conditions are met:
1. **All ingestion transformations are approved** - Every input table's ingestion transformation must have `approval_status = :approved`
2. **Dataset count requirements satisfied** - For each required input table, the count of approved datasets ≥ `dataset_count` (configured in `SchemaTablesOutputConfig`)
3. **Optional tables skipped** - Tables marked `optional: true` don't block readiness
Two methods determine readiness in `DataFlow`:
- `ready?` - All required tables must have sufficient approved datasets
- `optionally_ready?` - Same as `ready?` but skips tables marked as optional
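A hedged sketch of this readiness logic on plain data rather than ActiveRecord relations; each requirement mirrors a `SchemaTablesOutputConfig` row, and `approved_counts` maps an input table ID to its number of approved datasets:
```ruby
Requirement = Struct.new(:input_table_id, :dataset_count, :optional, keyword_init: true)

def ready?(requirements, approved_counts, skip_optional: false)
  requirements.all? do |req|
    next true if skip_optional && req.optional   # optionally_ready? behaviour
    approved_counts.fetch(req.input_table_id, 0) >= req.dataset_count
  end
end

requirements = [
  Requirement.new(input_table_id: "policy", dataset_count: 1, optional: false),
  Requirement.new(input_table_id: "claim",  dataset_count: 2, optional: true)
]
ready?(requirements, { "policy" => 1 })                      # => false
ready?(requirements, { "policy" => 1 }, skip_optional: true) # => true
```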
### Approval & Delivery Types
Output files have three independent approval/delivery mechanisms:
| Type | Service | Purpose |
|------|---------|---------|
| **Portal/Dashboard** | `Transformation::ApproveOrReject` | Core approval via UI "Approve" button. Updates `approval_status` to `approved`. Publishes `DataFlowApprovedV2` event to EventBridge. |
| **Webhooks** | `Transformation::WebhooksEvents::Create` | Manual action after portal approval. Creates `Webhooks::Event` with action `"dataset:<type>:approved"`. Delivers to customer APIs via configured `Webhooks::Subscription`s. |
| **Data Client** | `DataClient::SendDatasetOutputs` | Manual action after portal approval. Creates `DataClient::DatasetOutput` audit records. Publishes `DataFlowOutputsSent` event. Makes files available for customer download. |
:::info
`ProgramAgreement.auto_approve_output_files` controls whether output steps require manual approval. When `true`, the output step's `auto_approve?` returns `true`, allowing automatic approval.
:::
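A hypothetical shape of that check; the traversal from the output step to its `ProgramAgreement` is assumed from the diagrams (DataFlow → OutputConfig → ProgramAgreement), not confirmed from the codebase:
```ruby
# Sketch only: the output DataFlowStep auto-approves when its
# ProgramAgreement has auto_approve_output_files enabled.
def auto_approve?
  data_flow.output_config.program_agreement.auto_approve_output_files?
end
```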