Add tests for task collections (TaskTracker, JoinSet, JoinMap).
use std::panic::AssertUnwindSafe;
+use futures::future::{pending, FutureExt};
use tokio::sync::oneshot;
+use tokio::task::LocalSet;
use tokio::time::Duration;
use tokio_util::task::JoinMap;
-use futures::future::FutureExt;
-
fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
}
+// Spawn `N` tasks that return their index (`i`).
+fn spawn_index_tasks(map: &mut JoinMap<usize, usize>, n: usize, on: Option<&LocalSet>) {
+ for i in 0..n {
+ let rc = std::rc::Rc::new(i);
+ match on {
+ None => map.spawn_local(i, async move { *rc }),
+ Some(local) => map.spawn_local_on(i, async move { *rc }, local),
+ };
+ }
+}
+
+// Spawn `N` “pending” tasks that own a `oneshot::Sender`.
+// When the task is aborted the sender is dropped, which is observed
+// via the returned `Receiver`s.
+fn spawn_pending_tasks(
+ map: &mut JoinMap<usize, ()>,
+ receivers: &mut Vec<oneshot::Receiver<()>>,
+ n: usize,
+ on: Option<&LocalSet>,
+) {
+ for i in 0..n {
+ let (tx, rx) = oneshot::channel::<()>();
+ receivers.push(rx);
+
+ let fut = async move {
+ pending::<()>().await;
+ drop(tx);
+ };
+ match on {
+ None => map.spawn_local(i, fut),
+ Some(local) => map.spawn_local_on(i, fut, local),
+ };
+ }
+}
+
+/// Await every task in JoinMap and assert every task returns its own key.
+async fn drain_joinmap_and_assert(mut map: JoinMap<usize, usize>, n: usize) {
+ let mut seen = vec![false; n];
+ while let Some((k, res)) = map.join_next().await {
+ let v = res.expect("task panicked");
+ assert_eq!(k, v);
+ seen[v] = true;
+ }
+ assert!(seen.into_iter().all(|b| b));
+ assert!(map.is_empty());
+}
+
+// Await every receiver and assert they all return `Err` because the
+// corresponding sender (inside an aborted task) was dropped.
+async fn await_receivers_and_assert(receivers: Vec<oneshot::Receiver<()>>) {
+ for rx in receivers {
+ assert!(
+ rx.await.is_err(),
+ "task should have been aborted and sender dropped"
+ );
+ }
+}
+
#[tokio::test(start_paused = true)]
async fn test_with_sleep() {
let mut map = JoinMap::new();
assert!(map.join_next().await.is_none());
}
+
+mod spawn_local {
+ use super::*;
+
+ #[cfg(tokio_unstable)]
+ mod local_runtime {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+
+ let mut map = JoinMap::new();
+ spawn_index_tasks(&mut map, N, None);
+
+ assert!(map.join_next().now_or_never().is_none());
+ drain_joinmap_and_assert(map, N).await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, None);
+ assert!(map.join_next().now_or_never().is_none());
+
+ map.shutdown().await;
+ assert!(map.is_empty());
+ await_receivers_and_assert(receivers).await;
+ }
+
+ /// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, None);
+ assert!(map.join_next().now_or_never().is_none());
+
+ drop(map);
+ await_receivers_and_assert(receivers).await;
+ }
+ }
+
+ mod local_set {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async move {
+ let mut map = JoinMap::new();
+ spawn_index_tasks(&mut map, N, None);
+ drain_joinmap_and_assert(map, N).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async {
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, None);
+ assert!(map.join_next().now_or_never().is_none());
+
+ map.shutdown().await;
+ assert!(map.is_empty());
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async {
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, None);
+ assert!(map.join_next().now_or_never().is_none());
+
+ drop(map);
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+ }
+}
+
+mod spawn_local_on {
+ use super::*;
+
+ #[cfg(tokio_unstable)]
+ mod local_runtime {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+
+ let local = LocalSet::new();
+ let mut map = JoinMap::new();
+
+ spawn_index_tasks(&mut map, N, Some(&local));
+ assert!(map.join_next().now_or_never().is_none());
+
+ local
+ .run_until(async move {
+ drain_joinmap_and_assert(map, N).await;
+ })
+ .await;
+ }
+ }
+
+ mod local_set {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let mut pending_map = JoinMap::new();
+
+ spawn_index_tasks(&mut pending_map, N, Some(&local));
+ assert!(pending_map.join_next().now_or_never().is_none());
+
+ local
+ .run_until(async move {
+ drain_joinmap_and_assert(pending_map, N).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
+ assert!(map.join_next().now_or_never().is_none());
+
+ local
+ .run_until(async move {
+ map.shutdown().await;
+ assert!(map.is_empty());
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks and then drop the [`JoinMap`]
+ /// before the `LocalSet` is driven and while the `LocalSet` is already driven.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+
+ {
+ let local = LocalSet::new();
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
+ assert!(map.join_next().now_or_never().is_none());
+
+ drop(map);
+ local
+ .run_until(async move { await_receivers_and_assert(receivers).await })
+ .await;
+ }
+
+ {
+ let local = LocalSet::new();
+ let mut map = JoinMap::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
+ assert!(map.join_next().now_or_never().is_none());
+
+ local
+ .run_until(async move {
+ drop(map);
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+ }
+ }
+}
#![warn(rust_2018_idioms)]
+use futures::future::pending;
+#[cfg(tokio_unstable)]
+use std::rc::Rc;
+use tokio::sync::mpsc;
+use tokio::task::LocalSet;
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::task::TaskTracker;
assert_ready!(wait.poll());
}
}
+
+#[cfg(tokio_unstable)]
+mod spawn {
+ use super::*;
+
+ /// Spawn several tasks, and then close the [`TaskTracker`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_close() {
+ const N: usize = 8;
+
+ let tracker = TaskTracker::new();
+
+ for _ in 0..N {
+ tracker.spawn(async {});
+ }
+
+ for _ in 0..N {
+ tracker.spawn_on(async {}, &tokio::runtime::Handle::current());
+ }
+
+ tracker.close();
+ tracker.wait().await;
+
+ assert!(tracker.is_empty());
+ assert!(tracker.is_closed());
+ }
+}
+
+#[cfg(tokio_unstable)]
+mod spawn_local {
+ use super::*;
+
+ /// Spawn several tasks, and then close the [`TaskTracker`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_close() {
+ const N: usize = 8;
+
+ let tracker = TaskTracker::new();
+
+ for _ in 0..N {
+ let rc = Rc::new(());
+ tracker.spawn_local(async move {
+ drop(rc);
+ });
+ }
+
+ tracker.close();
+ tracker.wait().await;
+
+ assert!(tracker.is_empty());
+ assert!(tracker.is_closed());
+ }
+
+ /// Close the [`TaskTracker`], and then spawn several tasks
+ #[tokio::test(flavor = "local")]
+ async fn spawn_after_close() {
+ const N: usize = 8;
+
+ let tracker = TaskTracker::new();
+
+ tracker.close();
+
+ for _ in 0..N {
+ let rc = Rc::new(());
+ tracker.spawn_local(async move {
+ drop(rc);
+ });
+ }
+
+ tracker.wait().await;
+
+ assert!(tracker.is_closed());
+ assert!(tracker.is_empty());
+ }
+}
+
+mod spawn_local_on {
+ use super::*;
+
+ #[cfg(tokio_unstable)]
+ mod local_runtime {
+ use super::*;
+
+ /// Spawn several tasks, and then close the [`TaskTracker`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_close() {
+ const N: usize = 8;
+ let local_set = LocalSet::new();
+
+ let tracker = TaskTracker::new();
+
+ for _ in 0..N {
+ let rc = Rc::new(());
+ tracker.spawn_local_on(
+ async move {
+ drop(rc);
+ },
+ &local_set,
+ );
+ }
+
+ local_set
+ .run_until(async {
+ tracker.close();
+ tracker.wait().await;
+
+ assert!(tracker.is_empty());
+ assert!(tracker.is_closed());
+ })
+ .await;
+ }
+ }
+
+ mod local_set {
+ use super::*;
+
+ /// Spawn several pending-forever tasks, and then drop the [`TaskTracker`]
+ /// while the `LocalSet` is already driven.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let tracker = TaskTracker::new();
+ let (tx, mut rx) = mpsc::unbounded_channel::<()>();
+
+ for _i in 0..N {
+ let tx = tx.clone();
+ tracker.spawn_local_on(
+ async move {
+ pending::<()>().await;
+ drop(tx);
+ },
+ &local,
+ );
+ }
+ drop(tx);
+
+ local
+ .run_until(async move {
+ drop(tracker);
+ tokio::task::yield_now().await;
+
+ use tokio::sync::mpsc::error::TryRecvError;
+
+ assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
+ })
+ .await;
+ }
+
+ /// Close the tracker first, spawn several pending-forever tasks,
+ /// then wait while the`LocalSet` is already driven.
+ #[tokio::test(flavor = "current_thread")]
+ async fn close_then_spawn() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let tracker = TaskTracker::new();
+
+ tracker.close();
+
+ for _ in 0..N {
+ let rc = std::rc::Rc::new(());
+ tracker.spawn_local_on(
+ async move {
+ drop(rc);
+ },
+ &local,
+ );
+ }
+
+ local
+ .run_until(async move {
+ tracker.wait().await;
+ assert!(tracker.is_closed());
+ assert!(tracker.is_empty());
+ })
+ .await;
+ }
+ }
+}
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", tokio_unstable))]
+use std::panic;
use tokio::runtime::LocalOptions;
use tokio::task::spawn_local;
+use tokio::task::LocalSet;
#[test]
fn test_spawn_local_in_runtime() {
spawn_local(async {});
}
+// This test guarantees that **`tokio::task::spawn_local` panics** when it is invoked
+// from a thread that is *not* running the `LocalRuntime` / `LocalSet` to which
+// the task would belong.
+// The test creates a `LocalRuntime` and `LocalSet`, drives the `LocalSet` on the `LocalRuntime`'s thread,
+// then spawns a **separate OS thread** and tries to call
+// `tokio::task::spawn_local` there. `std::panic::catch_unwind` is then used
+// to capture the panic and to assert that it indeed occurs.
+#[test]
+#[cfg_attr(target_family = "wasm", ignore)] // threads not supported
+fn test_spawn_local_panic() {
+ let rt = rt();
+ let local = LocalSet::new();
+
+ rt.block_on(local.run_until(async {
+ let thread_result = std::thread::spawn(|| {
+ let panic_result = panic::catch_unwind(|| {
+ let _jh = tokio::task::spawn_local(async {
+ println!("you will never see this line");
+ });
+ });
+ assert!(panic_result.is_err(), "Expected panic, but none occurred");
+ })
+ .join();
+ assert!(thread_result.is_ok(), "Thread itself panicked unexpectedly");
+ }));
+}
+
#[test]
#[should_panic = "`spawn_local` called from outside of a `task::LocalSet` or `runtime::LocalRuntime`"]
fn test_spawn_local_in_current_thread_runtime() {
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
-use futures::future::FutureExt;
+use futures::future::{pending, FutureExt};
+use std::panic;
use tokio::sync::oneshot;
-use tokio::task::JoinSet;
+use tokio::task::{JoinSet, LocalSet};
use tokio::time::Duration;
fn rt() -> tokio::runtime::Runtime {
.unwrap()
}
+// Spawn `N` tasks that return their index (`i`).
+fn spawn_index_tasks(set: &mut JoinSet<usize>, n: usize, on: Option<&LocalSet>) {
+ for i in 0..n {
+ let rc = std::rc::Rc::new(i);
+ match on {
+ None => set.spawn_local(async move { *rc }),
+ Some(local) => set.spawn_local_on(async move { *rc }, local),
+ };
+ }
+}
+
+// Spawn `N` “pending” tasks that own a `oneshot::Sender`.
+// When the task is aborted the sender is dropped, which is observed
+// via the returned `Receiver`s.
+fn spawn_pending_tasks(
+ set: &mut JoinSet<()>,
+ receivers: &mut Vec<oneshot::Receiver<()>>,
+ n: usize,
+ on: Option<&LocalSet>,
+) {
+ for _ in 0..n {
+ let (tx, rx) = oneshot::channel::<()>();
+ receivers.push(rx);
+
+ let fut = async move {
+ pending::<()>().await;
+ drop(tx);
+ };
+
+ match on {
+ None => set.spawn_local(fut),
+ Some(local) => set.spawn_local_on(fut, local),
+ };
+ }
+}
+
+// Await every task in a JoinSet and assert every task returns its own index.
+async fn drain_joinset_and_assert(mut set: JoinSet<usize>, n: usize) {
+ let mut seen = vec![false; n];
+ while let Some(res) = set.join_next().await {
+ let idx = res.expect("task panicked");
+ seen[idx] = true;
+ }
+ assert!(seen.into_iter().all(|b| b));
+ assert!(set.is_empty());
+}
+
+// Await every receiver and assert they all return `Err` because the
+// corresponding sender (inside an aborted task) was dropped.
+async fn await_receivers_and_assert(receivers: Vec<oneshot::Receiver<()>>) {
+ for rx in receivers {
+ assert!(
+ rx.await.is_err(),
+ "the task should have been aborted and the sender dropped"
+ );
+ }
+}
+
#[tokio::test(start_paused = true)]
async fn test_with_sleep() {
let mut set = JoinSet::new();
assert_eq!(count, TASK_NUM);
assert_eq!(joined, spawned);
}
+
+mod spawn_local {
+ use super::*;
+
+ #[cfg(tokio_unstable)]
+ mod local_runtime {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+
+ let mut set = JoinSet::new();
+ spawn_index_tasks(&mut set, N, None);
+
+ assert!(set.try_join_next().is_none());
+ drain_joinset_and_assert(set, N).await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, None);
+
+ assert!(set.try_join_next().is_none());
+ set.shutdown().await;
+ assert!(set.is_empty());
+
+ await_receivers_and_assert(receivers).await;
+ }
+
+ /// Spawn several pending-forever tasks, and then drop the [`JoinSet`].
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, None);
+
+ assert!(set.try_join_next().is_none());
+ drop(set);
+
+ await_receivers_and_assert(receivers).await;
+ }
+ }
+
+ mod local_set {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async move {
+ let mut set = JoinSet::new();
+ spawn_index_tasks(&mut set, N, None);
+ drain_joinset_and_assert(set, N).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async {
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, None);
+ assert!(set.try_join_next().is_none());
+
+ set.shutdown().await;
+ assert!(set.is_empty());
+
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then drop the [`JoinSet`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+
+ local
+ .run_until(async {
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, None);
+ assert!(set.try_join_next().is_none());
+
+ drop(set);
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+ }
+}
+
+mod spawn_local_on {
+ use super::*;
+
+ #[cfg(tokio_unstable)]
+ mod local_runtime {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "local")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+
+ let local = LocalSet::new();
+ let mut set = JoinSet::new();
+
+ spawn_index_tasks(&mut set, N, Some(&local));
+ assert!(set.try_join_next().is_none());
+
+ local
+ .run_until(async move {
+ drain_joinset_and_assert(set, N).await;
+ })
+ .await;
+ }
+ }
+
+ mod local_set {
+ use super::*;
+
+ /// Spawn several tasks, and then join all tasks.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_join_next() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let mut pending_set = JoinSet::new();
+
+ spawn_index_tasks(&mut pending_set, N, Some(&local));
+ assert!(pending_set.try_join_next().is_none());
+
+ local
+ .run_until(async move {
+ drain_joinset_and_assert(pending_set, N).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`].
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_shutdown() {
+ const N: usize = 8;
+ let local = LocalSet::new();
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local));
+ assert!(set.try_join_next().is_none());
+
+ local
+ .run_until(async move {
+ set.shutdown().await;
+ assert!(set.is_empty());
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+
+ /// Spawn several pending-forever tasks and then drop the [`JoinSet`]
+ /// before the `LocalSet` is driven and while the `LocalSet` is already driven.
+ #[tokio::test(flavor = "current_thread")]
+ async fn spawn_then_drop() {
+ const N: usize = 8;
+
+ {
+ let local = LocalSet::new();
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local));
+ assert!(set.try_join_next().is_none());
+
+ drop(set);
+
+ local
+ .run_until(async move {
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+
+ {
+ let local = LocalSet::new();
+ let mut set = JoinSet::new();
+ let mut receivers = Vec::new();
+
+ spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local));
+ assert!(set.try_join_next().is_none());
+
+ local
+ .run_until(async move {
+ drop(set);
+ await_receivers_and_assert(receivers).await;
+ })
+ .await;
+ }
+ }
+ }
+}