Skip to content

Commit

Permalink
Support copy alias in rollover (#892)
Browse files Browse the repository at this point in the history
* Support copy alias in rollover

Signed-off-by: bowenlan-amzn <[email protected]>

* Add more tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Add bwc test

Signed-off-by: bowenlan-amzn <[email protected]>

* Enhance tests

Signed-off-by: bowenlan-amzn <[email protected]>

* Improve not acknowledge call scenario

Signed-off-by: bowenlan-amzn <[email protected]>

---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored Aug 17, 2023
1 parent 841303f commit 1923003
Show file tree
Hide file tree
Showing 17 changed files with 770 additions and 101 deletions.
76 changes: 36 additions & 40 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -613,32 +613,44 @@ task integTestRemote(type: RestIntegTestTask) {

// === Set up BWC tests ===

String bwcMinVersion = "1.13.2.0"
String bwcJobSchedulerVersion = "1.13.0.0"
String bwcMinVersion = "2.6.0.0"
String bwcBundleVersion = "1.3.2.0"
Boolean bwcBundleTest = (project.findProperty('customDistributionDownloadType') != null &&
project.properties['customDistributionDownloadType'] == "bundle");
String bwcVersion = bwcBundleTest ? bwcBundleVersion : bwcMinVersion
String bwcCurrentVersion = opensearch_version.replace("-SNAPSHOT", "")
String baseName = "indexmanagementBwcCluster"
String bwcFilePath = "src/test/resources/bwc/"
String bwc_js_resource_location = bwcFilePath + "job-scheduler/" + bwcJobSchedulerVersion
String bwc_im_resource_location = bwcFilePath + "indexmanagement/" + bwcVersion

// Downloads the bwc job scheduler version
String bwc_js_download_url = "https://github.com/opendistro-for-elasticsearch/job-scheduler/releases/download/v" +
bwcJobSchedulerVersion + "/job-scheduler-artifacts.zip"

// Downloads the bwc index management version
String bwc_im_download_url = "https://github.com/opendistro-for-elasticsearch/index-management/releases/download/v" +
bwcMinVersion + "/index-management-artifacts.zip"
getPluginResource(bwc_im_resource_location, bwc_im_download_url)

configurations {
bwcZip
}
dependencies {
bwcZip "org.opensearch.plugin:opensearch-job-scheduler:${bwcMinVersion}-SNAPSHOT@zip"
bwcZip "org.opensearch.plugin:opensearch-index-management:${bwcMinVersion}-SNAPSHOT@zip"
}
ext.resolvebwcZipFile = { pluginId ->
return new Callable<RegularFile>() {
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.bwcZip.resolvedConfiguration.resolvedArtifacts
.find { ResolvedArtifact f ->
f.name.startsWith(pluginId)
}
.file
}
}
}
}
}
Integer bwcNumNodes = 3
2.times {i ->
testClusters {
"${baseName}$i"{
testDistribution = "ARCHIVE"
numberOfNodes = 3
numberOfNodes = bwcNumNodes
if (bwcBundleTest) {
versions = [
"1.3.2", bwcCurrentVersion
Expand Down Expand Up @@ -686,31 +698,10 @@ getPluginResource(bwc_im_resource_location, bwc_im_download_url)
}
} else {
versions = [
"7.10.2", opensearch_version
"2.6.0-SNAPSHOT", opensearch_version
]
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_js_resource_location, bwc_js_download_url)
}
}
}
}))

plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return getPluginResource(bwc_im_resource_location, bwc_im_download_url)
}
}
}
}))
plugin(provider(resolvebwcZipFile("opensearch-job-scheduler")))
plugin(provider(resolvebwcZipFile("opensearch-index-management")))
}

setting 'path.repo',
Expand Down Expand Up @@ -748,6 +739,7 @@ task prepareBwcTests {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}
}

Expand All @@ -772,6 +764,7 @@ task "${baseName}#oneThirdsUpgradeCluster"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
Expand All @@ -792,6 +785,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
Expand All @@ -812,6 +806,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// Upgrade all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
Expand All @@ -830,14 +825,15 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}")
systemProperty 'tests.security.manager', 'false'
systemProperty 'cluster.number_of_nodes', "${bwcNumNodes}"
}

// A bwc test suite which runs all the bwc tasks combined
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
// TODO refactor bwc test #677
// dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask")
dependsOn tasks.named("${baseName}#fullRestartClusterTask")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ data class ManagedIndexMetaData(
val info: Map<String, Any>?,
val id: String = NO_ID,
val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
val rolledOverIndexName: String? = null,
) : Writeable, ToXContentFragment {

@Suppress("ComplexMethod")
Expand All @@ -51,6 +52,7 @@ data class ManagedIndexMetaData(
if (policyPrimaryTerm != null) resultMap[POLICY_PRIMARY_TERM] = policyPrimaryTerm.toString()
if (policyCompleted != null) resultMap[POLICY_COMPLETED] = policyCompleted.toString()
if (rolledOver != null) resultMap[ROLLED_OVER] = rolledOver.toString()
if (rolledOverIndexName != null) resultMap[ROLLED_OVER_INDEX_NAME] = rolledOverIndexName
if (indexCreationDate != null) resultMap[INDEX_CREATION_DATE] = indexCreationDate.toString()
if (transitionTo != null) resultMap[TRANSITION_TO] = transitionTo
if (stateMetaData != null) resultMap[StateMetaData.STATE] = stateMetaData.getMapValueString()
Expand All @@ -76,6 +78,7 @@ data class ManagedIndexMetaData(
.field(POLICY_PRIMARY_TERM, policyPrimaryTerm)
.field(POLICY_COMPLETED, policyCompleted)
.field(ROLLED_OVER, rolledOver)
.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
.field(INDEX_CREATION_DATE, indexCreationDate)
.field(TRANSITION_TO, transitionTo)
.addObject(StateMetaData.STATE, stateMetaData, params, true)
Expand Down Expand Up @@ -110,6 +113,7 @@ data class ManagedIndexMetaData(
// Only show rolled_over if we have rolled over or we are in the rollover action
if (rolledOver == true || (actionMetaData != null && actionMetaData.name == "rollover")) {
builder.field(ROLLED_OVER, rolledOver)
if (rolledOverIndexName != null) builder.field(ROLLED_OVER_INDEX_NAME, rolledOverIndexName)
}

if (indexCreationDate != null) builder.field(INDEX_CREATION_DATE, indexCreationDate)
Expand Down Expand Up @@ -142,6 +146,7 @@ data class ManagedIndexMetaData(
streamOutput.writeOptionalLong(policyPrimaryTerm)
streamOutput.writeOptionalBoolean(policyCompleted)
streamOutput.writeOptionalBoolean(rolledOver)
streamOutput.writeOptionalString(rolledOverIndexName)
streamOutput.writeOptionalLong(indexCreationDate)
streamOutput.writeOptionalString(transitionTo)

Expand Down Expand Up @@ -172,6 +177,7 @@ data class ManagedIndexMetaData(
const val POLICY_PRIMARY_TERM = "policy_primary_term"
const val POLICY_COMPLETED = "policy_completed"
const val ROLLED_OVER = "rolled_over"
const val ROLLED_OVER_INDEX_NAME = "rolled_over_index_name"
const val INDEX_CREATION_DATE = "index_creation_date"
const val TRANSITION_TO = "transition_to"
const val INFO = "info"
Expand All @@ -185,6 +191,7 @@ data class ManagedIndexMetaData(
val policyPrimaryTerm: Long? = si.readOptionalLong()
val policyCompleted: Boolean? = si.readOptionalBoolean()
val rolledOver: Boolean? = si.readOptionalBoolean()
val rolledOverIndexName: String? = si.readOptionalString()
val indexCreationDate: Long? = si.readOptionalLong()
val transitionTo: String? = si.readOptionalString()

Expand All @@ -207,6 +214,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
Expand Down Expand Up @@ -234,6 +242,7 @@ data class ManagedIndexMetaData(
var policyPrimaryTerm: Long? = null
var policyCompleted: Boolean? = null
var rolledOver: Boolean? = null
var rolledOverIndexName: String? = null
var indexCreationDate: Long? = null
var transitionTo: String? = null

Expand All @@ -256,6 +265,7 @@ data class ManagedIndexMetaData(
POLICY_PRIMARY_TERM -> policyPrimaryTerm = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
POLICY_COMPLETED -> policyCompleted = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER -> rolledOver = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.booleanValue()
ROLLED_OVER_INDEX_NAME -> rolledOverIndexName = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
INDEX_CREATION_DATE -> indexCreationDate = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.longValue()
TRANSITION_TO -> transitionTo = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else xcp.text()
StateMetaData.STATE -> {
Expand All @@ -277,23 +287,24 @@ data class ManagedIndexMetaData(
}

return ManagedIndexMetaData(
requireNotNull(index) { "$INDEX is null" },
requireNotNull(indexUuid) { "$INDEX_UUID is null" },
requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo,
policyPrimaryTerm,
policyCompleted,
rolledOver,
indexCreationDate,
transitionTo,
state,
action,
step,
retryInfo,
info,
id,
seqNo,
primaryTerm
index = requireNotNull(index) { "$INDEX is null" },
indexUuid = requireNotNull(indexUuid) { "$INDEX_UUID is null" },
policyID = requireNotNull(policyID) { "$POLICY_ID is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policyCompleted = policyCompleted,
rolledOver = rolledOver,
rolledOverIndexName = rolledOverIndexName,
indexCreationDate = indexCreationDate,
transitionTo = transitionTo,
stateMetaData = state,
actionMetaData = action,
stepMetaData = step,
policyRetryInfo = retryInfo,
info = info,
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
)
}

Expand Down Expand Up @@ -323,6 +334,7 @@ data class ManagedIndexMetaData(
policyPrimaryTerm = map[POLICY_PRIMARY_TERM]?.toLong(),
policyCompleted = map[POLICY_COMPLETED]?.toBoolean(),
rolledOver = map[ROLLED_OVER]?.toBoolean(),
rolledOverIndexName = map[ROLLED_OVER_INDEX_NAME],
indexCreationDate = map[INDEX_CREATION_DATE]?.toLong(),
transitionTo = map[TRANSITION_TO],
stateMetaData = StateMetaData.fromManagedIndexMetaDataMap(map),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class RolloverAction(
val minDocs: Long?,
val minAge: TimeValue?,
val minPrimaryShardSize: ByteSizeValue?,
val copyAlias: Boolean = false,
index: Int
) : Action(name, index) {

Expand Down Expand Up @@ -47,6 +48,7 @@ class RolloverAction(
if (minDocs != null) builder.field(MIN_DOC_COUNT_FIELD, minDocs)
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
builder.field(COPY_ALIAS_FIELD, copyAlias)
builder.endObject()
}

Expand All @@ -55,6 +57,7 @@ class RolloverAction(
out.writeOptionalLong(minDocs)
out.writeOptionalTimeValue(minAge)
out.writeOptionalWriteable(minPrimaryShardSize)
out.writeBoolean(copyAlias)
out.writeInt(actionIndex)
}

Expand All @@ -64,5 +67,6 @@ class RolloverAction(
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
const val COPY_ALIAS_FIELD = "copy_alias"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ class RolloverActionParser : ActionParser() {
val minDocs = sin.readOptionalLong()
val minAge = sin.readOptionalTimeValue()
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
val copyAlias = sin.readBoolean()
val index = sin.readInt()

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun fromXContent(xcp: XContentParser, index: Int): Action {
var minSize: ByteSizeValue? = null
var minDocs: Long? = null
var minAge: TimeValue? = null
var minPrimaryShardSize: ByteSizeValue? = null
var copyAlias = false

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -44,11 +46,12 @@ class RolloverActionParser : ActionParser() {
RolloverAction
.MIN_PRIMARY_SHARD_SIZE_FIELD
)
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
}
}

return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, index)
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
}

override fun getActionType(): String {
Expand Down
Loading

0 comments on commit 1923003

Please sign in to comment.