Files
Sprimo/skills/m07-concurrency/patterns/async-patterns.md
2026-02-12 22:58:33 +08:00

410 lines
8.1 KiB
Markdown

# Async Patterns in Rust
## Task Spawning
### Basic Spawn
```rust
use tokio::task;
/// Runs `expensive_computation` on a spawned task while `other_work`
/// proceeds on the current task, then joins the spawned task.
#[tokio::main]
async fn main() {
    // Hand the computation to the runtime as its own task.
    let worker = task::spawn(async { expensive_computation().await });

    // Overlap our own work with the spawned task.
    other_work().await;

    // Joining yields `Err` only if the task panicked or was cancelled;
    // `unwrap` turns that into a panic here.
    let result = worker.await.unwrap();
}
```
### Spawn with Shared State
```rust
use std::sync::Arc;
use tokio::sync::Mutex;
/// Spawns ten tasks that each push their index into one shared,
/// mutex-protected vector, then waits for every task to finish.
async fn process_with_state() {
    let shared = Arc::new(Mutex::new(Vec::new()));

    let mut tasks = Vec::new();
    for i in 0..10 {
        // Each task owns its own handle to the shared vector.
        let shared = Arc::clone(&shared);
        tasks.push(tokio::spawn(async move {
            // The guard is a temporary, so the lock is released at the end
            // of this statement.
            shared.lock().await.push(i);
        }));
    }

    // Join every task; a panicked task surfaces as a panic here.
    for task in tasks {
        task.await.unwrap();
    }
}
```
---
## Select Pattern
### Racing Multiple Futures
```rust
use tokio::select;
use tokio::time::{sleep, Duration};
async fn first_response() {
select! {
result = fetch_from_server_a() => {
println!("A responded first: {:?}", result);
}
result = fetch_from_server_b() => {
println!("B responded first: {:?}", result);
}
}
}
```
### Select with Timeout
```rust
use tokio::time::timeout;
async fn with_timeout() -> Result<Data, Error> {
select! {
result = fetch_data() => result,
_ = sleep(Duration::from_secs(5)) => {
Err(Error::Timeout)
}
}
}
// Or use timeout directly
async fn with_timeout2() -> Result<Data, Error> {
timeout(Duration::from_secs(5), fetch_data())
.await
.map_err(|_| Error::Timeout)?
}
```
### Select with Channel
```rust
use tokio::sync::mpsc;
async fn process_messages(mut rx: mpsc::Receiver<Message>) {
loop {
select! {
Some(msg) = rx.recv() => {
handle_message(msg).await;
}
_ = tokio::signal::ctrl_c() => {
println!("Shutting down...");
break;
}
}
}
}
```
---
## Channel Patterns
### MPSC (Multi-Producer, Single-Consumer)
```rust
use tokio::sync::mpsc;
/// Starts three producer tasks that each send one message, then prints
/// every message until the channel closes.
async fn producer_consumer() {
    let (tx, mut rx) = mpsc::channel(100);

    // One cloned sender per producer task.
    (0..3).for_each(|id| {
        let producer_tx = tx.clone();
        tokio::spawn(async move {
            producer_tx
                .send(format!("Message from {}", id))
                .await
                .unwrap();
        });
    });

    // Release the original sender; otherwise `recv` would never yield
    // `None` and the loop below would not terminate.
    drop(tx);

    loop {
        match rx.recv().await {
            Some(msg) => println!("Received: {}", msg),
            None => break,
        }
    }
}
```
### Oneshot (Single-Shot Response)
```rust
use tokio::sync::oneshot;
/// Runs `compute_something` on a spawned task and receives its single
/// result over a oneshot channel.
async fn request_response() {
    let (reply_tx, reply_rx) = oneshot::channel();

    tokio::spawn(async move {
        // `send` fails only if the receiving half is already gone.
        reply_tx.send(compute_something().await).unwrap();
    });

    // Resolves once the task has sent its value; `Err` means the sender was
    // dropped without sending.
    let response = reply_rx.await.unwrap();
}
```
### Broadcast (Multi-Consumer)
```rust
use tokio::sync::broadcast;
async fn pub_sub() {
let (tx, _) = broadcast::channel(16);
// Subscribe multiple consumers
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!("Consumer 1: {}", msg);
}
});
tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!("Consumer 2: {}", msg);
}
});
// Publish
tx.send("Hello").unwrap();
}
```
### Watch (Single Latest Value)
```rust
use tokio::sync::watch;
/// Spawns a task that applies each new configuration it observes on a
/// watch channel, then publishes one updated configuration.
async fn config_updates() {
    let (config_tx, mut config_rx) = watch::channel(Config::default());

    tokio::spawn(async move {
        loop {
            // `changed` errors once the sender has been dropped.
            if config_rx.changed().await.is_err() {
                break;
            }
            // The borrow is taken and released within one iteration, so it
            // is never held across an `.await`.
            let current = config_rx.borrow();
            apply_config(&current);
        }
    });

    // Publish a new value for the watcher to pick up.
    config_tx.send(Config::new()).unwrap();
}
```
---
## Structured Concurrency
### JoinSet for Task Groups
```rust
use tokio::task::JoinSet;
/// Fetches every URL on its own task in a `JoinSet` and returns the fetch
/// results in the order `join_next` yields them.
///
/// # Panics
///
/// Panics if joining any task fails (the `JoinError` is unwrapped).
async fn parallel_fetch(urls: Vec<String>) -> Vec<Result<Response, Error>> {
    let mut tasks = JoinSet::new();
    urls.into_iter().for_each(|url| {
        // Each task owns its URL, so nothing is borrowed from the caller.
        tasks.spawn(async move { fetch(&url).await });
    });

    let mut results = Vec::new();
    loop {
        match tasks.join_next().await {
            Some(joined) => results.push(joined.unwrap()),
            // `None` means the set is empty: every task has been joined.
            None => break,
        }
    }
    results
}
```
### Scoped Tasks (no 'static)
```rust
// Uses the async-scoped crate (tokio-scoped is a similar alternative)
use async_scoped::TokioScope;
/// Spawns one scoped future per element of `data`, each awaiting
/// `process(item)`. The futures borrow from `data` rather than owning it.
///
/// NOTE(review): `scope_and_block` presumably blocks the calling thread until
/// every spawned future has finished — confirm against the async-scoped docs,
/// since blocking inside an `async fn` can stall the runtime.
async fn scoped_example(data: &[u32]) {
// NOTE(review): `results` is never read in this snippet.
let results = TokioScope::scope_and_block(|scope| {
for item in data {
// `item` is a reference into `data`; it is moved into the future.
scope.spawn(async move {
process(item).await
});
}
});
}
```
---
## Cancellation Patterns
### Using CancellationToken
```rust
use tokio_util::sync::CancellationToken;
async fn cancellable_task(token: CancellationToken) {
loop {
select! {
_ = token.cancelled() => {
println!("Task cancelled");
break;
}
_ = do_work() => {
// Continue working
}
}
}
}
/// Starts `cancellable_task`, cancels it after five seconds, and waits for
/// it to exit.
async fn main_with_cancellation() {
    let token = CancellationToken::new();

    // The task gets a clone; cancelling `token` is observed through it.
    let worker = tokio::spawn(cancellable_task(token.clone()));

    // Stand-in for "some condition": cancel after a fixed delay.
    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    token.cancel();

    worker.await.unwrap();
}
```
### Graceful Shutdown
```rust
async fn serve_with_shutdown(shutdown: impl Future) {
let server = TcpListener::bind("0.0.0.0:8080").await.unwrap();
loop {
select! {
Ok((socket, _)) = server.accept() => {
tokio::spawn(handle_connection(socket));
}
_ = &mut shutdown => {
println!("Shutting down...");
break;
}
}
}
}
/// Runs the server until Ctrl-C is received.
#[tokio::main]
async fn main() {
    // The shutdown signal is a future that resolves on Ctrl-C and panics if
    // waiting for the signal fails.
    serve_with_shutdown(async {
        tokio::signal::ctrl_c().await.unwrap();
    })
    .await;
}
```
---
## Backpressure Patterns
### Bounded Channels
```rust
use tokio::sync::mpsc;
/// Pairs a fast producer with a slow consumer over a channel of capacity 10,
/// so the producer is held back whenever the buffer is full.
async fn with_backpressure() {
    // Capacity 10: `send` waits while ten items are already queued.
    let (tx, mut rx) = mpsc::channel(10);

    let producer = tokio::spawn(async move {
        for value in 0..1000 {
            // Suspends here while the channel is full.
            tx.send(value).await.unwrap();
        }
    });

    let consumer = tokio::spawn(async move {
        loop {
            match rx.recv().await {
                Some(item) => {
                    // Simulate a slow consumer before handling the item.
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                    process(item);
                }
                // The producer finished and dropped its sender.
                None => break,
            }
        }
    });

    // Wait for both tasks; their join results are deliberately ignored.
    let _ = tokio::join!(producer, consumer);
}
```
### Semaphore for Rate Limiting
```rust
use tokio::sync::Semaphore;
use std::sync::Arc;
/// Fetches every URL on its own task while a semaphore keeps at most ten
/// fetches in flight at once. Fetch results are discarded.
async fn rate_limited_requests(urls: Vec<String>) {
    let permits = Arc::new(Semaphore::new(10)); // max 10 concurrent

    let mut tasks = Vec::new();
    for url in urls {
        let permits = Arc::clone(&permits);
        tasks.push(tokio::spawn(async move {
            // Held until the end of the task body, i.e. until the fetch
            // has completed.
            let _permit = permits.acquire().await.unwrap();
            fetch(&url).await
        }));
    }

    // Join every task; a join failure surfaces as a panic here.
    for task in tasks {
        task.await.unwrap();
    }
}
```
---
## Error Handling in Async
### Propagating Errors
```rust
/// Fetches `url` and parses the response, returning early with the first
/// error from either step.
async fn fetch_and_parse(url: &str) -> Result<Data, Error> {
    // Each `?` propagates that step's error to the caller.
    Ok(parse(fetch(url).await?).await?)
}
```
### Handling Task Panics
```rust
/// Runs `risky_operation` on a spawned task and reports whether it
/// succeeded, panicked, or failed to join for another reason.
async fn robust_spawn() {
    let outcome = tokio::spawn(async { risky_operation().await }).await;

    match outcome {
        Ok(value) => println!("Success: {:?}", value),
        Err(join_err) => {
            // A join error is either a panic inside the task or a cancellation.
            if join_err.is_panic() {
                println!("Task panicked: {:?}", join_err);
            } else {
                println!("Task cancelled: {:?}", join_err);
            }
        }
    }
}
```
### Try-Join for Multiple Results
```rust
use tokio::try_join;
async fn fetch_all() -> Result<(A, B, C), Error> {
// All must succeed, or first error returned
try_join!(
fetch_a(),
fetch_b(),
fetch_c(),
)
}
```