Skip to content

Commit

Permalink
feat: hint options for gRPC isnert
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jul 29, 2024
1 parent 9a5fa49 commit 0b88d71
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 13 deletions.
11 changes: 10 additions & 1 deletion src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,18 @@ impl Inserter {
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<TableRef> {
let mut hint_options = vec![];
let options: &[(&str, &str)] = match create_type {
AutoCreateTableType::Logical(_) => unreachable!(),
AutoCreateTableType::Physical => &[],
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
hint_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
hint_options.push((MERGE_MODE_KEY, merge_mode));
}
hint_options.as_slice()
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")],
Expand Down
44 changes: 41 additions & 3 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use futures::StreamExt;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{cancellation, TonicResult};

pub const GREPTIME_DB_HEADER_HINT_PREFIX: &str = "x-greptime-hint:";

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
}
Expand All @@ -33,6 +36,31 @@ impl DatabaseService {
pub(crate) fn new(handler: GreptimeRequestHandler) -> Self {
Self { handler }
}

fn extract_hints(&self, metadata: &MetadataMap) -> Vec<(String, String)> {
metadata
.iter()
.filter_map(|kv| {
let KeyAndValueRef::Ascii(key, value) = kv else {
return None;
};
let key = key.as_str();
if !key.starts_with(GREPTIME_DB_HEADER_HINT_PREFIX) {
return None;
}
let Ok(value) = value.to_str() else {
// Simply return None for non-string values.
return None;
};
// Safety: we already checked the prefix.
let new_key = key
.strip_prefix(GREPTIME_DB_HEADER_HINT_PREFIX)
.unwrap()
.to_string();
Some((new_key, value.to_string()))
})
.collect()
}
}

#[async_trait]
Expand All @@ -42,10 +70,15 @@ impl GreptimeDatabase for DatabaseService {
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let remote_addr = request.remote_addr();
let hints = self.extract_hints(request.metadata());
debug!(
"GreptimeDatabase::Handle: request from {:?} with hints: {:?}",
remote_addr, hints
);
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let output = handler.handle_request(request, hints).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
Expand Down Expand Up @@ -83,14 +116,19 @@ impl GreptimeDatabase for DatabaseService {
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let remote_addr = request.remote_addr();
let hints = self.extract_hints(request.metadata());
debug!(
"GreptimeDatabase::HandleRequests: request from {:?} with hints: {:?}",
remote_addr, hints
);
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl FlightCraft for GreptimeRequestHandler {
request_type = get_request_type(&request)
);
async {
let output = self.handle_request(request).await?;
let output = self.handle_request(request, Default::default()).await?;
let stream: Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync>> =
to_flight_data_stream(output, TracingContext::from_current_span());
Ok(Response::new(stream))
Expand Down
23 changes: 16 additions & 7 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ impl GreptimeRequestHandler {
}

#[tracing::instrument(skip_all, fields(protocol = "grpc", request_type = get_request_type(&request)))]
pub(crate) async fn handle_request(&self, request: GreptimeRequest) -> Result<Output> {
pub(crate) async fn handle_request(
&self,
request: GreptimeRequest,
hints: Vec<(String, String)>,
) -> Result<Output> {
let query = request.request.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;

let header = request.header.as_ref();
let query_ctx = create_query_context(header);
let query_ctx = create_query_context(header, hints);
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
query_ctx.set_current_user(user_info);

Expand Down Expand Up @@ -164,7 +168,10 @@ pub(crate) async fn auth(
})
}

pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryContextRef {
pub(crate) fn create_query_context(
header: Option<&RequestHeader>,
extensions: Vec<(String, String)>,
) -> QueryContextRef {
let (catalog, schema) = header
.map(|header| {
// We provide dbname field in newer versions of protos/sdks
Expand Down Expand Up @@ -193,12 +200,14 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
)
});
let timezone = parse_timezone(header.map(|h| h.timezone.as_str()));
QueryContextBuilder::default()
let mut ctx_builder = QueryContextBuilder::default()
.current_catalog(catalog)
.current_schema(schema)
.timezone(timezone)
.build()
.into()
.timezone(timezone);
for (key, value) in extensions {
ctx_builder = ctx_builder.set_extension(key, value);
}
ctx_builder.build().into()
}

/// Histogram timer for handling gRPC request.
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl PrometheusGateway for PrometheusGatewayService {
};

let header = inner.header.as_ref();
let query_ctx = create_query_context(header);
let query_ctx = create_query_context(header, Default::default());
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
query_ctx.set_current_user(user_info);

Expand Down

0 comments on commit 0b88d71

Please sign in to comment.