# Wetware Cluster Membership
How indexed queries to a node's routing table are performed remotely, and how the results are efficiently streamed.
## Preliminaries: Cluster Membership and the Routing Table
Wetware nodes form a cluster in which peers maintain a mutual awareness of each other. This mutual awareness is provided by the [`cluster.RoutingTable`](https://github.com/wetware/pkg/blob/v0.1.0/cluster/cluster.go#L37-L43) interface. The routing table maintains an indexed set of [`routing.Record`s](https://github.com/wetware/pkg/blob/v0.1.0/cluster/routing/routing.go#L65-L73), each of which is periodically broadcast by a peer as a form of heartbeat and contains routing information and metadata about its originator.
For convenience, here is `routing.Record`:
```go
// Record is an entry in the routing table.
type Record interface {
	// Server ID is a globally unique identifier for
	// the server instance. It changes each time the
	// server is restarted, which allows "instances"
	// to be distinguished from the permanent peer
	// identity.
	Server() ID

	// Peer returns the libp2p peer ID, which is
	// derived from the node's public key. In a
	// production setting, this identifier will
	// be persisted across node restarts.
	Peer() peer.ID

	// Seq returns the sequence number (i.e. a
	// monotonic logical clock) that is used to
	// identify the record's version. Only the
	// latest Record should be retained.
	Seq() uint64

	// TTL indicates the timespan within which
	// a subsequent record with a higher sequence
	// number is expected to be observed by peers.
	// The routing table SHOULD drop records whose
	// TTL has elapsed.
	TTL() time.Duration

	// Host is an optional string that provides the
	// hostname for the network host that underlies
	// the peer. Note that multiple peers may co-
	// locate on a single physical host, and that
	// the host string MAY be empty.
	Host() (string, error)

	// Meta returns a list of key-value pairs that
	// contain optional, user-provided metadata.
	// Common uses include e.g. AWS region.
	Meta() (Meta, error)
}
```
The routing table is responsible for evicting stale entries, *i.e.* entries for which either (1) the TTL has expired or (2) a newer record with a higher sequence number has been received. This yields a weakly consistent but highly available mechanism for propagating and querying cluster-membership state. As a design principle, Wetware is a [PA/EL system](https://en.wikipedia.org/wiki/PACELC_theorem); other consistency trade-offs can be supported at the application layer.
## Introducing Indexed Queries
Users and applications can perform indexed queries against the routing table to obtain the set of peers matching some constraints. For instance, at the command line:
```bash
ww ls WHERE region=us-east-1 # metadata field 'region'
```
or
```bash
ww ls WHERE host=my-hostname # hostname
```
In both cases, queries are prefix-matched against entries in the routing table, and all matching entries are streamed back to the client. Streaming is performed via [Cap'n Proto RPC](https://capnproto.org/rpc.html), which supports [streaming RPC calls](https://capnproto.org/news/2020-04-23-capnproto-0.8.html) with [BBR flow control](https://queue.acm.org/detail.cfm?id=3022184). Additionally, indexed lookups are performed on the cluster node, **not** the client, so only matching entries are sent over the wire. This architecture makes it possible to efficiently query nodes in very large clusters.
## Implementation of Indexed Queries
To understand how Wetware's indexed queries are implemented, let us trace the path of a request originating at the command line as it makes its way to a node in the cluster, and observe how it is processed on the remote end. Along the way, we will encounter several distinct features of the Wetware architecture.
> **Note:** most links point to code in the `v0.1.0` branch, which is the most up-to-date. However, I have not yet migrated index support for the `ls` CLI command to `v0.1.0`, so links to CLI code point to the `master` branch. Please don't be alarmed; there are no meaningful differences between these versions.
Let's begin by having a look at the main function for the `ls` command, [here](https://github.com/wetware/pkg/blob/master/internal/cmd/cluster/ls.go#L31-L41).
```go
func ls() cli.ActionFunc {
	return func(c *cli.Context) error {
		view, release := node.View(c.Context)
		defer release()

		it, release := view.Iter(c.Context, query(c))
		defer release()

		return render(it, formatter(c))
	}
}
```
The `ls` function returns a callback that is invoked by the CLI library when the `ls` command is executed. The `node` symbol is a client object pointing to an individual node in the cluster. It was obtained during application startup through a hook in the CLI library that uses Wetware's pluggable peer discovery system to find and connect to a peer in the cluster.
In the first step, we call the node's `View` method to obtain a `view.View`. The `View` is an [object capability](https://en.wikipedia.org/wiki/Object-capability_model) that allows the owner to perform queries against the node's routing table, *i.e.*, it provides the caller of the capability with the node's *view* of the cluster.
> **Wetware and Object Capabilities.** Wetware uses object capabilities for access control. In Wetware, object capabilities are Cap'n Proto `interface` types, which act as unforgeable references to remote objects, providing a [robust security model](https://capnproto.org/rpc.html#security). Practically speaking, the Cap'n Proto schema compiler produces Go code for the types found in `api/*.capnp`. Interface types are compiled to special Go structs whose method calls map onto [latency-optimized, asynchronous RPC](https://capnproto.org/rpc.html#time-travel-promise-pipelining).
The Cap'n Proto schema for the `View` capability is found in [api/cluster.capnp](https://github.com/wetware/pkg/blob/v0.1.0/api/cluster.capnp#L42-L95). The `View` interface contains three methods:
```capnp
interface View {
    # A View is a read-only snapshot of a particular host's routing
    # table. Views are not updated, and should therefore be queried
    # and discarded promptly.

    lookup @0 (selector :Selector, constraints :List(Constraint)) -> (result :MaybeRecord);
    # Lookup returns the first record matching the selector
    # and constraints, or nothing.

    iter @1 (handler :Handler, selector :Selector, constraints :List(Constraint)) -> ();
    # Iter streams the records matching the selector and
    # constraints to the provided Handler. The Handler
    # is automatically released when iteration has been
    # exhausted.

    reverse @2 () -> (view :View);
    # Reverse returns a new View in which iteration order
    # is reversed. Note that this also affects the Lookup
    # method.
}
```
Returning to the `ls` function, we see that the second step is to call the `Iter` method on the view, passing in some query parameters, and obtaining an iterator.
```go
it, release := view.Iter(c.Context, query(c))
```
The [`query(c)`](https://github.com/wetware/pkg/blob/master/internal/cmd/cluster/query.go#L13-L27) call is responsible for parsing command-line arguments into arguments for `View.Iter`. The parsing step is relatively straightforward, and is therefore left as an exercise to the reader. The iterator is more interesting, and we will return to it shortly. But first, let's look at the arguments to the `iter` method in the schema file in more detail. These are:
```capnp
interface Handler {
    # Handler is a 'callback' interface that is used to
    # stream records back to iter's caller.

    recv @0 (record :Record) -> stream;
    # Recv a single record. Per Cap'n Proto stream
    # semantics, recv can be called repeatedly and
    # will correctly abort if an error is encountered.
}

struct Selector {
    # Selector is a union type that indicates how records
    # should be matched in the routing table.
    union {
        all   @0 :Void;   # match all records
        match @1 :Index;  # match the provided index
        from  @2 :Index;  # designates the start of a range query
    }
}

struct Constraint {
    # Constraint to be applied on a query.
    union {
        limit @0 :UInt64; # max number of matches
        to    @1 :Index;  # designates the end of a range query
    }
}
```
This is fairly self-explanatory. In the CLI example, the `WHERE` directive indicates a `match` selector. Replacing it with `FROM` would indicate a range selector (`from`), and omitting directives (e.g., `ww ls`) corresponds to an `all` selector. In the `WHERE` queries shown earlier, the remaining parameters are used to construct the `match` field's `Index`. The `Index` struct is shown below. Its purpose is to designate the field of `routing.Record` that should be tested against the selector.
```capnp
struct Index {
    prefix @0 :Bool;
    union {
        peer   @1 :Text;
        server @2 :Data;
        host   @3 :Text;
        meta   @4 :Text;  # key=value
    }
}
```
Additionally, the index's `prefix` field communicates whether to perform an exact match or a prefix match.
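As a rough illustration of exact versus prefix matching, the sketch below parses a `key=value` argument, like those in the `WHERE` examples above, into a simplified index and tests it against a record. The `record`, `index`, and `parseWhere` names are hypothetical. Unlike the schema's `meta` variant, which carries the whole `key=value` string, this sketch splits the key from the value, and it makes no claim about how `query(c)` actually sets the `prefix` flag.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical, simplified routing record.
type record struct {
	host string
	meta map[string]string // user-provided key/value metadata
}

// index loosely mirrors the capnp Index struct: a prefix flag plus the
// field to test. Only the host and meta variants are modelled here.
type index struct {
	prefix bool
	field  string // "host" or a metadata key
	value  string
}

// parseWhere turns an argument such as "region=us-east-1" or
// "host=my-hostname" into an index (hypothetical CLI parsing).
func parseWhere(arg string, prefix bool) (index, error) {
	key, val, ok := strings.Cut(arg, "=")
	if !ok || key == "" {
		return index{}, fmt.Errorf("expected key=value, got %q", arg)
	}
	return index{prefix: prefix, field: key, value: val}, nil
}

// matches reports whether r satisfies the index, using either an exact
// comparison or a prefix comparison depending on ix.prefix.
func (ix index) matches(r record) bool {
	var got string
	if ix.field == "host" {
		got = r.host
	} else {
		v, ok := r.meta[ix.field]
		if !ok {
			return false // records without the key never match
		}
		got = v
	}
	if ix.prefix {
		return strings.HasPrefix(got, ix.value)
	}
	return got == ix.value
}

func main() {
	r := record{host: "node-7.example", meta: map[string]string{"region": "us-east-1"}}
	exact, _ := parseWhere("region=us-east-1", false)
	pre, _ := parseWhere("host=node-", true)
	fmt.Println(exact.matches(r), pre.matches(r)) // prints "true true"
}
```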
At this point, it is useful to return to the `ls` function and observe how the iterator (variable `it`) is consumed, as this will
1. demonstrate a particularly elegant feature of Wetware's cluster API; and,
2. provide context for understanding how the `iter` RPC call is constructed on the client side and handled on the remote node.
So, returning to `ls`, the third and final step boils down to a call to:
```go
render(it, formatter(c))
```
The `formatter(c)` call is not particularly interesting; it simply reads command-line flags to determine the output format of the result (either plain-text or JSON). The `render` function, on the other hand, demonstrates Wetware's generalized mechanism for dealing with flow-controlled streams: the [`Iterator`](https://github.com/wetware/pkg/blob/v0.1.0/util/casm/casm.go#L31-L63) type.
The `render` function is concise and elegant:
```go
type consumerFunc func(routing.Record) error

func render(it cluster.Iterator, consume consumerFunc) error {
	for record := it.Next(); record != nil; record = it.Next() {
		if err := consume(record); err != nil {
			return err
		}
	}

	return it.Err()
}
```
The `consumerFunc` type is returned by `formatter(c)`, and is uninteresting. Instead, focus on `cluster.Iterator`, and take note of the `for` loop. The loop blocks on each call to `it.Next()` until the next item has been received from the remote end. Recall that BBR flow control is used, so a slow consumer will cause the remote node to throttle appropriately. When the stream either (a) is exhausted or (b) encounters an error, `it.Next()` returns a nil `routing.Record`, causing the loop to exit. Callers are expected to check `it.Err()`, and to interpret a nil error as the normal end of the stream. Callers can abort early by canceling the `context.Context` initially passed to the `Iter` method.
So, how does this work? Let's dig into the client-side `Iter` implementation ([source](https://github.com/wetware/pkg/blob/v0.1.0/cap/view/client.go#L38-L64)).
```go
// Iter returns an iterator that ranges over records matching
// the supplied query. Callers MUST call the ReleaseFunc when
// finished with the iterator. Callers MUST NOT call methods
// on the iterator after calling the ReleaseFunc.
func (v View) Iter(ctx context.Context, query Query) (Iterator, capnp.ReleaseFunc) {
	// If v is the null capability, return an iterator
	// that is immediately exhausted. This is how we
	// model denial of access using capabilities.
	if v == (View{}) {
		return Iterator{
			Future: context.Background(),
			Seq:    exhausted{},
		}, func() {}
	}

	ctx, cancel := context.WithCancel(ctx)

	var (
		// Create a new handler capability. We will pass
		// it as a parameter to the Iter call, and the remote
		// node will use it to stream records back to us.
		h = newHandler()

		// Call the Iter method on the View capability client,
		// passing in a handler and the query we got as
		// an argument.
		f, release = api.View(v).Iter(ctx, h.Handler(query))
	)

	// Build and return a view.Iterator
	return Iterator{
		Future: casm.Future(f),
		Seq:    h,
	}, func() {
		cancel()
		release()
	}
}
```
There's quite a bit going on here, so let's unpack it. The first thing to notice is that [`view.Iterator`](https://github.com/wetware/pkg/blob/v0.1.0/cap/view/client.go#L38-L64) is a defined type based on the generic `casm.Iterator`, instantiated with `routing.Record`. In other words, it is a concrete iterator that deals in `routing.Record`s.
```go
// Iterator is a stateful object that enumerates routing records.
// See casm.Iterator() for important information on lifetime and
// error handling.
type Iterator casm.Iterator[routing.Record]

// Err reports any error encountered during iteration. Next will
// always return nil when Err() != nil.
func (it Iterator) Err() error {
	return casm.Iterator[routing.Record](it).Err()
}

// Next updates the iterator's internal state and returns the
// next record in the stream. If a call to Next returns nil,
// the iterator is exhausted.
//
// Records returned by Next are valid until the next call to
// Next, or until the iterator is released. See View.Iter().
func (it Iterator) Next() routing.Record {
	r, _ := it.Seq.Next()
	return r
}
```
The [`view.handler` type](https://github.com/wetware/pkg/blob/v0.1.0/cap/view/client.go#L94-L98) that was constructed with `newHandler` corresponds to the [`Seq` interface](https://github.com/wetware/pkg/blob/v0.1.0/util/casm/casm.go#L32-L34) in the generic `Iterator`'s type definition. The details of how these pieces fit together are best obtained by carefully reading through the source code. The key point is that `handler` implements the Cap'n Proto `Handler` interface found in `cluster.capnp`. Specifically, it implements the `Recv` method [here](https://github.com/wetware/pkg/blob/v0.1.0/cap/view/client.go#L126-L159), which constitutes the send end of the iterator shown above. The remote node calls this method once for each record returned by its routing-table lookup. This takes place in the `iter` function that is constructed in the [server-side endpoint for `Iter`](https://github.com/wetware/pkg/blob/v0.1.0/cap/view/server.go#L42-L61), on the remote node:
```go
func (s Server) Iter(ctx context.Context, call api.View_iter) error {
	// get the selector argument passed in by the client
	sel, err := call.Args().Selector()
	if err != nil {
		return err
	}

	var (
		// get the handler capability, which we use as a
		// callback
		handler = call.Args().Handler()

		// construct the send side of the iterator using
		// the handler capability. Note that the resulting
		// 'iter' symbol is a function, each call to which
		// will send one record back to the caller.
		iter = iterator(ctx, handler)
	)

	// Obtain a snapshot of the routing table, bind it to the
	// selector, and stream the results back to the caller.
	if err = s.bind(iter, selector(sel)); err == nil {
		call.Go()
		err = handler.WaitStreaming()
	}

	return err
}
```
The magic happens in `s.bind`, and is straightforward:
```go
type bindFunc func(routing.Record) error

func (s Server) bind(bind bindFunc, selector query.Selector) error {
	// Take a snapshot of the routing table, and pass it
	// to the `selector` method, which returns a routing.Iterator.
	// The routing.Iterator is a *local* iterator for contents of
	// the routing table.
	it, err := selector(s.RoutingTable.Snapshot())
	if err != nil {
		return err
	}

	// Iterate through local records, sending each one to the caller.
	for r := it.Next(); r != nil; r = it.Next() {
		if err = bind(r); err != nil {
			break
		}
	}

	// Because it is a local iterator, it.Err() is always nil,
	// so we don't need to check it.
	return err
}
```
The source for `routing.Iterator` can be found [here](https://github.com/wetware/pkg/blob/v0.1.0/cluster/routing/routing.go#L127-L141).
As with `view.Iterator`, it is a concrete subtype of the [generic `casm.Iterator`](https://github.com/wetware/pkg/blob/v0.1.0/util/casm/casm.go#L31-L63).
```go
type Iterator[T any] struct {
	Seq interface {
		Next() (T, bool)
	}

	Future interface {
		Done() <-chan struct{}
		Err() error
	}
}
```
In all cases, iterators are built from a `Seq` interface that is able to deliver the next value, and a `Future` interface that is able to detect asynchronous termination of the stream, as well as report any errors encountered. The latter is trivially satisfied by the code-generated `Future` types produced by the Cap'n Proto compiler for a given schema, and the former is responsible for receiving a single value and synchronizing access.
Code reuse FTW!
## Closing Thoughts
Wetware's family of `Iterator`s is a powerful abstraction that provides a consistent API across several key subsystems. In this write-up, we have seen how the generic `casm.Iterator` is used to model both *local* lookups to the routing table and the efficient streaming of `routing.Record`s over the network. In both cases, the same developer-facing pattern is used: a simple `for` loop with a final error check.
This pattern also emerges in other Wetware features. For example, Wetware provides a peer-to-peer pubsub mesh that distributed applications can use to broadcast messages. [Subscribing to a pubsub topic](https://github.com/wetware/pkg/blob/master/internal/cmd/cluster/pubsub.go#L66-L82) looks very similar to querying the routing table. In fact, it is even simpler, since it requires no selectors or constraints:
```go
func subAction() cli.ActionFunc {
	return func(c *cli.Context) (err error) {
		t, release := node.Join(c.Context, c.String("topic"))
		defer release()

		sub, cancel := t.Subscribe(c.Context)
		defer cancel()

		print := newPrinter(c)
		for msg := sub.Next(); msg != nil; msg = sub.Next() {
			print(msg)
		}

		return sub.Err()
	}
}
```
Other client-facing features, from inter-process channels to service discovery, make use of this same pattern.