
Upgrade Delta to use Apache Spark 3.3.0 #1257

Closed

Conversation

Collaborator

@vkorukanti vkorukanti commented Jul 7, 2022

Description

Upgrade the Spark dependency version to 3.3.0. Following are the major changes:

  • Test fixes to change the expected error message
  • VacuumCommand: Update the parallel delete to first check if there are entries before trying to reduce.
  • Update the LogicalPlan used to represent the create or replace command in DeltaTableBuilder
  • Spark version upgrade in build and test setup scripts

Fixes #1217

/**
 * This API is used just for parsing SELECT queries. The Delta parser doesn't
 * override the Spark parser for queries, so this can be delegated directly to it.
 */
override def parseQuery(sqlText: String): LogicalPlan = delegate.parseQuery(sqlText)
Contributor:

I didn't quite get what this means. What has changed in the control flow of SELECT queries between Spark 3.2 and 3.3?

Contributor:

Is this the issue that was causing a lot of failures?

Collaborator Author:

It is a new API added to the Spark SQL parser interface. We don't need to change anything here; we just delegate it to the Spark parser.

No, this wasn't causing test failures. It just failed the build.
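
For readers following along, a minimal sketch of what such delegation looks like. DelegatingParser is a hypothetical name, not the actual DeltaSqlParser (which layers Delta's own statements on top of some of these methods), but the method signatures follow Spark 3.3's ParserInterface:

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Sketch: a parser that would extend Spark SQL with extra syntax but
// forwards everything else, including the parseQuery method that
// Spark 3.3 added to ParserInterface.
class DelegatingParser(delegate: ParserInterface) extends ParserInterface {
  // New in Spark 3.3. This parser adds no SELECT syntax of its own,
  // so plain delegation is enough to satisfy the interface.
  override def parseQuery(sqlText: String): LogicalPlan =
    delegate.parseQuery(sqlText)

  override def parsePlan(sqlText: String): LogicalPlan =
    delegate.parsePlan(sqlText)
  override def parseExpression(sqlText: String): Expression =
    delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)
  override def parseMultipartIdentifier(sqlText: String): Seq[String] =
    delegate.parseMultipartIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType =
    delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType =
    delegate.parseDataType(sqlText)
}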

-      CreateTableStatement(
-        table,
+      CreateTable(
+        UnresolvedDBObjectName(table, false),
Contributor:

What is the false for? Use the paramName = false format.

Collaborator Author:

Added the parameter name isNamespace. As far as I know this is not applicable to the Delta catalog, so it is marked as false.
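
With the name spelled out, the call site reads as:

// `table` is the multipart table name; a Delta table is never a namespace.
UnresolvedDBObjectName(table, isNamespace = false)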

-      ReplaceTableStatement(
-        table,
+      ReplaceTable(
+        UnresolvedDBObjectName(table, false),
Contributor:

Same as above.

Collaborator Author:

See the response to the comment above.

@@ -406,15 +406,15 @@ trait DeltaAlterTableTests extends DeltaAlterTableTestBase {
|ALTER TABLE $tableName ADD COLUMNS (m.key.mkv3 long)
""".stripMargin)
}
-      checkErrMsg(ex.getMessage, Seq("m", "key", "mkv3"))
+      checkErrMsg(ex.getMessage, Seq("`m`", "`key`", "`mkv3`"))
Contributor:

Why is this change needed? Isn't checkErrMsg doing a substring match, where searching for "m" should work even if the error message has "`m`"?

Collaborator Author:

It searches for `m`.`key`.`mkv3`; earlier it used to check for m.key.mkv3 (Spark 3.3 quotes column names with backticks in error messages). I reworked this so that only checkErrMsg changes.
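
A hypothetical sketch of a substring-matching checkErrMsg, for context only (the suite's real helper may differ):

// Join the parts into a column path and require it in the message.
// Spark 3.3 renders the path as `m`.`key`.`mkv3`, so the test now
// passes the backtick-quoted parts.
def checkErrMsg(msg: String, parts: Seq[String]): Unit = {
  val path = parts.mkString(".") // e.g. "`m`.`key`.`mkv3`"
  assert(msg.contains(path), s"expected '$path' in: $msg")
}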

@@ -117,12 +117,12 @@ class DeltaDropColumnSuite extends QueryTest
val err1 = intercept[AnalysisException] {
spark.table("t1").where("a = 'str1'").collect()
}.getMessage
-    assert(err1.contains("cannot be resolved") || err1.contains("cannot resolve"))
+    assert(err1.contains("Column 'a' does not exist") || err1.contains("cannot resolve"))
Contributor:

I think it would be good to keep the old message as an alternative with ||.

@@ -221,7 +221,7 @@ abstract class DeltaInsertIntoTestsWithTempViews(
case e: AnalysisException =>
assert(e.getMessage.contains("Inserting into a view is not allowed") ||
e.getMessage.contains("Inserting into an RDD-based table is not allowed") ||
e.getMessage.contains("Table default.v not found"))
e.getMessage.contains("Table or view 'v' not found in database 'default'"))
Contributor:

Can you make these alternatives?
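
That is, accept both the Spark 3.2 and 3.3 messages, roughly:

assert(e.getMessage.contains("Inserting into a view is not allowed") ||
  e.getMessage.contains("Inserting into an RDD-based table is not allowed") ||
  e.getMessage.contains("Table default.v not found") ||
  e.getMessage.contains("Table or view 'v' not found in database 'default'"))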

@tdas
Contributor

tdas commented Jul 22, 2022

Please update the title and description.

-    case v10x_and_above if Try {
-      v10x_and_above.split('.')(0).toInt
-    }.toOption.exists(_ >= 1) =>
+    case v21x_and_above if scala.util.Try {
Contributor:

Can you add a plain-English explanation for each case? It's getting complicated.

I also think this can be simplified further by pulling the version splitting out into a separate function (there are multiple copies now). Also, the function could simply convert the string x.y.z into a decimal number x.y (z will never decide anything), making the comparison far easier.
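
A sketch of the suggested helper. The names are hypothetical and the Delta-to-Spark mapping shown is illustrative only, not the script's actual table:

import scala.util.Try

// Reduce "x.y.z" to the decimal x.y; the patch component never decides
// which Spark version to use. Caveat: this breaks down if the minor
// version ever reaches two digits (2.10 < 2.9 as a double).
def majorMinor(version: String): Option[Double] =
  version.split('.').toList match {
    case major :: minor :: _ => Try(s"$major.$minor".toDouble).toOption
    case _                   => None
  }

// Illustrative usage: pick a Spark version for a given Delta version.
def sparkVersionFor(deltaVersion: String): String =
  majorMinor(deltaVersion) match {
    case Some(v) if v >= 2.1 => "3.3.0"
    case Some(v) if v >= 1.0 => "3.2.0"
    case _                   => "3.1.2"
  }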

Contributor:

Also, do we really need to support all previous versions? We could support only up to the last major version. In fact, just choosing the Spark version based on the Delta version will not work seamlessly, as the Scala version will have to change as well if we go too far back.

Either way, I am okay with this change in this PR (add docs if you can, not a blocker), but it would be good to create an issue to simplify this.

Collaborator Author:

The Spark version lookup was added as part of 1118a72. The reason could be to allow running the examples on any Delta version, which can be set through the DELTA_VERSION environment variable.

I will simplify this.

@@ -318,6 +318,8 @@ trait VacuumCommandImpl extends DeltaCommand {
     import spark.implicits._

     if (parallel) {
+      // If there are no entries, do not call reduce, as it results in an empty-collection error
+      if (diff.count() == 0) return 0
Contributor:

Use take(1).isEmpty instead; it's much cheaper.
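
In other words, a sketch (diff stands for the Dataset being vacuumed, as in the hunk above):

// Before: count() runs a full job over every partition just to test emptiness.
if (diff.count() == 0) return 0

// After: take(1) stops as soon as a single row is found.
if (diff.take(1).isEmpty) return 0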

@vkorukanti vkorukanti changed the title from "[WIP] Update to Spark 3.3.0" to "Upgrade Delta to use Apache Spark 3.3.0" on Jul 23, 2022
Contributor
@jaceklaskowski jaceklaskowski left a comment

LGTM except this tiny nit (sorry, couldn't resist)

@@ -320,37 +322,30 @@ class DeltaTableBuilder private[tables](
       colNames.map(name => DeltaTableUtils.parseColToTransform(name))
     }.getOrElse(Seq.empty[Transform])

+    val tableSpec = org.apache.spark.sql.catalyst.plans.logical.TableSpec(
+      properties = this.properties,
Contributor:

Nit: remove `this.` (unless I'm mistaken, the prefix is not used throughout the codebase).

Member
@zsxwing zsxwing left a comment

LGTM

ganeshchand pushed a commit to ganeshchand/delta that referenced this pull request Aug 10, 2022
## Description
Upgrade the Spark dependency version to 3.3.0. Following are the major changes:
* Test fixes to change the expected error message
* `VacuumCommand`: Update the parallel delete to first check if there are entries before trying to `reduce`.
* Update the `LogicalPlan` used to represent the create or replace command in `DeltaTableBuilder`
* Spark version upgrade in build and test setup scripts
* Spark 3.3 upgraded the log4j from 1.x to 2.x which has a different log4j properties format

Fixes delta-io#1217

Closes delta-io#1257

Signed-off-by: Venki Korukanti <[email protected]>
GitOrigin-RevId: 3e930d3c2cef5fca5f2cd8dd94a8617dbe2f747b
@cometta

cometta commented Aug 26, 2022

Will this be merged anytime soon?

@tdas
Contributor

tdas commented Aug 26, 2022

It has been merged, and there is a Delta 2.1 Preview on Spark 3.3 currently undergoing community testing - https://github.com/delta-io/delta/releases/tag/v2.1.0rc1

We are hoping to make the final release of 2.1 early next week.

@allisonport-db allisonport-db added this to the 2.1.0 milestone Aug 28, 2022
@vkorukanti vkorukanti deleted the spark33 branch October 2, 2023 05:19
Development

Successfully merging this pull request may close these issues.

[Feature Request] Spark 3.3 support
6 participants