Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
CR comments, code refactoring and more UTs
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit d87842e4ccb592a39d6b6897a2b9cde9ddc2839f
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 23:36:24 2020 -0700

    Reverting RefreshSearchAnalyzerResponse stream input output parsing test for fixing later

commit 33ef3223acde38a64575255f7ec702f807d0502e
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 23:34:49 2020 -0700

    Fixing missing indices test

commit 8516d2142362c61630682bf7353e019bf9f6a9eb
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 21:59:55 2020 -0700

    CR comments and added more UTs

commit d306421ca462f2614756a73231baeeb56b388365
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 20:15:59 2020 -0700

    shardFailure refactoring

commit da80de7a68fef05e524e624ea1a8da6afdbf11d4
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 18:14:45 2020 -0700

    Working changes after refactoring shardFailure

commit be9bf2daaa31fb6444f799a8f3ed3d0383551471
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 17:45:31 2020 -0700

    Corrected response parsing for indices with some failed shards

commit 256f06ed23ac4ab2d78724601a1dd7e9d1423331
Author: Himanshu Setia <[email protected]>
Date:   Thu Sep 10 17:12:00 2020 -0700

    Working
  • Loading branch information
Himanshu Setia committed Sep 11, 2020
1 parent 2f790fc commit b41ebcb
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
const val PLUGIN_NAME = "opendistro-im"
const val OPEN_DISTRO_BASE_URI = "/_opendistro"
const val ISM_BASE_URI = "$OPEN_DISTRO_BASE_URI/_ism"
const val ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_analyzer"
const val POLICY_BASE_URI = "$ISM_BASE_URI/policies"
const val INDEX_MANAGEMENT_INDEX = ".opendistro-ism-config"
const val INDEX_MANAGEMENT_JOB_TYPE = "opendistro-index-management"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.Writeable

class RefreshSearchAnalyzerAction : ActionType<RefreshSearchAnalyzerResponse>(NAME, reader) {
companion object {
const val NAME = "indices:admin/refresh_search_analyzer"
const val NAME = "indices:admin/_refresh_search_analyzers"
val INSTANCE = RefreshSearchAnalyzerAction()
val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.action.support.DefaultShardOperationFailedException.readShardOperationFailed
import org.elasticsearch.action.support.broadcast.BroadcastResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
Expand All @@ -29,22 +30,21 @@ import java.util.function.Function

class RefreshSearchAnalyzerResponse : BroadcastResponse {

private var results: MutableMap<String, List<String>> = HashMap()
private var shardFailures: MutableList<FailedShardDetails> = mutableListOf()
private var temp: List<DefaultShardOperationFailedException> = mutableListOf()

protected var logger = LogManager.getLogger(javaClass)

private lateinit var shardResponses: MutableList<RefreshSearchAnalyzerShardResponse>
private lateinit var shardFailures: MutableList<DefaultShardOperationFailedException>

@Throws(IOException::class)
constructor(inp: StreamInput) : super(inp) {
val resultSize: Int = inp.readVInt()
for (i in 0..resultSize) {
results.put(inp.readString(), inp.readStringArray().toList())
shardResponses.add(RefreshSearchAnalyzerShardResponse(inp))
}

val failureSize: Int = inp.readVInt()
for (i in 0..failureSize) {
shardFailures.add(FailedShardDetails(inp.readString(), inp.readInt(), inp.readString()))
shardFailures.add(readShardOperationFailed(inp))
}
}

Expand All @@ -53,58 +53,54 @@ class RefreshSearchAnalyzerResponse : BroadcastResponse {
successfulShards: Int,
failedShards: Int,
shardFailures: List<DefaultShardOperationFailedException>,
results: MutableMap<String, List<String>>
shardResponses: List<RefreshSearchAnalyzerShardResponse>
) : super(
totalShards, successfulShards, failedShards, shardFailures
) {
this.results = results
this.temp = shardFailures
for (failure in shardFailures) {
this.shardFailures.add(FailedShardDetails(failure.index()!!, failure.shardId(), failure.reason()))
}
this.shardResponses = shardResponses.toMutableList()
this.shardFailures = shardFailures.toMutableList()
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? {
builder.startObject()
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, null)

builder.startArray("_successful")
for (index in results.keys) {
builder.startObject()
val reloadedAnalyzers: List<String> = results.get(index)!!
builder.field("index", index)
builder.startArray("refreshed_analyzers")
RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, shardFailures.toTypedArray())
builder.startArray("successful_refreshes")
val successfulIndices = getSuccessfulRefreshDetails()
for (index in successfulIndices.keys) {
val reloadedAnalyzers = successfulIndices.get(index)!!
builder.startObject().field("index", index).startArray("refreshed_analyzers")
for (analyzer in reloadedAnalyzers) {
builder.value(analyzer)
}
builder.endArray()
builder.endObject()
builder.endArray().endObject()
}
builder.endArray()
builder.endArray().endObject()
return builder
}

builder.startArray("_failed")
// TODO: restrict it for testing
fun getSuccessfulRefreshDetails(): MutableMap<String, List<String>> {
var successfulRefreshDetails: MutableMap<String, List<String>> = HashMap()
var failedIndices = mutableSetOf<String>()
for (failure in shardFailures) {
builder.startObject()
builder.field("index", failure.index)
builder.field("shardId", failure.shardId)
builder.field("failureReason", failure.failureReason)
builder.endObject()
failedIndices.add(failure.index()!!)
}
builder.endArray()

builder.endObject()
return builder
for (response in shardResponses) {
if (!failedIndices.contains(response.index)) {
successfulRefreshDetails.putIfAbsent(response.index, response.reloadedAnalyzers)
}
}
return successfulRefreshDetails
}

companion object {
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("refresh_search_analyzer", true,
private val PARSER = ConstructingObjectParser<RefreshSearchAnalyzerResponse, Void>("_refresh_search_analyzers", true,
Function { arg: Array<Any> ->
val response = arg[0] as RefreshSearchAnalyzerResponse
RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards,
response.temp, response.results)
response.shardFailures, response.shardResponses)
})

init {
declareBroadcastFields(PARSER)
}
Expand All @@ -114,23 +110,14 @@ class RefreshSearchAnalyzerResponse : BroadcastResponse {
override fun writeTo(out: StreamOutput) {
super.writeTo(out)

out.writeVInt(results.size)
for ((key, value) in results.entries) {
out.writeString(key)
out.writeStringArray(value.toTypedArray())
out.writeVInt(shardResponses.size)
for (response in shardResponses) {
response.writeTo(out)
}

out.writeVInt(shardFailures.size)
for (failure in shardFailures) {
out.writeString(failure.index)
out.writeInt(failure.shardId)
out.writeString(failure.failureReason)
failure.writeTo(out)
}
}

class FailedShardDetails(index: String, shardId: Int, failureReason: String) {
val index = index
val shardId = shardId
val failureReason = failureReason
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ import org.elasticsearch.index.shard.ShardId
import java.io.IOException

class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse {
var indexName: String
var reloadedAnalyzers: List<String>

constructor(`in`: StreamInput) : super(`in`) {
indexName = `in`.readString()
reloadedAnalyzers = `in`.readStringArray().toList()
constructor(si: StreamInput) : super(si) {
reloadedAnalyzers = si.readStringArray().toList()
}

constructor(shardId: ShardId?, indexName: String, reloadedAnalyzers: List<String>) : super(shardId) {
this.indexName = indexName
constructor(shardId: ShardId, reloadedAnalyzers: List<String>) : super(shardId) {
this.reloadedAnalyzers = reloadedAnalyzers
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeString(indexName)
out.writeStringArray(reloadedAnalyzers.toTypedArray())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ANALYZER_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.OPEN_DISTRO_BASE_URI
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.Strings
import org.elasticsearch.rest.BaseRestHandler
Expand All @@ -31,7 +31,8 @@ class RestRefreshSearchAnalyzerAction : BaseRestHandler() {

override fun routes(): List<Route> {
return listOf(
Route(POST, REFRESH_SYNONYM_ANALYZER_URI)
Route(POST, REFRESH_SEARCH_ANALYZER_BASE_URI),
Route(POST, "$REFRESH_SEARCH_ANALYZER_BASE_URI/{index}")
)
}

Expand All @@ -55,6 +56,6 @@ class RestRefreshSearchAnalyzerAction : BaseRestHandler() {
}

companion object {
const val REFRESH_SYNONYM_ANALYZER_URI = "$ANALYZER_BASE_URI/refresh_search_analyzer/{index}"
const val REFRESH_SEARCH_ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_analyzer/_refresh_search_analyzers"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,34 @@ class TransportRefreshSearchAnalyzerAction :
private val analysisRegistry: AnalysisRegistry

@Throws(IOException::class)
override fun readShardResult(`in`: StreamInput): RefreshSearchAnalyzerShardResponse? {
return RefreshSearchAnalyzerShardResponse(`in`)
override fun readShardResult(si: StreamInput): RefreshSearchAnalyzerShardResponse? {
return RefreshSearchAnalyzerShardResponse(si)
}

override fun newResponse(
request: RefreshSearchAnalyzerRequest?,
request: RefreshSearchAnalyzerRequest,
totalShards: Int,
successfulShards: Int,
failedShards: Int,
results: List<RefreshSearchAnalyzerShardResponse>,
shardResponses: List<RefreshSearchAnalyzerShardResponse>,
shardFailures: List<DefaultShardOperationFailedException>,
clusterState: ClusterState?
clusterState: ClusterState
): RefreshSearchAnalyzerResponse {
val shardResponses: MutableMap<String, List<String>> = HashMap()
for (response in results) {
shardResponses.put(response.indexName, response.reloadedAnalyzers)
}
return RefreshSearchAnalyzerResponse(totalShards, successfulShards, failedShards, shardFailures, shardResponses)
}

@Throws(IOException::class)
override fun readRequestFrom(`in`: StreamInput): RefreshSearchAnalyzerRequest? {
return RefreshSearchAnalyzerRequest(`in`)
override fun readRequestFrom(si: StreamInput): RefreshSearchAnalyzerRequest {
return RefreshSearchAnalyzerRequest(si)
}

@Throws(IOException::class)
override fun shardOperation(request: RefreshSearchAnalyzerRequest?, shardRouting: ShardRouting): RefreshSearchAnalyzerShardResponse {
override fun shardOperation(request: RefreshSearchAnalyzerRequest, shardRouting: ShardRouting): RefreshSearchAnalyzerShardResponse {
val indexShard: IndexShard = indicesService.indexServiceSafe(shardRouting.shardId().index).getShard(shardRouting.shardId().id())
val reloadedAnalyzers: List<String> = indexShard.mapperService().reloadSearchAnalyzers(analysisRegistry)
logger.debug("Reload successful, index: ${shardRouting.shardId().index.name}, shard: ${shardRouting.shardId().id}")
return RefreshSearchAnalyzerShardResponse(shardRouting.shardId(), shardRouting.indexName, reloadedAnalyzers)
logger.info("Reload successful, index: ${shardRouting.shardId().index.name}, shard: ${shardRouting.shardId().id}, " +
"is_primary: ${shardRouting.primary()}")
return RefreshSearchAnalyzerShardResponse(shardRouting.shardId(), reloadedAnalyzers)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.waitFor
import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction.Companion.REFRESH_SEARCH_ANALYZER_BASE_URI
import org.elasticsearch.client.Request
import org.elasticsearch.common.io.Streams
import org.elasticsearch.common.settings.Settings
Expand All @@ -41,10 +42,10 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
.build()
createIndex(indexName, settings, getAnalyzerMapping())
ingestData(indexName)
Thread.sleep(1000) // wait for refresh_interval

val result1 = queryData(indexName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(indexName, "hola")
Expand Down Expand Up @@ -87,13 +88,12 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
val settings: Settings = Settings.builder()
.loadFromSource(getSearchAnalyzerSettings(), XContentType.JSON)
.build()
// val mappings: String = "\"properties\":{\"title\":{\"type\": \"text\",\"analyzer\" : \"standard\",\"search_analyzer\": \"my_synonyms\"}}"
createIndex(indexName, settings, getAnalyzerMapping())
ingestData(indexName)
Thread.sleep(1000) // wait for refresh_interval

val result1 = queryData(indexName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(indexName, "hola")
Expand Down Expand Up @@ -140,10 +140,10 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
.build()
createIndex(indexName, settings, getAnalyzerMapping(), aliasSettings)
ingestData(indexName)
Thread.sleep(1000)

val result1 = queryData(aliasName, "hello")
assertTrue(result1.contains("hello world"))
waitFor {
assertTrue(queryData(indexName, "hello").contains("hello world"))
}

// check synonym
val result2 = queryData(aliasName, "hola")
Expand Down Expand Up @@ -181,7 +181,6 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {
}

fun deleteFile(filePath: String) {
// org.elasticsearch.common.io.PathUtils.get(filePath)
Files.deleteIfExists(org.elasticsearch.common.io.PathUtils.get(filePath))
}

Expand All @@ -204,7 +203,7 @@ class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() {

fun refreshAnalyzer(indexName: String) {
val request = Request("POST",
"${IndexManagementPlugin.ANALYZER_BASE_URI}/refresh_search_analyzer/$indexName")
"$REFRESH_SEARCH_ANALYZER_BASE_URI/$indexName")
client().performRequest(request)
}

Expand Down
Loading

0 comments on commit b41ebcb

Please sign in to comment.