Rust by Example: Async Streams
Process asynchronous sequences of data. Learn how to use the Stream trait (the async equivalent of Iterator) to handle data that arrives over time, such as network packets or sensor readings.
Code
use tokio_stream::{self as stream, StreamExt};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Create a stream from a vector.
    // This is like an iterator, but we await each item.
    let mut stream = stream::iter(vec![1, 2, 3]);
    while let Some(num) = stream.next().await {
        println!("Got from iter: {}", num);
    }
    println!("---");

    // Wrap a stream with a per-item timeout (needs tokio-stream's "time" feature).
    // Each item becomes a Result: Ok(value), or Err(Elapsed) if the deadline passes first.
    let ticker = stream::iter(vec![10, 20, 30]).timeout(Duration::from_millis(50));
    // The timeout wrapper is not Unpin, so pin it before calling next().
    tokio::pin!(ticker);
    while let Some(item) = ticker.next().await {
        println!("Got with timeout: {:?}", item);
    }
    println!("---");

    // Note: In real code, you'd use the 'async-stream' crate
    // or 'futures::stream' for more complex generation.
    // Here we simulate a stream of events.
    let mut events = stream::iter(0..3).map(|i| format!("Event {}", i));
    while let Some(event) = events.next().await {
        // Simulate async work between events
        sleep(Duration::from_millis(100)).await;
        println!("Processed: {}", event);
    }
}
Explanation
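Streams are the asynchronous equivalent of Iterators. Where Iterator::next returns Option<T>, Stream::poll_next returns Poll<Option<T>>. The extra Poll layer lets a stream report Pending when the next item isn't ready yet (e.g., while waiting for a network packet), Ready(Some(item)) when an item is available, and Ready(None) when the stream has ended.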
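To make the Poll<Option<T>> shape concrete, here is a minimal sketch that uses only the standard library. SimpleStream, Countdown, NoopWake, and poll_log are hypothetical names invented for this illustration. SimpleStream is a simplified stand-in that mirrors the poll_next signature and is not the real Stream trait from futures or tokio-stream. The stream deliberately reports Pending once before each item, and the loop polls it by hand instead of using an async runtime.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for the real `Stream` trait (hypothetical, for illustration only).
trait SimpleStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Counts down from `remaining` to 1, reporting Pending once before each item.
struct Countdown {
    remaining: u32,
    ready: bool,
}

impl SimpleStream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None); // the stream has ended
        }
        if !self.ready {
            // Pretend the item is not available yet and ask to be polled again.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        let value = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Some(value))
    }
}

/// A waker that does nothing; enough for polling by hand in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a Countdown to completion and records every poll result.
fn poll_log(count: u32) -> Vec<String> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Countdown { remaining: count, ready: false };
    let mut log = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Pending => log.push("Pending".to_string()),
            Poll::Ready(Some(n)) => log.push(format!("Ready(Some({}))", n)),
            Poll::Ready(None) => {
                log.push("Ready(None)".to_string());
                break;
            }
        }
    }
    log
}

fn main() {
    for line in poll_log(2) {
        println!("{}", line);
    }
}
```

In everyday code you rarely call poll_next yourself. Awaiting stream.next() does the polling for you and only resumes once an item (or the end of the stream) is ready.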
To use streams, you typically need the tokio-stream crate or the futures crate. The StreamExt trait provides familiar methods like map, filter, and fold, but they work asynchronously.
The most common way to consume a stream is with a while let Some(item) = stream.next().await loop. This effectively "iterates" over the stream, awaiting each item as it becomes available.
Code Breakdown
use tokio_stream::StreamExt. This trait is essential. Without it, you cannot call next(), map(), or other utility methods on a stream.
stream::iter(...). Converts a standard iterator into a stream. This is useful for testing or when you have a collection you want to process in an async pipeline.
stream.next().await. Retrieves the next item. It returns a Future that resolves to Option<T>. None indicates the stream has ended.
map(...). Just like with iterators, streams are lazy. This map operation doesn't run until we actually pull items from the stream in the loop below.
