Slightly refactored path loading to load the initial paths in the background.

This commit is contained in:
Filipe Rodrigues 2021-12-27 20:38:39 +00:00
parent 137fc561f3
commit 842e1c8659
7 changed files with 87 additions and 62 deletions

3
.gitignore vendored
View File

@ -3,3 +3,6 @@
# Rust
/target
# Logs
latest.log

View File

@ -48,3 +48,7 @@ twox-hash = "1.6.1"
# meaningfully tests most things
[profile.dev.package."*"]
opt-level = 3
# Add debug to release for profiling
[profile.release]
debug = true

16
run.sh
View File

@ -1,23 +1,17 @@
set -e
#!/bin/env bash
#cargo run --release -- \
# ~/.wallpaper/test \
# --image-duration 5.0 \
# --window-geometry "3280x1080+0+0" \
# --image-geometry "1360x768+0+312" \
# --image-geometry "1920x1080+1360+0" \
# --fade-point 0.8 \
# --image-backlog 0 \
set -e
cargo run -- \
~/.wallpaper/active \
--image-duration 5.0 \
--image-duration 1.0 \
--window-geometry "3280x1080+0+0" \
--image-geometry "1360x768+0+312" \
--image-geometry "1920x1080+1360+0" \
--fade-point 0.8 \
--image-backlog 0 \
--loader-threads 4 \
--processor-threads 4 \
--processor-threads 4
#cargo run -- \
# ~/.wallpaper/active \

View File

@ -5,7 +5,7 @@ use crate::Rect;
use anyhow::Context;
use cgmath::{EuclideanSpace, Point2, Vector2};
use clap::{App as ClapApp, Arg as ClapArg};
use std::{path::PathBuf, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};
/// Args
#[derive(Debug)]
@ -20,7 +20,7 @@ pub struct Args {
pub image_duration: Duration,
/// Images directory
pub images_dir: PathBuf,
pub images_dir: Arc<PathBuf>,
/// Fade point (0.5..1.0)
pub fade_point: f32,
@ -216,6 +216,7 @@ pub fn get() -> Result<Args, anyhow::Error> {
.value_of_os(arg_name::IMAGES_DIR)
.context("Required argument was missing")?,
);
let images_dir = Arc::new(images_dir);
let fade = matches
.value_of(arg_name::FADE_POINT)

View File

@ -17,10 +17,10 @@ use std::{
/// The path loader
pub struct PathLoader {
/// Receiver for the path
path_receiver: priority_spmc::Receiver<Arc<PathBuf>>,
path_rx: priority_spmc::Receiver<Arc<PathBuf>>,
/// Filesystem event sender
fs_sender: mpsc::Sender<notify::DebouncedEvent>,
fs_tx: mpsc::Sender<notify::DebouncedEvent>,
/// Filesystem watcher
_fs_watcher: notify::RecommendedWatcher,
@ -32,34 +32,42 @@ impl PathLoader {
/// # Errors
/// Returns an error if unable to start watching the filesystem, or if unable to start
/// the path loader thread
pub fn new(base_path: PathBuf) -> Result<Self, anyhow::Error> {
pub fn new(base_path: Arc<PathBuf>) -> Result<Self, anyhow::Error> {
// Start the filesystem watcher and start watching the path
let (fs_sender, fs_receiver) = mpsc::channel();
let (fs_tx, fs_rx) = mpsc::channel();
let mut fs_watcher =
notify::watcher(fs_sender.clone(), Duration::from_secs(2)).context("Unable to create directory watcher")?;
notify::watcher(fs_tx.clone(), Duration::from_secs(2)).context("Unable to create directory watcher")?;
fs_watcher
.watch(&base_path, notify::RecursiveMode::Recursive)
.watch(&*base_path, notify::RecursiveMode::Recursive)
.context("Unable to start watching directory")?;
// Then start loading all existing path
{
let base_path = base_path.clone();
let fs_sender = fs_tx.clone();
thread::Builder::new()
.name("Path loader".to_owned())
.spawn(move || self::load_paths(&base_path, &fs_sender))
.context("Unable to start path loader thread")?;
}
// Create both channels
// Note: Since we can hand out paths quickly, we can use a relatively low capacity
#[allow(clippy::expect_used)] // It won't panic
let (path_sender, path_receiver) = priority_spmc::channel(Some(NonZeroUsize::new(16).expect("16 isn't 0")));
let (path_tx, path_rx) = priority_spmc::channel(Some(NonZeroUsize::new(16).expect("16 isn't 0")));
// Then start the path loader thread
thread::Builder::new()
.name("Path loader".to_owned())
.spawn(
move || match self::loader_thread(&base_path, &fs_receiver, &path_sender) {
Ok(()) => log::debug!("Path loader successfully returned"),
Err(err) => log::error!("Path loader returned an error: {err:?}"),
},
)
.context("Unable to start path loader thread")?;
.name("Path distributor".to_owned())
.spawn(move || match self::loader_thread(&base_path, &fs_rx, &path_tx) {
Ok(()) => log::debug!("Path distributor successfully returned"),
Err(err) => log::error!("Path distributor returned an error: {err:?}"),
})
.context("Unable to start path distributor thread")?;
Ok(Self {
path_receiver,
fs_sender,
path_rx,
fs_tx,
_fs_watcher: fs_watcher,
})
}
@ -67,8 +75,8 @@ impl PathLoader {
/// Returns a receiver for paths
pub fn receiver(&self) -> PathReceiver {
PathReceiver {
path_receiver: self.path_receiver.clone(),
fs_sender: self.fs_sender.clone(),
path_rx: self.path_rx.clone(),
fs_tx: self.fs_tx.clone(),
}
}
}
@ -76,22 +84,22 @@ impl PathLoader {
/// A path receiver
pub struct PathReceiver {
/// Receiver for the path
path_receiver: priority_spmc::Receiver<Arc<PathBuf>>,
path_rx: priority_spmc::Receiver<Arc<PathBuf>>,
/// Filesystem event sender
fs_sender: mpsc::Sender<notify::DebouncedEvent>,
fs_tx: mpsc::Sender<notify::DebouncedEvent>,
}
impl PathReceiver {
/// Receives a path
pub fn recv(&self) -> Result<Arc<PathBuf>, RecvError> {
self.path_receiver.recv().map_err(|_| RecvError)
self.path_rx.recv().map_err(|_| RecvError)
}
/// Tries to receive a value
#[allow(dead_code)] // It might be useful eventually
pub fn try_recv(&self) -> Result<Arc<PathBuf>, TryRecvError> {
self.path_receiver.try_recv().map_err(|err| match err {
self.path_rx.try_recv().map_err(|err| match err {
priority_spmc::TryRecvError::SenderQuit => TryRecvError::LoaderQuit,
priority_spmc::TryRecvError::NotReady => TryRecvError::NotReady,
})
@ -99,7 +107,7 @@ impl PathReceiver {
/// Reports a path for removal
pub fn remove_path(&self, path: PathBuf) -> Result<(), RemovePathError> {
self.fs_sender
self.fs_tx
.send(notify::DebouncedEvent::Remove(path))
.map_err(|_| RemovePathError)
}
@ -131,9 +139,9 @@ pub struct RemovePathError;
fn loader_thread(
base_path: &Path, fs_rx: &mpsc::Receiver<notify::DebouncedEvent>, path_sender: &priority_spmc::Sender<Arc<PathBuf>>,
) -> Result<(), anyhow::Error> {
// Load all existing paths
// Load all existing paths in a background thread
let mut paths = vec![];
self::scan_dir(base_path, &mut paths);
loop {
// Check if we have any filesystem events
@ -166,7 +174,7 @@ fn loader_thread(
}
/// Handles a filesystem event
fn handle_fs_event(event: notify::DebouncedEvent, base_path: &Path, paths: &mut Vec<Arc<PathBuf>>) {
fn handle_fs_event(event: notify::DebouncedEvent, _base_path: &Path, paths: &mut Vec<Arc<PathBuf>>) {
log::trace!("Receive filesystem event: {event:?}");
#[allow(clippy::match_same_arms)] // They're logically in different parts
@ -193,11 +201,7 @@ fn handle_fs_event(event: notify::DebouncedEvent, base_path: &Path, paths: &mut
},
// Clear all paths and rescan
notify::DebouncedEvent::Rescan => {
log::warn!("Re-scanning");
paths.clear();
self::scan_dir(base_path, paths);
},
notify::DebouncedEvent::Rescan => log::warn!("Re-scanning (Not yet implemented)"),
// Note: Ignore any R/W events
// TODO: Check if we should be doing this?
@ -214,12 +218,22 @@ fn handle_fs_event(event: notify::DebouncedEvent, base_path: &Path, paths: &mut
}
}
/// Scans `base_path` to `paths`
fn scan_dir(base_path: &Path, paths: &mut Vec<Arc<PathBuf>>) {
let paths_loaded = util::visit_files_dir::<!, _>(base_path, &mut |path| {
paths.push(Arc::new(path));
ControlFlow::CONTINUE
/// Loads all paths from `base_path` and sends them to `fs_tx`
fn load_paths(base_path: &Path, fs_tx: &mpsc::Sender<notify::DebouncedEvent>) {
let mut paths_loaded = 0;
let loading_duration = util::measure(|| {
util::visit_files_dir(
base_path,
&mut |path| match fs_tx.send(notify::DebouncedEvent::Create(path)) {
Ok(()) => {
paths_loaded += 1;
ControlFlow::CONTINUE
},
Err(_) => ControlFlow::BREAK,
},
)
})
.into_ok();
log::info!("Loaded {paths_loaded} paths");
.1;
log::debug!("Finishing loading all {paths_loaded} paths in {loading_duration:?}");
}

View File

@ -5,3 +5,14 @@ mod scan_dir;
// Exports
pub use scan_dir::visit_files_dir;
// Imports
use std::time::{Duration, Instant};
/// Measures how long it took to execute a function
pub fn measure<T>(f: impl FnOnce() -> T) -> (T, Duration) {
let start_time = Instant::now();
let value = f();
let duration = Instant::now().saturating_duration_since(start_time);
(value, duration)
}

View File

@ -13,16 +13,15 @@ use std::{
///
/// # Return
/// Returns the number of files successfully loaded
pub fn visit_files_dir<E, F>(path: &Path, f: &mut F) -> Result<usize, E>
pub fn visit_files_dir<E, F>(path: &Path, f: &mut F) -> Result<(), E>
where
F: FnMut(PathBuf) -> ControlFlow<E>,
{
let mut files_loaded = 0;
let dir = match std::fs::read_dir(path) {
Ok(dir) => dir,
Err(err) => {
log::warn!("Unable to read directory `{path:?}`: {:?}", anyhow::anyhow!(err));
return Ok(0);
return Ok(());
},
};
for entry in dir {
@ -48,17 +47,16 @@ where
match file_type.is_dir() {
// Recurse on directories
true => {
files_loaded += self::visit_files_dir(&entry.path(), f)?;
},
true => self::visit_files_dir(&entry.path(), f)?,
// Visit files
false => match f(entry_path) {
ControlFlow::Continue(()) => files_loaded += 1,
ControlFlow::Break(err) => return Err(err),
false => {
if let ControlFlow::Break(err) = f(entry_path) {
return Err(err);
}
},
}
}
Ok(files_loaded)
Ok(())
}