Rust Async Working Group
      • Sharing URL Link copied
      • /edit
      • View mode
        • Edit mode
        • View mode
        • Book mode
        • Slide mode
        Edit mode View mode Book mode Slide mode
      • Customize slides
      • Note Permission
      • Read
        • Owners
        • Signed-in users
        • Everyone
        Owners Signed-in users Everyone
      • Write
        • Owners
        • Signed-in users
        • Everyone
        Owners Signed-in users Everyone
      • Engagement control Commenting, Suggest edit, Emoji Reply
    • Invite by email
      Invitee

      This note has no invitees

    • Publish Note

      Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

      Your note will be visible on your profile and discoverable by anyone.
      Your note is now live.
      This note is visible on your profile and discoverable online.
      Everyone on the web can find and read all notes of this public team.
      See published notes
      Unpublish note
      Please check the box to agree to the Community Guidelines.
      View profile
    • Commenting
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
      • Everyone
    • Suggest edit
      Permission
      Disabled Forbidden Owners Signed-in users Everyone
    • Enable
    • Permission
      • Forbidden
      • Owners
      • Signed-in users
    • Emoji Reply
    • Enable
    • Versions and GitHub Sync
    • Note settings
    • Note Insights
    • Engagement control
    • Transfer ownership
    • Delete this note
    • Insert from template
    • Import from
      • Dropbox
      • Google Drive
      • Gist
      • Clipboard
    • Export to
      • Dropbox
      • Google Drive
      • Gist
    • Download
      • Markdown
      • HTML
      • Raw HTML
Menu Note settings Versions and GitHub Sync Note Insights Sharing URL Help
Menu
Options
Engagement control Transfer ownership Delete this note
Import from
Dropbox Google Drive Gist Clipboard
Export to
Dropbox Google Drive Gist
Download
Markdown HTML Raw HTML
Back
Sharing URL Link copied
/edit
View mode
  • Edit mode
  • View mode
  • Book mode
  • Slide mode
Edit mode View mode Book mode Slide mode
Customize slides
Note Permission
Read
Owners
  • Owners
  • Signed-in users
  • Everyone
Owners Signed-in users Everyone
Write
Owners
  • Owners
  • Signed-in users
  • Everyone
Owners Signed-in users Everyone
Engagement control Commenting, Suggest edit, Emoji Reply
  • Invite by email
    Invitee

    This note has no invitees

  • Publish Note

    Share your work with the world Congratulations! 🎉 Your note is out in the world Publish Note

    Your note will be visible on your profile and discoverable by anyone.
    Your note is now live.
    This note is visible on your profile and discoverable online.
    Everyone on the web can find and read all notes of this public team.
    See published notes
    Unpublish note
    Please check the box to agree to the Community Guidelines.
    View profile
    Engagement control
    Commenting
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    • Everyone
    Suggest edit
    Permission
    Disabled Forbidden Owners Signed-in users Everyone
    Enable
    Permission
    • Forbidden
    • Owners
    • Signed-in users
    Emoji Reply
    Enable
    Import from Dropbox Google Drive Gist Clipboard
       owned this note    owned this note      
    Published Linked with GitHub
    Subscribed
    • Any changes
      Be notified of any changes
    • Mention me
      Be notified of mention me
    • Unsubscribe
    Subscribe
    --- title: "Design meeting 2024-02-15: AsyncIterator prototype" tags: ["WG-async", "design-meeting", "minutes"] date: 2024-02-15 discussion: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/Design.20meeting.202024-02-15 url: https://hackmd.io/EplPcmBCTCSQ9LPOU6IZDw --- # A discussion prototype for `AsyncIterator` ```rust // This is a demonstration of how we might implement `AsyncIterator`. // We cover the follow topics in the code and comments that follow: // // - How we can allow users to implement an AFIT `async fn next` // equivalent while converting that automatically to an // `AsyncIterator` type with `IntoAsynciterator`. // // - How, for `AsyncIterator` trait objects, we can allow calls to // the `next()` method with only a single indirect call. // // - How we need to handle the backpressure / liveness problem in // our solution to `AsyncIterator`. // // - Why it's important that we can name the future returned by a // method for which we might otherwise use AFIT. This has bearing // on the possible need for RTN. This is larger than the `Send` // bound problem, but it relates to that as well. // // - How the presence of stable `async gen` blocks may affect our // thinking on this design. // // - How we can preserve cancellation safety for the future returned // by the `next()` method while still allowing the future returned // by `next_item()` to have its own non-trivial state. The former // is important since the current ecosystem assumes that futures // returned by `next()` are cancellation safe, while the latter is // important to avoid an easy mistake to make while implementing // the convenient AFIT-like trait. // // - How, even from a `dyn AsyncIterator`, we can return concrete // futures using an `async` block in the implementation. // // - Whether to separately indicate `Empty` and `Done` states to // support use cases such as `FuturesUnordered` (which currently // violate the "done" contract). // // - Whether to use a "pun" return type such as `Poll<Option<T>>` or // to use a dedicated enum in the manner of `CoroutineState`. // // - Whether and how to flatten carried effects into the type of the // value returned from `poll_next`. // // - Whether, since `?` can be used outside of `try` blocks, we // should compose the `Try` carried effect even in normal // (non-`try`) async iterators. // // - Whether `poll_progress` should indicate that a value is ready // to be yielded, that no further work can be done until // `poll_next` is called, or both. // // - Whether `poll_progress` should indicated the done state, if it // knows about, or whether it should force the caller to make a // call to `poll_next`. // // Note that everything here (aside from the buffered streams demo // below) works with `no_std` without any allocation or type erasure. //#![cfg_attr(not(test), no_std)] #![feature(const_waker)] #![feature(impl_trait_in_assoc_type)] #![allow(async_fn_in_trait)] #![allow(clippy::redundant_pattern_matching)] use core::{ future::Future, hint::unreachable_unchecked, marker::PhantomData, pin::Pin, ptr::addr_of_mut, task::{Context, Poll}, }; // This is an enum that will be returned by `poll_next`. // // We could conceivably define `AsyncIterator::poll_next` to return // values of type `Poll<Option<T>>`, however, for demonstration and to // explore some of the open questions, we'll use a dedicated enum for // this for clarity. // // There may of course be value in doing it this way "for real". In // the some way that we have a dedicated `CoroutineState` rather than // reusing `ControlFlow`, and in the same way that we have // `ControlFlow` rather than reusing `Result`, it could be sensible to // have separate enum rather than punning here. // // As we stack more carried effects, the potential value of having // separate enums becomes more clear. If this were a // `TryAsyncIterator`, then we would simply add a `Residual` variant // to this enum rather than having to worry with how to order the // composition of the carried effects. // // In fact, given that `?` can be used outside of `try` blocks in // Rust, we should consider adding a `Residual` variant here anyway. // // One open design question here is whether to have a separate `Empty` // variant. Important stream types such as `FuturesUnordered` // currently abuse the done state to signal that they are out of items // temporarily but may yield more later. We seem to have these // options: // // 1. Declare that this is a supported use of the done state and // that an `AsyncIterator` that returns done may later yield values. // // 2. Declare that this is a violation of the contract and that // `FuturesUnordered` should instead either returning `Pending` when // it is empty (and we should look into the use cases and reasons // why it does not do that) or return `Yielded(Option<T>)`. // // 3. Provide a separate way to signal empty (and not pending) // versus done. // // The downside of option 2 is that this would de facto become a // separate interface to async iterators that is not represented in // the interface itself, and that could make it more cumbersome for // generic code to handle this. // // On the other hand, having a separate state for this clutters up the // trait for all users. #[derive( Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd, )] pub enum NextState<T> { /// A value has been yielded. Yielded(T), /// No further values will ever be yielded. Done, /// No further values will be yielded until some action by the /// caller is taken. Empty, /// No value is yet ready. Pending, } // This is an enum that will be returned by `poll_progress`. // // Conceivably `poll_progress` could return `Poll<()>` where returning // `Ready(())` indicates that nothing more can be done until // `poll_next` is called. // // However, in implementation experience, we found that didn't express // enough, which resulted in duplicating substantial code between // `poll_progress` and `poll_next` implementations. When // implementing, what we want is to be able to call `poll_progress` // from `poll_next`. However, for that to be useful, we need for // `poll_progress` to return a signal that the next value is actually // ready to be consumed. // // Not all async iterators will be able to implement a `poll_progress` // that promises that. This leaves us with three options: // // 1. Indicate only when `poll_progress` can do no more work until // `poll_next` is called. This results in duplicate code between // `poll_progress` and `poll_next`. // // 2. Indicate only when `poll_next` is ready to yield a value. // This results in some async iterators always returning `Pending` // from `poll_progress` which means that callers may invoke // `poll_pending` more often than needed. // // 3. Indicate both states. // // We've take option 3 below, and have labeled these states `Paused` // and `ReadyToYield`. // // A separate design point is the question of whether, if // `poll_progress` knows that the async iterator is finished, whether // it should indicate that itself or whether it should indicated // `Paused` and force the caller to call `poll_next` to receive the // `Done` or `Empty` indication. #[derive( Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd, )] pub enum ProgressState { /// A value is ready to be yielded; the next call to `poll_next` /// is guaranteed to produce a `Yielded(_)` value. ReadyToYield, /// No further progress can be made until `poll_next` is called; /// it's safe to elide calls to `poll_progress` until after the /// next call to `poll_next`. Paused, /// No further values will ever be yielded. Done, /// No further values will be yielded until some action by the /// caller is taken. Empty, /// No value is yet ready. Pending, } // Let's say that the primary `AsyncIterator` trait in the language // uses `poll_next`. All language features such as `async gen` are // defined in terms of this. pub trait AsyncIterator { type Item; // While we're talking about `AsyncIterator`, note that we also // need to solve the backpressure / liveness problem (also known // as the "buffered streams" problem) in some way. We've // discussed `poll_progress` as one solution for this. fn poll_progress( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> ProgressState { ProgressState::Paused } // The `for await` syntax would use this method. fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> NextState<Self::Item>; } // For convenience, we define an extension trait so as to provide a // convenient `next` method to callers. This trait cannot be // implemented other than by the blanket impl, and users would be // discouraged from using it in bounds, similar to (but perhaps // stronger than) how we discourage using `From` in bounds. pub trait AsyncFnNext: AsyncIterator { // Note that the returned future here is cancellation safe. That // is, it's always safe to drop this future and no items or other // state will be lost. // // This is in contrast to an approach where `async fn next` is // implemented directly. In that world, callers must assume that // the future returned by `next` may hold non-trivial state and // may not be cancellation safe. type Future<'s>: Future< Output = Option<<Self as AsyncIterator>::Item>, > where Self: 's; fn next(self: Pin<&mut Self>) -> Self::Future<'_>; } // In this blanket impl, for a trait object, `T` here is going to be // `dyn AsyncIterator`. This makes the call to `next` a // statically-dispatched *direct* call. // // This is the optimal implementation of a `next()` method as, for a // `dyn AsyncIterator`, calling this and polling the resulting future // requires only one indirect call. All other known approaches, // including approaches where implementors would implement `async fn // next` directly, require two indirect calls. impl<T: AsyncIterator + ?Sized> AsyncFnNext for T { // We name this associated type rather than using AFIT so that // downstream code can use this name to set bounds or to store // this future in an ADT. type Future<'s> = impl Future<Output = Option<<Self as AsyncIterator>::Item>> where Self: 's; fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> { core::future::poll_fn(move |cx| { // This call to `poll_next` is the only indirect call when // the `Self` type is a trait object. match self.as_mut().poll_next(cx) { NextState::Yielded(x) => Poll::Ready(Some(x)), NextState::Done => Poll::Ready(None), NextState::Empty => Poll::Ready(None), NextState::Pending => Poll::Pending, } }) } } // We have an `IntoAsyncIterator` trait, just as we have an // `IntoFuture` trait. This is called on the expression passed to // `for await` before we pin the result and begin to poll/iterate it. // // Note that if we had `async gen` blocks in the language, those could // be used to implement this directly to much the same effect as we // achieve with the `AsyncFnNextiterator` below. pub trait IntoAsyncIterator<S> { type Item; type IntoAsyncIterator: AsyncIterator<Item = Self::Item>; fn into_async_iterator(self) -> Self::IntoAsyncIterator; } // Suppose that some users want to implement asynchronous iterators by // using `async` blocks rather than by implementing a // `AsyncIterator::poll_next` manually. We provide this trait to // allow for that. Any time that implements this trait automatically // implements `IntoAsyncIterator`. // // Jump down to the tests below to see how this is used. pub trait AsyncFnNextIterator { type Item; // We give a name to the returned future rather than using AFIT // for two reasons: // // 1. Since the future can be named, callers can add bounds on // this future, which allows use sites to set bounds for it. That // solves, among other things, the `Send` bound problem. // // 2. Since the future can be named, we can store it unboxed when // converting the `Self` into an `AsyncIterator`. // // Note that to match the AFIT version, we need this to be a GAT // so that the returned future can capture a reference to the // `Self` type. type NextFuture<'s>: Future<Output = Option<Self::Item>> where Self: 's; fn next_item(&mut self) -> Self::NextFuture<'_>; } // This struct will wrap our `AsyncFnNextIterator` and hold onto the // previous future that it has returned. This type is // self-referential because the returned future may hold a reference // to data in the async iterator. pub struct ConvertedAsyncIterator<'s, I> where I: AsyncFnNextIterator, Self: 's, { iter: I, // Note that to make this work, it's critical that we're able to // name `NextFuture`, as otherwise we could not store it unboxed // in this struct. fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>, _p: PhantomData<&'s ()>, } // This blanket impl makes all types that implement // `AsyncFnNextIterator` into types that implement // `IntoAsynciterator`. impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I> where I: AsyncFnNextIterator, Self: 's, { type Item = <I as AsyncFnNextIterator>::Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> NextState<Self::Item> { // SAFETY: Writing the following in terms of raw pointers // raises fewer opsem questions than doing it with references, // so we wrap the entire body in `unsafe` to make this // readable. unsafe { // SAFETY: This is for the pin projections below. let this = addr_of_mut!(*self.get_unchecked_mut()); if let None = (*this).fut { // SAFETY: We're doing lifetime extension here. The // fact that the `Self` type is pinned is what makes // this sound. let fut: I::NextFuture<'s> = (*this).iter.next_item(); (*this).fut = Some(fut); } let Some(ref mut fut) = (*this).fut else { // SAFETY: We just ensured this must be `Some(_)`. unreachable_unchecked() }; // SAFETY: We uphold the pin contract for this value. let pfut = Pin::new_unchecked(fut); match pfut.poll(cx) { Poll::Ready(x) => { (*this).fut = None; match x { Some(x) => NextState::Yielded(x), None => NextState::Done, } } Poll::Pending => NextState::Pending, } } } } // All `AsyncFnNextIterator` types implement `IntoAsynciterator`. impl<'s, I> IntoAsyncIterator<&'s ()> for I where I: AsyncFnNextIterator + 's, <I as AsyncFnNextIterator>::Item: 's, { type Item = <I as AsyncFnNextIterator>::Item; type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>; fn into_async_iterator(self) -> Self::IntoAsyncIterator { ConvertedAsyncIterator { iter: self, fut: None, _p: PhantomData, } } } // ################################################################### // Tests and demonstration of use. #[cfg(test)] mod always42_test { use super::*; use core::pin::pin; // Let's say a user comes along and wants to make the `Always42` // struct into an asynchronous iterator the "easy way". So the // user writes this impl in terms of the AFIT-like version. struct Always42; impl AsyncFnNextIterator for Always42 { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { async { Some(42) } } } #[test] fn test_always42_poll() { let mut x = Always42.into_async_iterator(); let mut x = pin!(x); let mut cx = NOP_CONTEXT; let mut next = || x.as_mut().poll_next(&mut cx); assert_eq!(NextState::Yielded(42), next()); assert_eq!(NextState::Yielded(42), next()); } #[test] fn test_always42_next() { let mut x = Always42.into_async_iterator(); let mut x = pin!(x); let mut next = || poll_once_owned(x.as_mut().next()); assert_eq!(Poll::Ready(Some(42)), next()); assert_eq!(Poll::Ready(Some(42)), next()); } } // This demonstrates that we correctly handle state within the // returned future. #[cfg(test)] mod alwaysonepending_test { use super::*; use core::pin::pin; struct AlwaysOnePending; impl AsyncFnNextIterator for AlwaysOnePending { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { async { let mut once = false; core::future::poll_fn(move |_| match once { true => Poll::Ready(Some(42)), false => { once = true; Poll::Pending } }) .await } } } #[test] fn test_always_one_pending_poll() { let mut x = AlwaysOnePending.into_async_iterator(); let mut x = pin!(x); let mut cx = NOP_CONTEXT; let mut next = || x.as_mut().poll_next(&mut cx); assert_eq!(NextState::Pending, next()); assert_eq!(NextState::Yielded(42), next()); assert_eq!(NextState::Pending, next()); assert_eq!(NextState::Yielded(42), next()); } #[test] fn test_always_one_pending_next() { let mut x = AlwaysOnePending.into_async_iterator(); let mut x = pin!(x); let mut next = || poll_once_owned(x.as_mut().next()); assert_eq!(Poll::Pending, next()); assert_eq!(Poll::Ready(Some(42)), next()); assert_eq!(Poll::Pending, next()); assert_eq!(Poll::Ready(Some(42)), next()); } } // This demonstrates the two FSM situation. We correctly handle the // state both in the underlying `AsyncFnNextIterator` and the state in // the returned future. #[cfg(test)] mod onceonepending_test { use super::*; use core::pin::pin; struct OnceOnePending(bool); impl AsyncFnNextIterator for OnceOnePending { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { async { let mut once = false; core::future::poll_fn(move |_| match (self.0, once) { (false, false) => { once = true; Poll::Pending } (false, true) => { self.0 = true; Poll::Ready(Some(42)) } (true, _) => Poll::Ready(None), }) .await } } } #[test] fn test_once_one_pending_poll() { let mut x = OnceOnePending(false).into_async_iterator(); let mut x = pin!(x); let mut cx = NOP_CONTEXT; let mut next = || x.as_mut().poll_next(&mut cx); assert_eq!(NextState::Pending, next()); assert_eq!(NextState::Yielded(42), next()); assert_eq!(NextState::Done, next()); assert_eq!(NextState::Done, next()); } #[test] fn test_once_one_pending_next() { let mut x = OnceOnePending(false).into_async_iterator(); let mut x = pin!(x); let mut next = || poll_once_owned(x.as_mut().next()); assert_eq!(Poll::Pending, next()); assert_eq!(Poll::Ready(Some(42)), next()); assert_eq!(Poll::Ready(None), next()); assert_eq!(Poll::Ready(None), next()); } } // Test that we can implement `AsyncFnNextIterator` both such that the // returned future implements `Send` and such that it does not, and // that callers can set bounds so as to rely on this. #[cfg(test)] mod sendalways42_test { use super::*; use core::pin::pin; struct SendAlways42; impl AsyncFnNextIterator for SendAlways42 { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { async { Some(42) } } } #[test] fn test_sendalways42_next() { let x = SendAlways42.into_async_iterator(); let mut x = pin!(x); let mut next = || poll_once_spawn(x.as_mut().next()); assert_eq!(Poll::Ready(Some(42)), next()); } } #[cfg(test)] mod localalways42_test { use super::*; use std::{future::ready, rc::Rc}; struct LocalAlways42(Rc<()>); impl AsyncFnNextIterator for LocalAlways42 { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { let x = self.0.clone(); async move { Some((ready(()).await, *x, 42).2) } } } #[test] fn test_localalways42_next() { use core::pin::pin; let x = LocalAlways42(Rc::new(())).into_async_iterator(); let mut x = pin!(x); let mut next = || poll_once_owned(x.as_mut().next()); assert_eq!(Poll::Ready(Some(42)), next()); } } // Test that we can implement `AsyncFnNextIterator` using an `async` // block and a concrete future while using the resulting // `AsyncIterator` with dynamic dispatch and type erasure. #[cfg(test)] mod dynalways42_test { use super::*; struct DynAlways42; impl AsyncFnNextIterator for DynAlways42 { type Item = u8; type NextFuture<'s> = impl Future<Output = Option<Self::Item>>; fn next_item(&mut self) -> Self::NextFuture<'_> { async { Some(42) } } } #[test] fn test_dynalways42_poll() { let x: &mut dyn AsyncIterator<Item = u8> = &mut DynAlways42.into_async_iterator(); let mut x = unsafe { Pin::new_unchecked(x) }; let mut cx = NOP_CONTEXT; let mut next = || x.as_mut().poll_next(&mut cx); assert_eq!(NextState::Yielded(42), next()); assert_eq!(NextState::Yielded(42), next()); } #[test] fn test_dynalways42_next() { let x: &mut dyn AsyncIterator<Item = u8> = &mut DynAlways42.into_async_iterator(); let mut x = unsafe { Pin::new_unchecked(x) }; let mut next = || poll_once_owned(x.as_mut().next()); assert_eq!(Poll::Ready(Some(42)), next()); assert_eq!(Poll::Ready(Some(42)), next()); } } // ################################################################### // The backpressure / liveness problem. // This is a demonstration of why it may make sense to add // `poll_progress` to `Future`. Doing this allows for higher level // constructs such as `FuturesUnordered` and buffered versions of it // to be built in a more orthogonal way. pub trait ProgressFuture: Future { fn poll_progress( self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<()> { Poll::Ready(()) } } pub trait BufferedFuture: ProgressFuture { fn buffer(self) -> Buffered<Self> where Self: Sized; } pub enum Buffered<Fut> where Fut: Future, { Pending(Fut), Ready(Option<<Fut as Future>::Output>), } impl<Fut> Future for Buffered<Fut> where Fut: Future, { type Output = <Fut as Future>::Output; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Self::Output> { let this = unsafe { self.get_unchecked_mut() }; match this { Buffered::Pending(fut) => { let fut = unsafe { Pin::new_unchecked(fut) }; fut.poll(cx) } Buffered::Ready(None) => Poll::Pending, Buffered::Ready(x) => Poll::Ready(x.take().unwrap()), } } } impl<Fut> ProgressFuture for Buffered<Fut> where Fut: Future, { fn poll_progress( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<()> { let this = unsafe { self.get_unchecked_mut() }; let Buffered::Pending(fut) = this else { return Poll::Ready(()); }; let fut = unsafe { Pin::new_unchecked(fut) }; match fut.poll(cx) { Poll::Ready(x) => { *this = Buffered::Ready(Some(x)); Poll::Ready(()) } Poll::Pending => Poll::Pending, } } } impl<Fut> BufferedFuture for Buffered<Fut> where Fut: Future, { fn buffer(self) -> Buffered<Self> where Self: Sized, { Buffered::Pending(self) } } // ################################################################### // `FuturesUnordered` demonstration. extern crate alloc; use alloc::collections::VecDeque; // This is a highly simplified and deoptimized version. pub struct FuturesUnordered<Fut> where Fut: ProgressFuture, { queue: VecDeque<Fut>, } impl<Fut> FuturesUnordered<Fut> where Fut: ProgressFuture, { pub fn len(&self) -> usize { self.queue.len() } pub fn push(&mut self, f: Fut) { self.queue.push_back(f); } } impl<Fut> AsyncIterator for FuturesUnordered<Fut> where Fut: ProgressFuture, { type Item = <Fut as Future>::Output; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> NextState<Self::Item> { let this = unsafe { self.get_unchecked_mut() }; let (mut i, len) = (0usize, this.queue.len()); while let Some(mut fut) = this.queue.pop_front() { { let fut = unsafe { Pin::new_unchecked(&mut fut) }; match fut.poll(cx) { Poll::Pending => {} Poll::Ready(x) => return NextState::Yielded(x), } } this.queue.push_back(fut); i += 1; let false = i == len else { break }; } NextState::Pending } fn poll_progress( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> ProgressState { let this = unsafe { self.get_unchecked_mut() }; let mut state = ProgressState::ReadyToYield; for fut in this.queue.iter_mut() { let fut = unsafe { Pin::new_unchecked(fut) }; if let Poll::Pending = fut.poll_progress(cx) { state = ProgressState::Pending; } } state } } pub struct BufferUnordered<St> where St: AsyncIterator, <St as AsyncIterator>::Item: ProgressFuture, { stream: St, queue: FuturesUnordered<<St as AsyncIterator>::Item>, max: usize, } impl<St> AsyncIterator for BufferUnordered<St> where St: AsyncIterator, <St as AsyncIterator>::Item: ProgressFuture, { type Item = <<St as AsyncIterator>::Item as Future>::Output; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> NextState<Self::Item> { let this = unsafe { self.get_unchecked_mut() }; let mut empty = false; while this.queue.len() < this.max { let stream = unsafe { Pin::new_unchecked(&mut this.stream) }; match stream.poll_next(cx) { NextState::Yielded(fut) => { this.queue.push(fut); } NextState::Empty => { empty = true; break; } _ => break, } } let queue = unsafe { Pin::new_unchecked(&mut this.queue) }; match queue.poll_next(cx) { NextState::Yielded(x) => NextState::Yielded(x), NextState::Empty if empty => NextState::Empty, _ => NextState::Pending, } } fn poll_progress( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> ProgressState { let this = unsafe { self.get_unchecked_mut() }; let mut ready = false; if this.queue.len() < this.max { while this.queue.len() < this.max { let stream = unsafe { Pin::new_unchecked(&mut this.stream) }; match stream.poll_next(cx) { NextState::Yielded(fut) => { this.queue.push(fut); } NextState::Empty => { ready = true; break; } _ => break, } } } else { let stream = unsafe { Pin::new_unchecked(&mut this.stream) }; match stream.poll_progress(cx) { ProgressState::ReadyToYield => ready = true, _ => {} } } let queue = unsafe { Pin::new_unchecked(&mut this.queue) }; match queue.poll_progress(cx) { ProgressState::ReadyToYield if ready => { ProgressState::ReadyToYield } _ => ProgressState::Pending, } } } // ################################################################### // Supporting code. pub use nop_waker::*; mod nop_waker { use core::{ future::Future, pin::{pin, Pin}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, }; pub const NOP_RAWWAKER: RawWaker = { fn nop(_: *const ()) {} const VTAB: RawWakerVTable = RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop); RawWaker::new(&() as *const (), &VTAB) }; pub const NOP_WAKER: &Waker = &unsafe { Waker::from_raw(NOP_RAWWAKER) }; pub const NOP_CONTEXT: Context<'static> = Context::from_waker(NOP_WAKER); pub fn poll_once<T, F>(f: &mut F) -> Poll<T> where F: Future<Output = T> + ?Sized + Unpin, { let mut cx = NOP_CONTEXT; Pin::new(f).as_mut().poll(&mut cx) } pub fn poll_once_owned<T, F>(f: F) -> Poll<T> where F: Future<Output = T>, { poll_once(&mut pin!(f)) } #[cfg(test)] pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T> where F: Future<Output = T> + Send, { std::thread::scope(move |s| { s.spawn(move || poll_once_owned(f)).join().unwrap() }) } } ``` --- # Discussion ## Attendance - People: TC, tmandry, eholk, Daria, CE, Vincenzo, Yosh (20 mins late) ## Meeting roles - Minutes, driver: TC ## AsyncIterator -> futures::Stream backport Daria: There definitelly should be a way to transform `AsyncIterator` into `futures::Stream` (and also tokio's Stream). What downsides of this are there? ## poll_progress (In response to a question...) TC: `poll_progress` is about maintaining liveness. Keeping the plates spinning. But you need to maintain backpressure. `poll_progress` allows you to maintain liveness without having unlimited buffering by separating the liveness concern from having to handle the output.

    Import from clipboard

    Paste your markdown or webpage here...

    Advanced permission required

    Your current role can only read. Ask the system administrator to acquire write and comment permission.

    This team is disabled

    Sorry, this team is disabled. You can't edit this note.

    This note is locked

    Sorry, only owner can edit this note.

    Reach the limit

    Sorry, you've reached the max length this note can be.
    Please reduce the content or divide it to more notes, thank you!

    Import from Gist

    Import from Snippet

    or

    Export to Snippet

    Are you sure?

    Do you really want to delete this note?
    All users will lose their connection.

    Create a note from template

    Create a note from template

    Oops...
    This template has been removed or transferred.
    Upgrade
    All
    • All
    • Team
    No template.

    Create a template

    Upgrade

    Delete template

    Do you really want to delete this template?
    Turn this template into a regular note and keep its content, versions, and comments.

    This page need refresh

    You have an incompatible client version.
    Refresh to update.
    New version available!
    See releases notes here
    Refresh to enjoy new features.
    Your user state has changed.
    Refresh to load new user state.

    Sign in

    Forgot password

    or

    By clicking below, you agree to our terms of service.

    Sign in via Facebook Sign in via Twitter Sign in via GitHub Sign in via Dropbox Sign in with Wallet
    Wallet ( )
    Connect another wallet

    New to HackMD? Sign up

    Help

    • English
    • 中文
    • Français
    • Deutsch
    • 日本語
    • Español
    • Català
    • Ελληνικά
    • Português
    • italiano
    • Türkçe
    • Русский
    • Nederlands
    • hrvatski jezik
    • język polski
    • Українська
    • हिन्दी
    • svenska
    • Esperanto
    • dansk

    Documents

    Help & Tutorial

    How to use Book mode

    Slide Example

    API Docs

    Edit in VSCode

    Install browser extension

    Contacts

    Feedback

    Discord

    Send us email

    Resources

    Releases

    Pricing

    Blog

    Policy

    Terms

    Privacy

    Cheatsheet

    Syntax Example Reference
    # Header Header 基本排版
    - Unordered List
    • Unordered List
    1. Ordered List
    1. Ordered List
    - [ ] Todo List
    • Todo List
    > Blockquote
    Blockquote
    **Bold font** Bold font
    *Italics font* Italics font
    ~~Strikethrough~~ Strikethrough
    19^th^ 19th
    H~2~O H2O
    ++Inserted text++ Inserted text
    ==Marked text== Marked text
    [link text](https:// "title") Link
    ![image alt](https:// "title") Image
    `Code` Code 在筆記中貼入程式碼
    ```javascript
    var i = 0;
    ```
    var i = 0;
    :smile: :smile: Emoji list
    {%youtube youtube_id %} Externals
    $L^aT_eX$ LaTeX
    :::info
    This is a alert area.
    :::

    This is a alert area.

    Versions and GitHub Sync
    Get Full History Access

    • Edit version name
    • Delete

    revision author avatar     named on  

    More Less

    Note content is identical to the latest version.
    Compare
      Choose a version
      No search result
      Version not found
    Sign in to link this note to GitHub
    Learn more
    This note is not linked with GitHub
     

    Feedback

    Submission failed, please try again

    Thanks for your support.

    On a scale of 0-10, how likely is it that you would recommend HackMD to your friends, family or business associates?

    Please give us some advice and help us improve HackMD.

     

    Thanks for your feedback

    Remove version name

    Do you want to remove this version name and description?

    Transfer ownership

    Transfer to
      Warning: is a public team. If you transfer note to this team, everyone on the web can find and read this note.

        Link with GitHub

        Please authorize HackMD on GitHub
        • Please sign in to GitHub and install the HackMD app on your GitHub repo.
        • HackMD links with GitHub through a GitHub App. You can choose which repo to install our App.
        Learn more  Sign in to GitHub

        Push the note to GitHub Push to GitHub Pull a file from GitHub

          Authorize again
         

        Choose which file to push to

        Select repo
        Refresh Authorize more repos
        Select branch
        Select file
        Select branch
        Choose version(s) to push
        • Save a new version and push
        • Choose from existing versions
        Include title and tags
        Available push count

        Pull from GitHub

         
        File from GitHub
        File from HackMD

        GitHub Link Settings

        File linked

        Linked by
        File path
        Last synced branch
        Available push count

        Danger Zone

        Unlink
        You will no longer receive notification when GitHub file changes after unlink.

        Syncing

        Push failed

        Push successfully