Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds support for dynamically updatable search analyzers #290

Merged
merged 14 commits into from
Sep 12, 2020
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ integTest.runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'tests.path.repo', repo.absolutePath
systemProperty 'buildDir', buildDir
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
Expand Down Expand Up @@ -72,6 +75,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
const val PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_analyzer"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
Expand Down Expand Up @@ -117,6 +121,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
RestRefreshSearchAnalyzerAction(),
RestIndexPolicyAction(settings, clusterService, indexManagementIndices),
RestGetPolicyAction(),
RestDeletePolicyAction(),
Expand Down Expand Up @@ -190,6 +195,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
ActionPlugin.ActionHandler(
UpdateManagedIndexMetaDataAction.INSTANCE,
TransportUpdateManagedIndexMetaDataAction::class.java
),
ActionPlugin.ActionHandler(
RefreshSearchAnalyzerAction.INSTANCE,
TransportRefreshSearchAnalyzerAction::class.java
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.ActionType
import org.elasticsearch.common.io.stream.Writeable

class RefreshSearchAnalyzerAction : ActionType<RefreshSearchAnalyzerResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/refresh_search_analyzer"
val INSTANCE = RefreshSearchAnalyzerAction()
val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) }
}

override fun getResponseReader(): Writeable.Reader<RefreshSearchAnalyzerResponse> = reader
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastRequest
import org.elasticsearch.common.io.stream.StreamInput
import java.io.IOException

class RefreshSearchAnalyzerRequest : BroadcastRequest<RefreshSearchAnalyzerRequest> {
constructor(vararg indices: String) : super(*indices)

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.action.support.broadcast.BroadcastResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ConstructingObjectParser
import org.elasticsearch.common.xcontent.ToXContent.Params
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.rest.action.RestActions
import java.io.IOException
import java.util.function.Function

class RefreshSearchAnalyzerResponse : BroadcastResponse {

private var results: MutableMap<String, List<String>> = HashMap()
private var shardFailures: MutableList<FailedShardDetails> = mutableListOf()
private var temp: List<DefaultShardOperationFailedException> = mutableListOf()

protected var logger = LogManager.getLogger(javaClass)

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
val resultSize: Int = inp.readVInt()
for (i in 0..resultSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a common pattern you saw elsewhere for reading off streaminput?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does this constructor only take the streaminput in the format outputted by writeTo? if so, you can change writeTo to do something like:

out.writeList(shardResponses)

and then here you can read the list and pass a class reference to the element of the list (assuming it's Writeable)

inp.readList(::RefreshSearchAnalyzerShardResponse)

You can find a similar example of this in the Monitor data class for Alerting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this pattern in BroadcastResponse.java, although the one that Mohammad suggested looks to be a better way. I have modified it in my latest commit. Thanks for the suggestion Mo!

results.put(inp.readString(), inp.readStringArray().toList())
}

val failureSize: Int = inp.readVInt()
for (i in 0..failureSize) {
shardFailures.add(FailedShardDetails(inp.readString(), inp.readInt(), inp.readString()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Make FailedShardDetails writeable and just pass in the streaminput/output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I wanted to do some refactoring on this but couldn't get to it earlier. It's done now, the new version does not use this class now.

}
}

constructor(
totalShards: Int,
successfulShards: Int,
failedShards: Int,
shardFailures: List<DefaultShardOperationFailedException>,
results: MutableMap<String, List<String>>
) : super(
totalShards, successfulShards, failedShards, shardFailures
) {
this.results = results
this.temp = shardFailures
for (failure in shardFailures) {
qreshi marked this conversation as resolved.
Show resolved Hide resolved
this.shardFailures.add(FailedShardDetails(failure.index()!!, failure.shardId(), failure.reason()))
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? {
builder.startObject()
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, null)

builder.startArray("_successful")
for (index in results.keys) {
builder.startObject()
val reloadedAnalyzers: List<String> = results.get(index)!!
builder.field("index", index)
builder.startArray("refreshed_analyzers")
for (analyzer in reloadedAnalyzers) {
builder.value(analyzer)
}
builder.endArray()
builder.endObject()
}
builder.endArray()

builder.startArray("_failed")
for (failure in shardFailures) {
builder.startObject()
qreshi marked this conversation as resolved.
Show resolved Hide resolved
builder.field("index", failure.index)
builder.field("shardId", failure.shardId)
builder.field("failureReason", failure.failureReason)
builder.endObject()
}
builder.endArray()

builder.endObject()
return builder
}

companion object {
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("refresh_search_analyzer", true,
Function { arg: Array<Any> ->
val response = arg[0] as RefreshSearchAnalyzerResponse
RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards,
response.temp, response.results)
})

init {
declareBroadcastFields(PARSER)
}
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)

out.writeVInt(results.size)
for ((key, value) in results.entries) {
out.writeString(key)
out.writeStringArray(value.toTypedArray())
}

out.writeVInt(shardFailures.size)
for (failure in shardFailures) {
out.writeString(failure.index)
out.writeInt(failure.shardId)
out.writeString(failure.failureReason)
}
}

class FailedShardDetails(index: String, shardId: Int, failureReason: String) {
val index = index
val shardId = shardId
val failureReason = failureReason
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.elasticsearch.action.support.broadcast.BroadcastShardResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse {
var indexName: String
var reloadedAnalyzers: List<String>

constructor(`in`: StreamInput) : super(`in`) {
qreshi marked this conversation as resolved.
Show resolved Hide resolved
indexName = `in`.readString()
reloadedAnalyzers = `in`.readStringArray().toList()
}

constructor(shardId: ShardId?, indexName: String, reloadedAnalyzers: List<String>) : super(shardId) {
this.indexName = indexName
this.reloadedAnalyzers = reloadedAnalyzers
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
out.writeStringArray(reloadedAnalyzers.toTypedArray())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ANALYZER_BASE_URI
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.POST
import org.elasticsearch.rest.action.RestToXContentListener
import java.io.IOException

class RestRefreshSearchAnalyzerAction : BaseRestHandler() {

override fun getName(): String = "refresh_search_analyzer_action"

override fun routes(): List<Route> {
return listOf(
Route(POST, REFRESH_SYNONYM_ANALYZER_URI)
)
}

// TODO: Add indicesOptions?

@Throws(IOException::class)
@Suppress("SpreadOperator")
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val indices: Array<String>? = Strings.splitStringByCommaToArray(request.param("index"))

if (indices.isNullOrEmpty()) {
throw IllegalArgumentException("Missing indices")
}

val refreshSearchAnalyzerRequest: RefreshSearchAnalyzerRequest = RefreshSearchAnalyzerRequest()
.indices(*indices)

return RestChannelConsumer { channel ->
client.execute(RefreshSearchAnalyzerAction.INSTANCE, refreshSearchAnalyzerRequest, RestToXContentListener(channel))
}
}

companion object {
const val REFRESH_SYNONYM_ANALYZER_URI = "$ANALYZER_BASE_URI/refresh_search_analyzer/{index}"
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading