Playlist and ImageLoader now use async so they can be cancelled without a dedicated close channel.
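For context, a minimal sketch of the pattern this commit moves to (the `worker` function and `main` harness below are illustrative stand-ins, not the crate's API): each worker loop becomes an async fn, every `.await` in it is a cancellation point, and stopping a worker therefore no longer requires selecting on a dedicated close channel.

// Illustrative worker: loops forever, pulling work from a channel.
async fn worker(rx: async_channel::Receiver<u32>) {
    loop {
        // `.await` is the cancellation point: if the caller stops polling
        // this future and drops it, the loop simply ends; no close channel.
        match rx.recv().await {
            Ok(item) => println!("processing {item}"),
            Err(_) => return, // all senders dropped
        }
    }
}

fn main() {
    let (tx, rx) = async_channel::bounded(1);
    tx.try_send(1).unwrap();
    drop(tx);
    // Drive the future on the current thread; `async-channel` and
    // `pollster` are both dependencies of this project.
    pollster::block_on(worker(rx));
}

Previously, each loop had to `crossbeam::select!` between its work channel and a `close_rx` just to be stoppable.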

Filipe Rodrigues 2022-02-12 08:09:24 +00:00
parent 5505047c04
commit b32c8ab72a
8 changed files with 197 additions and 200 deletions

Cargo.lock (generated)

@@ -83,6 +83,17 @@ dependencies = [
"libloading",
]
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "atomic_refcell"
version = "0.1.8"
@@ -190,6 +201,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cache-padded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]]
name = "calloop"
version = "0.9.3"
@@ -321,6 +338,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "copyless"
version = "0.1.5"
@@ -656,6 +682,12 @@ dependencies = [
"egui",
]
[[package]]
name = "event-listener"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "fern"
version = "0.6.0"
@@ -693,6 +725,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "futures-core"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "fxhash"
version = "0.2.1"
@@ -2165,6 +2203,7 @@ name = "zsw"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"bytemuck",
"cgmath",
"chrono",

Cargo.toml

@@ -22,6 +22,7 @@ egui_winit_platform = "0.13.0"
epi = "0.16.0"
# Async
async-channel = "1.6.1"
pollster = "0.2.4"
# X11


@@ -25,7 +25,13 @@ use {
Wgpu,
},
anyhow::Context,
std::{iter, num::NonZeroUsize, thread, time::Duration},
std::{
iter,
num::NonZeroUsize,
sync::{atomic, atomic::AtomicBool},
thread,
time::Duration,
},
winit::{
dpi::{PhysicalPosition, PhysicalSize},
event_loop::EventLoop,
@@ -84,6 +90,9 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> {
// Create the settings window
let settings_window = SettingsWindow::new();
// Whether the `!` (never-returning) futures should quit.
let never_futures_should_quit = AtomicBool::new(false);
// Start all threads and then wait in the main thread for events
// Note: The outer result of `scope` can't be `Err` due to a panic in
// another thread, since we manually join all threads at the end.
@@ -92,27 +101,34 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> {
let mut thread_spawner = util::ThreadSpawner::new(s);
// Spawn the playlist loader thread
thread_spawner.spawn_scoped("Playlist loader", || {
thread_spawner.spawn("Playlist loader", || {
playlist.add_dir(&args.images_dir);
Ok(())
})?;
// Spawn the playlist thread
// DEADLOCK: We call `Playlist::stop` at the end
thread_spawner.spawn_scoped("Playlist", || {
playlist.run();
Ok(())
})?;
// DEADLOCK: We set `never_futures_should_quit` to `true` at the end, before joining
thread_spawner.spawn(
"Playlist",
util::never_fut_thread_fn(&never_futures_should_quit, Ok(()), playlist.run()),
)?;
// Spawn all image loaders
// DEADLOCK: We call `ImageLoader::stop` at the end.
// DEADLOCK: We set `never_futures_should_quit` to `true` at the end, before joining
let loader_threads = thread::available_parallelism().map_or(1, NonZeroUsize::get);
let loader_fns = iter::repeat(|| image_loader.run(&playlist).allow::<MightBlock>()).take(loader_threads);
thread_spawner.spawn_scoped_multiple("Image loader", loader_fns)?;
let loader_fns = iter::from_fn(|| {
Some(util::never_fut_thread_fn(
&never_futures_should_quit,
Ok(()),
image_loader.run(&playlist),
))
})
.take(loader_threads);
thread_spawner.spawn_multiple("Image loader", loader_fns)?;
// Spawn the settings window thread
// DEADLOCK: We call `SettingsWindow::run` at the end
thread_spawner.spawn_scoped("Settings window", || {
thread_spawner.spawn("Settings window", || {
settings_window
.run(&wgpu, &egui, &window, &panels, &playlist)
.allow::<MightBlock>();
@@ -122,7 +138,7 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> {
// Spawn the renderer thread
// DEADLOCK: We call `Renderer::stop` at the end
// We make sure `SettingsWindow::run` runs eventually
thread_spawner.spawn_scoped("Renderer", || {
thread_spawner.spawn("Renderer", || {
renderer
.run(&window, &wgpu, &panels, &egui, &image_loader, &settings_window)
.allow::<MightBlock>();
@@ -146,12 +162,8 @@ pub fn run(args: &Args) -> Result<(), anyhow::Error> {
// Note: As `stop` doesn't block, order doesn't matter.
renderer.stop();
settings_window.stop();
for _ in 0..loader_threads {
image_loader.stop();
}
playlist.stop();
// Join all threads
never_futures_should_quit.store(true, atomic::Ordering::Relaxed);
thread_spawner.join_all().context("Unable to join all threads")
})
.map_err(|err| anyhow::anyhow!("Unable to start/join all threads: {err:?}"))?
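The shutdown choreography above boils down to: spawn driver threads that poll their futures until a shared flag flips, flip the flag, then join. A hedged sketch of just that ordering, with `std::thread::scope` standing in for the crate's `ThreadSpawner`:

use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let should_quit = AtomicBool::new(false);
    std::thread::scope(|s| {
        // Stand-ins for the `Playlist`/`ImageLoader` driver threads: keep
        // going until the quit flag flips, as `never_fut_thread_fn` does
        // between polls.
        for _ in 0..2 {
            s.spawn(|| {
                while !should_quit.load(Ordering::Relaxed) {
                    std::thread::yield_now();
                }
            });
        }
        // Flip the flag *before* the scope implicitly joins its threads;
        // this is exactly the ordering the DEADLOCK notes call out.
        should_quit.store(true, Ordering::Relaxed);
    });
}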


@@ -9,78 +9,34 @@ mod load;
// Imports
use {
super::Image,
crate::{
util::{
extse::{CrossBeamChannelReceiverSE, CrossBeamChannelSelectSE, CrossBeamChannelSenderSE},
MightBlock,
},
Playlist,
PlaylistImage,
},
zsw_side_effect_macros::side_effect,
crate::{Playlist, PlaylistImage},
};
/// Image loader
#[derive(Debug)]
pub struct ImageLoader {
/// Image sender
image_tx: crossbeam::channel::Sender<Image>,
image_tx: async_channel::Sender<Image>,
/// Image receiver
image_rx: crossbeam::channel::Receiver<Image>,
/// Closing sender
close_tx: crossbeam::channel::Sender<()>,
/// Closing receiver
close_rx: crossbeam::channel::Receiver<()>,
image_rx: async_channel::Receiver<Image>,
}
impl ImageLoader {
/// Creates a new image loader.
#[must_use]
pub fn new() -> Self {
// Note: Making the close channel unbounded is what allows us to not block
// in `Self::stop`.
let (image_tx, image_rx) = crossbeam::channel::bounded(0);
let (close_tx, close_rx) = crossbeam::channel::unbounded();
let (image_tx, image_rx) = async_channel::bounded(1);
Self {
image_tx,
image_rx,
close_tx,
close_rx,
}
Self { image_tx, image_rx }
}
/// Runs this image loader
///
/// Multiple image loaders may run at the same time
///
/// # Blocking
/// Will block in its own event loop until [`Self::close`] is called.
#[allow(clippy::useless_transmute)] // `crossbeam::select` does it
#[side_effect(MightBlock)]
pub fn run(&self, playlist: &Playlist) -> Result<(), anyhow::Error> {
pub async fn run(&self, playlist: &Playlist) -> ! {
loop {
let image = {
let mut select = crossbeam::channel::Select::new();
let img_idx = playlist.select_next(&mut select);
let close_idx = select.recv(&self.close_rx);
// DEADLOCK: Caller can call `Self::stop` for us to stop at any moment.
let selected = select.select_se().allow::<MightBlock>();
match selected.index() {
idx if idx == img_idx => playlist.next_selected(selected),
// Note: This can't return an `Err` because `self` owns a receiver
idx if idx == close_idx => {
selected.recv(&self.close_rx).expect("On-close sender was closed");
break;
},
_ => unreachable!(),
}
};
let image = playlist.next().await;
match &*image {
PlaylistImage::File(path) => match load::load_image(path) {
@@ -91,19 +47,8 @@ impl ImageLoader {
image,
};
// DEADLOCK: Caller can call `Self::stop` for us to stop at any moment.
crossbeam::select! {
// Try to send an image
// Note: This can't return an `Err` because `self` owns a receiver
send(self.image_tx, image) -> res => res.expect("Image receiver was closed"),
// If we get anything in the close channel, break
// Note: This can't return an `Err` because `self` owns a receiver
recv(self.close_rx) -> res => {
res.expect("On-close sender was closed");
break
},
}
// Note: This can't return an `Err` because `self` owns a receiver
self.image_tx.send(image).await.expect("Image receiver was closed");
},
// If we couldn't load, log, remove the path and retry
@@ -114,41 +59,18 @@
},
}
}
Ok(())
}
/// Stops the `run` loop
pub fn stop(&self) {
// Note: This can't return an `Err` because `self` owns a sender
// DEADLOCK: The channel is unbounded, so this will not block.
self.close_tx
.send_se(())
.allow::<MightBlock>()
.expect("On-close receiver was closed");
}
/// Receives the image, waiting if not ready yet
///
/// # Blocking
/// Blocks until [`Self::run`] starts running.
#[side_effect(MightBlock)]
pub fn recv(&self) -> Image {
// Note: This can't return an `Err` because `self` owns a sender
// DEADLOCK: Caller ensures `Self::run` will eventually run.
self.image_rx
.recv_se()
.allow::<MightBlock>()
.expect("Image sender was closed")
}
/// Attempts to receive the image
pub fn try_recv(&self) -> Result<Option<Image>, anyhow::Error> {
#[must_use]
#[allow(clippy::missing_panics_doc)] // It's an internal assertion
pub fn try_recv(&self) -> Option<Image> {
// Try to get the result
// Note: This can't return an `Err` because `self` owns a sender
match self.image_rx.try_recv() {
Ok(image) => Ok(Some(image)),
Err(crossbeam::channel::TryRecvError::Empty) => Ok(None),
Err(_) => anyhow::bail!("Unable to get image from loader thread"),
Ok(image) => Some(image),
Err(async_channel::TryRecvError::Empty) => None,
Err(async_channel::TryRecvError::Closed) => panic!("Image loader sender was dropped"),
}
}
}
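On the consumer side, `try_recv` keeps the render loop non-blocking. A small sketch of the same three-way match against a bare `async_channel`, with a `String` payload standing in for the crate's `Image` type:

fn main() {
    let (tx, rx) = async_channel::bounded::<String>(1);
    tx.try_send("image.png".to_owned()).unwrap();

    // Mirrors `ImageLoader::try_recv`: `Empty` means "no image ready yet",
    // while `Closed` would mean every sender was dropped, which the crate
    // treats as a bug (hence its `panic!`).
    let next = match rx.try_recv() {
        Ok(img) => Some(img),
        Err(async_channel::TryRecvError::Empty) => None,
        Err(async_channel::TryRecvError::Closed) => panic!("sender dropped"),
    };
    println!("{next:?}");
}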


@@ -94,7 +94,7 @@ impl Panels {
// Update the image state
(panel.state, panel.progress) = match panel.state {
// If we're empty, try to get a next image
PanelImageState::Empty => match image_loader.try_recv().context("Unable to get next image")? {
PanelImageState::Empty => match image_loader.try_recv() {
Some(image) => (
PanelImageState::PrimaryOnly {
front: PanelImageStateImage {
@@ -109,24 +109,23 @@ },
},
// If we only have the primary, try to load the next image
PanelImageState::PrimaryOnly { front } =>
match image_loader.try_recv().context("Unable to get next image")? {
Some(image) => (
PanelImageState::Both {
front,
back: PanelImageStateImage {
id: self.renderer.create_image(wgpu, image),
swap_dir: rand::random(),
},
PanelImageState::PrimaryOnly { front } => match image_loader.try_recv() {
Some(image) => (
PanelImageState::Both {
front,
back: PanelImageStateImage {
id: self.renderer.create_image(wgpu, image),
swap_dir: rand::random(),
},
next_progress,
),
None => (PanelImageState::PrimaryOnly { front }, next_progress),
},
},
next_progress,
),
None => (PanelImageState::PrimaryOnly { front }, next_progress),
},
// If we have both, try to update the progress and swap them if finished
PanelImageState::Both { mut front, back } if finished => {
match image_loader.try_recv().context("Unable to get next image")? {
match image_loader.try_recv() {
// Note: We update the front and swap them
Some(image) => {
self.renderer.update_image(wgpu, front.id, image);


@@ -4,10 +4,7 @@
// Imports
use {
crate::util::{
extse::{CrossBeamChannelReceiverSE, CrossBeamChannelSenderSE, ParkingLotMutexSe},
MightBlock,
},
crate::util::{extse::ParkingLotMutexSe, MightBlock},
parking_lot::Mutex,
rand::prelude::SliceRandom,
std::{
@@ -15,7 +12,6 @@ use {
path::{Path, PathBuf},
sync::Arc,
},
zsw_side_effect_macros::side_effect,
};
/// Inner
@@ -29,16 +25,10 @@ struct Inner {
#[derive(Debug)]
pub struct Playlist {
/// Image sender
img_tx: crossbeam::channel::Sender<Arc<PlaylistImage>>,
img_tx: async_channel::Sender<Arc<PlaylistImage>>,
/// Image receiver
img_rx: crossbeam::channel::Receiver<Arc<PlaylistImage>>,
/// Closing sender
close_tx: crossbeam::channel::Sender<()>,
/// Closing receiver
close_rx: crossbeam::channel::Receiver<()>,
img_rx: async_channel::Receiver<Arc<PlaylistImage>>,
/// Inner
inner: Mutex<Inner>,
@@ -50,8 +40,7 @@ impl Playlist {
pub fn new() -> Self {
// Note: Making the close channel unbounded is what allows us to not block
// in `Self::stop`.
let (img_tx, img_rx) = crossbeam::channel::bounded(0);
let (close_tx, close_rx) = crossbeam::channel::unbounded();
let (img_tx, img_rx) = async_channel::bounded(1);
// Create the empty inner data
let inner = Inner { images: HashSet::new() };
@@ -59,22 +48,17 @@
Self {
img_tx,
img_rx,
close_tx,
close_rx,
inner: Mutex::new(inner),
}
}
/// Runs the playlist
///
/// # Blocking
/// Will block in its own event loop until [`Self::stop`] is called.
#[allow(clippy::useless_transmute)] // `crossbeam::select` does it
pub fn run(&self) {
pub async fn run(&self) -> ! {
// All images to send
let mut images = vec![];
'run: loop {
loop {
// Retrieve the next images and shuffle them
// DEADLOCK: We ensure we don't block while `inner` is locked
{
@@ -83,35 +67,14 @@ }
}
images.shuffle(&mut rand::thread_rng());
// Then try to send each one and check for the close channel
// DEADLOCK: Caller can call `Self::stop` for us to stop at any moment.
// Then try to send each image
for image in images.drain(..) {
crossbeam::select! {
// Try to send an image
// Note: This can't return an `Err` because `self` owns a receiver
send(self.img_tx, image) -> res => res.expect("Image receiver was closed"),
// If we get anything in the close channel, break
// Note: This can't return an `Err` because `self` owns a receiver
recv(self.close_rx) -> res => {
res.expect("On-close sender was closed");
break 'run;
},
}
// Note: This can't return an `Err` because `self` owns a receiver
self.img_tx.send(image).await.expect("Image receiver was closed");
}
}
}
/// Stops the `run` loop
pub fn stop(&self) {
// Note: This can't return an `Err` because `self` owns a sender
// DEADLOCK: The channel is unbounded, so this will not block.
self.close_tx
.send_se(())
.allow::<MightBlock>()
.expect("On-close receiver was closed");
}
/// Clears all existing images
pub fn clear(&self) {
// DEADLOCK: We ensure we don't block while `inner` is locked
@@ -146,31 +109,9 @@ }
}
/// Retrieves the next image
///
/// # Blocking
/// Blocks until [`Self::run`] starts running.
#[side_effect(MightBlock)]
pub fn next(&self) -> Arc<PlaylistImage> {
pub async fn next(&self) -> Arc<PlaylistImage> {
// Note: This can't return an `Err` because `self` owns a sender
// DEADLOCK: Caller ensures `Self::run` will eventually run.
self.img_rx
.recv_se()
.allow::<MightBlock>()
.expect("Image sender was closed")
}
/// Retrieves the next image under `select`.
pub fn select_next<'a>(&'a self, select: &mut crossbeam::channel::Select<'a>) -> usize {
select.recv(&self.img_rx)
}
/// Retrieves the next image, when selected
///
/// # Blocking
/// Does *not* block, unlike [`Self::next`]
pub fn next_selected<'a>(&'a self, selected: crossbeam::channel::SelectedOperation<'a>) -> Arc<PlaylistImage> {
// Note: This can't return an `Err` because `self` owns a sender
selected.recv(&self.img_rx).expect("Image sender was closed")
self.img_rx.recv().await.expect("Image sender was closed")
}
}
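Note that the channel is `bounded(1)`, so `send(image).await` doubles as backpressure: `Playlist::run` parks after queueing one image until some consumer awaits `next`. A standalone sketch of that rendezvous (illustrative; `pollster` drives each side):

fn main() {
    let (tx, rx) = async_channel::bounded::<u32>(1);
    // Producer: with capacity 1, each `send` suspends until the consumer
    // catches up, which is what lets the playlist drip-feed images.
    let producer = std::thread::spawn(move || {
        pollster::block_on(async {
            for i in 0..3 {
                tx.send(i).await.expect("receiver closed");
            }
        });
    });
    // Consumer: pull the three items back out.
    for _ in 0..3 {
        let i = pollster::block_on(rx.recv()).expect("sender closed");
        println!("got {i}");
    }
    producer.join().unwrap();
}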


@@ -16,12 +16,20 @@ pub use {
// Imports
use {
self::extse::ParkingLotMutexSe,
anyhow::Context,
image::DynamicImage,
parking_lot::{Condvar, Mutex},
std::{
fs,
future::Future,
hash::{Hash, Hasher},
path::Path,
sync::{
atomic::{self, AtomicBool},
Arc,
},
task,
time::{Duration, Instant},
},
};
@@ -83,3 +91,78 @@ pub fn image_format(image: &DynamicImage) -> &'static str {
DynamicImage::ImageRgba16(_) => "Rgba16",
}
}
/// Adapts a future into a closure to be run on its own thread.
///
/// Will drop the future once `should_quit` becomes true.
pub fn never_fut_thread_fn<'a, T, F>(should_quit: &'a AtomicBool, res: T, f: F) -> impl FnOnce() -> T + 'a
where
T: 'a,
F: Future<Output = !> + Send + 'a,
{
move || {
// TODO: Not allocate here
let mut f = Box::pin(f);
// Create the waker
let signal = Arc::new(NeverFutSignal::new());
let waker = task::Waker::from(Arc::clone(&signal));
let mut ctx = task::Context::from_waker(&waker);
// Then poll it until we should quit
loop {
match f.as_mut().poll(&mut ctx) {
task::Poll::Ready(never) => never,
task::Poll::Pending => match should_quit.load(atomic::Ordering::Relaxed) {
true => break,
false => signal.wait(),
},
}
}
res
}
}
/// Signal for [`never_fut_thread_fn`]'s waker
struct NeverFutSignal {
/// If the future should be polled
should_poll: Mutex<bool>,
/// Condvar for waiting
cond_var: Condvar,
}
impl NeverFutSignal {
fn new() -> Self {
Self {
should_poll: Mutex::new(true),
cond_var: Condvar::new(),
}
}
/// Waits until the future should be polled
pub fn wait(&self) {
// Keep waiting until `should_poll` is true
// DEADLOCK: Waker will set `should_poll` to true eventually.
let mut should_poll = self.should_poll.lock_se().allow::<MightBlock>();
while !*should_poll {
self.cond_var.wait(&mut should_poll);
}
// Then set it to false so the waker may re-set it to true
*should_poll = false;
}
}
impl task::Wake for NeverFutSignal {
fn wake(self: std::sync::Arc<Self>) {
// Set that we should be polling
// DEADLOCK: Mutex is only ever locked temporarily (as `wait`ing unlocks the mutex).
let mut should_poll = self.should_poll.lock_se().allow::<MightBlock>();
*should_poll = true;
// Then notify the waiter
let _ = self.cond_var.notify_one();
}
}
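To see the moving parts of `never_fut_thread_fn` and `NeverFutSignal` in isolation, here is a std-only rendition of the same condvar-backed single-future executor; the `Signal` and `drive` names are illustrative, and the real code additionally routes locking through `lock_se`/`MightBlock`:

use std::{
    future::Future,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Condvar, Mutex,
    },
    task::{Context, Poll, Wake, Waker},
};

/// Condvar-backed signal: `wake` flips a flag and unparks the polling thread.
struct Signal {
    ready: Mutex<bool>,
    cvar: Condvar,
}

impl Wake for Signal {
    fn wake(self: Arc<Self>) {
        *self.ready.lock().unwrap() = true;
        self.cvar.notify_one();
    }
}

/// Polls `fut` until it completes, or returns `None` once `quit` is observed.
fn drive<F: Future>(fut: F, quit: &AtomicBool) -> Option<F::Output> {
    let mut fut = Box::pin(fut);
    let signal = Arc::new(Signal { ready: Mutex::new(true), cvar: Condvar::new() });
    let waker = Waker::from(Arc::clone(&signal));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return Some(out),
            // As above: on `Pending`, check the quit flag, else park until woken.
            Poll::Pending if quit.load(Ordering::Relaxed) => return None,
            Poll::Pending => {
                let mut ready = signal.ready.lock().unwrap();
                while !*ready {
                    ready = signal.cvar.wait(ready).unwrap();
                }
                *ready = false;
            },
        }
    }
}

fn main() {
    // This future completes on the first poll, so `drive` returns `Some(42)`.
    assert_eq!(drive(async { 21 * 2 }, &AtomicBool::new(false)), Some(42));
}

The only real difference from `never_fut_thread_fn` is the return value: there the future's output is `!`, so completion is unreachable and quitting returns the caller-supplied `res` instead of `None`.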


@@ -25,8 +25,8 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> {
}
}
/// Spawns a new thread using `crossbeam::thread::Scope` with name
pub fn spawn_scoped<F>(&mut self, name: impl Into<String>, f: F) -> Result<(), anyhow::Error>
/// Spawns a new thread
pub fn spawn<F>(&mut self, name: impl Into<String>, f: F) -> Result<(), anyhow::Error>
where
F: Send + FnOnce() -> Result<(), anyhow::Error> + 'env,
{
@@ -43,7 +43,7 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> {
}
/// Spawns multiple scoped threads
pub fn spawn_scoped_multiple<F>(
pub fn spawn_multiple<F>(
&mut self,
name: impl Into<String>,
threads: impl Iterator<Item = F>,
@@ -54,7 +54,7 @@ impl<'scope, 'env> ThreadSpawner<'scope, 'env> {
let name = name.into();
threads
.enumerate()
.try_for_each(move |(idx, f)| self.spawn_scoped(format!("{name}${idx}"), f))
.try_for_each(move |(idx, f)| self.spawn(format!("{name}${idx}"), f))
}
/// Joins all threads