Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport from main to 2.x #614

Merged
merged 2 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/bwc-test-workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Backward compatibility test workflow
on:
pull_request:
branches:
- "*"
push:
branches:
- "*"

jobs:
test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run IM Backwards Compatibility Tests
run: |
echo "Running backwards compatibility tests..."
./gradlew bwcTestSuite
- name: Upload failed logs
uses: actions/upload-artifact@v2
if: failure()
with:
name: logs
path: build/testclusters/indexmanagementBwcCluster*/logs/*
1 change: 1 addition & 0 deletions .github/workflows/create-documentation-issue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: Create Documentation Issue
on:
pull_request:
types:
- closed
- labeled
env:
PR_NUMBER: ${{ github.event.number }}
Expand Down
18 changes: 0 additions & 18 deletions .github/workflows/dco.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/links.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
branches: [ main ]

jobs:
linkchecker:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
25 changes: 3 additions & 22 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ on:
- "*"

jobs:
build:
# Job name
name: Build Index Management
test:
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
- name: Set Up JDK
uses: actions/setup-java@v1
with:
java-version: 11
java-version: 17
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
Expand All @@ -31,20 +29,3 @@ jobs:
with:
name: logs
path: build/testclusters/integTest-*/logs/*
bwc:
name: Run Index Management Backwards Compatibility Tests
# This job runs on Linux
runs-on: ubuntu-latest
steps:
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
# index-management
- name: Checkout Branch
uses: actions/checkout@v2
- name: Run IM Backwards Compatibility Tests
run: |
echo "Running backwards compatibility tests..."
./gradlew bwcTestSuite
2 changes: 0 additions & 2 deletions .github/workflows/test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ on:

jobs:
build:
# Job name
name: Build Index Management
env:
BUILD_ARGS: ${{ matrix.os_build_args }}
WORKING_DIR: ${{ matrix.working_directory }}.
Expand Down
15 changes: 13 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,24 @@ task ktlint(type: JavaExec, group: "verification") {
classpath = configurations.ktlint
args "src/**/*.kt", "spi/src/main/**/*.kt"
}

check.dependsOn ktlint

task ktlintFormat(type: JavaExec, group: "formatting") {
description = "Fix Kotlin code style deviations."
main = "com.pinterest.ktlint.Main"
classpath = configurations.ktlint
args "-F", "src/**/*.kt", "spi/src/main/**/*.kt"
// https://github.com/pinterest/ktlint/issues/1391
jvmArgs "--add-opens=java.base/java.lang=ALL-UNNAMED"
}

detekt {
config = files("detekt.yml")
buildUponDefaultConfig = true
}
// When writing detekt Gradle first finds the extension with this name,
// but with a string it should look for a task with that name instead
check.dependsOn "detekt"

configurations.testImplementation {
exclude module: "securemock"
Expand Down Expand Up @@ -656,7 +660,14 @@ run {
}
}

compileKotlin { kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict'] }
compileKotlin {
kotlinOptions.freeCompilerArgs = ['-Xjsr305=strict']
kotlinOptions.allWarningsAsErrors = true
}

compileTestKotlin {
kotlinOptions.allWarningsAsErrors = true
}

apply from: 'build-tools/pkgbuild.gradle'

Expand Down
5 changes: 3 additions & 2 deletions detekt.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# TODO: Remove this before initial release, only for developmental purposes
build:
maxIssues: 20
maxIssues: 0

exceptions:
TooGenericExceptionCaught:
Expand All @@ -14,6 +13,8 @@ style:
MaxLineLength:
maxLineLength: 150
excludes: ['**/test/**']
FunctionOnlyReturningConstant:
active: false

complexity:
LargeClass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.LocalNodeMasterListener
import org.opensearch.cluster.LocalNodeClusterManagerListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
Expand Down Expand Up @@ -47,7 +47,7 @@ class IndexStateManagementHistory(
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
private val indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener {
) : LocalNodeClusterManagerListener {

private val logger = LogManager.getLogger(javaClass)
private var scheduledRollover: Scheduler.Cancellable? = null
Expand All @@ -61,7 +61,7 @@ class IndexStateManagementHistory(
@Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings)

init {
clusterService.addLocalNodeMasterListener(this)
clusterService.addLocalNodeClusterManagerListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_ENABLED) {
historyEnabled = it
}
Expand All @@ -82,7 +82,7 @@ class IndexStateManagementHistory(
}
}

override fun onMaster() {
override fun onClusterManager() {
try {
// try to rollover immediately as we might be restarting the cluster
if (historyEnabled) rolloverHistoryIndex()
Expand All @@ -97,12 +97,12 @@ class IndexStateManagementHistory(
}
}

override fun offMaster() {
override fun offClusterManager() {
scheduledRollover?.cancel()
}

private fun rescheduleRollover() {
if (clusterService.state().nodes.isLocalNodeElectedMaster) {
if (clusterService.state().nodes.isLocalNodeElectedClusterManager) {
scheduledRollover?.cancel()
scheduledRollover = threadPool.scheduleWithFixedDelay(
{ rolloverAndDeleteHistoryIndex() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ class ManagedIndexCoordinator(
}
clusterService.clusterSettings.addSettingsUpdateConsumer(METADATA_SERVICE_STATUS) {
metadataServiceEnabled = it == 0
if (!metadataServiceEnabled) scheduledMoveMetadata?.cancel()
else initMoveMetadata()
if (!metadataServiceEnabled) {
logger.info("Canceling metadata moving job because of cluster setting update.")
scheduledMoveMetadata?.cancel()
} else initMoveMetadata()
}
clusterService.clusterSettings.addSettingsUpdateConsumer(TEMPLATE_MIGRATION_CONTROL) {
templateMigrationEnabled = it >= 0L
Expand Down Expand Up @@ -202,8 +204,8 @@ class ManagedIndexCoordinator(
// Instead of using a LocalNodeMasterListener to track cluster manager changes, this service will
// track them here to avoid conditions where cluster manager listener events run after other
// listeners that depend on what happened in the cluster manager listener
if (this.isClusterManager != event.localNodeMaster()) {
this.isClusterManager = event.localNodeMaster()
if (this.isClusterManager != event.localNodeClusterManager()) {
this.isClusterManager = event.localNodeClusterManager()
if (this.isClusterManager) {
onClusterManager()
} else {
Expand All @@ -215,7 +217,7 @@ class ManagedIndexCoordinator(

if (event.isNewCluster) return

if (!event.localNodeMaster()) return
if (!event.localNodeClusterManager()) return

if (!event.metadataChanged()) return

Expand Down Expand Up @@ -380,7 +382,7 @@ class ManagedIndexCoordinator(
}

/**
* Find a policy that has highest priority ism template with matching index pattern to the index and is created before index creation date. If
* Find a policy that has the highest priority ism template with matching index pattern to the index and is created before index creation date. If
* the policy has user, ensure that the user can manage the index if not find the one that can.
* */
private suspend fun findMatchingPolicy(indexName: String, creationDate: Long, policies: List<Policy>): Policy? {
Expand Down Expand Up @@ -422,7 +424,7 @@ class ManagedIndexCoordinator(
try {
val request = ManagedIndexRequest().indices(indexName)
withClosableContext(IndexManagementSecurityContext("ApplyPolicyOnIndexCreation", settings, threadPool.threadContext, policy.user)) {
val response: AcknowledgedResponse = client.suspendUntil { execute(ManagedIndexAction.INSTANCE, request, it) }
client.suspendUntil<Client, AcknowledgedResponse> { execute(ManagedIndexAction.INSTANCE, request, it) }
}
} catch (e: OpenSearchSecurityException) {
logger.debug("Skipping applying policy ${policy.id} on $indexName as the policy user is missing permissions", e)
Expand Down Expand Up @@ -473,13 +475,13 @@ class ManagedIndexCoordinator(
// If ISM is disabled return early
if (!isIndexStateManagementEnabled()) return

// Do not setup background sweep if we're not the elected cluster manager node
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
// Do not set up background sweep if we're not the elected cluster manager node
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return

// Cancel existing background sweep
scheduledFullSweep?.cancel()

// Setup an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job
// Set up an anti-entropy/self-healing background sweep, in case we fail to create a ManagedIndexConfig job
val scheduledSweep = Runnable {
val elapsedTime = getFullSweepElapsedTime()

Expand All @@ -505,7 +507,7 @@ class ManagedIndexCoordinator(
fun initMoveMetadata() {
if (!metadataServiceEnabled) return
if (!isIndexStateManagementEnabled()) return
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return
scheduledMoveMetadata?.cancel()

if (metadataService.finishFlag) {
Expand Down Expand Up @@ -535,7 +537,7 @@ class ManagedIndexCoordinator(
fun initTemplateMigration(enableSetting: Long) {
if (!templateMigrationEnabled) return
if (!isIndexStateManagementEnabled()) return
if (!clusterService.state().nodes().isLocalNodeElectedMaster) return
if (!clusterService.state().nodes().isLocalNodeElectedClusterManager) return
scheduledTemplateMigration?.cancel()

// if service has finished, re-enable it
Expand Down Expand Up @@ -657,8 +659,7 @@ class ManagedIndexCoordinator(
if (scrollIDsToClear.isNotEmpty()) {
val clearScrollRequest = ClearScrollRequest()
clearScrollRequest.scrollIds(scrollIDsToClear.toList())
val clearScrollResponse: ClearScrollResponse =
client.suspendUntil { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
client.suspendUntil<Client, ClearScrollResponse> { execute(ClearScrollAction.INSTANCE, clearScrollRequest, it) }
}
}
return managedIndexUuids
Expand Down Expand Up @@ -693,7 +694,7 @@ class ManagedIndexCoordinator(
val mRes: MultiGetResponse = client.suspendUntil { multiGet(mReq, it) }
val responses = mRes.responses
if (responses.first().isFailed) {
// config index may not initialised yet
// config index may not initialise yet
logger.error("get managed-index failed: ${responses.first().failure.failure}")
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ object ManagedIndexRunner :
// the cluster state index uuid differs from the one in the managed index config then the config is referring
// to a different index which does not exist in the cluster. We need to check all of the extensions to confirm an index exists
if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) {
clusterStateIndexMetadata = null
// If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types
val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE }
val multiTypeIndexNameToMetaData =
Expand Down Expand Up @@ -387,7 +386,7 @@ object ManagedIndexRunner :
// If this action is not allowed and the step to be executed is the first step in the action then we will fail
// as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight
if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) {
val info = mapOf("message" to "Attempted to execute action=${action?.type} which is not allowed.")
val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.")
val updated = updateManagedIndexMetaData(
managedIndexMetaData.copy(
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
Expand All @@ -404,7 +403,13 @@ object ManagedIndexRunner :
@Suppress("ComplexCondition", "MaxLineLength")
if (updateResult.metadataSaved && state != null && action != null && step != null && currentActionMetaData != null) {
if (validationServiceEnabled) {
val validationResult = actionValidation.validate(action.type, stepContext.metadata.index)
val validationResult = withClosableContext(
IndexManagementSecurityContext(
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user
)
) {
actionValidation.validate(action.type, stepContext.metadata.index)
}
if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) {
logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name")
publishErrorNotification(policy, managedIndexMetaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ class MetadataService(

override fun onResponse(response: ClusterUpdateSettingsResponse) {
if (!response.isAcknowledged) {
logger.error("Update template migration setting to $status is not acknowledged")
logger.error("Update metadata migration setting to $status is not acknowledged")
throw IndexManagementException.wrap(
Exception("Update template migration setting to $status is not acknowledged")
Exception("Update metadata migration setting to $status is not acknowledged")
)
} else {
logger.info("Successfully update template migration setting to $status")
logger.info("Successfully metadata template migration setting to $status")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ data class ManagedIndexConfig(
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy?.copy(
id = policyID ?: NO_ID,
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
Expand Down
Loading