[SPARK-50426][PYTHON] Avoid static Python data source lookup when using builtin or Java data sources

### What changes were proposed in this pull request?

This PR updates the data source lookup logic to avoid triggering expensive Python data source lookups when the data source is a built-in or Java data source.

### Why are the changes needed?

Python data source lookups can be expensive, and they are often unnecessary when users only use built-in or Java data sources. The primary benefit of the lookup is a better error message when a Java data source and a Python data source share the same name, which is rare. In that case, we can simply prefer the Java data source instead of throwing an exception.
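To illustrate the new precedence, here is a minimal sketch (the `MyJsonSource` class and the input path are hypothetical, not part of this PR):

```python
from pyspark.sql.datasource import DataSource

# Hypothetical Python data source whose name collides with the
# built-in "json" source.
class MyJsonSource(DataSource):
    @classmethod
    def name(cls):
        return "json"

spark.dataSource.register(MyJsonSource)

# Previously a name collision like this could raise a "found multiple
# data sources" error; now it resolves directly to the built-in JSON
# source, and the static Python data source lookup is skipped.
df = spark.read.format("json").load("/tmp/example.json")
```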

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48971 from allisonwang-db/spark-50426-avoid-static-pyds-lookup.

Lead-authored-by: Allison Wang <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
allisonwang-db and HyukjinKwon committed Nov 27, 2024
1 parent e03319f commit 0138019
Showing 4 changed files with 30 additions and 4 deletions.
5 changes: 5 additions & 0 deletions python/docs/source/user_guide/sql/python_data_source.rst
@@ -516,3 +516,8 @@ The following example demonstrates how to implement a basic Data Source using Arrow
     df = spark.read.format("arrowbatch").load()
     df.show()
 
+
+Usage Notes
+-----------
+
+- During Data Source resolution, built-in and Scala/Java Data Sources take precedence over Python Data Sources with the same name; to explicitly use a Python Data Source, make sure its name does not conflict with the other Data Sources.
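For context (not part of the diff), a minimal sketch of the recommended practice of picking a non-conflicting name, using the `pyspark.sql.datasource` API; the `fake_source` name and both classes are hypothetical:

```python
from pyspark.sql.datasource import DataSource, DataSourceReader

class FakeReader(DataSourceReader):
    def read(self, partition):
        # Yield rows matching the schema declared below.
        yield (0, "hello")
        yield (1, "world")

class FakeSource(DataSource):
    @classmethod
    def name(cls):
        # A unique name ensures this Python Data Source is actually used.
        return "fake_source"

    def schema(self):
        return "id INT, value STRING"

    def reader(self, schema):
        return FakeReader()

spark.dataSource.register(FakeSource)
spark.read.format("fake_source").load().show()
```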
7 changes: 3 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -682,11 +682,10 @@ object DataSource extends Logging {
             throw e
           }
         }
-      case _ :: Nil if isUserDefinedDataSource =>
-        // There was DSv1 or DSv2 loaded, but the same name source was found
-        // in user defined data source.
-        throw QueryCompilationErrors.foundMultipleDataSources(provider)
       case head :: Nil =>
+        // We do not check whether the provider is a Python data source
+        // (isUserDefinedDataSource) to avoid the lookup cost. Java data sources
+        // always take precedence over Python user-defined data sources.
         head.getClass
       case sources =>
         // There are multiple registered aliases for the input. If there is single datasource
1 change: 1 addition & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -101,6 +101,7 @@ object DataSourceManager extends Logging {
 
   private def initialStaticDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
     if (shouldLoadPythonDataSources) this.synchronized {
+      logInfo("Loading static Python Data Sources.")
       if (dataSourceBuilders.isEmpty) {
         val maybeResult = try {
           Some(UserDefinedPythonDataSource.lookupAllDataSourcesInPython())
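For context, the static lookup guarded above enumerates Python Data Sources that third-party packages expose via Python entry points (SPARK-45917). A hedged sketch of how such a package might advertise one, assuming the `pyspark.sql.datasource` entry-point group and a hypothetical `mypkg` module:

```python
# In the hypothetical package's pyproject.toml (shown as comments,
# since the entry-point declaration itself is TOML, not Python):
#
#   [project.entry-points."pyspark.sql.datasource"]
#   my_source = "mypkg.sources:MySource"
#
# The static lookup can then discover it roughly like this:
from importlib.metadata import entry_points

for ep in entry_points(group="pyspark.sql.datasource"):
    cls = ep.load()  # the DataSource subclass, e.g. mypkg.sources.MySource
    print(ep.name, cls)
```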
21 changes: 21 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala
@@ -94,6 +94,27 @@ abstract class PythonDataSourceSuiteBase extends QueryTest with SharedSparkSession
 class PythonDataSourceSuite extends PythonDataSourceSuiteBase {
   import IntegratedUDFTestUtils._
 
+  test("SPARK-50426: should not trigger static Python data source lookup") {
+    assume(shouldTestPandasUDFs)
+    val testAppender = new LogAppender("Python data source lookup")
+    // Using builtin and Java data sources should not trigger a static
+    // Python data source lookup
+    withLogAppender(testAppender) {
+      spark.read.format("org.apache.spark.sql.test").load()
+      spark.range(3).write.mode("overwrite").format("noop").save()
+    }
+    assert(!testAppender.loggingEvents
+      .exists(msg => msg.getMessage.getFormattedMessage.contains(
+        "Loading static Python Data Sources.")))
+    // Now trigger a Python data source lookup
+    withLogAppender(testAppender) {
+      spark.read.format(staticSourceName).load()
+    }
+    assert(testAppender.loggingEvents
+      .exists(msg => msg.getMessage.getFormattedMessage.contains(
+        "Loading static Python Data Sources.")))
+  }
+
   test("SPARK-45917: automatic registration of Python Data Source") {
     assume(shouldTestPandasUDFs)
     val df = spark.read.format(staticSourceName).load()
