diff --git a/DataGenerator/README.md b/DataGenerator/README.md new file mode 100644 index 000000000..298736248 --- /dev/null +++ b/DataGenerator/README.md @@ -0,0 +1,93 @@ +# Data Generator + +This tool generates data for testing a search cluster. The workloads are similar to those of [OpenSearch Benchmark](https://github.com/opensearch-project/OpenSearch-Benchmark). + +> **⚠️ Test Infrastructure** +> This tool is for test infrastructure. Features may change without notice, and backward compatibility is not guaranteed. + +- [Data Generator](#data-generator) + - [Workloads](#workloads) + - [HttpLogs](#httplogs) + - [Geonames](#geonames) + - [Nested](#nested) + - [NycTaxis](#nyctaxis) + - [Run Data Generator](#run-data-generator) + - [Run workloads programmatically](#run-workloads-programmatically) + - [Generate data via Gradle](#generate-data-via-gradle) + +## Workloads + +The following workloads are supported and can be selected with the `--workloads [workload1] [workload2] [...]` argument: `HttpLogs`, `Geonames`, `Nested`, `NycTaxis`. + +### HttpLogs + +Multiple indices with HTTP request log file entries that include client IP, timestamp, and request details. + +### Geonames + +A single index containing a list of geographic features from all over the world. + +### Nested + +A single index of Stack Overflow questions with user IDs and timestamps, along with answers containing user IDs and timestamps, using the nested mapping type. + +### NycTaxis + +A single index of taxi trip records, one for each time a taxi dropped off a fare in New York City. + +## Run Data Generator + +This tool can be used from the command line and programmatically. The programmatic approach is recommended. + +### Run workloads programmatically + +Insert the following code into a test case: + +```java +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.data.WorkloadOptions; + +// Create or use an existing OpenSearchClient +var client = new OpenSearchClient(...); + +// Create a workload generator instance +var generator = new WorkloadGenerator(client); + +// Pass workload options to the generate method, in this case using the defaults +generator.generate(new WorkloadOptions()); +``` + +### Generate data via Gradle + +To upload data onto a test cluster, use the following command: + +```shell +./gradlew DataGenerator:run --args='--target-host http://hostname:9200' +``` + +
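+Workload selection and sizing options can be appended to the same `--args` string. For example, a hypothetical invocation that loads only two workloads with a smaller document count (see `WorkloadOptions` for the full set of flags): + +```shell +./gradlew DataGenerator:run --args='--target-host http://hostname:9200 --workloads HttpLogs Geonames --docs-per-workload-count 100 --max-bulk-request-batch-count 25' +```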
+ +Example command output: + +``` +$ ./gradlew DataGenerator:run --args=' --target-host https://172.18.0.1:19200 --target-insecure --target-username admin --target-password admin --docs-per-workload-count 1000' + +> Task :DataGenerator:run +2024-10-10 17:33:01,247 INFO o.o.m.u.ProcessHelpers [main] getNodeInstanceName()=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef +2024-10-10 17:33:01,249 INFO o.o.m.DataGenerator [main] Starting DataGenerator with workerId=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef +2024-10-10 17:33:01,552 INFO o.o.m.d.WorkloadGenerator [main] Starting document creation +2024-10-10 17:33:02,858 INFO o.o.m.d.WorkloadGenerator [main] All documents queued +2024-10-10 17:33:02,981 INFO o.o.m.d.WorkloadGenerator [main] All documents completed +2024-10-10 17:33:02,981 INFO o.o.m.DataGenerator [main] Generation complete, took 1,429.00ms + +Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0. + +You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. + +See https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings + +BUILD SUCCESSFUL in 4s +25 actionable tasks: 1 executed, 24 up-to-date +```
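+ +The generated data can also be customized programmatically by setting the public fields on `WorkloadOptions` before calling `generate`. A minimal sketch, reusing the `generator` from the example above: + +```java +import java.util.List; + +import org.opensearch.migrations.data.WorkloadOptions; +import org.opensearch.migrations.data.workloads.Workloads; + +var options = new WorkloadOptions(); +options.workloads = List.of(Workloads.HttpLogs, Workloads.Geonames); // generate only these workloads +options.totalDocs = 500; // equivalent to --docs-per-workload-count +options.maxBulkBatchSize = 25; // equivalent to --max-bulk-request-batch-count +generator.generate(options); +```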
diff --git a/DataGenerator/build.gradle b/DataGenerator/build.gradle new file mode 100644 index 000000000..a8e69c9f5 --- /dev/null +++ b/DataGenerator/build.gradle @@ -0,0 +1,40 @@ +plugins { + id 'application' + id 'java' + id 'io.freefair.lombok' +} + +java.sourceCompatibility = JavaVersion.VERSION_11 +java.targetCompatibility = JavaVersion.VERSION_11 + +dependencies { + implementation project(":coreUtilities") + implementation project(":RFS") + + implementation group: 'org.jcommander', name: 'jcommander' + implementation group: 'org.slf4j', name: 'slf4j-api' + implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core' + + implementation platform('io.projectreactor:reactor-bom:2023.0.5') + implementation 'io.projectreactor.netty:reactor-netty-core' + implementation 'io.projectreactor.netty:reactor-netty-http' + + testImplementation testFixtures(project(':RFS')) + testImplementation testFixtures(project(':testHelperFixtures')) + testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core' + testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params' + testImplementation group: 'org.mockito', name: 'mockito-core' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter' + testImplementation group: 'org.hamcrest', name: 'hamcrest' + testImplementation group: 'org.testcontainers', name: 'testcontainers' + + testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine' +} + +application { + mainClassName = 'org.opensearch.migrations.DataGenerator' +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java new file mode 100644 index 000000000..ed5ff2402 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java @@ -0,0 +1,54 @@ +package org.opensearch.migrations; + +import java.text.NumberFormat; + +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.utils.ProcessHelpers; + +import com.beust.jcommander.JCommander; +import lombok.extern.slf4j.Slf4j; + + +/** Command line tool to generate data on a search cluster */ +@Slf4j +public class DataGenerator { + + public static void main(String[] args) { + var workerId = ProcessHelpers.getNodeInstanceName(); + log.info("Starting DataGenerator with workerId={}", workerId); + + var arguments = new DataGeneratorArgs(); + var jCommander = JCommander.newBuilder() + .addObject(arguments) + .build(); + jCommander.parse(args); + + if (arguments.help) { + jCommander.usage(); + return; + } + + var dataGenerator = new DataGenerator(); + dataGenerator.run(arguments); + } + + public void run(DataGeneratorArgs arguments) { + var connectionContext = arguments.targetArgs.toConnectionContext(); + var client = new OpenSearchClient(connectionContext); + + var startTimeMillis = System.currentTimeMillis(); + var workloadGenerator = new WorkloadGenerator(client); + workloadGenerator.generate(arguments.workloadOptions); + var generateTimeMillis = System.currentTimeMillis() - startTimeMillis; + + log.info("Generation complete, took {}ms", 
formatMillis(generateTimeMillis)); + } + + private String formatMillis(long millis) { + var numberFormat = NumberFormat.getInstance(); + numberFormat.setMinimumFractionDigits(2); + numberFormat.setMaximumFractionDigits(2); + return numberFormat.format(millis); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java b/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java new file mode 100644 index 000000000..2cafd3e20 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java @@ -0,0 +1,18 @@ +package org.opensearch.migrations; + +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.data.WorkloadOptions; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParametersDelegate; + +public class DataGeneratorArgs { + @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool") + public boolean help; + + @ParametersDelegate + public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs(); + + @ParametersDelegate + public WorkloadOptions workloadOptions = new WorkloadOptions(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java new file mode 100644 index 000000000..b2a922232 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java @@ -0,0 +1,24 @@ +package org.opensearch.migrations.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** Shared ways to build fields for index mappings */ +public class FieldBuilders { + private static final ObjectMapper mapper = new ObjectMapper(); + + public static ObjectNode createField(String type) { + var field = mapper.createObjectNode(); + field.put("type", type); + return field; + } + + public static ObjectNode createFieldTextRawKeyword() { + var fieldNode = mapper.createObjectNode(); + fieldNode.put("type", "text"); + var fieldsNode = mapper.createObjectNode(); + fieldsNode.set("raw", createField("keyword")); + fieldNode.set("fields", fieldsNode); + return fieldNode; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java new file mode 100644 index 000000000..9866cca92 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java @@ -0,0 +1,16 @@ +package org.opensearch.migrations.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** Options for index configuration */ +public class IndexOptions { + private static final ObjectMapper mapper = new ObjectMapper(); + + /** Future improvement: add more flexibility to these values */ + public ObjectNode indexSettings = mapper.createObjectNode() + .put("index.number_of_shards", 5) + .put("index.number_of_replicas", 0) + .put("index.queries.cache.enabled", false) + .put("index.requests.cache.enable", false); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java new file mode 100644 index 000000000..6ccfe5ea3 --- /dev/null +++ 
b/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java @@ -0,0 +1,38 @@ +package org.opensearch.migrations.data; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Random; + +import lombok.experimental.UtilityClass; + +/** Shared ways to build random data */ +@UtilityClass +public class RandomDataBuilders { + private static final ZoneId UTC_ZONE = ZoneId.of("UTC"); + private static final DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000; + + /** Returns a random time within the 24 hours before timeFrom */ + public static long randomTime(long timeFrom, Random random) { + return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS); + } + + public static String randomTimeISOString(long timeFrom, Random random) { + var timeMillis = randomTime(timeFrom, random); + var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE); + return SIMPLE_DATE_PATTERN.format(timeInstant); + } + + public static double randomDouble(Random random, double min, double max) { + return min + (max - min) * random.nextDouble(); + } + + public static String randomElement(String[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } + + public static int randomElement(int[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java new file mode 100644 index 000000000..f1257b37e --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java @@ -0,0 +1,62 @@ +package org.opensearch.migrations.data; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.opensearch.migrations.bulkload.common.DocumentReindexer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.data.workloads.Workload; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AllArgsConstructor +public class WorkloadGenerator { + + private final OpenSearchClient client; + + public void generate(WorkloadOptions options) { + log.info("Starting document creation"); + + // This workload creates ALL documents in memory, schedules them, and waits for completion. + // If larger scale is needed, remove the toList() calls and stream all data. 
+ var allDocs = new ArrayList<CompletableFuture<?>>(); + for (var workload : options.workloads) { + var workloadInstance = workload.getNewInstance().get(); + var docs = workloadInstance + .indexNames() + .stream() + .map(indexName -> generateDocs(indexName, workloadInstance, options)) + .flatMap(List::stream) + .collect(Collectors.toList()); + allDocs.addAll(docs); + } + + log.info("All documents queued"); + CompletableFuture.allOf(allDocs.toArray(new CompletableFuture[0])).join(); + log.info("All documents completed"); + } + + private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) { + // This happens inline to be sure the index exists before docs are indexed on it + client.createIndex(indexName, workload.createIndex(options.index.indexSettings.deepCopy()), null); + + var docIdCounter = new AtomicInteger(0); + var allDocs = workload.createDocs(options.totalDocs) + .map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), doc.toString())) + .collect(Collectors.toList()); + + var bulkDocGroups = new ArrayList<List<DocumentReindexer.BulkDocSection>>(); + for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) { + bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size()))); + } + + return bulkDocGroups.stream() + .map(docs -> client.sendBulkRequest(indexName, docs, null).toFuture()) + .collect(Collectors.toList()); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java new file mode 100644 index 000000000..6335d8a26 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java @@ -0,0 +1,21 @@ +package org.opensearch.migrations.data; + +import java.util.Arrays; +import java.util.List; + +import org.opensearch.migrations.data.workloads.Workloads; + +import com.beust.jcommander.Parameter; + +public class WorkloadOptions { + @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false) + public List<Workloads> workloads = Arrays.asList(Workloads.values()); + + @Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload") + public int totalDocs = 1000; + + @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests") + public int maxBulkBatchSize = 50; + + public IndexOptions index = new IndexOptions(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java new file mode 100644 index 000000000..1728849dc --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java @@ -0,0 +1,126 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; +import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; + +/** + * 
Workload based off of Geonames + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames + */ +public class Geonames implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] COUNTRY_CODES = { "US", "DE", "FR", "GB", "CN", "IN", "BR" }; + + @Override + public List<String> indexNames() { + return List.of("geonames"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/geonames/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("geonameid", createField("long")); + properties.set("name", createFieldTextRawKeyword()); + properties.set("asciiname", createFieldTextRawKeyword()); + properties.set("alternatenames", createFieldTextRawKeyword()); + properties.set("feature_class", createFieldTextRawKeyword()); + properties.set("feature_code", createFieldTextRawKeyword()); + properties.set("cc2", createFieldTextRawKeyword()); + properties.set("admin1_code", createFieldTextRawKeyword()); + properties.set("admin2_code", createFieldTextRawKeyword()); + properties.set("admin3_code", createFieldTextRawKeyword()); + properties.set("admin4_code", createFieldTextRawKeyword()); + properties.set("elevation", createField("integer")); + properties.set("population", createField("long")); + properties.set("dem", createFieldTextRawKeyword()); + properties.set("timezone", createFieldTextRawKeyword()); + properties.set("location", createField("geo_point")); + + var countryCodeField = createFieldTextRawKeyword(); + countryCodeField.put("fielddata", true); + properties.set("country_code", countryCodeField); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + return index; + } + + /** + * Example generated document: + { + "geonameid": 1018, + "name": "City19", + "asciiname": "City19", + "alternatenames": "City19", + "feature_class": "FCl19", + "feature_code": "FCo19", + "country_code": "DE", + "cc2": "cc219", + "admin1_code": "admin19", + "population": 621, + "dem": "699", + "timezone": "TZ19", + "location": [ + -104.58261595311684, + -58.923212235479056 + ] + } + */ + @Override + public Stream<ObjectNode> createDocs(int numDocs) { + return IntStream.range(0, numDocs) + .mapToObj(i -> { + // These documents have a low degree of uniqueness; + // there is an opportunity to augment them by using Random more. 
+ var random = new Random(i); + var doc = mapper.createObjectNode(); + doc.put("geonameid", i + 1000); + doc.put("name", "City" + (i + 1)); + doc.put("asciiname", "City" + (i + 1)); + doc.put("alternatenames", "City" + (i + 1)); + doc.put("feature_class", "FCl" + (i + 1)); + doc.put("feature_code", "FCo" + (i + 1)); + doc.put("country_code", randomCountryCode(random)); + doc.put("cc2", "cc2" + (i + 1)); + doc.put("admin1_code", "admin" + (i + 1)); + doc.put("population", random.nextInt(1000)); + doc.put("dem", Integer.toString(random.nextInt(1000))); + doc.put("timezone", "TZ" + (i + 1)); + doc.set("location", randomLocation(random)); + return doc; + } + ); + } + + private static ArrayNode randomLocation(Random random) { + var location = mapper.createArrayNode(); + location.add(randomDouble(random, -180, 180)); // Longitude + location.add(randomDouble(random, -90, 90)); // Latitude + return location; + } + + private static String randomCountryCode(Random random) { + return randomElement(COUNTRY_CODES, random); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java new file mode 100644 index 000000000..182a6287e --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java @@ -0,0 +1,132 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; + +/** + * Workload based off of http_logs + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/http_logs + */ +public class HttpLogs implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] HTTP_METHODS = { "GET", "POST", "PUT", "DELETE" }; + private static final int[] RESPONSE_CODES = { 200, 201, 400, 401, 403, 404, 500 }; + private static final String[] URLS = { + "/home", "/login", "/search", "/api/data", "/contact" + }; + + @Override + public List<String> indexNames() { + return List.of( + "logs-181998", + "logs-191998", + "logs-201998", + "logs-211998", + "logs-221998", + "logs-231998", + "logs-241998" + ); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/http_logs/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + var timestamp = createField("date"); + timestamp.put("format", "strict_date_optional_time||epoch_second"); + properties.set("@timestamp", timestamp); + var message = createField("keyword"); + message.put("index", false); + message.put("doc_values", false); + properties.set("message", message); + properties.set("clientip", createField("ip")); + var request = createFieldTextRawKeyword(); + var requestRaw = (ObjectNode) request.get("fields").get("raw"); + requestRaw.put("ignore_above", 256); + properties.set("request", request); + properties.set("status", 
createField("integer")); + properties.set("size", createField("integer")); + var geoip = mapper.createObjectNode(); + var geoipProps = mapper.createObjectNode(); + geoip.set("properties", geoipProps); + geoipProps.set("country_name", createField("keyword")); + geoipProps.set("city_name", createField("keyword")); + geoipProps.set("location", createField("geo_point")); + properties.set("geoip", geoip); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + return index; + } + + /** + * Example generated document: + { + "@timestamp": 1728504268181, + "clientip": "106.171.39.19", + "request": "POST /contact HTTP/1.0", + "status": 403, + "size": 16672893 + } + */ + @Override + public Stream<ObjectNode> createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + ObjectNode doc = mapper.createObjectNode(); + doc.put("@timestamp", randomTime(currentTime, random)); + doc.put("clientip", randomIpAddress(random)); + doc.put("request", randomRequest(random)); + doc.put("status", randomStatus(random)); + doc.put("size", randomResponseSize(random)); + return doc; + } + ); + } + + private static String randomIpAddress(Random random) { + return random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256); + } + + private static String randomHttpMethod(Random random) { + return randomElement(HTTP_METHODS, random); + } + + private static String randomRequest(Random random) { + return randomHttpMethod(random) + " " + randomUrl(random) + " HTTP/1.0"; + } + + private static String randomUrl(Random random) { + return randomElement(URLS, random); + } + + private static int randomStatus(Random random) { + return randomElement(RESPONSE_CODES, random); + } + + private static int randomResponseSize(Random random) { + return random.nextInt(50 * 1024 * 1024); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java new file mode 100644 index 000000000..f8ba62869 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java @@ -0,0 +1,149 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; + +/** + * Workload based off of nested + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nested + */ +public class Nested implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] USER_NAMES = { + "alice", "bob", "charlie", "david", "eve", + "frank", "grace", "heidi", "ivan", "judy" + }; + private static final String[] TAGS = { + "java", "python", "c++", "javascript", 
"html", + "css", "sql", "bash", "docker", "kubernetes" + }; + private static final String[] WORDS = { + "the", "quick", "brown", "fox", "jumps", "over", "lazy", "dog" + }; + + @Override + public List indexNames() { + return List.of("sonested"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nested/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("user", createField("keyword")); + properties.set("creationDate", createField("date")); + properties.set("title", createField("text")); + properties.set("qid", createField("keyword")); + properties.set("tag", createField("keyword")); + properties.set("answer_count", createField("integer")); + var answers = createField("nested"); + var answersProps = mapper.createObjectNode(); + answers.set("properties", answersProps); + answersProps.set("user", createField("keyword")); + answersProps.set("date", createField("date")); + properties.set("answers", answers); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + + return index; + } + + /** + * Example generated document: + { + "title": "", + "qid": "1405", + "answers": [ + { + "date": 1728487507935, + "user": "frank (1006)" + } + ], + "tag": [ + "bashv6" + ], + "user": "judy (1001)", + "creationDate": 1728506897762 + } + */ + @Override + public Stream createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + var creationTime = randomTime(currentTime, random); + var doc = mapper.createObjectNode(); + doc.put("title", randomTitle(random)); + doc.put("qid", (i + 1000) + ""); + doc.set("answers", randomAnswers(mapper, creationTime, random)); + doc.set("tag", randomTags(random)); + doc.put("user", randomUser(random)); + doc.put("creationDate", creationTime); + return doc; + } + ); + } + + private static ArrayNode randomAnswers(ObjectMapper mapper, long timeFrom, Random random) { + var answers = mapper.createArrayNode(); + var numAnswers = random.nextInt(5) + 1; + + for (int i = 0; i < numAnswers; i++) { + var answer = mapper.createObjectNode(); + answer.put("date", randomTime(timeFrom, random)); + answer.put("user", randomUser(random)); + + answers.add(answer); + } + return answers; + } + + private static String randomUser(Random random) { + // Extra random int simulates more users + return randomElement(USER_NAMES, random) + " (" + (random.nextInt(10) + 1000) + ")"; + } + + private static ArrayNode randomTags(Random random) { + var tags = mapper.createArrayNode(); + var tagsToCreate = random.nextInt(3) + 1; + + for (int i = 0; i < tagsToCreate; i++) { + tags.add(randomElement(TAGS, random) + "v" + random.nextInt(10)); // Extra random int simulates more tags + } + return tags; + } + + private static String randomTitle(Random random) { + var titleWordLength = random.nextInt(5); + var words = new ArrayList(); + + for (int i = 0; i < titleWordLength; i++) { + words.add(randomElement(WORDS, random) + "" + random.nextInt(10)); // Extra random int simulates more words + } + return words.stream().collect(Collectors.joining(" ")); + } +} diff --git 
a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java new file mode 100644 index 000000000..26fa6da49 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java @@ -0,0 +1,173 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTimeISOString; + +/** + * Workload based off of nyc_taxis + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nyc_taxis + */ +public class NycTaxis implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] TRIP_TYPES = {"1", "2"}; + private static final String[] PAYMENT_TYPES = {"1", "2", "3", "4"}; + private static final String[] STORE_AND_FWD_FLAGS = {"Y", "N"}; + private static final String[] VENDOR_IDS = {"1", "2"}; + + @Override + public List<String> indexNames() { + return List.of("nyc_taxis"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("cab_color", createField("keyword")); + properties.set("dropoff_datetime", createDateField()); + properties.set("dropoff_location", createField("geo_point")); + properties.set("ehail_fee", createScaledFloatField()); + properties.set("extra", createScaledFloatField()); + properties.set("fare_amount", createScaledFloatField()); + properties.set("improvement_surcharge", createScaledFloatField()); + properties.set("mta_tax", createScaledFloatField()); + properties.set("passenger_count", createField("integer")); + properties.set("payment_type", createField("keyword")); + properties.set("pickup_datetime", createDateField()); + properties.set("pickup_location", createField("geo_point")); + properties.set("rate_code_id", createField("keyword")); + properties.set("store_and_fwd_flag", createField("keyword")); + properties.set("surcharge", createScaledFloatField()); + properties.set("tip_amount", createScaledFloatField()); + properties.set("tolls_amount", createScaledFloatField()); + properties.set("total_amount", createScaledFloatField()); + properties.set("trip_distance", createScaledFloatField()); + properties.set("trip_type", createField("keyword")); + properties.set("vendor_id", createField("keyword")); + properties.set("vendor_name", createField("text")); + + + var mappings = mapper.createObjectNode(); + mappings.set("properties", properties); + mappings.put("dynamic", "strict"); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + + return index; + } + + private static ObjectNode createScaledFloatField() { + var property = mapper.createObjectNode(); + property.put("type", 
"scaled_float"); + property.put("scaling_factor", 100); + return property; + } + + private static ObjectNode createDateField() { + var field = mapper.createObjectNode(); + field.put("type", "date"); + field.put("format", "yyyy-MM-dd HH:mm:ss"); + return field; + } + + /** + * Example generated document: + { + "total_amount": 48.96852646813233, + "improvement_surcharge": 0.3, + "pickup_location": [ + -73.96071975181356, + 40.761333931139575 + ], + "pickup_datetime": "2024-10-10 03:39:22", + "trip_type": "2", + "dropoff_datetime": "2024-10-09 17:54:43", + "rate_code_id": "1", + "tolls_amount": 0.9381693846282485, + "dropoff_location": [ + -73.9126110288055, + 40.715247495239176 + ], + "passenger_count": 4, + "fare_amount": 21.07896409187173, + "extra": 0.5291259818883527, + "trip_distance": 1.124182854144491, + "tip_amount": 0.372383809916233, + "store_and_fwd_flag": "Y", + "payment_type": "3", + "mta_tax": 0.5, + "vendor_id": "2" + } + */ + @Override + public Stream createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + var doc = mapper.createObjectNode(); + doc.put("total_amount", randomDouble(random, 5.0, 50.0)); + doc.put("improvement_surcharge", 0.3); + doc.set("pickup_location", randomLocationInNyc(random)); + doc.put("pickup_datetime", randomTimeISOString(currentTime, random)); + doc.put("trip_type", randomTripType(random)); + doc.put("dropoff_datetime", randomTimeISOString(currentTime, random)); + doc.put("rate_code_id", "1"); + doc.put("tolls_amount", randomDouble(random, 0.0, 5.0)); + doc.set("dropoff_location", randomLocationInNyc(random)); + doc.put("passenger_count", random.nextInt(4) + 1); + doc.put("fare_amount", randomDouble(random, 5.0, 50.0)); + doc.put("extra", randomDouble(random, 0.0, 1.0)); + doc.put("trip_distance", randomDouble(random, 0.5, 20.0)); + doc.put("tip_amount", randomDouble(random, 0.0, 15.0)); + doc.put("store_and_fwd_flag", randomStoreAndFwdFlag(random)); + doc.put("payment_type", randomPaymentType(random)); + doc.put("mta_tax", 0.5); + doc.put("vendor_id", randomVendorId(random)); + + return doc; + } + ); + } + + private static ArrayNode randomLocationInNyc(Random random) { + var location = mapper.createArrayNode(); + location.add(randomDouble(random, -74.05, -73.75)); // Longitude + location.add(randomDouble(random, 40.63, 40.85)); // Latitude + return location; + } + + private static String randomTripType(Random random) { + return randomElement(TRIP_TYPES, random); + } + + private static String randomPaymentType(Random random) { + return randomElement(PAYMENT_TYPES, random); + } + + private static String randomStoreAndFwdFlag(Random random) { + return randomElement(STORE_AND_FWD_FLAGS, random); + } + + private static String randomVendorId(Random random) { + return randomElement(VENDOR_IDS, random); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java new file mode 100644 index 000000000..e83e5192b --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java @@ -0,0 +1,20 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * Defines a set of indices, settings, and documents that can be added onto a cluster + */ +public interface Workload { + /** 
Create an index for the workload with the default settings incorporated */ + ObjectNode createIndex(ObjectNode defaultSettings); + + /** Creates a stream of documents for this workload */ + Stream<ObjectNode> createDocs(int numDocs); + + /** The name(s) of the indices that should be created for this workload */ + List<String> indexNames(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java new file mode 100644 index 000000000..7ac08b976 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java @@ -0,0 +1,17 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.function.Supplier; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +public enum Workloads { + Geonames(Geonames::new), + HttpLogs(HttpLogs::new), + Nested(Nested::new), + NycTaxis(NycTaxis::new); + + @Getter + private Supplier<Workload> newInstance; +} diff --git a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java new file mode 100644 index 000000000..3b6cd58db --- /dev/null +++ b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java @@ -0,0 +1,74 @@ +package org.opensearch.migrations; + +import java.util.Map; + +import org.opensearch.migrations.bulkload.common.RestClient; +import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.bulkload.http.SearchClusterRequests; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; + +/** + * Tests focused on running end-to-end test cases for Data Generator + */ +@Tag("isolatedTest") +@Slf4j +class DataGeneratorEndToEnd { + + @Test + void generateData_OS_2_14() throws Exception { + try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)) { + generateData(targetCluster); + } + } + + @SneakyThrows + void generateData(final SearchClusterContainer targetCluster) { + // ACTION: Set up the target cluster + targetCluster.start(); + + var arguments = new DataGeneratorArgs(); + arguments.targetArgs.host = targetCluster.getUrl(); + + // ACTION: Generate the data on the target cluster + var dataGenerator = new DataGenerator(); + dataGenerator.run(arguments); + + // VERIFY: Get index state on the target cluster + var requestContext = DocumentMigrationTestContext.factory().noOtelTracking(); + var targetDetails = new SearchClusterRequests(requestContext); + var client = new RestClient(arguments.targetArgs.toConnectionContext()); + + // Make sure the cluster has refreshed before querying it + var refreshResponse = client.post("_refresh", "", requestContext.createUnboundRequestContext()); + assertThat(refreshResponse.body, refreshResponse.statusCode, equalTo(200)); + + // Confirm all indexes have the expected number of docs + var defaultCount = arguments.workloadOptions.totalDocs; + var expectedIndexes = Map.of( + "geonames", defaultCount, + "logs-181998", defaultCount, + "logs-191998", defaultCount, + "logs-201998", defaultCount, + "logs-211998", defaultCount, + 
"logs-221998", defaultCount, + "logs-231998", defaultCount, + "logs-241998", defaultCount, + "sonested", defaultCount, + "nyc_taxis", defaultCount + ); + + var indexMap = targetDetails.getMapOfIndexAndDocCount(client); + expectedIndexes.forEach((index, expectedDocs) -> + assertThat(indexMap, hasEntry(index, expectedDocs)) + ); + } +} diff --git a/DataGenerator/src/test/resources/log4j2.properties b/DataGenerator/src/test/resources/log4j2.properties new file mode 100644 index 000000000..6def22d58 --- /dev/null +++ b/DataGenerator/src/test/resources/log4j2.properties @@ -0,0 +1,40 @@ +status = INFO + +appenders = console + +appender.console.type = Console +appender.console.name = Console +appender.console.target = SYSTEM_OUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %m%n + +property.ownedPackagesLogLevel=${sys:migrationLogLevel:-DEBUG} + +rootLogger.level = info +rootLogger.appenderRef.console.ref = Console + +# Allow customization of owned package logs +logger.rfs.name = org.opensearch.migrations.bulkload +logger.rfs.level = ${ownedPackagesLogLevel} +logger.migration.name = org.opensearch.migrations +logger.migration.level = ${ownedPackagesLogLevel} + +logger.migrations.name = com.opensearch.migrations +logger.migrations.level = debug + +logger.transformer.name = org.opensearch.migrations.bulkload.transformers.Transformer_ES_6_8_to_OS_2_11 +logger.transformer.level = debug + +# Lower the logging level on these other systems +logger.wire.name = org.apache.hc.client5.http +logger.wire.level = info + +logger.testcontainers.name = org.testcontainers +logger.testcontainers.level = info + +logger.dockerclientdeps.name = com.github.dockerjava.zerodep +logger.dockerclientdeps.level = info + +logger.wireLogger.name = org.apache.http.wire +logger.wireLogger.level = OFF +logger.wireLogger.additivity = falsea diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle index e023b7dcd..001138867 100644 --- a/DocumentsFromSnapshotMigration/build.gradle +++ b/DocumentsFromSnapshotMigration/build.gradle @@ -46,6 +46,7 @@ dependencies { testImplementation testFixtures(project(":RFS")) testImplementation testFixtures(project(":coreUtilities")) testImplementation testFixtures(project(":testHelperFixtures")) + testImplementation project(":DataGenerator") testImplementation project(":CreateSnapshot") testImplementation project(":MetadataMigration") testImplementation group: 'org.apache.lucene', name: 'lucene-core' diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java index b0256304d..381220b28 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -14,8 +14,11 @@ import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.bulkload.common.FileSystemRepo; -import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.data.WorkloadGenerator; 
+import org.opensearch.migrations.data.WorkloadOptions; import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; @@ -29,6 +32,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; + @Tag("isolatedTest") @Slf4j public class ParallelDocumentMigrationsTest extends SourceTestBase { @@ -39,24 +43,19 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase { static final List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); public static Stream<Arguments> makeDocumentMigrationArgs() { - List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream() - .map(SourceTestBase::makeParamsForBase) - .collect(Collectors.toList()); var targetImageNames = TARGET_IMAGES.stream() .collect(Collectors.toList()); var numWorkersList = List.of(1, 3, 40); var compressionEnabledList = List.of(true, false); - return sourceImageArgs.stream() + return SOURCE_IMAGES.stream() .flatMap( - sourceParams -> targetImageNames.stream() + sourceImage -> targetImageNames.stream() .flatMap( targetImage -> numWorkersList.stream() .flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of( numWorkers, targetImage, - sourceParams[0], - sourceParams[1], - sourceParams[2], + sourceImage, compression )) ) @@ -69,9 +68,7 @@ public static Stream<Arguments> makeDocumentMigrationArgs() { public void testDocumentMigration( int numWorkers, SearchClusterContainer.ContainerVersion targetVersion, - SearchClusterContainer.ContainerVersion baseSourceImageVersion, - String generatorImage, - String[] generatorArgs, + SearchClusterContainer.ContainerVersion sourceVersion, boolean compressionEnabled ) throws Exception { var executorService = Executors.newFixedThreadPool(numWorkers); @@ -80,22 +77,24 @@ public void testDocumentMigration( .withAllTracking(); try ( - var esSourceContainer = new PreloadedSearchClusterContainer( - baseSourceImageVersion, - SOURCE_SERVER_ALIAS, - generatorImage, - generatorArgs - ); - SearchClusterContainer osTargetContainer = new SearchClusterContainer(targetVersion); + var esSourceContainer = new SearchClusterContainer(sourceVersion); + var osTargetContainer = new SearchClusterContainer(targetVersion); ) { - CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> { - esSourceContainer.start(); - return null; - }, executorService), CompletableFuture.supplyAsync(() -> { - osTargetContainer.start(); - return null; - }, executorService)).join(); + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> esSourceContainer.start(), executorService), + CompletableFuture.runAsync(() -> osTargetContainer.start(), executorService) + ).join(); + + // Populate the source cluster with data + var client = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + var generator = new WorkloadGenerator(client); + generator.generate(new WorkloadOptions()); + + // Create the snapshot from the source cluster var args = new CreateSnapshot.Args(); args.snapshotName = "test_snapshot"; args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; @@ -104,7 +103,6 @@ public void testDocumentMigration( var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); snapshotCreator.run(); - final List<String> INDEX_ALLOWLIST = List.of(); var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); try { @@ -125,7 +123,7 
@@ public void testDocumentMigration( runCounter, clockJitter, testDocMigrationContext, - baseSourceImageVersion.getVersion(), + sourceVersion.getVersion(), compressionEnabled ), executorService diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index 8afee74fc..92e18aa6e 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -13,8 +13,11 @@ import java.util.function.Function; import org.opensearch.migrations.CreateSnapshot; -import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.data.WorkloadOptions; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; import org.opensearch.migrations.testutils.ToxiProxyWrapper; import org.opensearch.testcontainers.OpensearchContainer; @@ -60,7 +63,6 @@ private static class RunData { } @Test - @Tag("longTest") public void testExitsZeroThenThreeForSimpleSetup() throws Exception { testProcess(3, d -> { @@ -94,7 +96,8 @@ public void testExitsZeroThenThreeForSimpleSetup() throws Exception { // to such a short value (1s) that no document migration will exit in that amount of time. For good // measure though, the toxiproxy also adds latency to the requests to make it impossible for the // migration to complete w/in that 1s. 
- "WITH_DELAYS, 2" }) + "WITH_DELAYS, 2" + }) public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception { final var failHow = FailHow.valueOf(failAfterString); testProcess(expectedExitCode, @@ -105,10 +108,6 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC private void testProcess(int expectedExitCode, Function processRunner) { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); - var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2); - var baseSourceImageVersion = (SearchClusterContainer.ContainerVersion) sourceImageArgs[0]; - var generatorImage = (String) sourceImageArgs[1]; - var generatorArgs = (String[]) sourceImageArgs[2]; var targetImageName = SearchClusterContainer.OS_V2_14_0.getImageName(); var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); @@ -116,29 +115,30 @@ private void testProcess(int expectedExitCode, Function proces try ( var network = Network.newNetwork(); - var esSourceContainer = new PreloadedSearchClusterContainer( - baseSourceImageVersion, - SOURCE_SERVER_ALIAS, - generatorImage, - generatorArgs - ); + var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2) + .withNetwork(network) + .withNetworkAliases(SOURCE_SERVER_ALIAS); var osTargetContainer = new OpensearchContainer<>(targetImageName).withExposedPorts(OPENSEARCH_PORT) .withNetwork(network) .withNetworkAliases(TARGET_DOCKER_HOSTNAME); var proxyContainer = new ToxiProxyWrapper(network) ) { + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> esSourceContainer.start()), + CompletableFuture.runAsync(() -> osTargetContainer.start()), + CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT)) + ).join(); + + // Populate the source cluster with data + var client = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + var generator = new WorkloadGenerator(client); + generator.generate(new WorkloadOptions()); - CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> { - esSourceContainer.start(); - return null; - }), CompletableFuture.supplyAsync(() -> { - osTargetContainer.start(); - return null; - }), CompletableFuture.supplyAsync(() -> { - proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT); - return null; - })).join(); - + // Create the snapshot from the source cluster var args = new CreateSnapshot.Args(); args.snapshotName = SNAPSHOT_NAME; args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java index 9e003706c..b9a1caa35 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java @@ -96,15 +96,20 @@ public static class BulkDocSection { private final String docId; private final String bulkIndex; + public BulkDocSection(String id, String docBody) { + this.docId = id; + this.bulkIndex = createBulkIndex(docId, docBody); + } + public BulkDocSection(RfsLuceneDocument doc) { this.docId = doc.id; - this.bulkIndex = createBulkIndex(docId, doc); + this.bulkIndex = createBulkIndex(docId, doc.source); } @SneakyThrows - private static String createBulkIndex(final 
String docId, final RfsLuceneDocument doc) { + private static String createBulkIndex(final String docId, final String doc) { // For a successful bulk ingestion, the source cannot have any leading or trailing whitespace and must be on a single line. - String trimmedSource = doc.source.trim().replace("\n", ""); + String trimmedSource = doc.trim().replace("\n", ""); return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index 0d01ced82..cb0cd40d8 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -192,7 +192,8 @@ private Optional createObjectIdempotent( ) { var objectDoesNotExist = !hasObjectCheck(objectPath, context); if (objectDoesNotExist) { - client.putAsync(objectPath, settings.toString(), context.createCheckRequestContext()).flatMap(resp -> { + var putRequestContext = context == null ? null : context.createCheckRequestContext(); + client.putAsync(objectPath, settings.toString(), putRequestContext).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else if (resp.statusCode == HttpURLConnection.HTTP_BAD_REQUEST) { diff --git a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java index 439862882..fbbd580af 100644 --- a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java +++ b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java @@ -52,7 +52,8 @@ public void createSnapshotRepository(final String repoPath) throws IOException { } } - public void createDocument(final String index, final String docId, final String body) throws IOException { + @SneakyThrows + public void createDocument(final String index, final String docId, final String body) { var indexDocumentRequest = new HttpPut(clusterUrl + "/" + index + "/_doc/" + docId); indexDocumentRequest.setEntity(new StringEntity(body)); indexDocumentRequest.setHeader("Content-Type", "application/json"); @@ -70,6 +71,30 @@ public void deleteDocument(final String index, final String docId) throws IOException { } + public void createIndex(final String index) { + var body = "{" + // + " \"settings\": {" + // + " \"index\": {" + // + " \"number_of_shards\": 5," + // + " \"number_of_replicas\": 0" + // + " }" + // + " }" + // + "}"; + createIndex(index, body); + } + + @SneakyThrows + public void createIndex(final String index, final String body) { + var createIndexRequest = new HttpPut(clusterUrl + "/" + index); + createIndexRequest.setEntity(new StringEntity(body)); + createIndexRequest.setHeader("Content-Type", "application/json"); + + try (var response = httpClient.execute(createIndexRequest)) { + var responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + assertThat(responseBody, response.getCode(), anyOf(equalTo(201), equalTo(200))); + } + } + @SneakyThrows public Map.Entry get(final String path) { final var getRequest = new HttpGet(clusterUrl + path); diff --git a/settings.gradle b/settings.gradle index a518c8316..1712d3abb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -48,6 +48,7 @@ include 'testHelperFixtures' include 'RFS' include 'CreateSnapshot' 
include 'dashboardsSanitizer' +include 'DataGenerator' include 'MetadataMigration' include 'DocumentsFromSnapshotMigration' include 'TrafficCapture:captureKafkaOffloader'