Skip to content

Commit

Permalink
Remove ClientOperationId and move all to OperationId (#1214)
Browse files Browse the repository at this point in the history
This is in prep to support generic layering of workers & schedulers.
We can no longer know if the source is a client or just an
up-stream-component, so we need to fall back to just using OperationId.

towards #1213
  • Loading branch information
allada authored Aug 6, 2024
1 parent ba7abf3 commit 81db90e
Show file tree
Hide file tree
Showing 26 changed files with 260 additions and 503 deletions.
8 changes: 4 additions & 4 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use async_trait::async_trait;
use futures::Future;
use nativelink_error::Error;
use nativelink_metric::RootMetricsComponent;
use nativelink_util::action_messages::{ActionInfo, ActionState, ClientOperationId};
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId};

use crate::platform_property_manager::PlatformPropertyManager;

/// ActionListener interface is responsible for interfacing with clients
/// that are interested in the state of an action.
pub trait ActionListener: Sync + Send + Unpin {
/// Returns the client operation id.
fn client_operation_id(&self) -> &ClientOperationId;
fn client_operation_id(&self) -> &OperationId;

/// Waits for the action state to change.
fn changed(
Expand All @@ -48,13 +48,13 @@ pub trait ActionScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static
/// Adds an action to the scheduler for remote execution.
async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error>;

/// Find an existing action by its name.
async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;
}
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ impl AwaitedAction {
);
let state = Arc::new(ActionState {
stage,
id: operation_id.clone(),
operation_id: operation_id.clone(),
action_digest: action_info.unique_qualifier.digest(),
});
Self {
version: AwaitedActionVersion(0),
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
use futures::{Future, Stream};
use nativelink_error::Error;
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId};
use nativelink_util::action_messages::{ActionInfo, OperationId};

mod awaited_action;

Expand Down Expand Up @@ -80,7 +80,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
/// Get the AwaitedAction by the client operation id.
fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> impl Future<Output = Result<Option<Self::Subscriber>, Error>> + Send;

/// Get all AwaitedActions. This call should be avoided as much as possible.
Expand Down Expand Up @@ -113,7 +113,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
/// to changes.
fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> impl Future<Output = Result<Self::Subscriber, Error>> + Send;
}
18 changes: 9 additions & 9 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier,
ClientOperationId, OperationId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
};
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
Expand All @@ -48,7 +47,7 @@ use crate::platform_property_manager::PlatformPropertyManager;
type CheckActions = HashMap<
ActionUniqueKey,
Vec<(
ClientOperationId,
OperationId,
oneshot::Sender<Result<Pin<Box<dyn ActionListener>>, Error>>,
)>,
>;
Expand Down Expand Up @@ -98,7 +97,7 @@ type ActionListenerOneshot = oneshot::Receiver<Result<Pin<Box<dyn ActionListener
fn subscribe_to_existing_action(
inflight_cache_checks: &mut MutexGuard<CheckActions>,
unique_qualifier: &ActionUniqueKey,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Option<ActionListenerOneshot> {
inflight_cache_checks
.get_mut(unique_qualifier)
Expand All @@ -109,12 +108,12 @@ fn subscribe_to_existing_action(
})
}
struct CachedActionListener {
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_state: Arc<ActionState>,
}

impl ActionListener for CachedActionListener {
fn client_operation_id(&self) -> &ClientOperationId {
fn client_operation_id(&self) -> &OperationId {
&self.client_operation_id
}

Expand Down Expand Up @@ -148,7 +147,7 @@ impl ActionScheduler for CacheLookupScheduler {

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let unique_key = match &action_info.unique_qualifier {
Expand Down Expand Up @@ -241,8 +240,9 @@ impl ActionScheduler for CacheLookupScheduler {
return; // Nobody is waiting for this action anymore.
};
let action_state = Arc::new(ActionState {
id: OperationId::new(action_info.unique_qualifier.clone()),
operation_id: OperationId::default(),
stage: ActionStage::CompletedFromCache(action_result),
action_digest: action_info.unique_qualifier.digest(),
});
for (client_operation_id, pending_tx) in pending_txs {
// Ignore errors here, as the other end may have hung up.
Expand Down Expand Up @@ -305,7 +305,7 @@ impl ActionScheduler for CacheLookupScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
self.action_scheduler
.find_by_client_operation_id(client_operation_id)
Expand Down
8 changes: 4 additions & 4 deletions nativelink-scheduler/src/default_action_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ use std::sync::Arc;

use futures::Future;
use nativelink_error::{make_err, Code, Error};
use nativelink_util::action_messages::{ActionState, ClientOperationId};
use nativelink_util::action_messages::{ActionState, OperationId};
use tokio::sync::watch;

use crate::action_scheduler::ActionListener;

/// Simple implementation of ActionListener using tokio's watch.
pub struct DefaultActionListener {
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_state: watch::Receiver<Arc<ActionState>>,
}

impl DefaultActionListener {
pub fn new(
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
mut action_state: watch::Receiver<Arc<ActionState>>,
) -> Self {
action_state.mark_changed();
Expand All @@ -54,7 +54,7 @@ impl DefaultActionListener {
}

impl ActionListener for DefaultActionListener {
fn client_operation_id(&self) -> &ClientOperationId {
fn client_operation_id(&self) -> &OperationId {
&self.client_operation_id
}

Expand Down
19 changes: 5 additions & 14 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_util::action_messages::{
ActionInfo, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId,
OperationId, DEFAULT_EXECUTION_PRIORITY,
ActionInfo, ActionState, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::connection_manager::ConnectionManager;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::{background_spawn, tls_utils};
use parking_lot::Mutex;
Expand Down Expand Up @@ -126,16 +123,10 @@ impl GrpcScheduler {
.await
.err_tip(|| "Recieving response from upstream scheduler")?
{
let client_operation_id =
ClientOperationId::from_raw_string(initial_response.name.clone());
let client_operation_id = OperationId::from_raw_string(initial_response.name.clone());
// Our operation_id is not needed here is just a place holder to recycle existing object.
// The only thing that actually matters is the operation_id.
let operation_id =
OperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey {
instance_name: "dummy_instance_name".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::zero_digest(),
}));
let operation_id = OperationId::default();
let action_state =
ActionState::try_from_operation(initial_response, operation_id.clone())
.err_tip(|| "In GrpcScheduler::stream_state")?;
Expand Down Expand Up @@ -243,7 +234,7 @@ impl ActionScheduler for GrpcScheduler {

async fn add_action(
&self,
_client_operation_id: ClientOperationId,
_client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let execution_policy = if action_info.priority == DEFAULT_EXECUTION_PRIORITY {
Expand Down Expand Up @@ -289,7 +280,7 @@ impl ActionScheduler for GrpcScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
let request = WaitExecutionRequest {
name: client_operation_id.to_string(),
Expand Down
23 changes: 11 additions & 12 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier,
ClientOperationId, OperationId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
};
use nativelink_util::chunked_stream::ChunkedStream;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
Expand Down Expand Up @@ -99,7 +98,7 @@ impl LenEntry for ClientAwaitedAction {
/// Actions the AwaitedActionsDb needs to process.
pub(crate) enum ActionEvent {
/// A client has sent a keep alive message.
ClientKeepAlive(ClientOperationId),
ClientKeepAlive(OperationId),
/// A client has dropped and pointed to OperationId.
ClientDroppedOperation(OperationId),
}
Expand All @@ -108,7 +107,7 @@ pub(crate) enum ActionEvent {
/// keep alive config and state.
struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> {
/// The client operation id.
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
/// The last time a keep alive was sent.
last_keep_alive: I,
/// The function to get the current time.
Expand Down Expand Up @@ -136,7 +135,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn
}
pub fn new_with_client(
mut awaited_action_rx: watch::Receiver<AwaitedAction>,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
event_tx: mpsc::UnboundedSender<ActionEvent>,
now_fn: NowFn,
) -> Self
Expand Down Expand Up @@ -351,7 +350,7 @@ impl SortedAwaitedActions {
pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
/// A lookup table to lookup the state of an action by its client operation id.
#[metric(group = "client_operation_ids")]
client_operation_to_awaited_action: EvictingMap<ClientOperationId, Arc<ClientAwaitedAction>, I>,
client_operation_to_awaited_action: EvictingMap<OperationId, Arc<ClientAwaitedAction>, I>,

/// A lookup table to lookup the state of an action by its worker operation id.
#[metric(group = "operation_ids")]
Expand Down Expand Up @@ -381,7 +380,7 @@ pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> {
async fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
let maybe_client_awaited_action = self
.client_operation_to_awaited_action
Expand Down Expand Up @@ -710,7 +709,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI

async fn add_action(
&mut self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> {
// Check to see if the action is already known and subscribe if it is.
Expand All @@ -732,7 +731,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
ActionUniqueQualifier::Cachable(unique_key) => Some(unique_key.clone()),
ActionUniqueQualifier::Uncachable(_unique_key) => None,
};
let operation_id = OperationId::new(action_info.unique_qualifier.clone());
let operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
Expand Down Expand Up @@ -782,7 +781,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI

async fn try_subscribe(
&mut self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
unique_qualifier: &ActionUniqueQualifier,
// TODO(allada) To simplify the scheduler 2024 refactor, we
// removed the ability to upgrade priorities of actions.
Expand Down Expand Up @@ -894,7 +893,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite

async fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Self::Subscriber>, Error> {
self.inner
.lock()
Expand Down Expand Up @@ -980,7 +979,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Self::Subscriber, Error> {
self.inner
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use nativelink_config::schedulers::{PropertyModification, PropertyType};
use nativelink_error::{Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{ActionInfo, ClientOperationId};
use nativelink_util::action_messages::{ActionInfo, OperationId};
use parking_lot::Mutex;

use crate::action_scheduler::{ActionListener, ActionScheduler};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl ActionScheduler for PropertyModifierScheduler {

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
mut action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let platform_property_manager = self
Expand Down Expand Up @@ -122,7 +122,7 @@ impl ActionScheduler for PropertyModifierScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
self.scheduler
.find_by_client_operation_id(client_operation_id)
Expand Down
Loading

0 comments on commit 81db90e

Please sign in to comment.