Rust by Example: Async Channels

Rust 1.75+

Coordinate asynchronous tasks using Tokio's channels. This example passes messages between async tasks over a bounded channel, which applies backpressure to producers when the consumer falls behind.

Code

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a bounded channel with capacity 32
    // If the channel is full, sending will await (suspend)
    let (tx, mut rx) = mpsc::channel(32);
    
    // Spawn a producer task
    let tx2 = tx.clone();
    tokio::spawn(async move {
        for i in 0..5 {
            let msg = format!("Message {}", i);
            tx2.send(msg).await.unwrap();
        }
    });

    // Spawn another producer
    tokio::spawn(async move {
        tx.send("One-off message".to_string()).await.unwrap();
    });

    // Consume messages
    // recv() is async and returns Option<T>
    // None means every sender has been dropped and the buffer is empty
    while let Some(message) = rx.recv().await {
        println!("GOT: {}", message);
    }
    
    println!("All senders dropped, channel closed.");
}

Explanation

Just as threads can communicate over channels, async tasks communicate over async channels. The tokio::sync::mpsc channel is designed for async contexts. The key difference from the standard library's bounded channel (std::sync::mpsc::sync_channel) is that send is an async method that must be awaited: if the buffer is full, send suspends the calling task until space becomes available instead of blocking the whole thread, so the runtime can keep running other tasks on it. Slowing producers down to the pace of the consumer in this way is called backpressure.

Tokio provides several channel types: mpsc (multi-producer, single-consumer queue), oneshot (a single value sent once from one sender to one receiver), broadcast (multi-producer, multi-consumer, where every receiver sees each value), and watch (receivers observe only the most recent value). Choosing the channel that matches the communication pattern matters for both performance and correctness.

Note that rx.recv() is also async. If the channel is empty but still open, it suspends the task until a message arrives. It returns None only once every tx handle has been dropped and all buffered messages have been received, signaling that no more messages will ever arrive.

Code Breakdown

Line 7: mpsc::channel(32). Creates a bounded channel. Bounded channels are preferred in async code because they provide backpressure. If the receiver is slow, the senders are eventually suspended, which keeps the queue from growing without limit and exhausting memory.
Line 14: tx2.send(msg).await. The send operation must be awaited. If the buffer (32 items) were full, this line would suspend this task until the receiver frees a slot. send returns an error if the receiver has been dropped, so the unwrap() here would panic in that case.
Line 26: while let Some(message) = rx.recv().await. A common pattern for draining a channel. It keeps looping as long as recv() returns Some.
Line 30: println!(...). This line runs only after the loop finishes, which happens once all senders (tx and tx2) have been dropped and the buffered messages have been received.