Skip to content

Commit

Permalink
fixed up some shared types
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiepine committed Dec 19, 2024
1 parent 066cbc5 commit 3435421
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 185 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions core/crates/heavy-lifting/src/file_identifier/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::{
file_identifier,
job_system::{
job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus},
report::ReportOutputMetadata,
utils::cancel_pending_tasks,
DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks,
},
Expand All @@ -13,7 +12,7 @@ use crate::{

use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::{file_path_for_file_identifier, CasId};

use sd_core_shared_types::jobs::ReportOutputMetadata;
use sd_prisma::{
prisma::{device, file_path, location, SortOrder},
prisma_sync,
Expand Down
30 changes: 6 additions & 24 deletions core/crates/heavy-lifting/src/indexer/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use crate::{
job::{
Job, JobContext, JobName, JobReturn, JobTaskDispatcher, ProgressUpdate, ReturnStatus,
},
report::ReportOutputMetadata,
utils::cancel_pending_tasks,
DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks,
},
utils::sub_path::get_full_path_from_sub_path,
Error, LocationScanState, NonCriticalError, OuterContext,
};

use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_indexer_rules::{IndexerRule, IndexerRuler};
use sd_core_prisma_helpers::location_with_indexer_rules;
use sd_core_shared_types::jobs::ReportOutputMetadata;

use sd_prisma::{
prisma::{device, location},
Expand All @@ -40,7 +39,6 @@ use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::future::TryJoin;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::{debug, instrument, trace, warn, Level};

Expand Down Expand Up @@ -980,28 +978,12 @@ impl From<Metadata> for Vec<ReportOutputMetadata> {
removed_count,
}: Metadata,
) -> Self {
mean_scan_read_time /= u32::max(total_walk_tasks, 1); // To avoid division by zero
mean_db_write_time /= total_save_tasks + total_update_tasks + 1; // +1 to update directories sizes
mean_db_write_time.normalize();
mean_scan_read_time.normalize();

vec![
ReportOutputMetadata::Indexer {
total_paths: u64_to_frontend(total_paths),
},
ReportOutputMetadata::Metrics(HashMap::from([
("mean_scan_read_time".into(), json!(mean_scan_read_time)),
("mean_db_write_time".into(), json!(mean_db_write_time)),
("total_tasks".into(), json!(total_tasks)),
("completed_tasks".into(), json!(completed_tasks)),
("total_paths".into(), json!(total_paths)),
("total_updated_paths".into(), json!(total_updated_paths)),
("total_walk_tasks".into(), json!(total_walk_tasks)),
("total_save_tasks".into(), json!(total_save_tasks)),
("total_update_tasks".into(), json!(total_update_tasks)),
("indexed_count".into(), json!(indexed_count)),
("updated_count".into(), json!(updated_count)),
("removed_count".into(), json!(removed_count)),
])),
]
vec![ReportOutputMetadata::Indexer {
total_paths: (total_paths, total_updated_paths),
}]
}
}

Expand Down
14 changes: 6 additions & 8 deletions core/crates/heavy-lifting/src/job_system/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ use std::{
time::Duration,
};

use super::{
error::DispatcherError,
report::{Report, ReportBuilder, ReportInputMetadata, ReportMetadata, Status},
Command, JobId, JobSystemError, SerializableJob, SerializedTasks,
};
use async_channel as chan;
use chrono::{DateTime, Utc};
use futures::{stream, Future, FutureExt, StreamExt};
use futures_concurrency::{
future::{Join, TryJoin},
stream::Merge,
};
use sd_core_shared_types::jobs::ReportOutputMetadata;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use specta::Type;
Expand All @@ -39,14 +45,6 @@ use tokio::{
use tracing::{debug, error, instrument, trace, warn, Instrument, Level};
use uuid::Uuid;

use super::{
error::DispatcherError,
report::{
Report, ReportBuilder, ReportInputMetadata, ReportMetadata, ReportOutputMetadata, Status,
},
Command, JobId, JobSystemError, SerializableJob, SerializedTasks,
};

#[derive(
Debug, Serialize, Deserialize, EnumString, Display, Clone, Copy, Type, Hash, PartialEq, Eq,
)]
Expand Down
52 changes: 4 additions & 48 deletions core/crates/heavy-lifting/src/job_system/report.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::NonCriticalError;

use sd_prisma::prisma::{file_path, job, location, PrismaClient};
use sd_prisma::prisma::{job, location, PrismaClient};
use sd_utils::db::{maybe_missing, MissingFieldError};

use std::{collections::HashMap, fmt, path::PathBuf, str::FromStr};
use std::{fmt, path::PathBuf, str::FromStr};

use chrono::{DateTime, Utc};
use prisma_client_rust::QueryError;
Expand All @@ -13,6 +13,8 @@ use strum::ParseError;

use super::{job::JobName, JobId};

use sd_core_shared_types::jobs::ReportOutputMetadata;

#[derive(thiserror::Error, Debug)]
pub enum ReportError {
#[error("failed to create job report in database: {0}")]
Expand Down Expand Up @@ -67,52 +69,6 @@ pub enum ReportInputMetadata {
SubPath(PathBuf),
}

#[derive(Debug, Serialize, Deserialize, Type, Clone)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type", content = "data")]
pub enum ReportOutputMetadata {
Metrics(HashMap<String, serde_json::Value>),
Indexer {
total_paths: (u32, u32),
},
FileIdentifier {
total_orphan_paths: (u32, u32),
total_objects_created: (u32, u32),
total_objects_linked: (u32, u32),
},
MediaProcessor {
media_data_extracted: (u32, u32),
media_data_skipped: (u32, u32),
thumbnails_generated: (u32, u32),
thumbnails_skipped: (u32, u32),
},
Copier {
source_location_id: location::id::Type,
target_location_id: location::id::Type,
sources_file_path_ids: Vec<file_path::id::Type>,
target_location_relative_directory_path: PathBuf,
},
Mover {
source_location_id: location::id::Type,
target_location_id: location::id::Type,
sources_file_path_ids: Vec<file_path::id::Type>,
target_location_relative_directory_path: PathBuf,
},
Deleter {
location_id: location::id::Type,
file_path_ids: Vec<file_path::id::Type>,
},
Eraser {
location_id: location::id::Type,
file_path_ids: Vec<file_path::id::Type>,
passes: u32,
},
FileValidator {
location_id: location::id::Type,
sub_path: Option<PathBuf>,
},
}

impl From<ReportInputMetadata> for ReportMetadata {
fn from(value: ReportInputMetadata) -> Self {
Self::Input(value)
Expand Down
4 changes: 2 additions & 2 deletions core/crates/heavy-lifting/src/job_system/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{Error, JobContext};

use sd_prisma::prisma::location;
use sd_task_system::BaseTaskDispatcher;
use sd_utils::error::FileIOError;

Expand Down Expand Up @@ -31,10 +30,11 @@ use uuid::Uuid;

use super::{
job::{DynJob, JobHandle, JobName, JobOutput, OuterContext, ReturnStatus},
report::{self, ReportOutputMetadata},
report::{self},
store::{StoredJob, StoredJobEntry},
Command, JobId, JobSystemError, SerializedTasks,
};
use sd_core_shared_types::jobs::ReportOutputMetadata;

const JOBS_INITIAL_CAPACITY: usize = 32;
const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
Expand Down
3 changes: 1 addition & 2 deletions core/crates/heavy-lifting/src/media_processor/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
job_system::{
job::{Job, JobReturn, JobTaskDispatcher, ReturnStatus},
report::ReportOutputMetadata,
utils::cancel_pending_tasks,
DispatcherError, JobErrorOrDispatcherError, SerializableJob, SerializedTasks,
},
Expand All @@ -13,6 +12,7 @@ use crate::{
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_for_media_processor;

use sd_core_shared_types::jobs::ReportOutputMetadata;
use sd_file_ext::extensions::Extension;
use sd_prisma::{
prisma::{location, PrismaClient},
Expand All @@ -24,7 +24,6 @@ use sd_task_system::{
TaskOutput, TaskStatus, TaskSystemError,
};
use sd_utils::{db::maybe_missing, u64_to_frontend};

use std::{
collections::{HashMap, HashSet},
fmt,
Expand Down
1 change: 1 addition & 0 deletions core/crates/shared-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ version = "0.1.0"
prisma-client-rust = { workspace = true }
sd-prisma = { path = "../../../crates/prisma" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
specta = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
66 changes: 66 additions & 0 deletions core/crates/shared-types/src/jobs/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::sd_path::SdPath;
use serde::{Deserialize, Serialize};
use specta::Type;
/// A single file copy operation, whether successful or failed.
///
/// NOTE(review): invariants look like `duration_ms.is_some()` iff the copy
/// succeeded and `error.is_some()` iff it failed — confirm against the
/// producer before relying on this.
#[derive(Debug, Serialize, Deserialize, Type, Clone)]
pub struct CopyOperation {
    /// Source path of the file
    pub source: SdPath,
    /// Target path where the file was/should be copied to
    pub target: SdPath,
    /// Size of the file in bytes
    pub size: u64,
    /// Whether this was a cross-device copy
    pub cross_device: bool,
    /// Time taken for the copy in milliseconds (None if failed)
    pub duration_ms: Option<u64>,
    /// Error message if the copy failed (None if successful)
    pub error: Option<String>,
}

/// Statistics and results from a copy operation.
///
/// NOTE(review): presumably `completed_files == successful.len() as u32`
/// and `completed_bytes` is the byte sum of `successful` — confirm with
/// the job that populates this.
#[derive(Debug, Serialize, Deserialize, Type, Clone)]
pub struct CopyStats {
    /// Total number of files to copy
    pub total_files: u32,
    /// Total bytes to copy
    pub total_bytes: u64,
    /// Number of files successfully copied
    pub completed_files: u32,
    /// Number of bytes successfully copied
    pub completed_bytes: u64,
    /// Average speed in bytes per second
    pub speed: u64,
    /// List of successful copy operations
    pub successful: Vec<CopyOperation>,
    /// List of failed copy operations
    pub failed: Vec<CopyOperation>,
}

// Zero-valued stats: nothing discovered, nothing copied, no history.
//
// NOTE(review): every field here is its type's own default, so this manual
// impl could be replaced by `#[derive(Default)]` on `CopyStats`.
impl Default for CopyStats {
    fn default() -> Self {
        Self {
            speed: 0,
            total_files: 0,
            total_bytes: 0,
            completed_files: 0,
            completed_bytes: 0,
            successful: vec![],
            failed: vec![],
        }
    }
}

impl CopyStats {
    /// Number of files that were neither copied nor failed (i.e. skipped
    /// or not yet attempted).
    ///
    /// Uses `saturating_sub` because `completed_files + failed.len()` can
    /// transiently exceed `total_files` while tasks are still reporting in
    /// (or if counters are ever inconsistent); the original unchecked
    /// subtraction would panic in debug builds and wrap to a huge value in
    /// release builds. Saturating to 0 is the sane answer in both cases.
    pub fn files_skipped(&self) -> u32 {
        self.total_files
            .saturating_sub(self.completed_files)
            .saturating_sub(self.failed.len() as u32)
    }

    /// Iterator over the operations that completed successfully.
    pub fn successful_operations(&self) -> impl Iterator<Item = &CopyOperation> {
        self.successful.iter()
    }

    /// Iterator over the operations that failed.
    pub fn failed_operations(&self) -> impl Iterator<Item = &CopyOperation> {
        self.failed.iter()
    }
}
50 changes: 50 additions & 0 deletions core/crates/shared-types/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
pub mod copy;
use crate::jobs::copy::CopyStats;
use serde::{Deserialize, Serialize};
use serde_json;
use specta::Type;
use std::{collections::HashMap, path::PathBuf};
use uuid::Uuid;
/// Job-specific output metadata attached to a job report when a job
/// finishes. Serialized adjacently tagged (`type` + `data`) in snake_case
/// for consumption by the frontend.
///
/// NOTE(review): the `(u32, u32)` tuples are used inconsistently by
/// callers — some split a u64 into (high, low) halves for the frontend,
/// while the indexer now passes (indexed_count, updated_count). Confirm
/// the intended meaning and document it per-field.
#[derive(Debug, Serialize, Deserialize, Type, Clone)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "type", content = "data")]
pub enum ReportOutputMetadata {
    /// Free-form named metrics; values are arbitrary JSON.
    Metrics(HashMap<String, serde_json::Value>),
    /// Indexer job summary.
    Indexer {
        total_paths: (u32, u32),
    },
    /// File-identifier job summary.
    FileIdentifier {
        total_orphan_paths: (u32, u32),
        total_objects_created: (u32, u32),
        total_objects_linked: (u32, u32),
    },
    /// Media-processor job summary.
    MediaProcessor {
        media_data_extracted: (u32, u32),
        media_data_skipped: (u32, u32),
        thumbnails_generated: (u32, u32),
        thumbnails_skipped: (u32, u32),
    },
    /// Copier job summary (full per-file operation log and totals).
    Copier(CopyStats),
    /// Deleter job summary.
    Deleter {
        location_id: Uuid,
        file_path_ids: Vec<Uuid>,
    },
    /// File-validator job summary.
    FileValidator {
        location_id: Uuid,
        sub_path: Option<PathBuf>,
    },

    // DEPRECATED — kept only so old serialized reports still deserialize;
    // do not produce these variants from new code.
    Mover {
        source_location_id: Uuid,
        target_location_id: Uuid,
        sources_file_path_ids: Vec<Uuid>,
        target_location_relative_directory_path: PathBuf,
    },

    Eraser {
        location_id: Uuid,
        file_path_ids: Vec<Uuid>,
        passes: u32,
    },
}
Loading

0 comments on commit 3435421

Please sign in to comment.