Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support alter partition table #1244

Merged
merged 6 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.16"
ceresdbproto = "1.0.17"
codec = { path = "components/codec" }
notifier = { path = "components/notifier" }
chrono = "0.4"
Expand Down
19 changes: 16 additions & 3 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ impl<'a> Alterer<'a> {
);

// Validate alter schema request.
self.validate_before_alter(&request)?;
// if the alter schema request is idempotent, we can skip the alter operation.
if self.validate_before_alter(&request)? {
info!(
"Skip alter because of the altered schema is the same as the current, table:{}",
self.table_data.name
);
return Ok(());
}

// Now we can persist and update the schema, since this function is called by
// write worker, so there is no other concurrent writer altering the
Expand Down Expand Up @@ -157,14 +164,20 @@ impl<'a> Alterer<'a> {

// Most validation should be done by catalog module, so we don't do too much
// duplicate check here, especially the schema compatibility.
fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<()> {
// The returned value denotes whether the altered schema is same as the current
// one.
fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<bool> {
ensure!(
!self.table_data.is_dropped(),
AlterDroppedTable {
table: &self.table_data.name,
}
);

if self.table_data.schema().columns() == request.schema.columns() {
return Ok(true);
}

let current_version = self.table_data.schema_version();
ensure!(
current_version < request.schema.version(),
Expand All @@ -184,7 +197,7 @@ impl<'a> Alterer<'a> {
}
);

Ok(())
Ok(false)
}

pub async fn alter_options_of_table(
Expand Down
18 changes: 1 addition & 17 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_types::{column_schema::ColumnSchema, table::ShardId};
use common_types::table::ShardId;
use generic_error::GenericError;
use macros::define_result;
use snafu::{Backtrace, Snafu};
Expand Down Expand Up @@ -338,22 +338,6 @@ pub struct CloseOptions {
pub table_engine: TableEngineRef,
}

/// Alter table operations.
#[derive(Debug)]
pub enum AlterTableOperation {
/// Add column operation, the column id in [ColumnSchema] will be ignored.
/// Primary key column is not allowed to be added, so all columns will
/// be added as normal columns.
AddColumn(ColumnSchema),
}

/// Alter table request.
#[derive(Debug)]
pub struct AlterTableRequest {
pub table_name: String,
pub operations: Vec<AlterTableOperation>,
}

#[derive(Debug, Clone)]
pub struct OpenShardRequest {
/// Shard id
Expand Down
32 changes: 32 additions & 0 deletions integration_tests/cases/env/cluster/ddl/partition_table.result
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,38 @@ UInt64(9923681778193615344),Timestamp(1651737067000),String("ceresdb8"),Int32(0)
UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0),Double(109.0),


ALTER TABLE partition_table_t ADD COLUMN (b string);

affected_rows: 0

ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';

affected_rows: 0

SHOW CREATE TABLE __partition_table_t_0;

Table,Create Table,
String("__partition_table_t_0"),String("CREATE TABLE `__partition_table_t_0` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),


SHOW CREATE TABLE __partition_table_t_1;

Table,Create Table,
String("__partition_table_t_1"),String("CREATE TABLE `__partition_table_t_1` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),


SHOW CREATE TABLE __partition_table_t_2;

Table,Create Table,
String("__partition_table_t_2"),String("CREATE TABLE `__partition_table_t_2` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),


SHOW CREATE TABLE __partition_table_t_3;

Table,Create Table,
String("__partition_table_t_3"),String("CREATE TABLE `__partition_table_t_3` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, `b` string, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', memtable_type='skiplist', num_rows_per_row_group='8192', segment_duration='2h', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"),


DROP TABLE IF EXISTS `partition_table_t`;

affected_rows: 0
Expand Down
12 changes: 12 additions & 0 deletions integration_tests/cases/env/cluster/ddl/partition_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2

SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7","ceresdb8", "ceresdb9", "ceresdb10") order by name;

ALTER TABLE partition_table_t ADD COLUMN (b string);

ALTER TABLE partition_table_t MODIFY SETTING enable_ttl='true';

SHOW CREATE TABLE __partition_table_t_0;

SHOW CREATE TABLE __partition_table_t_1;

SHOW CREATE TABLE __partition_table_t_2;

SHOW CREATE TABLE __partition_table_t_3;

DROP TABLE IF EXISTS `partition_table_t`;

SHOW CREATE TABLE partition_table_t;
Expand Down
125 changes: 110 additions & 15 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_types::{
};
use futures::{stream::FuturesUnordered, StreamExt};
use generic_error::BoxError;
use logger::error;
use snafu::ResultExt;
use table_engine::{
partition::{
Expand All @@ -36,16 +37,16 @@ use table_engine::{
},
remote::{
model::{
ReadRequest as RemoteReadRequest, TableIdentifier, WriteBatchResult,
WriteRequest as RemoteWriteRequest,
AlterTableOptionsRequest, AlterTableSchemaRequest, ReadRequest as RemoteReadRequest,
TableIdentifier, WriteBatchResult, WriteRequest as RemoteWriteRequest,
},
RemoteEngineRef,
},
stream::{PartitionedStreams, SendableRecordBatchStream},
table::{
AlterSchemaRequest, CreatePartitionRule, FlushRequest, GetRequest, LocatePartitions,
ReadRequest, Result, Scan, Table, TableId, TableStats, UnexpectedWithMsg,
UnsupportedMethod, Write, WriteBatch, WriteRequest,
AlterOptions, AlterSchema, AlterSchemaRequest, CreatePartitionRule, FlushRequest,
GetRequest, LocatePartitions, ReadRequest, Result, Scan, Table, TableId, TableStats,
UnexpectedWithMsg, UnsupportedMethod, Write, WriteBatch, WriteRequest,
},
};

Expand Down Expand Up @@ -328,20 +329,114 @@ impl Table for PartitionTableImpl {
Ok(PartitionedStreams { streams })
}

async fn alter_schema(&self, _request: AlterSchemaRequest) -> Result<usize> {
UnsupportedMethod {
table: self.name(),
method: "alter_schema",
async fn alter_schema(&self, request: AlterSchemaRequest) -> Result<usize> {
let partition_num = match self.partition_info() {
None => UnexpectedWithMsg {
msg: "partition table partition info can't be empty",
}
.fail()?,
Some(partition_info) => partition_info.get_partition_num(),
};

// Alter schema of partitions except the first one.
// Because the schema of partition table is stored in the first partition.
let mut futures = FuturesUnordered::new();
for id in 1..partition_num {
let partition = self
.remote_engine
.alter_table_schema(AlterTableSchemaRequest {
table_ident: self.get_sub_table_ident(id),
table_schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
});
futures.push(partition);
}
.fail()

let mut alter_err = None;
while let Some(alter_ret) = futures.next().await {
if let Err(e) = &alter_ret {
error!("Alter schema failed, table_name:{}, err:{e}", self.name());
alter_err.get_or_insert(
alter_ret
.box_err()
.context(AlterSchema { table: self.name() }),
);
}
}

// Remove the first error.
if let Some(ret) = alter_err {
ret?;
}

// Alter schema of the first partition.
self.remote_engine
.alter_table_schema(AlterTableSchemaRequest {
table_ident: self.get_sub_table_ident(0),
table_schema: request.schema.clone(),
pre_schema_version: request.pre_schema_version,
})
.await
.box_err()
.with_context(|| AlterSchema {
table: self.get_sub_table_ident(0).table,
})?;

Ok(0)
}

async fn alter_options(&self, _options: HashMap<String, String>) -> Result<usize> {
UnsupportedMethod {
table: self.name(),
method: "alter_options",
async fn alter_options(&self, options: HashMap<String, String>) -> Result<usize> {
let partition_num = match self.partition_info() {
None => UnexpectedWithMsg {
msg: "partition table partition info can't be empty",
}
.fail()?,
Some(partition_info) => partition_info.get_partition_num(),
};

// Alter options of partitions except the first one.
// Because the schema of partition table is stored in the first partition.
let mut futures = FuturesUnordered::new();
for id in 1..partition_num {
let partition = self
.remote_engine
.alter_table_options(AlterTableOptionsRequest {
table_ident: self.get_sub_table_ident(id),
options: options.clone(),
});
futures.push(partition);
}
.fail()

let mut alter_err = None;
while let Some(alter_ret) = futures.next().await {
if let Err(e) = &alter_ret {
error!("Alter options failed, table_name:{}, err:{e}", self.name());
alter_err.get_or_insert(
alter_ret
.box_err()
.context(AlterOptions { table: self.name() }),
);
}
}

// Remove the first error.
if let Some(ret) = alter_err {
ret?;
}

// Alter options of the first partition.
self.remote_engine
.alter_table_options(AlterTableOptionsRequest {
table_ident: self.get_sub_table_ident(0),
options: options.clone(),
})
.await
.box_err()
.with_context(|| AlterOptions {
table: self.get_sub_table_ident(0).table,
})?;

Ok(0)
}

// Partition table is a virtual table, so it don't need to flush.
Expand Down
Loading