# Triplit Plugin
**Motivation:**
Triplit provides a powerful foundation for building real-time, offline-capable applications using basic data types (string, number, boolean, date, set). However, to unlock its full potential for richer collaborative experiences like those found in document editors, design tools, or applications with specialized counters or ordered lists, we need a way to extend its core data modeling capabilities.
Currently, adding new data types with custom merge logic (like CRDTs) requires modifying the core `@triplit/db`, `@triplit/client`, and `@triplit/server-core` packages. This presents a high barrier for both the core team and the community to introduce specialized types tailored to specific application needs.
This proposal outlines a **Plugin API** designed to allow developers to define, implement, and register custom CRDTs or other mergeable data structures within Triplit. This would empower developers to:
1. Implement complex collaborative features natively within Triplit's data model.
2. Leverage specialized CRDTs optimized for specific use cases (e.g., performance characteristics of different sequence CRDTs).
3. Foster a community ecosystem where different CRDT plugins can be shared and reused.
4. Keep the Triplit core leaner while enabling broad extensibility.
**Goals:**
1. **Define a Clear Interface:** Specify a robust TypeScript interface (`CRDTPluginInterface`) outlining the contract between Triplit core and a custom data type plugin.
2. **Enable Custom Logic:** Allow plugins to define their unique state representation (`TState`), CRDT merge algorithm (`merge`), and client-facing mutation logic (`generateChangePayload`).
3. **Flexible Storage Strategies:** Explicitly support both:
* **State Blob:** Storing the CRDT state as a single opaque value in the KV store (simpler, suitable for types like Counters).
* **Decomposed KV Representation (DKVR):** Allowing the CRDT state to be mapped across multiple KV entries (more complex, potentially more efficient for types like Lists or Text).
4. **Core Integration:** Define how Triplit's schema system, storage layer, transaction/merge logic, client API, sync protocol, and query engine will interact with registered plugins.
5. **Maintain Guarantees:** Ensure the plugin system upholds Triplit's core principles of offline-first operation, optimistic updates, HLC timestamping for consistency, and real-time synchronization.
6. **Basic Query Support:** Provide optional hooks within the plugin interface to enable basic filtering and sorting capabilities for custom types within Triplit queries.
**Non-Goals (Initial Version):**
* **Highly Optimized IVM for DKVR:** While the API allows DKVR, deeply optimizing the core Incremental View Maintenance for arbitrary decomposed structures is complex and deferred. Initial IVM might treat any change within a DKVR attribute's KV prefix as potentially invalidating dependent queries.
* **Automatic Client Proxy Generation:** Automatically mapping standard JavaScript methods (e.g., `Array.prototype.push`) to custom CRDT operations is complex. The initial focus is on supporting explicit API methods initiated by application code.
* **Complex Validation/UI Metadata in Core API:** Integrating external validation libraries (like Standard Schema) or UI component bindings directly into the core API is out of scope; this functionality is better suited for complementary schema extension libraries built *on top* of this core data API.
**Core Concepts & Integration Strategy:**
This API leverages Triplit's existing architecture (an end-to-end usage sketch follows this list):
* **KV Store Abstraction:** Plugins define how their `TState` maps onto `TStoredValue` (or multiple KV pairs for DKVR), interacting via the standard `KVStore` interface.
* **Schema Definition:** Plugins provide a `schemaDefinition` function used by `Schema as S` (e.g., `S.MyCounter = MyCounterPlugin.schemaDefinition`).
* **Merge Logic:** The plugin's `merge` function encapsulates the CRDT algorithm. Triplit's `EntityStore` orchestrates fetching current state, calling `plugin.merge` with states and HLC timestamps, and writing the result back atomically within a transaction. For DKVR, the `merge` function receives the `KVStoreTransaction` to perform multi-key operations.
* **Client API:** Applications will use explicit methods (e.g., `client.incrementCounter(...)`) which internally call `plugin.generateChangePayload` to create the correct delta/state (`TChangePayload`) for the transaction.
* **Synchronization:** The `TChangePayload` is stored in the outbox and transmitted. The server uses the corresponding registered plugin's `merge` function to apply changes.
* **Querying:** Basic filtering/sorting is enabled via optional plugin methods (`compare`, `satisfiesOperator`), which may require state deserialization.
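To make the flow above concrete, here is a minimal end-to-end usage sketch of the proposed API. The `plugins` option, the plugin module, and the `incrementCounter` helper are hypothetical names from this proposal, not existing Triplit APIs; `S.Collections`/`S.Schema`/`S.Id` are the existing schema builders.
```typescript
import { TriplitClient, Schema as S } from '@triplit/client';
import { MyCounterPlugin } from './my-counter-plugin'; // hypothetical plugin module
// 1. Schema: the plugin supplies the DataType for the attribute
//    (could also be exposed as S.MyCounter = MyCounterPlugin.schemaDefinition).
const schema = S.Collections({
  tasks: {
    schema: S.Schema({
      id: S.Id(),
      upvotes: MyCounterPlugin.schemaDefinition(),
    }),
  },
});
// 2. Registration: the proposed `plugins` option maps typeId -> plugin implementation.
const client = new TriplitClient({ schema, plugins: [MyCounterPlugin] });
// 3. Mutation: an explicit helper calls plugin.generateChangePayload internally and
//    writes the resulting TChangePayload inside a transaction.
await client.incrementCounter('tasks', 'task-1', 'upvotes', 1);
```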
**Proposed API: `CRDTPluginInterface<TState, TSchemaOptions, TStoredValue, TChangePayload>`**
```typescript
import { DataType, TypeConfig, Timestamp, KVStoreTransaction, DBEntity, Tuple } from '@triplit/db';
/** Structure for DKVR storage operations. */
export type DKVRMutationPayload = {
sets?: Map<Tuple, any>; // Map<KeySuffixTuple, Value>
deletes?: Set<Tuple>; // Set<KeySuffixTuple>
};
/** Input type for decodeFromStorage when using DKVR. */
export type DKVRScanResult = AsyncIterable<[keySuffix: Tuple, value: any]>;
/**
* Defines the contract for a custom CRDT data type plugin.
*
* @template TState - The canonical in-memory JavaScript representation (e.g., number for Counter).
* @template TSchemaOptions - User-defined options via Schema builder (e.g., S.MyType(options)).
* @template TStoredValue - How state is stored in KV (single value for 'blob', iterable value type for 'decomposed').
* @template TChangePayload - Representation of a change/delta for merge/sync (often TState for LWW, specific Op for op-based).
*/
export interface CRDTPluginInterface<
TState,
TSchemaOptions = any,
TStoredValue = any,
TChangePayload = TState
> {
/** REQUIRED: Unique string ID (e.g., "pn-counter", "myorg:rga-list"). Use namespacing. */
readonly typeId: string;
// --- Schema Definition & Validation ---
/** REQUIRED: Returns the base Triplit DataType for use in S.Collections({...}). */
schemaDefinition: (options?: TSchemaOptions) => DataType;
/** OPTIONAL: Validates options passed to schemaDefinition. Returns error string if invalid. */
validateSchemaOptions?: (options: TSchemaOptions) => string | undefined;
// --- Core CRDT Logic ---
/** REQUIRED: Returns the default/initial JS state (TState). */
defaultValue: (config: TypeConfig) => TState;
/**
* REQUIRED: The core CRDT merge function. Combines incoming changes with current state using HLC timestamps.
* For DKVR strategy, this function *must* use the provided kvTx for atomic multi-key reads/writes.
*/
merge(
currentState: TState | undefined,
incomingChangePayload: TChangePayload,
currentTimestamp: Timestamp | undefined,
incomingTimestamp: Timestamp,
actorId: string,
kvTx?: KVStoreTransaction // REQUIRED if storageStrategy is 'decomposed'
): TState;
// --- Storage Strategy & Serialization ---
/** REQUIRED: Specifies 'blob' (single KV value) or 'decomposed' (multiple KV values). */
readonly storageStrategy: 'blob' | 'decomposed';
/**
* REQUIRED: Serializes JS TState for storage.
* - 'blob': Returns TStoredValue (single value).
* - 'decomposed': Returns DKVRMutationPayload { sets?: Map<SuffixTuple, Val>, deletes?: Set<SuffixTuple> }.
*/
encodeForStorage: (state: TState) => TStoredValue | DKVRMutationPayload;
/**
* REQUIRED: Deserializes from storage to JS TState.
* - 'blob': Receives TStoredValue.
* - 'decomposed': Receives DKVRScanResult (AsyncIterable over KV pairs for the attribute).
*/
  decodeFromStorage: (storedValue: TStoredValue | DKVRScanResult) => TState | Promise<TState>; // may be async when consuming a DKVR scan
/** OPTIONAL (Recommended for Decomposed KV): Hints at the KV key structure. Helps core fetch data efficiently. */
readonly kvSchema?: Record<string, Tuple[]>; // e.g., { meta: [['_meta']], element: [['_element', '*']] }
// --- Client Interaction ---
/** REQUIRED (for mutations): Translates user API calls into the internal TChangePayload for merge/sync. */
generateChangePayload(
mutationType: string, // Plugin-defined e.g., "increment"
currentState: TState | undefined,
args: any[], // Arguments from client API call
actorId: string
): TChangePayload;
// --- Optional Querying Hooks ---
/** OPTIONAL: Provides comparison for `ORDER BY`. */
compare?: (stateA: TState, stateB: TState) => number;
/** OPTIONAL: Implements filtering logic for `WHERE`. */
satisfiesOperator?: (state: TState, operator: string, compareValue: any) => boolean;
/** OPTIONAL: Lists operators supported by `satisfiesOperator`. Defaults to =, !=, isDefined. */
supportedOperators?: () => ReadonlyArray<string>;
/** OPTIONAL: Extracts a primitive for basic comparison (e.g., counter value). */
getComparableValue?: (state: TState) => string | number | boolean | null;
// --- Optional Optimization Hooks ---
/** OPTIONAL: Calculates delta for efficient sync. Defaults to sending full state/changes. */
diff?: (oldState: TState, newState: TState) => TChangePayload | null;
/** OPTIONAL (for Decomposed KV): Helps IVM identify relevant low-level KV changes. */
isChangeRelevantToState?: (crdtAttributePath: string[], changedKvKey: string[]) => boolean;
}
```
**Integration Points (Revised):**
1. **Registration:** Via `plugins` array in `TriplitClient`/`createServer` options. Core maintains a `Map<typeId, CRDTPluginInterface>`.
2. **Schema:** `S.<TypeName>` calls `plugin.schemaDefinition`. Validation uses `plugin.validateSchemaOptions`.
3. **Storage/Merge (`EntityStoreKV`):**
    * Reads (`getEntity`): Checks `plugin.storageStrategy`. If 'blob', uses `kv.get` then `plugin.decodeFromStorage`. If 'decomposed', uses `kv.scan` (with a prefix derived from the attribute path) and passes the iterator to `plugin.decodeFromStorage`.
    * Writes (`applyChangesWithTimestamp`): Identifies the custom type via schema. Fetches current state appropriately (get or scan). Calls `plugin.merge` (passing `kvTx` if 'decomposed'). Calls `plugin.encodeForStorage`. Writes the result back via `kvTx.set` (for 'blob') or applies the `DKVRMutationPayload`'s sets/deletes atomically using `kvTx.set/delete` (for 'decomposed'). A sketch of this read/write dispatch follows the list.
4. **Client API/Transactions:**
* **Recommend Explicit Methods:** `client.incrementCounter(...)`, `client.listPush(...)`, etc.
* Inside `client.transact`: Read current state -> Call `plugin.generateChangePayload` -> Call `tx.update` with raw payload + internal flag `_crdtUpdate: true`.
* `tx.update`: If `_crdtUpdate` flag is present, store raw payload in internal diff map.
5. **Sync Layer:** Uses `plugin.encodeForStorage` or `plugin.diff` for `CHANGES` message payload. Server receives payload, passes to `EntityStore` which invokes `plugin.merge`. Handles new message types if needed for op-based sync.
6. **Query Engine/IVM:** Uses optional query hooks (`compare`, `satisfiesOperator`, etc.) potentially involving `decodeFromStorage`. IVM integration for DKVR relies on future core work, potentially guided by `isChangeRelevantToState`.
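For illustration, the read/merge/write dispatch from point 3 could look roughly like the sketch below. It assumes this proposal's types, that `kvTx.scan` yields an async iterable of key/value pairs, and a hypothetical helper name; it is not an existing `EntityStoreKV` method.
```typescript
// Sketch only: applyCustomTypeChange and the attribute-prefix shape are illustrative.
async function applyCustomTypeChange(
  plugin: CRDTPluginInterface<any>,
  kvTx: KVStoreTransaction,
  attrPrefix: Tuple, // e.g. ['entities', 'tasks', 'task-1', 'upvotes']
  incoming: any, // TChangePayload from the client outbox or sync layer
  currentTs: Timestamp | undefined,
  incomingTs: Timestamp,
  actorId: string
): Promise<void> {
  // 1. Fetch current state according to the storage strategy.
  let current: any;
  if (plugin.storageStrategy === 'blob') {
    const stored = await kvTx.get(attrPrefix);
    current = stored === undefined ? undefined : await plugin.decodeFromStorage(stored);
  } else {
    current = await plugin.decodeFromStorage(kvTx.scan({ prefix: attrPrefix }));
  }
  // 2. Merge; DKVR plugins receive kvTx for multi-key reads/writes.
  const next = plugin.merge(current, incoming, currentTs, incomingTs, actorId, kvTx);
  // 3. Encode and write back atomically within the same transaction.
  const encoded = plugin.encodeForStorage(next);
  if (plugin.storageStrategy === 'blob') {
    await kvTx.set(attrPrefix, encoded);
  } else {
    const { sets, deletes } = encoded as DKVRMutationPayload;
    for (const [suffix, value] of sets ?? []) await kvTx.set([...attrPrefix, ...suffix], value);
    for (const suffix of deletes ?? []) await kvTx.delete([...attrPrefix, ...suffix]);
  }
}
```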
**Storage Strategy Implications:**
* **State Blob:** Simpler plugin implementation. `encode/decode` handle single values. `merge` usually doesn't need `kvTx`. Efficient for small/infrequently updated CRDTs (Counters). Querying internals is slow. IVM treats it atomically.
* **Decomposed KV (DKVR):** Advanced. `encode` returns sets/deletes. `decode` handles iterators. `merge` **requires** `kvTx`. Enables granular updates/sync, potentially better for large/complex types (Lists, Text). Querying/IVM integration is significantly more complex for both the plugin author and requires core Triplit enhancements for efficiency.
**Example Plugin Sketch (PN-Counter using State Blob):**
(A sketch demonstrating the simpler 'blob' strategy.)
```typescript
// Assumed shapes for this sketch:
type CounterState = { p: Record<string, number>; n: Record<string, number> }; // per-actor increments / decrements
type CounterOptions = Record<string, never>;
const MyPNCounterPlugin: CRDTPluginInterface<CounterState, CounterOptions, string, CounterState> = {
typeId: 'pn-counter',
storageStrategy: 'blob', // Explicitly state blob strategy
  schemaDefinition: (options?) => S.Custom({ type: 'pn-counter', options }), // hypothetical constructor; the plugin supplies the DataType (cf. later examples)
defaultValue: (config) => ({ p: {}, n: {} }),
merge: (current, incoming /* CounterState */, currentTs, incomingTs, actorId) => {
// LWW merge based on attribute HLC timestamp
if (!currentTs || HybridLogicalClock.compare(incomingTs, currentTs) > 0) {
return incoming;
}
return current;
},
encodeForStorage: (state) => JSON.stringify(state), // Store as JSON string
decodeFromStorage: (storedValue: string) => JSON.parse(storedValue),
generateChangePayload: (mutationType, currentState, args, actorId) => {
if (mutationType !== 'increment') throw new Error('Unsupported counter op');
const amount = args[0] as number;
const newState = structuredClone(currentState ?? { p: {}, n: {} });
if (amount > 0) newState.p[actorId] = (newState.p[actorId] || 0) + amount;
else newState.n[actorId] = (newState.n[actorId] || 0) + Math.abs(amount);
return newState; // Return full new state for LWW merge
},
  getComparableValue: (state) =>
    Object.values(state.p).reduce((a, b) => a + b, 0) -
    Object.values(state.n).reduce((a, b) => a + b, 0),
// ... other optional query hooks
};
```
**Benefits:**
* **True Extensibility:** Enables core data types beyond primitives.
* **Encapsulated Logic:** CRDT complexity lives within plugins.
* **Adaptable Storage:** Supports simple blob storage and provides a path for more complex decomposed storage.
* **Community Potential:** Allows sharing and standardization of common collaborative types.
**Challenges & Open Questions:**
1. **DKVR Implementation Details:** How should `encodeForStorage` signal multi-key writes? How should `decodeFromStorage` reliably receive scanned data? What KV scan options are needed?
2. **IVM for DKVR:** What core changes are minimally required for *basic* IVM support with DKVR? Can `isChangeRelevantToState` be effective without deeper IVM awareness?
3. **Op-based Sync Protocol:** Is a generic `CRDT_OP` message feasible, or are type-specific messages better? How are ops batched efficiently?
4. **Transactionality of Merge:** Ensuring the `merge` function (especially for DKVR reading/writing multiple keys via `kvTx`) remains performant within transactions.
5. **Client API for DKVR:** How to provide an intuitive client API for complex CRDTs using DKVR without overly exposing internal KV structure?
**Next Steps:**
* Gather community feedback on the proposed interface and strategy.
* Refine the API based on feedback, particularly the `encode/decodeForStorage` contract for DKVR.
* Prototype a simple 'blob'-based plugin (like the Counter) to validate core integration points.
* Begin design discussions on the necessary core IVM enhancements required for efficient DKVR support.
---
# Triplit Plugin API: Follow-up Proposal
## Addressing Open Concerns
This document builds on the initial Plugin API proposal, providing concrete solutions for three identified concerns:
1. DKVR + IVM performance
2. Underspecified op-based sync
3. Conflated state-based vs op-based merge semantics
---
## Concern 1: DKVR + IVM Performance
### Problem Statement
The original proposal defers IVM optimization for Decomposed KV Representation (DKVR). This means plugins storing state across multiple KV entries (lists, text CRDTs) will trigger full re-evaluation of dependent queries on any sub-key change—potentially making real-time queries on complex types unusably slow.
### Proposed Solution: Queryable Projections
Rather than teaching IVM to understand arbitrary DKVR structures, we introduce **queryable projections**—plugin-defined materialized values that IVM *can* track efficiently.
```typescript
interface QueryableProjection<TState> {
/** Unique key for this projection (e.g., "length", "firstItem", "checksum") */
readonly key: string;
/** Extract projection value from state. Must be deterministic. */
extract: (state: TState) => string | number | boolean | null;
/** Operators this projection supports in WHERE clauses */
supportedOperators: ReadonlyArray<'=' | '!=' | '>' | '<' | '>=' | '<=' | 'contains'>;
}
```
Plugins declare projections upfront:
```typescript
const RGAListPlugin: CRDTPluginInterface<RGAState, ...> = {
typeId: 'rga-list',
storageStrategy: 'decomposed',
// NEW: Declare what's queryable
projections: [
{
key: 'length',
extract: (state) => state.items.filter(i => !i.tombstone).length,
supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
},
{
key: 'isEmpty',
extract: (state) => state.items.filter(i => !i.tombstone).length === 0,
supportedOperators: ['=', '!='],
},
{
key: 'firstItem',
extract: (state) => state.items.find(i => !i.tombstone)?.value ?? null,
supportedOperators: ['=', '!='],
},
],
// ... rest of plugin
};
```
### Storage Strategy: Co-located Projection Values
When a DKVR plugin writes state, the core also writes projection values to dedicated KV keys:
```
# DKVR state keys (plugin-managed)
['entities', 'tasks', 'task-1', 'items', '_element', 'uuid-a'] → { value: "Buy milk", ... }
['entities', 'tasks', 'task-1', 'items', '_element', 'uuid-b'] → { value: "Walk dog", ... }
# Projection keys (core-managed, derived from plugin.projections)
['entities', 'tasks', 'task-1', 'items', '_proj', 'length'] → 2
['entities', 'tasks', 'task-1', 'items', '_proj', 'isEmpty'] → false
['entities', 'tasks', 'task-1', 'items', '_proj', 'firstItem'] → "Buy milk"
```
### IVM Integration
IVM subscribes to projection keys, not DKVR element keys:
```typescript
// Query: WHERE items.length > 5
// IVM watches: ['entities', 'tasks', '*', 'items', '_proj', 'length']
```
When any DKVR key under an attribute changes:
1. Core calls `plugin.decodeFromStorage()` to get full state
2. Core calls each `projection.extract(state)`
3. Core compares with stored projection values
4. Only if projections changed → write new projection values → IVM triggers
This bounds IVM complexity: it only sees primitive projection changes, never arbitrary DKVR structures.
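A sketch of this recalculation loop, under the assumption that projection values live under a `_proj` key suffix as shown above; `refreshProjections` is a hypothetical core helper, not an existing API.
```typescript
// Sketch of steps 1-4: decode, extract, compare, write changed projections.
async function refreshProjections<TState>(
  plugin: CRDTPluginInterface<TState>,
  kvTx: KVStoreTransaction,
  attrPrefix: Tuple // prefix of the DKVR attribute that changed
): Promise<boolean> {
  if (!plugin.projections?.length) return false;
  // 1. Decode the full state from the attribute's KV range (scan assumed async-iterable).
  const state = await plugin.decodeFromStorage(kvTx.scan({ prefix: attrPrefix }));
  let changed = false;
  for (const projection of plugin.projections) {
    const projKey: Tuple = [...attrPrefix, '_proj', projection.key];
    // 2. Extract the projection value.
    const next = projection.extract(state);
    // 3. Compare with the stored value.
    const prev = await kvTx.get(projKey);
    if (prev !== next) {
      // 4. Write the new value; IVM (watching '_proj' keys) picks this up.
      await kvTx.set(projKey, next);
      changed = true;
    }
  }
  return changed;
}
```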
### Projection Update Contract
```typescript
interface CRDTPluginInterface<...> {
// ... existing fields ...
/**
* OPTIONAL: Queryable projections for IVM efficiency.
* If omitted, queries on this type require full decode (slow).
*/
projections?: ReadonlyArray<QueryableProjection<TState>>;
/**
* OPTIONAL: Hint whether a DKVR mutation might change projections.
* If returns false, core skips projection recalculation.
* Default: always recalculate (safe but slower).
*/
mightAffectProjections?: (
mutationType: string,
args: any[]
) => boolean | Set<string>; // true = all, Set = specific projection keys
}
```
Example optimization:
```typescript
const RGAListPlugin = {
// ...
mightAffectProjections: (mutationType, args) => {
switch (mutationType) {
case 'insert':
case 'delete':
return new Set(['length', 'isEmpty', 'firstItem']);
case 'updateElement':
// Only firstItem might change if we updated index 0
return args[0] === 0 ? new Set(['firstItem']) : false;
default:
return true; // Unknown op, recalculate all
}
},
};
```
### Tradeoffs
| Approach | Query Performance | Write Performance | Plugin Complexity |
|----------|-------------------|-------------------|-------------------|
| No projections | O(n) decode per query | O(1) | Low |
| With projections | O(1) lookup | O(k) projection extraction | Medium |
| Full IVM awareness | O(1) incremental | O(1) | High (requires core changes) |
Projections are the pragmatic middle ground—good query performance without requiring deep core modifications.
## Concern 2: Op-Based Sync Protocol
### Problem Statement
The original proposal asks: "Is a generic CRDT_OP message feasible, or are type-specific messages better?" This needs a concrete answer before implementation.
### Proposed Solution: Envelope Protocol with Strategy Declaration
We define a **type-agnostic envelope** that carries **type-specific payloads**. The sync layer routes messages; plugins interpret contents.
```typescript
/** Sync strategy declared by plugin */
type SyncStrategy = 'state' | 'operation' | 'delta';
/** Wire format for CRDT sync messages */
interface CRDTSyncMessage {
/** Message type identifier */
type: 'CRDT_SYNC';
/** Entity location */
entityId: string;
attributePath: string[];
/** Plugin identifier for routing */
crdtTypeId: string;
/** How to interpret the payload */
strategy: SyncStrategy;
/** HLC timestamp */
timestamp: Timestamp;
/** Originating actor */
actorId: string;
/**
* Plugin-serialized payload. Opaque to sync layer.
* - 'state': Full serialized TState
* - 'operation': Serialized TOp or TOp[]
* - 'delta': Serialized TDelta
*/
payload: string; // JSON-serialized by plugin
/** Optional: Sequence number for op-based CRDTs requiring causal order */
sequenceNumber?: number;
/** Optional: Dependencies for causal delivery */
dependencies?: Array<{ actorId: string; sequenceNumber: number }>;
}
```
### Sync Layer Responsibilities
The sync layer (transport, server-core) handles:
1. **Routing**: Match `crdtTypeId` to registered plugin (see the sketch after these lists)
2. **Ordering**: Buffer and reorder messages if `dependencies` present
3. **Deduplication**: Use `(entityId, attributePath, actorId, sequenceNumber)` as idempotency key
4. **Batching**: Aggregate multiple `CRDT_SYNC` messages per entity in single WebSocket frame
The sync layer does NOT:
- Parse or understand `payload` contents
- Perform merge logic (that's the plugin's job)
- Make assumptions about CRDT semantics
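A minimal sketch of the routing and deduplication steps above; `registry`, `deliveredKeys`, and the injected `deliver` callback are hypothetical server-core internals.
```typescript
// Sketch only: the sync layer routes and dedups, then hands the opaque payload downstream.
const registry = new Map<string, CRDTPluginInterface<any>>();
const deliveredKeys = new Set<string>(); // idempotency keys of already-applied messages
async function routeCrdtSync(
  message: CRDTSyncMessage,
  deliver: (plugin: CRDTPluginInterface<any>, message: CRDTSyncMessage) => Promise<void>
): Promise<void> {
  // Routing: match crdtTypeId to a registered plugin.
  const plugin = registry.get(message.crdtTypeId);
  if (!plugin) throw new Error(`No plugin registered for type ${message.crdtTypeId}`);
  // Deduplication: (entityId, attributePath, actorId, sequenceNumber) as idempotency key.
  const dedupKey = [
    message.entityId,
    message.attributePath.join('.'),
    message.actorId,
    message.sequenceNumber ?? '',
  ].join('|');
  if (deliveredKeys.has(dedupKey)) return;
  deliveredKeys.add(dedupKey);
  // The payload stays opaque here; the EntityStore + plugin interpret it downstream.
  await deliver(plugin, message);
}
```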
### Plugin Serialization Contract
```typescript
interface CRDTPluginInterface<TState, TSchemaOptions, TStoredValue, TChangePayload> {
// ... existing fields ...
/** REQUIRED: How this CRDT syncs over the wire */
readonly syncStrategy: SyncStrategy;
/** REQUIRED: Serialize change for network transmission */
serializeForSync: (change: TChangePayload) => string;
/** REQUIRED: Deserialize change from network */
deserializeFromSync: (serialized: string) => TChangePayload;
/**
* OPTIONAL (for 'operation' strategy):
* Declare if operations require causal delivery.
* If true, core tracks vector clocks and buffers out-of-order ops.
*/
requiresCausalDelivery?: boolean;
/**
* OPTIONAL (for 'operation' strategy):
* Compress multiple operations into one (reduce network traffic).
* E.g., [insert('a'), insert('b')] → insertBatch(['a', 'b'])
*/
compressOperations?: (ops: TChangePayload[]) => TChangePayload;
}
```
### Example: State-Based Counter
```typescript
const GCounterPlugin: CRDTPluginInterface<GCounterState, {}, string, GCounterState> = {
typeId: 'g-counter',
syncStrategy: 'state', // Full state sync
serializeForSync: (state) => JSON.stringify(state),
deserializeFromSync: (s) => JSON.parse(s),
// No causal delivery needed—state merge is commutative
  // ...remaining plugin fields (schema, storage, mutations) omitted for brevity
};
```
### Example: Op-Based List with Causal Delivery
```typescript
type ListOp =
  | { type: 'insert'; id: string; afterId: string | null; value: any }
  | { type: 'delete'; id: string }
  | { type: 'batch'; ops: ListOp[] }; // produced by compressOperations
const RGAListPlugin: CRDTPluginInterface<RGAState, {}, DKVRMutationPayload, ListOp> = {
typeId: 'rga-list',
syncStrategy: 'operation',
requiresCausalDelivery: true, // Inserts must see their "afterId" first
serializeForSync: (op) => JSON.stringify(op),
deserializeFromSync: (s) => JSON.parse(s),
compressOperations: (ops) => {
// Combine consecutive inserts from same position
// ... implementation ...
return ops.length === 1 ? ops[0] : { type: 'batch', ops };
},
  // ...remaining plugin fields omitted for brevity
};
```
### Causal Delivery Implementation
For plugins with `requiresCausalDelivery: true`, the sync layer maintains:
```typescript
interface CausalBuffer {
// Pending ops waiting for dependencies
pending: Map<string, { op: CRDTSyncMessage; missingDeps: Set<string> }>;
// Delivered sequence numbers per actor
delivered: Map<string, number>; // actorId → highest delivered seqNum
// Enqueue and attempt delivery
enqueue(message: CRDTSyncMessage): CRDTSyncMessage[]; // Returns newly deliverable
}
```
This is opt-in complexity. Simple state-based CRDTs ignore it entirely.
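For illustration, one possible `enqueue` implementation that tracks per-actor high-water marks and releases buffered messages once their dependencies have been delivered; this is a sketch of the contract above, not a prescribed implementation.
```typescript
// Sketch: assumes "delivered" means all sequence numbers up to the recorded high-water mark.
class SimpleCausalBuffer implements CausalBuffer {
  pending = new Map<string, { op: CRDTSyncMessage; missingDeps: Set<string> }>();
  delivered = new Map<string, number>(); // actorId -> highest delivered seqNum
  private missingDeps(msg: CRDTSyncMessage): Set<string> {
    return new Set(
      (msg.dependencies ?? [])
        .filter((d) => (this.delivered.get(d.actorId) ?? -1) < d.sequenceNumber)
        .map((d) => `${d.actorId}:${d.sequenceNumber}`)
    );
  }
  enqueue(message: CRDTSyncMessage): CRDTSyncMessage[] {
    const ready: CRDTSyncMessage[] = [];
    const queue = [message];
    while (queue.length > 0) {
      const msg = queue.shift()!;
      const missing = this.missingDeps(msg);
      if (missing.size > 0) {
        // Buffer until the missing dependencies arrive.
        this.pending.set(`${msg.actorId}:${msg.sequenceNumber}`, { op: msg, missingDeps: missing });
        continue;
      }
      // Deliverable: record progress and re-check buffered messages.
      this.delivered.set(
        msg.actorId,
        Math.max(this.delivered.get(msg.actorId) ?? -1, msg.sequenceNumber ?? 0)
      );
      ready.push(msg);
      for (const [key, entry] of [...this.pending]) {
        if (this.missingDeps(entry.op).size === 0) {
          this.pending.delete(key);
          queue.push(entry.op);
        }
      }
    }
    return ready;
  }
}
```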
## Concern 3: Conflated State-Based vs Op-Based Merge Semantics
### Problem Statement
The original merge signature:
```typescript
merge(
currentState: TState | undefined,
incomingChangePayload: TChangePayload,
currentTimestamp: Timestamp | undefined,
incomingTimestamp: Timestamp,
actorId: string,
kvTx?: KVStoreTransaction
): TState;
```
This conflates three distinct CRDT patterns:
| Pattern | Merge Semantics | Mathematical Property |
|---------|-----------------|----------------------|
| State-based | `merge(stateA, stateB) → state` | Commutative, associative, idempotent |
| Op-based | `apply(state, operation) → state` | Operations commute (or causal delivery) |
| Delta-state | `mergeDelta(state, delta) → state` | Deltas are join-semilattice fragments |
### Proposed Solution: Discriminated Union Interface
Split the core CRDT logic into three explicit variants:
```typescript
/** State-based CRDT (e.g., G-Counter, OR-Set) */
interface StateBasedCRDTCore<TState> {
readonly syncStrategy: 'state';
/**
* Merge two complete states. Must be:
* - Commutative: merge(a, b) = merge(b, a)
* - Associative: merge(merge(a, b), c) = merge(a, merge(b, c))
* - Idempotent: merge(a, a) = a
*/
merge: (stateA: TState, stateB: TState) => TState;
}
/** Operation-based CRDT (e.g., RGA, Counter with inc/dec ops) */
interface OpBasedCRDTCore<TState, TOp> {
readonly syncStrategy: 'operation';
/**
* Apply an operation to current state.
* If requiresCausalDelivery is false, operations must commute.
*/
apply: (
state: TState,
operation: TOp,
timestamp: Timestamp,
actorId: string
) => TState;
/** Whether ops require causal ordering */
requiresCausalDelivery: boolean;
}
/** Delta-state CRDT (e.g., Delta-CRDT variants for bandwidth efficiency) */
interface DeltaStateCRDTCore<TState, TDelta> {
readonly syncStrategy: 'delta';
/** Merge a delta into current state */
mergeDelta: (state: TState, delta: TDelta) => TState;
/** Extract delta representing changes from old to new state */
extractDelta: (oldState: TState, newState: TState) => TDelta;
/** Merge two deltas (for delta aggregation) */
mergeDeltas?: (deltaA: TDelta, deltaB: TDelta) => TDelta;
}
/** Union type for CRDT core logic */
type CRDTCore<TState, TOp = any, TDelta = any> =
| StateBasedCRDTCore<TState>
| OpBasedCRDTCore<TState, TOp>
| DeltaStateCRDTCore<TState, TDelta>;
```
### Revised Plugin Interface
```typescript
interface CRDTPluginInterface<
TState,
TSchemaOptions = any,
TStoredValue = any,
TChangePayload = TState,
TOp = any,
TDelta = any
> {
/** REQUIRED: Unique identifier */
readonly typeId: string;
// --- Schema ---
schemaDefinition: (options?: TSchemaOptions) => DataType;
defaultValue: (config: TypeConfig) => TState;
// --- CRDT Core (discriminated by syncStrategy) ---
readonly core: CRDTCore<TState, TOp, TDelta>;
// --- Storage ---
readonly storageStrategy: 'blob' | 'decomposed';
encodeForStorage: (state: TState) => TStoredValue | DKVRMutationPayload;
  decodeFromStorage: (stored: TStoredValue | DKVRScanResult) => TState | Promise<TState>;
// --- Sync Serialization ---
serializeForSync: (change: TChangePayload) => string;
deserializeFromSync: (serialized: string) => TChangePayload;
// --- Client Mutations ---
/**
* Generate change payload from user action.
* Return type depends on syncStrategy:
* - 'state': Return new TState
* - 'operation': Return TOp
* - 'delta': Return TDelta
*/
generateChangePayload: (
mutationType: string,
currentState: TState | undefined,
args: any[],
actorId: string
) => TChangePayload;
// --- Queryable Projections (for IVM) ---
projections?: ReadonlyArray<QueryableProjection<TState>>;
mightAffectProjections?: (mutationType: string, args: any[]) => boolean | Set<string>;
// --- Optional Query Hooks ---
compare?: (stateA: TState, stateB: TState) => number;
}
```
### Core Integration: Type-Safe Dispatch
The EntityStore dispatches to the correct method based on `syncStrategy`:
```typescript
// In EntityStoreKV.applyRemoteChange()
function applyRemoteChange<TState>(
  plugin: CRDTPluginInterface<TState, any, any, any, any, any>,
  currentState: TState | undefined,
message: CRDTSyncMessage
): TState {
const change = plugin.deserializeFromSync(message.payload);
switch (plugin.core.syncStrategy) {
case 'state':
// State-based: merge current with incoming state
const current = currentState ?? plugin.defaultValue({});
return plugin.core.merge(current, change as TState);
case 'operation':
// Op-based: apply operation to current state
const state = currentState ?? plugin.defaultValue({});
return plugin.core.apply(
state,
change,
message.timestamp,
message.actorId
);
case 'delta':
// Delta-state: merge delta into current state
const base = currentState ?? plugin.defaultValue({});
return plugin.core.mergeDelta(base, change);
default:
throw new Error(`Unknown sync strategy: ${(plugin.core as any).syncStrategy}`);
}
}
```
### Example: G-Counter (State-Based)
```typescript
type GCounterState = Record<string, number>; // actorId → count
const GCounterPlugin: CRDTPluginInterface<GCounterState, {}, string, GCounterState> = {
typeId: 'g-counter',
schemaDefinition: () => S.Custom({ type: 'g-counter' }),
defaultValue: () => ({}),
core: {
syncStrategy: 'state',
merge: (a, b) => {
// Point-wise maximum
const result: GCounterState = { ...a };
for (const [actor, count] of Object.entries(b)) {
result[actor] = Math.max(result[actor] ?? 0, count);
}
return result;
},
},
storageStrategy: 'blob',
encodeForStorage: (state) => JSON.stringify(state),
decodeFromStorage: (stored) => JSON.parse(stored as string),
serializeForSync: (state) => JSON.stringify(state),
deserializeFromSync: (s) => JSON.parse(s),
generateChangePayload: (mutationType, currentState, args, actorId) => {
if (mutationType !== 'increment') throw new Error('Unknown mutation');
const amount = args[0] as number;
const state = currentState ?? {};
return { ...state, [actorId]: (state[actorId] ?? 0) + amount };
},
projections: [
{
key: 'value',
extract: (state) => Object.values(state).reduce((a, b) => a + b, 0),
supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
},
],
};
```
### Example: RGA List (Op-Based with Causal Delivery)
```typescript
type RGAOp =
| { type: 'insert'; id: string; afterId: string | null; value: any }
| { type: 'delete'; id: string };
interface RGAElement {
id: string;
value: any;
tombstone: boolean;
timestamp: Timestamp;
}
interface RGAState {
elements: Map<string, RGAElement>;
order: string[]; // Sorted element IDs
}
const RGAListPlugin: CRDTPluginInterface<RGAState, {}, DKVRMutationPayload, RGAOp, RGAOp> = {
typeId: 'rga-list',
schemaDefinition: () => S.Custom({ type: 'rga-list' }),
defaultValue: () => ({ elements: new Map(), order: [] }),
core: {
syncStrategy: 'operation',
requiresCausalDelivery: true, // Insert needs to see afterId
apply: (state, op, timestamp, actorId) => {
const newState = structuredClone(state);
switch (op.type) {
case 'insert': {
const element: RGAElement = {
id: op.id,
value: op.value,
tombstone: false,
timestamp,
};
newState.elements.set(op.id, element);
// Find position after afterId
const afterIndex = op.afterId
? newState.order.indexOf(op.afterId)
: -1;
// Insert, handling concurrent inserts via timestamp comparison
let insertIndex = afterIndex + 1;
while (
insertIndex < newState.order.length &&
compareTimestamps(
newState.elements.get(newState.order[insertIndex])!.timestamp,
timestamp
) > 0
) {
insertIndex++;
}
newState.order.splice(insertIndex, 0, op.id);
break;
}
case 'delete': {
const element = newState.elements.get(op.id);
if (element) {
element.tombstone = true;
}
break;
}
}
return newState;
},
},
storageStrategy: 'decomposed',
encodeForStorage: (state): DKVRMutationPayload => {
const sets = new Map<Tuple, any>();
// Store order metadata
sets.set(['_meta', 'order'], state.order);
// Store each element
for (const [id, element] of state.elements) {
sets.set(['_element', id], element);
}
return { sets };
},
decodeFromStorage: async (scan: DKVRScanResult): Promise<RGAState> => {
const state: RGAState = { elements: new Map(), order: [] };
for await (const [key, value] of scan) {
if (key[0] === '_meta' && key[1] === 'order') {
state.order = value;
} else if (key[0] === '_element') {
state.elements.set(key[1] as string, value);
}
}
return state;
},
serializeForSync: (op) => JSON.stringify(op),
deserializeFromSync: (s) => JSON.parse(s),
generateChangePayload: (mutationType, currentState, args, actorId) => {
switch (mutationType) {
case 'insert': {
const [index, value] = args;
const afterId = index === 0
? null
: currentState?.order[index - 1] ?? null;
return {
type: 'insert',
id: `${actorId}-${Date.now()}-${Math.random()}`,
afterId,
value,
};
}
case 'delete': {
const [index] = args;
return {
type: 'delete',
id: currentState!.order[index],
};
}
default:
throw new Error(`Unknown mutation: ${mutationType}`);
}
},
projections: [
{
key: 'length',
extract: (state) => state.order.filter(id =>
!state.elements.get(id)?.tombstone
).length,
supportedOperators: ['=', '!=', '>', '<', '>=', '<='],
},
],
mightAffectProjections: (mutationType) => {
return mutationType === 'insert' || mutationType === 'delete'
? new Set(['length'])
: false;
},
};
```
## Summary of Changes
| Concern | Original Proposal | This Proposal |
|---------|-------------------|---------------|
| **DKVR + IVM** | "Deferred" | Queryable projections with explicit extraction |
| **Op-based sync** | "Undecided" | Envelope protocol with strategy declaration |
| **Merge semantics** | Single conflated `merge()` | Discriminated union: `merge` / `apply` / `mergeDelta` |
### New Fields
```typescript
interface CRDTPluginInterface {
// NEW: Replaces old merge()
core: CRDTCore<TState, TOp, TDelta>;
// NEW: Wire serialization
serializeForSync: (change: TChangePayload) => string;
deserializeFromSync: (serialized: string) => TChangePayload;
// NEW (optional): IVM optimization
projections?: ReadonlyArray<QueryableProjection<TState>>;
mightAffectProjections?: (mutationType: string, args: any[]) => boolean | Set<string>;
}
```
### Migration Path
Existing plugin sketches using the old `merge()` signature can be adapted:
```typescript
// Old
merge: (current, incoming, currentTs, incomingTs, actorId) => { ... }
// New (state-based)
core: {
syncStrategy: 'state',
merge: (a, b) => { /* same logic, but pure state merge */ }
}
```
## Next Steps
1. **Validate with prototype**: Implement G-Counter (state-based, blob) and RGA-List (op-based, DKVR) to stress-test the interface
2. **Benchmark projections**: Measure overhead of projection extraction vs query performance gains
3. **Causal delivery implementation**: Build the causal buffer for op-based CRDTs in `@triplit/server-core`
4. **TypeScript inference**: Ensure `TChangePayload` correctly narrows based on `syncStrategy` (see the sketch below)
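On point 4, one option is a conditional type that derives the payload type from the declared `core`; the type name below is illustrative.
```typescript
// Sketch: derive the change-payload type from the plugin's declared core so that
// serializeForSync / generateChangePayload narrow together with syncStrategy.
type ChangePayloadFor<Core> =
  Core extends StateBasedCRDTCore<infer S> ? S
  : Core extends OpBasedCRDTCore<any, infer Op> ? Op
  : Core extends DeltaStateCRDTCore<any, infer D> ? D
  : never;
// e.g. ChangePayloadFor<typeof RGAListPlugin.core> resolves to RGAOp.
```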
---
# Triplit Cloudflare Integration: Architecture Proposal
## Overview
Triplit's README already notes Durable Objects support; this proposal extends that into first-class Cloudflare support:
| Component | Role | Benefit |
|-----------|------|---------|
| **Durable Objects** | Per-document/per-user sync coordinators | Global edge distribution, WebSocket handling, single-writer consistency |
| **D1** | Persistent SQL storage | Queryable data, automatic replication, familiar SQL |
| **Local Replicas** | Client-side SQLite (sql.js in the browser, expo-sqlite on mobile) | Offline support, instant reads, sync when online |
## Architecture
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Cloudflare Edge Network │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Durable │ │ Durable │ │ Durable │ │
│ │ Object │ │ Object │ │ Object │ ... │
│ │ (doc-123) │ │ (doc-456) │ │ (user-abc) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ D1 │ │
│ │ (SQLite) │ │
│ │ │ │
│ │ - Entities │ │
│ │ - Triples │ │
│ │ - Projections │ │
│ └─────────────────┘ │
│ │ │
└─────────────────────────────┼────────────────────────────────────────────────┘
│ Read Replicas
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Clients │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Browser │ │ Mobile App │ │ Desktop App │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ ┌───────────┐ │ │
│ │ │ IndexedDB │ │ │ │ SQLite │ │ │ │ SQLite │ │ │
│ │ │ (Triplit) │ │ │ │ (Triplit) │ │ │ │ (Triplit) │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │ └───────────┘ │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ ▲ ▲ ▲ │
│ │ WebSocket │ │ │
│ └────────────────────┴────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
```
---
## Component 1: Durable Objects Storage Adapter
Durable Objects provide single-instance coordination with built-in storage. Each DO can manage sync for a document, user, or shard.
### Adapter Interface
```typescript
// packages/storage-durable-object/src/index.ts
import { DurableObject } from 'cloudflare:workers';
interface TriplitDurableObjectEnv {
TRIPLIT_DO: DurableObjectNamespace;
D1_DATABASE?: D1Database; // Optional: for D1 persistence
}
/**
* Storage adapter using Durable Object's built-in storage API.
* Suitable for per-document or per-user coordination.
*/
export class DurableObjectStorageAdapter implements TriplitStorageAdapter {
constructor(private storage: DurableObjectStorage) {}
async get(key: Tuple): Promise<any> {
return this.storage.get(tupleToString(key));
}
async set(key: Tuple, value: any): Promise<void> {
await this.storage.put(tupleToString(key), value);
}
async delete(key: Tuple): Promise<void> {
await this.storage.delete(tupleToString(key));
}
async scan(options: ScanOptions): Promise<AsyncIterable<[Tuple, any]>> {
const prefix = tupleToString(options.prefix);
const entries = await this.storage.list({ prefix });
return (async function* () {
for (const [key, value] of entries) {
yield [stringToTuple(key), value];
}
})();
}
async transact<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T> {
// DO storage has implicit transactions within a single request
// For explicit transaction control, use storage.transaction()
return this.storage.transaction(async (txn) => {
const tx = new DurableObjectTransaction(txn);
return fn(tx);
});
}
}
```
### Durable Object Implementation
```typescript
// src/triplit-sync-do.ts
import { DurableObject } from 'cloudflare:workers';
import { createTriplitServer, TriplitServer } from '@triplit/server-core';
import { DurableObjectStorageAdapter } from '@triplit/storage-durable-object';
export class TriplitSyncDO extends DurableObject {
private server: TriplitServer | null = null;
private connections: Map<string, WebSocket> = new Map();
constructor(state: DurableObjectState, env: Env) {
super(state, env);
}
private async getServer(): Promise<TriplitServer> {
if (!this.server) {
const storage = new DurableObjectStorageAdapter(this.ctx.storage);
this.server = createTriplitServer({
storage,
// Register CRDT plugins
plugins: [GCounterPlugin, RGAListPlugin],
});
}
return this.server;
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
// WebSocket upgrade for sync
if (request.headers.get('Upgrade') === 'websocket') {
return this.handleWebSocket(request);
}
// REST API for queries (optional)
if (url.pathname.startsWith('/api/')) {
return this.handleRest(request);
}
return new Response('Not found', { status: 404 });
}
private async handleWebSocket(request: Request): Promise<Response> {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);
const connectionId = crypto.randomUUID();
this.connections.set(connectionId, server);
server.accept();
const triplitServer = await this.getServer();
server.addEventListener('message', async (event) => {
const message = JSON.parse(event.data as string);
const response = await triplitServer.handleMessage(connectionId, message);
if (response) {
server.send(JSON.stringify(response));
}
// Broadcast to other connections if needed
if (response?.broadcast) {
this.broadcast(connectionId, response.broadcast);
}
});
server.addEventListener('close', () => {
this.connections.delete(connectionId);
triplitServer.handleDisconnect(connectionId);
});
return new Response(null, { status: 101, webSocket: client });
}
private broadcast(excludeId: string, message: any): void {
const data = JSON.stringify(message);
for (const [id, ws] of this.connections) {
if (id !== excludeId && ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
}
}
// Hibernation support for cost efficiency
async webSocketMessage(ws: WebSocket, message: string): Promise<void> {
// Handle messages after hibernation wake
const server = await this.getServer();
const connectionId = this.getConnectionId(ws);
const response = await server.handleMessage(connectionId, JSON.parse(message));
if (response) {
ws.send(JSON.stringify(response));
}
}
async webSocketClose(ws: WebSocket): Promise<void> {
const connectionId = this.getConnectionId(ws);
this.connections.delete(connectionId);
}
}
```
### Worker Entry Point
```typescript
// src/index.ts
export { TriplitSyncDO } from './triplit-sync-do';
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
// Route to appropriate Durable Object based on document/user ID
// Example: /sync/doc/doc-123 → DO instance for doc-123
const match = url.pathname.match(/^\/sync\/(doc|user)\/([^/]+)/);
if (match) {
const [, type, id] = match;
const doId = env.TRIPLIT_DO.idFromName(`${type}:${id}`);
const stub = env.TRIPLIT_DO.get(doId);
return stub.fetch(request);
}
return new Response('Invalid route', { status: 400 });
},
};
```
---
## Component 2: D1 Storage Adapter
D1 provides SQLite at the edge with automatic replication. Better for larger datasets and complex queries.
### Schema
```sql
-- D1 schema for Triplit
-- Optimized for the triple store model
-- Entity headers
CREATE TABLE entities (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (collection, entity_id)
);
-- Triple store (attribute-value-timestamp)
CREATE TABLE triples (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
value TEXT, -- JSON-encoded
timestamp_hlc TEXT NOT NULL, -- HLC as string for sorting
actor_id TEXT NOT NULL,
tombstone INTEGER DEFAULT 0,
PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc)
);
-- CRDT projections (for IVM)
CREATE TABLE projections (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
projection_key TEXT NOT NULL,
value TEXT, -- JSON-encoded primitive
PRIMARY KEY (collection, entity_id, attribute, projection_key)
);
-- Indexes for common queries
CREATE INDEX idx_triples_entity ON triples(collection, entity_id);
CREATE INDEX idx_triples_attribute ON triples(collection, attribute);
CREATE INDEX idx_projections_lookup ON projections(collection, projection_key, value);
-- For DKVR storage (decomposed CRDTs)
CREATE TABLE dkvr_entries (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
key_suffix TEXT NOT NULL, -- JSON-encoded tuple
value TEXT NOT NULL, -- JSON-encoded
PRIMARY KEY (collection, entity_id, attribute, key_suffix)
);
```
### D1 Storage Adapter
```typescript
// packages/storage-d1/src/index.ts
export class D1StorageAdapter implements TriplitStorageAdapter {
constructor(private db: D1Database) {}
async get(key: Tuple): Promise<any> {
const [collection, entityId, attribute, ...rest] = key;
if (rest.length === 0) {
// Get latest triple value
const result = await this.db
.prepare(`
SELECT value FROM triples
WHERE collection = ? AND entity_id = ? AND attribute = ? AND tombstone = 0
ORDER BY timestamp_hlc DESC LIMIT 1
`)
.bind(collection, entityId, attribute)
.first();
return result ? JSON.parse(result.value as string) : undefined;
}
// DKVR lookup
const keySuffix = JSON.stringify(rest);
const result = await this.db
.prepare(`
SELECT value FROM dkvr_entries
WHERE collection = ? AND entity_id = ? AND attribute = ? AND key_suffix = ?
`)
.bind(collection, entityId, attribute, keySuffix)
.first();
return result ? JSON.parse(result.value as string) : undefined;
}
async set(key: Tuple, value: any, timestamp?: Timestamp): Promise<void> {
const [collection, entityId, attribute, ...rest] = key;
const jsonValue = JSON.stringify(value);
if (rest.length === 0) {
// Standard triple
await this.db
.prepare(`
INSERT INTO triples (collection, entity_id, attribute, value, timestamp_hlc, actor_id)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (collection, entity_id, attribute, timestamp_hlc)
DO UPDATE SET value = excluded.value
`)
.bind(collection, entityId, attribute, jsonValue, timestamp?.toString(), timestamp?.actorId)
.run();
} else {
// DKVR entry
const keySuffix = JSON.stringify(rest);
await this.db
.prepare(`
INSERT INTO dkvr_entries (collection, entity_id, attribute, key_suffix, value)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (collection, entity_id, attribute, key_suffix)
DO UPDATE SET value = excluded.value
`)
.bind(collection, entityId, attribute, keySuffix, jsonValue)
.run();
}
}
async scan(options: ScanOptions): Promise<AsyncIterable<[Tuple, any]>> {
const { prefix, limit, reverse } = options;
const [collection, entityId, attribute] = prefix;
let query: string;
let bindings: any[];
if (attribute) {
// DKVR scan within an attribute
query = `
SELECT key_suffix, value FROM dkvr_entries
WHERE collection = ? AND entity_id = ? AND attribute = ?
${reverse ? 'ORDER BY key_suffix DESC' : 'ORDER BY key_suffix ASC'}
${limit ? `LIMIT ${limit}` : ''}
`;
bindings = [collection, entityId, attribute];
} else if (entityId) {
// Scan all attributes of an entity
query = `
SELECT attribute, value, timestamp_hlc FROM triples
WHERE collection = ? AND entity_id = ? AND tombstone = 0
ORDER BY attribute ${reverse ? 'DESC' : 'ASC'}
${limit ? `LIMIT ${limit}` : ''}
`;
bindings = [collection, entityId];
} else {
// Scan all entities in collection
query = `
SELECT DISTINCT entity_id FROM entities
WHERE collection = ?
ORDER BY entity_id ${reverse ? 'DESC' : 'ASC'}
${limit ? `LIMIT ${limit}` : ''}
`;
bindings = [collection];
}
const results = await this.db.prepare(query).bind(...bindings).all();
return (async function* () {
for (const row of results.results) {
// Transform row to [Tuple, value] format
yield transformRowToTuple(row, prefix);
}
})();
}
async transact<T>(fn: (tx: StorageTransaction) => Promise<T>): Promise<T> {
// D1 supports batch operations
const batch: D1PreparedStatement[] = [];
const tx: StorageTransaction = {
get: (key) => this.get(key),
set: (key, value, timestamp) => {
// Queue for batch execution
batch.push(this.prepareSet(key, value, timestamp));
return Promise.resolve();
},
delete: (key) => {
batch.push(this.prepareDelete(key));
return Promise.resolve();
},
};
const result = await fn(tx);
// Execute all writes in a batch
if (batch.length > 0) {
await this.db.batch(batch);
}
return result;
}
// Projection support
async setProjection(
collection: string,
entityId: string,
attribute: string,
projectionKey: string,
value: any
): Promise<void> {
await this.db
.prepare(`
INSERT INTO projections (collection, entity_id, attribute, projection_key, value)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (collection, entity_id, attribute, projection_key)
DO UPDATE SET value = excluded.value
`)
.bind(collection, entityId, attribute, projectionKey, JSON.stringify(value))
.run();
}
async queryByProjection(
collection: string,
attribute: string,
projectionKey: string,
operator: string,
compareValue: any
): Promise<string[]> {
const sqlOperator = this.toSqlOperator(operator);
const results = await this.db
.prepare(`
SELECT entity_id FROM projections
WHERE collection = ? AND attribute = ? AND projection_key = ?
AND value ${sqlOperator} ?
`)
.bind(collection, attribute, projectionKey, JSON.stringify(compareValue))
.all();
return results.results.map((r) => r.entity_id as string);
}
}
```
---
## Component 3: Hybrid DO + D1 Architecture
Best of both worlds: Durable Objects for real-time sync coordination, D1 for persistent queryable storage.
```typescript
// src/triplit-hybrid-do.ts
export class TriplitHybridDO extends DurableObject {
private server: TriplitServer | null = null;
constructor(
state: DurableObjectState,
private env: { D1_DATABASE: D1Database }
) {
super(state, env);
}
private async getServer(): Promise<TriplitServer> {
if (!this.server) {
// Use DO storage for hot data / active sync state
const doStorage = new DurableObjectStorageAdapter(this.ctx.storage);
// Use D1 for persistent storage and queries
const d1Storage = new D1StorageAdapter(this.env.D1_DATABASE);
// Tiered storage: DO for recent, D1 for persistent
const storage = new TieredStorageAdapter({
hot: doStorage,
cold: d1Storage,
// Move to D1 after 5 minutes of inactivity
tierDownAfterMs: 5 * 60 * 1000,
});
this.server = createTriplitServer({
storage,
plugins: [GCounterPlugin, RGAListPlugin],
// Sync to D1 on every mutation
onMutation: async (collection, entityId, changes) => {
await d1Storage.persistChanges(collection, entityId, changes);
},
});
}
return this.server;
}
// ... WebSocket handling same as before
}
```
### Tiered Storage Adapter
```typescript
// packages/storage-tiered/src/index.ts
interface TieredStorageOptions {
hot: TriplitStorageAdapter; // Fast, ephemeral (DO storage)
cold: TriplitStorageAdapter; // Persistent, queryable (D1)
tierDownAfterMs: number;
}
export class TieredStorageAdapter implements TriplitStorageAdapter {
private hotKeys: Map<string, number> = new Map(); // key -> last access time
constructor(private options: TieredStorageOptions) {
// Periodically tier down inactive keys
setInterval(() => this.tierDown(), 60_000);
}
async get(key: Tuple): Promise<any> {
const keyStr = tupleToString(key);
// Try hot storage first
const hotValue = await this.options.hot.get(key);
if (hotValue !== undefined) {
this.hotKeys.set(keyStr, Date.now());
return hotValue;
}
// Fall back to cold storage
const coldValue = await this.options.cold.get(key);
if (coldValue !== undefined) {
// Promote to hot storage
await this.options.hot.set(key, coldValue);
this.hotKeys.set(keyStr, Date.now());
}
return coldValue;
}
async set(key: Tuple, value: any, timestamp?: Timestamp): Promise<void> {
const keyStr = tupleToString(key);
// Write to hot storage immediately
await this.options.hot.set(key, value, timestamp);
this.hotKeys.set(keyStr, Date.now());
// Async write to cold storage (fire and forget, or queue)
this.options.cold.set(key, value, timestamp).catch(console.error);
}
private async tierDown(): Promise<void> {
const now = Date.now();
const threshold = now - this.options.tierDownAfterMs;
for (const [keyStr, lastAccess] of this.hotKeys) {
if (lastAccess < threshold) {
// Remove from hot storage
const key = stringToTuple(keyStr);
await this.options.hot.delete(key);
this.hotKeys.delete(keyStr);
}
}
}
}
```
---
## Component 4: Client-Side Local Replica
For true offline support, clients maintain a local SQLite replica that syncs with the edge.
### Browser (sql.js + IndexedDB persistence)
```typescript
// packages/triplit-client-browser/src/local-replica.ts
import initSqlJs, { Database } from 'sql.js';
export class BrowserLocalReplica {
private db: Database | null = null;
async init(): Promise<void> {
const SQL = await initSqlJs({
locateFile: (file) => `https://sql.js.org/dist/${file}`,
});
// Load from IndexedDB if exists
const persisted = await this.loadFromIndexedDB();
this.db = persisted
? new SQL.Database(persisted)
: new SQL.Database();
// Initialize schema
this.db.run(TRIPLIT_SCHEMA);
}
async query(sql: string, params?: any[]): Promise<any[]> {
return this.db!.exec(sql, params);
}
async persist(): Promise<void> {
const data = this.db!.export();
await this.saveToIndexedDB(data);
}
private async loadFromIndexedDB(): Promise<Uint8Array | null> {
    // ... IndexedDB read (omitted in this sketch)
    return null;
}
private async saveToIndexedDB(data: Uint8Array): Promise<void> {
// ... IndexedDB write
}
}
```
### React Native / Mobile (expo-sqlite or op-sqlite)
```typescript
// packages/triplit-client-mobile/src/local-replica.ts
import * as SQLite from 'expo-sqlite';
export class MobileLocalReplica {
  private db!: SQLite.SQLiteDatabase;
async init(): Promise<void> {
this.db = await SQLite.openDatabaseAsync('triplit-replica.db');
await this.db.execAsync(TRIPLIT_SCHEMA);
}
async query(sql: string, params?: any[]): Promise<any[]> {
return this.db.getAllAsync(sql, params);
}
// No explicit persist needed - SQLite writes to disk
}
```
### Sync Protocol with Local Replica
```typescript
// packages/triplit-client/src/sync-manager.ts
export class SyncManager {
private ws: WebSocket | null = null;
private pendingWrites: Map<string, Change> = new Map();
constructor(
private serverUrl: string,
private localReplica: LocalReplica
) {}
async connect(): Promise<void> {
this.ws = new WebSocket(this.serverUrl);
    this.ws.onopen = async () => {
      // Send any pending writes
      this.flushPendingWrites();
      // Request sync from last known checkpoint
      const checkpoint = await this.localReplica.getCheckpoint();
      this.ws!.send(JSON.stringify({
        type: 'SYNC_REQUEST',
        since: checkpoint,
      }));
    };
this.ws.onmessage = async (event) => {
const message = JSON.parse(event.data);
await this.handleServerMessage(message);
};
this.ws.onclose = () => {
// Reconnect with exponential backoff
setTimeout(() => this.connect(), this.getBackoffDelay());
};
}
async write(collection: string, entityId: string, changes: any): Promise<void> {
const changeId = crypto.randomUUID();
// 1. Apply optimistically to local replica
await this.localReplica.applyChanges(collection, entityId, changes);
// 2. Queue for sync
this.pendingWrites.set(changeId, { collection, entityId, changes });
// 3. Try to send immediately if connected
if (this.ws?.readyState === WebSocket.OPEN) {
this.sendChange(changeId, { collection, entityId, changes });
}
// 4. Persist pending writes for crash recovery
await this.persistPendingWrites();
}
private async handleServerMessage(message: SyncMessage): Promise<void> {
switch (message.type) {
case 'CHANGES':
// Apply server changes to local replica
for (const change of message.changes) {
await this.localReplica.applyChanges(
change.collection,
change.entityId,
change.changes,
change.timestamp
);
}
// Update checkpoint
await this.localReplica.setCheckpoint(message.checkpoint);
break;
case 'ACK':
// Remove from pending writes
this.pendingWrites.delete(message.changeId);
await this.persistPendingWrites();
break;
case 'REJECT':
// Rollback optimistic update
await this.localReplica.rollback(message.changeId);
this.pendingWrites.delete(message.changeId);
break;
}
}
}
```
---
## Complete Client Architecture
```typescript
// packages/triplit-client-cloudflare/src/index.ts
import { TriplitClient } from '@triplit/client';
import { BrowserLocalReplica } from './local-replica';
import { SyncManager } from './sync-manager';
import { D1QueryAdapter } from './d1-query-adapter';
interface CloudflareTriplitClientOptions {
/** Cloudflare Worker URL for sync */
syncUrl: string;
/** D1 database for direct queries (optional, for Workers) */
d1?: D1Database;
/** Durable Object stub (optional, for Workers calling DO directly) */
doStub?: DurableObjectStub;
/** CRDT plugins to register */
plugins?: CRDTPluginInterface[];
}
export async function createCloudflareTriplitClient(
options: CloudflareTriplitClientOptions
): Promise<TriplitClient> {
// Initialize local replica
const localReplica = new BrowserLocalReplica();
await localReplica.init();
// Set up sync manager
const syncManager = new SyncManager(options.syncUrl, localReplica);
// Create Triplit client with local storage
const client = new TriplitClient({
storage: new LocalReplicaStorageAdapter(localReplica),
plugins: options.plugins ?? [],
// Hook into writes to sync
onMutation: (collection, entityId, changes) => {
syncManager.write(collection, entityId, changes);
},
});
// Start sync
await syncManager.connect();
return client;
}
```
---
## Deployment Configuration
### wrangler.toml
```toml
name = "triplit-edge"
main = "src/index.ts"
compatibility_date = "2024-01-01"
# Durable Objects
[durable_objects]
bindings = [
{ name = "TRIPLIT_DO", class_name = "TriplitSyncDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["TriplitSyncDO"]
# D1 Database
[[d1_databases]]
binding = "D1_DATABASE"
database_name = "triplit-db"
database_id = "xxxxx-xxxx-xxxx-xxxx"
# Environment variables
[vars]
JWT_SECRET = "your-secret-here"
```
---
## Summary: What You Get
| Feature | DO Only | D1 Only | DO + D1 Hybrid |
|---------|---------|---------|----------------|
| Real-time sync | ✅ WebSockets | ❌ Polling | ✅ WebSockets |
| Global edge distribution | ✅ | ✅ | ✅ |
| Queryable SQL | ❌ KV only | ✅ | ✅ |
| Large datasets | ⚠️ 128KB limit/key | ✅ | ✅ |
| Hibernation (cost) | ✅ | N/A | ✅ |
| Offline support | ✅ Client replica | ✅ Client replica | ✅ Client replica |
| Automatic replication | ❌ | ✅ D1 replicas | ✅ |
| CRDT plugin support | ✅ | ✅ | ✅ |
---
## Recommended Approach
**Start with DO + D1 Hybrid:**
1. Durable Objects handle WebSocket connections and real-time coordination
2. D1 provides persistent storage and complex queries
3. Tiered storage keeps hot data in DO, persists to D1
4. Client-side SQLite replicas enable full offline support
This gives you:
- ✅ Global edge deployment (Cloudflare network)
- ✅ Real-time sync (DO WebSockets)
- ✅ Queryable SQL (D1)
- ✅ Offline read/write (client SQLite)
- ✅ Automatic conflict resolution (Triplit's property-level LWW + CRDT plugins)
- ✅ Postgres alternative that scales globally
The hybrid approach adds minimal complexity over DO-only while unlocking D1's query capabilities—worth it for most applications.
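As a concrete starting point, a Worker entry for the hybrid setup can route WebSocket upgrades to the Durable Object and read-only queries to D1. This is a rough sketch, not the actual server entry: the route paths are invented, `Env` is the interface sketched under Deployment Configuration above, and the D1 query assumes a denormalized `triples` table along the lines used throughout this document:
```typescript
// Hypothetical Worker entry point for the DO + D1 hybrid (route names are illustrative).
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    // Real-time sync: hand the WebSocket upgrade to a Durable Object,
    // sharded by a room/tenant identifier.
    if (url.pathname === '/sync') {
      const room = url.searchParams.get('room') ?? 'default';
      const id = env.TRIPLIT_DO.idFromName(room);
      return env.TRIPLIT_DO.get(id).fetch(request);
    }

    // Read-only query path served straight from D1 (adjust table/columns to
    // whatever the actual D1 schema defines).
    if (url.pathname === '/query/todos') {
      const { results } = await env.D1_DATABASE
        .prepare('SELECT entity_id, value FROM triples WHERE collection = ?')
        .bind('todos')
        .all();
      return Response.json(results);
    }

    return new Response('Not found', { status: 404 });
  },
};
```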
# Adding Postgres Support to Triplit
## Overview
Triplit's storage layer is pluggable. Current adapters include SQLite, IndexedDB, LMDB, and memory. This document specifies a Postgres adapter that enables:
- Server-side Triplit with Postgres persistence
- Queryable data alongside Triplit's sync
- Familiar Postgres tooling (backups, monitoring, extensions)
- Horizontal scaling via read replicas
---
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Triplit Server │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Query │ │ Entity │ │ Sync │ │
│ │ Engine │───▶│ Store │◀───│ Manager │ │
│ └──────────────┘ └──────┬───────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Storage Adapter │ ◀── Interface │
│ │ (Postgres) │ │
│ └────────┬────────┘ │
│ │ │
└─────────────────────────────┼────────────────────────────────────┘
│
▼
┌─────────────────┐
│ Postgres │
│ │
│ ┌───────────┐ │
│ │ triples │ │
│ ├───────────┤ │
│ │ metadata │ │
│ ├───────────┤ │
│ │ dkvr │ │
│ ├───────────┤ │
│ │projections│ │
│ └───────────┘ │
└─────────────────┘
```
---
## Part 1: Triplit's Storage Interface
First, let's understand the interface we need to implement. Based on Triplit's existing adapters:
```typescript
// From @triplit/db/storage/adapter-sqlite
export interface AsyncSQLiteAdapter {
execute(sql: string, args?: any[]): Promise<QueryResult>;
normalizeResults(results: any): { key: string; value: string }[];
transact(fn: (adapter: AsyncSQLiteExecutor) => Promise<void>): Promise<void>;
close(): Promise<void>;
}
export interface AsyncSQLiteExecutor {
execute(sql: string, args?: any[]): Promise<QueryResult>;
}
export interface QueryResult {
rows?: any[];
rowsAffected?: number;
lastInsertRowId?: number;
}
```
However, Triplit also has a higher-level `KVStore` interface that storage adapters can implement directly:
```typescript
// Core KV interface used by EntityStore
export type Tuple = (string | number)[];
export interface KVStore {
get(key: Tuple): Promise<any>;
set(key: Tuple, value: any): Promise<void>;
delete(key: Tuple): Promise<void>;
scan(options: ScanOptions): AsyncIterable<[Tuple, any]>;
transact<T>(fn: (tx: KVStoreTransaction) => Promise<T>): Promise<T>;
}
export interface ScanOptions {
prefix?: Tuple;
start?: Tuple;
end?: Tuple;
limit?: number;
reverse?: boolean;
}
export interface KVStoreTransaction extends KVStore {
commit(): Promise<void>;
rollback(): Promise<void>;
}
```
We'll implement both: a low-level SQL adapter and a high-level KV adapter.
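To make the tuple encoding concrete: given some `kv` implementing the `KVStore` above, an entity's attributes might be stored and scanned like this. The exact key layout is an assumption, matching the `['entities', collection, entityId, attribute]` convention the adapter's denormalization code expects later in this document:
```typescript
// Illustrative only: assumed tuple layout for one entity's attributes.
declare const kv: KVStore;

await kv.set(['entities', 'todos', 'todo-1', 'text'], 'Buy groceries');
await kv.set(['entities', 'todos', 'todo-1', 'completed'], false);

// A prefix scan over the entity returns all of its attribute entries.
for await (const [key, value] of kv.scan({ prefix: ['entities', 'todos', 'todo-1'] })) {
  console.log(key.at(-1), value); // logs each attribute name and its value
}
```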
---
## Part 2: Postgres Schema
```sql
-- packages/triplit-storage-postgres/schema.sql
-- ============================================================================
-- CORE TABLES
-- ============================================================================
-- Key-value store for Triplit's tuple-based storage
-- This is the foundational table that mirrors Triplit's KV abstraction
CREATE TABLE kv_store (
key_tuple JSONB NOT NULL PRIMARY KEY, -- Tuple as JSON array
value JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- GIN index supports JSONB containment lookups (@>) on key tuples
CREATE INDEX idx_kv_prefix ON kv_store USING GIN (key_tuple jsonb_path_ops);
-- Btree on the text cast supports ordered range and prefix scans
CREATE INDEX idx_kv_key ON kv_store USING BTREE ((key_tuple::text));
-- ============================================================================
-- DENORMALIZED TABLES (Optional, for query performance)
-- ============================================================================
-- Entity metadata for fast collection scans
CREATE TABLE entities (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id)
);
CREATE INDEX idx_entities_collection ON entities (collection);
CREATE INDEX idx_entities_updated ON entities (updated_at DESC);
-- Triples table (denormalized view of entity attributes)
-- Enables SQL queries on Triplit data
CREATE TABLE triples (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
value JSONB,
timestamp_hlc TEXT NOT NULL, -- HLC for ordering
actor_id TEXT NOT NULL,
tombstone BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc)
);
CREATE INDEX idx_triples_entity ON triples (collection, entity_id);
CREATE INDEX idx_triples_attribute ON triples (collection, attribute);
CREATE INDEX idx_triples_value ON triples USING GIN (value jsonb_path_ops);
CREATE INDEX idx_triples_active ON triples (collection, entity_id, attribute)
WHERE tombstone = FALSE;
-- DKVR entries for decomposed CRDTs (lists, text, etc.)
CREATE TABLE dkvr_entries (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
key_suffix JSONB NOT NULL, -- Sub-key tuple
value JSONB NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id, attribute, key_suffix)
);
CREATE INDEX idx_dkvr_attribute ON dkvr_entries (collection, entity_id, attribute);
-- Projections for CRDT query optimization (from plugin API proposal)
CREATE TABLE projections (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
projection_key TEXT NOT NULL,
value JSONB,
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id, attribute, projection_key)
);
CREATE INDEX idx_projections_lookup ON projections (collection, attribute, projection_key);
CREATE INDEX idx_projections_value ON projections (collection, attribute, projection_key, value);
-- ============================================================================
-- SYNC STATE TABLES
-- ============================================================================
-- Client sync checkpoints
CREATE TABLE sync_checkpoints (
client_id TEXT NOT NULL PRIMARY KEY,
checkpoint JSONB NOT NULL, -- Vector clock or sequence numbers
last_sync_at TIMESTAMPTZ DEFAULT NOW()
);
-- Outbox for pending changes (optional, for guaranteed delivery)
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
change_payload JSONB NOT NULL,
timestamp_hlc TEXT NOT NULL,
actor_id TEXT NOT NULL,
status TEXT DEFAULT 'pending', -- pending, delivered, failed
created_at TIMESTAMPTZ DEFAULT NOW(),
delivered_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_pending ON outbox (status, created_at) WHERE status = 'pending';
-- ============================================================================
-- FUNCTIONS
-- ============================================================================
-- Function to update timestamps automatically
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Apply to tables
CREATE TRIGGER trg_kv_store_updated
BEFORE UPDATE ON kv_store
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
CREATE TRIGGER trg_entities_updated
BEFORE UPDATE ON entities
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
-- Function for tuple comparison (for range scans)
CREATE OR REPLACE FUNCTION tuple_compare(a JSONB, b JSONB)
RETURNS INTEGER AS $$
DECLARE
len_a INTEGER := jsonb_array_length(a);
len_b INTEGER := jsonb_array_length(b);
min_len INTEGER := LEAST(len_a, len_b);
i INTEGER;
elem_a JSONB;
elem_b JSONB;
BEGIN
FOR i IN 0..min_len-1 LOOP
elem_a := a->i;
elem_b := b->i;
-- Compare elements (handles string and number)
IF elem_a::text < elem_b::text THEN
RETURN -1;
ELSIF elem_a::text > elem_b::text THEN
RETURN 1;
END IF;
END LOOP;
-- Equal up to min length, shorter tuple comes first
RETURN len_a - len_b;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
-- Function to check if tuple starts with prefix
CREATE OR REPLACE FUNCTION tuple_has_prefix(tuple JSONB, prefix JSONB)
RETURNS BOOLEAN AS $$
DECLARE
prefix_len INTEGER := jsonb_array_length(prefix);
i INTEGER;
BEGIN
IF jsonb_array_length(tuple) < prefix_len THEN
RETURN FALSE;
END IF;
FOR i IN 0..prefix_len-1 LOOP
IF tuple->i IS DISTINCT FROM prefix->i THEN
RETURN FALSE;
END IF;
END LOOP;
RETURN TRUE;
END;
$$ LANGUAGE plpgsql IMMUTABLE;
```
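Before wiring up the adapter, the plpgsql helpers can be sanity-checked directly through `pg` (this assumes schema.sql has been applied to the connected database):
```typescript
// Minimal sanity check for tuple_has_prefix defined above.
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

const { rows } = await pool.query(
  'SELECT tuple_has_prefix($1::jsonb, $2::jsonb) AS matches',
  [
    JSON.stringify(['entities', 'todos', 'todo-1', 'text']),
    JSON.stringify(['entities', 'todos']),
  ]
);
console.log(rows[0].matches); // true

await pool.end();
```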
---
## Part 3: Postgres Storage Adapter
```typescript
// packages/triplit-storage-postgres/src/index.ts
import { Pool, PoolClient, QueryResult as PgQueryResult } from 'pg';
// Re-export for convenience
export { Pool } from 'pg';
// ============================================================================
// Types
// ============================================================================
export type Tuple = (string | number)[];
export interface ScanOptions {
prefix?: Tuple;
start?: Tuple;
end?: Tuple;
limit?: number;
reverse?: boolean;
}
export interface KVStore {
get(key: Tuple): Promise<any>;
set(key: Tuple, value: any): Promise<void>;
delete(key: Tuple): Promise<void>;
scan(options: ScanOptions): AsyncGenerator<[Tuple, any]>;
}
export interface KVStoreTransaction extends KVStore {
commit(): Promise<void>;
rollback(): Promise<void>;
}
export interface PostgresStorageOptions {
/** Postgres connection pool */
pool: Pool;
/**
* Enable denormalized tables (entities, triples) for SQL queries.
* Adds write overhead but enables rich querying.
* Default: true
*/
enableDenormalization?: boolean;
/**
* Schema name for all Triplit tables.
* Default: 'triplit'
*/
schema?: string;
}
// ============================================================================
// Main Adapter
// ============================================================================
export class PostgresStorageAdapter implements KVStore {
private pool: Pool;
private schema: string;
private enableDenormalization: boolean;
constructor(options: PostgresStorageOptions) {
this.pool = options.pool;
this.schema = options.schema ?? 'triplit';
this.enableDenormalization = options.enableDenormalization ?? true;
}
// --------------------------------------------------------------------------
// Initialization
// --------------------------------------------------------------------------
async initialize(): Promise<void> {
// Run schema migrations
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Check if schema exists, create if not
await client.query(`
CREATE SCHEMA IF NOT EXISTS ${this.schema}
`);
// Create tables (idempotent)
await this.createTables(client);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
private async createTables(client: PoolClient): Promise<void> {
// KV store table
await client.query(`
CREATE TABLE IF NOT EXISTS ${this.schema}.kv_store (
key_tuple JSONB NOT NULL PRIMARY KEY,
value JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
// Create indexes
await client.query(`
CREATE INDEX IF NOT EXISTS idx_kv_prefix
ON ${this.schema}.kv_store USING GIN (key_tuple jsonb_path_ops)
`);
// Prefix-scan helper used by scan(); mirrors tuple_has_prefix from schema.sql
await client.query(`
CREATE OR REPLACE FUNCTION ${this.schema}.tuple_has_prefix(tuple JSONB, prefix JSONB)
RETURNS BOOLEAN AS $fn$
DECLARE
prefix_len INTEGER := jsonb_array_length(prefix);
i INTEGER;
BEGIN
IF jsonb_array_length(tuple) < prefix_len THEN
RETURN FALSE;
END IF;
FOR i IN 0..prefix_len-1 LOOP
IF tuple->i IS DISTINCT FROM prefix->i THEN
RETURN FALSE;
END IF;
END LOOP;
RETURN TRUE;
END;
$fn$ LANGUAGE plpgsql IMMUTABLE
`);
if (this.enableDenormalization) {
// Entities table
await client.query(`
CREATE TABLE IF NOT EXISTS ${this.schema}.entities (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id)
)
`);
// Triples table
await client.query(`
CREATE TABLE IF NOT EXISTS ${this.schema}.triples (
collection TEXT NOT NULL,
entity_id TEXT NOT NULL,
attribute TEXT NOT NULL,
value JSONB,
timestamp_hlc TEXT NOT NULL,
actor_id TEXT NOT NULL,
tombstone BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (collection, entity_id, attribute, timestamp_hlc)
)
`);
// Indexes for triples
await client.query(`
CREATE INDEX IF NOT EXISTS idx_triples_entity
ON ${this.schema}.triples (collection, entity_id)
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_triples_value
ON ${this.schema}.triples USING GIN (value jsonb_path_ops)
`);
}
}
// --------------------------------------------------------------------------
// Core KV Operations
// --------------------------------------------------------------------------
async get(key: Tuple): Promise<any> {
const result = await this.pool.query(
`SELECT value FROM ${this.schema}.kv_store WHERE key_tuple = $1`,
[JSON.stringify(key)]
);
if (result.rows.length === 0) {
return undefined;
}
return result.rows[0].value;
}
async set(key: Tuple, value: any): Promise<void> {
await this.pool.query(
`
INSERT INTO ${this.schema}.kv_store (key_tuple, value)
VALUES ($1, $2)
ON CONFLICT (key_tuple) DO UPDATE SET
value = EXCLUDED.value,
updated_at = NOW()
`,
[JSON.stringify(key), JSON.stringify(value)]
);
// Denormalize if enabled
if (this.enableDenormalization) {
await this.denormalizeWrite(key, value);
}
}
async delete(key: Tuple): Promise<void> {
await this.pool.query(
`DELETE FROM ${this.schema}.kv_store WHERE key_tuple = $1`,
[JSON.stringify(key)]
);
// Clean up denormalized data
if (this.enableDenormalization) {
await this.denormalizeDelete(key);
}
}
async *scan(options: ScanOptions): AsyncGenerator<[Tuple, any]> {
const { prefix, start, end, limit, reverse } = options;
let query = `SELECT key_tuple, value FROM ${this.schema}.kv_store WHERE 1=1`;
const params: any[] = [];
let paramIndex = 1;
// Prefix filter using JSONB containment
if (prefix && prefix.length > 0) {
// Use a custom function or build the prefix check
query += ` AND ${this.schema}.tuple_has_prefix(key_tuple, $${paramIndex})`;
params.push(JSON.stringify(prefix));
paramIndex++;
}
// Range filters
if (start) {
query += ` AND key_tuple::text >= $${paramIndex}::text`;
params.push(JSON.stringify(start));
paramIndex++;
}
if (end) {
query += ` AND key_tuple::text < $${paramIndex}::text`;
params.push(JSON.stringify(end));
paramIndex++;
}
// Ordering
query += ` ORDER BY key_tuple::text ${reverse ? 'DESC' : 'ASC'}`;
// Limit
if (limit !== undefined) {
query += ` LIMIT $${paramIndex}`;
params.push(limit);
}
const result = await this.pool.query(query, params);
for (const row of result.rows) {
const key = typeof row.key_tuple === 'string'
? JSON.parse(row.key_tuple)
: row.key_tuple;
yield [key as Tuple, row.value];
}
}
// --------------------------------------------------------------------------
// Transactions
// --------------------------------------------------------------------------
async transact<T>(fn: (tx: PostgresTransaction) => Promise<T>): Promise<T> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const tx = new PostgresTransaction(client, this.schema, this.enableDenormalization);
const result = await fn(tx);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
// --------------------------------------------------------------------------
// Denormalization
// --------------------------------------------------------------------------
private async denormalizeWrite(key: Tuple, value: any): Promise<void> {
// Key format: ['entities', collection, entityId, attribute, ...]
if (key[0] !== 'entities' || key.length < 4) {
return;
}
const [, collection, entityId, attribute] = key as [string, string, string, string];
// Upsert entity
await this.pool.query(
`
INSERT INTO ${this.schema}.entities (collection, entity_id)
VALUES ($1, $2)
ON CONFLICT (collection, entity_id) DO UPDATE SET updated_at = NOW()
`,
[collection, entityId]
);
// Insert triple (with HLC from value if present)
const hlc = value?._timestamp ?? Date.now().toString();
const actorId = value?._actorId ?? 'unknown';
const tripleValue = value?._value ?? value;
await this.pool.query(
`
INSERT INTO ${this.schema}.triples
(collection, entity_id, attribute, value, timestamp_hlc, actor_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (collection, entity_id, attribute, timestamp_hlc)
DO UPDATE SET value = EXCLUDED.value
`,
[collection, entityId, attribute, JSON.stringify(tripleValue), hlc, actorId]
);
}
private async denormalizeDelete(key: Tuple): Promise<void> {
if (key[0] !== 'entities' || key.length < 4) {
return;
}
const [, collection, entityId, attribute] = key as [string, string, string, string];
// Mark triple as tombstoned (don't actually delete for history)
await this.pool.query(
`
UPDATE ${this.schema}.triples
SET tombstone = TRUE
WHERE collection = $1 AND entity_id = $2 AND attribute = $3
`,
[collection, entityId, attribute]
);
}
// --------------------------------------------------------------------------
// Query Helpers (for direct SQL access)
// --------------------------------------------------------------------------
/**
* Query entities directly via SQL.
* Useful for analytics, admin dashboards, etc.
*/
async queryEntities(
collection: string,
options?: {
where?: Record<string, any>;
orderBy?: string;
limit?: number;
offset?: number;
}
): Promise<any[]> {
if (!this.enableDenormalization) {
throw new Error('Direct queries require enableDenormalization: true');
}
// GROUP BY below already yields one row per entity; DISTINCT ON would conflict with a custom ORDER BY
let query = `
SELECT
e.entity_id,
e.created_at,
e.updated_at,
-- FILTER skips the NULL attribute produced by the LEFT JOIN for entities with no live triples
COALESCE(
jsonb_object_agg(t.attribute, t.value) FILTER (WHERE t.attribute IS NOT NULL),
'{}'::jsonb
) as data
FROM ${this.schema}.entities e
LEFT JOIN ${this.schema}.triples t
ON e.collection = t.collection
AND e.entity_id = t.entity_id
AND t.tombstone = FALSE
WHERE e.collection = $1
`;
const params: any[] = [collection];
let paramIndex = 2;
// Add WHERE conditions
if (options?.where) {
for (const [attr, value] of Object.entries(options.where)) {
query += ` AND EXISTS (
SELECT 1 FROM ${this.schema}.triples t2
WHERE t2.collection = e.collection
AND t2.entity_id = e.entity_id
AND t2.attribute = $${paramIndex}
AND t2.value = $${paramIndex + 1}
AND t2.tombstone = FALSE
)`;
params.push(attr, JSON.stringify(value));
paramIndex += 2;
}
}
query += ` GROUP BY e.entity_id, e.created_at, e.updated_at`;
// Order by (interpolated into the SQL string; only pass trusted, non-user-supplied values)
if (options?.orderBy) {
query += ` ORDER BY ${options.orderBy}`;
}
// Pagination
if (options?.limit) {
query += ` LIMIT $${paramIndex}`;
params.push(options.limit);
paramIndex++;
}
if (options?.offset) {
query += ` OFFSET $${paramIndex}`;
params.push(options.offset);
}
const result = await this.pool.query(query, params);
return result.rows.map(row => ({
id: row.entity_id,
...row.data,
_meta: {
createdAt: row.created_at,
updatedAt: row.updated_at,
},
}));
}
// --------------------------------------------------------------------------
// Lifecycle
// --------------------------------------------------------------------------
async close(): Promise<void> {
await this.pool.end();
}
}
// ============================================================================
// Transaction Implementation
// ============================================================================
class PostgresTransaction implements KVStoreTransaction {
private committed = false;
private rolledBack = false;
constructor(
private client: PoolClient,
private schema: string,
private enableDenormalization: boolean
) {}
async get(key: Tuple): Promise<any> {
this.checkState();
const result = await this.client.query(
`SELECT value FROM ${this.schema}.kv_store WHERE key_tuple = $1`,
[JSON.stringify(key)]
);
return result.rows[0]?.value;
}
async set(key: Tuple, value: any): Promise<void> {
this.checkState();
await this.client.query(
`
INSERT INTO ${this.schema}.kv_store (key_tuple, value)
VALUES ($1, $2)
ON CONFLICT (key_tuple) DO UPDATE SET
value = EXCLUDED.value,
updated_at = NOW()
`,
[JSON.stringify(key), JSON.stringify(value)]
);
if (this.enableDenormalization) {
await this.denormalizeWrite(key, value);
}
}
async delete(key: Tuple): Promise<void> {
this.checkState();
await this.client.query(
`DELETE FROM ${this.schema}.kv_store WHERE key_tuple = $1`,
[JSON.stringify(key)]
);
// Mirror the main adapter: tombstone denormalized triples rather than deleting them
if (this.enableDenormalization && key[0] === 'entities' && key.length >= 4) {
const [, collection, entityId, attribute] = key as [string, string, string, string];
await this.client.query(
`
UPDATE ${this.schema}.triples
SET tombstone = TRUE
WHERE collection = $1 AND entity_id = $2 AND attribute = $3
`,
[collection, entityId, attribute]
);
}
}
async *scan(options: ScanOptions): AsyncGenerator<[Tuple, any]> {
this.checkState();
// Same implementation as main adapter but using this.client
const { prefix, limit, reverse } = options;
let query = `SELECT key_tuple, value FROM ${this.schema}.kv_store WHERE 1=1`;
const params: any[] = [];
let paramIndex = 1;
if (prefix && prefix.length > 0) {
query += ` AND ${this.schema}.tuple_has_prefix(key_tuple, $${paramIndex})`;
params.push(JSON.stringify(prefix));
paramIndex++;
}
query += ` ORDER BY key_tuple::text ${reverse ? 'DESC' : 'ASC'}`;
if (limit !== undefined) {
query += ` LIMIT $${paramIndex}`;
params.push(limit);
}
const result = await this.client.query(query, params);
for (const row of result.rows) {
yield [row.key_tuple as Tuple, row.value];
}
}
async commit(): Promise<void> {
// Actual commit happens in transact() wrapper
this.committed = true;
}
async rollback(): Promise<void> {
this.rolledBack = true;
throw new Error('Transaction rollback requested');
}
private checkState(): void {
if (this.committed) {
throw new Error('Transaction already committed');
}
if (this.rolledBack) {
throw new Error('Transaction already rolled back');
}
}
private async denormalizeWrite(key: Tuple, value: any): Promise<void> {
if (key[0] !== 'entities' || key.length < 4) {
return;
}
const [, collection, entityId, attribute] = key as [string, string, string, string];
await this.client.query(
`
INSERT INTO ${this.schema}.entities (collection, entity_id)
VALUES ($1, $2)
ON CONFLICT (collection, entity_id) DO UPDATE SET updated_at = NOW()
`,
[collection, entityId]
);
const hlc = value?._timestamp ?? Date.now().toString();
const actorId = value?._actorId ?? 'unknown';
const tripleValue = value?._value ?? value;
await this.client.query(
`
INSERT INTO ${this.schema}.triples
(collection, entity_id, attribute, value, timestamp_hlc, actor_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (collection, entity_id, attribute, timestamp_hlc)
DO UPDATE SET value = EXCLUDED.value
`,
[collection, entityId, attribute, JSON.stringify(tripleValue), hlc, actorId]
);
}
}
```
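Before handing the adapter to a Triplit server, its KV surface can be exercised directly. A small smoke test, using the package name from Part 5 and the key convention from the denormalization code:
```typescript
// Direct smoke test of the adapter's KV surface.
import { Pool } from 'pg';
import { PostgresStorageAdapter } from '@triplit/storage-postgres';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const storage = new PostgresStorageAdapter({ pool, enableDenormalization: true });
await storage.initialize();

// Writes land in kv_store and, with denormalization on, also in entities/triples.
await storage.set(['entities', 'todos', 'todo-1', 'text'], 'Buy groceries');
console.log(await storage.get(['entities', 'todos', 'todo-1', 'text'])); // 'Buy groceries'

// Transactional batch write.
await storage.transact(async (tx) => {
  await tx.set(['entities', 'todos', 'todo-2', 'text'], 'Walk the dog');
  await tx.set(['entities', 'todos', 'todo-2', 'completed'], false);
});

// Prefix scan over the collection.
for await (const [key, value] of storage.scan({ prefix: ['entities', 'todos'] })) {
  console.log(key, value);
}

await storage.close();
```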
---
## Part 4: Triplit Server Integration
```typescript
// packages/triplit-storage-postgres/src/server.ts
import { createServer, TriplitServer } from '@triplit/server-core';
import { PostgresStorageAdapter } from './index';
import { Pool } from 'pg';
export interface PostgresTriplitServerOptions {
/** Postgres connection string */
connectionString?: string;
/** Or provide a pre-configured pool */
pool?: Pool;
/** Port to listen on */
port?: number;
/** JWT secret for auth */
jwtSecret: string;
/** CRDT plugins to register */
plugins?: any[];
/** Enable verbose logging */
verboseLogs?: boolean;
}
export async function createPostgresTriplitServer(
options: PostgresTriplitServerOptions
): Promise<TriplitServer> {
// Create pool if not provided
const pool = options.pool ?? new Pool({
connectionString: options.connectionString,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
// Create storage adapter
const storage = new PostgresStorageAdapter({
pool,
enableDenormalization: true,
});
// Initialize schema
await storage.initialize();
// Create Triplit server with Postgres storage
const server = createServer({
storage,
jwtSecret: options.jwtSecret,
plugins: options.plugins ?? [],
verboseLogs: options.verboseLogs ?? false,
});
return server;
}
// Convenience function for common setup
export async function startPostgresTriplitServer(
options: PostgresTriplitServerOptions
): Promise<{ server: TriplitServer; close: () => Promise<void> }> {
const server = await createPostgresTriplitServer(options);
const port = options.port ?? 8080;
const httpServer = server.listen(port);
console.log(`Triplit server (Postgres) running on port ${port}`);
return {
server,
close: async () => {
httpServer.close();
await server.close();
},
};
}
```
---
## Part 5: Usage Examples
### Basic Server Setup
```typescript
// server.ts
import { startPostgresTriplitServer } from '@triplit/storage-postgres';
import { GCounterPlugin, RGAListPlugin } from './plugins';
async function main() {
const { server, close } = await startPostgresTriplitServer({
connectionString: process.env.DATABASE_URL,
port: 6543,
jwtSecret: process.env.JWT_SECRET!,
plugins: [GCounterPlugin, RGAListPlugin],
verboseLogs: true,
});
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('Shutting down...');
await close();
process.exit(0);
});
}
main().catch(console.error);
```
### Direct SQL Queries
```typescript
// admin-api.ts
import { PostgresStorageAdapter } from '@triplit/storage-postgres';
import { Pool } from 'pg';
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const storage = new PostgresStorageAdapter({ pool, enableDenormalization: true });
// Query todos via SQL (bypassing Triplit for read-only analytics)
async function getTodoStats() {
const result = await pool.query(`
SELECT
COUNT(*) as total,
-- `value` holds the attribute's JSON scalar, so compare against jsonb literals
COUNT(*) FILTER (WHERE value = 'true'::jsonb) as completed,
COUNT(*) FILTER (WHERE value = 'false'::jsonb) as pending
FROM triplit.triples
WHERE collection = 'todos'
AND attribute = 'completed'
AND tombstone = FALSE
`);
return result.rows[0];
}
// Use the helper for structured queries
async function getRecentTodos(limit = 10) {
return storage.queryEntities('todos', {
orderBy: 'e.updated_at DESC',
limit,
});
}
// Full-text search using Postgres
async function searchTodos(searchTerm: string) {
const result = await pool.query(`
SELECT DISTINCT e.entity_id, t.value #>> '{}' as text
FROM triplit.entities e
JOIN triplit.triples t
ON e.collection = t.collection
AND e.entity_id = t.entity_id
WHERE e.collection = 'todos'
AND t.attribute = 'text'
AND t.value #>> '{}' ILIKE $1
AND t.tombstone = FALSE
`, [`%${searchTerm}%`]);
return result.rows;
}
```
### Client Connection
```typescript
// client.ts
import { TriplitClient } from '@triplit/client';
import { schema } from './schema';
const client = new TriplitClient({
serverUrl: 'http://localhost:6543',
token: 'your-jwt-token',
schema,
});
// Normal Triplit operations - storage is transparent
await client.insert('todos', {
id: 'todo-1',
text: 'Buy groceries',
completed: false,
});
// Subscribe to changes
const unsubscribe = client.subscribe(
client.query('todos').where('completed', '=', false),
(results) => {
console.log('Pending todos:', results);
}
);
```
### Docker Compose Setup
```yaml
# docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: triplit
POSTGRES_PASSWORD: triplit
POSTGRES_DB: triplit
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U triplit"]
interval: 5s
timeout: 5s
retries: 5
triplit:
build: .
environment:
DATABASE_URL: postgres://triplit:triplit@postgres:5432/triplit
JWT_SECRET: your-secret-here
PORT: 6543
ports:
- "6543:6543"
depends_on:
postgres:
condition: service_healthy
volumes:
postgres_data:
```
---
## Part 6: SQL Compatibility Layer (Optional)
For scenarios where you want to use Triplit's existing SQLite-based code paths:
```typescript
// packages/triplit-storage-postgres/src/sql-compat.ts
import { Pool } from 'pg';
// Interface types from Triplit's SQLite adapter layer (package path per Part 1)
import type { AsyncSQLiteAdapter, AsyncSQLiteExecutor, QueryResult } from '@triplit/db/storage/adapter-sqlite';
/**
* Adapter that translates SQLite SQL to Postgres SQL.
* Useful for compatibility with Triplit's existing SQL-based storage code.
*/
export class PostgresSQLiteCompatAdapter implements AsyncSQLiteAdapter {
constructor(private pool: Pool) {}
async execute(sql: string, args?: any[]): Promise<QueryResult> {
const pgSql = this.translateSQL(sql);
const pgArgs = this.translateArgs(args);
const result = await this.pool.query(pgSql, pgArgs);
return {
rows: result.rows,
rowsAffected: result.rowCount ?? 0,
};
}
normalizeResults(results: any): { key: string; value: string }[] {
if (!results.rows) return [];
return results.rows.map((row: any) => ({
key: row.key ?? row.key_tuple,
value: typeof row.value === 'string' ? row.value : JSON.stringify(row.value),
}));
}
async transact(fn: (adapter: AsyncSQLiteExecutor) => Promise<void>): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const executor: AsyncSQLiteExecutor = {
execute: async (sql, args) => {
const pgSql = this.translateSQL(sql);
const pgArgs = this.translateArgs(args);
const result = await client.query(pgSql, pgArgs);
return { rows: result.rows, rowsAffected: result.rowCount ?? 0 };
},
};
await fn(executor);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async close(): Promise<void> {
await this.pool.end();
}
// --------------------------------------------------------------------------
// SQL Translation
// --------------------------------------------------------------------------
private translateSQL(sql: string): string {
let pgSql = sql;
// Replace ? placeholders with $1, $2, etc. (naive: also rewrites '?' inside string literals)
let paramIndex = 0;
pgSql = pgSql.replace(/\?/g, () => `$${++paramIndex}`);
// SQLite -> Postgres syntax differences
const translations: [RegExp, string][] = [
// AUTOINCREMENT -> SERIAL (already handled in schema)
[/\bAUTOINCREMENT\b/gi, ''],
// INTEGER PRIMARY KEY -> SERIAL PRIMARY KEY
[/\bINTEGER PRIMARY KEY\b/gi, 'SERIAL PRIMARY KEY'],
// BOOLEAN handling (SQLite uses 0/1). Caution: this rewrites any `col = 1` / `col = 0`,
// so restrict it to known boolean columns if the SQL also compares integers.
[/\b(\w+)\s*=\s*1\b/g, '$1 = TRUE'],
[/\b(\w+)\s*=\s*0\b/g, '$1 = FALSE'],
// String concatenation: `||` already means the same thing in Postgres, no rewrite needed
// GLOB -> LIKE (approximate)
[/\bGLOB\b/gi, 'LIKE'],
// datetime functions
[/\bdatetime\s*\(\s*'now'\s*\)/gi, 'NOW()'],
[/\bstrftime\s*\(\s*'%s'\s*,\s*'now'\s*\)/gi, "EXTRACT(EPOCH FROM NOW())"],
// IFNULL -> COALESCE
[/\bIFNULL\s*\(/gi, 'COALESCE('],
// GROUP_CONCAT -> STRING_AGG
[/\bGROUP_CONCAT\s*\(\s*(\w+)\s*\)/gi, 'STRING_AGG($1::text, \',\')'],
// JSON functions
[/\bjson_extract\s*\(\s*(\w+)\s*,\s*'\$\.(\w+)'\s*\)/gi, "$1->>'$2'"],
[/\bjson\s*\(/gi, 'to_jsonb('],
];
for (const [pattern, replacement] of translations) {
pgSql = pgSql.replace(pattern, replacement);
}
return pgSql;
}
private translateArgs(args?: any[]): any[] {
if (!args) return [];
return args.map(arg => {
// Convert booleans to Postgres format
if (typeof arg === 'boolean') {
return arg;
}
// Convert objects to JSON strings
if (typeof arg === 'object' && arg !== null) {
return JSON.stringify(arg);
}
return arg;
});
}
}
```
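Usage mirrors the SQLite adapters it stands in for: `?` placeholders are rewritten to `$1, $2, ...` before the query hits Postgres. The subpath import and the `triplit` schema are assumptions:
```typescript
// Illustrative use of the compat adapter (import path assumed).
import { Pool } from 'pg';
import { PostgresSQLiteCompatAdapter } from '@triplit/storage-postgres/sql-compat';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const adapter = new PostgresSQLiteCompatAdapter(pool);

// '?' placeholders are translated to $1, $2 by translateSQL().
const result = await adapter.execute(
  'SELECT key_tuple AS key, value FROM triplit.kv_store WHERE key_tuple::text >= ? LIMIT ?',
  [JSON.stringify(['entities', 'todos']), 10]
);
console.log(adapter.normalizeResults(result));

await adapter.close();
```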
---
## Part 7: Migration Tool
```typescript
// packages/triplit-storage-postgres/src/migrate.ts
import { Pool } from 'pg';
import Database from 'better-sqlite3';
interface MigrationOptions {
/** SQLite database path */
sqlitePath: string;
/** Postgres connection string */
postgresUrl: string;
/** Batch size for inserts */
batchSize?: number;
/** Progress callback */
onProgress?: (processed: number, total: number) => void;
}
/**
* Migrate data from SQLite Triplit storage to Postgres.
*/
export async function migrateToPostgres(options: MigrationOptions): Promise<void> {
const { sqlitePath, postgresUrl, batchSize = 1000, onProgress } = options;
// Open SQLite database
const sqlite = new Database(sqlitePath, { readonly: true });
// Connect to Postgres
const pool = new Pool({ connectionString: postgresUrl });
try {
// Get total count
const countResult = sqlite.prepare('SELECT COUNT(*) as count FROM kv_store').get() as { count: number };
const total = countResult.count;
console.log(`Migrating ${total} records from SQLite to Postgres...`);
// Create tables in Postgres
const client = awai