include:
- os: ubuntu-latest
steps:
+ - name: check if io-uring is supported in the CI environment
+ run: |
+ # Try to read the io-uring setting in the kernel config file.
+ # https://github.com/torvalds/linux/blob/75f5f23f8787c5e184fcb2fbcd02d8e9317dc5e7/init/Kconfig#L1782-L1789
+ CONFIG_FILE="/boot/config-$(uname -r)"
+ echo "Checking $CONFIG_FILE for io-uring support"
+ if ! grep -q "CONFIG_IO_URING=y" "$CONFIG_FILE"; then
+ echo "Error: io_uring is not supported"
+ exit 1
+ fi
+
- uses: actions/checkout@v4
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@stable
[dev-dependencies]
tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
+tokio-util = { version = "0.7", path = "../tokio-util", features = ["rt"] }
futures = { version = "0.3.0", features = ["async-await"] }
mockall = "0.13.0"
async-stream = "0.3"
[package.metadata.docs.rs]
all-features = true
# enable unstable features in the documentation
-rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
+rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
# it's necessary to _also_ pass `--cfg tokio_unstable` and `--cfg tokio_taskdump`
# to rustc, or else dependencies will not be enabled, and the docs build will fail.
-rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
+rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
[package.metadata.playground]
features = ["full", "test-util"]
mod open_options;
pub use self::open_options::OpenOptions;
+cfg_tokio_uring! {
+ pub(crate) use self::open_options::UringOpenOptions;
+}
mod read;
pub use self::read::read;
use std::io;
use std::path::Path;
+cfg_tokio_uring! {
+ mod uring_open_options;
+ pub(crate) use uring_open_options::UringOpenOptions;
+ use crate::runtime::driver::op::Op;
+}
+
#[cfg(test)]
mod mock_open_options;
#[cfg(test)]
/// }
/// ```
#[derive(Clone, Debug)]
-pub struct OpenOptions(StdOpenOptions);
+pub struct OpenOptions {
+ inner: Kind,
+}
+
+#[derive(Debug, Clone)]
+enum Kind {
+ Std(StdOpenOptions),
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Uring(UringOpenOptions),
+}
impl OpenOptions {
/// Creates a blank new set of options ready for configuration.
/// let future = options.read(true).open("foo.txt");
/// ```
pub fn new() -> OpenOptions {
- OpenOptions(StdOpenOptions::new())
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ let inner = Kind::Uring(UringOpenOptions::new());
+ #[cfg(not(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux")))]
+ let inner = Kind::Std(StdOpenOptions::new());
+
+ OpenOptions { inner }
}
/// Sets the option for read access.
/// }
/// ```
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
- self.0.read(read);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.read(read);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.read(read);
+ }
+ }
self
}
/// }
/// ```
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
- self.0.write(write);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.write(write);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.write(write);
+ }
+ }
self
}
/// }
/// ```
pub fn append(&mut self, append: bool) -> &mut OpenOptions {
- self.0.append(append);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.append(append);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.append(append);
+ }
+ }
self
}
/// }
/// ```
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
- self.0.truncate(truncate);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.truncate(truncate);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.truncate(truncate);
+ }
+ }
self
}
/// }
/// ```
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
- self.0.create(create);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.create(create);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.create(create);
+ }
+ }
self
}
/// }
/// ```
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
- self.0.create_new(create_new);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.create_new(create_new);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.create_new(create_new);
+ }
+ }
self
}
/// open files, too long filename, too many symbolic links in the
/// specified path (Unix-like systems only), etc.
///
+ /// # io_uring support
+ ///
+ /// On Linux, you can also use `io_uring` for executing system calls.
+ /// To enable `io_uring`, you need to specify the `--cfg tokio_uring` flag
+ /// at compile time and set the `Builder::enable_io_uring` runtime option.
+ ///
+ /// Support for `io_uring` is currently experimental, so its behavior may
+ /// change or it may be removed in future versions.
+ ///
/// # Examples
///
/// ```no_run
/// [`Other`]: std::io::ErrorKind::Other
/// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied
pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
+ match &self.inner {
+ Kind::Std(opts) => Self::std_open(opts, path).await,
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ let handle = crate::runtime::Handle::current();
+ let driver_handle = handle.inner.driver().io();
+
+ if driver_handle.check_and_init()? {
+ Op::open(path.as_ref(), opts)?.await
+ } else {
+ let opts = opts.clone().into();
+ Self::std_open(&opts, path).await
+ }
+ }
+ }
+ }
+
+ async fn std_open(opts: &StdOpenOptions, path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
- let opts = self.0.clone();
+ let opts = opts.clone();
let std = asyncify(move || opts.open(path)).await?;
Ok(File::from_std(std))
}
- /// Returns a mutable reference to the underlying `std::fs::OpenOptions`
- #[cfg(any(windows, unix))]
+ #[cfg(windows)]
pub(super) fn as_inner_mut(&mut self) -> &mut StdOpenOptions {
- &mut self.0
+ match &mut self.inner {
+ Kind::Std(ref mut opts) => opts,
+ }
}
}
/// }
/// ```
pub fn mode(&mut self, mode: u32) -> &mut OpenOptions {
- self.as_inner_mut().mode(mode);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.mode(mode);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.mode(mode);
+ }
+ }
self
}
/// }
/// ```
pub fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
- self.as_inner_mut().custom_flags(flags);
+ match &mut self.inner {
+ Kind::Std(opts) => {
+ opts.custom_flags(flags);
+ }
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+ Kind::Uring(opts) => {
+ opts.custom_flags(flags);
+ }
+ }
self
}
}
impl From<StdOpenOptions> for OpenOptions {
fn from(options: StdOpenOptions) -> OpenOptions {
- OpenOptions(options)
+ OpenOptions {
+ inner: Kind::Std(options),
+ // TODO: Add support for converting `StdOpenOptions` to `UringOpenOptions`
+ // if user enables the `--cfg tokio_uring`. It is blocked by:
+ // * https://github.com/rust-lang/rust/issues/74943
+ // * https://github.com/rust-lang/rust/issues/76801
+ }
}
}
--- /dev/null
+use std::{io, os::unix::fs::OpenOptionsExt};
+
+#[cfg(test)]
+use super::mock_open_options::MockOpenOptions as StdOpenOptions;
+#[cfg(not(test))]
+use std::fs::OpenOptions as StdOpenOptions;
+
+#[derive(Debug, Clone)]
+pub(crate) struct UringOpenOptions {
+ pub(crate) read: bool,
+ pub(crate) write: bool,
+ pub(crate) append: bool,
+ pub(crate) truncate: bool,
+ pub(crate) create: bool,
+ pub(crate) create_new: bool,
+ pub(crate) mode: libc::mode_t,
+ pub(crate) custom_flags: libc::c_int,
+}
+
+impl UringOpenOptions {
+ pub(crate) fn new() -> Self {
+ Self {
+ read: false,
+ write: false,
+ append: false,
+ truncate: false,
+ create: false,
+ create_new: false,
+ mode: 0o666,
+ custom_flags: 0,
+ }
+ }
+
+ pub(crate) fn append(&mut self, append: bool) -> &mut Self {
+ self.append = append;
+ self
+ }
+
+ pub(crate) fn create(&mut self, create: bool) -> &mut Self {
+ self.create = create;
+ self
+ }
+
+ pub(crate) fn create_new(&mut self, create_new: bool) -> &mut Self {
+ self.create_new = create_new;
+ self
+ }
+
+ pub(crate) fn read(&mut self, read: bool) -> &mut Self {
+ self.read = read;
+ self
+ }
+
+ pub(crate) fn write(&mut self, write: bool) -> &mut Self {
+ self.write = write;
+ self
+ }
+
+ pub(crate) fn truncate(&mut self, truncate: bool) -> &mut Self {
+ self.truncate = truncate;
+ self
+ }
+
+ pub(crate) fn mode(&mut self, mode: u32) -> &mut Self {
+ self.mode = mode as libc::mode_t;
+ self
+ }
+
+ pub(crate) fn custom_flags(&mut self, flags: i32) -> &mut Self {
+ self.custom_flags = flags;
+ self
+ }
+
+ // Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1118-L1127
+ pub(crate) fn access_mode(&self) -> io::Result<libc::c_int> {
+ match (self.read, self.write, self.append) {
+ (true, false, false) => Ok(libc::O_RDONLY),
+ (false, true, false) => Ok(libc::O_WRONLY),
+ (true, true, false) => Ok(libc::O_RDWR),
+ (false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND),
+ (true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND),
+ (false, false, false) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
+ }
+ }
+
+ // Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1129-L1151
+ pub(crate) fn creation_mode(&self) -> io::Result<libc::c_int> {
+ match (self.write, self.append) {
+ (true, false) => {}
+ (false, false) => {
+ if self.truncate || self.create || self.create_new {
+ return Err(io::Error::from_raw_os_error(libc::EINVAL));
+ }
+ }
+ (_, true) => {
+ if self.truncate && !self.create_new {
+ return Err(io::Error::from_raw_os_error(libc::EINVAL));
+ }
+ }
+ }
+
+ Ok(match (self.create, self.truncate, self.create_new) {
+ (false, false, false) => 0,
+ (true, false, false) => libc::O_CREAT,
+ (false, true, false) => libc::O_TRUNC,
+ (true, true, false) => libc::O_CREAT | libc::O_TRUNC,
+ (_, _, true) => libc::O_CREAT | libc::O_EXCL,
+ })
+ }
+}
+
+impl From<UringOpenOptions> for StdOpenOptions {
+ fn from(value: UringOpenOptions) -> Self {
+ let mut std = StdOpenOptions::new();
+
+ std.append(value.append);
+ std.create(value.create);
+ std.create_new(value.create_new);
+ std.read(value.read);
+ std.truncate(value.truncate);
+ std.write(value.write);
+
+ std.mode(value.mode);
+ std.custom_flags(value.custom_flags);
+
+ std
+ }
+}
pub(crate) use crate::blocking::JoinHandle as Blocking;
}
}
+
+cfg_tokio_uring! {
+ pub(crate) mod uring;
+}
--- /dev/null
+pub(crate) mod open;
+pub(crate) mod utils;
--- /dev/null
+use super::utils::cstr;
+use crate::{
+ fs::UringOpenOptions,
+ runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
+};
+use io_uring::{opcode, types};
+use std::{ffi::CString, io, os::fd::FromRawFd, path::Path};
+
+#[derive(Debug)]
+pub(crate) struct Open {
+ /// This field will be read by the kernel during the operation, so we
+ /// need to ensure it is valid for the entire duration of the operation.
+ #[allow(dead_code)]
+ path: CString,
+}
+
+impl Completable for Open {
+ type Output = crate::fs::File;
+ fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
+ let fd = cqe.result? as i32;
+ let file = unsafe { crate::fs::File::from_raw_fd(fd) };
+ Ok(file)
+ }
+}
+
+impl Cancellable for Open {
+ fn cancel(self) -> CancelData {
+ CancelData::Open(self)
+ }
+}
+
+impl Op<Open> {
+ /// Submit a request to open a file.
+ pub(crate) fn open(path: &Path, options: &UringOpenOptions) -> io::Result<Op<Open>> {
+ let inner_opt = options;
+ let path = cstr(path)?;
+
+ let custom_flags = inner_opt.custom_flags;
+ let flags = libc::O_CLOEXEC
+ | options.access_mode()?
+ | options.creation_mode()?
+ | (custom_flags & !libc::O_ACCMODE);
+
+ let open_op = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
+ .flags(flags)
+ .mode(inner_opt.mode)
+ .build();
+
+ // SAFETY: Parameters are valid for the entire duration of the operation
+ let op = unsafe { Op::new(open_op, Open { path }) };
+ Ok(op)
+ }
+}
--- /dev/null
+use std::os::unix::ffi::OsStrExt;
+use std::{ffi::CString, io, path::Path};
+
+pub(crate) fn cstr(p: &Path) -> io::Result<CString> {
+ Ok(CString::new(p.as_os_str().as_bytes())?)
+}
all(unix, feature = "signal")
))]
self.enable_io();
+
+ #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
+ self.enable_io_uring();
+
#[cfg(feature = "time")]
self.enable_time();
}
}
+cfg_tokio_uring! {
+ impl Builder {
+ /// Enables the tokio's io_uring driver.
+ ///
+ /// Doing this enables using io_uring operations on the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new_multi_thread()
+ /// .enable_io_uring()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ #[cfg_attr(docsrs, doc(cfg(tokio_uring)))]
+ pub fn enable_io_uring(&mut self) -> &mut Self {
+ // Currently, the uring flag is equivalent to `enable_io`.
+ self.enable_io = true;
+ self
+ }
+ }
+}
+
cfg_test_util! {
impl Builder {
/// Controls if the runtime's clock starts paused or advancing.
+use crate::io::uring::open::Open;
use crate::runtime::Handle;
use io_uring::cqueue;
use io_uring::squeue::Entry;
use std::{io, mem};
#[derive(Debug)]
-pub(crate) enum CancelData {}
+pub(crate) enum CancelData {
+ Open(
+ // This field isn't accessed directly, but it holds cancellation data,
+ // so `#[allow(dead_code)]` is needed.
+ #[allow(dead_code)] Open,
+ ),
+}
#[derive(Debug)]
pub(crate) enum Lifecycle {
/// The submitter no longer has interest in the operation result. The state
/// must be passed to the driver and held until the operation completes.
- Cancelled(CancelData),
+ Cancelled(
+ // This field isn't accessed directly, but it holds cancellation data,
+ // so `#[allow(dead_code)]` is needed.
+ #[allow(dead_code)] CancelData,
+ ),
/// The operation has completed with a single cqe result
Completed(io_uring::cqueue::Entry),
}
pub(crate) enum State {
- #[allow(dead_code)]
Initialize(Option<Entry>),
Polled(usize),
Complete,
///
/// Callers must ensure that parameters of the entry (such as buffer) are valid and will
/// be valid for the entire duration of the operation, otherwise it may cause memory problems.
- #[allow(dead_code)]
pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
let handle = Handle::current();
Self {
/// A single CQE result
pub(crate) struct CqeResult {
- #[allow(dead_code)]
pub(crate) result: io::Result<u32>,
}
self.submit().expect("Internal error when dropping driver");
}
- let mut cancel_ops = Slab::new();
- let mut keys_to_move = Vec::new();
-
- for (key, lifecycle) in self.ops.iter() {
- match lifecycle {
- Lifecycle::Waiting(_) | Lifecycle::Submitted | Lifecycle::Cancelled(_) => {
- // these should be cancelled
- keys_to_move.push(key);
- }
- // We don't wait for completed ops.
- Lifecycle::Completed(_) => {}
- }
- }
+ let mut ops = std::mem::take(&mut self.ops);
- for key in keys_to_move {
- let lifecycle = self.remove_op(key);
- cancel_ops.insert(lifecycle);
- }
+ // Remove all completed ops since we don't need to wait for them.
+ ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
- while !cancel_ops.is_empty() {
+ while !ops.is_empty() {
// Wait until at least one completion is available.
self.ring_mut()
.submit_and_wait(1)
for cqe in self.ring_mut().completion() {
let idx = cqe.user_data() as usize;
- cancel_ops.remove(idx);
+ ops.remove(idx);
}
}
}
}
impl Handle {
- #[allow(dead_code)]
fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
let mut source = SourceFd(&uringfd);
self.registry
submit_or_remove(ctx)?;
}
+ // Ensure that the completion queue is not full before submitting the entry.
+ while ctx.ring_mut().completion().is_full() {
+ ctx.dispatch_completions();
+ }
+
// Note: For now, we submit the entry immediately without utilizing batching.
submit_or_remove(ctx)?;
Ok(index)
}
- // TODO: Remove this annotation when operations are actually supported
- #[allow(unused_variables, unreachable_code)]
pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
let mut guard = self.get_uring().lock();
let ctx = &mut *guard;
pub(crate) mod park;
-mod driver;
+pub(crate) mod driver;
pub(crate) mod scheduler;
let mode = format!("{:?}", OpenOptions::new().mode(0o644));
// TESTING HACK: use Debug output to check the stored data
assert!(
- mode.contains("mode: 420 ") || mode.contains("mode: 0o000644 "),
+ mode.contains("mode: 420") || mode.contains("mode: 0o000644"),
"mode is: {mode}"
);
}
// TESTING HACK: use Debug output to check the stored data
assert!(
format!("{:?}", OpenOptions::new().custom_flags(libc::O_TRUNC))
- .contains("custom_flags: 512,")
+ .contains("custom_flags: 512")
);
}
--- /dev/null
+//! Uring file operations tests.
+
+#![cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+
+use futures::future::FutureExt;
+use std::sync::mpsc;
+use std::task::Poll;
+use std::time::Duration;
+use std::{future::poll_fn, path::PathBuf};
+use tempfile::NamedTempFile;
+use tokio::{
+ fs::OpenOptions,
+ runtime::{Builder, Runtime},
+};
+use tokio_util::task::TaskTracker;
+
+fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
+ Box::new(move || {
+ Builder::new_multi_thread()
+ .worker_threads(n)
+ .enable_all()
+ .build()
+ .unwrap()
+ })
+}
+
+fn current_rt() -> Box<dyn Fn() -> Runtime> {
+ Box::new(|| Builder::new_current_thread().enable_all().build().unwrap())
+}
+
+fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
+ vec![
+ current_rt(),
+ multi_rt(1),
+ multi_rt(2),
+ multi_rt(8),
+ multi_rt(64),
+ multi_rt(256),
+ ]
+}
+
+#[test]
+fn shutdown_runtime_while_performing_io_uring_ops() {
+ fn run(rt: Runtime) {
+ let (tx, rx) = mpsc::channel();
+ let (done_tx, done_rx) = mpsc::channel();
+
+ let (_tmp, path) = create_tmp_files(1);
+ rt.spawn(async move {
+ let path = path[0].clone();
+
+ // spawning a bunch of uring operations.
+ loop {
+ let path = path.clone();
+ tokio::spawn(async move {
+ let mut opt = OpenOptions::new();
+ opt.read(true);
+ opt.open(&path).await.unwrap();
+ });
+
+ // Avoid busy looping.
+ tokio::task::yield_now().await;
+ }
+ });
+
+ std::thread::spawn(move || {
+ let rt: Runtime = rx.recv().unwrap();
+ rt.shutdown_timeout(Duration::from_millis(300));
+ done_tx.send(()).unwrap();
+ });
+
+ tx.send(rt).unwrap();
+ done_rx.recv().unwrap();
+ }
+
+ for rt in rt_combinations() {
+ run(rt());
+ }
+}
+
+#[test]
+fn open_many_files() {
+ fn run(rt: Runtime) {
+ const NUM_FILES: usize = 512;
+
+ let (_tmp_files, paths): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(NUM_FILES);
+
+ rt.block_on(async move {
+ let tracker = TaskTracker::new();
+
+ for i in 0..10_000 {
+ let path = paths.get(i % NUM_FILES).unwrap().clone();
+ tracker.spawn(async move {
+ let _file = OpenOptions::new().read(true).open(path).await.unwrap();
+ });
+ }
+ tracker.close();
+ tracker.wait().await;
+ });
+ }
+
+ for rt in rt_combinations() {
+ run(rt());
+ }
+}
+
+#[tokio::test]
+async fn cancel_op_future() {
+ let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
+
+ let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+ let handle = tokio::spawn(async move {
+ poll_fn(|cx| {
+ let opt = {
+ let mut opt = tokio::fs::OpenOptions::new();
+ opt.read(true);
+ opt
+ };
+
+ let fut = opt.open(&path[0]);
+
+ // If io_uring is enabled (and not falling back to the thread pool),
+ // the first poll should return Pending.
+ let _pending = Box::pin(fut).poll_unpin(cx);
+
+ tx.send(()).unwrap();
+
+ Poll::<()>::Pending
+ })
+ .await;
+ });
+
+ // Wait for the first poll
+ rx.recv().await.unwrap();
+
+ handle.abort();
+
+ let res = handle.await.unwrap_err();
+ assert!(res.is_cancelled());
+}
+
+fn create_tmp_files(num_files: usize) -> (Vec<NamedTempFile>, Vec<PathBuf>) {
+ let mut files = Vec::with_capacity(num_files);
+ for _ in 0..num_files {
+ let tmp = NamedTempFile::new().unwrap();
+ let path = tmp.path().to_path_buf();
+ files.push((tmp, path));
+ }
+
+ files.into_iter().unzip()
+}