Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): remove drop_source rpc in CN #5849

Merged
merged 16 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct DeleteExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
schema: Schema,
identity: String,
Expand All @@ -42,7 +42,7 @@ pub struct DeleteExecutor {
impl DeleteExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -145,15 +145,15 @@ mod tests {
use risingwave_common::catalog::schema_test_utils;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_delete_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand All @@ -33,7 +33,7 @@ use crate::task::BatchTaskContext;
pub struct InsertExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

child: BoxedExecutor,
schema: Schema,
Expand All @@ -43,7 +43,7 @@ pub struct InsertExecutor {
impl InsertExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
identity: String,
) -> Self {
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::store::ReadOptions;
use risingwave_storage::*;
Expand All @@ -170,7 +170,7 @@ mod tests {

#[tokio::test]
async fn test_insert_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let store = MemoryStateStore::new();

// Make struct field
Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
Expand All @@ -37,7 +37,7 @@ use crate::task::BatchTaskContext;
pub struct UpdateExecutor {
/// Target table id.
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
schema: Schema,
Expand All @@ -47,7 +47,7 @@ pub struct UpdateExecutor {
impl UpdateExecutor {
pub fn new(
table_id: TableId,
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
child: BoxedExecutor,
exprs: Vec<BoxedExpression>,
identity: String,
Expand Down Expand Up @@ -200,15 +200,15 @@ mod tests {
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_expr::expr::InputRefExpression;
use risingwave_source::table_test_utils::create_table_info;
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};

use super::*;
use crate::executor::test_utils::MockExecutor;
use crate::*;

#[tokio::test]
async fn test_update_executor() -> Result<()> {
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());

// Schema for mock executor.
let schema = schema_test_utils::ii();
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;
use risingwave_storage::StateStoreImpl;

use super::TaskId;
Expand All @@ -39,7 +39,7 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {
/// Whether `peer_addr` is in same as current task.
fn is_local_addr(&self, peer_addr: &HostAddr) -> bool;

fn source_manager(&self) -> SourceManagerRef;
fn source_manager(&self) -> TableSourceManagerRef;

fn state_store(&self) -> StateStoreImpl;

Expand Down Expand Up @@ -78,7 +78,7 @@ impl BatchTaskContext for ComputeNodeContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
self.env.source_manager_ref()
}

Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/task/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use risingwave_common::config::BatchConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::{SourceManager, SourceManagerRef};
use risingwave_source::{TableSourceManager, TableSourceManagerRef};
use risingwave_storage::StateStoreImpl;

use crate::executor::BatchTaskMetrics;
Expand All @@ -36,7 +36,7 @@ pub struct BatchEnvironment {
task_manager: Arc<BatchManager>,

/// Reference to the source manager. This is used to query the sources.
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,

/// Batch related configurations.
config: Arc<BatchConfig>,
Expand All @@ -57,7 +57,7 @@ pub struct BatchEnvironment {
impl BatchEnvironment {
#[allow(clippy::too_many_arguments)]
pub fn new(
source_manager: SourceManagerRef,
source_manager: TableSourceManagerRef,
task_manager: Arc<BatchManager>,
server_addr: HostAddr,
config: Arc<BatchConfig>,
Expand Down Expand Up @@ -108,11 +108,11 @@ impl BatchEnvironment {
}

#[expect(clippy::explicit_auto_deref)]
pub fn source_manager(&self) -> &dyn SourceManager {
pub fn source_manager(&self) -> &dyn TableSourceManager {
&*self.source_manager
}

pub fn source_manager_ref(&self) -> SourceManagerRef {
pub fn source_manager_ref(&self) -> TableSourceManagerRef {
self.source_manager.clone()
}

Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl StreamService for StreamServiceImpl {
request: Request<ForceStopActorsRequest>,
) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
let req = request.into_inner();
self.mgr.stop_all_actors().await?;
let source_mgr = self.env.source_manager();
self.mgr.stop_all_actors(source_mgr).await?;
Ok(Response::new(ForceStopActorsResponse {
request_id: req.request_id,
status: None,
Expand Down
4 changes: 2 additions & 2 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_source::{MemSourceManager, SourceDescBuilder, SourceManagerRef};
use risingwave_source::{MemSourceManager, SourceDescBuilder, TableSourceManagerRef};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::streaming_table::state_table::StateTable;
Expand Down Expand Up @@ -92,7 +92,7 @@ async fn test_table_materialize() -> StreamResult<()> {
use risingwave_stream::executor::state_table_handler::default_source_internal_table;

let memory_state_store = MemoryStateStore::new();
let source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let source_table_id = TableId::default();
let schema = Schema {
fields: vec![
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_source::SourceManagerRef;
use risingwave_source::TableSourceManagerRef;

use crate::catalog::pg_catalog::SysCatalogReaderImpl;
use crate::session::{AuthContext, FrontendEnv};
Expand Down Expand Up @@ -58,7 +58,7 @@ impl BatchTaskContext for FrontendBatchTaskContext {
is_local_address(self.env.server_address(), peer_addr)
}

fn source_manager(&self) -> SourceManagerRef {
fn source_manager(&self) -> TableSourceManagerRef {
unimplemented!("not supported in local mode")
}

Expand Down
52 changes: 25 additions & 27 deletions src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::error::ErrorCode::{ConnectorError, InternalError, ProtocolError};
Expand All @@ -32,10 +33,11 @@ use crate::table::TableSource;
use crate::{ConnectorSource, SourceFormat, SourceImpl, SourceParserImpl};

pub type SourceRef = Arc<SourceImpl>;
type WeakSourceRef = Weak<SourceImpl>;

/// The local source manager on the compute node.
#[async_trait]
pub trait SourceManager: Debug + Sync + Send {
pub trait TableSourceManager: Debug + Sync + Send {
PanQL marked this conversation as resolved.
Show resolved Hide resolved
fn get_source(&self, source_id: &TableId) -> Result<SourceDesc>;
fn insert_source(&self, table_id: &TableId, info: &TableSourceInfo) -> SourceDesc;
fn try_drop_source(&self, source_id: &TableId);
Expand Down Expand Up @@ -111,12 +113,12 @@ pub struct SourceDesc {
pub pk_column_ids: Vec<i32>,
}

pub type SourceManagerRef = Arc<dyn SourceManager>;
pub type TableSourceManagerRef = Arc<dyn TableSourceManager>;

#[derive(Debug, Default)]
struct MemSourceManagerInner {
sources: HashMap<TableId, SourceDesc>,
source_actor_num: HashMap<TableId, usize>,
source_ref_count: HashMap<TableId, WeakSourceRef>,
PanQL marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug)]
Expand All @@ -130,22 +132,16 @@ pub struct MemSourceManager {
}

#[async_trait]
impl SourceManager for MemSourceManager {
impl TableSourceManager for MemSourceManager {
fn get_source(&self, table_id: &TableId) -> Result<SourceDesc> {
let inner = self.inner.lock();
inner.sources.get(table_id).cloned().ok_or_else(|| {
InternalError(format!("Get source table id not exists: {:?}", table_id)).into()
})
}

fn insert_table_source(
&self,
table_id: &TableId,
info: &TableSourceInfo,
) -> Result<SourceDesc> {
fn insert_source(&self, table_id: &TableId, info: &TableSourceInfo) -> SourceDesc {
let mut inner = self.inner.lock();
let actor_num = inner.source_actor_num.entry(*table_id).or_insert(0usize);
*actor_num += 1;
let desc = inner.sources.entry(*table_id).or_insert_with(|| {
let columns = info
.columns
Expand All @@ -166,18 +162,20 @@ impl SourceManager for MemSourceManager {
metrics: self.metrics.clone(),
}
});
Ok(desc.clone())
let res = desc.clone();
let weak_ref = Arc::downgrade(&res.source);
inner.source_ref_count.entry(*table_id).or_insert(weak_ref);
res
}

fn try_drop_source(&self, source_id: &TableId) {
let mut inner = self.inner.lock();
let actor_num = inner
.source_actor_num
.get_mut(source_id)
.expect("double release in local source manager!");
*actor_num -= 1;
if *actor_num == 0 {
inner.sources.remove(source_id);
if let Some(weak_ref) = inner.source_ref_count.get(source_id) {
// No other replica but the one in inner.source, can remove
if weak_ref.strong_count() == 1 {
inner.sources.remove(source_id);
inner.source_ref_count.remove(source_id);
}
}
}

Expand Down Expand Up @@ -214,11 +212,11 @@ impl MemSourceManager {
pub struct SourceDescBuilder {
id: TableId,
info: ProstSourceInfo,
mgr: SourceManagerRef,
mgr: TableSourceManagerRef,
}

impl SourceDescBuilder {
pub fn new(id: TableId, info: &ProstSourceInfo, mgr: &SourceManagerRef) -> Self {
pub fn new(id: TableId, info: &ProstSourceInfo, mgr: &TableSourceManagerRef) -> Self {
Self {
id,
info: info.clone(),
Expand All @@ -235,15 +233,15 @@ impl SourceDescBuilder {
}

fn build_table_source(
mgr: &SourceManagerRef,
mgr: &TableSourceManagerRef,
table_id: &TableId,
info: &TableSourceInfo,
) -> Result<SourceDesc> {
mgr.insert_table_source(table_id, info)
Ok(mgr.insert_source(table_id, info))
}

async fn build_stream_source(
mgr: &SourceManagerRef,
mgr: &TableSourceManagerRef,
info: &StreamSourceInfo,
) -> Result<SourceDesc> {
let format = match info.get_row_format()? {
Expand Down Expand Up @@ -344,7 +342,7 @@ mod tests {
};
let source_id = TableId::default();

let mem_source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let mem_source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let source_builder =
SourceDescBuilder::new(source_id, &Info::StreamSource(info), &mem_source_manager);
let source = source_builder.build().await;
Expand Down Expand Up @@ -391,7 +389,7 @@ mod tests {

let _keyspace = Keyspace::table_root(MemoryStateStore::new(), &table_id);

let mem_source_manager: SourceManagerRef = Arc::new(MemSourceManager::default());
let mem_source_manager: TableSourceManagerRef = Arc::new(MemSourceManager::default());
let source_builder =
SourceDescBuilder::new(table_id, &Info::TableSource(info), &mem_source_manager);
let res = source_builder.build().await;
Expand Down
Loading