From 8263c4f3cf7b6c6d9eb17ea54c0c50a1a17747de Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Fri, 23 Dec 2022 13:18:10 -0600 Subject: [PATCH] feat: implement new ingest structure Define new proto for the structure that gets sent from router to ingester and persisted in the ingester WAL. Create ingest_structure crate with functions to convert from line protocol to new proto structure while validating schema. Add function to convert new proto structure to RecordBatch. --- Cargo.lock | 166 +++- Cargo.toml | 1 + .../protos/influxdata/iox/wal/v1/wal.proto | 49 + ingest_structure/Cargo.toml | 40 + ingest_structure/src/lib.rs | 922 ++++++++++++++++++ iox_catalog/src/lib.rs | 6 +- 6 files changed, 1148 insertions(+), 36 deletions(-) create mode 100644 ingest_structure/Cargo.toml create mode 100644 ingest_structure/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 7ffa1d56b1d..880301b7db1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,7 +291,7 @@ dependencies = [ "paste", "prost", "tokio", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -531,7 +531,7 @@ dependencies = [ "snafu", "test_helpers_end_to_end", "tokio", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -813,8 +813,11 @@ checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "time", + "wasm-bindgen", "winapi", ] @@ -940,7 +943,7 @@ dependencies = [ "reqwest", "thiserror", "tokio", - "tonic", + "tonic 0.9.2", "tower", "workspace-hack", ] @@ -1091,7 +1094,7 @@ checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" dependencies = [ "prost", "prost-types", - "tonic", + "tonic 0.9.2", "tracing-core", ] @@ -1114,7 +1117,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tracing", "tracing-core", "tracing-subscriber", @@ -2005,7 +2008,7 @@ dependencies = [ "prost", "prost-build", "serde", - "tonic", + "tonic 0.9.2", "tonic-build", "workspace-hack", ] @@ -2028,7 +2031,7 @@ checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -2080,7 +2083,7 @@ dependencies = [ "prost-build", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "tonic-build", "tower", "workspace-hack", @@ -2093,7 +2096,7 @@ dependencies = [ "prost", "prost-build", "prost-types", - "tonic", + "tonic 0.9.2", "tonic-build", "workspace-hack", ] @@ -2104,7 +2107,7 @@ version = "0.1.0" dependencies = [ "prost", "prost-build", - "tonic", + "tonic 0.9.2", "tonic-build", "workspace-hack", ] @@ -2576,7 +2579,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tokio_metrics_bridge", - "tonic", + "tonic 0.9.2", "trace_exporters", "trogging", "uuid", @@ -2607,7 +2610,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -2619,7 +2622,7 @@ dependencies = [ "generated_types", "observability_deps", "prost", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -2648,6 +2651,42 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "ingest_structure" +version = "0.1.0" +dependencies = [ + "arrow", + "arrow_util", + "assert_matches", + "async-trait", + "bytes", + "chrono", + "data_types", + "futures", + "generated_types", + "influxdb-line-protocol", + "iox_catalog", + "iox_tests", + "iox_time", + "metric", + "observability_deps", + "once_cell", + "parking_lot", + "paste", + "pretty_assertions", + "schema", + "serde", + "serde_json", + 
"serde_urlencoded", + "snafu", + "test_helpers", + "tokio", + "tokio-stream", + "tonic 0.8.3", + "trace", + "workspace-hack", +] + [[package]] name = "ingester" version = "0.1.0" @@ -2702,7 +2741,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "trace", "tracker", "uuid", @@ -2726,7 +2765,7 @@ dependencies = [ "query_functions", "serde", "snafu", - "tonic", + "tonic 0.9.2", "tonic-build", "workspace-hack", ] @@ -2760,7 +2799,7 @@ dependencies = [ "test_helpers", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "trace", "workspace-hack", ] @@ -3006,7 +3045,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic", + "tonic 0.9.2", "tonic-health", "tonic-reflection", "tower", @@ -3107,7 +3146,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "trace", "workspace-hack", ] @@ -3481,7 +3520,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.48.0", ] @@ -4496,7 +4535,7 @@ dependencies = [ "test_helpers", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "trace", "trace_http", "tracker", @@ -4779,7 +4818,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "trace", "trace_http", "workspace-hack", @@ -5025,7 +5064,7 @@ dependencies = [ "metric", "parking_lot", "predicate", - "tonic", + "tonic 0.9.2", "trace", "tracker", "workspace-hack", @@ -5041,7 +5080,7 @@ dependencies = [ "metric", "observability_deps", "tokio", - "tonic", + "tonic 0.9.2", "uuid", "workspace-hack", ] @@ -5071,7 +5110,7 @@ dependencies = [ "snafu", "test_helpers", "tokio", - "tonic", + "tonic 0.9.2", "trace", "trace_http", "tracker", @@ -5108,7 +5147,7 @@ dependencies = [ "test_helpers", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", "trace", "trace_http", "tracker", @@ -5127,7 +5166,7 @@ dependencies = [ "observability_deps", "paste", "tokio", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -5145,7 +5184,7 @@ dependencies = [ "observability_deps", "parquet_file", "tokio", - "tonic", + "tonic 0.9.2", "uuid", "workspace-hack", ] @@ -5160,7 +5199,7 @@ dependencies = [ "metric", "observability_deps", "tokio", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -5174,7 +5213,7 @@ dependencies = [ "metric", "observability_deps", "tokio", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -5184,7 +5223,7 @@ version = "0.1.0" dependencies = [ "generated_types", "observability_deps", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -5827,7 +5866,7 @@ dependencies = [ "test_helpers", "tokio", "tokio-util", - "tonic", + "tonic 0.9.2", "workspace-hack", ] @@ -5904,6 +5943,17 @@ dependencies = [ "libc", ] +[[package]] +name = "time" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -6059,6 +6109,38 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", 
+ "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.9.2" @@ -6114,7 +6196,7 @@ dependencies = [ "prost", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -6127,7 +6209,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic", + "tonic 0.9.2", ] [[package]] @@ -6262,6 +6344,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.3" @@ -6536,6 +6628,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -6959,7 +7057,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic", + "tonic 0.9.2", "tower", "tracing", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 08358f30c3a..83b30be6f9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "influxdb_tsm", "influxdb2_client", "influxrpc_parser", + "ingest_structure", "ingester_query_grpc", "ingester_test_ctx", "ingester", diff --git a/generated_types/protos/influxdata/iox/wal/v1/wal.proto b/generated_types/protos/influxdata/iox/wal/v1/wal.proto index 0345073f54d..244ff759a88 100644 --- a/generated_types/protos/influxdata/iox/wal/v1/wal.proto +++ b/generated_types/protos/influxdata/iox/wal/v1/wal.proto @@ -44,4 +44,53 @@ message SequencedWalOp { message WalOpBatch { // the ops repeated SequencedWalOp ops = 1; +} + +// WAL operation with a sequence number, used to inform read buffers when to evict data +message NewWalOp { + uint32 namespace_id = 2; + + oneof op { + WriteBatch write = 3; + influxdata.iox.delete.v1.DeletePayload delete = 4; + PersistOp persist = 5; + } +} + +// A single write request that can add data to multiple tables and multiple partitions +// within each table. +message WriteBatch { + repeated TableBatch table_batches = 1; +} + +// A batch of writes for a table to one or more partitions. +message TableBatch { + uint32 table_id = 1; + repeated PartitionBatch partition_batches = 2; +} + +// A batch of rows to a given partition. +message PartitionBatch { + uint32 partition_id = 1; + repeated Row rows = 2; +} + +// A collection of values for a row. 
+message Row { + repeated Value values = 1; + uint64 sequence_number = 2; +} + +message Value { + uint32 column_id = 1; + oneof value { + int64 i64_value = 2; + double f64_value = 3; + uint64 u64_value = 4; + string string_value = 5; + string tag_value = 6; + bool bool_value = 7; + bytes bytes_value = 8; + int64 time_value = 9; + } } \ No newline at end of file diff --git a/ingest_structure/Cargo.toml b/ingest_structure/Cargo.toml new file mode 100644 index 00000000000..b2b08f08124 --- /dev/null +++ b/ingest_structure/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "ingest_structure" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +arrow = { workspace = true, features = ["prettyprint"] } +async-trait = "0.1" +bytes = "1.3" +chrono = "0.4" +data_types = { path = "../data_types" } +futures = "0.3.25" +generated_types = { path = "../generated_types" } +influxdb-line-protocol = { path = "../influxdb_line_protocol" } +iox_catalog = { path = "../iox_catalog" } +iox_time = { path = "../iox_time" } +metric = { path = "../metric" } +observability_deps = { path = "../observability_deps" } +parking_lot = "0.12" +schema = { path = "../schema" } +serde = "1.0" +serde_json = "1.0.91" +serde_urlencoded = "0.7" +snafu = "0.7" +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tonic = "0.8" +trace = { path = "../trace/" } +workspace-hack = { path = "../workspace-hack"} + +[dev-dependencies] +arrow_util = { path = "../arrow_util" } +assert_matches = "1.5" +iox_tests = { path = "../iox_tests" } +once_cell = "1" +paste = "1.0.11" +pretty_assertions = "1.3.0" +test_helpers = { version = "0.1.0", path = "../test_helpers", features = ["future_timeout"] } +tokio-stream = { version = "0.1.11", default_features = false, features = [] } diff --git a/ingest_structure/src/lib.rs b/ingest_structure/src/lib.rs new file mode 100644 index 00000000000..fc8abc55a6c --- /dev/null +++ b/ingest_structure/src/lib.rs @@ -0,0 +1,922 @@ +//! This crate specifies the ingest structure for IOx. This is able to take line protocol and +//! convert it into the Proto definition data that is used for the router to write to ingesters, +//! which can then wrap that proto into a message that gets written into the WAL and a message +//! that gets sent to ingester read replicas. +//! +//! This also contains methods to convert that proto data into Arrow `RecordBatch` given the +//! table schema of the data. 
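+//!
+//! A rough usage sketch of the flow, assuming an in-memory catalog; it mirrors the
+//! `test_helpers` at the bottom of this file, and the `"example_ns"` / `"cpu"` names are
+//! purely illustrative:
+//!
+//! ```ignore
+//! use std::{ops::DerefMut, sync::Arc};
+//! use data_types::{NamespaceName, NamespaceSchema};
+//! use iox_catalog::{interface::Catalog, mem::MemCatalog};
+//!
+//! let catalog = MemCatalog::new(Arc::new(metric::Registry::default()));
+//! let mut repos = catalog.repositories().await;
+//! let namespace_name = NamespaceName::new("example_ns").unwrap();
+//! let namespace = repos
+//!     .namespaces()
+//!     .create(&namespace_name, None, None, None)
+//!     .await
+//!     .unwrap();
+//!
+//! // Line protocol -> validated batches keyed by table and partition id; new tables,
+//! // columns, and partitions are created in the catalog as needed.
+//! let res = parse_validate_and_update_schema_and_partitions(
+//!     "cpu,host=a usage=0.5 123",
+//!     &NamespaceSchema::new_empty_from(&namespace),
+//!     &TablePartitionMap::default(),
+//!     &Partitioner::new_per_day_partitioner(),
+//!     repos.deref_mut(),
+//!     0,
+//! )
+//! .await
+//! .unwrap();
+//!
+//! // Batches -> proto `WriteBatch`, the form written to the WAL and sent to ingesters.
+//! let proto = table_batches_map_to_proto(res.table_batches);
+//!
+//! // Proto rows -> Arrow `RecordBatch`, given the table schema.
+//! let schema = res.schema.unwrap();
+//! let rb = rows_to_record_batch(
+//!     schema.tables.get("cpu").unwrap(),
+//!     &proto.table_batches[0].partition_batches[0].rows,
+//! )
+//! .unwrap();
+//! ```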
+use arrow::{ + array::{ + ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder, + StringDictionaryBuilder, TimestampNanosecondBuilder, UInt64Builder, + }, + datatypes::Int32Type, + record_batch::RecordBatch, +}; +use chrono::{TimeZone, Utc}; +use data_types::{ + ColumnType, NamespaceSchema, PartitionId, PartitionKey, TableId, TableSchema, +}; +use generated_types::influxdata::iox::wal::v1::{ + value::Value as ProtoOneOfValue, PartitionBatch as ProtoPartitionBatch, Row as ProtoRow, + TableBatch as ProtoTableBatch, Value as ProtoValue, WriteBatch as ProtoWriteBatch, +}; +use influxdb_line_protocol::{parse_lines, FieldValue, ParsedLine}; +use iox_catalog::interface::Catalog; +use iox_catalog::{interface::RepoCollection, TIME_COLUMN, table_load_or_create}; +use schema::{InfluxColumnType, InfluxFieldType, SchemaBuilder}; +use snafu::{ResultExt, Snafu}; +use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap, HashSet}, + mem, + sync::Arc, +}; + +#[derive(Debug, Snafu)] +#[allow(missing_copy_implementations, missing_docs)] +#[snafu(visibility(pub(crate)))] +pub enum Error { + #[snafu(display("error from catalog: {}", source))] + Catalog { + source: iox_catalog::interface::Error, + }, + + #[snafu(display("column {} is type {} but write has type {}", name, existing, new))] + ColumnTypeMismatch { + name: String, + existing: ColumnType, + new: ColumnType, + }, + + #[snafu(display( + "column {} appears multiple times in a single line. must be unique", + name + ))] + ColumnNameNotUnique { name: String }, + + #[snafu(display("Internal error converting schema: {}", source))] + InternalSchema { source: schema::builder::Error }, + + #[snafu(display("Unexpected column type"))] + UnexpectedColumnType, + + #[snafu(display("Unexpected arrow error: {}", source))] + Arrow { source: arrow::error::ArrowError }, + + #[snafu(display("error parsing line {} (1-based): {}", line, source))] + LineProtocol { + source: influxdb_line_protocol::Error, + line: usize, + }, +} + +/// A specialized `Error` for conversion errors +pub type Result = std::result::Result; + +const YEAR_MONTH_DAY_TIME_FORMAT: &str = "%Y-%m-%d"; + +/// Map of table id to partitions +#[derive(Default, Debug, Clone)] +pub struct TablePartitionMap { + /// the mapping of table id to the partitions + pub map: BTreeMap, +} + +/// Map of partition key to partition id +#[derive(Default, Debug, Clone)] +pub struct PartitionMap { + /// the mapping of partition key to partition id + pub map: BTreeMap, +} + +/// Generates the partition key for a given line or row +#[derive(Debug)] +pub struct Partitioner { + time_format: String, +} + +impl Partitioner { + /// Create a new time based partitioner using the time format + pub fn new_time_partitioner(time_format: impl Into) -> Self { + Self { + time_format: time_format.into(), + } + } + + /// Create a new time based partitioner that partitions by day + pub fn new_per_day_partitioner() -> Self { + Self::new_time_partitioner(YEAR_MONTH_DAY_TIME_FORMAT) + } + + /// Given a parsed line and a default time, generate the string partition key + pub fn partition_key_for_line(&self, line: &ParsedLine<'_>, default_time: i64) -> String { + let timestamp = line.timestamp.unwrap_or(default_time); + format!( + "{}", + Utc.timestamp_nanos(timestamp).format(&self.time_format) + ) + } +} + +/// Mapping of `TableId` to `TableBatch` returned as part of a `ValidationResult` +pub type TableBatchMap = HashMap; + +/// Result of the validation. 
If the NamespaceSchema or PartitionMap were updated, they will be +/// in the result. +#[derive(Debug, Default)] +pub struct ValidationResult { + /// If the namespace schema is updated with new tables or columns it will be here, which + /// can be used to update the cache. + pub schema: Option, + /// If new partitions are created, they will be in this updated map which can be used to + /// update the cache. + pub table_partition_map: Option, + /// The data as a map of `TableId` to `TableBatch` + pub table_batches: TableBatchMap, + /// Number of lines passed in + pub line_count: usize, + /// Number of fields passed in + pub field_count: usize, + /// Number of tags passed in + pub tag_count: usize, +} + +/// Converts the table_batches part of a `ValidationResult` into the `WriteBatch` proto that +/// can be sent to ingesters or written into the WAL. +pub fn table_batches_map_to_proto(table_batches: TableBatchMap) -> ProtoWriteBatch { + let table_batches: Vec<_> = table_batches + .into_iter() + .map(|(table_id, table_batch)| { + let partition_batches: Vec<_> = table_batch + .partition_batches + .into_iter() + .map(|(partition_id, rows)| ProtoPartitionBatch { + partition_id: partition_id.get() as u32, + rows, + }) + .collect(); + + ProtoTableBatch { + table_id: table_id.get() as u32, + partition_batches, + } + }) + .collect(); + + ProtoWriteBatch { table_batches } +} + +pub fn rows_persist_cost(rows: &[ProtoRow]) -> usize { + // initialize the cost with the size of the sequence number + let mut cost = rows.len() * mem::size_of::(); + + for r in rows { + for v in &r.values { + let value_cost = match &v.value { + Some(ProtoOneOfValue::I64Value(_)) => mem::size_of::(), + Some(ProtoOneOfValue::F64Value(_)) => mem::size_of::(), + Some(ProtoOneOfValue::U64Value(_)) => mem::size_of::(), + Some(ProtoOneOfValue::StringValue(s)) => mem::size_of::() * s.len(), + Some(ProtoOneOfValue::TagValue(t)) => mem::size_of::() * t.len(), + Some(ProtoOneOfValue::BoolValue(_)) => mem::size_of::(), + Some(ProtoOneOfValue::BytesValue(b)) => b.len(), + Some(ProtoOneOfValue::TimeValue(_)) => mem::size_of::(), + None => 0, + }; + + cost += value_cost; + } + } + + cost +} + +pub fn rows_to_record_batch(table_schema: &TableSchema, rows: &[ProtoRow]) -> Result { + let mut schema_builder = SchemaBuilder::new(); + let mut column_index: HashMap = HashMap::new(); + + for (column_name, column_schema) in table_schema.columns.iter() { + assert!(column_index + .insert( + column_schema.id.get() as u32, + ColumnData::new(column_name, column_schema.column_type, rows.len()) + ) + .is_none()); + schema_builder.influx_column(column_name, column_schema.column_type.into()); + } + + let schema = schema_builder + .build() + .context(InternalSchemaSnafu)? 
+ .sort_fields_by_name(); + + for r in rows { + let mut value_added = HashSet::with_capacity(r.values.len()); + + for v in &r.values { + let column_data = column_index.get_mut(&v.column_id).unwrap(); + column_data.append_value(&v.value)?; + value_added.insert(v.column_id); + } + + // insert nulls into columns that didn't get values + for (id, data) in column_index.iter_mut() { + if value_added.get(id).is_none() { + data.append_null(); + } + } + } + + let mut cols: Vec<_> = column_index.into_values().collect(); + cols.sort_by(|a, b| Ord::cmp(&a.name, &b.name)); + let cols: Vec = cols.into_iter().map(|c| c.into_arrow()).collect(); + + let batch = RecordBatch::try_new(schema.as_arrow(), cols).context(ArrowSnafu)?; + Ok(batch) +} + +struct ColumnData<'a> { + name: &'a str, + builder: Builder, +} + +impl<'a> ColumnData<'a> { + fn new(name: &'a str, t: ColumnType, row_count: usize) -> Self { + match t { + ColumnType::I64 => Self { + name, + builder: Builder::I64(Int64Builder::with_capacity(row_count)), + }, + ColumnType::F64 => Self { + name, + builder: Builder::F64(Float64Builder::with_capacity(row_count)), + }, + ColumnType::U64 => Self { + name, + builder: Builder::U64(UInt64Builder::with_capacity(row_count)), + }, + ColumnType::Tag => Self { + name, + builder: Builder::Tag(StringDictionaryBuilder::new()), + }, + ColumnType::String => Self { + name, + builder: Builder::String(StringBuilder::new()), + }, + ColumnType::Time => Self { + name, + builder: Builder::Time(TimestampNanosecondBuilder::with_capacity(row_count)), + }, + ColumnType::Bool => Self { + name, + builder: Builder::Bool(BooleanBuilder::with_capacity(row_count)), + }, + } + } + + fn into_arrow(self) -> ArrayRef { + match self.builder { + Builder::Bool(mut b) => Arc::new(b.finish()), + Builder::I64(mut b) => Arc::new(b.finish()), + Builder::F64(mut b) => Arc::new(b.finish()), + Builder::U64(mut b) => Arc::new(b.finish()), + Builder::String(mut b) => Arc::new(b.finish()), + Builder::Tag(mut b) => Arc::new(b.finish()), + Builder::Time(mut b) => Arc::new(b.finish()), + } + } + + fn append_value(&mut self, v: &Option) -> Result<()> { + match v { + Some(v) => match (v, &mut self.builder) { + (ProtoOneOfValue::I64Value(v), Builder::I64(b)) => b.append_value(*v), + (ProtoOneOfValue::F64Value(v), Builder::F64(b)) => b.append_value(*v), + (ProtoOneOfValue::U64Value(v), Builder::U64(b)) => b.append_value(*v), + (ProtoOneOfValue::BoolValue(v), Builder::Bool(b)) => b.append_value(*v), + (ProtoOneOfValue::TagValue(v), Builder::Tag(b)) => { + b.append(v).context(ArrowSnafu)?; + } + (ProtoOneOfValue::StringValue(v), Builder::String(b)) => b.append_value(v), + (ProtoOneOfValue::TimeValue(v), Builder::Time(b)) => b.append_value(*v), + (_, _) => return UnexpectedColumnTypeSnafu.fail(), + }, + None => self.append_null(), + } + + Ok(()) + } + + fn append_null(&mut self) { + match &mut self.builder { + Builder::Bool(b) => b.append_null(), + Builder::I64(b) => b.append_null(), + Builder::F64(b) => b.append_null(), + Builder::U64(b) => b.append_null(), + Builder::String(b) => b.append_null(), + Builder::Tag(b) => b.append_null(), + Builder::Time(b) => b.append_null(), + } + } +} + +enum Builder { + Bool(BooleanBuilder), + I64(Int64Builder), + F64(Float64Builder), + U64(UInt64Builder), + String(StringBuilder), + Tag(StringDictionaryBuilder), + Time(TimestampNanosecondBuilder), +} + +/// Takes &str of line protocol, parses lines, validates the schema, and inserts new columns +/// and partitions if present. 
Assigns the default time to any lines that do not include a time +pub async fn parse_validate_and_update_schema_and_partitions<'a, R>( + lp: &str, + schema: &NamespaceSchema, + table_partition_map: &TablePartitionMap, + partitioner: &Partitioner, + repos: &mut R, + default_time: i64, +) -> Result +where + R: RepoCollection + ?Sized, +{ + let mut lines = vec![]; + for (line_idx, maybe_line) in parse_lines(lp).enumerate() { + let line = maybe_line.context(LineProtocolSnafu { line: line_idx + 1 })?; + lines.push(line); + } + + validate_or_insert_schema_and_partitions( + lines, + schema, + table_partition_map, + partitioner, + repos, + default_time, + ) + .await +} + +/// Takes parsed lines, validates their schema, inserting new columns if present. Determines +/// the partition for each line and generates new partitions if present. Assigns the default +/// time to any lines that do not include a time. +pub async fn validate_or_insert_schema_and_partitions<'a, R>( + lines: Vec>, + schema: &NamespaceSchema, + table_partition_map: &TablePartitionMap, + partitioner: &Partitioner, + repos: &mut R, + default_time: i64, +) -> Result +where + R: RepoCollection + ?Sized, +{ + // The (potentially updated) NamespaceSchema to return to the caller. + let mut schema = Cow::Borrowed(schema); + + // The (potentially updated) PartitionMap to return to the caller + let mut table_partition_map = Cow::Borrowed(table_partition_map); + + // The parsed and validated table_batches + let mut table_batches: HashMap = HashMap::new(); + + let line_count = lines.len(); + let mut field_count = 0; + let mut tag_count = 0; + + for line in lines.into_iter() { + field_count += line.field_set.len(); + tag_count += line.series.tag_set.as_ref().map(|t| t.len()).unwrap_or(0); + + validate_and_convert_parsed_line( + line, + &mut table_batches, + &mut schema, + &mut table_partition_map, + partitioner, + repos, + default_time, + ) + .await?; + } + + let schema = match schema { + Cow::Owned(s) => Some(s), + Cow::Borrowed(_) => None, + }; + + let table_partition_map = match table_partition_map { + Cow::Owned(p) => Some(p), + Cow::Borrowed(_) => None, + }; + + Ok(ValidationResult { + schema, + table_partition_map, + table_batches, + line_count, + field_count, + tag_count, + }) +} + +// &mut Cow is used to avoid a copy, so allow it +#[allow(clippy::ptr_arg)] +async fn validate_and_convert_parsed_line( + line: ParsedLine<'_>, + table_batches: &mut HashMap, + schema: &mut Cow<'_, NamespaceSchema>, + table_partition_map: &mut Cow<'_, TablePartitionMap>, + partitioner: &Partitioner, + repos: &mut R, + default_time: i64, +) -> Result<()> +where + R: RepoCollection + ?Sized, +{ + let table_name = line.series.measurement.as_str(); + + // Check if the table exists in the schema. + // + // Because the entry API requires &mut it is not used to avoid a premature + // clone of the Cow. 
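+    // If the table is not yet in the schema, `table_load_or_create` below creates it in the
+    // catalog (it is made public in `iox_catalog` by this change), and the schema `Cow` is
+    // converted to an owned copy so the new table is visible to the caller.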
+ let mut table = match schema.tables.get(table_name) { + Some(t) => Cow::Borrowed(t), + None => { + let table = table_load_or_create(&mut *repos, schema.id, &schema.partition_template, table_name).await.context(CatalogSnafu)?; + + assert!(schema + .to_mut() + .tables + .insert(table_name.to_string(), table) + .is_none()); + + Cow::Borrowed(schema.tables.get(table_name).unwrap()) + } + }; + + let mut partition_map = match table_partition_map.map.get(&table.id) { + Some(m) => Cow::Borrowed(m), + None => { + assert!(table_partition_map + .to_mut() + .map + .insert(table.id, PartitionMap::default()) + .is_none()); + + Cow::Borrowed(table_partition_map.map.get(&table.id).unwrap()) + } + }; + + let partition_key = partitioner.partition_key_for_line(&line, default_time); + // Check if the partition exists in the partition map + // + // Because the entry API requires &mut it is not used to avoid a premature + // clone of the Cow. + let partition_id = match partition_map.map.get(&partition_key) { + Some(p) => *p, + None => { + let key = PartitionKey::from(partition_key.as_str()); + let partition = repos + .partitions() + .create_or_get(key, table.id) + .await + .context(CatalogSnafu)?; + + assert!(partition_map + .to_mut() + .map + .insert(partition_key, partition.id) + .is_none()); + + partition.id + } + }; + + // The table is now in the schema (either by virtue of it already existing, + // or through adding it above). + // + // If the table itself needs to be updated during column validation it + // becomes a Cow::owned() copy and the modified copy should be inserted into + // the schema before returning. + + // First, see if any of the columns need to be inserted into the schema + let mut new_cols = Vec::with_capacity(line.column_count() + 1); + if let Some(tagset) = &line.series.tag_set { + for (tag_key, _) in tagset { + if table.columns.get(tag_key.as_str()).is_none() { + new_cols.push((tag_key.to_string(), ColumnType::Tag)); + } + } + } + for (field_name, value) in &line.field_set { + if table.columns.get(field_name.as_str()).is_none() { + new_cols.push(( + field_name.to_string(), + ColumnType::from(field_type_to_column_type(value)), + )); + } + } + + if !new_cols.is_empty() { + let mut column_batch: HashMap<&str, ColumnType> = HashMap::new(); + for (name, col_type) in &new_cols { + if column_batch.insert(name, *col_type).is_some() { + return ColumnNameNotUniqueSnafu { name }.fail(); + } + } + + repos + .columns() + .create_or_get_many_unchecked(table.id, column_batch) + .await + .context(CatalogSnafu)? + .into_iter() + .for_each(|c| table.to_mut().add_column(c)); + } + + // now that we've ensured all columns exist in the schema, construct the actual row and values + // while validating the column types match. 
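+    // Reserve one extra slot for the time column, which is appended after the tag and field
+    // values (falling back to `default_time` when the line carries no timestamp).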
+ let mut values = Vec::with_capacity(line.column_count() + 1); + + // validate tags, collecting any new ones that must be inserted, or adding the values + if let Some(tagset) = line.series.tag_set { + for (tag_key, value) in tagset { + match table.columns.get(tag_key.as_str()) { + Some(existing) if existing.matches_type(InfluxColumnType::Tag) => { + let value = ProtoValue { + column_id: existing.id.get() as u32, + value: Some(ProtoOneOfValue::TagValue(value.to_string())), + }; + values.push(value); + } + Some(existing) => { + return ColumnTypeMismatchSnafu { + name: tag_key.as_str(), + existing: existing.column_type, + new: InfluxColumnType::Tag, + } + .fail(); + } + None => panic!("every tag should have been inserted as a column before this point"), + } + } + } + + // validate fields, collecting any new ones that must be inserted, or adding values + for (field_name, value) in line.field_set { + match table.columns.get(field_name.as_str()) { + Some(existing) if existing.matches_type(field_type_to_column_type(&value)) => { + let one_of_proto = match value { + FieldValue::I64(v) => ProtoOneOfValue::I64Value(v), + FieldValue::F64(v) => ProtoOneOfValue::F64Value(v), + FieldValue::U64(v) => ProtoOneOfValue::U64Value(v), + FieldValue::Boolean(v) => ProtoOneOfValue::BoolValue(v), + FieldValue::String(v) => ProtoOneOfValue::StringValue(v.to_string()), + }; + let value = ProtoValue { + column_id: existing.id.get() as u32, + value: Some(one_of_proto), + }; + values.push(value); + } + Some(existing) => { + return ColumnTypeMismatchSnafu { + name: field_name.as_str(), + existing: existing.column_type, + new: field_type_to_column_type(&value), + } + .fail(); + } + None => panic!("every field should have been inserted as a column before this point"), + } + } + + // set the time value + let time_column = table + .columns + .get(TIME_COLUMN) + .expect("time column should always be here at this point"); + let time_value = line.timestamp.unwrap_or(default_time); + values.push(ProtoValue { + column_id: time_column.id.get() as u32, + value: Some(ProtoOneOfValue::TimeValue(time_value)), + }); + + let table_batch = table_batches.entry(table.id).or_default(); + let partition_batch = table_batch + .partition_batches + .entry(partition_id) + .or_default(); + + // insert the row into the partition batch + partition_batch.push(ProtoRow { + values, + sequence_number: 0, + }); + + if let Cow::Owned(partition_map) = partition_map { + // The specific table's partition map was mutated and needs inserting into in + // table_partition_map to make changes visible to the caller. + assert!(table_partition_map + .to_mut() + .map + .insert(table.id, partition_map) + .is_some()); + } + + if let Cow::Owned(table) = table { + // The table schema was mutated and needs inserting into the namespace + // schema to make the changes visible to the caller. 
+ assert!(schema + .to_mut() + .tables + .insert(table_name.to_string(), table) + .is_some()); + } + + Ok(()) +} + +fn field_type_to_column_type(field: &FieldValue<'_>) -> InfluxColumnType { + match field { + FieldValue::I64(_) => InfluxColumnType::Field(InfluxFieldType::Integer), + FieldValue::F64(_) => InfluxColumnType::Field(InfluxFieldType::Float), + FieldValue::U64(_) => InfluxColumnType::Field(InfluxFieldType::UInteger), + FieldValue::Boolean(_) => InfluxColumnType::Field(InfluxFieldType::Boolean), + FieldValue::String(_) => InfluxColumnType::Field(InfluxFieldType::String), + } +} + +/// Map of partition to the rows in the partition for the write +#[derive(Debug, Default)] +pub struct TableBatch { + /// Map of partition_id to the rows for that partition + pub partition_batches: HashMap>, +} + +/// Structures commonly used in tests related to catalog and ingest data +#[derive(Debug)] +pub struct TestStructure { + pub catalog: Arc, + pub schema: Arc, + pub partition_map: Arc, +} + +pub mod test_helpers { + use super::*; + use iox_catalog::{interface::Catalog, mem::MemCatalog}; + use std::ops::DerefMut; + use data_types::NamespaceName; + + pub async fn lp_to_record_batch(lp: &str) -> RecordBatch { + let (test_structure, op) = lp_to_proto_initialize(lp).await; + let (_, table_schema) = test_structure.schema.tables.first_key_value().unwrap(); + rows_to_record_batch( + table_schema, + &op.table_batches[0].partition_batches[0].rows, + ) + .unwrap() + } + + pub async fn lp_to_proto_initialize(lp: &str) -> (TestStructure, ProtoWriteBatch) { + let metrics = Arc::new(metric::Registry::default()); + let repo = MemCatalog::new(Arc::clone(&metrics)); + let mut repositories = repo.repositories().await; + const NAMESPACE_NAME: &str = "foo"; + let namespace_name = NamespaceName::new(NAMESPACE_NAME).unwrap(); + + let res = { + let namespace = repositories + .namespaces() + .create(&namespace_name, None, None, None) + .await + .unwrap(); + let schema = NamespaceSchema::new_empty_from(&namespace); + let table_partition_map = TablePartitionMap::default(); + let partitioner = Partitioner::new_time_partitioner("%Y-%m-%d"); + + parse_validate_and_update_schema_and_partitions( + lp, + &schema, + &table_partition_map, + &partitioner, + repositories.deref_mut(), + 86401 * 1000000000, + ) + .await + .unwrap() + }; + + let schema = res.schema.unwrap(); + let table_partition_map = res.table_partition_map.unwrap(); + let batches = res.table_batches; + + ( + TestStructure { + catalog: Arc::new(repo), + schema: Arc::new(schema), + partition_map: Arc::new(table_partition_map), + }, + table_batches_map_to_proto(batches), + ) + } + + pub async fn lp_to_proto_update( + lp: &str, + test_structure: &mut TestStructure, + ) -> ProtoWriteBatch { + let partitioner = Partitioner::new_time_partitioner("%Y-%m-%d"); + let mut repos = test_structure.catalog.repositories().await; + + let res = parse_validate_and_update_schema_and_partitions( + lp, + &test_structure.schema, + &test_structure.partition_map, + &partitioner, + repos.deref_mut(), + 86401 * 1000000000, + ) + .await + .unwrap(); + + if let Some(schema) = res.schema { + test_structure.schema = Arc::new(schema); + } + + if let Some(partition_map) = res.table_partition_map { + test_structure.partition_map = Arc::new(partition_map); + } + + table_batches_map_to_proto(res.table_batches) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_util::assert_batches_eq; + use iox_catalog::{interface::Catalog, mem::MemCatalog}; + use std::ops::DerefMut; + use 
data_types::NamespaceName; + + #[tokio::test] + async fn validate() { + let metrics = Arc::new(metric::Registry::default()); + let repo = MemCatalog::new(metrics); + let mut repos = repo.repositories().await; + const NAMESPACE_NAME: &str = "foo"; + let namespace_name = NamespaceName::new(NAMESPACE_NAME).unwrap(); + + let namespace = repos + .namespaces() + .create(&namespace_name, Default::default(), None, None) + .await + .unwrap(); + let schema = NamespaceSchema::new_empty_from(&namespace); + let table_partition_map = TablePartitionMap::default(); + let partitioner = Partitioner::new_time_partitioner("%Y-%m-%d"); + + let res = parse_validate_and_update_schema_and_partitions( + "m1,t1=hi f1=12i 123\nm2 f1=true 1234\nm2,t2=asdf f2=1i", + &schema, + &table_partition_map, + &partitioner, + repos.deref_mut(), + 86401 * 1000000000, + ) + .await + .unwrap(); + let schema = res.schema.unwrap(); + let table_partition_map = res.table_partition_map.unwrap(); + let batches = res.table_batches; + + let m1 = schema.tables.get("m1").unwrap(); + assert_eq!(m1.columns.get("t1").unwrap().column_type, ColumnType::Tag); + assert_eq!(m1.columns.get("f1").unwrap().column_type, ColumnType::I64); + assert_eq!( + m1.columns.get("time").unwrap().column_type, + ColumnType::Time + ); + assert_eq!(m1.columns.column_count(), 3); + + let m2 = schema.tables.get("m2").unwrap(); + assert_eq!(m2.columns.get("t2").unwrap().column_type, ColumnType::Tag); + assert_eq!(m2.columns.get("f1").unwrap().column_type, ColumnType::Bool); + assert_eq!(m2.columns.get("f2").unwrap().column_type, ColumnType::I64); + assert_eq!( + m2.columns.get("time").unwrap().column_type, + ColumnType::Time + ); + assert_eq!(m2.columns.column_count(), 4); + + let m1_rows = batches + .get(&m1.id) + .unwrap() + .partition_batches + .get(&PartitionId::new(1)) + .unwrap(); + assert_eq!(m1_rows.len(), 1); + let m2_p1_rows = batches + .get(&m2.id) + .unwrap() + .partition_batches + .get(&PartitionId::new(2)) + .unwrap(); + assert_eq!(m2_p1_rows.len(), 1); + let m2_p2_rows = batches + .get(&m2.id) + .unwrap() + .partition_batches + .get(&PartitionId::new(3)) + .unwrap(); + assert_eq!(m2_p2_rows.len(), 1); + + assert!(schema.tables.get("m1").is_some()); + assert!(schema.tables.get("m2").is_some()); + assert!(table_partition_map + .map + .get(&TableId::new(1)) + .unwrap() + .map + .get("1970-01-01") + .is_some()); + assert!(table_partition_map + .map + .get(&TableId::new(2)) + .unwrap() + .map + .get("1970-01-01") + .is_some()); + assert!(table_partition_map + .map + .get(&TableId::new(2)) + .unwrap() + .map + .get("1970-01-02") + .is_some()); + assert_eq!(batches.len(), 2); + + // ensure that if we parse the same data we get none for the schema and partition map in the result + let res = parse_validate_and_update_schema_and_partitions( + "m1,t1=hi f1=12i 123\nm2 f1=true 1234\nm2,t2=asdf f2=1i", + &schema, + &table_partition_map, + &partitioner, + repos.deref_mut(), + 86401 * 1000000000, + ) + .await + .unwrap(); + assert!(res.table_partition_map.is_none()); + assert!(res.schema.is_none()); + } + + #[tokio::test] + async fn proto_rows_to_record_batch() { + let metrics = Arc::new(metric::Registry::default()); + let repo = MemCatalog::new(metrics); + let mut repos = repo.repositories().await; + const NAMESPACE_NAME: &str = "foo"; + let namespace_name = NamespaceName::new(NAMESPACE_NAME).unwrap(); + + let namespace = repos + .namespaces() + .create(&namespace_name, Default::default(), None, None) + .await + .unwrap(); + let schema = 
NamespaceSchema::new_empty_from(&namespace); + let table_partition_map = TablePartitionMap::default(); + let partitioner = Partitioner::new_time_partitioner("%Y-%m-%d"); + + let result = parse_validate_and_update_schema_and_partitions( + "foo,t1=hi,t2=asdf f1=3i,f2=1.2 123\nfoo,t1=world f1=4i,f3=true,f4=\"arf arf!\" 124", + &schema, + &table_partition_map, + &partitioner, + repos.deref_mut(), + 86401 * 1000000000, + ) + .await + .unwrap(); + let schema = result.schema.unwrap(); + let batches = result.table_batches; + let proto = table_batches_map_to_proto(batches); + let rows = &proto.table_batches[0].partition_batches[0].rows; + let rb = rows_to_record_batch(schema.tables.get("foo").unwrap(), rows).unwrap(); + + let expected_data = &[ + "+----+-----+------+----------+-------+------+--------------------------------+", + "| f1 | f2 | f3 | f4 | t1 | t2 | time |", + "+----+-----+------+----------+-------+------+--------------------------------+", + "| 3 | 1.2 | | | hi | asdf | 1970-01-01T00:00:00.000000123Z |", + "| 4 | | true | arf arf! | world | | 1970-01-01T00:00:00.000000124Z |", + "+----+-----+------+----------+-------+------+--------------------------------+", + ]; + + assert_batches_eq!(expected_data, &[rb]); + } +} diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index ec6102f3630..2f568a60513 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -28,7 +28,8 @@ use mutable_batch::MutableBatch; use std::{borrow::Cow, collections::HashMap}; use thiserror::Error; -const TIME_COLUMN: &str = "time"; +/// Column name for built in time column on every table. +pub const TIME_COLUMN: &str = "time"; /// Default per-namespace table count service protection limit. pub const DEFAULT_MAX_TABLES: i32 = 500; @@ -264,7 +265,8 @@ where Ok(()) } -async fn table_load_or_create( +/// load the table or create a new one +pub async fn table_load_or_create( repos: &mut R, namespace_id: NamespaceId, namespace_partition_template: &NamespacePartitionTemplateOverride,