From 3435421888c84a52beb2a73f866dcf00021823b7 Mon Sep 17 00:00:00 2001 From: James Pine Date: Wed, 18 Dec 2024 18:06:30 -0800 Subject: [PATCH] fixed up some shared types --- Cargo.lock | 1 + .../heavy-lifting/src/file_identifier/job.rs | 3 +- core/crates/heavy-lifting/src/indexer/job.rs | 30 ++----- .../heavy-lifting/src/job_system/job.rs | 14 ++-- .../heavy-lifting/src/job_system/report.rs | 52 +----------- .../heavy-lifting/src/job_system/runner.rs | 4 +- .../heavy-lifting/src/media_processor/job.rs | 3 +- core/crates/shared-types/Cargo.toml | 1 + core/crates/shared-types/src/jobs/copy.rs | 66 +++++++++++++++ core/crates/shared-types/src/jobs/mod.rs | 50 ++++++++++++ core/crates/shared-types/src/lib.rs | 81 +------------------ core/crates/shared-types/src/sd_path.rs | 19 ----- 12 files changed, 139 insertions(+), 185 deletions(-) create mode 100644 core/crates/shared-types/src/jobs/copy.rs create mode 100644 core/crates/shared-types/src/jobs/mod.rs diff --git a/Cargo.lock b/Cargo.lock index acc6644bd30e..7890fa7f9cbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10135,6 +10135,7 @@ dependencies = [ "prisma-client-rust", "sd-prisma", "serde", + "serde_json", "specta", "thiserror 1.0.64", "uuid", diff --git a/core/crates/heavy-lifting/src/file_identifier/job.rs b/core/crates/heavy-lifting/src/file_identifier/job.rs index 058744629d28..92cd4dfd28b7 100644 --- a/core/crates/heavy-lifting/src/file_identifier/job.rs +++ b/core/crates/heavy-lifting/src/file_identifier/job.rs @@ -2,7 +2,6 @@ use crate::{ file_identifier, job_system::{ job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus}, - report::ReportOutputMetadata, utils::cancel_pending_tasks, DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks, }, @@ -13,7 +12,7 @@ use crate::{ use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_prisma_helpers::{file_path_for_file_identifier, CasId}; - +use sd_core_shared_types::jobs::ReportOutputMetadata; use sd_prisma::{ prisma::{device, file_path, location, SortOrder}, prisma_sync, diff --git a/core/crates/heavy-lifting/src/indexer/job.rs b/core/crates/heavy-lifting/src/indexer/job.rs index f55ce317bf48..221ba1adfea7 100644 --- a/core/crates/heavy-lifting/src/indexer/job.rs +++ b/core/crates/heavy-lifting/src/indexer/job.rs @@ -4,17 +4,16 @@ use crate::{ job::{ Job, JobContext, JobName, JobReturn, JobTaskDispatcher, ProgressUpdate, ReturnStatus, }, - report::ReportOutputMetadata, utils::cancel_pending_tasks, DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks, }, utils::sub_path::get_full_path_from_sub_path, Error, LocationScanState, NonCriticalError, OuterContext, }; - use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_indexer_rules::{IndexerRule, IndexerRuler}; use sd_core_prisma_helpers::location_with_indexer_rules; +use sd_core_shared_types::jobs::ReportOutputMetadata; use sd_prisma::{ prisma::{device, location}, @@ -40,7 +39,6 @@ use futures::{stream::FuturesUnordered, StreamExt}; use futures_concurrency::future::TryJoin; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use serde_json::json; use tokio::time::Instant; use tracing::{debug, instrument, trace, warn, Level}; @@ -980,28 +978,12 @@ impl From for Vec { removed_count, }: Metadata, ) -> Self { - mean_scan_read_time /= u32::max(total_walk_tasks, 1); // To avoid division by zero - mean_db_write_time /= total_save_tasks + total_update_tasks + 1; // +1 to update directories sizes + mean_db_write_time.normalize(); + mean_scan_read_time.normalize(); - vec![ - ReportOutputMetadata::Indexer { - total_paths: u64_to_frontend(total_paths), - }, - ReportOutputMetadata::Metrics(HashMap::from([ - ("mean_scan_read_time".into(), json!(mean_scan_read_time)), - ("mean_db_write_time".into(), json!(mean_db_write_time)), - ("total_tasks".into(), json!(total_tasks)), - ("completed_tasks".into(), json!(completed_tasks)), - ("total_paths".into(), json!(total_paths)), - ("total_updated_paths".into(), json!(total_updated_paths)), - ("total_walk_tasks".into(), json!(total_walk_tasks)), - ("total_save_tasks".into(), json!(total_save_tasks)), - ("total_update_tasks".into(), json!(total_update_tasks)), - ("indexed_count".into(), json!(indexed_count)), - ("updated_count".into(), json!(updated_count)), - ("removed_count".into(), json!(removed_count)), - ])), - ] + vec![ReportOutputMetadata::Indexer { + total_paths: (total_paths, total_updated_paths), + }] } } diff --git a/core/crates/heavy-lifting/src/job_system/job.rs b/core/crates/heavy-lifting/src/job_system/job.rs index e32897dcbabe..af40ca523653 100644 --- a/core/crates/heavy-lifting/src/job_system/job.rs +++ b/core/crates/heavy-lifting/src/job_system/job.rs @@ -20,6 +20,11 @@ use std::{ time::Duration, }; +use super::{ + error::DispatcherError, + report::{Report, ReportBuilder, ReportInputMetadata, ReportMetadata, Status}, + Command, JobId, JobSystemError, SerializableJob, SerializedTasks, +}; use async_channel as chan; use chrono::{DateTime, Utc}; use futures::{stream, Future, FutureExt, StreamExt}; @@ -27,6 +32,7 @@ use futures_concurrency::{ future::{Join, TryJoin}, stream::Merge, }; +use sd_core_shared_types::jobs::ReportOutputMetadata; use serde::{Deserialize, Serialize}; use serde_json::Value; use specta::Type; @@ -39,14 +45,6 @@ use tokio::{ use tracing::{debug, error, instrument, trace, warn, Instrument, Level}; use uuid::Uuid; -use super::{ - error::DispatcherError, - report::{ - Report, ReportBuilder, ReportInputMetadata, ReportMetadata, ReportOutputMetadata, Status, - }, - Command, JobId, JobSystemError, SerializableJob, SerializedTasks, -}; - #[derive( Debug, Serialize, Deserialize, EnumString, Display, Clone, Copy, Type, Hash, PartialEq, Eq, )] diff --git a/core/crates/heavy-lifting/src/job_system/report.rs b/core/crates/heavy-lifting/src/job_system/report.rs index 9f87b1eb4361..b9082806642d 100644 --- a/core/crates/heavy-lifting/src/job_system/report.rs +++ b/core/crates/heavy-lifting/src/job_system/report.rs @@ -1,9 +1,9 @@ use crate::NonCriticalError; -use sd_prisma::prisma::{file_path, job, location, PrismaClient}; +use sd_prisma::prisma::{job, location, PrismaClient}; use sd_utils::db::{maybe_missing, MissingFieldError}; -use std::{collections::HashMap, fmt, path::PathBuf, str::FromStr}; +use std::{fmt, path::PathBuf, str::FromStr}; use chrono::{DateTime, Utc}; use prisma_client_rust::QueryError; @@ -13,6 +13,8 @@ use strum::ParseError; use super::{job::JobName, JobId}; +use sd_core_shared_types::jobs::ReportOutputMetadata; + #[derive(thiserror::Error, Debug)] pub enum ReportError { #[error("failed to create job report in database: {0}")] @@ -67,52 +69,6 @@ pub enum ReportInputMetadata { SubPath(PathBuf), } -#[derive(Debug, Serialize, Deserialize, Type, Clone)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "type", content = "data")] -pub enum ReportOutputMetadata { - Metrics(HashMap), - Indexer { - total_paths: (u32, u32), - }, - FileIdentifier { - total_orphan_paths: (u32, u32), - total_objects_created: (u32, u32), - total_objects_linked: (u32, u32), - }, - MediaProcessor { - media_data_extracted: (u32, u32), - media_data_skipped: (u32, u32), - thumbnails_generated: (u32, u32), - thumbnails_skipped: (u32, u32), - }, - Copier { - source_location_id: location::id::Type, - target_location_id: location::id::Type, - sources_file_path_ids: Vec, - target_location_relative_directory_path: PathBuf, - }, - Mover { - source_location_id: location::id::Type, - target_location_id: location::id::Type, - sources_file_path_ids: Vec, - target_location_relative_directory_path: PathBuf, - }, - Deleter { - location_id: location::id::Type, - file_path_ids: Vec, - }, - Eraser { - location_id: location::id::Type, - file_path_ids: Vec, - passes: u32, - }, - FileValidator { - location_id: location::id::Type, - sub_path: Option, - }, -} - impl From for ReportMetadata { fn from(value: ReportInputMetadata) -> Self { Self::Input(value) diff --git a/core/crates/heavy-lifting/src/job_system/runner.rs b/core/crates/heavy-lifting/src/job_system/runner.rs index d44c898f3a15..8c7db79753ba 100644 --- a/core/crates/heavy-lifting/src/job_system/runner.rs +++ b/core/crates/heavy-lifting/src/job_system/runner.rs @@ -1,6 +1,5 @@ use crate::{Error, JobContext}; -use sd_prisma::prisma::location; use sd_task_system::BaseTaskDispatcher; use sd_utils::error::FileIOError; @@ -31,10 +30,11 @@ use uuid::Uuid; use super::{ job::{DynJob, JobHandle, JobName, JobOutput, OuterContext, ReturnStatus}, - report::{self, ReportOutputMetadata}, + report::{self}, store::{StoredJob, StoredJobEntry}, Command, JobId, JobSystemError, SerializedTasks, }; +use sd_core_shared_types::jobs::ReportOutputMetadata; const JOBS_INITIAL_CAPACITY: usize = 32; const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60); diff --git a/core/crates/heavy-lifting/src/media_processor/job.rs b/core/crates/heavy-lifting/src/media_processor/job.rs index fb622e1622b1..fbea81b36046 100644 --- a/core/crates/heavy-lifting/src/media_processor/job.rs +++ b/core/crates/heavy-lifting/src/media_processor/job.rs @@ -1,7 +1,6 @@ use crate::{ job_system::{ job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus}, - report::ReportOutputMetadata, utils::cancel_pending_tasks, DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks, }, @@ -13,6 +12,7 @@ use crate::{ use sd_core_file_path_helper::IsolatedFilePathData; use sd_core_prisma_helpers::file_path_for_media_processor; +use sd_core_shared_types::jobs::ReportOutputMetadata; use sd_file_ext::extensions::Extension; use sd_prisma::{ prisma::{location, PrismaClient}, @@ -24,7 +24,6 @@ use sd_task_system::{ TaskOutput, TaskStatus, TaskSystemError, }; use sd_utils::{db::maybe_missing, u64_to_frontend}; - use std::{ collections::{HashMap, HashSet}, fmt, diff --git a/core/crates/shared-types/Cargo.toml b/core/crates/shared-types/Cargo.toml index 14240bdd28f7..32d20a8ce3da 100644 --- a/core/crates/shared-types/Cargo.toml +++ b/core/crates/shared-types/Cargo.toml @@ -12,6 +12,7 @@ version = "0.1.0" prisma-client-rust = { workspace = true } sd-prisma = { path = "../../../crates/prisma" } serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } specta = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } diff --git a/core/crates/shared-types/src/jobs/copy.rs b/core/crates/shared-types/src/jobs/copy.rs new file mode 100644 index 000000000000..f7b5af59135d --- /dev/null +++ b/core/crates/shared-types/src/jobs/copy.rs @@ -0,0 +1,66 @@ +use crate::sd_path::SdPath; +use serde::{Deserialize, Serialize}; +use specta::Type; +/// A single file copy operation, whether successful or failed +#[derive(Debug, Serialize, Deserialize, Type, Clone)] +pub struct CopyOperation { + /// Source path of the file + pub source: SdPath, + /// Target path where the file was/should be copied to + pub target: SdPath, + /// Size of the file in bytes + pub size: u64, + /// Whether this was a cross-device copy + pub cross_device: bool, + /// Time taken for the copy in milliseconds (None if failed) + pub duration_ms: Option, + /// Error message if the copy failed (None if successful) + pub error: Option, +} + +/// Statistics and results from a copy operation +#[derive(Debug, Serialize, Deserialize, Type, Clone)] +pub struct CopyStats { + /// Total number of files to copy + pub total_files: u32, + /// Total bytes to copy + pub total_bytes: u64, + /// Number of files successfully copied + pub completed_files: u32, + /// Number of bytes successfully copied + pub completed_bytes: u64, + /// Average speed in bytes per second + pub speed: u64, + /// List of successful copy operations + pub successful: Vec, + /// List of failed copy operations + pub failed: Vec, +} + +impl Default for CopyStats { + fn default() -> Self { + Self { + total_files: 0, + total_bytes: 0, + completed_files: 0, + completed_bytes: 0, + speed: 0, + successful: Vec::new(), + failed: Vec::new(), + } + } +} + +impl CopyStats { + pub fn files_skipped(&self) -> u32 { + self.total_files - (self.completed_files + self.failed.len() as u32) + } + + pub fn successful_operations(&self) -> impl Iterator { + self.successful.iter() + } + + pub fn failed_operations(&self) -> impl Iterator { + self.failed.iter() + } +} diff --git a/core/crates/shared-types/src/jobs/mod.rs b/core/crates/shared-types/src/jobs/mod.rs new file mode 100644 index 000000000000..da916d8b02fc --- /dev/null +++ b/core/crates/shared-types/src/jobs/mod.rs @@ -0,0 +1,50 @@ +pub mod copy; +use crate::jobs::copy::CopyStats; +use serde::{Deserialize, Serialize}; +use serde_json; +use specta::Type; +use std::{collections::HashMap, path::PathBuf}; +use uuid::Uuid; +#[derive(Debug, Serialize, Deserialize, Type, Clone)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type", content = "data")] +pub enum ReportOutputMetadata { + Metrics(HashMap), + Indexer { + total_paths: (u32, u32), + }, + FileIdentifier { + total_orphan_paths: (u32, u32), + total_objects_created: (u32, u32), + total_objects_linked: (u32, u32), + }, + MediaProcessor { + media_data_extracted: (u32, u32), + media_data_skipped: (u32, u32), + thumbnails_generated: (u32, u32), + thumbnails_skipped: (u32, u32), + }, + Copier(CopyStats), + Deleter { + location_id: Uuid, + file_path_ids: Vec, + }, + FileValidator { + location_id: Uuid, + sub_path: Option, + }, + + // DEPRECATED + Mover { + source_location_id: Uuid, + target_location_id: Uuid, + sources_file_path_ids: Vec, + target_location_relative_directory_path: PathBuf, + }, + + Eraser { + location_id: Uuid, + file_path_ids: Vec, + passes: u32, + }, +} diff --git a/core/crates/shared-types/src/lib.rs b/core/crates/shared-types/src/lib.rs index f94c0f247c9f..49b68d5b8153 100644 --- a/core/crates/shared-types/src/lib.rs +++ b/core/crates/shared-types/src/lib.rs @@ -1,81 +1,2 @@ -use serde::{Deserialize, Serialize}; -use specta::Type; -use std::path::PathBuf; -use uuid::Uuid; - +pub mod jobs; pub mod sd_path; - -/// A path to a file or directory in Spacedrive -#[derive(Debug, Clone, Serialize, Deserialize, Type)] -pub struct PathData { - /// The device ID where this path is located - pub device: Uuid, - /// The location ID if this path is in an indexed location - pub location: Option, - /// The local path on the device - pub path: PathBuf, -} - -/// A single file copy operation, whether successful or failed -#[derive(Debug, Serialize, Deserialize, Type, Clone)] -pub struct CopyOperation { - /// Source path of the file - pub source: PathData, - /// Target path where the file was/should be copied to - pub target: PathData, - /// Size of the file in bytes - pub size: u64, - /// Whether this was a cross-device copy - pub cross_device: bool, - /// Time taken for the copy in milliseconds (None if failed) - pub duration_ms: Option, - /// Error message if the copy failed (None if successful) - pub error: Option, -} - -/// Statistics and results from a copy operation -#[derive(Debug, Serialize, Deserialize, Type, Clone)] -pub struct CopyStats { - /// Total number of files to copy - pub total_files: u32, - /// Total bytes to copy - pub total_bytes: u64, - /// Number of files successfully copied - pub completed_files: u32, - /// Number of bytes successfully copied - pub completed_bytes: u64, - /// Average speed in bytes per second - pub speed: u64, - /// List of successful copy operations - pub successful: Vec, - /// List of failed copy operations - pub failed: Vec, -} - -impl Default for CopyStats { - fn default() -> Self { - Self { - total_files: 0, - total_bytes: 0, - completed_files: 0, - completed_bytes: 0, - speed: 0, - successful: Vec::new(), - failed: Vec::new(), - } - } -} - -impl CopyStats { - pub fn files_skipped(&self) -> u32 { - self.total_files - (self.completed_files + self.failed.len() as u32) - } - - pub fn successful_operations(&self) -> impl Iterator { - self.successful.iter() - } - - pub fn failed_operations(&self) -> impl Iterator { - self.failed.iter() - } -} diff --git a/core/crates/shared-types/src/sd_path.rs b/core/crates/shared-types/src/sd_path.rs index 0010b3016e63..ac055313fa85 100644 --- a/core/crates/shared-types/src/sd_path.rs +++ b/core/crates/shared-types/src/sd_path.rs @@ -1,6 +1,5 @@ use std::path::{Path, PathBuf}; -use super::PathData; use sd_prisma::prisma::{device, location, PrismaClient}; use serde::{Deserialize, Serialize}; use specta::Type; @@ -48,24 +47,6 @@ impl SdPath { } } - /// Convert from PathData - pub fn from_path_data(data: PathData) -> Self { - Self { - device: data.device, - location: data.location, - local_path: data.path, - } - } - - /// Convert to PathData - pub fn to_path_data(&self) -> PathData { - PathData { - device: self.device, - location: self.location, - path: self.local_path.clone(), - } - } - /// Validate that this path exists and is accessible pub async fn validate(&self, db: &PrismaClient) -> Result<(), SdPathError> { // Check device exists