]> git.feebdaed.xyz Git - 0xmirror/tokio.git/commitdiff
fs: add io_uring `open` operation (#7321)
author: Motoyuki Kimura <moymoymox@gmail.com>
Fri, 8 Aug 2025 07:51:42 +0000 (16:51 +0900)
committer: GitHub <noreply@github.com>
Fri, 8 Aug 2025 07:51:42 +0000 (09:51 +0200)
15 files changed:
.github/workflows/ci.yml
tokio/Cargo.toml
tokio/src/fs/mod.rs
tokio/src/fs/open_options.rs
tokio/src/fs/open_options/uring_open_options.rs [new file with mode: 0644]
tokio/src/io/mod.rs
tokio/src/io/uring/mod.rs [new file with mode: 0644]
tokio/src/io/uring/open.rs [new file with mode: 0644]
tokio/src/io/uring/utils.rs [new file with mode: 0644]
tokio/src/runtime/builder.rs
tokio/src/runtime/driver/op.rs
tokio/src/runtime/io/driver/uring.rs
tokio/src/runtime/mod.rs
tokio/tests/fs_open_options.rs
tokio/tests/fs_uring.rs [new file with mode: 0644]

index e53d566dcf378faf4f7429789e30748b27ae1e13..91027f9da8297b333d8a450c1aae318e80fd4003 100644 (file)
@@ -359,6 +359,17 @@ jobs:
         include:
           - os: ubuntu-latest
     steps:
+      - name: check if io-uring is supported in the CI environment
+        run: |
+          # Try to read the io-uring setting in the kernel config file.
+          # https://github.com/torvalds/linux/blob/75f5f23f8787c5e184fcb2fbcd02d8e9317dc5e7/init/Kconfig#L1782-L1789
+          CONFIG_FILE="/boot/config-$(uname -r)"
+          echo "Checking $CONFIG_FILE for io-uring support"
+          if ! grep -q "CONFIG_IO_URING=y" "$CONFIG_FILE"; then
+            echo "Error: io_uring is not supported"
+            exit 1
+          fi
+
       - uses: actions/checkout@v4
       - name: Install Rust ${{ env.rust_stable }}
         uses: dtolnay/rust-toolchain@stable
index 8512cc576b6c89a96de40d52b6d7b9d2e542c94e..52085e9e3a10dd8562c730b3ca9edb00b97e4041 100644 (file)
@@ -136,6 +136,7 @@ features = [
 [dev-dependencies]
 tokio-test = { version = "0.4.0", path = "../tokio-test" }
 tokio-stream = { version = "0.1", path = "../tokio-stream" }
+tokio-util = { version = "0.7", path = "../tokio-util", features = ["rt"] }
 futures = { version = "0.3.0", features = ["async-await"] }
 mockall = "0.13.0"
 async-stream = "0.3"
@@ -164,10 +165,10 @@ tracing-mock = "= 0.1.0-beta.1"
 [package.metadata.docs.rs]
 all-features = true
 # enable unstable features in the documentation
-rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
+rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
 # it's necessary to _also_ pass `--cfg tokio_unstable` and `--cfg tokio_taskdump`
 # to rustc, or else dependencies will not be enabled, and the docs build will fail.
-rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
+rustc-args = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump", "--cfg", "tokio_uring"]
 
 [package.metadata.playground]
 features = ["full", "test-util"]
index c1855c42aebaffdac6ee72dca9a07020904c4f3e..7e0c35ba84a1ed94d6b517ff92aff70b5b3def98 100644 (file)
@@ -237,6 +237,9 @@ pub use self::metadata::metadata;
 
 mod open_options;
 pub use self::open_options::OpenOptions;
+cfg_tokio_uring! {
+    pub(crate) use self::open_options::UringOpenOptions;
+}
 
 mod read;
 pub use self::read::read;
index e70e6aa0b6f90f1af10a788650f848d533c0d557..c10f4ee48bc8d235f908d9f25a759a7759f7c0b3 100644 (file)
@@ -3,6 +3,12 @@ use crate::fs::{asyncify, File};
 use std::io;
 use std::path::Path;
 
+cfg_tokio_uring! {
+    mod uring_open_options;
+    pub(crate) use uring_open_options::UringOpenOptions;
+    use crate::runtime::driver::op::Op;
+}
+
 #[cfg(test)]
 mod mock_open_options;
 #[cfg(test)]
@@ -79,7 +85,16 @@ use std::os::windows::fs::OpenOptionsExt;
 /// }
 /// ```
 #[derive(Clone, Debug)]
-pub struct OpenOptions(StdOpenOptions);
+pub struct OpenOptions {
+    inner: Kind,
+}
+
+/// Which option store backs an `OpenOptions`.
+///
+/// `Uring` only exists when building with `--cfg tokio_uring` plus the `rt`
+/// and `fs` features on Linux. `OpenOptions::new` selects it under exactly
+/// that cfg and uses `Std` otherwise.
+#[derive(Debug, Clone)]
+enum Kind {
+    Std(StdOpenOptions),
+    #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+    Uring(UringOpenOptions),
+}
 
 impl OpenOptions {
     /// Creates a blank new set of options ready for configuration.
@@ -99,7 +114,12 @@ impl OpenOptions {
     /// let future = options.read(true).open("foo.txt");
     /// ```
     pub fn new() -> OpenOptions {
-        OpenOptions(StdOpenOptions::new())
+        #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+        let inner = Kind::Uring(UringOpenOptions::new());
+        #[cfg(not(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux")))]
+        let inner = Kind::Std(StdOpenOptions::new());
+
+        OpenOptions { inner }
     }
 
     /// Sets the option for read access.
@@ -128,7 +148,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn read(&mut self, read: bool) -> &mut OpenOptions {
-        self.0.read(read);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.read(read);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.read(read);
+            }
+        }
         self
     }
 
@@ -158,7 +186,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn write(&mut self, write: bool) -> &mut OpenOptions {
-        self.0.write(write);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.write(write);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.write(write);
+            }
+        }
         self
     }
 
@@ -217,7 +253,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn append(&mut self, append: bool) -> &mut OpenOptions {
-        self.0.append(append);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.append(append);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.append(append);
+            }
+        }
         self
     }
 
@@ -250,7 +294,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
-        self.0.truncate(truncate);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.truncate(truncate);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.truncate(truncate);
+            }
+        }
         self
     }
 
@@ -286,7 +338,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn create(&mut self, create: bool) -> &mut OpenOptions {
-        self.0.create(create);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.create(create);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.create(create);
+            }
+        }
         self
     }
 
@@ -329,7 +389,15 @@ impl OpenOptions {
     /// }
     /// ```
     pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
-        self.0.create_new(create_new);
+        match &mut self.inner {
+            Kind::Std(opts) => {
+                opts.create_new(create_new);
+            }
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                opts.create_new(create_new);
+            }
+        }
         self
     }
 
@@ -366,6 +434,15 @@ impl OpenOptions {
     ///   open files, too long filename, too many symbolic links in the
     ///   specified path (Unix-like systems only), etc.
     ///
+    /// # io_uring support
+    ///
+    /// On Linux, you can also use `io_uring` for executing system calls.
+    /// To enable `io_uring`, you need to specify the `--cfg tokio_uring` flag
+    /// at compile time and set the `Builder::enable_io_uring` runtime option.
+    ///
+    /// Support for `io_uring` is currently experimental, so its behavior may
+    /// change or it may be removed in future versions.
+    ///
     /// # Examples
     ///
     /// ```no_run
@@ -386,17 +463,36 @@ impl OpenOptions {
     /// [`Other`]: std::io::ErrorKind::Other
     /// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied
     pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
+        match &self.inner {
+            // Options that wrap `std::fs::OpenOptions` always go through `std_open`.
+            Kind::Std(opts) => Self::std_open(opts, path).await,
+            #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+            Kind::Uring(opts) => {
+                let handle = crate::runtime::Handle::current();
+                let driver_handle = handle.inner.driver().io();
+
+                // NOTE(review): `check_and_init` presumably reports whether the
+                // io_uring driver can be used on this runtime — confirm in the driver.
+                if driver_handle.check_and_init()? {
+                    Op::open(path.as_ref(), opts)?.await
+                } else {
+                    // Otherwise convert the recorded options back into std
+                    // options and open the file through `std_open`.
+                    let opts = opts.clone().into();
+                    Self::std_open(&opts, path).await
+                }
+            }
+        }
+    }
+
+    async fn std_open(opts: &StdOpenOptions, path: impl AsRef<Path>) -> io::Result<File> {
         let path = path.as_ref().to_owned();
-        let opts = self.0.clone();
+        let opts = opts.clone();
 
         let std = asyncify(move || opts.open(path)).await?;
         Ok(File::from_std(std))
     }
 
-    /// Returns a mutable reference to the underlying `std::fs::OpenOptions`
-    #[cfg(any(windows, unix))]
+    #[cfg(windows)]
     pub(super) fn as_inner_mut(&mut self) -> &mut StdOpenOptions {
-        &mut self.0
+        match &mut self.inner {
+            Kind::Std(ref mut opts) => opts,
+        }
     }
 }
 
@@ -428,7 +524,15 @@ feature! {
         /// }
         /// ```
         pub fn mode(&mut self, mode: u32) -> &mut OpenOptions {
-            self.as_inner_mut().mode(mode);
+            match &mut self.inner {
+                Kind::Std(opts) => {
+                    opts.mode(mode);
+                }
+                #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+                Kind::Uring(opts) => {
+                    opts.mode(mode);
+                }
+            }
             self
         }
 
@@ -459,7 +563,15 @@ feature! {
         /// }
         /// ```
         pub fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
-            self.as_inner_mut().custom_flags(flags);
+            match &mut self.inner {
+                Kind::Std(opts) => {
+                    opts.custom_flags(flags);
+                }
+                #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+                Kind::Uring(opts) => {
+                    opts.custom_flags(flags);
+                }
+            }
             self
         }
     }
@@ -651,7 +763,13 @@ cfg_windows! {
 
 impl From<StdOpenOptions> for OpenOptions {
     fn from(options: StdOpenOptions) -> OpenOptions {
-        OpenOptions(options)
+        OpenOptions {
+            inner: Kind::Std(options),
+            // TODO: Add support for converting `StdOpenOptions` to `UringOpenOptions`
+            // when the user enables `--cfg tokio_uring`. It is blocked by:
+            // * https://github.com/rust-lang/rust/issues/74943
+            // * https://github.com/rust-lang/rust/issues/76801
+        }
     }
 }
 
diff --git a/tokio/src/fs/open_options/uring_open_options.rs b/tokio/src/fs/open_options/uring_open_options.rs
new file mode 100644 (file)
index 0000000..48297ca
--- /dev/null
@@ -0,0 +1,128 @@
+use std::{io, os::unix::fs::OpenOptionsExt};
+
+#[cfg(test)]
+use super::mock_open_options::MockOpenOptions as StdOpenOptions;
+#[cfg(not(test))]
+use std::fs::OpenOptions as StdOpenOptions;
+
+/// Open options stored as plain fields so they can be translated into raw
+/// `open` flags for an io_uring request, or converted back into the std
+/// options type via the `From` impl in this file.
+///
+/// The setters below mirror the `read`/`write`/`append`/`truncate`/`create`/
+/// `create_new` builder methods plus the Unix `mode` and `custom_flags`
+/// extensions.
+#[derive(Debug, Clone)]
+pub(crate) struct UringOpenOptions {
+    pub(crate) read: bool,
+    pub(crate) write: bool,
+    pub(crate) append: bool,
+    pub(crate) truncate: bool,
+    pub(crate) create: bool,
+    pub(crate) create_new: bool,
+    // Permission bits passed along with the open request; `new()` sets 0o666.
+    pub(crate) mode: libc::mode_t,
+    // Extra raw flags OR-ed into the final flag word by the caller.
+    pub(crate) custom_flags: libc::c_int,
+}
+
+impl UringOpenOptions {
+    /// Creates a blank set of options: every boolean is `false`, `mode` is
+    /// `0o666` and `custom_flags` is `0`.
+    pub(crate) fn new() -> Self {
+        Self {
+            read: false,
+            write: false,
+            append: false,
+            truncate: false,
+            create: false,
+            create_new: false,
+            mode: 0o666,
+            custom_flags: 0,
+        }
+    }
+
+    /// Records the `append` option and returns `self` for chaining.
+    pub(crate) fn append(&mut self, append: bool) -> &mut Self {
+        self.append = append;
+        self
+    }
+
+    /// Records the `create` option and returns `self` for chaining.
+    pub(crate) fn create(&mut self, create: bool) -> &mut Self {
+        self.create = create;
+        self
+    }
+
+    /// Records the `create_new` option and returns `self` for chaining.
+    pub(crate) fn create_new(&mut self, create_new: bool) -> &mut Self {
+        self.create_new = create_new;
+        self
+    }
+
+    /// Records the `read` option and returns `self` for chaining.
+    pub(crate) fn read(&mut self, read: bool) -> &mut Self {
+        self.read = read;
+        self
+    }
+
+    /// Records the `write` option and returns `self` for chaining.
+    pub(crate) fn write(&mut self, write: bool) -> &mut Self {
+        self.write = write;
+        self
+    }
+
+    /// Records the `truncate` option and returns `self` for chaining.
+    pub(crate) fn truncate(&mut self, truncate: bool) -> &mut Self {
+        self.truncate = truncate;
+        self
+    }
+
+    /// Records the permission bits, cast to `libc::mode_t`, and returns
+    /// `self` for chaining.
+    pub(crate) fn mode(&mut self, mode: u32) -> &mut Self {
+        self.mode = mode as libc::mode_t;
+        self
+    }
+
+    /// Records extra raw open flags and returns `self` for chaining.
+    pub(crate) fn custom_flags(&mut self, flags: i32) -> &mut Self {
+        self.custom_flags = flags;
+        self
+    }
+
+    /// Maps `read`/`write`/`append` to the access bits of the flag word.
+    ///
+    /// `append` implies write access and adds `O_APPEND`. Returns `EINVAL`
+    /// when none of the three options is set.
+    // Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1118-L1127
+    pub(crate) fn access_mode(&self) -> io::Result<libc::c_int> {
+        match (self.read, self.write, self.append) {
+            (true, false, false) => Ok(libc::O_RDONLY),
+            (false, true, false) => Ok(libc::O_WRONLY),
+            (true, true, false) => Ok(libc::O_RDWR),
+            (false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND),
+            (true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND),
+            (false, false, false) => Err(io::Error::from_raw_os_error(libc::EINVAL)),
+        }
+    }
+
+    /// Validates `truncate`/`create`/`create_new` against the write and
+    /// append settings, then maps them to `O_CREAT`/`O_TRUNC`/`O_EXCL` bits.
+    ///
+    /// Returns `EINVAL` when any of the three is requested without write or
+    /// append access, or when `truncate` is combined with `append` and
+    /// `create_new` is not set.
+    // Equivalent to https://github.com/rust-lang/rust/blob/64c81fd10509924ca4da5d93d6052a65b75418a5/library/std/src/sys/fs/unix.rs#L1129-L1151
+    pub(crate) fn creation_mode(&self) -> io::Result<libc::c_int> {
+        match (self.write, self.append) {
+            (true, false) => {}
+            (false, false) => {
+                if self.truncate || self.create || self.create_new {
+                    return Err(io::Error::from_raw_os_error(libc::EINVAL));
+                }
+            }
+            (_, true) => {
+                if self.truncate && !self.create_new {
+                    return Err(io::Error::from_raw_os_error(libc::EINVAL));
+                }
+            }
+        }
+
+        // `create_new` takes precedence: when it is set, `create` and
+        // `truncate` are ignored.
+        Ok(match (self.create, self.truncate, self.create_new) {
+            (false, false, false) => 0,
+            (true, false, false) => libc::O_CREAT,
+            (false, true, false) => libc::O_TRUNC,
+            (true, true, false) => libc::O_CREAT | libc::O_TRUNC,
+            (_, _, true) => libc::O_CREAT | libc::O_EXCL,
+        })
+    }
+}
+
+/// Rebuilds std open options from the recorded fields.
+///
+/// `OpenOptions::open` uses this conversion when `check_and_init` returns
+/// `false` and the open has to go through `std_open` instead.
+impl From<UringOpenOptions> for StdOpenOptions {
+    fn from(value: UringOpenOptions) -> Self {
+        let mut std = StdOpenOptions::new();
+
+        // Each setter is called directly on `std` (no chaining); under
+        // `cfg(test)` `StdOpenOptions` is the mock type imported above.
+        std.append(value.append);
+        std.create(value.create);
+        std.create_new(value.create_new);
+        std.read(value.read);
+        std.truncate(value.truncate);
+        std.write(value.write);
+
+        // Unix-only extensions from `OpenOptionsExt`.
+        std.mode(value.mode);
+        std.custom_flags(value.custom_flags);
+
+        std
+    }
+}
index bfdd1ccfd1c3c68ab30f344c20084860bddecb4c..763a3fabbf4b7988d32154a47b122cfb5a24f7fd 100644 (file)
@@ -293,3 +293,7 @@ cfg_io_blocking! {
         pub(crate) use crate::blocking::JoinHandle as Blocking;
     }
 }
+
+cfg_tokio_uring! {
+    pub(crate) mod uring;
+}
diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs
new file mode 100644 (file)
index 0000000..e5ac85a
--- /dev/null
@@ -0,0 +1,2 @@
+pub(crate) mod open;
+pub(crate) mod utils;
diff --git a/tokio/src/io/uring/open.rs b/tokio/src/io/uring/open.rs
new file mode 100644 (file)
index 0000000..68f434f
--- /dev/null
@@ -0,0 +1,53 @@
+use super::utils::cstr;
+use crate::{
+    fs::UringOpenOptions,
+    runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op},
+};
+use io_uring::{opcode, types};
+use std::{ffi::CString, io, os::fd::FromRawFd, path::Path};
+
+/// Data owned by an in-flight io_uring open request.
+#[derive(Debug)]
+pub(crate) struct Open {
+    /// This field will be read by the kernel during the operation, so we
+    /// need to ensure it is valid for the entire duration of the operation.
+    #[allow(dead_code)]
+    path: CString,
+}
+
+impl Completable for Open {
+    type Output = crate::fs::File;
+    /// Turns the completion of the open request into a `File`.
+    ///
+    /// An error carried by the completion result is returned unchanged.
+    fn complete(self, cqe: CqeResult) -> io::Result<Self::Output> {
+        // On success the result value is treated as the new file descriptor.
+        let fd = cqe.result? as i32;
+        // SAFETY: `cqe.result?` returned early on error, so `fd` is the value
+        // reported by a successful open request and is handed to exactly one
+        // `File` here. NOTE(review): this relies on the driver delivering each
+        // completion to a single op — confirm in the driver.
+        let file = unsafe { crate::fs::File::from_raw_fd(fd) };
+        Ok(file)
+    }
+}
+
+impl Cancellable for Open {
+    /// Wraps the operation data (including the path buffer) in
+    /// `CancelData::Open` so it can outlive the dropped future.
+    fn cancel(self) -> CancelData {
+        CancelData::Open(self)
+    }
+}
+
+impl Op<Open> {
+    /// Builds a request to open `path`, resolved relative to the current
+    /// working directory (`AT_FDCWD`), with flags derived from `options`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error before the operation is constructed if `path`
+    /// contains an interior NUL byte, or if `options` holds an invalid
+    /// access/creation combination (`EINVAL`).
+    pub(crate) fn open(path: &Path, options: &UringOpenOptions) -> io::Result<Op<Open>> {
+        let path = cstr(path)?;
+
+        // `O_CLOEXEC` is always set. Access-mode bits inside `custom_flags`
+        // are masked out so that only `access_mode()` decides them.
+        let flags = libc::O_CLOEXEC
+            | options.access_mode()?
+            | options.creation_mode()?
+            | (options.custom_flags & !libc::O_ACCMODE);
+
+        let open_op = opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), path.as_ptr())
+            .flags(flags)
+            .mode(options.mode)
+            .build();
+
+        // SAFETY: the only pointer stored in the entry is `path.as_ptr()`.
+        // The `CString` is moved into `Open`, which the returned `Op` owns;
+        // moving a `CString` does not move its heap buffer, so the pointer
+        // stays valid. On cancellation `Open` is handed over as
+        // `CancelData::Open` instead of being dropped.
+        let op = unsafe { Op::new(open_op, Open { path }) };
+        Ok(op)
+    }
+}
diff --git a/tokio/src/io/uring/utils.rs b/tokio/src/io/uring/utils.rs
new file mode 100644 (file)
index 0000000..e30e7a5
--- /dev/null
@@ -0,0 +1,6 @@
+use std::os::unix::ffi::OsStrExt;
+use std::{ffi::CString, io, path::Path};
+
+/// Converts `p` into a NUL-terminated C string built from its raw OS bytes.
+///
+/// Returns an error if the path contains an interior NUL byte.
+pub(crate) fn cstr(p: &Path) -> io::Result<CString> {
+    let raw = p.as_os_str().as_bytes();
+    CString::new(raw).map_err(io::Error::from)
+}
index a0207b3a04618564950acf9393472c4b579e32a3..76800296a2784475561780d66f2ab22a9b39746e 100644 (file)
@@ -338,6 +338,10 @@ impl Builder {
             all(unix, feature = "signal")
         ))]
         self.enable_io();
+
+        #[cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux",))]
+        self.enable_io_uring();
+
         #[cfg(feature = "time")]
         self.enable_time();
 
@@ -1578,6 +1582,31 @@ cfg_time! {
     }
 }
 
+cfg_tokio_uring! {
+    impl Builder {
+        /// Enables tokio's io_uring driver.
+        ///
+        /// Doing this enables using io_uring operations on the runtime.
+        ///
+        /// This method is only available when building with
+        /// `--cfg tokio_uring`. Support for io_uring is currently
+        /// experimental.
+        ///
+        /// # Examples
+        ///
+        /// ```
+        /// use tokio::runtime;
+        ///
+        /// let rt = runtime::Builder::new_multi_thread()
+        ///     .enable_io_uring()
+        ///     .build()
+        ///     .unwrap();
+        /// ```
+        #[cfg_attr(docsrs, doc(cfg(tokio_uring)))]
+        pub fn enable_io_uring(&mut self) -> &mut Self {
+            // Currently, the uring flag is equivalent to `enable_io`.
+            self.enable_io = true;
+            self
+        }
+    }
+}
+
 cfg_test_util! {
     impl Builder {
         /// Controls if the runtime's clock starts paused or advancing.
index 40a135d744bc4f51fa2756823de335bba192eed0..94afe163a134d98c43e2cc888b2e166533c16a27 100644 (file)
@@ -1,3 +1,4 @@
+use crate::io::uring::open::Open;
 use crate::runtime::Handle;
 use io_uring::cqueue;
 use io_uring::squeue::Entry;
@@ -9,7 +10,13 @@ use std::task::Waker;
 use std::{io, mem};
 
 #[derive(Debug)]
-pub(crate) enum CancelData {}
+pub(crate) enum CancelData {
+    Open(
+        // This field isn't accessed directly, but it holds cancellation data,
+        // so `#[allow(dead_code)]` is needed.
+        #[allow(dead_code)] Open,
+    ),
+}
 
 #[derive(Debug)]
 pub(crate) enum Lifecycle {
@@ -21,14 +28,17 @@ pub(crate) enum Lifecycle {
 
     /// The submitter no longer has interest in the operation result. The state
     /// must be passed to the driver and held until the operation completes.
-    Cancelled(CancelData),
+    Cancelled(
+        // This field isn't accessed directly, but it holds cancellation data,
+        // so `#[allow(dead_code)]` is needed.
+        #[allow(dead_code)] CancelData,
+    ),
 
     /// The operation has completed with a single cqe result
     Completed(io_uring::cqueue::Entry),
 }
 
 pub(crate) enum State {
-    #[allow(dead_code)]
     Initialize(Option<Entry>),
     Polled(usize),
     Complete,
@@ -48,7 +58,6 @@ impl<T: Cancellable> Op<T> {
     ///
     /// Callers must ensure that parameters of the entry (such as buffer) are valid and will
     /// be valid for the entire duration of the operation, otherwise it may cause memory problems.
-    #[allow(dead_code)]
     pub(crate) unsafe fn new(entry: Entry, data: T) -> Self {
         let handle = Handle::current();
         Self {
@@ -82,7 +91,6 @@ impl<T: Cancellable> Drop for Op<T> {
 
 /// A single CQE result
 pub(crate) struct CqeResult {
-    #[allow(dead_code)]
     pub(crate) result: io::Result<u32>,
 }
 
index fc4a098bafd79ea7752db04ca8cc7748000407de..2df69cbbebf166c5aaefe581d2ae2e1f6f4c9da3 100644 (file)
@@ -147,26 +147,12 @@ impl Drop for UringContext {
             self.submit().expect("Internal error when dropping driver");
         }
 
-        let mut cancel_ops = Slab::new();
-        let mut keys_to_move = Vec::new();
-
-        for (key, lifecycle) in self.ops.iter() {
-            match lifecycle {
-                Lifecycle::Waiting(_) | Lifecycle::Submitted | Lifecycle::Cancelled(_) => {
-                    // these should be cancelled
-                    keys_to_move.push(key);
-                }
-                // We don't wait for completed ops.
-                Lifecycle::Completed(_) => {}
-            }
-        }
+        let mut ops = std::mem::take(&mut self.ops);
 
-        for key in keys_to_move {
-            let lifecycle = self.remove_op(key);
-            cancel_ops.insert(lifecycle);
-        }
+        // Remove all completed ops since we don't need to wait for them.
+        ops.retain(|_, lifecycle| !matches!(lifecycle, Lifecycle::Completed(_)));
 
-        while !cancel_ops.is_empty() {
+        while !ops.is_empty() {
             // Wait until at least one completion is available.
             self.ring_mut()
                 .submit_and_wait(1)
@@ -174,14 +160,13 @@ impl Drop for UringContext {
 
             for cqe in self.ring_mut().completion() {
                 let idx = cqe.user_data() as usize;
-                cancel_ops.remove(idx);
+                ops.remove(idx);
             }
         }
     }
 }
 
 impl Handle {
-    #[allow(dead_code)]
     fn add_uring_source(&self, uringfd: RawFd) -> io::Result<()> {
         let mut source = SourceFd(&uringfd);
         self.registry
@@ -265,14 +250,17 @@ impl Handle {
             submit_or_remove(ctx)?;
         }
 
+        // Ensure that the completion queue is not full before submitting the entry.
+        while ctx.ring_mut().completion().is_full() {
+            ctx.dispatch_completions();
+        }
+
         // Note: For now, we submit the entry immediately without utilizing batching.
         submit_or_remove(ctx)?;
 
         Ok(index)
     }
 
-    // TODO: Remove this annotation when operations are actually supported
-    #[allow(unused_variables, unreachable_code)]
     pub(crate) fn cancel_op<T: Cancellable>(&self, index: usize, data: Option<T>) {
         let mut guard = self.get_uring().lock();
         let ctx = &mut *guard;
index 78a0114f48e853df5d0b573c39676f1003ee4ba7..031fde5d0b0fd77dd45539c2398c3e5882e834a4 100644 (file)
@@ -323,7 +323,7 @@ pub(crate) mod context;
 
 pub(crate) mod park;
 
-mod driver;
+pub(crate) mod driver;
 
 pub(crate) mod scheduler;
 
index 58d7de647e2e4f5ae55cdb5ee2fcc88331ab7baa..58982d679df007784338480e9419fe75711cf995 100644 (file)
@@ -58,7 +58,7 @@ async fn open_options_mode() {
     let mode = format!("{:?}", OpenOptions::new().mode(0o644));
     // TESTING HACK: use Debug output to check the stored data
     assert!(
-        mode.contains("mode: 420 ") || mode.contains("mode: 0o000644 "),
+        mode.contains("mode: 420") || mode.contains("mode: 0o000644"),
         "mode is: {mode}"
     );
 }
@@ -69,7 +69,7 @@ async fn open_options_custom_flags_linux() {
     // TESTING HACK: use Debug output to check the stored data
     assert!(
         format!("{:?}", OpenOptions::new().custom_flags(libc::O_TRUNC))
-            .contains("custom_flags: 512,")
+            .contains("custom_flags: 512")
     );
 }
 
diff --git a/tokio/tests/fs_uring.rs b/tokio/tests/fs_uring.rs
new file mode 100644 (file)
index 0000000..04ec62c
--- /dev/null
@@ -0,0 +1,151 @@
+//! Uring file operations tests.
+
+#![cfg(all(tokio_uring, feature = "rt", feature = "fs", target_os = "linux"))]
+
+use futures::future::FutureExt;
+use std::sync::mpsc;
+use std::task::Poll;
+use std::time::Duration;
+use std::{future::poll_fn, path::PathBuf};
+use tempfile::NamedTempFile;
+use tokio::{
+    fs::OpenOptions,
+    runtime::{Builder, Runtime},
+};
+use tokio_util::task::TaskTracker;
+
+/// Returns a factory that builds a multi-threaded runtime with `n` worker
+/// threads and `enable_all()` applied.
+fn multi_rt(n: usize) -> Box<dyn Fn() -> Runtime> {
+    let factory = move || {
+        let mut builder = Builder::new_multi_thread();
+        builder.worker_threads(n).enable_all();
+        builder.build().unwrap()
+    };
+    Box::new(factory)
+}
+
+/// Returns a factory that builds a current-thread runtime with
+/// `enable_all()` applied.
+fn current_rt() -> Box<dyn Fn() -> Runtime> {
+    Box::new(|| {
+        let mut builder = Builder::new_current_thread();
+        builder.enable_all();
+        builder.build().unwrap()
+    })
+}
+
+/// Runtime factories exercised by every test: one current-thread runtime
+/// followed by multi-threaded runtimes with 1, 2, 8, 64 and 256 workers.
+fn rt_combinations() -> Vec<Box<dyn Fn() -> Runtime>> {
+    let mut factories = vec![current_rt()];
+    for workers in [1, 2, 8, 64, 256] {
+        factories.push(multi_rt(workers));
+    }
+    factories
+}
+
+/// Shuts a runtime down while a task is still spawning open operations, and
+/// checks that `shutdown_timeout` returns for every runtime flavor.
+#[test]
+fn shutdown_runtime_while_performing_io_uring_ops() {
+    fn run(rt: Runtime) {
+        // `tx`/`rx` hand the runtime to the shutdown thread; `done_tx`/`done_rx`
+        // report back once the shutdown call has returned.
+        let (tx, rx) = mpsc::channel();
+        let (done_tx, done_rx) = mpsc::channel();
+
+        let (_tmp, path) = create_tmp_files(1);
+        rt.spawn(async move {
+            let path = path[0].clone();
+
+            // spawning a bunch of uring operations.
+            loop {
+                let path = path.clone();
+                tokio::spawn(async move {
+                    let mut opt = OpenOptions::new();
+                    opt.read(true);
+                    opt.open(&path).await.unwrap();
+                });
+
+                // Avoid busy looping.
+                tokio::task::yield_now().await;
+            }
+        });
+
+        // The runtime is shut down from a separate thread so this thread can
+        // block on `done_rx` until the shutdown has finished.
+        std::thread::spawn(move || {
+            let rt: Runtime = rx.recv().unwrap();
+            rt.shutdown_timeout(Duration::from_millis(300));
+            done_tx.send(()).unwrap();
+        });
+
+        tx.send(rt).unwrap();
+        done_rx.recv().unwrap();
+    }
+
+    for rt in rt_combinations() {
+        run(rt());
+    }
+}
+
+/// Opens files from 10,000 concurrently spawned tasks, cycling over 512
+/// temporary files, on every runtime flavor.
+#[test]
+fn open_many_files() {
+    fn run(rt: Runtime) {
+        const NUM_FILES: usize = 512;
+
+        // `_tmp_files` stays bound for the whole function so the handles are
+        // not dropped while the paths are in use.
+        let (_tmp_files, paths): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(NUM_FILES);
+
+        rt.block_on(async move {
+            let tracker = TaskTracker::new();
+
+            for i in 0..10_000 {
+                let path = paths.get(i % NUM_FILES).unwrap().clone();
+                tracker.spawn(async move {
+                    let _file = OpenOptions::new().read(true).open(path).await.unwrap();
+                });
+            }
+            // Close the tracker, then wait for every spawned task to finish.
+            tracker.close();
+            tracker.wait().await;
+        });
+    }
+
+    for rt in rt_combinations() {
+        run(rt());
+    }
+}
+
+/// Polls an open future once, drops it, then aborts the surrounding task and
+/// checks that the join handle reports cancellation.
+#[tokio::test]
+async fn cancel_op_future() {
+    let (_tmp_file, path): (Vec<NamedTempFile>, Vec<PathBuf>) = create_tmp_files(1);
+
+    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
+    let handle = tokio::spawn(async move {
+        poll_fn(|cx| {
+            let opt = {
+                let mut opt = tokio::fs::OpenOptions::new();
+                opt.read(true);
+                opt
+            };
+
+            let fut = opt.open(&path[0]);
+
+            // If io_uring is enabled (and not falling back to the thread pool),
+            // the first poll should return Pending.
+            // The boxed future is a temporary, so it is dropped at the end of
+            // this statement; only the poll result is kept.
+            let _pending = Box::pin(fut).poll_unpin(cx);
+
+            tx.send(()).unwrap();
+
+            // Never resolves: the task can only end by being aborted.
+            Poll::<()>::Pending
+        })
+        .await;
+    });
+
+    // Wait for the first poll
+    rx.recv().await.unwrap();
+
+    handle.abort();
+
+    let res = handle.await.unwrap_err();
+    assert!(res.is_cancelled());
+}
+
+/// Creates `num_files` temporary files and returns the handles together with
+/// their paths, in matching order.
+///
+/// Callers keep the handles bound while using the paths — presumably because
+/// dropping a `NamedTempFile` removes the file.
+fn create_tmp_files(num_files: usize) -> (Vec<NamedTempFile>, Vec<PathBuf>) {
+    (0..num_files)
+        .map(|_| {
+            let tmp = NamedTempFile::new().unwrap();
+            let path = tmp.path().to_path_buf();
+            (tmp, path)
+        })
+        .unzip()
+}