Skip to content

Commit

Permalink
chore: minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Dec 13, 2024
1 parent a98411d commit c6e0a66
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 86 deletions.
8 changes: 4 additions & 4 deletions src/catalog/src/system_schema/information_schema/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl InformationSchemaFlowsBuilder {
.flow_names(&catalog_name)
.await;

let flow_state_size = {
let flow_stat = {
let information_extension =
utils::information_extension(&self.catalog_manager).unwrap();
information_extension.flow_stats().await?.clone()
Expand All @@ -236,7 +236,7 @@ impl InformationSchemaFlowsBuilder {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_state_size)?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)?;
}

self.finish()
Expand All @@ -247,7 +247,7 @@ impl InformationSchemaFlowsBuilder {
predicates: &Predicates,
flow_id: FlowId,
flow_info: FlowInfoValue,
flow_state_size: &Option<FlowStat>,
flow_stat: &Option<FlowStat>,
) -> Result<()> {
let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
Expand All @@ -263,7 +263,7 @@ impl InformationSchemaFlowsBuilder {
self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id));
self.state_sizes.push(
flow_state_size
flow_stat
.as_ref()
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
);
Expand Down
5 changes: 0 additions & 5 deletions src/common/meta/src/key/flow/flow_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ pub struct FlowStateManager {
in_memory: KvBackendRef,
}

#[async_trait::async_trait]
pub trait LocalFlowStateReporter: Send + Sync {
async fn report(&self) -> FlowStateValue;
}

impl FlowStateManager {
pub fn new(in_memory: KvBackendRef) -> Self {
Self { in_memory }
Expand Down
59 changes: 5 additions & 54 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::time::{Duration, Instant, SystemTime};
use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_state::{FlowStat, FlowStateValue, LocalFlowStateReporter};
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
Expand Down Expand Up @@ -52,17 +51,16 @@ use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowOutputTypeMismatchSnafu, InternalSnafu,
TableNotFoundSnafu, UnexpectedSnafu,
EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, TableNotFoundSnafu,
UnexpectedSnafu,
};
use crate::expr::{Batch, GlobalId};
use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_RUN_INTERVAL_MS};
use crate::repr::{self, DiffRow, Row, BATCH_SIZE};

// TODO(discord9): change type mismatch information to a more user friendly message in this PR

mod flownode_impl;
mod parse_expr;
mod stat;
#[cfg(test)]
mod tests;
mod util;
Expand Down Expand Up @@ -269,16 +267,8 @@ impl FlowWorkerManager {
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));

let first_row = reqs
.first()
.and_then(|r| match r {
DiffRequest::Insert(rows) => rows.first(),
DiffRequest::Delete(rows) => rows.first(),
})
.map(|(row, _ts)| row);
let (is_ts_placeholder, proto_schema) = self
.try_fetch_or_create_table(&table_name, first_row)
.await?;
let (is_ts_placeholder, proto_schema) =
self.try_fetch_or_create_table(&table_name).await?;
let schema_len = proto_schema.len();

trace!(
Expand Down Expand Up @@ -426,7 +416,6 @@ impl FlowWorkerManager {
async fn try_fetch_or_create_table(
&self,
table_name: &TableName,
first_row: Option<&Row>,
) -> Result<(bool, Vec<api::v1::ColumnSchema>), Error> {
// TODO(discord9): instead of auto build table from request schema, actually build table
// before `create flow` to be able to assign pk and ts etc.
Expand Down Expand Up @@ -536,51 +525,13 @@ impl FlowWorkerManager {

(primary_keys, with_auto_added_col, no_time_index)
};
if let Some(first_row) = first_row {
for (expected_type, actual_type) in schema.iter().zip(first_row.iter()) {
ensure!(
expected_type.data_type == actual_type.data_type() || actual_type.is_null(),
FlowOutputTypeMismatchSnafu {
expected: expected_type.data_type.clone(),
actual: actual_type.data_type()
}
);
}
}

let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
Ok((is_ts_placeholder, proto_schema))
}
}

#[async_trait::async_trait]
impl LocalFlowStateReporter for FlowWorkerManager {
async fn report(&self) -> common_meta::key::flow::flow_state::FlowStateValue {
FlowStateValue::new(self.gen_state_report().await.state_size)
}
}

/// Flow Runtime related methods
impl FlowWorkerManager {
pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
for worker in self.worker_handles.iter() {
let worker = worker.lock().await;
match worker.get_state_size().await {
Ok(state_size) => {
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
}
Err(err) => {
common_telemetry::error!(err; "Get state size error");
}
}
}

FlowStat {
state_size: full_report,
}
}

/// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back
///
/// if heartbeat task is shutdown, this future will exit too
Expand Down
40 changes: 40 additions & 0 deletions src/flow/src/adapter/stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use common_meta::key::flow::flow_state::FlowStat;

use crate::FlowWorkerManager;

impl FlowWorkerManager {
pub async fn gen_state_report(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
for worker in self.worker_handles.iter() {
let worker = worker.lock().await;
match worker.get_state_size().await {
Ok(state_size) => {
full_report.extend(state_size.into_iter().map(|(k, v)| (k as u32, v)))
}
Err(err) => {
common_telemetry::error!(err; "Get flow stat size error");
}
}
}

FlowStat {
state_size: full_report,
}
}
}
17 changes: 1 addition & 16 deletions src/flow/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_error::ext::BoxedError;
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};

use crate::adapter::FlowId;
Expand All @@ -46,18 +45,6 @@ pub enum Error {
location: Location,
},

#[snafu(display(
"Flow output type mismatch: expected: {:?}, actual: {:?}",
expected,
actual
))]
FlowOutputTypeMismatch {
expected: ConcreteDataType,
actual: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},

/// TODO(discord9): add detailed location of column
#[snafu(display("Failed to eval stream"))]
Eval {
Expand Down Expand Up @@ -226,9 +213,7 @@ impl ErrorExt for Error {
source.status_code()
}
Self::MetaClientInit { source, .. } => source.status_code(),
Self::ParseAddr { .. } | Self::FlowOutputTypeMismatch { .. } => {
StatusCode::InvalidArguments
}
Self::ParseAddr { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
12 changes: 6 additions & 6 deletions tests/cases/standalone/common/flow/flow_basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -1047,15 +1047,15 @@ FROM

-- Test if FLOWS table works, but don't care about the result since it vary from runs
SELECT
1
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;

+----------+
| Int64(1) |
+----------+
| 1 |
+----------+
+--------------+
| active_flows |
+--------------+
| 1 |
+--------------+

DROP FLOW requests_long_term;

Expand Down
2 changes: 1 addition & 1 deletion tests/cases/standalone/common/flow/flow_basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ FROM

-- Test if FLOWS table works, but don't care about the result since it vary from runs
SELECT
1
count(CASE WHEN state_size > 0 THEN 1 ELSE 0 END) as active_flows,
FROM
INFORMATION_SCHEMA.FLOWS;

Expand Down

0 comments on commit c6e0a66

Please sign in to comment.