---
title: "Design meeting 2024-02-15: AsyncIterator prototype"
tags: ["WG-async", "design-meeting", "minutes"]
date: 2024-02-15
discussion: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/Design.20meeting.202024-02-15
url: https://hackmd.io/EplPcmBCTCSQ9LPOU6IZDw
---
# A discussion prototype for `AsyncIterator`
```rust
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments below:
//
// - How we can allow users to implement an AFIT `async fn next`
// equivalent while converting that automatically to an
// `AsyncIterator` type with `IntoAsyncIterator`.
//
// - How, for `AsyncIterator` trait objects, we can allow calls to
// the `next()` method with only a single indirect call.
//
// - How we need to handle the backpressure / liveness problem in
// our solution to `AsyncIterator`.
//
// - Why it's important that we can name the future returned by a
// method for which we might otherwise use AFIT. This has bearing
// on the possible need for RTN. This is larger than the `Send`
// bound problem, but it relates to that as well.
//
// - How the presence of stable `async gen` blocks may affect our
// thinking on this design.
//
// - How we can preserve cancellation safety for the future returned
// by the `next()` method while still allowing the future returned
// by `next_item()` to have its own non-trivial state. The former
// is important since the current ecosystem assumes that futures
// returned by `next()` are cancellation safe, while the latter is
// important to avoid an easy mistake to make while implementing
// the convenient AFIT-like trait.
//
// - How, even from a `dyn AsyncIterator`, we can return concrete
// futures using an `async` block in the implementation.
//
// - Whether to separately indicate `Empty` and `Done` states to
// support use cases such as `FuturesUnordered` (which currently
// violate the "done" contract).
//
// - Whether to use a "pun" return type such as `Poll<Option<T>>` or
// to use a dedicated enum in the manner of `CoroutineState`.
//
// - Whether and how to flatten carried effects into the type of the
// value returned from `poll_next`.
//
// - Whether, since `?` can be used outside of `try` blocks, we
// should compose the `Try` carried effect even in normal
// (non-`try`) async iterators.
//
// - Whether `poll_progress` should indicate that a value is ready
// to be yielded, that no further work can be done until
// `poll_next` is called, or both.
//
// - Whether `poll_progress` should indicate the done state, if it
// knows about it, or whether it should force the caller to make a
// call to `poll_next`.
//
// Note that everything here (aside from the buffered streams demo
// below) works with `no_std` without any allocation or type erasure.
//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]
use core::{
future::Future,
hint::unreachable_unchecked,
marker::PhantomData,
pin::Pin,
ptr::addr_of_mut,
task::{Context, Poll},
};
// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`. However, for clarity and to
// explore some of the open questions, we'll use a dedicated enum
// here.
//
// There may of course be value in doing it this way "for real". In
// the same way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear. If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry with how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant. Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later. We seem to have these
// options:
//
// 1. Declare that this is a supported use of the done state and
// that an `AsyncIterator` that returns done may later yield values.
//
// 2. Declare that this is a violation of the contract and that
// `FuturesUnordered` should instead either return `Pending` when
// it is empty (and we should look into the use cases and reasons
// why it does not do that) or return `Yielded(Option<T>)`.
//
// 3. Provide a separate way to signal empty (and not pending)
// versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
/// A value has been yielded.
Yielded(T),
/// No further values will ever be yielded.
Done,
/// No further values will be yielded until some action by the
/// caller is taken.
Empty,
/// No value is yet ready.
Pending,
}
// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that didn't express
// enough, which resulted in duplicating substantial code between
// `poll_progress` and `poll_next` implementations. When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`. However, for that to be useful, we need for
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that. This leaves us with three options:
//
// 1. Indicate only when `poll_progress` can do no more work until
// `poll_next` is called. This results in duplicate code between
// `poll_progress` and `poll_next`.
//
// 2. Indicate only when `poll_next` is ready to yield a value.
// This results in some async iterators always returning `Pending`
// from `poll_progress`, which means that callers may invoke
// `poll_progress` more often than needed.
//
// 3. Indicate both states.
//
// We've taken option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is whether `poll_progress`, if it knows
// that the async iterator is finished, should indicate that itself
// or should instead indicate `Paused` and force the caller to call
// `poll_next` to receive the `Done` or `Empty` indication.
#[derive(
Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
/// A value is ready to be yielded; the next call to `poll_next`
/// is guaranteed to produce a `Yielded(_)` value.
ReadyToYield,
/// No further progress can be made until `poll_next` is called;
/// it's safe to elide calls to `poll_progress` until after the
/// next call to `poll_next`.
Paused,
/// No further values will ever be yielded.
Done,
/// No further values will be yielded until some action by the
/// caller is taken.
Empty,
/// No value is yet ready.
Pending,
}
// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`. All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
type Item;
// While we're talking about `AsyncIterator`, note that we also
// need to solve the backpressure / liveness problem (also known
// as the "buffered streams" problem) in some way. We've
// discussed `poll_progress` as one solution for this.
fn poll_progress(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> ProgressState {
ProgressState::Paused
}
// The `for await` syntax would use this method.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item>;
}
// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers. This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
// Note that the returned future here is cancellation safe. That
// is, it's always safe to drop this future and no items or other
// state will be lost.
//
// This is in contrast to an approach where `async fn next` is
// implemented directly. In that world, callers must assume that
// the future returned by `next` may hold non-trivial state and
// may not be cancellation safe.
type Future<'s>: Future<
Output = Option<<Self as AsyncIterator>::Item>,
>
where
Self: 's;
fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}
// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`. This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call. All other known approaches,
// including approaches where implementors would implement `async fn
// next` directly, require two indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
// We name this associated type rather than using AFIT so that
// downstream code can use this name to set bounds or to store
// this future in an ADT.
type Future<'s> =
impl Future<Output = Option<<Self as AsyncIterator>::Item>>
where
Self: 's;
fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
core::future::poll_fn(move |cx| {
// This call to `poll_next` is the only indirect call when
// the `Self` type is a trait object.
match self.as_mut().poll_next(cx) {
NextState::Yielded(x) => Poll::Ready(Some(x)),
NextState::Done => Poll::Ready(None),
NextState::Empty => Poll::Ready(None),
NextState::Pending => Poll::Pending,
}
})
}
}
// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait. This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
pub trait IntoAsyncIterator<S> {
type Item;
type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}
// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing a
// `AsyncIterator::poll_next` manually. We provide this trait to
// allow for that. Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
type Item;
// We give a name to the returned future rather than using AFIT
// for two reasons:
//
// 1. Since the future can be named, callers can set bounds on it
// at use sites. That solves, among other things, the `Send` bound
// problem.
//
// 2. Since the future can be named, we can store it unboxed when
// converting the `Self` into an `AsyncIterator`.
//
// Note that to match the AFIT version, we need this to be a GAT
// so that the returned future can capture a reference to the
// `Self` type.
type NextFuture<'s>: Future<Output = Option<Self::Item>>
where
Self: 's;
fn next_item(&mut self) -> Self::NextFuture<'_>;
}
// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned. This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
I: AsyncFnNextIterator,
Self: 's,
{
iter: I,
// Note that to make this work, it's critical that we're able to
// name `NextFuture`, as otherwise we could not store it unboxed
// in this struct.
fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
_p: PhantomData<&'s ()>,
}
// This impl makes `ConvertedAsyncIterator`, which wraps any type
// that implements `AsyncFnNextIterator`, into a type that implements
// `AsyncIterator`.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
I: AsyncFnNextIterator,
Self: 's,
{
type Item = <I as AsyncFnNextIterator>::Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
// SAFETY: Writing the following in terms of raw pointers
// raises fewer opsem questions than doing it with references,
// so we wrap the entire body in `unsafe` to make this
// readable.
unsafe {
// SAFETY: This is for the pin projections below.
let this = addr_of_mut!(*self.get_unchecked_mut());
if let None = (*this).fut {
// SAFETY: We're doing lifetime extension here. The
// fact that the `Self` type is pinned is what makes
// this sound.
let fut: I::NextFuture<'s> = (*this).iter.next_item();
(*this).fut = Some(fut);
}
let Some(ref mut fut) = (*this).fut else {
// SAFETY: We just ensured this must be `Some(_)`.
unreachable_unchecked()
};
// SAFETY: We uphold the pin contract for this value.
let pfut = Pin::new_unchecked(fut);
match pfut.poll(cx) {
Poll::Ready(x) => {
(*this).fut = None;
match x {
Some(x) => NextState::Yielded(x),
None => NextState::Done,
}
}
Poll::Pending => NextState::Pending,
}
}
}
}
// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
I: AsyncFnNextIterator + 's,
<I as AsyncFnNextIterator>::Item: 's,
{
type Item = <I as AsyncFnNextIterator>::Item;
type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
fn into_async_iterator(self) -> Self::IntoAsyncIterator {
ConvertedAsyncIterator {
iter: self,
fut: None,
_p: PhantomData,
}
}
}
// ###################################################################
// Tests and demonstration of use.
#[cfg(test)]
mod always42_test {
use super::*;
use core::pin::pin;
// Let's say a user comes along and wants to make the `Always42`
// struct into an asynchronous iterator the "easy way". So the
// user writes this impl in terms of the AFIT-like version.
struct Always42;
impl AsyncFnNextIterator for Always42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_always42_poll() {
let mut x = Always42.into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_always42_next() {
let mut x = Always42.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// This demonstrates that we correctly handle state within the
// returned future.
#[cfg(test)]
mod alwaysonepending_test {
use super::*;
use core::pin::pin;
struct AlwaysOnePending;
impl AsyncFnNextIterator for AlwaysOnePending {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async {
let mut once = false;
core::future::poll_fn(move |_| match once {
true => Poll::Ready(Some(42)),
false => {
once = true;
Poll::Pending
}
})
.await
}
}
}
#[test]
fn test_always_one_pending_poll() {
let mut x = AlwaysOnePending.into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_always_one_pending_next() {
let mut x = AlwaysOnePending.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// This demonstrates the two FSM situation. We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.
#[cfg(test)]
mod onceonepending_test {
use super::*;
use core::pin::pin;
struct OnceOnePending(bool);
impl AsyncFnNextIterator for OnceOnePending {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async {
let mut once = false;
core::future::poll_fn(move |_| match (self.0, once) {
(false, false) => {
once = true;
Poll::Pending
}
(false, true) => {
self.0 = true;
Poll::Ready(Some(42))
}
(true, _) => Poll::Ready(None),
})
.await
}
}
}
#[test]
fn test_once_one_pending_poll() {
let mut x = OnceOnePending(false).into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Done, next());
assert_eq!(NextState::Done, next());
}
#[test]
fn test_once_one_pending_next() {
let mut x = OnceOnePending(false).into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(None), next());
assert_eq!(Poll::Ready(None), next());
}
}
// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.
#[cfg(test)]
mod sendalways42_test {
use super::*;
use core::pin::pin;
struct SendAlways42;
impl AsyncFnNextIterator for SendAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_sendalways42_next() {
let x = SendAlways42.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_spawn(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
#[cfg(test)]
mod localalways42_test {
use super::*;
use std::{future::ready, rc::Rc};
struct LocalAlways42(Rc<()>);
impl AsyncFnNextIterator for LocalAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
let x = self.0.clone();
async move { Some((ready(()).await, *x, 42).2) }
}
}
#[test]
fn test_localalways42_next() {
use core::pin::pin;
let x = LocalAlways42(Rc::new(())).into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.
#[cfg(test)]
mod dynalways42_test {
use super::*;
struct DynAlways42;
impl AsyncFnNextIterator for DynAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_dynalways42_poll() {
let x: &mut dyn AsyncIterator<Item = u8> =
&mut DynAlways42.into_async_iterator();
let mut x = unsafe { Pin::new_unchecked(x) };
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_dynalways42_next() {
let x: &mut dyn AsyncIterator<Item = u8> =
&mut DynAlways42.into_async_iterator();
let mut x = unsafe { Pin::new_unchecked(x) };
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// ###################################################################
// The backpressure / liveness problem.
// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`. Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.
pub trait ProgressFuture: Future {
fn poll_progress(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<()> {
Poll::Ready(())
}
}
pub trait BufferedFuture: ProgressFuture {
fn buffer(self) -> Buffered<Self>
where
Self: Sized;
}
pub enum Buffered<Fut>
where
Fut: Future,
{
Pending(Fut),
Ready(Option<<Fut as Future>::Output>),
}
impl<Fut> Future for Buffered<Fut>
where
Fut: Future,
{
type Output = <Fut as Future>::Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
match this {
Buffered::Pending(fut) => {
let fut = unsafe { Pin::new_unchecked(fut) };
fut.poll(cx)
}
Buffered::Ready(None) => Poll::Pending,
Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
}
}
}
impl<Fut> ProgressFuture for Buffered<Fut>
where
Fut: Future,
{
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<()> {
let this = unsafe { self.get_unchecked_mut() };
let Buffered::Pending(fut) = this else {
return Poll::Ready(());
};
let fut = unsafe { Pin::new_unchecked(fut) };
match fut.poll(cx) {
Poll::Ready(x) => {
*this = Buffered::Ready(Some(x));
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
}
impl<Fut> BufferedFuture for Buffered<Fut>
where
Fut: Future,
{
fn buffer(self) -> Buffered<Self>
where
Self: Sized,
{
Buffered::Pending(self)
}
}
// ###################################################################
// `FuturesUnordered` demonstration.
extern crate alloc;
use alloc::collections::VecDeque;
// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
queue: VecDeque<Fut>,
}
impl<Fut> FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn push(&mut self, f: Fut) {
self.queue.push_back(f);
}
}
impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
type Item = <Fut as Future>::Output;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
let this = unsafe { self.get_unchecked_mut() };
let (mut i, len) = (0usize, this.queue.len());
while let Some(mut fut) = this.queue.pop_front() {
{
let fut = unsafe { Pin::new_unchecked(&mut fut) };
match fut.poll(cx) {
Poll::Pending => {}
Poll::Ready(x) => return NextState::Yielded(x),
}
}
this.queue.push_back(fut);
i += 1;
let false = i == len else { break };
}
NextState::Pending
}
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> ProgressState {
let this = unsafe { self.get_unchecked_mut() };
let mut state = ProgressState::ReadyToYield;
for fut in this.queue.iter_mut() {
let fut = unsafe { Pin::new_unchecked(fut) };
if let Poll::Pending = fut.poll_progress(cx) {
state = ProgressState::Pending;
}
}
state
}
}
pub struct BufferUnordered<St>
where
St: AsyncIterator,
<St as AsyncIterator>::Item: ProgressFuture,
{
stream: St,
queue: FuturesUnordered<<St as AsyncIterator>::Item>,
max: usize,
}
impl<St> AsyncIterator for BufferUnordered<St>
where
St: AsyncIterator,
<St as AsyncIterator>::Item: ProgressFuture,
{
type Item = <<St as AsyncIterator>::Item as Future>::Output;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
let this = unsafe { self.get_unchecked_mut() };
let mut empty = false;
while this.queue.len() < this.max {
let stream =
unsafe { Pin::new_unchecked(&mut this.stream) };
match stream.poll_next(cx) {
NextState::Yielded(fut) => {
this.queue.push(fut);
}
NextState::Empty => {
empty = true;
break;
}
_ => break,
}
}
let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
match queue.poll_next(cx) {
NextState::Yielded(x) => NextState::Yielded(x),
NextState::Empty if empty => NextState::Empty,
_ => NextState::Pending,
}
}
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> ProgressState {
let this = unsafe { self.get_unchecked_mut() };
let mut ready = false;
if this.queue.len() < this.max {
while this.queue.len() < this.max {
let stream =
unsafe { Pin::new_unchecked(&mut this.stream) };
match stream.poll_next(cx) {
NextState::Yielded(fut) => {
this.queue.push(fut);
}
NextState::Empty => {
ready = true;
break;
}
_ => break,
}
}
} else {
let stream =
unsafe { Pin::new_unchecked(&mut this.stream) };
match stream.poll_progress(cx) {
ProgressState::ReadyToYield => ready = true,
_ => {}
}
}
let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
match queue.poll_progress(cx) {
ProgressState::ReadyToYield if ready => {
ProgressState::ReadyToYield
}
_ => ProgressState::Pending,
}
}
}
// ###################################################################
// Supporting code.
pub use nop_waker::*;
mod nop_waker {
use core::{
future::Future,
pin::{pin, Pin},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
pub const NOP_RAWWAKER: RawWaker = {
fn nop(_: *const ()) {}
const VTAB: RawWakerVTable =
RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
RawWaker::new(&() as *const (), &VTAB)
};
pub const NOP_WAKER: &Waker =
&unsafe { Waker::from_raw(NOP_RAWWAKER) };
pub const NOP_CONTEXT: Context<'static> =
Context::from_waker(NOP_WAKER);
pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
where
F: Future<Output = T> + ?Sized + Unpin,
{
let mut cx = NOP_CONTEXT;
Pin::new(f).as_mut().poll(&mut cx)
}
pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
where
F: Future<Output = T>,
{
poll_once(&mut pin!(f))
}
#[cfg(test)]
pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
where
F: Future<Output = T> + Send,
{
std::thread::scope(move |s| {
s.spawn(move || poll_once_owned(f)).join().unwrap()
})
}
}
```
---
# Discussion
## Attendance
- People: TC, tmandry, eholk, Daria, CE, Vincenzo, Yosh (20 mins late)
## Meeting roles
- Minutes, driver: TC
## AsyncIterator -> futures::Stream backport
Daria: There definitely should be a way to transform `AsyncIterator` into `futures::Stream` (and also tokio's Stream). What downsides of this are there?
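
A minimal sketch of what such an adapter might look like, assuming the `NextState`-returning `poll_next` from the prototype above. The `Stream` trait here is a local stand-in with the same shape as `futures_core::Stream::poll_next`, so that the sketch needs no external crates. `AsStream`, `Countdown`, and `collect_ready` are hypothetical names used only for illustration, and the `Unpin` bound is a simplification to avoid `unsafe` pin projection.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Reduced copy of the prototype's `NextState`.
#[derive(Debug, PartialEq)]
pub enum NextState<T> {
    Yielded(T),
    Done,
    Empty,
    Pending,
}

// Reduced copy of the prototype's `AsyncIterator` (no `poll_progress`).
pub trait AsyncIterator {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> NextState<Self::Item>;
}

// ASSUMPTION: local stand-in shaped like `futures_core::Stream`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical adapter; not part of the prototype.
pub struct AsStream<I>(pub I);

impl<I: AsyncIterator + Unpin> Stream for AsStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `I: Unpin`, so we can re-pin the inner iterator safely.
        let inner = Pin::new(&mut self.get_mut().0);
        match AsyncIterator::poll_next(inner, cx) {
            NextState::Yielded(x) => Poll::Ready(Some(x)),
            // `Empty` collapses into `None`, as in the prototype's `next()`.
            NextState::Done | NextState::Empty => Poll::Ready(None),
            NextState::Pending => Poll::Pending,
        }
    }
}

// Hypothetical iterator that yields `n, n-1, ..., 1` and then finishes.
pub struct Countdown(pub u32);

impl AsyncIterator for Countdown {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> NextState<u32> {
        if self.0 == 0 {
            return NextState::Done;
        }
        let x = self.0;
        self.0 -= 1;
        NextState::Yielded(x)
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls with a no-op waker until the stream returns `None` or `Pending`.
// Only meaningful for streams that never return `Pending`.
pub fn collect_ready<S: Stream + Unpin>(mut s: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(x)) = Stream::poll_next(Pin::new(&mut s), &mut cx) {
        out.push(x);
    }
    out
}

fn main() {
    println!("{:?}", collect_ready(AsStream(Countdown(3))));
}
```

One cost visible in this sketch: the `Empty`/`Done` distinction and `poll_progress` have no counterpart in the `Stream` shape, so the adapter drops both.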
## poll_progress
(In response to a question...)
TC: `poll_progress` is about maintaining liveness. Keeping the plates spinning. But you need to maintain backpressure. `poll_progress` allows you to maintain liveness without having unlimited buffering by separating the liveness concern from having to handle the output.
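
A small sketch of that separation, under simplifying assumptions: `Prefetch` is a hypothetical producer whose "work" is pulling numbers from a range into a buffer capped at `max`, and `drive` is a hypothetical slow consumer that calls `poll_progress` while it is busy with an item. The enums are reduced copies of the prototype's, keeping only the variants this sketch uses. When the buffer is full this sketch reports `Paused` rather than `ReadyToYield`; that choice is arbitrary here.

```rust
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum NextState<T> {
    Yielded(T),
    Done,
}

#[derive(Debug, PartialEq)]
pub enum ProgressState {
    ReadyToYield,
    Paused,
    Done,
}

// Hypothetical producer with a bounded internal buffer.
pub struct Prefetch {
    source: Range<u32>,
    buf: VecDeque<u32>,
    max: usize,
}

impl Prefetch {
    pub fn new(source: Range<u32>, max: usize) -> Self {
        assert!(max >= 1, "this sketch assumes a buffer of at least one item");
        Prefetch { source, buf: VecDeque::new(), max }
    }

    pub fn buffered(&self) -> usize {
        self.buf.len()
    }

    // Liveness: does one unit of work without handing out any output.
    // Backpressure: once `max` items are buffered, it does nothing.
    pub fn poll_progress(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> ProgressState {
        let this = &mut *self;
        if this.buf.len() < this.max {
            if let Some(x) = this.source.next() {
                this.buf.push_back(x);
            }
        }
        let exhausted = this.source.start >= this.source.end;
        if this.buf.is_empty() {
            ProgressState::Done
        } else if this.buf.len() == this.max || exhausted {
            ProgressState::Paused
        } else {
            ProgressState::ReadyToYield
        }
    }

    // Output: hands out a buffered item, or produces one on demand.
    pub fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> NextState<u32> {
        let this = &mut *self;
        match this.buf.pop_front().or_else(|| this.source.next()) {
            Some(x) => NextState::Yielded(x),
            None => NextState::Done,
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// After taking each item, the consumer spends up to `ticks_per_item`
// ticks on it and calls `poll_progress` on every tick. Returns the items
// and the largest buffer size observed.
pub fn drive(mut it: Prefetch, ticks_per_item: usize) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    let mut high_water = 0usize;
    while let NextState::Yielded(x) = Pin::new(&mut it).poll_next(&mut cx) {
        out.push(x);
        for _ in 0..ticks_per_item {
            // `Paused` or `Done`: no point calling again before `poll_next`.
            if Pin::new(&mut it).poll_progress(&mut cx) != ProgressState::ReadyToYield {
                break;
            }
        }
        high_water = high_water.max(it.buffered());
    }
    (out, high_water)
}

fn main() {
    let (items, high_water) = drive(Prefetch::new(0..10, 3), 5);
    println!("items = {:?}, high water = {}", items, high_water);
}
```

In this sketch the producer keeps working while the consumer is busy, yet the buffer never grows past `max`, however many ticks the consumer spends per item.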