From 231676c47b4d12747cfc1bb1b5ae9c71eaee4896 Mon Sep 17 00:00:00 2001 From: Blaise Bruer Date: Mon, 29 Jul 2024 12:36:30 -0500 Subject: [PATCH] Remove ClientOperationId and move all to OperationId This is in prep to support generic layering of workers & schedulers. We can no longer know if the source is a client or just an up-stream-component, so we need to fall back to just using OperationId. towards #1213 --- nativelink-scheduler/src/action_scheduler.rs | 8 +- .../src/awaited_action_db/awaited_action.rs | 3 +- .../src/awaited_action_db/mod.rs | 6 +- .../src/cache_lookup_scheduler.rs | 18 +- .../src/default_action_listener.rs | 8 +- nativelink-scheduler/src/grpc_scheduler.rs | 19 +- .../src/memory_awaited_action_db.rs | 23 +- .../src/property_modifier_scheduler.rs | 6 +- nativelink-scheduler/src/simple_scheduler.rs | 18 +- .../src/simple_scheduler_state_manager.rs | 11 +- .../tests/action_messages_test.rs | 10 +- .../tests/cache_lookup_scheduler_test.rs | 16 +- .../tests/property_modifier_scheduler_test.rs | 37 ++-- .../tests/simple_scheduler_test.rs | 116 +++++----- .../tests/utils/mock_scheduler.rs | 14 +- nativelink-service/src/execution_server.rs | 29 ++- nativelink-service/src/worker_api_server.rs | 5 +- .../tests/worker_api_server_test.rs | 2 +- nativelink-util/BUILD.bazel | 1 + nativelink-util/src/action_messages.rs | 202 +++++------------- .../src/operation_state_manager.rs | 6 +- nativelink-util/tests/operation_id_tests.rs | 121 ++--------- nativelink-worker/src/local_worker.rs | 18 +- .../src/running_actions_manager.rs | 9 +- nativelink-worker/tests/local_worker_test.rs | 2 +- .../tests/running_actions_manager_test.rs | 55 ++--- 26 files changed, 260 insertions(+), 503 deletions(-) diff --git a/nativelink-scheduler/src/action_scheduler.rs b/nativelink-scheduler/src/action_scheduler.rs index c23ed996f5..ae583f36db 100644 --- a/nativelink-scheduler/src/action_scheduler.rs +++ b/nativelink-scheduler/src/action_scheduler.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use futures::Future; use nativelink_error::Error; use nativelink_metric::RootMetricsComponent; -use nativelink_util::action_messages::{ActionInfo, ActionState, ClientOperationId}; +use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId}; use crate::platform_property_manager::PlatformPropertyManager; @@ -27,7 +27,7 @@ use crate::platform_property_manager::PlatformPropertyManager; /// that are interested in the state of an action. pub trait ActionListener: Sync + Send + Unpin { /// Returns the client operation id. - fn client_operation_id(&self) -> &ClientOperationId; + fn client_operation_id(&self) -> &OperationId; /// Waits for the action state to change. fn changed( @@ -48,13 +48,13 @@ pub trait ActionScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static /// Adds an action to the scheduler for remote execution. async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: ActionInfo, ) -> Result>, Error>; /// Find an existing action by its name. async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error>; } diff --git a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs index c9e9fd4861..5437eee1f8 100644 --- a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs +++ b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs @@ -86,7 +86,8 @@ impl AwaitedAction { ); let state = Arc::new(ActionState { stage, - id: operation_id.clone(), + operation_id: operation_id.clone(), + action_digest: action_info.unique_qualifier.digest(), }); Self { version: AwaitedActionVersion(0), diff --git a/nativelink-scheduler/src/awaited_action_db/mod.rs b/nativelink-scheduler/src/awaited_action_db/mod.rs index 6d5576363a..c8560ae4b6 100644 --- a/nativelink-scheduler/src/awaited_action_db/mod.rs +++ b/nativelink-scheduler/src/awaited_action_db/mod.rs @@ -20,7 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey}; use futures::{Future, Stream}; use nativelink_error::Error; use nativelink_metric::MetricsComponent; -use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId}; +use nativelink_util::action_messages::{ActionInfo, OperationId}; mod awaited_action; @@ -80,7 +80,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static { /// Get the AwaitedAction by the client operation id. fn get_awaited_action_by_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> impl Future, Error>> + Send + Sync; /// Get all AwaitedActions. This call should be avoided as much as possible. @@ -117,7 +117,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static { /// to changes. fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> impl Future> + Send + Sync; } diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs index fb09b9db43..62c3ab9136 100644 --- a/nativelink-scheduler/src/cache_lookup_scheduler.rs +++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs @@ -26,8 +26,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ use nativelink_store::ac_utils::get_and_decode_digest; use nativelink_store::grpc_store::GrpcStore; use nativelink_util::action_messages::{ - ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, - ClientOperationId, OperationId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId, }; use nativelink_util::background_spawn; use nativelink_util::common::DigestInfo; @@ -48,7 +47,7 @@ use crate::platform_property_manager::PlatformPropertyManager; type CheckActions = HashMap< ActionUniqueKey, Vec<( - ClientOperationId, + OperationId, oneshot::Sender>, Error>>, )>, >; @@ -98,7 +97,7 @@ type ActionListenerOneshot = oneshot::Receiver, unique_qualifier: &ActionUniqueKey, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Option { inflight_cache_checks .get_mut(unique_qualifier) @@ -109,12 +108,12 @@ fn subscribe_to_existing_action( }) } struct CachedActionListener { - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_state: Arc, } impl ActionListener for CachedActionListener { - fn client_operation_id(&self) -> &ClientOperationId { + fn client_operation_id(&self) -> &OperationId { &self.client_operation_id } @@ -148,7 +147,7 @@ impl ActionScheduler for CacheLookupScheduler { async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: ActionInfo, ) -> Result>, Error> { let unique_key = match &action_info.unique_qualifier { @@ -241,8 +240,9 @@ impl ActionScheduler for CacheLookupScheduler { return; // Nobody is waiting for this action anymore. }; let action_state = Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::CompletedFromCache(action_result), + action_digest: action_info.unique_qualifier.digest(), }); for (client_operation_id, pending_tx) in pending_txs { // Ignore errors here, as the other end may have hung up. @@ -305,7 +305,7 @@ impl ActionScheduler for CacheLookupScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { self.action_scheduler .find_by_client_operation_id(client_operation_id) diff --git a/nativelink-scheduler/src/default_action_listener.rs b/nativelink-scheduler/src/default_action_listener.rs index ec399790ab..6fcf6cba86 100644 --- a/nativelink-scheduler/src/default_action_listener.rs +++ b/nativelink-scheduler/src/default_action_listener.rs @@ -17,20 +17,20 @@ use std::sync::Arc; use futures::Future; use nativelink_error::{make_err, Code, Error}; -use nativelink_util::action_messages::{ActionState, ClientOperationId}; +use nativelink_util::action_messages::{ActionState, OperationId}; use tokio::sync::watch; use crate::action_scheduler::ActionListener; /// Simple implementation of ActionListener using tokio's watch. pub struct DefaultActionListener { - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_state: watch::Receiver>, } impl DefaultActionListener { pub fn new( - client_operation_id: ClientOperationId, + client_operation_id: OperationId, mut action_state: watch::Receiver>, ) -> Self { action_state.mark_changed(); @@ -54,7 +54,7 @@ impl DefaultActionListener { } impl ActionListener for DefaultActionListener { - fn client_operation_id(&self) -> &ClientOperationId { + fn client_operation_id(&self) -> &OperationId { &self.client_operation_id } diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index 0f24aa07b9..429822413e 100644 --- a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -30,12 +30,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_proto::google::longrunning::Operation; use nativelink_util::action_messages::{ - ActionInfo, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, - OperationId, DEFAULT_EXECUTION_PRIORITY, + ActionInfo, ActionState, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY, }; -use nativelink_util::common::DigestInfo; use nativelink_util::connection_manager::ConnectionManager; -use nativelink_util::digest_hasher::DigestHasherFunc; use nativelink_util::retry::{Retrier, RetryResult}; use nativelink_util::{background_spawn, tls_utils}; use parking_lot::Mutex; @@ -126,16 +123,10 @@ impl GrpcScheduler { .await .err_tip(|| "Recieving response from upstream scheduler")? { - let client_operation_id = - ClientOperationId::from_raw_string(initial_response.name.clone()); + let client_operation_id = OperationId::from_raw_string(initial_response.name.clone()); // Our operation_id is not needed here is just a place holder to recycle existing object. // The only thing that actually matters is the operation_id. - let operation_id = - OperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { - instance_name: "dummy_instance_name".to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::zero_digest(), - })); + let operation_id = OperationId::default(); let action_state = ActionState::try_from_operation(initial_response, operation_id.clone()) .err_tip(|| "In GrpcScheduler::stream_state")?; @@ -243,7 +234,7 @@ impl ActionScheduler for GrpcScheduler { async fn add_action( &self, - _client_operation_id: ClientOperationId, + _client_operation_id: OperationId, action_info: ActionInfo, ) -> Result>, Error> { let execution_policy = if action_info.priority == DEFAULT_EXECUTION_PRIORITY { @@ -289,7 +280,7 @@ impl ActionScheduler for GrpcScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { let request = WaitExecutionRequest { name: client_operation_id.to_string(), diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 49cde1281f..57f7e85f71 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -24,8 +24,7 @@ use nativelink_config::stores::EvictionPolicy; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::action_messages::{ - ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, - ClientOperationId, OperationId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId, }; use nativelink_util::chunked_stream::ChunkedStream; use nativelink_util::evicting_map::{EvictingMap, LenEntry}; @@ -99,7 +98,7 @@ impl LenEntry for ClientAwaitedAction { /// Actions the AwaitedActionsDb needs to process. pub(crate) enum ActionEvent { /// A client has sent a keep alive message. - ClientKeepAlive(ClientOperationId), + ClientKeepAlive(OperationId), /// A client has dropped and pointed to OperationId. ClientDroppedOperation(OperationId), } @@ -108,7 +107,7 @@ pub(crate) enum ActionEvent { /// keep alive config and state. struct ClientInfo I> { /// The client operation id. - client_operation_id: ClientOperationId, + client_operation_id: OperationId, /// The last time a keep alive was sent. last_keep_alive: I, /// The function to get the current time. @@ -136,7 +135,7 @@ impl I> MemoryAwaitedActionSubscriber, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, event_tx: mpsc::UnboundedSender, now_fn: NowFn, ) -> Self @@ -351,7 +350,7 @@ impl SortedAwaitedActions { pub struct AwaitedActionDbImpl I> { /// A lookup table to lookup the state of an action by its client operation id. #[metric(group = "client_operation_ids")] - client_operation_to_awaited_action: EvictingMap, I>, + client_operation_to_awaited_action: EvictingMap, I>, /// A lookup table to lookup the state of an action by its worker operation id. #[metric(group = "operation_ids")] @@ -381,7 +380,7 @@ pub struct AwaitedActionDbImpl I> { impl I + Clone + Send + Sync> AwaitedActionDbImpl { async fn get_awaited_action_by_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>, Error> { let maybe_client_awaited_action = self .client_operation_to_awaited_action @@ -710,7 +709,7 @@ impl I + Clone + Send + Sync> AwaitedActionDbI async fn add_action( &mut self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> Result, Error> { // Check to see if the action is already known and subscribe if it is. @@ -732,7 +731,7 @@ impl I + Clone + Send + Sync> AwaitedActionDbI ActionUniqueQualifier::Cachable(unique_key) => Some(unique_key.clone()), ActionUniqueQualifier::Uncachable(_unique_key) => None, }; - let operation_id = OperationId::new(action_info.unique_qualifier.clone()); + let operation_id = OperationId::default(); let awaited_action = AwaitedAction::new(operation_id.clone(), action_info); debug_assert!( ActionStage::Queued == awaited_action.state().stage, @@ -782,7 +781,7 @@ impl I + Clone + Send + Sync> AwaitedActionDbI async fn try_subscribe( &mut self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, unique_qualifier: &ActionUniqueQualifier, // TODO(allada) To simplify the scheduler 2024 refactor, we // removed the ability to upgrade priorities of actions. @@ -894,7 +893,7 @@ impl I + Clone + Send + Sync + 'static> Awaite async fn get_awaited_action_by_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result, Error> { self.inner .lock() @@ -980,7 +979,7 @@ impl I + Clone + Send + Sync + 'static> Awaite async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> Result { self.inner diff --git a/nativelink-scheduler/src/property_modifier_scheduler.rs b/nativelink-scheduler/src/property_modifier_scheduler.rs index 723d4d1deb..bbebd92eed 100644 --- a/nativelink-scheduler/src/property_modifier_scheduler.rs +++ b/nativelink-scheduler/src/property_modifier_scheduler.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use nativelink_config::schedulers::{PropertyModification, PropertyType}; use nativelink_error::{Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; -use nativelink_util::action_messages::{ActionInfo, ClientOperationId}; +use nativelink_util::action_messages::{ActionInfo, OperationId}; use parking_lot::Mutex; use crate::action_scheduler::{ActionListener, ActionScheduler}; @@ -93,7 +93,7 @@ impl ActionScheduler for PropertyModifierScheduler { async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, mut action_info: ActionInfo, ) -> Result>, Error> { let platform_property_manager = self @@ -122,7 +122,7 @@ impl ActionScheduler for PropertyModifierScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { self.scheduler .find_by_client_operation_id(client_operation_id) diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index 3eb1983112..308a63b2a6 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -22,7 +22,7 @@ use nativelink_config::stores::EvictionPolicy; use nativelink_error::{Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::action_messages::{ - ActionInfo, ActionStage, ActionState, ClientOperationId, OperationId, WorkerId, + ActionInfo, ActionStage, ActionState, OperationId, WorkerId, }; use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::operation_state_manager::{ @@ -57,13 +57,13 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u32 = 60; const DEFAULT_MAX_JOB_RETRIES: usize = 3; struct SimpleSchedulerActionListener { - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_state_result: Box, } impl SimpleSchedulerActionListener { fn new( - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_state_result: Box, ) -> Self { Self { @@ -74,7 +74,7 @@ impl SimpleSchedulerActionListener { } impl ActionListener for SimpleSchedulerActionListener { - fn client_operation_id(&self) -> &ClientOperationId { + fn client_operation_id(&self) -> &OperationId { &self.client_operation_id } @@ -127,7 +127,7 @@ impl SimpleScheduler { /// value. async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> Result>, Error> { let add_action_result = self @@ -143,7 +143,7 @@ impl SimpleScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { let filter = OperationFilter { client_operation_id: Some(client_operation_id.clone()), @@ -205,7 +205,7 @@ impl SimpleScheduler { .as_state() .await .err_tip(|| "Failed to get action_info from as_state_result stream")?; - action_state.id.clone() + action_state.operation_id.clone() }; // Tell the matching engine that the operation is being assigned to a worker. @@ -366,7 +366,7 @@ impl ActionScheduler for SimpleScheduler { async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: ActionInfo, ) -> Result>, Error> { self.add_action(client_operation_id, Arc::new(action_info)) @@ -375,7 +375,7 @@ impl ActionScheduler for SimpleScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { let maybe_receiver = self .find_by_client_operation_id(client_operation_id) diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs index 60f7d38827..e4cb3cdbcb 100644 --- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs +++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs @@ -20,8 +20,8 @@ use futures::{future, stream, StreamExt, TryStreamExt}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::action_messages::{ - ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ClientOperationId, - ExecutionMetadata, OperationId, WorkerId, + ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata, + OperationId, WorkerId, }; use nativelink_util::operation_state_manager::{ ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager, @@ -249,7 +249,8 @@ impl SimpleSchedulerStateManager { } awaited_action.set_state(Arc::new(ActionState { stage, - id: operation_id.clone(), + operation_id: operation_id.clone(), + action_digest: awaited_action.action_info().digest(), })); awaited_action.increment_version(); @@ -285,7 +286,7 @@ impl SimpleSchedulerStateManager { async fn inner_add_operation( &self, - new_client_operation_id: ClientOperationId, + new_client_operation_id: OperationId, action_info: Arc, ) -> Result { let rx = self @@ -411,7 +412,7 @@ impl SimpleSchedulerStateManager { impl ClientStateManager for SimpleSchedulerStateManager { async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> Result, Error> { let sub = self diff --git a/nativelink-scheduler/tests/action_messages_test.rs b/nativelink-scheduler/tests/action_messages_test.rs index e1ae1e4445..c5e92164a1 100644 --- a/nativelink-scheduler/tests/action_messages_test.rs +++ b/nativelink-scheduler/tests/action_messages_test.rs @@ -22,7 +22,7 @@ use nativelink_proto::google::longrunning::{operation, Operation}; use nativelink_proto::google::rpc::Status; use nativelink_util::action_messages::{ ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, - ClientOperationId, ExecutionMetadata, OperationId, + ExecutionMetadata, OperationId, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::DigestHasherFunc; @@ -35,12 +35,14 @@ async fn action_state_any_url_test() -> Result<(), Error> { digest_function: DigestHasherFunc::Sha256, digest: DigestInfo::new([1u8; 32], 5), }); - let client_id = ClientOperationId::new(unique_qualifier.clone()); - let operation_id = OperationId::new(unique_qualifier); + let action_digest = unique_qualifier.digest(); + let client_id = OperationId::default(); + let operation_id = OperationId::default(); let action_state = ActionState { - id: operation_id.clone(), + operation_id: operation_id.clone(), // Result is only populated if has_action_result. stage: ActionStage::Completed(ActionResult::default()), + action_digest, }; let operation: Operation = action_state.as_operation(client_id); diff --git a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs index ab3a79c5fc..0567ca2c19 100644 --- a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs +++ b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs @@ -31,11 +31,9 @@ use nativelink_scheduler::default_action_listener::DefaultActionListener; use nativelink_scheduler::platform_property_manager::PlatformPropertyManager; use nativelink_store::memory_store::MemoryStore; use nativelink_util::action_messages::{ - ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, - ClientOperationId, OperationId, + ActionResult, ActionStage, ActionState, ActionUniqueQualifier, OperationId, }; use nativelink_util::common::DigestInfo; -use nativelink_util::digest_hasher::DigestHasherFunc; use nativelink_util::store_trait::{Store, StoreLike}; use pretty_assertions::assert_eq; use prost::Message; @@ -95,15 +93,16 @@ async fn add_action_handles_skip_cache() -> Result<(), Error> { .await?; let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let ActionUniqueQualifier::Cachable(action_key) = action_info.unique_qualifier.clone() else { panic!("This test should be testing when item was cached first"); }; let mut skip_cache_action = action_info.clone(); skip_cache_action.unique_qualifier = ActionUniqueQualifier::Uncachable(action_key); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let _ = join!( context .cache_scheduler @@ -121,12 +120,7 @@ async fn add_action_handles_skip_cache() -> Result<(), Error> { #[nativelink_test] async fn find_by_client_operation_id_call_passed() -> Result<(), Error> { let context = make_cache_scheduler()?; - let client_operation_id = - ClientOperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { - instance_name: "instance".to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([8; 32], 1), - })); + let client_operation_id = OperationId::default(); let (actual_result, actual_client_id) = join!( context .cache_scheduler diff --git a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs index 8265e39ea1..74dde4f9a6 100644 --- a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs +++ b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs @@ -29,12 +29,8 @@ use nativelink_scheduler::action_scheduler::ActionScheduler; use nativelink_scheduler::default_action_listener::DefaultActionListener; use nativelink_scheduler::platform_property_manager::PlatformPropertyManager; use nativelink_scheduler::property_modifier_scheduler::PropertyModifierScheduler; -use nativelink_util::action_messages::{ - ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, - OperationId, -}; +use nativelink_util::action_messages::{ActionStage, ActionState, OperationId}; use nativelink_util::common::DigestInfo; -use nativelink_util::digest_hasher::DigestHasherFunc; use nativelink_util::platform_properties::PlatformPropertyValue; use pretty_assertions::assert_eq; use tokio::sync::watch; @@ -73,14 +69,15 @@ async fn add_action_adds_property() -> Result<(), Error> { let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( name.clone(), PropertyType::exact, )]))); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let (_, _, (passed_client_operation_id, action_info)) = join!( context .modifier_scheduler @@ -120,14 +117,15 @@ async fn add_action_overwrites_property() -> Result<(), Error> { .insert(name.clone(), PlatformPropertyValue::Unknown(original_value)); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( name.clone(), PropertyType::exact, )]))); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let (_, _, (passed_client_operation_id, action_info)) = join!( context .modifier_scheduler @@ -164,14 +162,15 @@ async fn add_action_property_added_after_remove() -> Result<(), Error> { let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( name.clone(), PropertyType::exact, )]))); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let (_, _, (passed_client_operation_id, action_info)) = join!( context .modifier_scheduler @@ -208,14 +207,15 @@ async fn add_action_property_remove_after_add() -> Result<(), Error> { let action_info = make_base_action_info(UNIX_EPOCH, DigestInfo::zero_digest()); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::from([( name, PropertyType::exact, )]))); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let (_, _, (passed_client_operation_id, action_info)) = join!( context .modifier_scheduler @@ -250,11 +250,12 @@ async fn add_action_property_remove() -> Result<(), Error> { .insert(name, PlatformPropertyValue::Unknown(value)); let (_forward_watch_channel_tx, forward_watch_channel_rx) = watch::channel(Arc::new(ActionState { - id: OperationId::new(action_info.unique_qualifier.clone()), + operation_id: OperationId::default(), stage: ActionStage::Queued, + action_digest: action_info.unique_qualifier.digest(), })); let platform_property_manager = Arc::new(PlatformPropertyManager::new(HashMap::new())); - let client_operation_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_operation_id = OperationId::default(); let (_, _, (passed_client_operation_id, action_info)) = join!( context .modifier_scheduler @@ -280,11 +281,7 @@ async fn add_action_property_remove() -> Result<(), Error> { #[nativelink_test] async fn find_by_client_operation_id_call_passed() -> Result<(), Error> { let context = make_modifier_scheduler(vec![]); - let operation_id = ClientOperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey { - instance_name: "instance".to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: DigestInfo::new([8; 32], 1), - })); + let operation_id = OperationId::default(); let (actual_result, actual_operation_id) = join!( context .modifier_scheduler diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index d749562222..2f681f81e6 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -32,12 +32,10 @@ use nativelink_scheduler::simple_scheduler::SimpleScheduler; use nativelink_scheduler::worker::Worker; use nativelink_scheduler::worker_scheduler::WorkerScheduler; use nativelink_util::action_messages::{ - ActionResult, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, - ClientOperationId, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, OperationId, - SymlinkInfo, WorkerId, INTERNAL_ERROR_EXIT_CODE, + ActionResult, ActionStage, ActionState, DirectoryInfo, ExecutionMetadata, FileInfo, NameOrPath, + OperationId, SymlinkInfo, WorkerId, INTERNAL_ERROR_EXIT_CODE, }; use nativelink_util::common::DigestInfo; -use nativelink_util::digest_hasher::DigestHasherFunc; use nativelink_util::instant_wrapper::MockInstantWrapped; use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue}; use pretty_assertions::assert_eq; @@ -136,7 +134,7 @@ async fn setup_action( ) -> Result>, Error> { let mut action_info = make_base_action_info(insert_timestamp, action_digest); action_info.platform_properties = platform_properties; - let client_id = ClientOperationId::new(action_info.unique_qualifier.clone()); + let client_id = OperationId::default(); let result = scheduler.add_action(client_id, action_info).await; tokio::task::yield_now().await; // Allow task<->worker matcher to run. result @@ -190,8 +188,9 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -254,8 +253,9 @@ async fn find_executing_action() -> Result<(), Error> { let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -328,7 +328,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err .expect("`update` should be set on UpdateForWorker"); let (operation_id, rx_start_execute) = match update_for_worker { update_for_worker::Update::StartAction(start_execute) => ( - OperationId::try_from(start_execute.operation_id.as_str()).unwrap(), + OperationId::from(start_execute.operation_id.as_str()), start_execute, ), v => panic!("Expected StartAction, got : {v:?}"), @@ -347,7 +347,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err .expect("`update` should be set on UpdateForWorker"); let (operation_id, rx_start_execute) = match update_for_worker { update_for_worker::Update::StartAction(start_execute) => ( - OperationId::try_from(start_execute.operation_id.as_str()).unwrap(), + OperationId::from(start_execute.operation_id.as_str()), start_execute, ), v => panic!("Expected StartAction, got : {v:?}"), @@ -460,7 +460,7 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> // Other tests check full data. We only care if we got StartAction. let operation_id = match rx_from_worker.recv().await.unwrap().update { Some(update_for_worker::Update::StartAction(start_execute)) => { - OperationId::try_from(start_execute.operation_id.as_str()).unwrap() + OperationId::from(start_execute.operation_id) } v => panic!("Expected StartAction, got : {v:?}"), }; @@ -491,8 +491,9 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Queued, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -506,8 +507,9 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -553,8 +555,9 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Queued, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -582,8 +585,9 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -608,15 +612,11 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { ); let action_digest = DigestInfo::new([99u8; 32], 512); - let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { - instance_name: String::new(), - digest: DigestInfo::zero_digest(), - digest_function: DigestHasherFunc::Sha256, - }); - let id = OperationId::new(unique_qualifier); + let operation_id = OperationId::default(); let mut expected_action_state = ActionState { - id, + operation_id, stage: ActionStage::Queued, + action_digest, }; let insert_timestamp1 = make_system_time(1); @@ -641,7 +641,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { let action_state1 = client1_action_listener.changed().await.unwrap(); let action_state2 = client2_action_listener.changed().await.unwrap(); // Name is random so we set force it to be the same. - expected_action_state.id = action_state1.id.clone(); + expected_action_state.operation_id = action_state1.operation_id.clone(); assert_eq!(action_state1.as_ref(), &expected_action_state); assert_eq!(action_state2.as_ref(), &expected_action_state); } @@ -731,8 +731,9 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Queued, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -800,7 +801,7 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { )), } ); - OperationId::try_from(operation_id.as_str()).unwrap() + OperationId::from(operation_id) }; { @@ -809,8 +810,9 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { assert_eq!( action_state.as_ref(), &ActionState { - id: operation_id.clone(), + operation_id: operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, } ); } @@ -841,8 +843,9 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { assert_eq!( action_state.as_ref(), &ActionState { - id: operation_id.clone(), + operation_id: operation_id.clone(), stage: ActionStage::Executing, + action_digest: action_state.action_digest, } ); } @@ -939,7 +942,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err scheduler .update_action( &worker_id, - &OperationId::try_from(operation_id.as_str())?, + &OperationId::from(operation_id), Ok(ActionStage::Completed(action_result.clone())), ) .await?; @@ -949,8 +952,9 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Completed(action_result), + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -992,7 +996,7 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E v => panic!("Expected StartAction, got : {v:?}"), }; // Other tests check full data. We only care if client thinks we are Executing. - OperationId::try_from(operation_id.as_str())? + OperationId::from(operation_id) }; let action_result = ActionResult { @@ -1051,8 +1055,9 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Completed(action_result), + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -1097,11 +1102,6 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { } let _ = setup_new_worker(&scheduler, rogue_worker_id, PlatformProperties::default()).await?; - let action_info_hash_key = ActionUniqueQualifier::Cachable(ActionUniqueKey { - instance_name: INSTANCE_NAME.to_string(), - digest_function: DigestHasherFunc::Sha256, - digest: action_digest, - }); let action_result = ActionResult { output_files: Vec::default(), output_folders: Vec::default(), @@ -1129,7 +1129,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { let update_action_result = scheduler .update_action( &rogue_worker_id, - &OperationId::new(action_info_hash_key), + &OperationId::default(), Ok(ActionStage::Completed(action_result.clone())), ) .await; @@ -1171,15 +1171,11 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro ); let action_digest = DigestInfo::new([99u8; 32], 512); - let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { - instance_name: String::new(), - digest: DigestInfo::zero_digest(), - digest_function: DigestHasherFunc::Sha256, - }); - let id = OperationId::new(unique_qualifier); + let operation_id = OperationId::default(); let mut expected_action_state = ActionState { - id, + operation_id, stage: ActionStage::Executing, + action_digest, }; let insert_timestamp = make_system_time(1); @@ -1216,9 +1212,9 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro // Client should get notification saying it's being executed. let action_state = action_listener.changed().await.unwrap(); // We now know the name of the action so populate it. - expected_action_state.id = action_state.id.clone(); + expected_action_state.operation_id = action_state.operation_id.clone(); assert_eq!(action_state.as_ref(), &expected_action_state); - action_state.id.clone() + action_state.operation_id.clone() }; let action_result = ActionResult { @@ -1279,7 +1275,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro expected_action_state.stage = ActionStage::Executing; let action_state = action_listener.changed().await.unwrap(); // The name of the action changed (since it's a new action), so update it. - expected_action_state.id = action_state.id.clone(); + expected_action_state.operation_id = action_state.operation_id.clone(); assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -1333,7 +1329,7 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> assert_eq!(state_1.stage, ActionStage::Executing); // Second client should be in a queued state. assert_eq!(state_2.stage, ActionStage::Queued); - (state_1.id.clone(), state_2.id.clone()) + (state_1.operation_id.clone(), state_2.operation_id.clone()) }; let action_result = ActionResult { @@ -1373,13 +1369,12 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> { // First action should now be completed. let action_state = client1_action_listener.changed().await.unwrap(); - let mut expected_action_state = ActionState { + let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Completed(action_result.clone()), + action_digest: action_state.action_digest, }; - // We now know the name of the action so populate it. - expected_action_state.id.unique_qualifier = action_state.id.unique_qualifier.clone(); assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -1411,13 +1406,12 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> { // Our second client should be notified it completed. let action_state = client2_action_listener.changed().await.unwrap(); - let mut expected_action_state = ActionState { + let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Completed(action_result.clone()), + action_digest: action_state.action_digest, }; - // We now know the name of the action so populate it. - expected_action_state.id.unique_qualifier = action_state.id.unique_qualifier.clone(); assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -1519,7 +1513,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> action_listener.changed().await.unwrap().stage, ActionStage::Executing ); - OperationId::try_from(operation_id.as_str())? + OperationId::from(operation_id) }; let _ = scheduler @@ -1535,8 +1529,9 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Queued, + action_digest: action_state.action_digest, }; assert_eq!(action_state.as_ref(), &expected_action_state); } @@ -1568,7 +1563,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> let action_state = action_listener.changed().await.unwrap(); let expected_action_state = ActionState { // Name is a random string, so we ignore it and just make it the same. - id: action_state.id.clone(), + operation_id: action_state.operation_id.clone(), stage: ActionStage::Completed(ActionResult { output_files: Vec::default(), output_folders: Vec::default(), @@ -1593,6 +1588,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> error: Some(err.clone()), message: String::new(), }), + action_digest: action_state.action_digest, }; let mut received_state = action_state.as_ref().clone(); if let ActionStage::Completed(stage) = &mut received_state.stage { @@ -1688,7 +1684,7 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), action_listener.changed().await.unwrap().stage, ActionStage::Executing ); - OperationId::try_from(operation_id.as_str())? + OperationId::from(operation_id) }; let _ = scheduler @@ -1769,7 +1765,7 @@ async fn client_reconnect_keeps_action_alive() -> Result<(), Error> { // evicting map. So we constantly ask for some other client // to trigger eviction logic. scheduler - .find_by_client_operation_id(&ClientOperationId::from_raw_string( + .find_by_client_operation_id(&OperationId::from_raw_string( "dummy_client_id".to_string(), )) .await diff --git a/nativelink-scheduler/tests/utils/mock_scheduler.rs b/nativelink-scheduler/tests/utils/mock_scheduler.rs index f878a79f25..5ccc659795 100644 --- a/nativelink-scheduler/tests/utils/mock_scheduler.rs +++ b/nativelink-scheduler/tests/utils/mock_scheduler.rs @@ -20,14 +20,14 @@ use nativelink_error::{make_input_err, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_scheduler::action_scheduler::{ActionListener, ActionScheduler}; use nativelink_scheduler::platform_property_manager::PlatformPropertyManager; -use nativelink_util::action_messages::{ActionInfo, ClientOperationId}; +use nativelink_util::action_messages::{ActionInfo, OperationId}; use tokio::sync::{mpsc, Mutex}; #[allow(clippy::large_enum_variant)] enum ActionSchedulerCalls { GetPlatformPropertyManager(String), - AddAction((ClientOperationId, ActionInfo)), - FindExistingAction(ClientOperationId), + AddAction((OperationId, ActionInfo)), + FindExistingAction(OperationId), } enum ActionSchedulerReturns { @@ -85,7 +85,7 @@ impl MockActionScheduler { pub async fn expect_add_action( &self, result: Result>, Error>, - ) -> (ClientOperationId, ActionInfo) { + ) -> (OperationId, ActionInfo) { let mut rx_call_lock = self.rx_call.lock().await; let ActionSchedulerCalls::AddAction(req) = rx_call_lock .recv() @@ -104,7 +104,7 @@ impl MockActionScheduler { pub async fn expect_find_by_client_operation_id( &self, result: Result>>, Error>, - ) -> ClientOperationId { + ) -> OperationId { let mut rx_call_lock = self.rx_call.lock().await; let ActionSchedulerCalls::FindExistingAction(req) = rx_call_lock .recv() @@ -145,7 +145,7 @@ impl ActionScheduler for MockActionScheduler { async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: ActionInfo, ) -> Result>, Error> { self.tx_call @@ -167,7 +167,7 @@ impl ActionScheduler for MockActionScheduler { async fn find_by_client_operation_id( &self, - client_operation_id: &ClientOperationId, + client_operation_id: &OperationId, ) -> Result>>, Error> { self.tx_call .send(ActionSchedulerCalls::FindExistingAction( diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index bf47664dfb..b7e5f8896c 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -32,8 +32,7 @@ use nativelink_scheduler::action_scheduler::{ActionListener, ActionScheduler}; use nativelink_store::ac_utils::get_and_decode_digest; use nativelink_store::store_manager::StoreManager; use nativelink_util::action_messages::{ - ActionInfo, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId, - DEFAULT_EXECUTION_PRIORITY, + ActionInfo, ActionUniqueKey, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY, }; use nativelink_util::common::DigestInfo; use nativelink_util::digest_hasher::{make_ctx_for_hash_func, DigestHasherFunc}; @@ -44,19 +43,19 @@ use tracing::{error_span, event, instrument, Level}; type InstanceInfoName = String; -struct NativelinkClientOperationId { +struct NativelinkOperationId { instance_name: InstanceInfoName, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, } -impl NativelinkClientOperationId { +impl NativelinkOperationId { fn from_name(name: &str) -> Result { let (instance_name, name) = name .split_once('/') .err_tip(|| "Expected instance_name and name to be separated by '/'")?; Ok(Self { instance_name: instance_name.to_string(), - client_operation_id: ClientOperationId::from_raw_string(name.to_string()), + client_operation_id: OperationId::from_raw_string(name.to_string()), }) } @@ -206,7 +205,7 @@ impl ExecutionServer { } fn to_execute_stream( - nl_client_operation_id: NativelinkClientOperationId, + nl_client_operation_id: NativelinkOperationId, action_listener: Pin>, ) -> Response { let client_operation_id_string = nl_client_operation_id.into_string(); @@ -219,9 +218,8 @@ impl ExecutionServer { match action_listener.changed().await { Ok(action_update) => { event!(Level::INFO, ?action_update, "Execute Resp Stream"); - let client_operation_id = ClientOperationId::from_raw_string( - client_operation_id_string.clone(), - ); + let client_operation_id = + OperationId::from_raw_string(client_operation_id_string.clone()); // If the action is finished we won't be sending any more updates. let maybe_action_listener = if action_update.stage.is_finished() { None @@ -284,15 +282,12 @@ impl ExecutionServer { let action_listener = instance_info .scheduler - .add_action( - ClientOperationId::new(action_info.unique_qualifier.clone()), - action_info, - ) + .add_action(OperationId::default(), action_info) .await .err_tip(|| "Failed to schedule task")?; Ok(Self::to_execute_stream( - NativelinkClientOperationId { + NativelinkOperationId { instance_name, client_operation_id: action_listener.client_operation_id().clone(), }, @@ -305,7 +300,7 @@ impl ExecutionServer { request: Request, ) -> Result, Status> { let (instance_name, client_operation_id) = - NativelinkClientOperationId::from_name(&request.into_inner().name) + NativelinkOperationId::from_name(&request.into_inner().name) .map(|v| (v.instance_name, v.client_operation_id)) .err_tip(|| "Failed to parse operation_id in ExecutionServer::wait_execution")?; let Some(instance_info) = self.instance_infos.get(&instance_name) else { @@ -322,7 +317,7 @@ impl ExecutionServer { return Err(Status::not_found("Failed to find existing task")); }; Ok(Self::to_execute_stream( - NativelinkClientOperationId { + NativelinkOperationId { instance_name, client_operation_id, }, diff --git a/nativelink-service/src/worker_api_server.rs b/nativelink-service/src/worker_api_server.rs index c4e377b99c..99af7efd2c 100644 --- a/nativelink-service/src/worker_api_server.rs +++ b/nativelink-service/src/worker_api_server.rs @@ -199,10 +199,7 @@ impl WorkerApiServer { execute_result: ExecuteResult, ) -> Result, Error> { let worker_id: WorkerId = execute_result.worker_id.try_into()?; - let operation_id = - OperationId::try_from(execute_result.operation_id.as_str()).err_tip(|| { - "Failed to convert operation_id in WorkerApiServer::inner_execution_response" - })?; + let operation_id = OperationId::from(execute_result.operation_id); match execute_result .result diff --git a/nativelink-service/tests/worker_api_server_test.rs b/nativelink-service/tests/worker_api_server_test.rs index fc472765f2..d5323b0443 100644 --- a/nativelink-service/tests/worker_api_server_test.rs +++ b/nativelink-service/tests/worker_api_server_test.rs @@ -393,7 +393,7 @@ pub async fn execution_response_success_test() -> Result<(), Box Self { - Self(OperationId::new(unique_qualifier).to_string()) - } +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub enum OperationId { + Uuid(Uuid), + String(String), +} +impl OperationId { pub fn from_raw_string(name: String) -> Self { - Self(name) + Self::String(name) } pub fn into_string(self) -> String { - self.0 - } -} - -impl std::fmt::Display for ClientOperationId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("{}", self.0.clone())) + match self { + Self::Uuid(uuid) => uuid.to_string(), + Self::String(name) => name, + } } } -fn uuid_to_string(uuid: &Uuid) -> String { - uuid.hyphenated().to_string() -} - -#[derive(Clone, Serialize, Deserialize, MetricsComponent)] -pub struct OperationId { - #[metric(help = "The unique qualifier of the operation")] - pub unique_qualifier: ActionUniqueQualifier, - #[metric(help = "The id of the operation", handler = uuid_to_string)] - pub id: Uuid, -} - -impl PartialEq for OperationId { - fn eq(&self, other: &Self) -> bool { - self.id.eq(&other.id) +impl Default for OperationId { + fn default() -> Self { + Self::Uuid(Uuid::new_v4()) } } -impl Eq for OperationId {} - -impl PartialOrd for OperationId { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) +impl std::fmt::Display for OperationId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Uuid(uuid) => uuid.fmt(f), + Self::String(name) => f.write_str(name), + } } } -impl Ord for OperationId { - fn cmp(&self, other: &Self) -> Ordering { - self.id.cmp(&other.id) +impl MetricsComponent for OperationId { + fn publish( + &self, + _kind: MetricKind, + _field_metadata: MetricFieldData, + ) -> Result { + Ok(MetricPublishKnownKindData::String(self.to_string())) } } -impl Hash for OperationId { - fn hash(&self, state: &mut H) { - self.id.hash(state) - } +fn uuid_to_string(uuid: &Uuid) -> String { + uuid.hyphenated().to_string() } -impl OperationId { - pub fn new(unique_qualifier: ActionUniqueQualifier) -> Self { - Self { - id: Uuid::new_v4(), - unique_qualifier, +impl From<&str> for OperationId { + fn from(value: &str) -> Self { + match Uuid::parse_str(value) { + Ok(uuid) => Self::Uuid(uuid), + Err(_) => Self::String(value.to_string()), } } } -impl TryFrom<&str> for OperationId { - type Error = Error; - - /// Attempts to convert a string slice into an `OperationId`. - /// - /// The input string `value` is expected to be in the format: - /// `//-//`. - /// - /// # Parameters - /// - /// - `value`: A `&str` representing the `OperationId` to be converted. - /// - /// # Returns - /// - /// - `Result`: Returns `Ok(OperationId)` if the - /// conversion is successful, or an `Err(Error)` if it fails. - /// - /// # Errors - /// - /// This function will return an error if the input string is not in the expected format or if - /// any of the components cannot be parsed correctly. The detailed error messages provide - /// insight into which part of the input string caused the failure. - /// - /// ## Example Usage - /// - /// ```no_run - /// use nativelink_util::action_messages::OperationId; - /// let operation_id_str = "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"; - /// let operation_id = OperationId::try_from(operation_id_str); - /// ``` - /// - /// In this example, `operation_id_str` is a string representing an `OperationId`. - /// The `try_from` method is used to convert it into an `OperationId` instance. - /// If any part of the string is incorrectly formatted, an error will be returned with a - /// descriptive message. - fn try_from(value: &str) -> Result { - let (unique_qualifier, id) = value - .rsplit_once('/') - .err_tip(|| format!("Invalid OperationId unique_qualifier / id fragment - {value}"))?; - let (instance_name, rest) = unique_qualifier - .split_once('/') - .err_tip(|| format!("Invalid UniqueQualifier instance name fragment - {value}"))?; - let (digest_function, rest) = rest - .split_once('/') - .err_tip(|| format!("Invalid UniqueQualifier digest function fragment - {value}"))?; - let (digest_hash, rest) = rest - .split_once('-') - .err_tip(|| format!("Invalid UniqueQualifier digest hash fragment - {value}"))?; - let (digest_size, cachable) = rest - .split_once('/') - .err_tip(|| format!("Invalid UniqueQualifier digest size fragment - {value}"))?; - let digest = DigestInfo::try_new( - digest_hash, - digest_size - .parse::() - .err_tip(|| format!("Invalid UniqueQualifier size value fragment - {value}"))?, - ) - .err_tip(|| format!("Invalid DigestInfo digest hash - {value}"))?; - let cachable = match cachable { - "u" => false, - "c" => true, - _ => { - return Err(make_input_err!( - "Invalid UniqueQualifier cachable value fragment - {value}" - )); - } - }; - let unique_key = ActionUniqueKey { - instance_name: instance_name.to_string(), - digest_function: digest_function.try_into()?, - digest, - }; - let unique_qualifier = if cachable { - ActionUniqueQualifier::Cachable(unique_key) - } else { - ActionUniqueQualifier::Uncachable(unique_key) - }; - let id = Uuid::parse_str(id).map_err(|e| make_input_err!("Failed to parse {e} as uuid"))?; - Ok(Self { - unique_qualifier, - id, - }) - } -} - -impl std::fmt::Display for OperationId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("{}/{}", self.unique_qualifier, self.id)) - } -} - -impl std::fmt::Debug for OperationId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self, f) +impl From for OperationId { + fn from(value: String) -> Self { + match Uuid::parse_str(&value) { + Ok(uuid) => Self::Uuid(uuid), + Err(_) => Self::String(value), + } } } @@ -1144,7 +1041,9 @@ pub struct ActionState { #[metric(help = "The current stage of the action.")] pub stage: ActionStage, #[metric(help = "The unique identifier of the action.")] - pub id: OperationId, + pub operation_id: OperationId, + #[metric(help = "The digest of the action.")] + pub action_digest: DigestInfo, } impl ActionState { @@ -1190,13 +1089,20 @@ impl ActionState { } }; + let action_digest = metadata + .action_digest + .err_tip(|| "No action_digest in upstream operation")? + .try_into() + .err_tip(|| "Could not convert action_digest into DigestInfo")?; + Ok(Self { - id: operation_id, + operation_id, stage, + action_digest, }) } - pub fn as_operation(&self, client_operation_id: ClientOperationId) -> Operation { + pub fn as_operation(&self, client_operation_id: OperationId) -> Operation { let stage = Into::::into(&self.stage) as i32; let name = client_operation_id.into_string(); @@ -1206,7 +1112,7 @@ impl ActionState { } else { None }; - let digest = Some(self.id.unique_qualifier.digest().into()); + let digest = Some(self.action_digest.into()); let metadata = ExecuteOperationMetadata { stage, diff --git a/nativelink-util/src/operation_state_manager.rs b/nativelink-util/src/operation_state_manager.rs index 066a80e382..237b4bc79b 100644 --- a/nativelink-util/src/operation_state_manager.rs +++ b/nativelink-util/src/operation_state_manager.rs @@ -23,7 +23,7 @@ use nativelink_error::Error; use nativelink_metric::MetricsComponent; use crate::action_messages::{ - ActionInfo, ActionStage, ActionState, ActionUniqueKey, ClientOperationId, OperationId, WorkerId, + ActionInfo, ActionStage, ActionState, ActionUniqueKey, OperationId, WorkerId, }; use crate::common::DigestInfo; @@ -69,7 +69,7 @@ pub struct OperationFilter { pub stages: OperationStageFlags, /// The client operation id. - pub client_operation_id: Option, + pub client_operation_id: Option, /// The operation id. pub operation_id: Option, @@ -101,7 +101,7 @@ pub trait ClientStateManager: Sync + Send + MetricsComponent { /// Add a new action to the queue or joins an existing action. async fn add_action( &self, - client_operation_id: ClientOperationId, + client_operation_id: OperationId, action_info: Arc, ) -> Result, Error>; diff --git a/nativelink-util/tests/operation_id_tests.rs b/nativelink-util/tests/operation_id_tests.rs index a2513c8f73..3df46f2c4b 100644 --- a/nativelink-util/tests/operation_id_tests.rs +++ b/nativelink-util/tests/operation_id_tests.rs @@ -12,123 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. -use nativelink_error::{Code, Error}; +use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_util::action_messages::OperationId; use pretty_assertions::assert_eq; +use uuid::Uuid; #[nativelink_test] async fn parse_operation_id() -> Result<(), Error> { - { - // Check no cached. - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap(); - assert_eq!( - operation_id.to_string(), - "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - assert_eq!( - operation_id.id.to_string(), - "19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" - ); - } - { - // Check cached. - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/c/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap(); - assert_eq!( - operation_id.to_string(), - "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/c/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - assert_eq!( - operation_id.id.to_string(), - "19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" - ); - } + let operation_id = OperationId::from("19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); + assert_eq!( + operation_id, + OperationId::Uuid(Uuid::parse_str("19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").unwrap()) + ); Ok(()) } #[nativelink_test] async fn parse_empty_failure() -> Result<(), Error> { - let operation_id = OperationId::try_from("").err().unwrap(); - assert_eq!(operation_id.code, Code::Internal); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!( - operation_id.messages[0], - "Invalid OperationId unique_qualifier / id fragment - " - ); - - let operation_id = OperationId::try_from("/").err().unwrap(); - assert_eq!(operation_id.code, Code::Internal); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!( - operation_id.messages[0], - "Invalid UniqueQualifier instance name fragment - /" - ); - - let operation_id = OperationId::try_from("main").err().unwrap(); - assert_eq!(operation_id.code, Code::Internal); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!( - operation_id.messages[0], - "Invalid OperationId unique_qualifier / id fragment - main" - ); - - let operation_id = OperationId::try_from("main/nohashfn/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!( - operation_id.messages[0], - "Unknown or unsupported digest function for string conversion: \"NOHASHFN\"" - ); - - let operation_id = - OperationId::try_from("main/SHA256/badhash-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52") - .err() - .unwrap(); - assert_eq!(operation_id.messages.len(), 3); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "Odd number of digits"); - assert_eq!(operation_id.messages[1], "Invalid sha256 hash: badhash"); - assert_eq!( - operation_id.messages[2], - "Invalid DigestInfo digest hash - main/SHA256/badhash-211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52" - ); - - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.messages.len(), 2); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!( - operation_id.messages[0], - "cannot parse integer from empty string" - ); - assert_eq!(operation_id.messages[1], "Invalid UniqueQualifier size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f--211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages.len(), 2); - assert_eq!(operation_id.messages[0], "invalid digit found in string"); - assert_eq!(operation_id.messages[1], "Invalid UniqueQualifier size value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f--211/u/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/x/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier cachable value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/x/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/-10/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52").err().unwrap(); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier cachable value fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/-10/19b16cf8-a1ad-4948-aaac-b6f4eb7fca52"); - - let operation_id = OperationId::try_from("main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/u/baduuid").err().unwrap(); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!(operation_id.code, Code::InvalidArgument); - assert_eq!(operation_id.messages[0], "Failed to parse invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `u` at 4 as uuid"); - - let operation_id = OperationId::try_from( - "main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0", - ) - .err() - .unwrap(); - assert_eq!(operation_id.messages.len(), 1); - assert_eq!(operation_id.code, Code::Internal); - assert_eq!(operation_id.messages[0], "Invalid UniqueQualifier digest size fragment - main/SHA256/4a0885a39d5ba8da3123c02ff56b73196a8b23fd3c835e1446e74a3a3ff4313f-211/0"); + { + let operation_id = OperationId::from(""); + assert_eq!(operation_id, OperationId::String(String::new())); + } + { + let operation_id = OperationId::from("foobar"); + assert_eq!(operation_id, OperationId::String("foobar".to_string())); + } Ok(()) } diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs index a99e80720f..0857fb8e88 100644 --- a/nativelink-worker/src/local_worker.rs +++ b/nativelink-worker/src/local_worker.rs @@ -211,22 +211,8 @@ impl<'a, T: WorkerApiClientTrait, U: RunningActionsManager> LocalWorkerImpl<'a, self.metrics.keep_alives_received.inc(); } Update::KillOperationRequest(kill_operation_request) => { - let operation_id_res = kill_operation_request - .operation_id - .as_str() - .try_into(); - let operation_id = match operation_id_res { - Ok(operation_id) => operation_id, - Err(err) => { - event!( - Level::ERROR, - ?kill_operation_request, - ?err, - "Failed to convert string to operation_id" - ); - continue; - } - }; + let operation_id = kill_operation_request + .operation_id.into(); if let Err(err) = self.running_actions_manager.kill_operation(&operation_id).await { event!( Level::ERROR, diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 15d82de515..50a029cd2f 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1704,7 +1704,7 @@ impl RunningActionsManagerImpl { operation_id: &'a OperationId, ) -> impl Future> + 'a { self.metrics.make_action_directory.wrap(async move { - let action_directory = format!("{}/{}", self.root_action_directory, operation_id.id); + let action_directory = format!("{}/{}", self.root_action_directory, operation_id); fs::create_dir(&action_directory) .await .err_tip(|| format!("Error creating action directory {action_directory}"))?; @@ -1791,11 +1791,8 @@ impl RunningActionsManager for RunningActionsManagerImpl { .queued_timestamp .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); - let operation_id: OperationId = start_execute - .operation_id - .as_str() - .try_into() - .err_tip(|| "Could not convert to operation_id in RunningActionsManager::create_and_add_action")?; + let operation_id = start_execute + .operation_id.as_str().into(); let action_info = self.create_action_info(start_execute, queued_timestamp).await?; event!( Level::INFO, diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 3feff1b5c4..e27377b257 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -668,7 +668,7 @@ async fn kill_action_request_kills_action() -> Result<(), Box SystemTime { previous_time } -fn make_operation_id(execute_request: &ExecuteRequest) -> OperationId { - let unique_qualifier = ActionUniqueQualifier::Cachable(ActionUniqueKey { - instance_name: execute_request.instance_name.clone(), - digest_function: execute_request.digest_function.try_into().unwrap(), - digest: execute_request - .action_digest - .clone() - .unwrap() - .try_into() - .unwrap(), - }); - OperationId::new(unique_qualifier) -} - #[nativelink_test] async fn download_to_directory_file_download_test() -> Result<(), Box> { const FILE1_NAME: &str = "file1.txt"; @@ -506,7 +491,7 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action = running_actions_manager .create_and_add_action( @@ -623,7 +608,7 @@ async fn ensure_output_files_full_directories_are_created_test( action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action = running_actions_manager .create_and_add_action( @@ -756,7 +741,7 @@ async fn blake3_upload_files() -> Result<(), Box> { digest_function: ProtoDigestFunction::Blake3.into(), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .create_and_add_action( @@ -928,7 +913,7 @@ async fn upload_files_from_above_cwd_test() -> Result<(), Box Result<(), Box> action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .create_and_add_action( @@ -1279,7 +1264,7 @@ async fn cleanup_happens_on_job_failure() -> Result<(), Box Result<(), Box> { action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .clone() @@ -1555,7 +1540,7 @@ exit 0 action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .clone() @@ -1722,7 +1707,7 @@ exit 0 action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .clone() @@ -1862,7 +1847,7 @@ exit 1 action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .clone() @@ -2377,7 +2362,7 @@ async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let execute_results_fut = running_actions_manager .create_and_add_action( @@ -2779,7 +2764,7 @@ async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box Result<(), Box> { action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .create_and_add_action( @@ -3020,7 +3005,7 @@ async fn action_directory_contents_are_cleaned() -> Result<(), Box Result<(), Box> { action_digest: Some(action_digest.into()), ..Default::default() }; - let operation_id = make_operation_id(&execute_request).to_string(); + let operation_id = OperationId::default().to_string(); let running_action_impl = running_actions_manager .create_and_add_action( @@ -3327,7 +3312,7 @@ async fn running_actions_manager_respects_action_timeout() -> Result<(), Box