Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adds support for dynamically updatable synonyms to es version 7.8 (#294)
Browse files Browse the repository at this point in the history
* Adding support for dynamically refreshable synonym analyzers to es version 7.8

Squashed commit of the following:

commit 02db791
Author: Himanshu Setia <[email protected]>
Date:   Fri Sep 11 14:52:23 2020 -0700

    Reverting action name to follow the code convention

commit 4115f97
Author: Himanshu Setia <[email protected]>
Date:   Fri Sep 11 14:01:41 2020 -0700

    misc changes

commit 71234c6
Author: Himanshu Setia <[email protected]>
Date:   Fri Sep 11 13:31:40 2020 -0700

    API path change and few minor tweaks

commit f4c8465
Author: Himanshu Setia <[email protected]>
Date:   Fri Sep 11 12:52:55 2020 -0700

    CR comments - reading shardResponses as List from inputStream

commit 33330d4
Author: Himanshu Setia <[email protected]>
Date:   Fri Sep 11 11:31:00 2020 -0700

    Integ test fixes by enforcing refresh during ingestion

commit 6c33bb6
Merge: b41ebcb 1042dd6
Author: Drew Baugher <[email protected]>
Date:   Thu Sep 10 23:56:35 2020 -0700

    Merge branch 'master' into hotreload2

commit b41ebcb
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 23:38:56 2020 -0700

    CR comments, code refactoring and more UTs

    Squashed commit of the following:

    commit d87842e4ccb592a39d6b6897a2b9cde9ddc2839f
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 23:36:24 2020 -0700

        Reverting RefreshSearchAnalyzerResponse stream input output parsing test for fixing later

    commit 33ef3223acde38a64575255f7ec702f807d0502e
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 23:34:49 2020 -0700

        Fixing missing indices test

    commit 8516d2142362c61630682bf7353e019bf9f6a9eb
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 21:59:55 2020 -0700

        CR comments and added more UTs

    commit d306421ca462f2614756a73231baeeb56b388365
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 20:15:59 2020 -0700

        shardFailure refactoring

    commit da80de7a68fef05e524e624ea1a8da6afdbf11d4
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 18:14:45 2020 -0700

        Working changes after refactoring shardFailure

    commit be9bf2daaa31fb6444f799a8f3ed3d0383551471
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 17:45:31 2020 -0700

        Corrected response parsing for indices with some failed shards

    commit 256f06ed23ac4ab2d78724601a1dd7e9d1423331
    Author: Himanshu Setia <[email protected]>
    Date:   Thu Sep 10 17:12:00 2020 -0700

        Working

commit 2f790fc
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 16:16:03 2020 -0700

    Adding UTs to validate stream  parsing

commit 387095a
Author: Himanshu Setia <[email protected]>
Date:   Wed Sep 9 14:37:13 2020 -0700

    Fixing klint errors

commit 2f494ab
Author: Himanshu Setia <[email protected]>
Date:   Wed Sep 9 13:49:52 2020 -0700

    Refactoring and logging

commit c68d4bc
Author: Himanshu Setia <[email protected]>
Date:   Wed Sep 9 13:45:19 2020 -0700

    Fixed multinode response parsing issue

commit ae21bd4
Author: Himanshu Setia <[email protected]>
Date:   Wed Sep 9 10:30:36 2020 -0700

    Response parsing changes

commit 4703b95
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 21:51:41 2020 -0700

    Refactoring - API renaming

commit 40db035
Author: Himanshu Setia <[email protected]>
Date:   Tue Sep 8 19:24:20 2020 -0700

    Squashed commit of the following:

    commit c8c8233
    Author: Himanshu Setia <[email protected]>
    Date:   Tue Sep 8 19:21:58 2020 -0700

        removing comments, linting changes

    commit 52526e8
    Author: Himanshu Setia <[email protected]>
    Date:   Tue Sep 8 19:09:30 2020 -0700

        Making refresh_synonym_analyzer uri odfe compatible

    commit d4cc71e
    Author: Himanshu Setia <[email protected]>
    Date:   Tue Sep 8 18:33:39 2020 -0700

        Adding copyright disclaimer to new files

    commit 9af7fdf
    Author: Himanshu Setia <[email protected]>
    Date:   Tue Sep 8 17:26:53 2020 -0700

        Misc - removing comments, adding newline, etc.

    commit 1c85f18
    Author: Himanshu Setia <[email protected]>
    Date:   Tue Sep 8 16:39:26 2020 -0700

        Adding _refresh_synonym_analyzer API to support dynamic update for search analyzers

* Adding base uri
  • Loading branch information
setiah authored Sep 13, 2020
1 parent c7b8d36 commit 9d44bbb
Show file tree
Hide file tree
Showing 14 changed files with 865 additions and 19 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ testClusters.integTest {
integTest.runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'buildDir', buildDir
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.Re
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -67,7 +70,8 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,

companion object {
const val PLUGIN_NAME = "opendistro-ism"
const val ISM_BASE_URI = "/_opendistro/_ism"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val INDEX_STATE_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_STATE_MANAGEMENT_JOB_TYPE = "opendistro-managed-index"
Expand Down Expand Up @@ -119,6 +123,7 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
RestRefreshSearchAnalyzerAction(),
RestIndexPolicyAction(settings, clusterService, indexStateManagementIndices),
RestGetPolicyAction(),
RestDeletePolicyAction(),
Expand Down Expand Up @@ -186,6 +191,10 @@ internal class IndexStateManagementPlugin : JobSchedulerExtension, ActionPlugin,
ActionPlugin.ActionHandler(
UpdateManagedIndexMetaDataAction.INSTANCE,
TransportUpdateManagedIndexMetaDataAction::class.java
),
ActionPlugin.ActionHandler(
RefreshSearchAnalyzerAction.INSTANCE,
TransportRefreshSearchAnalyzerAction::class.java
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.ActionType
import org.elasticsearch.common.io.stream.Writeable

class RefreshSearchAnalyzerAction : ActionType<RefreshSearchAnalyzerResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/refresh_search_analyzers"
val INSTANCE = RefreshSearchAnalyzerAction()
val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<RefreshSearchAnalyzerResponse> = reader
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastRequest
import org.elasticsearch.common.io.stream.StreamInput
import java.io.IOException

class RefreshSearchAnalyzerRequest : BroadcastRequest<RefreshSearchAnalyzerRequest> {
constructor(vararg indices: String) : super(*indices)

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.action.support.broadcast.BroadcastResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ConstructingObjectParser
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.rest.action.RestActions
import java.io.IOException
import java.util.function.Function

class RefreshSearchAnalyzerResponse : BroadcastResponse {

private lateinit var shardResponses: MutableList<RefreshSearchAnalyzerShardResponse>
private lateinit var shardFailures: MutableList<DefaultShardOperationFailedException>

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
inp.readList(::RefreshSearchAnalyzerShardResponse)
inp.readList(DefaultShardOperationFailedException::readShardOperationFailed)
}

constructor(
totalShards: Int,
successfulShards: Int,
failedShards: Int,
shardFailures: List<DefaultShardOperationFailedException>,
shardResponses: List<RefreshSearchAnalyzerShardResponse>
) : super(
totalShards, successfulShards, failedShards, shardFailures
) {
this.shardResponses = shardResponses.toMutableList()
this.shardFailures = shardFailures.toMutableList()
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? {
builder.startObject()
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, shardFailures.toTypedArray())
builder.startArray("successful_refresh_details")
val successfulIndices = getSuccessfulRefreshDetails()
for (index in successfulIndices.keys) {
val reloadedAnalyzers = successfulIndices.get(index)!!
builder.startObject().field("index", index).startArray("refreshed_analyzers")
for (analyzer in reloadedAnalyzers) {
builder.value(analyzer)
}
builder.endArray().endObject()
}
builder.endArray().endObject()
return builder
}

// TODO: restrict it for testing
fun getSuccessfulRefreshDetails(): MutableMap<String, List<String>> {
var successfulRefreshDetails: MutableMap<String, List<String>> = HashMap()
var failedIndices = mutableSetOf<String>()
for (failure in shardFailures) {
failedIndices.add(failure.index()!!)
}
for (response in shardResponses) {
if (!failedIndices.contains(response.index)) {
successfulRefreshDetails.putIfAbsent(response.index, response.reloadedAnalyzers)
}
}
return successfulRefreshDetails
}

companion object {
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("_refresh_search_analyzers", true,
Function { arg: Array<Any> ->
val response = arg[0] as RefreshSearchAnalyzerResponse
RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards,
response.shardFailures, response.shardResponses)
})
init {
declareBroadcastFields(PARSER)
}
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeCollection(shardResponses)
out.writeCollection(shardFailures)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastShardResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse {
var reloadedAnalyzers: List<String>

constructor(si: StreamInput) : super(si) {
reloadedAnalyzers = si.readStringArray().toList()
}

constructor(shardId: ShardId, reloadedAnalyzers: List<String>) : super(shardId) {
this.reloadedAnalyzers = reloadedAnalyzers
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeStringArray(reloadedAnalyzers.toTypedArray())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.OPEN_DISTRO_BASE_URI
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException

class RestRefreshSearchAnalyzerAction : BaseRestHandler() {

override fun getName(): String = "refresh_search_analyzer_action"

override fun routes(): List<Route> {
return listOf(
Route(POST, REFRESH_SEARCH_ANALYZER_BASE_URI),
Route(POST, "$REFRESH_SEARCH_ANALYZER_BASE_URI/{index}")
)
}

// TODO: Add indicesOptions?

@Throws(IOException::class)
@Suppress("SpreadOperator")
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val indices: Array<String>? = Strings.splitStringByCommaToArray(request.param("index"))

if (indices.isNullOrEmpty()) {
throw IllegalArgumentException("Missing indices")
}

val refreshSearchAnalyzerRequest: RefreshSearchAnalyzerRequest = RefreshSearchAnalyzerRequest()
.indices(*indices)

return RestChannelConsumer { channel ->
client.execute(RefreshSearchAnalyzerAction.INSTANCE, refreshSearchAnalyzerRequest, RestToXContentListener(channel))
}
}

companion object {
const val REFRESH_SEARCH_ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_refresh_search_analyzers"
}
}
Loading

0 comments on commit 9d44bbb

Please sign in to comment.