---
type: slide
title: Writing a Stream from scratch
slideOptions:
theme: solarized
highlightTheme: github
---
<style>
  body.reveal-viewport {
    background-size: cover;
    background-image: url("background.svg?raw=1");
    background-position: 50% 50%;
    background-repeat: no-repeat;
  }
</style>
<!-- .slide: data-background-image="https://www.dropbox.com/s/efk8zr2jvbpame2/background-full.svg?raw=1" -->
# Writing a Stream from scratch
---
## Who am I
- @morandidodo
- dodomorandi on GitHub
---
## Agenda
1. Clone a repo for the basic structure (and for help, if needed)
2. Implement a naive stream
3. Improve the stream
4. More improvements!
5. Future possibilities
---
## These slides
shorturl.at/fiovM
----

---
## Clone the repo
`https://github.com/dodomorandi/workshop-rustlab-2022.git`
----
### Open the docs!
`cargo doc --open --no-deps`
---
## First steps
<small><em>tag</em>: <code>step-1</code></small>
- Create `main.rs`
- Set `default-run = "workshop-rustlab-2022"` on `Cargo.toml`
- Run the server in another terminal
----
### Main
- With `tokio`
- KISS: `current_thread`
- Return a `Result` using a boxed dyn error
- Init `tracing_subscriber`
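
A minimal sketch of such a `main` (the subscriber setup shown is just the
default `fmt` one):

```rust
use std::error::Error;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn Error>> {
    // Install a basic `tracing` subscriber that prints events to stdout.
    tracing_subscriber::fmt::init();

    // The stream will be created and consumed here in the next steps.
    Ok(())
}
```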
----
### Run the server
`RUST_LOG=debug cargo run --bin server`
---
## Init MyStream
<small><em>tag</em>: <code>step-2</code></small>
- `mod my_stream`
- Make `MyStream` generic over `T`
- `impl Stream`, use `futures-util`
----
### How `Stream` works
- Similar to `Future`, `poll_next` instead of `poll`
- Still needs `Pin<&mut Self>` and a `Context`
- Returns a `Poll<Option<Self::Item>>`. When no more items are available from
the stream, a `Poll::Ready(None)` has to be returned.
- As with `Future`, `futures-util` provides helper traits: `StreamExt` and
  `TryStreamExt`
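
For reference, the shape of the trait (provided methods omitted) is roughly:

```rust
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub trait Stream {
    type Item;

    // `Poll::Ready(Some(item))` → an item is ready
    // `Poll::Ready(None)`       → the stream is exhausted
    // `Poll::Pending`           → nothing yet, the waker will be notified
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
```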
----
### How `Pin` works (brief intro)
- Generic over a _pointer-like_ type
- Abstract way to avoid _moving_ data
- Zero-overhead abstraction
- Can get the inner data out of a `Pin<P>` if `<P as Deref>::Target: Unpin`
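
A tiny example of the `Unpin` escape hatch: for `Unpin` targets, pinning adds
no restrictions:

```rust
use std::pin::Pin;

fn main() {
    let mut value = 42_i32;                // `i32: Unpin`
    let mut pinned = Pin::new(&mut value); // safe, because the pointee is `Unpin`...
    *pinned.as_mut().get_mut() += 1;       // ...and so is getting the data back out
    assert_eq!(value, 43);
}
```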
----
### Dummy MyStream
- Return type is a `Result`
- `Error` as an empty enum (for now)
- Use a `todo!` as implementation
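
A possible starting point, assuming a `PhantomData` marker to keep the struct
generic over `T` (it will go away in a later step):

```rust
use std::{
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::Stream;

#[derive(Debug)]
pub enum Error {}

pub struct MyStream<T> {
    marker: PhantomData<T>,
}

impl<T> Stream for MyStream<T> {
    type Item = Result<T, Error>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        todo!()
    }
}
```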
---
## Finish main
<small><em>tag</em>: <code>step-3</code></small>
- `MyStream::new`
- Custom `MyEntry`
- Consume stream
----
### `MyStream::new`
- Take `fields` as a `HashSet<ServerField>`
- Take an optional port
- Take a `Client`
- Store `ServerQuery`
- Store query cost
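
A sketch of the signature (the `u16` port is an assumption; `ServerField` and
`ServerQuery` come from the workshop crate, so the body is left to you):

```rust
use std::collections::HashSet;

use reqwest::Client;

impl<T> MyStream<T> {
    pub fn new(fields: HashSet<ServerField>, port: Option<u16>, client: Client) -> Self {
        // Build the `ServerQuery` from `fields` and `port`, compute the query
        // cost, and store both alongside the `client`.
        todo!()
    }
}
```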
----
### Fix `Error`
- Add `Reqwest` variant
- Implement `fmt::Display` and `error::Error`
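
A minimal sketch:

```rust
use std::{error, fmt};

#[derive(Debug)]
pub enum Error {
    Reqwest(reqwest::Error),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Reqwest(err) => write!(f, "reqwest error: {err}"),
        }
    }
}

impl error::Error for Error {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Error::Reqwest(err) => Some(err),
        }
    }
}
```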
----
### Create `MyEntry`
- Choose your fields based on `database::Entry`
- I took `name`, `geo_point_2d` and `numeromoderno`. Name is a nice-to-have.
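
A possible shape (the field types are assumptions; check `database::Entry` for
the real ones, here `serde_json::Value` stands in for the geo point):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct MyEntry {
    pub name: String,
    pub geo_point_2d: serde_json::Value,
    pub numeromoderno: Option<String>,
}
```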
----
### Consume `MyStream`
- Create the stream `MyStream<Vec<MyEntry>>` with the right `ServerField`s
- `map_ok` (use `stream::iter`), `try_flatten`, `try_filter` ("empty" names
equal to `-`) and `try_for_each` to print
----
### Here's the code
```rust
let stream = MyStream::<Vec<MyEntry>>::new(
    [Name, GeoPoint2d, Numeromoderno].into_iter().collect(),
    Some(8080),
    Client::new(),
);
stream
    .map_ok(|entries| stream::iter(entries.into_iter().map(Ok)))
    .try_flatten()
    .try_filter(|data| ready(data.name != "-"))
    .try_for_each(|data| {
        println!("{}: {:#?}", data.name, data.geo_point_2d);
        future::ok::<_, my_stream::Error>(())
    })
    .await?;
```
---
## Warm enough?
Let's really start
---
## Basic, `todo!` implementation
<small><em>tag</em>: <code>step-4</code></small>
- `LeakyBucket`, `last_call` and an `Inner` state
- `Empty` → `HeadRequest(HeadRequestFuture)` → `Done`
- Separate operations from `poll_next`
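
A sketch of the state machine for this step (`bucket: Option<LeakyBucket>` and
`last_call` would live on `MyStream` itself):

```rust
enum Inner {
    Empty,                          // no request in flight yet
    HeadRequest(HeadRequestFuture), // waiting for the response (headers first)
    Done,                           // the stream is exhausted
}
```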
----
### `HeadRequestFuture`
- `BoxFuture`
- KISS: `'static` lifetime
- Return type is `Result<reqwest::Response, reqwest::Error>`
- `impl From<reqwest::Error> for Error`
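
A sketch:

```rust
use futures_util::future::BoxFuture;

type HeadRequestFuture = BoxFuture<'static, Result<reqwest::Response, reqwest::Error>>;

impl From<reqwest::Error> for Error {
    fn from(err: reqwest::Error) -> Self {
        Error::Reqwest(err)
    }
}
```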
----
### How `Pin` works (again)
- `Pin<&mut T>` is `!Copy`, just like `&mut T`, but it cannot directly reborrow
- `Pin::as_mut` on a `&mut Pin<&mut T>` to reborrow
- `Pin::get_mut` to get a `&mut T` from a `Pin<&mut T>` **when `T: Unpin`**
----
### How to correctly use `Pin<&mut Self>`
- Create a usable `this: &mut Self`
- Carefully choose `&mut self` or `self: Pin<&mut Self>` for auxiliary methods,
  depending on whether they need to `poll` and whether `Self` can be freely
  unpinned and repinned. For now, just use `&mut self`.
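
A sketch of the entry point, assuming `MyStream<T>` is `Unpin` at this step
(all its fields are; depending on how the marker is declared this may need a
`T: Unpin` bound):

```rust
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    // Safe because `MyStream<T>: Unpin` here.
    let this: &mut Self = self.as_mut().get_mut();

    // ...match on `this.inner` and dispatch to the auxiliary methods...
    todo!()
}
```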
----
### When `Empty`
- `this.request_next_page`, get a `Future`
- Poll the future
- `Ready` → `this.handle_head_data_result`
- `Pending` → set inner to `HeadRequest` and return `Pending`
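
A sketch of this arm, using the helper names from these slides:

```rust
// Inside `poll_next`, when `this.inner` is `Inner::Empty`:
let mut future = this.request_next_page();
match future.as_mut().poll(cx) {
    // The response is already there: handle it right away.
    Poll::Ready(result) => this.handle_head_data_result(cx, result),
    // Not ready yet: park the future in the state machine and yield.
    Poll::Pending => {
        this.inner = Inner::HeadRequest(future);
        Poll::Pending
    }
}
```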
----
### When `HeadRequest`
- `ready!` the `poll`
- `this.handle_head_data_result`
----
### When `Done`
- Just return `Ready(None)`
---
## Basic, partial implementation
Auxiliary functions
----
### `request_next_page`
- Takes `&mut self`
- Returns a `HeadRequestFuture`
- Update the `last_call`
- Use `client.execute`, creating a request using the `query.create_request`
helper
- `.boxed()`
----
### `handle_head_data_result` (1)
- Takes `&mut self` and the `Output` type of `HeadRequestFuture`
- We will also need the `Context`, since we will need to poll things later
- In case of error, we are `Done`, return the error
----
### `handle_head_data_result` (2)
- `LeakyBucket::try_from(headers)`, choose your way in case of error
- `match response.error_for_status()`
- `Ok` → `todo!` handle response
- `Err` → split `TOO_MANY_REQUESTS` from other errors, `todo!` for throttling
----
### When finished and confident...
...you can take a sneak peek at my implementation (_tag_: `step-4`)
---
## The body
<small><em>tag</em>: <code>step-5</code></small>
- Get the raw JSON body from the response
- Prepare the stream for the next request
----
### More needs
- Need stricter trait bounds for `T`
- Need another `Inner` variant
- Need another boxed `Future` alias
- Need another error variant
- Increment the page in `request_next_page`
----
### `handle_head_data_result`
- Raw JSON → `.text()` from the response; we need to detect whether we are on
  the last page
- Simpler than alternative approaches (custom trait and bound for `T`)
- `Ready` → `self.handle_content_data_result`
- `Pending` → set inner to `BodyRequest` and return `Pending`
- `BodyRequestFuture` returns a `Result<String, reqwest::Error>`
- We are now using the `Context` (as promised)
----
### `handle_content_data_result` (1)
- Similar approach for args and error handling
- Detect if the request has more data to choose how to setup the stream
- If no data is available, return `Ready(None)` and store `Inner::Done`
----
### `handle_content_data_result` (2)
- Otherwise `Deserialize` the string of the body.
- If more data is available, use `self.request_next_page` to set inner to
`Inner::HeadRequest`.
- If no more data is available, set inner to `Done` (but return the data!)
----
### My approach to detect if we have more data
- `enum HasData`, variants `False` and `True { has_another_page: bool }`
- `match serde_json::from_str(&content)` for `Value::Array` and `Value::Object`
- Use `self.query.page_size`: if the number of elements equals the page size,
  there is probably more data; otherwise there is none
----
### Remember to
- Add the bound where needed: `T: for<'de> Deserialize<'de> + 'static`
- Handle the new inner variant in `poll_next` (hint: poll future, use `ready!`
and call `this.handle_content_data_result`)
- Update `self.query.page` in `request_next_page`!!!
- `impl From<serde_json::Error> for Error` and `fmt::Display`
---
## Don't `Sleep`
<small><em>tag</em>: <code>step-6</code></small>
- Add a `Sleep` variant to `Inner`, containing a `Pin<Box<tokio::time::Sleep>>`
- Add a `get_wait_time_for_request` to start managing the bucket
- Implement the error handling in case of too many requests
----
### `Sleep` and `Pin`
- `Sleep` is `!Unpin`, therefore you cannot safely go from `&mut Sleep` to
`Pin<&mut Sleep>` and vice versa
- **For now** we use a `Pin<Box<Sleep>>` for simplicity; we will remove the
  allocation later on. A `Pin<Box<T>>` can be created safely even for `T:
  !Unpin` thanks to `Box::pin`
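
For example:

```rust
use std::{pin::Pin, time::Duration};

use tokio::time::{sleep, Sleep};

fn make_sleep(wait_time: Duration) -> Pin<Box<Sleep>> {
    // `Sleep` is `!Unpin`, but `Pin<Box<Sleep>>` is `Unpin`: it can be stored
    // in `Inner` and polled without any extra pinning machinery.
    Box::pin(sleep(wait_time))
}
```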
----
### A common future handling
- Abstract the handling of the data future into a `handle_head_data_future`
  helper function, taking the behavior from the current `Empty` code
- Use the function to handle `Empty`
- Implement `Sleep` variant handling using `ready!` on polled future and, in
case of readiness, use `request_next_page` and `handle_head_data_future`
----
### `get_wait_time_for_request`
- Returns `Option<std::time::Duration>`
- If a bucket is available, it should try to add `self.query_cost` to the points
in the bucket
- If `add` fails, use `bucket.wait_time_to_use(self.query_cost)` to return the
wait time
- Return `None` if bucket is not available
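
A sketch, assuming `LeakyBucket::add` returns a `Result` and
`wait_time_to_use` takes the cost (check the crate docs for the exact
signatures):

```rust
fn get_wait_time_for_request(&mut self) -> Option<Duration> {
    // No bucket yet (no response received so far): nothing to wait for.
    let bucket = self.bucket.as_mut()?;
    if bucket.add(self.query_cost).is_ok() {
        // The query fits in the bucket: no need to wait.
        None
    } else {
        // Bucket too full: report how long to wait before retrying.
        Some(bucket.wait_time_to_use(self.query_cost))
    }
}
```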
----
### Handling `TOO_MANY_REQUESTS`
- Use `self.get_wait_time_for_request`
- In case `None` is returned, estimate a wait time
- Use the `sleep` function to create a `Sleep`, and `Box::pin` it
- `poll` the obtained future; in the (expected) `Pending` case, store it in
  `inner`. In the unlikely case of a `Ready` value, just re-pin `self` and call
  `poll_next` again.
---
## Fix some logic
<small><em>tag</em>: <code>step-7</code></small>
- We should wait long enough when we estimate that the bucket is too full
- We can even check if we need to wait some time in the initial `Empty` case (if
we fail in the first request, we need to sleep and then set the state to
`Empty`)
- New helper function `request_next_page_if_available_points`
----
### `request_next_page_if_available_points`
- Returns `Result<HeadRequestFuture, Sleep>`
- Uses `self.get_wait_time_for_request`
- Uses `self.request_next_page` if no wait time is needed
- Creates a `Sleep` future otherwise
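
A sketch based on the points above:

```rust
fn request_next_page_if_available_points(&mut self) -> Result<HeadRequestFuture, Sleep> {
    match self.get_wait_time_for_request() {
        // Enough points available: fire the next request.
        None => Ok(self.request_next_page()),
        // Otherwise hand back a `Sleep` covering the estimated wait time.
        Some(wait_time) => Err(tokio::time::sleep(wait_time)),
    }
}
```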
----
### Changing the `Empty` case
- Use `request_next_page_if_available_points` to maybe get the
`HeadRequestFuture`
- Use `self.handle_head_data_future` just like before if `Result` is `Ok`
- In case of error, create a `Sleep`, `poll` it and eventually restore
`self.inner` to `Empty` and re-run `poll_next`
----
### Fix `handle_content_data_result`
- Use `request_next_page_if_available_points` instead of
`request_next_page`
- Store different inner variants depending on the result
---
## Just an interesting impl
<small><em>tag</em>: <code>step-8</code></small>
We can implement `FusedStream`, because once the stream has the `inner` variant
set to `Done`, it will always return `Ready(None)`.
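
A sketch, assuming the `Inner::Done` variant from the previous steps:

```rust
use futures_util::stream::{FusedStream, Stream};

impl<T> FusedStream for MyStream<T>
where
    Self: Stream,
{
    fn is_terminated(&self) -> bool {
        matches!(self.inner, Inner::Done)
    }
}
```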
---
## Let's smash our implementation
If you need a break, now is a good moment. Things are getting more complex.
---
## Projecting `Pin`
<small><em>tag</em>: <code>step-9</code></small>
- Instead of `Box::pin`ning `Sleep`, we can _project_ from `Pin<&mut Self>` to
`Pin<&mut Sleep>`, conceptually
- `pin-project` is a crate that lets you perform _pin projection_ without
  `unsafe`. `pin-project-lite` is also available, but for today we are
  considering the _big boy_.
- We are going to ignore that `Sleep` is a worse beast than we can imagine
----
### How pin projection works
Before:
```rust
let this: &mut Self = self.as_mut().get_mut();
```
After:
```rust
let mut this: SelfProjection = self.as_mut().project();
```
----
### `SelfProjection`? Not really
```rust
#[pin_project(project = MyStreamProj)]
struct MyStream<T> {
    /* Other fields */
    #[pin]
    inner: Inner<T>,
}
```
Very similar for `enum`s.
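
For example, on a reduced version of `Inner`:

```rust
use pin_project::pin_project;
use tokio::time::Sleep;

#[pin_project(project = InnerProj)]
enum Inner {
    Empty,
    // The projected variant becomes `InnerProj::Sleep(Pin<&mut Sleep>)`.
    Sleep(#[pin] Sleep),
    Done,
}
```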
----
### Turtles all the way down
```rust
impl<T> MyStreamProj<'_, T> {
    /* ... */
}

impl<T> InnerProj<'_, T> {
    /* ... */
}
```
---
## Using pin projections
* Unbox `Sleep` variant
* Fix `poll_next`
* Move some `fn`s to `impl *Proj`
* Dereference fields
* `self.inner = x` → `self.inner.set(x)`
----
### Unbox Sleep
* Add the `pin_project` attribute to `Inner`
* Replace `Sleep(Pin<Box<Sleep>>)` with `Sleep(#[pin] Sleep)`
* Follow the chain of required pin projections
----
### Fixing `poll_next`
* Use `.project()`
* `Inner::*` → `InnerProj::*`
* Create `MyStreamProj::set_sleep`, returns a `Pin<&mut Sleep>`
* Change how `sleep` is handled
* Replace some `this.` with `self.` (related to next point)
----
### Move `fn`s
* These need to be inside `impl MyStreamProj`:
* `request_next_page`
* `request_next_page_if_available_points`
* `get_wait_time_for_request`
* `handle_content_data_result`
* `set_sleep`
----
### Dereference fields
Projections hold a mutable reference (pinned or not) to the fields of the original data.
----
### Use `inner.set`
`MyStreamProj::inner` is `Pin<&mut Inner>`, therefore it is necessary to use `Pin::set`
---
## Fluff those allocs!
<small><em>tag</em>: <code>step-10</code></small>
Harder, better, faster, ~~stronger~~ harder
----
### The basic principle
* Leverage `async`/`await` to create a single `Future` and then `Box::pin` it
* Need to _detach_ `self` from the `Future`
* Requires a major overhaul
----
### The actual change
* `Inner::HeadRequest` + `Inner::BodyRequest` → `Inner::Request(RequestFuture<T>)`
* `marker` is not needed anymore
* `RequestFuture<T>` is a `BoxFuture` of... something not trivial
* `request_next_page` becomes an `async fn`... sort of :sweat_smile:
----
### The two `request_next_page`
* `request_next_page_if_available_points` does not exist anymore
* `MyStreamProj::request_next_page` returns either a `RequestFuture` or a `Sleep`
* `MyStreamProj::request_next_page` uses a free `async fn request_next_page` to create the _detached_ `Future`
----
### `async fn request_next_page`
* Includes _old_:
* `request_next_page_if_available_points`
* `request_next_page`
* `handle_head_data_result`
* `handle_content_data_result`
* Takes all the parts needed from `MyStream`
* Returns `(Result<Option<(T, bool)>, RequestError>, Option<LeakyBucket>)`
----
### `RequestError`
* Needed because there is now a single `Future` covering the whole request
* `enum` with two variants:
* `TooManyRequests { wait_time: Duration }`
* `Other(Error)`
* `impl From` for the variants of `Error`
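
A sketch (which `From` impls are handy is an assumption; adapt to your
`Error`):

```rust
use std::time::Duration;

enum RequestError {
    TooManyRequests { wait_time: Duration },
    Other(Error),
}

impl From<Error> for RequestError {
    fn from(err: Error) -> Self {
        RequestError::Other(err)
    }
}

impl From<reqwest::Error> for RequestError {
    fn from(err: reqwest::Error) -> Self {
        RequestError::Other(Error::from(err))
    }
}
```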
----
### Return of `request_next_page` (1)
`(Result<Option<(T, bool)>, RequestError>, Option<LeakyBucket>)`
* Tuple → we need to hand back the _detached_ `LeakyBucket`
* `Result` → request can fail because of throttling or a generic error
* `None` → we could have reached the end of data
* `Some` + `bool` → even when we have data, there may or may not be more data to query
----
### Return of `request_next_page` (2)
`(Result<Option<(T, bool)>, RequestError>, Option<LeakyBucket>)`
* Type alias `ResultWithBucket<T>`
* `RequestFuture<T> = BoxFuture<'static, ResultWithBucket<T>>`
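
In code:

```rust
use futures_util::future::BoxFuture;

type ResultWithBucket<T> = (
    Result<Option<(T, bool)>, RequestError>,
    Option<LeakyBucket>,
);

type RequestFuture<T> = BoxFuture<'static, ResultWithBucket<T>>;
```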
----
### Move `get_wait_time_for_request`
It is now a free function because it is needed by the free `async fn request_next_page`
----
### `MyStreamProj::handle_request_result` (1)
* Receives `result: ResultWithBucket<T>`
* Returns `Poll`
* If `Ok` with `has_another_page` → change `inner` depending on the result of `this.request_next_page`
* If `Ok` with `!has_another_page` → `inner.set(Inner::Done)`
----
### `MyStreamProj::handle_request_result` (2)
* If `Err(RequestError::TooManyRequests)` → sleep and try to call `this.request_next_page` until `Ok` is returned
* If `Err(err)` → set `inner` to `Done` and return the error
----
### `MyStreamProj::handle_request_future`
* Similar to `handle_request_result`
* Takes a `future: RequestFuture<T>`
* Small tweaks to the inner logic and nothing more
---
## Still alive?

---
## Avoid recursive calls to `poll_next`
<small><em>tag</em>: <code>step-11</code></small>
* Place a `loop` around the whole logic
* Break when a `Poll` needs to be returned
* Just keep looping where a recursive call was needed
---
## A look at nightly
<small><em>tag</em>: <code>step-12</code></small>
* Use `type_alias_impl_trait`
* Set `channel` in `rust-toolchain`
* `type RequestFuture<T> = impl Future`
* Add `MyStreamProj::set_request`
* Adjust minor things
---
## Rust in 2027 (or even later)
Stop! Demo time!
---
## So long, and thanks for all the turbofish