Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove create_source/sync_source rpc in CN(#5269) #5654

Merged
merged 12 commits into from
Oct 6, 2022
12 changes: 5 additions & 7 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ message ActorMapping {

// todo: StreamSourceNode or TableSourceNode
message SourceNode {
enum SourceType {
UNSPECIFIED = 0;
TABLE = 1;
SOURCE = 2;
}
uint32 source_id = 1; // use source_id to fetch SourceDesc from local source manager
repeated int32 column_ids = 2;
SourceType source_type = 3;
catalog.Table state_table = 4;
catalog.Table state_table = 3;
oneof info {
catalog.StreamSourceInfo stream_source = 4;
catalog.TableSourceInfo table_source = 5;
}
}

message SinkNode {
Expand Down
19 changes: 0 additions & 19 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package stream_service;

import "catalog.proto";
import "common.proto";
import "hummock.proto";
import "stream_plan.proto";
Expand Down Expand Up @@ -97,14 +96,6 @@ message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message CreateSourceRequest {
catalog.Source source = 1;
}

message CreateSourceResponse {
common.Status status = 1;
}

message DropSourceRequest {
uint32 source_id = 1;
}
Expand All @@ -113,14 +104,6 @@ message DropSourceResponse {
common.Status status = 1;
}

message SyncSourcesRequest {
repeated catalog.Source sources = 1;
}

message SyncSourcesResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
Expand All @@ -136,8 +119,6 @@ service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc CreateSource(CreateSourceRequest) returns (CreateSourceResponse);
rpc SyncSources(SyncSourcesRequest) returns (SyncSourcesResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
Expand Down
33 changes: 8 additions & 25 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,18 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::Array;
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand All @@ -164,19 +165,6 @@ mod tests {
// Schema of the table
let schema = schema_test_utils::ii();

let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

mock_executor.add(DataChunk::from_pretty(
"i i
1 2
Expand All @@ -185,20 +173,15 @@ mod tests {
7 8
9 10",
));
let row_id_index = None;
let pk_column_ids = vec![1];

let info = create_table_info(&schema, None, vec![1]);

// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source.stream_reader(vec![0.into(), 1.into()]).await?;

Expand Down
31 changes: 7 additions & 24 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::{Array, ArrayImpl, I32Array, StructArray};
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::ReadOptions;
use risingwave_storage::*;
Expand All @@ -172,7 +173,7 @@ mod tests {

#[tokio::test]
async fn test_insert_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
Expand All @@ -190,18 +191,6 @@ mod tests {
let mut schema = schema_test_utils::ii();
schema.fields.push(struct_field);
schema.fields.push(Field::unnamed(DataType::Int64)); // row_id column
let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

let col1 = column_nonnull! { I32Array, [1, 3, 5, 7, 9] };
let col2 = column_nonnull! { I32Array, [2, 4, 6, 8, 10] };
Expand All @@ -219,20 +208,14 @@ mod tests {
mock_executor.add(data_chunk.clone());

// To match the row_id column in the schema
let row_id_index = Some(3);
let pk_column_ids = vec![3];
let info = create_table_info(&schema, Some(3), vec![3]);

// Create the table.
let table_id = TableId::new(0);
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source
.stream_reader(vec![0.into(), 1.into(), 2.into()])
Expand Down
32 changes: 7 additions & 25 deletions src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,19 @@ mod tests {

use futures::StreamExt;
use risingwave_common::array::Array;
use risingwave_common::catalog::{schema_test_utils, ColumnDesc, ColumnId};
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_expr::expr::InputRefExpression;
use risingwave_source::{MemSourceManager, SourceManager};
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_update_executor() -> Result<()> {
let source_manager = Arc::new(MemSourceManager::default());
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand All @@ -216,19 +217,6 @@ mod tests {
// Schema of the table
let schema = schema_test_utils::ii();

let table_columns: Vec<_> = schema
.fields
.iter()
.enumerate()
.map(|(i, f)| ColumnDesc {
data_type: f.data_type.clone(),
column_id: ColumnId::from(i as i32), // use column index as column id
name: f.name.clone(),
field_descs: vec![],
type_name: "".to_string(),
})
.collect();

mock_executor.add(DataChunk::from_pretty(
"i i
1 2
Expand All @@ -245,18 +233,12 @@ mod tests {
];

// Create the table.
let info = create_table_info(&schema, None, vec![1]);
let table_id = TableId::new(0);
let row_id_index = None;
let pk_column_ids = vec![1];
source_manager.create_table_source(
&table_id,
table_columns.to_vec(),
row_id_index,
pk_column_ids,
)?;
let source_builder = SourceDescBuilder::new(table_id, &info, &source_manager);

// Create reader
let source_desc = source_manager.get_source(&table_id)?;
let source_desc = source_builder.build().await?;
let source = source_desc.source.as_table().unwrap();
let mut reader = source.stream_reader(vec![0.into(), 1.into()]).await?;

Expand Down
7 changes: 7 additions & 0 deletions src/common/src/util/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::stream_plan::source_node::Info as SourceInfo;
use risingwave_pb::stream_plan::SourceNode;
use risingwave_pb::{batch_plan, data};

pub trait TypeUrl {
Expand All @@ -29,3 +31,8 @@ impl TypeUrl for data::Column {
"type.googleapis.com/data.Column"
}
}

#[inline(always)]
pub fn is_stream_source(source_node: &SourceNode) -> bool {
matches!(source_node.info.as_ref(), Some(SourceInfo::StreamSource(_)))
}
66 changes: 1 addition & 65 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use std::sync::Arc;
use async_stack_trace::StackTrace;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{tonic_err, Result as RwResult};
use risingwave_pb::catalog::Source;
use risingwave_common::error::tonic_err;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
Expand Down Expand Up @@ -199,35 +198,6 @@ impl StreamService for StreamServiceImpl {
Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn create_source(
&self,
request: Request<CreateSourceRequest>,
) -> Result<Response<CreateSourceResponse>, Status> {
let source = request.into_inner().source.unwrap();
self.create_source_inner(&source).await.map_err(tonic_err)?;
tracing::debug!(id = %source.id, "create table source");

Ok(Response::new(CreateSourceResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn sync_sources(
&self,
request: Request<SyncSourcesRequest>,
) -> Result<Response<SyncSourcesResponse>, Status> {
let sources = request.into_inner().sources;
self.env
.source_manager()
.clear_sources()
.map_err(tonic_err)?;
for source in sources {
self.create_source_inner(&source).await.map_err(tonic_err)?
}

Ok(Response::new(SyncSourcesResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn drop_source(
&self,
Expand All @@ -246,37 +216,3 @@ impl StreamService for StreamServiceImpl {
Ok(Response::new(DropSourceResponse { status: None }))
}
}

impl StreamServiceImpl {
async fn create_source_inner(&self, source: &Source) -> RwResult<()> {
use risingwave_pb::catalog::source::Info;

let id = TableId::new(source.id); // TODO: use SourceId instead

match source.get_info()? {
Info::StreamSource(info) => {
self.env
.source_manager()
.create_source(&id, info.to_owned())
.await?;
}
Info::TableSource(info) => {
let columns = info
.columns
.iter()
.cloned()
.map(|c| c.column_desc.unwrap().into())
.collect_vec();

self.env.source_manager().create_table_source(
&id,
columns,
info.row_id_index.as_ref().map(|index| index.index as _),
info.pk_column_ids.clone(),
)?;
}
};

Ok(())
}
}
Loading