More consistent index template allow list behavior
Index and index template matching behavior is now filtered in the same
way: when no allow list is supplied, all templates that do not start
with '.' are included by default.

Signed-off-by: Peter Nied <[email protected]>
peternied committed Dec 2, 2024
1 parent 5cd348b commit 4cffdd6
Showing 9 changed files with 214 additions and 109 deletions.
2 changes: 1 addition & 1 deletion MetadataMigration/DEVELOPER_GUIDE.md
@@ -71,7 +71,7 @@ If your target cluster has basic auth enabled on it, you can supply those creden

### Allowlisting the templates and indices to migrate

By default, the tool has an empty allowlist for templates, meaning none will be migrated. In contrast, the default allowlist for indices is open, meaning all non-system indices (those not prefixed with `.`) will be migrated. You can tweak these allowlists with a comma-separated list of items you specifically wish to migrate. If you specify a custom allowlist for the templates or indices, the default allowlist is disregarded and **only** the items you have in your allowlist will be moved.
By default, the allowlists for indices and index templates are open, meaning all non-system indices and templates (those not prefixed with `.`) will be migrated. You can tweak these allowlists with a comma-separated list of items you specifically wish to migrate. If you specify a custom allowlist for the templates or indices, the default allowlist is disregarded and **only** the items you have in your allowlist will be moved.
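
As a rough illustration of the default rule described above, here is a small standalone sketch (not code from the tool; names are made up) showing which items an empty versus an explicit allowlist accepts:

```java
import java.util.List;
import java.util.function.Predicate;

class AllowlistDefaults {
    // Mirrors the documented rule: with no allowlist, accept anything that is
    // not dot-prefixed; with an explicit allowlist, accept only listed names.
    static Predicate<String> filter(List<String> allowlist) {
        return name -> (allowlist == null || allowlist.isEmpty())
            ? !name.startsWith(".")
            : allowlist.contains(name);
    }

    public static void main(String[] args) {
        var byDefault = filter(List.of());
        System.out.println(byDefault.test("logs-2023"));        // true  -> migrated
        System.out.println(byDefault.test(".my_system_index")); // false -> skipped

        var explicit = filter(List.of(".my_system_index", "Index1"));
        System.out.println(explicit.test(".my_system_index"));  // true  -> migrated
        System.out.println(explicit.test("logs-2023"));         // false -> skipped
    }
}
```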

```shell
./gradlew MetadataMigration:run --args='--snapshot-name reindex-from-snapshot --s3-local-dir /tmp/s3_files --s3-repo-uri s3://your-s3-uri --s3-region us-fake-1 --target-host http://hostname:9200 --index-allowlist Index1,.my_system_index,logs-2023 --index-template-allowlist logs_template --component-template-allowlist component2,component7'
@@ -14,7 +14,6 @@
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer.ContainerVersion;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.commands.MigrationItemResult;
import org.opensearch.migrations.metadata.CreationResult;
@@ -163,12 +162,6 @@ private void metadataCommandOnClusters(

arguments.targetArgs.host = targetCluster.getUrl();

var dataFilterArgs = new DataFilterArgs();
dataFilterArgs.indexAllowlist = List.of();
dataFilterArgs.componentTemplateAllowlist = List.of(testData.compoTemplateName);
dataFilterArgs.indexTemplateAllowlist = List.of(testData.indexTemplateName);
arguments.dataFilterArgs = dataFilterArgs;

var targetClusterOperations = new ClusterOperations(targetCluster.getUrl());
targetClusterOperations.createDocument(testData.indexThatAlreadyExists, "doc77", "{}");

@@ -1,26 +1,23 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FilterScheme {
private FilterScheme() {}

public static Predicate<SnapshotRepo.Index> filterIndicesByAllowList(
List<String> indexAllowlist,
BiConsumer<String, Boolean> indexNameAcceptanceObserver
) {
return index -> {
public static Predicate<String> filterByAllowList(List<String> allowlist) {
return item -> {
boolean accepted;
if (indexAllowlist.isEmpty()) {
accepted = !index.getName().startsWith(".");
// By default allow all items except 'system' items that start with a period
if (allowlist == null || allowlist.isEmpty()) {
accepted = !item.startsWith(".");
} else {
accepted = indexAllowlist.contains(index.getName());
accepted = allowlist.contains(item);
}

indexNameAcceptanceObserver.accept(index.getName(), accepted);

return accepted;
};
}
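
A possible usage sketch of the new predicate above (the method signature comes from this diff; the sample names are illustrative):

```java
import java.util.List;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.common.FilterScheme;

class FilterSchemeUsageSketch {
    public static void main(String[] args) {
        // Empty allowlist: everything except dot-prefixed "system" names passes.
        var defaults = FilterScheme.filterByAllowList(List.of());
        Stream.of("logs-2023", ".tasks", "web-posts")
            .filter(defaults)
            .forEach(System.out::println); // logs-2023, web-posts

        // Explicit allowlist: only the listed names pass, including dot-prefixed ones.
        var explicit = FilterScheme.filterByAllowList(List.of(".tasks"));
        System.out.println(explicit.test(".tasks"));     // true
        System.out.println(explicit.test("logs-2023"));  // false
    }
}
```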
@@ -11,11 +11,11 @@ public class DataFilterArgs {

@Parameter(names = {
"--index-template-allowlist" }, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {
"--component-template-allowlist" }, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
+ " (e.g. 'posts_template1, posts_template2'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> componentTemplateAllowlist = List.of();
}
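
A hedged sketch of how these defaults surface once the flags are parsed. This assumes DataFilterArgs is registered with JCommander in the usual way; that wiring is not shown in this diff:

```java
import com.beust.jcommander.JCommander;

import org.opensearch.migrations.bulkload.models.DataFilterArgs;

class DataFilterArgsSketch {
    public static void main(String[] args) {
        var filterArgs = new DataFilterArgs();
        JCommander.newBuilder().addObject(filterArgs).build()
            .parse("--index-template-allowlist", "logs_template");

        // Explicitly listed: only "logs_template" index templates are migrated.
        System.out.println(filterArgs.indexTemplateAllowlist);     // [logs_template]

        // Left at the default empty list: with this change that now means
        // "all non-system component templates" rather than "none".
        System.out.println(filterArgs.componentTemplateAllowlist); // []
    }
}
```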
@@ -1,12 +1,13 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.models.GlobalMetadata;
import org.opensearch.migrations.metadata.CreationResult;
@@ -35,14 +36,13 @@ public GlobalMetadataCreatorResults create(
log.info("Setting Global Metadata");

var results = GlobalMetadataCreatorResults.builder();
GlobalMetadataData_OS_2_11 globalMetadata = new GlobalMetadataData_OS_2_11(root.toObjectNode());
results.legacyTemplates(createLegacyTemplates(globalMetadata, mode, context));
results.componentTemplates(createComponentTemplates(globalMetadata, mode, context));
results.indexTemplates(createIndexTemplates(globalMetadata, mode, context));
results.legacyTemplates(createLegacyTemplates(root, mode, context));
results.componentTemplates(createComponentTemplates(root, mode, context));
results.indexTemplates(createIndexTemplates(root, mode, context));
return results.build();
}

public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createLegacyTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getTemplates(),
legacyTemplateAllowlist,
@@ -52,7 +52,7 @@ public List<CreationResult> createLegacyTemplates(GlobalMetadataData_OS_2_11 met
);
}

public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createComponentTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getComponentTemplates(),
componentTemplateAllowlist,
@@ -62,7 +62,7 @@ public List<CreationResult> createComponentTemplates(GlobalMetadataData_OS_2_11
);
}

public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 metadata, MigrationMode mode, IClusterMetadataContext context) {
public List<CreationResult> createIndexTemplates(GlobalMetadata metadata, MigrationMode mode, IClusterMetadataContext context) {
return createTemplates(
metadata.getIndexTemplates(),
indexTemplateAllowlist,
@@ -73,7 +73,7 @@ public List<CreationResult> createIndexTemplates(GlobalMetadataData_OS_2_11 meta
}

@AllArgsConstructor
private enum TemplateTypes {
enum TemplateTypes {
INDEX_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createIndexTemplate(name, body, context.createMigrateTemplateContext()),
(targetClient, name) -> targetClient.hasIndexTemplate(name)
@@ -118,49 +118,41 @@ private List<CreationResult> createTemplates(
return List.of();
}

if (templateAllowlist != null && templateAllowlist.isEmpty()) {
log.info("No {} in specified allowlist", templateType);
return List.of();
}
var templatesToCreate = getAllTemplates(templates, templateType);

var templatesToCreate = getTemplatesToCreate(templates, templateAllowlist, templateType);

return processTemplateCreation(templatesToCreate, templateType, mode, context);
return processTemplateCreation(templatesToCreate, templateType, templateAllowlist, mode, context);
}

private Map<String, ObjectNode> getTemplatesToCreate(ObjectNode templates, List<String> templateAllowlist, TemplateTypes templateType) {
Map<String, ObjectNode> getAllTemplates(ObjectNode templates, TemplateTypes templateType) {
var templatesToCreate = new HashMap<String, ObjectNode>();

if (templateAllowlist != null) {
for (String templateName : templateAllowlist) {
if (!templates.has(templateName) || templates.get(templateName) == null) {
log.warn("{} not found: {}", templateType, templateName);
continue;
}
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
}
} else {
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});
}
templates.fieldNames().forEachRemaining(templateName -> {
ObjectNode settings = (ObjectNode) templates.get(templateName);
templatesToCreate.put(templateName, settings);
});

return templatesToCreate;
}

private List<CreationResult> processTemplateCreation(
Map<String, ObjectNode> templatesToCreate,
TemplateTypes templateType,
List<String> templateAllowList,
MigrationMode mode,
IClusterMetadataContext context
) {
var skipCreation = FilterScheme.filterByAllowList(templateAllowList).negate();

List<CreationResult> templateList = new ArrayList<>();

templatesToCreate.forEach((templateName, templateBody) -> {
return templatesToCreate.entrySet().stream().map((kvp) -> {
var templateName = kvp.getKey();
var templateBody = kvp.getValue();
var creationResult = CreationResult.builder().name(templateName);

if (skipCreation.test(templateName)) {
log.atInfo().setMessage("Template {} was skipped due to allow list filter {}").addArgument(templateName).addArgument(templateAllowList).log();
return creationResult.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER).build();
}

log.info("Creating {}: {}", templateType, templateName);
try {
if (mode == MigrationMode.SIMULATE) {
@@ -179,9 +171,7 @@ private List<CreationResult> processTemplateCreation(
creationResult.failureType(CreationFailureType.TARGET_CLUSTER_FAILURE);
creationResult.exception(e);
}
templateList.add(creationResult.build());
});

return templateList;
return creationResult.build();
}).collect(Collectors.toList());
}
}
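
A minimal sketch of the skip-marking pattern used above: the allowlist predicate is negated so filtered-out templates are still reported (as SKIPPED_DUE_TO_FILTER in the real code) rather than silently dropped. The template map here is invented for illustration:

```java
import java.util.List;
import java.util.Map;

import org.opensearch.migrations.bulkload.common.FilterScheme;

class SkipMarkingSketch {
    public static void main(String[] args) {
        var templates = Map.of("logs_template", "{...}", ".hidden_template", "{...}");

        // Negate the allowlist predicate: "true" now means "skip this one".
        var skipCreation = FilterScheme.filterByAllowList(List.of()).negate();

        templates.keySet().forEach(name ->
            System.out.println(name + " skipped=" + skipCreation.test(name)));
        // logs_template skipped=false, .hidden_template skipped=true (order may vary)
    }
}
```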
@@ -1,8 +1,6 @@
package org.opensearch.migrations.bulkload.worker;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.common.FilterScheme;
@@ -29,52 +27,46 @@ public class IndexRunner {

public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexContext context) {
var repoDataProvider = metadataFactory.getRepoDataProvider();
BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.atInfo().setMessage("Index {} rejected by allowlist").addArgument(indexName).log();
}
};
var results = IndexMetadataResults.builder();

// Set results for filtered items
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(Predicate.not(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)))
.forEach(index -> results.index(CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build()));

var skipCreation = FilterScheme.filterByAllowList(indexAllowlist).negate();

repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.forEach(index -> {
var indexName = index.getName();
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);

CreationResult indexResult = null;
var indexMetadata = originalIndexMetadata.deepCopy();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
indexResult = indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
indexResult = CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
CreationResult creationResult;
if (skipCreation.test(index.getName())) {
creationResult = CreationResult.builder()
.name(index.getName())
.failureType(CreationFailureType.SKIPPED_DUE_TO_FILTER)
.build();
} else {
creationResult = createIndex(index.getName(), mode, context);
}

var finalResult = indexResult;
results.index(finalResult);
results.index(creationResult);

var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
indexMetadata.getAliases().fieldNames().forEachRemaining(alias -> {
var aliasResult = CreationResult.builder().name(alias);
aliasResult.failureType(finalResult.getFailureType());
aliasResult.failureType(creationResult.getFailureType());
results.alias(aliasResult.build());
});
});
return results.build();
}

private CreationResult createIndex(String indexName, MigrationMode mode, ICreateIndexContext context) {
var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName);
var indexMetadata = originalIndexMetadata.deepCopy();
try {
indexMetadata = transformer.transformIndexMetadata(indexMetadata);
return indexCreator.create(indexMetadata, mode, context);
} catch (Throwable t) {
return CreationResult.builder()
.name(indexName)
.exception(new IndexTransformationException(indexName, t))
.failureType(CreationFailureType.UNABLE_TO_TRANSFORM_FAILURE)
.build();
}
}
}
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

import org.opensearch.migrations.bulkload.common.FilterScheme;
@@ -99,24 +98,37 @@ private static void prepareShardWorkItems(
List<String> indexAllowlist,
IDocumentMigrationContexts.IShardSetupAttemptContext context
) {
log.info("Setting up the Documents Work Items...");
log.atInfo()
.setMessage("Setting up the Documents Work Items...")
.log();
SnapshotRepo.Provider repoDataProvider = metadataFactory.getRepoDataProvider();

BiConsumer<String, Boolean> logger = (indexName, accepted) -> {
if (Boolean.FALSE.equals(accepted)) {
log.info("Index " + indexName + " rejected by allowlist");
}
};
var allowedIndexes = FilterScheme.filterByAllowList(indexAllowlist);
repoDataProvider.getIndicesInSnapshot(snapshotName)
.stream()
.filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))
.filter(index -> {
var accepted = allowedIndexes.test(index.getName());
if (!accepted) {
log.atInfo()
.setMessage("None of the documents in index {} will be reindexed, it was not included in the allowlist: {} ")
.addArgument(index.getName())
.addArgument(indexAllowlist)
.log();
}
return accepted;
})
.forEach(index -> {
IndexMetadata indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName());
log.info("Index " + indexMetadata.getName() + " has " + indexMetadata.getNumberOfShards() + " shards");
log.atInfo()
.setMessage("Index {} has {} shards")
.addArgument(indexMetadata.getName())
.addArgument(indexMetadata.getNumberOfShards())
.log();
IntStream.range(0, indexMetadata.getNumberOfShards()).forEach(shardId -> {
log.info(
"Creating Documents Work Item for index: " + indexMetadata.getName() + ", shard: " + shardId
);
log.atInfo()
.setMessage("Creating Documents Work Item for index: {}, shard: {}")
.addArgument(indexMetadata.getName())
.addArgument(shardId)
.log();
try (var shardSetupContext = context.createShardWorkItemContext()) {
workCoordinator.createUnassignedWorkItem(
IndexAndShardCursor.formatAsWorkItemString(indexMetadata.getName(), shardId),
@@ -128,6 +140,8 @@ private static void prepareShardWorkItems(
});
});

log.info("Finished setting up the Documents Work Items.");
log.atInfo()
.setMessage("Finished setting up the Documents Work Items.")
.log();
}
}
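
For reference, a standalone sketch of the fluent logging style this file switches to (the SLF4J 2.x fluent API; assumes an SLF4J binding is on the classpath):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FluentLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(FluentLoggingSketch.class);

    public static void main(String[] args) {
        String indexName = "logs-2023";
        int shardCount = 5;

        // Placeholders are filled only if INFO is enabled, avoiding the eager
        // string concatenation used by the log lines this commit replaces.
        log.atInfo()
            .setMessage("Index {} has {} shards")
            .addArgument(indexName)
            .addArgument(shardCount)
            .log();
    }
}
```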
