Skip to content

Commit

Permalink
Remove ClientOperationId and move all to OperationId
Browse files Browse the repository at this point in the history
This is in preparation for supporting generic layering of workers & schedulers.
We can no longer know whether the source is a client or just an
upstream component, so we need to fall back to just using OperationId.

towards TraceMachina#1213
  • Loading branch information
allada committed Aug 2, 2024
1 parent 44a4a91 commit 231676c
Show file tree
Hide file tree
Showing 26 changed files with 260 additions and 503 deletions.
8 changes: 4 additions & 4 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use async_trait::async_trait;
use futures::Future;
use nativelink_error::Error;
use nativelink_metric::RootMetricsComponent;
use nativelink_util::action_messages::{ActionInfo, ActionState, ClientOperationId};
use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId};

use crate::platform_property_manager::PlatformPropertyManager;

/// ActionListener interface is responsible for interfacing with clients
/// that are interested in the state of an action.
pub trait ActionListener: Sync + Send + Unpin {
/// Returns the client operation id.
fn client_operation_id(&self) -> &ClientOperationId;
fn client_operation_id(&self) -> &OperationId;

/// Waits for the action state to change.
fn changed(
Expand All @@ -48,13 +48,13 @@ pub trait ActionScheduler: Sync + Send + Unpin + RootMetricsComponent + 'static
/// Adds an action to the scheduler for remote execution.
async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error>;

/// Find an existing action by its name.
async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;
}
3 changes: 2 additions & 1 deletion nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ impl AwaitedAction {
);
let state = Arc::new(ActionState {
stage,
id: operation_id.clone(),
operation_id: operation_id.clone(),
action_digest: action_info.unique_qualifier.digest(),
});
Self {
version: AwaitedActionVersion(0),
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use awaited_action::{AwaitedAction, AwaitedActionSortKey};
use futures::{Future, Stream};
use nativelink_error::Error;
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{ActionInfo, ClientOperationId, OperationId};
use nativelink_util::action_messages::{ActionInfo, OperationId};

mod awaited_action;

Expand Down Expand Up @@ -80,7 +80,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
/// Get the AwaitedAction by the client operation id.
fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> impl Future<Output = Result<Option<Self::Subscriber>, Error>> + Send + Sync;

/// Get all AwaitedActions. This call should be avoided as much as possible.
Expand Down Expand Up @@ -117,7 +117,7 @@ pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
/// to changes.
fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> impl Future<Output = Result<Self::Subscriber, Error>> + Send + Sync;
}
18 changes: 9 additions & 9 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::grpc_store::GrpcStore;
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier,
ClientOperationId, OperationId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
};
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
Expand All @@ -48,7 +47,7 @@ use crate::platform_property_manager::PlatformPropertyManager;
type CheckActions = HashMap<
ActionUniqueKey,
Vec<(
ClientOperationId,
OperationId,
oneshot::Sender<Result<Pin<Box<dyn ActionListener>>, Error>>,
)>,
>;
Expand Down Expand Up @@ -98,7 +97,7 @@ type ActionListenerOneshot = oneshot::Receiver<Result<Pin<Box<dyn ActionListener
fn subscribe_to_existing_action(
inflight_cache_checks: &mut MutexGuard<CheckActions>,
unique_qualifier: &ActionUniqueKey,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Option<ActionListenerOneshot> {
inflight_cache_checks
.get_mut(unique_qualifier)
Expand All @@ -109,12 +108,12 @@ fn subscribe_to_existing_action(
})
}
struct CachedActionListener {
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_state: Arc<ActionState>,
}

impl ActionListener for CachedActionListener {
fn client_operation_id(&self) -> &ClientOperationId {
fn client_operation_id(&self) -> &OperationId {
&self.client_operation_id
}

Expand Down Expand Up @@ -148,7 +147,7 @@ impl ActionScheduler for CacheLookupScheduler {

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let unique_key = match &action_info.unique_qualifier {
Expand Down Expand Up @@ -241,8 +240,9 @@ impl ActionScheduler for CacheLookupScheduler {
return; // Nobody is waiting for this action anymore.
};
let action_state = Arc::new(ActionState {
id: OperationId::new(action_info.unique_qualifier.clone()),
operation_id: OperationId::default(),
stage: ActionStage::CompletedFromCache(action_result),
action_digest: action_info.unique_qualifier.digest(),
});
for (client_operation_id, pending_tx) in pending_txs {
// Ignore errors here, as the other end may have hung up.
Expand Down Expand Up @@ -305,7 +305,7 @@ impl ActionScheduler for CacheLookupScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
self.action_scheduler
.find_by_client_operation_id(client_operation_id)
Expand Down
8 changes: 4 additions & 4 deletions nativelink-scheduler/src/default_action_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ use std::sync::Arc;

use futures::Future;
use nativelink_error::{make_err, Code, Error};
use nativelink_util::action_messages::{ActionState, ClientOperationId};
use nativelink_util::action_messages::{ActionState, OperationId};
use tokio::sync::watch;

use crate::action_scheduler::ActionListener;

/// Simple implementation of ActionListener using tokio's watch.
pub struct DefaultActionListener {
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_state: watch::Receiver<Arc<ActionState>>,
}

impl DefaultActionListener {
pub fn new(
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
mut action_state: watch::Receiver<Arc<ActionState>>,
) -> Self {
action_state.mark_changed();
Expand All @@ -54,7 +54,7 @@ impl DefaultActionListener {
}

impl ActionListener for DefaultActionListener {
fn client_operation_id(&self) -> &ClientOperationId {
fn client_operation_id(&self) -> &OperationId {
&self.client_operation_id
}

Expand Down
19 changes: 5 additions & 14 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
};
use nativelink_proto::google::longrunning::Operation;
use nativelink_util::action_messages::{
ActionInfo, ActionState, ActionUniqueKey, ActionUniqueQualifier, ClientOperationId,
OperationId, DEFAULT_EXECUTION_PRIORITY,
ActionInfo, ActionState, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
};
use nativelink_util::common::DigestInfo;
use nativelink_util::connection_manager::ConnectionManager;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::{background_spawn, tls_utils};
use parking_lot::Mutex;
Expand Down Expand Up @@ -126,16 +123,10 @@ impl GrpcScheduler {
.await
.err_tip(|| "Recieving response from upstream scheduler")?
{
let client_operation_id =
ClientOperationId::from_raw_string(initial_response.name.clone());
let client_operation_id = OperationId::from_raw_string(initial_response.name.clone());
// Our operation_id is not needed here; it is just a placeholder to recycle the existing object.
// The only thing that actually matters is the client_operation_id.
let operation_id =
OperationId::new(ActionUniqueQualifier::Uncachable(ActionUniqueKey {
instance_name: "dummy_instance_name".to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::zero_digest(),
}));
let operation_id = OperationId::default();
let action_state =
ActionState::try_from_operation(initial_response, operation_id.clone())
.err_tip(|| "In GrpcScheduler::stream_state")?;
Expand Down Expand Up @@ -243,7 +234,7 @@ impl ActionScheduler for GrpcScheduler {

async fn add_action(
&self,
_client_operation_id: ClientOperationId,
_client_operation_id: OperationId,
action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let execution_policy = if action_info.priority == DEFAULT_EXECUTION_PRIORITY {
Expand Down Expand Up @@ -289,7 +280,7 @@ impl ActionScheduler for GrpcScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
let request = WaitExecutionRequest {
name: client_operation_id.to_string(),
Expand Down
23 changes: 11 additions & 12 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier,
ClientOperationId, OperationId,
ActionInfo, ActionStage, ActionState, ActionUniqueKey, ActionUniqueQualifier, OperationId,
};
use nativelink_util::chunked_stream::ChunkedStream;
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
Expand Down Expand Up @@ -99,7 +98,7 @@ impl LenEntry for ClientAwaitedAction {
/// Actions the AwaitedActionsDb needs to process.
pub(crate) enum ActionEvent {
/// A client has sent a keep alive message.
ClientKeepAlive(ClientOperationId),
ClientKeepAlive(OperationId),
/// A client has dropped and pointed to OperationId.
ClientDroppedOperation(OperationId),
}
Expand All @@ -108,7 +107,7 @@ pub(crate) enum ActionEvent {
/// keep alive config and state.
struct ClientInfo<I: InstantWrapper, NowFn: Fn() -> I> {
/// The client operation id.
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
/// The last time a keep alive was sent.
last_keep_alive: I,
/// The function to get the current time.
Expand Down Expand Up @@ -136,7 +135,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I> MemoryAwaitedActionSubscriber<I, NowFn
}
pub fn new_with_client(
mut awaited_action_rx: watch::Receiver<AwaitedAction>,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
event_tx: mpsc::UnboundedSender<ActionEvent>,
now_fn: NowFn,
) -> Self
Expand Down Expand Up @@ -351,7 +350,7 @@ impl SortedAwaitedActions {
pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
/// A lookup table to look up the state of an action by its client operation id.
#[metric(group = "client_operation_ids")]
client_operation_to_awaited_action: EvictingMap<ClientOperationId, Arc<ClientAwaitedAction>, I>,
client_operation_to_awaited_action: EvictingMap<OperationId, Arc<ClientAwaitedAction>, I>,

/// A lookup table to look up the state of an action by its worker operation id.
#[metric(group = "operation_ids")]
Expand Down Expand Up @@ -381,7 +380,7 @@ pub struct AwaitedActionDbImpl<I: InstantWrapper, NowFn: Fn() -> I> {
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbImpl<I, NowFn> {
async fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<MemoryAwaitedActionSubscriber<I, NowFn>>, Error> {
let maybe_client_awaited_action = self
.client_operation_to_awaited_action
Expand Down Expand Up @@ -710,7 +709,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI

async fn add_action(
&mut self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<MemoryAwaitedActionSubscriber<I, NowFn>, Error> {
// Check to see if the action is already known and subscribe if it is.
Expand All @@ -732,7 +731,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
ActionUniqueQualifier::Cachable(unique_key) => Some(unique_key.clone()),
ActionUniqueQualifier::Uncachable(_unique_key) => None,
};
let operation_id = OperationId::new(action_info.unique_qualifier.clone());
let operation_id = OperationId::default();
let awaited_action = AwaitedAction::new(operation_id.clone(), action_info);
debug_assert!(
ActionStage::Queued == awaited_action.state().stage,
Expand Down Expand Up @@ -782,7 +781,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI

async fn try_subscribe(
&mut self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
unique_qualifier: &ActionUniqueQualifier,
// TODO(allada) To simplify the scheduler 2024 refactor, we
// removed the ability to upgrade priorities of actions.
Expand Down Expand Up @@ -894,7 +893,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite

async fn get_awaited_action_by_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Self::Subscriber>, Error> {
self.inner
.lock()
Expand Down Expand Up @@ -980,7 +979,7 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> Awaite

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
action_info: Arc<ActionInfo>,
) -> Result<Self::Subscriber, Error> {
self.inner
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use async_trait::async_trait;
use nativelink_config::schedulers::{PropertyModification, PropertyType};
use nativelink_error::{Error, ResultExt};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::action_messages::{ActionInfo, ClientOperationId};
use nativelink_util::action_messages::{ActionInfo, OperationId};
use parking_lot::Mutex;

use crate::action_scheduler::{ActionListener, ActionScheduler};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl ActionScheduler for PropertyModifierScheduler {

async fn add_action(
&self,
client_operation_id: ClientOperationId,
client_operation_id: OperationId,
mut action_info: ActionInfo,
) -> Result<Pin<Box<dyn ActionListener>>, Error> {
let platform_property_manager = self
Expand Down Expand Up @@ -122,7 +122,7 @@ impl ActionScheduler for PropertyModifierScheduler {

async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
client_operation_id: &OperationId,
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error> {
self.scheduler
.find_by_client_operation_id(client_operation_id)
Expand Down
Loading

0 comments on commit 231676c

Please sign in to comment.