I want to introduce a way to express sync and async iterator combinators through new syntax. I explicitly reject the implementation of generalized coroutines as part of this feature, so that this subset of that functionality can land as quickly as possible.

The main impetus for this is that these kinds of algorithms are common enough in both library and end-user code: writing them by hand can be verbose, writing a `Stream` requires interacting with `Pin`, and transforming an `Iterator` implementation into a `Stream` is non-trivial. The desire is to reduce the pain of implementing new async iterators, but if the feature is introduced it makes sense to also support sync iterators. Iterators (both sync and async) are very common for a subset of applications. Implementing `Stream`s also has subtleties that people can get wrong, and even when everything is done correctly, the experience isn't great.
The nightly compiler currently has a `generator` feature, used to implement `async`/`await`, but its surface syntax isn't ideal. It takes over closure syntax when the user writes `yield` inside of one, with no externally visible signpost on the closure's head, and the way to interact with the result is through the `Generator` trait and the `GeneratorState` enum.
Some crates have explored exposing similar syntax, with varying degrees of success in terms of ease of use.

gen-iter is a nightly-only crate that exposes a macro-by-example that lets the user write:
#![feature(generators)]

use gen_iter::gen_iter;

fn my_range(start: usize, end: usize) -> impl Iterator<Item = usize> {
    gen_iter!({
        let mut current = start;
        loop {
            if current >= end {
                break;
            }
            let prev = current;
            current += 1;
            yield prev;
        }
    })
}

fn main() {
    for i in my_range(0, 3) {
        println!("{}", i);
    }
}
Propane is a nightly-only crate that exposes proc-macros that allow the user to write `Iterator`s and `Stream`s by annotating a function, or inline in an expression:
#[propane::generator]
fn foo(start: usize, end: usize) -> usize {
    for n in start..end {
        yield n;
    }
}

fn main() {
    let mut iter = propane::gen! {
        for x in 0..3 {
            yield x;
        }
    };
    let mut iter = foo(0, 3);
    for _ in iter {}
}
async-stream is similar to propane, providing a proc-macro to write `Stream`s exclusively, ignoring `Iterator`s:
use async_stream::stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

async fn foo() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };
    pin_mut!(s); // needed for iteration
    while let Some(value) = s.next().await {
        println!("got {}", value);
    }
}
I don't expect the conversation about the feature's surface syntax to be where most of the problems arise; I want to focus on the semantics of the feature. Having said that, because we need some syntax to talk about the feature, I'll be using a strawman syntax for the examples:
fn* ident(arg: ty) yield ty {}
async fn* ident(arg: ty) yield ty {}
which would roughly desugar to:
fn ident(arg: ty) -> impl IntoIterator<Item = yield_ty> {}
fn ident(arg: ty) -> impl IntoAsyncIterator<Item = yield_ty> {}
We can bikeshed the actual syntax at a later time, but the crux of my proposal is to introduce a new kind of item that desugars to a function returning an iterable. Changing from a sync to an async generator should be straightforward for the end user.
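To make the desugaring more concrete, here is a minimal hand-written sketch of the kind of item such a function could stand in for. The `Counter` struct and the `counter` function are invented for this illustration; a real implementation would generate a compiler-internal state machine rather than a named struct:

struct Counter {
    current: usize,
    end: usize,
}

impl Iterator for Counter {
    type Item = usize;
    fn next(&mut self) -> Option<usize> {
        if self.current < self.end {
            let n = self.current;
            self.current += 1;
            Some(n)
        } else {
            None
        }
    }
}

// Roughly what `fn* counter(end: usize) yield usize { .. }` would expand to:
// a plain function returning something iterable.
fn counter(end: usize) -> impl IntoIterator<Item = usize> {
    Counter { current: 0, end }
}

fn main() {
    for n in counter(3) {
        println!("{}", n); // prints 0, 1, 2
    }
}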
I'm explicitly ignoring an expression-position generator feature for now to focus on the broader strokes of the design. There's no one-way door here that would prevent us from adding it to the grammar in some way; I'm open to discussing this today. A rough syntax for it, if we don't mind adding keywords, could be:
let iter = async move iterator {
    for await x in stream {
        yield x;
    }
};

for await x in iter {
    println!("{:?}", x);
}
## `yield` point, or "If you like it better put a `Pin` on it"

The same considerations around `Pin`ning that people must have when writing `async fn`s are present when writing generators. For example, writing a generator that results in a self-borrowing type requires that the borrow is `Pin`ned:
fn* foo() yields i32 {
    let v = vec![1, 2, 3];
    for i in &v { // <-- can't do this without pin
        yield *i;
    }
}
There are a few options here:

- `Pin` inside of `fn*`s
- `Pin` the whole generator immediately to avoid compilation issues, but this is overly restrictive

Note that making things "just work" would likely require also implementing some kind of automatic "pin projection" and some "auto-pinning before iteration". The nightly sketch below shows where this pinning requirement comes from today.
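As a point of reference for these options, this is where the requirement shows up with today's nightly generators: a `static` generator that keeps a borrow alive across a `yield` point is not `Unpin`, so it has to be pinned (here with `Box::pin`) before it can be resumed. This is only a sketch of current behavior, not part of the proposal:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};

fn main() {
    // `static` marks the generator as immovable, which is what allows
    // borrowing `v` across the `yield`.
    let gen = static || {
        let v = vec![1, 2, 3];
        for i in &v {
            yield *i;
        }
    };
    // The generator can't be resumed without pinning it first.
    let mut gen = Box::pin(gen);
    while let GeneratorState::Yielded(i) = gen.as_mut().resume(()) {
        println!("{}", i);
    }
}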
Writing the following today
use core::pin::Pin;

async fn bar() -> i32 {
    1
}

async fn foo() {
    let mut v = vec![bar(), bar(), bar()];
    for i in Pin::new(&mut v) {
        i.await;
    }
}
results in
error[E0277]: `from_generator::GenFuture<[static generator@src/main.rs:2:23: 4:2 {}]>` cannot be unpinned
--> src/main.rs:8:14
|
8 | for i in Pin::new(&mut v) {
| ^^^^^^^^ within `Vec<impl Future>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@src/main.rs:2:23: 4:2 {}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `PhantomData<impl Future>`
= note: required because it appears within the type `Unique<impl Future>`
= note: required because it appears within the type `alloc::raw_vec::RawVec<impl Future>`
= note: required because it appears within the type `Vec<impl Future>`
= note: required by `Pin::<P>::new`
The distance between `Iterator` (which doesn't have a `self: Pin<&mut Self>` receiver) and `Stream` (which does) needs to be accounted for.
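For reference, these are the shapes of the two traits being bridged, `Iterator` from the standard library and `Stream` as defined in `futures_core` today (signatures reproduced here for comparison only):

use std::pin::Pin;
use std::task::{Context, Poll};

// std::iter::Iterator: plain `&mut self`, no pinning involved.
trait Iterator {
    type Item;
    fn next(&mut self) -> Option<Self::Item>;
}

// futures_core::Stream: the receiver is `Pin<&mut Self>`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}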
Here is an example exercise to compare and contrast the current experience in Python and Rust with the proposed experience.
Given K iterables of sorted "intervals" with two values (start and end), we want to produce a single iterable that merges overlapping intervals.
One possible implementation of a solution to this problem is to split concerns and write two combinators: one to merge overlapping intervals in a sorted iterable, and another to merge a list of sorted iterables into a single sorted iterable. In Python such an implementation could look like the following:
# Use tuples as the type for the intervals
intervals = [(1, 4), (3, 6), (8, 10), (9, 11)]

def merge_intervals(input):
    """
    Takes a sorted interval iterable (list or generator) and
    returns a generator of non-overlapping intervals.
    """
    prev = None
    for interval in input:
        if prev is None:
            prev = interval
        if prev[1] >= interval[0]:
            prev = (prev[0], max(prev[1], interval[1]))
        else:
            yield prev
            prev = interval
    if prev is not None:
        yield prev

def join_n_intervals(inputs):
    """
    Takes a list of sorted interval iterables and returns
    a single generator of sorted intervals.
    """
    latest = {}
    for k, input in enumerate(inputs):
        n = next(input, None)
        if n is not None:
            latest[k] = n
    while len(latest.keys()) > 0:
        min_interval, pos = None, None
        for current_pos, interval in latest.items():
            if min_interval is None:
                min_interval = interval
                pos = current_pos
            elif min_interval[0] > interval[0]:
                min_interval = interval
                pos = current_pos
        yield min_interval
        interval = next(inputs[pos], None)
        if interval is None:
            del latest[pos]
        else:
            latest[pos] = interval

intervals = [
    (i for i in [(1, 4), (8, 10), (13, 29)]),
    (i for i in [(3, 6), (8, 10), (9, 11), (25, 50)]),
    (i for i in [(3, 9)]),
]
print(list(merge_intervals(join_n_intervals(intervals))))
The same algorithm written in stable Rust today would look like this:
use std::cmp::max;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Interval { start: usize, end: usize, }
fn i(start: usize, end: usize) -> Interval { Interval { start, end, } }
struct MergeIntervals<I: Iterator<Item = Interval>> {
    input: I,
    prev: Option<Interval>,
}

impl<I: Iterator<Item = Interval>> Iterator for MergeIntervals<I> {
    type Item = Interval;
    fn next(&mut self) -> Option<Interval> {
        loop {
            let next = match self.input.next() {
                Some(next) => next,
                None => {
                    let prev = self.prev;
                    self.prev = None;
                    return prev;
                }
            };
            let prev = self.prev.unwrap_or(next);
            if prev.end >= next.start {
                self.prev = Some(Interval {
                    start: prev.start,
                    end: max(prev.end, next.end),
                });
            } else {
                self.prev = Some(next);
                return Some(prev);
            }
        }
    }
}

fn merge_intervals(mut input: impl Iterator<Item = Interval>) -> impl Iterator<Item = Interval> {
    let prev = input.next();
    MergeIntervals {
        input,
        prev,
    }
}

struct JoinIter<I: Iterator<Item = Interval>> {
    inputs: Vec<I>,
    latest: Vec<Option<Interval>>,
    current_min: Option<(usize, Interval)>,
}

impl<I: Iterator<Item = Interval>> Iterator for JoinIter<I> {
    type Item = Interval;
    fn next(&mut self) -> Option<Interval> {
        for (k, input) in self.latest.iter().enumerate() {
            match (input, self.current_min) {
                (Some(interval), Some((_, min))) => {
                    if min.start > interval.start {
                        self.current_min = Some((k, *interval));
                    }
                }
                (Some(interval), None) => {
                    self.current_min = Some((k, *interval));
                }
                (None, _) => {}
            }
        }
        match self.current_min {
            Some((k, current)) => {
                self.latest[k] = self.inputs[k].next();
                self.current_min = None;
                Some(current)
            }
            None => None,
        }
    }
}

fn join_n_intervals<I: Iterator<Item = Interval>>(
    mut inputs: Vec<I>,
) -> impl Iterator<Item = Interval> {
    let latest = inputs.iter_mut().map(|i| i.next()).collect();
    JoinIter {
        inputs,
        latest,
        current_min: None,
    }
}

fn main() {
    let intervals = vec![
        vec![i(1, 4), i(8, 10), i(13, 29)].into_iter(),
        vec![i(3, 6), i(8, 10), i(9, 11), i(25, 50)].into_iter(),
        vec![i(3, 9)].into_iter(),
    ];
    println!("{:?}", merge_intervals(join_n_intervals(intervals)).collect::<Vec<_>>());
}
Besides the verbosity difference, the approach is roughly the same. The main difference lies in having to define a new `struct` for each iterator we want to construct, as well as an appropriate `Iterator` implementation. The requirement of having to explicitly define what data will continue living from one iteration to the next isn't insurmountable, but it is there.
Under one possible end state of the proposed iterator item feature, the same algorithm would look like this:
use std::cmp::max;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Interval { start: usize, end: usize, }
fn i(start: usize, end: usize) -> Interval { Interval { start, end, } }
fn* merge_intervals(mut input: impl Iterator<Item = Interval>) yield Interval {
    let mut prev = input.next()?;
    while let Some(interval) = input.next() {
        if prev.end >= interval.start {
            prev.end = max(prev.end, interval.end);
        } else {
            yield prev;
            prev = interval;
        }
    }
    yield prev;
}
fn* join_n_intervals<I: Iterator<Item = Interval>>(mut inputs: Vec<I>) yield Interval {
    let mut latest: Vec<Option<Interval>> = inputs.iter_mut().map(|i| i.next()).collect();
    loop {
        // Find the iterator whose current head interval starts earliest.
        let mut current_min: Option<(usize, Interval)> = None;
        for (k, input) in latest.iter().enumerate() {
            if let Some(interval) = input {
                if current_min.map_or(true, |(_, min)| min.start > interval.start) {
                    current_min = Some((k, *interval));
                }
            }
        }
        match current_min {
            // Yield the earliest interval and advance the iterator it came from.
            Some((k, interval)) => {
                yield interval;
                latest[k] = inputs[k].next();
            }
            // All inputs are exhausted.
            None => break,
        }
    }
}
fn main() {
    let intervals = vec![
        vec![i(1, 4), i(8, 10), i(13, 29)].into_iter(),
        vec![i(3, 6), i(8, 10), i(9, 11), i(25, 50)].into_iter(),
        vec![i(3, 9)].into_iter(),
    ];
    println!("{:?}", merge_intervals(join_n_intervals(intervals)).collect::<Vec<_>>());
}
The most obvious change is that the backing `struct`s for the iterators are implicit. This is analogous to closures and `async fn`s: you can always write the equivalent of a closure or `async fn` by hand, with a backing `struct` that holds the closed-over values or the values retained across `yield` points.
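For the closure half of that analogy, here is a small sketch; the `AddOffset` struct is invented for this illustration, and it uses an inherent `call` method instead of the unstable `Fn` traits:

// The closure `|x| x + offset` closing over `offset`...
struct AddOffset {
    offset: i32, // ...is morally a struct holding the captured value...
}

impl AddOffset {
    // ...plus a method that takes that explicit state as `self`.
    fn call(&self, x: i32) -> i32 {
        x + self.offset
    }
}

fn main() {
    let offset = 10;
    let closure = |x| x + offset;
    let by_hand = AddOffset { offset };
    assert_eq!(closure(5), by_hand.call(5));
}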
## `feature(generators)`
Under the current nightly-only generators feature, the same algorithm can be expressed, but it can't be used as nicely, principally due to the lack of a `[generator yielding X]: IntoIterator<Item = X>` implementation.
#![feature(generators, generator_trait)]
use std::cmp::max;
use std::ops::{Generator, GeneratorState};
use std::pin::Pin;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Interval { start: usize, end: usize, }
fn i(start: usize, end: usize) -> Interval { Interval { start, end, } }
fn merge_intervals(mut input: impl Iterator<Item = Interval>) -> impl Generator<Yield = Interval> {
    move || {
        if let Some(mut prev) = input.next() {
            while let Some(interval) = input.next() {
                if prev.end >= interval.start {
                    prev.end = max(prev.end, interval.end);
                } else {
                    yield prev;
                    prev = interval;
                }
            }
            yield prev;
        }
    }
}
fn join_n_intervals<I: Iterator<Item = Interval>>(mut inputs: Vec<I>) -> impl Generator<Yield = Interval> {
    move || {
        let mut latest: Vec<Option<Interval>> = inputs.iter_mut().map(|i| i.next()).collect();
        loop {
            // Find the iterator whose current head interval starts earliest.
            let mut current_min: Option<(usize, Interval)> = None;
            for (k, input) in latest.iter().enumerate() {
                if let Some(interval) = input {
                    if current_min.map_or(true, |(_, min)| min.start > interval.start) {
                        current_min = Some((k, *interval));
                    }
                }
            }
            match current_min {
                // Yield the earliest interval and advance the iterator it came from.
                Some((k, interval)) => {
                    yield interval;
                    latest[k] = inputs[k].next();
                }
                // All inputs are exhausted.
                None => break,
            }
        }
    }
}
fn main() {
    let intervals = vec![
        vec![i(1, 4), i(8, 10), i(13, 29)].into_iter(),
        vec![i(3, 6), i(8, 10), i(9, 11), i(25, 50)].into_iter(),
        vec![i(3, 9)].into_iter(),
    ];
    let mut gen = merge_intervals(vec![i(3, 6), i(8, 10), i(9, 11), i(25, 50)].into_iter());
    loop {
        match Pin::new(&mut gen).resume(()) {
            GeneratorState::Yielded(interval) => println!("{:?}", interval),
            GeneratorState::Complete(_) => break,
        }
    }
}
That lack of an easy conversion from a generator into an `Iterator` also makes it harder to compose these functions neatly, not just to consume them. If we continued with the current surface syntax for generalized generators, using them in `for` and `while let` loops would require either extra syntactic sugar or extra method calls to explicitly convert them into something iterable.
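To illustrate the missing glue, this is roughly the adapter one has to write by hand today to drive a generator as an `Iterator`; the `GenIter` wrapper is a name invented for this sketch and it only handles `Unpin` generators:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};
use std::pin::Pin;

struct GenIter<G>(G);

impl<G> Iterator for GenIter<G>
where
    G: Generator<Return = ()> + Unpin,
{
    type Item = G::Yield;
    fn next(&mut self) -> Option<Self::Item> {
        // Resume the generator once per `next` call.
        match Pin::new(&mut self.0).resume(()) {
            GeneratorState::Yielded(item) => Some(item),
            GeneratorState::Complete(()) => None,
        }
    }
}

fn main() {
    let evens = GenIter(|| {
        for n in 0..5 {
            yield n * 2;
        }
    });
    for n in evens {
        println!("{}", n); // 0 2 4 6 8
    }
}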
The following implementation is taken from `async_std`:
use core::fmt;
use core::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
    /// A stream that takes items from two other streams simultaneously.
    ///
    /// This `struct` is created by the [`zip`] method on [`Stream`]. See its
    /// documentation for more.
    ///
    /// [`zip`]: trait.Stream.html#method.zip
    /// [`Stream`]: trait.Stream.html
    pub struct Zip<A: Stream, B> {
        item_slot: Option<A::Item>,
        #[pin]
        first: A,
        #[pin]
        second: B,
    }
}

impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Zip")
            .field("first", &self.first)
            .field("second", &self.second)
            .finish()
    }
}

impl<A: Stream, B> Zip<A, B> {
    pub(crate) fn new(first: A, second: B) -> Self {
        Self {
            item_slot: None,
            first,
            second,
        }
    }
}

impl<A: Stream, B: Stream> Stream for Zip<A, B> {
    type Item = (A::Item, B::Item);
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if this.item_slot.is_none() {
            match this.first.poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
            }
        }
        let second_item = futures_core::ready!(this.second.poll_next(cx));
        let first_item = this.item_slot.take().unwrap();
        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
    }
}
Under the proposed final behavior, the same combinator could be written as:
async fn* zip<A: Stream, B: Stream>(mut a: A, mut b: B) yield (A::Item, B::Item) {
    let mut a_item = a.next();
    let mut b_item = b.next();
    loop {
        match join!(a_item, b_item) {
            (Some(a_val), Some(b_val)) => {
                yield (a_val, b_val);
                a_item = a.next();
                b_item = b.next();
            }
            _ => break,
        }
    }
}
And this is how it can be used:
async fn main() {
    let mut s = zip(foo(), bar());
    pin_mut!(s); // let mut s = Pin::new_unchecked(&mut s);
    while let Some((a, b)) = s.next().await {
        println!("{:?} {:?}", a, b);
    }
}
mini-redis uses `async-stream` instead of hand-crafting a `Stream`:
// Subscribe to the channel.
let rx = Box::pin(async_stream::stream! {
    loop {
        match rx.recv().await {
            Ok(msg) => yield msg,
            // If we lagged in consuming messages, just resume.
            Err(broadcast::error::RecvError::Lagged(_)) => {}
            Err(_) => break,
        }
    }
});
As you can see in the previous examples, iterating over async iterators today requires explicit pinning:
async fn main() {
    let mut s = zip(foo(), bar());
    pin_mut!(s); // let mut s = Pin::new_unchecked(&mut s);
    while let Some((a, b)) = s.next().await {
        println!("{:?} {:?}", a, b);
    }
}
For futures, `f.await` avoids the need to pin explicitly by making pinning part of the `await` desugaring:
// .await: `fn lower_expr_await`
unsafe {
    ::std::future::Future::poll(
        ::std::pin::Pin::new_unchecked(&mut pinned),
        ::std::future::get_context(task_context),
    )
}
Although not the subject of this design meeting, we could conceivably add an `await` loop in the future that would incorporate pinning:
async fn main() {
    for await (a, b) in zip(foo(), bar()) { // <--- the stream is pinned here!
        println!("{:?} {:?}", a, b);
    }
    // or for (a, b).wait in zip(..)...
}
One could also imagine a comparable `while await` syntax:
while await let Some((a, b)) in zip(foo(), bar()) { // <--- the stream is pinned here!
    println!("{:?} {:?}", a, b);
}
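For comparison, here is one plausible desugaring of such a loop, written against today's `futures` crate purely as an illustration: pin the stream once, then drive it with `next().await`.

use futures::stream::{Stream, StreamExt};

// Hypothetical expansion of `for await item in some_stream { ... }`.
async fn for_await_desugared(some_stream: impl Stream<Item = u32>) {
    let stream = some_stream;
    // Pin the stream to the stack exactly once, up front...
    futures::pin_mut!(stream);
    // ...then pull items out of it one at a time.
    while let Some(item) = stream.next().await {
        println!("{}", item);
    }
}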
## `Pin`ning on iteration consumption

Async loop expressions could potentially auto-desugar to using `Pin::new_unchecked(&mut stream)`.

In a pre-meeting discussion, @cramertj showed the following alternative that people can write today:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::mpsc::{channel, Receiver};
use futures::future::{Fuse, FutureExt};
use futures::stream::{Stream, StreamExt};
use futures::SinkExt;
use pin_project::pin_project;

#[pin_project]
struct GeneratorStream<Fut: Future<Output = ()>, Item> {
    #[pin]
    fut: Fuse<Fut>,
    receiver: Receiver<Item>,
}

impl<Fut: Future<Output = ()>, Item> Stream for GeneratorStream<Fut, Item> {
    type Item = Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if let Ok(item) = this.receiver.try_next() {
            return Poll::Ready(item);
        }
        let _ = this.fut.poll(cx);
        this.receiver.poll_next_unpin(cx)
    }
}

// Users can just write this:
fn my_stream() -> impl Stream<Item = u8> {
    let (mut sender, receiver) = channel(0);
    let fut = async move {
        for i in 0..100 {
            sender.send(i).await.unwrap();
        }
    };
    GeneratorStream { fut: fut.fuse(), receiver: receiver }
}

// Just for example
fn main() {
    let s = my_stream();
    futures::pin_mut!(s);
    for i in futures::executor::block_on_stream(s) {
        println!("{}", i);
    }
    println!("done")
}
- Should the signature spell out just the yielded type (`u32`) or `impl Iterator<Item=u32>` (or `yields u32` (cramertj))? (josh)
- The reason to use `yield` or `yields` or some other way of expressing the `Item` in the syntax is to allow the potential future extension for return types/values: `async fn* foo(_: i32) yield i32 -> Result<i32, ()> {...}`
// Some other syntaxes I toyed with.
async gen foo(_: i32) yields i32 -> Result<i32, ()> {...}
async iterator foo(_: i32) where yield = i32 -> Result<i32, ()> {...}
- How do we distinguish between returning `impl Iterator<Item=Result<u32, SomeError>>` and returning `Result<impl Iterator<Item=u32>, SomeError>`? Or some other combination? That would be much easier if we spell out the return type. Related: how do we smoothly handle errors? Whatever first-class syntax we use for iteration should allow bubbling up `Result`, which means it should have a natural place for a `?` in it somewhere.
- Why not `propane` or some other procedural macro? My biggest concern is generally that `Iterator` is not pinned but "async iterator" (stream) is, which means that one can borrow and one can't. I don't see what can be done about this, but I'm thinking about whether it's likely to come up. (I believe propane has this same disparity.) (nikomatsakis)
- `for await` – iterating over Streams one element at a time is usually not what you want (cramertj)
for i await in foo() {
    if i < 3 {
        yield i * 2;
    }
}
// which could instead be
foo().filter_map(|i| if i < 3 { Some(i * 2) } else { None })
// but then we are restricted in control flow and ownership,
// the same reason you would use a `for` loop today
// instead of `filter_map` on an `Iterator`.
- `iter::from_fn` could be used to lighten the verbosity (somewhat)
- `from_fn` doesn't work as well
- `Pin` is one of the biggest pain points that people have when learning async-await
- You can get an `impl Stream` today if you don't mind that the resulting stream has some restrictions
- (a type `Xyz` for which I need an `XyzIter`)
- Add a `Debug` impl, or make it `Clone`, things like that, make it nameable. Those are things I can't do if I don't write it by hand.
- `Debug`, but it sounds like that may be a requirement for the syntax (e.g., being able to name the type that is being produced and then add methods, impls, etc)
- These kinds of `Iterator`s just don't exist in C#.
- Why does `fn ident` return an `IntoIterator` rather than an `Iterator`? Is this for future-compatibility with more expansive Generator types that are not themselves iterators, but which can be converted into iterators? (cramertj)
- With `impl Iterator` we have less freedom to change things.
- `impl Iterator` doesn't prevent naming it elsewhere. Or we could give a syntax for naming it.
- Having to call `into_iter` just to compose iterators (like zip)
- There are adapters in `Itertools` that return `IntoIterator` that you need to store somewhere so that the `Iterator` can return borrows into it, but maybe `-> impl LendingIterator` would avoid all those problems.
- What about the `Iterator` methods that are important for some situations, like `size_hint` or `fold`? Providing a useful `size_hint` seems fundamentally hard? Providing a `fold` might be particularly nice, though – and maybe a `try_fold` could be made to work? Interesting that one of the examples has the `next` implementation using a `loop`, which reminds me of how `next` is always `try_for_each(Err).err()`… (scott)
- `size_hint`?
- With `-> impl Iterator`, you lose `ExactSize` etc
- What about the `Iterator` methods that are important for some situations, like `size_hint` or `fold`? Providing a useful `size_hint` seems fundamentally hard? Providing a `fold` might be particularly nice, though – and maybe a `try_fold` could be made to work? Interesting that one of the examples has the `next` implementation using a `loop`, which reminds me of how `next` is always `try_for_each(Err).err()`… (scott)
- Could these be provided in an `impl` block?
- `DoubleEndedIterator` or `ExactSizeIterator` or …

A generator function could return an iterator type, which may be either an anonymous iterator or a named iterator type. In the latter case, it'd also be an option to implement further functions (e.g. `size_hint`) on the named type.
gen fn xyz(...) -> impl Iterator<Item=u32> { ... }
gen fn abc(...) -> MyIterator { ... }
Esteban: this could be done if generators are blocks: fn abc(…) -> impl Iterator { gen { yield 1; } }
cramertj: but how do you access captured values?
pnkfelix: something something zen ;) doesn't seem like an unsolvable problem to give access to state
josh: we could "invert" the flow of syntax; have a way to make an impl of iterator for a type where you don't have to handwrite the next function
Queued question (retroactively inserted): Could it be easier to do a version of this that's an expression instead of the function-level? Like how we're having an easier time deciding `try {}` expressions instead of `try fn`? There may be some pinning complexity with a block, though…
scott: would an expression form avoid some of the questions?
josh: I find only having a block syntax (and not a function syntax) appealing.
niko: it doesn't solve the size-hint problem and you don't get to use
cramertj: sidesteps bikeshed
josh: gives you just enough syntax support to let you write the iterator block, then I can use a proc macro that results in defining the myiterator type and lets you define size-hint
niko: you can do that with a fn item
josh: having "just enough" syntax support does allow for more experimentation in the ecosystem with the top-level syntax
cramertj: I agree with that, and it could help avoid some syntax questions
josh: a block syntax solves some fraction of the "painful to write an iterator or stream" and anything beyond it is additional ergonomic and expressiveness improvement
cramertj: not totally convinced, still need some way to annotate the return type
scottmcm: not sure how I would store the generator etc
estebank: is size-hint about "base" iterators or propagating size hints from other iterators? the latter might be handleable..
josh: for the sake of completeness, it's possible in some cases for the compiler to infer and write a `size_hint` function, if the iteration is "obvious" (e.g. `for x in other_iter { ... yield ... }` with no `break` we could infer and propagate.)
Queued question: I'm not sure I understand the bit about "auto-`Pin`-ing inside `gen fn`/`fn*`." I would expect the resulting generator to need to be pinned, not any value inside the generator itself. (cramertj)
`yield`ing borrows into the stream state is out of scope, right? (cramertj)