Fixed AsyncSignal sometimes polling the future after being finished.

`AsyncSignal` and `LoadableSignal` no longer support mutable access.

This change fixes the mentioned bug and allows us to be more efficient by removing locks.
This commit is contained in:
Filipe Rodrigues 2024-12-22 18:25:13 +00:00
parent 95103a9ad9
commit 575dc5be17
Signed by: zenithsiz
SSH Key Fingerprint: SHA256:Mb5ppb3Sh7IarBO/sBTXLHbYEOz37hJAlslLQPPAPaU
3 changed files with 76 additions and 197 deletions

View File

@ -3,12 +3,8 @@
// Imports
use {
crate::Loadable,
core::{
fmt,
future::Future,
ops::{Deref, DerefMut},
},
dynatos_reactive::{async_signal, AsyncSignal, SignalBorrow, SignalBorrowMut, SignalUpdate, SignalWith},
core::{fmt, future::Future, ops::Deref},
dynatos_reactive::{AsyncSignal, SignalBorrow, SignalWith},
};
/// Loadable signal.
@ -57,7 +53,7 @@ impl<F: Future<Output = Result<T, E>>, T, E> LoadableSignal<F> {
E: Clone + 'static,
{
let value = self.inner.load().await;
match &*value {
match value {
Ok(_) => Ok(BorrowRef(value)),
Err(err) => Err(err.clone()),
}
@ -71,7 +67,7 @@ impl<F: Future<Output = Result<T, E>>, T, E> LoadableSignal<F> {
{
let borrow = self.inner.borrow_suspended();
match borrow {
Some(borrow) => match &*borrow {
Some(borrow) => match borrow {
Ok(_) => Loadable::Loaded(BorrowRef(borrow)),
Err(err) => Loadable::Err(err.clone()),
},
@ -111,7 +107,7 @@ where
/// Reference type for [`SignalBorrow`] impl
#[derive(Debug)]
pub struct BorrowRef<'a, T, E>(async_signal::BorrowRef<'a, Result<T, E>>);
pub struct BorrowRef<'a, T, E>(&'a Result<T, E>);
impl<T, E> Deref for BorrowRef<'_, T, E> {
type Target = T;
@ -133,7 +129,7 @@ impl<F: Future<Output = Result<T, E>> + 'static, T: 'static, E: Clone + 'static>
fn borrow(&self) -> Self::Ref<'_> {
let borrow = self.inner.borrow();
match borrow {
Some(borrow) => match &*borrow {
Some(borrow) => match borrow {
Ok(_) => Loadable::Loaded(BorrowRef(borrow)),
Err(err) => Loadable::Err(err.clone()),
},
@ -154,62 +150,3 @@ impl<F: Future<Output = Result<T, E>> + 'static, T: 'static, E: Clone + 'static>
f(value.as_deref())
}
}
/// Reference type for [`SignalBorrowMut`] impl
#[derive(Debug)]
pub struct BorrowRefMut<'a, T, E>(async_signal::BorrowRefMut<'a, Result<T, E>>);
impl<T, E> Deref for BorrowRefMut<'_, T, E> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0
.as_ref()
.unwrap_or_else(|_| panic!("Loadable should not be an error"))
}
}
impl<T, E> DerefMut for BorrowRefMut<'_, T, E> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0
.as_mut()
.unwrap_or_else(|_| panic!("Loadable should not be an error"))
}
}
impl<F: Future<Output = Result<T, E>>, T: 'static, E: Clone + 'static> SignalBorrowMut for LoadableSignal<F> {
type RefMut<'a>
= Loadable<BorrowRefMut<'a, T, E>, E>
where
Self: 'a;
#[track_caller]
fn borrow_mut(&self) -> Self::RefMut<'_> {
let borrow = self.inner.borrow_mut();
match borrow {
Some(borrow) => match &*borrow {
Ok(_) => Loadable::Loaded(BorrowRefMut(borrow)),
Err(err) => Loadable::Err(err.clone()),
},
None => Loadable::Empty,
}
}
}
/// Updates the value within the loadable signal.
impl<F: Future<Output = Result<T, E>>, T: 'static, E: Clone + 'static> SignalUpdate for LoadableSignal<F>
where
F::Output: 'static,
{
type Value<'a> = Loadable<&'a mut T, E>;
#[track_caller]
fn update<F2, O>(&self, f: F2) -> O
where
F2: for<'a> FnOnce(Self::Value<'a>) -> O,
{
// Note: No need to check if we're suspended, `borrow_mut` doesn't poll
let mut value = self.borrow_mut();
f(value.as_deref_mut())
}
}

View File

@ -3,21 +3,26 @@
// TODO: Support wakers that wake from a separate thread
// by using some runtime and a channel.
// TODO: Trigger whenever we finish loading the future, not just when
// the waker wakes.
// Imports
#[cfg(not(feature = "sync"))]
use std::thread::{self, ThreadId};
use {
crate::{signal, SignalBorrow, SignalBorrowMut, SignalUpdate, SignalWith, Trigger},
crate::{SignalBorrow, SignalWith, Trigger},
core::{
fmt,
future::{self, Future},
ops::{Deref, DerefMut},
pin::Pin,
sync::atomic::{self, AtomicBool},
task::{self, Poll},
},
dynatos_reactive_sync::{IMut, IMutExt, IMutRef, IMutRefMut, Rc},
std::{sync::Arc, task::Wake},
dynatos_reactive_sync::{IMut, IMutExt, Rc},
std::{
sync::{Arc, OnceLock},
task::Wake,
},
};
/// Waker
@ -63,8 +68,8 @@ struct Inner<F: Future> {
/// Future
///
/// SAFETY:
/// Must not be moved out until we're dropped.
fut: IMut<F>,
/// Must not be moved out until it's finished.
fut: IMut<Option<F>>,
/// Waker
waker: Arc<Waker>,
@ -73,7 +78,7 @@ struct Inner<F: Future> {
is_suspended: AtomicBool,
/// Value
value: IMut<Option<F::Output>>,
value: OnceLock<F::Output>,
}
/// Async signal.
@ -103,14 +108,14 @@ impl<F: Future> AsyncSignal<F> {
#[track_caller]
fn new_inner(fut: F, is_suspended: bool) -> Self {
let inner = Rc::pin(Inner {
fut: IMut::new(fut),
fut: IMut::new(Some(fut)),
waker: Arc::new(Waker {
#[cfg(not(feature = "sync"))]
thread: thread::current().id(),
trigger: Trigger::new(),
}),
is_suspended: AtomicBool::new(is_suspended),
value: IMut::new(None),
value: OnceLock::new(),
});
Self { inner }
}
@ -127,37 +132,54 @@ impl<F: Future> AsyncSignal<F> {
}
/// Loads this value asynchronously and returns the value
pub async fn load(&self) -> BorrowRef<'_, F::Output> {
// Poll until we're loaded
future::poll_fn(|cx| {
// Get the inner future through pin projection.
let mut fut = self.inner.fut.imut_write();
// SAFETY: We guarantee that the future is not moved until it's dropped.
let mut fut = unsafe { Pin::new_unchecked(&mut *fut) };
// Then poll it, and store the value if finished.
let new_value = task::ready!(fut.as_mut().poll(cx));
*self.inner.value.imut_write() = Some(new_value);
Poll::Ready(())
})
.await;
// Then borrow
pub async fn load(&self) -> &'_ F::Output {
// Gather subcribers before polling
// TODO: Is this correct? We should probably be gathering by task,
// instead of by thread.
self.inner.waker.trigger.gather_subscribers();
let borrow = self.inner.value.imut_read();
BorrowRef(borrow)
// Poll until we're loaded
future::poll_fn(|cx| match self.try_load(cx) {
Some(value) => Poll::Ready(value),
None => Poll::Pending,
})
.await
}
/// Inner function to try to load the future
fn try_load(&self, cx: &mut task::Context<'_>) -> Option<&F::Output> {
self.inner
.value
.get_or_try_init(|| {
// Get the inner future through pin projection.
let mut inner_fut = self.inner.fut.imut_write();
let fut = inner_fut
.as_mut()
.expect("Future was missing without value being initialized");
// SAFETY: We guarantee that the future is not moved until it's finished.
let mut fut = unsafe { Pin::new_unchecked(&mut *fut) };
// Then poll it
match fut.as_mut().poll(cx) {
Poll::Ready(value) => {
// Drop the future once we load it
let _: Option<F> = inner_fut.take();
Ok(value)
},
Poll::Pending => Err(()),
}
})
.ok()
}
/// Borrows the inner value, without polling the future.
#[must_use]
#[track_caller]
pub fn borrow_suspended(&self) -> Option<BorrowRef<'_, F::Output>> {
pub fn borrow_suspended(&self) -> Option<&'_ F::Output> {
self.inner.waker.trigger.gather_subscribers();
let borrow = self.inner.value.imut_read();
borrow.is_some().then(|| BorrowRef(borrow))
self.inner.value.get()
}
/// Uses the inner value, without polling the future.
@ -167,7 +189,7 @@ impl<F: Future> AsyncSignal<F> {
F2: for<'a> FnOnce(Option<&'a F::Output>) -> O,
{
let borrow = self.borrow_suspended();
f(borrow.as_deref())
f(borrow)
}
}
@ -184,49 +206,31 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let value = self.inner.value.imut_read();
let value = self.inner.value.get();
f.debug_struct("AsyncSignal").field("value", &value).finish()
}
}
/// Reference type for [`SignalBorrow`] impl
#[derive(Debug)]
pub struct BorrowRef<'a, T>(IMutRef<'a, Option<T>>);
impl<T> Deref for BorrowRef<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("Value wasn't initialized")
}
}
impl<F: Future> SignalBorrow for AsyncSignal<F> {
type Ref<'a>
= Option<BorrowRef<'a, F::Output>>
= Option<&'a F::Output>
where
Self: 'a;
#[track_caller]
fn borrow(&self) -> Self::Ref<'_> {
// Try to poll the future, if we're not suspended and don't have a value yet
// TODO: Is it fine to not keep the value locked throughout the poll?
if !self.is_suspended() && self.inner.value.imut_read().is_none() {
// Get the inner future through pin projection.
let mut fut = self.inner.fut.imut_write();
self.inner.waker.trigger.gather_subscribers();
// SAFETY: We guarantee that the future is not moved until it's dropped.
let mut fut = unsafe { Pin::new_unchecked(&mut *fut) };
// Then poll it, and store the value if finished.
let waker = task::Waker::from(Arc::clone(&self.inner.waker));
let mut cx = task::Context::from_waker(&waker);
if let Poll::Ready(new_value) = fut.as_mut().poll(&mut cx) {
*self.inner.value.imut_write() = Some(new_value);
}
// If we're suspended, don't poll
if self.is_suspended() {
return self.inner.value.get();
}
self.borrow_suspended()
// Otherwise, try to load it
self.inner.waker.trigger.gather_subscribers();
let waker = task::Waker::from(Arc::clone(&self.inner.waker));
let mut cx = task::Context::from_waker(&waker);
self.try_load(&mut cx)
}
}
@ -247,69 +251,6 @@ where
}
let value = self.borrow();
f(value.as_deref())
}
}
/// Reference type for [`SignalBorrowMut`] impl
#[derive(Debug)]
pub struct BorrowRefMut<'a, T> {
/// Value
value: IMutRefMut<'a, Option<T>>,
/// Trigger on drop
// Note: Must be dropped *after* `value`.
_trigger_on_drop: signal::TriggerOnDrop<'a>,
}
impl<T> Deref for BorrowRefMut<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
self.value.as_ref().expect("Value wasn't initialized")
}
}
impl<T> DerefMut for BorrowRefMut<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.value.as_mut().expect("Value wasn't initialized")
}
}
impl<F: Future> SignalBorrowMut for AsyncSignal<F> {
type RefMut<'a>
= Option<BorrowRefMut<'a, F::Output>>
where
Self: 'a;
#[track_caller]
fn borrow_mut(&self) -> Self::RefMut<'_> {
// Note: No need to check if we're suspended, since we doesn't poll here
let value = self.inner.value.imut_write();
value.is_some().then(|| BorrowRefMut {
value,
_trigger_on_drop: signal::TriggerOnDrop(&self.inner.waker.trigger),
})
}
}
/// Updates the value within the async signal.
///
/// Does not poll the inner future, and does not allow
/// early initializing the signal.
impl<F: Future> SignalUpdate for AsyncSignal<F>
where
F::Output: 'static,
{
type Value<'a> = Option<&'a mut F::Output>;
#[track_caller]
fn update<F2, O>(&self, f: F2) -> O
where
F2: for<'a> FnOnce(Self::Value<'a>) -> O,
{
let mut value = self.borrow_mut();
f(value.as_deref_mut())
f(value)
}
}

View File

@ -9,7 +9,8 @@
test,
thread_local,
cfg_match,
trait_alias
trait_alias,
once_cell_try
)]
// Modules