diff --git a/MetadataMigration/build.gradle b/MetadataMigration/build.gradle index fa0bf72d9..dc66ccb5a 100644 --- a/MetadataMigration/build.gradle +++ b/MetadataMigration/build.gradle @@ -11,6 +11,7 @@ dependencies { implementation project(":coreUtilities") implementation project(":RFS") implementation project(':transformation') + implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders') implementation group: 'org.jcommander', name: 'jcommander' implementation group: 'org.slf4j', name: 'slf4j-api' diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java b/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java index 01942be40..4c51a1868 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java @@ -4,9 +4,11 @@ import org.opensearch.migrations.bulkload.common.http.ConnectionContext; import org.opensearch.migrations.bulkload.models.DataFilterArgs; +import org.opensearch.migrations.transform.TransformerParams; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParametersDelegate; +import lombok.Getter; public class MigrateOrEvaluateArgs { @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool") @@ -52,4 +54,39 @@ public class MigrateOrEvaluateArgs { @Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3.", converter = VersionConverter.class) public Version sourceVersion = null; + + @ParametersDelegate + public MetadataTransformerParams metadataTransformationParams = new MetadataTransformerParams(); + + @Getter + public static class MetadataTransformerParams implements TransformerParams { + public String getTransformerConfigParameterArgPrefix() { + return ""; + } + @Parameter( + required = false, + names = "--transformer-config-base64", + arity = 1, + description = "Configuration of metadata transformers. The same contents as --transformer-config but " + + "Base64 encoded so that the configuration is easier to pass as a command line parameter.") + private String transformerConfigEncoded; + + @Parameter( + required = false, + names = "--transformer-config", + arity = 1, + description = "Configuration of metadata transformers. Either as a string that identifies the " + + "transformer that should be run (with default settings) or as json to specify options " + + "as well as multiple transformers to run in sequence. " + + "For json, keys are the (simple) names of the loaded transformers and values are the " + + "configuration passed to each of the transformers.") + private String transformerConfig; + + @Parameter( + required = false, + names = "--transformer-config-file", + arity = 1, + description = "Path to the JSON configuration file of metadata transformers.") + private String transformerConfigFile; + } } diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Evaluate.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Evaluate.java index 42876fba9..76a8197f4 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Evaluate.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Evaluate.java @@ -2,6 +2,7 @@ import org.opensearch.migrations.MigrateOrEvaluateArgs; import org.opensearch.migrations.MigrationMode; +import org.opensearch.migrations.bulkload.transformers.CompositeTransformer; import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext; import com.beust.jcommander.ParameterException; @@ -24,7 +25,10 @@ public EvaluateResult execute(RootMetadataMigrationContext context) { var clusters = createClusters(); evaluateResult.clusters(clusters); - var transformer = selectTransformer(clusters); + var transformer = new CompositeTransformer( + getCustomTransformer(), + selectTransformer(clusters) + ); var items = migrateAllItems(migrationMode, clusters, transformer, context); evaluateResult.items(items); diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java index 00edd304f..0f4e4335a 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/Migrate.java @@ -2,6 +2,7 @@ import org.opensearch.migrations.MigrateOrEvaluateArgs; import org.opensearch.migrations.MigrationMode; +import org.opensearch.migrations.bulkload.transformers.CompositeTransformer; import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext; import com.beust.jcommander.ParameterException; @@ -24,7 +25,10 @@ public MigrateResult execute(RootMetadataMigrationContext context) { var clusters = createClusters(); migrateResult.clusters(clusters); - var transformer = selectTransformer(clusters); + var transformer = new CompositeTransformer( + getCustomTransformer(), + selectTransformer(clusters) + ); var items = migrateAllItems(migrationMode, clusters, transformer, context); migrateResult.items(items); diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java index b9871e08b..ecc363ccc 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java @@ -7,6 +7,7 @@ import org.opensearch.migrations.MigrationMode; import org.opensearch.migrations.bulkload.transformers.TransformFunctions; import org.opensearch.migrations.bulkload.transformers.Transformer; +import org.opensearch.migrations.bulkload.transformers.TransformerToIJsonTransformerAdapter; import org.opensearch.migrations.bulkload.worker.IndexMetadataResults; import org.opensearch.migrations.bulkload.worker.IndexRunner; import org.opensearch.migrations.bulkload.worker.MetadataRunner; @@ -17,12 +18,19 @@ import org.opensearch.migrations.metadata.CreationResult; import org.opensearch.migrations.metadata.GlobalMetadataCreatorResults; import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext; +import org.opensearch.migrations.transform.TransformationLoader; +import org.opensearch.migrations.transform.TransformerConfigUtils; import lombok.extern.slf4j.Slf4j; /** Shared functionality between migration and evaluation commands */ @Slf4j public abstract class MigratorEvaluatorBase { + public static final String NOOP_TRANSFORMATION_CONFIG = "[" + + " {" + + " \"NoopTransformerProvider\":\"\"" + + " }" + + "]"; static final int INVALID_PARAMETER_CODE = 999; static final int UNEXPECTED_FAILURE_CODE = 888; @@ -45,13 +53,27 @@ protected Clusters createClusters() { return clusters.build(); } + protected Transformer getCustomTransformer() { + var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams); + if (transformerConfig != null) { + log.atInfo().setMessage("Metadata Transformations config string: {}") + .addArgument(transformerConfig).log(); + } else { + log.atInfo().setMessage("Using Noop transformation config: {}") + .addArgument(NOOP_TRANSFORMATION_CONFIG).log(); + transformerConfig = NOOP_TRANSFORMATION_CONFIG; + } + var transformer = new TransformationLoader().getTransformerFactoryLoader(transformerConfig); + return new TransformerToIJsonTransformerAdapter(transformer); + } + protected Transformer selectTransformer(Clusters clusters) { var transformer = TransformFunctions.getTransformer( clusters.getSource().getVersion(), clusters.getTarget().getVersion(), arguments.minNumberOfReplicas ); - log.info("Selected transformer " + transformer.toString()); + log.atInfo().setMessage("Selected transformer: {}").addArgument(transformer).log(); return transformer; } diff --git a/MetadataMigration/src/main/resources/log4j2.properties b/MetadataMigration/src/main/resources/log4j2.properties index 670c04fb9..a55b999a2 100644 --- a/MetadataMigration/src/main/resources/log4j2.properties +++ b/MetadataMigration/src/main/resources/log4j2.properties @@ -2,8 +2,9 @@ status = WARN property.logsDir = ${env:SHARED_LOGS_DIR_PATH:-./logs} property.failedLoggerFileNamePrefix = ${logsDir}/${hostName}/failedRequests/failedRequests +property.metadataTuplesFileNamePrefix = ${logsDir}/${hostName}/metadataTuples/tuples -appenders = console, FailedRequests, MetadataRun +appenders = console, FailedRequests, MetadataRun, MetadataTuples appender.FailedRequests.type = RollingRandomAccessFile appender.FailedRequests.name = FailedRequests @@ -51,3 +52,20 @@ logger.MetadataMigration.level = info logger.MetadataMigration.additivity = false logger.MetadataMigration.appenderRef.stdout.ref = Console logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun + +appender.MetadataTuples.type = RollingRandomAccessFile +appender.MetadataTuples.name = MetadataTuples +appender.MetadataTuples.fileName = ${metadataTuplesFileNamePrefix}.log +appender.MetadataTuples.filePattern = ${metadataTuplesFileNamePrefix}-%d{yyyy-MM-dd-HH-mm}{UTC}-%i.log +appender.MetadataTuples.layout.type = PatternLayout +appender.MetadataTuples.layout.pattern = %m%n +appender.MetadataTuples.policies.type = Policies +appender.MetadataTuples.policies.size.type = SizeBasedTriggeringPolicy +appender.MetadataTuples.policies.size.size = 10 MB +appender.MetadataTuples.strategy.type = DefaultRolloverStrategy +appender.MetadataTuples.immediateFlush = false + +logger.OutputTransformationJsonLogger.name = OutputTransformationJsonLogger +logger.OutputTransformationJsonLogger.level = info +logger.OutputTransformationJsonLogger.additivity = false +logger.OutputTransformationJsonLogger.appenderRef.MetadataTuples.ref = MetadataTuples diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/ComponentTemplate.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/ComponentTemplate.java new file mode 100644 index 000000000..a019b871c --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/ComponentTemplate.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.bulkload.models; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson +public class ComponentTemplate extends MigrationItem { + public static final String TYPE = "component_template"; + public ComponentTemplate(final String name, final ObjectNode body) { + super(TYPE, name, body); + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/GlobalMetadata.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/GlobalMetadata.java index a8b65a2be..2d867274e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/GlobalMetadata.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/GlobalMetadata.java @@ -9,6 +9,7 @@ import org.opensearch.migrations.bulkload.common.RfsException; import org.opensearch.migrations.bulkload.common.SnapshotRepo; +import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -21,18 +22,34 @@ public interface GlobalMetadata { * See: https://github.com/elastic/elasticsearch/blob/v7.10.2/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java#L1622 * See: https://github.com/elastic/elasticsearch/blob/v6.8.23/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java#L1214 */ - public ObjectNode toObjectNode(); + ObjectNode toObjectNode(); - public ObjectNode getTemplates(); + JsonPointer getTemplatesPath(); - public ObjectNode getIndexTemplates(); + JsonPointer getIndexTemplatesPath(); - public ObjectNode getComponentTemplates(); + JsonPointer getComponentTemplatesPath(); + + default ObjectNode getTemplates() { + return getObjectNodeWithPath(getTemplatesPath()); + } + + default ObjectNode getIndexTemplates() { + return getObjectNodeWithPath(getIndexTemplatesPath()); + } + + default ObjectNode getComponentTemplates() { + return getObjectNodeWithPath(getComponentTemplatesPath()); + } + + default ObjectNode getObjectNodeWithPath(JsonPointer path) { + return toObjectNode().withObject(path, JsonNode.OverwriteMode.NULLS, false); + } /** * Defines the behavior required to read a snapshot's global metadata as JSON and convert it into a Data object */ - public static interface Factory { + interface Factory { private JsonNode getJsonNode( SnapshotRepo.Provider repoDataProvider, String snapshotName, @@ -73,22 +90,22 @@ default GlobalMetadata fromRepo(String snapshotName) { } // Version-specific implementation - public GlobalMetadata fromJsonNode(JsonNode root); + GlobalMetadata fromJsonNode(JsonNode root); // Version-specific implementation - public SmileFactory getSmileFactory(); + SmileFactory getSmileFactory(); // Get the underlying SnapshotRepo Provider - public SnapshotRepo.Provider getRepoDataProvider(); + SnapshotRepo.Provider getRepoDataProvider(); } - public static class CantFindSnapshotInRepo extends RfsException { + class CantFindSnapshotInRepo extends RfsException { public CantFindSnapshotInRepo(String snapshotName) { super("Can't find snapshot in repo: " + snapshotName); } } - public static class CantReadGlobalMetadataFromSnapshot extends RfsException { + class CantReadGlobalMetadataFromSnapshot extends RfsException { public CantReadGlobalMetadataFromSnapshot(String snapshotName, Throwable cause) { super("Can't read the global metadata from snapshot: " + snapshotName, cause); } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/Index.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/Index.java new file mode 100644 index 000000000..cef345ad0 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/Index.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.bulkload.models; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson +public class Index extends MigrationItem { + public final static String TYPE = "index"; + public Index(String name, ObjectNode body) { + super(TYPE, name, body); + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexMetadata.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexMetadata.java index dcc6d7d8c..f6a8ecabd 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexMetadata.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexMetadata.java @@ -10,31 +10,33 @@ import org.opensearch.migrations.bulkload.common.SnapshotRepo; import org.opensearch.migrations.transformation.entity.Index; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import org.apache.lucene.codecs.CodecUtil; -public interface IndexMetadata extends Index { - +// All subclasses need to be annotated with this +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type") +public abstract class IndexMetadata implements Index { /* * Defines the behavior expected of an object that will surface the metadata of an index stored in a snapshot * See: https://github.com/elastic/elasticsearch/blob/v7.10.2/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java#L1475 * See: https://github.com/elastic/elasticsearch/blob/v6.8.23/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java#L1284 */ - public JsonNode getAliases(); + public abstract JsonNode getAliases(); - public String getId(); + public abstract String getId(); - public JsonNode getMappings(); + public abstract JsonNode getMappings(); - public String getName(); + public abstract String getName(); - public int getNumberOfShards(); + public abstract int getNumberOfShards(); - public JsonNode getSettings(); + public abstract JsonNode getSettings(); - public IndexMetadata deepCopy(); + public abstract IndexMetadata deepCopy(); /** * Defines the behavior required to read a snapshot's index metadata as JSON and convert it into a Data object @@ -71,15 +73,15 @@ default IndexMetadata fromRepo(String snapshotName, String indexName) { } // Version-specific implementation - public IndexMetadata fromJsonNode(JsonNode root, String indexId, String indexName); + IndexMetadata fromJsonNode(JsonNode root, String indexId, String indexName); // Version-specific implementation - public SmileFactory getSmileFactory(); + SmileFactory getSmileFactory(); // Version-specific implementation - public String getIndexFileId(String snapshotName, String indexName); + String getIndexFileId(String snapshotName, String indexName); // Get the underlying SnapshotRepo Provider - public SnapshotRepo.Provider getRepoDataProvider(); + SnapshotRepo.Provider getRepoDataProvider(); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexTemplate.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexTemplate.java new file mode 100644 index 000000000..792e9d53e --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/IndexTemplate.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.bulkload.models; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson +public class IndexTemplate extends MigrationItem { + public static final String TYPE = "index_template"; + public IndexTemplate(final String name, final ObjectNode body) { + super(TYPE, name, body); + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/LegacyTemplate.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/LegacyTemplate.java new file mode 100644 index 000000000..7edfa763a --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/LegacyTemplate.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.bulkload.models; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(force = true, access = AccessLevel.PRIVATE) // For Jackson +public class LegacyTemplate extends MigrationItem { + public final static String TYPE = "template"; + public LegacyTemplate(final String name, final ObjectNode body) { + super(TYPE, name, body); + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/models/MigrationItem.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/MigrationItem.java new file mode 100644 index 000000000..dccd2cea1 --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/models/MigrationItem.java @@ -0,0 +1,27 @@ +package org.opensearch.migrations.bulkload.models; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +@NoArgsConstructor(force = true, access = AccessLevel.PROTECTED) // For Jackson +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = Index.class, name = Index.TYPE), + @JsonSubTypes.Type(value = LegacyTemplate.class, name = LegacyTemplate.TYPE), + @JsonSubTypes.Type(value = IndexTemplate.class, name = IndexTemplate.TYPE), + @JsonSubTypes.Type(value = ComponentTemplate.class, name = ComponentTemplate.TYPE) +}) +public abstract class MigrationItem { + public final String type; + public final String name; + public final ObjectNode body; + + public MigrationItem(final String type, final String name, final ObjectNode body) { + this.type = type; + this.name = name; + this.body = body; + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/CompositeTransformer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/CompositeTransformer.java new file mode 100644 index 000000000..075c5204b --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/CompositeTransformer.java @@ -0,0 +1,28 @@ +package org.opensearch.migrations.bulkload.transformers; + +import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import org.opensearch.migrations.bulkload.models.IndexMetadata; + +public class CompositeTransformer implements Transformer { + private final Transformer[] transformers; + + public CompositeTransformer(Transformer... transformers) { + this.transformers = transformers; + } + + @Override + public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { + for (Transformer transformer : transformers) { + globalData = transformer.transformGlobalMetadata(globalData); + } + return globalData; + } + + @Override + public IndexMetadata transformIndexMetadata(IndexMetadata indexData) { + for (Transformer transformer : transformers) { + indexData = transformer.transformIndexMetadata(indexData); + } + return indexData; + } +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformerToIJsonTransformerAdapter.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformerToIJsonTransformerAdapter.java new file mode 100644 index 000000000..eaf08cada --- /dev/null +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformerToIJsonTransformerAdapter.java @@ -0,0 +1,165 @@ +package org.opensearch.migrations.bulkload.transformers; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.opensearch.migrations.bulkload.models.ComponentTemplate; +import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import org.opensearch.migrations.bulkload.models.IndexMetadata; +import org.opensearch.migrations.bulkload.models.IndexTemplate; +import org.opensearch.migrations.bulkload.models.LegacyTemplate; +import org.opensearch.migrations.bulkload.models.MigrationItem; +import org.opensearch.migrations.transform.IJsonTransformer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.Lombok; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Slf4j +public class TransformerToIJsonTransformerAdapter implements Transformer { + public static final String OUTPUT_TRANSFORMATION_JSON_LOGGER = "OutputTransformationJsonLogger"; + private final static ObjectMapper MAPPER = new ObjectMapper(); + private final IJsonTransformer transformer; + private final Logger transformerLogger; + + public TransformerToIJsonTransformerAdapter(IJsonTransformer transformer, Logger transformerLogger) { + this.transformer = transformer; + this.transformerLogger = transformerLogger != null ? transformerLogger : LoggerFactory.getLogger(OUTPUT_TRANSFORMATION_JSON_LOGGER); + } + + public TransformerToIJsonTransformerAdapter(IJsonTransformer transformer) { + this(transformer, LoggerFactory.getLogger(OUTPUT_TRANSFORMATION_JSON_LOGGER)); + } + + private void logTransformation(Map before, Map after) { + if (transformerLogger.isInfoEnabled()) { + try { + var transformationTuple = toTransformationMap(before, after); + var tupleString = MAPPER.writeValueAsString(transformationTuple); + transformerLogger.atInfo().setMessage("{}").addArgument(tupleString).log(); + } catch (Exception e) { + log.atError().setCause(e).setMessage("Exception converting tuple to string").log(); + transformerLogger.atInfo().setMessage("{ \"error\":\"{}\" }").addArgument(e::getMessage).log(); + throw Lombok.sneakyThrow(e); + } + } + } + + private Map toTransformationMap(Map before, Map after) { + var transformationMap = new LinkedHashMap(); + transformationMap.put("before", before); + transformationMap.put("after", after); + return transformationMap; + } + + + @SuppressWarnings("unchecked") + private static Map objectNodeToMap(Object node) { + return (Map) MAPPER.convertValue(node, Map.class); + } + + @SneakyThrows + private static String printMap(Map map) { + return MAPPER.writeValueAsString(map); + } + + @SuppressWarnings("unchecked") + private MigrationItem transformMigrationItem(MigrationItem migrationItem) { + // Keep untouched original for logging + final Map originalMap = MAPPER.convertValue(migrationItem, Map.class); + var transformedMigrationItem = transformer.transformJson(MAPPER.convertValue(migrationItem, Map.class)); + logTransformation(originalMap, transformedMigrationItem); + return MAPPER.convertValue(transformedMigrationItem, MigrationItem.class); + } + + void updateTemplates(Collection transformedItems, ObjectNode itemsRoot) { + itemsRoot.removeAll(); + transformedItems.forEach(item -> + { + log.atInfo().setMessage("Setting new item of type {}, name {}, body {}") + .addArgument(item.type) + .addArgument(item.name) + .addArgument(item.body) + .log(); + itemsRoot.set(item.name, item.body); + } + ); + } + + @Override + public GlobalMetadata transformGlobalMetadata(GlobalMetadata globalData) { + var inputJson = objectNodeToMap(globalData.toObjectNode()); + log.atInfo().setMessage("BeforeJsonGlobal: {}").addArgument(() -> printMap(inputJson)).log(); + var afterJson = transformer.transformJson(inputJson); + log.atInfo().setMessage("AfterJsonGlobal: {}").addArgument(() -> printMap(afterJson)).log(); + + + final List legacyTemplates = new ArrayList<>(); + globalData.getTemplates().fields().forEachRemaining( + entry -> legacyTemplates.add(new LegacyTemplate(entry.getKey(), (ObjectNode) entry.getValue())) + ); + final List indexTemplates = new ArrayList<>(); + globalData.getIndexTemplates().fields().forEachRemaining( + entry -> indexTemplates.add(new IndexTemplate(entry.getKey(), (ObjectNode) entry.getValue())) + ); + final List componentTemplates = new ArrayList<>(); + globalData.getComponentTemplates().fields().forEachRemaining( + entry -> componentTemplates.add(new ComponentTemplate(entry.getKey(), (ObjectNode) entry.getValue())) + ); + + var transformedTemplates = Stream.concat(Stream.concat( + legacyTemplates.stream(), + indexTemplates.stream()), + componentTemplates.stream() + ) + .map(this::transformMigrationItem).collect(Collectors.toList()); + + var transformedLegacy = transformedTemplates.stream().filter( + item -> item instanceof LegacyTemplate + ) + .map(item -> (LegacyTemplate) item) + .collect(Collectors.toList()); + + var transformedIndex = transformedTemplates.stream().filter( + item -> item instanceof IndexTemplate + ) + .map(item -> (IndexTemplate) item) + .collect(Collectors.toList()); + + var transformedComponent = transformedTemplates.stream().filter( + item -> item instanceof ComponentTemplate + ) + .map(item -> (ComponentTemplate) item) + .collect(Collectors.toList()); + + assert transformedLegacy.size() + transformedIndex.size() + transformedComponent.size() == transformedTemplates.size(); + + updateTemplates(transformedLegacy, globalData.getTemplates()); + updateTemplates(transformedIndex, globalData.getIndexTemplates()); + updateTemplates(transformedComponent, globalData.getComponentTemplates()); + + log.atInfo().setMessage("GlobalOutput: {}").addArgument(() -> printMap(objectNodeToMap(globalData.toObjectNode()))).log(); + return globalData; + } + + @SuppressWarnings("unchecked") + @Override + public IndexMetadata transformIndexMetadata(IndexMetadata indexData) { + final Map originalInput = MAPPER.convertValue(indexData, Map.class); + final Map inputJson = MAPPER.convertValue(indexData, Map.class); + var afterJson = transformer.transformJson(inputJson); + logTransformation(originalInput, afterJson); + return MAPPER.convertValue(inputJson, IndexMetadata.class); + } + +} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/GlobalMetadataData_ES_6_8.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/GlobalMetadataData_ES_6_8.java index 42756a474..e351243ed 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/GlobalMetadataData_ES_6_8.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/GlobalMetadataData_ES_6_8.java @@ -2,6 +2,7 @@ import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.node.ObjectNode; public class GlobalMetadataData_ES_6_8 implements GlobalMetadata { @@ -16,17 +17,18 @@ public ObjectNode toObjectNode() { return root; } - public ObjectNode getTemplates() { - return (ObjectNode) root.get("templates"); + @Override + public JsonPointer getTemplatesPath() { + return JsonPointer.compile("/templates"); } @Override - public ObjectNode getIndexTemplates() { - return null; + public JsonPointer getIndexTemplatesPath() { + return JsonPointer.compile("/index_template"); } @Override - public ObjectNode getComponentTemplates() { - return null; + public JsonPointer getComponentTemplatesPath() { + return JsonPointer.compile("/component_template"); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/IndexMetadataData_ES_6_8.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/IndexMetadataData_ES_6_8.java index 0effc88ce..09b9ea586 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/IndexMetadataData_ES_6_8.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/IndexMetadataData_ES_6_8.java @@ -3,18 +3,28 @@ import org.opensearch.migrations.bulkload.models.IndexMetadata; import org.opensearch.migrations.bulkload.transformers.TransformFunctions; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; +import lombok.NoArgsConstructor; -public class IndexMetadataData_ES_6_8 implements IndexMetadata { +@NoArgsConstructor(force = true) // For Jackson +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE) +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type") +public class IndexMetadataData_ES_6_8 extends IndexMetadata { @Getter + @JsonProperty("body") private final ObjectNode rawJson; private ObjectNode mappings; private ObjectNode settings; @Getter + @JsonProperty("id") private final String id; @Getter + @JsonProperty("name") private final String name; public IndexMetadataData_ES_6_8(ObjectNode root, String indexId, String indexName) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/GlobalMetadataData_ES_7_10.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/GlobalMetadataData_ES_7_10.java index 1b0f41a54..67565b7e8 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/GlobalMetadataData_ES_7_10.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/GlobalMetadataData_ES_7_10.java @@ -1,11 +1,12 @@ package org.opensearch.migrations.bulkload.version_es_7_10; -import java.util.Optional; import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.node.ObjectNode; + public class GlobalMetadataData_ES_7_10 implements GlobalMetadata { private final ObjectNode root; @@ -18,19 +19,18 @@ public ObjectNode toObjectNode() { return root; } - public ObjectNode getTemplates() { - return Optional.ofNullable(root) - .map(node -> node.get("templates")) - .filter(ObjectNode.class::isInstance) - .map(ObjectNode.class::cast) - .orElse(null); + @Override + public JsonPointer getTemplatesPath() { + return JsonPointer.compile("/templates"); } - public ObjectNode getIndexTemplates() { - return (ObjectNode) root.get("index_template").get("index_template"); + @Override + public JsonPointer getIndexTemplatesPath() { + return JsonPointer.compile("/index_template/index_template"); } - public ObjectNode getComponentTemplates() { - return (ObjectNode) root.get("component_template").get("component_template"); + @Override + public JsonPointer getComponentTemplatesPath() { + return JsonPointer.compile("/component_template/component_template"); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/IndexMetadataData_ES_7_10.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/IndexMetadataData_ES_7_10.java index 2377704f6..549422a06 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/IndexMetadataData_ES_7_10.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/IndexMetadataData_ES_7_10.java @@ -3,17 +3,27 @@ import org.opensearch.migrations.bulkload.models.IndexMetadata; import org.opensearch.migrations.bulkload.transformers.TransformFunctions; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; +import lombok.NoArgsConstructor; -public class IndexMetadataData_ES_7_10 implements IndexMetadata { +@NoArgsConstructor(force = true) // For Jackson +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE) +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type") +public class IndexMetadataData_ES_7_10 extends IndexMetadata { @Getter + @JsonProperty("body") private final ObjectNode rawJson; private ObjectNode mappings; private ObjectNode settings; @Getter + @JsonProperty("id") private final String id; @Getter + @JsonProperty("name") private final String name; public IndexMetadataData_ES_7_10(ObjectNode root, String indexId, String indexName) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/GlobalMetadataData_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/GlobalMetadataData_OS_2_11.java index a7c00b3f4..45bf8935f 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/GlobalMetadataData_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/GlobalMetadataData_OS_2_11.java @@ -2,6 +2,7 @@ import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.node.ObjectNode; public class GlobalMetadataData_OS_2_11 implements GlobalMetadata { @@ -16,25 +17,18 @@ public ObjectNode toObjectNode() { return root; } - public ObjectNode getTemplates() { - return (ObjectNode) root.get("templates"); + @Override + public JsonPointer getTemplatesPath() { + return JsonPointer.compile("/templates"); } - public ObjectNode getIndexTemplates() { - String indexTemplateKey = "index_template"; - if (root.get(indexTemplateKey) != null) { - return (ObjectNode) root.get(indexTemplateKey).get(indexTemplateKey); - } else { - return null; - } + @Override + public JsonPointer getIndexTemplatesPath() { + return JsonPointer.compile("/index_template/index_template"); } - public ObjectNode getComponentTemplates() { - String componentTemplateKey = "component_template"; - if (root.get(componentTemplateKey) != null) { - return (ObjectNode) root.get(componentTemplateKey).get(componentTemplateKey); - } else { - return null; - } + @Override + public JsonPointer getComponentTemplatesPath() { + return JsonPointer.compile("/component_template/component_template"); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/IndexMetadataData_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/IndexMetadataData_OS_2_11.java index c621e3861..067b3b78e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/IndexMetadataData_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/IndexMetadataData_OS_2_11.java @@ -2,11 +2,21 @@ import org.opensearch.migrations.bulkload.models.IndexMetadata; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.node.ObjectNode; +import lombok.NoArgsConstructor; -public class IndexMetadataData_OS_2_11 implements IndexMetadata { +@NoArgsConstructor(force = true) // For Jackson +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE) +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type") +public class IndexMetadataData_OS_2_11 extends IndexMetadata { + @JsonProperty("body") private ObjectNode root; + @JsonProperty("id") private String indexId; + @JsonProperty("name") private String indexName; public IndexMetadataData_OS_2_11(ObjectNode root, String indexId, String indexName) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteIndexMetadata.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteIndexMetadata.java index 7f830c63a..70b229420 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteIndexMetadata.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteIndexMetadata.java @@ -2,55 +2,56 @@ import org.opensearch.migrations.bulkload.models.IndexMetadata; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; -@AllArgsConstructor -public class RemoteIndexMetadata implements IndexMetadata { +@NoArgsConstructor(force = true) // For Jackson +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE) +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type") +@Getter +public class RemoteIndexMetadata extends IndexMetadata { + @JsonProperty("name") + private String name; + @JsonProperty("id") + private String id; + @JsonProperty("body") + private ObjectNode rawJson; - private String indexName; - private ObjectNode sourceData; - - @Override - public ObjectNode getRawJson() { - return sourceData; + RemoteIndexMetadata(String indexName, ObjectNode rawJson) { + this.name = indexName; + // ID is the same as name in remote metadata + this.id = indexName; + this.rawJson = rawJson; } @Override public JsonNode getAliases() { - return sourceData.get("aliases"); - } - - @Override - public String getId() { - // The ID is the name in this case - return getName(); + return rawJson.get("aliases"); } @Override public JsonNode getMappings() { - return sourceData.get("mappings"); - } - - @Override - public String getName() { - return indexName; + return rawJson.get("mappings"); } @Override public int getNumberOfShards() { - throw new UnsupportedOperationException("Unimplemented method 'getNumberOfShards'"); + return getSettings().get("index").get("number_of_shards").asInt(); } @Override public JsonNode getSettings() { - return sourceData.get("settings"); + return rawJson.get("settings"); } @Override public IndexMetadata deepCopy() { - return new RemoteIndexMetadata(indexName, sourceData.deepCopy()); + return new RemoteIndexMetadata(name, rawJson.deepCopy()); } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteMetadata.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteMetadata.java index 36aa46766..0b6247ab2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteMetadata.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteMetadata.java @@ -1,9 +1,8 @@ package org.opensearch.migrations.bulkload.version_universal; -import java.util.Optional; - import org.opensearch.migrations.bulkload.models.GlobalMetadata; +import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.AllArgsConstructor; @@ -18,30 +17,17 @@ public ObjectNode toObjectNode() { } @Override - public ObjectNode getTemplates() { - return Optional.ofNullable(sourceData) - .map(node -> node.get("templates")) - .map(node -> node.get("templates")) - .filter(ObjectNode.class::isInstance) - .map(ObjectNode.class::cast) - .orElse(null); + public JsonPointer getTemplatesPath() { + return JsonPointer.compile("/templates/templates"); } @Override - public ObjectNode getIndexTemplates() { - return Optional.ofNullable(sourceData) - .map(node -> node.get("index_template")) - .map(node -> node.get("index_template")) - .filter(ObjectNode.class::isInstance) - .map(ObjectNode.class::cast) - .orElse(null); } + public JsonPointer getIndexTemplatesPath() { + return JsonPointer.compile("/index_template/index_template"); + } @Override - public ObjectNode getComponentTemplates() { - return Optional.ofNullable(sourceData) - .map(node -> node.get("component_template")) - .map(node -> node.get("component_template")) - .filter(ObjectNode.class::isInstance) - .map(ObjectNode.class::cast) - .orElse(null); } + public JsonPointer getComponentTemplatesPath() { + return JsonPointer.compile("/component_template/component_template"); + } } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java index 2d287b94a..7592dbd80 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/worker/IndexRunner.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.opensearch.migrations.MigrationMode; import org.opensearch.migrations.bulkload.common.FilterScheme; @@ -35,16 +36,34 @@ public IndexMetadataResults migrateIndices(MigrationMode mode, ICreateIndexConte }; var results = IndexMetadataResults.builder(); + // log out filtered items + repoDataProvider.getIndicesInSnapshot(snapshotName) + .stream() + .filter(Predicate.not(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger))) + .forEach(index -> { + var indexMetadata = metadataFactory.fromRepo(snapshotName, index.getName()); + log.atInfo().setMessage("{ \"before\": {},\n\"after\":{}}") + .addArgument(indexMetadata) + .addArgument("Removed due to index filter") + .log(); + }); + + repoDataProvider.getIndicesInSnapshot(snapshotName) .stream() .filter(FilterScheme.filterIndicesByAllowList(indexAllowlist, logger)) .forEach(index -> { var indexName = index.getName(); - var indexMetadata = metadataFactory.fromRepo(snapshotName, indexName); + var originalIndexMetadata = metadataFactory.fromRepo(snapshotName, indexName); CreationResult indexResult = null; + var indexMetadata = originalIndexMetadata.deepCopy(); try { indexMetadata = transformer.transformIndexMetadata(indexMetadata); + log.atInfo().setMessage("{ \"before\": {},\n\"after\":{}}") + .addArgument(originalIndexMetadata) + .addArgument(indexMetadata) + .log(); indexResult = indexCreator.create(indexMetadata, mode, context); } catch (Throwable t) { indexResult = CreationResult.builder() diff --git a/transformation/src/main/java/org/opensearch/migrations/transformation/JsonTransformerForDocumentTypeRemovalProvider.java b/transformation/src/main/java/org/opensearch/migrations/transformation/JsonTransformerForDocumentTypeRemovalProvider.java index ab2b852df..df8f9d083 100644 --- a/transformation/src/main/java/org/opensearch/migrations/transformation/JsonTransformerForDocumentTypeRemovalProvider.java +++ b/transformation/src/main/java/org/opensearch/migrations/transformation/JsonTransformerForDocumentTypeRemovalProvider.java @@ -18,7 +18,9 @@ private static class Transformer implements IJsonTransformer { @Override @SuppressWarnings("unchecked") public Map transformJson(Map incomingJson) { - ((Map) incomingJson.get("index")).remove("_type"); + if (incomingJson.containsKey("index")) { + ((Map) incomingJson.get("index")).remove("_type"); + } return incomingJson; } }