Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: risingwavelabs/risingwave
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 22fd88359751c300475947161e0420431ef3263d
Choose a base ref
..
head repository: risingwavelabs/risingwave
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: e1f410dcba3668323cbdd71025bfe3dc80f2b895
Choose a head ref
Showing with 3,585 additions and 2,487 deletions.
  1. +1 −0 proto/catalog.proto
  2. +6 −3 proto/common.proto
  3. +12 −0 proto/ddl_service.proto
  4. +1 −1 src/common/src/util/worker_util.rs
  5. +6 −6 src/compute/src/lib.rs
  6. +1 −1 src/compute/src/server.rs
  7. +2 −4 src/frontend/planner_test/tests/testdata/output/agg.yaml
  8. +4 −3 src/frontend/planner_test/tests/testdata/output/append_only.yaml
  9. +5 −4 src/frontend/planner_test/tests/testdata/output/batch_dist_agg.yaml
  10. +295 −268 src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml
  11. +17 −14 src/frontend/planner_test/tests/testdata/output/functional_dependency.yaml
  12. +7 −6 src/frontend/planner_test/tests/testdata/output/index_selection.yaml
  13. +1 −1 src/frontend/planner_test/tests/testdata/output/join.yaml
  14. +6 −4 src/frontend/planner_test/tests/testdata/output/limit.yaml
  15. +23 −19 src/frontend/planner_test/tests/testdata/output/nexmark.yaml
  16. +25 −21 src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml
  17. +25 −21 src/frontend/planner_test/tests/testdata/output/nexmark_source_kafka.yaml
  18. +46 −42 src/frontend/planner_test/tests/testdata/output/nexmark_temporal_filter.yaml
  19. +42 −38 src/frontend/planner_test/tests/testdata/output/nexmark_watermark.yaml
  20. +27 −22 src/frontend/planner_test/tests/testdata/output/order_by.yaml
  21. +8 −7 src/frontend/planner_test/tests/testdata/output/project_set.yaml
  22. +17 −17 src/frontend/planner_test/tests/testdata/output/range_scan.yaml
  23. +1 −1 src/frontend/planner_test/tests/testdata/output/row_filter.yaml
  24. +10 −20 src/frontend/planner_test/tests/testdata/output/singleton.yaml
  25. +114 −79 src/frontend/planner_test/tests/testdata/output/stream_dist_agg.yaml
  26. +20 −22 src/frontend/planner_test/tests/testdata/output/subquery.yaml
  27. +1 −1 src/frontend/planner_test/tests/testdata/output/topn.yaml
  28. +885 −816 src/frontend/planner_test/tests/testdata/output/tpch.yaml
  29. +612 −594 src/frontend/planner_test/tests/testdata/output/tpch_kafka.yaml
  30. +133 −125 src/frontend/planner_test/tests/testdata/output/tpch_variant.yaml
  31. +37 −3 src/frontend/src/catalog/catalog_service.rs
  32. +3 −3 src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
  33. +99 −0 src/frontend/src/handler/alter_resource_group.rs
  34. +3 −1 src/frontend/src/handler/create_database.rs
  35. +20 −4 src/frontend/src/handler/create_mv.rs
  36. +19 −0 src/frontend/src/handler/mod.rs
  37. +12 −0 src/frontend/src/optimizer/mod.rs
  38. +26 −17 src/frontend/src/optimizer/plan_node/stream_materialize.rs
  39. +20 −2 src/frontend/src/test_utils.rs
  40. +3 −3 src/meta/model/migration/src/lib.rs
  41. +0 −35 src/meta/model/migration/src/m20241022_072553_node_label.rs
  42. +89 −0 src/meta/model/migration/src/m20241202_071413_resource_group.rs
  43. +2 −0 src/meta/model/src/database.rs
  44. +1 −0 src/meta/model/src/streaming_job.rs
  45. +1 −1 src/meta/model/src/worker_property.rs
  46. +40 −2 src/meta/service/src/ddl_service.rs
  47. +25 −17 src/meta/service/src/scale_service.rs
  48. +7 −4 src/meta/src/barrier/command.rs
  49. +2 −2 src/meta/src/barrier/context/context_impl.rs
  50. +38 −38 src/meta/src/barrier/context/recovery.rs
  51. +1 −0 src/meta/src/controller/catalog/alter_op.rs
  52. +67 −0 src/meta/src/controller/catalog/get_op.rs
  53. +42 −14 src/meta/src/controller/cluster.rs
  54. +1 −0 src/meta/src/controller/mod.rs
  55. +9 −0 src/meta/src/controller/scale.rs
  56. +24 −7 src/meta/src/controller/streaming_job.rs
  57. +61 −3 src/meta/src/controller/utils.rs
  58. +19 −6 src/meta/src/manager/metadata.rs
  59. +21 −1 src/meta/src/model/stream.rs
  60. +53 −14 src/meta/src/rpc/ddl_controller.rs
  61. +211 −105 src/meta/src/stream/scale.rs
  62. +6 −2 src/meta/src/stream/stream_graph/actor.rs
  63. +63 −41 src/meta/src/stream/stream_manager.rs
  64. +8 −1 src/meta/src/stream/test_fragmenter.rs
  65. +6 −0 src/prost/src/lib.rs
  66. +19 −0 src/rpc_client/src/meta_client.rs
  67. +18 −0 src/sqlparser/src/ast/ddl.rs
  68. +2 −0 src/sqlparser/src/keywords.rs
  69. +25 −0 src/sqlparser/src/parser.rs
  70. +15 −0 src/tests/simulation/src/cluster.rs
  71. +1 −0 src/tests/simulation/src/main.rs
  72. +1 −1 src/tests/simulation/src/nexmark.rs
  73. +1 −0 src/tests/simulation/tests/integration_tests/scale/mod.rs
  74. +111 −0 src/tests/simulation/tests/integration_tests/scale/resource_group.rs
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
@@ -524,6 +524,7 @@ message Database {
uint32 id = 1;
string name = 2;
uint32 owner = 3;
string resource_group = 4;
}

message Comment {
9 changes: 6 additions & 3 deletions proto/common.proto
Original file line number Diff line number Diff line change
@@ -56,11 +56,14 @@ message WorkerNode {
bool is_unschedulable = 3;
// This is used for frontend node to register its rpc address
string internal_rpc_host_addr = 4;
// Meta may assign labels to worker nodes to partition workload by label.
// This is used for serverless backfilling of materialized views.
optional string node_label = 5;

reserved 5;
reserved "node_label";

uint32 parallelism = 6;

// resource group for scheduling
optional string resource_group = 7;
}
message Resource {
string rw_version = 1;
12 changes: 12 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
@@ -144,6 +144,9 @@ message CreateMaterializedViewRequest {

// The list of object IDs that this materialized view depends on.
repeated uint32 dependencies = 4;

// The specific resource group to use for the materialized view. If not set, the database resource group is used.
optional string specific_resource_group = 5;
}

message CreateMaterializedViewResponse {
@@ -271,6 +274,14 @@ message AlterParallelismRequest {

message AlterParallelismResponse {}

message AlterResourceGroupRequest {
uint32 table_id = 1;
optional string resource_group = 2;
bool deferred = 3;
}

message AlterResourceGroupResponse {}

message AlterOwnerResponse {
common.Status status = 1;
WaitVersion version = 2;
@@ -550,6 +561,7 @@ service DdlService {
rpc AlterOwner(AlterOwnerRequest) returns (AlterOwnerResponse);
rpc AlterSetSchema(AlterSetSchemaRequest) returns (AlterSetSchemaResponse);
rpc AlterParallelism(AlterParallelismRequest) returns (AlterParallelismResponse);
rpc AlterResourceGroup(AlterResourceGroupRequest) returns (AlterResourceGroupResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
2 changes: 1 addition & 1 deletion src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
@@ -14,4 +14,4 @@

pub type WorkerNodeId = u32;

pub const DEFAULT_COMPUTE_NODE_LABEL: &str = "default";
pub const DEFAULT_RESOURCE_GROUP: &str = "default";
12 changes: 6 additions & 6 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ use risingwave_common::util::meta_addr::MetaAddressStrategy;
use risingwave_common::util::resource_util::cpu::total_cpu_available;
use risingwave_common::util::resource_util::memory::system_memory_available_bytes;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_COMPUTE_NODE_LABEL;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
use serde::{Deserialize, Serialize};

/// If `total_memory_bytes` is not specified, the default memory limit will be set to
@@ -113,9 +113,9 @@ pub struct ComputeNodeOpts {
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
pub parallelism: usize,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_NODE_LABEL", default_value_t = default_node_label())]
pub node_label: String,
/// Resource group for scheduling, default value is "default"
#[clap(long, env = "RW_RESOURCE_GROUP", default_value_t = default_resource_group())]
pub resource_group: String,

/// Decides whether the compute node can be used for streaming and serving.
#[clap(long, env = "RW_COMPUTE_NODE_ROLE", value_enum, default_value_t = default_role())]
@@ -262,8 +262,8 @@ pub fn default_parallelism() -> usize {
total_cpu_available().ceil() as usize
}

pub fn default_node_label() -> String {
DEFAULT_COMPUTE_NODE_LABEL.to_owned()
pub fn default_resource_group() -> String {
DEFAULT_RESOURCE_GROUP.to_owned()
}

pub fn default_role() -> Role {
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ pub async fn compute_node_serve(
is_serving: opts.role.for_serving(),
is_unschedulable: false,
internal_rpc_host_addr: "".to_owned(),
node_label: Some(opts.node_label.clone()),
resource_group: Some(opts.resource_group.clone()),
},
&config.meta,
)
6 changes: 2 additions & 4 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
@@ -312,8 +312,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSortAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [mv.v1 DESC], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
- sql: |
create table t(v1 int, v2 int);
select v1, max(v2) from t group by v1 order by v1 desc;
@@ -367,8 +366,7 @@
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [mv.v1], aggs: [max(mv.v2)] }
└─BatchExchange { order: [], dist: HashShard(mv.v1) }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: SomeShard }
└─BatchScan { table: mv, columns: [mv.v1, mv.v2], distribution: UpstreamHashShard(mv.v1) }
with_config_map:
RW_BATCH_ENABLE_SORT_AGG: 'false'
- name: Not use BatchSortAgg, when output requires order
Original file line number Diff line number Diff line change
@@ -25,9 +25,10 @@
select v1 from t1 order by v1 limit 3 offset 3;
stream_plan: |-
StreamMaterialize { columns: [v1, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [v1, t1._row_id], pk_conflict: NoCheck }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTopN [append_only] { order: [t1.v1 ASC], limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
- sql: |
create table t1 (v1 int, v2 int) append only;
select max(v1) as max_v1 from t1;
Original file line number Diff line number Diff line change
@@ -11,9 +11,10 @@
sql: |
select max(v) as a1 from S;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: Single }
BatchSimpleAgg { aggs: [max(max(s.v))] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchScan { table: s, columns: [s.v], distribution: SomeShard }
batch_local_plan: |-
BatchSimpleAgg { aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: Single }
@@ -160,7 +161,7 @@
└─BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
└─BatchExchange { order: [], dist: HashShard(s.k) }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: Single }
└─BatchScan { table: s, columns: [s.k, s.v], distribution: SomeShard }
batch_local_plan: |-
BatchProject { exprs: [max(s.v)] }
└─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
Loading