Rust Async Working Group
---
title: "Design meeting 2024-02-15: AsyncIterator prototype"
tags: ["WG-async", "design-meeting", "minutes"]
date: 2024-02-15
discussion: https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async/topic/Design.20meeting.202024-02-15
url: https://hackmd.io/EplPcmBCTCSQ9LPOU6IZDw
---

# A discussion prototype for `AsyncIterator`

```rust
// This is a demonstration of how we might implement `AsyncIterator`.
// We cover the following topics in the code and comments that follow:
//
// - How we can allow users to implement an AFIT `async fn next`
//   equivalent while converting that automatically to an
//   `AsyncIterator` type with `IntoAsyncIterator`.
//
// - How, for `AsyncIterator` trait objects, we can allow calls to
//   the `next()` method with only a single indirect call.
//
// - How we need to handle the backpressure / liveness problem in
//   our solution to `AsyncIterator`.
//
// - Why it's important that we can name the future returned by a
//   method for which we might otherwise use AFIT. This has bearing
//   on the possible need for RTN. This is larger than the `Send`
//   bound problem, but it relates to that as well.
//
// - How the presence of stable `async gen` blocks may affect our
//   thinking on this design.
//
// - How we can preserve cancellation safety for the future returned
//   by the `next()` method while still allowing the future returned
//   by `next_item()` to have its own non-trivial state. The former
//   is important since the current ecosystem assumes that futures
//   returned by `next()` are cancellation safe, while the latter is
//   important to avoid an easy mistake to make while implementing
//   the convenient AFIT-like trait.
//
// - How, even from a `dyn AsyncIterator`, we can return concrete
//   futures using an `async` block in the implementation.
//
// - Whether to separately indicate `Empty` and `Done` states to
//   support use cases such as `FuturesUnordered` (which currently
//   violate the "done" contract).
//
// - Whether to use a "pun" return type such as `Poll<Option<T>>` or
//   to use a dedicated enum in the manner of `CoroutineState`.
//
// - Whether and how to flatten carried effects into the type of the
//   value returned from `poll_next`.
//
// - Whether, since `?` can be used outside of `try` blocks, we
//   should compose the `Try` carried effect even in normal
//   (non-`try`) async iterators.
//
// - Whether `poll_progress` should indicate that a value is ready
//   to be yielded, that no further work can be done until
//   `poll_next` is called, or both.
//
// - Whether `poll_progress` should indicate the done state, if it
//   knows about it, or whether it should force the caller to make a
//   call to `poll_next`.
//
// Note that everything here (aside from the buffered streams demo
// below) works with `no_std` without any allocation or type erasure.

//#![cfg_attr(not(test), no_std)]
#![feature(const_waker)]
#![feature(impl_trait_in_assoc_type)]
#![allow(async_fn_in_trait)]
#![allow(clippy::redundant_pattern_matching)]

use core::{
    future::Future,
    hint::unreachable_unchecked,
    marker::PhantomData,
    pin::Pin,
    ptr::addr_of_mut,
    task::{Context, Poll},
};

// This is an enum that will be returned by `poll_next`.
//
// We could conceivably define `AsyncIterator::poll_next` to return
// values of type `Poll<Option<T>>`; however, for demonstration and to
// explore some of the open questions, we'll use a dedicated enum for
// this for clarity.
//
// There may of course be value in doing it this way "for real". In
// the same way that we have a dedicated `CoroutineState` rather than
// reusing `ControlFlow`, and in the same way that we have
// `ControlFlow` rather than reusing `Result`, it could be sensible to
// have a separate enum rather than punning here.
//
// As we stack more carried effects, the potential value of having
// separate enums becomes more clear.
// If this were a `TryAsyncIterator`, then we would simply add a
// `Residual` variant to this enum rather than having to worry about
// how to order the composition of the carried effects.
//
// In fact, given that `?` can be used outside of `try` blocks in
// Rust, we should consider adding a `Residual` variant here anyway.
//
// One open design question here is whether to have a separate `Empty`
// variant. Important stream types such as `FuturesUnordered`
// currently abuse the done state to signal that they are out of items
// temporarily but may yield more later. We seem to have these
// options:
//
//   1. Declare that this is a supported use of the done state and
//   that an `AsyncIterator` that returns done may later yield values.
//
//   2. Declare that this is a violation of the contract and that
//   `FuturesUnordered` should instead either return `Pending` when
//   it is empty (and we should look into the use cases and reasons
//   why it does not do that) or return `Yielded(Option<T>)`.
//
//   3. Provide a separate way to signal empty (and not pending)
//   versus done.
//
// The downside of option 2 is that this would de facto become a
// separate interface to async iterators that is not represented in
// the interface itself, and that could make it more cumbersome for
// generic code to handle this.
//
// On the other hand, having a separate state for this clutters up the
// trait for all users.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum NextState<T> {
    /// A value has been yielded.
    Yielded(T),
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// This is an enum that will be returned by `poll_progress`.
//
// Conceivably `poll_progress` could return `Poll<()>` where returning
// `Ready(())` indicates that nothing more can be done until
// `poll_next` is called.
//
// However, in implementation experience, we found that didn't express
// enough, which resulted in duplicating substantial code between
// `poll_progress` and `poll_next` implementations. When
// implementing, what we want is to be able to call `poll_progress`
// from `poll_next`. However, for that to be useful, we need
// `poll_progress` to return a signal that the next value is actually
// ready to be consumed.
//
// Not all async iterators will be able to implement a `poll_progress`
// that promises that. This leaves us with three options:
//
//   1. Indicate only when `poll_progress` can do no more work until
//   `poll_next` is called. This results in duplicate code between
//   `poll_progress` and `poll_next`.
//
//   2. Indicate only when `poll_next` is ready to yield a value.
//   This results in some async iterators always returning `Pending`
//   from `poll_progress`, which means that callers may invoke
//   `poll_progress` more often than needed.
//
//   3. Indicate both states.
//
// We've taken option 3 below, and have labeled these states `Paused`
// and `ReadyToYield`.
//
// A separate design point is the question of whether, if
// `poll_progress` knows that the async iterator is finished, it
// should indicate that itself or whether it should indicate `Paused`
// and force the caller to call `poll_next` to receive the `Done` or
// `Empty` indication.
#[derive(
    Clone, Copy, Debug, Hash, Eq, Ord, PartialEq, PartialOrd,
)]
pub enum ProgressState {
    /// A value is ready to be yielded; the next call to `poll_next`
    /// is guaranteed to produce a `Yielded(_)` value.
    ReadyToYield,
    /// No further progress can be made until `poll_next` is called;
    /// it's safe to elide calls to `poll_progress` until after the
    /// next call to `poll_next`.
    Paused,
    /// No further values will ever be yielded.
    Done,
    /// No further values will be yielded until some action by the
    /// caller is taken.
    Empty,
    /// No value is yet ready.
    Pending,
}

// Let's say that the primary `AsyncIterator` trait in the language
// uses `poll_next`. All language features such as `async gen` are
// defined in terms of this.
pub trait AsyncIterator {
    type Item;
    // While we're talking about `AsyncIterator`, note that we also
    // need to solve the backpressure / liveness problem (also known
    // as the "buffered streams" problem) in some way. We've
    // discussed `poll_progress` as one solution for this.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    // The `for await` syntax would use this method.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// For convenience, we define an extension trait so as to provide a
// convenient `next` method to callers. This trait cannot be
// implemented other than by the blanket impl, and users would be
// discouraged from using it in bounds, similar to (but perhaps
// stronger than) how we discourage using `From` in bounds.
pub trait AsyncFnNext: AsyncIterator {
    // Note that the returned future here is cancellation safe. That
    // is, it's always safe to drop this future and no items or other
    // state will be lost.
    //
    // This is in contrast to an approach where `async fn next` is
    // implemented directly. In that world, callers must assume that
    // the future returned by `next` may hold non-trivial state and
    // may not be cancellation safe.
    type Future<'s>: Future<
        Output = Option<<Self as AsyncIterator>::Item>,
    >
    where
        Self: 's;
    fn next(self: Pin<&mut Self>) -> Self::Future<'_>;
}

// In this blanket impl, for a trait object, `T` here is going to be
// `dyn AsyncIterator`. This makes the call to `next` a
// statically-dispatched *direct* call.
//
// This is the optimal implementation of a `next()` method as, for a
// `dyn AsyncIterator`, calling this and polling the resulting future
// requires only one indirect call.
// All other known approaches, including approaches where
// implementors would implement `async fn next` directly, require two
// indirect calls.
impl<T: AsyncIterator + ?Sized> AsyncFnNext for T {
    // We name this associated type rather than using AFIT so that
    // downstream code can use this name to set bounds or to store
    // this future in an ADT.
    type Future<'s> = impl Future<Output = Option<<Self as AsyncIterator>::Item>>
    where
        Self: 's;
    fn next(mut self: Pin<&mut Self>) -> Self::Future<'_> {
        core::future::poll_fn(move |cx| {
            // This call to `poll_next` is the only indirect call when
            // the `Self` type is a trait object.
            match self.as_mut().poll_next(cx) {
                NextState::Yielded(x) => Poll::Ready(Some(x)),
                NextState::Done => Poll::Ready(None),
                NextState::Empty => Poll::Ready(None),
                NextState::Pending => Poll::Pending,
            }
        })
    }
}

// We have an `IntoAsyncIterator` trait, just as we have an
// `IntoFuture` trait. This is called on the expression passed to
// `for await` before we pin the result and begin to poll/iterate it.
//
// Note that if we had `async gen` blocks in the language, those could
// be used to implement this directly to much the same effect as we
// achieve with the `AsyncFnNextIterator` below.
pub trait IntoAsyncIterator<S> {
    type Item;
    type IntoAsyncIterator: AsyncIterator<Item = Self::Item>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator;
}

// Suppose that some users want to implement asynchronous iterators by
// using `async` blocks rather than by implementing
// `AsyncIterator::poll_next` manually. We provide this trait to
// allow for that. Any type that implements this trait automatically
// implements `IntoAsyncIterator`.
//
// Jump down to the tests below to see how this is used.
pub trait AsyncFnNextIterator {
    type Item;
    // We give a name to the returned future rather than using AFIT
    // for two reasons:
    //
    //   1. Since the future can be named, callers can add bounds on
    //   this future, which allows use sites to set bounds for it.
    //   That solves, among other things, the `Send` bound problem.
    //
    //   2. Since the future can be named, we can store it unboxed when
    //   converting the `Self` into an `AsyncIterator`.
    //
    // Note that to match the AFIT version, we need this to be a GAT
    // so that the returned future can capture a reference to the
    // `Self` type.
    type NextFuture<'s>: Future<Output = Option<Self::Item>>
    where
        Self: 's;
    fn next_item(&mut self) -> Self::NextFuture<'_>;
}

// This struct will wrap our `AsyncFnNextIterator` and hold onto the
// previous future that it has returned. This type is
// self-referential because the returned future may hold a reference
// to data in the async iterator.
pub struct ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    iter: I,
    // Note that to make this work, it's critical that we're able to
    // name `NextFuture`, as otherwise we could not store it unboxed
    // in this struct.
    fut: Option<<I as AsyncFnNextIterator>::NextFuture<'s>>,
    _p: PhantomData<&'s ()>,
}

// This blanket impl makes all types that implement
// `AsyncFnNextIterator` into types that implement
// `IntoAsyncIterator`.
impl<'s, I> AsyncIterator for ConvertedAsyncIterator<'s, I>
where
    I: AsyncFnNextIterator,
    Self: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        // SAFETY: Writing the following in terms of raw pointers
        // raises fewer opsem questions than doing it with references,
        // so we wrap the entire body in `unsafe` to make this
        // readable.
        unsafe {
            // SAFETY: This is for the pin projections below.
            let this = addr_of_mut!(*self.get_unchecked_mut());
            if let None = (*this).fut {
                // SAFETY: We're doing lifetime extension here. The
                // fact that the `Self` type is pinned is what makes
                // this sound.
                let fut: I::NextFuture<'s> = (*this).iter.next_item();
                (*this).fut = Some(fut);
            }
            let Some(ref mut fut) = (*this).fut else {
                // SAFETY: We just ensured this must be `Some(_)`.
                unreachable_unchecked()
            };
            // SAFETY: We uphold the pin contract for this value.
            let pfut = Pin::new_unchecked(fut);
            match pfut.poll(cx) {
                Poll::Ready(x) => {
                    (*this).fut = None;
                    match x {
                        Some(x) => NextState::Yielded(x),
                        None => NextState::Done,
                    }
                }
                Poll::Pending => NextState::Pending,
            }
        }
    }
}

// All `AsyncFnNextIterator` types implement `IntoAsyncIterator`.
impl<'s, I> IntoAsyncIterator<&'s ()> for I
where
    I: AsyncFnNextIterator + 's,
    <I as AsyncFnNextIterator>::Item: 's,
{
    type Item = <I as AsyncFnNextIterator>::Item;
    type IntoAsyncIterator = ConvertedAsyncIterator<'s, I>;
    fn into_async_iterator(self) -> Self::IntoAsyncIterator {
        ConvertedAsyncIterator {
            iter: self,
            fut: None,
            _p: PhantomData,
        }
    }
}

// ###################################################################
// Tests and demonstration of use.

#[cfg(test)]
mod always42_test {
    use super::*;
    use core::pin::pin;

    // Let's say a user comes along and wants to make the `Always42`
    // struct into an asynchronous iterator the "easy way". So the
    // user writes this impl in terms of the AFIT-like version.
    struct Always42;
    impl AsyncFnNextIterator for Always42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_always42_poll() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always42_next() {
        let mut x = Always42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates that we correctly handle state within the
// returned future.
#[cfg(test)]
mod alwaysonepending_test {
    use super::*;
    use core::pin::pin;

    struct AlwaysOnePending;
    impl AsyncFnNextIterator for AlwaysOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match once {
                    true => Poll::Ready(Some(42)),
                    false => {
                        once = true;
                        Poll::Pending
                    }
                })
                .await
            }
        }
    }

    #[test]
    fn test_always_one_pending_poll() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_always_one_pending_next() {
        let mut x = AlwaysOnePending.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// This demonstrates the two FSM situation. We correctly handle the
// state both in the underlying `AsyncFnNextIterator` and the state in
// the returned future.
#[cfg(test)]
mod onceonepending_test {
    use super::*;
    use core::pin::pin;

    struct OnceOnePending(bool);
    impl AsyncFnNextIterator for OnceOnePending {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async {
                let mut once = false;
                core::future::poll_fn(move |_| match (self.0, once) {
                    (false, false) => {
                        once = true;
                        Poll::Pending
                    }
                    (false, true) => {
                        self.0 = true;
                        Poll::Ready(Some(42))
                    }
                    (true, _) => Poll::Ready(None),
                })
                .await
            }
        }
    }

    #[test]
    fn test_once_one_pending_poll() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Pending, next());
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Done, next());
        assert_eq!(NextState::Done, next());
    }

    #[test]
    fn test_once_one_pending_next() {
        let mut x = OnceOnePending(false).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Pending, next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(None), next());
        assert_eq!(Poll::Ready(None), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` both such that the
// returned future implements `Send` and such that it does not, and
// that callers can set bounds so as to rely on this.
#[cfg(test)]
mod sendalways42_test {
    use super::*;
    use core::pin::pin;

    struct SendAlways42;
    impl AsyncFnNextIterator for SendAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_sendalways42_next() {
        let x = SendAlways42.into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_spawn(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

#[cfg(test)]
mod localalways42_test {
    use super::*;
    use std::{future::ready, rc::Rc};

    struct LocalAlways42(Rc<()>);
    impl AsyncFnNextIterator for LocalAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            let x = self.0.clone();
            async move { Some((ready(()).await, *x, 42).2) }
        }
    }

    #[test]
    fn test_localalways42_next() {
        use core::pin::pin;
        let x = LocalAlways42(Rc::new(())).into_async_iterator();
        let mut x = pin!(x);
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// Test that we can implement `AsyncFnNextIterator` using an `async`
// block and a concrete future while using the resulting
// `AsyncIterator` with dynamic dispatch and type erasure.
#[cfg(test)]
mod dynalways42_test {
    use super::*;

    struct DynAlways42;
    impl AsyncFnNextIterator for DynAlways42 {
        type Item = u8;
        type NextFuture<'s> = impl Future<Output = Option<Self::Item>>;
        fn next_item(&mut self) -> Self::NextFuture<'_> {
            async { Some(42) }
        }
    }

    #[test]
    fn test_dynalways42_poll() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut cx = NOP_CONTEXT;
        let mut next = || x.as_mut().poll_next(&mut cx);
        assert_eq!(NextState::Yielded(42), next());
        assert_eq!(NextState::Yielded(42), next());
    }

    #[test]
    fn test_dynalways42_next() {
        let x: &mut dyn AsyncIterator<Item = u8> =
            &mut DynAlways42.into_async_iterator();
        let mut x = unsafe { Pin::new_unchecked(x) };
        let mut next = || poll_once_owned(x.as_mut().next());
        assert_eq!(Poll::Ready(Some(42)), next());
        assert_eq!(Poll::Ready(Some(42)), next());
    }
}

// ###################################################################
// The backpressure / liveness problem.

// This is a demonstration of why it may make sense to add
// `poll_progress` to `Future`. Doing this allows for higher level
// constructs such as `FuturesUnordered` and buffered versions of it
// to be built in a more orthogonal way.
pub trait ProgressFuture: Future {
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<()> {
        Poll::Ready(())
    }
}

pub trait BufferedFuture: ProgressFuture {
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized;
}

pub enum Buffered<Fut>
where
    Fut: Future,
{
    Pending(Fut),
    Ready(Option<<Fut as Future>::Output>),
}

impl<Fut> Future for Buffered<Fut>
where
    Fut: Future,
{
    type Output = <Fut as Future>::Output;
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
        let this = unsafe { self.get_unchecked_mut() };
        match this {
            Buffered::Pending(fut) => {
                let fut = unsafe { Pin::new_unchecked(fut) };
                fut.poll(cx)
            }
            Buffered::Ready(None) => Poll::Pending,
            Buffered::Ready(x) => Poll::Ready(x.take().unwrap()),
        }
    }
}

impl<Fut> ProgressFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        let this = unsafe { self.get_unchecked_mut() };
        let Buffered::Pending(fut) = this else {
            return Poll::Ready(());
        };
        let fut = unsafe { Pin::new_unchecked(fut) };
        match fut.poll(cx) {
            Poll::Ready(x) => {
                *this = Buffered::Ready(Some(x));
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

impl<Fut> BufferedFuture for Buffered<Fut>
where
    Fut: Future,
{
    fn buffer(self) -> Buffered<Self>
    where
        Self: Sized,
    {
        Buffered::Pending(self)
    }
}

// ###################################################################
// `FuturesUnordered` demonstration.

extern crate alloc;
use alloc::collections::VecDeque;

// This is a highly simplified and deoptimized version.
pub struct FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    queue: VecDeque<Fut>,
}

impl<Fut> FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    pub fn len(&self) -> usize {
        self.queue.len()
    }
    pub fn push(&mut self, f: Fut) {
        self.queue.push_back(f);
    }
}

impl<Fut> AsyncIterator for FuturesUnordered<Fut>
where
    Fut: ProgressFuture,
{
    type Item = <Fut as Future>::Output;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let (mut i, len) = (0usize, this.queue.len());
        while let Some(mut fut) = this.queue.pop_front() {
            {
                let fut = unsafe { Pin::new_unchecked(&mut fut) };
                match fut.poll(cx) {
                    Poll::Pending => {}
                    Poll::Ready(x) => return NextState::Yielded(x),
                }
            }
            this.queue.push_back(fut);
            i += 1;
            let false = i == len else { break };
        }
        NextState::Pending
    }
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut state = ProgressState::ReadyToYield;
        for fut in this.queue.iter_mut() {
            let fut = unsafe { Pin::new_unchecked(fut) };
            if let Poll::Pending = fut.poll_progress(cx) {
                state = ProgressState::Pending;
            }
        }
        state
    }
}

pub struct BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    stream: St,
    queue: FuturesUnordered<<St as AsyncIterator>::Item>,
    max: usize,
}

impl<St> AsyncIterator for BufferUnordered<St>
where
    St: AsyncIterator,
    <St as AsyncIterator>::Item: ProgressFuture,
{
    type Item = <<St as AsyncIterator>::Item as Future>::Output;
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item> {
        let this = unsafe { self.get_unchecked_mut() };
        let mut empty = false;
        while this.queue.len() < this.max {
            let stream = unsafe {
                Pin::new_unchecked(&mut this.stream)
            };
            match stream.poll_next(cx) {
                NextState::Yielded(fut) => {
                    this.queue.push(fut);
                }
                NextState::Empty => {
                    empty = true;
                    break;
                }
                _ => break,
            }
        }
        let queue = unsafe {
            Pin::new_unchecked(&mut this.queue)
        };
        match queue.poll_next(cx) {
            NextState::Yielded(x) => NextState::Yielded(x),
            NextState::Empty if empty => NextState::Empty,
            _ => NextState::Pending,
        }
    }
    fn poll_progress(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = unsafe { self.get_unchecked_mut() };
        let mut ready = false;
        if this.queue.len() < this.max {
            while this.queue.len() < this.max {
                let stream = unsafe {
                    Pin::new_unchecked(&mut this.stream)
                };
                match stream.poll_next(cx) {
                    NextState::Yielded(fut) => {
                        this.queue.push(fut);
                    }
                    NextState::Empty => {
                        ready = true;
                        break;
                    }
                    _ => break,
                }
            }
        } else {
            let stream = unsafe {
                Pin::new_unchecked(&mut this.stream)
            };
            match stream.poll_progress(cx) {
                ProgressState::ReadyToYield => ready = true,
                _ => {}
            }
        }
        let queue = unsafe { Pin::new_unchecked(&mut this.queue) };
        match queue.poll_progress(cx) {
            ProgressState::ReadyToYield if ready => {
                ProgressState::ReadyToYield
            }
            _ => ProgressState::Pending,
        }
    }
}

// ###################################################################
// Supporting code.
pub use nop_waker::*;
mod nop_waker {
    use core::{
        future::Future,
        pin::{pin, Pin},
        task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
    };

    pub const NOP_RAWWAKER: RawWaker = {
        fn nop(_: *const ()) {}
        const VTAB: RawWakerVTable =
            RawWakerVTable::new(|_| NOP_RAWWAKER, nop, nop, nop);
        RawWaker::new(&() as *const (), &VTAB)
    };

    pub const NOP_WAKER: &Waker =
        &unsafe { Waker::from_raw(NOP_RAWWAKER) };

    pub const NOP_CONTEXT: Context<'static> =
        Context::from_waker(NOP_WAKER);

    pub fn poll_once<T, F>(f: &mut F) -> Poll<T>
    where
        F: Future<Output = T> + ?Sized + Unpin,
    {
        let mut cx = NOP_CONTEXT;
        Pin::new(f).as_mut().poll(&mut cx)
    }

    pub fn poll_once_owned<T, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T>,
    {
        poll_once(&mut pin!(f))
    }

    #[cfg(test)]
    pub fn poll_once_spawn<T: Send, F>(f: F) -> Poll<T>
    where
        F: Future<Output = T> + Send,
    {
        std::thread::scope(move |s| {
            s.spawn(move || poll_once_owned(f)).join().unwrap()
        })
    }
}
```

---

# Discussion

## Attendance

- People: TC, tmandry, eholk, Daria, CE, Vincenzo, Yosh (20 mins late)

## Meeting roles

- Minutes, driver: TC

## AsyncIterator -> futures::Stream backport

Daria: There definitely should be a way to transform `AsyncIterator` into `futures::Stream` (and also tokio's Stream). What are the downsides of this?

## poll_progress

(In response to a question...)

TC: `poll_progress` is about maintaining liveness. Keeping the plates spinning. But you need to maintain backpressure. `poll_progress` allows you to maintain liveness without having unlimited buffering by separating the liveness concern from having to handle the output.
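
As a rough illustration of that separation, here is a minimal sketch that compiles on stable Rust. It is not part of the prototype: `Prefetch`, `buffered`, and `noop_waker` are hypothetical names, the two enums are reduced copies of the ones above, and the source is a plain synchronous range, so nothing is ever pending. It also assumes that a full buffer reports `Paused` rather than `ReadyToYield`, which is one possible reading of the open question about indicating both states.

```rust
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

// Reduced copies of the prototype's enums. `Empty` and `Pending` are
// omitted because this sketch's source is synchronous.
#[derive(Debug, PartialEq)]
pub enum NextState<T> {
    Yielded(T),
    Done,
}

#[derive(Debug, PartialEq)]
pub enum ProgressState {
    ReadyToYield,
    Paused,
    Done,
}

// Same shape as the prototype's trait, with the default `poll_progress`.
pub trait AsyncIterator {
    type Item;
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        ProgressState::Paused
    }
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> NextState<Self::Item>;
}

// Hypothetical type: prefetches at most `cap` items from `source`.
pub struct Prefetch {
    source: Range<u32>,
    buf: VecDeque<u32>,
    cap: usize,
}

impl Prefetch {
    pub fn new(source: Range<u32>, cap: usize) -> Self {
        Prefetch { source, buf: VecDeque::new(), cap }
    }
    pub fn buffered(&self) -> usize {
        self.buf.len()
    }
}

impl AsyncIterator for Prefetch {
    type Item = u32;

    // Liveness: do one unit of work without handing out any output.
    // Backpressure: refuse to do more once `cap` items are buffered.
    fn poll_progress(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> ProgressState {
        let this = self.get_mut(); // OK: `Prefetch` is `Unpin`.
        if this.buf.len() >= this.cap {
            return ProgressState::Paused;
        }
        match this.source.next() {
            Some(x) => {
                this.buf.push_back(x);
                ProgressState::ReadyToYield
            }
            None if this.buf.is_empty() => ProgressState::Done,
            None => ProgressState::Paused,
        }
    }

    // Output: drain the buffer first, then fall back to the source.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> NextState<u32> {
        let this = self.get_mut();
        match this.buf.pop_front().or_else(|| this.source.next()) {
            Some(x) => NextState::Yielded(x),
            None => NextState::Done,
        }
    }
}

// A waker that does nothing, built from the stable `Wake` trait.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

However often a caller invokes `poll_progress` here, the buffer never grows past `cap`; only a call to `poll_next` frees room for more work. A real implementation would drive futures and register the waker rather than pull from a range.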
