Skip to content

Commit

Permalink
Merge pull request opensearch-project#1189 from AndreKurait/MetadataM…
Browse files Browse the repository at this point in the history
…ultiType

Add MultiTypeMappingTransformation and test
  • Loading branch information
AndreKurait authored Dec 11, 2024
2 parents 6d3b225 + d276695 commit fdf32c2
Show file tree
Hide file tree
Showing 22 changed files with 829 additions and 318 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,44 @@ jobs:
with:
gradle-version: ${{ env.gradle-version }}
gradle-home-cache-cleanup: true
- name: Generate Cache Key from Dockerfiles
id: generate_cache_key
run: |
files=$(find . -type f \( -name 'docker-compose.yml' -o -name 'Dockerfile' \))
file_contents=$(cat $files)
key=$(echo "${file_contents}" | sha1sum | awk '{print $1}')
echo "key=${key}" >> "$GITHUB_OUTPUT"
- name: Cache Docker Images
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ steps.generate_cache_key.outputs.key }}
- name: Pre pull images
run: |
pull_if_not_present() {
local image="$1"
if docker image inspect "$image" > /dev/null 2>&1; then
echo "Image '$image' already exists locally."
else
echo "Pulling image '$image'..."
docker pull "$image"
fi
}
images=(
"opensearchproject/opensearch:1.3.16"
"opensearchproject/opensearch:2.14.0"
"docker.elastic.co/elasticsearch/elasticsearch:7.17.22"
"docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2"
"docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.23"
"docker.elastic.co/elasticsearch/elasticsearch:5.6.16"
"httpd:alpine"
"confluentinc/cp-kafka:7.5.0"
"ghcr.io/shopify/toxiproxy:latest"
"amazonlinux:2023"
"alpine:3.16"
)
for image in "${images[@]}"; do
pull_if_not_present "$image"
done
- name: Run Gradle Build
run: ./gradlew build -x test -x TrafficCapture:dockerSolution:build -x spotlessCheck --stacktrace
env:
Expand Down Expand Up @@ -161,6 +199,7 @@ jobs:
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ steps.generate_cache_key.outputs.key }}
read-only: true
- name: Start Docker Solution
run: ./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test -x spotlessCheck --info --stacktrace
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ private void migrationDocumentsWithClusters(


sourceClusterOperations.createDocument(indexName, "222", "{\"author\":\"Tobias Funke\"}");
sourceClusterOperations.createDocument(indexName, "223", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "224", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "225", "{\"author\":\"Tobias Funke\", \"category\": \"tech\"}", "2");
sourceClusterOperations.createDocument(indexName, "223", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1", null);
sourceClusterOperations.createDocument(indexName, "224", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1", null);
sourceClusterOperations.createDocument(indexName, "225", "{\"author\":\"Tobias Funke\", \"category\": \"tech\"}", "2", null);

// === ACTION: Take a snapshot ===
var snapshotName = "my_snap";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,13 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase {
public static Stream<Arguments> makeDocumentMigrationArgs() {
var numWorkersList = List.of(1, 3, 40);
var compressionEnabledList = List.of(true, false);
return SupportedClusters.targets().stream()
.flatMap(
targetImage -> numWorkersList.stream()
return numWorkersList.stream()
.flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of(
numWorkers,
targetImage,
SearchClusterContainer.OS_V2_14_0,
compression
))
)
);
);
}

@ParameterizedTest
Expand All @@ -70,8 +67,8 @@ public void testDocumentMigration(
var osTargetContainer = new SearchClusterContainer(targetVersion);
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> esSourceContainer.start(), executorService),
CompletableFuture.runAsync(() -> osTargetContainer.start(), executorService)
CompletableFuture.runAsync(esSourceContainer::start, executorService),
CompletableFuture.runAsync(osTargetContainer::start, executorService)
).join();

// Populate the source cluster with data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public class SourceTestBase {
public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;

protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) {
return new Object[] {
return new Object[]{
baseSourceImage,
GENERATOR_BASE_IMAGE,
new String[] { "/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/" } };
new String[]{"/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/"}};
}

@AllArgsConstructor
Expand Down Expand Up @@ -102,7 +102,7 @@ public static int migrateDocumentsSequentially(
Version version,
boolean compressionEnabled
) {
for (int runNumber = 1;; ++runNumber) {
for (int runNumber = 1; ; ++runNumber) {
try {
var workResult = migrateDocumentsWithOneWorker(
sourceRepo,
Expand Down Expand Up @@ -146,7 +146,8 @@ public Flux<RfsLuceneDocument> readDocuments(int startSegmentIndex, int startDoc
}
}

static class LeasePastError extends Error {}
static class LeasePastError extends Error {
}

@SneakyThrows
public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
Expand Down
1 change: 1 addition & 0 deletions MetadataMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.testcontainers', name: 'testcontainers'
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
}
Expand Down
8 changes: 4 additions & 4 deletions MetadataMigration/docs/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,17 @@ Migration Candidates:
Transformations:
Index:
ERROR - IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "Multiple mapping types are not supported""
ERROR - IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
Index Template:
ERROR - IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "Multiple mapping types are not supported"
ERROR - IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
DEBUG - 6 transformations did not apply, add --`full` to see all results
Result:
2 migration issues detected
Issues:
IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "Multiple mapping types are not supported""
IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "Multiple mapping types are not supported"
IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
```
### Exclude incompatible rolling logs indices

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.transformers.MetadataTransformerParams;
import org.opensearch.migrations.transform.TransformerParams;
import org.opensearch.migrations.transformation.rules.IndexMappingTypeRemoval;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;
Expand Down Expand Up @@ -56,10 +59,20 @@ public class MigrateOrEvaluateArgs {
public Version sourceVersion = null;

@ParametersDelegate
public TransformerParams metadataTransformationParams = new MetadataTransformerParams();
public MetadataTransformationParams metadataTransformationParams = new MetadataTransformationParams();

@ParametersDelegate
public TransformerParams metadataCustomTransformationParams = new MetadataCustomTransformationParams();

/**
 * Command-line options that tune the built-in (version-driven) metadata transformations.
 * Exposed to callers through the {@code MetadataTransformerParams} interface.
 */
@Getter
public static class MetadataTransformationParams implements MetadataTransformerParams {
    // Defaults to NONE when --multi-type-behavior is not supplied on the command line.
    // NOTE(review): no explicit converter is declared on this @Parameter; presumably the
    // argument parser's built-in enum handling is relied upon -- confirm.
    @Parameter(names = {"--multi-type-behavior"}, description = "Define behavior for resolving multi type mappings.")
    public IndexMappingTypeRemoval.MultiTypeResolutionBehavior multiTypeResolutionBehavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.NONE;
}

@Getter
public static class MetadataTransformerParams implements TransformerParams {
public static class MetadataCustomTransformationParams implements TransformerParams {

public String getTransformerConfigParameterArgPrefix() {
return "";
}
Expand Down Expand Up @@ -89,4 +102,11 @@ public String getTransformerConfigParameterArgPrefix() {
description = "Path to the JSON configuration file of metadata transformers.")
private String transformerConfigFile;
}

/**
 * Converts a command-line string into a
 * {@link IndexMappingTypeRemoval.MultiTypeResolutionBehavior} constant, ignoring case.
 */
static class MultiTypeResolutionBehaviorConverter implements IStringConverter<IndexMappingTypeRemoval.MultiTypeResolutionBehavior> {
    /**
     * @param value the raw argument text, e.g. {@code "none"} or {@code "NONE"}
     * @return the enum constant whose name equals {@code value} upper-cased
     * @throws IllegalArgumentException if no constant has that name
     */
    @Override
    public IndexMappingTypeRemoval.MultiTypeResolutionBehavior convert(String value) {
        // Upper-case with a fixed locale: under e.g. a Turkish default locale, "i" maps to
        // a dotted capital I and valueOf() would reject otherwise valid input.
        return IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(
            value.toUpperCase(java.util.Locale.ROOT)
        );
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected Clusters createClusters() {
}

protected Transformer getCustomTransformer() {
var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams);
var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataCustomTransformationParams);
if (transformerConfig != null) {
log.atInfo().setMessage("Metadata Transformations config string: {}")
.addArgument(transformerConfig).log();
Expand All @@ -72,7 +72,8 @@ protected Transformer selectTransformer(Clusters clusters) {
var versionTransformer = TransformFunctions.getTransformer(
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
arguments.minNumberOfReplicas
arguments.minNumberOfReplicas,
arguments.metadataTransformationParams
);
var customTransformer = getCustomTransformer();
var compositeTransformer = new CompositeTransformer(customTransformer, versionTransformer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.opensearch.migrations;

import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.commands.MigrationItemResult;
import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.io.TempDir;

/**
 * Base test class providing shared setup and utility methods for migration tests.
 *
 * <p>This class never assigns {@link #sourceCluster} or {@link #targetCluster} itself;
 * both must be set (e.g. by the concrete test) before {@link #startClusters()} is called.
 */
@Slf4j
abstract class BaseMigrationTest {

    /** Scratch directory that receives the snapshot data copied out of the source cluster. */
    @TempDir
    protected File localDirectory;

    @Getter
    protected SearchClusterContainer sourceCluster;
    @Getter
    protected SearchClusterContainer targetCluster;

    /** Created by {@link #startClusters()} from the source cluster's URL. */
    protected ClusterOperations sourceOperations;
    /** Created by {@link #startClusters()} from the target cluster's URL. */
    protected ClusterOperations targetOperations;

    /**
     * Starts the source and target clusters.
     *
     * <p>Both containers are started concurrently and this method blocks until both have
     * finished starting; afterwards the {@link ClusterOperations} helpers are created.
     */
    protected void startClusters() {
        CompletableFuture.allOf(
            CompletableFuture.runAsync(sourceCluster::start),
            CompletableFuture.runAsync(targetCluster::start)
        ).join();

        sourceOperations = new ClusterOperations(sourceCluster.getUrl());
        targetOperations = new ClusterOperations(targetCluster.getUrl());
    }

    /**
     * Sets up a snapshot repository and takes a snapshot of the source cluster.
     *
     * <p>Delegates to the helper methods of this class so the snapshot wiring lives in
     * exactly one place.
     *
     * @param snapshotName Name of the snapshot.
     * @return The name of the created snapshot.
     */
    @SneakyThrows
    protected String createSnapshot(String snapshotName) {
        var snapshotContext = createSnapshotContext();
        var sourceClient = createClient(sourceCluster);
        var snapshotCreator = createSnapshotCreator(snapshotName, sourceClient, snapshotContext);
        runSnapshotAndCopyData(snapshotCreator, sourceCluster);
        return snapshotName;
    }

    /**
     * Prepares migration arguments for snapshot-based migrations.
     *
     * @param snapshotName Name of the snapshot.
     * @return Prepared migration arguments.
     */
    protected MigrateOrEvaluateArgs prepareSnapshotMigrationArgs(String snapshotName) {
        var arguments = new MigrateOrEvaluateArgs();
        arguments.fileSystemRepoPath = localDirectory.getAbsolutePath();
        arguments.snapshotName = snapshotName;
        arguments.sourceVersion = sourceCluster.getContainerVersion().getVersion();
        arguments.targetArgs.host = targetCluster.getUrl();
        return arguments;
    }

    /**
     * Executes the migration command (either migrate or evaluate).
     *
     * <p>Any command other than {@code MetadataCommands.MIGRATE} is run as an evaluation.
     *
     * @param arguments Migration arguments.
     * @param command The migration command to execute.
     * @return The result of the migration.
     */
    protected MigrationItemResult executeMigration(MigrateOrEvaluateArgs arguments, MetadataCommands command) {
        var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
        var metadata = new MetadataMigration();

        if (MetadataCommands.MIGRATE.equals(command)) {
            return metadata.migrate(arguments).execute(metadataContext);
        } else {
            return metadata.evaluate(arguments).execute(metadataContext);
        }
    }

    /**
     * Creates an OpenSearch client for the given cluster.
     *
     * @param cluster The cluster container.
     * @return An OpenSearch client.
     */
    protected OpenSearchClient createClient(SearchClusterContainer cluster) {
        return new OpenSearchClient(ConnectionContextTestParams.builder()
            .host(cluster.getUrl())
            .insecure(true)
            .build()
            .toConnectionContext());
    }

    /**
     * Creates a snapshot test context with OpenTelemetry tracking disabled.
     *
     * @return A new snapshot test context.
     */
    protected SnapshotTestContext createSnapshotContext() {
        return SnapshotTestContext.factory().noOtelTracking();
    }

    /**
     * Builds a file-system snapshot creator that targets
     * {@code SearchClusterContainer.CLUSTER_SNAPSHOT_DIR} and is given an empty list as its
     * fourth constructor argument.
     *
     * @param snapshotName Name of the snapshot to create.
     * @param client Client connected to the cluster being snapshotted.
     * @param context Test context supplying the snapshot-create context.
     * @return The configured snapshot creator.
     */
    protected FileSystemSnapshotCreator createSnapshotCreator(String snapshotName, OpenSearchClient client, SnapshotTestContext context) {
        return new FileSystemSnapshotCreator(
            snapshotName,
            client,
            SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
            List.of(),
            context.createSnapshotCreateContext()
        );
    }

    /**
     * Runs the snapshot to completion, then copies the snapshot data out of the cluster
     * into {@link #localDirectory}.
     *
     * @param snapshotCreator Creator describing the snapshot to take.
     * @param cluster Cluster container whose snapshot data is copied.
     */
    @SneakyThrows
    protected void runSnapshotAndCopyData(FileSystemSnapshotCreator snapshotCreator, SearchClusterContainer cluster) {
        SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
        cluster.copySnapshotData(localDirectory.toString());
    }
}
Loading

0 comments on commit fdf32c2

Please sign in to comment.