# Triplit Plugin

**Motivation:**

Triplit provides a powerful foundation for building real-time, offline-capable applications using basic data types (string, number, boolean, date, set). However, to unlock its full potential for richer collaborative experiences like those found in document editors, design tools, or applications with specialized counters or ordered lists, we need a way to extend its core data modeling capabilities.

Currently, adding new data types with custom merge logic (like CRDTs) requires modifying the core `@triplit/db`, `@triplit/client`, and `@triplit/server-core` packages. This presents a high barrier for both the core team and the community to introduce specialized types tailored to specific application needs.

This proposal outlines a **Plugin API** designed to allow developers to define, implement, and register custom CRDTs or other mergeable data structures within Triplit. This would empower developers to:

1. Implement complex collaborative features natively within Triplit's data model.
2. Leverage specialized CRDTs optimized for specific use cases (e.g., performance characteristics of different sequence CRDTs).
3. Foster a community ecosystem where different CRDT plugins can be shared and reused.
4. Keep the Triplit core leaner while enabling broad extensibility.

**Goals:**

1. **Define a Clear Interface:** Specify a robust TypeScript interface (`CRDTPluginInterface`) outlining the contract between Triplit core and a custom data type plugin.
2. **Enable Custom Logic:** Allow plugins to define their unique state representation (`TState`), CRDT merge algorithm (`merge`), and client-facing mutation logic (`generateChangePayload`).
3. **Flexible Storage Strategies:** Explicitly support both:
   * **State Blob:** Storing the CRDT state as a single opaque value in the KV store (simpler, suitable for types like Counters).
   * **Decomposed KV Representation (DKVR):** Allowing the CRDT state to be mapped across multiple KV entries (more complex, potentially more efficient for types like Lists or Text).
4. **Core Integration:** Define how Triplit's schema system, storage layer, transaction/merge logic, client API, sync protocol, and query engine will interact with registered plugins.
5. **Maintain Guarantees:** Ensure the plugin system upholds Triplit's core principles of offline-first operation, optimistic updates, HLC timestamping for consistency, and real-time synchronization.
6. **Basic Query Support:** Provide optional hooks within the plugin interface to enable basic filtering and sorting capabilities for custom types within Triplit queries.

**Non-Goals (Initial Version):**

* **Highly Optimized IVM for DKVR:** While the API allows DKVR, deeply optimizing the core Incremental View Maintenance for arbitrary decomposed structures is complex and deferred. Initial IVM might treat any change within a DKVR attribute's KV prefix as potentially invalidating dependent queries.
* **Automatic Client Proxy Generation:** Automatically mapping standard JavaScript methods (e.g., `Array.prototype.push`) to custom CRDT operations is complex. The initial focus is on supporting explicit API methods initiated by application code.
* **Complex Validation/UI Metadata in Core API:** Integrating external validation libraries (like Standard Schema) or UI component bindings directly. This functionality is better suited for complementary schema extension libraries built *on top* of this core data API.
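To make the goals concrete, here is a rough sketch of the developer experience this proposal is aiming for. It is illustrative only: `MyPNCounterPlugin`, the `plugins` client option, and the `client.incrementCounter` helper are names assumed by later sections of this proposal, not part of Triplit's current API.

```typescript
import { Schema as S, TriplitClient } from '@triplit/client';
import { MyPNCounterPlugin } from './my-pn-counter'; // hypothetical plugin module

// Declare a collection that uses the custom counter type.
// The plugin's schemaDefinition produces a DataType that core understands.
const schema = {
  polls: {
    schema: S.Schema({
      id: S.Id(),
      question: S.String(),
      votes: MyPNCounterPlugin.schemaDefinition(),
    }),
  },
};

// Register the plugin alongside the schema (assumed registration option).
const client = new TriplitClient({
  schema,
  plugins: [MyPNCounterPlugin],
});

// Mutations go through explicit, plugin-backed methods rather than plain assignment.
await client.incrementCounter('polls', 'poll-1', 'votes', 1);
```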
**Core Concepts & Integration Strategy:**

This API leverages Triplit's existing architecture:

* **KV Store Abstraction:** Plugins define how their `TState` maps onto `TStoredValue` (or multiple KV pairs for DKVR), interacting via the standard `KVStore` interface.
* **Schema Definition:** Plugins provide a `schemaDefinition` function used by `Schema as S` (e.g., `S.MyCounter = MyCounterPlugin.schemaDefinition`).
* **Merge Logic:** The plugin's `merge` function encapsulates the CRDT algorithm. Triplit's `EntityStore` orchestrates fetching current state, calling `plugin.merge` with states and HLC timestamps, and writing the result back atomically within a transaction. For DKVR, the `merge` function receives the `KVStoreTransaction` to perform multi-key operations.
* **Client API:** Applications will use explicit methods (e.g., `client.incrementCounter(...)`) which internally call `plugin.generateChangePayload` to create the correct delta/state (`TChangePayload`) for the transaction.
* **Synchronization:** The `TChangePayload` is stored in the outbox and transmitted. The server uses the corresponding registered plugin's `merge` function to apply changes.
* **Querying:** Basic filtering/sorting is enabled via optional plugin methods (`compare`, `satisfiesOperator`), which may require state deserialization.

**Proposed API: `CRDTPluginInterface<TState, TSchemaOptions, TStoredValue, TChangePayload>`**

```typescript
import {
  DataType,
  TypeConfig,
  Timestamp,
  KVStoreTransaction,
  DBEntity,
  Tuple,
} from '@triplit/db';

/** Structure for DKVR storage operations. */
export type DKVRMutationPayload = {
  sets?: Map<Tuple, any>; // Map<KeySuffixTuple, Value>
  deletes?: Set<Tuple>;   // Set<KeySuffixTuple>
};

/** Input type for decodeFromStorage when using DKVR. */
export type DKVRScanResult = AsyncIterable<[keySuffix: Tuple, value: any]>;

/**
 * Defines the contract for a custom CRDT data type plugin.
 *
 * @template TState - The canonical in-memory JavaScript representation (e.g., number for Counter).
 * @template TSchemaOptions - User-defined options via Schema builder (e.g., S.MyType(options)).
 * @template TStoredValue - How state is stored in KV (single value for 'blob', iterable value type for 'decomposed').
 * @template TChangePayload - Representation of a change/delta for merge/sync (often TState for LWW, specific Op for op-based).
 */
export interface CRDTPluginInterface<
  TState,
  TSchemaOptions = any,
  TStoredValue = any,
  TChangePayload = TState
> {
  /** REQUIRED: Unique string ID (e.g., "pn-counter", "myorg:rga-list"). Use namespacing. */
  readonly typeId: string;

  // --- Schema Definition & Validation ---

  /** REQUIRED: Returns the base Triplit DataType for use in S.Collections({...}). */
  schemaDefinition: (options?: TSchemaOptions) => DataType;

  /** OPTIONAL: Validates options passed to schemaDefinition. Returns error string if invalid. */
  validateSchemaOptions?: (options: TSchemaOptions) => string | undefined;

  // --- Core CRDT Logic ---

  /** REQUIRED: Returns the default/initial JS state (TState). */
  defaultValue: (config: TypeConfig) => TState;

  /**
   * REQUIRED: The core CRDT merge function. Combines incoming changes with current state using HLC timestamps.
   * For DKVR strategy, this function *must* use the provided kvTx for atomic multi-key reads/writes.
   */
  merge(
    currentState: TState | undefined,
    incomingChangePayload: TChangePayload,
    currentTimestamp: Timestamp | undefined,
    incomingTimestamp: Timestamp,
    actorId: string,
    kvTx?: KVStoreTransaction // REQUIRED if storageStrategy is 'decomposed'
  ): TState;

  // --- Storage Strategy & Serialization ---

  /** REQUIRED: Specifies 'blob' (single KV value) or 'decomposed' (multiple KV values). */
  readonly storageStrategy: 'blob' | 'decomposed';

  /**
   * REQUIRED: Serializes JS TState for storage.
   * - 'blob': Returns TStoredValue (single value).
   * - 'decomposed': Returns DKVRMutationPayload { sets?: Map<SuffixTuple, Val>, deletes?: Set<SuffixTuple> }.
   */
  encodeForStorage: (state: TState) => TStoredValue | DKVRMutationPayload;

  /**
   * REQUIRED: Deserializes from storage to JS TState.
   * - 'blob': Receives TStoredValue.
   * - 'decomposed': Receives DKVRScanResult (AsyncIterable over KV pairs for the attribute).
   */
  decodeFromStorage: (storedValue: TStoredValue | DKVRScanResult) => TState;

  /** OPTIONAL (Recommended for Decomposed KV): Hints at the KV key structure. Helps core fetch data efficiently. */
  readonly kvSchema?: Record<string, Tuple[]>; // e.g., { meta: [['_meta']], element: [['_element', '*']] }

  // --- Client Interaction ---

  /** REQUIRED (for mutations): Translates user API calls into the internal TChangePayload for merge/sync. */
  generateChangePayload(
    mutationType: string, // Plugin-defined e.g., "increment"
    currentState: TState | undefined,
    args: any[], // Arguments from client API call
    actorId: string
  ): TChangePayload;

  // --- Optional Querying Hooks ---

  /** OPTIONAL: Provides comparison for `ORDER BY`. */
  compare?: (stateA: TState, stateB: TState) => number;

  /** OPTIONAL: Implements filtering logic for `WHERE`. */
  satisfiesOperator?: (state: TState, operator: string, compareValue: any) => boolean;

  /** OPTIONAL: Lists operators supported by `satisfiesOperator`. Defaults to =, !=, isDefined. */
  supportedOperators?: () => ReadonlyArray<string>;

  /** OPTIONAL: Extracts a primitive for basic comparison (e.g., counter value). */
  getComparableValue?: (state: TState) => string | number | boolean | null;

  // --- Optional Optimization Hooks ---

  /** OPTIONAL: Calculates delta for efficient sync. Defaults to sending full state/changes. */
  diff?: (oldState: TState, newState: TState) => TChangePayload | null;

  /** OPTIONAL (for Decomposed KV): Helps IVM identify relevant low-level KV changes. */
  isChangeRelevantToState?: (crdtAttributePath: string[], changedKvKey: string[]) => boolean;
}
```

**Integration Points (Revised):**

1. **Registration:** Via a `plugins` array in `TriplitClient`/`createServer` options. Core maintains a `Map<typeId, CRDTPluginInterface>`.
2. **Schema:** `S.<TypeName>` calls `plugin.schemaDefinition`. Validation uses `plugin.validateSchemaOptions`.
3. **Storage/Merge (`EntityStoreKV`):**
   * Reads (`getEntity`): Checks `plugin.storageStrategy`. If 'blob', uses `kv.get` then `plugin.decodeFromStorage`. If 'decomposed', uses `kv.scan` (with a prefix derived from the attribute path) and passes the iterator to `plugin.decodeFromStorage`.
   * Writes (`applyChangesWithTimestamp`): Identifies the custom type via the schema. Fetches current state appropriately (get or scan). Calls `plugin.merge` (passing `kvTx` if 'decomposed'). Calls `plugin.encodeForStorage`. Writes the result back via `kvTx.set` (for 'blob') or applies the `DKVRMutationPayload`'s sets/deletes atomically using `kvTx.set`/`kvTx.delete` (for 'decomposed').
4. **Client API/Transactions:**
   * **Recommend Explicit Methods:** `client.incrementCounter(...)`, `client.listPush(...)`, etc.
   * Inside `client.transact`: Read current state -> Call `plugin.generateChangePayload` -> Call `tx.update` with the raw payload + internal flag `_crdtUpdate: true`.
   * `tx.update`: If the `_crdtUpdate` flag is present, store the raw payload in the internal diff map.
5. **Sync Layer:** Uses `plugin.encodeForStorage` or `plugin.diff` for the `CHANGES` message payload. The server receives the payload and passes it to `EntityStore`, which invokes `plugin.merge`. Handles new message types if needed for op-based sync.
6. **Query Engine/IVM:** Uses the optional query hooks (`compare`, `satisfiesOperator`, etc.), potentially involving `decodeFromStorage`. IVM integration for DKVR relies on future core work, potentially guided by `isChangeRelevantToState`.

**Storage Strategy Implications:**

* **State Blob:** Simpler plugin implementation. `encode/decode` handle single values. `merge` usually doesn't need `kvTx`. Efficient for small or infrequently updated CRDTs (Counters). Querying internals is slow. IVM treats it atomically.
* **Decomposed KV (DKVR):** Advanced. `encode` returns sets/deletes. `decode` handles iterators. `merge` **requires** `kvTx`. Enables granular updates/sync, potentially better for large/complex types (Lists, Text). Querying/IVM integration is significantly more complex for the plugin author and requires core Triplit enhancements for efficiency.

**Example Plugin Sketch (PN-Counter using State Blob):**

(Similar to previous example, demonstrating the simpler 'blob' strategy)

```typescript
// State shape: per-actor positive and negative increments (PN-Counter).
type CounterState = { p: Record<string, number>; n: Record<string, number> };
type CounterOptions = Record<string, never>; // No schema options for this sketch

const MyPNCounterPlugin: CRDTPluginInterface<CounterState, CounterOptions, string, CounterState> = {
  typeId: 'pn-counter',
  storageStrategy: 'blob', // Explicitly state blob strategy
  schemaDefinition: (options?) => S.Counter(options),
  defaultValue: (config) => ({ p: {}, n: {} }),
  merge: (current, incoming /* CounterState */, currentTs, incomingTs, actorId) => {
    // LWW merge based on attribute HLC timestamp
    if (!currentTs || HybridLogicalClock.compare(incomingTs, currentTs) > 0) {
      return incoming;
    }
    return current!;
  },
  encodeForStorage: (state) => JSON.stringify(state), // Store as JSON string
  decodeFromStorage: (storedValue: string) => JSON.parse(storedValue),
  generateChangePayload: (mutationType, currentState, args, actorId) => {
    if (mutationType !== 'increment') throw new Error('Unsupported counter op');
    const amount = args[0] as number;
    const newState = structuredClone(currentState ?? { p: {}, n: {} });
    if (amount > 0) newState.p[actorId] = (newState.p[actorId] || 0) + amount;
    else newState.n[actorId] = (newState.n[actorId] || 0) + Math.abs(amount);
    return newState; // Return full new state for LWW merge
  },
  // Counter value = sum of increments minus sum of decrements
  getComparableValue: (state) =>
    Object.values(state.p).reduce((sum, v) => sum + v, 0) -
    Object.values(state.n).reduce((sum, v) => sum + v, 0),
  // ... other optional query hooks
};
```

**Benefits:**

* **True Extensibility:** Enables core data types beyond primitives.
* **Encapsulated Logic:** CRDT complexity lives within plugins.
* **Adaptable Storage:** Supports simple blob storage and provides a path for more complex decomposed storage.
* **Community Potential:** Allows sharing and standardization of common collaborative types.

**Challenges & Open Questions:**

1. **DKVR Implementation Details:** How should `encodeForStorage` signal multi-key writes? How should `decodeFromStorage` reliably receive scanned data? What KV scan options are needed?
2. **IVM for DKVR:** What core changes are minimally required for *basic* IVM support with DKVR?
   Can `isChangeRelevantToState` be effective without deeper IVM awareness?
3. **Op-based Sync Protocol:** Is a generic `CRDT_OP` message feasible, or are type-specific messages better? How are ops batched efficiently?
4. **Transactionality of Merge:** Ensuring the `merge` function (especially for DKVR reading/writing multiple keys via `kvTx`) remains performant within transactions.
5. **Client API for DKVR:** How to provide an intuitive client API for complex CRDTs using DKVR without overly exposing the internal KV structure?

**Next Steps:**

* Gather community feedback on the proposed interface and strategy.
* Refine the API based on feedback, particularly the `encode/decodeForStorage` contract for DKVR.
* Prototype a simple 'blob'-based plugin (like the Counter) to validate core integration points.
* Begin design discussions on the necessary core IVM enhancements required for efficient DKVR support.

---

# Triplit Plugin API: Follow-up Proposal

## Addressing Open Concerns

This document builds on the initial Plugin API proposal, providing concrete solutions for three identified concerns:

1. DKVR + IVM performance
2. Underspecified op-based sync
3. Conflated state-based vs op-based merge semantics

---

## Concern 1: DKVR + IVM Performance

### Problem Statement

The original proposal defers IVM optimization for Decomposed KV Representation (DKVR). This means plugins storing state across multiple KV entries (lists, text CRDTs) will trigger full re-evaluation of dependent queries on any sub-key change—potentially making real-time queries on complex types unusably slow.

### Proposed Solution: Queryable Projections

Rather than teaching IVM to understand arbitrary DKVR structures, we introduce **queryable projections**—plugin-defined materialized values that IVM *can* track efficiently.

```typescript
interface QueryableProjection<TState> {
  /** Unique key for this projection (e.g., "length", "firstItem", "checksum") */
  readonly key: string;

  /** Extract projection value from state. Must be deterministic. */
  extract: (state: TState) => string | number | boolean | null;

  /** Operators this projection supports in WHERE clauses */
  supportedOperators: ReadonlyArray<'=' | '!=' | '>' | '<' | '>=' | '<=' | 'contains'>;
}
```

Plugins declare projections upfront:

```typescript
const RGAListPlugin: CRDTPluginInterface<RGAState, ...> = {
  typeId: 'rga-list',
  storageStrategy: 'decomposed',

  // NEW: Declare what's queryable
  projections: [
    {
      key: 'length',
      extract: (state) => state.items.filter(i => !i.tombstone).length,
      supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
    },
    {
      key: 'isEmpty',
      extract: (state) => state.items.filter(i => !i.tombstone).length === 0,
      supportedOperators: ['=', '!='],
    },
    {
      key: 'firstItem',
      extract: (state) => state.items.find(i => !i.tombstone)?.value ?? null,
      supportedOperators: ['=', '!='],
    },
  ],
  // ... rest of plugin
};
```

### Storage Strategy: Co-located Projection Values

When a DKVR plugin writes state, the core also writes projection values to dedicated KV keys:

```
# DKVR state keys (plugin-managed)
['entities', 'tasks', 'task-1', 'items', '_element', 'uuid-a'] → { value: "Buy milk", ... }
['entities', 'tasks', 'task-1', 'items', '_element', 'uuid-b'] → { value: "Walk dog", ... }

# Projection keys (core-managed, derived from plugin.projections)
['entities', 'tasks', 'task-1', 'items', '_proj', 'length']    → 2
['entities', 'tasks', 'task-1', 'items', '_proj', 'isEmpty']   → false
['entities', 'tasks', 'task-1', 'items', '_proj', 'firstItem'] → "Buy milk"
```

### IVM Integration

IVM subscribes to projection keys, not DKVR element keys:

```typescript
// Query: WHERE items.length > 5
// IVM watches: ['entities', 'tasks', '*', 'items', '_proj', 'length']
```

When any DKVR key under an attribute changes:

1. Core calls `plugin.decodeFromStorage()` to get the full state
2. Core calls each `projection.extract(state)`
3. Core compares with the stored projection values
4. Only if projections changed → write new projection values → IVM triggers

This bounds IVM complexity: it only sees primitive projection changes, never arbitrary DKVR structures.

### Projection Update Contract

```typescript
interface CRDTPluginInterface<...> {
  // ... existing fields ...

  /**
   * OPTIONAL: Queryable projections for IVM efficiency.
   * If omitted, queries on this type require full decode (slow).
   */
  projections?: ReadonlyArray<QueryableProjection<TState>>;

  /**
   * OPTIONAL: Hint whether a DKVR mutation might change projections.
   * If returns false, core skips projection recalculation.
   * Default: always recalculate (safe but slower).
   */
  mightAffectProjections?: (
    mutationType: string,
    args: any[]
  ) => boolean | Set<string>; // true = all, Set = specific projection keys
}
```

Example optimization:

```typescript
const RGAListPlugin = {
  // ...
  mightAffectProjections: (mutationType, args) => {
    switch (mutationType) {
      case 'insert':
      case 'delete':
        return new Set(['length', 'isEmpty', 'firstItem']);
      case 'updateElement':
        // Only firstItem might change if we updated index 0
        return args[0] === 0 ? new Set(['firstItem']) : false;
      default:
        return true; // Unknown op, recalculate all
    }
  },
};
```

### Tradeoffs

| Approach | Query Performance | Write Performance | Plugin Complexity |
|----------|-------------------|-------------------|-------------------|
| No projections | O(n) decode per query | O(1) | Low |
| With projections | O(1) lookup | O(k) projection extraction | Medium |
| Full IVM awareness | O(1) incremental | O(1) | High (requires core changes) |

Projections are the pragmatic middle ground—good query performance without requiring deep core modifications.

## Concern 2: Op-Based Sync Protocol

### Problem Statement

The original proposal asks: "Is a generic CRDT_OP message feasible, or are type-specific messages better?" This needs a concrete answer before implementation.

### Proposed Solution: Envelope Protocol with Strategy Declaration

We define a **type-agnostic envelope** that carries **type-specific payloads**. The sync layer routes messages; plugins interpret contents.

```typescript
/** Sync strategy declared by plugin */
type SyncStrategy = 'state' | 'operation' | 'delta';

/** Wire format for CRDT sync messages */
interface CRDTSyncMessage {
  /** Message type identifier */
  type: 'CRDT_SYNC';

  /** Entity location */
  entityId: string;
  attributePath: string[];

  /** Plugin identifier for routing */
  crdtTypeId: string;

  /** How to interpret the payload */
  strategy: SyncStrategy;

  /** HLC timestamp */
  timestamp: Timestamp;

  /** Originating actor */
  actorId: string;

  /**
   * Plugin-serialized payload. Opaque to sync layer.
   * - 'state': Full serialized TState
   * - 'operation': Serialized TOp or TOp[]
   * - 'delta': Serialized TDelta
   */
  payload: string; // JSON-serialized by plugin

  /** Optional: Sequence number for op-based CRDTs requiring causal order */
  sequenceNumber?: number;

  /** Optional: Dependencies for causal delivery */
  dependencies?: Array<{ actorId: string; sequenceNumber: number }>;
}
```

### Sync Layer Responsibilities

The sync layer (transport, server-core) handles:

1. **Routing**: Match `crdtTypeId` to the registered plugin
2. **Ordering**: Buffer and reorder messages if `dependencies` are present
3. **Deduplication**: Use `(entityId, attributePath, actorId, sequenceNumber)` as the idempotency key
4. **Batching**: Aggregate multiple `CRDT_SYNC` messages per entity in a single WebSocket frame

The sync layer does NOT:

- Parse or understand `payload` contents
- Perform merge logic (that's the plugin's job)
- Make assumptions about CRDT semantics

### Plugin Serialization Contract

```typescript
interface CRDTPluginInterface<TState, TSchemaOptions, TStoredValue, TChangePayload> {
  // ... existing fields ...

  /** REQUIRED: How this CRDT syncs over the wire */
  readonly syncStrategy: SyncStrategy;

  /** REQUIRED: Serialize change for network transmission */
  serializeForSync: (change: TChangePayload) => string;

  /** REQUIRED: Deserialize change from network */
  deserializeFromSync: (serialized: string) => TChangePayload;

  /**
   * OPTIONAL (for 'operation' strategy):
   * Declare if operations require causal delivery.
   * If true, core tracks vector clocks and buffers out-of-order ops.
   */
  requiresCausalDelivery?: boolean;

  /**
   * OPTIONAL (for 'operation' strategy):
   * Compress multiple operations into one (reduce network traffic).
   * E.g., [insert('a'), insert('b')] → insertBatch(['a', 'b'])
   */
  compressOperations?: (ops: TChangePayload[]) => TChangePayload;
}
```

### Example: State-Based Counter

```typescript
const GCounterPlugin: CRDTPluginInterface<GCounterState, {}, string, GCounterState> = {
  typeId: 'g-counter',
  syncStrategy: 'state', // Full state sync
  serializeForSync: (state) => JSON.stringify(state),
  deserializeFromSync: (s) => JSON.parse(s),
  // No causal delivery needed—state merge is commutative
};
```

### Example: Op-Based List with Causal Delivery

```typescript
type ListOp =
  | { type: 'insert'; id: string; afterId: string | null; value: any }
  | { type: 'delete'; id: string };

const RGAListPlugin: CRDTPluginInterface<RGAState, {}, DKVRMutationPayload, ListOp> = {
  typeId: 'rga-list',
  syncStrategy: 'operation',
  requiresCausalDelivery: true, // Inserts must see their "afterId" first
  serializeForSync: (op) => JSON.stringify(op),
  deserializeFromSync: (s) => JSON.parse(s),
  compressOperations: (ops) => {
    // Combine consecutive inserts from same position
    // ... implementation ...
    return ops.length === 1 ? ops[0] : { type: 'batch', ops };
  },
};
```

### Causal Delivery Implementation

For plugins with `requiresCausalDelivery: true`, the sync layer maintains:

```typescript
interface CausalBuffer {
  // Pending ops waiting for dependencies
  pending: Map<string, { op: CRDTSyncMessage; missingDeps: Set<string> }>;

  // Delivered sequence numbers per actor
  delivered: Map<string, number>; // actorId → highest delivered seqNum

  // Enqueue and attempt delivery
  enqueue(message: CRDTSyncMessage): CRDTSyncMessage[]; // Returns newly deliverable
}
```

This is opt-in complexity. Simple state-based CRDTs ignore it entirely.
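As an illustration of the buffering rules above, here is a minimal sketch of how `enqueue` could behave, assuming the `CausalBuffer` and `CRDTSyncMessage` shapes defined in this section; the `actorId:sequenceNumber` key format is an arbitrary choice for the sketch, not a prescribed wire detail.

```typescript
// Sketch only: one possible enqueue/deliver loop for the CausalBuffer above.
function enqueueMessage(buf: CausalBuffer, msg: CRDTSyncMessage): CRDTSyncMessage[] {
  const deliverable: CRDTSyncMessage[] = [];
  const isReady = (m: CRDTSyncMessage) =>
    (m.dependencies ?? []).every(
      (d) => (buf.delivered.get(d.actorId) ?? -1) >= d.sequenceNumber
    );

  // Buffer the message if any dependency has not been delivered yet.
  if (!isReady(msg)) {
    buf.pending.set(`${msg.actorId}:${msg.sequenceNumber}`, {
      op: msg,
      missingDeps: new Set(
        (msg.dependencies ?? [])
          .filter((d) => (buf.delivered.get(d.actorId) ?? -1) < d.sequenceNumber)
          .map((d) => `${d.actorId}:${d.sequenceNumber}`)
      ),
    });
    return deliverable;
  }

  // Deliver it, then re-check pending messages until no more become ready.
  deliver(buf, msg, deliverable);
  let progressed = true;
  while (progressed) {
    progressed = false;
    for (const [key, entry] of buf.pending) {
      if (isReady(entry.op)) {
        buf.pending.delete(key);
        deliver(buf, entry.op, deliverable);
        progressed = true;
      }
    }
  }
  return deliverable;
}

function deliver(buf: CausalBuffer, msg: CRDTSyncMessage, out: CRDTSyncMessage[]): void {
  // Record the highest delivered sequence number per actor, then hand the op out.
  if (msg.sequenceNumber !== undefined) {
    const prev = buf.delivered.get(msg.actorId) ?? -1;
    buf.delivered.set(msg.actorId, Math.max(prev, msg.sequenceNumber));
  }
  out.push(msg);
}
```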
## Concern 3: Conflated State-Based vs Op-Based Merge Semantics

### Problem Statement

The original merge signature:

```typescript
merge(
  currentState: TState | undefined,
  incomingChangePayload: TChangePayload,
  currentTimestamp: Timestamp | undefined,
  incomingTimestamp: Timestamp,
  actorId: string,
  kvTx?: KVStoreTransaction
): TState;
```

This conflates three distinct CRDT patterns:

| Pattern | Merge Semantics | Mathematical Property |
|---------|-----------------|----------------------|
| State-based | `merge(stateA, stateB) → state` | Commutative, associative, idempotent |
| Op-based | `apply(state, operation) → state` | Operations commute (or causal delivery) |
| Delta-state | `mergeDelta(state, delta) → state` | Deltas are join-semilattice fragments |

### Proposed Solution: Discriminated Union Interface

Split the core CRDT logic into three explicit variants:

```typescript
/** State-based CRDT (e.g., G-Counter, OR-Set) */
interface StateBasedCRDTCore<TState> {
  readonly syncStrategy: 'state';

  /**
   * Merge two complete states. Must be:
   * - Commutative: merge(a, b) = merge(b, a)
   * - Associative: merge(merge(a, b), c) = merge(a, merge(b, c))
   * - Idempotent: merge(a, a) = a
   */
  merge: (stateA: TState, stateB: TState) => TState;
}

/** Operation-based CRDT (e.g., RGA, Counter with inc/dec ops) */
interface OpBasedCRDTCore<TState, TOp> {
  readonly syncStrategy: 'operation';

  /**
   * Apply an operation to current state.
   * If requiresCausalDelivery is false, operations must commute.
   */
  apply: (
    state: TState,
    operation: TOp,
    timestamp: Timestamp,
    actorId: string
  ) => TState;

  /** Whether ops require causal ordering */
  requiresCausalDelivery: boolean;
}

/** Delta-state CRDT (e.g., Delta-CRDT variants for bandwidth efficiency) */
interface DeltaStateCRDTCore<TState, TDelta> {
  readonly syncStrategy: 'delta';

  /** Merge a delta into current state */
  mergeDelta: (state: TState, delta: TDelta) => TState;

  /** Extract delta representing changes from old to new state */
  extractDelta: (oldState: TState, newState: TState) => TDelta;

  /** Merge two deltas (for delta aggregation) */
  mergeDeltas?: (deltaA: TDelta, deltaB: TDelta) => TDelta;
}

/** Union type for CRDT core logic */
type CRDTCore<TState, TOp = any, TDelta = any> =
  | StateBasedCRDTCore<TState>
  | OpBasedCRDTCore<TState, TOp>
  | DeltaStateCRDTCore<TState, TDelta>;
```

### Revised Plugin Interface

```typescript
interface CRDTPluginInterface<
  TState,
  TSchemaOptions = any,
  TStoredValue = any,
  TChangePayload = TState,
  TOp = any,
  TDelta = any
> {
  /** REQUIRED: Unique identifier */
  readonly typeId: string;

  // --- Schema ---
  schemaDefinition: (options?: TSchemaOptions) => DataType;
  defaultValue: (config: TypeConfig) => TState;

  // --- CRDT Core (discriminated by syncStrategy) ---
  readonly core: CRDTCore<TState, TOp, TDelta>;

  // --- Storage ---
  readonly storageStrategy: 'blob' | 'decomposed';
  encodeForStorage: (state: TState) => TStoredValue | DKVRMutationPayload;
  decodeFromStorage: (stored: TStoredValue | DKVRScanResult) => TState;

  // --- Sync Serialization ---
  serializeForSync: (change: TChangePayload) => string;
  deserializeFromSync: (serialized: string) => TChangePayload;

  // --- Client Mutations ---
  /**
   * Generate change payload from user action.
   * Return type depends on syncStrategy:
   * - 'state': Return new TState
   * - 'operation': Return TOp
   * - 'delta': Return TDelta
   */
  generateChangePayload: (
    mutationType: string,
    currentState: TState | undefined,
    args: any[],
    actorId: string
  ) => TChangePayload;

  // --- Queryable Projections (for IVM) ---
  projections?: ReadonlyArray<QueryableProjection<TState>>;
  mightAffectProjections?: (mutationType: string, args: any[]) => boolean | Set<string>;

  // --- Optional Query Hooks ---
  compare?: (stateA: TState, stateB: TState) => number;
}
```

### Core Integration: Type-Safe Dispatch

The EntityStore dispatches to the correct method based on `syncStrategy`:

```typescript
// In EntityStoreKV.applyRemoteChange()
function applyRemoteChange<TState>(
  plugin: CRDTPluginInterface<TState, any, any, any, any, any>,
  currentState: TState | undefined,
  message: CRDTSyncMessage
): TState {
  const change = plugin.deserializeFromSync(message.payload);

  switch (plugin.core.syncStrategy) {
    case 'state': {
      // State-based: merge current with incoming state
      const current = currentState ?? plugin.defaultValue({});
      return plugin.core.merge(current, change as TState);
    }
    case 'operation': {
      // Op-based: apply operation to current state
      const state = currentState ?? plugin.defaultValue({});
      return plugin.core.apply(state, change, message.timestamp, message.actorId);
    }
    case 'delta': {
      // Delta-state: merge delta into current state
      const base = currentState ?? plugin.defaultValue({});
      return plugin.core.mergeDelta(base, change);
    }
    default:
      throw new Error(`Unknown sync strategy: ${(plugin.core as any).syncStrategy}`);
  }
}
```

### Example: G-Counter (State-Based)

```typescript
type GCounterState = Record<string, number>; // actorId → count

const GCounterPlugin: CRDTPluginInterface<GCounterState, {}, string, GCounterState> = {
  typeId: 'g-counter',
  schemaDefinition: () => S.Custom({ type: 'g-counter' }),
  defaultValue: () => ({}),

  core: {
    syncStrategy: 'state',
    merge: (a, b) => {
      // Point-wise maximum
      const result: GCounterState = { ...a };
      for (const [actor, count] of Object.entries(b)) {
        result[actor] = Math.max(result[actor] ?? 0, count);
      }
      return result;
    },
  },

  storageStrategy: 'blob',
  encodeForStorage: (state) => JSON.stringify(state),
  decodeFromStorage: (stored) => JSON.parse(stored as string),

  serializeForSync: (state) => JSON.stringify(state),
  deserializeFromSync: (s) => JSON.parse(s),

  generateChangePayload: (mutationType, currentState, args, actorId) => {
    if (mutationType !== 'increment') throw new Error('Unknown mutation');
    const amount = args[0] as number;
    const state = currentState ?? {};
    return { ...state, [actorId]: (state[actorId] ?? 0) + amount };
  },

  projections: [
    {
      key: 'value',
      extract: (state) => Object.values(state).reduce((a, b) => a + b, 0),
      supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
    },
  ],
};
```

### Example: RGA List (Op-Based with Causal Delivery)

```typescript
type RGAOp =
  | { type: 'insert'; id: string; afterId: string | null; value: any }
  | { type: 'delete'; id: string };

interface RGAElement {
  id: string;
  value: any;
  tombstone: boolean;
  timestamp: Timestamp;
}

interface RGAState {
  elements: Map<string, RGAElement>;
  order: string[]; // Sorted element IDs
}

const RGAListPlugin: CRDTPluginInterface<RGAState, {}, DKVRMutationPayload, RGAOp, RGAOp> = {
  typeId: 'rga-list',
  schemaDefinition: () => S.Custom({ type: 'rga-list' }),
  defaultValue: () => ({ elements: new Map(), order: [] }),

  core: {
    syncStrategy: 'operation',
    requiresCausalDelivery: true, // Insert needs to see afterId
    apply: (state, op, timestamp, actorId) => {
      const newState = structuredClone(state);

      switch (op.type) {
        case 'insert': {
          const element: RGAElement = {
            id: op.id,
            value: op.value,
            tombstone: false,
            timestamp,
          };
          newState.elements.set(op.id, element);

          // Find position after afterId
          const afterIndex = op.afterId ? newState.order.indexOf(op.afterId) : -1;

          // Insert, handling concurrent inserts via timestamp comparison
          let insertIndex = afterIndex + 1;
          while (
            insertIndex < newState.order.length &&
            compareTimestamps(
              newState.elements.get(newState.order[insertIndex])!.timestamp,
              timestamp
            ) > 0
          ) {
            insertIndex++;
          }
          newState.order.splice(insertIndex, 0, op.id);
          break;
        }
        case 'delete': {
          const element = newState.elements.get(op.id);
          if (element) {
            element.tombstone = true;
          }
          break;
        }
      }
      return newState;
    },
  },

  storageStrategy: 'decomposed',
  encodeForStorage: (state): DKVRMutationPayload => {
    const sets = new Map<Tuple, any>();
    // Store order metadata
    sets.set(['_meta', 'order'], state.order);
    // Store each element
    for (const [id, element] of state.elements) {
      sets.set(['_element', id], element);
    }
    return { sets };
  },
  // NOTE: this sketch returns a Promise; decodeFromStorage may need to allow async for DKVR.
  decodeFromStorage: async (scan: DKVRScanResult): Promise<RGAState> => {
    const state: RGAState = { elements: new Map(), order: [] };
    for await (const [key, value] of scan) {
      if (key[0] === '_meta' && key[1] === 'order') {
        state.order = value;
      } else if (key[0] === '_element') {
        state.elements.set(key[1] as string, value);
      }
    }
    return state;
  },

  serializeForSync: (op) => JSON.stringify(op),
  deserializeFromSync: (s) => JSON.parse(s),

  generateChangePayload: (mutationType, currentState, args, actorId) => {
    switch (mutationType) {
      case 'insert': {
        const [index, value] = args;
        const afterId = index === 0 ? null : currentState?.order[index - 1] ?? null;
        return {
          type: 'insert',
          id: `${actorId}-${Date.now()}-${Math.random()}`,
          afterId,
          value,
        };
      }
      case 'delete': {
        const [index] = args;
        return {
          type: 'delete',
          id: currentState!.order[index],
        };
      }
      default:
        throw new Error(`Unknown mutation: ${mutationType}`);
    }
  },

  projections: [
    {
      key: 'length',
      extract: (state) =>
        state.order.filter(id => !state.elements.get(id)?.tombstone).length,
      supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
    },
  ],

  mightAffectProjections: (mutationType) => {
    return mutationType === 'insert' || mutationType === 'delete'
      ? new Set(['length'])
      : false;
  },
};
```

## Summary of Changes

| Concern | Original Proposal | This Proposal |
|---------|-------------------|---------------|
| **DKVR + IVM** | "Deferred" | Queryable projections with explicit extraction |
| **Op-based sync** | "Undecided" | Envelope protocol with strategy declaration |
| **Merge semantics** | Single conflated `merge()` | Discriminated union: `merge` / `apply` / `mergeDelta` |

### New Required Fields

```typescript
interface CRDTPluginInterface {
  // NEW: Replaces old merge()
  core: CRDTCore<TState, TOp, TDelta>;

  // NEW: Wire serialization
  serializeForSync: (change: TChangePayload) => string;
  deserializeFromSync: (serialized: string) => TChangePayload;

  // NEW (optional): IVM optimization
  projections?: ReadonlyArray<QueryableProjection<TState>>;
  mightAffectProjections?: (mutationType: string, args: any[]) => boolean | Set<string>;
}
```

### Migration Path

Existing plugin sketches using the old `merge()` signature can be adapted:

```typescript
// Old
merge: (current, incoming, currentTs, incomingTs, actorId) => { ... }

// New (state-based)
core: {
  syncStrategy: 'state',
  merge: (a, b) => { /* same logic, but pure state merge */ }
}
```

## Next Steps

1. **Validate with prototype**: Implement G-Counter (state-based, blob) and RGA-List (op-based, DKVR) to stress-test the interface
2. **Benchmark projections**: Measure the overhead of projection extraction vs. query performance gains
3. **Causal delivery implementation**: Build the causal buffer for op-based CRDTs in `@triplit/server-core`
4. **TypeScript inference**: Ensure `TChangePayload` correctly narrows based on `syncStrategy`

Triplit's README already mentions Durable Objects support, so a Cloudflare-native deployment is a natural extension. The following proposal sketches how Durable Objects, D1, and client-side replicas fit together architecturally.

---

# Triplit Cloudflare Integration: Architecture Proposal

## Overview

This proposal extends Triplit with first-class Cloudflare support:

| Component | Role | Benefit |
|-----------|------|---------|
| **Durable Objects** | Per-document/per-user sync coordinators | Global edge distribution, WebSocket handling, single-writer consistency |
| **D1** | Persistent SQL storage | Queryable data, automatic replication, familiar SQL |
| **Local Replicas** | Client-side SQLite (sql.js / expo-sqlite) | Offline support, instant reads, sync when online |

## Architecture

```
┌──────────────────────────────────────────────────────────────┐
│                   Cloudflare Edge Network                    │
│                                                              │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐      │
│  │   Durable    │   │   Durable    │   │   Durable    │ ...  │
│  │   Object     │   │   Object     │   │   Object     │      │
│  │  (doc-123)   │   │  (doc-456)   │   │  (user-abc)  │      │
│  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘      │
│         │                  │                  │              │
│         └──────────────────┼──────────────────┘              │
│                            ▼                                 │
│                  ┌─────────────────┐                         │
│                  │       D1        │                         │
│                  │    (SQLite)     │                         │
│                  │  - Entities     │                         │
│                  │  - Triples      │                         │
│                  │  - Projections  │                         │
│                  └─────────┬───────┘                         │
└────────────────────────────┼─────────────────────────────────┘
                             │  Read Replicas
                             ▼
┌──────────────────────────────────────────────────────────────┐
│                           Clients                            │
│                                                              │
│  ┌───────────────┐   ┌───────────────┐   ┌───────────────┐   │
│  │    Browser    │   │  Mobile App   │   │  Desktop App  │   │
│  │ ┌───────────┐ │   │ ┌───────────┐ │   │ ┌───────────┐ │   │
│  │ │ IndexedDB │ │   │ │  SQLite   │ │   │ │  SQLite   │ │   │
│  │ │ (Triplit) │ │   │ │ (Triplit) │ │   │ │ (Triplit) │ │   │
│  │ └───────────┘ │   │ └───────────┘ │   │ └───────────┘ │   │
│  └───────────────┘   └───────────────┘   └───────────────┘   │
│          ▲                   ▲                   ▲           │
│          │      WebSocket    │                   │           │
│          └───────────────────┴───────────────────┘           │
└──────────────────────────────────────────────────────────────┘
```

---

## Component 1: Durable Objects Storage Adapter

Durable Objects provide single-instance coordination with built-in storage. Each DO can manage sync for a document, user, or shard.

### Adapter Interface

```typescript
// packages/storage-durable-object/src/index.ts

interface TriplitDurableObjectEnv {
  TRIPLIT_DO: DurableObjectNamespace;
  D1_DATABASE?: D1Database; // Optional: for D1 persistence
}

/**
 * Storage adapter using Durable Object's built-in storage API.
 * Suitable for per-document or per-user coordination.
 */
export class DurableObjectStorageAdapter implements TriplitStorageAdapter {
  constructor(private storage: DurableObjectStorage) {}

  async get(key: Tuple): Promise<any> {
    return this.storage.get(tupleToString(key));
  }

  async set(key: Tuple, value: any): Promise<void> {
    await this.storage.put(tupleToString(key), value);
  }

  async delete(key: Tuple): Promise<void> {
    await this.storage.delete(tupleToString(key));
  }

  async scan(options: ScanOptions): Promise<AsyncIterable<[Tuple, any]>> {
    const prefix = tupleToString(options.prefix);
    const entries = await this.storage.list({ prefix });
    return (async function* () {
      for (const [key, value] of entries) {
        yield [stringToTuple(key), value];
      }
    })();
  }

  async transact<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T> {
    // DO storage has implicit transactions within a single request.
    // For explicit transaction control, use storage.transaction().
    return this.storage.transaction(async (txn) => {
      // DurableObjectTransaction (not shown) exposes the same get/set/delete/scan surface over txn.
      const tx = new DurableObjectTransaction(txn);
      return fn(tx);
    });
  }
}
```

### Durable Object Implementation

```typescript
// src/triplit-sync-do.ts
import { DurableObject } from 'cloudflare:workers';
import { createTriplitServer, TriplitServer } from '@triplit/server-core';
import { DurableObjectStorageAdapter } from '@triplit/storage-durable-object';

export class TriplitSyncDO extends DurableObject {
  private server: TriplitServer | null = null;
  private connections: Map<string, WebSocket> = new Map();

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
  }

  private async getServer(): Promise<TriplitServer> {
    if (!this.server) {
      const storage = new DurableObjectStorageAdapter(this.ctx.storage);
      this.server = createTriplitServer({
        storage,
        // Register CRDT plugins
        plugins: [GCounterPlugin, RGAListPlugin],
      });
    }
    return this.server;
  }

  async fetch(request: Request): Promise<Response> {
    const url = new URL(request.url);

    // WebSocket upgrade for sync
    if (request.headers.get('Upgrade') === 'websocket') {
      return this.handleWebSocket(request);
    }

    // REST API for queries (optional)
    if (url.pathname.startsWith('/api/')) {
      return this.handleRest(request);
    }

    return new Response('Not found', { status: 404 });
  }

  private async handleWebSocket(request: Request): Promise<Response> {
    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);

    const connectionId = crypto.randomUUID();
    this.connections.set(connectionId, server);
    server.accept();

    const triplitServer = await this.getServer();

    server.addEventListener('message', async (event) => {
      const message = JSON.parse(event.data as string);
      const response = await triplitServer.handleMessage(connectionId, message);
      if (response) {
        server.send(JSON.stringify(response));
      }
      // Broadcast to other connections if needed
      if (response?.broadcast) {
        this.broadcast(connectionId, response.broadcast);
      }
    });

    server.addEventListener('close', () => {
      this.connections.delete(connectionId);
      triplitServer.handleDisconnect(connectionId);
    });

    return new Response(null, { status: 101, webSocket: client });
  }

  private broadcast(excludeId: string, message: any): void {
    const data = JSON.stringify(message);
    for (const [id, ws] of this.connections) {
      if (id !== excludeId && ws.readyState === WebSocket.OPEN) {
        ws.send(data);
      }
    }
  }

  // Hibernation support for cost efficiency
  async webSocketMessage(ws: WebSocket, message: string): Promise<void> {
    // Handle messages after hibernation wake
    const server = await this.getServer();
    const connectionId = this.getConnectionId(ws);
    const response = await server.handleMessage(connectionId, JSON.parse(message));
    if (response) {
      ws.send(JSON.stringify(response));
    }
  }

  async webSocketClose(ws: WebSocket): Promise<void> {
    const connectionId = this.getConnectionId(ws);
    this.connections.delete(connectionId);
  }

  // Reverse lookup of the connection id for a socket (e.g., after hibernation wake).
  private getConnectionId(ws: WebSocket): string {
    for (const [id, socket] of this.connections) {
      if (socket === ws) return id;
    }
    throw new Error('Unknown WebSocket connection');
  }
}
```

### Worker Entry Point

```typescript
// src/index.ts
export { TriplitSyncDO } from './triplit-sync-do';

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Route to the appropriate Durable Object based on document/user ID
    // Example: /sync/doc/doc-123 → DO instance for doc-123
    const match = url.pathname.match(/^\/sync\/(doc|user)\/([^/]+)/);
    if (match) {
      const [, type, id] = match;
      const doId = env.TRIPLIT_DO.idFromName(`${type}:${id}`);
      const stub = env.TRIPLIT_DO.get(doId);
      return stub.fetch(request);
    }

    return new Response('Invalid route', { status: 400 });
  },
};
```

---

## Component 2: D1 Storage Adapter

D1 provides SQLite at the edge with automatic replication, and is better suited for larger datasets and complex queries.
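For orientation, here is roughly how the adapter below would be wired into a Worker-hosted Triplit server. The `@triplit/storage-d1` package name and the `createTriplitServer`/`handleMessage` shapes follow this proposal's assumptions, not a shipped API.

```typescript
// Sketch: wiring the D1 adapter into a plain Worker (no Durable Object needed
// for request/response style access to D1-backed storage).
import { createTriplitServer } from '@triplit/server-core';
import { D1StorageAdapter } from '@triplit/storage-d1';

interface Env {
  D1_DATABASE: D1Database;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const server = createTriplitServer({
      storage: new D1StorageAdapter(env.D1_DATABASE),
      plugins: [],
    });
    // Handle a single query/mutation message per request.
    const message = await request.json();
    const response = await server.handleMessage('http', message);
    return Response.json(response ?? {});
  },
};
```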
### Schema

```sql
-- D1 schema for Triplit
-- Optimized for the triple store model

-- Entity headers
CREATE TABLE entities (
  collection TEXT NOT NULL,
  entity_id TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (collection, entity_id)
);

-- Triple store (attribute-value-timestamp)
CREATE TABLE triples (
  collection TEXT NOT NULL,
  entity_id TEXT NOT NULL,
  attribute TEXT NOT NULL,
  value TEXT,                  -- JSON-encoded
  timestamp_hlc TEXT NOT NULL, -- HLC as string for sorting
  actor_id TEXT NOT NULL,
  tombstone INTEGER DEFAULT 0,
  PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc)
);

-- CRDT projections (for IVM)
CREATE TABLE projections (
  collection TEXT NOT NULL,
  entity_id TEXT NOT NULL,
  attribute TEXT NOT NULL,
  projection_key TEXT NOT NULL,
  value TEXT, -- JSON-encoded primitive
  PRIMARY KEY (collection, entity_id, attribute, projection_key)
);

-- Indexes for common queries
CREATE INDEX idx_triples_entity ON triples(collection, entity_id);
CREATE INDEX idx_triples_attribute ON triples(collection, attribute);
CREATE INDEX idx_projections_lookup ON projections(collection, projection_key, value);

-- For DKVR storage (decomposed CRDTs)
CREATE TABLE dkvr_entries (
  collection TEXT NOT NULL,
  entity_id TEXT NOT NULL,
  attribute TEXT NOT NULL,
  key_suffix TEXT NOT NULL, -- JSON-encoded tuple
  value TEXT NOT NULL,      -- JSON-encoded
  PRIMARY KEY (collection, entity_id, attribute, key_suffix)
);
```

### D1 Storage Adapter

```typescript
// packages/storage-d1/src/index.ts

export class D1StorageAdapter implements TriplitStorageAdapter {
  constructor(private db: D1Database) {}

  async get(key: Tuple): Promise<any> {
    const [collection, entityId, attribute, ...rest] = key;

    if (rest.length === 0) {
      // Get latest triple value
      const result = await this.db
        .prepare(`
          SELECT value FROM triples
          WHERE collection = ? AND entity_id = ? AND attribute = ? AND tombstone = 0
          ORDER BY timestamp_hlc DESC LIMIT 1
        `)
        .bind(collection, entityId, attribute)
        .first();
      return result ? JSON.parse(result.value as string) : undefined;
    }

    // DKVR lookup
    const keySuffix = JSON.stringify(rest);
    const result = await this.db
      .prepare(`
        SELECT value FROM dkvr_entries
        WHERE collection = ? AND entity_id = ? AND attribute = ? AND key_suffix = ?
      `)
      .bind(collection, entityId, attribute, keySuffix)
      .first();
    return result ? JSON.parse(result.value as string) : undefined;
  }

  async set(key: Tuple, value: any, timestamp?: Timestamp): Promise<void> {
    const [collection, entityId, attribute, ...rest] = key;
    const jsonValue = JSON.stringify(value);

    if (rest.length === 0) {
      // Standard triple
      await this.db
        .prepare(`
          INSERT INTO triples (collection, entity_id, attribute, value, timestamp_hlc, actor_id)
          VALUES (?, ?, ?, ?, ?, ?)
          ON CONFLICT (collection, entity_id, attribute, timestamp_hlc)
          DO UPDATE SET value = excluded.value
        `)
        .bind(collection, entityId, attribute, jsonValue, timestamp?.toString(), timestamp?.actorId)
        .run();
    } else {
      // DKVR entry
      const keySuffix = JSON.stringify(rest);
      await this.db
        .prepare(`
          INSERT INTO dkvr_entries (collection, entity_id, attribute, key_suffix, value)
          VALUES (?, ?, ?, ?, ?)
          ON CONFLICT (collection, entity_id, attribute, key_suffix)
          DO UPDATE SET value = excluded.value
        `)
        .bind(collection, entityId, attribute, keySuffix, jsonValue)
        .run();
    }
  }

  async scan(options: ScanOptions): Promise<AsyncIterable<[Tuple, any]>> {
    const { prefix, limit, reverse } = options;
    const [collection, entityId, attribute] = prefix;

    let query: string;
    let bindings: any[];

    if (attribute) {
      // DKVR scan within an attribute
      query = `
        SELECT key_suffix, value FROM dkvr_entries
        WHERE collection = ? AND entity_id = ? AND attribute = ?
        ${reverse ? 'ORDER BY key_suffix DESC' : 'ORDER BY key_suffix ASC'}
        ${limit ? `LIMIT ${limit}` : ''}
      `;
      bindings = [collection, entityId, attribute];
    } else if (entityId) {
      // Scan all attributes of an entity
      query = `
        SELECT attribute, value, timestamp_hlc FROM triples
        WHERE collection = ? AND entity_id = ? AND tombstone = 0
        ORDER BY attribute ${reverse ? 'DESC' : 'ASC'}
        ${limit ? `LIMIT ${limit}` : ''}
      `;
      bindings = [collection, entityId];
    } else {
      // Scan all entities in collection
      query = `
        SELECT DISTINCT entity_id FROM entities
        WHERE collection = ?
        ORDER BY entity_id ${reverse ? 'DESC' : 'ASC'}
        ${limit ? `LIMIT ${limit}` : ''}
      `;
      bindings = [collection];
    }

    const results = await this.db.prepare(query).bind(...bindings).all();
    return (async function* () {
      for (const row of results.results) {
        // Transform row to [Tuple, value] format
        yield transformRowToTuple(row, prefix);
      }
    })();
  }

  async transact<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T> {
    // D1 supports batch operations
    const batch: D1PreparedStatement[] = [];

    const tx: StorageTransaction = {
      get: (key) => this.get(key),
      set: (key, value, timestamp) => {
        // Queue for batch execution (prepareSet mirrors set() but returns the statement)
        batch.push(this.prepareSet(key, value, timestamp));
        return Promise.resolve();
      },
      delete: (key) => {
        batch.push(this.prepareDelete(key));
        return Promise.resolve();
      },
    };

    const result = await fn(tx);

    // Execute all writes in a batch
    if (batch.length > 0) {
      await this.db.batch(batch);
    }
    return result;
  }

  // Projection support
  async setProjection(
    collection: string,
    entityId: string,
    attribute: string,
    projectionKey: string,
    value: any
  ): Promise<void> {
    await this.db
      .prepare(`
        INSERT INTO projections (collection, entity_id, attribute, projection_key, value)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (collection, entity_id, attribute, projection_key)
        DO UPDATE SET value = excluded.value
      `)
      .bind(collection, entityId, attribute, projectionKey, JSON.stringify(value))
      .run();
  }

  async queryByProjection(
    collection: string,
    attribute: string,
    projectionKey: string,
    operator: string,
    compareValue: any
  ): Promise<string[]> {
    const sqlOperator = this.toSqlOperator(operator);
    const results = await this.db
      .prepare(`
        SELECT entity_id FROM projections
        WHERE collection = ? AND attribute = ? AND projection_key = ?
          AND value ${sqlOperator} ?
      `)
      .bind(collection, attribute, projectionKey, JSON.stringify(compareValue))
      .all();
    return results.results.map((r) => r.entity_id as string);
  }
}
```

---

## Component 3: Hybrid DO + D1 Architecture

The best of both worlds: Durable Objects for real-time sync coordination, D1 for persistent, queryable storage.
```typescript // src/triplit-hybrid-do.ts export class TriplitHybridDO extends DurableObject { private server: TriplitServer | null = null; constructor( state: DurableObjectState, private env: { D1_DATABASE: D1Database } ) { super(state, env); } private async getServer(): Promise<TriplitServer> { if (!this.server) { // Use DO storage for hot data / active sync state const doStorage = new DurableObjectStorageAdapter(this.ctx.storage); // Use D1 for persistent storage and queries const d1Storage = new D1StorageAdapter(this.env.D1_DATABASE); // Tiered storage: DO for recent, D1 for persistent const storage = new TieredStorageAdapter({ hot: doStorage, cold: d1Storage, // Move to D1 after 5 minutes of inactivity tierDownAfterMs: 5 * 60 * 1000, }); this.server = createTriplitServer({ storage, plugins: [GCounterPlugin, RGAListPlugin], // Sync to D1 on every mutation onMutation: async (collection, entityId, changes) => { await d1Storage.persistChanges(collection, entityId, changes); }, }); } return this.server; } // ... WebSocket handling same as before } ``` ### Tiered Storage Adapter ```typescript // packages/cojson-storage-tiered/src/index.ts interface TieredStorageOptions { hot: TriplitStorageAdapter; // Fast, ephemeral (DO storage) cold: TriplitStorageAdapter; // Persistent, queryable (D1) tierDownAfterMs: number; } export class TieredStorageAdapter implements TriplitStorageAdapter { private hotKeys: Map<string, number> = new Map(); // key -> last access time constructor(private options: TieredStorageOptions) { // Periodically tier down inactive keys setInterval(() => this.tierDown(), 60_000); } async get(key: Tuple): Promise<any> { const keyStr = tupleToString(key); // Try hot storage first const hotValue = await this.options.hot.get(key); if (hotValue !== undefined) { this.hotKeys.set(keyStr, Date.now()); return hotValue; } // Fall back to cold storage const coldValue = await this.options.cold.get(key); if (coldValue !== undefined) { // Promote to hot storage await this.options.hot.set(key, coldValue); this.hotKeys.set(keyStr, Date.now()); } return coldValue; } async set(key: Tuple, value: any, timestamp?: Timestamp): Promise<void> { const keyStr = tupleToString(key); // Write to hot storage immediately await this.options.hot.set(key, value, timestamp); this.hotKeys.set(keyStr, Date.now()); // Async write to cold storage (fire and forget, or queue) this.options.cold.set(key, value, timestamp).catch(console.error); } private async tierDown(): Promise<void> { const now = Date.now(); const threshold = now - this.options.tierDownAfterMs; for (const [keyStr, lastAccess] of this.hotKeys) { if (lastAccess < threshold) { // Remove from hot storage const key = stringToTuple(keyStr); await this.options.hot.delete(key); this.hotKeys.delete(keyStr); } } } } ``` --- ## Component 4: Client-Side Local Replica For true offline support, clients maintain a local SQLite replica that syncs with the edge. ### Browser (sql.js + IndexedDB persistence) ```typescript // packages/triplit-client-browser/src/local-replica.ts import initSqlJs, { Database } from 'sql.js'; export class BrowserLocalReplica { private db: Database | null = null; async init(): Promise<void> { const SQL = await initSqlJs({ locateFile: (file) => `https://sql.js.org/dist/${file}`, }); // Load from IndexedDB if exists const persisted = await this.loadFromIndexedDB(); this.db = persisted ? 
new SQL.Database(persisted) : new SQL.Database(); // Initialize schema this.db.run(TRIPLIT_SCHEMA); } async query(sql: string, params?: any[]): Promise<any[]> { return this.db!.exec(sql, params); } async persist(): Promise<void> { const data = this.db!.export(); await this.saveToIndexedDB(data); } private async loadFromIndexedDB(): Promise<Uint8Array | null> { // ... IndexedDB read } private async saveToIndexedDB(data: Uint8Array): Promise<void> { // ... IndexedDB write } } ``` ### React Native / Mobile (expo-sqlite or op-sqlite) ```typescript // packages/triplit-client-mobile/src/local-replica.ts import * as SQLite from 'expo-sqlite'; export class MobileLocalReplica { private db: SQLite.SQLiteDatabase; async init(): Promise<void> { this.db = await SQLite.openDatabaseAsync('triplit-replica.db'); await this.db.execAsync(TRIPLIT_SCHEMA); } async query(sql: string, params?: any[]): Promise<any[]> { return this.db.getAllAsync(sql, params); } // No explicit persist needed - SQLite writes to disk } ``` ### Sync Protocol with Local Replica ```typescript // packages/triplit-client/src/sync-manager.ts export class SyncManager { private localReplica: LocalReplica; private ws: WebSocket | null = null; private pendingWrites: Map<string, Change> = new Map(); constructor( private serverUrl: string, private localReplica: LocalReplica ) {} async connect(): Promise<void> { this.ws = new WebSocket(this.serverUrl); this.ws.onopen = () => { // Send any pending writes this.flushPendingWrites(); // Request sync from last known checkpoint const checkpoint = await this.localReplica.getCheckpoint(); this.ws.send(JSON.stringify({ type: 'SYNC_REQUEST', since: checkpoint, })); }; this.ws.onmessage = async (event) => { const message = JSON.parse(event.data); await this.handleServerMessage(message); }; this.ws.onclose = () => { // Reconnect with exponential backoff setTimeout(() => this.connect(), this.getBackoffDelay()); }; } async write(collection: string, entityId: string, changes: any): Promise<void> { const changeId = crypto.randomUUID(); // 1. Apply optimistically to local replica await this.localReplica.applyChanges(collection, entityId, changes); // 2. Queue for sync this.pendingWrites.set(changeId, { collection, entityId, changes }); // 3. Try to send immediately if connected if (this.ws?.readyState === WebSocket.OPEN) { this.sendChange(changeId, { collection, entityId, changes }); } // 4. 
Persist pending writes for crash recovery await this.persistPendingWrites(); } private async handleServerMessage(message: SyncMessage): Promise<void> { switch (message.type) { case 'CHANGES': // Apply server changes to local replica for (const change of message.changes) { await this.localReplica.applyChanges( change.collection, change.entityId, change.changes, change.timestamp ); } // Update checkpoint await this.localReplica.setCheckpoint(message.checkpoint); break; case 'ACK': // Remove from pending writes this.pendingWrites.delete(message.changeId); await this.persistPendingWrites(); break; case 'REJECT': // Rollback optimistic update await this.localReplica.rollback(message.changeId); this.pendingWrites.delete(message.changeId); break; } } } ``` --- ## Complete Client Architecture ```typescript // packages/triplit-client-cloudflare/src/index.ts import { TriplitClient } from '@triplit/client'; import { BrowserLocalReplica } from './local-replica'; import { SyncManager } from './sync-manager'; import { D1QueryAdapter } from './d1-query-adapter'; interface CloudflareTriplitClientOptions { /** Cloudflare Worker URL for sync */ syncUrl: string; /** D1 database for direct queries (optional, for Workers) */ d1?: D1Database; /** Durable Object stub (optional, for Workers calling DO directly) */ doStub?: DurableObjectStub; /** CRDT plugins to register */ plugins?: CRDTPluginInterface[]; } export async function createCloudflareTriplitClient( options: CloudflareTriplitClientOptions ): Promise<TriplitClient> { // Initialize local replica const localReplica = new BrowserLocalReplica(); await localReplica.init(); // Set up sync manager const syncManager = new SyncManager(options.syncUrl, localReplica); // Create Triplit client with local storage const client = new TriplitClient({ storage: new LocalReplicaStorageAdapter(localReplica), plugins: options.plugins ?? [], // Hook into writes to sync onMutation: (collection, entityId, changes) => { syncManager.write(collection, entityId, changes); }, }); // Start sync await syncManager.connect(); return client; } ``` --- ## Deployment Configuration ### wrangler.toml ```toml name = "triplit-edge" main = "src/index.ts" compatibility_date = "2024-01-01" # Durable Objects [durable_objects] bindings = [ { name = "TRIPLIT_DO", class_name = "TriplitSyncDO" } ] [[migrations]] tag = "v1" new_classes = ["TriplitSyncDO"] # D1 Database [[d1_databases]] binding = "D1_DATABASE" database_name = "triplit-db" database_id = "xxxxx-xxxx-xxxx-xxxx" # Environment variables [vars] JWT_SECRET = "your-secret-here" ``` --- ## Summary: What You Get | Feature | DO Only | D1 Only | DO + D1 Hybrid | |---------|---------|---------|----------------| | Real-time sync | ✅ WebSockets | ❌ Polling | ✅ WebSockets | | Global edge distribution | ✅ | ✅ | ✅ | | Queryable SQL | ❌ KV only | ✅ | ✅ | | Large datasets | ⚠️ 128KB limit/key | ✅ | ✅ | | Hibernation (cost) | ✅ | N/A | ✅ | | Offline support | ✅ Client replica | ✅ Client replica | ✅ Client replica | | Automatic replication | ❌ | ✅ D1 replicas | ✅ | | CRDT plugin support | ✅ | ✅ | ✅ | --- ## Recommended Approach **Start with DO + D1 Hybrid:** 1. Durable Objects handle WebSocket connections and real-time coordination 2. D1 provides persistent storage and complex queries 3. Tiered storage keeps hot data in DO, persists to D1 4. 
Client-side SQLite replicas enable full offline support This gives you: - ✅ Global edge deployment (Cloudflare network) - ✅ Real-time sync (DO WebSockets) - ✅ Queryable SQL (D1) - ✅ Offline read/write (client SQLite) - ✅ Automatic conflict resolution (Triplit property LWW + CRDT plugins) - ✅ Postgres alternative that scales globally The hybrid approach adds minimal complexity over DO-only while unlocking D1's query capabilities—worth it for most applications. # Adding Postgres Support to Triplit ## Overview Triplit's storage layer is pluggable. Current adapters include SQLite, IndexedDB, LMDB, and memory. This document specifies a Postgres adapter that enables: - Server-side Triplit with Postgres persistence - Queryable data alongside Triplit's sync - Familiar Postgres tooling (backups, monitoring, extensions) - Horizontal scaling via read replicas --- ## Architecture ``` ┌─────────────────────────────────────────────────────────────────┐ │ Triplit Server │ │ │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ Query │ │ Entity │ │ Sync │ │ │ │ Engine │───▶│ Store │◀───│ Manager │ │ │ └──────────────┘ └──────┬───────┘ └──────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Storage Adapter │ ◀── Interface │ │ │ (Postgres) │ │ │ └────────┬────────┘ │ │ │ │ └─────────────────────────────┼────────────────────────────────────┘ │ ▼ ┌─────────────────┐ │ Postgres │ │ │ │ ┌───────────┐ │ │ │ triples │ │ │ ├───────────┤ │ │ │ metadata │ │ │ ├───────────┤ │ │ │ dkvr │ │ │ ├───────────┤ │ │ │projections│ │ │ └───────────┘ │ └─────────────────┘ ``` --- ## Part 1: Triplit's Storage Interface First, let's understand the interface we need to implement. Based on Triplit's existing adapters: ```typescript // From @triplit/db/storage/adapter-sqlite export interface AsyncSQLiteAdapter { execute(sql: string, args?: any[]): Promise<QueryResult>; normalizeResults(results: any): { key: string; value: string }[]; transact(fn: (adapter: AsyncSQLiteExecutor) => Promise<void>): Promise<void>; close(): Promise<void>; } export interface AsyncSQLiteExecutor { execute(sql: string, args?: any[]): Promise<QueryResult>; } export interface QueryResult { rows?: any[]; rowsAffected?: number; lastInsertRowId?: number; } ``` However, Triplit also has a higher-level `KeyValueStore` interface that storage adapters can implement directly: ```typescript // Core KV interface used by EntityStore export type Tuple = (string | number)[]; export interface KVStore { get(key: Tuple): Promise<any>; set(key: Tuple, value: any): Promise<void>; delete(key: Tuple): Promise<void>; scan(options: ScanOptions): AsyncIterable<[Tuple, any]>; transact<T>(fn: (tx: KVStoreTransaction) => Promise<T>): Promise<T>; } export interface ScanOptions { prefix?: Tuple; start?: Tuple; end?: Tuple; limit?: number; reverse?: boolean; } export interface KVStoreTransaction extends KVStore { commit(): Promise<void>; rollback(): Promise<void>; } ``` We'll implement both: a low-level SQL adapter and a high-level KV adapter. 
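To make the KV contract concrete before diving into SQL, here is a minimal in-memory sketch of the same interface. It is illustrative only; `MemoryKVStore`, `encodeKey`, and `decodeKey` are names introduced here, not Triplit exports, and the `Tuple`/`ScanOptions` shapes are re-declared so the snippet stands alone. A reference implementation like this is handy for unit-testing the Postgres adapter's prefix-scan and ordering behavior against a known-good baseline.

```typescript
// Minimal, illustrative in-memory KVStore (not a Triplit export).
type Tuple = (string | number)[];

interface ScanOptions {
  prefix?: Tuple;
  limit?: number;
  reverse?: boolean; // start/end range options omitted for brevity
}

const encodeKey = (key: Tuple): string => JSON.stringify(key);
const decodeKey = (encoded: string): Tuple => JSON.parse(encoded);

export class MemoryKVStore {
  private data = new Map<string, any>();

  async get(key: Tuple): Promise<any> {
    return this.data.get(encodeKey(key));
  }

  async set(key: Tuple, value: any): Promise<void> {
    this.data.set(encodeKey(key), value);
  }

  async delete(key: Tuple): Promise<void> {
    this.data.delete(encodeKey(key));
  }

  async *scan(options: ScanOptions = {}): AsyncGenerator<[Tuple, any]> {
    const { prefix, limit, reverse } = options;
    // Sort by the encoded key text, mirroring how the Postgres adapter
    // orders on key_tuple::text (lexicographic, not numeric).
    const keys = [...this.data.keys()].sort();
    if (reverse) keys.reverse();

    let yielded = 0;
    for (const encoded of keys) {
      const key = decodeKey(encoded);
      if (prefix && !prefix.every((part, i) => key[i] === part)) continue;
      yield [key, this.data.get(encoded)];
      if (limit !== undefined && ++yielded >= limit) return;
    }
  }
}

// Usage with the same tuple layout the Postgres adapter expects:
// const kv = new MemoryKVStore();
// await kv.set(['entities', 'todos', 'todo-1', 'text'], 'Buy groceries');
// for await (const [key, value] of kv.scan({ prefix: ['entities', 'todos'] })) { ... }
```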
--- ## Part 2: Postgres Schema ```sql -- packages/cojson-storage-postgres/schema.sql -- ============================================================================ -- CORE TABLES -- ============================================================================ -- Key-value store for Triplit's tuple-based storage -- This is the foundational table that mirrors Triplit's KV abstraction CREATE TABLE kv_store ( key_tuple JSONB NOT NULL PRIMARY KEY, -- Tuple as JSON array value JSONB, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); -- Optimized index for prefix scans (most common operation) CREATE INDEX idx_kv_prefix ON kv_store USING GIN (key_tuple jsonb_path_ops); -- Index for range scans CREATE INDEX idx_kv_key ON kv_store USING BTREE ((key_tuple::text)); -- ============================================================================ -- DENORMALIZED TABLES (Optional, for query performance) -- ============================================================================ -- Entity metadata for fast collection scans CREATE TABLE entities ( collection TEXT NOT NULL, entity_id TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (collection, entity_id) ); CREATE INDEX idx_entities_collection ON entities (collection); CREATE INDEX idx_entities_updated ON entities (updated_at DESC); -- Triples table (denormalized view of entity attributes) -- Enables SQL queries on Triplit data CREATE TABLE triples ( collection TEXT NOT NULL, entity_id TEXT NOT NULL, attribute TEXT NOT NULL, value JSONB, timestamp_hlc TEXT NOT NULL, -- HLC for ordering actor_id TEXT NOT NULL, tombstone BOOLEAN DEFAULT FALSE, created_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc) ); CREATE INDEX idx_triples_entity ON triples (collection, entity_id); CREATE INDEX idx_triples_attribute ON triples (collection, attribute); CREATE INDEX idx_triples_value ON triples USING GIN (value jsonb_path_ops); CREATE INDEX idx_triples_active ON triples (collection, entity_id, attribute) WHERE tombstone = FALSE; -- DKVR entries for decomposed CRDTs (lists, text, etc.) 
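-- Illustrative layout (hypothetical values): an RGA-style list attribute would
-- decompose into one row per element, keyed by a plugin-defined key_suffix, e.g.
--   collection = 'docs', entity_id = 'doc-1', attribute = 'blocks',
--   key_suffix = '["elem-01", 0]', value = '{"char": "h", "deleted": false}'
-- The exact key_suffix and value shapes are owned by each CRDT plugin; this
-- table just stores the decomposed entries.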
CREATE TABLE dkvr_entries ( collection TEXT NOT NULL, entity_id TEXT NOT NULL, attribute TEXT NOT NULL, key_suffix JSONB NOT NULL, -- Sub-key tuple value JSONB NOT NULL, updated_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (collection, entity_id, attribute, key_suffix) ); CREATE INDEX idx_dkvr_attribute ON dkvr_entries (collection, entity_id, attribute); -- Projections for CRDT query optimization (from plugin API proposal) CREATE TABLE projections ( collection TEXT NOT NULL, entity_id TEXT NOT NULL, attribute TEXT NOT NULL, projection_key TEXT NOT NULL, value JSONB, updated_at TIMESTAMPTZ DEFAULT NOW(), PRIMARY KEY (collection, entity_id, attribute, projection_key) ); CREATE INDEX idx_projections_lookup ON projections (collection, attribute, projection_key); CREATE INDEX idx_projections_value ON projections (collection, attribute, projection_key, value); -- ============================================================================ -- SYNC STATE TABLES -- ============================================================================ -- Client sync checkpoints CREATE TABLE sync_checkpoints ( client_id TEXT NOT NULL PRIMARY KEY, checkpoint JSONB NOT NULL, -- Vector clock or sequence numbers last_sync_at TIMESTAMPTZ DEFAULT NOW() ); -- Outbox for pending changes (optional, for guaranteed delivery) CREATE TABLE outbox ( id BIGSERIAL PRIMARY KEY, collection TEXT NOT NULL, entity_id TEXT NOT NULL, change_payload JSONB NOT NULL, timestamp_hlc TEXT NOT NULL, actor_id TEXT NOT NULL, status TEXT DEFAULT 'pending', -- pending, delivered, failed created_at TIMESTAMPTZ DEFAULT NOW(), delivered_at TIMESTAMPTZ ); CREATE INDEX idx_outbox_pending ON outbox (status, created_at) WHERE status = 'pending'; -- ============================================================================ -- FUNCTIONS -- ============================================================================ -- Function to update timestamps automatically CREATE OR REPLACE FUNCTION update_updated_at() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql; -- Apply to tables CREATE TRIGGER trg_kv_store_updated BEFORE UPDATE ON kv_store FOR EACH ROW EXECUTE FUNCTION update_updated_at(); CREATE TRIGGER trg_entities_updated BEFORE UPDATE ON entities FOR EACH ROW EXECUTE FUNCTION update_updated_at(); -- Function for tuple comparison (for range scans) CREATE OR REPLACE FUNCTION tuple_compare(a JSONB, b JSONB) RETURNS INTEGER AS $$ DECLARE len_a INTEGER := jsonb_array_length(a); len_b INTEGER := jsonb_array_length(b); min_len INTEGER := LEAST(len_a, len_b); i INTEGER; elem_a JSONB; elem_b JSONB; BEGIN FOR i IN 0..min_len-1 LOOP elem_a := a->i; elem_b := b->i; -- Compare elements (handles string and number) IF elem_a::text < elem_b::text THEN RETURN -1; ELSIF elem_a::text > elem_b::text THEN RETURN 1; END IF; END LOOP; -- Equal up to min length, shorter tuple comes first RETURN len_a - len_b; END; $$ LANGUAGE plpgsql IMMUTABLE; -- Function to check if tuple starts with prefix CREATE OR REPLACE FUNCTION tuple_has_prefix(tuple JSONB, prefix JSONB) RETURNS BOOLEAN AS $$ DECLARE prefix_len INTEGER := jsonb_array_length(prefix); i INTEGER; BEGIN IF jsonb_array_length(tuple) < prefix_len THEN RETURN FALSE; END IF; FOR i IN 0..prefix_len-1 LOOP IF tuple->i IS DISTINCT FROM prefix->i THEN RETURN FALSE; END IF; END LOOP; RETURN TRUE; END; $$ LANGUAGE plpgsql IMMUTABLE; ``` --- ## Part 3: Postgres Storage Adapter ```typescript // packages/cojson-storage-postgres/src/index.ts import { Pool, PoolClient, QueryResult as PgQueryResult 
} from 'pg';

// Re-export for convenience
export { Pool } from 'pg';

// ============================================================================
// Types
// ============================================================================

export type Tuple = (string | number)[];

export interface ScanOptions {
  prefix?: Tuple;
  start?: Tuple;
  end?: Tuple;
  limit?: number;
  reverse?: boolean;
}

export interface KVStore {
  get(key: Tuple): Promise<any>;
  set(key: Tuple, value: any): Promise<void>;
  delete(key: Tuple): Promise<void>;
  scan(options: ScanOptions): AsyncGenerator<[Tuple, any]>;
}

export interface KVStoreTransaction extends KVStore {
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

export interface PostgresStorageOptions {
  /** Postgres connection pool */
  pool: Pool;
  /**
   * Enable denormalized tables (entities, triples) for SQL queries.
   * Adds write overhead but enables rich querying.
   * Default: true
   */
  enableDenormalization?: boolean;
  /**
   * Schema name for all Triplit tables.
   * Default: 'triplit'
   */
  schema?: string;
}

// ============================================================================
// Main Adapter
// ============================================================================

export class PostgresStorageAdapter implements KVStore {
  private pool: Pool;
  private schema: string;
  private enableDenormalization: boolean;

  constructor(options: PostgresStorageOptions) {
    this.pool = options.pool;
    this.schema = options.schema ?? 'triplit';
    this.enableDenormalization = options.enableDenormalization ?? true;
  }

  // --------------------------------------------------------------------------
  // Initialization
  // --------------------------------------------------------------------------

  async initialize(): Promise<void> {
    // Run schema migrations
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Check if schema exists, create if not
      await client.query(`
        CREATE SCHEMA IF NOT EXISTS ${this.schema}
      `);

      // Create tables (idempotent)
      await this.createTables(client);

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  private async createTables(client: PoolClient): Promise<void> {
    // KV store table
    await client.query(`
      CREATE TABLE IF NOT EXISTS ${this.schema}.kv_store (
        key_tuple JSONB NOT NULL PRIMARY KEY,
        value JSONB,
        created_at TIMESTAMPTZ DEFAULT NOW(),
        updated_at TIMESTAMPTZ DEFAULT NOW()
      )
    `);

    // Create indexes
    await client.query(`
      CREATE INDEX IF NOT EXISTS idx_kv_prefix
      ON ${this.schema}.kv_store USING GIN (key_tuple jsonb_path_ops)
    `);

    // Prefix-matching helper used by scan() (same definition as in schema.sql)
    await client.query(`
      CREATE OR REPLACE FUNCTION ${this.schema}.tuple_has_prefix(tuple JSONB, prefix JSONB)
      RETURNS BOOLEAN AS $$
      DECLARE
        prefix_len INTEGER := jsonb_array_length(prefix);
        i INTEGER;
      BEGIN
        IF jsonb_array_length(tuple) < prefix_len THEN
          RETURN FALSE;
        END IF;
        FOR i IN 0..prefix_len-1 LOOP
          IF tuple->i IS DISTINCT FROM prefix->i THEN
            RETURN FALSE;
          END IF;
        END LOOP;
        RETURN TRUE;
      END;
      $$ LANGUAGE plpgsql IMMUTABLE;
    `);

    if (this.enableDenormalization) {
      // Entities table
      await client.query(`
        CREATE TABLE IF NOT EXISTS ${this.schema}.entities (
          collection TEXT NOT NULL,
          entity_id TEXT NOT NULL,
          created_at TIMESTAMPTZ DEFAULT NOW(),
          updated_at TIMESTAMPTZ DEFAULT NOW(),
          PRIMARY KEY (collection, entity_id)
        )
      `);

      // Triples table
      await client.query(`
        CREATE TABLE IF NOT EXISTS ${this.schema}.triples (
          collection TEXT NOT NULL,
          entity_id TEXT NOT NULL,
          attribute TEXT NOT NULL,
          value JSONB,
          timestamp_hlc TEXT NOT NULL,
          actor_id TEXT NOT NULL,
          tombstone BOOLEAN DEFAULT FALSE,
          created_at TIMESTAMPTZ DEFAULT NOW(),
          PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc)
        )
      `);

      // Indexes for triples
      await client.query(`
        CREATE INDEX IF NOT EXISTS idx_triples_entity
        ON ${this.schema}.triples (collection, entity_id)
      `);
      await client.query(`
        CREATE INDEX IF NOT EXISTS idx_triples_value
        ON ${this.schema}.triples USING GIN (value jsonb_path_ops)
      `);
    }
  }

  //
-------------------------------------------------------------------------- // Core KV Operations // -------------------------------------------------------------------------- async get(key: Tuple): Promise<any> { const result = await this.pool.query( `SELECT value FROM ${this.schema}.kv_store WHERE key_tuple = $1`, [JSON.stringify(key)] ); if (result.rows.length === 0) { return undefined; } return result.rows[0].value; } async set(key: Tuple, value: any): Promise<void> { await this.pool.query( ` INSERT INTO ${this.schema}.kv_store (key_tuple, value) VALUES ($1, $2) ON CONFLICT (key_tuple) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW() `, [JSON.stringify(key), JSON.stringify(value)] ); // Denormalize if enabled if (this.enableDenormalization) { await this.denormalizeWrite(key, value); } } async delete(key: Tuple): Promise<void> { await this.pool.query( `DELETE FROM ${this.schema}.kv_store WHERE key_tuple = $1`, [JSON.stringify(key)] ); // Clean up denormalized data if (this.enableDenormalization) { await this.denormalizeDelete(key); } } async *scan(options: ScanOptions): AsyncGenerator<[Tuple, any]> { const { prefix, start, end, limit, reverse } = options; let query = `SELECT key_tuple, value FROM ${this.schema}.kv_store WHERE 1=1`; const params: any[] = []; let paramIndex = 1; // Prefix filter using JSONB containment if (prefix && prefix.length > 0) { // Use a custom function or build the prefix check query += ` AND ${this.schema}.tuple_has_prefix(key_tuple, $${paramIndex})`; params.push(JSON.stringify(prefix)); paramIndex++; } // Range filters if (start) { query += ` AND key_tuple::text >= $${paramIndex}::text`; params.push(JSON.stringify(start)); paramIndex++; } if (end) { query += ` AND key_tuple::text < $${paramIndex}::text`; params.push(JSON.stringify(end)); paramIndex++; } // Ordering query += ` ORDER BY key_tuple::text ${reverse ? 'DESC' : 'ASC'}`; // Limit if (limit !== undefined) { query += ` LIMIT $${paramIndex}`; params.push(limit); } const result = await this.pool.query(query, params); for (const row of result.rows) { const key = typeof row.key_tuple === 'string' ? JSON.parse(row.key_tuple) : row.key_tuple; yield [key as Tuple, row.value]; } } // -------------------------------------------------------------------------- // Transactions // -------------------------------------------------------------------------- async transact<T>(fn: (tx: PostgresTransaction) => Promise<T>): Promise<T> { const client = await this.pool.connect(); try { await client.query('BEGIN'); const tx = new PostgresTransaction(client, this.schema, this.enableDenormalization); const result = await fn(tx); await client.query('COMMIT'); return result; } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } // -------------------------------------------------------------------------- // Denormalization // -------------------------------------------------------------------------- private async denormalizeWrite(key: Tuple, value: any): Promise<void> { // Key format: ['entities', collection, entityId, attribute, ...] if (key[0] !== 'entities' || key.length < 4) { return; } const [, collection, entityId, attribute] = key as [string, string, string, string]; // Upsert entity await this.pool.query( ` INSERT INTO ${this.schema}.entities (collection, entity_id) VALUES ($1, $2) ON CONFLICT (collection, entity_id) DO UPDATE SET updated_at = NOW() `, [collection, entityId] ); // Insert triple (with HLC from value if present) const hlc = value?._timestamp ?? 
Date.now().toString(); const actorId = value?._actorId ?? 'unknown'; const tripleValue = value?._value ?? value; await this.pool.query( ` INSERT INTO ${this.schema}.triples (collection, entity_id, attribute, value, timestamp_hlc, actor_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (collection, entity_id, attribute, timestamp_hlc) DO UPDATE SET value = EXCLUDED.value `, [collection, entityId, attribute, JSON.stringify(tripleValue), hlc, actorId] ); } private async denormalizeDelete(key: Tuple): Promise<void> { if (key[0] !== 'entities' || key.length < 4) { return; } const [, collection, entityId, attribute] = key as [string, string, string, string]; // Mark triple as tombstoned (don't actually delete for history) await this.pool.query( ` UPDATE ${this.schema}.triples SET tombstone = TRUE WHERE collection = $1 AND entity_id = $2 AND attribute = $3 `, [collection, entityId, attribute] ); } // -------------------------------------------------------------------------- // Query Helpers (for direct SQL access) // -------------------------------------------------------------------------- /** * Query entities directly via SQL. * Useful for analytics, admin dashboards, etc. */ async queryEntities( collection: string, options?: { where?: Record<string, any>; orderBy?: string; limit?: number; offset?: number; } ): Promise<any[]> { if (!this.enableDenormalization) { throw new Error('Direct queries require enableDenormalization: true'); } let query = ` SELECT DISTINCT ON (e.entity_id) e.entity_id, e.created_at, e.updated_at, jsonb_object_agg(t.attribute, t.value) as data FROM ${this.schema}.entities e LEFT JOIN ${this.schema}.triples t ON e.collection = t.collection AND e.entity_id = t.entity_id AND t.tombstone = FALSE WHERE e.collection = $1 `; const params: any[] = [collection]; let paramIndex = 2; // Add WHERE conditions if (options?.where) { for (const [attr, value] of Object.entries(options.where)) { query += ` AND EXISTS ( SELECT 1 FROM ${this.schema}.triples t2 WHERE t2.collection = e.collection AND t2.entity_id = e.entity_id AND t2.attribute = $${paramIndex} AND t2.value = $${paramIndex + 1} AND t2.tombstone = FALSE )`; params.push(attr, JSON.stringify(value)); paramIndex += 2; } } query += ` GROUP BY e.entity_id, e.created_at, e.updated_at`; // Order by if (options?.orderBy) { query += ` ORDER BY ${options.orderBy}`; } // Pagination if (options?.limit) { query += ` LIMIT $${paramIndex}`; params.push(options.limit); paramIndex++; } if (options?.offset) { query += ` OFFSET $${paramIndex}`; params.push(options.offset); } const result = await this.pool.query(query, params); return result.rows.map(row => ({ id: row.entity_id, ...row.data, _meta: { createdAt: row.created_at, updatedAt: row.updated_at, }, })); } // -------------------------------------------------------------------------- // Lifecycle // -------------------------------------------------------------------------- async close(): Promise<void> { await this.pool.end(); } } // ============================================================================ // Transaction Implementation // ============================================================================ class PostgresTransaction implements KVStoreTransaction { private committed = false; private rolledBack = false; constructor( private client: PoolClient, private schema: string, private enableDenormalization: boolean ) {} async get(key: Tuple): Promise<any> { this.checkState(); const result = await this.client.query( `SELECT value FROM ${this.schema}.kv_store WHERE key_tuple = $1`, 
[JSON.stringify(key)] ); return result.rows[0]?.value; } async set(key: Tuple, value: any): Promise<void> { this.checkState(); await this.client.query( ` INSERT INTO ${this.schema}.kv_store (key_tuple, value) VALUES ($1, $2) ON CONFLICT (key_tuple) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW() `, [JSON.stringify(key), JSON.stringify(value)] ); if (this.enableDenormalization) { await this.denormalizeWrite(key, value); } } async delete(key: Tuple): Promise<void> { this.checkState(); await this.client.query( `DELETE FROM ${this.schema}.kv_store WHERE key_tuple = $1`, [JSON.stringify(key)] ); } async *scan(options: ScanOptions): AsyncGenerator<[Tuple, any]> { this.checkState(); // Same implementation as main adapter but using this.client const { prefix, limit, reverse } = options; let query = `SELECT key_tuple, value FROM ${this.schema}.kv_store WHERE 1=1`; const params: any[] = []; let paramIndex = 1; if (prefix && prefix.length > 0) { query += ` AND ${this.schema}.tuple_has_prefix(key_tuple, $${paramIndex})`; params.push(JSON.stringify(prefix)); paramIndex++; } query += ` ORDER BY key_tuple::text ${reverse ? 'DESC' : 'ASC'}`; if (limit !== undefined) { query += ` LIMIT $${paramIndex}`; params.push(limit); } const result = await this.client.query(query, params); for (const row of result.rows) { yield [row.key_tuple as Tuple, row.value]; } } async commit(): Promise<void> { // Actual commit happens in transact() wrapper this.committed = true; } async rollback(): Promise<void> { this.rolledBack = true; throw new Error('Transaction rollback requested'); } private checkState(): void { if (this.committed) { throw new Error('Transaction already committed'); } if (this.rolledBack) { throw new Error('Transaction already rolled back'); } } private async denormalizeWrite(key: Tuple, value: any): Promise<void> { if (key[0] !== 'entities' || key.length < 4) { return; } const [, collection, entityId, attribute] = key as [string, string, string, string]; await this.client.query( ` INSERT INTO ${this.schema}.entities (collection, entity_id) VALUES ($1, $2) ON CONFLICT (collection, entity_id) DO UPDATE SET updated_at = NOW() `, [collection, entityId] ); const hlc = value?._timestamp ?? Date.now().toString(); const actorId = value?._actorId ?? 'unknown'; const tripleValue = value?._value ?? value; await this.client.query( ` INSERT INTO ${this.schema}.triples (collection, entity_id, attribute, value, timestamp_hlc, actor_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (collection, entity_id, attribute, timestamp_hlc) DO UPDATE SET value = EXCLUDED.value `, [collection, entityId, attribute, JSON.stringify(tripleValue), hlc, actorId] ); } } ``` --- ## Part 4: Triplit Server Integration ```typescript // packages/cojson-storage-postgres/src/server.ts import { createServer, TriplitServer } from '@triplit/server-core'; import { PostgresStorageAdapter } from './index'; import { Pool } from 'pg'; export interface PostgresTriplitServerOptions { /** Postgres connection string */ connectionString?: string; /** Or provide a pre-configured pool */ pool?: Pool; /** Port to listen on */ port?: number; /** JWT secret for auth */ jwtSecret: string; /** CRDT plugins to register */ plugins?: any[]; /** Enable verbose logging */ verboseLogs?: boolean; } export async function createPostgresTriplitServer( options: PostgresTriplitServerOptions ): Promise<TriplitServer> { // Create pool if not provided const pool = options.pool ?? 
new Pool({ connectionString: options.connectionString, max: 20, idleTimeoutMillis: 30000, connectionTimeoutMillis: 2000, }); // Create storage adapter const storage = new PostgresStorageAdapter({ pool, enableDenormalization: true, }); // Initialize schema await storage.initialize(); // Create Triplit server with Postgres storage const server = createServer({ storage, jwtSecret: options.jwtSecret, plugins: options.plugins ?? [], verboseLogs: options.verboseLogs ?? false, }); return server; } // Convenience function for common setup export async function startPostgresTriplitServer( options: PostgresTriplitServerOptions ): Promise<{ server: TriplitServer; close: () => Promise<void> }> { const server = await createPostgresTriplitServer(options); const port = options.port ?? 8080; const httpServer = server.listen(port); console.log(`Triplit server (Postgres) running on port ${port}`); return { server, close: async () => { httpServer.close(); await server.close(); }, }; } ``` --- ## Part 5: Usage Examples ### Basic Server Setup ```typescript // server.ts import { startPostgresTriplitServer } from '@triplit/storage-postgres'; import { GCounterPlugin, RGAListPlugin } from './plugins'; async function main() { const { server, close } = await startPostgresTriplitServer({ connectionString: process.env.DATABASE_URL, port: 6543, jwtSecret: process.env.JWT_SECRET!, plugins: [GCounterPlugin, RGAListPlugin], verboseLogs: true, }); // Graceful shutdown process.on('SIGINT', async () => { console.log('Shutting down...'); await close(); process.exit(0); }); } main().catch(console.error); ``` ### Direct SQL Queries ```typescript // admin-api.ts import { PostgresStorageAdapter } from '@triplit/storage-postgres'; import { Pool } from 'pg'; const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const storage = new PostgresStorageAdapter({ pool, enableDenormalization: true }); // Query todos via SQL (bypassing Triplit for read-only analytics) async function getTodoStats() { const result = await pool.query(` SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE value->>'completed' = 'true') as completed, COUNT(*) FILTER (WHERE value->>'completed' = 'false') as pending FROM triplit.triples WHERE collection = 'todos' AND attribute = 'completed' AND tombstone = FALSE `); return result.rows[0]; } // Use the helper for structured queries async function getRecentTodos(limit = 10) { return storage.queryEntities('todos', { orderBy: 'e.updated_at DESC', limit, }); } // Full-text search using Postgres async function searchTodos(searchTerm: string) { const result = await pool.query(` SELECT DISTINCT e.entity_id, t.value->>'text' as text FROM triplit.entities e JOIN triplit.triples t ON e.entity_id = t.entity_id WHERE e.collection = 'todos' AND t.attribute = 'text' AND t.value->>'text' ILIKE $1 AND t.tombstone = FALSE `, [`%${searchTerm}%`]); return result.rows; } ``` ### Client Connection ```typescript // client.ts import { TriplitClient } from '@triplit/client'; import { schema } from './schema'; const client = new TriplitClient({ serverUrl: 'http://localhost:6543', token: 'your-jwt-token', schema, }); // Normal Triplit operations - storage is transparent await client.insert('todos', { id: 'todo-1', text: 'Buy groceries', completed: false, }); // Subscribe to changes const unsubscribe = client.subscribe( client.query('todos').where('completed', '=', false), (results) => { console.log('Pending todos:', results); } ); ``` ### Docker Compose Setup ```yaml # docker-compose.yml version: '3.8' services: postgres: image: 
postgres:16-alpine environment: POSTGRES_USER: triplit POSTGRES_PASSWORD: triplit POSTGRES_DB: triplit volumes: - postgres_data:/var/lib/postgresql/data ports: - "5432:5432" healthcheck: test: ["CMD-SHELL", "pg_isready -U triplit"] interval: 5s timeout: 5s retries: 5 triplit: build: . environment: DATABASE_URL: postgres://triplit:triplit@postgres:5432/triplit JWT_SECRET: your-secret-here PORT: 6543 ports: - "6543:6543" depends_on: postgres: condition: service_healthy volumes: postgres_data: ``` --- ## Part 6: SQL Compatibility Layer (Optional) For scenarios where you want to use Triplit's existing SQLite-based code paths: ```typescript // packages/cojson-storage-postgres/src/sql-compat.ts /** * Adapter that translates SQLite SQL to Postgres SQL. * Useful for compatibility with Triplit's existing SQL-based storage code. */ export class PostgresSQLiteCompatAdapter implements AsyncSQLiteAdapter { constructor(private pool: Pool) {} async execute(sql: string, args?: any[]): Promise<QueryResult> { const pgSql = this.translateSQL(sql); const pgArgs = this.translateArgs(args); const result = await this.pool.query(pgSql, pgArgs); return { rows: result.rows, rowsAffected: result.rowCount ?? 0, }; } normalizeResults(results: any): { key: string; value: string }[] { if (!results.rows) return []; return results.rows.map((row: any) => ({ key: row.key ?? row.key_tuple, value: typeof row.value === 'string' ? row.value : JSON.stringify(row.value), })); } async transact(fn: (adapter: AsyncSQLiteExecutor) => Promise<void>): Promise<void> { const client = await this.pool.connect(); try { await client.query('BEGIN'); const executor: AsyncSQLiteExecutor = { execute: async (sql, args) => { const pgSql = this.translateSQL(sql); const pgArgs = this.translateArgs(args); const result = await client.query(pgSql, pgArgs); return { rows: result.rows, rowsAffected: result.rowCount ?? 0 }; }, }; await fn(executor); await client.query('COMMIT'); } catch (error) { await client.query('ROLLBACK'); throw error; } finally { client.release(); } } async close(): Promise<void> { await this.pool.end(); } // -------------------------------------------------------------------------- // SQL Translation // -------------------------------------------------------------------------- private translateSQL(sql: string): string { let pgSql = sql; // Replace ? placeholders with $1, $2, etc. 
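    // e.g. "SELECT value FROM kv_store WHERE key = ? AND value = ?"
    //   -> "SELECT value FROM kv_store WHERE key = $1 AND value = $2"
    // Caveat: this global regex also rewrites '?' characters that appear inside
    // string literals, so it assumes the generated SQL never embeds a literal
    // question mark.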
let paramIndex = 0; pgSql = pgSql.replace(/\?/g, () => `$${++paramIndex}`); // SQLite -> Postgres syntax differences const translations: [RegExp, string][] = [ // AUTOINCREMENT -> SERIAL (already handled in schema) [/\bAUTOINCREMENT\b/gi, ''], // INTEGER PRIMARY KEY -> SERIAL PRIMARY KEY [/\bINTEGER PRIMARY KEY\b/gi, 'SERIAL PRIMARY KEY'], // BOOLEAN handling (SQLite uses 0/1) [/\b(\w+)\s*=\s*1\b/g, '$1 = TRUE'], [/\b(\w+)\s*=\s*0\b/g, '$1 = FALSE'], // String concatenation [/\|\|/g, '||'], // Same in Postgres, but ensure it's not confused // GLOB -> LIKE (approximate) [/\bGLOB\b/gi, 'LIKE'], // datetime functions [/\bdatetime\s*\(\s*'now'\s*\)/gi, 'NOW()'], [/\bstrftime\s*\(\s*'%s'\s*,\s*'now'\s*\)/gi, "EXTRACT(EPOCH FROM NOW())"], // IFNULL -> COALESCE [/\bIFNULL\s*\(/gi, 'COALESCE('], // GROUP_CONCAT -> STRING_AGG [/\bGROUP_CONCAT\s*\(\s*(\w+)\s*\)/gi, 'STRING_AGG($1::text, \',\')'], // JSON functions [/\bjson_extract\s*\(\s*(\w+)\s*,\s*'\$\.(\w+)'\s*\)/gi, "$1->>'$2'"], [/\bjson\s*\(/gi, 'to_jsonb('], ]; for (const [pattern, replacement] of translations) { pgSql = pgSql.replace(pattern, replacement); } return pgSql; } private translateArgs(args?: any[]): any[] { if (!args) return []; return args.map(arg => { // Convert booleans to Postgres format if (typeof arg === 'boolean') { return arg; } // Convert objects to JSON strings if (typeof arg === 'object' && arg !== null) { return JSON.stringify(arg); } return arg; }); } } ``` --- ## Part 7: Migration Tool ```typescript // packages/cojson-storage-postgres/src/migrate.ts import { Pool } from 'pg'; import Database from 'better-sqlite3'; interface MigrationOptions { /** SQLite database path */ sqlitePath: string; /** Postgres connection string */ postgresUrl: string; /** Batch size for inserts */ batchSize?: number; /** Progress callback */ onProgress?: (processed: number, total: number) => void; } /** * Migrate data from SQLite Triplit storage to Postgres. */ export async function migrateToPostgres(options: MigrationOptions): Promise<void> { const { sqlitePath, postgresUrl, batchSize = 1000, onProgress } = options; // Open SQLite database const sqlite = new Database(sqlitePath, { readonly: true }); // Connect to Postgres const pool = new Pool({ connectionString: postgresUrl }); try { // Get total count const countResult = sqlite.prepare('SELECT COUNT(*) as count FROM kv_store').get() as { count: number }; const total = countResult.count; console.log(`Migrating ${total} records from SQLite to Postgres...`); // Create tables in Postgres const client = awai