Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): remove drop_source rpc in CN #5849

Merged
merged 16 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ message BroadcastActorInfoTableResponse {
common.Status status = 1;
}

message DropSourceRequest {
uint32 source_id = 1;
}

message DropSourceResponse {
common.Status status = 1;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
}
Expand All @@ -119,7 +111,6 @@ service StreamService {
rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
rpc ForceStopActors(ForceStopActorsRequest) returns (ForceStopActorsResponse);
rpc InjectBarrier(InjectBarrierRequest) returns (InjectBarrierResponse);
rpc DropSource(DropSourceRequest) returns (DropSourceResponse);
rpc BarrierComplete(BarrierCompleteRequest) returns (BarrierCompleteResponse);
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
}
Expand Down
22 changes: 2 additions & 20 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;

use async_stack_trace::StackTrace;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::error::tonic_err;
use risingwave_pb::stream_service::barrier_complete_response::GroupedSstableInfo;
use risingwave_pb::stream_service::stream_service_server::StreamService;
Expand Down Expand Up @@ -103,7 +102,8 @@ impl StreamService for StreamServiceImpl {
) -> std::result::Result<Response<DropActorsResponse>, Status> {
let req = request.into_inner();
let actors = req.actor_ids;
self.mgr.drop_actor(&actors)?;
let source_mgr = self.env.source_manager();
self.mgr.drop_actor(source_mgr, &actors)?;
Ok(Response::new(DropActorsResponse {
request_id: req.request_id,
status: None,
Expand Down Expand Up @@ -197,22 +197,4 @@ impl StreamService for StreamServiceImpl {

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

#[cfg_attr(coverage, no_coverage)]
async fn drop_source(
&self,
request: Request<DropSourceRequest>,
) -> Result<Response<DropSourceResponse>, Status> {
let id = request.into_inner().source_id;
let id = TableId::new(id); // TODO: use SourceId instead

self.env
.source_manager()
.drop_source(&id)
.map_err(tonic_err)?;

tracing::debug!(id = %id, "drop source");

Ok(Response::new(DropSourceResponse { status: None }))
}
}
7 changes: 0 additions & 7 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,6 @@ mod tests {
Ok(Response::new(InjectBarrierResponse::default()))
}

async fn drop_source(
&self,
_request: Request<DropSourceRequest>,
) -> std::result::Result<Response<DropSourceResponse>, Status> {
unimplemented!()
}

async fn barrier_complete(
&self,
_request: Request<BarrierCompleteRequest>,
Expand Down
1 change: 0 additions & 1 deletion src/rpc_client/src/stream_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ macro_rules! for_all_stream_rpc {
,{ 0, drop_actors, DropActorsRequest, DropActorsResponse }
,{ 0, force_stop_actors, ForceStopActorsRequest, ForceStopActorsResponse}
,{ 0, inject_barrier, InjectBarrierRequest, InjectBarrierResponse }
,{ 0, drop_source, DropSourceRequest, DropSourceResponse }
,{ 0, barrier_complete, BarrierCompleteRequest, BarrierCompleteResponse }
,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse }
}
Expand Down
113 changes: 55 additions & 58 deletions src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use parking_lot::{Mutex, MutexGuard};
use parking_lot::Mutex;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::ensure;
use risingwave_common::error::ErrorCode::{ConnectorError, InternalError, ProtocolError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
Expand All @@ -38,14 +37,11 @@ pub type SourceRef = Arc<SourceImpl>;
#[async_trait]
pub trait SourceManager: Debug + Sync + Send {
PanQL marked this conversation as resolved.
Show resolved Hide resolved
fn get_source(&self, source_id: &TableId) -> Result<SourceDesc>;
fn drop_source(&self, source_id: &TableId) -> Result<()>;

/// Clear sources, this is used when failover happens.
fn clear_sources(&self) -> Result<()>;
fn insert_source(&self, table_id: &TableId, info: &TableSourceInfo) -> SourceDesc;
fn try_drop_source(&self, source_id: &TableId);

fn metrics(&self) -> Arc<SourceMetrics>;
fn msg_buf_size(&self) -> usize;
fn get_sources(&self) -> Result<MutexGuard<'_, HashMap<TableId, SourceDesc>>>;
}

/// `SourceColumnDesc` is used to describe a column in the Source and is used as the column
Expand Down Expand Up @@ -117,9 +113,15 @@ pub struct SourceDesc {

pub type SourceManagerRef = Arc<dyn SourceManager>;

/// Mutable state of `MemSourceManager`, kept together behind a single mutex.
#[derive(Debug, Default)]
struct MemSourceManagerInner {
/// Source descriptors keyed by table id.
sources: HashMap<TableId, SourceDesc>,
/// Per-table counter: incremented on every `insert_table_source` call and
/// decremented by `try_drop_source`, which removes the source at zero.
source_actor_num: HashMap<TableId, usize>,
}

#[derive(Debug)]
pub struct MemSourceManager {
sources: Mutex<HashMap<TableId, SourceDesc>>,
inner: Mutex<MemSourceManagerInner>,
/// local source metrics
metrics: Arc<SourceMetrics>,
/// The capacity of the chunks in the channel that connects between `ConnectorSource` and
Expand All @@ -130,27 +132,53 @@ pub struct MemSourceManager {
#[async_trait]
impl SourceManager for MemSourceManager {
fn get_source(&self, table_id: &TableId) -> Result<SourceDesc> {
let sources = self.get_sources()?;
sources.get(table_id).cloned().ok_or_else(|| {
let inner = self.inner.lock();
inner.sources.get(table_id).cloned().ok_or_else(|| {
InternalError(format!("Get source table id not exists: {:?}", table_id)).into()
})
}

fn drop_source(&self, table_id: &TableId) -> Result<()> {
let mut sources = self.get_sources()?;
ensure!(
sources.contains_key(table_id),
"Source does not exist: {:?}",
table_id
);
sources.remove(table_id);
Ok(())
/// Returns the `SourceDesc` registered for `table_id`, building and
/// registering a table source from `info` when none is registered yet.
///
/// Every call increments the per-table counter in `source_actor_num`,
/// whether or not a new descriptor is built; `try_drop_source` decrements it.
///
/// # Panics
///
/// Panics if a column in `info.columns` has no `column_desc` (the `unwrap`
/// below), which can only happen when a new descriptor is being built.
fn insert_table_source(
&self,
table_id: &TableId,
info: &TableSourceInfo,
) -> Result<SourceDesc> {
let mut inner = self.inner.lock();
// Count this caller before looking up (or creating) the descriptor.
let actor_num = inner.source_actor_num.entry(*table_id).or_insert(0usize);
*actor_num += 1;
PanQL marked this conversation as resolved.
Show resolved Hide resolved
// The closure only runs when `table_id` has no descriptor yet.
let desc = inner.sources.entry(*table_id).or_insert_with(|| {
let columns = info
.columns
.iter()
.cloned()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()))
.collect_vec();
let row_id_index = info.row_id_index.as_ref().map(|index| index.index as _);
let pk_column_ids = info.pk_column_ids.clone();

// Table sources do not need columns and format
SourceDesc {
columns: columns.iter().map(SourceColumnDesc::from).collect(),
source: Arc::new(SourceImpl::Table(TableSource::new(columns))),
format: SourceFormat::Invalid,
row_id_index,
pk_column_ids,
metrics: self.metrics.clone(),
}
});
// Hand back an owned copy so the lock is not held by the caller.
Ok(desc.clone())
}

fn clear_sources(&self) -> Result<()> {
let mut sources = self.get_sources()?;
sources.clear();
Ok(())
/// Releases one reference to the table source `source_id`.
///
/// Decrements the per-table counter in `source_actor_num`. When the counter
/// reaches zero, both the counter entry and the source descriptor are
/// removed.
///
/// # Panics
///
/// Panics if no counter is recorded for `source_id`, i.e. the source was
/// never inserted or has already been fully released.
fn try_drop_source(&self, source_id: &TableId) {
    let mut inner = self.inner.lock();
    let actor_num = inner
        .source_actor_num
        .get_mut(source_id)
        .expect("double release in local source manager!");
    *actor_num -= 1;
    if *actor_num == 0 {
        // Remove the counter together with the source. Leaving a stale zero
        // entry behind would let a later extra release pass the `expect`
        // above and underflow the counter instead of reporting the double
        // release.
        inner.source_actor_num.remove(source_id);
        inner.sources.remove(source_id);
    }
}

fn metrics(&self) -> Arc<SourceMetrics> {
Expand All @@ -160,16 +188,12 @@ impl SourceManager for MemSourceManager {
fn msg_buf_size(&self) -> usize {
self.connector_message_buffer_size
}

fn get_sources(&self) -> Result<MutexGuard<'_, HashMap<TableId, SourceDesc>>> {
Ok(self.sources.lock())
}
}

impl Default for MemSourceManager {
fn default() -> Self {
MemSourceManager {
sources: Default::default(),
inner: Default::default(),
metrics: Default::default(),
connector_message_buffer_size: 16,
}
Expand All @@ -179,7 +203,7 @@ impl Default for MemSourceManager {
impl MemSourceManager {
pub fn new(metrics: Arc<SourceMetrics>, connector_message_buffer_size: usize) -> Self {
MemSourceManager {
sources: Mutex::new(HashMap::new()),
inner: Mutex::new(MemSourceManagerInner::default()),
metrics,
connector_message_buffer_size,
}
Expand Down Expand Up @@ -215,32 +239,7 @@ impl SourceDescBuilder {
table_id: &TableId,
info: &TableSourceInfo,
) -> Result<SourceDesc> {
let mut sources = mgr.get_sources()?;
if let Some(source_desc) = sources.get(table_id) {
return Ok(source_desc.clone());
}

let columns: Vec<_> = info
.columns
.iter()
.cloned()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()))
.collect();
let row_id_index = info.row_id_index.as_ref().map(|index| index.index as _);
let pk_column_ids = info.pk_column_ids.clone();

// Table sources do not need columns and format
let desc = SourceDesc {
columns: columns.iter().map(SourceColumnDesc::from).collect(),
source: Arc::new(SourceImpl::Table(TableSource::new(columns))),
format: SourceFormat::Invalid,
row_id_index,
pk_column_ids,
metrics: mgr.metrics(),
};

sources.insert(*table_id, desc.clone());
Ok(desc)
mgr.insert_table_source(table_id, info)
}

async fn build_stream_source(
Expand Down Expand Up @@ -402,9 +401,7 @@ mod tests {
let get_source_res = mem_source_manager.get_source(&table_id);
assert!(get_source_res.is_ok());

// drop source
let drop_source_res = mem_source_manager.drop_source(&table_id);
assert!(drop_source_res.is_ok());
mem_source_manager.try_drop_source(&table_id);
let get_source_res = mem_source_manager.get_source(&table_id);
assert!(get_source_res.is_err());

Expand Down
16 changes: 11 additions & 5 deletions src/stream/src/from_proto/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use anyhow::anyhow;
use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
use risingwave_common::ensure;
use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::source_node::Info as SourceNodeInfo;
use risingwave_source::SourceDescBuilder;
Expand All @@ -40,11 +41,16 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.register_sender(params.actor_context.id, sender);

let source_id = TableId::new(node.source_id);
let source_builder = SourceDescBuilder::new(
source_id,
node.get_info()?,
&params.env.source_manager_ref(),
);
let node_info = node.get_info()?;
// As stream source desc is not stored in local source mgr now,
// we only record the actor holding table source.
if matches!(node_info, SourceNodeInfo::TableSource(_)) {
let actor_id = params.actor_context.id;
ensure!(!stream.actor_tables.contains_key(&actor_id));
stream.actor_tables.insert(actor_id, source_id);
}
let source_builder =
SourceDescBuilder::new(source_id, node_info, &params.env.source_manager_ref());

let column_ids: Vec<_> = node
.get_column_ids()
Expand Down
15 changes: 14 additions & 1 deletion src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_hummock_sdk::LocalSstableInfo;
use risingwave_pb::common::ActorInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::StreamNode;
use risingwave_pb::{stream_plan, stream_service};
use risingwave_source::SourceManager;
use risingwave_storage::{dispatch_state_store, StateStore, StateStoreImpl};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -65,6 +67,9 @@ pub struct LocalStreamManagerCore {
/// Stores all actor information, taken after actor built.
actors: HashMap<ActorId, stream_plan::StreamActor>,

/// Maps each actor that holds a table source to that source's table id.
pub(crate) actor_tables: HashMap<ActorId, TableId>,
PanQL marked this conversation as resolved.
Show resolved Hide resolved

/// Stores all actor tokio runtime monitoring tasks.
actor_monitor_tasks: HashMap<ActorId, ActorHandle>,

Expand Down Expand Up @@ -284,10 +289,17 @@ impl LocalStreamManager {
Ok(())
}

pub fn drop_actor(&self, actors: &[ActorId]) -> StreamResult<()> {
pub fn drop_actor(
&self,
source_mgr: &dyn SourceManager,
actors: &[ActorId],
) -> StreamResult<()> {
let mut core = self.core.lock();
for id in actors {
core.drop_actor(*id);
if let Some(table_id) = core.actor_tables.remove(id) {
PanQL marked this conversation as resolved.
Show resolved Hide resolved
source_mgr.try_drop_source(&table_id);
PanQL marked this conversation as resolved.
Show resolved Hide resolved
}
}
tracing::debug!(actors = ?actors, "drop actors");
Ok(())
Expand Down Expand Up @@ -398,6 +410,7 @@ impl LocalStreamManagerCore {
handles: HashMap::new(),
context: Arc::new(context),
actors: HashMap::new(),
actor_tables: HashMap::new(),
actor_monitor_tasks: HashMap::new(),
state_store,
streaming_metrics,
Expand Down