
Enhance index creation with validation on extra_options #961

Open
noCharger opened this issue Dec 2, 2024 · 0 comments
Labels: enhancement (New feature or request)

Is your feature request related to a problem?

Currently, neither FlintSparkIndexRefresh.validate() nor FlintSparkIndexBuilder / FlintSparkIndexOptions perform validation on the extra_options field. This can lead to a partial failure scenario where:

  • Index creation succeeds and the OpenSearch index is created
  • Index refresh later fails because the malformed extra_options value cannot be parsed

What solution would you like?

Enhance the index creation process to include validation of the extra_options field. This validation should occur before the index is created to prevent the partial failure scenario described above.
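
A minimal sketch of such a check, assuming json4s is used the same way the refresh path uses it (the object and method names below are hypothetical, not the actual Flint implementation):

```scala
import org.json4s.native.JsonMethods.parse
import org.json4s.ParserUtil.ParseException

// Hypothetical fail-fast validator (name and wiring are illustrative).
// It attempts the same json4s parse that parseExtraOptions() performs
// later, but at creation time, so a malformed value rejects the CREATE
// statement instead of breaking the refresh after the index exists.
object ExtraOptionsValidator {
  def validate(options: Map[String, String]): Unit =
    options.get("extra_options").foreach { json =>
      try parse(json)
      catch {
        case e: ParseException =>
          throw new IllegalArgumentException(
            s"Invalid JSON in extra_options: ${e.getMessage}", e)
      }
    }
}
```

Calling something like this from FlintSparkIndexBuilder before the OpenSearch index is created would surface the error to the user at CREATE time.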

Do you have any additional context?

This issue was identified when an index refresh operation failed due to malformed JSON in the extra_options field. The error is thrown while FlintSparkIndexOptions.parseExtraOptions() parses that field.
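
For reference, the failure mode is easy to reproduce with json4s alone (the option key below is made up; any unquoted value triggers the same "unknown token" error seen in the log):

```scala
import org.json4s.native.JsonMethods.parse

// Standalone reproduction of the ParseException from the log below.
// "abc" is unquoted, so the parser fails with "unknown token a".
object MalformedExtraOptionsRepro extends App {
  val malformed = """{"someOption": abc}""" // hypothetical key, invalid JSON
  parse(malformed)                          // throws ParserUtil$ParseException
}
```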

Example log

24/11/29 10:00:21.205 ERROR {QID=***} (main) FlintSpark: Failed to execute index operation [Refresh Flint index ***]
java.lang.IllegalStateException: Failed to commit transaction operation
	at org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction.commit(DefaultOptimisticTransaction.java:123)
	at org.opensearch.flint.spark.FlintSpark.refreshIndexManual(FlintSpark.scala:574)
	at org.opensearch.flint.spark.FlintSpark.$anonfun$refreshIndex$1(FlintSpark.scala:166)
	at org.opensearch.flint.spark.FlintSparkTransactionSupport.withTransaction(FlintSparkTransactionSupport.scala:67)
	at org.opensearch.flint.spark.FlintSparkTransactionSupport.withTransaction$(FlintSparkTransactionSupport.scala:52)
	at org.opensearch.flint.spark.FlintSpark.withTransaction(FlintSpark.scala:43)
	at org.opensearch.flint.spark.FlintSpark.refreshIndex(FlintSpark.scala:160)
	at org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder.$anonfun$visitRefreshMaterializedViewStatement$1(FlintSparkMaterializedViewAstBuilder.scala:65)
	at org.opensearch.flint.spark.sql.FlintSparkSqlCommand.run(FlintSparkSqlCommand.scala:27)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:223)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:692)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:683)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:714)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:745)
	at com.amazon.client.WarmpoolStatementExecutionManagerDqsImpl.executeStatement(WarmpoolStatementExecutionManagerDqsImpl.java:201)
	at org.apache.spark.sql.JobOperator.start(JobOperator.scala:93)
	at org.apache.spark.sql.FlintJob$.processStreamingJob(FlintJob.scala:388)
	at org.apache.spark.sql.FlintJob$.handleWarmpoolJob(FlintJob.scala:188)
	at org.apache.spark.sql.FlintJob$.main(FlintJob.scala:80)
	at org.apache.spark.sql.FlintJob.main(FlintJob.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.json4s.ParserUtil$ParseException: unknown token a
Near: gGroupIdentifier:["a
	at org.json4s.native.JsonParser$Parser.fail(JsonParser.scala:236)
	at org.json4s.native.JsonParser$Parser.nextToken(JsonParser.scala:324)
	at org.json4s.native.JsonParser$.$anonfun$astParser$1(JsonParser.scala:188)
	at org.json4s.native.JsonParser$.$anonfun$astParser$1$adapted(JsonParser.scala:145)
	at org.json4s.native.JsonParser$.parse(JsonParser.scala:133)
	at org.json4s.native.JsonParser$.parse(JsonParser.scala:71)
	at org.json4s.native.JsonMethods.parse(JsonMethods.scala:10)
	at org.json4s.native.JsonMethods.parse$(JsonMethods.scala:9)
	at org.json4s.native.JsonMethods$.parse(JsonMethods.scala:63)
	at org.opensearch.flint.spark.FlintSparkIndexOptions.$anonfun$parseExtraOptions$1(FlintSparkIndexOptions.scala:145)
	at scala.Option.map(Option.scala:230)
	at org.opensearch.flint.spark.FlintSparkIndexOptions.parseExtraOptions(FlintSparkIndexOptions.scala:145)
	at org.opensearch.flint.spark.FlintSparkIndexOptions.extraSourceOptions(FlintSparkIndexOptions.scala:108)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView.org$opensearch$flint$spark$mv$FlintSparkMaterializedView$$optionsWithExtra(FlintSparkMaterializedView.scala:119)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:100)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView$$anonfun$1.applyOrElse(FlintSparkMaterializedView.scala:95)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1384)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1383)
	at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:99)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1384)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1383)
	at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.mapChildren(basicLogicalOperators.scala:2037)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1384)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1383)
	at org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.mapChildren(EventTimeWatermark.scala:40)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:524)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$4(TreeNode.scala:534)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1384)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1383)
	at org.apache.spark.sql.catalyst.plans.logical.Aggregate.mapChildren(basicLogicalOperators.scala:1443)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:534)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:463)
	at org.opensearch.flint.spark.mv.FlintSparkMaterializedView.buildStream(FlintSparkMaterializedView.scala:95)
	at org.opensearch.flint.spark.refresh.AutoIndexRefresh.$anonfun$start$2(AutoIndexRefresh.scala:75)
	at org.opensearch.flint.core.metrics.MetricsSparkListener$.withMetrics(MetricsSparkListener.scala:59)
	at org.opensearch.flint.spark.refresh.AutoIndexRefresh.start(AutoIndexRefresh.scala:73)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.$anonfun$start$2(IncrementalIndexRefresh.scala:55)
	at org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect.withMetrics(RefreshMetricsAspect.scala:51)
	at org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect.withMetrics$(RefreshMetricsAspect.scala:40)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.withMetrics(IncrementalIndexRefresh.scala:23)
	at org.opensearch.flint.spark.refresh.IncrementalIndexRefresh.start(IncrementalIndexRefresh.scala:51)
	at org.opensearch.flint.spark.FlintSpark.$anonfun$refreshIndexManual$5(FlintSpark.scala:574)
	at org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction.commit(DefaultOptimisticTransaction.java:102)
	... 64 more
noCharger added the enhancement and untriaged labels on Dec 2, 2024
dai-chen removed the untriaged label on Dec 4, 2024