diff --git a/Cargo.lock b/Cargo.lock
index 9cdbece319..7754bfcfb3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1325,9 +1325,9 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
-version = "1.0.16"
+version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39fca0a20131625263de118b8d09be05ffcc0d0ed6392f75d29e277f36b0d32c"
+checksum = "1bb4aa3d3a6b99a22cc2e35cd2cd04d52e825dbcb8f0dd7c7095a0d4e7bdfed1"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index d180bd0c3f..71d779bcc3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -92,7 +92,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.16"
+ceresdbproto = "1.0.17"
 codec = { path = "components/codec" }
 notifier = { path = "components/notifier" }
 chrono = "0.4"
diff --git a/analytic_engine/src/instance/alter.rs b/analytic_engine/src/instance/alter.rs
index 9aaf6f746f..131a9cc509 100644
--- a/analytic_engine/src/instance/alter.rs
+++ b/analytic_engine/src/instance/alter.rs
@@ -72,7 +72,14 @@ impl<'a> Alterer<'a> {
         );
 
         // Validate alter schema request.
-        self.validate_before_alter(&request)?;
+        // If the alter schema request is idempotent, we can skip the alter operation.
+        if self.validate_before_alter(&request)? {
+            info!(
+                "Skip alter because the altered schema is the same as the current one, table:{}",
+                self.table_data.name
+            );
+            return Ok(());
+        }
 
         // Now we can persist and update the schema, since this function is called by
         // write worker, so there is no other concurrent writer altering the
@@ -157,7 +164,9 @@ impl<'a> Alterer<'a> {
 
     // Most validation should be done by catalog module, so we don't do too much
     // duplicate check here, especially the schema compatibility.
-    fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<()> {
+    // The returned value denotes whether the altered schema is the same as the
+    // current one.
+    fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<bool> {
         ensure!(
             !self.table_data.is_dropped(),
             AlterDroppedTable {
@@ -165,6 +174,10 @@
             }
         );
 
+        if self.table_data.schema().columns() == request.schema.columns() {
+            return Ok(true);
+        }
+
         let current_version = self.table_data.schema_version();
         ensure!(
             current_version < request.schema.version(),
@@ -184,7 +197,7 @@
             }
         );
 
-        Ok(())
+        Ok(false)
     }
 
     pub async fn alter_options_of_table(
diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs
index af3263b51f..1943c26e68 100644
--- a/catalog/src/schema.rs
+++ b/catalog/src/schema.rs
@@ -17,7 +17,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use common_types::{column_schema::ColumnSchema, table::ShardId};
+use common_types::table::ShardId;
 use generic_error::GenericError;
 use macros::define_result;
 use snafu::{Backtrace, Snafu};
@@ -338,22 +338,6 @@ pub struct CloseOptions {
     pub table_engine: TableEngineRef,
 }
 
-/// Alter table operations.
-#[derive(Debug)]
-pub enum AlterTableOperation {
-    /// Add column operation, the column id in [ColumnSchema] will be ignored.
-    /// Primary key column is not allowed to be added, so all columns will
-    /// be added as normal columns.
-    AddColumn(ColumnSchema),
-}
-
-/// Alter table request.
-#[derive(Debug)]
-pub struct AlterTableRequest {
-    pub table_name: String,
-    pub operations: Vec<AlterTableOperation>,
-}
-
 #[derive(Debug, Clone)]
 pub struct OpenShardRequest {
     /// Shard id
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index d42c0f9257..ae1bdfaa52 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -78,6 +78,38 @@ UInt64(9923681778193615344),Timestamp(1651737067000),String("ceresdb8"),Int32(0)
 UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0),Double(109.0),
 
+ALTER TABLE partition_table_t ADD COLUMN (b string);
+
+affected_rows: 0
+
+ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
+
+affected_rows: 0
+
+SHOW CREATE TABLE __partition_table_t_0;
+
+Table,Create Table,
+String("__partition_table_t_0"),String("CREATE TABLE `__partition_table_t_0` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),
+
+
+SHOW CREATE TABLE __partition_table_t_1;
+
+Table,Create Table,
+String("__partition_table_t_1"),String("CREATE TABLE `__partition_table_t_1` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),
+
+
+SHOW CREATE TABLE __partition_table_t_2;
+
+Table,Create Table,
+String("__partition_table_t_2"),String("CREATE TABLE `__partition_table_t_2` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),
+
+
+SHOW CREATE TABLE __partition_table_t_3;
+
+Table,Create Table,
+String("__partition_table_t_3"),String("CREATE TABLE `__partition_table_t_3` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),
+
+
 DROP TABLE IF EXISTS `partition_table_t`;
 
 affected_rows: 0
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 54e3806d74..658760eb96 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -35,6 +35,18 @@ SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2
 SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7","ceresdb8", "ceresdb9", "ceresdb10") order by name;
 
+ALTER TABLE partition_table_t ADD COLUMN (b string);
+
+ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';
+
+SHOW CREATE TABLE __partition_table_t_0;
+
+SHOW CREATE TABLE __partition_table_t_1;
+
+SHOW CREATE TABLE __partition_table_t_2;
+
+SHOW CREATE TABLE __partition_table_t_3;
+
 DROP TABLE IF EXISTS `partition_table_t`;
 
 SHOW CREATE TABLE partition_table_t;
diff --git a/partition_table_engine/src/partition.rs b/partition_table_engine/src/partition.rs
index e51dc1403c..21af54da20 100644
--- a/partition_table_engine/src/partition.rs
+++ b/partition_table_engine/src/partition.rs
@@ -24,6 +24,7 @@ use common_types::{
 };
 use futures::{stream::FuturesUnordered, StreamExt};
 use generic_error::BoxError;
+use logger::error;
 use snafu::ResultExt;
 use table_engine::{
     partition::{
@@ -36,16 +37,16 @@ use table_engine::{
     },
     remote::{
         model::{
-            ReadRequest as RemoteReadRequest, TableIdentifier, WriteBatchResult,
-            WriteRequest as RemoteWriteRequest,
+            AlterTableOptionsRequest, AlterTableSchemaRequest, ReadRequest as RemoteReadRequest,
+            TableIdentifier, WriteBatchResult, WriteRequest as RemoteWriteRequest,
         },
         RemoteEngineRef,
     },
     stream::{PartitionedStreams, SendableRecordBatchStream},
     table::{
-        AlterSchemaRequest, CreatePartitionRule, FlushRequest, GetRequest, LocatePartitions,
-        ReadRequest, Result, Scan, Table, TableId, TableStats, UnexpectedWithMsg,
-        UnsupportedMethod, Write, WriteBatch, WriteRequest,
+        AlterOptions, AlterSchema, AlterSchemaRequest, CreatePartitionRule, FlushRequest,
+        GetRequest, LocatePartitions, ReadRequest, Result, Scan, Table, TableId, TableStats,
+        UnexpectedWithMsg, UnsupportedMethod, Write, WriteBatch, WriteRequest,
     },
 };
@@ -328,20 +329,114 @@ impl Table for PartitionTableImpl {
         Ok(PartitionedStreams { streams })
     }
 
-    async fn alter_schema(&self, _request: AlterSchemaRequest) -> Result<usize> {
-        UnsupportedMethod {
-            table: self.name(),
-            method: "alter_schema",
+    async fn alter_schema(&self, request: AlterSchemaRequest) -> Result<usize> {
+        let partition_num = match self.partition_info() {
+            None => UnexpectedWithMsg {
+                msg: "partition table partition info can't be empty",
+            }
+            .fail()?,
+            Some(partition_info) => partition_info.get_partition_num(),
+        };
+
+        // Alter the schema of all partitions except the first one,
+        // because the schema of the partition table is stored in the first partition.
+        let mut futures = FuturesUnordered::new();
+        for id in 1..partition_num {
+            let partition = self
+                .remote_engine
+                .alter_table_schema(AlterTableSchemaRequest {
+                    table_ident: self.get_sub_table_ident(id),
+                    table_schema: request.schema.clone(),
+                    pre_schema_version: request.pre_schema_version,
+                });
+            futures.push(partition);
         }
-        .fail()
+
+        let mut alter_err = None;
+        while let Some(alter_ret) = futures.next().await {
+            if let Err(e) = &alter_ret {
+                error!("Alter schema failed, table_name:{}, err:{e}", self.name());
+                alter_err.get_or_insert(
+                    alter_ret
+                        .box_err()
+                        .context(AlterSchema { table: self.name() }),
+                );
+            }
+        }
+
+        // Return the first error if any.
+        if let Some(ret) = alter_err {
+            ret?;
+        }
+
+        // Alter the schema of the first partition.
+        self.remote_engine
+            .alter_table_schema(AlterTableSchemaRequest {
+                table_ident: self.get_sub_table_ident(0),
+                table_schema: request.schema.clone(),
+                pre_schema_version: request.pre_schema_version,
+            })
+            .await
+            .box_err()
+            .with_context(|| AlterSchema {
+                table: self.get_sub_table_ident(0).table,
+            })?;
+
+        Ok(0)
     }
 
-    async fn alter_options(&self, _options: HashMap<String, String>) -> Result<usize> {
-        UnsupportedMethod {
-            table: self.name(),
-            method: "alter_options",
+    async fn alter_options(&self, options: HashMap<String, String>) -> Result<usize> {
+        let partition_num = match self.partition_info() {
+            None => UnexpectedWithMsg {
+                msg: "partition table partition info can't be empty",
+            }
+            .fail()?,
+            Some(partition_info) => partition_info.get_partition_num(),
+        };
+
+        // Alter the options of all partitions except the first one,
+        // because the schema of the partition table is stored in the first partition.
+        let mut futures = FuturesUnordered::new();
+        for id in 1..partition_num {
+            let partition = self
+                .remote_engine
+                .alter_table_options(AlterTableOptionsRequest {
+                    table_ident: self.get_sub_table_ident(id),
+                    options: options.clone(),
+                });
+            futures.push(partition);
         }
-        .fail()
+
+        let mut alter_err = None;
+        while let Some(alter_ret) = futures.next().await {
+            if let Err(e) = &alter_ret {
+                error!("Alter options failed, table_name:{}, err:{e}", self.name());
+                alter_err.get_or_insert(
+                    alter_ret
+                        .box_err()
+                        .context(AlterOptions { table: self.name() }),
+                );
+            }
+        }
+
+        // Return the first error if any.
+        if let Some(ret) = alter_err {
+            ret?;
+        }
+
+        // Alter the options of the first partition.
+        self.remote_engine
+            .alter_table_options(AlterTableOptionsRequest {
+                table_ident: self.get_sub_table_ident(0),
+                options: options.clone(),
+            })
+            .await
+            .box_err()
+            .with_context(|| AlterOptions {
+                table: self.get_sub_table_ident(0).table,
+            })?;
+
+        Ok(0)
     }
 
     // Partition table is a virtual table, so it doesn't need to flush.
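The ordering in `alter_schema`/`alter_options` above is deliberate: the partition table's metadata lives in the first sub-table, so the request is fanned out to sub-tables `1..n` first, every future is drained so each sub-table is attempted, only the first error is kept, and sub-table 0 is altered last, only once all the others have succeeded. Below is a minimal, self-contained sketch of that pattern; `alter_partition` is a hypothetical stand-in for the remote alter call, not CeresDB's API:

```rust
use futures::{stream::FuturesUnordered, StreamExt};

// Hypothetical stand-in for `remote_engine.alter_table_schema(...)`.
async fn alter_partition(id: usize) -> Result<(), String> {
    if id == 2 {
        return Err(format!("partition {id} failed"));
    }
    Ok(())
}

async fn alter_all(partition_num: usize) -> Result<(), String> {
    // Fan out to every partition except the first one.
    let mut futures: FuturesUnordered<_> = (1..partition_num).map(alter_partition).collect();

    // Drain all futures so each sub-table is attempted, remembering only the
    // first error instead of aborting on it.
    let mut first_err = None;
    while let Some(ret) = futures.next().await {
        if let Err(e) = ret {
            first_err.get_or_insert(e);
        }
    }
    if let Some(e) = first_err {
        return Err(e);
    }

    // Only touch the first partition (which holds the table's metadata) once
    // every other partition has been updated.
    alter_partition(0).await
}

#[tokio::main]
async fn main() {
    // Partition 2 fails above, so the first partition is never altered.
    assert!(alter_all(4).await.is_err());
}
```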
diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index 6cfbe4c834..d1bc231ebf 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -38,7 +38,7 @@ use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}
 use generic_error::BoxError;
 use http::StatusCode;
 use interpreters::interpreter::Output;
-use logger::{debug, error, info};
+use logger::{debug, error, info, warn};
 use query_frontend::{
     frontend::{Context as FrontendContext, Frontend},
     plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan},
@@ -481,15 +481,15 @@
     ) -> Result<WriteResponse> {
         let begin_instant = Instant::now();
         let deadline = ctx.timeout.map(|t| begin_instant + t);
-        let catalog = self.instance.catalog_manager.default_catalog_name();
+        let catalog_name = self.instance.catalog_manager.default_catalog_name();
         let req_ctx = req.context.context(ErrNoCause {
             msg: "Missing context",
             code: StatusCode::BAD_REQUEST,
         })?;
-        let schema = req_ctx.database;
+        let schema_name = req_ctx.database;
         debug!(
-            "Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
+            "Local write begin, catalog:{catalog_name}, schema:{schema_name}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
             req.table_requests
                 .first()
                 .map(|m| (&m.table, &m.tag_names, &m.field_names)),
@@ -499,8 +499,8 @@
         let write_context = WriteContext {
             request_id,
             deadline,
-            catalog: catalog.to_string(),
-            schema: schema.clone(),
+            catalog: catalog_name.to_string(),
+            schema: schema_name.clone(),
             auto_create_table: self.auto_create_table,
         };
@@ -510,9 +510,30 @@
         let mut success = 0;
 
         for insert_plan in plan_vec {
-            success += self
-                .execute_insert_plan(request_id, catalog, &schema, insert_plan, deadline)
-                .await?;
+            let table = insert_plan.table.clone();
+            match self
+                .execute_insert_plan(
+                    request_id,
+                    catalog_name,
+                    &schema_name,
+                    insert_plan,
+                    deadline,
+                )
+                .await
+            {
+                Ok(n) => {
+                    success += n;
+                }
+                Err(e) => {
+                    // TODO: remove this logic.
+                    // Refer to https://github.com/CeresDB/ceresdb/issues/1248.
+                    if e.error_message().contains("No field named") {
+                        self.evict_partition_table(table, catalog_name, &schema_name)
+                            .await;
+                    }
+                    return Err(e);
+                }
+            }
         }
 
         Ok(WriteResponse {
@@ -579,8 +600,8 @@
     async fn execute_insert_plan(
         &self,
         request_id: RequestId,
-        catalog: &str,
-        schema: &str,
+        catalog_name: &str,
+        schema_name: &str,
         insert_plan: InsertPlan,
         deadline: Option<Instant>,
     ) -> Result<usize> {
@@ -591,7 +612,7 @@
         );
         let plan = Plan::Insert(insert_plan);
         let output = self
-            .execute_plan(request_id, catalog, schema, plan, deadline)
+            .execute_plan(request_id, catalog_name, schema_name, plan, deadline)
             .await;
         output.and_then(|output| match output {
             Output::AffectedRows(n) => Ok(n),
@@ -664,6 +685,31 @@
         info!("Add columns success, request_id:{request_id}, table:{table_name}");
         Ok(())
     }
+
+    async fn evict_partition_table(&self, table: TableRef, catalog_name: &str, schema_name: &str) {
+        if table.partition_info().is_some() {
+            let catalog = self.get_catalog(catalog_name);
+            if catalog.is_err() {
+                return;
+            }
+            let schema = self.get_schema(&catalog.unwrap(), schema_name);
+            if schema.is_err() {
+                return;
+            }
+            let _ = self
+                .drop_partition_table(
+                    schema.unwrap(),
+                    catalog_name.to_string(),
+                    schema_name.to_string(),
+                    table.name().to_string(),
+                )
+                .await
+                .map_err(|e| {
+                    warn!("Failed to drop partition table, err:{:?}", e);
+                    e
+                });
+        }
+    }
 }
 
 fn find_new_columns(
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index aca9b7a933..c4830e86d7 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -32,17 +32,19 @@ use ceresdbproto::{
 use common_types::{record_batch::RecordBatch, schema::RecordSchema};
 use futures::{Stream, StreamExt};
 use generic_error::BoxError;
-use logger::info;
+use logger::{error, info};
 use router::RouterRef;
 use runtime::Runtime;
 use snafu::{ensure, OptionExt, ResultExt};
 use table_engine::{
     remote::model::{
-        ExecutePlanRequest, GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo,
-        WriteBatchRequest, WriteBatchResult, WriteRequest,
+        AlterTableOptionsRequest, AlterTableSchemaRequest, ExecutePlanRequest, GetTableInfoRequest,
+        ReadRequest, TableIdentifier, TableInfo, WriteBatchRequest, WriteBatchResult, WriteRequest,
     },
     table::{SchemaId, TableId},
 };
+use time_ext::ReadableDuration;
+use tokio::time::sleep;
 use tonic::{transport::Channel, Request, Streaming};
 
 use crate::{cached_router::CachedRouter, config::Config, error::*, status_code};
@@ -57,17 +59,23 @@
 pub struct Client {
     cached_router: Arc<CachedRouter>,
     io_runtime: Arc<Runtime>,
     pub compression: CompressOptions,
+    max_retry: usize,
+    retry_interval: ReadableDuration,
 }
 
 impl Client {
     pub fn new(config: Config, router: RouterRef, io_runtime: Arc<Runtime>) -> Self {
         let compression = config.compression;
+        let max_retry = config.max_retry;
+        let retry_interval = config.retry_interval;
         let cached_router = CachedRouter::new(router, config);
 
         Self {
             cached_router: Arc::new(cached_router),
             io_runtime,
             compression,
+            max_retry,
+            retry_interval,
         }
     }
@@ -239,6 +247,118 @@
         Ok(results)
     }
 
+    pub async fn alter_table_schema(&self, request: AlterTableSchemaRequest) -> Result<()> {
+        // Find the channel from router firstly.
+        let route_context = self.cached_router.route(&request.table_ident).await?;
+
+        let table_ident = request.table_ident.clone();
+        let request_pb: ceresdbproto::remote_engine::AlterTableSchemaRequest = request.into();
+        let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);
+
+        let mut result = Ok(());
+        // Alter schema to remote engine with retry.
+        // TODO: Define a macro to reuse the retry logic.
+        for i in 0..(self.max_retry + 1) {
+            let resp = rpc_client
+                .alter_table_schema(Request::new(request_pb.clone()))
+                .await
+                .with_context(|| Rpc {
+                    table_idents: vec![table_ident.clone()],
+                    msg: "Failed to alter schema to remote engine",
+                });
+
+            let resp = resp.and_then(|response| {
+                let response = response.into_inner();
+                if let Some(header) = &response.header && !status_code::is_ok(header.code) {
+                    Server {
+                        table_idents: vec![table_ident.clone()],
+                        code: header.code,
+                        msg: header.error.clone(),
+                    }.fail()
+                } else {
+                    Ok(())
+                }
+            });
+
+            if let Err(e) = resp {
+                error!("Failed to alter schema to remote engine, table:{table_ident:?}, err:{e}");
+
+                result = Err(e);
+
+                // If an error occurred, we simply evict the corresponding channel now.
+                // TODO: evict according to the type of error.
+                self.evict_route_from_cache(&[table_ident.clone()]).await;
+
+                // Break if it's the last retry.
+                if i == self.max_retry {
+                    break;
+                }
+
+                sleep(self.retry_interval.0).await;
+                continue;
+            }
+            return Ok(());
+        }
+        result
+    }
+
+    pub async fn alter_table_options(&self, request: AlterTableOptionsRequest) -> Result<()> {
+        // Find the channel from router firstly.
+        let route_context = self.cached_router.route(&request.table_ident).await?;
+
+        let table_ident = request.table_ident.clone();
+        let request_pb: ceresdbproto::remote_engine::AlterTableOptionsRequest = request.into();
+        let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(route_context.channel);
+
+        let mut result = Ok(());
+        // Alter options to remote engine with retry.
+        for i in 0..(self.max_retry + 1) {
+            let resp = rpc_client
+                .alter_table_options(Request::new(request_pb.clone()))
+                .await
+                .with_context(|| Rpc {
+                    table_idents: vec![table_ident.clone()],
+                    msg: "Failed to alter options to remote engine",
+                });
+
+            let resp = resp.and_then(|response| {
+                let response = response.into_inner();
+                if let Some(header) = &response.header && !status_code::is_ok(header.code) {
+                    Server {
+                        table_idents: vec![table_ident.clone()],
+                        code: header.code,
+                        msg: header.error.clone(),
+                    }.fail()
+                } else {
+                    Ok(())
+                }
+            });
+
+            if let Err(e) = resp {
+                error!("Failed to alter options to remote engine, table:{table_ident:?}, err:{e}");
+
+                result = Err(e);
+
+                // If an error occurred, we simply evict the corresponding channel now.
+                // TODO: evict according to the type of error.
+                self.evict_route_from_cache(&[table_ident.clone()]).await;
+
+                // Break if it's the last retry.
+                if i == self.max_retry {
+                    break;
+                }
+
+                sleep(self.retry_interval.0).await;
+                continue;
+            }
+            return Ok(());
+        }
+        result
+    }
+
     pub async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo> {
         // Find the channel from router firstly.
         let route_context = self.cached_router.route(&request.table).await?;
@@ -267,9 +387,9 @@
                     code: header.code,
                     msg: header.error.clone(),
                 }.fail()
-                } else {
-                    Ok(response)
-                }
+            } else {
+                Ok(response)
+            }
         });
 
         match result {
diff --git a/remote_engine_client/src/config.rs b/remote_engine_client/src/config.rs
index c3c1466425..b7fd75b8f3 100644
--- a/remote_engine_client/src/config.rs
+++ b/remote_engine_client/src/config.rs
@@ -14,8 +14,6 @@
 //! Config for [Client]
 
-use std::str::FromStr;
-
 use arrow_ext::ipc::CompressOptions;
 use serde::{Deserialize, Serialize};
 use time_ext::ReadableDuration;
@@ -32,20 +30,24 @@
     pub route_cache_max_size_per_partition: usize,
     pub route_cache_partition_num: usize,
     pub compression: CompressOptions,
+    pub max_retry: usize,
+    pub retry_interval: ReadableDuration,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
-            connect_timeout: ReadableDuration::from_str("3s").unwrap(),
+            connect_timeout: ReadableDuration::secs(3),
             channel_pool_max_size_per_partition: 16,
             channel_pool_partition_num: 16,
-            channel_keep_alive_interval: ReadableDuration::from_str("600s").unwrap(),
-            channel_keep_alive_timeout: ReadableDuration::from_str("3s").unwrap(),
+            channel_keep_alive_interval: ReadableDuration::secs(600),
+            channel_keep_alive_timeout: ReadableDuration::secs(3),
             channel_keep_alive_while_idle: true,
             route_cache_max_size_per_partition: 16,
             route_cache_partition_num: 16,
             compression: CompressOptions::default(),
+            max_retry: 5,
+            retry_interval: ReadableDuration::secs(5),
         }
     }
 }
diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs
index cf57315c89..e6cb6ab24a 100644
--- a/remote_engine_client/src/lib.rs
+++ b/remote_engine_client/src/lib.rs
@@ -41,8 +41,8 @@ use table_engine::{
     remote::{
         self,
         model::{
-            ExecutePlanRequest, GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult,
-            WriteRequest,
+            AlterTableOptionsRequest, AlterTableSchemaRequest, ExecutePlanRequest,
+            GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, WriteRequest,
         },
         RemoteEngine,
     },
@@ -163,6 +163,22 @@
             .context(remote::Write)
     }
 
+    async fn alter_table_schema(&self, request: AlterTableSchemaRequest) -> remote::Result<()> {
+        self.client
+            .alter_table_schema(request)
+            .await
+            .box_err()
+            .context(remote::AlterSchema)
+    }
+
+    async fn alter_table_options(&self, request: AlterTableOptionsRequest) -> remote::Result<()> {
+        self.client
+            .alter_table_options(request)
+            .await
+            .box_err()
+            .context(remote::AlterOptions)
+    }
+
     async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result<TableInfo> {
         self.client
             .get_table_info(request)
diff --git a/server/src/grpc/metrics.rs b/server/src/grpc/metrics.rs
index aa0c150f34..6af37ce147 100644
--- a/server/src/grpc/metrics.rs
+++ b/server/src/grpc/metrics.rs
@@ -42,6 +42,8 @@ make_auto_flush_static_metric! {
     get_table_info,
     write_batch,
     execute_physical_plan,
+    alter_table_schema,
+    alter_table_options,
 }
 
 pub struct RemoteEngineGrpcHandlerDurationHistogramVec: LocalHistogram {
diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs
index 0d6c140271..817ce1b7fe 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -28,7 +28,8 @@ use catalog::{manager::ManagerRef, schema::SchemaRef};
 use ceresdbproto::{
     remote_engine::{
         execute_plan_request, read_response::Output::Arrow,
-        remote_engine_service_server::RemoteEngineService, row_group, ExecContext,
+        remote_engine_service_server::RemoteEngineService, row_group, AlterTableOptionsRequest,
+        AlterTableOptionsResponse, AlterTableSchemaRequest, AlterTableSchemaResponse, ExecContext,
         ExecutePlanRequest, GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse,
         WriteBatchRequest, WriteRequest, WriteResponse,
     },
@@ -56,7 +57,7 @@
     predicate::PredicateRef,
     remote::model::{self, TableIdentifier},
     stream::{PartitionedStreams, SendableRecordBatchStream},
-    table::TableRef,
+    table::{AlterSchemaRequest, TableRef},
 };
 use time_ext::InstantExt;
 use tokio::sync::mpsc::{self, Sender};
@@ -654,6 +655,70 @@
         ))
     }
 
+    async fn alter_table_schema_internal(
+        &self,
+        request: Request<AlterTableSchemaRequest>,
+    ) -> std::result::Result<Response<AlterTableSchemaResponse>, Status> {
+        let begin_instant = Instant::now();
+        let ctx = self.handler_ctx();
+        let handle = self.runtimes.read_runtime.spawn(async move {
+            let request = request.into_inner();
+            handle_alter_table_schema(ctx, request).await
+        });
+
+        let res = handle.await.box_err().context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: "fail to join task",
+        });
+
+        let mut resp = AlterTableSchemaResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(error::build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(error::build_err_header(e));
+            }
+        };
+
+        REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
+            .alter_table_schema
+            .observe(begin_instant.saturating_elapsed().as_secs_f64());
+        Ok(Response::new(resp))
+    }
+
+    async fn alter_table_options_internal(
+        &self,
+        request: Request<AlterTableOptionsRequest>,
+    ) -> std::result::Result<Response<AlterTableOptionsResponse>, Status> {
+        let begin_instant = Instant::now();
+        let ctx = self.handler_ctx();
+        let handle = self.runtimes.read_runtime.spawn(async move {
+            let request = request.into_inner();
+            handle_alter_table_options(ctx, request).await
+        });
+
+        let res = handle.await.box_err().context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: "fail to join task",
+        });
+
+        let mut resp = AlterTableOptionsResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(error::build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(error::build_err_header(e));
+            }
+        };
+
+        REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
+            .alter_table_options
+            .observe(begin_instant.saturating_elapsed().as_secs_f64());
+        Ok(Response::new(resp))
+    }
+
     fn handler_ctx(&self) -> HandlerContext {
         HandlerContext {
             catalog_manager: self.instance.catalog_manager.clone(),
@@ -740,6 +805,20 @@
         record_stream_to_response_stream!(record_stream_result, ExecutePhysicalPlanStream)
     }
+
+    async fn alter_table_schema(
+        &self,
+        request: Request<AlterTableSchemaRequest>,
+    ) -> std::result::Result<Response<AlterTableSchemaResponse>, Status> {
+        self.alter_table_schema_internal(request).await
+    }
+
+    async fn alter_table_options(
+        &self,
+        request: Request<AlterTableOptionsRequest>,
+    ) -> std::result::Result<Response<AlterTableOptionsResponse>, Status> {
+        self.alter_table_options_internal(request).await
+    }
 }
 
 async fn handle_stream_read(
@@ -1003,6 +1082,83 @@
     })
 }
 
+async fn handle_alter_table_schema(
+    ctx: HandlerContext,
+    request: AlterTableSchemaRequest,
+) -> Result<()> {
+    let request: table_engine::remote::model::AlterTableSchemaRequest =
+        request.try_into().box_err().context(ErrWithCause {
+            code: StatusCode::BadRequest,
+            msg: "fail to convert alter table schema",
+        })?;
+
+    let schema = find_schema_by_identifier(&ctx, &request.table_ident)?;
+    let table = schema
+        .table_by_name(&request.table_ident.table)
+        .box_err()
+        .context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: format!("fail to get table, table:{}", request.table_ident.table),
+        })?
+        .context(ErrNoCause {
+            code: StatusCode::NotFound,
+            msg: format!("table is not found, table:{}", request.table_ident.table),
+        })?;
+
+    table
+        .alter_schema(AlterSchemaRequest {
+            schema: request.table_schema,
+            pre_schema_version: request.pre_schema_version,
+        })
+        .await
+        .box_err()
+        .context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: format!(
+                "fail to alter table schema, table:{}",
+                request.table_ident.table
+            ),
+        })?;
+    Ok(())
+}
+
+async fn handle_alter_table_options(
+    ctx: HandlerContext,
+    request: AlterTableOptionsRequest,
+) -> Result<()> {
+    let request: table_engine::remote::model::AlterTableOptionsRequest =
+        request.try_into().box_err().context(ErrWithCause {
+            code: StatusCode::BadRequest,
+            msg: "fail to convert alter table options",
+        })?;
+
+    let schema = find_schema_by_identifier(&ctx, &request.table_ident)?;
+    let table = schema
+        .table_by_name(&request.table_ident.table)
+        .box_err()
+        .context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: format!("fail to get table, table:{}", request.table_ident.table),
+        })?
+        .context(ErrNoCause {
+            code: StatusCode::NotFound,
+            msg: format!("table is not found, table:{}", request.table_ident.table),
+        })?;
+
+    table
+        .alter_options(request.options)
+        .await
+        .box_err()
+        .context(ErrWithCause {
+            code: StatusCode::Internal,
+            msg: format!(
+                "fail to alter table options, table:{}",
+                request.table_ident.table
+            ),
+        })?;
+    Ok(())
+}
+
 fn check_and_extract_plan(
     typed_plan: execute_plan_request::PhysicalPlan,
     engine_type: QueryEngineType,
diff --git a/table_engine/src/memory.rs b/table_engine/src/memory.rs
index 678e2276e5..b2f3fbf1bd 100644
--- a/table_engine/src/memory.rs
+++ b/table_engine/src/memory.rs
@@ -41,7 +41,10 @@ use crate::{
     },
     remote::{
         self,
-        model::{self, ExecutePlanRequest, GetTableInfoRequest, WriteBatchResult},
+        model::{
+            self, AlterTableOptionsRequest, AlterTableSchemaRequest, ExecutePlanRequest,
+            GetTableInfoRequest, WriteBatchResult,
+        },
         RemoteEngine,
     },
     stream::{
@@ -361,6 +364,14 @@
         unimplemented!()
     }
 
+    async fn alter_table_schema(&self, _requests: AlterTableSchemaRequest) -> remote::Result<()> {
+        unimplemented!()
+    }
+
+    async fn alter_table_options(&self, _requests: AlterTableOptionsRequest) -> remote::Result<()> {
+        unimplemented!()
+    }
+
     async fn get_table_info(
         &self,
         _request: GetTableInfoRequest,
diff --git a/table_engine/src/remote/mod.rs b/table_engine/src/remote/mod.rs
index e48d3e3284..571dd08797 100644
--- a/table_engine/src/remote/mod.rs
+++ b/table_engine/src/remote/mod.rs
@@ -25,7 +25,10 @@ use model::{ReadRequest, WriteRequest};
 use snafu::Snafu;
 
 use crate::{
-    remote::model::{ExecutePlanRequest, GetTableInfoRequest, TableInfo, WriteBatchResult},
+    remote::model::{
+        AlterTableOptionsRequest, AlterTableSchemaRequest, ExecutePlanRequest, GetTableInfoRequest,
+        TableInfo, WriteBatchResult,
+    },
     stream::SendableRecordBatchStream,
 };
@@ -38,6 +41,12 @@ pub enum Error {
     #[snafu(display("Failed to write to remote, err:{}", source))]
     Write { source: GenericError },
 
+    #[snafu(display("Failed to alter schema, err:{}", source))]
+    AlterSchema { source: GenericError },
+
+    #[snafu(display("Failed to alter options, err:{}", source))]
+    AlterOptions { source: GenericError },
+
     #[snafu(display("Failed to get table info from remote, err:{}", source))]
     GetTableInfo { source: GenericError },
@@ -58,6 +67,10 @@ pub trait RemoteEngine: fmt::Debug + Send + Sync {
     async fn write_batch(&self, requests: Vec<WriteRequest>) -> Result<Vec<WriteBatchResult>>;
 
+    async fn alter_table_schema(&self, request: AlterTableSchemaRequest) -> Result<()>;
+
+    async fn alter_table_options(&self, request: AlterTableOptionsRequest) -> Result<()>;
+
     async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo>;
 
     async fn execute_physical_plan(
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 9f1a7583ce..7d55876c72 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -27,7 +27,7 @@ use common_types::{
         contiguous::{ContiguousRow, ContiguousRowReader, ContiguousRowWriter},
         Row, RowGroup, RowGroupBuilder,
     },
-    schema::{IndexInWriterSchema, RecordSchema, Schema},
+    schema::{IndexInWriterSchema, RecordSchema, Schema, Version},
 };
 use generic_error::{BoxError, GenericError, GenericResult};
 use macros::define_result;
@@ -247,6 +247,75 @@ pub struct WriteBatchResult {
     pub result: GenericResult<u64>,
 }
 
+#[derive(Debug)]
+pub struct AlterTableSchemaRequest {
+    pub table_ident: TableIdentifier,
+    pub table_schema: Schema,
+    /// Previous schema version before alteration.
+    pub pre_schema_version: Version,
+}
+
+impl TryFrom<remote_engine::AlterTableSchemaRequest> for AlterTableSchemaRequest {
+    type Error = Error;
+
+    fn try_from(value: remote_engine::AlterTableSchemaRequest) -> Result<Self> {
+        let table = value.table.context(EmptyTableIdentifier)?.into();
+        let table_schema = value
+            .table_schema
+            .context(EmptyTableSchema)?
+            .try_into()
+            .box_err()
+            .context(ConvertTableSchema)?;
+        Ok(Self {
+            table_ident: table,
+            table_schema,
+            pre_schema_version: value.pre_schema_version,
+        })
+    }
+}
+
+impl From<AlterTableSchemaRequest> for ceresdbproto::remote_engine::AlterTableSchemaRequest {
+    fn from(value: AlterTableSchemaRequest) -> Self {
+        let table = value.table_ident.into();
+        let table_schema = (&value.table_schema).into();
+        Self {
+            table: Some(table),
+            table_schema: Some(table_schema),
+            pre_schema_version: value.pre_schema_version,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct AlterTableOptionsRequest {
+    pub table_ident: TableIdentifier,
+    pub options: HashMap<String, String>,
+}
+
+impl TryFrom<remote_engine::AlterTableOptionsRequest> for AlterTableOptionsRequest {
+    type Error = Error;
+
+    fn try_from(value: remote_engine::AlterTableOptionsRequest) -> Result<Self> {
+        let table = value.table.context(EmptyTableIdentifier)?.into();
+        let options = value.options;
+        Ok(Self {
+            table_ident: table,
+            options,
+        })
+    }
+}
+
+impl From<AlterTableOptionsRequest> for ceresdbproto::remote_engine::AlterTableOptionsRequest {
+    fn from(value: AlterTableOptionsRequest) -> Self {
+        let table = value.table_ident.into();
+        let options = value.options;
+        Self {
+            table: Some(table),
+            options,
+        }
+    }
+}
+
 pub struct GetTableInfoRequest {
     pub table: TableIdentifier,
 }
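A closing note on the conversion pairs in model.rs: prost-generated message-typed fields arrive wrapped in `Option`, so the model-side `TryFrom` rejects a missing field instead of silently defaulting, while the reverse `From` back into the proto type is infallible. Below is a minimal sketch of that shape, using hypothetical `Pb`-prefixed stand-ins rather than the real generated ceresdbproto structs:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a prost-generated message: message fields are optional.
struct PbAlterTableOptionsRequest {
    table: Option<String>,
    options: HashMap<String, String>,
}

// Model-side type: the table identifier is mandatory.
struct AlterTableOptionsRequest {
    table: String,
    options: HashMap<String, String>,
}

impl TryFrom<PbAlterTableOptionsRequest> for AlterTableOptionsRequest {
    type Error = String;

    // Reject protos without a table identifier instead of defaulting.
    fn try_from(value: PbAlterTableOptionsRequest) -> Result<Self, Self::Error> {
        let table = value.table.ok_or_else(|| "empty table identifier".to_string())?;
        Ok(Self {
            table,
            options: value.options,
        })
    }
}

impl From<AlterTableOptionsRequest> for PbAlterTableOptionsRequest {
    // The reverse direction is infallible: just re-wrap in `Some`.
    fn from(value: AlterTableOptionsRequest) -> Self {
        Self {
            table: Some(value.table),
            options: value.options,
        }
    }
}

fn main() {
    let pb = PbAlterTableOptionsRequest {
        table: None,
        options: HashMap::new(),
    };
    // A proto missing its table identifier fails the conversion.
    assert!(AlterTableOptionsRequest::try_from(pb).is_err());
}
```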