diff --git a/build.gradle b/build.gradle index d7e53a51a..91673ae17 100644 --- a/build.gradle +++ b/build.gradle @@ -180,6 +180,7 @@ integTest.runner { systemProperty 'tests.security.manager', 'false' systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath systemProperty 'tests.path.repo', repo.absolutePath + systemProperty 'buildDir', buildDir // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. doFirst { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt index eba55b96a..78664eb14 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt @@ -31,6 +31,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRemovePolicyAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RefreshSearchAnalyzerAction +import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction +import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.TransportRefreshSearchAnalyzerAction import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner @@ -117,6 +120,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug nodesInCluster: Supplier ): List { return listOf( + RestRefreshSearchAnalyzerAction(), RestIndexPolicyAction(settings, clusterService, indexManagementIndices), RestGetPolicyAction(), RestDeletePolicyAction(), @@ -190,6 +194,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug ActionPlugin.ActionHandler( UpdateManagedIndexMetaDataAction.INSTANCE, TransportUpdateManagedIndexMetaDataAction::class.java + ), + ActionPlugin.ActionHandler( + RefreshSearchAnalyzerAction.INSTANCE, + TransportRefreshSearchAnalyzerAction::class.java ) ) } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerAction.kt new file mode 100644 index 000000000..d96df7391 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerAction.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.action.ActionType +import org.elasticsearch.common.io.stream.Writeable + +class RefreshSearchAnalyzerAction : ActionType(NAME, reader) { + companion object { + const val NAME = "indices:admin/refresh_search_analyzers" + val INSTANCE = RefreshSearchAnalyzerAction() + val reader = Writeable.Reader { inp -> RefreshSearchAnalyzerResponse(inp) } + } + + override fun getResponseReader(): Writeable.Reader = reader +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt new file mode 100644 index 000000000..19a0d57e9 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerRequest.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.action.support.broadcast.BroadcastRequest +import org.elasticsearch.common.io.stream.StreamInput +import java.io.IOException + +class RefreshSearchAnalyzerRequest : BroadcastRequest { + constructor(vararg indices: String) : super(*indices) + + @Throws(IOException::class) + constructor(inp: StreamInput) : super(inp) +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponse.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponse.kt new file mode 100644 index 000000000..746669c38 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponse.kt @@ -0,0 +1,104 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.action.support.DefaultShardOperationFailedException +import org.elasticsearch.action.support.broadcast.BroadcastResponse +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.xcontent.ConstructingObjectParser +import org.elasticsearch.common.xcontent.ToXContent.Params +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.rest.action.RestActions +import java.io.IOException +import java.util.function.Function + +class RefreshSearchAnalyzerResponse : BroadcastResponse { + + private lateinit var shardResponses: MutableList + private lateinit var shardFailures: MutableList + + @Throws(IOException::class) + constructor(inp: StreamInput) : super(inp) { + inp.readList(::RefreshSearchAnalyzerShardResponse) + inp.readList(DefaultShardOperationFailedException::readShardOperationFailed) + } + + constructor( + totalShards: Int, + successfulShards: Int, + failedShards: Int, + shardFailures: List, + shardResponses: List + ) : super( + totalShards, successfulShards, failedShards, shardFailures + ) { + this.shardResponses = shardResponses.toMutableList() + this.shardFailures = shardFailures.toMutableList() + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: Params?): XContentBuilder? { + builder.startObject() + RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, -1, failedShards, shardFailures.toTypedArray()) + builder.startArray("successful_refresh_details") + val successfulIndices = getSuccessfulRefreshDetails() + for (index in successfulIndices.keys) { + val reloadedAnalyzers = successfulIndices.get(index)!! + builder.startObject().field("index", index).startArray("refreshed_analyzers") + for (analyzer in reloadedAnalyzers) { + builder.value(analyzer) + } + builder.endArray().endObject() + } + builder.endArray().endObject() + return builder + } + + // TODO: restrict it for testing + fun getSuccessfulRefreshDetails(): MutableMap> { + var successfulRefreshDetails: MutableMap> = HashMap() + var failedIndices = mutableSetOf() + for (failure in shardFailures) { + failedIndices.add(failure.index()!!) + } + for (response in shardResponses) { + if (!failedIndices.contains(response.index)) { + successfulRefreshDetails.putIfAbsent(response.index, response.reloadedAnalyzers) + } + } + return successfulRefreshDetails + } + + companion object { + private val PARSER = ConstructingObjectParser("_refresh_search_analyzers", true, + Function { arg: Array -> + val response = arg[0] as RefreshSearchAnalyzerResponse + RefreshSearchAnalyzerResponse(response.totalShards, response.successfulShards, response.failedShards, + response.shardFailures, response.shardResponses) + }) + init { + declareBroadcastFields(PARSER) + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeCollection(shardResponses) + out.writeCollection(shardFailures) + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponse.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponse.kt new file mode 100644 index 000000000..45718005e --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponse.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.action.support.broadcast.BroadcastShardResponse +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.index.shard.ShardId +import java.io.IOException + +class RefreshSearchAnalyzerShardResponse : BroadcastShardResponse { + var reloadedAnalyzers: List + + constructor(si: StreamInput) : super(si) { + reloadedAnalyzers = si.readStringArray().toList() + } + + constructor(shardId: ShardId, reloadedAnalyzers: List) : super(shardId) { + this.reloadedAnalyzers = reloadedAnalyzers + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeStringArray(reloadedAnalyzers.toTypedArray()) + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerAction.kt new file mode 100644 index 000000000..be9a42f31 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerAction.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.OPEN_DISTRO_BASE_URI +import org.elasticsearch.client.node.NodeClient +import org.elasticsearch.common.Strings +import org.elasticsearch.rest.BaseRestHandler +import org.elasticsearch.rest.RestHandler.Route +import org.elasticsearch.rest.RestRequest +import org.elasticsearch.rest.RestRequest.Method.POST +import org.elasticsearch.rest.action.RestToXContentListener +import java.io.IOException + +class RestRefreshSearchAnalyzerAction : BaseRestHandler() { + + override fun getName(): String = "refresh_search_analyzer_action" + + override fun routes(): List { + return listOf( + Route(POST, REFRESH_SEARCH_ANALYZER_BASE_URI), + Route(POST, "$REFRESH_SEARCH_ANALYZER_BASE_URI/{index}") + ) + } + + // TODO: Add indicesOptions? + + @Throws(IOException::class) + @Suppress("SpreadOperator") + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val indices: Array? = Strings.splitStringByCommaToArray(request.param("index")) + + if (indices.isNullOrEmpty()) { + throw IllegalArgumentException("Missing indices") + } + + val refreshSearchAnalyzerRequest: RefreshSearchAnalyzerRequest = RefreshSearchAnalyzerRequest() + .indices(*indices) + + return RestChannelConsumer { channel -> + client.execute(RefreshSearchAnalyzerAction.INSTANCE, refreshSearchAnalyzerRequest, RestToXContentListener(channel)) + } + } + + companion object { + const val REFRESH_SEARCH_ANALYZER_BASE_URI = "$OPEN_DISTRO_BASE_URI/_refresh_search_analyzers" + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/TransportRefreshSearchAnalyzerAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/TransportRefreshSearchAnalyzerAction.kt new file mode 100644 index 000000000..5b38a43cd --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/TransportRefreshSearchAnalyzerAction.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.support.ActionFilters +import org.elasticsearch.action.support.DefaultShardOperationFailedException +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction +import org.elasticsearch.cluster.ClusterState +import org.elasticsearch.cluster.block.ClusterBlockException +import org.elasticsearch.cluster.block.ClusterBlockLevel +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver +import org.elasticsearch.cluster.routing.ShardRouting +import org.elasticsearch.cluster.routing.ShardsIterator +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.Writeable +import org.elasticsearch.index.analysis.AnalysisRegistry +import org.elasticsearch.index.shard.IndexShard +import org.elasticsearch.indices.IndicesService +import org.elasticsearch.threadpool.ThreadPool +import org.elasticsearch.transport.TransportService +import java.io.IOException + +class TransportRefreshSearchAnalyzerAction : + TransportBroadcastByNodeAction< + RefreshSearchAnalyzerRequest, + RefreshSearchAnalyzerResponse, + RefreshSearchAnalyzerShardResponse> { + + private val log = LogManager.getLogger(javaClass) + + @Inject + constructor( + clusterService: ClusterService, + transportService: TransportService, + indicesService: IndicesService, + actionFilters: ActionFilters, + analysisRegistry: AnalysisRegistry, + indexNameExpressionResolver: IndexNameExpressionResolver? + ) : super( + RefreshSearchAnalyzerAction.NAME, + clusterService, + transportService, + actionFilters, + indexNameExpressionResolver, + Writeable.Reader { RefreshSearchAnalyzerRequest() }, + ThreadPool.Names.MANAGEMENT + ) { + this.analysisRegistry = analysisRegistry + this.indicesService = indicesService + } + + private val indicesService: IndicesService + private val analysisRegistry: AnalysisRegistry + + @Throws(IOException::class) + override fun readShardResult(si: StreamInput): RefreshSearchAnalyzerShardResponse? { + return RefreshSearchAnalyzerShardResponse(si) + } + + override fun newResponse( + request: RefreshSearchAnalyzerRequest, + totalShards: Int, + successfulShards: Int, + failedShards: Int, + shardResponses: List, + shardFailures: List, + clusterState: ClusterState + ): RefreshSearchAnalyzerResponse { + return RefreshSearchAnalyzerResponse(totalShards, successfulShards, failedShards, shardFailures, shardResponses) + } + + @Throws(IOException::class) + override fun readRequestFrom(si: StreamInput): RefreshSearchAnalyzerRequest { + return RefreshSearchAnalyzerRequest(si) + } + + @Throws(IOException::class) + override fun shardOperation(request: RefreshSearchAnalyzerRequest, shardRouting: ShardRouting): RefreshSearchAnalyzerShardResponse { + val indexShard: IndexShard = indicesService.indexServiceSafe(shardRouting.shardId().index).getShard(shardRouting.shardId().id()) + val reloadedAnalyzers: List = indexShard.mapperService().reloadSearchAnalyzers(analysisRegistry) + log.info("Reload successful, index: ${shardRouting.shardId().index.name}, shard: ${shardRouting.shardId().id}, " + + "is_primary: ${shardRouting.primary()}") + return RefreshSearchAnalyzerShardResponse(shardRouting.shardId(), reloadedAnalyzers) + } + + /** + * The refresh request works against *all* shards. + */ + override fun shards(clusterState: ClusterState, request: RefreshSearchAnalyzerRequest?, concreteIndices: Array?): ShardsIterator? { + return clusterState.routingTable().allShards(concreteIndices) + } + + override fun checkGlobalBlock(state: ClusterState, request: RefreshSearchAnalyzerRequest?): ClusterBlockException? { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE) + } + + override fun checkRequestBlock(state: ClusterState, request: RefreshSearchAnalyzerRequest?, concreteIndices: Array?): + ClusterBlockException? { + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices) + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementRestTestCase.kt new file mode 100644 index 000000000..23ca5b62c --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementRestTestCase.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement + +import org.elasticsearch.client.Response +import org.elasticsearch.rest.RestStatus +import org.elasticsearch.test.rest.ESRestTestCase + +abstract class IndexManagementRestTestCase : ESRestTestCase() { + + fun Response.asMap(): Map = entityAsMap(this) + + protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode) + + protected fun getRepoPath(): String = System.getProperty("tests.path.repo") +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index f630457be..e83462516 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlug import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ISM_BASE_URI +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ChangePolicy import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData @@ -63,7 +64,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers import org.elasticsearch.rest.RestRequest import org.elasticsearch.rest.RestStatus import org.elasticsearch.test.ESTestCase -import org.elasticsearch.test.rest.ESRestTestCase import org.junit.AfterClass import org.junit.rules.DisableOnDebug import java.io.IOException @@ -77,14 +77,12 @@ import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL -abstract class IndexStateManagementRestTestCase : ESRestTestCase() { +abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() { private val isDebuggingTest = DisableOnDebug(null).isDebugging private val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean() protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 - fun Response.asMap(): Map = entityAsMap(this) - protected fun createPolicy( policy: Policy, policyId: String = ESTestCase.randomAlphaOfLength(10), @@ -346,8 +344,6 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } - protected fun Response.restStatus(): RestStatus = RestStatus.fromCode(this.statusLine.statusCode) - protected fun Policy.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON) protected fun ManagedIndexConfig.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON) @@ -514,8 +510,6 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus()) } - private fun getRepoPath(): String = System.getProperty("tests.path.repo") - private fun getShardsList(): List { val response = client() .makeRequest( diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt new file mode 100644 index 000000000..2edba47f1 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerActionIT.kt @@ -0,0 +1,262 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction.Companion.REFRESH_SEARCH_ANALYZER_BASE_URI +import org.elasticsearch.client.Request +import org.elasticsearch.common.io.Streams +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.common.xcontent.XContentType +import java.io.InputStreamReader +import java.nio.charset.Charset +import java.nio.charset.StandardCharsets +import java.nio.file.Files + +class RefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() { + fun `test index time analyzer`() { + val buildDir = System.getProperty("buildDir") + val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt() + val indexName = "testindex" + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola") + } + + val settings: Settings = Settings.builder() + .loadFromSource(getIndexAnalyzerSettings(), XContentType.JSON) + .build() + createIndex(indexName, settings, getAnalyzerMapping()) + ingestData(indexName) + + assertTrue(queryData(indexName, "hello").contains("hello world")) + + // check synonym + val result2 = queryData(indexName, "hola") + assertTrue(result2.contains("hello world")) + + // check non synonym + val result3 = queryData(indexName, "namaste") + assertFalse(result3.contains("hello world")) + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola, namaste") + } + + // New added synonym should NOT match + val result4 = queryData(indexName, "namaste") + assertFalse(result4.contains("hello world")) + + // refresh synonyms + refreshAnalyzer(indexName) + + // New added synonym should NOT match + val result5 = queryData(indexName, "namaste") + assertFalse(result5.contains("hello world")) + + // clean up + for (i in 0 until numNodes) { + deleteFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt") + } + } + + fun `test search time analyzer`() { + val buildDir = System.getProperty("buildDir") + val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt() + val indexName = "testindex" + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola") + } + + val settings: Settings = Settings.builder() + .loadFromSource(getSearchAnalyzerSettings(), XContentType.JSON) + .build() + createIndex(indexName, settings, getAnalyzerMapping()) + ingestData(indexName) + + assertTrue(queryData(indexName, "hello").contains("hello world")) + + // check synonym + val result2 = queryData(indexName, "hola") + assertTrue(result2.contains("hello world")) + + // check non synonym + val result3 = queryData(indexName, "namaste") + assertFalse(result3.contains("hello world")) + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola, namaste") + } + + // New added synonym should NOT match + val result4 = queryData(indexName, "namaste") + assertFalse(result4.contains("hello world")) + + // refresh synonyms + refreshAnalyzer(indexName) + + // New added synonym should match + val result5 = queryData(indexName, "namaste") + assertTrue(result5.contains("hello world")) + + // clean up + for (i in 0 until numNodes) { + deleteFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt") + } + } + + fun `test alias`() { + val indexName = "testindex" + val numNodes = System.getProperty("cluster.number_of_nodes", "1").toInt() + val buildDir = System.getProperty("buildDir") + val aliasName = "test" + val aliasSettings = "\"$aliasName\": {}" + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola") + } + + val settings: Settings = Settings.builder() + .loadFromSource(getSearchAnalyzerSettings(), XContentType.JSON) + .build() + createIndex(indexName, settings, getAnalyzerMapping(), aliasSettings) + ingestData(indexName) + + assertTrue(queryData(indexName, "hello").contains("hello world")) + + // check synonym + val result2 = queryData(aliasName, "hola") + assertTrue(result2.contains("hello world")) + + // check non synonym + val result3 = queryData(aliasName, "namaste") + assertFalse(result3.contains("hello world")) + + for (i in 0 until numNodes) { + writeToFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt", "hello, hola, namaste") + } + + // New added synonym should NOT match + val result4 = queryData(aliasName, "namaste") + assertFalse(result4.contains("hello world")) + + // refresh synonyms + refreshAnalyzer(aliasName) + + // New added synonym should match + val result5 = queryData(aliasName, "namaste") + assertTrue(result5.contains("hello world")) + + for (i in 0 until numNodes) { + deleteFile("$buildDir/testclusters/integTest-$i/config/pacman_synonyms.txt") + } + } + + companion object { + + fun writeToFile(filePath: String, contents: String) { + var path = org.elasticsearch.common.io.PathUtils.get(filePath) + Files.newBufferedWriter(path, Charset.forName("UTF-8")).use { writer -> writer.write(contents) } + } + + fun deleteFile(filePath: String) { + Files.deleteIfExists(org.elasticsearch.common.io.PathUtils.get(filePath)) + } + + fun ingestData(indexName: String) { + val request = Request("POST", "/$indexName/_doc?refresh=true") + val data: String = """ + { + "title": "hello world..." + } + """.trimIndent() + request.setJsonEntity(data) + client().performRequest(request) + } + + fun queryData(indexName: String, query: String): String { + val request = Request("GET", "/$indexName/_search?q=$query") + val response = client().performRequest(request) + return Streams.copyToString(InputStreamReader(response.entity.content, StandardCharsets.UTF_8)) + } + + fun refreshAnalyzer(indexName: String) { + val request = Request("POST", + "$REFRESH_SEARCH_ANALYZER_BASE_URI/$indexName") + client().performRequest(request) + } + + fun getSearchAnalyzerSettings(): String { + return """ + { + "index" : { + "analysis" : { + "analyzer" : { + "my_synonyms" : { + "tokenizer" : "whitespace", + "filter" : ["synonym"] + } + }, + "filter" : { + "synonym" : { + "type" : "synonym_graph", + "synonyms_path" : "pacman_synonyms.txt", + "updateable" : true + } + } + } + } + } + """.trimIndent() + } + + fun getIndexAnalyzerSettings(): String { + return """ + { + "index" : { + "analysis" : { + "analyzer" : { + "my_synonyms" : { + "tokenizer" : "whitespace", + "filter" : ["synonym"] + } + }, + "filter" : { + "synonym" : { + "type" : "synonym_graph", + "synonyms_path" : "pacman_synonyms.txt" + } + } + } + } + } + """.trimIndent() + } + + fun getAnalyzerMapping(): String { + return """ + "properties": { + "title": { + "type": "text", + "analyzer" : "standard", + "search_analyzer": "my_synonyms" + } + } + """.trimIndent() + } + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt new file mode 100644 index 000000000..faeb9db0b --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerResponseTests.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.action.support.DefaultShardOperationFailedException +import org.elasticsearch.index.shard.ShardId +import org.elasticsearch.test.ESTestCase +import org.junit.Assert + +class RefreshSearchAnalyzerResponseTests : ESTestCase() { + + fun `test get successful refresh details`() { + val index1 = "index1" + val index2 = "index2" + val syn1 = "synonym1" + val syn2 = "synonym2" + val i1s0 = ShardId(index1, "abc", 0) + val i1s1 = ShardId(index1, "abc", 1) + val i2s0 = ShardId(index2, "xyz", 0) + val i2s1 = ShardId(index2, "xyz", 1) + + var response_i1s0 = RefreshSearchAnalyzerShardResponse(i1s0, listOf(syn1, syn2)) + var response_i1s1 = RefreshSearchAnalyzerShardResponse(i1s1, listOf(syn1, syn2)) + var response_i2s0 = RefreshSearchAnalyzerShardResponse(i2s0, listOf(syn1)) + var response_i2s1 = RefreshSearchAnalyzerShardResponse(i2s1, listOf(syn1)) + var failure_i1s0 = DefaultShardOperationFailedException(index1, 0, Throwable("dummyCause")) + var failure_i1s1 = DefaultShardOperationFailedException(index1, 1, Throwable("dummyCause")) + var failure_i2s0 = DefaultShardOperationFailedException(index2, 0, Throwable("dummyCause")) + var failure_i2s1 = DefaultShardOperationFailedException(index2, 1, Throwable("dummyCause")) + + // Case 1: All shards successful + var aggregate_response = listOf(response_i1s0, response_i1s1, response_i2s0, response_i2s1) + var aggregate_failures = listOf() + var refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 4, 0, aggregate_failures, aggregate_response) + var successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() + Assert.assertTrue(successfulIndices.containsKey(index1)) + Assert.assertTrue(successfulIndices.containsKey(index2)) + + // Case 2: All shards failed + aggregate_response = listOf() + aggregate_failures = listOf(failure_i1s0, failure_i1s1, failure_i2s0, failure_i2s1) + refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 0, 4, aggregate_failures, aggregate_response) + successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() + Assert.assertTrue(successfulIndices.isEmpty()) + + // Case 3: Some shards of an index fail, while some others succeed + aggregate_response = listOf(response_i1s1, response_i2s0, response_i2s1) + aggregate_failures = listOf(failure_i1s0) + refreshSearchAnalyzerResponse = RefreshSearchAnalyzerResponse(4, 3, 1, aggregate_failures, aggregate_response) + successfulIndices = refreshSearchAnalyzerResponse.getSuccessfulRefreshDetails() + Assert.assertTrue(successfulIndices.containsKey(index2)) + Assert.assertFalse(successfulIndices.containsKey(index1)) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponseTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponseTests.kt new file mode 100644 index 000000000..4a97e0b83 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RefreshSearchAnalyzerShardResponseTests.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import org.elasticsearch.common.io.stream.BytesStreamOutput +import org.elasticsearch.index.Index +import org.elasticsearch.index.shard.ShardId +import org.elasticsearch.test.ESTestCase +import org.junit.Assert + +class RefreshSearchAnalyzerShardResponseTests : ESTestCase() { + + fun `test shard refresh response parsing`() { + val reloadedAnalyzers = listOf("analyzer1", "analyzer2") + val refreshShardResponse = RefreshSearchAnalyzerShardResponse(ShardId(Index("testIndex", "qwerty"), 0), reloadedAnalyzers) + + val refreshShardResponse2 = roundTripRequest(refreshShardResponse) + Assert.assertEquals(refreshShardResponse2.shardId, refreshShardResponse.shardId) + } + + @Throws(Exception::class) + private fun roundTripRequest(response: RefreshSearchAnalyzerShardResponse): RefreshSearchAnalyzerShardResponse { + BytesStreamOutput().use { out -> + response.writeTo(out) + out.bytes().streamInput().use { si -> return RefreshSearchAnalyzerShardResponse(si) } + } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt new file mode 100644 index 000000000..56f1b8f1b --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/refreshanalyzer/RestRefreshSearchAnalyzerActionIT.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer + +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.makeRequest +import com.amazon.opendistroforelasticsearch.indexmanagement.refreshanalyzer.RestRefreshSearchAnalyzerAction.Companion.REFRESH_SEARCH_ANALYZER_BASE_URI +import org.elasticsearch.client.ResponseException +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.rest.RestRequest.Method.POST +import org.elasticsearch.rest.RestStatus + +class RestRefreshSearchAnalyzerActionIT : IndexManagementRestTestCase() { + + fun `test missing indices`() { + try { + client().makeRequest(POST.toString(), REFRESH_SEARCH_ANALYZER_BASE_URI) + fail("Expected a failure") + } catch (e: ResponseException) { + assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) + val actualMessage = e.response.asMap() + val expectedErrorMessage = mapOf( + "error" to mapOf( + "root_cause" to listOf>( + mapOf("type" to "illegal_argument_exception", "reason" to "Missing indices") + ), + "type" to "illegal_argument_exception", + "reason" to "Missing indices" + ), + "status" to 400 + ) + assertEquals(expectedErrorMessage, actualMessage) + } + } + + fun `test closed index`() { + val indexName = "testindex" + val settings = Settings.builder().build() + createIndex(indexName, settings) + closeIndex(indexName) + + try { + client().makeRequest(POST.toString(), "$REFRESH_SEARCH_ANALYZER_BASE_URI/$indexName") + fail("Expected a failure") + } catch (e: ResponseException) { + val response = e.response.asMap() + assertEquals(400, response.get("status")) + assertEquals("index_closed_exception", (response.get("error") as HashMap<*, *>).get("type")) + } + } +}