From 467bee5dff3c295fe691c11927e02e9a6a0660c2 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Thu, 2 Sep 2021 13:15:47 +0800 Subject: [PATCH] Remove GetFileMetadata --- ballista/docs/architecture.md | 1 - ballista/rust/core/proto/ballista.proto | 16 -------- ballista/rust/scheduler/src/lib.rs | 50 ++----------------------- 3 files changed, 3 insertions(+), 64 deletions(-) diff --git a/ballista/docs/architecture.md b/ballista/docs/architecture.md index 2868d52b943e..c14ade92a66f 100644 --- a/ballista/docs/architecture.md +++ b/ballista/docs/architecture.md @@ -50,7 +50,6 @@ The scheduler process implements a gRPC interface (defined in | -------------------- | -------------------------------------------------------------------- | | ExecuteQuery | Submit a logical query plan or SQL query for execution | | GetExecutorsMetadata | Retrieves a list of executors that have registered with a scheduler | -| GetFileMetadata | Retrieve metadata about files available in the cluster file system | | GetJobStatus | Get the status of a submitted query | | RegisterExecutor | Executors call this method to register themselves with the scheduler | diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 45ff6c5984ca..3d89d38b755b 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -913,26 +913,10 @@ message GetJobStatusResult { JobStatus status = 1; } -message GetFileMetadataParams { - string path = 1; - FileType file_type = 2; -} - -message GetFileMetadataResult { - Schema schema = 1; - repeated FilePartitionMetadata partitions = 2; -} - -message FilePartitionMetadata { - repeated string filename = 1; -} - service SchedulerGrpc { // Executors must poll the scheduler for heartbeat and to receive tasks rpc PollWork (PollWorkParams) returns (PollWorkResult) {} - rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {} - rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {} rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {} diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index f03d08b1b0ed..4fa3aa02df31 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -40,10 +40,9 @@ use std::{fmt, net::IpAddr}; use ballista_core::serde::protobuf::{ execute_query_params::Query, executor_registration::OptionalHost, job_status, scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams, - ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType, - GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, - JobStatus, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, - TaskDefinition, TaskStatus, + ExecuteQueryResult, FailedJob, GetJobStatusParams, GetJobStatusResult, JobStatus, + PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition, + TaskStatus, }; use ballista_core::serde::scheduler::ExecutorMeta; @@ -82,7 +81,6 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use datafusion::datasource::parquet::ParquetTableDescriptor; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -268,48 +266,6 @@ impl SchedulerGrpc for SchedulerServer { } } - async fn get_file_metadata( - &self, - request: Request, - ) -> std::result::Result, tonic::Status> { - let GetFileMetadataParams { path, file_type } = request.into_inner(); - - let file_type: FileType = file_type.try_into().map_err(|e| { - let msg = format!("Error reading request: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - - match file_type { - FileType::Parquet => { - let parquet_desc = ParquetTableDescriptor::new(&path).map_err(|e| { - let msg = format!("Error opening parquet files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - - let partitions = parquet_desc - .descriptor - .partition_files - .iter() - .map(|pf| FilePartitionMetadata { - filename: vec![pf.path.clone()], - }) - .collect(); - - //TODO include statistics and any other info needed to reconstruct ParquetExec - Ok(Response::new(GetFileMetadataResult { - schema: Some(parquet_desc.schema().as_ref().into()), - partitions, - })) - } - //TODO implement for CSV - _ => Err(tonic::Status::unimplemented( - "get_file_metadata unsupported file type", - )), - } - } - async fn execute_query( &self, request: Request,