Skip to content

Commit

Permalink
refactor(source): remove drop_source rpc in CN (#5849)
Browse files Browse the repository at this point in the history
* remove drop_source rpc

* drop related tables when drop actor in drop_actor rpc

* impl insert_source/try_drop_source for source mgr

* fix unittest

* exclude stream source id from actor_tables

* Update src/source/src/manager.rs

Co-authored-by: August <[email protected]>

* Update src/source/src/manager.rs

Co-authored-by: August <[email protected]>

* rename SourceManager; clear actor_tables when doing stop_all_actors;

* use Weak/Arc instead of usize to count source ref

* repair unit tests

* merge sources and source_ref_count into a map;clear source map when
drop_all_actors.

* modification:

- remove TableSourceManager trait and use MemSourceManager directly.
- remove useless source desc during insert_source instead.

* add clear_source and call it during force_stop_actors.

* improve unittest coverage.

Co-authored-by: August <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 17, 2022
1 parent 5e1c97e commit 6d8c3c5
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 179 deletions.
9 changes: 0 additions & 9 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message DropSourceRequest {
uint32 source_id = 1;
}

message DropSourceResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
Expand All @@ -119,7 +111,6 @@ service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
}
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct DeleteExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
schema: Schema,
identity: String,
Expand All @@ -42,7 +42,7 @@ pub struct DeleteExecutor {
impl DeleteExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -145,15 +145,15 @@ mod tests {
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{SourceDescBuilder, TableSourceManager, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct InsertExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

child: BoxedExecutor,
schema: Schema,
Expand All @@ -43,7 +43,7 @@ pub struct InsertExecutor {
impl InsertExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{SourceDescBuilder, TableSourceManager, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::ReadOptions;
use risingwave_storage::*;
Expand All @@ -170,7 +170,7 @@ mod tests {

#[tokio::test]
async fn test_insert_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -37,7 +37,7 @@ use crate::task::BatchTaskContext;
pub struct UpdateExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
schema: Schema,
Expand All @@ -47,7 +47,7 @@ pub struct UpdateExecutor {
impl UpdateExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
identity: String,
Expand Down Expand Up @@ -200,15 +200,15 @@ mod tests {
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_expr::expr::InputRefExpression;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{SourceDescBuilder, TableSourceManager, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_update_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;
use risingwave_storage::StateStoreImpl;

use super::TaskId;
Expand All @@ -39,7 +39,7 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
/// Whether `peer_addr` is in same as current task.
fn is_local_addr(&self, peer_addr: &HostAddr) -> bool;

fn source_manager(&self) -> SourceManagerRef;
fn source_manager(&self) -> TableSourceManagerRef;

fn state_store(&self) -> StateStoreImpl;

Expand Down Expand Up @@ -78,7 +78,7 @@ impl BatchTaskContext for ComputeNodeContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
self.env.source_manager_ref()
}

Expand Down
13 changes: 6 additions & 7 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use risingwave_common::config::BatchConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::{SourceManager, SourceManagerRef};
use risingwave_source::{TableSourceManager, TableSourceManagerRef};
use risingwave_storage::StateStoreImpl;

use crate::executor::BatchTaskMetrics;
Expand All @@ -36,7 +36,7 @@ pub struct BatchEnvironment {
task_manager: Arc<BatchManager>,

/// Reference to the source manager. This is used to query the sources.
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

/// Batch related configurations.
config: Arc<BatchConfig>,
Expand All @@ -57,7 +57,7 @@ pub struct BatchEnvironment {
impl BatchEnvironment {
#[allow(clippy::too_many_arguments)]
pub fn new(
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
task_manager: Arc<BatchManager>,
server_addr: HostAddr,
config: Arc<BatchConfig>,
Expand All @@ -82,13 +82,12 @@ impl BatchEnvironment {
#[cfg(test)]
pub fn for_test() -> Self {
use risingwave_rpc_client::ComputeClientPool;
use risingwave_source::MemSourceManager;
use risingwave_storage::monitor::StateStoreMetrics;

BatchEnvironment {
task_manager: Arc::new(BatchManager::new(None)),
server_addr: "127.0.0.1:5688".parse().unwrap(),
source_manager: std::sync::Arc::new(MemSourceManager::default()),
source_manager: std::sync::Arc::new(TableSourceManager::default()),
config: Arc::new(BatchConfig::default()),
worker_id: WorkerNodeId::default(),
state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
Expand All @@ -108,11 +107,11 @@ impl BatchEnvironment {
}

#[expect(clippy::explicit_auto_deref)]
pub fn source_manager(&self) -> &dyn SourceManager {
pub fn source_manager(&self) -> &TableSourceManager {
&*self.source_manager
}

pub fn source_manager_ref(&self) -> SourceManagerRef {
pub fn source_manager_ref(&self) -> TableSourceManagerRef {
self.source_manager.clone()
}

Expand Down
20 changes: 1 addition & 19 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use async_stack_trace::StackTrace;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::tonic_err;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
Expand Down Expand Up @@ -117,6 +116,7 @@ impl StreamService for StreamServiceImpl {
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.stop_all_actors().await?;
self.env.source_manager().clear_sources();
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
Expand Down Expand Up @@ -197,22 +197,4 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn drop_source(
&self,
request: Request<DropSourceRequest>,
) -> Result<Response<DropSourceResponse>, Status> {
let id = request.into_inner().source_id;
let id = TableId::new(id); // TODO: use SourceId instead

self.env
.source_manager()
.drop_source(&id)
.map_err(tonic_err)?;

tracing::debug!(id = %id, "drop source");

Ok(Response::new(DropSourceResponse { status: None }))
}
}
4 changes: 2 additions & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer;
use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ComputeClientPool, ExtraInfoSourceRef, MetaClient};
use risingwave_source::monitor::SourceMetrics;
use risingwave_source::MemSourceManager;
use risingwave_source::TableSourceManager;
use risingwave_storage::hummock::compactor::{
CompactionExecutor, Compactor, CompactorContext, Context,
};
Expand Down Expand Up @@ -190,7 +190,7 @@ pub async fn compute_node_serve(
opts.enable_async_stack_trace,
opts.enable_managed_cache,
));
let source_mgr = Arc::new(MemSourceManager::new(
let source_mgr = Arc::new(TableSourceManager::new(
source_metrics,
stream_config.developer.stream_connector_message_buffer_size,
));
Expand Down
4 changes: 2 additions & 2 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{SourceDescBuilder, TableSourceManager, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::streaming_table::state_table::StateTable;
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_table_materialize() -> StreamResult<()> {
use risingwave_stream::executor::state_table_handler::default_source_internal_table;

let memory_state_store = MemoryStateStore::new();
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(TableSourceManager::default());
let source_table_id = TableId::default();
let schema = Schema {
fields: vec![
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::catalog::pg_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
Expand Down Expand Up @@ -58,7 +58,7 @@ impl BatchTaskContext for FrontendBatchTaskContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
unimplemented!("not supported in local mode")
}

Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,6 @@ mod tests {
Ok(Response::new(InjectBarrierResponse::default()))
}

async fn drop_source(
&self,
_request: Request<DropSourceRequest>,
) -> std::result::Result<Response<DropSourceResponse>, Status> {
unimplemented!()
}

async fn barrier_complete(
&self,
_request: Request<BarrierCompleteRequest>,
Expand Down
1 change: 0 additions & 1 deletion src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ macro_rules! for_all_stream_rpc {
,{ 0, drop_actors, DropActorsRequest, DropActorsResponse }
,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse}
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, drop_source, DropSourceRequest, DropSourceResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
}
Expand Down
1 change: 1 addition & 0 deletions src/source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#![feature(lint_reasons)]
#![feature(result_option_inspect)]
#![feature(generators)]
#![feature(hash_drain_filter)]

use std::collections::HashMap;
use std::fmt::Debug;
Expand Down
Loading

0 comments on commit 6d8c3c5

Please sign in to comment.