Skip to content

Commit

Permalink
Fix ktlint
Browse files Browse the repository at this point in the history
Signed-off-by: Angie Zhang <[email protected]>
  • Loading branch information
Angie Zhang committed Oct 4, 2022
1 parent f93ee3c commit b05614f
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ object ManagedIndexRunner :
private lateinit var extensionStatusChecker: ExtensionStatusChecker
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED

@Suppress("MagicNumber")
private val savePolicyRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val updateMetaDataRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)

@Suppress("MagicNumber")
private val errorNotificationRetryPolicy = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(250), 3)
private var jobInterval: Int = DEFAULT_JOB_INTERVAL
Expand Down Expand Up @@ -345,7 +348,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Previous action was not able to update IndexMetaData.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -359,7 +363,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Failed to execute action=${action?.type} as extension [$actionExtensionName] is not enabled.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -372,7 +377,8 @@ object ManagedIndexRunner :
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
policyRetryInfo = PolicyRetryInfoMetaData(true, 0),
info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
Expand All @@ -388,7 +394,10 @@ object ManagedIndexRunner :
// Step null check is done in getStartingManagedIndexMetaData
withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
managedIndexConfig.id,
settings,
threadPool.threadContext,
managedIndexConfig.policy.user
)
) {
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger)
Expand Down Expand Up @@ -476,8 +485,10 @@ object ManagedIndexRunner :
// Intellij complains about createParser/parseWithType blocking because it sees they throw IOExceptions
return withContext(Dispatchers.IO) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
policySource, XContentType.JSON
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
policySource,
XContentType.JSON
)
xcp.parseWithType(getResponse.id, getResponse.seqNo, getResponse.primaryTerm, Policy.Companion::parse)
}
Expand All @@ -504,8 +515,11 @@ object ManagedIndexRunner :
@Suppress("TooGenericExceptionCaught")
private suspend fun savePolicyToManagedIndexConfig(managedIndexConfig: ManagedIndexConfig, policy: Policy): Boolean {
val updatedManagedIndexConfig = managedIndexConfig.copy(
policyID = policy.id, policy = policy,
policySeqNo = policy.seqNo, policyPrimaryTerm = policy.primaryTerm, changePolicy = null
policyID = policy.id,
policy = policy,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
changePolicy = null
)
val indexRequest = managedIndexConfigIndexRequest(updatedManagedIndexConfig)
var savedPolicy = false
Expand Down Expand Up @@ -605,8 +619,8 @@ object ManagedIndexRunner :
// this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm
// in the metadata, in this case we just want to say we successfully initialized the policy again but we will not
// modify the state, action, etc. so it can resume where it left off
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm
&& managedIndexMetaData.policyID == policy.id ->
managedIndexMetaData.policySeqNo == policy.seqNo && managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm &&
managedIndexMetaData.policyID == policy.id ->
// If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue.
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0),
Expand Down Expand Up @@ -688,7 +702,6 @@ object ManagedIndexRunner :
managedIndexMetaData: ManagedIndexMetaData,
actionToExecute: Action?
) {

// should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null
val changePolicy = managedIndexConfig.changePolicy
if (changePolicy == null) {
Expand All @@ -709,8 +722,13 @@ object ManagedIndexRunner :
// if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are
// in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase
val newTransitionMetaData = ActionMetaData(
TransitionsAction.name, Instant.now().toEpochMilli(), -1,
false, 0, 0, null
TransitionsAction.name,
Instant.now().toEpochMilli(),
-1,
false,
0,
0,
null
)
val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) {
newTransitionMetaData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ class TransportIndexPolicyAction @Inject constructor(
val settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<IndexPolicyRequest, IndexPolicyResponse>(
IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest
IndexPolicyAction.NAME,
transportService,
actionFilters,
::IndexPolicyRequest
) {

@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)
Expand Down Expand Up @@ -187,7 +190,8 @@ class TransportIndexPolicyAction @Inject constructor(

private fun putPolicy() {
val policy = request.policy.copy(
schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user
schemaVersion = IndexUtils.indexManagementConfigSchemaVersion,
user = this.user
)

val indexRequest = IndexRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
Expand Down Expand Up @@ -237,7 +241,7 @@ class TransportIndexPolicyAction @Inject constructor(
val failureReasons = StringBuilder()
if (response.shardInfo.failed > 0) {
response.shardInfo.failures.forEach {
entry ->
entry ->
failureReasons.append(entry.reason())
}
return failureReasons.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RollupIndexer(

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_INGEST_BACKOFF_MILLIS, ROLLUP_INGEST_BACKOFF_COUNT) {
millis, count ->
millis, count ->
retryIngestPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RollupSearchService(

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ROLLUP_SEARCH_BACKOFF_MILLIS, ROLLUP_SEARCH_BACKOFF_COUNT) {
millis, count ->
millis, count ->
retrySearchPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class FieldCapsFilter(
@Volatile private var shouldIntercept = RollupSettings.ROLLUP_DASHBOARDS.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) {
flag ->
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) { flag ->
shouldIntercept = flag
}
}
Expand All @@ -59,7 +58,7 @@ class FieldCapsFilter(
val rollupIndices = mutableSetOf<String>()
val nonRollupIndices = mutableSetOf<String>()
val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) {
idx: String? ->
idx: String? ->
indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state())
}
val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
Expand Down Expand Up @@ -102,7 +101,9 @@ class FieldCapsFilter(
}

chain.proceed(
task, action, request,
task,
action,
request,
object : ActionListener<Response> {
override fun onResponse(response: Response) {
logger.info("Has rollup indices will rewrite field caps response")
Expand Down Expand Up @@ -192,10 +193,15 @@ class FieldCapsFilter(
}
val isSearchable = fieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION
response[fieldName]!![type] = FieldCapabilities(
fieldName, type, isSearchable, true,
fieldName,
type,
isSearchable,
true,
fieldMappingIndexMap.getValue(fieldMapping)
.toTypedArray(),
null, null, mapOf<String, Set<String>>()
null,
null,
mapOf<String, Set<String>>()
)
}

Expand Down Expand Up @@ -267,10 +273,15 @@ class FieldCapsFilter(
val fieldCaps = fields.getValue(field).getValue(type)
val rewrittenIndices = if (fieldCaps.indices() != null && fieldCaps.indices().isNotEmpty()) fieldCaps.indices() else indices
expandedResponse[field]!![type] = FieldCapabilities(
fieldCaps.name, fieldCaps.type, fieldCaps.isSearchable,
fieldCaps.name,
fieldCaps.type,
fieldCaps.isSearchable,
fieldCaps
.isAggregatable,
rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta()
rewrittenIndices,
fieldCaps.nonSearchableIndices(),
fieldCaps.nonAggregatableIndices(),
fieldCaps.meta()
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TransformIndexer(
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT
) {
millis, count ->
millis, count ->
backoffPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TransformSearchService(

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) {
millis, count ->
millis, count ->
backoffPolicy = BackoffPolicy.constantBackoff(millis, count)
}
}
Expand Down

0 comments on commit b05614f

Please sign in to comment.