Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: new impl for loading CSV. #14645

Merged
merged 10 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/formats/src/common_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct InputCommonSettings {
pub timezone: Tz,
pub disable_variant_check: bool,
pub binary_format: BinaryFormat,
pub is_rounding_mode: bool,
}

#[derive(Clone)]
Expand Down
7 changes: 3 additions & 4 deletions src/query/formats/src/field_decoder/fast_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use crate::InputCommonSettings;
#[derive(Clone)]
pub struct FastFieldDecoderValues {
common_settings: InputCommonSettings,
rounding_mode: bool,
}

impl FieldDecoder for FastFieldDecoderValues {
Expand All @@ -78,7 +77,7 @@ impl FieldDecoder for FastFieldDecoderValues {
}

impl FastFieldDecoderValues {
pub fn create_for_insert(format: FormatSettings, rounding_mode: bool) -> Self {
pub fn create_for_insert(format: FormatSettings, is_rounding_mode: bool) -> Self {
FastFieldDecoderValues {
common_settings: InputCommonSettings {
true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(),
Expand All @@ -92,8 +91,8 @@ impl FastFieldDecoderValues {
timezone: format.timezone,
disable_variant_check: false,
binary_format: Default::default(),
is_rounding_mode,
},
rounding_mode,
}
}

Expand Down Expand Up @@ -212,7 +211,7 @@ impl FastFieldDecoderValues {
Err(_) => {
// cast float value to integer value
let val: f64 = reader.read_float_text()?;
let new_val: Option<T::Native> = if self.rounding_mode {
let new_val: Option<T::Native> = if self.common_settings.is_rounding_mode {
num_traits::cast::cast(val.round())
} else {
num_traits::cast::cast(val)
Expand Down
8 changes: 4 additions & 4 deletions src/query/formats/src/field_decoder/json_ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct FieldJsonAstDecoder {
timezone: Tz,
pub ident_case_sensitive: bool,
pub is_select: bool,
rounding_mode: bool,
is_rounding_mode: bool,
}

impl FieldDecoder for FieldJsonAstDecoder {
Expand All @@ -68,7 +68,7 @@ impl FieldJsonAstDecoder {
timezone: options.timezone,
ident_case_sensitive: options.ident_case_sensitive,
is_select: options.is_select,
rounding_mode,
is_rounding_mode: rounding_mode,
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl FieldJsonAstDecoder {
Some(v) => num_traits::cast::cast(v),
None => match v.as_f64() {
Some(v) => {
if self.rounding_mode {
if self.is_rounding_mode {
num_traits::cast::cast(v.round())
} else {
num_traits::cast::cast(v)
Expand Down Expand Up @@ -178,7 +178,7 @@ impl FieldJsonAstDecoder {
Some(v) => num_traits::cast::cast(v),
None => match v.as_f64() {
Some(v) => {
if self.rounding_mode {
if self.is_rounding_mode {
num_traits::cast::cast(v.round())
} else {
num_traits::cast::cast(v)
Expand Down
1 change: 1 addition & 0 deletions src/query/formats/src/field_decoder/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl NestedValues {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
}
}
Expand Down
27 changes: 7 additions & 20 deletions src/query/formats/src/field_decoder/separated_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use crate::NestedValues;
pub struct SeparatedTextDecoder {
common_settings: InputCommonSettings,
nested_decoder: NestedValues,
rounding_mode: bool,
}

impl FieldDecoder for SeparatedTextDecoder {
Expand All @@ -79,11 +78,7 @@ impl FieldDecoder for SeparatedTextDecoder {
/// in CSV, we find the exact bound of each field before decode it to a type.
/// which is diff from the case when parsing values.
impl SeparatedTextDecoder {
pub fn create_csv(
params: &CsvFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_csv(params: &CsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(),
Expand All @@ -94,17 +89,13 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: params.binary_format,
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

pub fn create_tsv(
_params: &TsvFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_tsv(_params: &TsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
null_if: vec![NULL_BYTES_ESCAPE.as_bytes().to_vec()],
Expand All @@ -115,17 +106,13 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

pub fn create_xml(
_params: &XmlFileFormatParams,
options_ext: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Self {
pub fn create_xml(_params: &XmlFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self {
SeparatedTextDecoder {
common_settings: InputCommonSettings {
null_if: vec![NULL_BYTES_LOWER.as_bytes().to_vec()],
Expand All @@ -136,9 +123,9 @@ impl SeparatedTextDecoder {
timezone: options_ext.timezone,
disable_variant_check: options_ext.disable_variant_check,
binary_format: Default::default(),
is_rounding_mode: options_ext.is_rounding_mode,
},
nested_decoder: NestedValues::create(options_ext),
rounding_mode,
}
}

Expand Down Expand Up @@ -242,7 +229,7 @@ impl SeparatedTextDecoder {
Err(_) => {
// cast float value to integer value
let val: f64 = read_num_text_exact(&data[..effective])?;
let new_val: Option<T::Native> = if self.rounding_mode {
let new_val: Option<T::Native> = if self.common_settings.is_rounding_mode {
num_traits::cast::cast(val.round())
} else {
num_traits::cast::cast(val)
Expand Down
8 changes: 8 additions & 0 deletions src/query/formats/src/file_format_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct FileFormatOptionsExt {
pub timezone: Tz,
pub is_select: bool,
pub is_clickhouse: bool,
pub is_rounding_mode: bool,
}

impl FileFormatOptionsExt {
Expand All @@ -54,6 +55,11 @@ impl FileFormatOptionsExt {
is_select: bool,
) -> Result<FileFormatOptionsExt> {
let timezone = parse_timezone(settings)?;
let numeric_cast_option = settings
.get_numeric_cast_option()
.unwrap_or("rounding".to_string());
let is_rounding_mode = numeric_cast_option.as_str() == "rounding";

let options = FileFormatOptionsExt {
ident_case_sensitive: false,
headers: 0,
Expand All @@ -63,6 +69,7 @@ impl FileFormatOptionsExt {
timezone,
is_select,
is_clickhouse: false,
is_rounding_mode,
};
Ok(options)
}
Expand All @@ -81,6 +88,7 @@ impl FileFormatOptionsExt {
timezone,
is_select: false,
is_clickhouse: true,
is_rounding_mode: true,
};
let suf = &clickhouse_type.suffixes;
options.headers = suf.headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,9 @@ impl InputFormatTextBase for InputFormatCSV {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
let csv_params = CsvFileFormatParams::downcast_unchecked(params);
Arc::new(SeparatedTextDecoder::create_csv(
csv_params,
options,
rounding_mode,
))
Arc::new(SeparatedTextDecoder::create_csv(csv_params, options))
}

fn try_create_align_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ impl InputFormatTextBase for InputFormatNDJson {
fn create_field_decoder(
_params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
Arc::new(FieldJsonAstDecoder::create(options, rounding_mode))
Arc::new(FieldJsonAstDecoder::create(
options,
options.is_rounding_mode,
))
}

fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,9 @@ impl InputFormatTextBase for InputFormatTSV {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
let tsv_params = TsvFileFormatParams::downcast_unchecked(params);
Arc::new(SeparatedTextDecoder::create_tsv(
tsv_params,
options,
rounding_mode,
))
Arc::new(SeparatedTextDecoder::create_tsv(tsv_params, options))
}

fn try_create_align_state(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ impl InputFormatTextBase for InputFormatXML {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder> {
Arc::new(SeparatedTextDecoder::create_xml(
XmlFileFormatParams::downcast_unchecked(params),
options,
rounding_mode,
))
}

Expand Down
14 changes: 2 additions & 12 deletions src/query/pipeline/sources/src/input_formats/input_format_text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ pub trait InputFormatTextBase: Sized + Send + Sync + 'static {
fn create_field_decoder(
params: &FileFormatParams,
options: &FileFormatOptionsExt,
rounding_mode: bool,
) -> Arc<dyn FieldDecoder>;

fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()>;
Expand Down Expand Up @@ -578,17 +577,8 @@ impl<T: InputFormatTextBase> BlockBuilder<T> {
)
})
.collect();

let numeric_cast_option = ctx
.settings
.get_numeric_cast_option()
.unwrap_or("rounding".to_string());
let rounding_mode = numeric_cast_option.as_str() == "rounding";
let field_decoder = T::create_field_decoder(
&ctx.file_format_params,
&ctx.file_format_options_ext,
rounding_mode,
);
let field_decoder =
T::create_field_decoder(&ctx.file_format_params, &ctx.file_format_options_ext);
let projection = ctx.projection.clone();

BlockBuilder {
Expand Down
2 changes: 1 addition & 1 deletion src/query/pipeline/sources/src/input_formats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

mod beyond_end_reader;
mod error_utils;
pub mod error_utils;
mod impls;
mod input_context;
mod input_format;
Expand Down
Loading
Loading