Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional APIs for async physical plan evaluation #1382

Merged
merged 12 commits into from
Mar 12, 2024
19 changes: 18 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,28 @@ Thank you to all who have contributed!

### Added
- Added constrained decimal as valid parameter type to functions that take in numeric parameters.
- Added async version of physical plan evaluator `PartiQLCompilerAsync`.
- The following related async APIs have been added:
- `org.partiql.lang.compiler` -- `PartiQLCompilerAsync`, `PartiQLCompilerAsyncBuilder`, `PartiQLCompilerAsyncDefault`, `PartiQLCompilerPipelineAsync`
- `org.partiql.lang.eval` -- `PartiQLStatementAsync`
- `org.partiql.lang.eval.physical` -- `VariableBindingAsync`
- `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactoryAsync`, `CompiledGroupKeyAsync`, `CompiledAggregateFunctionAsync`, `FilterRelationalOperatorFactoryAsync`, `JoinRelationalOperatorFactoryAsync`, `LetRelationalOperatorFactoryAsync`, `LimitRelationalOperatorFactoryAsync`, `OffsetRelationalOperatorFactoryAsync`, `ProjectRelationalOperatorFactoryAsync`, `RelationExpressionAsync`, `ScanRelationalOperatorFactoryAsync`, `SortOperatorFactoryAsync`, `CompiledSortKeyAsync`, `UnpivotOperatorFactoryAsync`, `ValueExpressionAsync`, `WindowRelationalOperatorFactoryAsync`, `CompiledWindowFunctionAsync`
- `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunctionAsync`, `WindowFunctionAsync`
- Overall, we see about a 10-20% performance decline in running a single query on the synchronous vs async evaluator
- JMH benchmarks added to partiql-lang: `PartiQLCompilerPipelineBenchmark` and `PartiQLCompilerPipelineAsyncBenchmark`

### Changed
- Function resolution logic: Now the function resolver would match all possible candidate(based on if the argument can be coerced to the Signature parameter type). If there are multiple match it will first attempt to pick the one requires the least cast, then pick the function with the highest precedence.
- partiql-cli -- experimental version of CLI now uses the async physical plan evaluator

### Deprecated
- As part of the additions to make an async physical plan evaluator, the synchronous physical plan evaluator `PartiQLCompiler` has been deprecated.
- The following related APIs have been deprecated
- `org.partiql.lang.compiler` -- `PartiQLCompiler`, `PartiQLCompilerBuilder`, `PartiQLCompilerDefault`, `PartiQLCompilerPipeline`
- `org.partiql.lang.eval` -- `PartiQLStatement`
- `org.partiql.lang.eval.physical` -- `VariableBinding`
- `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactory`, `CompiledGroupKey`, `CompiledAggregateFunction`, `FilterRelationalOperatorFactory`, `JoinRelationalOperatorFactory`, `LetRelationalOperatorFactory`, `LimitRelationalOperatorFactory`, `OffsetRelationalOperatorFactory`, `ProjectRelationalOperatorFactory`, `RelationExpression`, `ScanRelationalOperatorFactory`, `SortOperatorFactory`, `CompiledSortKey`, `UnpivotOperatorFactory`, `ValueExpression`, `WindowRelationalOperatorFactory`, `CompiledWindowFunction`
- `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunction`, `WindowFunction`

### Fixed

Expand All @@ -43,7 +60,7 @@ Thank you to all who have contributed!

### Contributors
Thank you to all who have contributed!
- @<your-username>
- @alancai98

## [0.14.3] - 2024-02-14

Expand Down
18 changes: 16 additions & 2 deletions buildSrc/src/main/kotlin/partiql.versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ object Versions {
const val jansi = "2.4.0"
const val jgenhtml = "1.6"
const val jline = "3.21.0"
const val jmh = "0.5.3"
const val jmhGradlePlugin = "0.5.3"
const val jmhCore = "1.37"
const val jmhGeneratorAnnprocess = "1.37"
const val jmhGeneratorBytecode = "1.37"
const val joda = "2.12.1"
const val kotlinPoet = "1.11.0"
const val kotlinxCollections = "0.3.5"
const val picoCli = "4.7.0"
const val kasechange = "1.3.0"
const val ktlint = "11.6.0"
const val pig = "0.6.2"
const val kotlinxCoroutines = "1.6.0"
const val kotlinxCoroutinesJdk8 = "1.6.0"

// Testing
const val assertj = "3.11.0"
Expand All @@ -54,6 +59,7 @@ object Versions {
const val junit4Params = "1.1.1"
const val mockito = "4.5.0"
const val mockk = "1.11.0"
const val kotlinxCoroutinesTest = "1.6.0"
}

object Deps {
Expand Down Expand Up @@ -84,6 +90,8 @@ object Deps {
const val picoCli = "info.picocli:picocli:${Versions.picoCli}"
const val pig = "org.partiql:partiql-ir-generator:${Versions.pig}"
const val pigRuntime = "org.partiql:partiql-ir-generator-runtime:${Versions.pig}"
const val kotlinxCoroutines = "org.jetbrains.kotlinx:kotlinx-coroutines-core:${Versions.kotlinxCoroutines}"
const val kotlinxCoroutinesJdk8 = "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:${Versions.kotlinxCoroutinesJdk8}"

// Testing
const val assertj = "org.assertj:assertj-core:${Versions.assertj}"
Expand All @@ -97,6 +105,12 @@ object Deps {
const val kotlinTestJunit = "org.jetbrains.kotlin:kotlin-test-junit5:${Versions.kotlin}"
const val mockito = "org.mockito:mockito-junit-jupiter:${Versions.mockito}"
const val mockk = "io.mockk:mockk:${Versions.mockk}"
const val kotlinxCoroutinesTest = "org.jetbrains.kotlinx:kotlinx-coroutines-test:${Versions.kotlinxCoroutinesTest}"

// JMH Benchmarking
const val jmhCore = "org.openjdk.jmh:jmh-core:${Versions.jmhCore}"
const val jmhGeneratorAnnprocess = "org.openjdk.jmh:jmh-core:${Versions.jmhGeneratorAnnprocess}"
const val jmhGeneratorBytecode = "org.openjdk.jmh:jmh-core:${Versions.jmhGeneratorBytecode}"
}

object Plugins {
Expand All @@ -114,4 +128,4 @@ object Plugins {
const val ktlint = "org.jlleitschuh.gradle.ktlint"
const val library = "org.gradle.java-library"
const val testFixtures = "org.gradle.java-test-fixtures"
}
}
2 changes: 2 additions & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ application {
dependencies {
implementation(project(":partiql-lang"))
implementation(project(":partiql-types"))
implementation(Deps.kotlinxCoroutines)
implementation(Deps.kotlinxCoroutinesJdk8)
implementation(Deps.awsSdkS3)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@

import com.amazon.ion.IonSystem;
import com.amazon.ion.system.IonSystemBuilder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import kotlin.OptIn;
import kotlin.coroutines.EmptyCoroutineContext;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.future.FutureKt;
import org.jetbrains.annotations.NotNull;
import org.partiql.annotations.ExperimentalPartiQLCompilerPipeline;
import org.partiql.examples.util.Example;
import org.partiql.lang.compiler.PartiQLCompiler;
import org.partiql.lang.compiler.PartiQLCompilerBuilder;
import org.partiql.lang.compiler.PartiQLCompilerPipeline;
import org.partiql.lang.compiler.PartiQLCompilerAsync;
import org.partiql.lang.compiler.PartiQLCompilerAsyncBuilder;
import org.partiql.lang.compiler.PartiQLCompilerPipelineAsync;
import org.partiql.lang.eval.Bindings;
import org.partiql.lang.eval.EvaluationSession;
import org.partiql.lang.eval.ExprValue;
import org.partiql.lang.eval.PartiQLResult;
import org.partiql.lang.eval.PartiQLStatementAsync;
import org.partiql.lang.eval.ProjectionIterationBehavior;
import org.partiql.lang.planner.EvaluatorOptions;
import org.partiql.lang.planner.GlobalResolutionResult;
Expand All @@ -25,14 +33,14 @@
import java.io.PrintStream;

/**
* This is an example of using PartiQLCompilerPipeline in Java.
* This is an example of using PartiQLCompilerPipelineAsync in Java.
* It is an experimental feature and is marked as such, with @OptIn, in this example.
* Unfortunately, it seems like the Java does not recognize the Optin annotation specified in Kotlin.
* Unfortunately, it seems like the Java does not recognize the OptIn annotation specified in Kotlin.
* Java users will be able to access the experimental APIs freely, and not be warned at all.
*/
public class PartiQLCompilerPipelineJavaExample extends Example {
public class PartiQLCompilerPipelineAsyncJavaExample extends Example {

public PartiQLCompilerPipelineJavaExample(@NotNull PrintStream out) {
public PartiQLCompilerPipelineAsyncJavaExample(@NotNull PrintStream out) {
super(out);
}

Expand All @@ -49,10 +57,7 @@ public void run() {
"{name: \"mary\", age: 19}" +
"]";

final Bindings<ExprValue> globalVariables = Bindings.<ExprValue>lazyBindingsBuilder().addBinding("myTable", () -> {
ExprValue exprValue = ExprValue.of(ion.singleValue(myTable));
return exprValue;
}).build();
final Bindings<ExprValue> globalVariables = Bindings.<ExprValue>lazyBindingsBuilder().addBinding("myTable", () -> ExprValue.of(ion.singleValue(myTable))).build();

final EvaluationSession session = EvaluationSession.builder()
.globals(globalVariables)
Expand All @@ -79,17 +84,41 @@ public void run() {
final PartiQLPlanner planner = PartiQLPlannerBuilder.standard().globalVariableResolver(globalVariableResolver).build();

@OptIn(markerClass = ExperimentalPartiQLCompilerPipeline.class)
final PartiQLCompiler compiler = PartiQLCompilerBuilder.standard().options(evaluatorOptions).build();
final PartiQLCompilerAsync compiler = PartiQLCompilerAsyncBuilder.standard().options(evaluatorOptions).build();

@OptIn(markerClass = ExperimentalPartiQLCompilerPipeline.class)
final PartiQLCompilerPipeline pipeline = new PartiQLCompilerPipeline(
final PartiQLCompilerPipelineAsync pipeline = new PartiQLCompilerPipelineAsync(
parser, planner, compiler
);

String query = "SELECT t.name FROM myTable AS t WHERE t.age > 20";

print("PartiQL query:", query);
PartiQLResult result = pipeline.compile(query).eval(session);

// Calling Kotlin coroutines from Java requires some additional libraries from `kotlinx.coroutines.future`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not too easy to call Kotlin async functions from Java. This example demonstrates one way to do so by converting the Kotlin async call into a Java CompletableFuture. If there's a need to call the async evaluator from Java in the future, we can add more helper functions to the Kotlin APIs.

// to return a `java.util.concurrent.CompletableFuture`. If a use case arises to call the
// `PartiQLCompilerPipelineAsync` APIs directly from Java, we can add Kotlin functions that directly return
// Java's async libraries (e.g. in https://stackoverflow.com/a/52887677).
CompletableFuture<PartiQLStatementAsync> statementFuture = FutureKt.future(
CoroutineScopeKt.CoroutineScope(EmptyCoroutineContext.INSTANCE),
EmptyCoroutineContext.INSTANCE,
CoroutineStart.DEFAULT,
(scope, continuation) -> pipeline.compile(query, continuation)
);

PartiQLResult result;
try {
PartiQLStatementAsync statement = statementFuture.get();
CompletableFuture<PartiQLResult> resultFuture = FutureKt.future(
CoroutineScopeKt.CoroutineScope(EmptyCoroutineContext.INSTANCE),
EmptyCoroutineContext.INSTANCE,
CoroutineStart.DEFAULT,
(scope, continuation) -> statement.eval(session, continuation)
);
result = resultFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
ExprValue exprValue = null;
if (result instanceof PartiQLResult.Value) {
exprValue = ((PartiQLResult.Value) result).getValue();
Expand All @@ -100,4 +129,4 @@ public void run() {

print("result", exprValue);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.partiql.examples

import com.amazon.ion.system.IonSystemBuilder
import kotlinx.coroutines.runBlocking
import org.partiql.annotations.ExperimentalPartiQLCompilerPipeline
import org.partiql.examples.util.Example
import org.partiql.lang.compiler.PartiQLCompilerPipeline
import org.partiql.lang.compiler.PartiQLCompilerPipelineAsync
import org.partiql.lang.eval.Bindings
import org.partiql.lang.eval.EvaluationSession
import org.partiql.lang.eval.ExprValue
Expand All @@ -20,7 +21,7 @@ import java.io.PrintStream
* One way to do so is to add the `Optin(Experimental<X>::class) before the class. where <X> is the feature name.
* Also see: https://kotlinlang.org/docs/opt-in-requirements.html#module-wide-opt-in
*/
class PartiQLCompilerPipelineExample(out: PrintStream) : Example(out) {
class PartiQLCompilerPipelineAsyncExample(out: PrintStream) : Example(out) {

private val myIonSystem = IonSystemBuilder.standard().build()

Expand Down Expand Up @@ -59,7 +60,7 @@ class PartiQLCompilerPipelineExample(out: PrintStream) : Example(out) {
.build()

@OptIn(ExperimentalPartiQLCompilerPipeline::class)
private val partiQLCompilerPipeline = PartiQLCompilerPipeline.build {
private val partiQLCompilerPipeline = PartiQLCompilerPipelineAsync.build {
planner
.globalVariableResolver(globalVariableResolver)
compiler
Expand All @@ -71,7 +72,10 @@ class PartiQLCompilerPipelineExample(out: PrintStream) : Example(out) {

print("PartiQL query:", query)
@OptIn(ExperimentalPartiQLCompilerPipeline::class)
val exprValue = when (val result = partiQLCompilerPipeline.compile(query).eval(session)) {
val result = runBlocking {
partiQLCompilerPipeline.compile(query).eval(session)
}
val exprValue = when (result) {
is PartiQLResult.Value -> result.value
is PartiQLResult.Delete,
is PartiQLResult.Explain.Domain,
Expand Down
10 changes: 6 additions & 4 deletions examples/src/main/kotlin/org/partiql/examples/util/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import org.partiql.examples.EvaluationWithLazyBindings
import org.partiql.examples.ParserErrorExample
import org.partiql.examples.ParserExample
import org.partiql.examples.ParserJavaExample
import org.partiql.examples.PartiQLCompilerPipelineExample
import org.partiql.examples.PartiQLCompilerPipelineJavaExample
import org.partiql.examples.PartiQLCompilerPipelineAsyncExample
import org.partiql.examples.PartiQLCompilerPipelineAsyncJavaExample
import org.partiql.examples.PartialEvaluationVisitorTransformExample
import org.partiql.examples.PreventJoinVisitorExample
import org.partiql.examples.S3JavaExample
Expand All @@ -26,7 +26,9 @@ private val examples = mapOf(
S3JavaExample::class.java.simpleName to S3JavaExample(System.out),
EvaluationJavaExample::class.java.simpleName to EvaluationJavaExample(System.out),
ParserJavaExample::class.java.simpleName to ParserJavaExample(System.out),
PartiQLCompilerPipelineJavaExample::class.java.simpleName to PartiQLCompilerPipelineJavaExample(System.out),
PartiQLCompilerPipelineAsyncJavaExample::class.java.simpleName to PartiQLCompilerPipelineAsyncJavaExample(
System.out
),

// Kotlin Examples
CsvExprValueExample::class.java.simpleName to CsvExprValueExample(System.out),
Expand All @@ -39,7 +41,7 @@ private val examples = mapOf(
PartialEvaluationVisitorTransformExample::class.java.simpleName to PartialEvaluationVisitorTransformExample(System.out),
PreventJoinVisitorExample::class.java.simpleName to PreventJoinVisitorExample(System.out),
SimpleExpressionEvaluation::class.java.simpleName to SimpleExpressionEvaluation(System.out),
PartiQLCompilerPipelineExample::class.java.simpleName to PartiQLCompilerPipelineExample(System.out)
PartiQLCompilerPipelineAsyncExample::class.java.simpleName to PartiQLCompilerPipelineAsyncExample(System.out)
)

fun main(args: Array<String>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package org.partiql.examples
import org.partiql.examples.util.Example
import java.io.PrintStream

class PartiQLCompilerPipelineExampleTest : BaseExampleTest() {
override fun example(out: PrintStream): Example = PartiQLCompilerPipelineExample(out)
class PartiQLCompilerPipelineAsyncExampleTest : BaseExampleTest() {
override fun example(out: PrintStream): Example = PartiQLCompilerPipelineAsyncExample(out)

override val expected = """
|PartiQL query:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package org.partiql.examples
import org.partiql.examples.util.Example
import java.io.PrintStream

class PartiQLCompilerPipelineJavaExampleTest : BaseExampleTest() {
override fun example(out: PrintStream): Example = PartiQLCompilerPipelineJavaExample(out)
class PartiQLCompilerPipelineAsyncJavaExampleTest : BaseExampleTest() {
override fun example(out: PrintStream): Example =
PartiQLCompilerPipelineAsyncJavaExample(out)

override val expected = """
|PartiQL query:
Expand Down
1 change: 1 addition & 0 deletions partiql-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies {
implementation(Deps.joda)
implementation(Deps.picoCli)
implementation(Deps.kotlinReflect)
implementation(Deps.kotlinxCoroutines)
testImplementation(Deps.mockito)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.ionelement.api.ionInt
import com.amazon.ionelement.api.ionString
import com.amazon.ionelement.api.ionStructOf
import com.amazon.ionelement.api.toIonValue
import kotlinx.coroutines.runBlocking
import org.partiql.annotations.ExperimentalPartiQLCompilerPipeline
import org.partiql.cli.Debug
import org.partiql.cli.functions.QueryDDB
Expand All @@ -29,8 +30,8 @@ import org.partiql.cli.functions.WriteFile_1
import org.partiql.cli.functions.WriteFile_2
import org.partiql.cli.utils.ServiceLoaderUtil
import org.partiql.lang.CompilerPipeline
import org.partiql.lang.compiler.PartiQLCompilerBuilder
import org.partiql.lang.compiler.PartiQLCompilerPipeline
import org.partiql.lang.compiler.PartiQLCompilerAsyncBuilder
import org.partiql.lang.compiler.PartiQLCompilerPipelineAsync
import org.partiql.lang.eval.CompileOptions
import org.partiql.lang.eval.EvaluationSession
import org.partiql.lang.eval.ExprFunction
Expand All @@ -50,7 +51,7 @@ import java.nio.file.Path
import java.time.ZoneOffset

/**
* A means by which we can run both the EvaluatingCompiler and PartiQLCompilerPipeline
* A means by which we can run both the EvaluatingCompiler and [PartiQLCompilerPipelineAsync].
*/
internal sealed class AbstractPipeline(open val options: PipelineOptions) {

Expand Down Expand Up @@ -163,7 +164,7 @@ internal sealed class AbstractPipeline(open val options: PipelineOptions) {
}

/**
* Wraps the PartiQLCompilerPipeline
* Wraps the [PartiQLCompilerPipelineAsync]
*/
@OptIn(ExperimentalPartiQLCompilerPipeline::class)
class PipelineExperimental(options: PipelineOptions) : AbstractPipeline(options) {
Expand All @@ -183,17 +184,19 @@ internal sealed class AbstractPipeline(open val options: PipelineOptions) {

override fun compile(input: String, session: EvaluationSession): PartiQLResult {
val globalVariableResolver = createGlobalVariableResolver(session)
val pipeline = PartiQLCompilerPipeline(
val pipeline = PartiQLCompilerPipelineAsync(
parser = options.parser,
planner = PartiQLPlannerBuilder.standard()
.options(plannerOptions)
.globalVariableResolver(globalVariableResolver)
.build(),
compiler = PartiQLCompilerBuilder.standard()
compiler = PartiQLCompilerAsyncBuilder.standard()
.options(evaluatorOptions)
.build(),
)
return pipeline.compile(input).eval(session)
return runBlocking {
pipeline.compile(input).eval(session)
}
}

private fun createGlobalVariableResolver(session: EvaluationSession) = GlobalVariableResolver {
Expand Down
Loading
Loading