# Valinor Tech Design

## Notation Guide

This document uses Mermaid class diagrams to illustrate the database schema and relationships. Key notation:

- **`has_many(r_w_e)`** - `dependent: :restrict_with_exception` - Prevents deletion if associated records exist
- **`has_many(destroy)`** - `dependent: :destroy` - Cascading delete of associated records
- **`has_many(nullify)`** - `dependent: :nullify` - Sets the foreign key on associated records to `NULL` instead of deleting them
- **`has_many(false)`** - `dependent: false` - No automatic deletion of associated records
- **`has_one(r_w_e)`** - Same as has_many but for singular associations
- **`belongs_to`** - Foreign key relationship
- **`<<joinTable>>`** - Indicates a pure join table between two entities
- **`Uuid?`** - Optional/nullable UUID field
- **`#attachment_name`** - ActiveStorage attachment (e.g., `#input_files` for uploaded files)

## Data Flow Overview

Valinor processes data through the following high-level workflow:

1. **Upload** - User uploads files via `DataUpload` containing multiple sheets
2. **Ingestion** - Each sheet becomes a `Dataset` with a `DatasetVersion`. An ingestion `Transformation` parses the file and creates `DataRow` records according to the `SchemaTable` definition
3. **Schema Transformation** - `DataFlow` orchestrates multi-step transformations:
   - MGA schema → Noldor schema (first `SchemaMapping`)
   - Noldor schema → Carrier schema (second `SchemaMapping`)
   - Each transformation reads source `DataRow`s and writes transformed `DataRow`s to the output `DatasetVersion`
4. **Output Generation** - Final `Transformation` generates output files (CSV, JSON) from the transformed data
5. **Approval & Delivery** - Approved outputs are made available for download or delivery to carriers

Key concepts:

- **Schemas** define table structures with headers and data types
- **SchemaMappings** define field-level transformations between schemas using `HeaderMapping`s
- **OutputConfigs** specify which input tables are needed to generate a carrier's output file
- **DataFlows** are instances of OutputConfigs for specific reporting periods

### Data Flow Diagram

```mermaid
flowchart LR
    Start([User<br/>uploads<br/>files])

    subgraph DataUpload["<b>DataUpload Phase</b>"]
        direction TB
        Upload[DataUpload<br/>created with<br/>#input_files]
        Parse[Parse file<br/>into sheets]
        CreateDS[Create<br/>Dataset +<br/>DatasetVersion<br/>per sheet]
        Match{Match to<br/>SchemaTable?}
        Wait1[Wait for<br/>user match]

        Upload --> Parse --> CreateDS --> Match
        Match -->|No| Wait1 --> IngestT
        Match -->|Yes| IngestT

        subgraph Ingestion["<b>Ingestion Transformation</b>"]
            direction TB
            IngestT["<b>Transformation</b><br/><code>ingestion</code><br/>mapping: NULL"]
            ParseR[Parse rows per<br/>SchemaTable]
            CreateDR[Create DataRows<br/>with<br/>data_values_json]
            IngestComplete[DataRows stored<br/>in ingested<br/>DatasetVersion]

            IngestT --> ParseR --> CreateDR --> IngestComplete
        end
    end

    subgraph Check["<b>DataFlow Readiness Check</b>"]
        direction TB
        EnsureDF{DataFlow exists<br/>for period?}
        NeedsDS{Needs this<br/>dataset?}
        AddDS[Add dataset to<br/>existing DataFlow]
        NewDF[Create new<br/>DataFlow]
        ReadyCheck{All required tables<br/>have approved<br/>ingestions?}
        Wait2[Wait for more<br/>datasets/approvals]
        TriggerDF[Mark DataFlow<br/>in_progress]

        EnsureDF -->|Yes| NeedsDS
        EnsureDF -->|No| NewDF
        NeedsDS -->|Yes| AddDS
        NeedsDS -->|No| Wait2
        AddDS --> ReadyCheck
        NewDF --> ReadyCheck
        ReadyCheck -->|No| Wait2
        ReadyCheck -->|Yes| TriggerDF
    end

    subgraph DataFlow["<b>DataFlow Phase</b>"]
        direction TB
        CreateDF[Create DataFlow<br/>+ output<br/>DatasetVersion]

        subgraph Step1["<b>Step 1: Approval Check</b>"]
            direction TB
            S1["<b>DataFlowStep</b><br/><code>ingestions_approved</code>"]
            S1Check{All<br/>approved?}
            S1 --> S1Check
        end

        subgraph Step2["<b>Step 2: MGA → Noldor</b>"]
            direction TB
            S2["<b>DataFlowStep</b><br/><code>schema_transform</code>"]
            T1["<b>Transformation</b><br/><code>schema_transform</code>"]
            Read1["Read DataRows<br/><i>from primary table</i><br/><i>ingested version</i>"]
            Apply1[Apply<br/>SchemaMapping<br/>HeaderMappings]
            Write1["Write to<br/>DataFlow output<br/>DatasetVersion"]
            S2 --> T1 --> Read1 --> Apply1 --> Write1
        end

        subgraph Step3["<b>Step 3: Noldor → Carrier</b>"]
            direction TB
            S3["<b>DataFlowStep</b><br/><code>schema_transform</code>"]
            T2["<b>Transformation</b><br/><code>schema_transform</code>"]
            Read2["Read DataRows<br/><i>from Step 2</i><br/><i>output version</i>"]
            Apply2[Apply<br/>SchemaMapping<br/>HeaderMappings]
            Write2["Write to<br/>same output<br/>DatasetVersion"]
            S3 --> T2 --> Read2 --> Apply2 --> Write2
        end

        subgraph Step4["<b>Step 4: Generate Output</b>"]
            direction TB
            S4["<b>DataFlowStep</b><br/><code>output</code>"]
            T3["<b>Transformation</b><br/><code>output</code><br/>mapping: inherited or NULL"]
            Read3["Read final<br/>transformed<br/>DataRows"]
            Generate[Generate<br/>CSV/JSON files]
            Attach[Attach files to<br/>DatasetVersion]
            S4 --> T3 --> Read3 --> Generate --> Attach
        end

        subgraph Step5["<b>Step 5: Completion & Delivery</b>"]
            direction TB
            S5["<b>DataFlowStep</b><br/><code>data_flow_completed</code>"]
            PortalApprove{Portal<br/>Approved?}
            WebhookSend{Send to<br/>Webhooks?}
            DataClientSend{Send to<br/>Data Client?}
            S5 --> PortalApprove
            PortalApprove -->|Yes| WebhookSend
            PortalApprove -->|Yes| DataClientSend
        end

        CreateDF --> Step1
        S1Check -->|Yes| Step2
        Step2 --> Step3
        Step3 --> Step4
        Step4 --> Step5
    end

    End1([Deliver to<br/>carrier])
    End2([Revise and<br/>retry])

    Start --> DataUpload
    DataUpload --> Check
    Check --> DataFlow
    DataFlow --> End1
    WebhookSend -->|Yes| End1
    DataClientSend -->|Yes| End1
    PortalApprove -->|No| End2

    %% Styling
    classDef uploadPhase fill:#e1f5e1,stroke:#333,stroke-width:2px
    classDef ingestPhase fill:#fff3cd,stroke:#333,stroke-width:2px
    classDef checkPhase fill:#ffe0b2,stroke:#333,stroke-width:2px
    classDef dataFlowPhase fill:#cfe2ff,stroke:#333,stroke-width:2px
    classDef outputPhase fill:#f8d7da,stroke:#333,stroke-width:2px

    class DataUpload uploadPhase
    class Ingestion ingestPhase
    class Check checkPhase
    class DataFlow,Step1,Step2,Step3 dataFlowPhase
    class Step4,Step5 outputPhase
```

**Key Points:**

- **DataUpload Phase** (green): User uploads files, system creates Dataset + DatasetVersion for each sheet
- **Ingestion Phase** (yellow): `Transformation` (type: `ingestion`) parses raw file data into structured `DataRows` based on `SchemaTable` definition
- **Readiness Check** (orange): `DataFlow::Ensure` creates/reuses DataFlows; the flow only starts when ALL required input tables have approved ingestions meeting `dataset_count` requirements
- **DataFlow Phase** (blue): `DataFlow` orchestrates multiple `DataFlowStep`s, each containing `Transformation`s (type: `schema_transform`) that apply `SchemaMapping`s
- **Output Phase** (red): Final `Transformation` (type: `output`) generates export files; output can be delivered via Portal approval, Webhooks, or Data Client

**Transformation Types:**

- `ingestion`: Parses uploaded files → DataRows (in DataUpload context), `schema_mapping` is NULL
- `schema_transform`: Applies SchemaMapping rules to transform DataRows (in DataFlow context), `schema_mapping` is required
- `output`: Generates final export files from DataRows (in DataFlow context), `schema_mapping` is inherited from last schema_transform step or NULL

## Organization Design

```mermaid
classDiagram
    source
    direction LR
    Organization --> Datasource : has_many(r_w_e)
    Organization --> OrganizationMembership : has_many(r_w_e)
    Organization --> Program : has_many(r_w_e)
    Organization --> ProgramAgreement : has_many(r_w_e)
    Organization --> User : has_many(r_w_e)
    User <-- OrganizationMembership : belongs_to

    class Organization{
        Uuid id
        String name
        String key
        Enum organization_type
        Array~String~ output_formats
        String? google_drive_url
        Datetime? deleted_at
    }
    class Datasource{
        Uuid id
        Uuid organization_id
        Enum datasource_type
        Jsonb? config
        Datetime? deleted_at
    }
    class User{
        Uuid id
        Uuid organization_id
        Uuid? identity_provider_id
        String auth0_uid
        String first_name
        String last_name
        String email_address
        Bool noldor_admin
        Bool noldor_developer
        Bool email_verified
        Datetime? deleted_at
    }
    class OrganizationMembership{
        Uuid id
        Uuid organization_id
        Uuid user_id
        Enum role
        Datetime? last_viewed_at
        Datetime? deleted_at
    }
    class Program{
        Uuid id
        Uuid organization_id
        String name
        Datetime? deleted_at
    }
    class ProgramAgreement{
        <<joinTable>>
        Uuid id
        Uuid organization_id
        Uuid mga_program_id
        Uuid carrier_program_id
        Date effective_date
        Date expiration_date
        Bool auto_approve_output_files
        Datetime? deleted_at
    }
```

### Organization

:::info
Organization represents a customer (or Noldor) entity for the sake of segmenting data.
:::

- `organization_type` possible values:
  - `noldor` [default], `carrier`, `mga`, `tpa`, `broker`, `other`
- `output_formats` has a default of `["json", "csv"]`

### Datasource

:::info
Datasource represents a source of data for an Organization (e.g. manual upload or an API integration).
:::

- `datasource_type` possible values:
  - `manual_upload` [default]

### User

:::info
A user represents someone who can log into the system. Each user belongs to a primary organization.
:::

- `noldor_admin` has a default of `false`
- `noldor_developer` has a default of `false`
- `email_verified` has a default of `false`

### OrganizationMembership

:::info
User membership relationship to an organization. This grants users access and permissions to organizations beyond their primary organization.
:::

- `role` possible values:
  - `admin` [default]

The `user_id` links to the User table, establishing which user has membership in the organization.
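The access rule described above (a primary organization plus any additional memberships) can be sketched in plain Ruby. This is a minimal illustration, not the application's code: the `Struct` definitions and the `accessible_organization_ids` helper are hypothetical, and treating a non-nil `deleted_at` as a soft delete is an assumption based on the column name.

```ruby
# Hypothetical stand-ins for the User and OrganizationMembership records.
User = Struct.new(:id, :organization_id, keyword_init: true)
Membership = Struct.new(:organization_id, :user_id, :role, :deleted_at, keyword_init: true)

# Returns the IDs of every organization the user can reach: the primary
# organization first, then each membership that is not soft-deleted
# (assumption: a non-nil deleted_at means the membership is inactive).
def accessible_organization_ids(user, memberships)
  extra = memberships
    .select { |m| m.user_id == user.id && m.deleted_at.nil? }
    .map(&:organization_id)
  [user.organization_id, *extra].uniq
end

user = User.new(id: "u1", organization_id: "org-mga")
memberships = [
  Membership.new(organization_id: "org-carrier", user_id: "u1", role: :admin, deleted_at: nil),
  Membership.new(organization_id: "org-old", user_id: "u1", role: :admin, deleted_at: Time.now),
  Membership.new(organization_id: "org-other", user_id: "u2", role: :admin, deleted_at: nil)
]

p accessible_organization_ids(user, memberships)
# => ["org-mga", "org-carrier"]
```

The soft-deleted membership and the membership belonging to another user are both ignored, so only the primary organization and the one active membership remain.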
### Program

:::info
A Program is a logical grouping of Program Agreements that address a single market segment from the organization's perspective.

In the case of an MGA, a Program represents a market segment that the MGA is underwriting with one or more carriers / fronting companies / reinsurers providing the paper and capacity to serve it.

In the case of a carrier / fronting company / reinsurer, a Program represents a collection of MGAs' Programs that all have a comparable market segment and that the carrier / fronting company / reinsurer has a Program Agreement with to provide paper and capacity.
:::

Programs can be ordered by a combination of organization name and program name.

### ProgramAgreement

:::info
A ProgramAgreement is a contract between an MGA and a carrier / fronting company / reinsurer that authorizes the MGA to underwrite policies that are backed or partially backed by the carrier / fronting company / reinsurer. This contract details the terms and guidelines the MGA is authorized to operate within as well as commission rates, etc.
:::

- `auto_approve_output_files` has a default of `false`

:::warning
**DEPRECATED**: `SchemaTablesProgramAgreement` is deprecated. Schema tables should now be linked to program agreements through output configs by setting them as input tables.
:::

## Schema Design

```mermaid
classDiagram
    source
    direction LR
    Organization <-- Schema : belongs_to
    Schema --> SchemaTable : has_many(r_w_e)
    SchemaTable --> SchemaHeader : has_many(destroy)
    SchemaTable --> SchemaTablesProgramAgreement : has_many(destroy)
    `#seed_file` "0..1" ()-- SchemaTable : has_one
    ProgramAgreement <-- SchemaTablesProgramAgreement : belongs_to

    class Organization{
        Uuid id
        ...
    }
    class Schema{
        Uuid id
        Uuid organization_id
        Int version
        Datetime? deleted_at
        Datetime? published_at
    }
    class ProgramAgreement{
        Uuid id
        ...
    }
    class SchemaTable{
        Uuid id
        Uuid schema_id
        Uuid organization_id
        String name
        Enum dataset_type
        String? sheet_name
        String? header_spec
        Int header_row_index
        Datetime? deleted_at
    }
    class SchemaHeader{
        Uuid id
        Uuid schema_table_id
        Uuid organization_id
        String name
        Enum data_type
        Bool required
        Bool unique
        Jsonb? metadata
        Int order
        Text? description
        Datetime? deleted_at
    }
    class SchemaTablesProgramAgreement{
        <<joinTable>>
        Uuid id
        Uuid schema_table_id
        Uuid program_agreement_id
        Uuid organization_id
        Datetime? deleted_at
    }
```

### Schema

:::info
Schema represents the collective structure of an Organization's data
:::

- `version` has a default of `1`

### SchemaTable

:::info
SchemaTable represents a single model that makes up part of an Organization's data (e.g. a Policy).
:::

- `dataset_type` possible values:
  - `policy` [default], `risk`, `claim`, `location`, `exposure`, `broker`, `transaction`, `other`
- `header_row_index` (a note in the code says this will go away and be replaced with `header_spec`) has a default of `1`
- `header_spec` is a String of integers separated by non-digit characters (e.g., `"3,4,5"`). These integers are the row numbers where the headers can be found.

![image](https://hackmd.io/_uploads/r1fC-J3Igl.png)

The SchemaTable can optionally have an attached `seed_file` (ActiveStorage attachment). This file builds out the headers and provides a view of what a table looks like.

Header names are generated by concatenating multiple vertical header strings together with spaces from the `header_spec`.

![image](https://hackmd.io/_uploads/H1zZfk28xx.png =250x)

### SchemaHeader

:::info
SchemaHeader represents a single column/attribute of an Object as part of an Organization's data (e.g. Policy.policy_number).
:::

- `data_type` possible values:
  - `string` [default], `decimal`, `integer`, `boolean`, `date`, `percent`, `dollars`, `dollars_with_cents`, `datetime`, `float`, `time`, `big_decimal`
- `required` has a default of `false`
- `unique` has a default of `false`
- `order` has a default of `0`

### SchemaTablesProgramAgreement

:::warning
**DEPRECATED**: This model is being phased out.
Link schema tables to program agreements through `OutputConfig` by setting them as input tables instead.
:::

:::info
SchemaTablesProgramAgreement is a join table that associates one or more schema tables with one or more program agreements.
:::

This is unique by `schema_table_id` and `program_agreement_id`.

## SchemaMapping Design

```mermaid
classDiagram
    source
    `Schema (Input)` <-- SchemaMapping : belongs_to
    `Schema (Output)` <-- SchemaMapping : belongs_to
    SchemaMapping --> Tag : has_many(destroy)
    Tag <-- Tagging : belongs_to
    HeaderMapping --> Tagging : has_many(destroy)
    SchemaMapping --> HeaderMapping : has_many(destroy)
    SchemaMapping --> Variable : has_many(destroy)
    SchemaMapping --> HeaderGroup : has_many(destroy)
    HeaderGroup --> HeaderGroupSchemaHeader : has_many(destroy)
    SchemaHeader <-- HeaderGroupSchemaHeader : belongs_to

    class `Schema (Input)`{
        Uuid id
        ...
    }
    class `Schema (Output)`{
        Uuid id
        ...
    }
    class SchemaMapping{
        Uuid id
        Uuid input_schema_id
        Uuid output_schema_id
        Uuid organization_id
        Uuid? parent_id [SchemaMapping?]
        Uuid? output_table_id [SchemaTable?]
        Bool deprecated_by_output_config_migration
        Bool exclude_rows
        Bool remove_duplicate_output_rows
        Bool split_input_rows_into_multiple_output_rows
        Text? notes
        Datetime? deleted_at
    }
    class HeaderMapping{
        Uuid id
        Uuid schema_mapping_id
        Uuid organization_id
        Uuid? input_header_id [SchemaHeader?]
        Uuid? output_header_id [SchemaHeader?]
        Uuid? input_variable_id [Variable?]
        Uuid? output_variable_id [Variable?]
        Enum input_type
        Enum output_type
        Enum source_type
        String? input_value
        String? output_value
        Int order
        Jsonb? config
        Text? notes
        Datetime? deleted_at
    }
    class Tagging{
        <<joinTable>>
        Uuid id
        Uuid tag_id
        String tagged_type
        Uuid tagged_id
        Uuid organization_id
        Datetime? deleted_at
    }
    class Tag{
        Uuid id
        Uuid organization_id
        Uuid schema_mapping_id
        Uuid? condition_group_id
        String name
        String color
        Datetime? deleted_at
    }
    class Variable{
        Uuid id
        Uuid schema_mapping_id
        Uuid organization_id
        String name
        String key
        Enum data_type
        Text? default_value
        Datetime? deleted_at
    }
    class HeaderGroup{
        Uuid id
        Uuid schema_mapping_id
        Uuid organization_id
        String label
        Datetime? deleted_at
    }
    class HeaderGroupSchemaHeader{
        <<joinTable>>
        Uuid id
        Uuid header_group_id
        Uuid schema_header_id
        Datetime? deleted_at
    }
    class SchemaHeader{
        Uuid id
        ...
    }
```

### SchemaMapping

:::info
SchemaMapping represents a mapping for transforming data between two schemas.
:::

- `deprecated_by_output_config_migration` has a default of `false`
- `exclude_rows` has a default of `false`
- `remove_duplicate_output_rows` has a default of `false`
- `split_input_rows_into_multiple_output_rows` has a default of `false`

Defines how to transform data from an input schema to an output schema. Contains header mappings, variables, and transformation logic.

Supports the following `ConditionGroup`'s:

- `has_conditions :exclude_row` - Creates `exclude_row_condition_group` for conditional row exclusion (works with the `exclude_rows` boolean column)
- `has_conditions :header_group` - Creates `header_group_condition_group` for conditional row splitting (works with `header_groups` and the `split_input_rows_into_multiple_output_rows` boolean column)

A schema mapping can inherit from another schema mapping with the `parent_id` column.

:::warning
If the optional `output_table` is present (mapping to carrier table) then no `HeaderMapping`'s should point to any other `SchemaTable`'s besides this `output_table`.
:::

:::info
The `OutputConfig`'s can share `SchemaMapping`'s, but within a `SchemaMapping` it will NOT run the `HeaderMapping`'s unless the upstream header exists in ONLY the `OutputConfig`'s primary input table.
:::

### HeaderMapping

:::info
HeaderMapping represents the collection of rules for mapping one SchemaHeader to another.
:::

- `input_type` possible values:
  - `schema_header` [default], `string`, `variable`
- `output_type` possible values:
  - `schema_header` [default], `variable`
- `source_type` possible values:
  - `user_defined` [default], `suggested`, `unmapped`, `duplicate`
- `order` has a default of `0`
- `store` has the `accessors` of
  - `dsl`, `dsl_text`

Conditions can be applied to a HeaderMapping in two ways:

- Directly: it can have 0 or 1 `ConditionGroup` attached to itself.
- Through tags: it `has_many` `tags` (through `taggings`). Note that each `Tag` can have 0 or 1 `ConditionGroup`.

### Tag

:::info
Tags label header mappings, provide a highlight color, and bring in conditions. Tags are scoped to schema mappings.
:::

- `color` has a default of `"#1a547a"`
- `schema_mapping_id` is NOT NULL; each tag belongs to a specific schema mapping

A Tag can have 0 or 1 `ConditionGroup` attached to it. Tags are applied to a `HeaderMapping` through the `Tagging` join table.

### Variable

:::info
Variable represents data storage for a transform
:::

- `data_type` possible values (no default):
  - `string`, `decimal`, `integer`, `boolean`, `date`, `percent`, `dollars`, `dollars_with_cents`, `datetime`, `float`, `time`, `big_decimal`

The `key` is unique within the `SchemaMapping` and its parent `SchemaMapping` (only one level up).

### HeaderGroup

:::info
HeaderGroup represents a collection of schema headers within a SchemaMapping, for use in splitting an input row into several output rows.
:::

### HeaderGroupSchemaHeader

:::info
HeaderGroupSchemaHeader is a join table that associates one or more header groups with one or more schema headers.
:::

## OutputConfig Design

```mermaid
classDiagram
    source
    SchemaMapping <-- SchemaMappingStep : belongs_to
    OutputConfig --> SchemaMappingStep : has_many(r_w_e)
    OutputConfig --> SchemaTablesOutputConfig : has_many(destroy)
    `SchemaTable (Input)` <-- SchemaTablesOutputConfig : belongs_to
    `SchemaTable (Output)` <-- OutputConfig : belongs_to
    ProgramAgreement <-- OutputConfig : belongs_to

    note for `SchemaTable (Input)` "Input tables it expects<br>before generating output.<br><br>Must have one primary."
    note for `SchemaTable (Output)` "Output table it generates in the end"
    note for SchemaMapping "1\. MGA->Noldor mapping<br>2\. Noldor->Carrier mapping<br><br>Final mapping MUST match<br>expected output table"

    class OutputConfig{
        Uuid id
        Uuid program_agreement_id
        Uuid output_table_id [SchemaTable]
        Uuid organization_id
        String label
        Enum reporting_cadence
        Datetime? deleted_at
    }
    class ProgramAgreement{
        Uuid id
        ...
    }
    class `SchemaTable (Input)`:::input{
        Uuid id
        ...
    }
    class `SchemaTable (Output)`:::output{
        Uuid id
        ...
    }
    class SchemaTablesOutputConfig{
        <<joinTable>>
        Uuid id
        Uuid? output_config_id
        Uuid? input_table_id [SchemaTable?]
        Int dataset_count
        Int dependency_order
        Bool primary_table
        Bool optional
        Datetime? deleted_at
    }
    class SchemaMappingStep{
        Uuid id
        Uuid schema_mapping_id
        String parent_type
        Uuid parent_id
        Uuid organization_id
        Int order
        Datetime? deleted_at
    }
    class SchemaMapping{
        Uuid id
        ...
        Uuid? output_table_id [SchemaTable?]
        ...
    }

    classDef input fill:#90EE90,stroke:#333
    classDef output fill:#87CEFA,stroke:#333
```

### OutputConfig

:::info
Configuration that specifies the dependencies needed to generate an output file for a carrier.
:::

- `reporting_cadence` possible values:
  - `monthly`

The OutputConfig can have multiple input `SchemaTable`'s but only one output `SchemaTable`.

:::warning
The final `output_table` SHOULD match the last `SchemaMapping` step's `output_table`. This is not enforced, but can be expected to be true.
:::

### SchemaTablesOutputConfig

:::info
Tables needed to generate an output file for a carrier
:::

- `dataset_count` has a default of `1`
- `dependency_order` has a default of `0`
- `primary_table` has a default of `false`
- `optional` has a default of `false`

:::warning
The `output_config_id` and `input_table_id` columns can be `NULL` in the database, although the model validates that they are present.
:::

### SchemaMappingStep

:::info
Single step in a series of steps in an output workflow
:::

- `parent_type` / `parent_id` is a polymorphic reference (this is how it maps to `OutputConfig`)

This behaves like a polymorphic join table between a `SchemaMapping` and the `OutputConfig`. These steps seem to track the `Transformation`'s.

## DataUpload Design

```mermaid
classDiagram
    source
    direction LR
    Datasource <-- DataUpload : belongs_to
    `#input_files` "1.." ()-- DataUpload : has_many
    DataUpload --> Dataset : has_many(r_w_e)
    SchemaTable "0..1" <-- Dataset : belongs_to
    ProgramAgreement "0..1" <-- Dataset : belongs_to
    Dataset --> DatasetVersion : has_many(false)
    `#input_file` "1" ()-- DatasetVersion : has_one

    note for SchemaTable "The <code>SchemaTable</code><br>is only set for<br>ingestion datasets"

    class Datasource{
        Uuid id
        ...
    }
    class DataUpload{
        Uuid id
        Uuid datasource_id
        Uuid organization_id
        Uuid? task_id
        Date start_date
        Date end_date
        String? notes
        Datetime? deleted_at
    }
    class Dataset{
        Uuid id
        Uuid data_upload_id
        Uuid organization_id
        Uuid? schema_table_id
        Uuid? program_agreement_id
        Uuid? data_flow_id
        Enum dataset_type
        Enum status
        Bool approved
        String filename
        String? sheet_name
        String? checksum
        Int header_row_index
        String? header_spec
        Date start_date
        Date end_date
        Datetime? deleted_at
    }
    class DatasetVersion{
        Uuid id
        Uuid organization_id
        Int version
        Jsonb? input_headers
    }
    class SchemaTable{
        Uuid id
        ...
    }
    class ProgramAgreement{
        Uuid id
        ...
    }
```

### DataUpload

:::info
DataUpload represents a top-level set of data to be ingested and transformed
:::

This record `has_many` attached files under `#input_files`.

The `DataUpload` can hold multiple files or one file with multiple sheets. Each sheet becomes a `Dataset` with a `DatasetVersion`.

After the files are uploaded and the primary table has been matched with its corresponding sheet, the system kicks off the `OutputConfig` by generating a `DataFlow` for the supplied reporting period.

### Dataset

:::info
Dataset represents an individual table to be ingested and transformed
:::

- `dataset_type` possible values:
  - `policy` [default], `risk`, `claim`, `location`, `exposure`, `broker`, `transaction`, `other`
- `status` possible values:
  - `pending` [default], `in_progress`, `completed`, `failed`
- `approved` has a default of `false`
- `header_row_index` has a default of `1`

A Dataset can belong to either a `DataUpload` (for user-uploaded data) OR a `DataFlow` (for generated output data).

This is a single sheet from a file (e.g., "risk", "policy", "claim" sheet).

### DatasetVersion

:::info
DatasetVersion represents one version of a dataset (i.e. when a Dataset is updated with a new file, that creates a new version)
:::

- `version` has a default of `1`

This record `has_one` attached file under `#input_file`. It also has access to the `DataRow`'s and `Transformation`'s.

## Transformation Design

```mermaid
classDiagram
    source
    direction LR
    DatasetVersion <-- Transformation : belongs_to
    Transformation --> DataRow : has_many(false)
    SchemaTable <-- DataRow : belongs_to

    class Transformation{
        Uuid id
        Uuid organization_id
        Uuid? schema_mapping_id
        Uuid? retry_for_id
        Uuid? data_flow_step_id
        Enum transformation_type
        Enum status
        Enum approval_status
        Datetime? approval_updated_at
        Datetime? started_at
        Datetime? ended_at
        Bool recalculate_fields
        Jsonb? metadata
    }
    class DatasetVersion{
        Uuid id
        ...
    }
    class DataRow{
        Uuid id
        Uuid transformation_id
        Uuid schema_table_id
        Uuid organization_id
        Uuid dataset_version_id
        Uuid? batch_id
        Enum approval_status
        Int index
        Int group
        Jsonb data_values_json
    }
    class SchemaTable{
        Uuid id
        ...
    }
```

### Transformation

:::info
Transformation tracks the process of parsing a DatasetVersion file and creating child DataRow records from it for downstream use
:::

- `transformation_type` possible values:
  - `ingestion` [default], `schema_transform`, `output`
- `status` possible values:
  - `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
  - `pending` [default], `approved`, `rejected`
- `recalculate_fields` has a default of `false` (field exists in database but not actively used in code)

#### Transformation (Ingestion)

The user uploads a number of files in the `DataUpload`, which creates a `Dataset` (and `DatasetVersion`) for each sheet. For each of these `DatasetVersion`'s a `Transformation` (`transformation_type: :ingestion`) is kicked off. This transforms the sheet into `DataRow`'s on the transformation, shaped by the corresponding `Dataset#schema_table`.

See [Ingestion Pipeline](#ingestion-pipeline) for detailed processing steps.

#### Transformation (in DataFlow)

Within a `DataFlow` there will be a minimum of 5 steps (`DataFlowStep`). See [Schema Transform Pipeline](#schema-transform-pipeline) for detailed processing steps.

1. `step_type: :ingestions_approved` - this confirms all the ingested dataset versions have been approved
1. `step_type: :schema_transform` - the MGA -> Noldor schema mapping
   - This contains at least one `Transformation` (`transformation_type: :schema_transform`) that writes its `DataRow`'s to the corresponding output `DatasetVersion` of the `DataFlow`
1. `step_type: :schema_transform` - the Noldor -> carrier schema mapping
   - This also contains at least one `Transformation` that writes to the same `DatasetVersion`
1. `step_type: :output` - Generates the final output files (CSV, JSON, etc.)
   - Contains `Transformation` (`transformation_type: :output`) that creates the exported files
1. `step_type: :data_flow_completed` - Marks the data flow as complete

### DataRow

:::info
DataRow represents one row of data. Data values are stored in the `data_values_json` JSONB column.
:::

- `approval_status` possible values:
  - `pending` [default], `approved`, `rejected`
- `group` has a default of `-1`
- `data_values_json` has a default of `{}`

The system has fully transitioned to using `data_values_json` for storing data values. The structure is:

```json
{
  "values": {
    "schema_header_uuid_1": {
      "value": "John Doe",
      "mapped_from": "optional_source_info"
    },
    "schema_header_uuid_2": {
      "value": "12345"
    }
  },
  "multiple_values": {
    "schema_header_uuid_1": [
      {
        "value": "Additional Value 1",
        "mapped_from": "some_source"
      },
      {
        "value": "Additional Value 2"
      }
    ]
  }
}
```

- Keys are `schema_header_id` (UUID strings)
- `values` contains the primary value for each header
- `multiple_values` contains additional values for headers that already have a value in `values` (used when splitting input rows)
- Each value object contains `value` (the actual data) and optionally `mapped_from` (tracking information)

## DataFlow Design

```mermaid
classDiagram
    source
    OutputConfig <-- DataFlow : belongs_to
    `Dataset (Upstream)` <-- DataFlowsDataset : belongs_to
    DataFlow --> DataFlowsDataset : has_many(destroy)
    DataFlow --> `Dataset (Output)` : has_one(r_w_e)
    DataFlow --> DataFlowStep : has_many(destroy)
    DataFlowStep --> Transformation : has_many(nullify)
    SchemaMapping "0..1" <-- DataFlowStep : belongs_to

    note for DataFlow "A copy of <code>OutputConfig</code><br>for each reporting period"
    note for `Dataset (Upstream)` "This is the expected<br>list of input data sheets.<br><br>These were previously<br>ingested by a Transformation."
    note for `Dataset (Output)` "This is the<br>expected<br>final output<br>sheet"
    note for Transformation "Data rows<br>live here.<br>Accumulated<br>data rows<br>can be accessed<br>through<br><code>DatasetVersion</code>."
    note for DataFlowStep "\- Ingestion step<br>\- Map MGA->Noldor<br>\- Map Noldor->Carrier<br>\- Output step<br>\- Completion step"

    class DataFlow{
        Uuid output_config_id
        Uuid id
        Uuid organization_id
        Enum status
        Enum approval_status
        Bool from_migration
        Datetime reporting_period_start_date
        Datetime? deleted_at
    }
    class OutputConfig{
        Uuid id
        ...
    }
    class DataFlowsDataset{
        <<joinTable>>
        Uuid id
        Uuid data_flow_id
        Uuid dataset_id
        Uuid organization_id
        Datetime? deleted_at
    }
    class `Dataset (Upstream)`:::input{
        Uuid id
        ...
    }
    class `Dataset (Output)`:::output{
        Uuid id
        ...
    }
    class SchemaMapping{
        Uuid id
        ...
    }
    class DataFlowStep{
        Uuid id
        Uuid data_flow_id
        Uuid organization_id
        Uuid? input_table_id [SchemaTable?]
        Uuid? schema_mapping_id [SchemaMapping?]
        Uuid? previous_step_id [DataFlowStep?]
        Uuid? migrated_transformation_id [Transformation?]
        Int order
        String title
        Enum step_type
        Enum status
        Enum approval_status
        Bool fetch_from_ingested_datasets
        Datetime? started_at
        Datetime? ended_at
        Datetime? deleted_at
    }
    class Transformation{
        Uuid id
        ...
    }

    classDef input fill:#90EE90,stroke:#333
    classDef output fill:#87CEFA,stroke:#333
```

### DataFlow

:::info
Individual instance of the workflow for generating a specific output file, which keeps track of the progress that has been made and what needs to be done next.
:::

- `status` possible values:
  - `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
  - `pending` [default], `approved`, `rejected`
- `from_migration` has a default of `false`

### DataFlowsDataset

:::info
Join table for data flows and datasets
:::

### DataFlowStep

:::info
Single step in a data flow
:::

- `step_type` possible values (no default):
  - `ingestions_approved`, `schema_transform`, `output`, `data_flow_completed`
- `status` possible values:
  - `pending` [default], `in_progress`, `completed`, `failed`, `canceled`
- `approval_status` possible values:
  - `pending` [default], `approved`, `rejected`
- `fetch_from_ingested_datasets` has a default of `false`

## Data Processing Pipelines

This section details the processing pipelines for each transformation type. These pipelines describe how data flows through the system at runtime.

### Ingestion Pipeline

Ingestion transformations (`transformation_type: :ingestion`) parse uploaded files and create `DataRow` records. Unlike schema transformations, ingestion uses a simpler single-pass pipeline orchestrated by `IngestFileJob`.

#### Pipeline Overview

```
IngestFileJob (Sidekiq::Job)
    │
    ▼
Transform::IngestFile.call
    ├── Create State object
    ├── Start transformation
    ├── IngestFile::ProcessBatch
    │   ├── Parse spreadsheet file
    │   ├── Validate headers match SchemaTable
    │   ├── For each row: validate and insert DataRows
    │   └── Compute dataset checksum
    └── Transform::Finalize
        ├── Assign indexes (index = group)
        └── Mark complete or auto-approve
```

#### Triggering

Ingestion transformations are created and enqueued during the `DataUpload::CreateDatasets` flow:

1. User uploads files via `DataUpload`
2. `DataUpload::FetchTableMatches` matches sheets to `SchemaTable`s
3. For each match, `DataUpload::EnsureDataset` creates `Dataset` + `DatasetVersion`
4. After all `DataFlow`s are ensured, `transformations.first_or_create!` creates the ingestion transformation
5. `Transformation#queue_transformation_job` enqueues `IngestFileJob`

#### Processing Steps

1. **File Parsing** (`IngestFile::Batch#parse_sheet`): Parses the spreadsheet file attached to the `DatasetVersion` using the `Dataset#parse_sheet` method.
2. **Validation** (`IngestFile::Validations`):
   - Validates headers match `SchemaTable` headers (missing/extra headers create errors)
   - Validates file is not a duplicate of previously uploaded files
   - Validates data types for each cell value
   - Validates required fields are present
3. **Row Processing** (`IngestFile::ProcessBatch#process_sheet`):
   - Iterates each row using `each_row_with_ui_index` (1-based indexing matching spreadsheet)
   - Skips empty rows (tracked and reported as info)
   - Maps cell values to `SchemaHeader`s by name (supports aliases)
   - Validates and normalizes values (UTF-8 encoding, special characters)
   - Creates `RowPresenter` with `group = row_index` (1-based)
   - Optionally excludes rows based on `SchemaTablesProgramAgreement` conditions
   - Computes running MD5 checksum of row values
4. **Data Insertion**: `DataRow` records are bulk-inserted with:
   - `index = -1` (placeholder, assigned in finalization)
   - `group = row_index` (1-based, matches spreadsheet row number)
   - `batch_id = nil` (ingestion doesn't use database Batch records)
   - `data_values_json` containing the parsed values
5. **Finalization** (`Transform::Finalize`):
   - For ingestion (no Batch records): `index` is set directly from `group` via `UPDATE data_rows SET index = group`
   - Dataset checksum is saved for duplicate detection
   - Transformation is marked complete and auto-approved if no errors

#### Relationship to DataFlow

Multiple ingestion transformations feed into a single `DataFlow`:

- An `OutputConfig` specifies required input tables via `SchemaTablesOutputConfig` (each with `dataset_count`)
- Each input table may have multiple datasets (e.g., 3 policy files for one reporting period)
- All ingestion transformations must be approved before the `DataFlow` can proceed
- The `DataFlow#ready?` method checks that all required tables have sufficient approved ingestions
- Upon approval, `Transformation::ApproveOrReject` triggers the first schema transform step

### Schema Transform Pipeline

Schema transformations (`transformation_type: :schema_transform`) process data through a multi-stage pipeline orchestrated by `TransformJob`. The pipeline handles joining data from multiple tables, grouping by compound keys, and processing in batches.
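
To make "grouping by compound keys and processing in batches" concrete before the stage-by-stage walkthrough, here is a minimal Ruby sketch. It is not taken from the Valinor codebase: `Row`, `batch_by_compound_key`, and the `target_size` parameter are hypothetical names chosen for illustration, and the sketch assumes that rows sharing a compound key must never be split across batches.

```ruby
# Hypothetical sketch; none of these names come from the Valinor codebase.
Row = Struct.new(:id, :data, keyword_init: true)

# Groups rows by the values of the given key headers, then packs whole groups
# into batches of roughly `target_size` rows. A group is never split, so a
# batch may end up larger than the target.
def batch_by_compound_key(rows, key_headers, target_size: 500)
  groups = rows.group_by { |row| key_headers.map { |header| row.data[header] } }

  batches = [[]]
  groups.each_value do |group_rows|
    # Start a new batch once the current one has reached the target size.
    batches << [] if batches.last.size >= target_size
    batches.last.concat(group_rows)
  end
  batches.reject(&:empty?)
end

rows = [
  Row.new(id: 1, data: { "policy_number" => "POL001", "effective_date" => "2024-01-01" }),
  Row.new(id: 2, data: { "policy_number" => "POL002", "effective_date" => "2024-01-01" }),
  Row.new(id: 3, data: { "policy_number" => "POL001", "effective_date" => "2024-01-01" }),
  Row.new(id: 4, data: { "policy_number" => "POL003", "effective_date" => "2024-02-01" })
]

batches = batch_by_compound_key(rows, %w[policy_number effective_date], target_size: 2)
batch_ids = batches.map { |batch| batch.map(&:id) }
# batch_ids => [[1, 3], [2, 4]]
```

Rows 1 and 3 share a compound key and therefore land in the same batch even though they are not adjacent in the input.
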
#### Pipeline Overview

```
TransformJob (Sidekiq::IterableJob)
    │
    ▼
Stage 1: Transform::Schema::Start
    ├── Create State object
    ├── MakeBatches → creates Batch records
    │   ├── Build "super rows" (joined/stitched data)
    │   └── Partition into batches (by index or compound key)
    ├── Run validations
    └── Compute aggregations
    │
    ▼
Stage 2-3: For each Batch: Transform::Schema::ProcessBatch
    ├── Iterate over groups (rows sharing compound key)
    ├── For each row: apply header mappings via DSL
    ├── Aggregate group values (first, last, sum, min, max)
    ├── Remove duplicates, generate Noldor IDs
    └── Insert DataRows with index=-1, group=N
    │
    ▼
Stage 4: Transform::Finalize
    ├── Assign sequential indexes across all batches
    ├── Save output JSON snapshot
    └── Mark transformation complete or auto-approve
```

#### Stage 1: Building Super Rows

A "super row" (`Batch::SuperRow`) represents one or more `DataRow` IDs that should be processed together. The construction differs based on transformation context:

**MGA → Noldor (from ingestion):**

1. **Stitching** (`DatasetStitcher`): When `dataset_count > 1`, multiple uploaded datasets of the same table are vertically concatenated. Rows are assigned "stitched indexes" that maintain order across datasets (ordered by filename, then sheet_name).
2. **Joining** (`JoinDataRows`): Primary table rows are joined with child table rows based on `TableRelationship` configurations. Join strategies include:
   - `left_join`: Returns all matching child rows per primary row, expanding the result set
   - `left_join_distinct`: Returns distinct matching child rows
   - `stacking`: Appends child table rows after primary rows (no key matching)

   Join keys are defined in `SchemaRelationship` with `HeaderPair`s specifying which headers to match.
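
The three join strategies above can be illustrated with a small Ruby sketch over plain hashes. This is not the `JoinDataRows` implementation: the method names, the hash-based rows, and the `header_pairs` hash (primary header name => child header name) are all hypothetical. Two behaviours are assumptions rather than facts from this document: a primary row with no matching child is kept on its own, and "distinct" is read as collapsing child rows whose values are identical.

```ruby
# Hypothetical sketch; rows are plain hashes and join keys are header names.
def matches?(primary, child, header_pairs)
  header_pairs.all? do |primary_header, child_header|
    primary[primary_header] == child[child_header]
  end
end

# left_join: one super row per (primary, matching child) pair.
# Assumption: a primary row without any match is kept by itself.
def left_join(primary_rows, child_rows, header_pairs)
  primary_rows.flat_map do |primary|
    matched = child_rows.select { |child| matches?(primary, child, header_pairs) }
    matched.empty? ? [[primary]] : matched.map { |child| [primary, child] }
  end
end

# left_join_distinct: like left_join, but identical child rows are collapsed.
# Assumption: "distinct" means child rows with equal values count once.
def left_join_distinct(primary_rows, child_rows, header_pairs)
  left_join(primary_rows, child_rows.uniq, header_pairs)
end

# stacking: child rows are appended after the primary rows, no key matching.
def stacking(primary_rows, child_rows)
  (primary_rows + child_rows).map { |row| [row] }
end

policies = [{ "policy_number" => "POL001" }, { "policy_number" => "POL002" }]
claims = [
  { "policy_no" => "POL001", "amount" => 100 },
  { "policy_no" => "POL001", "amount" => 100 },
  { "policy_no" => "POL001", "amount" => 250 }
]
pairs = { "policy_number" => "policy_no" }

left_join(policies, claims, pairs).size          # => 4
left_join_distinct(policies, claims, pairs).size # => 3
stacking(policies, claims).size                  # => 5
```

Each inner array stands in for one super row: the set of source rows that would be processed together.
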

**Noldor → Carrier (to carrier):**

- Fetches data rows directly from the previous step's output `DatasetVersion`
- Filters to rows matching the input schema's tables

#### Stage 2: Batch Creation

Super rows are partitioned into `Batch` records stored in the database. Two strategies exist:

**Index-based batching** (`MakeIndexBatches`):

- Simple slicing: every ~500 super rows become one batch
- Used when no group-by aggregation is configured

**Compound key batching** (`MakeGroupByBatches`):

- Rows are grouped by compound key values (e.g., policy_number + effective_date)
- Compound keys are defined via `GroupBy::Aggregation` with `KeyPart`s
- All rows sharing a compound key are kept in the same batch
- Batches target ~500 rows but may be larger to keep groups intact

Batch `input_data` JSON stores:

```json
{
  "initial": [{ "index": 1, "data_row_ids": ["uuid1", "uuid2"] }, ...],
  "group_by": {
    "aggregation_id": "uuid",
    "compound_key": [{ "schema_header_id": "uuid" }, ...],
    "input": [
      { "rows": [0, 1, 2], "compound_key": ["POL001", "2024-01-01"] },
      { "rows": [3, 4], "compound_key": ["POL002", "2024-01-01"] }
    ]
  }
}
```

#### Stage 3: Batch Processing

Each batch is processed by `Transform::Schema::ProcessBatch`:

1. **Group iteration** (`GroupBy`): Iterates over groups of rows. For non-grouped batches, each row is its own group.
2. **Row splitting** (optional): If `split_input_rows_into_multiple_output_rows` is enabled, a single input row can produce multiple output rows based on `HeaderGroup` configurations.
3. **Header mapping**: For each row, `ProcessHeaderMapping` applies the `SchemaMapping`'s `HeaderMapping`s:
   - Creates a `Dsl::Lookup` context with access to input values, variables, and aggregations
   - Evaluates DSL expressions to compute output values
   - Checks tag conditions to determine which mappings apply
   - Optionally excludes rows based on `exclude_row_condition_group`
4. **Group aggregation**: After processing all rows in a group, values are aggregated:
   - `first`: Keep first non-blank value
   - `last`: Keep last non-blank value
   - `sum`: Sum all values (for numeric types)
   - `min`/`max`: Keep minimum/maximum value
   - Implicit aggregation: If no explicit aggregation, keeps last non-blank value and warns if values differ
5. **Duplicate detection** (`DuplicateRowTracker`): Removes duplicate output rows using Redis for cross-batch tracking.
6. **Noldor ID generation** (`NoldorIdPopulator`): Generates unique Noldor IDs for output rows.
7. **Data insertion**: `DataRow` records are bulk-inserted with:
   - `index = -1` (placeholder, assigned in Stage 4)
   - `group = N` (sequential within the batch, used for index assignment)
   - `batch_id` linking back to the Batch record
   - `data_values_json` containing the transformed values

#### Stage 4: Finalization

`Transform::Finalize` completes the transformation:

1. **Index assignment** (`assign_row_indexes`): The `index` field is computed sequentially across all batches:
   - Iterates batches in cursor order
   - For each batch, finds the maximum `group` value
   - Assigns sequential indexes: batch 1 groups 1-10 get indexes 1-10, batch 2 groups 1-5 get indexes 11-15, etc.
   - Uses SQL UPDATE to set `data_rows.index` and `issues.index` based on `group`

   :::info
   The `group` field represents the processing order within a batch, while `index` represents the final sequential order across the entire transformation output.
   :::

2. **Output snapshot**: Saves a JSON snapshot of the transformation output (`SnapshotJson::Save`)
3. **Completion**: Marks transformation as `completed` or `failed` based on error count
4. **Auto-approval**: If configured and no halting issues exist, automatically approves the transformation
5. **Redis cleanup**: Resets Redis keys used for duplicate tracking

## DataFlow Readiness & Approval

This section describes how DataFlows transition through states and how outputs are approved and delivered.
### DataFlow Readiness

A DataFlow transitions to `in_progress` status when **all** of these conditions are met:

1. **All ingestion transformations are approved** - Every input table's ingestion transformation must have `approval_status = :approved`
2. **Dataset count requirements satisfied** - For each required input table, the count of approved datasets ≥ `dataset_count` (configured in `SchemaTablesOutputConfig`)
3. **Optional tables skipped** - Tables marked `optional: true` don't block readiness

Two methods determine readiness in `DataFlow`:

- `ready?` - All required tables must have sufficient approved datasets
- `optionally_ready?` - Same as `ready?` but skips tables marked as optional

### Approval & Delivery Types

Output files have three independent approval/delivery mechanisms:

| Type | Service | Purpose |
|------|---------|---------|
| **Portal/Dashboard** | `Transformation::ApproveOrReject` | Core approval via UI "Approve" button. Updates `approval_status` to `approved`. Publishes `DataFlowApprovedV2` event to EventBridge. |
| **Webhooks** | `Transformation::WebhooksEvents::Create` | Manual action after portal approval. Creates `Webhooks::Event` with action `"dataset:<type>:approved"`. Delivers to customer APIs via configured `Webhooks::Subscription`s. |
| **Data Client** | `DataClient::SendDatasetOutputs` | Manual action after portal approval. Creates `DataClient::DatasetOutput` audit records. Publishes `DataFlowOutputsSent` event. Makes files available for customer download. |

:::info
`ProgramAgreement.auto_approve_output_files` controls whether output steps require manual approval. When `true`, the output step's `auto_approve?` returns `true`, allowing automatic approval.
:::
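
The difference between `ready?` and `optionally_ready?` described under DataFlow Readiness can be sketched in plain Ruby. This is an illustration only, not the `DataFlow` model code: `TableRequirement`, the free-standing method signatures, and the `approved_counts` hash (table name => number of approved datasets) are hypothetical stand-ins for whatever `SchemaTablesOutputConfig` and the approved ingestions actually provide.

```ruby
# Hypothetical sketch; the struct and method signatures are illustrative only.
TableRequirement = Struct.new(:table_name, :dataset_count, :optional, keyword_init: true)

# True when every listed table has at least `dataset_count` approved datasets.
def ready?(requirements, approved_counts)
  requirements.all? do |requirement|
    approved_counts.fetch(requirement.table_name, 0) >= requirement.dataset_count
  end
end

# Same check, but tables marked optional do not block readiness.
def optionally_ready?(requirements, approved_counts)
  ready?(requirements.reject(&:optional), approved_counts)
end

requirements = [
  TableRequirement.new(table_name: "policies", dataset_count: 3, optional: false),
  TableRequirement.new(table_name: "claims", dataset_count: 1, optional: true)
]
approved_counts = { "policies" => 3 } # no approved claims dataset yet

ready?(requirements, approved_counts)            # => false
optionally_ready?(requirements, approved_counts) # => true
```

In this example the three approved policy files satisfy the required table, so only the optional claims table separates the two results.
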