Moved path loading to it's own module.

This commit is contained in:
Filipe Rodrigues 2021-11-18 18:12:57 +00:00
parent 2806e61fc6
commit aa583ffff3
5 changed files with 321 additions and 229 deletions

View File

@ -8,29 +8,20 @@ mod load;
mod request;
// Imports
use self::request::{ImageRequest, LoadImageError};
use crate::sync::{once_channel, priority_spmc};
use self::request::ImageRequest;
use crate::{
path_loader::PathReceiver,
sync::{once_channel, priority_spmc},
PathLoader,
};
use anyhow::Context;
use cgmath::Vector2;
use image::Rgba;
use notify::Watcher;
use rand::prelude::SliceRandom;
use std::{
mem,
num::NonZeroUsize,
ops::ControlFlow,
path::{Path, PathBuf},
sync::mpsc,
thread,
time::Duration,
};
use std::{num::NonZeroUsize, ops::Deref, thread};
/// Image buffer
pub type ImageBuffer = image::ImageBuffer<Rgba<u8>, Vec<u8>>;
/// Responder for the path distributer
type PathResponder = once_channel::Sender<Result<(), LoadImageError>>;
/// Responder for an image loader
type ImageResponder = once_channel::Sender<Result<ImageBuffer, ()>>;
@ -38,43 +29,22 @@ type ImageResponder = once_channel::Sender<Result<ImageBuffer, ()>>;
pub struct ImageLoader {
/// Image request sender
image_request_sender: priority_spmc::Sender<(ImageRequest, ImageResponder)>,
/// Filesystem watcher
_fs_watcher: notify::RecommendedWatcher,
}
impl ImageLoader {
/// Creates a new image loader
///
///
/// # Errors
/// Returns an error if unable to start the filesystem watcher
/// Returns an error if unable to create all the loader threads
pub fn new(
base_path: PathBuf, loader_threads: Option<usize>, upscale_waifu2x: bool,
path_loader: &PathLoader, loader_threads: Option<usize>, upscale_waifu2x: bool,
) -> Result<Self, anyhow::Error> {
// Start the watcher and start watching the path
let (fs_tx, fs_rx) = mpsc::channel();
let mut fs_watcher =
notify::watcher(fs_tx, Duration::from_secs(2)).context("Unable to create directory watcher")?;
fs_watcher
.watch(&base_path, notify::RecursiveMode::Recursive)
.context("Unable to start watching directory")?;
// Start the distributer thread
let (path_sender, path_receiver) = priority_spmc::channel();
thread::Builder::new()
.name("Path distributer".to_owned())
.spawn(move || match self::path_distributer(&base_path, &path_sender, &fs_rx) {
Ok(()) => log::debug!("Path distributer successfully quit"),
Err(err) => log::warn!("Path distributer returned `Err`: {err:?}"),
})
.context("Unable to spawn distributer thread")?;
// Start the image loader distribution thread
let (image_request_sender, image_request_receiver) = priority_spmc::channel();
let loader_threads = loader_threads.unwrap_or_else(default_loader_threads).max(1);
for thread_idx in 0..loader_threads {
let request_receiver = image_request_receiver.clone();
let path_receiver = path_receiver.clone();
let path_receiver = path_loader.receiver();
thread::Builder::new()
.name(format!("Image loader #{thread_idx}"))
.spawn(
@ -87,10 +57,7 @@ impl ImageLoader {
}
Ok(Self {
image_request_sender,
_fs_watcher: fs_watcher,
})
Ok(Self { image_request_sender })
}
/// Queues an image to be loaded for a certain window size
@ -171,139 +138,35 @@ impl ImageReceiver {
}
}
/// Path distributer thread function
///
/// Responsible for distributing paths to the image loader
fn path_distributer(
base_path: &Path, path_sender: &priority_spmc::Sender<(PathBuf, PathResponder)>,
fs_rx: &mpsc::Receiver<notify::DebouncedEvent>,
) -> Result<(), anyhow::Error> {
// Load all paths
let mut paths = vec![];
self::scan_dir(&mut paths, base_path);
// All response receivers
let mut response_receivers: Vec<once_channel::Receiver<Result<(), LoadImageError>>> = vec![];
// Start the reset-wait loop on our modifier
loop {
// Check if we have any filesystem events
// Note: For rename and remove events, we simply ignore the
// file that no longer exists. The loader threads will
// mark the path for removal once they find it.
while let Ok(event) = fs_rx.try_recv() {
self::handle_fs_event(event, base_path, &mut paths);
}
// Check if we got any responses
for receiver in mem::take(&mut response_receivers) {
match receiver.try_recv() {
// If everything went alright, don't do anything
Ok(Ok(())) => (),
// If we couldn't load the image, remove it
// TODO: Maybe use some sort of ordered set to make this not perform as badly?
Ok(Err(err)) => paths.retain(|path| path != &err.path),
// If they're not done yet, push them back
Err(once_channel::TryRecvError::NotReady(receiver)) => response_receivers.push(receiver),
// If we couldn't get a response, ignore
Err(_) => continue,
}
}
// If we have no paths, wait for a filesystem event, or return, if unable to
while paths.is_empty() {
log::warn!("No paths found, waiting for new files from the filesystem watcher");
match fs_rx.recv() {
Ok(event) => self::handle_fs_event(event, base_path, &mut paths),
Err(_) => anyhow::bail!("No paths are available and the filesystem watcher closed their channel"),
}
}
// Then shuffle the paths we have
log::trace!("Shuffling all files");
paths.shuffle(&mut rand::thread_rng());
// And request responses from them all
for path in paths.iter().cloned() {
// Create the channel for responding
let (sender, receiver) = once_channel::channel();
response_receivers.push(receiver);
// Then send it and quit if we're done
// Note: Priority for the path sender isn't mega relevant
if path_sender.send((path, sender), 0).is_err() {
return Ok(());
}
}
}
}
/// Returns the default number of loader threads to use
fn default_loader_threads() -> usize {
thread::available_parallelism().map_or(1, NonZeroUsize::get)
}
/// Handles a filesystem event
fn handle_fs_event(event: notify::DebouncedEvent, path: &Path, paths: &mut Vec<PathBuf>) {
log::trace!("Receive filesystem event: {event:?}");
#[allow(clippy::match_same_arms)] // They're logically in different parts
match event {
// Add the new path
notify::DebouncedEvent::Create(path) | notify::DebouncedEvent::Rename(_, path) => {
log::info!("Adding {path:?}");
paths.push(path);
},
notify::DebouncedEvent::Remove(_) => (),
// Clear all paths and rescan
notify::DebouncedEvent::Rescan => {
log::warn!("Re-scanning");
paths.clear();
self::scan_dir(paths, path);
},
// Note: Ignore any R/W events
// TODO: Check if we should be doing this?
notify::DebouncedEvent::NoticeWrite(_) |
notify::DebouncedEvent::NoticeRemove(_) |
notify::DebouncedEvent::Write(_) |
notify::DebouncedEvent::Chmod(_) => (),
// Log the error
notify::DebouncedEvent::Error(err, path) => match path {
Some(path) => log::warn!("Found error for path {path:?}: {:?}", anyhow::anyhow!(err)),
None => log::warn!("Found error for unknown path: {:?}", anyhow::anyhow!(err)),
},
}
}
/// Image loader thread function
///
/// Responsible for receiving requests and loading them.
fn image_loader(
request_receiver: &priority_spmc::Receiver<(ImageRequest, ImageResponder)>,
path_receiver: &priority_spmc::Receiver<(PathBuf, PathResponder)>, upscale_waifu2x: bool,
request_receiver: &priority_spmc::Receiver<(ImageRequest, ImageResponder)>, path_receiver: &PathReceiver,
upscale_waifu2x: bool,
) -> Result<(), anyhow::Error> {
loop {
// Get the path
let (path, path_response_sender) = match path_receiver.recv() {
let path = match path_receiver.recv() {
Ok(path) => path,
Err(_) => return Ok(()),
};
// Load the image
// Try to load the image
let image = match load::load_image(&path) {
Ok(image) => {
log::debug!("Finished loading {path:?}");
image
},
// If we didn't get it, send an error response and try again
// If we didn't manage to, remove the path and retry
Err(err) => {
log::info!("Unable to load {path:?}: {err:?}");
if path_response_sender.send(Err(LoadImageError { path })).is_err() {
return Ok(());
}
let _ = path_receiver.remove_path(path.deref().clone());
continue;
},
};
@ -322,84 +185,13 @@ fn image_loader(
Ok(image) => {
log::debug!("Finished processing {path:?}");
let _ = sender.send(Ok(image));
if path_response_sender.send(Ok(())).is_err() {
return Ok(());
}
},
// If we couldn't, send an error
// If we didn't manage to, remove the path and retry
Err(err) => {
log::info!("Unable to process {path:?}: {err:?}");
let _ = sender.send(Err(()));
if path_response_sender.send(Err(LoadImageError { path })).is_err() {
return Ok(());
}
let _ = path_receiver.remove_path(path.deref().clone());
},
};
}
}
/// Scans a directory and insert all it's paths onto `paths`
fn scan_dir(paths: &mut Vec<PathBuf>, path: &Path) {
let mut visitor = |path| {
paths.push(path);
ControlFlow::CONTINUE
};
self::visit_files_dir::<!, _>(path, &mut visitor).into_ok();
}
/// Visits all files in `path`, recursively.
///
/// # Errors
/// Ignores all errors reading directories, simply logging them.
///
/// # Return
/// Returns the number of files successfully loaded
fn visit_files_dir<E, F>(path: &Path, f: &mut F) -> Result<usize, E>
where
F: FnMut(PathBuf) -> ControlFlow<E>,
{
let mut files_loaded = 0;
let dir = match std::fs::read_dir(path) {
Ok(dir) => dir,
Err(err) => {
log::warn!("Unable to read directory `{path:?}`: {:?}", anyhow::anyhow!(err));
return Ok(0);
},
};
for entry in dir {
// Read the entry and file type
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
log::warn!("Unable to read file entry in `{path:?}`: {:?}", anyhow::anyhow!(err));
continue;
},
};
let entry_path = entry.path();
let file_type = match entry.file_type() {
Ok(file_type) => file_type,
Err(err) => {
log::warn!(
"Unable to read file type for `{entry_path:?}`: {:?}",
anyhow::anyhow!(err)
);
continue;
},
};
match file_type.is_dir() {
// Recurse on directories
true => {
files_loaded += self::visit_files_dir(&entry.path(), f)?;
},
// Visit files
false => match f(entry_path) {
ControlFlow::Continue(()) => files_loaded += 1,
ControlFlow::Break(err) => return Err(err),
},
}
}
Ok(files_loaded)
}

View File

@ -17,14 +17,17 @@ mod args;
mod gl_image;
mod image_loader;
mod image_uvs;
mod path_loader;
mod rect;
mod sync;
mod util;
mod vertex;
// Exports
pub use gl_image::GlImage;
pub use image_loader::{ImageBuffer, ImageLoader};
pub use image_uvs::ImageUvs;
pub use path_loader::PathLoader;
pub use rect::Rect;
pub use vertex::Vertex;
@ -91,9 +94,13 @@ fn main() -> Result<(), anyhow::Error> {
self::set_display_always_below(&display);
}
// Create the path loader
log::debug!("Starting the path loader");
let path_loader = PathLoader::new(args.images_dir.clone()).context("Unable to create path loader")?;
// Create the loader and start loading images
log::debug!("Starting the image loader");
let image_loader = ImageLoader::new(args.images_dir.clone(), args.loader_threads, args.upscale_waifu2x)
let image_loader = ImageLoader::new(&path_loader, args.loader_threads, args.upscale_waifu2x)
.context("Unable to create image loader")?;
// Create the indices buffer

222
src/path_loader.rs Normal file
View File

@ -0,0 +1,222 @@
//! Path loader
// Imports
use crate::{sync::priority_spmc, util};
use anyhow::Context;
use notify::Watcher;
use rand::prelude::SliceRandom;
use std::{
ops::ControlFlow,
path::{Path, PathBuf},
sync::{mpsc, Arc},
thread,
time::Duration,
};
/// The path loader
pub struct PathLoader {
/// Receiver for the path
path_receiver: priority_spmc::Receiver<Arc<PathBuf>>,
/// Filesystem event sender
fs_sender: mpsc::Sender<notify::DebouncedEvent>,
/// Filesystem watcher
_fs_watcher: notify::RecommendedWatcher,
}
impl PathLoader {
/// Creates a new path loader
///
/// # Errors
/// Returns an error if unable to start watching the filesystem, or if unable to start
/// the path loader thread
pub fn new(base_path: PathBuf) -> Result<Self, anyhow::Error> {
// Start the filesystem watcher and start watching the path
let (fs_sender, fs_receiver) = mpsc::channel();
let mut fs_watcher =
notify::watcher(fs_sender.clone(), Duration::from_secs(2)).context("Unable to create directory watcher")?;
fs_watcher
.watch(&base_path, notify::RecursiveMode::Recursive)
.context("Unable to start watching directory")?;
// Create both channels
let (path_sender, path_receiver) = priority_spmc::channel();
// Then start the path loader thread
thread::Builder::new()
.name("Path loader".to_owned())
.spawn(
move || match self::loader_thread(&base_path, &fs_receiver, &path_sender) {
Ok(()) => log::debug!("Path loader successfully returned"),
Err(err) => log::error!("Path loader returned an error: {err:?}"),
},
)
.context("Unable to start path loader thread")?;
Ok(Self {
path_receiver,
fs_sender,
_fs_watcher: fs_watcher,
})
}
/// Returns a receiver for paths
pub fn receiver(&self) -> PathReceiver {
PathReceiver {
path_receiver: self.path_receiver.clone(),
fs_sender: self.fs_sender.clone(),
}
}
}
/// A path receiver
pub struct PathReceiver {
/// Receiver for the path
path_receiver: priority_spmc::Receiver<Arc<PathBuf>>,
/// Filesystem event sender
fs_sender: mpsc::Sender<notify::DebouncedEvent>,
}
impl PathReceiver {
/// Receives a path
pub fn recv(&self) -> Result<Arc<PathBuf>, RecvError> {
self.path_receiver.recv().map_err(|_| RecvError)
}
/// Tries to receive a value
#[allow(dead_code)] // It might be useful eventually
pub fn try_recv(&self) -> Result<Arc<PathBuf>, TryRecvError> {
self.path_receiver.try_recv().map_err(|err| match err {
priority_spmc::TryRecvError::SenderQuit => TryRecvError::LoaderQuit,
priority_spmc::TryRecvError::NotReady => TryRecvError::NotReady,
})
}
/// Reports a path for removal
pub fn remove_path(&self, path: PathBuf) -> Result<(), RemovePathError> {
self.fs_sender
.send(notify::DebouncedEvent::Remove(path))
.map_err(|_| RemovePathError)
}
}
/// Error for [`PathReceiver::recv`]
#[derive(Debug, thiserror::Error)]
#[error("Path loader thread quit")]
pub struct RecvError;
/// Error for [`PathReceiver::try_recv`]
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
/// Loader thread quit
#[error("Path loader thread quit")]
LoaderQuit,
/// Not ready
#[error("Not ready")]
NotReady,
}
/// Error for [`PathReceiver::remove_path`]
#[derive(Debug, thiserror::Error)]
#[error("Path loader thread quit")]
pub struct RemovePathError;
/// Path loader thread
fn loader_thread(
base_path: &Path, fs_rx: &mpsc::Receiver<notify::DebouncedEvent>, path_sender: &priority_spmc::Sender<Arc<PathBuf>>,
) -> Result<(), anyhow::Error> {
// Load all existing paths
let mut paths = vec![];
self::scan_dir(base_path, &mut paths);
loop {
// Check if we have any filesystem events
while let Ok(event) = fs_rx.try_recv() {
self::handle_fs_event(event, base_path, &mut paths);
}
// If we have no paths, wait for a filesystem event, or return, if unable to
while paths.is_empty() {
log::warn!("No paths found, waiting for new files from the filesystem watcher");
match fs_rx.recv() {
Ok(event) => self::handle_fs_event(event, base_path, &mut paths),
Err(_) => anyhow::bail!("No paths are available and the filesystem watcher closed their channel"),
}
}
// Then shuffle the paths we have
log::trace!("Shuffling all files");
paths.shuffle(&mut rand::thread_rng());
// Then send all paths through the sender
for path in paths.iter().map(Arc::clone) {
// Send it and quit if we're done
// Note: Priority for the path sender isn't mega relevant for now
if path_sender.send(path, 0).is_err() {
return Ok(());
}
}
}
}
/// Handles a filesystem event
fn handle_fs_event(event: notify::DebouncedEvent, base_path: &Path, paths: &mut Vec<Arc<PathBuf>>) {
log::trace!("Receive filesystem event: {event:?}");
#[allow(clippy::match_same_arms)] // They're logically in different parts
match event {
// Add the path
notify::DebouncedEvent::Create(path) => {
log::debug!("Adding {path:?}");
paths.push(Arc::new(path));
},
// Replace the path
notify::DebouncedEvent::Rename(old_path, new_path) => {
log::debug!("Renaming {old_path:?} to {new_path:?}");
for path in paths {
if **path == old_path {
*path = Arc::new(new_path);
break;
}
}
},
// Remove the path
notify::DebouncedEvent::Remove(path_to_remove) => {
log::debug!("Removing {path_to_remove:?}");
paths.retain(|path| **path != path_to_remove);
},
// Clear all paths and rescan
notify::DebouncedEvent::Rescan => {
log::warn!("Re-scanning");
paths.clear();
self::scan_dir(base_path, paths);
},
// Note: Ignore any R/W events
// TODO: Check if we should be doing this?
notify::DebouncedEvent::NoticeWrite(_) |
notify::DebouncedEvent::NoticeRemove(_) |
notify::DebouncedEvent::Write(_) |
notify::DebouncedEvent::Chmod(_) => (),
// Log the error
notify::DebouncedEvent::Error(err, path) => match path {
Some(path) => log::warn!("Found error for path {path:?}: {:?}", anyhow::anyhow!(err)),
None => log::warn!("Found error for unknown path: {:?}", anyhow::anyhow!(err)),
},
}
}
/// Scans `base_path` to `paths`
fn scan_dir(base_path: &Path, paths: &mut Vec<Arc<PathBuf>>) {
let paths_loaded = util::visit_files_dir::<!, _>(base_path, &mut |path| {
paths.push(Arc::new(path));
ControlFlow::CONTINUE
})
.into_ok();
log::info!("Loaded {paths_loaded} paths");
}

7
src/util.rs Normal file
View File

@ -0,0 +1,7 @@
//! Utility
// Modules
mod scan_dir;
// Exports
pub use scan_dir::visit_files_dir;

64
src/util/scan_dir.rs Normal file
View File

@ -0,0 +1,64 @@
//! Directory scanning
// Imports
use std::{
ops::ControlFlow,
path::{Path, PathBuf},
};
/// Visits all files in `path`, recursively.
///
/// # Errors
/// Ignores all errors reading directories, simply logging them.
///
/// # Return
/// Returns the number of files successfully loaded
pub fn visit_files_dir<E, F>(path: &Path, f: &mut F) -> Result<usize, E>
where
F: FnMut(PathBuf) -> ControlFlow<E>,
{
let mut files_loaded = 0;
let dir = match std::fs::read_dir(path) {
Ok(dir) => dir,
Err(err) => {
log::warn!("Unable to read directory `{path:?}`: {:?}", anyhow::anyhow!(err));
return Ok(0);
},
};
for entry in dir {
// Read the entry and file type
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
log::warn!("Unable to read file entry in `{path:?}`: {:?}", anyhow::anyhow!(err));
continue;
},
};
let entry_path = entry.path();
let file_type = match entry.file_type() {
Ok(file_type) => file_type,
Err(err) => {
log::warn!(
"Unable to read file type for `{entry_path:?}`: {:?}",
anyhow::anyhow!(err)
);
continue;
},
};
match file_type.is_dir() {
// Recurse on directories
true => {
files_loaded += self::visit_files_dir(&entry.path(), f)?;
},
// Visit files
false => match f(entry_path) {
ControlFlow::Continue(()) => files_loaded += 1,
ControlFlow::Break(err) => return Err(err),
},
}
}
Ok(files_loaded)
}