
Triplit Plugin

Motivation:

Triplit provides a powerful foundation for building real-time, offline-capable applications using basic data types (string, number, boolean, date, set). However, to unlock its full potential for richer collaborative experiences like those found in document editors, design tools, or applications with specialized counters or ordered lists, we need a way to extend its core data modeling capabilities.

Currently, adding new data types with custom merge logic (like CRDTs) requires modifying the core @triplit/db, @triplit/client, and @triplit/server-core packages. This presents a high barrier for both the core team and the community to introduce specialized types tailored to specific application needs.

This proposal outlines a Plugin API designed to allow developers to define, implement, and register custom CRDTs or other mergeable data structures within Triplit. This would empower developers to:

  1. Implement complex collaborative features natively within Triplit's data model.
  2. Leverage specialized CRDTs optimized for specific use cases (e.g., performance characteristics of different sequence CRDTs).
  3. Foster a community ecosystem where different CRDT plugins can be shared and reused.
  4. Keep the Triplit core leaner while enabling broad extensibility.

Goals:

  1. Define a Clear Interface: Specify a robust TypeScript interface (CRDTPluginInterface) outlining the contract between Triplit core and a custom data type plugin.
  2. Enable Custom Logic: Allow plugins to define their unique state representation (TState), CRDT merge algorithm (merge), and client-facing mutation logic (generateChangePayload).
  3. Flexible Storage Strategies: Explicitly support both:
    • State Blob: Storing the CRDT state as a single opaque value in the KV store (simpler, suitable for types like Counters).
    • Decomposed KV Representation (DKVR): Allowing the CRDT state to be mapped across multiple KV entries (more complex, potentially more efficient for types like Lists or Text).
  4. Core Integration: Define how Triplit's schema system, storage layer, transaction/merge logic, client API, sync protocol, and query engine will interact with registered plugins.
  5. Maintain Guarantees: Ensure the plugin system upholds Triplit's core principles of offline-first operation, optimistic updates, HLC timestamping for consistency, and real-time synchronization.
  6. Basic Query Support: Provide optional hooks within the plugin interface to enable basic filtering and sorting capabilities for custom types within Triplit queries.

Non-Goals (Initial Version):

  • Highly Optimized IVM for DKVR: While the API allows DKVR, deeply optimizing the core Incremental View Maintenance for arbitrary decomposed structures is complex and deferred. Initial IVM might treat any change within a DKVR attribute's KV prefix as potentially invalidating dependent queries.
  • Automatic Client Proxy Generation: Automatically mapping standard JavaScript methods (e.g., Array.prototype.push) to custom CRDT operations is complex. The initial focus is on supporting explicit API methods initiated by application code.
  • Complex Validation/UI Metadata in Core API: Integrating external validation libraries (like Standard Schema) or UI component bindings directly. This functionality is better suited for complementary schema extension libraries built on top of this core data API.
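The conservative IVM fallback described in the first non-goal amounts to a prefix test on KV keys. Below is a minimal sketch, assuming KV keys are string arrays laid out as [collection, entityId, attribute, ...suffix]. That layout and the helper names `isUnderPrefix` and `isChangeRelevant` are hypothetical; only the parameter shape mirrors the proposed `isChangeRelevantToState` hook.

```typescript
// Hypothetical key layout: [collection, entityId, attribute, ...suffix].
type KvKey = readonly string[];

// True when `key` lies inside the KV range owned by `prefix`.
function isUnderPrefix(prefix: KvKey, key: KvKey): boolean {
  return prefix.length <= key.length && prefix.every((part, i) => key[i] === part);
}

// Conservative relevance check with the same parameters as isChangeRelevantToState:
// any write below the attribute's prefix is treated as a potential change.
function isChangeRelevant(crdtAttributePath: string[], changedKvKey: string[]): boolean {
  return isUnderPrefix(crdtAttributePath, changedKvKey);
}
```

This over-invalidates: an edit anywhere in a long list re-runs every query that depends on that attribute. That is the trade-off the non-goal accepts for the first version.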

Core Concepts & Integration Strategy:

This API leverages Triplit's existing architecture:

  • KV Store Abstraction: Plugins define how their TState maps onto TStoredValue (or multiple KV pairs for DKVR), interacting via the standard KVStore interface.
  • Schema Definition: Plugins provide a schemaDefinition function used by Schema as S (e.g., S.MyCounter = MyCounterPlugin.schemaDefinition).
  • Merge Logic: The plugin's merge function encapsulates the CRDT algorithm. Triplit's EntityStore orchestrates fetching current state, calling plugin.merge with states and HLC timestamps, and writing the result back atomically within a transaction. For DKVR, the merge function receives the KVStoreTransaction to perform multi-key operations.
  • Client API: Applications will use explicit methods (e.g., client.incrementCounter(...)) which internally call plugin.generateChangePayload to create the correct delta/state (TChangePayload) for the transaction.
  • Synchronization: The TChangePayload is stored in the outbox and transmitted. The server uses the corresponding registered plugin's merge function to apply changes.
  • Querying: Basic filtering/sorting is enabled via optional plugin methods (compare, satisfiesOperator), which may require state deserialization.
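The registration and blob merge steps above can be sketched with an in-memory stand-in. The core keeps one plugin per typeId, and a write fetches the stored value, decodes it, calls merge with both timestamps, re-encodes the result, and writes it back. The simplified `Timestamp` tuple, `compareTimestamps`, `registerPlugin`, `applyBlobChange`, and the `Map`-backed KV are all hypothetical and are not Triplit exports. A real implementation would run inside a KVStoreTransaction.

```typescript
// Hypothetical, simplified types; not Triplit's actual exports.
type Timestamp = [counter: number, clientId: string];

interface BlobPlugin<TState, TStored, TChange> {
  readonly typeId: string;
  readonly storageStrategy: 'blob';
  merge(current: TState | undefined, incoming: TChange, currentTs: Timestamp | undefined,
        incomingTs: Timestamp, actorId: string): TState;
  encodeForStorage(state: TState): TStored;
  decodeFromStorage(stored: TStored): TState;
}

// Order HLC-style timestamps by counter, breaking ties by client id.
function compareTimestamps(a: Timestamp, b: Timestamp): number {
  if (a[0] !== b[0]) return a[0] - b[0];
  return a[1] < b[1] ? -1 : a[1] > b[1] ? 1 : 0;
}

// The core keeps one plugin per typeId and rejects duplicates.
const registry = new Map<string, BlobPlugin<any, any, any>>();
function registerPlugin(plugin: BlobPlugin<any, any, any>): void {
  if (registry.has(plugin.typeId)) throw new Error(`Duplicate plugin typeId: ${plugin.typeId}`);
  registry.set(plugin.typeId, plugin);
}

// Each stored entry carries the encoded value and the timestamp of the last write.
type KV = Map<string, { value: unknown; ts: Timestamp }>;

// Fetch -> decode -> merge -> encode -> write back.
function applyBlobChange(kv: KV, key: string, typeId: string,
                         incoming: unknown, incomingTs: Timestamp, actorId: string): void {
  const plugin = registry.get(typeId);
  if (!plugin) throw new Error(`No plugin registered for ${typeId}`);
  const entry = kv.get(key);
  const current = entry ? plugin.decodeFromStorage(entry.value) : undefined;
  const merged = plugin.merge(current, incoming, entry?.ts, incomingTs, actorId);
  // Keep the newer of the two timestamps next to the merged state.
  const ts = entry !== undefined && compareTimestamps(entry.ts, incomingTs) > 0 ? entry.ts : incomingTs;
  kv.set(key, { value: plugin.encodeForStorage(merged), ts });
}

// Example plugin: a last-writer-wins string register stored as JSON.
const lwwString: BlobPlugin<string, string, string> = {
  typeId: 'example:lww-string',
  storageStrategy: 'blob',
  merge: (current, incoming, currentTs, incomingTs) =>
    current === undefined || !currentTs || compareTimestamps(incomingTs, currentTs) > 0
      ? incoming
      : current,
  encodeForStorage: (state) => JSON.stringify(state),
  decodeFromStorage: (stored) => JSON.parse(stored),
};
```

Because this merge depends only on the timestamps, delivering the same writes in a different order leaves the same stored value.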

Proposed API: CRDTPluginInterface<TState, TSchemaOptions, TStoredValue, TChangePayload>

import { DataType, TypeConfig, Timestamp, KVStoreTransaction, DBEntity, Tuple } from '@triplit/db';

/** Structure for DKVR storage operations. */
export type DKVRMutationPayload = {
  sets?: Map<Tuple, any>; // Map<KeySuffixTuple, Value>
  deletes?: Set<Tuple>;  // Set<KeySuffixTuple>
};

/** Input type for decodeFromStorage when using DKVR. */
export type DKVRScanResult = AsyncIterable<[keySuffix: Tuple, value: any]>;

/**
 * Defines the contract for a custom CRDT data type plugin.
 *
 * @template TState - The canonical in-memory JavaScript representation (e.g., number for Counter).
 * @template TSchemaOptions - User-defined options via Schema builder (e.g., S.MyType(options)).
 * @template TStoredValue - How state is stored in KV (single value for 'blob', iterable value type for 'decomposed').
 * @template TChangePayload - Representation of a change/delta for merge/sync (often TState for LWW, specific Op for op-based).
 */
export interface CRDTPluginInterface<
  TState,
  TSchemaOptions = any,
  TStoredValue = any,
  TChangePayload = TState
> {
  /** REQUIRED: Unique string ID (e.g., "pn-counter", "myorg:rga-list"). Use namespacing. */
  readonly typeId: string;

  // --- Schema Definition & Validation ---
  /** REQUIRED: Returns the base Triplit DataType for use in S.Collections({...}). */
  schemaDefinition: (options?: TSchemaOptions) => DataType;
  /** OPTIONAL: Validates options passed to schemaDefinition. Returns error string if invalid. */
  validateSchemaOptions?: (options: TSchemaOptions) => string | undefined;

  // --- Core CRDT Logic ---
  /** REQUIRED: Returns the default/initial JS state (TState). */
  defaultValue: (config: TypeConfig) => TState;

  /**
   * REQUIRED: The core CRDT merge function. Combines incoming changes with current state using HLC timestamps.
   * For DKVR strategy, this function *must* use the provided kvTx for atomic multi-key reads/writes.
   * DKVR reads through kvTx are asynchronous (cf. DKVRScanResult), so the result may be a Promise.
   */
  merge(
    currentState: TState | undefined,
    incomingChangePayload: TChangePayload,
    currentTimestamp: Timestamp | undefined,
    incomingTimestamp: Timestamp,
    actorId: string,
    kvTx?: KVStoreTransaction // REQUIRED if storageStrategy is 'decomposed'
  ): TState | Promise<TState>;

  // --- Storage Strategy & Serialization ---
  /** REQUIRED: Specifies 'blob' (single KV value) or 'decomposed' (multiple KV values). */
  readonly storageStrategy: 'blob' | 'decomposed';

  /**
   * REQUIRED: Serializes JS TState for storage.
   * - 'blob': Returns TStoredValue (single value).
   * - 'decomposed': Returns DKVRMutationPayload { sets?: Map<SuffixTuple, Val>, deletes?: Set<SuffixTuple> }.
   */
  encodeForStorage: (state: TState) => TStoredValue | DKVRMutationPayload;

  /**
   * REQUIRED: Deserializes from storage to JS TState.
   * - 'blob': Receives TStoredValue.
   * - 'decomposed': Receives DKVRScanResult (AsyncIterable over KV pairs for the attribute).
   *   Consuming it requires `for await`, so the result may be a Promise.
   */
  decodeFromStorage: (storedValue: TStoredValue | DKVRScanResult) => TState | Promise<TState>;

  /** OPTIONAL (Recommended for Decomposed KV): Hints at the KV key structure. Helps core fetch data efficiently. */
  readonly kvSchema?: Record<string, Tuple[]>; // e.g., { meta: [['_meta']], element: [['_element', '*']] }

  // --- Client Interaction ---
  /** REQUIRED (for mutations): Translates user API calls into the internal TChangePayload for merge/sync. */
  generateChangePayload(
    mutationType: string, // Plugin-defined e.g., "increment"
    currentState: TState | undefined,
    args: any[], // Arguments from client API call
    actorId: string
  ): TChangePayload;

  // --- Optional Querying Hooks ---
  /** OPTIONAL: Provides comparison for `ORDER BY`. */
  compare?: (stateA: TState, stateB: TState) => number;
  /** OPTIONAL: Implements filtering logic for `WHERE`. */
  satisfiesOperator?: (state: TState, operator: string, compareValue: any) => boolean;
  /** OPTIONAL: Lists operators supported by `satisfiesOperator`. Defaults to =, !=, isDefined. */
  supportedOperators?: () => ReadonlyArray<string>;
  /** OPTIONAL: Extracts a primitive for basic comparison (e.g., counter value). */
  getComparableValue?: (state: TState) => string | number | boolean | null;

  // --- Optional Optimization Hooks ---
  /** OPTIONAL: Calculates delta for efficient sync. Defaults to sending full state/changes. */
  diff?: (oldState: TState, newState: TState) => TChangePayload | null;
  /** OPTIONAL (for Decomposed KV): Helps IVM identify relevant low-level KV changes. */
  isChangeRelevantToState?: (crdtAttributePath: string[], changedKvKey: string[]) => boolean;
}
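To make the 'decomposed' contract concrete, here is a sketch of encode/decode for a hypothetical string-keyed map. Each entry lives under its own ['_element', key] suffix and a ['_meta'] entry holds metadata, mirroring the kvSchema hint. Deleted entries are kept as tombstones so that encode can emit deletes from the state alone. Local `Tuple`/`DKVRMutationPayload`/`DKVRScanResult` types stand in for the Triplit ones. `encodeMapForStorage`, `decodeMapFromStorage`, and `scanFromPayload` are illustrative names. Because the scan is an AsyncIterable, decoding it is necessarily asynchronous, so this decode returns a Promise.

```typescript
// Local stand-ins for the types declared in the interface above.
type Tuple = ReadonlyArray<string | number>;
type DKVRMutationPayload = { sets?: Map<Tuple, unknown>; deletes?: Set<Tuple> };
type DKVRScanResult = AsyncIterable<[keySuffix: Tuple, value: unknown]>;

// Hypothetical map state with tombstones for deleted entries.
type MapEntry = { value: unknown; deleted: boolean };
type MapState = Map<string, MapEntry>;

function encodeMapForStorage(state: MapState): DKVRMutationPayload {
  const sets = new Map<Tuple, unknown>();
  const deletes = new Set<Tuple>();
  sets.set(['_meta'], { version: 1 }); // hypothetical metadata entry
  state.forEach((entry, key) => {
    if (entry.deleted) deletes.add(['_element', key]);
    else sets.set(['_element', key], entry.value);
  });
  return { sets, deletes };
}

async function decodeMapFromStorage(scan: DKVRScanResult): Promise<MapState> {
  const state: MapState = new Map();
  for await (const [suffix, value] of scan) {
    // Only ['_element', key] suffixes carry entries; ['_meta'] is skipped.
    if (suffix[0] === '_element' && typeof suffix[1] === 'string') {
      state.set(suffix[1], { value, deleted: false });
    }
  }
  return state;
}

// Test helper: replay a payload's sets as a scan, as the core might after writing them.
async function* scanFromPayload(payload: DKVRMutationPayload): DKVRScanResult {
  for (const entry of payload.sets ?? new Map<Tuple, unknown>()) yield entry;
}
```

One design consequence worth flagging: JavaScript `Map` and `Set` compare array keys by reference. In this sketch, `payload.sets.has(['_element', 'a'])` is `false` even though that entry was written. The core would likely need to serialize suffix tuples to strings, or the payload could use an array of `[suffix, value]` pairs instead of a `Map`.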

Integration Points:

  1. Registration: Via plugins array in TriplitClient/createServer options. Core maintains a Map<typeId, CRDTPluginInterface>.
  2. Schema: S.<TypeName> calls plugin.schemaDefinition. Validation uses plugin.validateSchemaOptions.
  3. Storage/Merge (EntityStoreKV):
    • Reads (getEntity): Checks plugin.storageStrategy. If 'blob', uses kv.get then plugin.decodeFromStorage. If 'decomposed', uses kv.scan (with prefix derived from attribute path) and passes iterator to plugin.decodeFromStorage.
    • Writes (applyChangesWithTimestamp): Identifies custom type via schema. Fetches current state appropriately (get or scan). Calls plugin.merge (passing kvTx if 'decomposed'). Calls plugin.encodeForStorage. Writes result back via kvTx.set (for 'blob') or applies the DKVRMutationPayload's sets/deletes atomically using kvTx.set/delete (for 'decomposed').
  4. Client API/Transactions:
    • Recommend Explicit Methods: client.incrementCounter(...), client.listPush(...), etc.
    • Inside client.transact: Read current state -> Call plugin.generateChangePayload -> Call tx.update with raw payload + internal flag _crdtUpdate: true.
    • tx.update: If _crdtUpdate flag is present, store raw payload in internal diff map.
  5. Sync Layer: Uses plugin.encodeForStorage or plugin.diff for the CHANGES message payload. The server receives the payload and passes it to the EntityStore, which invokes plugin.merge. New message types may be needed for op-based sync.
  6. Query Engine/IVM: Uses optional query hooks (compare, satisfiesOperator, etc.) potentially involving decodeFromStorage. IVM integration for DKVR relies on future core work, potentially guided by isChangeRelevantToState.
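The client-side flow in item 4 can be sketched as follows. An explicit method reads the current state and asks the plugin for a change payload. It then hands the raw payload to tx.update with `_crdtUpdate: true`, which routes it into a separate diff map instead of recording it as a plain value. `FakeTransaction`, the `typeId` option on `update`, `incrementCounter`, and the diff-map layout are hypothetical illustrations, not Triplit's client API.

```typescript
type CounterState = { p: Record<string, number>; n: Record<string, number> };

interface ChangePayloadPlugin<TState, TChange> {
  readonly typeId: string;
  generateChangePayload(mutationType: string, currentState: TState | undefined,
                        args: unknown[], actorId: string): TChange;
}

// Hypothetical transaction: flagged updates go to crdtDiffs so the core can
// later hand them to plugin.merge; everything else is a plain value write.
class FakeTransaction {
  readonly values = new Map<string, unknown>();
  readonly crdtDiffs = new Map<string, { typeId: string; payload: unknown }>();

  update(path: string, value: unknown, opts?: { _crdtUpdate?: boolean; typeId?: string }): void {
    if (opts?._crdtUpdate && opts.typeId) {
      this.crdtDiffs.set(path, { typeId: opts.typeId, payload: value });
    } else {
      this.values.set(path, value);
    }
  }
}

const counterPlugin: ChangePayloadPlugin<CounterState, CounterState> = {
  typeId: 'pn-counter',
  generateChangePayload(mutationType, currentState, args, actorId) {
    if (mutationType !== 'increment') throw new Error(`Unsupported op: ${mutationType}`);
    const amount = Number(args[0]);
    const base = currentState ?? { p: {}, n: {} };
    const next: CounterState = { p: { ...base.p }, n: { ...base.n } };
    if (amount >= 0) next.p[actorId] = (next.p[actorId] ?? 0) + amount;
    else next.n[actorId] = (next.n[actorId] ?? 0) - amount; // amount < 0, so this adds |amount|
    return next;
  },
};

// Explicit client method: read state -> generate payload -> flagged tx.update.
function incrementCounter(tx: FakeTransaction, path: string, current: CounterState | undefined,
                          amount: number, actorId: string): void {
  const payload = counterPlugin.generateChangePayload('increment', current, [amount], actorId);
  tx.update(path, payload, { _crdtUpdate: true, typeId: counterPlugin.typeId });
}
```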

Storage Strategy Implications:

  • State Blob: Simpler plugin implementation. encode/decode handle single values. merge usually doesn't need kvTx. Efficient for small or infrequently updated CRDTs (Counters). Querying internals requires decoding the whole value, which is slow for large states. IVM treats the attribute atomically.
  • Decomposed KV (DKVR): Advanced. encode returns sets/deletes. decode handles iterators. merge requires kvTx. Enables granular updates/sync, potentially better for large/complex types (Lists, Text). Querying and IVM integration are significantly more complex for the plugin author and require core Triplit enhancements to be efficient.
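The granular-update advantage of DKVR can be illustrated by comparing what each strategy writes after one small edit. The blob strategy re-serializes the whole value. A decomposed layout only needs sets and deletes for the entries that changed. `diffEntries` and the layout of one entry per list element are hypothetical. Suffix keys are serialized to strings (e.g. `_element/<id>`) so that lookups compare by value.

```typescript
type Entries = Map<string, string>; // serialized key suffix -> stored value

// Compute the minimal sets/deletes that turn `before` into `after`.
function diffEntries(before: Entries, after: Entries): { sets: Entries; deletes: Set<string> } {
  const sets: Entries = new Map();
  const deletes = new Set<string>();
  after.forEach((value, key) => {
    if (before.get(key) !== value) sets.set(key, value);
  });
  before.forEach((_value, key) => {
    if (!after.has(key)) deletes.add(key);
  });
  return { sets, deletes };
}

// Hypothetical list of 1,000 items, one entry per element id.
const before: Entries = new Map();
for (let i = 0; i < 1000; i++) before.set(`_element/${i}`, `item ${i}`);

// One edit and one removal.
const after: Entries = new Map(before);
after.set('_element/500', 'edited');
after.delete('_element/999');

// DKVR: write only what changed. Blob: rewrite the whole serialized list.
const decomposed = diffEntries(before, after);
const blobBytes = JSON.stringify(Array.from(after)).length;
const decomposedBytes = JSON.stringify(Array.from(decomposed.sets)).length;
```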

Example Plugin Sketch (PN-Counter using State Blob):

This sketch uses the simpler 'blob' strategy:

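import { Schema as S } from '@triplit/client';

// Per-actor increment (p) and decrement (n) totals.
type CounterState = { p: Record<string, number>; n: Record<string, number> };
type CounterOptions = { default?: number }; // hypothetical schema options

const sum = (m: Record<string, number>) => Object.values(m).reduce((a, b) => a + b, 0);
// Per-actor max: a join that is commutative, associative, and idempotent.
const maxMerge = (a: Record<string, number>, b: Record<string, number>) => {
  const out = { ...a };
  for (const [actor, count] of Object.entries(b)) out[actor] = Math.max(out[actor] ?? 0, count);
  return out;
};

const MyPNCounterPlugin: CRDTPluginInterface<CounterState, CounterOptions, string, CounterState> = {
  typeId: 'pn-counter',
  storageStrategy: 'blob', // Explicitly state blob strategy
  // Hypothetical: assumes a base counter DataType builder exists on S.
  schemaDefinition: (options?) => S.Counter(options),
  defaultValue: (config) => ({ p: {}, n: {} }),
  merge: (current, incoming /* CounterState */) => {
    // State-based PN-counter merge: per-actor max of p and n. Unlike LWW on the
    // whole state, this keeps concurrent increments, so HLC timestamps are not needed.
    const base = current ?? { p: {}, n: {} };
    return { p: maxMerge(base.p, incoming.p), n: maxMerge(base.n, incoming.n) };
  },
  encodeForStorage: (state) => JSON.stringify(state), // Store as JSON string
  decodeFromStorage: (storedValue) => JSON.parse(storedValue as string), // 'blob' always receives a string
  generateChangePayload: (mutationType, currentState, args, actorId) => {
    if (mutationType !== 'increment') throw new Error('Unsupported counter op');
    const amount = args[0] as number;
    const newState = structuredClone(currentState ?? { p: {}, n: {} });
    if (amount > 0) newState.p[actorId] = (newState.p[actorId] || 0) + amount;
    else newState.n[actorId] = (newState.n[actorId] || 0) + Math.abs(amount);
    return newState; // Return full new state; merge joins it with the stored state
  },
  getComparableValue: (state) => sum(state.p) - sum(state.n), // The counter's value
  // ... other optional query hooks
};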
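A standalone sketch of the counter's state-based logic is useful for checking the merge outside Triplit. Merging two replicas takes the per-actor maximum of each map, so the result is the same in any order, and re-delivering a change has no effect. The counter's value is sum(p) - sum(n), which also works as a `getComparableValue`. `mergeCounters`, `counterValue`, and the `compare`/`satisfiesOperator` implementations below are hypothetical and cover only a few operators.

```typescript
type CounterState = { p: Record<string, number>; n: Record<string, number> };

function maxMerge(a: Record<string, number>, b: Record<string, number>): Record<string, number> {
  const out: Record<string, number> = { ...a };
  for (const [actor, count] of Object.entries(b)) {
    out[actor] = Math.max(out[actor] ?? 0, count);
  }
  return out;
}

// Join of two PN-counter states: commutative, associative, idempotent.
function mergeCounters(a: CounterState, b: CounterState): CounterState {
  return { p: maxMerge(a.p, b.p), n: maxMerge(a.n, b.n) };
}

const sum = (m: Record<string, number>) => Object.values(m).reduce((acc, x) => acc + x, 0);

// Candidate getComparableValue: the counter's numeric value.
function counterValue(state: CounterState): number {
  return sum(state.p) - sum(state.n);
}

// Candidate query hooks built on the comparable value.
const compare = (a: CounterState, b: CounterState): number => counterValue(a) - counterValue(b);

function satisfiesOperator(state: CounterState, operator: string, compareValue: unknown): boolean {
  const v = counterValue(state);
  switch (operator) {
    case '=': return v === compareValue;
    case '!=': return v !== compareValue;
    case '>': return typeof compareValue === 'number' && v > compareValue;
    case '<': return typeof compareValue === 'number' && v < compareValue;
    default: throw new Error(`Unsupported operator: ${operator}`);
  }
}
```

With this merge, two replicas that increment concurrently both keep their contributions. A last-writer-wins merge over the whole state would keep only one of them.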

Benefits:

  • True Extensibility: Enables core data types beyond primitives.
  • Encapsulated Logic: CRDT complexity lives within plugins.
  • Adaptable Storage: Supports simple blob storage and provides a path for more complex decomposed storage.
  • Community Potential: Allows sharing and standardization of common collaborative types.

Challenges & Open Questions:

  1. DKVR Implementation Details: How should encodeForStorage signal multi-key writes? How should decodeFromStorage reliably receive scanned data? What KV scan options are needed?
  2. IVM for DKVR: What core changes are minimally required for basic IVM support with DKVR? Can isChangeRelevantToState be effective without deeper IVM awareness?
  3. Op-based Sync Protocol: Is a generic CRDT_OP message feasible, or are type-specific messages better? How are ops batched efficiently?
  4. Transactionality of Merge: How can the merge function (especially a DKVR merge that reads and writes multiple keys via kvTx) remain performant within transactions?
  5. Client API for DKVR: How to provide an intuitive client API for complex CRDTs using DKVR without overly exposing internal KV structure?

Next Steps:

  • Gather community feedback on the proposed interface and strategy.
  • Refine the API based on feedback, particularly the encode/decodeForStorage contract for DKVR.
  • Prototype a simple 'blob'-based plugin (like the Counter) to validate core integration points.
  • Begin design discussions on the necessary core IVM enhancements required for efficient DKVR support.