
AsyncIterator discussion prototype

// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.60AsyncIterator.60.20rename.20meeting.202024-01-22.3F
// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/async.20iterator.20combinators.20using.20new.20async.20functionality
//
// Author: TC
//
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments below:
//
//   - How we can allow users to implement an AFIT `async fn next`
//     equivalent while converting that automatically to an
//     `AsyncIterator` type with `IntoAsyncIterator`.
//
//   - How, for `AsyncIterator` trait objects, we can allow calls to
//     the `next()` method with only a single indirect call.
//
//   - How we need to handle the backpressure / liveness problem in
//     our solution to `AsyncIterator`.
//
//   - Why it's important that we can name the future returned by a
//     method for which we might otherwise use AFIT.  This has bearing
//     on the possible need for RTN.  This is larger than the `Send`
//     bound problem, but it relates to that as well.
//
//   - How the presence of stable `async gen` blocks may affect our
//     thinking on this design.
//
//   - How we can preserve cancellation safety for the future returned
//     by the `next()` method while still allowing the future returned
//     by `next_item()` to have its own non-trivial state.  The former
//     is important since the current ecosystem assumes that futures
//     returned by `next()` are cancellation safe, while the latter is
//     important to avoid an easy mistake to make while implementing
//     the convenient AFIT-like trait.
//
//   - How, even from a `dyn AsyncIterator`, we can return concrete
//     futures using an `async` block in the implementation.
//
//   - Whether to separately indicate `Empty` and `Done` states to
//     support use cases such as `FuturesUnordered` (which currently
//     violates the "done" contract).
//
//   - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//     to use a dedicated enum in the manner of `CoroutineState`.
//
//   - Whether and how to flatten carried effects into the type of the
//     value returned from `poll_next`.
//
//   - Whether, since `?` can be used outside of `try` blocks, we
//     should compose the `Try` carried effect even in normal
//     (non-`try`) async iterators.
//
//   - Whether `poll_progress` should indicate that a value is ready
//     to be yielded, that no further work can be done until
//     `poll_next` is called, or both.
//
//   - Whether `poll_progress` should indicate the done state, if it
//     knows about it, or whether it should force the caller to make a
//     call to `poll_next`.
//
// The comments below are written to tell a story and to be read even
// if you do not read the code.
//
// Note that everything here works with `no_std` without any
// allocation or type erasure.
//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]

use core::{
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    pin::Pin,
    ptr::addr_of_mut,
    task::{Context, Poll},
};

// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`.  However, for demonstration and
// to explore some of the open questions, we'll use a dedicated enum
// for clarity.
//
// There may of course be value in doing it this way "for real".  In
// the same way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear.  If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry about how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant.  Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later.  We seem to have these
// options:
//
//   1. Declare that this is a supported use of the done state and
//   that an `AsyncIterator` that returns done may later yield values.
//
//   2. Declare that this is a violation of the contract and that
//   `FuturesUnordered` should instead either return `Pending` when
//   it is empty (and we should look into the use cases and reasons
//   why it does not do that) or return `Yielded(Option<T>)`.
//
//   3. Provide a separate way to signal empty (and not pending)
//   versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}
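
// Here is a minimal standalone sketch of the `Residual` idea above.
// It assumes a hypothetical `TryNextState` enum and `into_pun` method,
// and neither is part of this prototype.  The dedicated enum keeps the
// carried effects flat.  The punned form, by contrast, must commit to
// an ordering (`Option<Result<..>>` here rather than
// `Result<Option<..>>`), and it cannot tell `Empty` apart from `Done`.
#[allow(dead_code)]
mod try_next_state_sketch {
    use core::task::Poll;

    // Hypothetical: a `NextState` extended with a `Residual` variant.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum TryNextState<T, R> {
        /// A value has been yielded.
        Yielded(T),
        /// The iterator short-circuited with a residual (e.g., an error).
        Residual(R),
        /// No further values will ever be yielded.
        Done,
        /// No further values until the caller takes some action.
        Empty,
        /// No value is yet ready.
        Pending,
    }

    impl<T, R> TryNextState<T, R> {
        /// Flatten into the punned `Poll<Option<Result<T, R>>>` form.
        pub fn into_pun(self) -> Poll<Option<Result<T, R>>> {
            match self {
                TryNextState::Yielded(x) => Poll::Ready(Some(Ok(x))),
                TryNextState::Residual(r) => Poll::Ready(Some(Err(r))),
                // The pun has no slot for "empty", so it collapses
                // into the same `None` as "done".
                TryNextState::Done | TryNextState::Empty => {
                    Poll::Ready(None)
                }
                TryNextState::Pending => Poll::Pending,
            }
        }
    }
}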

// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that this did not
// express enough, which resulted in duplicating substantial code
// between `poll_progress` and `poll_next` implementations.  When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`.  However, for that to be useful, we need for
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that.  This leaves us with three options:
//
//   1. Indicate only when `poll_progress` can do no more work until
//   `poll_next` is called.  This results in duplicate code between
//   `poll_progress` and `poll_next`.
//
//   2. Indicate only when `poll_next` is ready to yield a value.
//   This results in some async iterators always returning `Pending`
//   from `poll_progress` which means that callers may invoke
//   `poll_progress` more often than needed.
//
//   3. Indicate both states.
//
// We've taken option 3, labeling these states `Paused` and
// `ReadyToYield`, although the `ProgressState` enum below currently
// defines only `Paused` and `Pending` (i.e., option 1).
//
// A separate design question is whether, if `poll_progress` knows
// that the async iterator is finished, it should indicate that itself
// or should instead indicate `Paused` and force the caller to call
// `poll_next` to receive the `Done` or `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// Further progress is possible by calling `poll_progress`.
    Pending,
}
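
// Here is a minimal standalone sketch of option 3 above.  It assumes a
// hypothetical three-state `Progress` enum and a toy `Worker` source,
// and neither is part of this prototype.  Because `poll_progress` can
// report `ReadyToYield`, `poll_next` is written entirely in terms of
// it, which avoids the duplication described under option 1.  On
// completion it reports `Paused` and leaves it to `poll_next` to signal
// the end.  That is one possible answer to the separate design question
// above.  For brevity, `Worker` is `Unpin` and takes `&mut self`, and
// `Poll<Option<T>>` stands in for `NextState<T>`.
#[allow(dead_code)]
mod progress_option3_sketch {
    use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    // Hypothetical three-state progress signal.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum Progress {
        /// Nothing more can be done until `poll_next` is called.
        Paused,
        /// An item is buffered; the next `poll_next` will yield it.
        ReadyToYield,
        /// More work remains; calling `poll_progress` again helps.
        Pending,
    }

    /// Toy source yielding `0..limit`, where each item needs `cost`
    /// units of work and one unit is done per `poll_progress` call.
    pub struct Worker {
        cost: u32,
        work_done: u32,
        next_value: u32,
        limit: u32,
        buffered: Option<u32>,
    }

    impl Worker {
        pub fn new(cost: u32, limit: u32) -> Self {
            Self { cost, work_done: 0, next_value: 0, limit, buffered: None }
        }

        pub fn poll_progress(&mut self, cx: &mut Context<'_>) -> Progress {
            if self.buffered.is_some() {
                return Progress::ReadyToYield;
            }
            if self.next_value >= self.limit {
                // Finished: let `poll_next` deliver the end signal.
                return Progress::Paused;
            }
            self.work_done += 1;
            if self.work_done < self.cost {
                // More work can be done right away, so ask to be polled
                // again rather than returning `Pending` with no wakeup.
                cx.waker().wake_by_ref();
                return Progress::Pending;
            }
            self.work_done = 0;
            self.buffered = Some(self.next_value);
            self.next_value += 1;
            Progress::ReadyToYield
        }

        /// Reuses `poll_progress` instead of duplicating its logic.
        pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
            match self.poll_progress(cx) {
                Progress::ReadyToYield => Poll::Ready(self.buffered.take()),
                Progress::Paused => Poll::Ready(None),
                Progress::Pending => Poll::Pending,
            }
        }
    }

    /// A waker that does nothing, for driving the sketch by hand.
    pub fn noop_waker() -> Waker {
        unsafe fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(core::ptr::null(), &VTABLE)
        }
        unsafe fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone, noop, noop, noop);
        // SAFETY: The vtable functions never touch the null data pointer.
        unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
    }
}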

// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`.  All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way.  We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers.  This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe.  That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly.  In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}

// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`.  This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call.  All other known approaches,
// including approaches where implementors would implement `async fn
// next` directly, require two indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> =
        impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;
    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}
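
// Here is a minimal standalone sketch of why the `next()` future above
// is cancellation safe.  It assumes a hypothetical toy `Flaky` iterator
// with an inherent `poll_next`, which is not part of this prototype.
// As in the blanket impl, `next()` is a `poll_fn` that only borrows the
// iterator, so all state lives in the iterator.  Dropping a `next()`
// future after it returns `Pending` therefore loses nothing, and a
// fresh `next()` resumes where the dropped one left off.
#[allow(dead_code)]
mod next_cancellation_sketch {
    use core::future::{poll_fn, Future};
    use core::pin::pin;
    use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    /// Toy iterator that alternates between "not ready yet" and
    /// yielding the next counter value.
    pub struct Flaky {
        next: u32,
        ready: bool,
    }

    impl Flaky {
        pub fn new() -> Self {
            Self { next: 0, ready: false }
        }

        fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
            if !self.ready {
                // Record progress in the iterator itself, then ask to
                // be polled again.
                self.ready = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            self.ready = false;
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        }

        /// The returned future holds nothing but `&mut self`.
        pub fn next(&mut self) -> impl Future<Output = Option<u32>> + '_ {
            poll_fn(move |cx| self.poll_next(cx))
        }
    }

    /// Polls `fut` once with a no-op waker and then drops it.
    pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut fut = pin!(fut);
        fut.as_mut().poll(&mut cx)
    }

    /// A waker that does nothing.
    fn noop_waker() -> Waker {
        unsafe fn clone(_: *const ()) -> RawWaker {
            RawWaker::new(core::ptr::null(), &VTABLE)
        }
        unsafe fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone, noop, noop, noop);
        // SAFETY: The vtable functions never touch the null data pointer.
        unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
    }
}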

// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait.  This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
//
// Note that this trait has a dummy generic type parameter.  We need
// this so as to be able to satisfy coherence checking later on when
// we need to name our simulated existential lifetime.
pub trait IntoAsyncIterator<S> {
    type Item;
    type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}

// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing a
// `AsyncIterator::poll_next` manually.  We provide this trait to
// allow for that.  Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    // 1. Since the future can be named, use sites can add bounds on
    // it.  That solves, among other things, the `Send` bound problem.
    //
    // 2. Since the future can be named, we can store it unboxed when
    // converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned.  This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    iter: I,
    // Note that to make this work, it's critical that we're able to
    // name `NextFuture`, as otherwise we could not store it unboxed
    // in this struct.
    //
    // What we actually want here are existential lifetimes so that we
    // can express the type:
    //
    //     exists<'s> Option<<I as AsyncFnNextIterator>::NextFuture<'s>>
    //
    // That is, we need to name *some* lifetime that is shorter than
    // the lifetime of `I` and of `Self`.  We don't have a way to
    // express this in Rust currently other than to add this as a
    // generic lifetime parameter.  This forces us to add a dummy
    // generic parameter to `IntoAsyncIterator` so that we can satisfy
    // coherence checking.
    fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
    _p: PhantomData<&'s ()>,
}

// `ConvertedAsyncIterator` implements `AsyncIterator` by lazily
// creating a `NextFuture`, storing it in place, and polling it to
// completion across calls to `poll_next`.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        // SAFETY: Writing the following in terms of raw pointers
        // raises fewer opsem questions than doing it with references,
        // so we wrap the entire body in `unsafe` to make this
        // readable.
        unsafe {
            // SAFETY: This is for the pin projections below.
            let this = addr_of_mut!(*self.get_unchecked_mut());
            if let None = (*this).fut {
                // SAFETY: We're doing lifetime extension here.  The
                // fact that we have a pinned mutable reference to
                // `Self` is what makes this sound.
                //
                // This is a simulation of existential lifetimes.
                let fut: I::NextFuture<'s> = (*this).iter.next_item();
                (*this).fut = Some(fut);
            }
            let Some(ref mut fut) = (*this).fut else {
                // SAFETY: We just ensured this must be `Some(_)`.
                unreachable_unchecked()
            };
            // SAFETY: We uphold the pin contract for this value.
            let pfut = Pin::new_unchecked(fut);
            match pfut.poll(cx) {
                Poll::Ready(x) => {
                    (*this).fut = None;
                    match x {
                        Some(x) => NextState::Yielded(x),
                        None => NextState::Done,
                    }
                }
                Poll::Pending => NextState::Pending,
            }
        }
    }
}

// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
//
// Note that this impl is why we need the dummy generic type parameter
// on `IntoAsyncIterator`.  Without it, the `'s` lifetime would be
// unconstrained and we would fail coherence checking, and without
// being able to name this lifetime in the impl, we would not be able
// to name the type of the `ConvertedAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
    I: AsyncFnNextIterator + 's,
    <I as AsyncFnNextIterator>::Item: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator {
        ConvertedAsyncIterator {
            iter: self,
            fut: None,
            _p: PhantomData,
        }
    }
}

// ###################################################################
// Tests and demonstration of use.

#[cfg(test)]
mod always42_test {
    use super::*;
    use core::pin::pin;

    // Let's say a user comes along and wants to make the `Always42`
    // struct into an asynchronous iterator the "easy way".  So the
    // user writes this impl in terms of the AFIT-like version.
    struct Always42;
    impl AsyncFnNextIterator for Always42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_always42_poll() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always42_next() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// By compiling, this demonstrates that our simulation of existential
// lifetimes works correctly and that we're not requiring that the
// underlying `AsyncFnNextIterator` outlive `'static`.

#[cfg(test)]
mod borrowclone_test {
    use super::*;
    use core::pin::pin;

    struct BorrowClone<T: Clone>(T);
    impl<T: Clone> AsyncFnNextIterator for BorrowClone<T> {
        type Item = T;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>
        where
            T: 's;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = &self.0;
            async move { Some(x.clone()) }
        }
    }

    #[test]
    fn test_borrowclone_poll() {
        let mut x = BorrowClone(42u8).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_borrowclone_next() {
        let mut x = BorrowClone(42u8).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates that we correctly handle state within the
// returned future.

#[cfg(test)]
mod alwaysonepending_test {
    use super::*;
    use core::pin::pin;

    struct AlwaysOnePending;
    impl AsyncFnNextIterator for AlwaysOnePending {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match once {
                    true => Poll::Ready(Some(42)),
                    false => {
                        once = true;
                        Poll::Pending
                    }
                })
                .await
            }
        }
    }

    #[test]
    fn test_always_one_pending_poll() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always_one_pending_next() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates the two-FSM situation.  We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.

#[cfg(test)]
mod onceonepending_test {
    use super::*;
    use core::pin::pin;

    struct OnceOnePending(bool);
    impl AsyncFnNextIterator for OnceOnePending {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match (self.0, once) {
                    (false, false) => {
                        once = true;
                        Poll::Pending
                    }
                    (false, true) => {
                        self.0 = true;
                        Poll::Ready(Some(42))
                    }
                    (true, _) => Poll::Ready(None),
                })
                .await
            }
        }
    }

    #[test]
    fn test_once_one_pending_poll() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Done, next());
        assert_eq!(NextState::Done, next());
    }

    #[test]
    fn test_once_one_pending_next() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(None), next());
        assert_eq!(Poll::Ready(None), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.

#[cfg(test)]
mod sendalways42_test {
    use super::*;
    use core::pin::pin;

    struct SendAlways42;
    impl AsyncFnNextIterator for SendAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_sendalways42_next() {
        let x = SendAlways42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_spawn(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

#[cfg(test)]
mod localalways42_test {
    use super::*;
    use std::{future::ready, rc::Rc};

    struct LocalAlways42(Rc<()>);
    impl AsyncFnNextIterator for LocalAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = self.0.clone();
            async move { Some((ready(()).await, *x, 42).2) }
        }
    }

    #[test]
    fn test_localalways42_next() {
        use core::pin::pin;
        let x = LocalAlways42(Rc::new(())).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.

#[cfg(test)]
mod dynalways42_test {
    use super::*;

    struct DynAlways42;
    impl AsyncFnNextIterator for DynAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_dynalways42_poll() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_dynalways42_next() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// ###################################################################
// The backpressure / liveness problem.

// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`.  Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.

pub trait ProgressFuture: Future {
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<()> {
        Poll::Ready(())
    }
}

pub trait BufferedFuture: ProgressFuture {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized;
}

pub enum Buffered<Fut>
where
    Fut: Future,
{
    Pending(Fut),
    Ready(Option<<Fut as Future>::Output>),
}

impl<Fut> Future for Buffered<Fut>
where
    Fut: Future,
{
    type Output = <Fut as Future>::Output;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        match this {
            Buffered::Pending(fut) => {
                let fut = unsafe { Pin::new_unchecked(fut) };
                fut.poll(cx)
            }
            Buffered::Ready(None) => Poll::Pending,
            Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
        }
    }
}

impl<Fut> ProgressFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        let this = unsafe { self.get_unchecked_mut() };
        let Buffered::Pending(fut) = this else {
            return Poll::Ready(());
        };
        let fut = unsafe { Pin::new_unchecked(fut) };
        match fut.poll(cx) {
            Poll::Ready(x) => {
                *this = Buffered::Ready(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<Fut: ProgressFuture> BufferedFuture for Fut {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized,
    {
        Buffered::Pending(self)
    }
}

// ###################################################################
// `FuturesUnordered` demonstration.

#[cfg(test)]
macro_rules! cfg_match {
  ($(cfg($cond:meta) => { $($then:tt)* })+
   $(_ => { $($else:tt)* })?
  ) => { cfg_match! {
    __xs (); $((($cond) ($($then)*)),)+ $((() ($($else)*)),)?
  }};
  (__id $($xs:tt)*) => { $($xs)* };
  (__xs ($($_:meta,)*);) => {};
  (__xs ($($ncond:meta,)*);
   (($($cond:meta)?) ($($body:tt)*)), $($rest:tt,)*
  ) => {
    #[cfg(all($($cond,)? not(any($($ncond),*))))]
    cfg_match! { __id $($body)* }
    cfg_match! { __xs ($($ncond,)* $($cond,)?); $($rest,)* }
  };
}

// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    queue: Queue<Fut, 1024>,
}

impl<Fut> FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    pub fn new() -> Self {
        Self { queue: Queue::new() }
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }

    pub fn push(&mut self, f: Fut) {
        self.queue.push(f);
    }
}

impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    type Item = <Fut as Future>::Output;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
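        let this = unsafe { self.get_unchecked_mut() };
        let len = this.queue.len();
        // With nothing queued, no value can be yielded until the caller
        // pushes another future, so this is `Empty` rather than `Done`
        // (or a `Pending` that nothing would ever wake).
        if len == 0 {
            return NextState::Empty;
        }
        // Poll each queued future at most once, pushing pending ones
        // back onto the queue.
        let mut i = 0usize;
        while let Some(mut fut) = this.queue.pop() {
            {
                let fut = unsafe { Pin::new_unchecked(&mut fut) };
                match fut.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(x) => return NextState::Yielded(x),
                }
            }
            // NOTE: Moving `fut` after pinning it is only sound for
            // `Fut: Unpin`; this simplified version does not enforce
            // that.
            this.queue.push(fut);
            i += 1;
            if i == len {
                break;
            }
        }
        NextState::Pending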
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut state = ProgressState::Paused;
        for fut in this.queue.iter_mut() {
            let fut = unsafe { Pin::new_unchecked(fut) };
            if let Poll::Pending = fut.poll_progress(cx) {
                state = ProgressState::Pending;
            }
        }
        state
    }
}

pub struct Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    stream: St,
    queue: FuturesUnordered<<St as AsyncIterator>::Item>,
    max: usize,
}

impl<St> Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    pub fn new(stream: St, max: usize) -> Self {
        Self { stream, queue: FuturesUnordered::new(), max }
    }

    fn make_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> (ProgressState, bool) {
        let this = unsafe { self.get_unchecked_mut() };
        let mut is_stream_done = false;
        if this.queue.len() < this.max {
            while this.queue.len() < this.max {
                let stream =
                    unsafe { Pin::new_unchecked(&mut this.stream) };
                match stream.poll_next(cx) {
                    NextState::Yielded(fut) => {
                        this.queue.push(fut);
                    }
                    NextState::Empty | NextState::Done => {
                        is_stream_done = true;
                        break;
                    }
                    NextState::Pending => break,
                }
            }
        } else {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_progress(cx) {
                ProgressState::Paused => is_stream_done = true,
                ProgressState::Pending => {}
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        (queue.poll_progress(cx), is_stream_done)
    }
}

impl<St> AsyncIterator for Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    type Item = <<St as AsyncIterator>::Item as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let (_, is_stream_done) = self.as_mut().make_progress(cx);
        // SAFETY: as in `make_progress`, `queue` is re-pinned in place
        // and never moved.
        let this = unsafe { self.get_unchecked_mut() };
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_next(cx) {
            NextState::Yielded(x) => NextState::Yielded(x),
            NextState::Empty if is_stream_done => NextState::Empty,
            NextState::Empty => NextState::Pending,
            NextState::Done => unreachable!(),
            NextState::Pending => NextState::Pending,
        }
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        match self.make_progress(cx) {
            (ProgressState::Paused, true) => ProgressState::Paused,
            (ProgressState::Paused, _) => ProgressState::Pending,
            (ProgressState::Pending, _) => ProgressState::Pending,
        }
    }
}

#[cfg(test)]
mod buffered_test {
    use super::*;
    use core::pin::pin;
    use core::sync::atomic::{AtomicU64, Ordering};

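    /// A future that becomes ready `delay` ticks after it is created,
    /// as measured by the shared `ticker`, and outputs `(age, missed)`.
    /// It panics if a poll finds that a tick elapsed since the previous
    /// poll, so the test also checks liveness: every in-flight future
    /// must be driven on every tick.
    struct DelayFuture<'s> {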
        ticker: &'s AtomicU64,
        delay: u64,
        start: u64,
        last_tick: u64,
        missed: u64,
    }

    impl<'s> DelayFuture<'s> {
        pub fn new(ticker: &'s AtomicU64, delay: u64) -> Self {
            let tick = ticker.load(Ordering::Relaxed);
            Self {
                ticker,
                delay,
                start: tick,
                last_tick: tick,
                missed: 0,
            }
        }
    }

    impl<'s> Future for DelayFuture<'s> {
        type Output = (u64, u64);

        fn poll(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Self::Output> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            let age = tick - self.start;
            if age >= self.delay {
                return Poll::Ready((age, self.missed));
            }
            Poll::Pending
        }
    }

    impl<'s> ProgressFuture for DelayFuture<'s> {
        fn poll_progress(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<()> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            if tick - self.start >= self.delay {
                return Poll::Ready(());
            }
            Poll::Pending
        }
    }

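    /// An endless stream whose `poll_next` yields a new buffered
    /// `DelayFuture` with a pseudo-random delay below `max_delay`.
    /// Like `DelayFuture`, it panics if a poll finds a missed tick.
    struct DelayStream<'s> {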
        ticker: &'s AtomicU64,
        max_delay: u64,
        rand: XorShift,
        last_tick: u64,
        missed: u64,
    }

    impl<'s> DelayStream<'s> {
        pub fn new(ticker: &'s AtomicU64, max_delay: u64) -> Self {
            let tick = ticker.load(Ordering::Relaxed);
            Self {
                ticker,
                max_delay,
                rand: XorShift::new(),
                last_tick: tick,
                missed: 0,
            }
        }
    }

    impl<'s> AsyncIterator for DelayStream<'s> {
        type Item = Buffered<DelayFuture<'s>>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> NextState<Self::Item> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            let delay = self.rand.rand() % self.max_delay;
            NextState::Yielded(
                DelayFuture::new(self.ticker, delay).buffer(),
            )
        }

        fn poll_progress(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> ProgressState {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            ProgressState::Paused
        }
    }

    #[test]
    fn buffered_test() {
        cfg_match! {
            cfg(miri) => {
                const N_ITERS: usize = 2usize.pow(7);
                const BUFSIZE: usize = 2usize.pow(2);
                const INV_RATE: usize = 10;
                const MAX_DELAY: u64 = 80;
            }
            _ => {
                const N_ITERS: usize = 2usize.pow(20);
                const BUFSIZE: usize = 2usize.pow(8);
                const INV_RATE: usize = 100;
                const MAX_DELAY: u64 = 800;
            }
        }
        let ticker = AtomicU64::new(0);
        let mut cx = NOP_CONTEXT;
        let ds = DelayStream::new(&ticker, MAX_DELAY);
        let bs = Bottleneck::new(ds, BUFSIZE);
        let mut bs = pin!(bs);
        let tick = || ticker.fetch_add(1, Ordering::Relaxed);
        let mut max_age = 0;
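        // Advance the ticker once per iteration. Only every
        // `INV_RATE`-th iteration requests an item with `poll_next`;
        // the others call `poll_progress`, which must still drive every
        // in-flight future or `DelayFuture` panics.
        for i in 0..N_ITERS {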
            _ = tick();
            if i % INV_RATE == 0 {
                match bs.as_mut().poll_next(&mut cx) {
                    NextState::Yielded((age, missed)) => {
                        assert_eq!(0, missed);
                        if age > max_age {
                            assert!(age < MAX_DELAY);
                            max_age = age
                        }
                    }
                    NextState::Pending => {}
                    _ => unreachable!(),
                }
            } else {
                _ = bs.as_mut().poll_progress(&mut cx);
            }
        }
        dbg!(max_age);
    }
}

// ###################################################################
// Supporting code.

pub use nop_waker::*;
mod nop_waker {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };
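    /// A `RawWaker` whose `clone` returns another no-op waker and whose
    /// `wake`, `wake_by_ref`, and `drop` do nothing. The tests poll
    /// futures by hand, so nothing ever needs to be woken.
    pub const NOP_RAWWAKER: RawWaker = {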
        fn nop(_: *const ()) {}
        const VTAB: RawWakerVTable =
            RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
        RawWaker::new(&() as *const (), &VTAB)
    };
    pub const NOP_WAKER: &Waker =
        &unsafe { Waker::from_raw(NOP_RAWWAKER) };
    pub const NOP_CONTEXT: Context<'static> =
        Context::from_waker(NOP_WAKER);
    pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
    where
        F: Future<Output = T> + ?Sized + Unpin,
    {
        let mut cx = NOP_CONTEXT;
        Pin::new(f).as_mut().poll(&mut cx)
    }
    pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T>,
    {
        poll_once(&mut pin!(f))
    }
    #[cfg(test)]
    pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T> + Send,
    {
        std::thread::scope(move |s| {
            s.spawn(move || poll_once_owned(f)).join().unwrap()
        })
    }
}

pub use circular_queue::*;
mod circular_queue {
    use core::mem::{transmute, MaybeUninit};

    pub struct Queue<T, const LEN: usize> {
        buf: [MaybeUninit<T>; LEN],
        idx: usize,
        len: usize,
    }

    impl<T: Sized, const LEN: usize> Queue<T, LEN> {
        pub fn new() -> Self {
            // SAFETY: an array of `MaybeUninit<T>` is valid without
            // initialization.
            let buf: [MaybeUninit<T>; LEN] =
                unsafe { MaybeUninit::uninit().assume_init() };
            Self { buf, idx: 0, len: 0 }
        }

        pub fn len(&self) -> usize {
            self.len
        }

        pub fn push(&mut self, x: T) {
            assert!(self.len < LEN);
            let idx = (self.idx + self.len) % LEN;
            self.buf[idx] = MaybeUninit::new(x);
            self.len += 1;
        }

        #[must_use]
        pub fn pop(&mut self) -> Option<T> {
            let true = self.len > 0 else { return None };
            let v = unsafe { self.buf[self.idx].assume_init_read() };
            self.idx = (self.idx + 1) % LEN;
            self.len -= 1;
            Some(v)
        }

        pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
            struct IterMut<'a, T: 'a, const LEN: usize> {
                queue: &'a mut Queue<T, LEN>,
                idx: usize,
                len: usize,
            }
            impl<'a, T: 'a, const LEN: usize> Iterator for IterMut<'a, T, LEN> {
                type Item = &'a mut T;
                fn next(&mut self) -> Option<Self::Item> {
                    let true = self.len > 0 else { return None };
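                    // SAFETY: `idx` walks the `len` initialized slots,
                    // yielding each at most once, so the returned
                    // references never alias; `transmute` only widens
                    // the borrow of `self` to the `'a` of `queue`.
                    let v: &'a mut T = unsafe {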
                        transmute(
                            self.queue.buf[self.idx]
                                .assume_init_mut(),
                        )
                    };
                    self.idx = (self.idx + 1) % LEN;
                    self.len -= 1;
                    Some(v)
                }
            }
            let (idx, len) = (self.idx, self.len);
            IterMut::<'_, _, LEN> { queue: self, idx, len }
        }
    }

    impl<T, const LEN: usize> Drop for Queue<T, LEN> {
        fn drop(&mut self) {
            while self.len() > 0 {
                _ = self.pop();
            }
        }
    }
}

pub use xorshift::*;
mod xorshift {
    /// A minimal xorshift PRNG. The seed is fixed, so test runs are
    /// reproducible.
    pub struct XorShift {
        seed: u64,
    }

    impl XorShift {
        pub fn new() -> Self {
            Self { seed: 1 }
        }

        pub fn rand(&mut self) -> u64 {
            self.seed ^= self.seed << 23;
            self.seed ^= self.seed >> 13;
            self.seed ^= self.seed << 38;
            self.seed
        }
    }

    impl Iterator for XorShift {
        type Item = u64;
        fn next(&mut self) -> Option<Self::Item> {
            Some(self.rand())
        }
    }
}
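
The fill-then-drive policy of `Bottleneck` can be modeled without the prototype's traits. What follows is a minimal, hypothetical sketch for stable Rust, not part of the prototype: `Delay`, `Bounded`, `Stats`, and `simulate` are illustrative names, and a plain `Iterator` of futures, a `Vec` of boxed futures, and a `VecDeque` of finished outputs stand in for `AsyncIterator`, `FuturesUnordered`, and `Buffered`. As in `buffered_test`, it advances a shared tick counter once per loop iteration, asks for an item only every `inv_rate` ticks, and counts how many outputs were observed after their due tick.

```rust
use std::{cell::Cell, collections::VecDeque, future::Future, pin::Pin, rc::Rc};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// No-op waker, built the same way as the prototype's `nop_waker` module.
static VTABLE: RawWakerVTable = RawWakerVTable::new(nop_clone, nop, nop, nop);
fn nop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(std::ptr::null(), &VTABLE)
}
fn nop(_: *const ()) {}

// Hypothetical stand-in for `DelayFuture`: ready once the shared ticker
// reaches `ready_at`. Outputs (tick it was due, tick a poll saw it ready).
struct Delay { ticker: Rc<Cell<u64>>, ready_at: u64 }
impl Future for Delay {
    type Output = (u64, u64);
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        let now = self.ticker.get();
        if now < self.ready_at {
            return Poll::Pending;
        }
        Poll::Ready((self.ready_at, now))
    }
}

// Hypothetical model of `Bottleneck`: holds at most `max` futures,
// counting in-flight futures and finished-but-unread outputs alike.
struct Bounded<I: Iterator<Item = F>, F: Future> {
    source: I,
    in_flight: Vec<Pin<Box<F>>>,
    ready: VecDeque<F::Output>,
    max: usize,
}

impl<I: Iterator<Item = F>, F: Future> Bounded<I, F> {
    fn held(&self) -> usize {
        self.in_flight.len() + self.ready.len()
    }
    // Analog of `poll_progress`.
    fn progress(&mut self, cx: &mut Context<'_>) {
        // Backpressure: pull from upstream only while below `max`.
        while self.held() < self.max {
            let Some(f) = self.source.next() else { break };
            self.in_flight.push(Box::pin(f));
        }
        // Liveness: poll every in-flight future on every call.
        let ready = &mut self.ready;
        self.in_flight.retain_mut(|f| match f.as_mut().poll(cx) {
            Poll::Ready(out) => {
                ready.push_back(out);
                false // drop the finished future; its output is in `ready`
            }
            Poll::Pending => true,
        });
    }
    // Analog of `poll_next`; this simplified model never reports "done".
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
        self.progress(cx);
        self.ready.pop_front().map_or(Poll::Pending, Poll::Ready)
    }
}

struct Stats { yielded: u64, max_held: usize, late: u64 }
// Hypothetical harness mirroring `buffered_test`: one tick per loop;
// every `inv_rate`-th tick calls `poll_next`, the others only `progress`.
fn simulate(max: usize, n_items: u64, inv_rate: u64) -> Stats {
    let ticker = Rc::new(Cell::new(0u64));
    let t = Rc::clone(&ticker);
    // Each future falls due 1..=5 ticks after the tick it is pulled on.
    let source = (0..n_items).map(move |i| Delay {
        ticker: Rc::clone(&t),
        ready_at: t.get() + 1 + i % 5,
    });
    let mut b: Bounded<_, Delay> =
        Bounded { source, in_flight: Vec::new(), ready: VecDeque::new(), max };
    // SAFETY: the vtable functions never read the (null) data pointer.
    let waker = unsafe { Waker::from_raw(nop_clone(std::ptr::null())) };
    let mut cx = Context::from_waker(&waker);
    let mut s = Stats { yielded: 0, max_held: 0, late: 0 };
    while s.yielded < n_items {
        ticker.set(ticker.get() + 1);
        if ticker.get() % inv_rate == 0 {
            if let Poll::Ready((due, seen)) = b.poll_next(&mut cx) {
                s.yielded += 1;
                s.late += u64::from(seen != due);
            }
        } else {
            b.progress(&mut cx);
        }
        s.max_held = s.max_held.max(b.held());
    }
    s
}

fn main() {
    let s = simulate(4, 100, 3);
    // prints: yielded=100 max_held=4 late=0
    println!("yielded={} max_held={} late={}", s.yielded, s.max_held, s.late);
}
```

Counting finished-but-unread outputs toward `max` is what turns the cap into backpressure. In the prototype this role appears to be played by `Buffered` futures that keep their slot in `queue` until `poll_next` takes their output; without it, a slow consumer would let `ready` grow without bound while `source` kept being drained.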

Discussion

Attendance

  • People: TC, Oli