diff --git a/CHANGELOG.md b/CHANGELOG.md index bdbe12f7a7..12c3bd83c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,24 +29,28 @@ Thank you to all who have contributed! ### Added - Added constrained decimal as valid parameter type to functions that take in numeric parameters. -- Added async version of physical plan evaluator `PartiQLCompilerAsync`. The following related async APIs have been added: - - `org.partiql.lang.compiler` -- `PartiQLCompilerAsync`, `PartiQLCompilerAsyncBuilder`, `PartiQLCompilerAsyncDefault`, `PartiQLCompilerPipelineAsync` - - `org.partiql.lang.eval` -- `PartiQLStatementAsync` - - `org.partiql.lang.eval.physical` -- `VariableBindingAsync` - - `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactoryAsync`, `CompiledGroupKeyAsync`, `CompiledAggregateFunctionAsync`, `FilterRelationalOperatorFactoryAsync`, `JoinRelationalOperatorFactoryAsync`, `LetRelationalOperatorFactoryAsync`, `LimitRelationalOperatorFactoryAsync`, `OffsetRelationalOperatorFactoryAsync`, `ProjectRelationalOperatorFactoryAsync`, `RelationExpressionAsync`, `ScanRelationalOperatorFactoryAsync`, `SortOperatorFactoryAsync`, `CompiledSortKeyAsync`, `UnpivotOperatorFactoryAsync`, `ValueExpressionAsync`, `WindowRelationalOperatorFactoryAsync`, `CompiledWindowFunctionAsync` - - `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunctionAsync`, `WindowFunctionAsync` +- Added async version of physical plan evaluator `PartiQLCompilerAsync`. + - The following related async APIs have been added: + - `org.partiql.lang.compiler` -- `PartiQLCompilerAsync`, `PartiQLCompilerAsyncBuilder`, `PartiQLCompilerAsyncDefault`, `PartiQLCompilerPipelineAsync` + - `org.partiql.lang.eval` -- `PartiQLStatementAsync` + - `org.partiql.lang.eval.physical` -- `VariableBindingAsync` + - `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactoryAsync`, `CompiledGroupKeyAsync`, `CompiledAggregateFunctionAsync`, `FilterRelationalOperatorFactoryAsync`, `JoinRelationalOperatorFactoryAsync`, `LetRelationalOperatorFactoryAsync`, `LimitRelationalOperatorFactoryAsync`, `OffsetRelationalOperatorFactoryAsync`, `ProjectRelationalOperatorFactoryAsync`, `RelationExpressionAsync`, `ScanRelationalOperatorFactoryAsync`, `SortOperatorFactoryAsync`, `CompiledSortKeyAsync`, `UnpivotOperatorFactoryAsync`, `ValueExpressionAsync`, `WindowRelationalOperatorFactoryAsync`, `CompiledWindowFunctionAsync` + - `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunctionAsync`, `WindowFunctionAsync` + - Overall, we see about a 10-20% performance decline in running a single query on the synchronous vs async evaluator + - JMH benchmarks added to partiql-lang: `PartiQLCompilerPipelineBenchmark` and `PartiQLCompilerPipelineAsyncBenchmark` ### Changed - Function resolution logic: Now the function resolver would match all possible candidate(based on if the argument can be coerced to the Signature parameter type). If there are multiple match it will first attempt to pick the one requires the least cast, then pick the function with the highest precedence. - partiql-cli -- experimental version of CLI now uses the async physical plan evaluator ### Deprecated -- As part of the additions to make an async physical plan evaluator, the synchronous physical plan evaluator `PartiQLCompiler` has been deprecated. The following related APIs have been deprecated - - `org.partiql.lang.compiler` -- `PartiQLCompiler`, `PartiQLCompilerBuilder`, `PartiQLCompilerDefault`, `PartiQLCompilerPipeline` - - `org.partiql.lang.eval` -- `PartiQLStatement` - - `org.partiql.lang.eval.physical` -- `VariableBinding` - - `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactory`, `CompiledGroupKey`, `CompiledAggregateFunction`, `FilterRelationalOperatorFactory`, `JoinRelationalOperatorFactory`, `LetRelationalOperatorFactory`, `LimitRelationalOperatorFactory`, `OffsetRelationalOperatorFactory`, `ProjectRelationalOperatorFactory`, `RelationExpression`, `ScanRelationalOperatorFactory`, `SortOperatorFactory`, `CompiledSortKey`, `UnpivotOperatorFactory`, `ValueExpression`, `WindowRelationalOperatorFactory`, `CompiledWindowFunction` - - `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunction`, `WindowFunction` +- As part of the additions to make an async physical plan evaluator, the synchronous physical plan evaluator `PartiQLCompiler` has been deprecated. + - The following related APIs have been deprecated + - `org.partiql.lang.compiler` -- `PartiQLCompiler`, `PartiQLCompilerBuilder`, `PartiQLCompilerDefault`, `PartiQLCompilerPipeline` + - `org.partiql.lang.eval` -- `PartiQLStatement` + - `org.partiql.lang.eval.physical` -- `VariableBinding` + - `org.partiql.lang.eval.physical.operators` -- `AggregateOperatorFactory`, `CompiledGroupKey`, `CompiledAggregateFunction`, `FilterRelationalOperatorFactory`, `JoinRelationalOperatorFactory`, `LetRelationalOperatorFactory`, `LimitRelationalOperatorFactory`, `OffsetRelationalOperatorFactory`, `ProjectRelationalOperatorFactory`, `RelationExpression`, `ScanRelationalOperatorFactory`, `SortOperatorFactory`, `CompiledSortKey`, `UnpivotOperatorFactory`, `ValueExpression`, `WindowRelationalOperatorFactory`, `CompiledWindowFunction` + - `org.partiql.lang.eval.physical.window` -- `NavigationWindowFunction`, `WindowFunction` ### Fixed diff --git a/examples/src/main/java/org/partiql/examples/PartiQLCompilerPipelineAsyncJavaExample.java b/examples/src/main/java/org/partiql/examples/PartiQLCompilerPipelineAsyncJavaExample.java index 9793067853..b267b4c4ea 100644 --- a/examples/src/main/java/org/partiql/examples/PartiQLCompilerPipelineAsyncJavaExample.java +++ b/examples/src/main/java/org/partiql/examples/PartiQLCompilerPipelineAsyncJavaExample.java @@ -35,7 +35,7 @@ /** * This is an example of using PartiQLCompilerPipelineAsync in Java. * It is an experimental feature and is marked as such, with @OptIn, in this example. - * Unfortunately, it seems like the Java does not recognize the Optin annotation specified in Kotlin. + * Unfortunately, it seems like the Java does not recognize the OptIn annotation specified in Kotlin. * Java users will be able to access the experimental APIs freely, and not be warned at all. */ public class PartiQLCompilerPipelineAsyncJavaExample extends Example { @@ -57,10 +57,7 @@ public void run() { "{name: \"mary\", age: 19}" + "]"; - final Bindings globalVariables = Bindings.lazyBindingsBuilder().addBinding("myTable", () -> { - ExprValue exprValue = ExprValue.of(ion.singleValue(myTable)); - return exprValue; - }).build(); + final Bindings globalVariables = Bindings.lazyBindingsBuilder().addBinding("myTable", () -> ExprValue.of(ion.singleValue(myTable))).build(); final EvaluationSession session = EvaluationSession.builder() .globals(globalVariables) @@ -97,6 +94,11 @@ public void run() { String query = "SELECT t.name FROM myTable AS t WHERE t.age > 20"; print("PartiQL query:", query); + + // Calling Kotlin coroutines from Java requires some additional libraries from `kotlinx.coroutines.future` + // to return a `java.util.concurrent.CompletableFuture`. If a use case arises to call the + // `PartiQLCompilerPipelineAsync` APIs directly from Java, we can add Kotlin functions that directly return + // Java's async libraries (e.g. in https://stackoverflow.com/a/52887677). CompletableFuture statementFuture = FutureKt.future( CoroutineScopeKt.CoroutineScope(EmptyCoroutineContext.INSTANCE), EmptyCoroutineContext.INSTANCE, diff --git a/partiql-lang/build.gradle.kts b/partiql-lang/build.gradle.kts index cb1f5417f2..d114530e9c 100644 --- a/partiql-lang/build.gradle.kts +++ b/partiql-lang/build.gradle.kts @@ -40,7 +40,6 @@ dependencies { implementation(Deps.csv) implementation(Deps.kotlinReflect) implementation(Deps.kotlinxCoroutines) - implementation(Deps.kotlinxCoroutinesJdk8) testImplementation(testFixtures(project(":partiql-planner"))) testImplementation(project(":plugins:partiql-memory")) diff --git a/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineAsyncBenchmark.kt b/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineAsyncBenchmark.kt index a2e0adea51..5e9b2f3c90 100644 --- a/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineAsyncBenchmark.kt +++ b/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineAsyncBenchmark.kt @@ -1,5 +1,6 @@ package org.partiql.jmh.benchmarks +import com.amazon.ion.IonSystem import com.amazon.ion.system.IonSystemBuilder import kotlinx.coroutines.runBlocking import org.openjdk.jmh.annotations.Benchmark @@ -41,10 +42,10 @@ open class PartiQLCompilerPipelineAsyncBenchmark { @State(Scope.Thread) @OptIn(ExperimentalPartiQLCompilerPipeline::class) open class MyState { - val parser = PartiQLParserBuilder.standard().build() - val myIonSystem = IonSystemBuilder.standard().build() + private val parser = PartiQLParserBuilder.standard().build() + private val myIonSystem: IonSystem = IonSystemBuilder.standard().build() - fun tableWithRows(numRows: Int): ExprValue { + private fun tableWithRows(numRows: Int): ExprValue { val allRows = (1..numRows).joinToString { index -> """ { @@ -62,7 +63,7 @@ open class PartiQLCompilerPipelineAsyncBenchmark { ) } - val bindings = Bindings.ofMap( + private val bindings = Bindings.ofMap( mapOf( "t1" to tableWithRows(1), "t10" to tableWithRows(10), @@ -73,7 +74,7 @@ open class PartiQLCompilerPipelineAsyncBenchmark { ) ) - val parameters = listOf( + private val parameters = listOf( ExprValue.newInt(5), // WHERE `id` > 5 ExprValue.newInt(1000000), // LIMIT 1000000 ExprValue.newInt(3), // OFFSET 3 * 2 diff --git a/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineBenchmark.kt b/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineBenchmark.kt index 6f088e22a1..4647524908 100644 --- a/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineBenchmark.kt +++ b/partiql-lang/src/jmh/kotlin/org/partiql/jmh/benchmarks/PartiQLCompilerPipelineBenchmark.kt @@ -1,5 +1,6 @@ package org.partiql.jmh.benchmarks +import com.amazon.ion.IonSystem import com.amazon.ion.system.IonSystemBuilder import kotlinx.coroutines.runBlocking import org.openjdk.jmh.annotations.Benchmark @@ -42,10 +43,10 @@ open class PartiQLCompilerPipelineBenchmark { @State(Scope.Thread) @OptIn(ExperimentalPartiQLCompilerPipeline::class) open class MyState { - val parser = PartiQLParserBuilder.standard().build() - val myIonSystem = IonSystemBuilder.standard().build() + private val parser = PartiQLParserBuilder.standard().build() + private val myIonSystem: IonSystem = IonSystemBuilder.standard().build() - fun tableWithRows(numRows: Int): ExprValue { + private fun tableWithRows(numRows: Int): ExprValue { val allRows = (1..numRows).joinToString { index -> """ { @@ -63,7 +64,7 @@ open class PartiQLCompilerPipelineBenchmark { ) } - val bindings = Bindings.ofMap( + private val bindings = Bindings.ofMap( mapOf( "t1" to tableWithRows(1), "t10" to tableWithRows(10), @@ -74,7 +75,7 @@ open class PartiQLCompilerPipelineBenchmark { ) ) - val parameters = listOf( + private val parameters = listOf( ExprValue.newInt(5), // WHERE `id` > 5 ExprValue.newInt(1000000), // LIMIT 1000000 ExprValue.newInt(3), // OFFSET 3 * 2 @@ -133,7 +134,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query6 = parser.parseAstStatement( + private val query6 = parser.parseAstStatement( """ SELECT * FROM t100000 @@ -143,7 +144,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query7 = parser.parseAstStatement( + private val query7 = parser.parseAstStatement( """ SELECT * FROM t10000 @@ -153,7 +154,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query8 = parser.parseAstStatement( + private val query8 = parser.parseAstStatement( """ SELECT * FROM t1000 @@ -163,7 +164,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query9 = parser.parseAstStatement( + private val query9 = parser.parseAstStatement( """ SELECT * FROM t100 @@ -173,7 +174,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query10 = parser.parseAstStatement( + private val query10 = parser.parseAstStatement( """ SELECT * FROM t10 @@ -183,7 +184,7 @@ open class PartiQLCompilerPipelineBenchmark { OFFSET ? * ? """.trimIndent() ) - val query11 = parser.parseAstStatement( + private val query11 = parser.parseAstStatement( """ SELECT * FROM t1 diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncBuilder.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncBuilder.kt index 58318037b5..321bbf0949 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncBuilder.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncBuilder.kt @@ -137,8 +137,7 @@ class PartiQLCompilerAsyncBuilder private constructor() { private fun allFunctions(typingMode: TypingMode): List { val definitionalBuiltins = definitionalBuiltins(typingMode) val builtins = SCALAR_BUILTINS_DEFAULT - val allFunctions = definitionalBuiltins + builtins + customFunctions + DynamicLookupExprFunction() - return allFunctions + return definitionalBuiltins + builtins + customFunctions + DynamicLookupExprFunction() } private fun allOperatorFactories() = (DEFAULT_RELATIONAL_OPERATOR_FACTORIES + customOperatorFactories).apply { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncDefault.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncDefault.kt index d6457d10c6..0b98276430 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncDefault.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/compiler/PartiQLCompilerAsyncDefault.kt @@ -36,11 +36,11 @@ import org.partiql.lang.types.TypedOpParameter @ExperimentalPartiQLCompilerPipeline internal class PartiQLCompilerAsyncDefault( - private val evaluatorOptions: EvaluatorOptions, - private val customTypedOpParameters: Map, - private val functions: List, - private val procedures: Map, - private val operatorFactories: Map + evaluatorOptions: EvaluatorOptions, + customTypedOpParameters: Map, + functions: List, + procedures: Map, + operatorFactories: Map ) : PartiQLCompilerAsync { private lateinit var exprConverter: PhysicalPlanCompilerAsyncImpl @@ -113,7 +113,7 @@ internal class PartiQLCompilerAsyncDefault( private fun compileExplainDomain(statement: PartiqlPhysical.ExplainTarget.Domain, details: PartiQLPlanner.PlanningDetails): PartiQLResult.Explain.Domain { val format = statement.format?.text - val type = statement.type?.text?.toUpperCase() ?: ExplainDomains.AST.name + val type = statement.type?.text?.uppercase() ?: ExplainDomains.AST.name val domain = try { ExplainDomains.valueOf(type) } catch (ex: IllegalArgumentException) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/PartiQLStatementAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/PartiQLStatementAsync.kt index 27b604ae34..af3fbb6b0d 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/PartiQLStatementAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/PartiQLStatementAsync.kt @@ -15,7 +15,7 @@ package org.partiql.lang.eval /** - * A compiled PartiQL statement + * A compiled PartiQL statement intended to be evaluated from a Kotlin coroutine. */ fun interface PartiQLStatementAsync { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/Thunk.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/Thunk.kt index 903b86d19f..3b489a7d5d 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/Thunk.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/Thunk.kt @@ -107,6 +107,25 @@ data class ThunkOptions private constructor( } } +internal val DEFAULT_EXCEPTION_HANDLER_FOR_LEGACY_MODE: ThunkExceptionHandlerForLegacyMode = { e, sourceLocation -> + val message = e.message ?: "" + throw EvaluationException( + "Internal error, $message", + errorCode = (e as? EvaluationException)?.errorCode ?: ErrorCode.EVALUATOR_GENERIC_EXCEPTION, + errorContext = errorContextFrom(sourceLocation), + cause = e, + internal = true + ) +} + +internal val DEFAULT_EXCEPTION_HANDLER_FOR_PERMISSIVE_MODE: ThunkExceptionHandlerForPermissiveMode = { e, _ -> + when (e) { + is InterruptedException -> { throw e } + is StackOverflowError -> { throw e } + else -> {} + } +} + /** * An extension method for creating [ThunkFactory] based on the type of [TypingMode] * - when [TypingMode] is [TypingMode.LEGACY], creates [LegacyThunkFactory] diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/ThunkAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/ThunkAsync.kt index 838a2e3543..c314eb68aa 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/ThunkAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/ThunkAsync.kt @@ -46,46 +46,6 @@ internal typealias ThunkAsync = suspend (TEnv) -> ExprValue */ internal typealias ThunkValueAsync = suspend (TEnv, TArg) -> ExprValue -/** - * A type alias for an exception handler which always throws(primarily used for [TypingMode.LEGACY]). - */ -internal typealias ThunkExceptionHandlerForLegacyModeAsync = (Throwable, SourceLocationMeta?) -> Nothing - -/** - * A type alias for an exception handler which does not always throw(primarily used for [TypingMode.PERMISSIVE]). - */ -internal typealias ThunkExceptionHandlerForPermissiveModeAsync = (Throwable, SourceLocationMeta?) -> Unit - -/** - * Options for thunk construction. - * - * - [ThunkOptions.handleExceptionForLegacyMode] will be called when in [TypingMode.LEGACY] mode - * - [ThunkOptions.handleExceptionForPermissiveMode] will be called when in [TypingMode.PERMISSIVE] mode - * - [ThunkOptions.thunkReturnTypeAssertions] is intended for testing only, and ensures that the return value of every expression - * conforms to its `StaticType` meta. This has negative performance implications so should be avoided in production - * environments. This only be used for testing and diagnostic purposes only. - * The default exception handler wraps any [Throwable] exception and throws [EvaluationException] - */ - -internal val DEFAULT_EXCEPTION_HANDLER_FOR_LEGACY_MODE: ThunkExceptionHandlerForLegacyModeAsync = { e, sourceLocation -> - val message = e.message ?: "" - throw EvaluationException( - "Internal error, $message", - errorCode = (e as? EvaluationException)?.errorCode ?: ErrorCode.EVALUATOR_GENERIC_EXCEPTION, - errorContext = errorContextFrom(sourceLocation), - cause = e, - internal = true - ) -} - -internal val DEFAULT_EXCEPTION_HANDLER_FOR_PERMISSIVE_MODE: ThunkExceptionHandlerForPermissiveModeAsync = { e, _ -> - when (e) { - is InterruptedException -> { throw e } - is StackOverflowError -> { throw e } - else -> {} - } -} - /** * An extension method for creating [ThunkFactoryAsync] based on the type of [TypingMode] * - when [TypingMode] is [TypingMode.LEGACY], creates [LegacyThunkFactoryAsync] @@ -348,7 +308,7 @@ internal abstract class ThunkFactoryAsync( } /** - * Handles exceptions appropriately for a run-time [Thunk]. + * Handles exceptions appropriately for a run-time [ThunkAsync]. * * - The [SourceLocationMeta] will be extracted from [MetaContainer] and included in any [EvaluationException] that * is thrown, if present. @@ -479,7 +439,7 @@ internal class LegacyThunkFactoryAsync( } /** - * Handles exceptions appropriately for a run-time [Thunk] respecting [TypingMode.LEGACY] behaviour. + * Handles exceptions appropriately for a run-time [ThunkAsync] respecting [TypingMode.LEGACY] behaviour. * * - The [SourceLocationMeta] will be extracted from [MetaContainer] and included in any [EvaluationException] that * is thrown, if present. @@ -630,7 +590,7 @@ internal class PermissiveThunkFactoryAsync( /** * Handles exceptions appropriately for a run-time [Thunk] respecting [TypingMode.PERMISSIVE] behaviour. * - * - Exceptions thrown by [block] that are [EvaluationException] are caught and [MissingExprValue] is returned. + * - Exceptions thrown by [block] that are [EvaluationException] are caught and [ExprValue.missingValue] is returned. * - Exceptions thrown by [block] that are not an [EvaluationException] cause an [EvaluationException] to be thrown * with the original exception as the cause. */ diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalBexprToThunkConverterAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalBexprToThunkConverterAsync.kt index c60f5df652..5dada878e5 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalBexprToThunkConverterAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalBexprToThunkConverterAsync.kt @@ -12,7 +12,6 @@ import org.partiql.lang.eval.ThunkValueAsync import org.partiql.lang.eval.physical.operators.AggregateOperatorFactoryAsync import org.partiql.lang.eval.physical.operators.CompiledAggregateFunctionAsync import org.partiql.lang.eval.physical.operators.CompiledGroupKeyAsync -import org.partiql.lang.eval.physical.operators.CompiledSortKey import org.partiql.lang.eval.physical.operators.CompiledSortKeyAsync import org.partiql.lang.eval.physical.operators.CompiledWindowFunctionAsync import org.partiql.lang.eval.physical.operators.FilterRelationalOperatorFactoryAsync @@ -33,7 +32,9 @@ import org.partiql.lang.eval.physical.operators.valueExpressionAsync import org.partiql.lang.eval.physical.window.createBuiltinWindowFunctionAsync import org.partiql.lang.util.toIntExact -/** Converts instances of [PartiqlPhysical.Bexpr] to any [T]. */ +/** Converts instances of [PartiqlPhysical.Bexpr] to any [T]. A `suspend` version of the physical plan converter + * interface is added since PIG currently does not output async functions. + */ internal interface Converter { suspend fun convert(node: PartiqlPhysical.Bexpr): T = when (node) { is PartiqlPhysical.Bexpr.Project -> convertProject(node) @@ -62,10 +63,10 @@ internal interface Converter { suspend fun convertWindow(node: PartiqlPhysical.Bexpr.Window): T } -/** A specialization of [Thunk] that we use for evaluation of physical plans. */ +/** A specialization of [ThunkAsync] that we use for evaluation of physical plans. */ internal typealias PhysicalPlanThunkAsync = ThunkAsync -/** A specialization of [ThunkValue] that we use for evaluation of physical plans. */ +/** A specialization of [ThunkValueAsync] that we use for evaluation of physical plans. */ internal typealias PhysicalPlanThunkValueAsync = ThunkValueAsync internal class PhysicalBexprToThunkConverterAsync( @@ -77,7 +78,7 @@ internal class PhysicalBexprToThunkConverterAsync( valueExpressionAsync(sourceLocationMeta) { state -> this(state) } private suspend fun RelationExpressionAsync.toRelationThunk(metas: MetaContainer) = - relationThunkAsync(metas) { state -> this.evaluateAsync(state) } + relationThunkAsync(metas) { state -> this.evaluate(state) } private inline fun findOperatorFactory( operator: RelationalOperatorKind, @@ -187,7 +188,7 @@ internal class PhysicalBexprToThunkConverterAsync( override suspend fun convertJoin(node: PartiqlPhysical.Bexpr.Join): RelationThunkEnvAsync { // recurse into children val leftBindingsExpr = this.convert(node.left) - val rightBindingdExpr = this.convert(node.right) + val rightBindingsExpr = this.convert(node.right) val predicateValueExpr = node.predicate?.let { predicate -> exprConverter.convert(predicate) .takeIf { !predicate.isLitTrue() } @@ -214,7 +215,7 @@ internal class PhysicalBexprToThunkConverterAsync( impl = node.i, joinType = node.joinType, leftBexpr = { state -> leftBindingsExpr(state) }, - rightBexpr = { state -> rightBindingdExpr(state) }, + rightBexpr = { state -> rightBindingsExpr(state) }, predicateExpr = predicateValueExpr, setLeftSideVariablesToNull = setLeftSideVariablesToNull, setRightSideVariablesToNull = setRightSideVariablesToNull @@ -278,7 +279,7 @@ internal class PhysicalBexprToThunkConverterAsync( // Get Implementation val factory = findOperatorFactory(RelationalOperatorKind.SORT, node.i.name.text) - val bindingsExpr = factory.create(sortKeys, { state -> source(state) }) + val bindingsExpr = factory.create(sortKeys) { state -> source(state) } return bindingsExpr.toRelationThunk(node.metas) } @@ -302,7 +303,7 @@ internal class PhysicalBexprToThunkConverterAsync( } /** - * Returns a list of [CompiledSortKey] with the aim of pre-computing the [NaturalExprValueComparators] prior to + * Returns a list of [CompiledSortKeyAsync] with the aim of pre-computing the [NaturalExprValueComparators] prior to * evaluation and leaving the [PartiqlPhysical.SortSpec]'s [PartiqlPhysical.Expr] to be evaluated later. */ private suspend fun compileSortSpecsAsync(specs: List): List = specs.map { spec -> diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerAsyncImpl.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerAsyncImpl.kt index 7174cf44f8..7499712382 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerAsyncImpl.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/PhysicalPlanCompilerAsyncImpl.kt @@ -855,7 +855,7 @@ internal class PhysicalPlanCompilerAsyncImpl( try { val func = functionManager.get(name = name, arity = arity, args = argTypes) val computeThunk = when (func.signature.unknownArguments) { - UnknownArguments.PROPAGATE -> thunkFactory.thunkEnvOperands(metas, funcArgThunks) { env, values -> + UnknownArguments.PROPAGATE -> thunkFactory.thunkEnvOperands(metas, funcArgThunks) { env, _ -> func.call(env.session, args) } UnknownArguments.PASS_THRU -> thunkFactory.thunkEnvAsync(metas) { env -> @@ -1679,7 +1679,7 @@ internal class PhysicalPlanCompilerAsyncImpl( escape?.let { val escapeCharString = checkEscapeChar(escape, escapeLocationMeta) val escapeCharCodePoint = escapeCharString.codePointAt(0) // escape is a string of length 1 - val validEscapedChars = setOf('_'.toInt(), '%'.toInt(), escapeCharCodePoint) + val validEscapedChars = setOf('_'.code, '%'.code, escapeCharCodePoint) val iter = pattern.codePointSequence().iterator() while (iter.hasNext()) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/RelationThunk.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/RelationThunk.kt index 2db74f5a38..bafddd6ef1 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/RelationThunk.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/RelationThunk.kt @@ -11,6 +11,7 @@ import org.partiql.lang.eval.fillErrorContext import org.partiql.lang.eval.relation.RelationIterator /** A thunk that returns a [RelationIterator], which is the result of evaluating a relational operator. */ +@Deprecated("To be removed in the next major version.", replaceWith = ReplaceWith("RelationThunkEnvAsync")) internal typealias RelationThunkEnv = (EvaluatorState) -> RelationIterator /** @@ -19,6 +20,7 @@ internal typealias RelationThunkEnv = (EvaluatorState) -> RelationIterator * This function is not currently in [ThunkFactory] to avoid complicating it further. If a need arises, it could be * moved. */ +@Deprecated("To be removed in the next major version.", replaceWith = ReplaceWith("relationThunkAsync")) internal inline fun relationThunk(metas: MetaContainer, crossinline t: RelationThunkEnv): RelationThunkEnv { val sourceLocationMeta = metas[SourceLocationMeta.TAG] as? SourceLocationMeta return { env: EvaluatorState -> diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/AggregateOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/AggregateOperatorFactoryAsync.kt index 8379ce4fa0..8f4cb2948a 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/AggregateOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/AggregateOperatorFactoryAsync.kt @@ -70,10 +70,10 @@ internal class AggregateOperatorDefaultAsync( val keys: List, val functions: List ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator = relation(RelationType.BAG) { + override suspend fun evaluate(state: EvaluatorState): RelationIterator = relation(RelationType.BAG) { val aggregationMap = TreeMap>(DEFAULT_COMPARATOR) - val sourceIter = source.evaluateAsync(state) + val sourceIter = source.evaluate(state) while (sourceIter.nextRow()) { // Initialize the AggregationMap diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/FilterRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/FilterRelationalOperatorFactoryAsync.kt index 8ec95508fd..4225db6f76 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/FilterRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/FilterRelationalOperatorFactoryAsync.kt @@ -51,8 +51,8 @@ internal class SelectOperatorDefaultAsync( val predicate: ValueExpressionAsync, ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { - val input = input.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState): RelationIterator { + val input = input.evaluate(state) return relation(RelationType.BAG) { while (true) { if (!input.nextRow()) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/JoinRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/JoinRelationalOperatorFactoryAsync.kt index 583650a2da..a476e9b91d 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/JoinRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/JoinRelationalOperatorFactoryAsync.kt @@ -93,10 +93,10 @@ private class InnerJoinOperatorAsync( private val condition: suspend (EvaluatorState) -> Boolean ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState) = relation(RelationType.BAG) { - val leftItr = lhs.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState) = relation(RelationType.BAG) { + val leftItr = lhs.evaluate(state) while (leftItr.nextRow()) { - val rightItr = rhs.evaluateAsync(state) + val rightItr = rhs.evaluate(state) while (rightItr.nextRow()) { if (condition(state)) { yield() @@ -116,10 +116,10 @@ private class LeftJoinOperatorAsync( private val setRightSideVariablesToNull: (EvaluatorState) -> Unit ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState) = relation(RelationType.BAG) { - val leftItr = lhs.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState) = relation(RelationType.BAG) { + val leftItr = lhs.evaluate(state) while (leftItr.nextRow()) { - val rightItr = rhs.evaluateAsync(state) + val rightItr = rhs.evaluate(state) var yieldedSomething = false while (rightItr.nextRow()) { if (condition(state)) { @@ -145,10 +145,10 @@ private class RightJoinOperatorAsync( private val setLeftSideVariablesToNull: (EvaluatorState) -> Unit ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState) = relation(RelationType.BAG) { - val rightItr = rhs.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState) = relation(RelationType.BAG) { + val rightItr = rhs.evaluate(state) while (rightItr.nextRow()) { - val leftItr = lhs.evaluateAsync(state) + val leftItr = lhs.evaluate(state) var yieldedSomething = false while (leftItr.nextRow()) { if (condition(state)) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LetRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LetRelationalOperatorFactoryAsync.kt index 868e243872..21ff9365c5 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LetRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LetRelationalOperatorFactoryAsync.kt @@ -2,7 +2,6 @@ package org.partiql.lang.eval.physical.operators import org.partiql.lang.domains.PartiqlPhysical import org.partiql.lang.eval.physical.EvaluatorState -import org.partiql.lang.eval.physical.VariableBinding import org.partiql.lang.eval.physical.VariableBindingAsync import org.partiql.lang.eval.relation.RelationIterator import org.partiql.lang.eval.relation.relation @@ -24,7 +23,7 @@ abstract class LetRelationalOperatorFactoryAsync(name: String) : RelationalOpera * * @param impl * @param sourceBexpr - * @param bindings list of [VariableBinding]s in the `LET` clause + * @param bindings list of [VariableBindingAsync]s in the `LET` clause * @return */ abstract fun create( @@ -51,8 +50,8 @@ internal class LetOperatorAsync( private val bindings: List ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { - val rows = input.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState): RelationIterator { + val rows = input.evaluate(state) return relation(rows.relType) { while (rows.nextRow()) { bindings.forEach { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactory.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactory.kt index 3d294b9d78..ea5d9f5857 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactory.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactory.kt @@ -32,6 +32,7 @@ abstract class LimitRelationalOperatorFactory(name: String) : RelationalOperator * @param sourceBexpr * @return */ + @Deprecated("To be removed in the next major version.", replaceWith = ReplaceWith("LimitRelationalOperatorFactoryAsync.create")) abstract fun create( impl: PartiqlPhysical.Impl, rowCountExpr: ValueExpression, diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactoryAsync.kt index ca3e647327..9442c0fbf1 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/LimitRelationalOperatorFactoryAsync.kt @@ -55,9 +55,9 @@ internal class LimitOperatorAsync( private val limit: ValueExpressionAsync, ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { + override suspend fun evaluate(state: EvaluatorState): RelationIterator { val limit = evalLimitRowCount(limit, state) - val rows = input.evaluateAsync(state) + val rows = input.evaluate(state) return relation(rows.relType) { var rowCount = 0L while (rowCount++ < limit && rows.nextRow()) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/OffsetRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/OffsetRelationalOperatorFactoryAsync.kt index 98bcb19cfd..3df809d793 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/OffsetRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/OffsetRelationalOperatorFactoryAsync.kt @@ -55,9 +55,9 @@ internal class OffsetOperatorAsync( private val offset: ValueExpressionAsync, ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { + override suspend fun evaluate(state: EvaluatorState): RelationIterator { val skipCount: Long = evalOffsetRowCount(offset, state) - val rows = input.evaluateAsync(state) + val rows = input.evaluate(state) return relation(rows.relType) { var rowCount = 0L while (rowCount++ < skipCount) { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/RelationExpressionAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/RelationExpressionAsync.kt index 2b9afcb7f4..61f6f4a795 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/RelationExpressionAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/RelationExpressionAsync.kt @@ -15,5 +15,5 @@ import org.partiql.lang.eval.relation.RelationIterator * implementation details such as [org.partiql.lang.eval.physical.RelationThunkEnv]. */ fun interface RelationExpressionAsync { - suspend fun evaluateAsync(state: EvaluatorState): RelationIterator + suspend fun evaluate(state: EvaluatorState): RelationIterator } diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/ScanRelationalOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/ScanRelationalOperatorFactoryAsync.kt index 0e0f21c4bd..54b7531c63 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/ScanRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/ScanRelationalOperatorFactoryAsync.kt @@ -60,7 +60,7 @@ internal class ScanOperatorAsync( private val setByVar: SetVariableFunc? ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { + override suspend fun evaluate(state: EvaluatorState): RelationIterator { val value = expr(state) val sequence: Sequence = when (value.type) { ExprValueType.LIST, diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryAsync.kt index f35a645205..4447939475 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryAsync.kt @@ -3,7 +3,7 @@ package org.partiql.lang.eval.physical.operators import org.partiql.lang.domains.PartiqlPhysical import org.partiql.lang.eval.NaturalExprValueComparators -/** Provides an implementation of the [PartiqlPhysical.Bexpr.Order] operator.*/ +/** Provides an implementation of the [PartiqlPhysical.Bexpr.Sort] operator.*/ public abstract class SortOperatorFactoryAsync(name: String) : RelationalOperatorFactory { public final override val key: RelationalOperatorFactoryKey = RelationalOperatorFactoryKey(RelationalOperatorKind.SORT, name) public abstract fun create( diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryDefaultAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryDefaultAsync.kt index 2b26b2ef73..66b17bafc3 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryDefaultAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/SortOperatorFactoryDefaultAsync.kt @@ -30,8 +30,8 @@ internal object SortOperatorFactoryDefaultAsync : SortOperatorFactoryAsync(DEFAU } internal class SortOperatorDefaultAsync(private val sortKeys: List, private val sourceRelation: RelationExpressionAsync) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { - val source = sourceRelation.evaluateAsync(state) + override suspend fun evaluate(state: EvaluatorState): RelationIterator { + val source = sourceRelation.evaluate(state) return relation(RelationType.LIST) { val rows = mutableListOf>() @@ -63,7 +63,7 @@ internal class SortOperatorDefaultAsync(private val sortKeys: List): Comparator, List>> { return object : Comparator, List>> { diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/UnpivotOperatorFactoryDefaultAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/UnpivotOperatorFactoryDefaultAsync.kt index f7e2ac1090..6d768707a9 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/UnpivotOperatorFactoryDefaultAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/UnpivotOperatorFactoryDefaultAsync.kt @@ -43,7 +43,7 @@ internal class UnpivotOperatorDefaultAsync( private val setAtVar: SetVariableFunc?, private val setByVar: SetVariableFunc? ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { + override suspend fun evaluate(state: EvaluatorState): RelationIterator { val originalValue = expr(state) val unpivot = originalValue.unpivot() diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/WindowRelationalOperatorFactoryDefaultAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/WindowRelationalOperatorFactoryDefaultAsync.kt index 5e297b7720..034fcbd18e 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/WindowRelationalOperatorFactoryDefaultAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/operators/WindowRelationalOperatorFactoryDefaultAsync.kt @@ -34,9 +34,9 @@ internal class WindowOperatorDefaultAsync( private val windowSortSpecList: List, private val compiledWindowFunctions: List ) : RelationExpressionAsync { - override suspend fun evaluateAsync(state: EvaluatorState): RelationIterator { + override suspend fun evaluate(state: EvaluatorState): RelationIterator { // the following corresponding to materialization process - val sourceIter = source.evaluateAsync(state) + val sourceIter = source.evaluate(state) val registers = sequence { while (sourceIter.nextRow()) { yield(state.registers.clone()) @@ -59,7 +59,7 @@ internal class WindowOperatorDefaultAsync( val sortedRegisters = newRegisters.sortedWith(getSortingComparator(sortKeys.map { it.comparator })).map { it.first } // create the partition here - var partition = mutableListOf>>() + val partition = mutableListOf>>() // entire partition if (windowPartitionList.isEmpty()) { @@ -68,7 +68,7 @@ internal class WindowOperatorDefaultAsync( // need to be partitioned else { val iter = sortedRegisters.iterator() - var rowInPartition = mutableListOf>() + val rowInPartition = mutableListOf>() var previousPartition: ExprValue? = null while (iter.hasNext()) { val currentRow = iter.next() diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LagAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LagAsync.kt index 3af2fdb848..f31fd1acc4 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LagAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LagAsync.kt @@ -34,12 +34,12 @@ internal class LagAsync : NavigationWindowFunctionAsync() { val defaultValue = default?.invoke(state) ?: ExprValue.nullValue val targetIndex = currentPos - offsetValue - if (targetIndex >= 0 && targetIndex <= currentPartition.lastIndex) { + return if (targetIndex >= 0 && targetIndex <= currentPartition.lastIndex) { val targetRow = currentPartition[targetIndex.toInt()] state.load(targetRow) - return target!!.invoke(state) + target!!.invoke(state) } else { - return defaultValue + defaultValue } } } diff --git a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LeadAsync.kt b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LeadAsync.kt index c4a87a65c4..6d8464171b 100644 --- a/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LeadAsync.kt +++ b/partiql-lang/src/main/kotlin/org/partiql/lang/eval/physical/window/LeadAsync.kt @@ -35,12 +35,12 @@ internal class LeadAsync : NavigationWindowFunctionAsync() { val defaultValue = default?.invoke(state) ?: ExprValue.nullValue val targetIndex = currentPos + offsetValue - if (targetIndex <= currentPartition.lastIndex) { + return if (targetIndex <= currentPartition.lastIndex) { val targetRow = currentPartition[targetIndex.toInt()] state.load(targetRow) - return target!!.invoke(state) + target!!.invoke(state) } else { - return defaultValue + defaultValue } } } diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/PartiQLCompilerPipelineExplainTests.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/PartiQLCompilerPipelineExplainTests.kt index c42e0b7fbe..0b93120bf2 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/PartiQLCompilerPipelineExplainTests.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/PartiQLCompilerPipelineExplainTests.kt @@ -34,7 +34,7 @@ import org.partiql.lang.util.ArgumentsProviderBase class PartiQLCompilerPipelineExplainTests { val compiler = PartiQLCompilerPipeline.standard() - val compilerAsync = PartiQLCompilerPipelineAsync.standard() + private val compilerAsync = PartiQLCompilerPipelineAsync.standard() data class ExplainTestCase( val description: String? = null, diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/async/AsyncOperatorTests.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/async/AsyncOperatorTests.kt index 2db36dc5d2..0b145bc0af 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/async/AsyncOperatorTests.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/async/AsyncOperatorTests.kt @@ -5,6 +5,7 @@ import com.amazon.ionelement.api.ionString import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import org.partiql.annotations.ExperimentalPartiQLCompilerPipeline @@ -12,6 +13,7 @@ import org.partiql.lang.compiler.PartiQLCompilerPipelineAsync import org.partiql.lang.domains.PartiqlPhysical import org.partiql.lang.eval.EvaluationSession import org.partiql.lang.eval.PartiQLResult +import org.partiql.lang.eval.PartiQLStatementAsync import org.partiql.lang.eval.booleanValue import org.partiql.lang.eval.isNotUnknown import org.partiql.lang.eval.physical.operators.FilterRelationalOperatorFactoryAsync @@ -26,6 +28,16 @@ import org.partiql.lang.planner.transforms.PLAN_VERSION_NUMBER private const val FAKE_IMPL_NAME = "test_async_fake" private val FAKE_IMPL_NODE = PartiqlPhysical.build { impl(FAKE_IMPL_NAME) } +/** + * Test is included to demonstrate the previous behavior for a relational operator expression that calls an async + * functions. Previously, in the synchronous evaluator, making an async function call would require wrapping the call + * in [runBlocking], which blocks the current thread of execution. This results in the 10 evaluation calls to be + * executed one after the other, waiting for the previous call to finish. + * + * Since the [PartiQLStatementAsync] evaluation is now async, the [runBlocking] around the async function is no longer + * required. Thus, the result is the 10 evaluation calls can be executed without waiting for the previous call to + * finish. + */ @OptIn(ExperimentalPartiQLCompilerPipeline::class) class AsyncOperatorTests { private val fakeOperatorFactories = listOf( @@ -35,12 +47,13 @@ class AsyncOperatorTests { predicate: ValueExpressionAsync, sourceBexpr: RelationExpressionAsync ): RelationExpressionAsync = RelationExpressionAsync { state -> - // If `RelationExpression`'s `evaluate` was NOT a `suspend fun`, then `runBlocking` would be required + // If `RelationExpressionAsync`'s `evaluate` was NOT a `suspend fun`, then `runBlocking` would be + // required // runBlocking { println("Calling") someAsyncOp() // } - val input = sourceBexpr.evaluateAsync(state) + val input = sourceBexpr.evaluate(state) relation(RelationType.BAG) { while (true) { @@ -94,6 +107,7 @@ class AsyncOperatorTests { ) } val statement = pipeline.compile(plan) + // asynchronously evaluate 10 statements and print out the results repeat(10) { index -> launch { print("\nCompiling $index. ") diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/QueryEngine.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/QueryEngine.kt index d88dfda5c5..5b69067ab8 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/QueryEngine.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/QueryEngine.kt @@ -39,7 +39,7 @@ internal const val DB_CONTEXT_VAR = "in-memory-database" */ @OptIn(ExperimentalPartiQLCompilerPipeline::class) class QueryEngine(val db: MemoryDatabase) { - var enableDebugOutput = false + private var enableDebugOutput = false /** Given a [BindingName], inform the planner the unique identifier of the global variable (usually a table). */ private val globalVariableResolver = GlobalVariableResolver { bindingName -> @@ -63,7 +63,7 @@ class QueryEngine(val db: MemoryDatabase) { // TODO: nothing in the planner uses the contentClosed property yet, but "technically" do have open // content since nothing is constraining the fields in the table. contentClosed = false, - // The FilterScanTokeyLookup pass does use this. + // The FilterScanToKeyLookup pass does use this. primaryKeyFields = tableMetadata.primaryKeyFields ) ) diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/operators/GetByKeyProjectRelationalOperatorFactoryAsync.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/operators/GetByKeyProjectRelationalOperatorFactoryAsync.kt index e93612e870..323ec13b6c 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/operators/GetByKeyProjectRelationalOperatorFactoryAsync.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/memorydb/operators/GetByKeyProjectRelationalOperatorFactoryAsync.kt @@ -54,7 +54,7 @@ class GetByKeyProjectRelationalOperatorFactoryAsync : ProjectRelationalOperatorF // Extract the key value constructor val keyValueExpressionAsync = args.single() - // Parse the tableId so we don't have to at evaluation-time + // Parse the tableId, so we don't have to at evaluation-time val tableId = UUID.fromString(impl.staticArgs.single().textValue) var exhausted = false diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/operators/CustomOperatorFactoryTests.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/operators/CustomOperatorFactoryTests.kt index 1ef10728f6..c1dcfee4be 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/operators/CustomOperatorFactoryTests.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/compiler/operators/CustomOperatorFactoryTests.kt @@ -1,6 +1,7 @@ package org.partiql.lang.compiler.operators import com.amazon.ionelement.api.ionBool +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.assertThrows @@ -186,6 +187,7 @@ class CustomOperatorFactoryTests { assertEquals(tc.expectedThrownFromOperator, ex.thrownFromOperator) } + @OptIn(ExperimentalCoroutinesApi::class) @ParameterizedTest @ArgumentsSource(CustomOperatorCases::class) fun `make sure custom async operator implementations are called`(tc: CustomOperatorCases.TestCase) = runTest { @@ -207,7 +209,7 @@ class CustomOperatorFactoryTests { class TestCase(val expectedThrownFromOperator: RelationalOperatorKind, val plan: PartiqlPhysical.Plan) override fun getParameters() = listOf( // The key parts of the cases below are the setting of FAKE_IMPL_NODE which causes the custom operator - // factories to be called. The rest is the minimum gibberish needed to make complete PartiqlPhsyical.Bexpr + // factories to be called. The rest is the minimum gibberish needed to make complete PartiqlPhysical.Bexpr // nodes. There must only be one FAKE_IMPL_NODE per plan otherwise the CreateFunctionWasCalledException // might be called for an operator other than the one intended. createTestCase(RelationalOperatorKind.PROJECT) { project(FAKE_IMPL_NODE, varDecl(0)) }, diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/eval/builtins/functions/DynamicLookupExprFunctionTest.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/eval/builtins/functions/DynamicLookupExprFunctionTest.kt index daf79a475b..e8b1e5c2af 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/eval/builtins/functions/DynamicLookupExprFunctionTest.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/eval/builtins/functions/DynamicLookupExprFunctionTest.kt @@ -111,7 +111,7 @@ class DynamicLookupExprFunctionTest : EvaluatorTestBase() { class MismatchCaseSensitiveCases : ArgumentsProviderBase() { override fun getParameters(): List = listOf( - // Can't find these variables due to case mismatch when perform case sensitive lookup + // Can't find these variables due to case mismatch when perform case-sensitive lookup EvaluatorErrorTestCase( query = "\"$DYNAMIC_LOOKUP_FUNCTION_NAME\"(`fOo`, `case_sensitive`, `locals_then_globals`, [f, b])", expectedErrorCode = ErrorCode.EVALUATOR_QUOTED_BINDING_DOES_NOT_EXIST, diff --git a/partiql-lang/src/test/kotlin/org/partiql/lang/eval/evaluatortestframework/PartiQLCompilerPipelineFactoryAsync.kt b/partiql-lang/src/test/kotlin/org/partiql/lang/eval/evaluatortestframework/PartiQLCompilerPipelineFactoryAsync.kt index 1bbbf9916a..7ab6947261 100644 --- a/partiql-lang/src/test/kotlin/org/partiql/lang/eval/evaluatortestframework/PartiQLCompilerPipelineFactoryAsync.kt +++ b/partiql-lang/src/test/kotlin/org/partiql/lang/eval/evaluatortestframework/PartiQLCompilerPipelineFactoryAsync.kt @@ -22,7 +22,7 @@ import kotlin.test.assertNull * TODO delete this once evaluator tests are replaced by `partiql-tests` */ @OptIn(ExperimentalPartiQLCompilerPipeline::class) -internal class PartiQLCompilerPipelineFactoryAsync() : PipelineFactory { +internal class PartiQLCompilerPipelineFactoryAsync : PipelineFactory { override val pipelineName: String = "PartiQLCompilerPipelineAsync"