Expand description
Facility to wait for a dynamic set of tasks to complete, with a single waiter and multiple waitees (things that are waited for). Notably, each waitee can also start more work to be waited for.
§Implementation Details
The implementation of waiting in this module is just a wrapper around
tokio::sync::mpsc::channel. A WaitGroup holds the unique
tokio::sync::mpsc::Receiver and each WaitGuard holds a
tokio::sync::mpsc::Sender. Despite this simple implementation, the
WaitGroup and WaitGuard wrappers are useful to make this discoverable.
§Example
This example demonstrates use of the WaitGroup and WaitGuard to
(very inefficiently) compute the Fibonacci number F(n) using recursive channels.
The given waiter will be used to detect when the work has finished and it will
close the channels. Additionally, waiter can be omitted to show that without
the WaitGroup, the tasks would not terminate.
async fn fibonacci_waiter_example(n: usize, waiter: Option<(WaitGroup, WaitGuard)>) -> usize {
let (send, recv) = unbounded_channel();
let (incr_count, recv_count) = channel(1);
let (waiter, guard) = match waiter {
Some((waiter, guard)) => (Some(waiter), Some(guard)),
None => (None, None),
};
let recursive_task = tokio::task::spawn({
let send = send.clone();
fibonacci_waiter_example_task(recv, send, incr_count, waiter)
});
let count_task = tokio::task::spawn(async move {
ReceiverStream::new(recv_count).count().await
});
send.send((guard, n)).expect("initial send"); // note `guard` must be moved!
let ((), result) = futures::try_join!(recursive_task, count_task).expect("join");
result
}
/// An inefficient Fibonacci implementation. This computes `F(n)` by sending
/// by `n-1` and `n-2` back into the channel. This shows how one work item can
/// create multiple subsequent work items.
async fn fibonacci_waiter_example_task(
recv: UnboundedReceiver<(Option<WaitGuard>, usize)>,
send: UnboundedSender<(Option<WaitGuard>, usize)>,
incr_count: Sender<()>,
waiter: Option<WaitGroup>,
) {
let stream = UnboundedReceiverStream::new(recv);
let stream = match waiter {
Some(waiter) => stream.take_until(waiter.wait()).left_stream(),
None => stream.right_stream(),
};
stream
.for_each(async |(guard, n)| match n {
0 => (),
1 => incr_count.send(()).await.expect("send incr"),
n => {
send.send((guard.clone(), n - 1)).expect("send 1");
send.send((guard.clone(), n - 2)).expect("send 2");
}
})
.await;
}
// basic termination works as expected and computes the right result.
assert_eq!(fibonacci_waiter_example(0, Some(WaitGroup::new())).await, 0);
assert_eq!(fibonacci_waiter_example(9, Some(WaitGroup::new())).await, 34);
assert_eq!(fibonacci_waiter_example(10, Some(WaitGroup::new())).await, 55);
// task does not terminate if WaitGroup is not used, due to recursive channels.
assert!(timeout(fibonacci_waiter_example(9, None)).await.is_err());
// even a "trivial" case does not terminate.
assert!(timeout(fibonacci_waiter_example(0, None)).await.is_err());
// in these tests, we do use a WaitGroup but it doesn't terminate because we
// *clone* the guard and the test function holds an extra guard, blocking
// WaitGroup from returning. this is an example of something that can go wrong
// when using the waiter.
let (waiter, guard) = WaitGroup::new();
assert!(timeout(fibonacci_waiter_example(9, Some((waiter, guard.clone()))))
.await.is_err());
let (waiter, guard) = WaitGroup::new();
assert!(timeout(fibonacci_waiter_example(0, Some((waiter, guard.clone()))))
.await.is_err());