
Wetware Cluster Membership

How indexed queries to a node's routing table are performed remotely, and how the results are efficiently streamed.

Preliminaries: Cluster Membership and the Routing Table

Wetware nodes form a cluster in which peers maintain mutual awareness of one another. This mutual awareness is provided by the cluster.RoutingTable interface. The routing table maintains an indexed set of routing.Records, each of which is periodically broadcast by a peer as a form of heartbeat and contains routing information and metadata about its originator.

For convenience, here is routing.Record:

// Record is an entry in the routing table.
type Record interface {
    // Server ID is a globally unique identifier for
    // the server instance. It changes each time the
    // server is restarted, which allows "instances"
    // to be distinguished from the permanent peer
    // identity.
    Server() ID

    // Peer returns the libp2p peer ID, which is
    // derived from the node's public key.  In a
    // production setting, this identifier will
    // be persisted across node restarts.
    Peer() peer.ID

    // Seq returns the sequence number (i.e. a
    // monotonic logical clock) that is used to
    // identify the record's version.  Only the
    // latest Record should be retained.
    Seq() uint64

    // TTL indicates the timespan within which
    // a subsequent record with a higher sequence
    // number is expected to be observed by peers.
    // The routing table SHOULD drop records whose
    // TTL has elapsed.
    TTL() time.Duration

    // Host is an optional string that provides the
    // hostname for the network host that underlies
    // the peer.  Note that multiple peers may co-
    // locate on a single physical host, and that
    // the host string MAY be empty.
    Host() (string, error)

    // Meta returns a list of key-value pairs that
    // contain optional, user-provided metadata.
    // Common uses include e.g. AWS region.
    Meta() (Meta, error)
}

The routing table is responsible for expiring stale entries, i.e. entries for which either (1) the TTL has elapsed or (2) a newer record with a higher sequence number has been received. This yields an available but weakly consistent system for propagating and querying cluster-membership state. As a design principle, Wetware is a PA/EL system in the PACELC sense (it favors availability under partition, and low latency otherwise), and leaves stronger consistency tradeoffs to the application layer.
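
To make the expiry rule concrete, here is a minimal sketch of the bookkeeping involved. It is a simplified model rather than the actual routing-table implementation: the entry type and the upsert/expire helpers are hypothetical, and the time, routing and peer imports are assumed.

// entry pairs a record with the deadline derived from its TTL.
type entry struct {
	rec      routing.Record
	deadline time.Time
}

// upsert retains only the newest record for a given peer,
// ignoring anything with an equal or lower sequence number.
func upsert(table map[peer.ID]entry, rec routing.Record, now time.Time) {
	if old, ok := table[rec.Peer()]; ok && old.rec.Seq() >= rec.Seq() {
		return // stale record; keep the one we already have
	}

	table[rec.Peer()] = entry{rec: rec, deadline: now.Add(rec.TTL())}
}

// expire drops records whose TTL has elapsed.
func expire(table map[peer.ID]entry, now time.Time) {
	for id, e := range table {
		if now.After(e.deadline) {
			delete(table, id)
		}
	}
}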

Introducing Indexed Queries

Users and applications can perform indexed queries against the routing table to obtain the set of peers matching some constraints. For instance, at the command line:

ww ls WHERE region=us-east-1  # metadata field 'region'

or

ww ls WHERE host=my-hostname  # hostname

In both cases, queries are prefix-matched against entries in the routing table, and all matching entries are streamed back to the client. Streaming is performed via Cap'n Proto RPC, which supports streaming RPC calls with BBR flow control. Additionally, indexed lookups are performed on the cluster node, not on the client, so only matching entries are sent over the wire. This architecture makes it possible to query nodes efficiently, even in very large clusters.
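
For example, a prefix query against the host index might look like this (the hostnames are hypothetical and shown only to illustrate prefix matching):

ww ls WHERE host=web-  # matches web-01, web-02, web-03, ...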

Implementation of Indexed Queries

To understand how Wetware's indexed queries are implemented, let us trace the path of a request originating at the command line as it makes its way to a node in the cluster, and see how it is processed on the remote end. Along the way, we will observe several distinct features of the Wetware architecture.

Note: most links point to code in the v0.1.0 branch, which is the most up-to-date. However, I have not yet migrated support for indexes in the ls CLI command to v0.1.0, so links to CLI code will point to the master branch. Please don't be alarmed: there are no meaningful differences between these versions.

Let's begin by having a look at the main function for the ls command, here.

func ls() cli.ActionFunc {
	return func(c *cli.Context) error {
		view, release := node.View(c.Context)
		defer release()

		it, release := view.Iter(c.Context, query(c))
		defer release()

		return render(it, formatter(c))
	}
}

The ls function returns a callback that will be invoked by the CLI library when the ls command is executed. The node symbol is a client object pointing to an individual node in the cluster. It was obtained during application startup through a hook in the CLI library that uses Wetware's pluggable peer-discovery system to find and connect to a peer in the cluster.

In the first step, we call the View function to obtain a view.View. The View is an object capability that allows its owner to perform queries against the node's routing table, i.e., it provides the caller with the node's view of the cluster.

Wetware and Object Capabilities. Wetware uses object capabilities for access control. In Wetware, object capabilities are Cap'n Proto interface types, which act as unforgeable references to remote objects and provide a robust security model. Practically speaking, the Cap'n Proto schema compiler produces Go code for the types found in api/*.capnp. Interface types are compiled to special Go structs whose method calls map onto latency-optimized, asynchronous RPC.
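
Schematically, calling a method on one of these generated types follows the pattern below. The identifiers (client, api.View_lookup_Params) are placeholders derived from the usual capnp-go naming conventions rather than verbatim Wetware code; the point is the shape of the call.

// The call returns immediately with a future and a release function;
// the RPC resolves asynchronously, and dependent calls can be
// pipelined before the result ever crosses the wire.
f, release := client.Lookup(ctx, func(ps api.View_lookup_Params) error {
	return nil // populate the selector and constraints here
})
defer release()

res, err := f.Struct() // blocks until the remote call resolves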

The Cap'n Proto schema for the View capability is found in api/cluster.capnp. The View interface contains three methods:

interface View {
    # A View is a read-only snapshot of a particular host's routing
    # table. Views are not updated, and should therefore be queried
    # and discarded promptly.

    lookup  @0 (selector :Selector, constraints :List(Constraint)) -> (result :MaybeRecord);
    # Lookup returns the first record matching the selector
    # and constraints, or nothing.
    
    iter    @1 (handler :Handler, selector :Selector, constraints :List(Constraint)) -> ();
    # Iter streams the records matching the selector and
    # constraints to the provided Handler.  The Handler
    # is automatically released when iteration has been
    # exhausted.
    
    reverse @2 () -> (view :View);
    # Reverse returns a new View in which iteration order
    # is reversed.  Note that this also affects the Lookup
    # method.
}

Returning to the ls function, we see that the second step is to call the Iter method on the view, passing in some query parameters, and obtaining an iterator.

it, release := view.Iter(c.Context, query(c))

The query(c) call is responsible for parsing command-line arguments into arguments for View.Iter. The parsing step is relatively straightforward, and is therefore left as an exercise to the reader. The iterator is more interesting, and we will return to it shortly. But first, let's look at the arguments to the iter method in the schema file in more detail. These are:

interface Handler {
    # Handler is a 'callback' interface that is used to
    # stream records back to iter's caller.
    
    recv @0 (record :Record) -> stream;
    # Recv a single record.  Per Cap'n Proto stream
    # semantics, recv can be called repeatedly and
    # will correctly abort if an error is encountered.
}

struct Selector {
    # Selector is a union type that indicates how records
    # should be matched in the routing table.

    union {
        all        @0 :Void;  # match all records
        match      @1 :Index; # match the provided index
        from       @2 :Index; # designates the start of a range query
    }
}

struct Constraint {
    # Constraint to be applied on a query.

    union {
        limit      @0 :UInt64;  # max number of matches
        to         @1 :Index;   # designates the end of a range query
    }
}

This is fairly self-explanatory. In the CLI example, the WHERE directive indicates a match selector. Replacing it with FROM would indicate a range selector (from), and omitting directives altogether (e.g. ww ls) corresponds to an all selector. In the WHERE queries shown earlier, the remaining parameters are used to construct the match field's Index. The Index struct is shown below. Its purpose is to designate the field of routing.Record that should be tested against the selector.

struct Index {
    prefix    @0 :Bool;
    union {
        peer   @1 :Text;
        server @2 :Data;
        host   @3 :Text;
        meta   @4 :Text;        # key=value
    }
}

Additionally, the index communicates whether to perform an exact match or a prefix match.
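
As a rough sketch, a WHERE argument such as region=us-east-1 could be turned into a meta Index along the following lines. The helper is hypothetical, and the generated constructor and setter names (api.NewIndex, SetPrefix, SetMeta) are assumed from standard capnp-go code generation rather than taken from the Wetware source.

// metaIndex builds an Index whose union is set to the 'meta'
// variant, carrying a "key=value" pair such as "region=us-east-1".
func metaIndex(arg string, prefix bool) (api.Index, error) {
	_, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
	if err != nil {
		return api.Index{}, err
	}

	index, err := api.NewIndex(seg)
	if err != nil {
		return api.Index{}, err
	}

	index.SetPrefix(prefix)          // exact vs. prefix match
	return index, index.SetMeta(arg) // selects the 'meta' union member
}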

At this point, it is useful to return to the ls function and observe how the iterator (variable it) is consumed, as this will

  1. demonstrate a particularly elegant feature of Wetware's cluster API; and,
  2. provide context for understanding how the iter RPC call is constructed on the client side and handled on the remote node.

So, returning to ls, the third and final step boils down to a call to:

render(it, formatter(c))

The formatter(c) call is not particularly interesting; it simply reads command-line flags to determine the output format of the result (either plain-text or JSON). The render function, on the other hand, demonstrates Wetware's generalized mechanism for dealing with flow-controlled streams: the Iterator type.

The render function is concise and elegant:

type consumerFunc func(routing.Record) error

func render(it cluster.Iterator, consume consumerFunc) error {
	for record := it.Next(); record != nil; record = it.Next() {
		if err := consume(record); err != nil {
			return err
		}
	}

	return it.Err()
}

The consumerFunc type is returned by formatter(c), and is uninteresting. Instead, focus on cluster.Iterator, and take note of the for loop. The loop will block on each call to it.Next() until the next item has been received from the remote end. Recall that BBR flow control is used, so a slow consumer will cause the remote node to throttle appropriately. When the stream either (a) is exhausted or (b) encounters an error, it.Next() will return a nil routing.Record, causing the loop to exit. Callers are expected to check it.Err(), interpreting a nil error as the end of the stream. Callers can abort early by canceling the context.Context initially passed to the Iter method.
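
For completeness, a plain-text consumerFunc might look roughly like this. It is a sketch that relies only on the routing.Record methods shown earlier, not the actual formatter implementation.

// printRecord writes one line per record: peer ID, hostname
// and sequence number, separated by tabs.
func printRecord(r routing.Record) error {
	host, err := r.Host()
	if err != nil {
		return err
	}

	_, err = fmt.Printf("%s\t%s\tseq=%d\n", r.Peer(), host, r.Seq())
	return err
}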

So, how does this work? Let's dig into the client-side Iter implementation (source).

// Iter returns an iterator that ranges over records matching
// the supplied query. Callers MUST call the ReleaseFunc when
// finished with the iterator.  Callers MUST NOT call methods
// on the iterator after calling the ReleaseFunc.
func (v View) Iter(ctx context.Context, query Query) (Iterator, capnp.ReleaseFunc) {
    // If v is the null capability, return an iterator
    // that is immediately exhausted.  This is how we
    // model denial of access using capabilities.
	if v == (View{}) {
		return Iterator{
			Future: context.Background(),
			Seq:    exhausted{},
		}, func() {}
	}

	ctx, cancel := context.WithCancel(ctx)

	var (
		// Create a new handler capability.  We will pass
		// it as a parameter to the Iter call, and the remote
		// node will use it to stream records back to us.
		h = newHandler()

		// Call the capability client's Iter method, passing
		// in the handler and the query we received as an
		// argument.
		f, release = api.View(v).Iter(ctx, h.Handler(query))
	)

    // Build and return a view.Iterator
	return Iterator{
			Future: casm.Future(f),
			Seq:    h,
		}, func() {
			cancel()
			release()
		}
}

There's quite a bit going on here, so let's unpack it. The first thing to notice is that view.Iterator is defined in terms of the generic casm.Iterator type: it is a concrete iterator that deals in routing.Records.

// Iterator is a stateful object that enumerates routing records.
// See casm.Iterator() for important information on lifetime and
// error handling.
type Iterator casm.Iterator[routing.Record]

// Err reports any error encountered during iteration. Next will
// always return nil when Err() != nil.
func (it Iterator) Err() error {
	return casm.Iterator[routing.Record](it).Err()
}

// Next updates the iterator's internal state and returns the
// next record in the stream.  If a call to Next returns nil,
// the iterator is exhausted.
//
// Records returned by Next are valid until the next call to
// Next, or until the iterator is released.  See View.Iter().
func (it Iterator) Next() routing.Record {
	r, _ := it.Seq.Next()
	return r
}

The view.handler type that was constructed with newHandler corresponds to the Seq interface in the generic Iterator's type definition. The details of how these pieces fit together are best obtained by carefully reading through the source code. The key point is that handler implements the Cap'n Proto Handler interface found in cluster.capnp. Specifically, it implements the Recv method here, which constitutes the send end of the iterator shown above. This method is called once for each record returned by the routing-table lookup on the node. That lookup takes place in the iter function constructed by the server-side endpoint for Iter, on the remote node:

func (s Server) Iter(ctx context.Context, call api.View_iter) error {
    // get the selector argument passed in by the client
    sel, err := call.Args().Selector()
    if err != nil {
        return err
    }

    var (
        // get the handler capability, which we use as a
        // callback
        handler = call.Args().Handler()

        // construct the send side of the iterator using
        // the handler capability.  Note that the resulting
        // 'iter' symbol is a function, each call to which
        // will send one record back to the caller.
        iter = iterator(ctx, handler)
    )

    // Obtain a snapshot of the routing table, bind it to the
    // selector, and stream the results back to the caller.
    if err = s.bind(iter, selector(sel)); err == nil {
        call.Go()
        err = handler.WaitStreaming()
    }

    return err
}
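
The iterator helper referenced above adapts the Handler capability into a plain function over routing.Records, matching the bindFunc type defined just below. Roughly, it could look like the following sketch; the generated parameter type (api.Handler_recv_Params) and the copyRecord helper are assumptions based on the schema, not the actual Wetware code.

// iterator returns a function that streams one routing.Record back
// to the caller per invocation, via the Handler's streaming recv.
func iterator(ctx context.Context, h api.Handler) func(routing.Record) error {
	return func(r routing.Record) error {
		// Per Cap'n Proto stream semantics, Recv applies flow
		// control and fails fast once the stream has errored.
		return h.Recv(ctx, func(ps api.Handler_recv_Params) error {
			rec, err := ps.NewRecord()
			if err != nil {
				return err
			}

			return copyRecord(rec, r) // hypothetical: copy fields onto the wire record
		})
	}
}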

The magic happens in s.bind, and is straightforward:

type bindFunc func(routing.Record) error

func (s Server) bind(bind bindFunc, selector query.Selector) error {
    // Take a snapshot of the routing table, and pass it
    // to the `selector` method, which returns a routing.Iterator.
    // The routing.Iterator is a *local* iterator for contents of
    // the routing table.
    it, err := selector(s.RoutingTable.Snapshot())
    if err != nil {
        return err
    }

    // Iterate through local records, sending each one to the caller.
    for r := it.Next(); r != nil; r = it.Next() {
        if err = bind(r); err != nil {
            break
        }
    }

    // Because it is a local iterator, it.Err() is always nil,
    // so we don't need to check it.

    return err
}

The source for routing.Iterator can be found here.

As with view.Iterator, it is a concrete instantiation of the generic casm.Iterator, which is defined as follows:

type Iterator[T any] struct {
	Seq interface {
		Next() (T, bool)
	}

	Future interface {
		Done() <-chan struct{}
		Err() error
	}
}

In all cases, iterators are built from a Seq interface that is able to deliver the next value, and a Future interface that is able to detect asynchronous termination of the stream, as well as report any errors encountered. The latter is trivially satisfied by the code-generated Future types produced by the Cap'n Proto compiler for a given schema, and the former is responsible for receiving a single value and synchronizing access.
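
As an illustration of how little glue this requires, the generic Err method might be implemented along these lines (a sketch; the actual casm source may differ):

// Err reports an error only once the underlying future has resolved;
// while streaming is still in progress it returns nil.
func (it Iterator[T]) Err() (err error) {
	select {
	case <-it.Future.Done():
		err = it.Future.Err()
	default:
	}

	return
}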

Code reuse FTW!

Closing Thoughts

Wetware's family of Iterators provides a consistent API across several key features. In this write-up, we have seen how the generic casm.Iterator is used to model both local lookups against the routing table and the efficient streaming of routing.Records over the network. In both cases, the same developer-facing pattern is used: a simple for loop with a final error check.

This pattern also emerges in other Wetware features. For example, Wetware provides a peer-to-peer pubsub mesh that distributed applications can use to broadcast messages. Subscribing to a pubsub topic looks very similar to querying the routing table. In fact, it is even simpler, since it requires no selectors or constraints:

func subAction() cli.ActionFunc {
	return func(c *cli.Context) (err error) {
		t, release := node.Join(c.Context, c.String("topic"))
		defer release()

		sub, cancel := t.Subscribe(c.Context)
		defer cancel()

		print := newPrinter(c)

		for msg := sub.Next(); msg != nil; msg = sub.Next() {
			print(msg)
		}

		return sub.Err()
	}
}

Other client-facing features, from inter-process channels to service discovery, make use of this same pattern.