lychee_lib/waiter.rs
1//! Facility to wait for a dynamic set of tasks to complete, with a single
2//! waiter and multiple waitees (things that are waited for). Notably, each
3//! waitee can also start more work to be waited for.
4//!
5//! # Implementation Details
6//!
7//! The implementation of waiting in this module is just a wrapper around
8//! [`tokio::sync::mpsc::channel`]. A [`WaitGroup`] holds the unique
9//! [`tokio::sync::mpsc::Receiver`] and each [`WaitGuard`] holds a
10//! [`tokio::sync::mpsc::Sender`]. Despite this simple implementation, the
11//! [`WaitGroup`] and [`WaitGuard`] wrappers are useful to make this discoverable.
12//!
13//! # Example
14//!
15//! This example demonstrates use of the [`WaitGroup`] and [`WaitGuard`] to
16//! (very inefficiently) compute the Fibonacci number `F(n)` using recursive channels.
17//!
18//! The given `waiter` will be used to detect when the work has finished and it will
19//! close the channels. Additionally, `waiter` can be omitted to show that without
20//! the [`WaitGroup`], the tasks would not terminate.
21//!
22//! ```rust
23//! # use lychee_lib::waiter::{WaitGuard, WaitGroup};
24//! # use futures::StreamExt;
25//! # use tokio::sync::mpsc::{Receiver, Sender, channel};
26//! # use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
27//! # use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
28//! #
29//! # use std::time::Duration;
30//! #
31//! # fn timeout<F: IntoFuture>(fut: F) -> tokio::time::Timeout<F::IntoFuture> {
32//! # tokio::time::timeout(Duration::from_millis(250), fut)
33//! # }
34//! #
35//! # #[tokio::main]
36//! # async fn main() {
37//! async fn fibonacci_waiter_example(n: usize, waiter: Option<(WaitGroup, WaitGuard)>) -> usize {
38//! let (send, recv) = unbounded_channel();
39//! let (incr_count, recv_count) = channel(1);
40//!
41//! let (waiter, guard) = match waiter {
42//! Some((waiter, guard)) => (Some(waiter), Some(guard)),
43//! None => (None, None),
44//! };
45//!
46//! let recursive_task = tokio::task::spawn({
47//! let send = send.clone();
48//! fibonacci_waiter_example_task(recv, send, incr_count, waiter)
49//! });
50//!
51//! let count_task = tokio::task::spawn(async move {
52//! ReceiverStream::new(recv_count).count().await
53//! });
54//!
55//! send.send((guard, n)).expect("initial send"); // note `guard` must be moved!
56//!
57//! let ((), result) = futures::try_join!(recursive_task, count_task).expect("join");
58//! result
59//! }
60//!
61//! /// An inefficient Fibonacci implementation. This computes `F(n)` by sending
62//! /// `n-1` and `n-2` back into the channel. This shows how one work item can
63//! /// create multiple subsequent work items.
64//! async fn fibonacci_waiter_example_task(
65//! recv: UnboundedReceiver<(Option<WaitGuard>, usize)>,
66//! send: UnboundedSender<(Option<WaitGuard>, usize)>,
67//! incr_count: Sender<()>,
68//! waiter: Option<WaitGroup>,
69//! ) {
70//! let stream = UnboundedReceiverStream::new(recv);
71//! let stream = match waiter {
72//! Some(waiter) => stream.take_until(waiter.wait()).left_stream(),
73//! None => stream.right_stream(),
74//! };
75//!
76//! stream
77//! .for_each(async |(guard, n)| match n {
78//! 0 => (),
79//! 1 => incr_count.send(()).await.expect("send incr"),
80//! n => {
81//! send.send((guard.clone(), n - 1)).expect("send 1");
82//! send.send((guard.clone(), n - 2)).expect("send 2");
83//! }
84//! })
85//! .await;
86//! }
87//!
88//! // basic termination works as expected and computes the right result.
89//! assert_eq!(fibonacci_waiter_example(0, Some(WaitGroup::new())).await, 0);
90//! assert_eq!(fibonacci_waiter_example(9, Some(WaitGroup::new())).await, 34);
91//! assert_eq!(fibonacci_waiter_example(10, Some(WaitGroup::new())).await, 55);
92//!
93//! // task does not terminate if WaitGroup is not used, due to recursive channels.
94//! assert!(timeout(fibonacci_waiter_example(9, None)).await.is_err());
95//! // even a "trivial" case does not terminate.
96//! assert!(timeout(fibonacci_waiter_example(0, None)).await.is_err());
97//!
98//! // in these tests, we do use a WaitGroup but it doesn't terminate because we
99//! // *clone* the guard and the test function holds an extra guard, blocking
100//! // WaitGroup from returning. this is an example of something that can go wrong
101//! // when using the waiter.
102//! let (waiter, guard) = WaitGroup::new();
103//! assert!(timeout(fibonacci_waiter_example(9, Some((waiter, guard.clone()))))
104//! .await.is_err());
105//!
106//! let (waiter, guard) = WaitGroup::new();
107//! assert!(timeout(fibonacci_waiter_example(0, Some((waiter, guard.clone()))))
108//! .await.is_err());
109//! # }
110//! ```
111
112use futures::never::Never;
113use tokio::sync::mpsc::{Receiver, Sender, channel};
114
115/// Manager for a particular wait group. This can spawn a number of [`WaitGuard`]s
116/// and it can then wait for them to all complete.
117///
118/// Each [`WaitGroup`] is single-use—calling [`WaitGroup::wait`] to start
119/// waiting consumes the [`WaitGroup`]. Additionally, once all [`WaitGuard`]s
120/// have been dropped, it is not possible to create any more [`WaitGuard`]s.
121#[derive(Debug)]
122pub struct WaitGroup {
123 /// [`Receiver`] is held to wait for multiple [`Sender`]s and detect
124 /// when they have closed. The [`Never`] type means no value can/will
125 /// ever be received through the channel.
126 recv: Receiver<Never>,
127}
128
129/// RAII guard held by a task which is being waited for.
130///
131/// The existence of values of this type represents outstanding work for
132/// its corresponding [`WaitGroup`].
133///
134/// A [`WaitGuard`] can be cloned using [`WaitGuard::clone`]. This allows
135/// a task to spawn additional tasks, recursively.
136#[derive(Clone, Debug)]
137pub struct WaitGuard {
138 /// [`Sender`] is held to keep the [`Receiver`] end open (stored in [`WaitGroup`]).
139 /// The dropping of all senders will cause the receiver to detect and close.
140 /// The [`Never`] type means no value can/will ever be sent through the channel.
141 _send: Sender<Never>,
142}
143
144impl WaitGroup {
145 /// Creates a new [`WaitGroup`] and its first associated [`WaitGuard`].
146 ///
147 /// Note that [`WaitGroup`] itself has no ability to create new guards.
148 /// If needed, new guards should be created by cloning the returned [`WaitGuard`].
149 #[must_use]
150 pub fn new() -> (Self, WaitGuard) {
151 let (send, recv) = channel(1);
152 (Self { recv }, WaitGuard { _send: send })
153 }
154
155 /// Waits, asynchronously, until all the associated [`WaitGuard`]s have finished.
156 pub async fn wait(mut self) {
157 let None = self.recv.recv().await;
158 }
159}