Skip to content

Commit

Permalink
Support shared futures on no_std (#2868)
Browse files Browse the repository at this point in the history
  • Loading branch information
adavis628 authored Jan 16, 2025
1 parent bbaa0e3 commit 951d353
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 16 deletions.
7 changes: 4 additions & 3 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Common utilities and extension traits for the futures-rs library.

[features]
default = ["std", "async-await", "async-await-macro"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
alloc = ["futures-core/alloc", "futures-task/alloc"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab/std"]
alloc = ["futures-core/alloc", "futures-task/alloc", "slab"]
async-await = []
async-await-macro = ["async-await", "futures-macro"]
compat = ["std", "futures_01"]
Expand All @@ -37,12 +37,13 @@ futures-channel = { path = "../futures-channel", version = "=0.4.0-alpha.0", def
futures-io = { path = "../futures-io", version = "0.3.31", default-features = false, features = ["std"], optional = true }
futures-sink = { path = "../futures-sink", version = "=0.4.0-alpha.0", default-features = false, optional = true }
futures-macro = { path = "../futures-macro", version = "=0.4.0-alpha.0", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
slab = { version = "0.4.2", default-features = false, optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project-lite = "0.2.6"
spin = { version = "0.9.8", optional = true }

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/future/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ mod remote_handle;
#[cfg(feature = "std")]
pub use self::remote_handle::{Remote, RemoteHandle};

#[cfg(feature = "std")]
#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))]
mod shared;
#[cfg(feature = "std")]
#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))]
pub use self::shared::{Shared, WeakShared};

impl<T: ?Sized> FutureExt for T where T: Future {}
Expand Down Expand Up @@ -440,7 +440,7 @@ pub trait FutureExt: Future {
/// into a cloneable future. It enables a future to be polled by multiple
/// threads.
///
/// This method is only available when the `std` feature of this
/// This method is only available when the `std` or 'spin' feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
Expand Down Expand Up @@ -474,7 +474,7 @@ pub trait FutureExt: Future {
/// join_handle.join().unwrap();
/// # });
/// ```
#[cfg(feature = "std")]
#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))]
fn shared(self) -> Shared<Self>
where
Self: Sized,
Expand Down
37 changes: 29 additions & 8 deletions futures-util/src/future/future/shared.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::task::{waker_ref, ArcWake};
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::fmt;
use core::hash::Hasher;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, SeqCst};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
use std::hash::Hasher;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex, Weak};

#[cfg(feature = "std")]
type Mutex<T> = std::sync::Mutex<T>;
#[cfg(not(feature = "std"))]
type Mutex<T> = spin::Mutex<T>;

/// Future for the [`shared`](super::FutureExt::shared) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand Down Expand Up @@ -204,7 +209,10 @@ where
{
/// Registers the current task to receive a wakeup when we are awoken.
fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
#[cfg(feature = "std")]
let mut wakers_guard = self.notifier.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let mut wakers_guard = self.notifier.wakers.lock();

let wakers_mut = wakers_guard.as_mut();

Expand Down Expand Up @@ -345,7 +353,11 @@ where
inner.notifier.state.store(COMPLETE, SeqCst);

// Wake all tasks and drop the slab
#[cfg(feature = "std")]
let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let mut wakers_guard = inner.notifier.wakers.lock();

let mut wakers = wakers_guard.take().unwrap();
for waker in wakers.drain().flatten() {
waker.wake();
Expand Down Expand Up @@ -375,19 +387,28 @@ where
fn drop(&mut self) {
if self.waker_key != NULL_WAKER_KEY {
if let Some(ref inner) = self.inner {
#[cfg(feature = "std")]
if let Ok(mut wakers) = inner.notifier.wakers.lock() {
if let Some(wakers) = wakers.as_mut() {
wakers.remove(self.waker_key);
}
}
#[cfg(not(feature = "std"))]
if let Some(wakers) = inner.notifier.wakers.lock().as_mut() {
wakers.remove(self.waker_key);
}
}
}
}
}

impl ArcWake for Notifier {
fn wake_by_ref(arc_self: &Arc<Self>) {
#[cfg(feature = "std")]
let wakers = &mut *arc_self.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let wakers = &mut *arc_self.wakers.lock();

if let Some(wakers) = wakers.as_mut() {
for (_key, opt_waker) in wakers {
if let Some(waker) = opt_waker.take() {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use self::future::CatchUnwind;
#[cfg(feature = "std")]
pub use self::future::{Remote, RemoteHandle};

#[cfg(feature = "std")]
#[cfg(any(feature = "std", all(feature = "alloc", feature = "spin")))]
pub use self::future::{Shared, WeakShared};

mod try_future;
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ compat = ["std", "futures-util/compat"]
io-compat = ["compat", "futures-util/io-compat"]
executor = ["std", "futures-executor/std"]
thread-pool = ["executor", "futures-executor/thread-pool"]
spin = ["futures-util/spin"]

# Unstable features
# These features are outside of the normal semver guarantees and require the
Expand Down

0 comments on commit 951d353

Please sign in to comment.