Skip to content

Commit

Permalink
feat: store peer info in TableFlowValue
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jul 4, 2024
1 parent ee9a5d7 commit baa23aa
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 53 deletions.
71 changes: 49 additions & 22 deletions src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
Expand All @@ -26,9 +26,10 @@ use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::FlownodeId;

type FlownodeSet = HashSet<FlownodeId>;
type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;

pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

Expand All @@ -53,13 +54,14 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
Box::pin(async move {
table_flow_manager
.flows(table_id)
.map_ok(|key| key.flownode_id())
.try_collect::<HashSet<_>>()
.map_ok(|(key, value)| (key.flownode_id(), value.peer))
.try_collect::<HashMap<_, _>>()
.await
// We must cache the `HashMap` even if it's empty,
// to avoid future requests to the remote storage next time;
// If the value is added to the remote storage,
// we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashMap)`.
.map(Arc::new)
.map(Some)
})
})
Expand All @@ -69,21 +71,23 @@ async fn handle_create_flow(
cache: &Cache<TableId, FlownodeSet>,
CreateFlow {
source_table_ids,
flownode_ids,
flownode_peers,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
let mut set = entry.into_value();
set.extend(flownode_ids.iter().cloned());
let mut map = entry.into_value().as_ref().clone();
map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));

Op::Put(set)
Op::Put(Arc::new(map))
}
None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())),
None => Op::Put(Arc::new(HashMap::from_iter(
flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
))),
},
)
.await;
Expand All @@ -101,14 +105,14 @@ async fn handle_drop_flow(
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
Some(entry) => {
let mut set = entry.into_value();
let mut set = entry.into_value().as_ref().clone();
for flownode_id in flownode_ids {
set.remove(flownode_id);
}

Op::Put(set)
Op::Put(Arc::new(set))
}
None => {
// Do nothing
Expand Down Expand Up @@ -140,7 +144,7 @@ fn filter(ident: &CacheIdent) -> bool {

#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
Expand All @@ -150,8 +154,10 @@ mod tests {
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;

#[tokio::test]
async fn test_cache_empty_set() {
Expand Down Expand Up @@ -184,16 +190,31 @@ mod tests {
comment: "comment".to_string(),
options: Default::default(),
},
vec![],
(1..=3)
.map(|i| {
(
(i - 1) as u32,
FlowRouteValue {
peer: Peer::empty(i),
},
)
})
.collect::<Vec<_>>(),
)
.await
.unwrap();
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([1, 2, 3]));
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([1, 2, 3]));
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
);
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
}
Expand All @@ -205,7 +226,7 @@ mod tests {
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
flownode_peers: (1..=5).map(Peer::empty).collect(),
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
Expand All @@ -222,11 +243,11 @@ mod tests {
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
flownode_peers: (1..=5).map(Peer::empty).collect(),
}),
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![11, 12],
flownode_peers: (11..=12).map(Peer::empty).collect(),
}),
];
cache.invalidate(&ident).await.unwrap();
Expand All @@ -241,8 +262,14 @@ mod tests {
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([11, 12]));
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([11, 12]));
assert_eq!(
set.as_ref().clone(),
HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
);
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl CreateFlowProcedure {
&ctx,
&[CacheIdent::CreateFlow(CreateFlow {
source_table_ids: self.data.source_table_ids.clone(),
flownode_ids: self.data.peers.iter().map(|peer| peer.id).collect(),
flownode_peers: self.data.peers.clone(),
})],
)
.await?;
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use table::table_name::TableName;
use crate::flow_name::FlowName;
use crate::key::schema_name::SchemaName;
use crate::key::FlowId;
use crate::peer::Peer;
use crate::{ClusterId, DatanodeId, FlownodeId};

#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -169,7 +170,7 @@ pub enum CacheIdent {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CreateFlow {
pub source_table_ids: Vec<TableId>,
pub flownode_ids: Vec<FlownodeId>,
pub flownode_peers: Vec<Peer>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
4 changes: 3 additions & 1 deletion src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ use common_catalog::consts::{
use common_telemetry::warn;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use flow::flow_route::FlowRouteValue;
use flow::table_flow::TableFlowValue;
use lazy_static::lazy_static;
use regex::Regex;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -1193,7 +1194,8 @@ impl_table_meta_value! {
DatanodeTableValue,
FlowInfoValue,
FlowNameValue,
FlowRouteValue
FlowRouteValue,
TableFlowValue
}

impl_optional_meta_value! {
Expand Down
34 changes: 29 additions & 5 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
use common_telemetry::info;
use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
use snafu::{ensure, OptionExt};
use table_flow::TableFlowValue;

use self::flow_info::{FlowInfoKey, FlowInfoValue};
use self::flow_name::FlowNameKey;
Expand Down Expand Up @@ -159,17 +160,20 @@ impl FlowMetadataManager {

let create_flow_routes_txn = self
.flow_route_manager
.build_create_txn(flow_id, flow_routes)?;
.build_create_txn(flow_id, flow_routes.clone())?;

let create_flownode_flow_txn = self
.flownode_flow_manager
.build_create_txn(flow_id, flow_info.flownode_ids().clone());

let create_table_flow_txn = self.table_flow_manager.build_create_txn(
flow_id,
flow_info.flownode_ids().clone(),
flow_routes
.into_iter()
.map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
.collect(),
flow_info.source_table_ids(),
);
)?;

let txn = Txn::merge_all(vec![
create_flow_flow_name_txn,
Expand Down Expand Up @@ -354,7 +358,11 @@ mod tests {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
let flow_value = test_flow_info_value(
"flow",
[(0, 1u64), (1, 2u64)].into(),
vec![1024, 1025, 1026],
);
let flow_routes = vec![
(
1u32,
Expand Down Expand Up @@ -422,7 +430,23 @@ mod tests {
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_eq!(nodes, vec![TableFlowKey::new(table_id, 1, flow_id, 0)]);
assert_eq!(
nodes,
vec![
(
TableFlowKey::new(table_id, 1, flow_id, 1),
TableFlowValue {
peer: Peer::empty(1)
}
),
(
TableFlowKey::new(table_id, 2, flow_id, 2),
TableFlowValue {
peer: Peer::empty(2)
}
)
]
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/key/flow/flow_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl FlowRouteManager {

/// Builds a create flow routes transaction.
///
/// Puts `__flow/route/{flownode_id}/{partitions}` keys.
/// Puts `__flow/route/{flow_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
&self,
flow_id: FlowId,
Expand Down
54 changes: 34 additions & 20 deletions src/common/meta/src/key/flow/table_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use table::metadata::TableId;

use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey};
use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey, TableMetaValue};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
Expand Down Expand Up @@ -166,9 +168,16 @@ impl<'a> MetaKey<'a, TableFlowKeyInner> for TableFlowKeyInner {
}
}

/// The value stored under a [TableFlowKey].
///
/// Carries the [Peer] of the flownode the key refers to: when the create
/// transaction is built, `peer.id` is used as the flownode id component of
/// the key, and the whole value is serialized as the raw value of the entry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TableFlowValue {
// The flownode peer associated with the table flow entry.
pub(crate) peer: Peer,
}

/// Decodes `KeyValue` to a ([TableFlowKey], [TableFlowValue]) pair.
pub fn table_flow_decoder(kv: KeyValue) -> Result<TableFlowKey> {
TableFlowKey::from_bytes(&kv.key)
pub fn table_flow_decoder(kv: KeyValue) -> Result<(TableFlowKey, TableFlowValue)> {
let key = TableFlowKey::from_bytes(&kv.key)?;
let value = TableFlowValue::try_from_raw_value(&kv.value)?;
Ok((key, value))
}

pub type TableFlowManagerRef = Arc<TableFlowManager>;
Expand All @@ -187,7 +196,10 @@ impl TableFlowManager {
/// Retrieves all ([TableFlowKey], [TableFlowValue]) pairs of the specified `table_id`.
///
/// TODO(discord9): add cache for it since range request does not support cache.
pub fn flows(&self, table_id: TableId) -> BoxStream<'static, Result<TableFlowKey>> {
pub fn flows(
&self,
table_id: TableId,
) -> BoxStream<'static, Result<(TableFlowKey, TableFlowValue)>> {
let start_key = TableFlowKey::range_start_key(table_id);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
Expand All @@ -203,25 +215,27 @@ impl TableFlowManager {
/// Builds a create table flow transaction.
///
/// Puts `__flow/source_table/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlownodeId)>>(
pub fn build_create_txn(
&self,
flow_id: FlowId,
flownode_ids: I,
table_flow_values: Vec<(FlowPartitionId, TableFlowValue)>,
source_table_ids: &[TableId],
) -> Txn {
let txns = flownode_ids
.into_iter()
.flat_map(|(partition_id, flownode_id)| {
source_table_ids.iter().map(move |table_id| {
TxnOp::Put(
TableFlowKey::new(*table_id, flownode_id, flow_id, partition_id).to_bytes(),
vec![],
)
})
})
.collect::<Vec<_>>();

Txn::new().and_then(txns)
) -> Result<Txn> {
let mut txns = Vec::with_capacity(source_table_ids.len() * table_flow_values.len());

for (partition_id, table_flow_value) in table_flow_values {
let flownode_id = table_flow_value.peer.id;
let value = table_flow_value.try_as_raw_value()?;
for source_table_id in source_table_ids {
txns.push(TxnOp::Put(
TableFlowKey::new(*source_table_id, flownode_id, flow_id, partition_id)
.to_bytes(),
value.clone(),
));
}
}

Ok(Txn::new().and_then(txns))
}
}

Expand Down
Loading

0 comments on commit baa23aa

Please sign in to comment.