Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): remove drop_source rpc in CN #5849

Merged
merged 16 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message DropSourceRequest {
uint32 source_id = 1;
}

message DropSourceResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
Expand All @@ -119,7 +111,6 @@ service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
}
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct DeleteExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
schema: Schema,
identity: String,
Expand All @@ -42,7 +42,7 @@ pub struct DeleteExecutor {
impl DeleteExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -145,15 +145,15 @@ mod tests {
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct InsertExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

child: BoxedExecutor,
schema: Schema,
Expand All @@ -43,7 +43,7 @@ pub struct InsertExecutor {
impl InsertExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::ReadOptions;
use risingwave_storage::*;
Expand All @@ -170,7 +170,7 @@ mod tests {

#[tokio::test]
async fn test_insert_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -37,7 +37,7 @@ use crate::task::BatchTaskContext;
pub struct UpdateExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
schema: Schema,
Expand All @@ -47,7 +47,7 @@ pub struct UpdateExecutor {
impl UpdateExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
identity: String,
Expand Down Expand Up @@ -200,15 +200,15 @@ mod tests {
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_expr::expr::InputRefExpression;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_update_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;
use risingwave_storage::StateStoreImpl;

use super::TaskId;
Expand All @@ -39,7 +39,7 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
/// Whether `peer_addr` is in same as current task.
fn is_local_addr(&self, peer_addr: &HostAddr) -> bool;

fn source_manager(&self) -> SourceManagerRef;
fn source_manager(&self) -> TableSourceManagerRef;

fn state_store(&self) -> StateStoreImpl;

Expand Down Expand Up @@ -78,7 +78,7 @@ impl BatchTaskContext for ComputeNodeContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
self.env.source_manager_ref()
}

Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use risingwave_common::config::BatchConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::{SourceManager, SourceManagerRef};
use risingwave_source::{TableSourceManager, TableSourceManagerRef};
use risingwave_storage::StateStoreImpl;

use crate::executor::BatchTaskMetrics;
Expand All @@ -36,7 +36,7 @@ pub struct BatchEnvironment {
task_manager: Arc<BatchManager>,

/// Reference to the source manager. This is used to query the sources.
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

/// Batch related configurations.
config: Arc<BatchConfig>,
Expand All @@ -57,7 +57,7 @@ pub struct BatchEnvironment {
impl BatchEnvironment {
#[allow(clippy::too_many_arguments)]
pub fn new(
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
task_manager: Arc<BatchManager>,
server_addr: HostAddr,
config: Arc<BatchConfig>,
Expand Down Expand Up @@ -108,11 +108,11 @@ impl BatchEnvironment {
}

#[expect(clippy::explicit_auto_deref)]
pub fn source_manager(&self) -> &dyn SourceManager {
pub fn source_manager(&self) -> &dyn TableSourceManager {
&*self.source_manager
}

pub fn source_manager_ref(&self) -> SourceManagerRef {
pub fn source_manager_ref(&self) -> TableSourceManagerRef {
self.source_manager.clone()
}

Expand Down
25 changes: 4 additions & 21 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use async_stack_trace::StackTrace;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::tonic_err;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
Expand Down Expand Up @@ -103,7 +102,8 @@ impl StreamService for StreamServiceImpl {
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actor(&actors)?;
let source_mgr = self.env.source_manager();
self.mgr.drop_actor(source_mgr, &actors)?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
Expand All @@ -116,7 +116,8 @@ impl StreamService for StreamServiceImpl {
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.stop_all_actors().await?;
let source_mgr = self.env.source_manager();
self.mgr.stop_all_actors(source_mgr).await?;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
Expand Down Expand Up @@ -197,22 +198,4 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn drop_source(
&self,
request: Request<DropSourceRequest>,
) -> Result<Response<DropSourceResponse>, Status> {
let id = request.into_inner().source_id;
let id = TableId::new(id); // TODO: use SourceId instead

self.env
.source_manager()
.drop_source(&id)
.map_err(tonic_err)?;

tracing::debug!(id = %id, "drop source");

Ok(Response::new(DropSourceResponse { status: None }))
}
}
4 changes: 2 additions & 2 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::streaming_table::state_table::StateTable;
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_table_materialize() -> StreamResult<()> {
use risingwave_stream::executor::state_table_handler::default_source_internal_table;

let memory_state_store = MemoryStateStore::new();
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let source_table_id = TableId::default();
let schema = Schema {
fields: vec![
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::catalog::pg_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
Expand Down Expand Up @@ -58,7 +58,7 @@ impl BatchTaskContext for FrontendBatchTaskContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
unimplemented!("not supported in local mode")
}

Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,6 @@ mod tests {
Ok(Response::new(InjectBarrierResponse::default()))
}

async fn drop_source(
&self,
_request: Request<DropSourceRequest>,
) -> std::result::Result<Response<DropSourceResponse>, Status> {
unimplemented!()
}

async fn barrier_complete(
&self,
_request: Request<BarrierCompleteRequest>,
Expand Down
1 change: 0 additions & 1 deletion src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ macro_rules! for_all_stream_rpc {
,{ 0, drop_actors, DropActorsRequest, DropActorsResponse }
,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse}
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, drop_source, DropSourceRequest, DropSourceResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
}
Expand Down
Loading