Skip to content

Commit

Permalink
More consistent index template allow list behavior
Browse files Browse the repository at this point in the history
Indices and index templates are now filtered in the same way, so when
no allow list is supplied, all templates that do not start with '.'
are included by default.

Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Dec 2, 2024
1 parent 5cd348b commit 520e4ed
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer.ContainerVersion;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.commands.MigrationItemResult;
import org.opensearch.migrations.metadata.CreationResult;
Expand Down Expand Up @@ -163,12 +162,6 @@ private void metadataCommandOnClusters(

arguments.targetArgs.host = targetCluster.getUrl();

var dataFilterArgs = new DataFilterArgs();
dataFilterArgs.indexAllowlist = List.of();
dataFilterArgs.componentTemplateAllowlist = List.of(testData.compoTemplateName);
dataFilterArgs.indexTemplateAllowlist = List.of(testData.indexTemplateName);
arguments.dataFilterArgs = dataFilterArgs;

var targetClusterOperations = new ClusterOperations(targetCluster.getUrl());
targetClusterOperations.createDocument(testData.indexThatAlreadyExists, "doc77", "{}");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FilterScheme {
private FilterScheme() {}

public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(
List<String> indexAllowlist,
BiConsumer<String, Boolean> indexNameAcceptanceObserver
) {
return index -> {
public static Predicate<String> filterByAllowList(List<String> allowlist) {
return item -> {
boolean accepted;
if (indexAllowlist.isEmpty()) {
accepted = !index.getName().startsWith(".");
// By default allow all items except 'system' items that start with a period
if (allowlist == null || allowlist.isEmpty()) {
accepted = !item.startsWith(".");
} else {
accepted = indexAllowlist.contains(index.getName());
accepted = allowlist.contains(item);
}

indexNameAcceptanceObserver.accept(index.getName(), accepted);

return accepted;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ public class DataFilterArgs {

@Parameter(names = {
"--index-template-allowlist" }, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {
"--component-template-allowlist" }, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_template1, posts_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> componentTemplateAllowlist = List.of();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.metadata.CreationResult;
Expand Down Expand Up @@ -35,14 +36,13 @@ public GlobalMetadataCreatorResults create(
log.info("Setting Global Metadata");

var results = GlobalMetadataCreatorResults.builder();
GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root.toObjectNode());
results.legacyTemplates(createLegacyTemplates(globalMetadata, mode, context));
results.componentTemplates(createComponentTemplates(globalMetadata, mode, context));
results.indexTemplates(createIndexTemplates(globalMetadata, mode, context));
results.legacyTemplates(createLegacyTemplates(root, mode, context));
results.componentTemplates(createComponentTemplates(root, mode, context));
results.indexTemplates(createIndexTemplates(root, mode, context));
return results.build();
}

public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createLegacyTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getTemplates(),
legacyTemplateAllowlist,
Expand All @@ -52,7 +52,7 @@ public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 met
);
}

public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createComponentTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getComponentTemplates(),
componentTemplateAllowlist,
Expand All @@ -62,7 +62,7 @@ public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11
);
}

public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createIndexTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getIndexTemplates(),
indexTemplateAllowlist,
Expand All @@ -73,7 +73,7 @@ public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 meta
}

@AllArgsConstructor
private enum TemplateTypes {
enum TemplateTypes {
INDEX_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createIndexTemplate(name, body, context.createMigrateTemplateContext()),
(targetClient, name) -> targetClient.hasIndexTemplate(name)
Expand Down Expand Up @@ -118,49 +118,41 @@ private List<CreationResult> createTemplates(
return List.of();
}

if (templateAllowlist != null && templateAllowlist.isEmpty()) {
log.info("No {} in specified allowlist", templateType);
return List.of();
}
var templatesToCreate = getAllTemplates(templates, templateType);

var templatesToCreate = getTemplatesToCreate(templates, templateAllowlist, templateType);

return processTemplateCreation(templatesToCreate, templateType, mode, context);
return processTemplateCreation(templatesToCreate, templateType, templateAllowlist, mode, context);
}

private Map<String, ObjectNode> getTemplatesToCreate(ObjectNode templates, List<String> templateAllowlist, TemplateTypes templateType) {
Map<String, ObjectNode> getAllTemplates(ObjectNode templates, TemplateTypes templateType) {
var templatesToCreate = new HashMap<String, ObjectNode>();

if (templateAllowlist != null) {
for (String templateName : templateAllowlist) {
if (!templates.has(templateName) || templates.get(templateName) == null) {
log.warn("{} not found: {}", templateType, templateName);
continue;
}
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
}
} else {
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});
}
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});

return templatesToCreate;
}

private List<CreationResult> processTemplateCreation(
Map<String, ObjectNode> templatesToCreate,
TemplateTypes templateType,
List<String> templateAllowList,
MigrationMode mode,
IClusterMetadataContext context
) {
var skipCreation = FilterScheme.filterByAllowList(templateAllowList).negate();

List<CreationResult> templateList = new ArrayList<>();

templatesToCreate.forEach((templateName, templateBody) -> {
return templatesToCreate.entrySet().stream().map((kvp) -> {
var templateName = kvp.getKey();
var templateBody = kvp.getValue();
var creationResult = CreationResult.builder().name(templateName);

if (skipCreation.test(templateName)) {
log.atInfo().setMessage("Template {} was skipped due to allow list filter {}").addArgument(templateName).addArgument(templateAllowList).log();
return creationResult.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER).build();
}

log.info("Creating {}: {}", templateType, templateName);
try {
if (mode == MigrationMode.SIMULATE) {
Expand All @@ -179,9 +171,7 @@ private List<CreationResult> processTemplateCreation(
creationResult.failureType(CreationFailureType.TARGET_CLUSTER_FAILURE);
creationResult.exception(e);
}
templateList.add(creationResult.build());
});

return templateList;
return creationResult.build();
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
Expand All @@ -29,52 +27,46 @@ public class IndexRunner {

public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexContext context) {
var repoDataProvider = metadataFactory.getRepoDataProvider();
BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.atInfo().setMessage("Index {} rejected by allowlist").addArgument(indexName).log();
}
};
var results = IndexMetadataResults.builder();

// Set results for filtered items
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(Predicate.not(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)))
.forEach(index -> results.index(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build()));

var skipCreation = FilterScheme.filterByAllowList(indexAllowlist).negate();

repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.forEach(index -> {
var indexName = index.getName();
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);

CreationResult indexResult = null;
var indexMetadata = originalIndexMetadata.deepCopy();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
indexResult = indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
indexResult = CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
CreationResult creationResult;
if (skipCreation.test(index.getName())) {
creationResult = CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
} else {
creationResult = createIndex(index.getName(), mode, context);
}

var finalResult = indexResult;
results.index(finalResult);
results.index(creationResult);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(finalResult.getFailureType());
aliasResult.failureType(creationResult.getFailureType());
results.alias(aliasResult.build());
});
});
return results.build();
}

/**
 * Loads the metadata for {@code indexName} from the snapshot repository, transforms a deep
 * copy of it, and asks the index creator to create the index.
 *
 * <p>Anything thrown while transforming or creating is captured rather than propagated: the
 * returned result carries the index name, an {@link IndexTransformationException} wrapping the
 * cause, and {@code UNABLE_TO_TRANSFORM_FAILURE}. Failures while reading or copying the
 * metadata happen before the guarded section and are not caught here.
 *
 * @param indexName name of the index to read from the snapshot and create
 * @param mode migration mode handed through to the index creator
 * @param context context handed through to the index creator
 * @return the creator's result, or a failure result describing what was thrown
 */
private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
    // Work on a copy so the metadata object returned by the factory is left untouched.
    var metadataCopy = metadataFactory.fromRepo(snapshotName, indexName).deepCopy();
    try {
        var transformed = transformer.transformIndexMetadata(metadataCopy);
        return indexCreator.create(transformed, mode, context);
    } catch (Throwable t) {
        var failure = CreationResult.builder().name(indexName);
        failure.exception(new IndexTransformationException(indexName, t));
        failure.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE);
        return failure.build();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

import org.opensearch.migrations.bulkload.common.FilterScheme;
Expand Down Expand Up @@ -99,24 +98,37 @@ private static void prepareShardWorkItems(
List<String> indexAllowlist,
IDocumentMigrationContexts.IShardSetupAttemptContext context
) {
log.info("Setting up the Documents Work Items...");
log.atInfo()
.setMessage("Setting up the Documents Work Items...")
.log();
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();

BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.info("Index " + indexName + " rejected by allowlist");
}
};
var allowedIndexes = FilterScheme.filterByAllowList(indexAllowlist);
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.filter(index -> {
var accepted = allowedIndexes.test(index.getName());
if (!accepted) {
log.atInfo()
.setMessage("None of the documents in index {} will be reindexed, it was not included in the allowlist: {} ")
.addArgument(index.getName())
.addArgument(indexAllowlist)
.log();
}
return accepted;
})
.forEach(index -> {
IndexMetadata indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
log.atInfo()
.setMessage("Index {} has {} shards")
.addArgument(indexMetadata.getName())
.addArgument(indexMetadata.getNumberOfShards())
.log();
IntStream.range(0, indexMetadata.getNumberOfShards()).forEach(shardId -> {
log.info(
"Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId
);
log.atInfo()
.setMessage("Creating Documents Work Item for index: {}, shard: {}")
.addArgument(indexMetadata.getName())
.addArgument(shardId)
.log();
try (var shardSetupContext = context.createShardWorkItemContext()) {
workCoordinator.createUnassignedWorkItem(
IndexAndShardCursor.formatAsWorkItemString(indexMetadata.getName(), shardId),
Expand All @@ -128,6 +140,8 @@ private static void prepareShardWorkItems(
});
});

log.info("Finished setting up the Documents Work Items.");
log.atInfo()
.setMessage("Finished setting up the Documents Work Items.")
.log();
}
}
Loading

0 comments on commit 520e4ed

Please sign in to comment.