Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds support for dynamically updatable search analyzers #290

Merged
merged 14 commits into from
Sep 12, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
const val PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_analyzer"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.Writeable

class RefreshSearchAnalyzerAction : ActionType<RefreshSearchAnalyzerResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/refresh_search_analyzer"
const val NAME = "indices:admin/_refresh_search_analyzers"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it common to prefix _ on the action names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it's not. Will change back to indices:admin/refresh_search_analyzers

val INSTANCE = RefreshSearchAnalyzerAction()
val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.action.support.DefaultShardOperationFailedException.readShardOperationFailed
import org.elasticsearch.action.support.broadcast.BroadcastResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
Expand All @@ -29,22 +30,21 @@ import java.util.function.Function

class RefreshSearchAnalyzerResponse : BroadcastResponse {

private var results: MutableMap<String, List<String>> = HashMap()
private var shardFailures: MutableList<FailedShardDetails> = mutableListOf()
private var temp: List<DefaultShardOperationFailedException> = mutableListOf()

protected var logger = LogManager.getLogger(javaClass)

private lateinit var shardResponses: MutableList<RefreshSearchAnalyzerShardResponse>
private lateinit var shardFailures: MutableList<DefaultShardOperationFailedException>

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
val resultSize: Int = inp.readVInt()
for (i in 0..resultSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a common pattern you saw elsewhere for reading off streaminput?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, does this constructor only take the streaminput in the format outputted by writeTo? if so, you can change writeTo to do something like:

out.writeList(shardResponses)

and then here you can read the list and pass a class reference to the element of the list (assuming it's Writeable)

inp.readList(::RefreshSearchAnalyzerShardResponse)

You can find a similar example of this in the Monitor data class for Alerting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this pattern in BroadcastResponse.java, although the one that Mohammad suggested looks to be a better way. I have modified it in my latest commit. Thanks for the suggestion Mo!

results.put(inp.readString(), inp.readStringArray().toList())
shardResponses.add(RefreshSearchAnalyzerShardResponse(inp))
}

val failureSize: Int = inp.readVInt()
for (i in 0..failureSize) {
shardFailures.add(FailedShardDetails(inp.readString(), inp.readInt(), inp.readString()))
shardFailures.add(readShardOperationFailed(inp))
}
}

Expand All @@ -53,58 +53,54 @@ class RefreshSearchAnalyzerResponse : BroadcastResponse {
successfulShards: Int,
failedShards: Int,
shardFailures: List<DefaultShardOperationFailedException>,
results: MutableMap<String, List<String>>
shardResponses: List<RefreshSearchAnalyzerShardResponse>
) : super(
totalShards, successfulShards, failedShards, shardFailures
) {
this.results = results
this.temp = shardFailures
for (failure in shardFailures) {
this.shardFailures.add(FailedShardDetails(failure.index()!!, failure.shardId(), failure.reason()))
}
this.shardResponses = shardResponses.toMutableList()
this.shardFailures = shardFailures.toMutableList()
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? {
builder.startObject()
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, null)

builder.startArray("_successful")
for (index in results.keys) {
builder.startObject()
val reloadedAnalyzers: List<String> = results.get(index)!!
builder.field("index", index)
builder.startArray("refreshed_analyzers")
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, shardFailures.toTypedArray())
builder.startArray("successful_refreshes")
val successfulIndices = getSuccessfulRefreshDetails()
for (index in successfulIndices.keys) {
val reloadedAnalyzers = successfulIndices.get(index)!!
builder.startObject().field("index", index).startArray("refreshed_analyzers")
for (analyzer in reloadedAnalyzers) {
builder.value(analyzer)
}
builder.endArray()
builder.endObject()
builder.endArray().endObject()
}
builder.endArray()
builder.endArray().endObject()
return builder
}

builder.startArray("_failed")
// TODO: restrict it for testing
fun getSuccessfulRefreshDetails(): MutableMap<String, List<String>> {
var successfulRefreshDetails: MutableMap<String, List<String>> = HashMap()
var failedIndices = mutableSetOf<String>()
for (failure in shardFailures) {
builder.startObject()
builder.field("index", failure.index)
builder.field("shardId", failure.shardId)
builder.field("failureReason", failure.failureReason)
builder.endObject()
failedIndices.add(failure.index()!!)
}
builder.endArray()

builder.endObject()
return builder
for (response in shardResponses) {
if (!failedIndices.contains(response.index)) {
successfulRefreshDetails.putIfAbsent(response.index, response.reloadedAnalyzers)
}
}
return successfulRefreshDetails
}

companion object {
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("refresh_search_analyzer", true,
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("_refresh_search_analyzers", true,
Function { arg: Array<Any> ->
val response = arg[0] as RefreshSearchAnalyzerResponse
RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards,
response.temp, response.results)
response.shardFailures, response.shardResponses)
})

init {
declareBroadcastFields(PARSER)
}
Expand All @@ -114,23 +110,14 @@ class RefreshSearchAnalyzerResponse : BroadcastResponse {
override fun writeTo(out: StreamOutput) {
super.writeTo(out)

out.writeVInt(results.size)
for ((key, value) in results.entries) {
out.writeString(key)
out.writeStringArray(value.toTypedArray())
out.writeVInt(shardResponses.size)
for (response in shardResponses) {
response.writeTo(out)
}

out.writeVInt(shardFailures.size)
for (failure in shardFailures) {
out.writeString(failure.index)
out.writeInt(failure.shardId)
out.writeString(failure.failureReason)
failure.writeTo(out)
}
}

class FailedShardDetails(index: String, shardId: Int, failureReason: String) {
val index = index
val shardId = shardId
val failureReason = failureReason
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse {
var indexName: String
var reloadedAnalyzers: List<String>

constructor(`in`: StreamInput) : super(`in`) {
indexName = `in`.readString()
reloadedAnalyzers = `in`.readStringArray().toList()
constructor(si: StreamInput) : super(si) {
reloadedAnalyzers = si.readStringArray().toList()
}

constructor(shardId: ShardId?, indexName: String, reloadedAnalyzers: List<String>) : super(shardId) {
this.indexName = indexName
constructor(shardId: ShardId, reloadedAnalyzers: List<String>) : super(shardId) {
this.reloadedAnalyzers = reloadedAnalyzers
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
out.writeStringArray(reloadedAnalyzers.toTypedArray())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ANALYZER_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.OPEN_DISTRO_BASE_URI
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
import org.elasticsearch.rest.BaseRestHandler
Expand All @@ -31,7 +31,8 @@ class RestRefreshSearchAnalyzerAction : BaseRestHandler() {

override fun routes(): List<Route> {
return listOf(
Route(POST, REFRESH_SYNONYM_ANALYZER_URI)
Route(POST, REFRESH_SEARCH_ANALYZER_BASE_URI),
Route(POST, "$REFRESH_SEARCH_ANALYZER_BASE_URI/{index}")
)
}

Expand All @@ -55,6 +56,6 @@ class RestRefreshSearchAnalyzerAction : BaseRestHandler() {
}

companion object {
const val REFRESH_SYNONYM_ANALYZER_URI = "$ANALYZER_BASE_URI/refresh_search_analyzer/{index}"
const val REFRESH_SEARCH_ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_analyzer/_refresh_search_analyzers"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,34 @@ class TransportRefreshSearchAnalyzerAction :
private val analysisRegistry: AnalysisRegistry

@Throws(IOException::class)
override fun readShardResult(`in`: StreamInput): RefreshSearchAnalyzerShardResponse? {
return RefreshSearchAnalyzerShardResponse(`in`)
override fun readShardResult(si: StreamInput): RefreshSearchAnalyzerShardResponse? {
return RefreshSearchAnalyzerShardResponse(si)
}

override fun newResponse(
request: RefreshSearchAnalyzerRequest?,
request: RefreshSearchAnalyzerRequest,
totalShards: Int,
successfulShards: Int,
failedShards: Int,
results: List<RefreshSearchAnalyzerShardResponse>,
shardResponses: List<RefreshSearchAnalyzerShardResponse>,
shardFailures: List<DefaultShardOperationFailedException>,
clusterState: ClusterState?
clusterState: ClusterState
): RefreshSearchAnalyzerResponse {
val shardResponses: MutableMap<String, List<String>> = HashMap()
for (response in results) {
shardResponses.put(response.indexName, response.reloadedAnalyzers)
}
return RefreshSearchAnalyzerResponse(totalShards, successfulShards, failedShards, shardFailures, shardResponses)
}

@Throws(IOException::class)
override fun readRequestFrom(`in`: StreamInput): RefreshSearchAnalyzerRequest? {
return RefreshSearchAnalyzerRequest(`in`)
override fun readRequestFrom(si: StreamInput): RefreshSearchAnalyzerRequest {
return RefreshSearchAnalyzerRequest(si)
}

@Throws(IOException::class)
override fun shardOperation(request: RefreshSearchAnalyzerRequest?, shardRouting: ShardRouting): RefreshSearchAnalyzerShardResponse {
override fun shardOperation(request: RefreshSearchAnalyzerRequest, shardRouting: ShardRouting): RefreshSearchAnalyzerShardResponse {
val indexShard: IndexShard = indicesService.indexServiceSafe(shardRouting.shardId().index).getShard(shardRouting.shardId().id())
val reloadedAnalyzers: List<String> = indexShard.mapperService().reloadSearchAnalyzers(analysisRegistry)
logger.debug("Reload successful, index: ${shardRouting.shardId().index.name}, shard: ${shardRouting.shardId().id}")
return RefreshSearchAnalyzerShardResponse(shardRouting.shardId(), shardRouting.indexName, reloadedAnalyzers)
logger.info("Reload successful, index: ${shardRouting.shardId().index.name}, shard: ${shardRouting.shardId().id}, " +
"is_primary: ${shardRouting.primary()}")
return RefreshSearchAnalyzerShardResponse(shardRouting.shardId(), reloadedAnalyzers)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.waitFor
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction.Companion.REFRESH_SEARCH_ANALYZER_BASE_URI
import org.elasticsearch.client.Request
import org.elasticsearch.common.io.Streams
import org.elasticsearch.common.settings.Settings
Expand All @@ -41,10 +42,10 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
.build()
createIndex(indexName, settings, getAnalyzerMapping())
ingestData(indexName)
Thread.sleep(1000) // wait for refresh_interval

val result1 = queryData(indexName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(indexName, "hola")
Expand Down Expand Up @@ -87,13 +88,12 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
val settings: Settings = Settings.builder()
.loadFromSource(getSearchAnalyzerSettings(), XContentType.JSON)
.build()
// val mappings: String = "\"properties\":{\"title\":{\"type\": \"text\",\"analyzer\" : \"standard\",\"search_analyzer\": \"my_synonyms\"}}"
createIndex(indexName, settings, getAnalyzerMapping())
ingestData(indexName)
Thread.sleep(1000) // wait for refresh_interval

val result1 = queryData(indexName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(indexName, "hola")
Expand Down Expand Up @@ -140,10 +140,10 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
.build()
createIndex(indexName, settings, getAnalyzerMapping(), aliasSettings)
ingestData(indexName)
Thread.sleep(1000)

val result1 = queryData(aliasName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(aliasName, "hola")
Expand Down Expand Up @@ -181,7 +181,6 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
}

fun deleteFile(filePath: String) {
// org.elasticsearch.common.io.PathUtils.get(filePath)
Files.deleteIfExists(org.elasticsearch.common.io.PathUtils.get(filePath))
}

Expand All @@ -204,7 +203,7 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {

fun refreshAnalyzer(indexName: String) {
val request = Request("POST",
"${IndexManagementPlugin.ANALYZER_BASE_URI}/refresh_search_analyzer/$indexName")
"$REFRESH_SEARCH_ANALYZER_BASE_URI/$indexName")
client().performRequest(request)
}

Expand Down
Loading