Add MultiTypeMappingTransformation and test #1189

Merged: 24 commits, Dec 11, 2024

Commits:
ec2bb14  Reduce the number of cluster start/stops for Metadata end to end tests (peternied, Dec 9, 2024)
e375b9a  Add MultiTypeMappingTransformation and test (AndreKurait, Dec 10, 2024)
0527ea1  Update type union equality (AndreKurait, Dec 10, 2024)
ea71e8f  Fix transformation test (AndreKurait, Dec 10, 2024)
4d1b8a7  Disable union type conflict (AndreKurait, Dec 10, 2024)
1e81a14  Simplify E2E tests in metadata (AndreKurait, Dec 10, 2024)
b0cca39  Add NONE multiTypeResolutionBehavior (AndreKurait, Dec 10, 2024)
4b1b788  Add unit test for multi type resolution (AndreKurait, Dec 10, 2024)
9a095bc  Simplify ClusterOperations (AndreKurait, Dec 10, 2024)
bfb5704  Format SearchClusterContainer (AndreKurait, Dec 10, 2024)
50af06d  Update IndexMappingTypeRemovalTest (AndreKurait, Dec 10, 2024)
828489e  Add hint to error due to multi type index (AndreKurait, Dec 10, 2024)
98c6b7d  Fix formatting (AndreKurait, Dec 10, 2024)
d383fd3  Remove TestMetadataTransformationParams (AndreKurait, Dec 10, 2024)
9a2edba  Fix spotless (AndreKurait, Dec 10, 2024)
3511cad  Update test for metadata (AndreKurait, Dec 10, 2024)
1b5c6a5  Merge branch 'faster-meta-e2e' into MetadataMultiType (AndreKurait, Dec 10, 2024)
406ad78  Run formatter on EndToEndTest (AndreKurait, Dec 10, 2024)
28eaf85  Speed up metadata EndToEndTest (AndreKurait, Dec 10, 2024)
33d4e79  Add pre-pull to gradle task (AndreKurait, Dec 10, 2024)
c5cb306  Remove multiple mounts in MultiTypeMappingTransformationTest for GHA … (AndreKurait, Dec 11, 2024)
e501f09  Merge remote-tracking branch 'upstream/main' into MetadataMultiType (AndreKurait, Dec 11, 2024)
e9e2805  Fix GHA by disabling MultiTypeMappingTransformationTest (AndreKurait, Dec 11, 2024)
d276695  Spotless Apply (AndreKurait, Dec 11, 2024)
39 changes: 39 additions & 0 deletions .github/workflows/CI.yml
@@ -111,6 +111,44 @@ jobs:
with:
gradle-version: ${{ env.gradle-version }}
gradle-home-cache-cleanup: true
- name: Generate Cache Key from Dockerfiles

Review comment (Member):

@AndreKurait Can you revert the change to this file, or maybe we can sync on the utility of this change?

The additional workflow complexity, plus the extra effort to remember to keep these image lists in sync between the Java source and CI.yml, gives me pause about including this change. It looks like this was done in an effort to speed up CI and work around issues with the test case that ended up being disabled due to an unrelated root cause.

id: generate_cache_key
run: |
files=$(find . -type f \( -name 'docker-compose.yml' -o -name 'Dockerfile' \))
file_contents=$(cat $files)
key=$(echo "${file_contents}" | sha1sum | awk '{print $1}')
echo "key=${key}" >> "$GITHUB_OUTPUT"
- name: Cache Docker Images
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ steps.generate_cache_key.outputs.key }}
- name: Pre pull images
run: |
pull_if_not_present() {
local image="$1"
if docker image inspect "$image" > /dev/null 2>&1; then
echo "Image '$image' already exists locally."
else
echo "Pulling image '$image'..."
docker pull "$image"
fi
}
images=(
"opensearchproject/opensearch:1.3.16"
"opensearchproject/opensearch:2.14.0"
"docker.elastic.co/elasticsearch/elasticsearch:7.17.22"
"docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2"
"docker.elastic.co/elasticsearch/elasticsearch-oss:6.8.23"
"docker.elastic.co/elasticsearch/elasticsearch:5.6.16"
"httpd:alpine"
"confluentinc/cp-kafka:7.5.0"
"ghcr.io/shopify/toxiproxy:latest"
"amazonlinux:2023"
"alpine:3.16"
)
for image in "${images[@]}"; do
pull_if_not_present "$image"
done
- name: Run Gradle Build
run: ./gradlew build -x test -x TrafficCapture:dockerSolution:build -x spotlessCheck --stacktrace
env:
@@ -161,6 +199,7 @@ jobs:
uses: ScribeMD/[email protected]
with:
key: docker-${{ runner.os }}-${{ steps.generate_cache_key.outputs.key }}
read-only: true
- name: Start Docker Solution
run: ./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test -x spotlessCheck --info --stacktrace
env:
@@ -99,9 +99,9 @@ private void migrationDocumentsWithClusters(


sourceClusterOperations.createDocument(indexName, "222", "{\"author\":\"Tobias Funke\"}");
sourceClusterOperations.createDocument(indexName, "223", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "224", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1");
sourceClusterOperations.createDocument(indexName, "225", "{\"author\":\"Tobias Funke\", \"category\": \"tech\"}", "2");
sourceClusterOperations.createDocument(indexName, "223", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1", null);
sourceClusterOperations.createDocument(indexName, "224", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}", "1", null);
sourceClusterOperations.createDocument(indexName, "225", "{\"author\":\"Tobias Funke\", \"category\": \"tech\"}", "2", null);

// === ACTION: Take a snapshot ===
var snapshotName = "my_snap";
@@ -39,16 +39,13 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase {
public static Stream<Arguments> makeDocumentMigrationArgs() {
var numWorkersList = List.of(1, 3, 40);
var compressionEnabledList = List.of(true, false);
-        return SupportedClusters.targets().stream()
-            .flatMap(
-                targetImage -> numWorkersList.stream()
+        return numWorkersList.stream()
             .flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of(
                 numWorkers,
-                targetImage,
+                SearchClusterContainer.OS_V2_14_0,
                 compression
             ))
-            )
-        );
+        );
}

@ParameterizedTest
@@ -70,8 +67,8 @@ public void testDocumentMigration(
var osTargetContainer = new SearchClusterContainer(targetVersion);
) {
CompletableFuture.allOf(
-            CompletableFuture.runAsync(() -> esSourceContainer.start(), executorService),
-            CompletableFuture.runAsync(() -> osTargetContainer.start(), executorService)
+            CompletableFuture.runAsync(esSourceContainer::start, executorService),
+            CompletableFuture.runAsync(osTargetContainer::start, executorService)
).join();

// Populate the source cluster with data
@@ -54,10 +54,10 @@ public class SourceTestBase {
public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;

protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) {
-        return new Object[] {
+        return new Object[]{
             baseSourceImage,
             GENERATOR_BASE_IMAGE,
-            new String[] { "/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/" } };
+            new String[]{"/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/"}};
}

@AllArgsConstructor
@@ -102,7 +102,7 @@ public static int migrateDocumentsSequentially(
Version version,
boolean compressionEnabled
) {
-        for (int runNumber = 1;; ++runNumber) {
+        for (int runNumber = 1; ; ++runNumber) {
try {
var workResult = migrateDocumentsWithOneWorker(
sourceRepo,
@@ -146,7 +146,8 @@ public Flux<RfsLuceneDocument> readDocuments(int startSegmentIndex, int startDoc
}
}

-    static class LeasePastError extends Error {}
+    static class LeasePastError extends Error {
+    }

@SneakyThrows
public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
1 change: 1 addition & 0 deletions MetadataMigration/build.gradle
@@ -27,6 +27,7 @@ dependencies {
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.testcontainers', name: 'testcontainers'
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
}
8 changes: 4 additions & 4 deletions MetadataMigration/docs/DESIGN.md
@@ -142,17 +142,17 @@ Migration Candidates:

Transformations:
Index:
-        ERROR - IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "Multiple mapping types are not supported""
+        ERROR - IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "No multi type resolution behavior declared, specify --multi-type-behavior to process""
     Index Template:
-        ERROR - IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "Multiple mapping types are not supported"
+        ERROR - IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
DEBUG - 6 transformations did not apply, add --`full` to see all results

Result:
2 migration issues detected

Issues:
-    IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "Multiple mapping types are not supported""
-    IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "Multiple mapping types are not supported"
+    IndexMappingTypeRemoval is Unsupported on Index `logs-181998` "No multi type resolution behavior declared, specify --multi-type-behavior to process""
+    IndexMappingTypeRemoval is Unsupported on Index Template `daily_logs` "No multi type resolution behavior declared, specify --multi-type-behavior to process"
```
### Exclude incompatible rolling logs indices

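For context on what the new messages ask for: the `--multi-type-behavior` flag governs how IndexMappingTypeRemoval handles Elasticsearch 5.x-style indices that declare multiple mapping types. The before/after mapping below is an illustrative sketch of UNION semantics inferred from the commit titles ("Update type union equality", "Disable union type conflict"), not an excerpt from this PR:

```json
{
  "source_mappings_es5": {
    "book":    { "properties": { "author": { "type": "text" }, "isbn":  { "type": "keyword" } } },
    "article": { "properties": { "author": { "type": "text" }, "issue": { "type": "keyword" } } }
  },
  "plausible_unioned_mapping": {
    "properties": {
      "author": { "type": "text" },
      "isbn":   { "type": "keyword" },
      "issue":  { "type": "keyword" }
    }
  }
}
```

Under that reading, identically named fields must have equal definitions to union cleanly, and a conflicting definition is presumably exactly the case the NONE behavior refuses to process.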
@@ -4,8 +4,11 @@

import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.transformers.MetadataTransformerParams;
import org.opensearch.migrations.transform.TransformerParams;
import org.opensearch.migrations.transformation.rules.IndexMappingTypeRemoval;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;
@@ -56,10 +59,20 @@ public class MigrateOrEvaluateArgs {
public Version sourceVersion = null;

@ParametersDelegate
-    public TransformerParams metadataTransformationParams = new MetadataTransformerParams();
+    public MetadataTransformationParams metadataTransformationParams = new MetadataTransformationParams();

@ParametersDelegate
public TransformerParams metadataCustomTransformationParams = new MetadataCustomTransformationParams();

@Getter
public static class MetadataTransformationParams implements MetadataTransformerParams {
@Parameter(names = {"--multi-type-behavior"}, description = "Define behavior for resolving multi type mappings.")
public IndexMappingTypeRemoval.MultiTypeResolutionBehavior multiTypeResolutionBehavior = IndexMappingTypeRemoval.MultiTypeResolutionBehavior.NONE;
}

@Getter
-    public static class MetadataTransformerParams implements TransformerParams {
+    public static class MetadataCustomTransformationParams implements TransformerParams {

public String getTransformerConfigParameterArgPrefix() {
return "";
}
@@ -89,4 +102,11 @@ public String getTransformerConfigParameterArgPrefix() {
description = "Path to the JSON configuration file of metadata transformers.")
private String transformerConfigFile;
}

static class MultiTypeResolutionBehaviorConverter implements IStringConverter<IndexMappingTypeRemoval.MultiTypeResolutionBehavior> {
@Override
public IndexMappingTypeRemoval.MultiTypeResolutionBehavior convert(String value) {
return IndexMappingTypeRemoval.MultiTypeResolutionBehavior.valueOf(value.toUpperCase());
}
}
}
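To make the new flag concrete, here is a minimal, self-contained sketch of how the converter behaves under JCommander. The nested enum is a hypothetical stand-in for IndexMappingTypeRemoval.MultiTypeResolutionBehavior (only NONE appears in this diff; UNION is inferred from the commit titles), and the real @Parameter presumably wires the converter via the `converter` attribute:

```java
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class MultiTypeBehaviorFlagDemo {
    // Hypothetical stand-in for IndexMappingTypeRemoval.MultiTypeResolutionBehavior:
    // NONE appears in the diff; UNION is inferred from the commit titles.
    enum MultiTypeResolutionBehavior { NONE, UNION }

    // Mirrors MultiTypeResolutionBehaviorConverter above: case-insensitive enum lookup.
    public static class BehaviorConverter implements IStringConverter<MultiTypeResolutionBehavior> {
        @Override
        public MultiTypeResolutionBehavior convert(String value) {
            return MultiTypeResolutionBehavior.valueOf(value.toUpperCase());
        }
    }

    @Parameter(names = {"--multi-type-behavior"},
               converter = BehaviorConverter.class,
               description = "Define behavior for resolving multi type mappings.")
    MultiTypeResolutionBehavior behavior = MultiTypeResolutionBehavior.NONE;

    public static void main(String[] args) {
        var demo = new MultiTypeBehaviorFlagDemo();
        // Case-insensitive: "union" resolves to UNION via the converter.
        JCommander.newBuilder().addObject(demo).build().parse("--multi-type-behavior", "union");
        System.out.println(demo.behavior); // prints UNION
    }
}
```

An unrecognized value throws from valueOf inside the converter, which JCommander surfaces as a parameter error, so typos fail fast at argument parsing rather than mid-migration.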
@@ -55,7 +55,7 @@ protected Clusters createClusters() {
}

protected Transformer getCustomTransformer() {
-        var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams);
+        var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataCustomTransformationParams);
if (transformerConfig != null) {
log.atInfo().setMessage("Metadata Transformations config string: {}")
.addArgument(transformerConfig).log();
@@ -72,7 +72,8 @@ protected Transformer selectTransformer(Clusters clusters) {
var versionTransformer = TransformFunctions.getTransformer(
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
-            arguments.minNumberOfReplicas
+            arguments.minNumberOfReplicas,
+            arguments.metadataTransformationParams
);
var customTransformer = getCustomTransformer();
var compositeTransformer = new CompositeTransformer(customTransformer, versionTransformer);
@@ -0,0 +1,144 @@
package org.opensearch.migrations;

import java.io.File;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
import org.opensearch.migrations.commands.MigrationItemResult;
import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.io.TempDir;

/**
* Base test class providing shared setup and utility methods for migration tests.
*/
@Slf4j
abstract class BaseMigrationTest {

@TempDir
protected File localDirectory;

@Getter
protected SearchClusterContainer sourceCluster;
@Getter
protected SearchClusterContainer targetCluster;

protected ClusterOperations sourceOperations;
protected ClusterOperations targetOperations;

/**
* Starts the source and target clusters.
*/
protected void startClusters() {
CompletableFuture.allOf(
CompletableFuture.runAsync(sourceCluster::start),
CompletableFuture.runAsync(targetCluster::start)
).join();

sourceOperations = new ClusterOperations(sourceCluster.getUrl());
targetOperations = new ClusterOperations(targetCluster.getUrl());
}

/**
* Sets up a snapshot repository and takes a snapshot of the source cluster.
*
* @param snapshotName Name of the snapshot.
* @return The name of the created snapshot.
*/
@SneakyThrows
protected String createSnapshot(String snapshotName) {
var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(sourceCluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
var snapshotCreator = new org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator(
snapshotName,
sourceClient,
SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
List.of(),
snapshotContext.createSnapshotCreateContext()
);
org.opensearch.migrations.bulkload.worker.SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
sourceCluster.copySnapshotData(localDirectory.toString());
return snapshotName;
}

/**
* Prepares migration arguments for snapshot-based migrations.
*
* @param snapshotName Name of the snapshot.
* @return Prepared migration arguments.
*/
protected MigrateOrEvaluateArgs prepareSnapshotMigrationArgs(String snapshotName) {
var arguments = new MigrateOrEvaluateArgs();
arguments.fileSystemRepoPath = localDirectory.getAbsolutePath();
arguments.snapshotName = snapshotName;
arguments.sourceVersion = sourceCluster.getContainerVersion().getVersion();
arguments.targetArgs.host = targetCluster.getUrl();
return arguments;
}

/**
* Executes the migration command (either migrate or evaluate).
*
* @param arguments Migration arguments.
* @param command The migration command to execute.
* @return The result of the migration.
*/
protected MigrationItemResult executeMigration(MigrateOrEvaluateArgs arguments, MetadataCommands command) {
var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
var metadata = new MetadataMigration();

if (MetadataCommands.MIGRATE.equals(command)) {
return metadata.migrate(arguments).execute(metadataContext);
} else {
return metadata.evaluate(arguments).execute(metadataContext);
}
}

/**
* Creates an OpenSearch client for the given cluster.
*
* @param cluster The cluster container.
* @return An OpenSearch client.
*/
protected OpenSearchClient createClient(SearchClusterContainer cluster) {
return new OpenSearchClient(ConnectionContextTestParams.builder()
.host(cluster.getUrl())
.insecure(true)
.build()
.toConnectionContext());
}

protected SnapshotTestContext createSnapshotContext() {
return SnapshotTestContext.factory().noOtelTracking();
}

protected FileSystemSnapshotCreator createSnapshotCreator(String snapshotName, OpenSearchClient client, SnapshotTestContext context) {
return new FileSystemSnapshotCreator(
snapshotName,
client,
SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
List.of(),
context.createSnapshotCreateContext()
);
}

@SneakyThrows
protected void runSnapshotAndCopyData(FileSystemSnapshotCreator snapshotCreator, SearchClusterContainer cluster) {
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
cluster.copySnapshotData(localDirectory.toString());
}
}
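For orientation, here is a hypothetical subclass showing how the pieces of this base class compose. The version constants, the EVALUATE command, and the getExitCode() accessor are assumptions for illustration, not code from this PR:

```java
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;

class ExampleMetadataMigrationTest extends BaseMigrationTest {

    @Test
    void evaluatesMetadataFromSnapshot() {
        try (
            // Version constants assumed to exist on SearchClusterContainer;
            // OS_V2_14_0 is referenced elsewhere in this PR, ES_V6_8_23 is a guess.
            var source = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23);
            var target = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
        ) {
            sourceCluster = source;
            targetCluster = target;
            startClusters();

            // Seed the source, snapshot it, then run a metadata evaluation against the target.
            sourceOperations.createDocument("test-index", "1", "{\"author\":\"Tobias Funke\"}");
            var snapshotName = createSnapshot("test-snapshot");
            var args = prepareSnapshotMigrationArgs(snapshotName);
            var result = executeMigration(args, MetadataCommands.EVALUATE);

            // getExitCode() is a plausible MigrationItemResult accessor, not confirmed here.
            Assertions.assertEquals(0, result.getExitCode());
        }
    }
}
```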