---
title: "Design meeting 2024-02-15: AsyncIterator prototype"
tags: ["WG-async", "design-meeting", "minutes"]
date: 2024-02-15
discussion: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/Design.20meeting.202024-02-15
url: https://hackmd.io/EplPcmBCTCSQ9LPOU6IZDw
---

# A discussion prototype for `AsyncIterator`

```rust
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments that follow:
//
// - How we can allow users to implement an AFIT `async fn next`
//   equivalent while converting that automatically to an
//   `AsyncIterator` type with `IntoAsyncIterator`.
//
// - How, for `AsyncIterator` trait objects, we can allow calls to
//   the `next()` method with only a single indirect call.
//
// - How we need to handle the backpressure / liveness problem in
//   our solution to `AsyncIterator`.
//
// - Why it's important that we can name the future returned by a
//   method for which we might otherwise use AFIT. This has bearing
//   on the possible need for RTN. This is larger than the `Send`
//   bound problem, but it relates to that as well.
//
// - How the presence of stable `async gen` blocks may affect our
//   thinking on this design.
//
// - How we can preserve cancellation safety for the future returned
//   by the `next()` method while still allowing the future returned
//   by `next_item()` to have its own non-trivial state. The former
//   is important since the current ecosystem assumes that futures
//   returned by `next()` are cancellation safe, while the latter is
//   important to avoid an easy mistake to make while implementing
//   the convenient AFIT-like trait.
//
// - How, even from a `dyn AsyncIterator`, we can return concrete
//   futures using an `async` block in the implementation.
//
// - Whether to separately indicate `Empty` and `Done` states to
//   support use cases such as `FuturesUnordered` (which currently
//   violate the "done" contract).
//
// - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//   to use a dedicated enum in the manner of `CoroutineState`.
//
// - Whether and how to flatten carried effects into the type of the
//   value returned from `poll_next`.
//
// - Whether, since `?` can be used outside of `try` blocks, we
//   should compose the `Try` carried effect even in normal
//   (non-`try`) async iterators.
//
// - Whether `poll_progress` should indicate that a value is ready
//   to be yielded, that no further work can be done until
//   `poll_next` is called, or both.
//
// - Whether `poll_progress` should indicate the done state, if it
//   knows about it, or whether it should force the caller to make a
//   call to `poll_next`.
//
// Note that everything here (aside from the buffered streams demo
// below) works with `no_std` without any allocation or type erasure.

//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]

use core::{
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    pin::Pin,
    ptr::addr_of_mut,
    task::{Context, Poll},
};

// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`; however, for demonstration and to
// explore some of the open questions, we'll use a dedicated enum for
// clarity.
//
// There may of course be value in doing it this way "for real". In
// the same way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear.
// If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry about how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant. Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later. We seem to have these
// options:
//
//   1. Declare that this is a supported use of the done state and
//   that an `AsyncIterator` that returns done may later yield values.
//
//   2. Declare that this is a violation of the contract and that
//   `FuturesUnordered` should instead either return `Pending` when
//   it is empty (and we should look into the use cases and reasons
//   why it does not do that) or return `Yielded(Option<T>)`.
//
//   3. Provide a separate way to signal empty (and not pending)
//   versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that didn't express
// enough, which resulted in duplicating substantial code between
// `poll_progress` and `poll_next` implementations. When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`. However, for that to be useful, we need
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that. This leaves us with three options:
//
//   1. Indicate only when `poll_progress` can do no more work until
//   `poll_next` is called. This results in duplicate code between
//   `poll_progress` and `poll_next`.
//
//   2. Indicate only when `poll_next` is ready to yield a value.
//   This results in some async iterators always returning `Pending`
//   from `poll_progress`, which means that callers may invoke
//   `poll_progress` more often than needed.
//
//   3. Indicate both states.
//
// We've taken option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is the question of whether, if
// `poll_progress` knows that the async iterator is finished, it
// should indicate that itself or whether it should indicate
// `Paused` and force the caller to call `poll_next` to receive the
// `Done` or `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// A value is ready to be yielded; the next call to `poll_next`
    /// is guaranteed to produce a `Yielded(_)` value.
    ReadyToYield,
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`. All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way. We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers. This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe. That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly. In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}

// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`. This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call.
// All other known approaches,
// including approaches where implementors would implement `async fn
// next` directly, require two indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> = impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;
    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}

// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait. This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
pub trait IntoAsyncIterator<S> {
    type Item;
    type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}

// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing
// `AsyncIterator::poll_next` manually. We provide this trait to
// allow for that. Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    //   1. Since the future can be named, callers can add bounds on
    //   this future, which allows use sites to set bounds for it.
    //   That
    //   solves, among other things, the `Send` bound problem.
    //
    //   2. Since the future can be named, we can store it unboxed when
    //   converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned. This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    iter: I,
    // Note that to make this work, it's critical that we're able to
    // name `NextFuture`, as otherwise we could not store it unboxed
    // in this struct.
    fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
    _p: PhantomData<&'s ()>,
}

// This impl lets any wrapped `AsyncFnNextIterator` be driven as an
// `AsyncIterator`; the `IntoAsyncIterator` impl further down does the
// wrapping.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        // SAFETY: Writing the following in terms of raw pointers
        // raises fewer opsem questions than doing it with references,
        // so we wrap the entire body in `unsafe` to make this
        // readable.
        unsafe {
            // SAFETY: This is for the pin projections below.
            let this = addr_of_mut!(*self.get_unchecked_mut());
            if let None = (*this).fut {
                // SAFETY: We're doing lifetime extension here. The
                // fact that the `Self` type is pinned is what makes
                // this sound.
                let fut: I::NextFuture<'s> = (*this).iter.next_item();
                (*this).fut = Some(fut);
            }
            let Some(ref mut fut) = (*this).fut else {
                // SAFETY: We just ensured this must be `Some(_)`.
                unreachable_unchecked()
            };
            // SAFETY: We uphold the pin contract for this value.
            let pfut = Pin::new_unchecked(fut);
            match pfut.poll(cx) {
                Poll::Ready(x) => {
                    (*this).fut = None;
                    match x {
                        Some(x) => NextState::Yielded(x),
                        None => NextState::Done,
                    }
                }
                Poll::Pending => NextState::Pending,
            }
        }
    }
}

// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
    I: AsyncFnNextIterator + 's,
    <I as AsyncFnNextIterator>::Item: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator {
        ConvertedAsyncIterator {
            iter: self,
            fut: None,
            _p: PhantomData,
        }
    }
}

// ###################################################################
// Tests and demonstration of use.

#[cfg(test)]
mod always42_test {
    use super::*;
    use core::pin::pin;

    // Let's say a user comes along and wants to make the `Always42`
    // struct into an asynchronous iterator the "easy way". So the
    // user writes this impl in terms of the AFIT-like version.
    struct Always42;

    impl AsyncFnNextIterator for Always42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_always42_poll() {
        let x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always42_next() {
        let x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates that we correctly handle state within the
// returned future.
#[cfg(test)]
mod alwaysonepending_test {
    use super::*;
    use core::pin::pin;

    struct AlwaysOnePending;

    impl AsyncFnNextIterator for AlwaysOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match once {
                    true => Poll::Ready(Some(42)),
                    false => {
                        once = true;
                        Poll::Pending
                    }
                })
                .await
            }
        }
    }

    #[test]
    fn test_always_one_pending_poll() {
        let x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always_one_pending_next() {
        let x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates the two FSM situation. We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.
#[cfg(test)]
mod onceonepending_test {
    use super::*;
    use core::pin::pin;

    struct OnceOnePending(bool);

    impl AsyncFnNextIterator for OnceOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match (self.0, once) {
                    (false, false) => {
                        once = true;
                        Poll::Pending
                    }
                    (false, true) => {
                        self.0 = true;
                        Poll::Ready(Some(42))
                    }
                    (true, _) => Poll::Ready(None),
                })
                .await
            }
        }
    }

    #[test]
    fn test_once_one_pending_poll() {
        let x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Done, next());
        assert_eq!(NextState::Done, next());
    }

    #[test]
    fn test_once_one_pending_next() {
        let x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(None), next());
        assert_eq!(Poll::Ready(None), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.
#[cfg(test)]
mod sendalways42_test {
    use super::*;
    use core::pin::pin;

    struct SendAlways42;

    impl AsyncFnNextIterator for SendAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_sendalways42_next() {
        let x = SendAlways42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_spawn(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

#[cfg(test)]
mod localalways42_test {
    use super::*;
    use std::{future::ready, rc::Rc};

    struct LocalAlways42(Rc<()>);

    impl AsyncFnNextIterator for LocalAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = self.0.clone();
            async move { Some((ready(()).await, *x, 42).2) }
        }
    }

    #[test]
    fn test_localalways42_next() {
        use core::pin::pin;
        let x = LocalAlways42(Rc::new(())).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.
#[cfg(test)]
mod dynalways42_test {
    use super::*;

    struct DynAlways42;

    impl AsyncFnNextIterator for DynAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_dynalways42_poll() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_dynalways42_next() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// ###################################################################
// The backpressure / liveness problem.

// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`. Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.
pub trait ProgressFuture: Future {
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<()> {
        Poll::Ready(())
    }
}

pub trait BufferedFuture: ProgressFuture {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized;
}

pub enum Buffered<Fut>
where
    Fut: Future,
{
    Pending(Fut),
    Ready(Option<<Fut as Future>::Output>),
}

impl<Fut> Future for Buffered<Fut>
where
    Fut: Future,
{
    type Output = <Fut as Future>::Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        match this {
            Buffered::Pending(fut) => {
                let fut = unsafe { Pin::new_unchecked(fut) };
                fut.poll(cx)
            }
            Buffered::Ready(None) => Poll::Pending,
            Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
        }
    }
}

impl<Fut> ProgressFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        let this = unsafe { self.get_unchecked_mut() };
        let Buffered::Pending(fut) = this else {
            return Poll::Ready(());
        };
        let fut = unsafe { Pin::new_unchecked(fut) };
        match fut.poll(cx) {
            Poll::Ready(x) => {
                *this = Buffered::Ready(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<Fut> BufferedFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized,
    {
        Buffered::Pending(self)
    }
}

// ###################################################################
// `FuturesUnordered` demonstration.

extern crate alloc;
use alloc::collections::VecDeque;

// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    queue: VecDeque<Fut>,
}

impl<Fut> FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    pub fn len(&self) -> usize {
        self.queue.len()
    }
    pub fn push(&mut self, f: Fut) {
        self.queue.push_back(f);
    }
}

impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    type Item = <Fut as Future>::Output;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let (mut i, len) = (0usize, this.queue.len());
        while let Some(mut fut) = this.queue.pop_front() {
            {
                let fut = unsafe { Pin::new_unchecked(&mut fut) };
                match fut.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(x) => return NextState::Yielded(x),
                }
            }
            this.queue.push_back(fut);
            i += 1;
            let false = i == len else { break };
        }
        NextState::Pending
    }
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut state = ProgressState::ReadyToYield;
        for fut in this.queue.iter_mut() {
            let fut = unsafe { Pin::new_unchecked(fut) };
            if let Poll::Pending = fut.poll_progress(cx) {
                state = ProgressState::Pending;
            }
        }
        state
    }
}

pub struct BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    stream: St,
    queue: FuturesUnordered<<St as AsyncIterator>::Item>,
    max: usize,
}

impl<St> AsyncIterator for BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    type Item = <<St as AsyncIterator>::Item as Future>::Output;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let mut empty = false;
        while this.queue.len() < this.max {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_next(cx) {
                NextState::Yielded(fut) => {
                    this.queue.push(fut);
                }
                NextState::Empty => {
                    empty = true;
                    break;
                }
                _ => break,
            }
        }
        let queue = unsafe {
            Pin::new_unchecked(&mut this.queue)
        };
        match queue.poll_next(cx) {
            NextState::Yielded(x) => NextState::Yielded(x),
            NextState::Empty if empty => NextState::Empty,
            _ => NextState::Pending,
        }
    }
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut ready = false;
        if this.queue.len() < this.max {
            while this.queue.len() < this.max {
                let stream =
                    unsafe { Pin::new_unchecked(&mut this.stream) };
                match stream.poll_next(cx) {
                    NextState::Yielded(fut) => {
                        this.queue.push(fut);
                    }
                    NextState::Empty => {
                        ready = true;
                        break;
                    }
                    _ => break,
                }
            }
        } else {
            let stream =
                unsafe { Pin::new_unchecked(&mut this.stream) };
            match stream.poll_progress(cx) {
                ProgressState::ReadyToYield => ready = true,
                _ => {}
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_progress(cx) {
            ProgressState::ReadyToYield if ready => {
                ProgressState::ReadyToYield
            }
            _ => ProgressState::Pending,
        }
    }
}

// ###################################################################
// Supporting code.
pub use nop_waker::*;
mod nop_waker {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };

    pub const NOP_RAWWAKER: RawWaker = {
        fn nop(_: *const ()) {}
        const VTAB: RawWakerVTable =
            RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
        RawWaker::new(&() as *const (), &VTAB)
    };

    pub const NOP_WAKER: &Waker =
        &unsafe { Waker::from_raw(NOP_RAWWAKER) };

    pub const NOP_CONTEXT: Context<'static> =
        Context::from_waker(NOP_WAKER);

    pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
    where
        F: Future<Output = T> + ?Sized + Unpin,
    {
        let mut cx = NOP_CONTEXT;
        Pin::new(f).as_mut().poll(&mut cx)
    }

    pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T>,
    {
        poll_once(&mut pin!(f))
    }

    #[cfg(test)]
    pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T> + Send,
    {
        std::thread::scope(move |s| {
            s.spawn(move || poll_once_owned(f)).join().unwrap()
        })
    }
}
```

---

# Discussion

## Attendance

- People: TC, tmandry, eholk, Daria, CE, Vincenzo, Yosh (20 mins late)

## Meeting roles

- Minutes, driver: TC

## AsyncIterator -> futures::Stream backport

Daria: There definitely should be a way to transform `AsyncIterator` into `futures::Stream` (and also tokio's Stream). What are the downsides of this?

## poll_progress

(In response to a question...)

TC: `poll_progress` is about maintaining liveness. Keeping the plates spinning. But you need to maintain backpressure. `poll_progress` allows you to maintain liveness without having unlimited buffering by separating the liveness concern from having to handle the output.
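
As a rough illustration of that last point, here is a minimal sketch on stable Rust, not the prototype itself. It assumes cut-down copies of `NextState`, `ProgressState`, and `AsyncIterator` (the `Empty` state and the `Done` progress state are dropped for brevity). `Prefetch`, `fill_one`, `NoopWake`, and `demo` are hypothetical names introduced only for this example. The iterator does its "background" work in `poll_progress`, but never buffers more than `cap` items, so a caller that is busy with an earlier item can keep it live without unbounded buffering.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Cut-down stand-ins for the prototype's types (assumption: simplified).
#[derive(Debug, PartialEq)]
pub enum NextState<T> { Yielded(T), Done, Pending }

#[derive(Debug, PartialEq)]
pub enum ProgressState { ReadyToYield, Paused, Pending }

pub trait AsyncIterator {
    type Item;
    fn poll_progress(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> ProgressState {
        ProgressState::Paused
    }
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> NextState<Self::Item>;
}

// Hypothetical iterator: yields `0..limit`, prefetching at most `cap` items.
pub struct Prefetch {
    next: u32,
    limit: u32,
    cap: usize,
    buf: VecDeque<u32>,
}

impl Prefetch {
    pub fn new(limit: u32, cap: usize) -> Self {
        Prefetch { next: 0, limit, cap: cap.max(1), buf: VecDeque::new() }
    }
    pub fn buffered(&self) -> usize {
        self.buf.len()
    }
    // One unit of background work; refuses to grow the buffer past `cap`.
    fn fill_one(&mut self) -> bool {
        if self.buf.len() < self.cap && self.next < self.limit {
            self.buf.push_back(self.next);
            self.next += 1;
            true
        } else {
            false
        }
    }
}

impl AsyncIterator for Prefetch {
    type Item = u32;
    // Liveness: do work without handing any output to the caller.
    fn poll_progress(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> ProgressState {
        if self.fill_one() {
            ProgressState::ReadyToYield
        } else {
            // Buffer full or source exhausted: nothing more to do until
            // `poll_next` is called. This is the backpressure point.
            ProgressState::Paused
        }
    }
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> NextState<u32> {
        if self.buf.is_empty() {
            self.fill_one();
        }
        match self.buf.pop_front() {
            Some(x) => NextState::Yielded(x),
            None => NextState::Done,
        }
    }
}

// A waker that does nothing, built with the stable `Wake` trait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns the peak buffer size seen while the consumer was "busy",
// followed by every item yielded.
pub fn demo() -> (usize, Vec<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut it = Prefetch::new(5, 2);
    let mut it = Pin::new(&mut it);
    // A busy consumer: drive progress repeatedly without taking output.
    for _ in 0..10 {
        it.as_mut().poll_progress(&mut cx);
    }
    let peak = it.buffered();
    // Then drain, making progress between items.
    let mut out = Vec::new();
    while let NextState::Yielded(x) = it.as_mut().poll_next(&mut cx) {
        out.push(x);
        it.as_mut().poll_progress(&mut cx);
    }
    (peak, out)
}
```

Calling `demo()` drives `poll_progress` ten times before taking any output. The buffer stops growing at `cap`, and the later `poll_next` calls still yield every item in order. When the buffer is full this sketch reports `Paused` rather than `ReadyToYield`; that is one possible reading of the two states, not a settled design.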