-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional APIs for async physical plan evaluation #1382
Conversation
Conformance comparison report
Number passing in both: 5384 Number failing in both: 434 Number passing in Base (5121093) but now fail: 0 Number failing in Base (5121093) but now pass: 0 |
cfd8b46
to
4a1a79c
Compare
4a1a79c
to
eea05a0
Compare
// runBlocking { | ||
println("Calling") | ||
someAsyncOp() | ||
// } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, in the synchronous evaluator, this someAsyncOp()
call would require a runBlocking
call. With the async evaluator, this runBlocking
call is no longer necessary.
rows.add(state.registers.clone()) | ||
} | ||
|
||
val rowWithValues = rows.map { row -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the async evaluator for Sort
, we evaluate the sortKeys
before creating the comparator (L46). This is needed because the evaluation needs to occur in a suspend fun
.
In the synchronous evaluator, the evaluation took place within the fold of the comparator: https://github.com/partiql/partiql-lang-kotlin/blob/plan-eval-async-keep-sync/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryDefault.kt#L65-L75, which is not a suspend fun
, hence the non-trivial refactor.
@@ -0,0 +1,112 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of the async operator factories are nearly the same as the synchronous versions other than the SortOperator
, which needed a slight refactor. The primary changes for the operator factories are the following:
- deprecate the synchronous operator factories and relevant classes
- creation of async operator factory with
Async
suffix (e.g.<some operator>OperatorFactory
-><some operator>OperatorFactoryAsync
) that implementsRelationExpressionAsync
- Any reference to
ValueExpression
orRelationExpression
are changed toValueExpressionAsync
andRelationExpressionAsync
- The
evaluate
function is now asuspend fun
@@ -71,7 +72,10 @@ class PartiQLCompilerPipelineExample(out: PrintStream) : Example(out) { | |||
|
|||
print("PartiQL query:", query) | |||
@OptIn(ExperimentalPartiQLCompilerPipeline::class) | |||
val exprValue = when (val result = partiQLCompilerPipeline.compile(query).eval(session)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Examples using the synchronous physical plan evaluator and the cli were changed to use the async physical plan evaluator.
parser, planner, compiler | ||
); | ||
|
||
String query = "SELECT t.name FROM myTable AS t WHERE t.age > 20"; | ||
|
||
print("PartiQL query:", query); | ||
PartiQLResult result = pipeline.compile(query).eval(session); | ||
|
||
// Calling Kotlin coroutines from Java requires some additional libraries from `kotlinx.coroutines.future` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not too easy to call Kotlin async functions from Java. This example demonstrates one way to do so by converting the Kotlin async call into a Java CompletableFuture
. If there's a need to call the async evaluator from Java in the future, we can add more helper functions to the Kotlin APIs.
/** | ||
* [PartiQLCompilerAsync] is responsible for transforming a [PartiqlPhysical.Plan] into an executable [PartiQLStatementAsync]. | ||
*/ | ||
@ExperimentalPartiQLCompilerPipeline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reuse the existing experimental annotation that requires an @OptIn
.
val currentRegister = env.registers.clone() | ||
val elements: Flow<ExprValue> = flow { | ||
env.load(currentRegister) | ||
val relItr = bexprThunk(env) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This thunk evaluation is an async call. In the synchronous code, this call was within a sequence -- https://github.com/partiql/partiql-lang-kotlin/blob/plan-eval-async-keep-sync/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerImpl.kt#L335-L341. But since the evaluation is an async call, we need to use a coroutine Flow
(similar to a sequence), which allows for an async call within the block.
More on Flow
s can be found here: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/.
@@ -30,15 +32,24 @@ class TestContext { | |||
assertEquals(expectedIon, result.toIonValue(ION)) | |||
} | |||
|
|||
// Executes query on async evaluator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests in partiql-lang that tested the synchronous physical plan evaluator will also test the async physical plan evaluator. I tried to limit the amount of copied code, where possible, while also making it easy to remove the synchronous tests when we remove the synchronous evaluator in an upcoming major version.
/** Converts instances of [PartiqlPhysical.Bexpr] to any [T]. A `suspend` version of the physical plan converter | ||
* interface is added since PIG currently does not output async functions. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The physical plan to async thunk converter implements this Converter
, which is the same as the PIG-generated Converter
but the functions within the interface are all async (i.e. suspend fun
s). We could consider adding some configurability to PIG to support generating async versions.
* @param TEnv The type of the environment. Generic so that the legacy AST compiler and the new compiler may use | ||
* different types here. | ||
*/ | ||
internal typealias ThunkAsync<TEnv> = suspend (TEnv) -> ExprValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File is basically the same as Thunk.kt
. Notable changes include
- Making any functions/lambda/types async (i.e. adding
suspend
) - Converting sequence constructors that make calls to async functions use coroutine
Flow
s
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only concern I have is about the .toList
calls in the thunk returned from compileBindingsToValues
. We need to decide if we should try to avoid the eagerness now or if I should file a feature request later.
@@ -1,9 +1,10 @@ | |||
package org.partiql.examples |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you want to rename this class and file to include the Async
suffix? The java version of this has the suffix.
Also, what's the plan when the sync pipeline is removed? Will the new classes keep their Async
suffix or will the suffix be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the catch. Class and file should include the Async
suffix. I will change it.
Also, what's the plan when the sync pipeline is removed? Will the new classes keep their Async suffix or will the suffix be removed?
The new classes will keep the Async
suffix.
partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerAsyncImpl.kt
Show resolved
Hide resolved
internal val MetaContainer.sourceLocationMeta get() = this[SourceLocationMeta.TAG] as? SourceLocationMeta | ||
internal val MetaContainer.sourceLocationMetaOrUnknown get() = this.sourceLocationMeta ?: UNKNOWN_SOURCE_LOCATION | ||
|
||
internal fun StaticType.getTypes() = when (val flattened = this.flatten()) { | ||
is AnyOfType -> flattened.types | ||
else -> listOf(this) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I remove these top-level internal functions, this is a breaking change from Java. From the Java to Kotlin interop docs:
internal
declarations becomepublic
in Java. Members ofinternal
classes go through name mangling, to make it harder to accidentally use them from Java and to allow overloading for members with the same signature that don't see each other according to Kotlin rules
So from Java code, internal
classes will have some name mangling. But for top-level internal
functions, there is no name mangling. For instance, StaticType.getTypes()
is directly callable from Java by:
PhysicalPlanCompilerAsyncImplKt.getTypes(StaticType.ANY);
IntelliJ will give an error like "Usage of Kotlin internal declaration from different module " but these are just errors in the IDE that can be disabled. Gradle and the Java compiler can still run the internal Kotlin code.
I'll cut an issue to track if we should allow for these top-level functions (both internal
and public
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GitHub issue: #1387
Ran the Java API compliance checker between this PR and v0.14.3. The only breaking changes were the removal of the internal functions from |
Description
Creates an asynchronous version of the existing physical plan evaluator APIs. This PR differs from the other attempt to make the physical plan evaluator async (main...plan-eval-async-make-statement-eval-async), which had changed the existing synchronous APIs to be async. Performance-wise, we see about a 10-20% drop in performance when using the async APIs on a single query.
This PR chooses to have both APIs to be compatible with semver. Due to the performance drop, we choose to not just wrap the async evaluator w/ a
runBlocking
call. Including both versions also makes it easier to test the synchronous and asynchronous API performance more easily. The previous synchronous versions have been marked as deprecated and are expected to be removed in the next major version.For reviewers, I recommend starting to look at
The remaining changes were pretty straightforward (other than what's noted in the self-review comments). Essentially they were
Async
added to the end of the previous class/interface)suspend fun
Other Information
Updated Unreleased Section in CHANGELOG: [YES]
Any backward-incompatible changes? [NO]
Any new external dependencies? [YES]
CompletableFuture
)Do your changes comply with the Contributing Guidelines
and Code Style Guidelines? [YES]
License Information
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.