// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/.60AsyncIterator.60.20rename.20meeting.202024-01-22.3F
// See: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/async.20iterator.20combinators.20using.20new.20async.20functionality
//
// Author: TC
//
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments that follow:
//
// - How we can allow users to implement an AFIT `async fn next`
//   equivalent while converting that automatically to an
//   `AsyncIterator` type with `IntoAsyncIterator`.
//
// - How, for `AsyncIterator` trait objects, we can allow calls to
//   the `next()` method with only a single indirect call.
//
// - How we need to handle the backpressure / liveness problem in
//   our solution to `AsyncIterator`.
//
// - Why it's important that we can name the future returned by a
//   method for which we might otherwise use AFIT. This has bearing
//   on the possible need for RTN. This is larger than the `Send`
//   bound problem, but it relates to that as well.
//
// - How the presence of stable `async gen` blocks may affect our
//   thinking on this design.
//
// - How we can preserve cancellation safety for the future returned
//   by the `next()` method while still allowing the future returned
//   by `next_item()` to have its own non-trivial state. The former
//   is important since the current ecosystem assumes that futures
//   returned by `next()` are cancellation safe, while the latter is
//   important to avoid an easy mistake to make while implementing
//   the convenient AFIT-like trait.
//
// - How, even from a `dyn AsyncIterator`, we can return concrete
//   futures using an `async` block in the implementation.
//
// - Whether to separately indicate `Empty` and `Done` states to
//   support use cases such as `FuturesUnordered` (which currently
//   violates the "done" contract).
//
// - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//   to use a dedicated enum in the manner of `CoroutineState`.
//
// - Whether and how to flatten carried effects into the type of the
//   value returned from `poll_next`.
//
// - Whether, since `?` can be used outside of `try` blocks, we
//   should compose the `Try` carried effect even in normal
//   (non-`try`) async iterators.
//
// - Whether `poll_progress` should indicate that a value is ready
//   to be yielded, that no further work can be done until
//   `poll_next` is called, or both.
//
// - Whether `poll_progress` should indicate the done state, if it
//   knows about it, or whether it should force the caller to make
//   a call to `poll_next`.
//
// The comments below are written to tell a story and to be read even
// if you do not read the code.
//
// Note that everything here works with `no_std` without any
// allocation or type erasure.
//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]
use core::{
future::Future,
hint::unreachable_unchecked,
marker::PhantomData,
pin::Pin,
ptr::addr_of_mut,
task::{Context, Poll},
};
// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`. However, for demonstration and to
// explore some of the open questions, we'll use a dedicated enum
// here for clarity.
//
// There may of course be value in doing it this way "for real". In
// the same way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear. If this were a
// `TryAsyncIterator`, then we would simply add a `Residual` variant
// to this enum rather than having to worry about how to order the
// composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant. Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later. We seem to have these
// options:
//
// 1. Declare that this is a supported use of the done state and
//    that an `AsyncIterator` that returns done may later yield
//    values.
//
// 2. Declare that this is a violation of the contract and that
//    `FuturesUnordered` should instead either return `Pending` when
//    it is empty (and we should look into the use cases and reasons
//    why it does not do that) or return `Yielded(Option<T>)`.
//
// 3. Provide a separate way to signal empty (and not pending)
//    versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}
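// As a minimal sketch of why a separate `Empty` state could matter
// to callers: `next()` style APIs collapse `Done` and `Empty` into
// `None`, while a driver that sees the raw state can react
// differently to each. The enum is repeated (with fewer derives) so
// that the block stands alone; `Action` and `classify` are
// hypothetical names used only for this illustration.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NextState<T> {
    Yielded(T),
    Done,
    Empty,
    Pending,
}

/// What a driver loop might do after one call to `poll_next`
/// (hypothetical; not part of the design above).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Action<T> {
    /// Process the yielded value.
    Handle(T),
    /// The iterator is finished for good; stop polling.
    Stop,
    /// The iterator is out of items for now; push more work first.
    Refill,
    /// Nothing is ready; wait to be woken.
    Wait,
}

/// Maps each state to a distinct action, keeping `Done` and `Empty`
/// apart instead of folding both into `None`.
pub fn classify<T>(state: NextState<T>) -> Action<T> {
    match state {
        NextState::Yielded(x) => Action::Handle(x),
        NextState::Done => Action::Stop,
        NextState::Empty => Action::Refill,
        NextState::Pending => Action::Wait,
    }
}
```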
// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that didn't express
// enough, which resulted in duplicating substantial code between
// `poll_progress` and `poll_next` implementations. When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`. However, for that to be useful, we need for
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that. This leaves us with three options:
//
// 1. Indicate only when `poll_progress` can do no more work until
//    `poll_next` is called. This results in duplicate code between
//    `poll_progress` and `poll_next`.
//
// 2. Indicate only when `poll_next` is ready to yield a value.
//    This results in some async iterators always returning `Pending`
//    from `poll_progress`, which means that callers may invoke
//    `poll_progress` more often than needed.
//
// 3. Indicate both states.
//
// We've taken option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is whether, if `poll_progress` knows that
// the async iterator is finished, it should indicate that itself or
// whether it should indicate `Paused` and force the caller to call
// `poll_next` to receive the `Done` or `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// Further progress is possible by calling `poll_progress`.
    Pending,
}
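// The `ProgressState` enum above carries only `Paused` and `Pending`.
// As a sketch of what signaling both states (option 3) might look
// like, the following adds a hypothetical `ReadyToYield` variant and
// shows `poll_next` delegating to `poll_progress` without duplicated
// logic. Everything here (`Progress3`, `Next`, `Slow`, `noop_waker`)
// is an assumption made for illustration, and `&mut self` is used
// in place of `Pin<&mut Self>` to keep it short.

```rust
use core::task::Context;
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Hypothetical three-state variant of `ProgressState`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Progress3 {
    /// Nothing more can be done until `poll_next` is called.
    Paused,
    /// The next call to `poll_next` will yield a value.
    ReadyToYield,
    /// More background work remains.
    Pending,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Next<T> {
    Yielded(T),
    Done,
    Pending,
}

/// Hypothetical iterator where each item needs `cost` units of work,
/// one unit per call to `poll_progress`.
pub struct Slow {
    remaining: u8,
    work_left: u8,
    cost: u8,
}

impl Slow {
    pub fn new(items: u8, cost: u8) -> Self {
        Self { remaining: items, work_left: cost, cost }
    }

    pub fn poll_progress(&mut self, _cx: &mut Context<'_>) -> Progress3 {
        if self.remaining == 0 {
            return Progress3::Paused;
        }
        if self.work_left > 0 {
            self.work_left -= 1;
        }
        match self.work_left {
            0 => Progress3::ReadyToYield,
            _ => Progress3::Pending,
        }
    }

    /// All of the work lives in `poll_progress`; this only consumes.
    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Next<u8> {
        match self.poll_progress(cx) {
            Progress3::Paused => Next::Done,
            Progress3::Pending => Next::Pending,
            Progress3::ReadyToYield => {
                self.remaining -= 1;
                self.work_left = self.cost;
                Next::Yielded(self.remaining)
            }
        }
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// A waker that does nothing, for polling by hand.
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(Noop))
}
```

// In this sketch a finished iterator reports `Paused` from
// `poll_progress`, so the caller learns about `Done` only by calling
// `poll_next`; the alternative discussed above would add a done
// indication to the progress enum itself.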
// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`. All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way. We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}
// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers. This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe. That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly. In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}
// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`. This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call. All other known approaches,
// including approaches where implementors would implement `async fn
// next` directly, require two indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> =
        impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;
    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}
// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait. This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
//
// Note that this trait has a dummy generic type parameter. We need
// this so as to be able to satisfy coherence checking later on when
// we need to name our simulated existential lifetime.
pub trait IntoAsyncIterator<S> {
type Item;
type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}
// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing
// `AsyncIterator::poll_next` manually. We provide this trait to
// allow for that. Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    // 1. Since the future can be named, use sites can set bounds on
    //    it. That solves, among other things, the `Send` bound
    //    problem.
    //
    // 2. Since the future can be named, we can store it unboxed when
    //    converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}
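// A minimal sketch of reason 1, assuming stable Rust, where the
// implementor must pick a concrete future type (`core::future::Ready`
// here) instead of `impl Future`. Because `NextFuture` has a name, a
// use site can require `Send` on it. `next_item_send` and
// `poll_on_thread` are hypothetical helpers, not part of the design.

```rust
use core::future::{ready, Future, Ready};
use core::pin::pin;
use core::task::{Context, Poll};
use std::sync::Arc;
use std::task::{Wake, Waker};

pub trait AsyncFnNextIterator {
    type Item;
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

pub struct Always42;

impl AsyncFnNextIterator for Always42 {
    type Item = u8;
    // A concrete, nameable future type stands in for `impl Future`.
    type NextFuture<'s> = Ready<Option<u8>> where Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_> {
        ready(Some(42))
    }
}

/// The use site states the `Send` bound on the named future; an
/// implementation whose future is not `Send` is rejected here at
/// compile time.
pub fn next_item_send<'s, I>(iter: &'s mut I) -> I::NextFuture<'s>
where
    I: AsyncFnNextIterator + 's,
    I::NextFuture<'s>: Send,
{
    iter.next_item()
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Moves the future to another thread and polls it once there, which
/// only compiles because the future is `Send`.
pub fn poll_on_thread<F>(fut: F) -> Poll<F::Output>
where
    F: Future + Send,
    F::Output: Send,
{
    std::thread::scope(|s| {
        s.spawn(move || {
            let waker = Waker::from(Arc::new(Noop));
            let mut cx = Context::from_waker(&waker);
            let mut fut = pin!(fut);
            fut.as_mut().poll(&mut cx)
        })
        .join()
        .unwrap()
    })
}
```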
// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned. This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
I: AsyncFnNextIterator,
Self: 's,
{
iter: I,
// Note that to make this work, it's critical that we're able to
// name `NextFuture`, as otherwise we could not store it unboxed
// in this struct.
//
// What we actually want here are existential lifetimes so that we
// can express the type:
//
// exists<'s> Option<<I as AsyncFnNextIterator>::NextFuture<'s>>
//
// That is, we need to name *some* lifetime that is shorter than
// the lifetime of `I` and of `Self`. We don't have a way to
// express this in Rust currently other than to add this as a
// generic lifetime parameter. This forces us to add a dummy
// generic parameter to `IntoAsyncIterator` so that we can satisfy
// coherence checking.
fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
_p: PhantomData<&'s ()>,
}
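// This impl makes the wrapper around any `AsyncFnNextIterator` an
// `AsyncIterator`. Together with the blanket `IntoAsyncIterator`
// impl, it lets all types that implement `AsyncFnNextIterator` be
// used as async iterators.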
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
I: AsyncFnNextIterator,
Self: 's,
{
type Item = <I as AsyncFnNextIterator>::Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
// SAFETY: Writing the following in terms of raw pointers
// raises fewer opsem questions than doing it with references,
// so we wrap the entire body in `unsafe` to make this
// readable.
unsafe {
// SAFETY: This is for the pin projections below.
let this = addr_of_mut!(*self.get_unchecked_mut());
if let None = (*this).fut {
// SAFETY: We're doing lifetime extension here. The
// fact that we have a pinned mutable reference to
// `Self` is what makes this sound.
//
// This is a simulation of existential lifetimes.
let fut: I::NextFuture<'s> = (*this).iter.next_item();
(*this).fut = Some(fut);
}
let Some(ref mut fut) = (*this).fut else {
// SAFETY: We just ensured this must be `Some(_)`.
unreachable_unchecked()
};
// SAFETY: We uphold the pin contract for this value.
let pfut = Pin::new_unchecked(fut);
match pfut.poll(cx) {
Poll::Ready(x) => {
(*this).fut = None;
match x {
Some(x) => NextState::Yielded(x),
None => NextState::Done,
}
}
Poll::Pending => NextState::Pending,
}
}
}
}
// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
//
// Note that this impl is why we need the dummy generic type parameter
// on `IntoAsyncIterator`. Without it, the `'s` lifetime would be
// unconstrained and we would fail coherence checking, and without
// being able to name this lifetime in the impl, we would not be able
// to name the type of the `ConvertedAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
I: AsyncFnNextIterator + 's,
<I as AsyncFnNextIterator>::Item: 's,
{
type Item = <I as AsyncFnNextIterator>::Item;
type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
fn into_async_iterator(self) -> Self::IntoAsyncIterator {
ConvertedAsyncIterator {
iter: self,
fut: None,
_p: PhantomData,
}
}
}
// ###################################################################
// Tests and demonstration of use.
#[cfg(test)]
mod always42_test {
use super::*;
use core::pin::pin;
// Let's say a user comes along and wants to make the `Always42`
// struct into an asynchronous iterator the "easy way". So the
// user writes this impl in terms of the AFIT-like version.
struct Always42;
impl AsyncFnNextIterator for Always42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_always42_poll() {
let mut x = Always42.into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_always42_next() {
let mut x = Always42.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// By compiling, this demonstrates that our simulation of existential
// lifetimes works correctly and that we're not requiring that the
// underlying `AsyncFnNextIterator` outlive `'static`.
#[cfg(test)]
mod borrowclone_test {
use super::*;
use core::pin::pin;
struct BorrowClone<T: Clone>(T);
impl<T: Clone> AsyncFnNextIterator for BorrowClone<T> {
type Item = T;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>
where
T: 's;
fn next_item(&mut self) -> Self::NextFuture<'_> {
let x = &self.0;
async move { Some(x.clone()) }
}
}
#[test]
fn test_borrowclone_poll() {
let mut x = BorrowClone(42u8).into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_borrowclone_next() {
let mut x = BorrowClone(42u8).into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// This demonstrates that we correctly handle state within the
// returned future.
#[cfg(test)]
mod alwaysonepending_test {
use super::*;
use core::pin::pin;
struct AlwaysOnePending;
impl AsyncFnNextIterator for AlwaysOnePending {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async {
let mut once = false;
core::future::poll_fn(move |_| match once {
true => Poll::Ready(Some(42)),
false => {
once = true;
Poll::Pending
}
})
.await
}
}
}
#[test]
fn test_always_one_pending_poll() {
let mut x = AlwaysOnePending.into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_always_one_pending_next() {
let mut x = AlwaysOnePending.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// This demonstrates the two-FSM situation. We correctly handle both
// the state in the underlying `AsyncFnNextIterator` and the state in
// the returned future.
#[cfg(test)]
mod onceonepending_test {
use super::*;
use core::pin::pin;
struct OnceOnePending(bool);
impl AsyncFnNextIterator for OnceOnePending {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async {
let mut once = false;
core::future::poll_fn(move |_| match (self.0, once) {
(false, false) => {
once = true;
Poll::Pending
}
(false, true) => {
self.0 = true;
Poll::Ready(Some(42))
}
(true, _) => Poll::Ready(None),
})
.await
}
}
}
#[test]
fn test_once_one_pending_poll() {
let mut x = OnceOnePending(false).into_async_iterator();
let mut x = pin!(x);
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Pending, next());
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Done, next());
assert_eq!(NextState::Done, next());
}
#[test]
fn test_once_one_pending_next() {
let mut x = OnceOnePending(false).into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Pending, next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(None), next());
assert_eq!(Poll::Ready(None), next());
}
}
// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.
#[cfg(test)]
mod sendalways42_test {
use super::*;
use core::pin::pin;
struct SendAlways42;
impl AsyncFnNextIterator for SendAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_sendalways42_next() {
let x = SendAlways42.into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_spawn(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
#[cfg(test)]
mod localalways42_test {
use super::*;
use std::{future::ready, rc::Rc};
struct LocalAlways42(Rc<()>);
impl AsyncFnNextIterator for LocalAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
let x = self.0.clone();
async move { Some((ready(()).await, *x, 42).2) }
}
}
#[test]
fn test_localalways42_next() {
use core::pin::pin;
let x = LocalAlways42(Rc::new(())).into_async_iterator();
let mut x = pin!(x);
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.
#[cfg(test)]
mod dynalways42_test {
use super::*;
struct DynAlways42;
impl AsyncFnNextIterator for DynAlways42 {
type Item = u8;
type NextFuture<'s> =
impl Future<Output = Option<Self::Item>>;
fn next_item(&mut self) -> Self::NextFuture<'_> {
async { Some(42) }
}
}
#[test]
fn test_dynalways42_poll() {
let x: &mut dyn AsyncIterator<Item = u8> =
&mut DynAlways42.into_async_iterator();
let mut x = unsafe { Pin::new_unchecked(x) };
let mut cx = NOP_CONTEXT;
let mut next = || x.as_mut().poll_next(&mut cx);
assert_eq!(NextState::Yielded(42), next());
assert_eq!(NextState::Yielded(42), next());
}
#[test]
fn test_dynalways42_next() {
let x: &mut dyn AsyncIterator<Item = u8> =
&mut DynAlways42.into_async_iterator();
let mut x = unsafe { Pin::new_unchecked(x) };
let mut next = || poll_once_owned(x.as_mut().next());
assert_eq!(Poll::Ready(Some(42)), next());
assert_eq!(Poll::Ready(Some(42)), next());
}
}
// ###################################################################
// The backpressure / liveness problem.
// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`. Doing this allows for higher-level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.
pub trait ProgressFuture: Future {
fn poll_progress(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<()> {
Poll::Ready(())
}
}
pub trait BufferedFuture: ProgressFuture {
fn buffer(self) -> Buffered<Self>
where
Self: Sized;
}
pub enum Buffered<Fut>
where
Fut: Future,
{
Pending(Fut),
Ready(Option<<Fut as Future>::Output>),
}
impl<Fut> Future for Buffered<Fut>
where
Fut: Future,
{
type Output = <Fut as Future>::Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
match this {
Buffered::Pending(fut) => {
let fut = unsafe { Pin::new_unchecked(fut) };
fut.poll(cx)
}
Buffered::Ready(None) => Poll::Pending,
Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
}
}
}
impl<Fut> ProgressFuture for Buffered<Fut>
where
Fut: Future,
{
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<()> {
let this = unsafe { self.get_unchecked_mut() };
let Buffered::Pending(fut) = this else {
return Poll::Ready(());
};
let fut = unsafe { Pin::new_unchecked(fut) };
match fut.poll(cx) {
Poll::Ready(x) => {
*this = Buffered::Ready(Some(x));
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
}
}
impl<Fut: ProgressFuture> BufferedFuture for Fut {
fn buffer(self) -> Buffered<Self>
where
Self: Sized,
{
Buffered::Pending(self)
}
}
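// As a usage sketch of the idea behind `Buffered`: progress can be
// driven to completion before anyone asks for the output, and the
// output is then handed over on the next poll. To stay short this
// simplified copy requires `Unpin` and uses inherent methods; the
// names `Running`, `Finished`, `poll_output`, `Countdown`, and
// `noop_waker` are assumptions made for this illustration.

```rust
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::sync::Arc;
use std::task::{Wake, Waker};

pub enum Buffered<F: Future + Unpin> {
    Running(F),
    Finished(Option<F::Output>),
}

impl<F: Future + Unpin> Buffered<F> {
    /// Drives the inner future and parks its output once it is ready.
    pub fn poll_progress(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let Buffered::Running(fut) = self else {
            return Poll::Ready(());
        };
        match Pin::new(fut).poll(cx) {
            Poll::Ready(x) => {
                *self = Buffered::Finished(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }

    /// Hands out the parked output, or polls the inner future if
    /// `poll_progress` has not finished it yet.
    pub fn poll_output(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
        match self {
            Buffered::Running(fut) => Pin::new(fut).poll(cx),
            Buffered::Finished(slot) => match slot.take() {
                Some(x) => Poll::Ready(x),
                None => Poll::Pending,
            },
        }
    }
}

/// Hypothetical future that is pending for the given number of polls.
pub struct Countdown(pub u8);

impl Future for Countdown {
    type Output = &'static str;
    fn poll(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        if self.0 == 0 {
            Poll::Ready("done")
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// A waker that does nothing, for polling by hand.
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(Noop))
}
```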
// ###################################################################
// `FuturesUnordered` demonstration.
#[cfg(test)]
macro_rules! cfg_match {
($(cfg($cond:meta) => { $($then:tt)* })+
$(_ => { $($else:tt)* })?
) => { cfg_match! {
__xs (); $((($cond) ($($then)*)),)+ $((() ($($else)*)),)?
}};
(__id $($xs:tt)*) => { $($xs)* };
(__xs ($($_:meta,)*);) => {};
(__xs ($($ncond:meta,)*);
(($($cond:meta)?) ($($body:tt)*)), $($rest:tt,)*
) => {
#[cfg(all($($cond,)? not(any($($ncond),*))))]
cfg_match! { __id $($body)* }
cfg_match! { __xs ($($ncond,)* $($cond,)?); $($rest,)* }
};
}
// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
queue: Queue<Fut, 1024>,
}
impl<Fut> FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
pub fn new() -> Self {
Self { queue: Queue::new() }
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn push(&mut self, f: Fut) {
self.queue.push(f);
}
}
impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
Fut: ProgressFuture,
{
type Item = <Fut as Future>::Output;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
let this = unsafe { self.get_unchecked_mut() };
let (mut i, len) = (0usize, this.queue.len());
while let Some(mut fut) = this.queue.pop() {
{
let fut = unsafe { Pin::new_unchecked(&mut fut) };
match fut.poll(cx) {
Poll::Pending => {}
Poll::Ready(x) => return NextState::Yielded(x),
}
}
this.queue.push(fut);
i += 1;
let false = i == len else { break };
}
NextState::Pending
}
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> ProgressState {
let this = unsafe { self.get_unchecked_mut() };
let mut state = ProgressState::Paused;
for fut in this.queue.iter_mut() {
let fut = unsafe { Pin::new_unchecked(fut) };
if let Poll::Pending = fut.poll_progress(cx) {
state = ProgressState::Pending;
}
}
state
}
}
pub struct Bottleneck<St>
where
St: AsyncIterator,
<St as AsyncIterator>::Item: ProgressFuture,
{
stream: St,
queue: FuturesUnordered<<St as AsyncIterator>::Item>,
max: usize,
}
impl<St> Bottleneck<St>
where
St: AsyncIterator,
<St as AsyncIterator>::Item: ProgressFuture,
{
pub fn new(stream: St, max: usize) -> Self {
Self { stream, queue: FuturesUnordered::new(), max }
}
fn make_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> (ProgressState, bool) {
let this = unsafe { self.get_unchecked_mut() };
let mut is_stream_done = false;
if this.queue.len() < this.max {
while this.queue.len() < this.max {
let stream =
unsafe { Pin::new_unchecked(&mut this.stream) };
match stream.poll_next(cx) {
NextState::Yielded(fut) => {
this.queue.push(fut);
}
NextState::Empty | NextState::Done => {
is_stream_done = true;
break;
}
NextState::Pending => break,
}
}
} else {
let stream =
unsafe { Pin::new_unchecked(&mut this.stream) };
match stream.poll_progress(cx) {
ProgressState::Paused => is_stream_done = true,
ProgressState::Pending => {}
}
}
let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
(queue.poll_progress(cx), is_stream_done)
}
}
impl<St> AsyncIterator for Bottleneck<St>
where
St: AsyncIterator,
<St as AsyncIterator>::Item: ProgressFuture,
{
type Item = <<St as AsyncIterator>::Item as Future>::Output;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> NextState<Self::Item> {
let (_, is_stream_done) = self.as_mut().make_progress(cx);
let this = unsafe { self.get_unchecked_mut() };
let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
match queue.poll_next(cx) {
NextState::Yielded(x) => NextState::Yielded(x),
NextState::Empty if is_stream_done => NextState::Empty,
NextState::Empty => NextState::Pending,
NextState::Done => unreachable!(),
NextState::Pending => NextState::Pending,
}
}
fn poll_progress(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> ProgressState {
match self.make_progress(cx) {
(ProgressState::Paused, true) => ProgressState::Paused,
(ProgressState::Paused, _) => ProgressState::Pending,
(ProgressState::Pending, _) => ProgressState::Pending,
}
}
}
#[cfg(test)]
mod buffered_test {
use super::*;
use core::pin::pin;
use core::sync::atomic::{AtomicU64, Ordering};
struct DelayFuture<'s> {
ticker: &'s AtomicU64,
delay: u64,
start: u64,
last_tick: u64,
missed: u64,
}
impl<'s> DelayFuture<'s> {
pub fn new(ticker: &'s AtomicU64, delay: u64) -> Self {
let tick = ticker.load(Ordering::Relaxed);
Self {
ticker,
delay,
start: tick,
last_tick: tick,
missed: 0,
}
}
}
impl<'s> Future for DelayFuture<'s> {
type Output = (u64, u64);
fn poll(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let tick = self.ticker.load(Ordering::Relaxed);
if tick > self.last_tick + 1 {
self.missed += tick - (self.last_tick + 1);
panic!("Missed a tick.");
}
self.last_tick = tick;
let age = tick - self.start;
if age >= self.delay {
return Poll::Ready((age, self.missed));
}
Poll::Pending
}
}
impl<'s> ProgressFuture for DelayFuture<'s> {
fn poll_progress(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<()> {
let tick = self.ticker.load(Ordering::Relaxed);
if tick > self.last_tick + 1 {
self.missed += tick - (self.last_tick + 1);
panic!("Missed a tick.");
}
self.last_tick = tick;
if tick - self.start >= self.delay {
return Poll::Ready(());
}
Poll::Pending
}
}
struct DelayStream<'s> {
ticker: &'s AtomicU64,
max_delay: u64,
rand: XorShift,
last_tick: u64,
missed: u64,
}
impl<'s> DelayStream<'s> {
pub fn new(ticker: &'s AtomicU64, max_delay: u64) -> Self {
let tick = ticker.load(Ordering::Relaxed);
Self {
ticker,
max_delay,
rand: XorShift::new(),
last_tick: tick,
missed: 0,
}
}
}
impl<'s> AsyncIterator for DelayStream<'s> {
type Item = Buffered<DelayFuture<'s>>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> NextState<Self::Item> {
let tick = self.ticker.load(Ordering::Relaxed);
if tick > self.last_tick + 1 {
self.missed += tick - (self.last_tick + 1);
panic!("Missed a tick.");
}
self.last_tick = tick;
let delay = self.rand.rand() % self.max_delay;
NextState::Yielded(
DelayFuture::new(self.ticker, delay).buffer(),
)
}
fn poll_progress(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> ProgressState {
let tick = self.ticker.load(Ordering::Relaxed);
if tick > self.last_tick + 1 {
self.missed += tick - (self.last_tick + 1);
panic!("Missed a tick.");
}
self.last_tick = tick;
ProgressState::Paused
}
}
#[test]
fn buffered_test() {
cfg_match! {
cfg(miri) => {
const N_ITERS: usize = 2usize.pow(7);
const BUFSIZE: usize = 2usize.pow(2);
const INV_RATE: usize = 10;
const MAX_DELAY: u64 = 80;
}
_ => {
const N_ITERS: usize = 2usize.pow(20);
const BUFSIZE: usize = 2usize.pow(8);
const INV_RATE: usize = 100;
const MAX_DELAY: u64 = 800;
}
}
let ticker = AtomicU64::new(0);
let mut cx = NOP_CONTEXT;
let ds = DelayStream::new(&ticker, MAX_DELAY);
let bs = Bottleneck::new(ds, BUFSIZE);
let mut bs = pin!(bs);
let tick = || ticker.fetch_add(1, Ordering::Relaxed);
let mut max_age = 0;
for i in 0..N_ITERS {
_ = tick();
if i % INV_RATE == 0 {
match bs.as_mut().poll_next(&mut cx) {
NextState::Yielded((age, missed)) => {
assert_eq!(0, missed);
if age > max_age {
assert!(age < MAX_DELAY);
max_age = age
}
}
NextState::Pending => {}
_ => unreachable!(),
}
} else {
_ = bs.as_mut().poll_progress(&mut cx);
}
}
dbg!(max_age);
}
}
// ###################################################################
// Supporting code.
pub use nop_waker::*;
mod nop_waker {
use core::{
future::Future,
pin::{pin, Pin},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
pub const NOP_RAWWAKER: RawWaker = {
fn nop(_: *const ()) {}
const VTAB: RawWakerVTable =
RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
RawWaker::new(&() as *const (), &VTAB)
};
pub const NOP_WAKER: &Waker =
&unsafe { Waker::from_raw(NOP_RAWWAKER) };
pub const NOP_CONTEXT: Context<'static> =
Context::from_waker(NOP_WAKER);
pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
where
F: Future<Output = T> + ?Sized + Unpin,
{
let mut cx = NOP_CONTEXT;
Pin::new(f).as_mut().poll(&mut cx)
}
pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
where
F: Future<Output = T>,
{
poll_once(&mut pin!(f))
}
#[cfg(test)]
pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
where
F: Future<Output = T> + Send,
{
std::thread::scope(move |s| {
s.spawn(move || poll_once_owned(f)).join().unwrap()
})
}
}
pub use circular_queue::*;
mod circular_queue {
use core::mem::{transmute, MaybeUninit};
pub struct Queue<T, const LEN: usize> {
buf: [MaybeUninit<T>; LEN],
idx: usize,
len: usize,
}
impl<T: Sized, const LEN: usize> Queue<T, LEN> {
pub fn new() -> Self {
let buf: [MaybeUninit<T>; LEN] =
unsafe { MaybeUninit::uninit().assume_init() };
Self { buf, idx: 0, len: 0 }
}
pub fn len(&self) -> usize {
self.len
}
pub fn push(&mut self, x: T) {
assert!(self.len < LEN);
let idx = (self.idx + self.len) % LEN;
self.buf[idx] = MaybeUninit::new(x);
self.len += 1;
}
#[must_use]
pub fn pop(&mut self) -> Option<T> {
let true = self.len > 0 else { return None };
let v = unsafe { self.buf[self.idx].assume_init_read() };
self.idx = (self.idx + 1) % LEN;
self.len -= 1;
Some(v)
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
struct IterMut<'a, T: 'a, const LEN: usize> {
queue: &'a mut Queue<T, LEN>,
idx: usize,
len: usize,
}
impl<'a, T: 'a, const LEN: usize> Iterator for IterMut<'a, T, LEN> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
let true = self.len > 0 else { return None };
let v: &'a mut T = unsafe {
transmute(
self.queue.buf[self.idx]
.assume_init_mut(),
)
};
self.idx = (self.idx + 1) % LEN;
self.len -= 1;
Some(v)
}
}
let (idx, len) = (self.idx, self.len);
IterMut::<'_, _, LEN> { queue: self, idx, len }
}
}
impl<T, const LEN: usize> Drop for Queue<T, LEN> {
fn drop(&mut self) {
while self.len() > 0 {
_ = self.pop();
}
}
}
}
pub use xorshift::*;
mod xorshift {
pub struct XorShift {
seed: u64,
}
impl XorShift {
pub fn new() -> Self {
Self { seed: 1 }
}
pub fn rand(&mut self) -> u64 {
self.seed ^= self.seed << 23;
self.seed ^= self.seed >> 13;
self.seed ^= self.seed << 38;
self.seed
}
}
impl Iterator for XorShift {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
Some(self.rand())
}
}
}