diff --git a/Cargo.lock b/Cargo.lock index 0735167..edb8c4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,17 @@ dependencies = [ "libloading", ] +[[package]] +name = "async-channel" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "atomic_refcell" version = "0.1.8" @@ -190,6 +201,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "cache-padded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" + [[package]] name = "calloop" version = "0.9.3" @@ -321,6 +338,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "copyless" version = "0.1.5" @@ -656,6 +682,12 @@ dependencies = [ "egui", ] +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + [[package]] name = "fern" version = "0.6.0" @@ -693,6 +725,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + [[package]] name = "fxhash" version = "0.2.1" @@ -2165,6 +2203,7 @@ name = "zsw" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "bytemuck", "cgmath", "chrono", diff --git a/zsw/Cargo.toml b/zsw/Cargo.toml index 1847944..f13c0ad 100644 --- a/zsw/Cargo.toml +++ b/zsw/Cargo.toml @@ -22,6 +22,7 @@ egui_winit_platform = "0.13.0" epi = "0.16.0" # Async +async-channel = "1.6.1" pollster = "0.2.4" # X11 diff --git a/zsw/src/app.rs b/zsw/src/app.rs index dd60fa6..a76983b 100644 --- a/zsw/src/app.rs +++ b/zsw/src/app.rs @@ -25,7 +25,13 @@ use { Wgpu, }, anyhow::Context, - std::{iter, num::NonZeroUsize, thread, time::Duration}, + std::{ + iter, + num::NonZeroUsize, + sync::{atomic, atomic::AtomicBool}, + thread, + time::Duration, + }, winit::{ dpi::{PhysicalPosition, PhysicalSize}, event_loop::EventLoop, @@ -84,6 +90,9 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> { // Create the settings window let settings_window = SettingsWindow::new(); + // If the `!` futures should quit. + let never_futures_should_quit = AtomicBool::new(false); + // Start all threads and then wait in the main thread for events // Note: The outer result of `scope` can't be `Err` due to a panic in // another thread, since we manually join all threads at the end. @@ -92,27 +101,34 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> { let mut thread_spawner = util::ThreadSpawner::new(s); // Spawn the playlist loader thread - thread_spawner.spawn_scoped("Playlist loader", || { + thread_spawner.spawn("Playlist loader", || { playlist.add_dir(&args.images_dir); Ok(()) })?; // Spawn the playlist thread - // DEADLOCK: We call `Playlist::stop` at the end - thread_spawner.spawn_scoped("Playlist", || { - playlist.run(); - Ok(()) - })?; + // DEADLOCK: We set `never_futures_should_quit` to `true` at the end, before joining + thread_spawner.spawn( + "Playlist", + util::never_fut_thread_fn(&never_futures_should_quit, Ok(()), playlist.run()), + )?; // Spawn all image loaders - // DEADLOCK: We call `ImageLoader::stop` at the end. + // DEADLOCK: We set `never_futures_should_quit` to `true` at the end, before joining let loader_threads = thread::available_parallelism().map_or(1, NonZeroUsize::get); - let loader_fns = iter::repeat(|| image_loader.run(&playlist).allow::()).take(loader_threads); - thread_spawner.spawn_scoped_multiple("Image loader", loader_fns)?; + let loader_fns = iter::from_fn(|| { + Some(util::never_fut_thread_fn( + &never_futures_should_quit, + Ok(()), + image_loader.run(&playlist), + )) + }) + .take(loader_threads); + thread_spawner.spawn_multiple("Image loader", loader_fns)?; // Spawn the settings window thread // DEADLOCK: We call `SettingsWindow::run` at the end - thread_spawner.spawn_scoped("Settings window", || { + thread_spawner.spawn("Settings window", || { settings_window .run(&wgpu, &egui, &window, &panels, &playlist) .allow::(); @@ -122,7 +138,7 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> { // Spawn the renderer thread // DEADLOCK: We call `Renderer::stop` at the end // We make sure `SettingsWindow::run` runs eventually - thread_spawner.spawn_scoped("Renderer", || { + thread_spawner.spawn("Renderer", || { renderer .run(&window, &wgpu, &panels, &egui, &image_loader, &settings_window) .allow::(); @@ -146,12 +162,8 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> { // Note: As `stop` doesn't block, order doesn't matter. renderer.stop(); settings_window.stop(); - for _ in 0..loader_threads { - image_loader.stop(); - } - playlist.stop(); - // Join all thread + never_futures_should_quit.store(true, atomic::Ordering::Relaxed); thread_spawner.join_all().context("Unable to join all threads") }) .map_err(|err| anyhow::anyhow!("Unable to start/join all threads: {err:?}"))? diff --git a/zsw/src/img/loader.rs b/zsw/src/img/loader.rs index 4117ae2..60461ac 100644 --- a/zsw/src/img/loader.rs +++ b/zsw/src/img/loader.rs @@ -9,78 +9,34 @@ mod load; // Imports use { super::Image, - crate::{ - util::{ - extse::{CrossBeamChannelReceiverSE, CrossBeamChannelSelectSE, CrossBeamChannelSenderSE}, - MightBlock, - }, - Playlist, - PlaylistImage, - }, - zsw_side_effect_macros::side_effect, + crate::{Playlist, PlaylistImage}, }; /// Image loader #[derive(Debug)] pub struct ImageLoader { /// Image sender - image_tx: crossbeam::channel::Sender, + image_tx: async_channel::Sender, /// Image receiver - image_rx: crossbeam::channel::Receiver, - - /// Closing sender - close_tx: crossbeam::channel::Sender<()>, - - /// Closing receiver - close_rx: crossbeam::channel::Receiver<()>, + image_rx: async_channel::Receiver, } impl ImageLoader { /// Creates a new image loader. #[must_use] pub fn new() -> Self { - // Note: Making the close channel unbounded is what allows us to not block - // in `Self::stop`. - let (image_tx, image_rx) = crossbeam::channel::bounded(0); - let (close_tx, close_rx) = crossbeam::channel::unbounded(); + let (image_tx, image_rx) = async_channel::bounded(1); - Self { - image_tx, - image_rx, - close_tx, - close_rx, - } + Self { image_tx, image_rx } } /// Runs this image loader /// /// Multiple image loaders may run at the same time - /// - /// # Blocking - /// Will block in it's own event loop until [`Self::close`] is called. - #[allow(clippy::useless_transmute)] // `crossbeam::select` does it - #[side_effect(MightBlock)] - pub fn run(&self, playlist: &Playlist) -> Result<(), anyhow::Error> { + pub async fn run(&self, playlist: &Playlist) -> ! { loop { - let image = { - let mut select = crossbeam::channel::Select::new(); - let img_idx = playlist.select_next(&mut select); - let close_idx = select.recv(&self.close_rx); - - // DEADLOCK: Caller can call `Self::stop` for us to stop at any moment. - let selected = select.select_se().allow::(); - match selected.index() { - idx if idx == img_idx => playlist.next_selected(selected), - - // Note: This can't return an `Err` because `self` owns a receiver - idx if idx == close_idx => { - selected.recv(&self.close_rx).expect("On-close sender was closed"); - break; - }, - _ => unreachable!(), - } - }; + let image = playlist.next().await; match &*image { PlaylistImage::File(path) => match load::load_image(path) { @@ -91,19 +47,8 @@ impl ImageLoader { image, }; - // DEADLOCK: Caller can call `Self::stop` for us to stop at any moment. - crossbeam::select! { - // Try to send an image - // Note: This can't return an `Err` because `self` owns a receiver - send(self.image_tx, image) -> res => res.expect("Image receiver was closed"), - - // If we get anything in the close channel, break - // Note: This can't return an `Err` because `self` owns a receiver - recv(self.close_rx) -> res => { - res.expect("On-close sender was closed"); - break - }, - } + // Note: This can't return an `Err` because `self` owns a receiver + self.image_tx.send(image).await.expect("Image receiver was closed"); }, // If we couldn't load, log, remove the path and retry @@ -114,41 +59,18 @@ impl ImageLoader { }, } } - - Ok(()) - } - - /// Stops the `run` loop - pub fn stop(&self) { - // Note: This can't return an `Err` because `self` owns a sender - // DEADLOCK: The channel is unbounded, so this will not block. - self.close_tx - .send_se(()) - .allow::() - .expect("On-close receiver was closed"); - } - - /// Receives the image, waiting if not ready yet - /// - /// # Blocking - /// Blocks until [`Self::run`] starts running. - #[side_effect(MightBlock)] - pub fn recv(&self) -> Image { - // Note: This can't return an `Err` because `self` owns a sender - // DEADLOCK: Caller ensures `Self::run` will eventually run. - self.image_rx - .recv_se() - .allow::() - .expect("Image sender was closed") } /// Attempts to receive the image - pub fn try_recv(&self) -> Result, anyhow::Error> { + #[must_use] + #[allow(clippy::missing_panics_doc)] // It's an internal assertion + pub fn try_recv(&self) -> Option { // Try to get the result + // Note: This can't return an `Err` because `self` owns a sender match self.image_rx.try_recv() { - Ok(image) => Ok(Some(image)), - Err(crossbeam::channel::TryRecvError::Empty) => Ok(None), - Err(_) => anyhow::bail!("Unable to get image from loader thread"), + Ok(image) => Some(image), + Err(async_channel::TryRecvError::Empty) => None, + Err(async_channel::TryRecvError::Closed) => panic!("Image loader sender was dropped"), } } } diff --git a/zsw/src/panel.rs b/zsw/src/panel.rs index 299151c..e6fc1f1 100644 --- a/zsw/src/panel.rs +++ b/zsw/src/panel.rs @@ -94,7 +94,7 @@ impl Panels { // Update the image state (panel.state, panel.progress) = match panel.state { // If we're empty, try to get a next image - PanelImageState::Empty => match image_loader.try_recv().context("Unable to get next image")? { + PanelImageState::Empty => match image_loader.try_recv() { Some(image) => ( PanelImageState::PrimaryOnly { front: PanelImageStateImage { @@ -109,24 +109,23 @@ impl Panels { }, // If we only have the primary, try to load the next image - PanelImageState::PrimaryOnly { front } => - match image_loader.try_recv().context("Unable to get next image")? { - Some(image) => ( - PanelImageState::Both { - front, - back: PanelImageStateImage { - id: self.renderer.create_image(wgpu, image), - swap_dir: rand::random(), - }, + PanelImageState::PrimaryOnly { front } => match image_loader.try_recv() { + Some(image) => ( + PanelImageState::Both { + front, + back: PanelImageStateImage { + id: self.renderer.create_image(wgpu, image), + swap_dir: rand::random(), }, - next_progress, - ), - None => (PanelImageState::PrimaryOnly { front }, next_progress), - }, + }, + next_progress, + ), + None => (PanelImageState::PrimaryOnly { front }, next_progress), + }, // If we have both, try to update the progress and swap them if finished PanelImageState::Both { mut front, back } if finished => { - match image_loader.try_recv().context("Unable to get next image")? { + match image_loader.try_recv() { // Note: We update the front and swap them Some(image) => { self.renderer.update_image(wgpu, front.id, image); diff --git a/zsw/src/playlist.rs b/zsw/src/playlist.rs index 0f5e5cc..cf7622b 100644 --- a/zsw/src/playlist.rs +++ b/zsw/src/playlist.rs @@ -4,10 +4,7 @@ // Imports use { - crate::util::{ - extse::{CrossBeamChannelReceiverSE, CrossBeamChannelSenderSE, ParkingLotMutexSe}, - MightBlock, - }, + crate::util::{extse::ParkingLotMutexSe, MightBlock}, parking_lot::Mutex, rand::prelude::SliceRandom, std::{ @@ -15,7 +12,6 @@ use { path::{Path, PathBuf}, sync::Arc, }, - zsw_side_effect_macros::side_effect, }; /// Inner @@ -29,16 +25,10 @@ struct Inner { #[derive(Debug)] pub struct Playlist { /// Image sender - img_tx: crossbeam::channel::Sender>, + img_tx: async_channel::Sender>, /// Image receiver - img_rx: crossbeam::channel::Receiver>, - - /// Closing sender - close_tx: crossbeam::channel::Sender<()>, - - /// Closing receiver - close_rx: crossbeam::channel::Receiver<()>, + img_rx: async_channel::Receiver>, /// Inner inner: Mutex, @@ -50,8 +40,7 @@ impl Playlist { pub fn new() -> Self { // Note: Making the close channel unbounded is what allows us to not block // in `Self::stop`. - let (img_tx, img_rx) = crossbeam::channel::bounded(0); - let (close_tx, close_rx) = crossbeam::channel::unbounded(); + let (img_tx, img_rx) = async_channel::bounded(1); // Create the empty inner data let inner = Inner { images: HashSet::new() }; @@ -59,22 +48,17 @@ impl Playlist { Self { img_tx, img_rx, - close_tx, - close_rx, inner: Mutex::new(inner), } } /// Runs the playlist - /// - /// # Blocking - /// Will block in it's own event loop until [`Self::stop`] is called. #[allow(clippy::useless_transmute)] // `crossbeam::select` does it - pub fn run(&self) { + pub async fn run(&self) -> ! { // All images to send let mut images = vec![]; - 'run: loop { + loop { // Retrieve the next images and shuffle them // DEADLOCK: We ensure we don't block while `inner` is locked { @@ -83,35 +67,14 @@ impl Playlist { } images.shuffle(&mut rand::thread_rng()); - // Then try to send each one and check for the close channel - // DEADLOCK: Caller can call `Self::stop` for us to stop at any moment. + // Then try to send each image for image in images.drain(..) { - crossbeam::select! { - // Try to send an image - // Note: This can't return an `Err` because `self` owns a receiver - send(self.img_tx, image) -> res => res.expect("Image receiver was closed"), - - // If we get anything in the close channel, break - // Note: This can't return an `Err` because `self` owns a receiver - recv(self.close_rx) -> res => { - res.expect("On-close sender was closed"); - break 'run; - }, - } + // Note: This can't return an `Err` because `self` owns a receiver + self.img_tx.send(image).await.expect("Image receiver was closed"); } } } - /// Stops the `run` loop - pub fn stop(&self) { - // Note: This can't return an `Err` because `self` owns a sender - // DEADLOCK: The channel is unbounded, so this will not block. - self.close_tx - .send_se(()) - .allow::() - .expect("On-close receiver was closed"); - } - /// Clears all existing images pub fn clear(&self) { // DEADLOCK: We ensure we don't block while `inner` is locked @@ -146,31 +109,9 @@ impl Playlist { } /// Retrieves the next image - /// - /// # Blocking - /// Blocks until [`Self::run`] starts running. - #[side_effect(MightBlock)] - pub fn next(&self) -> Arc { + pub async fn next(&self) -> Arc { // Note: This can't return an `Err` because `self` owns a sender - // DEADLOCK: Caller ensures `Self::run` will eventually run. - self.img_rx - .recv_se() - .allow::() - .expect("Image sender was closed") - } - - /// Retrieves the next image under `select`. - pub fn select_next<'a>(&'a self, select: &mut crossbeam::channel::Select<'a>) -> usize { - select.recv(&self.img_rx) - } - - /// Retrieves the next image, when selected - /// - /// # Blocking - /// Does *not* block, unlike [`Self::next`] - pub fn next_selected<'a>(&'a self, selected: crossbeam::channel::SelectedOperation<'a>) -> Arc { - // Note: This can't return an `Err` because `self` owns a sender - selected.recv(&self.img_rx).expect("Image sender was closed") + self.img_rx.recv().await.expect("Image sender was closed") } } diff --git a/zsw/src/util.rs b/zsw/src/util.rs index 9cd4893..dc703ff 100644 --- a/zsw/src/util.rs +++ b/zsw/src/util.rs @@ -16,12 +16,20 @@ pub use { // Imports use { + self::extse::ParkingLotMutexSe, anyhow::Context, image::DynamicImage, + parking_lot::{Condvar, Mutex}, std::{ fs, + future::Future, hash::{Hash, Hasher}, path::Path, + sync::{ + atomic::{self, AtomicBool}, + Arc, + }, + task, time::{Duration, Instant}, }, }; @@ -83,3 +91,78 @@ pub fn image_format(image: &DynamicImage) -> &'static str { DynamicImage::ImageRgba16(_) => "Rgba16", } } + +/// Adapts a future into a thread to be run on it's own thread. +/// +/// Will drop the future once `should_quit` becomes true. +pub fn never_fut_thread_fn<'a, T, F>(should_quit: &'a AtomicBool, res: T, f: F) -> impl FnOnce() -> T + 'a +where + T: 'a, + F: Future + Send + 'a, +{ + move || { + // TODO: Not allocate here + let mut f = Box::pin(f); + + // Create the waker + let signal = Arc::new(NeverFutSignal::new()); + let waker = task::Waker::from(Arc::clone(&signal)); + let mut ctx = task::Context::from_waker(&waker); + + // Then poll it until we should quit + loop { + match f.as_mut().poll(&mut ctx) { + task::Poll::Ready(never) => never, + task::Poll::Pending => match should_quit.load(atomic::Ordering::Relaxed) { + true => break, + false => signal.wait(), + }, + } + } + + res + } +} + +/// Signal for [`spawn_fut_never`]'s waker +struct NeverFutSignal { + /// If the future should be polled + should_poll: Mutex, + + /// Condvar for waiting + cond_var: Condvar, +} + +impl NeverFutSignal { + fn new() -> Self { + Self { + should_poll: Mutex::new(true), + cond_var: Condvar::new(), + } + } + + /// Waits until the future should be polled + pub fn wait(&self) { + // Keep waiting until `should_poll` is true + // DEADLOCK: Waker will set `should_poll` to true eventually. + let mut should_poll = self.should_poll.lock_se().allow::(); + while !*should_poll { + self.cond_var.wait(&mut should_poll); + } + + // Then set it to false so the waker may re-set it to true + *should_poll = false; + } +} + +impl task::Wake for NeverFutSignal { + fn wake(self: std::sync::Arc) { + // Set that we should be polling + // DEADLOCK: Mutex is only ever locked temporarily (as `wait`ing unlocks the mutex). + let mut should_poll = self.should_poll.lock_se().allow::(); + *should_poll = true; + + // Then notify the waiter + let _ = self.cond_var.notify_one(); + } +} diff --git a/zsw/src/util/thread.rs b/zsw/src/util/thread.rs index e1e284d..39e670f 100644 --- a/zsw/src/util/thread.rs +++ b/zsw/src/util/thread.rs @@ -25,8 +25,8 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> { } } - /// Spawns a new thread using `crossbeam::thread::Scope` with name - pub fn spawn_scoped(&mut self, name: impl Into, f: F) -> Result<(), anyhow::Error> + /// Spawns a new thread + pub fn spawn(&mut self, name: impl Into, f: F) -> Result<(), anyhow::Error> where F: Send + FnOnce() -> Result<(), anyhow::Error> + 'env, { @@ -43,7 +43,7 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> { } /// Spawns multiple scoped threads - pub fn spawn_scoped_multiple( + pub fn spawn_multiple( &mut self, name: impl Into, threads: impl Iterator, @@ -54,7 +54,7 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> { let name = name.into(); threads .enumerate() - .try_for_each(move |(idx, f)| self.spawn_scoped(format!("{name}${idx}"), f)) + .try_for_each(move |(idx, f)| self.spawn(format!("{name}${idx}"), f)) } /// Joins all threads