Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Refactor #336

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ jobs:
java-version: 14
- name: Run integration tests with multi node config
run: ./gradlew integTest -PnumNodes=3
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/integTest-*/logs/*
6 changes: 6 additions & 0 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ jobs:
java-version: 14
- name: Build with Gradle
run: ./gradlew build
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/integTest-*/logs/*
- name: Create Artifact Path
run: |
mkdir -p index-management-artifacts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,43 @@
package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi

import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.seqno.SequenceNumbers
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.io.IOException
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContentBuilder {
if (instant == null) {
return nullField(name)
}
return this.timeField(name, name, instant.toEpochMilli())
/** Convert an object to maps and lists representation */
fun ToXContent.convertToMap(): Map<String, Any> {
val bytesReference = XContentHelper.toXContent(this, XContentType.JSON, false)
return XContentHelper.convertToMap(bytesReference, false, XContentType.JSON).v2()
}

fun XContentParser.instant(): Instant? {
return when {
currentToken() == XContentParser.Token.VALUE_NULL -> null
currentToken() == Token.VALUE_NULL -> null
currentToken().isValue -> Instant.ofEpochMilli(longValue())
else -> {
XContentParserUtils.throwUnknownToken(currentToken(), tokenLocation)
Expand All @@ -44,11 +63,98 @@ fun XContentParser.instant(): Instant? {
}
}

fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContentBuilder {
if (instant == null) {
return nullField(name)
}
return this.timeField(name, name, instant.toEpochMilli())
}

/**
* Retries the given [block] of code as specified by the receiver [BackoffPolicy],
* if [block] throws an [ElasticsearchException] that is retriable (502, 503, 504).
*
* If all retries fail the final exception will be rethrown. Exceptions caught during intermediate retries are
* logged as warnings to [logger]. Similar to [org.elasticsearch.action.bulk.Retry], except this retries on
* 502, 503, 504 error codes as well as 429.
*
* @param logger - logger used to log intermediate failures
* @param retryOn - any additional [RestStatus] values that should be retried
* @param block - the block of code to retry. This should be a suspend function.
*/
suspend fun <T> BackoffPolicy.retry(
logger: Logger,
retryOn: List<RestStatus> = emptyList(),
block: suspend (backoff: TimeValue) -> T
): T {
val iter = iterator()
var backoff: TimeValue = TimeValue.ZERO
do {
try {
return block(backoff)
} catch (e: ElasticsearchException) {
if (iter.hasNext() && (e.isRetryable() || retryOn.contains(e.status()))) {
backoff = iter.next()
logger.warn("Operation failed. Retrying in $backoff.", e)
delay(backoff.millis)
} else {
throw e
}
}
} while (true)
}

/**
* Retries on 502, 503 and 504 per elastic client's behavior: https://github.com/elastic/elasticsearch-net/issues/2061
* 429 must be retried manually as it's not clear if it's ok to retry for requests other than Bulk requests.
*/
fun ElasticsearchException.isRetryable(): Boolean {
return (status() in listOf(RestStatus.BAD_GATEWAY, RestStatus.SERVICE_UNAVAILABLE, RestStatus.GATEWAY_TIMEOUT))
}

/**
* Extension function for ES 6.3 and above that duplicates the ES 6.2 XContentBuilder.string() method.
*/
fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString()

/**
* Converts [ElasticsearchClient] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the ES client API.
*/
suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

/**
* Converts [LockService] methods that take a callback into a kotlin suspending function.
*
* @param block - a block of code that is passed an [ActionListener] that should be passed to the LockService API.
*/
suspend fun <T> LockService.suspendUntil(block: LockService.(ActionListener<T>) -> Unit): T =
suspendCoroutine { cont ->
block(object : ActionListener<T> {
override fun onResponse(response: T) = cont.resume(response)

override fun onFailure(e: Exception) = cont.resumeWithException(e)
})
}

fun Throwable.findRemoteTransportException(): RemoteTransportException? {
if (this is RemoteTransportException) return this
return this.cause?.findRemoteTransportException()
}

fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
val rte = this.cause?.findRemoteTransportException()
return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString()
}

@JvmOverloads
@Throws(IOException::class)
fun <T> XContentParser.parseWithType(
Expand All @@ -57,10 +163,10 @@ fun <T> XContentParser.parseWithType(
primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
parse: (xcp: XContentParser, id: String, seqNo: Long, primaryTerm: Long) -> T
): T {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, nextToken(), this::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, nextToken(), this::getTokenLocation)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.FIELD_NAME, nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.START_OBJECT, nextToken(), this::getTokenLocation)
val parsed = parse(this, id, seqNo, primaryTerm)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, this.nextToken(), this::getTokenLocation)
ensureExpectedToken(Token.END_OBJECT, this.nextToken(), this::getTokenLocation)
return parsed
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -46,6 +47,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isFailed
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.isPolicyCompleted
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.updateEnableManagedIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -354,8 +356,8 @@ class ManagedIndexCoordinator(
val managedIndexSearchRequest = getSweptManagedIndexSearchRequest()
val response: SearchResponse = client.suspendUntil { search(managedIndexSearchRequest, it) }
return response.hits.map {
it.id to SweptManagedIndexConfig.parseWithType(contentParser(it.sourceRef),
it.seqNo, it.primaryTerm)
it.id to contentParser(it.sourceRef).parseWithType(NO_ID, it.seqNo,
it.primaryTerm, SweptManagedIndexConfig.Companion::parse)
}.toMap()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.string
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -348,7 +349,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
updateManagedIndexMetaData(updatedManagedIndexMetaData)
}

@Suppress("ReturnCount")
@Suppress("ReturnCount", "BlockingMethodInNonBlockingContext")
private suspend fun getPolicy(policyID: String): Policy? {
try {
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, policyID)
Expand All @@ -362,7 +363,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
policySource, XContentType.JSON)
Policy.parseWithType(xcp, getResponse.id, getResponse.seqNo, getResponse.primaryTerm)
xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse)
}
} catch (e: Exception) {
logger.error("Failed to get policy: $policyID", e)
Expand Down
Loading