---
title: Computing on streams
tags: docs
lastupdated: 3 June 2020
---
# Computing on streams
## Preamble
The draft laid out in the doc [Scheduling computation](/sajdUniPSwen4S3tHHXUmA) assumed that each task defined in a `Gate` was computationally intensive and as such, a conventional view of batching the input data into constants was a good starting point.
When we further specify the asynchronous messaging as a demo1 application, the "computation" is only trivially routing messages from sender to receiver, and low latency is desirable.
In this document we explore "dynamic gates", and draw on lessons learnt from Apache Kafka to look at micro-batching (batches of one message). Gates must still remain accountable for computational outputs, but rather than assigning constants to input pins, we can now also assign the `LogId` of a single-writer log to an input pin.
## Streams
**Definition**: An application _thread_ $A(l_\alpha; sk, rk)$ is a set of single-writer logs $l_\alpha$, with $\alpha$ ranging over all users' `logId`s, together with a shared service key $sk$ and read key $rk$.
**Notation**: A single-writer log $l_\alpha$ with `logId` $\alpha$ is an ordered sequence of `record`s $\alpha_i$, each signed by the private key of $\alpha$; we write $l_\alpha = [\alpha_i]$.
A node that holds the service key $sk$ of a thread $A(.; sk, rk)$ can iterate over and copy all records of all logs in the thread. A node that additionally holds the read key $rk$ of the thread $A$ can decrypt and read the records of the logs in this thread.
**Definition**: A _gate_ is a contract which embodies a computational task. Multiple rings can be assigned to the same gate for redundancy, but only the primary ring can write to the output of the gate. For more details see [Gates](/DPnL7kmLTTubDzFRshfXsg).
Gates come in three types: producer, processor, or consumer. Producer gates are tasked with producing batches of records written by the users in the application thread $A(l_\alpha; ., .)$. Processor gates can process these batches and output newly resulting batches referencing the input batches. Lastly, consumer gates can ingest batches and output a collection of results for the users $\alpha$.
**Definition**: A _layer_ $L$ is a set of gates of the same type. The layer types correspond to the gate types (a producer, a processor or a consumer layer).
**Definition**: A _databus_ is a set of uni-directional connections between two different layers $L_i \rightarrow L_j$ (with $i \neq j$). Batches flowing through these connections are records appended to multi-writer-logs (MW-logs) written by gates in $L_i$ and read by gates in $L_j$. All MW-logs in a databus share the same service key and read key.
**Definition**: An application _stream_:
- starts with a layer of `producer` contracts $P_k$, which batch records from the users (from `logId`s)
- can optionally process or compute over the batches with a layer of `processor` contracts $G^i_{l_i}$, where $i$ counts over the layers of computation
- ends with a layer of `consumer` contracts $C_m$, which batch results for the users to pull
- we write a stream as $\mathcal{S}(P_k, G^0_{l_0}, G^1_{l_1}, ..., G^N_{l_N}, C_m)$
## case study: Text messaging application with asynchronous message delivery
For this example, imagine a text messaging application $T(l_\alpha, l_\beta; sk, rk)$ running between Alice and Bob with respective `logId`s $\alpha$ and $\beta$.
The text messaging application $T$ can use an asynchronous messaging application, called Notifications $N$, to cache and deliver messages from Alice to Bob when the participants aren't online simultaneously.
We assume that all avatars, like Alice and Bob, are subscribed to notifications from the stream $N$. In this example $N$ is simplified to have only three producers and three consumers, $N(P_1, P_2, P_3, C_1, C_2, C_3)$. The databus connecting the producers and consumers in $N$ is fully connected. There are no processing layers needed in the notification application.
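Since the databus is fully connected, its connections can be enumerated explicitly; the following sketch (gate names are illustrative) builds the $3 \times 3 = 9$ uni-directional connections between the producers and consumers of $N$:

```go
package main

import "fmt"

// Connection is one uni-directional edge P_k -> C_m of the databus.
type Connection struct{ From, To string }

// fullyConnected returns every producer-to-consumer edge.
func fullyConnected(producers, consumers []string) []Connection {
	var bus []Connection
	for _, p := range producers {
		for _, c := range consumers {
			bus = append(bus, Connection{From: p, To: c})
		}
	}
	return bus
}

func main() {
	bus := fullyConnected([]string{"P1", "P2", "P3"}, []string{"C1", "C2", "C3"})
	fmt.Println(len(bus)) // 3 producers x 3 consumers = 9 connections
}
```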
---
## Stream APIs
### Producer API
from client to streaming node
```
// carries one record (an \alpha_i) of l_\alpha
message ProduceRecordRequest {
  Header header = 1;
  Body body = 2;
  message Body {
    bytes threadId = 1;   // thread associated with l_\alpha
    bytes serviceKey = 2;
    bytes readKey = 3;
    Log log = 4;
    Log.Record record = 5;
    // Payment payment;
    bytes producerId = 6; // contract address of producer
    message Log {
      bytes logId = 1;  // l_\alpha
      bytes pubkey = 2; // pubkey_\alpha
      bytes head = 3;
      message Record {
        bytes recordNode = 1; // contains PreviousLog; Signature; pubKey
        bytes eventNode = 2;
        bytes headerNode = 3;
        bytes bodyNode = 4;
      }
    }
  }
}
```
The streaming node can check:
1. Is this a new `logId`? If the record is a direct child of `head`, accept it as a starting point (and go fetch the prior logs).
1. `Log.pubKey == record.pubKey`; `logId == multihash(Log.pubKey)`; `Header.pubKey == Log.pubKey` (TODO: check in go-threads whether this is similarly enforced)
1. Is the record signature valid? (checked by threadsDB)
1. Decode `bodyNode` as a micro-batch:
```
type MicroBatch struct {
	Entry   uint
	Payment Payment
	Payload []byte
}
```
```
ProduceRecordResponse
```
from streaming node to application
```
CheckRecord
BeginBatch (give info on the databus layout)
DeliverRecord
EndBatch
Commit
```
```
type Batch struct {
	Pins       [][]byte
	Signatures [][]byte
}
```
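The call sequence above suggests a small state machine on the application side; the following sketch (interface names and error handling are assumptions, not the actual API) enforces the `BeginBatch` / `DeliverRecord` / `EndBatch` / `Commit` ordering:

```go
package main

import (
	"errors"
	"fmt"
)

// batchState tracks where we are in the BeginBatch / DeliverRecord /
// EndBatch / Commit sequence.
type batchState int

const (
	idle batchState = iota
	open
	closed
)

// BatchSession enforces the call ordering of the batch protocol.
type BatchSession struct {
	state   batchState
	records [][]byte
}

func (b *BatchSession) BeginBatch() error {
	if b.state != idle {
		return errors.New("batch already begun")
	}
	b.state = open
	return nil
}

func (b *BatchSession) DeliverRecord(rec []byte) error {
	if b.state != open {
		return errors.New("no open batch")
	}
	b.records = append(b.records, rec)
	return nil
}

func (b *BatchSession) EndBatch() error {
	if b.state != open {
		return errors.New("no open batch")
	}
	b.state = closed
	return nil
}

// Commit finalizes the batch and returns the number of records committed.
func (b *BatchSession) Commit() (int, error) {
	if b.state != closed {
		return 0, errors.New("batch not ended")
	}
	b.state = idle
	n := len(b.records)
	b.records = nil
	return n, nil
}

func main() {
	var s BatchSession
	s.BeginBatch()
	s.DeliverRecord([]byte("r1"))
	s.DeliverRecord([]byte("r2"))
	s.EndBatch()
	n, _ := s.Commit()
	fmt.Println(n)
}
```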
### Payment APIs
Deposits to and withdrawals from the payment channel are done over `RelayMetaTransaction`.
First, the avatar must open a payment channel.
Payment receipts are included directly in the functional requests.
```
message Payment {
  bytes avatar = 1;    // sender avatar
  bytes pubkey = 2;
  bytes recipient = 3; // recipient gate
  uint64 nonce = 4;
  uint64 previousBalance = 5;
  uint64 totalPaid = 6;
  bytes signature = 7; // avatar needs interface to check signature
}
```
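One plausible reading of incremental receipts (an assumption; the field semantics are not pinned down above) is that each new receipt increments `nonce`, carries the previous `totalPaid` as `previousBalance`, and strictly increases `totalPaid`:

```go
package main

import "fmt"

// Receipt mirrors the Payment message fields relevant to ordering.
type Receipt struct {
	Nonce           uint
	PreviousBalance uint
	TotalPaid       uint
}

// validNext checks that next extends prev: the nonce increments by one,
// next's previousBalance equals prev's totalPaid, and the total only grows.
func validNext(prev, next Receipt) bool {
	return next.Nonce == prev.Nonce+1 &&
		next.PreviousBalance == prev.TotalPaid &&
		next.TotalPaid > prev.TotalPaid
}

func main() {
	r0 := Receipt{Nonce: 0, PreviousBalance: 0, TotalPaid: 10}
	r1 := Receipt{Nonce: 1, PreviousBalance: 10, TotalPaid: 25}
	fmt.Println(validNext(r0, r1))
}
```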
between application and stream
### Relayer APIs
```
message GasPriceRequest {}
message RelayMetaTransaction {
  Header header = 1;
  Body body = 2;
  message Header {
    bytes avatar = 1; // check pubkey is a member of avatar
    bytes pubkey = 2;
    bytes signature = 3;
  }
  message Body {
    bytes metatransaction = 1;
  }
}
```
### Consumer API
```
ConsumeRecordRequest
ConsumeRecordResponse
```
### Processor API
---
Alice (an Avatar) generates a `logId` $\alpha$ for her log $l_\alpha$. Alice registers a constant `tokenId` for her `logId` and is the owner of this `tokenId`. She registers an input pin with her `tokenId` on a producer gate (restrictions apply: the `logId` is within range, a maximum number of logs is accepted); when registering the pin, Alice also needs to open a payment channel.
Similarly, Bob can register an output pin for his `logId` $\beta$ with the appropriate consumer, and open a payment channel while doing so.
---
`ReserveCacheRequest` to a ring member; the payment covers the total data size.
```
message ReserveCacheRequest {
  Header header = 1;
  Body body = 2;
  message Body {
    bytes payment = 1;
    repeated Link link = 2; // Link carries size, name, cid
  }
}
```
Alice `ProduceRecordRequest` to a ring member
```
message Header {
  bytes signature = 1;
  bytes publicKey = 2;
}
message ProduceRecordRequest {
  Header header = 1;
  Body body = 2;
  message Body {
    bytes threadId = 1;
    bytes serviceKey = 2;
    bytes readKey = 3;
    Log log = 4;
    Log.Record record = 5;
    bytes payment = 6;
  }
}
```
On the node, the body is a raw byte payload handed over to the application. The record body contains a "mosaic-message" (CBOR / PB); the raw `bodyNode` bytes are a CBOR-encoded Notification:
```
type NotificationRequest struct {
	From    []byte
	To      []byte
	Cids    [][]byte // content IDs of the cached blocks
	TTL     uint
	Payment Payment
}
```
From the node stack layer to the application, which runs in Docker:
```
message ExecuteMosaicMessageRequest {
  // includes the CBOR-encoded Notification
}
```
# open thoughts
does the
application:
consensus:
# Sentence homeless-shelter
An avatar registers a `logId` as a constant `tokenId`. The avatar attaches to a `Producer` and deposits tokens in a payment channel for this `logId`.
The avatar can open a payment channel with the Producer directly, first with a metatransaction; on subsequent calls to the producer, it passes incremental payment receipts to the producer. The channel can be unilaterally closed by the avatar with a metatransaction, returning unspent funds.
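A minimal sketch of this channel lifecycle (open with a deposit, pass incremental receipts, unilaterally close and recover unspent funds; amounts and API are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// Channel is a unilateral payment channel from an avatar to a producer.
type Channel struct {
	Deposit uint
	Paid    uint
	Open    bool
}

// OpenChannel deposits funds (via a metatransaction in the flow above).
func OpenChannel(deposit uint) *Channel {
	return &Channel{Deposit: deposit, Open: true}
}

// Pay records an incremental receipt raising the cumulative total paid.
func (c *Channel) Pay(totalPaid uint) error {
	if !c.Open {
		return errors.New("channel closed")
	}
	if totalPaid <= c.Paid || totalPaid > c.Deposit {
		return errors.New("invalid receipt")
	}
	c.Paid = totalPaid
	return nil
}

// Close settles the channel and returns the unspent funds to the avatar.
func (c *Channel) Close() uint {
	c.Open = false
	return c.Deposit - c.Paid
}

func main() {
	c := OpenChannel(100)
	c.Pay(30)
	c.Pay(55)
	fmt.Println(c.Close()) // unspent funds returned: 45
}
```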
---
The producer rings in async messaging serve as the notification outbox; the outbox also caches the blocks referenced in the notification.
---
```
message Record {
bytes recordNode = 1;
bytes eventNode = 2;
bytes headerNode = 3;
bytes bodyNode = 4;
}
```