Support catalog.schema.table.column in SQL SELECT and WHERE #5343
Conversation
I apologize for such a sizeable PR, especially with regard to the amount of breaking API changes involved. I'm happy for any feedback, especially if you see a better way to achieve the original issue without causing as much breakage.
@@ -677,7 +687,7 @@ mod tests {
    // lookup with unqualified name "t1.c0"
    let err = schema.index_of_column(&col).err().unwrap();
    assert_eq!(
-       "Schema error: No field named 't1.c0'. Valid fields are 't1'.'c0', 't1'.'c1'.",
+       r#"Schema error: No field named "t1.c0". Valid fields are "t1"."c0", "t1"."c1"."#,
this was a separate change I decided to add, mainly since I believe having quoted identifiers allows for more robust messages, as it'll properly account for any special characters within and allows users to clearly delineate identifiers in that case

e.g. in the old way a user could have an ident like `t1'.'t2`, which would show in the error as `'t1'.'t2'`; even though it is a single ident, it looks like two
I agree the new change is better.
We can maybe go a step farther and only put `"` if the identifier would be normalized (e.g. it has something other than lowercase ascii letters) -- as a potential follow on PR
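A minimal sketch of what that conditional quoting could look like (the helper name and exact rules are assumed here, not DataFusion's actual implementation):

```rust
/// Quote an identifier only when it would otherwise be normalized or be
/// ambiguous: anything other than lowercase ASCII letters, digits, or
/// underscores (or a leading digit) gets wrapped in double quotes, with
/// embedded quotes doubled.
fn quote_identifier_if_needed(ident: &str) -> String {
    let needs_quotes = ident.is_empty()
        || ident.starts_with(|c: char| c.is_ascii_digit())
        || !ident
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');
    if needs_quotes {
        format!("\"{}\"", ident.replace('"', "\"\""))
    } else {
        ident.to_string()
    }
}
```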
Yeah this did come to mind, but:
- Either have to store alongside the name that it was normalized
- Or each time quoting the string, have to do a pass over it to check for special characters/uppercase characters

The 1st one I don't really like, as it'd have to add more to TableReference (and related structs). The 2nd one would be simpler to implement; maybe I'm overthinking the impact of the overhead 😅
I agree we can improve it as a follow on PR
raised ticket: #5523
/// Schema contains duplicate unqualified field name
DuplicateUnqualifiedField { name: String },
/// No field with this name
FieldNotFound {
-   field: Column,
+   field: Box<Column>,
otherwise clippy lint about large Err variant appeared
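For context, a hedged sketch of the shape of change that clippy's large-`Err`-variant lint pushes toward (the field layout here is illustrative, not the exact DataFusion definition):

```rust
// Illustrative only: boxing the large payload keeps the error enum small,
// so every `Result<_, SchemaError>` passed around the codebase stays cheap
// to move even though the error path is rarely taken.
struct Column {
    relation: Option<String>,
    name: String,
}

enum SchemaError {
    DuplicateUnqualifiedField { name: String },
    // A Box stores a single pointer inline instead of the whole Column.
    FieldNotFound {
        field: Box<Column>,
        valid_fields: Vec<String>,
    },
}
```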
    relation: Some(relation),
    name,
}))
// Possibilities we search with, in order from top to bottom for each len:
main fix for original issue is in here
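To illustrate the idea behind that search-order comment, here is a rough sketch (purely illustrative, not DataFusion's actual resolution code) of enumerating the ways a multi-part identifier could be split into a table qualifier plus a column:

```rust
/// Illustrative only: for an identifier like `a.b.c.d`, list candidate splits
/// into (table reference parts, column name, nested field parts), trying the
/// most-qualified interpretation first.
fn candidate_splits(idents: &[String]) -> Vec<(Vec<String>, String, Vec<String>)> {
    let mut out = Vec::new();
    if idents.is_empty() {
        return out;
    }
    // At most catalog.schema.table (3 parts) can qualify the column.
    let max_qualifier_len = 3.min(idents.len() - 1);
    for qualifier_len in (0..=max_qualifier_len).rev() {
        let qualifier = idents[..qualifier_len].to_vec();
        let column = idents[qualifier_len].clone();
        let nested = idents[qualifier_len + 1..].to_vec();
        out.push((qualifier, column, nested));
    }
    out
}
```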
Could you write some more unit tests for these cases (the comments are great, but it would be better if they were tests so they can't get out of sync as easily)
Sure thing
Refactored and added unit tests
I plan to review this PR carefully tomorrow. I am sorry for the delay
First of all, thank you very much @Jefffrey -- I think this PR is very well coded and structured. Very nice. I very much apologize for the delay in reviewing
As one measure of how impactful this change might be, I ran the code on this branch against the IOx test suite (see https://github.com/influxdata/influxdb_iox/pull/7077) and it passed with minor changes. Nice work
Here is what I think is needed prior to merge:
- Some additional testing for name resolution in `identifier.rs` (basically turn the comments into tests)
- Some discussion with the broader community about how to handle `col("A")` being normalized to `col("a")` -- I think this may be very surprising to some users and I offered an alternate suggestion (`ident`, that is still a breaking API change but may be less confusing)
cc @houqp who did some of the original work on multi-part identifiers
cc @liukun4515 @andygrove @avantgardnerio @thinkharderdev @jackwener @ozankabak in case they have thoughts about this API change or the structure in general
TableReference::Full { schema, table, .. } => {
    schema == qq && table == name
}
let column = Column::from_qualified_name(field.name());
this seems like a nicer API for sure
}

/// Parse a `TableReference` into a OwnedTableReference
impl From<&'_ TableReference<'_>> for OwnedTableReference {
👍
let r_is_right =
    right.schema().field_with_qualified_name(rr, &r.name);
let l_is_left = self.plan.schema().field_with_qualified_name(
    &lr.as_table_reference(),
I tried playing around with the signature of `field_with_qualified_name` and I couldn't find a way to make `lr` still work. 🤷
Yeah, was thinking maybe having something like `Into<TableReference>` or the like.

Related, but I feel TableReference and its related structs OwnedTableReference and ResolvedTableReference got a bit out of hand, since they all seem very similar and have conversions between them, but are still quite separate. I think I initially had `field_with_qualified_name` accept a reference to an OwnedTableReference but changed it to TableReference, as that wouldn't require owning a bunch of strings if you just wanted to pass in a reference, but I can see it doesn't really result in a nice API imo.

Maybe need refactoring around the TableReference structs to try make them more interchangeable, via a common trait possibly
Maybe need refactoring around the TableReference structs to try make them more interchangeable, via a common trait possibly
I wonder if we could maybe change TableReference to use `Cow` somehow rather than separate variants 🤔
How do you mean?
I was thinking about removing OwnedTableReference -- I started to sketch out what this might look like in #5508
Working with the structures then I think becomes less cumbersome
@@ -497,8 +494,10 @@ fn push_down_scan(
    let schema = scan.source.schema();
    let mut projection: BTreeSet<usize> = required_columns
        .iter()
// TODO: change scan.table_name from String?
maybe worth a follow on ticket?
ticket raised: #5522
let df = df.filter(col("\"A\"").lt_eq(col("c")))?
    .aggregate(vec![col("\"A\"")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
This is probably a pretty major change for people (in that `col()` will now parse / normalize the column names according to SQL semantics). However, it does have the nice side benefit that the handling of `.` is no longer treated differently in `col()`.

What do you think about possibly:
- changing `col` so that it forms an unqualified reference always (aka does not respect `.`)
- adding some new function like `ident()` or `parse_col()` that treats the column like a SQL identifier and correctly splits based on `.` etc?
Yeah, that would have the benefit of being less breaking and being a bit more explicit about what is happening. `parse_col` would make it more explicit, though I'm not really a fan of that name; it feels a bit clunky.

For what it's worth, Spark does seem to be able to parse the input to their `col` function, instead of taking it wholly unqualified, I believe
How about we simply add an `ident()` function that doesn't parse at all? I can help do that
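A minimal sketch of what such an `ident` helper could look like (assuming `Column::from_name` builds an unqualified column from the verbatim string; this is a sketch, not necessarily the exact code that landed):

```rust
use datafusion_common::Column;
use datafusion_expr::Expr;

/// Create an unqualified column expression from the given name, taking the
/// string verbatim: no SQL parsing, quoting, or case normalization.
pub fn ident(name: impl Into<String>) -> Expr {
    Expr::Column(Column::from_name(name))
}
```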
I've added the `ident()` expr function and updated the `user-guide/example-usage.md` doc.

Let me know if there are other places I need to change/add docs for
So in my opinion this PR, while a non trivial breaking change, results in a better outcome -- however, before we merge it let's solicit additional community feedback. I posted a note in the slack channel and the dev list https://lists.apache.org/thread/m42rk0sbh88bnbgnx83wshhm97wrxjf7
I agree that this is a good change and the API change is OK with us. However, honestly I can not confidently tell that it won't mess with others' usage patterns. I think waiting for some more community feedback is a good idea.
At a high-level, this seems necessary but painful :/
I went through this PR again. I think we should merge it because even though it is disruptive, it fixes a bug and, furthermore, a long standing source of bugs (from the inconsistent behavior of `col()`)
I recommend as next steps:
- Add an `ident()` or `name()` function that allows users to refer to columns like `A` without having to use `"A"`
- Possibly reconsider OwnedTableReference. Here is a proposal of how to handle `OwnedTableReference`s: https://github.com/apache/arrow-datafusion/pull/5508/files
I think the `ident` / `name` function is important before merge. The OwnedTableReference thing we can do as a follow on PR.

Thanks again @Jefffrey for pushing this forward
let df = df.filter(col("\"A\"").lt_eq(col("c")))?
    .aggregate(vec![col("\"A\"")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
How about we simply add an `ident()` function that doesn't parse at all? I can help do that
I've done the changes for `ident()`
All in all this is really nice work @Jefffrey -- it demonstrates a wonderful knowledge of the DataFusion codebase and I think cleans up some long standing techdebt (@houqp would be proud)
Here is what I think we should do with this PR given it has such a significant API change (at least I think so), unless anyone objects:
- Wait for the next datafusion release (should be cut on this Friday, March 10)
- Merge the PR first thing
This will give us the maximal time to let the dust settle, and a few bleeding edge adopters to pick it up (aka I will integrate it into IOx, and fix any bugs / rough edges we might find during that process).
cc @andygrove @jdye64 @Dandandan (this PR will mean that for anyone that uses `col("FooBar")`, it will be the same as `col("foobar")`, which is not the same today (the column reference is not normalized)).

@Jefffrey has added the `ident()` function that has similar behavior to the old one
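For readers following along, a short sketch of the behavioral difference being described (hedged: it assumes `ident` is exported from `datafusion_expr` alongside `col`, and that `Expr` equality compares the underlying column references):

```rust
use datafusion_expr::{col, ident};

fn main() {
    // After this PR, `col` normalizes its input per SQL identifier rules:
    // unquoted names are lowercased, quoted names keep their case.
    assert_eq!(col("FooBar"), col("foobar"));
    assert_ne!(col(r#""FooBar""#), col("foobar"));

    // `ident` takes the name verbatim (similar to the old `col` behavior
    // for simple, non-dotted names), so the capitalization is preserved.
    assert_ne!(ident("FooBar"), col("foobar"));
    assert_eq!(ident("FooBar"), col(r#""FooBar""#));
}
```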
@@ -53,26 +70,36 @@ impl Column {
    /// Deserialize a fully qualified name string into a column
    pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
        let flat_name = flat_name.into();
        use sqlparser::tokenizer::Token;

        let dialect = sqlparser::dialect::GenericDialect {};
this is one of the key changes in my mind -- use the full standard identifier normalization rules rather than some custom sqlparser based semantics
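As a rough illustration of that approach (a sketch only -- the real `from_qualified_name` has more cases and error handling than this), sqlparser's tokenizer can split a flat name into identifier parts while respecting quoting:

```rust
use sqlparser::dialect::GenericDialect;
use sqlparser::tokenizer::{Token, Tokenizer};

/// Split a name like `my_catalog.my_schema.t1.c0` into its identifier parts
/// using standard SQL tokenization, so quoted parts like `"Weird.Name"` stay
/// intact instead of being split on the embedded dot.
fn identifier_parts(flat_name: &str) -> Vec<String> {
    let dialect = GenericDialect {};
    let tokens = Tokenizer::new(&dialect, flat_name)
        .tokenize()
        .unwrap_or_default();
    tokens
        .into_iter()
        .filter_map(|t| match t {
            Token::Word(w) => Some(w.value),
            _ => None, // skip periods, whitespace, etc.
        })
        .collect()
}
```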
    table: String,
},
}
pub type OwnedTableReference = TableReference<'static>;
👍
@@ -329,19 +329,19 @@ impl SessionContext {
        or_replace,
    }) => {
        let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
-       let table = self.table(&name).await;
+       let table = self.table(name.clone()).await;
I think we can avoid these clones. Here is one way (this is not required as I don't think these codepaths are particularly performance critical)
This compiled locally for me:
(arrow_dev) alamb@MacBook-Pro-8 arrow-datafusion2 % git diff | cat
diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs
index 7fb3dfcc7..a710dfc03 100644
--- a/datafusion/common/src/table_reference.rs
+++ b/datafusion/common/src/table_reference.rs
@@ -189,8 +189,9 @@ impl<'a> TableReference<'a> {
}
}
- /// Converts directly into an [`OwnedTableReference`]
- pub fn to_owned_reference(self) -> OwnedTableReference {
+ /// Converts directly into an [`OwnedTableReference`] without
+ /// copying the underlying Strings
+ pub fn to_owned_reference(&self) -> OwnedTableReference {
match self {
Self::Full {
catalog,
@@ -260,6 +261,12 @@ impl<'a> TableReference<'a> {
}
}
+impl <'a> From<&'a OwnedTableReference> for TableReference<'a> {
+ fn from(value: &'a OwnedTableReference) -> Self {
+ value.to_owned_reference()
+ }
+}
+
/// Parse a `String` into a OwnedTableReference
impl From<String> for OwnedTableReference {
fn from(s: String) -> Self {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f72ea7560..8807721f7 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -329,19 +329,19 @@ impl SessionContext {
or_replace,
}) => {
let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
- let table = self.table(name.clone()).await;
+ let table = self.table(&name).await;
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
- self.deregister_table(name.clone())?;
+ self.deregister_table(&name)?;
let schema = Arc::new(input.schema().as_ref().into());
let physical = DataFrame::new(self.state(), input);
let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(schema, batches)?);
- self.register_table(name.clone(), table)?;
+ self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(true, true, Ok(_)) => Err(DataFusionError::Execution(
@@ -369,15 +369,15 @@ impl SessionContext {
or_replace,
definition,
}) => {
- let view = self.table(name.clone()).await;
+ let view = self.table(&name).await;
match (or_replace, view) {
(true, Ok(_)) => {
- self.deregister_table(name.clone())?;
+ self.deregister_table(&name)?;
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);
- self.register_table(name.clone(), table)?;
+ self.register_table(&name, table)?;
self.return_empty_dataframe()
}
(_, Err(_)) => {
@@ -397,7 +397,7 @@ impl SessionContext {
name, if_exists, ..
}) => {
let result = self
- .find_and_deregister(name.clone(), TableType::Base)
+ .find_and_deregister(&name, TableType::Base)
.await;
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
@@ -412,7 +412,7 @@ impl SessionContext {
name, if_exists, ..
}) => {
let result = self
- .find_and_deregister(name.clone(), TableType::View)
+ .find_and_deregister(&name, TableType::View)
.await;
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
@@ -571,7 +571,7 @@ impl SessionContext {
&self,
cmd: &CreateExternalTable,
) -> Result<DataFrame> {
- let exist = self.table_exist(cmd.name.clone())?;
+ let exist = self.table_exist(&cmd.name)?;
if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
@@ -586,7 +586,7 @@ impl SessionContext {
let table_provider: Arc<dyn TableProvider> =
self.create_custom_table(cmd).await?;
- self.register_table(cmd.name.clone(), table_provider)?;
+ self.register_table(&cmd.name, table_provider)?;
self.return_empty_dataframe()
}
(arrow_dev) alamb@MacBook-Pro-8 arrow-datafusion2 %
Yeah this looks good, as it looks better without the clone; however, I think `to_owned_reference(...)` will clone the underlying strings anyway.

Edit: implemented something similar
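To illustrate why (a standalone sketch with simplified, hypothetical types -- not the actual DataFusion definitions): converting a borrowed reference into an owned/`'static` one has to copy the string data regardless of whether it takes `self` or `&self`.

```rust
use std::borrow::Cow;

enum TableRef<'a> {
    Bare { table: Cow<'a, str> },
    Full { catalog: Cow<'a, str>, schema: Cow<'a, str>, table: Cow<'a, str> },
}

type OwnedTableRef = TableRef<'static>;

impl<'a> TableRef<'a> {
    fn to_owned_ref(&self) -> OwnedTableRef {
        match self {
            // `to_string()` allocates: a borrowed &str cannot be promoted
            // to 'static without copying it somewhere owned.
            TableRef::Bare { table } => TableRef::Bare {
                table: Cow::Owned(table.to_string()),
            },
            TableRef::Full { catalog, schema, table } => TableRef::Full {
                catalog: Cow::Owned(catalog.to_string()),
                schema: Cow::Owned(schema.to_string()),
                table: Cow::Owned(table.to_string()),
            },
        }
    }
}
```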
datafusion/expr/src/expr_fn.rs (Outdated)
@@ -39,6 +39,12 @@ pub fn col(ident: impl Into<Column>) -> Expr {
    Expr::Column(ident.into())
}

/// Create an unqualified column expression from the provided name, without normalizing
/// the column
Suggested change:
- /// the column
+ /// the column
+ ///
+ /// For example `col("A")` refers to a column named 'a' (normalized via SQL rules)
+ /// but `ident("A")` refers to a column named 'A' (not normalized)
Updated doc to have example
@@ -39,6 +39,12 @@ pub fn col(ident: impl Into<Column>) -> Expr {
    Expr::Column(ident.into())
}

/// Create an unqualified column expression from the provided name, without normalizing
I think we should also update the docstring of `col` to note that the identifier is normalized according to SQL rules (and in particular, capital letters are reduced to lowercase)
Perhaps some examples would help too:
As in a doc comment showing:
col("A") == col("a")
col(r#""A""#) != col("a")
I can help write this if you would like
Doc updated
let mut origin_batches = ctx.table(&table_reference).await?.collect().await?;
let schema = ctx.table_provider(&table_reference).await?.schema();
ctx.deregister_table(&table_reference)?;
let mut origin_batches = ctx.table(table_reference.clone()).await?.collect().await?;
Suggested change:
- let mut origin_batches = ctx.table(table_reference.clone()).await?.collect().await?;
+ let mut origin_batches = ctx.table(&table_reference).await?.collect().await?;

If you make the changes to `owned_table_reference` I suggested above
#[test]
fn test_form_identifier() -> Result<()> {
    let err = form_identifier(&[]).expect_err("empty identifiers didn't fail");
    let expected = "Internal error: Incorrect number of identifiers: 0. \
If possible, I think errors that are due to user supplied input (like sql queries or inputs to col()) should return something other than `Internal` -- perhaps `DataFusionError::Plan`? I think Internal errors are supposed to signal a bug in DataFusion
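As an illustration of the convention being described here (a hedged sketch; the function and its message are hypothetical, not code from this PR):

```rust
use datafusion_common::{DataFusionError, Result};

/// Hypothetical validation helper: problems caused by user-supplied input are
/// reported as Plan errors, while Internal is reserved for states that should
/// be unreachable and would indicate a bug in DataFusion itself.
fn check_identifier_parts(parts: &[&str]) -> Result<()> {
    match parts.len() {
        1..=4 => Ok(()),
        n => Err(DataFusionError::Plan(format!(
            "Unsupported compound identifier with {n} parts"
        ))),
    }
}
```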
I made it an `Internal` error since `form_identifier(...)` is a private function that is only used by `sql_compound_identifier_to_expr(...)` and `generate_schema_search_terms(...)`, which already check that they don't pass an invalid list in (hence they call unwrap on it). So in normal usage this error can't ever occur. I made it return an error type just in case the method is used elsewhere in future. I'll update the comments a bit to reflect this.

Edit: comments updated
thank you 👍 makes sense
I plan to merge this PR tomorrow unless anyone else has comments or wants more time to review
Benchmark runs are scheduled for baseline = 612eb1d and contender = 4d07360. 4d07360 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Thanks again @Jefffrey -- this is pretty epic. It may be disruptive but I think the new consistency will be worth it
Which issue does this PR close?
Closes #4872
Rationale for this change
Per the original issue, be able to support SQL such as:
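(An illustrative query only -- the exact example from issue #4872 may differ; the point is that a column can now be referenced by up to four parts, `catalog.schema.table.column`.)

```sql
SELECT my_catalog.my_schema.t1.c0
FROM my_catalog.my_schema.t1
WHERE my_catalog.my_schema.t1.c0 > 0;
```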
This required a larger refactoring, specifically around `Column`, to change `relation` from an `Option<String>` to an `Option<OwnedTableReference>`, since otherwise we can only support a single level of identifier for the relation of a column (unless we do some weird stuff like serializing/deserializing into that single String).

What changes are included in this PR?
Change `Column` to have an `Option<OwnedTableReference>` for its `relation` field, with accompanying fixes to the rest of the codebase.

Also a fix for the original issue, so that parsing of SQL compound identifiers can properly identify identifiers composed like `catalog.schema.table.column`, which previously wasn't supported.

Are these changes tested?
Unit tests added & fixed; also added a test to sqllogictest from the original issue
Are there any user-facing changes?
Yes, breaking changes