Module waiter

Facility to wait for a dynamic set of tasks to complete, with a single waiter and multiple waitees (things that are waited for). Notably, each waitee can also start more work to be waited for.

§Implementation Details

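Waiting in this module is implemented as a thin wrapper around tokio::sync::mpsc::channel. A WaitGroup holds the unique tokio::sync::mpsc::Receiver and each WaitGuard holds a tokio::sync::mpsc::Sender. A tokio receiver reports that the channel is closed only once every sender has been dropped, so the wait can finish only after the last WaitGuard is gone. Although the implementation is simple, the WaitGroup and WaitGuard wrappers give the pattern a name and make it discoverable.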
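
The sender/receiver relationship described above can be sketched without an async runtime. The following is an illustrative analog only, not this module's code: it substitutes std::sync::mpsc and threads for tokio's channel and tasks, and the names SketchWaitGroup, SketchWaitGuard, spawn_tree, and run_sketch are hypothetical. It assumes, as the text suggests, that no message is ever sent and that only the dropping of senders matters.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `WaitGroup`: owns the only receiver.
pub struct SketchWaitGroup {
    recv: Receiver<()>,
}

/// Hypothetical stand-in for `WaitGuard`: owns a sender that never sends.
#[derive(Clone)]
pub struct SketchWaitGuard {
    _send: Sender<()>,
}

impl SketchWaitGroup {
    pub fn new() -> (SketchWaitGroup, SketchWaitGuard) {
        let (send, recv) = channel();
        (SketchWaitGroup { recv }, SketchWaitGuard { _send: send })
    }

    /// Blocks until every guard, including all clones, has been dropped.
    pub fn wait(self) {
        // Nothing is ever sent, so `recv` can only return an error, and it
        // does so only once all senders have been dropped.
        let result = self.recv.recv();
        debug_assert!(result.is_err());
    }
}

/// A waitee that starts more work: each worker with `depth > 0` spawns two
/// children, handing each one a clone of its own guard.
pub fn spawn_tree(guard: SketchWaitGuard, depth: u32, done: Arc<AtomicUsize>) {
    thread::spawn(move || {
        if depth > 0 {
            spawn_tree(guard.clone(), depth - 1, done.clone());
            spawn_tree(guard.clone(), depth - 1, done.clone());
        }
        done.fetch_add(1, Ordering::SeqCst);
        // Dropping the guard marks this worker as finished.
        drop(guard);
    });
}

/// Runs a tree of workers and returns how many had finished when `wait`
/// returned.
pub fn run_sketch(depth: u32) -> usize {
    let (group, guard) = SketchWaitGroup::new();
    let done = Arc::new(AtomicUsize::new(0));
    // The initial guard is moved into the first worker, not cloned.
    spawn_tree(guard, depth, done.clone());
    group.wait();
    done.load(Ordering::SeqCst)
}
```

In this sketch each worker clones its guard for its children before dropping its own, so the sender count cannot reach zero while any work is still outstanding. A tree of depth 3 has 15 workers, and run_sketch(3) returns only after all of them have dropped their guards.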

§Example

This example demonstrates use of the WaitGroup and WaitGuard to (very inefficiently) compute the Fibonacci number F(n) using recursive channels.

The given waiter detects when all work has finished, at which point the channels are closed. Passing None instead shows that, without the WaitGroup, the tasks would never terminate.

async fn fibonacci_waiter_example(n: usize, waiter: Option<(WaitGroup, WaitGuard)>) -> usize {
    let (send, recv) = unbounded_channel();
    let (incr_count, recv_count) = channel(1);

    let (waiter, guard) = match waiter {
        Some((waiter, guard)) => (Some(waiter), Some(guard)),
        None => (None, None),
    };

    let recursive_task = tokio::task::spawn({
        let send = send.clone();
        fibonacci_waiter_example_task(recv, send, incr_count, waiter)
    });

    let count_task = tokio::task::spawn(async move {
        ReceiverStream::new(recv_count).count().await
    });

    send.send((guard, n)).expect("initial send"); // note `guard` must be moved!

    let ((), result) = futures::try_join!(recursive_task, count_task).expect("join");
    result
}

/// An inefficient Fibonacci implementation. This computes `F(n)` by sending
/// `n-1` and `n-2` back into the channel. This shows how one work item can
/// create multiple subsequent work items.
async fn fibonacci_waiter_example_task(
    recv: UnboundedReceiver<(Option<WaitGuard>, usize)>,
    send: UnboundedSender<(Option<WaitGuard>, usize)>,
    incr_count: Sender<()>,
    waiter: Option<WaitGroup>,
) {
    let stream = UnboundedReceiverStream::new(recv);
    let stream = match waiter {
        Some(waiter) => stream.take_until(waiter.wait()).left_stream(),
        None => stream.right_stream(),
    };

    stream
        .for_each(async |(guard, n)| match n {
            0 => (),
            1 => incr_count.send(()).await.expect("send incr"),
            n => {
                send.send((guard.clone(), n - 1)).expect("send 1");
                send.send((guard.clone(), n - 2)).expect("send 2");
            }
        })
        .await;
}

// basic termination works as expected and computes the right result.
assert_eq!(fibonacci_waiter_example(0, Some(WaitGroup::new())).await, 0);
assert_eq!(fibonacci_waiter_example(9, Some(WaitGroup::new())).await, 34);
assert_eq!(fibonacci_waiter_example(10, Some(WaitGroup::new())).await, 55);

// task does not terminate if WaitGroup is not used, due to recursive channels.
assert!(timeout(fibonacci_waiter_example(9, None)).await.is_err());
// even a "trivial" case does not terminate.
assert!(timeout(fibonacci_waiter_example(0, None)).await.is_err());

// in these tests, we do use a WaitGroup but it doesn't terminate because we
// *clone* the guard and the test function holds an extra guard, blocking
// WaitGroup from returning. this is an example of something that can go wrong
// when using the waiter.
let (waiter, guard) = WaitGroup::new();
assert!(timeout(fibonacci_waiter_example(9, Some((waiter, guard.clone()))))
        .await.is_err());

let (waiter, guard) = WaitGroup::new();
assert!(timeout(fibonacci_waiter_example(0, Some((waiter, guard.clone()))))
        .await.is_err());
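
The extra-guard pitfall in the last two tests comes down to one surviving sender keeping the channel open. A minimal sketch of that behavior follows, again using std::sync::mpsc as a stand-in for tokio's channel; the function name extra_sender_blocks_wait and the 50 ms timeout are assumptions made for illustration.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

/// Returns `(timed_out_while_extra_alive, disconnected_after_drop)`.
pub fn extra_sender_blocks_wait() -> (bool, bool) {
    let (send, recv) = channel::<()>();

    // Analogous to the test function keeping `guard` after passing
    // `guard.clone()` into the example.
    let extra = send.clone();

    // All of the real work finishes and drops its guard.
    drop(send);

    // The receiver still cannot observe completion: one sender is alive.
    let timed_out = matches!(
        recv.recv_timeout(Duration::from_millis(50)),
        Err(RecvTimeoutError::Timeout)
    );

    // Once the stray sender is dropped, the channel reports disconnection.
    drop(extra);
    let disconnected = matches!(
        recv.recv_timeout(Duration::from_millis(50)),
        Err(RecvTimeoutError::Disconnected)
    );

    (timed_out, disconnected)
}
```

Moving the guard into the work, rather than cloning it and keeping the original, avoids the stray sender in the first place.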

Structs§

WaitGroup
Manager for a particular wait group. It can create any number of WaitGuards and then wait for all of them to complete.
WaitGuard
RAII guard held by a task which is being waited for.