Skip to content

Commit

Permalink
refactor: new impl for loading CSV. (#14645)
Browse files Browse the repository at this point in the history
* refactor infer_schema.

* new impl for loading CSV.

* Add new setting enable_new_copy_for_text_formats.

* fix typo.

* better guess of rows of text file.

* comments for pipeline.

* fix typo.

* refactor to avoid use of unreachable!

* comments for CSV separator.

* rename file.
  • Loading branch information
youngsofun authored Feb 20, 2024
1 parent 221bcd6 commit 346a955
Show file tree
Hide file tree
Showing 41 changed files with 2,127 additions and 330 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/formats/src/common_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct InputCommonSettings {
pub timezone: Tz,
pub disable_variant_check: bool,
pub binary_format: BinaryFormat,
pub is_rounding_mode: bool,
}

#[derive(Clone)]
Expand Down
7 changes: 3 additions & 4 deletions src/query/formats/src/field_decoder/fast_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use crate::InputCommonSettings;
#[derive(Clone)]
pub struct FastFieldDecoderValues {
common_settings: InputCommonSettings,
rounding_mode: bool,
}

impl FieldDecoder for FastFieldDecoderValues {
Expand All @@ -78,7 +77,7 @@ impl FieldDecoder for FastFieldDecoderValues {
}

impl FastFieldDecoderValues {
pub fn create_for_insert(format: FormatSettings, rounding_mode: bool) -> Self {
pub fn create_for_insert(format: FormatSettings, is_rounding_mode: bool) -> Self {
FastFieldDecoderValues {
common_settings: InputCommonSettings {
true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(),
Expand All @@ -92,8 +91,8 @@ impl FastFieldDecoderValues {
timezone: format.timezone,
disable_variant_check: false,
binary_format: Default::default(),
is_rounding_mode,
},
rounding_mode,
}
}

Expand Down Expand Up @@ -212,7 +211,7 @@ impl FastFieldDecoderValues {
Err(_) => {
// cast float value to integer value
let val: f64 = reader.read_float_text()?;
let new_val: Option<T::Native> = if self.rounding_mode {
let new_val: Option<T::Native> = if self.common_settings.is_rounding_mode {
num_traits::cast::cast(val.round())
} else {
num_traits::cast::cast(val)
Expand Down
8 changes: 4 additions & 4 deletions src/query/formats/src/field_decoder/json_ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct FieldJsonAstDecoder {
timezone: Tz,
pub ident_case_sensitive: bool,
pub is_select: bool,
rounding_mode: bool,
is_rounding_mode: bool,
}

impl FieldDecoder for FieldJsonAstDecoder {
Expand All @@ -68,7 +68,7 @@ impl FieldJsonAstDecoder {
timezone: options.timezone,
ident_case_sensitive: options.ident_case_sensitive,
is_select: options.is_select,
rounding_mode,
is_rounding_mode: rounding_mode,
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl FieldJsonAstDecoder {
Some(v) => num_traits::cast::cast(v),
None => match v.as_f64() {
Some(v) => {
if self.rounding_mode {
if self.is_rounding_mode {
num_traits::cast::cast(v.round())
} else {
num_traits::cast::cast(v)
Expand Down Expand Up @@ -178,7 +178,7 @@ impl FieldJsonAstDecoder {
Some(v) => num_traits::cast::cast(v),
None => match v.as_f64() {
Some(v) => {
if self.rounding_mode {
if self.is_rounding_mode {
num_traits::cast::cast(v.round())
} else {
num_traits::cast::cast(v)
Expand Down
1 change: 1 addition & 0 deletions src/query/formats/src/field_decoder/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl NestedValues {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
}
}
Expand Down
27 changes: 7 additions & 20 deletions src/query/formats/src/field_decoder/separated_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use crate::NestedValues;
pub struct SeparatedTextDecoder {
common_settings: InputCommonSettings,
nested_decoder: NestedValues,
rounding_mode: bool,
}

impl FieldDecoder for SeparatedTextDecoder {
Expand All @@ -79,11 +78,7 @@ impl FieldDecoder for SeparatedTextDecoder {
/// in CSV, we find the exact bound of each field before decode it to a type.
/// which is diff from the case when parsing values.
impl SeparatedTextDecoder {
pub fn create_csv(
params: &CsvFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_csv(params: &CsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(),
Expand All @@ -94,17 +89,13 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: params.binary_format,
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

pub fn create_tsv(
_params: &TsvFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_tsv(_params: &TsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
null_if: vec![NULL_BYTES_ESCAPE.as_bytes().to_vec()],
Expand All @@ -115,17 +106,13 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

pub fn create_xml(
_params: &XmlFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_xml(_params: &XmlFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
null_if: vec![NULL_BYTES_LOWER.as_bytes().to_vec()],
Expand All @@ -136,9 +123,9 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

Expand Down Expand Up @@ -242,7 +229,7 @@ impl SeparatedTextDecoder {
Err(_) => {
// cast float value to integer value
let val: f64 = read_num_text_exact(&data[..effective])?;
let new_val: Option<T::Native> = if self.rounding_mode {
let new_val: Option<T::Native> = if self.common_settings.is_rounding_mode {
num_traits::cast::cast(val.round())
} else {
num_traits::cast::cast(val)
Expand Down
8 changes: 8 additions & 0 deletions src/query/formats/src/file_format_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct FileFormatOptionsExt {
pub timezone: Tz,
pub is_select: bool,
pub is_clickhouse: bool,
pub is_rounding_mode: bool,
}

impl FileFormatOptionsExt {
Expand All @@ -54,6 +55,11 @@ impl FileFormatOptionsExt {
is_select: bool,
) -> Result<FileFormatOptionsExt> {
let timezone = parse_timezone(settings)?;
let numeric_cast_option = settings
.get_numeric_cast_option()
.unwrap_or("rounding".to_string());
let is_rounding_mode = numeric_cast_option.as_str() == "rounding";

let options = FileFormatOptionsExt {
ident_case_sensitive: false,
headers: 0,
Expand All @@ -63,6 +69,7 @@ impl FileFormatOptionsExt {
timezone,
is_select,
is_clickhouse: false,
is_rounding_mode,
};
Ok(options)
}
Expand All @@ -81,6 +88,7 @@ impl FileFormatOptionsExt {
timezone,
is_select: false,
is_clickhouse: true,
is_rounding_mode: true,
};
let suf = &clickhouse_type.suffixes;
options.headers = suf.headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,9 @@ impl InputFormatTextBase for InputFormatCSV {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
let csv_params = CsvFileFormatParams::downcast_unchecked(params);
Arc::new(SeparatedTextDecoder::create_csv(
csv_params,
options,
rounding_mode,
))
Arc::new(SeparatedTextDecoder::create_csv(csv_params, options))
}

fn try_create_align_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ impl InputFormatTextBase for InputFormatNDJson {
fn create_field_decoder(
_params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
Arc::new(FieldJsonAstDecoder::create(options, rounding_mode))
Arc::new(FieldJsonAstDecoder::create(
options,
options.is_rounding_mode,
))
}

fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,9 @@ impl InputFormatTextBase for InputFormatTSV {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
let tsv_params = TsvFileFormatParams::downcast_unchecked(params);
Arc::new(SeparatedTextDecoder::create_tsv(
tsv_params,
options,
rounding_mode,
))
Arc::new(SeparatedTextDecoder::create_tsv(tsv_params, options))
}

fn try_create_align_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ impl InputFormatTextBase for InputFormatXML {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
Arc::new(SeparatedTextDecoder::create_xml(
XmlFileFormatParams::downcast_unchecked(params),
options,
rounding_mode,
))
}

Expand Down
14 changes: 2 additions & 12 deletions src/query/pipeline/sources/src/input_formats/input_format_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ pub trait InputFormatTextBase: Sized + Send + Sync + 'static {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder>;

fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()>;
Expand Down Expand Up @@ -578,17 +577,8 @@ impl<T: InputFormatTextBase> BlockBuilder<T> {
)
})
.collect();

let numeric_cast_option = ctx
.settings
.get_numeric_cast_option()
.unwrap_or("rounding".to_string());
let rounding_mode = numeric_cast_option.as_str() == "rounding";
let field_decoder = T::create_field_decoder(
&ctx.file_format_params,
&ctx.file_format_options_ext,
rounding_mode,
);
let field_decoder =
T::create_field_decoder(&ctx.file_format_params, &ctx.file_format_options_ext);
let projection = ctx.projection.clone();

BlockBuilder {
Expand Down
2 changes: 1 addition & 1 deletion src/query/pipeline/sources/src/input_formats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

mod beyond_end_reader;
mod error_utils;
pub mod error_utils;
mod impls;
mod input_context;
mod input_format;
Expand Down
Loading

0 comments on commit 346a955

Please sign in to comment.