Image loader threads are now loaded during running.

This commit is contained in:
Filipe Rodrigues 2022-01-26 04:17:56 +00:00
parent 4481c8e969
commit ea413ae971
3 changed files with 74 additions and 56 deletions

View File

@ -3,13 +3,13 @@
//! See the [`App`] type for more details
// Imports
use crate::{paths, Args, Egui, ImageLoader, Panel, PanelState, PanelsRenderer, Rect, Wgpu};
use crate::{paths, util, Args, Egui, ImageLoader, Panel, PanelState, PanelsRenderer, Rect, Wgpu};
use anyhow::Context;
use cgmath::{Point2, Vector2};
use crossbeam::atomic::AtomicCell;
use egui::Widget;
use parking_lot::Mutex;
use std::{mem, thread, time::Duration};
use std::{mem, num::NonZeroUsize, thread, time::Duration};
use winit::{
dpi::{PhysicalPosition, PhysicalSize},
event::{Event, WindowEvent},
@ -88,7 +88,7 @@ impl App {
let (paths_distributer, paths_rx) = paths::new(args.images_dir);
// Create the image loader
let image_loader = ImageLoader::new(&paths_rx).context("Unable to create image loader")?;
let image_loader = ImageLoader::new(paths_rx).context("Unable to create image loader")?;
// Create all panels
let panels = args
@ -136,34 +136,24 @@ impl App {
// Start all threads and then wait in the main thread for events
// TODO: Not ignore errors here, although given how `thread::scope` works
// it's somewhat hard to do so
let inner = &self.inner;
crossbeam::thread::scope(|s| {
// Spawn the path distributer thread
let _path_distributer = s
.builder()
.name("Path distributer".to_owned())
.spawn(|_| inner.paths_distributer.run())
.context("Unable to start renderer thread")?;
let _path_distributer = util::spawn_scoped(s, "Path distributer", || self.inner.paths_distributer.run())?;
// Spawn the updater thread
let _updater_thread = s
.builder()
.name("Updater".to_owned())
.spawn(|_| Self::run_updater(inner))
.context("Unable to start renderer thread")?;
// Spawn all image loaders
let loader_threads = thread::available_parallelism().map_or(1, NonZeroUsize::get);
let _image_loaders =
util::spawn_scoped_multiple(s, "Image loader", loader_threads, || || self.inner.image_loader.run())?;
// Spawn the renderer thread
let _renderer_thread = s
.builder()
.name("Renderer".to_owned())
.spawn(|_| Self::run_renderer(inner))
.context("Unable to start renderer thread")?;
// Spawn the updater and renderer thread
let _updater_thread = util::spawn_scoped(s, "Updater", || Self::run_updater(&self.inner))?;
let _renderer_thread = util::spawn_scoped(s, "Renderer", || Self::run_renderer(&self.inner))?;
// Run event loop in this thread until we quit
let mut cursor_pos = PhysicalPosition::new(0.0, 0.0);
self.event_loop.run_return(|event, _, control_flow| {
// Update egui
inner.egui.platform().lock().handle_event(&event);
self.inner.egui.platform().lock().handle_event(&event);
// Set control for to wait for next event, since we're not doing
// anything else on the main thread
@ -185,7 +175,7 @@ impl App {
},
// If we resized, queue a resize on wgpu
WindowEvent::Resized(size) => inner.wgpu.resize(size),
WindowEvent::Resized(size) => self.inner.wgpu.resize(size),
// On move, update the cursor position
WindowEvent::CursorMoved { position, .. } => cursor_pos = position,
@ -196,7 +186,7 @@ impl App {
button: winit::event::MouseButton::Right,
..
} => {
inner.queued_settings_window_open_click.store(Some(cursor_pos));
self.inner.queued_settings_window_open_click.store(Some(cursor_pos));
},
_ => (),
},

View File

@ -10,13 +10,18 @@ mod load;
use super::Image;
use crate::{paths, util};
use anyhow::Context;
use std::thread;
/// Image loader
#[derive(Debug)]
pub struct ImageLoader {
/// Image receiver
image_rx: crossbeam::channel::Receiver<Image>,
/// Image sender
image_tx: crossbeam::channel::Sender<Image>,
/// Paths receiver
paths_rx: paths::Receiver,
}
impl ImageLoader {
@ -24,26 +29,26 @@ impl ImageLoader {
///
/// # Errors
/// Returns an error if unable to create all the loader threads
pub fn new(paths_rx: &paths::Receiver) -> Result<Self, anyhow::Error> {
pub fn new(paths_rx: paths::Receiver) -> Result<Self, anyhow::Error> {
// Create the image channel with the number of threads, since that's likely to
// be the number of runners we have
let loader_threads = std::thread::available_parallelism()
.context("Unable to get available parallelism")?
.get();
// Start all the loader threads
let (image_tx, image_rx) = crossbeam::channel::bounded(2 * loader_threads);
for thread_idx in 0..loader_threads {
let image_tx = image_tx.clone();
let paths_rx = paths_rx.clone();
let _loader_thread = thread::Builder::new()
.name("Image loader".to_owned())
.spawn(move || match self::image_loader(&image_tx, &paths_rx) {
Ok(()) => log::debug!("Image loader #{thread_idx} successfully quit"),
Err(err) => log::warn!("Image loader #{thread_idx} returned `Err`: {err:?}"),
})
.context("Unable to spawn image loader")?;
}
Ok(Self { image_rx })
Ok(Self {
image_rx,
image_tx,
paths_rx,
})
}
/// Runs an image loader.
///
/// Multiple image loaders may run at the same time
pub fn run(&self) -> Result<(), anyhow::Error> {
self::run_image_loader(&self.image_tx, &self.paths_rx)
}
/// Receives the image, waiting if not ready yet
@ -62,24 +67,19 @@ impl ImageLoader {
}
}
/// Image loader thread function
fn image_loader(image_tx: &crossbeam::channel::Sender<Image>, paths_rx: &paths::Receiver) -> Result<(), anyhow::Error> {
#[allow(clippy::while_let_loop)] // We might add more steps before/after getting a path
loop {
// Get the next path
let path = match paths_rx.recv() {
Ok(path) => path,
Err(_) => break,
};
// And try to process it
// Note: We can ignore errors on sending, since other senders might still be alive
#[allow(clippy::let_underscore_drop)]
/// Runs the image loader
fn run_image_loader(
image_tx: &crossbeam::channel::Sender<Image>, paths_rx: &paths::Receiver,
) -> Result<(), anyhow::Error> {
while let Ok(path) = paths_rx.recv() {
match util::measure(|| load::load_image(&path)) {
// If we got it, send it
(Ok(image), duration) => {
log::trace!("Took {duration:?} to load {path:?}");
let _ = image_tx.send(image.to_rgba8());
if image_tx.send(image.to_rgba8()).is_err() {
log::info!("No more receivers found, quitting");
break;
}
},
// If we didn't manage to, log and try again with another path
(Err(err), _) => {

View File

@ -1,14 +1,15 @@
//! Utility
// Modules
mod scan_dir;
mod display_wrapper;
mod scan_dir;
// Exports
pub use scan_dir::visit_files_dir;
pub use display_wrapper::DisplayWrapper;
pub use scan_dir::visit_files_dir;
// Imports
use anyhow::Context;
use std::{
hash::{Hash, Hasher},
time::{Duration, Instant},
@ -40,6 +41,33 @@ pub macro measure_dbg {
}
}
/// Spawns a new thread using `crossbeam::thread::Scope` with name
pub fn spawn_scoped<'scope, 'env, T, F>(
s: &'scope crossbeam::thread::Scope<'env>, name: impl Into<String>, f: F,
) -> Result<crossbeam::thread::ScopedJoinHandle<'scope, T>, anyhow::Error>
where
T: Send + 'env,
F: Send + FnOnce() -> T + 'env,
{
let name = name.into();
s.builder()
.name(name.clone())
.spawn(|_| f())
.with_context(|| format!("Unable to start thread {name:?}"))
}
/// Spawns multiple scoped threads
pub fn spawn_scoped_multiple<'scope, 'env, T, F>(
s: &'scope crossbeam::thread::Scope<'env>, name: impl Into<String>, threads: usize, mut f: impl FnMut() -> F,
) -> Result<Vec<crossbeam::thread::ScopedJoinHandle<'scope, T>>, anyhow::Error>
where
T: Send + 'env,
F: Send + FnOnce() -> T + 'env,
{
let name = name.into();
(0..threads).map(move |_| self::spawn_scoped(s, &name, f())).collect()
}
/// Hashes a value using `twox_hash`
pub fn _hash_of<T: ?Sized + Hash>(value: &T) -> u64 {
let mut hasher = twox_hash::XxHash64::with_seed(0);