---
title: "ITE design meeting 2024-02-22: AsyncIterator"
tags: ["impl-trait-everywhere", "design-meeting", "minutes"]
date: 2024-02-22
discussion: https://rust-lang.zulipchat.com/#narrow/stream/315482-t-compiler.2Fetc.2Fopaque-types/topic/ITE.20triage.20meeting.202024-02-22
url: https://hackmd.io/Gk630R_SRYKPfmoYOOwFcw
---

# AsyncIterator discussion prototype

```rust
// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.60AsyncIterator.60.20rename.20meeting.202024-01-22.3F
// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/async.20iterator.20combinators.20using.20new.20async.20functionality
//
// Author: TC
//
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments that follow:
//
//   - How we can allow users to implement an AFIT `async fn next`
//     equivalent while converting that automatically to an
//     `AsyncIterator` type with `IntoAsyncIterator`.
//
//   - How, for `AsyncIterator` trait objects, we can allow calls to
//     the `next()` method with only a single indirect call.
//
//   - How we need to handle the backpressure / liveness problem in
//     our solution to `AsyncIterator`.
//
//   - Why it's important that we can name the future returned by a
//     method for which we might otherwise use AFIT. This has bearing
//     on the possible need for RTN. This is larger than the `Send`
//     bound problem, but it relates to that as well.
//
//   - How the presence of stable `async gen` blocks may affect our
//     thinking on this design.
//
//   - How we can preserve cancellation safety for the future returned
//     by the `next()` method while still allowing the future returned
//     by `next_item()` to have its own non-trivial state. The former
//     is important since the current ecosystem assumes that futures
//     returned by `next()` are cancellation safe, while the latter is
//     important to avoid an easy mistake to make while implementing
//     the convenient AFIT-like trait.
//
//   - How, even from a `dyn AsyncIterator`, we can return concrete
//     futures using an `async` block in the implementation.
//
//   - Whether to separately indicate `Empty` and `Done` states to
//     support use cases such as `FuturesUnordered` (which currently
//     violate the "done" contract).
//
//   - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//     to use a dedicated enum in the manner of `CoroutineState`.
//
//   - Whether and how to flatten carried effects into the type of the
//     value returned from `poll_next`.
//
//   - Whether, since `?` can be used outside of `try` blocks, we
//     should compose the `Try` carried effect even in normal
//     (non-`try`) async iterators.
//
//   - Whether `poll_progress` should indicate that a value is ready
//     to be yielded, that no further work can be done until
//     `poll_next` is called, or both.
//
//   - Whether `poll_progress` should indicate the done state, if it
//     knows about it, or whether it should force the caller to make
//     a call to `poll_next`.
//
// The comments below are written to tell a story and to be read even
// if you do not read the code.
//
// Note that everything here works with `no_std` without any
// allocation or type erasure.

//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]

use core::{
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    pin::Pin,
    ptr::addr_of_mut,
    task::{Context, Poll},
};

// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`. However, for demonstration and to
// explore some of the open questions, we'll use a dedicated enum for
// clarity.
//
// There may of course be value in doing it this way "for real".
// In the same way that we have a dedicated `CoroutineState` rather
// than reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear. If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry about how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant. Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later. We seem to have these
// options:
//
//   1. Declare that this is a supported use of the done state and
//   that an `AsyncIterator` that returns done may later yield values.
//
//   2. Declare that this is a violation of the contract and that
//   `FuturesUnordered` should instead either return `Pending` when
//   it is empty (and we should look into the use cases and reasons
//   why it does not do that) or return `Yielded(Option<T>)`.
//
//   3. Provide a separate way to signal empty (and not pending)
//   versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that this didn't
// express enough, which resulted in duplicating substantial code
// between `poll_progress` and `poll_next` implementations. When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`. However, for that to be useful, we need
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that. This leaves us with three options:
//
//   1. Indicate only when `poll_progress` can do no more work until
//   `poll_next` is called. This results in duplicate code between
//   `poll_progress` and `poll_next`.
//
//   2. Indicate only when `poll_next` is ready to yield a value.
//   This results in some async iterators always returning `Pending`
//   from `poll_progress`, which means that callers may invoke
//   `poll_progress` more often than needed.
//
//   3. Indicate both states.
//
// We've taken option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is whether, if `poll_progress` knows that
// the async iterator is finished, it should indicate that itself or
// should indicate `Paused` and force the caller to call `poll_next`
// to receive the `Done` or `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// Further progress is possible by calling `poll_progress`.
    Pending,
}

// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`. All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way. We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers. This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe. That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly. In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}

// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`. This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call.
// All other known approaches, including approaches where implementors
// would implement `async fn next` directly, require two indirect
// calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> = impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;

    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}

// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait. This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
//
// Note that this trait has a dummy generic type parameter. We need
// this so as to be able to satisfy coherence checking later on when
// we need to name our simulated existential lifetime.
pub trait IntoAsyncIterator<S> {
    type Item;
    type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}

// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing
// `AsyncIterator::poll_next` manually. We provide this trait to
// allow for that. Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    //   1. Since the future can be named, callers can add bounds on
    //   this future, which allows use sites to set bounds for it. That
    //   solves, among other things, the `Send` bound problem.
    //
    //   2. Since the future can be named, we can store it unboxed when
    //   converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned. This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    iter: I,
    // Note that to make this work, it's critical that we're able to
    // name `NextFuture`, as otherwise we could not store it unboxed
    // in this struct.
    //
    // What we actually want here are existential lifetimes so that we
    // can express the type:
    //
    //   exists<'s> Option<<I as AsyncFnNextIterator>::NextFuture<'s>>
    //
    // That is, we need to name *some* lifetime that is shorter than
    // the lifetime of `I` and of `Self`. We don't have a way to
    // express this in Rust currently other than to add this as a
    // generic lifetime parameter. This forces us to add a dummy
    // generic parameter to `IntoAsyncIterator` so that we can satisfy
    // coherence checking.
    fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
    _p: PhantomData<&'s ()>,
}

// This blanket impl makes all types that implement
// `AsyncFnNextIterator` into types that implement
// `IntoAsyncIterator`.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        // SAFETY: Writing the following in terms of raw pointers
        // raises fewer opsem questions than doing it with references,
        // so we wrap the entire body in `unsafe` to make this
        // readable.
        unsafe {
            // SAFETY: This is for the pin projections below.
            let this = addr_of_mut!(*self.get_unchecked_mut());
            if let None = (*this).fut {
                // SAFETY: We're doing lifetime extension here. The
                // fact that we have a pinned mutable reference to
                // `Self` is what makes this sound.
                //
                // This is a simulation of existential lifetimes.
                let fut: I::NextFuture<'s> = (*this).iter.next_item();
                (*this).fut = Some(fut);
            }
            let Some(ref mut fut) = (*this).fut else {
                // SAFETY: We just ensured this must be `Some(_)`.
                unreachable_unchecked()
            };
            // SAFETY: We uphold the pin contract for this value.
            let pfut = Pin::new_unchecked(fut);
            match pfut.poll(cx) {
                Poll::Ready(x) => {
                    (*this).fut = None;
                    match x {
                        Some(x) => NextState::Yielded(x),
                        None => NextState::Done,
                    }
                }
                Poll::Pending => NextState::Pending,
            }
        }
    }
}

// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
//
// Note that this impl is why we need the dummy generic type parameter
// on `IntoAsyncIterator`. Without it, the `'s` lifetime would be
// unconstrained and we would fail coherence checking, and without
// being able to name this lifetime in the impl, we would not be able
// to name the type of the `ConvertedAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
    I: AsyncFnNextIterator + 's,
    <I as AsyncFnNextIterator>::Item: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator {
        ConvertedAsyncIterator {
            iter: self,
            fut: None,
            _p: PhantomData,
        }
    }
}

// ###################################################################
// Tests and demonstration of use.

#[cfg(test)]
mod always42_test {
    use super::*;
    use core::pin::pin;

    // Let's say a user comes along and wants to make the `Always42`
    // struct into an asynchronous iterator the "easy way". So the
    // user writes this impl in terms of the AFIT-like version.
    struct Always42;
    impl AsyncFnNextIterator for Always42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_always42_poll() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always42_next() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// By compiling, this demonstrates that our simulation of existential
// lifetimes works correctly and that we're not requiring that the
// underlying `AsyncFnNextIterator` outlive `'static`.
#[cfg(test)]
mod borrowclone_test {
    use super::*;
    use core::pin::pin;

    struct BorrowClone<T: Clone>(T);
    impl<T: Clone> AsyncFnNextIterator for BorrowClone<T> {
        type Item = T;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>
        where
            T: 's;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = &self.0;
            async move { Some(x.clone()) }
        }
    }

    #[test]
    fn test_borrowclone_poll() {
        let mut x = BorrowClone(42u8).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_borrowclone_next() {
        let mut x = BorrowClone(42u8).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates that we correctly handle state within the
// returned future.
#[cfg(test)]
mod alwaysonepending_test {
    use super::*;
    use core::pin::pin;

    struct AlwaysOnePending;
    impl AsyncFnNextIterator for AlwaysOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match once {
                    true => Poll::Ready(Some(42)),
                    false => {
                        once = true;
                        Poll::Pending
                    }
                })
                .await
            }
        }
    }

    #[test]
    fn test_always_one_pending_poll() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always_one_pending_next() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates the two FSM situation. We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.
#[cfg(test)]
mod onceonepending_test {
    use super::*;
    use core::pin::pin;

    struct OnceOnePending(bool);
    impl AsyncFnNextIterator for OnceOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match (self.0, once) {
                    (false, false) => {
                        once = true;
                        Poll::Pending
                    }
                    (false, true) => {
                        self.0 = true;
                        Poll::Ready(Some(42))
                    }
                    (true, _) => Poll::Ready(None),
                })
                .await
            }
        }
    }

    #[test]
    fn test_once_one_pending_poll() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Done, next());
        assert_eq!(NextState::Done, next());
    }

    #[test]
    fn test_once_one_pending_next() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(None), next());
        assert_eq!(Poll::Ready(None), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.
#[cfg(test)]
mod sendalways42_test {
    use super::*;
    use core::pin::pin;

    struct SendAlways42;
    impl AsyncFnNextIterator for SendAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_sendalways42_next() {
        let x = SendAlways42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_spawn(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

#[cfg(test)]
mod localalways42_test {
    use super::*;
    use std::{future::ready, rc::Rc};

    struct LocalAlways42(Rc<()>);
    impl AsyncFnNextIterator for LocalAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = self.0.clone();
            async move { Some((ready(()).await, *x, 42).2) }
        }
    }

    #[test]
    fn test_localalways42_next() {
        use core::pin::pin;
        let x = LocalAlways42(Rc::new(())).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.
#[cfg(test)]
mod dynalways42_test {
    use super::*;

    struct DynAlways42;
    impl AsyncFnNextIterator for DynAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_dynalways42_poll() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_dynalways42_next() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// ###################################################################
// The backpressure / liveness problem.

// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`. Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.
pub trait ProgressFuture: Future {
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<()> {
        Poll::Ready(())
    }
}

pub trait BufferedFuture: ProgressFuture {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized;
}

pub enum Buffered<Fut>
where
    Fut: Future,
{
    Pending(Fut),
    Ready(Option<<Fut as Future>::Output>),
}

impl<Fut> Future for Buffered<Fut>
where
    Fut: Future,
{
    type Output = <Fut as Future>::Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        match this {
            Buffered::Pending(fut) => {
                let fut = unsafe { Pin::new_unchecked(fut) };
                fut.poll(cx)
            }
            Buffered::Ready(None) => Poll::Pending,
            Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
        }
    }
}

impl<Fut> ProgressFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        let this = unsafe { self.get_unchecked_mut() };
        let Buffered::Pending(fut) = this else {
            return Poll::Ready(());
        };
        let fut = unsafe { Pin::new_unchecked(fut) };
        match fut.poll(cx) {
            Poll::Ready(x) => {
                *this = Buffered::Ready(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<Fut: ProgressFuture> BufferedFuture for Fut {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized,
    {
        Buffered::Pending(self)
    }
}

// ###################################################################
// `FuturesUnordered` demonstration.

#[cfg(test)]
macro_rules! cfg_match {
    ($(cfg($cond:meta) => { $($then:tt)* })+
     $(_ => { $($else:tt)* })?
    ) => { cfg_match! {
        __xs ();
        $((($cond) ($($then)*)),)+
        $((() ($($else)*)),)?
    }};
    (__id $($xs:tt)*) => { $($xs)* };
    (__xs ($($_:meta,)*);) => {};
    (__xs ($($ncond:meta,)*);
     (($($cond:meta)?) ($($body:tt)*)),
     $($rest:tt,)*
    ) => {
        #[cfg(all($($cond,)? not(any($($ncond),*))))]
        cfg_match! { __id $($body)* }
        cfg_match! {
            __xs ($($ncond,)* $($cond,)?);
            $($rest,)*
        }
    };
}

// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    queue: Queue<Fut, 1024>,
}

impl<Fut> FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    pub fn new() -> Self {
        Self { queue: Queue::new() }
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }

    pub fn push(&mut self, f: Fut) {
        self.queue.push(f);
    }
}

impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    type Item = <Fut as Future>::Output;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let (mut i, len) = (0usize, this.queue.len());
        while let Some(mut fut) = this.queue.pop() {
            {
                let fut = unsafe { Pin::new_unchecked(&mut fut) };
                match fut.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(x) => return NextState::Yielded(x),
                }
            }
            this.queue.push(fut);
            i += 1;
            let false = i == len else { break };
        }
        NextState::Pending
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut state = ProgressState::Paused;
        for fut in this.queue.iter_mut() {
            let fut = unsafe { Pin::new_unchecked(fut) };
            if let Poll::Pending = fut.poll_progress(cx) {
                state = ProgressState::Pending;
            }
        }
        state
    }
}

pub struct Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    stream: St,
    queue: FuturesUnordered<<St as AsyncIterator>::Item>,
    max: usize,
}

impl<St> Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    pub fn new(stream: St, max: usize) -> Self {
        Self { stream, queue: FuturesUnordered::new(), max }
    }

    fn make_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> (ProgressState, bool) {
        let this = unsafe { self.get_unchecked_mut() };
        let mut is_stream_done = false;
        if this.queue.len() < this.max {
            while this.queue.len() < this.max {
                let stream =
                    unsafe { Pin::new_unchecked(&mut this.stream) };
                match stream.poll_next(cx) {
                    NextState::Yielded(fut) => {
                        this.queue.push(fut);
                    }
                    NextState::Empty | NextState::Done => {
                        is_stream_done = true;
                        break;
                    }
                    NextState::Pending => break,
                }
            }
        } else {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_progress(cx) {
                ProgressState::Paused => is_stream_done = true,
                ProgressState::Pending => {}
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        (queue.poll_progress(cx), is_stream_done)
    }
}

impl<St> AsyncIterator for Bottleneck<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    type Item = <<St as AsyncIterator>::Item as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let (_, is_stream_done) = self.as_mut().make_progress(cx);
        let this = unsafe { self.get_unchecked_mut() };
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_next(cx) {
            NextState::Yielded(x) => NextState::Yielded(x),
            NextState::Empty if is_stream_done => NextState::Empty,
            NextState::Empty => NextState::Pending,
            NextState::Done => unreachable!(),
            NextState::Pending => NextState::Pending,
        }
    }

    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        match self.make_progress(cx) {
            (ProgressState::Paused, true) => ProgressState::Paused,
            (ProgressState::Paused, _) => ProgressState::Pending,
            (ProgressState::Pending, _) => ProgressState::Pending,
        }
    }
}

#[cfg(test)]
mod buffered_test {
    use super::*;
    use core::pin::pin;
    use core::sync::atomic::{AtomicU64, Ordering};

    struct DelayFuture<'s> {
        ticker: &'s AtomicU64,
        delay: u64,
        start: u64,
        last_tick: u64,
        missed: u64,
    }

    impl<'s> DelayFuture<'s> {
        pub fn new(ticker: &'s AtomicU64, delay: u64) -> Self {
            let tick = ticker.load(Ordering::Relaxed);
            Self {
                ticker,
                delay,
                start: tick,
                last_tick: tick,
                missed: 0,
            }
        }
    }

    impl<'s> Future for DelayFuture<'s> {
        type Output = (u64, u64);
        fn poll(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Self::Output> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick
                + 1
            {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            let age = tick - self.start;
            if age >= self.delay {
                return Poll::Ready((age, self.missed));
            }
            Poll::Pending
        }
    }

    impl<'s> ProgressFuture for DelayFuture<'s> {
        fn poll_progress(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<()> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            if tick - self.start >= self.delay {
                return Poll::Ready(());
            }
            Poll::Pending
        }
    }

    struct DelayStream<'s> {
        ticker: &'s AtomicU64,
        max_delay: u64,
        rand: XorShift,
        last_tick: u64,
        missed: u64,
    }

    impl<'s> DelayStream<'s> {
        pub fn new(ticker: &'s AtomicU64, max_delay: u64) -> Self {
            let tick = ticker.load(Ordering::Relaxed);
            Self {
                ticker,
                max_delay,
                rand: XorShift::new(),
                last_tick: tick,
                missed: 0,
            }
        }
    }

    impl<'s> AsyncIterator for DelayStream<'s> {
        type Item = Buffered<DelayFuture<'s>>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> NextState<Self::Item> {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            let delay = self.rand.rand() % self.max_delay;
            NextState::Yielded(
                DelayFuture::new(self.ticker, delay).buffer(),
            )
        }

        fn poll_progress(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> ProgressState {
            let tick = self.ticker.load(Ordering::Relaxed);
            if tick > self.last_tick + 1 {
                self.missed += tick - (self.last_tick + 1);
                panic!("Missed a tick.");
            }
            self.last_tick = tick;
            ProgressState::Paused
        }
    }

    #[test]
    fn buffered_test() {
        cfg_match!
        {
            cfg(miri) => {
                const N_ITERS: usize = 2usize.pow(7);
                const BUFSIZE: usize = 2usize.pow(2);
                const INV_RATE: usize = 10;
                const MAX_DELAY: u64 = 80;
            }
            _ => {
                const N_ITERS: usize = 2usize.pow(20);
                const BUFSIZE: usize = 2usize.pow(8);
                const INV_RATE: usize = 100;
                const MAX_DELAY: u64 = 800;
            }
        }
        let ticker = AtomicU64::new(0);
        let mut cx = NOP_CONTEXT;
        let ds = DelayStream::new(&ticker, MAX_DELAY);
        let bs = Bottleneck::new(ds, BUFSIZE);
        let mut bs = pin!(bs);
        let tick = || ticker.fetch_add(1, Ordering::Relaxed);
        let mut max_age = 0;
        for i in 0..N_ITERS {
            _ = tick();
            if i % INV_RATE == 0 {
                match bs.as_mut().poll_next(&mut cx) {
                    NextState::Yielded((age, missed)) => {
                        assert_eq!(0, missed);
                        if age > max_age {
                            assert!(age < MAX_DELAY);
                            max_age = age
                        }
                    }
                    NextState::Pending => {}
                    _ => unreachable!(),
                }
            } else {
                _ = bs.as_mut().poll_progress(&mut cx);
            }
        }
        dbg!(max_age);
    }
}

// ###################################################################
// Supporting code.

pub use nop_waker::*;
mod nop_waker {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };

    pub const NOP_RAWWAKER: RawWaker = {
        fn nop(_: *const ()) {}
        const VTAB: RawWakerVTable =
            RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
        RawWaker::new(&() as *const (), &VTAB)
    };

    pub const NOP_WAKER: &Waker =
        &unsafe { Waker::from_raw(NOP_RAWWAKER) };

    pub const NOP_CONTEXT: Context<'static> =
        Context::from_waker(NOP_WAKER);

    pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
    where
        F: Future<Output = T> + ?Sized + Unpin,
    {
        let mut cx = NOP_CONTEXT;
        Pin::new(f).as_mut().poll(&mut cx)
    }

    pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T>,
    {
        poll_once(&mut pin!(f))
    }

    #[cfg(test)]
    pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T> + Send,
    {
        std::thread::scope(move |s| {
            s.spawn(move || poll_once_owned(f)).join().unwrap()
        })
    }
}

pub use circular_queue::*;
mod circular_queue {
    use core::mem::{transmute, MaybeUninit};

    pub struct
    Queue<T, const LEN: usize> {
        buf: [MaybeUninit<T>; LEN],
        idx: usize,
        len: usize,
    }

    impl<T: Sized, const LEN: usize> Queue<T, LEN> {
        pub fn new() -> Self {
            let buf: [MaybeUninit<T>; LEN] =
                unsafe { MaybeUninit::uninit().assume_init() };
            Self { buf, idx: 0, len: 0 }
        }

        pub fn len(&self) -> usize {
            self.len
        }

        pub fn push(&mut self, x: T) {
            assert!(self.len < LEN);
            let idx = (self.idx + self.len) % LEN;
            self.buf[idx] = MaybeUninit::new(x);
            self.len += 1;
        }

        #[must_use]
        pub fn pop(&mut self) -> Option<T> {
            let true = self.len > 0 else { return None };
            let v = unsafe { self.buf[self.idx].assume_init_read() };
            self.idx = (self.idx + 1) % LEN;
            self.len -= 1;
            Some(v)
        }

        pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
            struct IterMut<'a, T: 'a, const LEN: usize> {
                queue: &'a mut Queue<T, LEN>,
                idx: usize,
                len: usize,
            }
            impl<'a, T: 'a, const LEN: usize> Iterator
                for IterMut<'a, T, LEN>
            {
                type Item = &'a mut T;
                fn next(&mut self) -> Option<Self::Item> {
                    let true = self.len > 0 else { return None };
                    let v: &'a mut T = unsafe {
                        transmute(
                            self.queue.buf[self.idx]
                                .assume_init_mut(),
                        )
                    };
                    self.idx = (self.idx + 1) % LEN;
                    self.len -= 1;
                    Some(v)
                }
            }
            let (idx, len) = (self.idx, self.len);
            IterMut::<'_, _, LEN> { queue: self, idx, len }
        }
    }

    impl<T, const LEN: usize> Drop for Queue<T, LEN> {
        fn drop(&mut self) {
            while self.len() > 0 {
                _ = self.pop();
            }
        }
    }
}

pub use xorshift::*;
mod xorshift {
    pub struct XorShift {
        seed: u64,
    }

    impl XorShift {
        pub fn new() -> Self {
            Self { seed: 1 }
        }

        pub fn rand(&mut self) -> u64 {
            self.seed ^= self.seed << 23;
            self.seed ^= self.seed >> 13;
            self.seed ^= self.seed << 38;
            self.seed
        }
    }

    impl Iterator for XorShift {
        type Item = u64;
        fn next(&mut self) -> Option<Self::Item> {
            Some(self.rand())
        }
    }
}
```

---

# Discussion

## Attendance

- People: TC, Oli