Skip to content

Commit

Permalink
Remove ActionScheduler and introduce KnownPlatformPropertyProvider
Browse files Browse the repository at this point in the history
We want to phase out ActionScheduler in favor of ClientStateManager. In
doing so, we need to have a way to let CapabilitiesServer know about
platform properties; if we do this, it'd be a breaking change. Instead
of causing a breaking change, we instead are going to make the provided
scheduler allow the capabilities service query the optionally provided
property provider interface and if it does not exist throw a warning and
have it return an empty set (most RBE clients ignore this field anyway).

towards TraceMachina#1213
  • Loading branch information
allada committed Aug 12, 2024
1 parent 7777938 commit 18cc117
Show file tree
Hide file tree
Showing 28 changed files with 395 additions and 512 deletions.
1 change: 0 additions & 1 deletion nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ load(
rust_library(
name = "nativelink-scheduler",
srcs = [
"src/action_scheduler.rs",
"src/api_worker_scheduler.rs",
"src/awaited_action_db/awaited_action.rs",
"src/awaited_action_db/mod.rs",
Expand Down
8 changes: 4 additions & 4 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ use nativelink_metric::{
group, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
RootMetricsComponent,
};
use nativelink_util::action_messages::{ActionInfo, ActionStage, OperationId, WorkerId};
use nativelink_util::action_messages::{ActionStage, OperationId, WorkerId};
use nativelink_util::operation_state_manager::WorkerStateManager;
use nativelink_util::platform_properties::PlatformProperties;
use tokio::sync::Notify;
use tonic::async_trait;
use tracing::{event, Level};

use crate::platform_property_manager::PlatformPropertyManager;
use crate::worker::{Worker, WorkerTimestamp, WorkerUpdate};
use crate::worker::{ActionInfoWithProps, Worker, WorkerTimestamp, WorkerUpdate};
use crate::worker_scheduler::WorkerScheduler;

struct Workers(LruCache<WorkerId, Worker>);
Expand Down Expand Up @@ -248,7 +248,7 @@ impl ApiWorkerSchedulerImpl {
&mut self,
worker_id: WorkerId,
operation_id: OperationId,
action_info: Arc<ActionInfo>,
action_info: ActionInfoWithProps,
) -> Result<(), Error> {
if let Some(worker) = self.workers.get_mut(&worker_id) {
let notify_worker_result =
Expand Down Expand Up @@ -345,7 +345,7 @@ impl ApiWorkerScheduler {
&self,
worker_id: WorkerId,
operation_id: OperationId,
action_info: Arc<ActionInfo>,
action_info: ActionInfoWithProps,
) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
inner
Expand Down
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/awaited_action_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub trait AwaitedActionSubscriber: Send + Sync + Sized + 'static {
}

/// A trait that defines the interface for an AwaitedActionDb.
pub trait AwaitedActionDb: Send + Sync + MetricsComponent + 'static {
pub trait AwaitedActionDb: Send + Sync + MetricsComponent + Unpin + 'static {
type Subscriber: AwaitedActionSubscriber;

/// Get the AwaitedAction by the client operation id.
Expand Down
27 changes: 10 additions & 17 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use nativelink_util::action_messages::{
use nativelink_util::background_spawn;
use nativelink_util::common::DigestInfo;
use nativelink_util::digest_hasher::DigestHasherFunc;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
Expand All @@ -39,9 +40,6 @@ use tokio::sync::oneshot;
use tonic::Request;
use tracing::{event, Level};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

/// Actions that are having their cache checked or failed cache lookup and are
/// being forwarded upstream. Missing the skip_cache_check actions which are
/// forwarded directly.
Expand All @@ -62,7 +60,7 @@ pub struct CacheLookupScheduler {
/// The "real" scheduler to use to perform actions if they were not found
/// in the action cache.
#[metric(group = "action_scheduler")]
action_scheduler: Arc<dyn ActionScheduler>,
action_scheduler: Arc<dyn ClientStateManager>,
/// Actions that are currently performing a CacheCheck.
inflight_cache_checks: Arc<Mutex<CheckActions>>,
}
Expand Down Expand Up @@ -142,7 +140,10 @@ impl ActionStateResult for CacheLookupActionStateResult {
}

impl CacheLookupScheduler {
pub fn new(ac_store: Store, action_scheduler: Arc<dyn ActionScheduler>) -> Result<Self, Error> {
pub fn new(
ac_store: Store,
action_scheduler: Arc<dyn ClientStateManager>,
) -> Result<Self, Error> {
Ok(Self {
ac_store,
action_scheduler,
Expand Down Expand Up @@ -321,18 +322,6 @@ impl CacheLookupScheduler {
}
}

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.action_scheduler
.get_platform_property_manager(instance_name)
.await
}
}

#[async_trait]
impl ClientStateManager for CacheLookupScheduler {
async fn add_action(
Expand All @@ -350,6 +339,10 @@ impl ClientStateManager for CacheLookupScheduler {
) -> Result<ActionStateResultStream, Error> {
self.inner_filter_operations(filter).await
}

fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
self.action_scheduler.as_known_platform_property_provider()
}
}

impl RootMetricsComponent for CacheLookupScheduler {}
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ use std::sync::Arc;
use nativelink_config::schedulers::SchedulerConfig;
use nativelink_error::{Error, ResultExt};
use nativelink_store::store_manager::StoreManager;
use nativelink_util::operation_state_manager::ClientStateManager;

use crate::action_scheduler::ActionScheduler;
use crate::cache_lookup_scheduler::CacheLookupScheduler;
use crate::grpc_scheduler::GrpcScheduler;
use crate::property_modifier_scheduler::PropertyModifierScheduler;
use crate::simple_scheduler::SimpleScheduler;
use crate::worker_scheduler::WorkerScheduler;

pub type SchedulerFactoryResults = (
Option<Arc<dyn ActionScheduler>>,
Option<Arc<dyn ClientStateManager>>,
Option<Arc<dyn WorkerScheduler>>,
);

Expand Down
59 changes: 22 additions & 37 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use nativelink_util::action_messages::{
ActionInfo, ActionState, ActionUniqueQualifier, OperationId, DEFAULT_EXECUTION_PRIORITY,
};
use nativelink_util::connection_manager::ConnectionManager;
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
Expand All @@ -46,9 +47,6 @@ use tokio::time::sleep;
use tonic::{Request, Streaming};
use tracing::{event, Level};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;

struct GrpcActionStateResult {
client_operation_id: OperationId,
rx: watch::Receiver<Arc<ActionState>>,
Expand Down Expand Up @@ -87,7 +85,7 @@ impl ActionStateResult for GrpcActionStateResult {
#[derive(MetricsComponent)]
pub struct GrpcScheduler {
#[metric(group = "property_managers")]
platform_property_managers: Mutex<HashMap<String, Arc<PlatformPropertyManager>>>,
supported_props: Mutex<HashMap<String, Vec<String>>>,
retrier: Retrier,
connection_manager: ConnectionManager,
}
Expand Down Expand Up @@ -115,7 +113,7 @@ impl GrpcScheduler {
let endpoint = tls_utils::endpoint(&config.endpoint)?;
let jitter_fn = Arc::new(jitter_fn);
Ok(Self {
platform_property_managers: Mutex::new(HashMap::new()),
supported_props: Mutex::new(HashMap::new()),
retrier: Retrier::new(
Arc::new(|duration| Box::pin(sleep(duration))),
jitter_fn.clone(),
Expand Down Expand Up @@ -220,14 +218,9 @@ impl GrpcScheduler {
))
}

async fn inner_get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
if let Some(platform_property_manager) =
self.platform_property_managers.lock().get(instance_name)
{
return Ok(platform_property_manager.clone());
async fn inner_get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
if let Some(supported_props) = self.supported_props.lock().get(instance_name) {
return Ok(supported_props.clone());
}

self.perform_request(instance_name, |instance_name| async move {
Expand All @@ -244,25 +237,17 @@ impl GrpcScheduler {
.await
.err_tip(|| "Retrieving upstream GrpcScheduler capabilities");
let capabilities = capabilities_result?.into_inner();
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
capabilities
.execution_capabilities
.err_tip(|| "Unable to get execution properties in GrpcScheduler")?
.supported_node_properties
.iter()
.map(|property| {
(
property.clone(),
nativelink_config::schedulers::PropertyType::exact,
)
})
.collect(),
));
let supported_props = capabilities
.execution_capabilities
.err_tip(|| "Unable to get execution properties in GrpcScheduler")?
.supported_node_properties
.into_iter()
.collect::<Vec<String>>();

self.platform_property_managers
self.supported_props
.lock()
.insert(instance_name.to_string(), platform_property_manager.clone());
Ok(platform_property_manager)
.insert(instance_name.to_string(), supported_props.clone());
Ok(supported_props)
})
.await
}
Expand Down Expand Up @@ -376,16 +361,16 @@ impl ClientStateManager for GrpcScheduler {
) -> Result<ActionStateResultStream<'a>, Error> {
self.inner_filter_operations(filter).await
}

fn as_known_platform_property_provider(&self) -> Option<&dyn KnownPlatformPropertyProvider> {
Some(self)
}
}

#[async_trait]
impl ActionScheduler for GrpcScheduler {
async fn get_platform_property_manager(
&self,
instance_name: &str,
) -> Result<Arc<PlatformPropertyManager>, Error> {
self.inner_get_platform_property_manager(instance_name)
.await
impl KnownPlatformPropertyProvider for GrpcScheduler {
async fn get_known_properties(&self, instance_name: &str) -> Result<Vec<String>, Error> {
self.inner_get_known_properties(instance_name).await
}
}

Expand Down
1 change: 0 additions & 1 deletion nativelink-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod action_scheduler;
pub mod api_worker_scheduler;
mod awaited_action_db;
pub mod cache_lookup_scheduler;
Expand Down
16 changes: 15 additions & 1 deletion nativelink-scheduler/src/platform_property_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use nativelink_error::{make_input_err, Code, Error, ResultExt};
use nativelink_metric::{
group, MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
};
use nativelink_util::platform_properties::PlatformPropertyValue;
use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyValue};

/// Helps manage known properties and conversion into `PlatformPropertyValue`.
pub struct PlatformPropertyManager {
Expand Down Expand Up @@ -57,6 +57,20 @@ impl PlatformPropertyManager {
&self.known_properties
}

/// Given a map of key-value pairs, returns a map of `PlatformPropertyValue` based on the
/// configuration passed into the `PlatformPropertyManager` constructor.
pub fn make_platform_properties(
&self,
properties: HashMap<String, String>,
) -> Result<PlatformProperties, Error> {
let mut platform_properties = HashMap::with_capacity(properties.len());
for (key, value) in properties {
let prop_value = self.make_prop_value(&key, &value)?;
platform_properties.insert(key, prop_value);
}
Ok(PlatformProperties::new(platform_properties))
}

/// Given a specific key and value, returns the translated `PlatformPropertyValue`. This will
/// automatically convert any strings to the type-value pairs of `PlatformPropertyValue` based
/// on the configuration passed into the `PlatformPropertyManager` constructor.
Expand Down
Loading

0 comments on commit 18cc117

Please sign in to comment.