Skip to content

Commit

Permalink
Split retrieval of Destination configs in DestinationMigrationUtilSer…
Browse files Browse the repository at this point in the history
…vice

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi committed Apr 17, 2022
1 parent e76d4fd commit a997d40
Showing 1 changed file with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,25 @@ class DestinationMigrationUtilService {
try {
runningLock = true

val destinationsToMigrate = retrieveDestinationsToMigrate(client)
logger.info("Need to migrate ${destinationsToMigrate.size} destinations")
if (destinationsToMigrate.isEmpty()) {
val emailAccountsToMigrate = retrieveConfigsToMigrate(client, "email_account")
val emailGroupsToMigrate = retrieveConfigsToMigrate(client, "email_group")
val destinationsToMigrate = retrieveConfigsToMigrate(client, "destination")
val configsToMigrate = emailAccountsToMigrate + emailGroupsToMigrate + destinationsToMigrate
logger.info(
"Need to migrate ${emailAccountsToMigrate.size} email accounts, " +
"${emailGroupsToMigrate.size} email groups and " +
"${destinationsToMigrate.size} destinations " +
"(${configsToMigrate.size} configs total)"
)
if (configsToMigrate.isEmpty()) {
finishFlag = true
runningLock = false
return
}
val migratedDestinations = createNotificationChannelIfNotExists(client, destinationsToMigrate)
logger.info("Migrated ${migratedDestinations.size} destinations")
val failedDeletedDestinations = deleteOldDestinations(client, migratedDestinations)
logger.info("Failed to delete ${failedDeletedDestinations.size} destinations from migration process cleanup")
val migratedConfigs = createNotificationChannelIfNotExists(client, configsToMigrate)
logger.info("Migrated ${migratedConfigs.size} configs")
val failedDeleteConfigs = deleteOldDestinations(client, migratedConfigs)
logger.info("Failed to delete ${failedDeleteConfigs.size} configs from migration process cleanup")
} finally {
runningLock = false
}
Expand Down Expand Up @@ -136,7 +144,7 @@ class DestinationMigrationUtilService {
return migratedNotificationConfigs
}

private suspend fun retrieveDestinationsToMigrate(client: NodeClient): List<Pair<NotificationConfigInfo, String>> {
private suspend fun retrieveConfigsToMigrate(client: NodeClient, configName: String): List<Pair<NotificationConfigInfo, String>> {
var start = 0
val size = 100
val notificationConfigInfoList = mutableListOf<Pair<NotificationConfigInfo, String>>()
Expand All @@ -150,9 +158,7 @@ class DestinationMigrationUtilService {
.seqNoAndPrimaryTerm(true)
.version(true)
val queryBuilder = QueryBuilders.boolQuery()
.should(QueryBuilders.existsQuery("email_account"))
.should(QueryBuilders.existsQuery("email_group"))
.should(QueryBuilders.existsQuery("destination"))
.should(QueryBuilders.existsQuery(configName))
searchSourceBuilder.query(queryBuilder)

val searchRequest = SearchRequest()
Expand All @@ -161,7 +167,7 @@ class DestinationMigrationUtilService {
val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }

if (response.status() != RestStatus.OK) {
logger.error("Failed to retrieve destinations to migrate")
logger.error("Failed to retrieve ${configName}s to migrate")
hasMoreResults = false
} else {
if (response.hits.hits.isEmpty()) {
Expand All @@ -170,18 +176,18 @@ class DestinationMigrationUtilService {
for (hit in response.hits) {
val xcp = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString)
var notificationConfig: NotificationConfig?
var notificationConfig: NotificationConfig? = null
var userStr = ""
when {
hit.sourceAsString.contains("\"email_group\"") -> {
when (configName) {
"email_group" -> {
val emailGroup = EmailGroup.parseWithType(xcp, hit.id, hit.version)
notificationConfig = convertEmailGroupToNotificationConfig(emailGroup)
}
hit.sourceAsString.contains("\"email_account\"") -> {
"email_account" -> {
val emailAccount = EmailAccount.parseWithType(xcp, hit.id, hit.version)
notificationConfig = convertEmailAccountToNotificationConfig(emailAccount)
}
else -> {
"destination" -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
Expand All @@ -195,6 +201,7 @@ class DestinationMigrationUtilService {
userStr = destination.user.toString()
notificationConfig = convertDestinationToNotificationConfig(destination)
}
else -> logger.info("Unrecognized config name [$configName] to migrate")
}

if (notificationConfig != null)
Expand Down

0 comments on commit a997d40

Please sign in to comment.