owned this note changed a year ago
Published Linked with GitHub

A discussion prototype for AsyncIterator

// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the follow topics in the code and comments that follow:
//
//   - How we can allow users to implement an AFIT `async fn next`
//     equivalent while converting that automatically to an
//     `AsyncIterator` type with `IntoAsynciterator`.
//
//   - How, for `AsyncIterator` trait objects, we can allow calls to
//     the `next()` method with only a single indirect call.
//
//   - How we need to handle the backpressure / liveness problem in
//     our solution to `AsyncIterator`.
//
//   - Why it's important that we can name the future returned by a
//     method for which we might otherwise use AFIT.  This has bearing
//     on the possible need for RTN.  This is larger than the `Send`
//     bound problem, but it relates to that as well.
//
//   - How the presence of stable `async gen` blocks may affect our
//     thinking on this design.
//
//   - How we can preserve cancellation safety for the future returned
//     by the `next()` method while still allowing the future returned
//     by `next_item()` to have its own non-trivial state.  The former
//     is important since the current ecosystem assumes that futures
//     returned by `next()` are cancellation safe, while the latter is
//     important to avoid an easy mistake to make while implementing
//     the convenient AFIT-like trait.
//
//   - How, even from a `dyn AsyncIterator`, we can return concrete
//     futures using an `async` block in the implementation.
//
//   - Whether to separately indicate `Empty` and `Done` states to
//     support use cases such as `FuturesUnordered` (which currently
//     violate the "done" contract).
//
//   - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//     to use a dedicated enum in the manner of `CoroutineState`.
//
//   - Whether and how to flatten carried effects into the type of the
//     value returned from `poll_next`.
//
//   - Whether, since `?` can be used outside of `try` blocks, we
//     should compose the `Try` carried effect even in normal
//     (non-`try`) async iterators.
//
//   - Whether `poll_progress` should indicate that a value is ready
//     to be yielded, that no further work can be done until
//     `poll_next` is called, or both.
//
//   - Whether `poll_progress` should indicated the done state, if it
//     knows about, or whether it should force the caller to make a
//     call to `poll_next`.
//
// Note that everything here (aside from the buffered streams demo
// below) works with `no_std` without any allocation or type erasure.
//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]

use core::{
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    pin::Pin,
    ptr::addr_of_mut,
    task::{Context, Poll},
};

// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`, however, for demonstration and to
// explore some of the open questions, we'll use a dedicated enum for
// this for clarity.
//
// There may of course be value in doing it this way "for real".  In
// the some way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear.  If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry with how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant.  Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later.  We seem to have these
// options:
//
//   1. Declare that this is a supported use of the done state and
//   that an `AsyncIterator` that returns done may later yield values.
//
//   2. Declare that this is a violation of the contract and that
//   `FuturesUnordered` should instead either returning `Pending` when
//   it is empty (and we should look into the use cases and reasons
//   why it does not do that) or return `Yielded(Option<T>)`.
//
//   3. Provide a separate way to signal empty (and not pending)
//   versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that didn't express
// enough, which resulted in duplicating substantial code between
// `poll_progress` and `poll_next` implementations.  When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`.  However, for that to be useful, we need for
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that.  This leaves us with three options:
//
//   1. Indicate only when `poll_progress` can do no more work until
//   `poll_next` is called.  This results in duplicate code between
//   `poll_progress` and `poll_next`.
//
//   2. Indicate only when `poll_next` is ready to yield a value.
//   This results in some async iterators always returning `Pending`
//   from `poll_progress` which means that callers may invoke
//   `poll_pending` more often than needed.
//
//   3. Indicate both states.
//
// We've take option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is the question of whether, if
// `poll_progress` knows that the async iterator is finished, whether
// it should indicate that itself or whether it should indicated
// `Paused` and force the caller to call `poll_next` to receive the
// `Done` or `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// A value is ready to be yielded; the next call to `poll_next`
    /// is guaranteed to produce a `Yielded(_)` value.
    ReadyToYield,
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`.  All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way.  We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers.  This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe.  That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly.  In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}

// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`.  This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call.  All other known approaches,
// including approaches where implementors would implement `async fn
// next` directly, require two indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> =
        impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;
    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}

// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait.  This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextiterator` below.
pub trait IntoAsyncIterator<S> {
    type Item;
    type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}

// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing a
// `AsyncIterator::poll_next` manually.  We provide this trait to
// allow for that.  Any time that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    // 1. Since the future can be named, callers can add bounds on
    // this future, which allows use sites to set bounds for it.  That
    // solves, among other things, the `Send` bound problem.
    //
    // 2. Since the future can be named, we can store it unboxed when
    // converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned.  This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    iter: I,
    // Note that to make this work, it's critical that we're able to
    // name `NextFuture`, as otherwise we could not store it unboxed
    // in this struct.
    fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
    _p: PhantomData<&'s ()>,
}

// This blanket impl makes all types that implement
// `AsyncFnNextIterator` into types that implement
// `IntoAsynciterator`.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        // SAFETY: Writing the following in terms of raw pointers
        // raises fewer opsem questions than doing it with references,
        // so we wrap the entire body in `unsafe` to make this
        // readable.
        unsafe {
            // SAFETY: This is for the pin projections below.
            let this = addr_of_mut!(*self.get_unchecked_mut());
            if let None = (*this).fut {
                // SAFETY: We're doing lifetime extension here.  The
                // fact that the `Self` type is pinned is what makes
                // this sound.
                let fut: I::NextFuture<'s> = (*this).iter.next_item();
                (*this).fut = Some(fut);
            }
            let Some(ref mut fut) = (*this).fut else {
                // SAFETY: We just ensured this must be `Some(_)`.
                unreachable_unchecked()
            };
            // SAFETY: We uphold the pin contract for this value.
            let pfut = Pin::new_unchecked(fut);
            match pfut.poll(cx) {
                Poll::Ready(x) => {
                    (*this).fut = None;
                    match x {
                        Some(x) => NextState::Yielded(x),
                        None => NextState::Done,
                    }
                }
                Poll::Pending => NextState::Pending,
            }
        }
    }
}

// All `AsyncFnNextIterator` types implement `IntoAsynciterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
    I: AsyncFnNextIterator + 's,
    <I as AsyncFnNextIterator>::Item: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator {
        ConvertedAsyncIterator {
            iter: self,
            fut: None,
            _p: PhantomData,
        }
    }
}

// ###################################################################
// Tests and demonstration of use.

#[cfg(test)]
mod always42_test {
    use super::*;
    use core::pin::pin;

    // Let's say a user comes along and wants to make the `Always42`
    // struct into an asynchronous iterator the "easy way".  So the
    // user writes this impl in terms of the AFIT-like version.
    struct Always42;
    impl AsyncFnNextIterator for Always42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_always42_poll() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always42_next() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates that we correctly handle state within the
// returned future.

#[cfg(test)]
mod alwaysonepending_test {
    use super::*;
    use core::pin::pin;

    struct AlwaysOnePending;
    impl AsyncFnNextIterator for AlwaysOnePending {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match once {
                    true => Poll::Ready(Some(42)),
                    false => {
                        once = true;
                        Poll::Pending
                    }
                })
                .await
            }
        }
    }

    #[test]
    fn test_always_one_pending_poll() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always_one_pending_next() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates the two FSM situation.  We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.

#[cfg(test)]
mod onceonepending_test {
    use super::*;
    use core::pin::pin;

    struct OnceOnePending(bool);
    impl AsyncFnNextIterator for OnceOnePending {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match (self.0, once) {
                    (false, false) => {
                        once = true;
                        Poll::Pending
                    }
                    (false, true) => {
                        self.0 = true;
                        Poll::Ready(Some(42))
                    }
                    (true, _) => Poll::Ready(None),
                })
                .await
            }
        }
    }

    #[test]
    fn test_once_one_pending_poll() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Done, next());
        assert_eq!(NextState::Done, next());
    }

    #[test]
    fn test_once_one_pending_next() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(None), next());
        assert_eq!(Poll::Ready(None), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.

#[cfg(test)]
mod sendalways42_test {
    use super::*;
    use core::pin::pin;

    struct SendAlways42;
    impl AsyncFnNextIterator for SendAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_sendalways42_next() {
        let x = SendAlways42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_spawn(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

#[cfg(test)]
mod localalways42_test {
    use super::*;
    use std::{future::ready, rc::Rc};

    struct LocalAlways42(Rc<()>);
    impl AsyncFnNextIterator for LocalAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = self.0.clone();
            async move { Some((ready(()).await, *x, 42).2) }
        }
    }

    #[test]
    fn test_localalways42_next() {
        use core::pin::pin;
        let x = LocalAlways42(Rc::new(())).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.

#[cfg(test)]
mod dynalways42_test {
    use super::*;

    struct DynAlways42;
    impl AsyncFnNextIterator for DynAlways42 {
        type Item = u8;
        type NextFuture<'s> =
            impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_dynalways42_poll() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_dynalways42_next() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// ###################################################################
// The backpressure / liveness problem.

// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`.  Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.

pub trait ProgressFuture: Future {
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<()> {
        Poll::Ready(())
    }
}

pub trait BufferedFuture: ProgressFuture {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized;
}

pub enum Buffered<Fut>
where
    Fut: Future,
{
    Pending(Fut),
    Ready(Option<<Fut as Future>::Output>),
}

impl<Fut> Future for Buffered<Fut>
where
    Fut: Future,
{
    type Output = <Fut as Future>::Output;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        match this {
            Buffered::Pending(fut) => {
                let fut = unsafe { Pin::new_unchecked(fut) };
                fut.poll(cx)
            }
            Buffered::Ready(None) => Poll::Pending,
            Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
        }
    }
}

impl<Fut> ProgressFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        let this = unsafe { self.get_unchecked_mut() };
        let Buffered::Pending(fut) = this else {
            return Poll::Ready(());
        };
        let fut = unsafe { Pin::new_unchecked(fut) };
        match fut.poll(cx) {
            Poll::Ready(x) => {
                *this = Buffered::Ready(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<Fut> BufferedFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized,
    {
        Buffered::Pending(self)
    }
}

// ###################################################################
// `FuturesUnordered` demonstration.

extern crate alloc;
use alloc::collections::VecDeque;

// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    queue: VecDeque<Fut>,
}

impl<Fut> FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    pub fn push(&mut self, f: Fut) {
        self.queue.push_back(f);
    }
}

impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    type Item = <Fut as Future>::Output;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let (mut i, len) = (0usize, this.queue.len());
        while let Some(mut fut) = this.queue.pop_front() {
            {
                let fut = unsafe { Pin::new_unchecked(&mut fut) };
                match fut.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(x) => return NextState::Yielded(x),
                }
            }
            this.queue.push_back(fut);
            i += 1;
            let false = i == len else { break };
        }
        NextState::Pending
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut state = ProgressState::ReadyToYield;
        for fut in this.queue.iter_mut() {
            let fut = unsafe { Pin::new_unchecked(fut) };
            if let Poll::Pending = fut.poll_progress(cx) {
                state = ProgressState::Pending;
            }
        }
        state
    }
}

pub struct BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    stream: St,
    queue: FuturesUnordered<<St as AsyncIterator>::Item>,
    max: usize,
}

impl<St> AsyncIterator for BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    type Item = <<St as AsyncIterator>::Item as Future>::Output;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let mut empty = false;
        while this.queue.len() < this.max {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_next(cx) {
                NextState::Yielded(fut) => {
                    this.queue.push(fut);
                }
                NextState::Empty => {
                    empty = true;
                    break;
                }
                _ => break,
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_next(cx) {
            NextState::Yielded(x) => NextState::Yielded(x),
            NextState::Empty if empty => NextState::Empty,
            _ => NextState::Pending,
        }
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut ready = false;
        if this.queue.len() < this.max {
            while this.queue.len() < this.max {
                let stream =
                    unsafe { Pin::new_unchecked(&mut this.stream) };
                match stream.poll_next(cx) {
                    NextState::Yielded(fut) => {
                        this.queue.push(fut);
                    }
                    NextState::Empty => {
                        ready = true;
                        break;
                    }
                    _ => break,
                }
            }
        } else {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_progress(cx) {
                ProgressState::ReadyToYield => ready = true,
                _ => {}
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_progress(cx) {
            ProgressState::ReadyToYield if ready => {
                ProgressState::ReadyToYield
            }
            _ => ProgressState::Pending,
        }
    }
}

// ###################################################################
// Supporting code.

pub use nop_waker::*;
mod nop_waker {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };
    pub const NOP_RAWWAKER: RawWaker = {
        fn nop(_: *const ()) {}
        const VTAB: RawWakerVTable =
            RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
        RawWaker::new(&() as *const (), &VTAB)
    };
    pub const NOP_WAKER: &Waker =
        &unsafe { Waker::from_raw(NOP_RAWWAKER) };
    pub const NOP_CONTEXT: Context<'static> =
        Context::from_waker(NOP_WAKER);
    pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
    where
        F: Future<Output = T> + ?Sized + Unpin,
    {
        let mut cx = NOP_CONTEXT;
        Pin::new(f).as_mut().poll(&mut cx)
    }
    pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T>,
    {
        poll_once(&mut pin!(f))
    }
    #[cfg(test)]
    pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T> + Send,
    {
        std::thread::scope(move |s| {
            s.spawn(move || poll_once_owned(f)).join().unwrap()
        })
    }
}

Discussion

Attendance

  • People: TC, tmandry, eholk, Daria, CE, Vincenzo, Yosh (20 mins late)

Meeting roles

  • Minutes, driver: TC

AsyncIterator -> futures::Stream backport

Daria: There definitelly should be a way to transform AsyncIterator into futures::Stream (and also tokio's Stream). What downsides of this are there?

poll_progress

(In response to a question)

TC: poll_progress is about maintaining liveness. Keeping the plates spinning. But you need to maintain backpressure. poll_progress allows you to maintain liveness without having unlimited buffering by separating the liveness concern from having to handle the output.

Select a repo