Skip to content

Commit

Permalink
Add metadata transformations with metadata tuples
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Nov 19, 2024
1 parent 892990a commit 488c664
Show file tree
Hide file tree
Showing 25 changed files with 524 additions and 113 deletions.
1 change: 1 addition & 0 deletions MetadataMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(":coreUtilities")
implementation project(":RFS")
implementation project(':transformation')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'org.slf4j', name: 'slf4j-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.transform.TransformerParams;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;

public class MigrateOrEvaluateArgs {
@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
Expand Down Expand Up @@ -52,4 +54,39 @@ public class MigrateOrEvaluateArgs {

@Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3.", converter = VersionConverter.class)
public Version sourceVersion = null;

@ParametersDelegate
public MetadataTransformerParams metadataTransformationParams = new MetadataTransformerParams();

@Getter
public static class MetadataTransformerParams implements TransformerParams {
public String getTransformerConfigParameterArgPrefix() {
return "";
}
@Parameter(
required = false,
names = "--transformer-config-base64",
arity = 1,
description = "Configuration of metadata transformers. The same contents as --transformer-config but " +
"Base64 encoded so that the configuration is easier to pass as a command line parameter.")
private String transformerConfigEncoded;

@Parameter(
required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of metadata transformers. Either as a string that identifies the "
+ "transformer that should be run (with default settings) or as json to specify options "
+ "as well as multiple transformers to run in sequence. "
+ "For json, keys are the (simple) names of the loaded transformers and values are the "
+ "configuration passed to each of the transformers.")
private String transformerConfig;

@Parameter(
required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of metadata transformers.")
private String transformerConfigFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.opensearch.migrations.MigrateOrEvaluateArgs;
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.transformers.CompositeTransformer;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;

import com.beust.jcommander.ParameterException;
Expand All @@ -24,7 +25,10 @@ public EvaluateResult execute(RootMetadataMigrationContext context) {
var clusters = createClusters();
evaluateResult.clusters(clusters);

var transformer = selectTransformer(clusters);
var transformer = new CompositeTransformer(
getCustomTransformer(),
selectTransformer(clusters)
);

var items = migrateAllItems(migrationMode, clusters, transformer, context);
evaluateResult.items(items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.opensearch.migrations.MigrateOrEvaluateArgs;
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.transformers.CompositeTransformer;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;

import com.beust.jcommander.ParameterException;
Expand All @@ -24,7 +25,10 @@ public MigrateResult execute(RootMetadataMigrationContext context) {
var clusters = createClusters();
migrateResult.clusters(clusters);

var transformer = selectTransformer(clusters);
var transformer = new CompositeTransformer(
getCustomTransformer(),
selectTransformer(clusters)
);

var items = migrateAllItems(migrationMode, clusters, transformer, context);
migrateResult.items(items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.transformers.TransformFunctions;
import org.opensearch.migrations.bulkload.transformers.Transformer;
import org.opensearch.migrations.bulkload.transformers.TransformerToIJsonTransformerAdapter;
import org.opensearch.migrations.bulkload.worker.IndexMetadataResults;
import org.opensearch.migrations.bulkload.worker.IndexRunner;
import org.opensearch.migrations.bulkload.worker.MetadataRunner;
Expand All @@ -17,12 +18,19 @@
import org.opensearch.migrations.metadata.CreationResult;
import org.opensearch.migrations.metadata.GlobalMetadataCreatorResults;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.transform.TransformerConfigUtils;

import lombok.extern.slf4j.Slf4j;

/** Shared functionality between migration and evaluation commands */
@Slf4j
public abstract class MigratorEvaluatorBase {
public static final String NOOP_TRANSFORMATION_CONFIG = "[" +
" {" +
" \"NoopTransformerProvider\":\"\"" +
" }" +
"]";

static final int INVALID_PARAMETER_CODE = 999;
static final int UNEXPECTED_FAILURE_CODE = 888;
Expand All @@ -45,13 +53,27 @@ protected Clusters createClusters() {
return clusters.build();
}

protected Transformer getCustomTransformer() {
var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams);
if (transformerConfig != null) {
log.atInfo().setMessage("Metadata Transformations config string: {}")
.addArgument(transformerConfig).log();
} else {
log.atInfo().setMessage("Using Noop transformation config: {}")
.addArgument(NOOP_TRANSFORMATION_CONFIG).log();
transformerConfig = NOOP_TRANSFORMATION_CONFIG;
}
var transformer = new TransformationLoader().getTransformerFactoryLoader(transformerConfig);
return new TransformerToIJsonTransformerAdapter(transformer);
}

protected Transformer selectTransformer(Clusters clusters) {
var transformer = TransformFunctions.getTransformer(
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
arguments.minNumberOfReplicas
);
log.info("Selected transformer " + transformer.toString());
log.atInfo().setMessage("Selected transformer: {}").addArgument(transformer).log();
return transformer;
}

Expand Down
20 changes: 19 additions & 1 deletion MetadataMigration/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ status = WARN

property.logsDir = ${env:SHARED_LOGS_DIR_PATH:-./logs}
property.failedLoggerFileNamePrefix = ${logsDir}/${hostName}/failedRequests/failedRequests
property.metadataTuplesFileNamePrefix = ${logsDir}/${hostName}/metadataTuples/tuples

appenders = console, FailedRequests, MetadataRun
appenders = console, FailedRequests, MetadataRun, MetadataTuples

appender.FailedRequests.type = RollingRandomAccessFile
appender.FailedRequests.name = FailedRequests
Expand Down Expand Up @@ -51,3 +52,20 @@ logger.MetadataMigration.level = info
logger.MetadataMigration.additivity = false
logger.MetadataMigration.appenderRef.stdout.ref = Console
logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun

appender.MetadataTuples.type = RollingRandomAccessFile
appender.MetadataTuples.name = MetadataTuples
appender.MetadataTuples.fileName = ${metadataTuplesFileNamePrefix}.log
appender.MetadataTuples.filePattern = ${metadataTuplesFileNamePrefix}-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log
appender.MetadataTuples.layout.type = PatternLayout
appender.MetadataTuples.layout.pattern = %m%n
appender.MetadataTuples.policies.type = Policies
appender.MetadataTuples.policies.size.type = SizeBasedTriggeringPolicy
appender.MetadataTuples.policies.size.size = 10 MB
appender.MetadataTuples.strategy.type = DefaultRolloverStrategy
appender.MetadataTuples.immediateFlush = false

logger.OutputTransformationJsonLogger.name = OutputTransformationJsonLogger
logger.OutputTransformationJsonLogger.level = info
logger.OutputTransformationJsonLogger.additivity = false
logger.OutputTransformationJsonLogger.appenderRef.MetadataTuples.ref = MetadataTuples
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.migrations.bulkload.models;

import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson
public class ComponentTemplate extends MigrationItem {
public static final String TYPE = "component_template";
public ComponentTemplate(final String name, final ObjectNode body) {
super(TYPE, name, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.migrations.bulkload.common.RfsException;
import org.opensearch.migrations.bulkload.common.SnapshotRepo;

import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -21,18 +22,34 @@ public interface GlobalMetadata {
* See: https://github.com/elastic/elasticsearch/blob/v7.10.2/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java#L1622
* See: https://github.com/elastic/elasticsearch/blob/v6.8.23/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java#L1214
*/
public ObjectNode toObjectNode();
ObjectNode toObjectNode();

public ObjectNode getTemplates();
JsonPointer getTemplatesPath();

public ObjectNode getIndexTemplates();
JsonPointer getIndexTemplatesPath();

public ObjectNode getComponentTemplates();
JsonPointer getComponentTemplatesPath();

default ObjectNode getTemplates() {
return getObjectNodeWithPath(getTemplatesPath());
}

default ObjectNode getIndexTemplates() {
return getObjectNodeWithPath(getIndexTemplatesPath());
}

default ObjectNode getComponentTemplates() {
return getObjectNodeWithPath(getComponentTemplatesPath());
}

default ObjectNode getObjectNodeWithPath(JsonPointer path) {
return toObjectNode().withObject(path, JsonNode.OverwriteMode.NULLS, false);
}

/**
* Defines the behavior required to read a snapshot's global metadata as JSON and convert it into a Data object
*/
public static interface Factory {
interface Factory {
private JsonNode getJsonNode(
SnapshotRepo.Provider repoDataProvider,
String snapshotName,
Expand Down Expand Up @@ -73,22 +90,22 @@ default GlobalMetadata fromRepo(String snapshotName) {
}

// Version-specific implementation
public GlobalMetadata fromJsonNode(JsonNode root);
GlobalMetadata fromJsonNode(JsonNode root);

// Version-specific implementation
public SmileFactory getSmileFactory();
SmileFactory getSmileFactory();

// Get the underlying SnapshotRepo Provider
public SnapshotRepo.Provider getRepoDataProvider();
SnapshotRepo.Provider getRepoDataProvider();
}

public static class CantFindSnapshotInRepo extends RfsException {
class CantFindSnapshotInRepo extends RfsException {
public CantFindSnapshotInRepo(String snapshotName) {
super("Can't find snapshot in repo: " + snapshotName);
}
}

public static class CantReadGlobalMetadataFromSnapshot extends RfsException {
class CantReadGlobalMetadataFromSnapshot extends RfsException {
public CantReadGlobalMetadataFromSnapshot(String snapshotName, Throwable cause) {
super("Can't read the global metadata from snapshot: " + snapshotName, cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.migrations.bulkload.models;

import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson
public class Index extends MigrationItem {
public final static String TYPE = "index";
public Index(String name, ObjectNode body) {
super(TYPE, name, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,33 @@
import org.opensearch.migrations.bulkload.common.SnapshotRepo;
import org.opensearch.migrations.transformation.entity.Index;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import org.apache.lucene.codecs.CodecUtil;

public interface IndexMetadata extends Index {

// All subclasses need to be annotated with this
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type")
public abstract class IndexMetadata implements Index {
/*
* Defines the behavior expected of an object that will surface the metadata of an index stored in a snapshot
* See: https://github.com/elastic/elasticsearch/blob/v7.10.2/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java#L1475
* See: https://github.com/elastic/elasticsearch/blob/v6.8.23/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java#L1284
*/
public JsonNode getAliases();
public abstract JsonNode getAliases();

public String getId();
public abstract String getId();

public JsonNode getMappings();
public abstract JsonNode getMappings();

public String getName();
public abstract String getName();

public int getNumberOfShards();
public abstract int getNumberOfShards();

public JsonNode getSettings();
public abstract JsonNode getSettings();

public IndexMetadata deepCopy();
public abstract IndexMetadata deepCopy();

/**
* Defines the behavior required to read a snapshot's index metadata as JSON and convert it into a Data object
Expand Down Expand Up @@ -71,15 +73,15 @@ default IndexMetadata fromRepo(String snapshotName, String indexName) {
}

// Version-specific implementation
public IndexMetadata fromJsonNode(JsonNode root, String indexId, String indexName);
IndexMetadata fromJsonNode(JsonNode root, String indexId, String indexName);

// Version-specific implementation
public SmileFactory getSmileFactory();
SmileFactory getSmileFactory();

// Version-specific implementation
public String getIndexFileId(String snapshotName, String indexName);
String getIndexFileId(String snapshotName, String indexName);

// Get the underlying SnapshotRepo Provider
public SnapshotRepo.Provider getRepoDataProvider();
SnapshotRepo.Provider getRepoDataProvider();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.migrations.bulkload.models;

import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson
public class IndexTemplate extends MigrationItem {
public static final String TYPE = "index_template";
public IndexTemplate(final String name, final ObjectNode body) {
super(TYPE, name, body);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.opensearch.migrations.bulkload.models;

import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson
public class LegacyTemplate extends MigrationItem {
public final static String TYPE = "template";
public LegacyTemplate(final String name, final ObjectNode body) {
super(TYPE, name, body);
}
}
Loading

0 comments on commit 488c664

Please sign in to comment.