<style> .reveal { font-size: 2.5rem; } </style> # dora-rs Low latency, distributed, and composable data flow framework. [github.com/dora-rs/dora](https://github.com/dora-rs/dora/) <br><br> Philipp Oppermann & Xavier Tao June 7, 2022 --- ## Project Goal - Rust based framework for building robotic applications - Aiming for low latency, distributed, and composability - At the bridge between academic application and the industry - Fully open-source from the start Example use case: Driving an autonomous car within a simulation --- ## Design Overview ![image alt](https://raw.githubusercontent.com/dora-rs/dora/main/design/overview.svg) - Users can define _custom nodes_ and _operators_ - custom nodes have full control with an own `main` function - operators can use higher-level features (e.g. deadlines) and can communicate more efficiently --- ## Why Rust? - Safety is important for reliability - No garbage collector → no latency spikes - Integration with other languages - Crate ecosystem, e.g. `pyO3`, `wasmtime` - Async Rust for handling events from different sources --- ## Async Design - Don't use `select` macro - requires lots of pinning and fusing - its custom syntax breaks tools such as `rust-analyzer` or `rustfmt` - `FusedFuture` bound does not prevent poll after `Poll::Ready` ```rust loop { let mut fut = (&mut fut).fuse(); // 🗲 (should be outside of loop) select! { _ = fut => println!("foo"), }; } ``` - <!-- .element: class="fragment"-->Use an event stream instead - as proposed by Yoshua Wuyts in [_"Futures Concurrency III"_](https://blog.yoshuawuyts.com/futures-concurrency-3/) --- ## Async Design: Event Stream ```rust enum Event { Input(InputEvent), InputsStopped, Operator { id: OperatorId, event: OperatorEvent, }, } ``` ```rust let inputs = subscribe(&topics, communication).await?; let input_events = inputs.map(Event::Input) .chain(stream::once(async { Event::InputsStopped })); ``` <!-- .element: class="fragment"--> ```rust let mut operator_events = tokio_stream::StreamMap::new(); for operator in operators { operator_events.insert(operator.id, spawn_operator().await); } let op_events = operator_events.map(|(id, event)| Event::Operator { id, event }); ``` <!-- .element: class="fragment"--> ```rust use futures_concurrency::Merge; let mut events = (input_events, op_events).merge(); // now we have a normal while loop with a match instead of needing `select` while let Some(event) = events.next().await { match event { ... } } ``` <!-- .element: class="fragment"--> --- ## Challenges with Async Rust - CPU-bound operations in async functions - Cancellation of spawned tasks - Error/panic propagation --- ### CPU-bound operations in async functions > To give a sense of scale of how much time is too much, a good rule of thumb is **no more than 10 to 100 microseconds** between each `.await` . ||CPU-bound computation|Synchronous IO|Running forever| |-|-|-|-| |`spawn_blocking`|Suboptimal|OK|No |`rayon`|OK|No|No |Dedicated thread|OK|OK|OK \--- Alice Ryhl, in [_"Async: What is blocking?"_](https://ryhl.io/blog/async-what-is-blocking/) --- ### Spawning CPU-bound tasks - **tokio::spawn_blocking**: Optimal for reducing tail latency - Pros - Directly available threadpool - Work well enough on most cases - Cons - Need adequate parametrisation (workers, threads, timeout...) - **rayon**: Optimal for maximizing cpu usage - Pros - Do not need Arc for ∞ read - Stack allocation of tasks instead of heap - Recursive parallelisation - Cons - Add a layer of technology --- - Response Time <img src="https://i.imgur.com/QNqSkDL.png" alt="drawing" /> *Test on a Deep Learning task, with high concurrency* --- - Computation Time <img src="https://i.imgur.com/NoNwBo0.png" alt="drawing" /> --- <a href="https://imgflip.com/i/6lurm7"><img src="https://i.imgflip.com/6lurm7.jpg" title="made at imgflip.com"/></a> --- ### Cancellation of spawned Tasks - Dropping a `JoinHandle` detaches the task → keeps running in the background ```rust let task = tokio::spawn(async { let mut items = Vec::new(); for _ in 0..n { items.push(read_large_file().await); } let _ = result_tx.send(items).await; }); do_something_else()?; // 🗲 `task` keeps running on error let items = task.await; ``` - <!-- .element: class="fragment"-->Workarounds - manual checks (e.g. is `result_tx` closed?) → error-prone and less efficient - use `smol` instead of `tokio` or `async_std` - use `FutureExt:remote_handle` everywhere ```rust let (task, handle) = task.remote_handle(); tokio::spawn(task); do_something_else()?; // dropping `handle` cancels task handle.await ``` --- ### Error/Panic Propagation - Easy to accidentally discard a panic/error ```rust tokio::spawn(async { panic!("foo") }); tokio::spawn(async { Result::<(), u32>::Err(1) }); ``` - neither a compiler warning nor a runtime error occurs (just some `stderr` output for the panic) - same with `async_std` - `must_use` warning with `smol` (tasks are canceled on drop) --- ## Async Annoyances - Compiler cannot infer error type in `async` blocks ```rust let task = async { value.context("I/O error")?; Ok(()) // ^^ cannot infer type for type parameter `E` // workaround: `Result::<_, anyhow::Error>::Ok(())` }; ``` - <!-- .element: class="fragment"-->Clone boilerplate with <code>async move {}</code> ```rust let id_clone = id.clone(); let tx_clone = tx.clone(); // etc ... let task = async move { tx_clone.send(id_clone).await }; // (we still need `id`, `tx`, etc. here) ``` (also applies to `move` closures) --- ## Stdlib Footguns - `min`/`max` methods are confusing ```rust fn foo(value: usize) { // ensure that the value is at least 1 let value = value.min(1); // 🗲 } ``` (`min` sets an _upper bound_ for the value) - <!-- .element: class="fragment"-->Unwrapping a <code>thread::Result</code> hides message ```rust let t = std::thread::spawn(|| panic!("foo")); let result = t.join(); // propagate the panic result.unwrap(); // → "called `Result::unwrap()` on an `Err` value: Any { .. }" ``` (to retain the panic message, you need to use `std::panic::resume_unwind` instead of `unwrap`) --- ## Summary - Async Challenges - CPU-bound operations in async tasks - async cancellation - error/panic propagation - Async Annoyances - infer error type in `async` blocks - clone boilerplate with `move` - Stdlib Footguns: `min/max`, `thread::Result::unwrap()`
{"metaMigratedAt":"2023-06-17T03:51:41.201Z","metaMigratedFrom":"YAML","title":"dora-rs","breaks":true,"slideOptions":"{\"theme\":\"white\",\"center\":false}","contributors":"[{\"id\":\"2c9dccfa-1285-4f1b-be4c-5748b549b2bb\",\"add\":8607,\"del\":3845},{\"id\":\"dcd8580f-6041-4708-8c3f-3f0de43b5626\",\"add\":4768,\"del\":2661}]"}
    793 views