---
tags: tesi
---

<style>.markdown-body { max-width: 1500px; }</style>

# Multiple streams

## How it should work

There is a `StreamEnvironment` that keeps track of the computations to do, including the configuration (batch mode, hosts, base port, ...), and the computation graph.

```rust
let mut env = StreamEnvironment::new(...);
let stream = env.stream(IteratorSource::new(...))  // new block id=1
    .map(|x| ...)                    // Stream<Map<T1>>
    .filter(|x| ...)                 // Stream<Filter<Map<T1>>>
    .group_by(|x| x.field.clone())   // KeyedStream<T2>
    // new block id=2
    .map(|x| ...)                    // KeyedStream<Map<T2>>
    .sliding_window(...)             // WindowedStream<T3>
    .fold(...);                      // Stream<T4>
let stream2 = env.stream(Source::new(...))  // new block id=3
    .zip(stream.clone())             // Stream<(T4,T5)>
    .map(|x| ...)                    // Stream<Map<(T4,T5)>>
    // new block id=4
    .iterate(1000, |s| {
        s                            // Stream<Map<(T4,T5)>>
            // new block id=5
            .filter(|x| ...)         // Stream<Filter<Map<(T4,T5)>>>
            .shuffle()               // KeyedStream<T6>
            // new block id=6
            .filter(|x| ...)         // KeyedStream<T6> (T6 == Map<(T4,T5)>)
    })                               // Stream<T6>
    // new block id=7
    .filter(|x| ...);                // Stream<Filter<T6>>
let f1 = stream.collect_vec();   // Future<Vec<T4>>
let f2 = stream2.collect_vec();  // Future<Vec<T6>>
env.execute();
let v1 = f1.value();             // Vec<T4>
let v2 = f2.value();             // Vec<T6>
```

Job graph:

```graphviz
digraph {
    source1 [label="Source 1"]
    source2 [label="Source 2"]
    id1 [label="id1\nmap / filter / group_by"] [shape=box]
    id2 [label="id2\nmap / sliding window / fold"] [shape=box]
    id3 [label="id3\nzip / map"] [shape=box]
    subgraph cluster {
        id4 [label="id4\niterate"] [shape=box]
        id5 [label="id5\nfilter / shuffle"] [shape=box]
        id6 [label="id6\nfilter"] [shape=box]
        graph[style=dotted]
    }
    id7 [label="id7\nfilter"] [shape=box]
    f1 [label="f1"]
    f2 [label="f2"]

    source1 -> id1 -> id2 -> f1
    source2 -> id3 -> id4 -> id5 -> id6
    id1 -> id2
    id4 -> id7 -> f2
    id2 -> id3
    id6 -> id4
}
```

## Names

- **Operator**: one of the locally chainable operators (`map`, `filter`, `flatMap`, ...)
- **Block**: chain of operators run inside the same node. Each block may be run in one or more nodes.
- **Job Graph**: the graph with the abstract data flow, where the blocks are not yet assigned to the nodes.
- **Execution Graph**: the graph with the actual data flow, where the blocks are assigned to the nodes.

## Algorithm

- Calling `.stream` on the environment starts the creation of a new stream, preparing a source and starting an empty block.
- The returned stream has several methods, depending on the next operation to do:
    - simple operator (`map`, `filter`, `flatMap`, ...): consumes the stream and returns a new stream with the "matrioska" (nested) type
    - shuffling operator (`shuffle`, `group_by`, `window`, `zip`, ...): consumes the stream, finalizes the block, and returns a new stream with an empty type
    - `iterate`: consumes the stream and starts a new empty block (passing it to the closure). The closure returns a stream; the last block is finalized and `iterate` returns a new stream with an empty type
    - a sink operator (`collect_vec`, `for_each`, ...): consumes the stream, finalizes the block, creates a sink block and (if collecting) returns a future with the resulting `Vec`
    - `clone`: takes the stream, finalizes the block (if needed) and returns a new stream with an empty type whose elements are duplicated from the original stream
- Calling `execute` on the environment consumes the environment and starts the sources, blocking until all the work is done.
- Once `execute` returns, the futures obtained from the collects contain the actual data.

## Spawning

Using the simple operators only the current block is updated.
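
A minimal sketch of the simple-operator case, assuming the operator chain of the current block is a plain Rust iterator: each call consumes the stream and returns a new one whose type wraps the previous chain, without creating a new block. `SketchStream`, `matrioska_example` and the single-threaded `collect_vec` are hypothetical stand-ins for illustration, not the actual API.

```rust
/// Hypothetical stand-in for a stream whose current block is an iterator chain.
struct SketchStream<I: Iterator> {
    /// Operators chained so far in the current block.
    operators: I,
}

impl<I: Iterator> SketchStream<I> {
    /// Starts a block that reads from `source`.
    fn new(source: I) -> Self {
        SketchStream { operators: source }
    }

    /// Consumes the stream and returns one whose type wraps the old chain.
    fn map<O, F>(self, f: F) -> SketchStream<std::iter::Map<I, F>>
    where
        F: FnMut(I::Item) -> O,
    {
        SketchStream { operators: self.operators.map(f) }
    }

    /// Same pattern as `map`: only the type of the current block changes.
    fn filter<P>(self, p: P) -> SketchStream<std::iter::Filter<I, P>>
    where
        P: FnMut(&I::Item) -> bool,
    {
        SketchStream { operators: self.operators.filter(p) }
    }

    /// Stand-in for a sink: runs the block locally and collects its output.
    fn collect_vec(self) -> Vec<I::Item> {
        self.operators.collect()
    }
}

/// Builds a two-operator block and runs it on a small in-memory source.
fn matrioska_example() -> Vec<i32> {
    SketchStream::new(vec![1, 2, 3, 4].into_iter())
        .map(|x| x * 10)
        .filter(|x| *x > 10)
        .collect_vec()
}
```

In this sketch the nesting grows by one layer per operator, so two blocks with different operator chains have different types.
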
When a block is finalized this happens:

- The block is constructed and an ID is assigned
- A new thread is spawned (on all the nodes), moving the block inside the thread
- The thread waits for the graph to be fully built
- A new block is created and the connection between the last block and the new one is stored inside the environment

When the environment is executed the graph is fully known and the _optimizer_ assigns the blocks to the ranks (nodes). The execution graph is built and each (rank, block) pair can be either:

- `AssignBlock::Removed`: the thread was spawned for the block, but that block has been assigned to other nodes. The thread exits without doing anything.
- `AssignBlock::Assigned(metadata)`: the thread for that block was spawned correctly on that node; the `ExecutionMetadata` is sent to the thread, which can start the computation.

`ExecutionMetadata` contains:

- `prev: Vec<Addr>` list of previous nodes in the execution graph
- `next: Vec<Vec<Addr>>` list of next nodes in the execution graph, grouped by the original job graph nodes
- `parallelism` the number of replicas of that block

### Scheduler / Optimizer

- Need to decide how to replicate/parallelize blocks on multiple nodes
- If we have $k$-parallelism (e.g. after a `group_by`) and the stream is split in $w$ streams, we can maintain up to $w \cdot k$ parallel streams
- If at some point we don't have parallelism (e.g.
equivalent to `group_by(0)`) there is no need to replicate the next blocks
- Need balance between parallelism and slot usage
- Too much parallelism might end up consuming too many slots

### Rationale

- A block can be spawned on one or more nodes
- Before the entire graph is built we cannot assign the nodes to the blocks
- _An alternative may be a two-step setup: in a first pass (run only on one node) we build the graph and assign the blocks to the ranks; in a second pass (run on the other nodes) the graph is built again and, when the correct block is built, that code is run._
- Cannot really store the blocks and spawn only the correct one since each block has a different type

## Block structure

### `InnerBlock<IN,OUT>`

This type is stored inside the `Stream`; it doesn't know about the previous and next nodes.

- `buffer: Vec<IN>` buffer of data pending to be computed by the block
- `operators: Map<Filter<...>>: Iterator<Item=OUT>` matrioska (nested chain) of operators of this block, each of which consumes from the previous operator, while the head reads from the buffer
- `next_strategy: NextStrategy` strategy to use to send the data to the next nodes:
    - `NextStrategy::Same` send the data to the block on the same replica
    - `NextStrategy::Random` select a random node among the next ones
    - `NextStrategy::GroupBy(f)` send the message to the replica selected by the hash of its key
- `metadata: BlockMetadata` structural information about the block (whether it uses broadcast variables, number of replicas required, whether it is replicable, ...)

### `Block<IN,OUT>`

This type is stored inside the thread and exists only on the threads where the block is the correct one for the rank.

- `inner: InnerBlock<IN,OUT>` the actual computation of the block
- `metadata: ExecutionMetadata` the info about the block in the execution graph

## Iterations

Flink supports 2 semantics for the iterations:

1. DataSet iterate: the whole dataset is passed through the iteration, and for what comes out it is decided whether to make it recirculate or leave the iteration
2. DataStream iterate: each tuple is passed through the iteration and immediately fed back into the loop, mixing new tuples with fed-back ones

> Not sure when to update broadcast variables in (2)

### Broadcast variables

```rust
let state = env.state(42);
let s = env.stream(...)
    .iterate(1000, |s| {
        s
            .filter(|x| ...)
            .use_state(state, |s, var| {
                s.map(|x| x + var.get())
            })
            .map(|x| ...)
            .update_state(state, |&mut var, elem| {
                // reduce the state on the elements of the stream
            }, |&mut var| -> bool {
                // called when an iteration ends and returns true
                // if a next iteration is needed
            })
    });
env.execute();
let state = state.value();
```

---

## Block partition

Each block is spawned on every node; some of them may be explicitly removed:

- Sources may not be parallel
- Sinks may not be parallel
- Some blocks may want to be a bottleneck (e.g. iterations)

## Data flow

- Each block can be spawned in many replicas: its replication factor is $k$
- A block can take input from many streams
- Where should its output go?

## Stream types

- `Stream<T>` a stream of data that can run in parallel on many nodes, but the partition between the nodes is not relevant. The semantics is the same as one single stream, without the assumption that (by default) all the data pass through the same operator instance
- `KeyedStream<K,T>` a stream of data that can run in parallel on many nodes, but the values are partitioned on nodes based on the hash of the key. The semantics is the same as many streams, one for each key. All the data with the same key will pass through a specific operator.
- `WindowStream<T>` is the same as `Stream<T>` but you have to use an aggregation operator immediately after. The aggregation is done with the elements inside the window.
- `WindowKeyedStream<K,T>` is the same as `KeyedStream<K,T>` with the notes of `WindowStream<T>`

---

_Assume $k \ne k_2$_

```graphviz
digraph {
    A [label="A\nk"]
    B [label="B\n1"]
    C [label="C\nk"]
    D [label="D\nk2"]
    A -> B
    A -> C
    A -> D
}
```

|                            | Same                 | Random                  | GroupBy                                  |
|:-------------------------- |:--------------------:|:-----------------------:|:----------------------------------------:|
| $A \rightarrow B$ $^{(1)}$ | ✅ $i \rightarrow 0$ | ✅ $i \rightarrow 0$    | ✅ $i \rightarrow 0$                     |
| $A \rightarrow C$          | ✅ $i \rightarrow i$ | ✅ $i \rightarrow rand$ | ✅ $i \rightarrow hash \text{ mod } k$   |
| $A \rightarrow D$ $^{(2)}$ | ❌                   | ✅ $i \rightarrow rand$ | ✅ $i \rightarrow hash \text{ mod } k_2$ |

$^{(1)}$ Useful for non-shared sinks

$^{(2)}$ Using `max_parallelism` for the next block

---

_Assume $k \ne k_2$_

```graphviz
digraph {
    A [label="A\nk"]
    B [label="B\n1"]
    C [label="C\nk"]
    D [label="D\nk2"]
    B -> A
    C -> A
    D -> A
}
```

|                            | Same                 | Random                  | GroupBy                                |
|:-------------------------- |:--------------------:|:-----------------------:|:--------------------------------------:|
| $B \rightarrow A$ $^{(1)}$ | ❌                   | ✅ $0 \rightarrow rand$ | ✅ $0 \rightarrow hash \text{ mod } k$ |
| $C \rightarrow A$          | ✅ $i \rightarrow i$ | ✅ $i \rightarrow rand$ | ✅ $i \rightarrow hash \text{ mod } k$ |
| $D \rightarrow A$ $^{(2)}$ | ❌                   | ✅ $i \rightarrow rand$ | ✅ $i \rightarrow hash \text{ mod } k$ |

$^{(1)}$ Useful for non-parallel sources

$^{(2)}$ Using `max_parallelism` for the next block

---

## Operators

`*` can be `''` or `'Keyed'`.

| Operator         | Input              | Parameters                         | Output             | Split block | Next strategy |
| ---------------- | ------------------ | ---------------------------------- | ------------------ |:-----------:|:-------------:|
| Source           |                    |                                    | `Stream<T>`        | Yes         | Random        |
| Parallel source  |                    |                                    | `Stream<T>`        | Yes         | Same          |
| Filter           | `*Stream<T>`       | `T → bool`                         | `*Stream<T>`       | No          |               |
| Map              | `*Stream<T>`       | `T → S`                            | `*Stream<S>`       | No          |               |
| FlatMap          | `*Stream<T>`       | `T → IntoIter<S>`                  | `*Stream<S>`       | No          |               |
| Chunks           | `*Stream<T>`       | `usize`                            | `*Stream<Vec<T>>`  | No          |               |
| FilterMap        | `*Stream<T>`       | `T → Option<S>`                    | `*Stream<S>`       | No          |               |
| GroupBy          | `*Stream<T>`       | `&T → K`                           | `KeyedStream<K,T>` | Yes         | GroupBy       |
| Shuffle          | `*Stream<T>`       |                                    | `Stream<T>`        | Yes         | Random        |
| Fold             | `Stream<T>`        | `S,T → S`, `S,S → S`, `S`          | `Stream<S>`        | Yes         | Same          |
| Fold             | `KeyedStream<T>`   | `S,T → S`, `S`                     | `KeyedStream<S>`   | No          |               |
| Reduce           | `Stream<T>`        | `T,T → T`                          | `Stream<T>`        | Yes         | Same          |
| Reduce           | `KeyedStream<T>`   | `T,T → T`                          | `KeyedStream<T>`   | No          |               |
| Window           | `*Stream<T>`       | `WindowAssigner`                   | `Window*Stream<T>` | No          |               |
| Fold             | `Window*Stream<T>` | `S,T → S`, `S`                     | `*Stream<S>`       | No          |               |
| Concat           | `*Stream<T>`       | `*Stream<T>`                       | `*Stream<T>`       | No          |               |
| Zip              | `Stream<T>`        | `Stream<S>`                        | `Stream<(T,S)>`    | No          |               |
| Process          | `*Stream<T>`       | `IntoIter<T> → IntoIter<S>`        | `*Stream<S>`       | No          |               |
| MaxParallelism   | `*Stream<T>`       | `usize`                            | `Stream<T>`        | Yes         | Random        |
| Iterate $^{(1)}$ | `*Stream<T>`       | `usize`, `*Stream<T> → *Stream<T>` | `*Stream<T>`       | Yes         | Same          |

$^{(1)}$ When iterating over a `KeyedStream`, you can shuffle the data, but the returned stream should be a `KeyedStream` with the same key type and parallelism (note that it doesn't mean that the key should be the same).

## Process allocation

In each node there should be _slot_ copies of each block.
The most performant way to do so is to have a single process per node, which spawns _slot_ threads for each block (i.e. a thread for each block in the execution graph). The advantages are:

- We can use in-memory channels for all the communication in the same node, avoiding unnecessary serialization
- Spawning the threads is easier (just a single process via ssh)

## Sources

Sources do not have preceding blocks. When a block does not have any preceding block, it will not wait for data to arrive before starting to consume the iterator. In this way the source can start to produce data right away, since it doesn't have to receive anything.

Sources have a different `NextStrategy` depending on their parallelism:

- Single source: `NextStrategy::Random`
- Parallel sources: `NextStrategy::Same`

## Sinks

For a generic sink, we can just use `for_each(...)`, which will call the given closure on each item of the stream.

To collect to a vector, there are two cases:

- **Collect only on one node**: the sink has parallelism equal to one
- **Collect on all nodes** (shared): we use a block with parallelism equal to one to give a global ordering to the data, then this block will send the data to all the actual sinks (one sink per node)

Collecting to a vector is handled through futures. Each sink block keeps receiving data from the stream, saving it in a buffer. When the stream is finished, the future is completed with the buffer. The sinks will not produce any actual data on the stream.
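
The collect-to-vector behaviour described above can be sketched with standard channels. This is only an illustration: it assumes a sink with parallelism one and models the end of the stream by closing the channel, whereas the design uses explicit messages. `CollectFuture`, `spawn_collect_sink` and `collect_example` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical handle returned by a collecting sink.
struct CollectFuture<T> {
    result: Receiver<Vec<T>>,
}

impl<T> CollectFuture<T> {
    /// Blocks until the sink has seen the end of the stream.
    fn value(self) -> Vec<T> {
        self.result
            .recv()
            .expect("sink thread terminated without a result")
    }
}

/// Spawns a sink with parallelism one that buffers everything it receives.
/// Assumption: the stream is finished when all the senders are dropped.
fn spawn_collect_sink<T: Send + 'static>() -> (Sender<T>, CollectFuture<T>) {
    let (item_tx, item_rx) = channel::<T>();
    let (result_tx, result_rx) = channel::<Vec<T>>();
    thread::spawn(move || {
        // Keep receiving until the stream ends, saving the items in a buffer.
        let buffer: Vec<T> = item_rx.iter().collect();
        // Complete the future; ignore the error if the future was dropped.
        let _ = result_tx.send(buffer);
    });
    (item_tx, CollectFuture { result: result_rx })
}

/// Sends five items to the sink, ends the stream and waits for the result.
fn collect_example() -> Vec<u32> {
    let (tx, future) = spawn_collect_sink();
    for x in 0..5 {
        tx.send(x).unwrap();
    }
    drop(tx); // end of stream
    future.value()
}
```
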
## Iteration

[Flink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/iterations.html#superstep-synchronization)

If the `Iteration` block is not distributed the computation is pretty easy:

- There is only one state updating function and therefore there is only one source of truth for the state
- The state updating function processes all the data of the stream since it's not distributed
- There is only one loop termination condition
- There is no synchronization between iteration blocks for restarting the loop

If the iteration block is distributed there are many concerns:

- Each state updating function will process only part of the stream and produce one new state for each partition
- Those states will have to be merged into a single global state by a _master_ iterate block
- There is a synchronization step between iterations to wait for the late replicas

Our idea is:

- When an iteration ends, each replica computes a `DeltaUpdate` reducing the elements of the loop locally (using also the global state of the previous iteration): `f(old_state, delta_update, x) -> new_delta_update`
- The delta is sent to the _master_ node that _applies_ the deltas to the global state, obtaining the new global state: `f(old_state, delta_update) -> new_state`
- The _master_ node checks the termination condition using the new global state
- The _master_ broadcasts the new global state to the interested nodes
- The _master_ tells the other iteration nodes to start a new iteration

### KeyedState

When iterating over a `KeyedStream` it should be possible to use _keyed_ broadcast variables: there exists an independent state for each key. Internally this simplifies things a bit since there is no longer a global state. The state is partitioned between the nodes in the same way as the keys: it is stored in a distributed hashmap where a key exists only on the replica that should handle that key.
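
As an illustration of this partitioning, the sketch below keeps one hashmap per replica and routes each key with `hash mod replicas`, the rule listed for the `GroupBy` strategy. The replicas are simulated inside a single process, and `owner`, `KeyedStateSketch` and `keyed_state_example` are hypothetical names, not part of the design.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Index of the replica that owns `key`, assuming `hash mod replicas` routing.
fn owner<K: Hash>(key: &K, replicas: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % replicas as u64) as usize
}

/// Hypothetical keyed state: one local hashmap per (simulated) replica.
struct KeyedStateSketch<K, S> {
    partitions: Vec<HashMap<K, S>>,
}

impl<K: Hash + Eq, S: Default> KeyedStateSketch<K, S> {
    /// Creates an empty state split over `replicas` partitions.
    fn new(replicas: usize) -> Self {
        KeyedStateSketch {
            partitions: (0..replicas).map(|_| HashMap::new()).collect(),
        }
    }

    /// Updates the state of `key` on the only replica that handles it.
    fn update(&mut self, key: K, f: impl FnOnce(&mut S)) {
        let replica = owner(&key, self.partitions.len());
        f(self.partitions[replica].entry(key).or_default());
    }
}

/// Counts the occurrences of each word using four simulated replicas.
fn keyed_state_example() -> KeyedStateSketch<&'static str, u32> {
    let mut state = KeyedStateSketch::new(4);
    for word in vec!["a", "b", "a", "c", "a"] {
        state.update(word, |count| *count += 1);
    }
    state
}
```
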
When the iteration ends it will be possible to collect all the state and the hashmaps will be merged.

## Network design

By using threads, the communication between blocks running on the same node can be achieved by using local in-memory channels. To communicate between different nodes, TCP sockets must be used. There are multiple possibilities:

1. One socket for each pair of nodes
2. One socket for each pair of blocks (replicas of a block on the same node share the same socket)
3. One socket for each pair of replicas of blocks

Socket sharing is achieved through multiplexing, by using tags to differentiate the different receivers. Sharing the same socket would be the most efficient approach, but dealing with backpressure would be difficult.

### Batch modes

- `NoBatching`: equivalent to `FixedBatching(1)`
- `FixedBatching(size)`: send batches of at most `size` items
- `AdaptiveBatching(size, max_delay)`: send a batch at least every `max_delay`, each of at most `size` items

## Message types

- `Item<T>`
- `Timestamped<Timestamp, T>`
- `Watermark<Timestamp>` $^{(1)}$
- `Flush` $^{(1)}$ clears all the internal data, but cannot assume that no more data will arrive. Used to notify that a cycle iteration has ended.
- `Terminate` $^{(1)}$ stops the operator, flushing its internal data and killing the block.

$^{(1)}$ This message should be forwarded after being processed.
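
One possible encoding of these messages is a single enum handled by each operator. The sketch below is only an illustration: it assumes `u64` timestamps and interprets clearing the internal data on `Flush` as emitting the pending partial result first. `Message`, `SumOperator` and `message_example` are hypothetical names.

```rust
/// Hypothetical encoding of the message types (timestamps assumed to be u64).
#[derive(Debug, Clone, PartialEq)]
enum Message<T> {
    Item(T),
    Timestamped(u64, T),
    Watermark(u64),
    Flush,
    Terminate,
}

/// Example stateful operator: sums the items received since the last flush.
struct SumOperator {
    partial: i64,
}

impl SumOperator {
    /// Returns the messages to send downstream and whether the block must stop.
    fn process(&mut self, msg: Message<i64>) -> (Vec<Message<i64>>, bool) {
        match msg {
            // Data messages only update the internal state.
            Message::Item(x) | Message::Timestamped(_, x) => {
                self.partial += x;
                (Vec::new(), false)
            }
            // Control messages are forwarded after being processed.
            Message::Watermark(ts) => (vec![Message::Watermark(ts)], false),
            Message::Flush => {
                // Assumption: emit the partial result, then reset the state.
                let sum = std::mem::take(&mut self.partial);
                (vec![Message::Item(sum), Message::Flush], false)
            }
            Message::Terminate => {
                let sum = std::mem::take(&mut self.partial);
                (vec![Message::Item(sum), Message::Terminate], true)
            }
        }
    }
}

/// Feeds a short message sequence to the operator and collects its output.
fn message_example() -> Vec<Message<i64>> {
    let mut op = SumOperator { partial: 0 };
    let input = vec![
        Message::Item(1),
        Message::Timestamped(7, 2),
        Message::Flush,
        Message::Item(5),
        Message::Terminate,
    ];
    let mut output = Vec::new();
    for msg in input {
        let (out, stop) = op.process(msg);
        output.extend(out);
        if stop {
            break;
        }
    }
    output
}
```
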