Skip to content

Commit

Permalink
[SPARK-45475][SQL] Uses DataFrame.foreachPartition instead of RDD.for…
Browse files Browse the repository at this point in the history
…eachPartition in JdbcUtils

### What changes were proposed in this pull request?

This PR is a follow-up to #39976 that addresses a review comment left on it (#39976 (comment)).

### Why are the changes needed?

In order to properly assign the SQL execution ID so that `df.observe` works with JDBC writes.

### Does this PR introduce _any_ user-facing change?

Yes. `df.observe` will work with JDBC connectors.

### How was this patch tested?

Manually tested. A unit test was also added.

### Was this patch authored or co-authored using generative AI tooling?

Unit test was added.

Closes #43304 from HyukjinKwon/foreachbatch.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
  • Loading branch information
HyukjinKwon committed Oct 10, 2023
1 parent d33644a commit 39cc4ab
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
repartitionedDF.rdd.foreachPartition { iterator => savePartition(
repartitionedDF.foreachPartition { iterator => savePartition(
table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel, options)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._

import org.apache.spark.{SparkException, SparkSQLException}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, Observation, QueryTest, Row}
import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
Expand All @@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCo
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper
import org.apache.spark.sql.functions.{lit, percentile_approx}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -2105,4 +2106,18 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}
}
}

// Regression test for SPARK-45475: metrics registered with `observe` must be
// reported when the DataFrame is written out through the JDBC data source.
test("SPARK-45475: saving a table via JDBC should work with observe API") {
  val observation = Observation("named")
  // Approximate median of `id`, exposed under the metric name asserted below.
  val medianOfId =
    percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val")
  val observedDf = spark.range(100).observe(observation, medianOfId)

  // The JDBC write is the only action executed on the observed DataFrame,
  // so the observation can only be populated by this save.
  val targetTable = "test_table"
  val jdbcWriter = observedDf.write
    .format("jdbc")
    .option("url", urlWithUserAndPass)
    .option("dbtable", targetTable)
  jdbcWriter.save()

  val expectedMetrics = Map("percentile_approx_val" -> 49)
  assert(observation.get === expectedMetrics)
}
}

0 comments on commit 39cc4ab

Please sign in to comment.