--- title: Computing on streams tags: docs lastupdated: 3 june 2020 --- # Computing on streams ## pre-amble The draft laid out in the doc [Scheduling computation](/sajdUniPSwen4S3tHHXUmA) assumed that each task defined in a `Gate` was computationally intensive and as such, a conventional view of batching the input data into constants was a good starting point. When we further specify the asynchronous messaging as a demo1 application, then the "computation" is only a trivial routing messages from sender to receiver, and a low-latency is desirable. In this document we explore "dynamic gates", and draw from lessons learnt in Apache Kafka to look at micro-batching (batches of one message). Gates must still remain accountable for computational outputs, but rather than assigning constants to input pins, we can now also assign a `LogId` of single writer log to an input pin. ## Streams **Definition**: An application _thread_ $A(l_\alpha; sk, rk)$ is a set of single-writer-logs $l_\alpha$ with $\alpha$ over all users' `logId`s and shared service key $sk$ and read key $rk$. **Notation**: A single-writer-log $l_\alpha$ with `logId` $\alpha$ is an ordered sequence of `record`s, $\alpha_i$, each signed by the private key of $\alpha$, and we note $l_\alpha = [\alpha_i]$. A node who has the service key $sk$ of a thread $A(.; sk, rk)$ can iterate and copy all records of all logs in the thread. A node who additionally has the read key $rk$ of the thread $A$ can decrypt and read the records of the logs in this thread. **Definition**: A _gate_ is a contract which embodies a computational task. Multiple rings can be assigned to the same gate for redundancy, but only the primary ring can write to the output of the gate. For more details see [Gates](/DPnL7kmLTTubDzFRshfXsg). Gates exist in three types: producer, processor, or consumer. Producer gates are tasked to produce batches of records written by the users in the application thread $A(l_\alpha;.,.)$. Processor gates can process these batches, and output newly resulting batches referencing the input batches. Consumer gates lastly can ingest batches and output a collection of results for users $\alpha$. **Definition**: A _layer_ $L$ is a set of gates of the same type. The layer types correspond to the gate types (a producer, a processor or a consumer layer). **Definition**: A _databus_ is a set of uni-directional connections between two different layers $L_i \rightarrow L_j$ (with $i \neq j$). Batches flowing through these connections are records appended to multi-writer-logs (MW-logs) written by gates in $L_i$ and read by gates in $L_j$. All MW-logs in a databus share the same service key and read key. **Definition**: An application _stream_: - starts with a layer of `producer` contracts $P_k$, which batch records from the users (from `logId`s) - can optionally process or compute over the batches with a layer of `processor` contracts $G^i_{l_i}$, where $i$ counts over the layers of computation - ends with a layer of `consumer` contracts $C_m$, which batch results for the users to pull - we note a stream $\mathcal{S}(P_k,G^0_{l_0},G^1_{l_1}, ..., G^N_{l_N},C_m)$ ## case study: Text messaging application with asynchronous message delivery For this example, imagine a text messaging application $T(l_\alpha, l_\beta; sk, rk)$ running between Alice and Bob with respective `logId` $\alpha$ and $\beta$. The text messaging application $T$ can use an asynchronous messaging application, called Notifications $N$, to cache and deliver messages from Alice to Bob, when the participants aren't online simultaneously. We assume that all avatars, like Alice and Bob, are subscribed to notifications from the stream $N$. In this example $N$ is simplified to have only three producers and three consumers, $N(P_1, P_2, P_3, C_1, C_2, C_3)$. The databus connecting the producers and consumers in $N$ is fully connected. There are no processing layers needed in the notification application. --- ## Stream APIs ### Producer API from client to streaming node ``` // here's an alpha_1 message ProduceRecordRequest { Header header; Body body; message Body { threadId; // thread associated with l_\alpha serviceKey; readKey; Log log; Log.Record record; <!-- Payment payment; --> bytes producerId; // contract address of producer message Log { bytes logId; // l_\alpha bytes pubkey; // pubkey_\alpha bytes head; message Record { bytes recordNode // contains PreviousLog; Signature; pubKey bytes eventNode bytes headerNode bytes bodyNode } } } } ``` streaming node can check 1. is new LogId? is direct child of head : accept as starting point (go get prior logs) 1. Log.pubKey == record.pubKey; logId = multihash(Log.pubKey); Header.pubKey == Log.pubKey (TODO: check in go-threads whether similarly enforced) 1. is record signature valid? (checked by threadsDB) 1. decode bodyNode as a micro-batch ``` type MicroBatch struct { uint entry Payment payment bytes Payload } ``` ``` ProduceRecordResponse ``` from streaming node to application ``` CheckRecord BeginBatch (give info on the databus layout) DeliverRecord EndBatch Commit ``` ``` type Batch struct { []pins []signatures } ``` ### Payment APIs Deposit and withdraw from payment channel is done over `RelayMetaTransaction` First Avatar must open a payment channel. Payment receipts are to be included in the functional requests directly. ``` message Payment { bytes avatar; // sender avatar bytes pubkey; bytes recipient; // recipient gate uint nonce; uint previousBalance; uint totalPaid; bytes signature; // avatar needs interface to check signature } ``` between application and stream ### Relayer APIs ``` message GasPriceRequest message RelayMetaTransaction { Header header; Body body; message Header { bytes avatar; // check pubkey is a member of avatar bytes pubkey; bytes signature; } message Body { bytes metatransaction; } } ``` ### Consumer API ``` ConsumeRecordRequest ConsumeRecordResponse ``` ### Processor API --- Alice(Avatar) generates a `logId` $l_\alpha$. Alice registers a constant `tokenId` for her `logId` and is the owner of this `tokenId`. Registers on a producer gate an input pin with her tokenId (restrictions apply: logId is within range, max logs accepted); when registering the pin, Alice needs to open a payment channel as well. Similarly, Bob can register an output pin for his `logId` $l_\beta$ to the appropriate consumer, and open a payment channel while doing so. --- `ReserveCacheRequest` to ring member; you do payment for total data size ``` message ReserveCacheRequest { Header header; Body body; message Body { bytes payment; repeated Link link (size, name, cid) } } ``` Alice `ProduceRecordRequest` to a ring member ``` message Header { bytes signature; bytes publicKey; } message ProduceRecordRequest { Header header = 1; Body body = 2; message Body { threadId serviceKey readKey Log log Log.Record record bytes payment } } ``` Node: body = bytes payload: handover to Application record body contains "mosaic-message" (CBOR / PB) bodyNode raw []byte is then CBOR encoded Notification ``` struct NotificationRequest { from to cids ttl payment } ``` from Node stack layer to application which runs in docker ``` message ExecuteMosaicMessageRequest { <includes CBOR encoded Notification> } ``` # open thoughts does the application: consensus: # Sentence homeless-shelter An avatar registers a `logId` as a constant `tokenId`. The avatar attaches on a `Producer` and deposits tokens in a payment channel for this `logId`. The avatar can open a payment channel with the Producer directly, first with a metatransaction, and on subsequent calls to the producer, pass incremental payment receipts to the producer. The channel can be unilaterlally close by the avatar with a metatransaction, returning unspent funds. --- The producer rings in async messaging serve as the notification outbox; outbox also caches blocks referenced in the notification. --- ``` message Record { bytes recordNode = 1; bytes eventNode = 2; bytes headerNode = 3; bytes bodyNode = 4; } ```