From 9f66fa960fd686001c91848fddbab1050e09af1a Mon Sep 17 00:00:00 2001 From: Peter Nied Date: Thu, 10 Oct 2024 17:43:08 +0000 Subject: [PATCH 1/2] Add DataGenerator tool Adds a programmatic and CLI way to generate test data mirroring the geonames, http_logs, nyc_taxis, and nested workloads from OpenSearch Benchmark (OSB). In the past this repo has relied on OpenSearch Benchmark, which has been valuable for real-world data; however, its dependencies on external internet sources and Python libraries have caused issues. This tool gives this project similar functionality without those dependencies. Signed-off-by: Peter Nied --- DataGenerator/README.md | 69 +++++++ DataGenerator/build.gradle | 40 ++++ .../opensearch/migrations/DataGenerator.java | 54 ++++++ .../migrations/DataGeneratorArgs.java | 18 ++ .../migrations/data/FieldBuilders.java | 24 +++ .../migrations/data/IndexOptions.java | 16 ++ .../migrations/data/RandomDataBuilders.java | 38 ++++ .../migrations/data/WorkloadGenerator.java | 62 +++++++ .../migrations/data/WorkloadOptions.java | 21 +++ .../migrations/data/workloads/Geonames.java | 126 +++++++++++++ .../migrations/data/workloads/HttpLogs.java | 132 +++++++++++++ .../migrations/data/workloads/Nested.java | 149 +++++++++++++++ .../migrations/data/workloads/NycTaxis.java | 173 ++++++++++++++++++ .../migrations/data/workloads/Workload.java | 20 ++ .../migrations/data/workloads/Workloads.java | 17 ++ .../migrations/DataGeneratorEndToEnd.java | 73 ++++++++ .../src/test/resources/log4j2.properties | 40 ++++ DocumentsFromSnapshotMigration/build.gradle | 1 + .../ParallelDocumentMigrationsTest.java | 54 +++--- .../bulkload/ProcessLifecycleTest.java | 48 ++--- .../bulkload/common/DocumentReindexer.java | 11 +- .../bulkload/common/OpenSearchClient.java | 5 +- .../bulkload/http/ClusterOperations.java | 27 ++- settings.gradle | 1 + 24 files changed, 1161 insertions(+), 58 deletions(-) create mode 100644 DataGenerator/README.md create mode 100644 DataGenerator/build.gradle create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java create mode 100644 DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java create mode 100644 DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java create mode 100644 DataGenerator/src/test/resources/log4j2.properties diff --git a/DataGenerator/README.md new file mode 100644 index
000000000..37e570b2f --- /dev/null +++ b/DataGenerator/README.md @@ -0,0 +1,69 @@ +# Data Generator + +This tool is used to generate data for testing of a search cluster. The workloads are similar to those of [OpenSearch Benchmark](https://github.com/opensearch-project/OpenSearch-Benchmark). + +> **⚠️ Test Infrastructure** +> This tool is for test infrastructure. Features may change without notice, and backward compatibility is not guaranteed. + +- [Data Generator](#data-generator) + - [Run Data Generator](#run-data-generator) + - [Run workloads programmatically](#run-workloads-programmatically) + - [Generate data via gradle](#generate-data-via-gradle) + +## Run Data Generator + +This tool can be used from the command line and programmatically. Programmatic is recommended approach. + +### Run workloads programmatically + +Insert the following code into the test case. + +```java +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.data.WorkloadOptions; + +// Create or use an existing OpenSearchClient +var client = new OpenSearchClient(...); + +// Create an instance +var generator = new WorkloadGenerator(client); + +// Pass workload options to the generate method, in this case using the defaults +generator.generate(new WorkloadOptions()); +``` + + +### Generate data via gradle + +To upload data onto a test cluster the following + +```shell +./gradlew DataGenerator:run --args='--target-host http://hostname:9200' +``` + +
+ +Example command output + + +``` +$ ./gradlew DataGenerator:run --args=' --target-host https://172.18.0.1:19200 --target-insecure --target-username admin --target-password admin --docs-per-workload 1000' + +> Task :DataGenerator:run +2024-10-10 17:33:01,247 INFO o.o.m.u.ProcessHelpers [main] getNodeInstanceName()=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef +2024-10-10 17:33:01,249 INFO o.o.m.DataGenerator [main] Starting DataGenerator with workerId =generated_d0bf496d-1b80-4316-bf38-e3315321a3ef +2024-10-10 17:33:01,552 INFO o.o.m.d.WorkloadGenerator [main] Starting document creation +2024-10-10 17:33:02,858 INFO o.o.m.d.WorkloadGenerator [main] All document queued +2024-10-10 17:33:02,981 INFO o.o.m.d.WorkloadGenerator [main] All document completed +2024-10-10 17:33:02,981 INFO o.o.m.DataGenerator [main] Generation complete, took 1,429.00ms + +Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0. + +You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. + +See https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings + +BUILD SUCCESSFUL in 4s +25 actionable tasks: 1 executed, 24 up-to-date +``` +
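+ +### Customizing the workloads + +The `--workloads` argument selects which workloads run. This is a sketch; the workload names come from the `Workloads` enum in this patch, and repeating the flag once per value relies on JCommander's handling of list parameters: + +```shell +./gradlew DataGenerator:run --args='--target-host http://hostname:9200 --workloads HttpLogs --workloads Nested' +``` + +The same knobs are available programmatically on `WorkloadOptions` before calling `generate`. Below is a minimal sketch using the fields defined in this patch, with illustrative values (assumes `java.util.List` and the `Workloads` enum are imported): + +```java +// A sketch with illustrative values, not defaults +var options = new WorkloadOptions(); +options.workloads = List.of(Workloads.HttpLogs); // generate only the http_logs indices +options.totalDocs = 100; // documents created per index +options.maxBulkBatchSize = 25; // documents per bulk request +generator.generate(options); +``` +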
diff --git a/DataGenerator/build.gradle b/DataGenerator/build.gradle new file mode 100644 index 000000000..a8e69c9f5 --- /dev/null +++ b/DataGenerator/build.gradle @@ -0,0 +1,40 @@ +plugins { + id 'application' + id 'java' + id 'io.freefair.lombok' +} + +java.sourceCompatibility = JavaVersion.VERSION_11 +java.targetCompatibility = JavaVersion.VERSION_11 + +dependencies { + implementation project(":coreUtilities") + implementation project(":RFS") + + implementation group: 'org.jcommander', name: 'jcommander' + implementation group: 'org.slf4j', name: 'slf4j-api' + implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core' + + implementation platform('io.projectreactor:reactor-bom:2023.0.5') + implementation 'io.projectreactor.netty:reactor-netty-core' + implementation 'io.projectreactor.netty:reactor-netty-http' + + testImplementation testFixtures(project(':RFS')) + testImplementation testFixtures(project(':testHelperFixtures')) + testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core' + testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api' + testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params' + testImplementation group: 'org.mockito', name: 'mockito-core' + testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter' + testImplementation group: 'org.hamcrest', name: 'hamcrest' + testImplementation group: 'org.testcontainers', name: 'testcontainers' + + testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine' +} + +application { + mainClassName = 'org.opensearch.migrations.DataGenerator' +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java new file mode 100644 index 000000000..ed5ff2402 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java @@ -0,0 +1,54 @@ +package org.opensearch.migrations; + +import java.text.NumberFormat; + +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.utils.ProcessHelpers; + +import com.beust.jcommander.JCommander; +import lombok.extern.slf4j.Slf4j; + + +/** Command line tool to generate data on a search cluster */ +@Slf4j +public class DataGenerator { + + public static void main(String[] args) { + var workerId = ProcessHelpers.getNodeInstanceName(); + log.info("Starting DataGenerator with workerId =" + workerId); + + var arguments = new DataGeneratorArgs(); + var jCommander = JCommander.newBuilder() + .addObject(arguments) + .build(); + jCommander.parse(args); + + if (arguments.help) { + jCommander.usage(); + return; + } + + var dataGenerator = new DataGenerator(); + dataGenerator.run(arguments); + } + + public void run(DataGeneratorArgs arguments) { + var connectionContext = arguments.targetArgs.toConnectionContext(); + var client = new OpenSearchClient(connectionContext); + + var startTimeMillis = System.currentTimeMillis(); + var workloadGenerator = new WorkloadGenerator(client); + workloadGenerator.generate(arguments.workloadOptions); + var generateTimeMillis = System.currentTimeMillis() - startTimeMillis; + + log.info("Generation complete, took {}ms", 
formatMillis(generateTimeMillis)); + } + + private String formatMillis(long millis) { + var numberFormat = NumberFormat.getInstance(); + numberFormat.setMinimumFractionDigits(2); + numberFormat.setMaximumFractionDigits(2); + return numberFormat.format(millis); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java b/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java new file mode 100644 index 000000000..2cafd3e20 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java @@ -0,0 +1,18 @@ +package org.opensearch.migrations; + +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.data.WorkloadOptions; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParametersDelegate; + +public class DataGeneratorArgs { + @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool") + public boolean help; + + @ParametersDelegate + public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs(); + + @ParametersDelegate + public WorkloadOptions workloadOptions = new WorkloadOptions(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java new file mode 100644 index 000000000..b2a922232 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java @@ -0,0 +1,24 @@ +package org.opensearch.migrations.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** Shared ways to build fields for index mappings */ +public class FieldBuilders { + private static final ObjectMapper mapper = new ObjectMapper(); + + public static ObjectNode createField(String type) { + var field = mapper.createObjectNode(); + field.put("type", type); + return field; + } + + public static ObjectNode createFieldTextRawKeyword() { + var fieldNode = mapper.createObjectNode(); + fieldNode.put("type", "text"); + var fieldsNode = mapper.createObjectNode(); + fieldsNode.set("raw", createField("keyword")); + fieldNode.set("fields", fieldsNode); + return fieldNode; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java new file mode 100644 index 000000000..9866cca92 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java @@ -0,0 +1,16 @@ +package org.opensearch.migrations.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** Options for index configuration */ +public class IndexOptions { + private static final ObjectMapper mapper = new ObjectMapper(); + + /** Improvement opportunity: add more flexibility with these values */ + public ObjectNode indexSettings = mapper.createObjectNode() + .put("index.number_of_shards", 5) + .put("index.number_of_replicas", 0) + .put("index.queries.cache.enabled", false) + .put("index.requests.cache.enable", false); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java new file mode 100644 index 000000000..6ccfe5ea3 --- /dev/null +++
b/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java @@ -0,0 +1,38 @@ +package org.opensearch.migrations.data; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Random; + +import lombok.experimental.UtilityClass; + +/** Shared ways to build random data */ +@UtilityClass +public class RandomDataBuilders { + private static final ZoneId UTC_ZONE = ZoneId.of("UTC"); + private static final DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private static final int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000; + + public static long randomTime(long timeFrom, Random random) { + return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS); + } + + public static String randomTimeISOString(long timeFrom, Random random) { + var timeMillis = randomTime(timeFrom, random); + var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE); + return SIMPLE_DATE_PATTERN.format(timeInstant); + } + + public static double randomDouble(Random random, double min, double max) { + return min + (max - min) * random.nextDouble(); + } + + public static String randomElement(String[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } + + public static int randomElement(int[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java new file mode 100644 index 000000000..5787e00bb --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java @@ -0,0 +1,62 @@ +package org.opensearch.migrations.data; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.opensearch.migrations.bulkload.common.DocumentReindexer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.data.workloads.Workload; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@AllArgsConstructor +public class WorkloadGenerator { + + private final OpenSearchClient client; + + public void generate(WorkloadOptions options) { + log.info("Starting document creation"); + + // This workflow creates ALL documents in memory, schedules them and waits for completion. + // If larger scale is needed remove the toList() calls and stream all data. 
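+ // For example, with the default options this queues 1,000 documents for each of the + // 10 indices defined across the four workloads, sent in bulk batches of 50 documents.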
+ var allDocs = new ArrayList<CompletableFuture<?>>(); + for (var workload : options.workloads) { + var workloadInstance = workload.getNewInstance().get(); + var docs = workloadInstance + .indexNames() + .stream() + .map(indexName -> generateDocs(indexName, workloadInstance, options)) + .flatMap(List::stream) + .collect(Collectors.toList()); + allDocs.addAll(docs); + } + + log.info("All document queued"); + CompletableFuture.allOf(allDocs.toArray(new CompletableFuture[0])).join(); + log.info("All document completed"); + } + + private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) { + // This happens inline to be sure the index exists before docs are indexed on it + client.createIndex(indexName, workload.createIndex(options.index.indexSettings.deepCopy()), null); + + var docIdCounter = new AtomicInteger(0); + var allDocs = workload.createDocs(options.totalDocs) + .map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), doc.toString())) + .collect(Collectors.toList()); + + var bulkDocGroups = new ArrayList<List<DocumentReindexer.BulkDocSection>>(); + for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) { + bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size()))); + } + + return bulkDocGroups.stream() + .map(docs -> client.sendBulkRequest(indexName, docs, null).toFuture()) + .collect(Collectors.toList()); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java new file mode 100644 index 000000000..04555a991 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java @@ -0,0 +1,21 @@ +package org.opensearch.migrations.data; + +import java.util.Arrays; +import java.util.List; + +import org.opensearch.migrations.data.workloads.Workloads; + +import com.beust.jcommander.Parameter; + +public class WorkloadOptions { + @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false) + public List<Workloads> workloads = Arrays.asList(Workloads.values()); + + @Parameter(names = { "--docs-per-workload" }, description = "The number of documents for each workload") + public int totalDocs = 1000; + + @Parameter(names = { "--max-bulk-request-batch-size" }, description = "The maximum batch size for bulk requests") + public int maxBulkBatchSize = 50; + + public IndexOptions index = new IndexOptions(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java new file mode 100644 index 000000000..1728849dc --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java @@ -0,0 +1,126 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; +import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; + +/** + *
Workload based off of Geonames + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames + */ +public class Geonames implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] COUNTRY_CODES = { "US", "DE", "FR", "GB", "CN", "IN", "BR" }; + + @Override + public List<String> indexNames() { + return List.of("geonames"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/geonames/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("geonameid", createField("long")); + properties.set("name", createFieldTextRawKeyword()); + properties.set("asciiname", createFieldTextRawKeyword()); + properties.set("alternatenames", createFieldTextRawKeyword()); + properties.set("feature_class", createFieldTextRawKeyword()); + properties.set("feature_code", createFieldTextRawKeyword()); + properties.set("cc2", createFieldTextRawKeyword()); + properties.set("admin1_code", createFieldTextRawKeyword()); + properties.set("admin2_code", createFieldTextRawKeyword()); + properties.set("admin3_code", createFieldTextRawKeyword()); + properties.set("admin4_code", createFieldTextRawKeyword()); + properties.set("elevation", createField("integer")); + properties.set("population", createField("long")); + properties.set("dem", createFieldTextRawKeyword()); + properties.set("timezone", createFieldTextRawKeyword()); + properties.set("location", createField("geo_point")); + + var countryCodeField = createFieldTextRawKeyword(); + countryCodeField.put("fielddata", true); + properties.set("country_code", countryCodeField); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + return index; + } + + /** + * Example generated document: + { + "geonameid": 1018, + "name": "City19", + "asciiname": "City19", + "alternatenames": "City19", + "feature_class": "FCl19", + "feature_code": "FCo19", + "country_code": "DE", + "cc2": "cc219", + "admin1_code": "admin19", + "population": 621, + "dem": "699", + "timezone": "TZ19", + "location": [ + -104.58261595311684, + -58.923212235479056 + ] + } + */ + @Override + public Stream<ObjectNode> createDocs(int numDocs) { + return IntStream.range(0, numDocs) + .mapToObj(i -> { + // These documents have a low degree of uniqueness; + // there is an opportunity to augment them by using Random more.
+ var random = new Random(i); + var doc = mapper.createObjectNode(); + doc.put("geonameid", i + 1000); + doc.put("name", "City" + (i + 1)); + doc.put("asciiname", "City" + (i + 1)); + doc.put("alternatenames", "City" + (i + 1)); + doc.put("feature_class", "FCl" + (i + 1)); + doc.put("feature_code", "FCo" + (i + 1)); + doc.put("country_code", randomCountryCode(random)); + doc.put("cc2", "cc2" + (i + 1)); + doc.put("admin1_code", "admin" + (i + 1)); + doc.put("population", random.nextInt(1000)); + doc.put("dem", random.nextInt(1000) + ""); + doc.put("timezone", "TZ" + (i + 1)); + doc.set("location", randomLocation(random)); + return doc; + } + ); + } + + private static ArrayNode randomLocation(Random random) { + var location = mapper.createArrayNode(); + location.add(randomDouble(random, -180, 180)); // Longitude + location.add(randomDouble(random, -90, 90)); // Latitude + return location; + } + + private static String randomCountryCode(Random random) { + return randomElement(COUNTRY_CODES, random); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java new file mode 100644 index 000000000..182a6287e --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java @@ -0,0 +1,132 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; + +/** + * Workload based off of http_logs + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/http_logs + */ +public class HttpLogs implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] HTTP_METHODS = { "GET", "POST", "PUT", "DELETE" }; + private static final int[] RESPONSE_CODES = { 200, 201, 400, 401, 403, 404, 500 }; + private static final String[] URLS = { + "/home", "/login", "/search", "/api/data", "/contact" + }; + + @Override + public List<String> indexNames() { + return List.of( + "logs-181998", + "logs-191998", + "logs-201998", + "logs-211998", + "logs-221998", + "logs-231998", + "logs-241998" + ); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/http_logs/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + var timestamp = createField("date"); + timestamp.put("format", "strict_date_optional_time||epoch_second"); + properties.set("@timestamp", timestamp); + var message = createField("keyword"); + message.put("index", false); + message.put("doc_values", false); + properties.set("message", message); + properties.set("clientip", createField("ip")); + var request = createFieldTextRawKeyword(); + var requestRaw = (ObjectNode) request.get("fields").get("raw"); + requestRaw.put("ignore_above", 256); + properties.set("request", request); + properties.set("status",
createField("integer")); + properties.set("size", createField("integer")); + var geoip = mapper.createObjectNode(); + var geoipProps = mapper.createObjectNode(); + geoip.set("properties", geoipProps); + geoipProps.set("country_name", createField("keyword")); + geoipProps.set("city_name", createField("keyword")); + geoipProps.set("location", createField("geo_point")); + properties.set("geoip", geoip); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + return index; + } + + /** + * Example generated document: + { + "@timestamp": 1728504268181, + "clientip": "106.171.39.19", + "request": "POST /contact HTTP/1.0", + "status": 403, + "size": 16672893 + } + */ + @Override + public Stream createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + ObjectNode doc = mapper.createObjectNode(); + doc.put("@timestamp", randomTime(currentTime, random)); + doc.put("clientip", randomIpAddress(random)); + doc.put("request", randomRequest(random)); + doc.put("status", randomStatus(random)); + doc.put("size", randomResponseSize(random)); + return doc; + } + ); + } + + private static String randomIpAddress(Random random) { + return random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256); + } + + private static String randomHttpMethod(Random random) { + return randomElement(HTTP_METHODS, random); + } + + private static String randomRequest(Random random) { + return randomHttpMethod(random) + " " + randomUrl(random) + " HTTP/1.0"; + } + + private static String randomUrl(Random random) { + return randomElement(URLS, random); + } + + private static int randomStatus(Random random) { + return randomElement(RESPONSE_CODES, random); + } + + private static int randomResponseSize(Random random) { + return random.nextInt(50 * 1024 * 1024); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java new file mode 100644 index 000000000..f8ba62869 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java @@ -0,0 +1,149 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; + +/** + * Workload based off of nested + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nested + */ +public class Nested implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] USER_NAMES = { + "alice", "bob", "charlie", "david", "eve", + "frank", "grace", "heidi", "ivan", "judy" + }; + private static final String[] TAGS = { + "java", "python", "c++", "javascript", 
"html", + "css", "sql", "bash", "docker", "kubernetes" + }; + private static final String[] WORDS = { + "the", "quick", "brown", "fox", "jumps", "over", "lazy", "dog" + }; + + @Override + public List indexNames() { + return List.of("sonested"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nested/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("user", createField("keyword")); + properties.set("creationDate", createField("date")); + properties.set("title", createField("text")); + properties.set("qid", createField("keyword")); + properties.set("tag", createField("keyword")); + properties.set("answer_count", createField("integer")); + var answers = createField("nested"); + var answersProps = mapper.createObjectNode(); + answers.set("properties", answersProps); + answersProps.set("user", createField("keyword")); + answersProps.set("date", createField("date")); + properties.set("answers", answers); + + var mappings = mapper.createObjectNode(); + mappings.put("dynamic", "strict"); + mappings.set("properties", properties); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + + return index; + } + + /** + * Example generated document: + { + "title": "", + "qid": "1405", + "answers": [ + { + "date": 1728487507935, + "user": "frank (1006)" + } + ], + "tag": [ + "bashv6" + ], + "user": "judy (1001)", + "creationDate": 1728506897762 + } + */ + @Override + public Stream createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + var creationTime = randomTime(currentTime, random); + var doc = mapper.createObjectNode(); + doc.put("title", randomTitle(random)); + doc.put("qid", (i + 1000) + ""); + doc.set("answers", randomAnswers(mapper, creationTime, random)); + doc.set("tag", randomTags(random)); + doc.put("user", randomUser(random)); + doc.put("creationDate", creationTime); + return doc; + } + ); + } + + private static ArrayNode randomAnswers(ObjectMapper mapper, long timeFrom, Random random) { + var answers = mapper.createArrayNode(); + var numAnswers = random.nextInt(5) + 1; + + for (int i = 0; i < numAnswers; i++) { + var answer = mapper.createObjectNode(); + answer.put("date", randomTime(timeFrom, random)); + answer.put("user", randomUser(random)); + + answers.add(answer); + } + return answers; + } + + private static String randomUser(Random random) { + // Extra random int simulates more users + return randomElement(USER_NAMES, random) + " (" + (random.nextInt(10) + 1000) + ")"; + } + + private static ArrayNode randomTags(Random random) { + var tags = mapper.createArrayNode(); + var tagsToCreate = random.nextInt(3) + 1; + + for (int i = 0; i < tagsToCreate; i++) { + tags.add(randomElement(TAGS, random) + "v" + random.nextInt(10)); // Extra random int simulates more tags + } + return tags; + } + + private static String randomTitle(Random random) { + var titleWordLength = random.nextInt(5); + var words = new ArrayList(); + + for (int i = 0; i < titleWordLength; i++) { + words.add(randomElement(WORDS, random) + "" + random.nextInt(10)); // Extra random int simulates more words + } + return words.stream().collect(Collectors.joining(" ")); + } +} diff --git 
a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java new file mode 100644 index 000000000..26fa6da49 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java @@ -0,0 +1,173 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import static org.opensearch.migrations.data.FieldBuilders.createField; +import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; +import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; +import static org.opensearch.migrations.data.RandomDataBuilders.randomTimeISOString; + +/** + * Workload based off of nyc_taxis + * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nyc_taxis + */ +public class NycTaxis implements Workload { + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final String[] TRIP_TYPES = {"1", "2"}; + private static final String[] PAYMENT_TYPES = {"1", "2", "3", "4"}; + private static final String[] STORE_AND_FWD_FLAGS = {"Y", "N"}; + private static final String[] VENDOR_IDS = {"1", "2"}; + + @Override + public List<String> indexNames() { + return List.of("nyc_taxis"); + } + + /** + * Mirroring index configuration from + * https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/index.json + */ + @Override + public ObjectNode createIndex(ObjectNode defaultSettings) { + var properties = mapper.createObjectNode(); + properties.set("cab_color", createField("keyword")); + properties.set("dropoff_datetime", createDateField()); + properties.set("dropoff_location", createField("geo_point")); + properties.set("ehail_fee", createScaledFloatField()); + properties.set("extra", createScaledFloatField()); + properties.set("fare_amount", createScaledFloatField()); + properties.set("improvement_surcharge", createScaledFloatField()); + properties.set("mta_tax", createScaledFloatField()); + properties.set("passenger_count", createField("integer")); + properties.set("payment_type", createField("keyword")); + properties.set("pickup_datetime", createDateField()); + properties.set("pickup_location", createField("geo_point")); + properties.set("rate_code_id", createField("keyword")); + properties.set("store_and_fwd_flag", createField("keyword")); + properties.set("surcharge", createScaledFloatField()); + properties.set("tip_amount", createScaledFloatField()); + properties.set("tolls_amount", createScaledFloatField()); + properties.set("total_amount", createScaledFloatField()); + properties.set("trip_distance", createScaledFloatField()); + properties.set("trip_type", createField("keyword")); + properties.set("vendor_id", createField("keyword")); + properties.set("vendor_name", createField("text")); + + + var mappings = mapper.createObjectNode(); + mappings.set("properties", properties); + mappings.put("dynamic", "strict"); + + var index = mapper.createObjectNode(); + index.set("mappings", mappings); + index.set("settings", defaultSettings); + + return index; + } + + private static ObjectNode createScaledFloatField() { + var property = mapper.createObjectNode(); + property.put("type",
"scaled_float"); + property.put("scaling_factor", 100); + return property; + } + + private static ObjectNode createDateField() { + var field = mapper.createObjectNode(); + field.put("type", "date"); + field.put("format", "yyyy-MM-dd HH:mm:ss"); + return field; + } + + /** + * Example generated document: + { + "total_amount": 48.96852646813233, + "improvement_surcharge": 0.3, + "pickup_location": [ + -73.96071975181356, + 40.761333931139575 + ], + "pickup_datetime": "2024-10-10 03:39:22", + "trip_type": "2", + "dropoff_datetime": "2024-10-09 17:54:43", + "rate_code_id": "1", + "tolls_amount": 0.9381693846282485, + "dropoff_location": [ + -73.9126110288055, + 40.715247495239176 + ], + "passenger_count": 4, + "fare_amount": 21.07896409187173, + "extra": 0.5291259818883527, + "trip_distance": 1.124182854144491, + "tip_amount": 0.372383809916233, + "store_and_fwd_flag": "Y", + "payment_type": "3", + "mta_tax": 0.5, + "vendor_id": "2" + } + */ + @Override + public Stream createDocs(int numDocs) { + var currentTime = System.currentTimeMillis(); + + return IntStream.range(0, numDocs) + .mapToObj(i -> { + var random = new Random(i); + var doc = mapper.createObjectNode(); + doc.put("total_amount", randomDouble(random, 5.0, 50.0)); + doc.put("improvement_surcharge", 0.3); + doc.set("pickup_location", randomLocationInNyc(random)); + doc.put("pickup_datetime", randomTimeISOString(currentTime, random)); + doc.put("trip_type", randomTripType(random)); + doc.put("dropoff_datetime", randomTimeISOString(currentTime, random)); + doc.put("rate_code_id", "1"); + doc.put("tolls_amount", randomDouble(random, 0.0, 5.0)); + doc.set("dropoff_location", randomLocationInNyc(random)); + doc.put("passenger_count", random.nextInt(4) + 1); + doc.put("fare_amount", randomDouble(random, 5.0, 50.0)); + doc.put("extra", randomDouble(random, 0.0, 1.0)); + doc.put("trip_distance", randomDouble(random, 0.5, 20.0)); + doc.put("tip_amount", randomDouble(random, 0.0, 15.0)); + doc.put("store_and_fwd_flag", randomStoreAndFwdFlag(random)); + doc.put("payment_type", randomPaymentType(random)); + doc.put("mta_tax", 0.5); + doc.put("vendor_id", randomVendorId(random)); + + return doc; + } + ); + } + + private static ArrayNode randomLocationInNyc(Random random) { + var location = mapper.createArrayNode(); + location.add(randomDouble(random, -74.05, -73.75)); // Longitude + location.add(randomDouble(random, 40.63, 40.85)); // Latitude + return location; + } + + private static String randomTripType(Random random) { + return randomElement(TRIP_TYPES, random); + } + + private static String randomPaymentType(Random random) { + return randomElement(PAYMENT_TYPES, random); + } + + private static String randomStoreAndFwdFlag(Random random) { + return randomElement(STORE_AND_FWD_FLAGS, random); + } + + private static String randomVendorId(Random random) { + return randomElement(VENDOR_IDS, random); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java new file mode 100644 index 000000000..e83e5192b --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workload.java @@ -0,0 +1,20 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.List; +import java.util.stream.Stream; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * Defines a set of indices, settings, and documents that can be added onto a cluster + */ +public interface Workload { + /** 
Create an index for the workload with the default settings incorporated */ + ObjectNode createIndex(ObjectNode defaultSettings); + + /** Creates a stream of documents for this workload */ + Stream<ObjectNode> createDocs(int numDocs); + + /** The name(s) of the indices that should be created for this workload */ + List<String> indexNames(); +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java new file mode 100644 index 000000000..7ac08b976 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java @@ -0,0 +1,17 @@ +package org.opensearch.migrations.data.workloads; + +import java.util.function.Supplier; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +public enum Workloads { + Geonames(Geonames::new), + HttpLogs(HttpLogs::new), + Nested(Nested::new), + NycTaxis(NycTaxis::new); + + @Getter + private Supplier<Workload> newInstance; +} diff --git a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java new file mode 100644 index 000000000..17d370c8f --- /dev/null +++ b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java @@ -0,0 +1,73 @@ +package org.opensearch.migrations; + +import java.util.Map; + +import org.opensearch.migrations.bulkload.common.RestClient; +import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.bulkload.http.SearchClusterRequests; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; + +/** + * Tests focused on running end to end test cases for Data Generator + */ +// @Tag("isolatedTest") +@Slf4j +class DataGeneratorEndToEnd { + + @Test + void generateData_OS_2_14() throws Exception { + try (var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)) { + generateData(targetCluster); + } + } + + @SneakyThrows + void generateData(final SearchClusterContainer targetCluster) { + // ACTION: Set up the target clusters + targetCluster.start(); + + var arguments = new DataGeneratorArgs(); + arguments.targetArgs.host = targetCluster.getUrl(); + + // ACTION: Generate the data on the target cluster + var dataGenerator = new DataGenerator(); + dataGenerator.run(arguments); + + // VERIFY: Get index state on the target cluster + var requestContext = DocumentMigrationTestContext.factory().noOtelTracking(); + var targetDetails = new SearchClusterRequests(requestContext); + var client = new RestClient(arguments.targetArgs.toConnectionContext()); + + // Make sure the cluster has refreshed before querying it + var refreshResponse = client.post("_refresh", "", requestContext.createUnboundRequestContext()); + assertThat(refreshResponse.body, refreshResponse.statusCode, equalTo(200)); + + // Confirm all indexes have the expected number of docs + var defaultCount = arguments.workloadOptions.totalDocs; + var expectedIndexes = Map.of( + "geonames", defaultCount, + "logs-181998", defaultCount, + "logs-191998", defaultCount, + "logs-201998", defaultCount, + "logs-211998", defaultCount, + "logs-221998", defaultCount, + "logs-231998",
defaultCount, + "logs-241998", defaultCount, + "sonested", defaultCount, + "nyc_taxis", defaultCount + ); + + var indexMap = targetDetails.getMapOfIndexAndDocCount(client); + expectedIndexes.forEach((index, expectedDocs) -> + assertThat(indexMap, hasEntry(index, expectedDocs)) + ); + } +} diff --git a/DataGenerator/src/test/resources/log4j2.properties b/DataGenerator/src/test/resources/log4j2.properties new file mode 100644 index 000000000..6def22d58 --- /dev/null +++ b/DataGenerator/src/test/resources/log4j2.properties @@ -0,0 +1,40 @@ +status = INFO + +appenders = console + +appender.console.type = Console +appender.console.name = Console +appender.console.target = SYSTEM_OUT +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %m%n + +property.ownedPackagesLogLevel=${sys:migrationLogLevel:-DEBUG} + +rootLogger.level = info +rootLogger.appenderRef.console.ref = Console + +# Allow customization of owned package logs +logger.rfs.name = org.opensearch.migrations.bulkload +logger.rfs.level = ${ownedPackagesLogLevel} +logger.migration.name = org.opensearch.migrations +logger.migration.level = ${ownedPackagesLogLevel} + +logger.migrations.name = com.opensearch.migrations +logger.migrations.level = debug + +logger.transformer.name = org.opensearch.migrations.bulkload.transformers.Transformer_ES_6_8_to_OS_2_11 +logger.transformer.level = debug + +# Lower the logging level on these other systems +logger.wire.name = org.apache.hc.client5.http +logger.wire.level = info + +logger.testcontainers.name = org.testcontainers +logger.testcontainers.level = info + +logger.dockerclientdeps.name = com.github.dockerjava.zerodep +logger.dockerclientdeps.level = info + +logger.wireLogger.name = org.apache.http.wire +logger.wireLogger.level = OFF +logger.wireLogger.additivity = false diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle index e023b7dcd..001138867 100644 --- a/DocumentsFromSnapshotMigration/build.gradle +++ b/DocumentsFromSnapshotMigration/build.gradle @@ -46,6 +46,7 @@ dependencies { testImplementation testFixtures(project(":RFS")) testImplementation testFixtures(project(":coreUtilities")) testImplementation testFixtures(project(":testHelperFixtures")) + testImplementation project(":DataGenerator") testImplementation project(":CreateSnapshot") testImplementation project(":MetadataMigration") testImplementation group: 'org.apache.lucene', name: 'lucene-core' diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java index b0256304d..381220b28 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ParallelDocumentMigrationsTest.java @@ -14,8 +14,11 @@ import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.bulkload.common.FileSystemRepo; -import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.data.WorkloadGenerator; +import
org.opensearch.migrations.data.WorkloadOptions; import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; @@ -29,6 +32,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; + @Tag("isolatedTest") @Slf4j public class ParallelDocumentMigrationsTest extends SourceTestBase { @@ -39,24 +43,19 @@ public class ParallelDocumentMigrationsTest extends SourceTestBase { static final List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0); public static Stream<Arguments> makeDocumentMigrationArgs() { - List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream() - .map(SourceTestBase::makeParamsForBase) - .collect(Collectors.toList()); var targetImageNames = TARGET_IMAGES.stream() .collect(Collectors.toList()); var numWorkersList = List.of(1, 3, 40); var compressionEnabledList = List.of(true, false); - return sourceImageArgs.stream() + return SOURCE_IMAGES.stream() .flatMap( - sourceParams -> targetImageNames.stream() + sourceImage -> targetImageNames.stream() .flatMap( targetImage -> numWorkersList.stream() .flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of( numWorkers, targetImage, - sourceParams[0], - sourceParams[1], - sourceParams[2], + sourceImage, compression )) ) @@ -69,9 +68,7 @@ public static Stream<Arguments> makeDocumentMigrationArgs() { public void testDocumentMigration( int numWorkers, SearchClusterContainer.ContainerVersion targetVersion, - SearchClusterContainer.ContainerVersion baseSourceImageVersion, - String generatorImage, - String[] generatorArgs, + SearchClusterContainer.ContainerVersion sourceVersion, boolean compressionEnabled ) throws Exception { var executorService = Executors.newFixedThreadPool(numWorkers); @@ -80,22 +77,24 @@ public void testDocumentMigration( .withAllTracking(); try ( - var esSourceContainer = new PreloadedSearchClusterContainer( - baseSourceImageVersion, - SOURCE_SERVER_ALIAS, - generatorImage, - generatorArgs - ); - SearchClusterContainer osTargetContainer = new SearchClusterContainer(targetVersion); + var esSourceContainer = new SearchClusterContainer(sourceVersion); + var osTargetContainer = new SearchClusterContainer(targetVersion); ) { - CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> { - esSourceContainer.start(); - return null; - }, executorService), CompletableFuture.supplyAsync(() -> { - osTargetContainer.start(); - return null; - }, executorService)).join(); + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> esSourceContainer.start(), executorService), + CompletableFuture.runAsync(() -> osTargetContainer.start(), executorService) + ).join(); + + // Populate the source cluster with data + var client = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + var generator = new WorkloadGenerator(client); + generator.generate(new WorkloadOptions()); + + // Create the snapshot from the source cluster var args = new CreateSnapshot.Args(); args.snapshotName = "test_snapshot"; args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; @@ -104,7 +103,6 @@ public void testDocumentMigration( var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); snapshotCreator.run(); - final List<String> INDEX_ALLOWLIST = List.of(); var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); try { @@ -125,7 +123,7 @@ public
void testDocumentMigration( runCounter, clockJitter, testDocMigrationContext, - baseSourceImageVersion.getVersion(), + sourceVersion.getVersion(), compressionEnabled ), executorService diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index 8afee74fc..92e18aa6e 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -13,8 +13,11 @@ import java.util.function.Function; import org.opensearch.migrations.CreateSnapshot; -import org.opensearch.migrations.bulkload.framework.PreloadedSearchClusterContainer; +import org.opensearch.migrations.bulkload.common.OpenSearchClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.data.WorkloadGenerator; +import org.opensearch.migrations.data.WorkloadOptions; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; import org.opensearch.migrations.testutils.ToxiProxyWrapper; import org.opensearch.testcontainers.OpensearchContainer; @@ -60,7 +63,6 @@ private static class RunData { } @Test - @Tag("longTest") public void testExitsZeroThenThreeForSimpleSetup() throws Exception { testProcess(3, d -> { @@ -94,7 +96,8 @@ public void testExitsZeroThenThreeForSimpleSetup() throws Exception { // to such a short value (1s) that no document migration will exit in that amount of time. For good // measure though, the toxiproxy also adds latency to the requests to make it impossible for the // migration to complete w/in that 1s. 
- "WITH_DELAYS, 2" }) + "WITH_DELAYS, 2" + }) public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception { final var failHow = FailHow.valueOf(failAfterString); testProcess(expectedExitCode, @@ -105,10 +108,6 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC private void testProcess(int expectedExitCode, Function processRunner) { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); - var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2); - var baseSourceImageVersion = (SearchClusterContainer.ContainerVersion) sourceImageArgs[0]; - var generatorImage = (String) sourceImageArgs[1]; - var generatorArgs = (String[]) sourceImageArgs[2]; var targetImageName = SearchClusterContainer.OS_V2_14_0.getImageName(); var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); @@ -116,29 +115,30 @@ private void testProcess(int expectedExitCode, Function proces try ( var network = Network.newNetwork(); - var esSourceContainer = new PreloadedSearchClusterContainer( - baseSourceImageVersion, - SOURCE_SERVER_ALIAS, - generatorImage, - generatorArgs - ); + var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2) + .withNetwork(network) + .withNetworkAliases(SOURCE_SERVER_ALIAS); var osTargetContainer = new OpensearchContainer<>(targetImageName).withExposedPorts(OPENSEARCH_PORT) .withNetwork(network) .withNetworkAliases(TARGET_DOCKER_HOSTNAME); var proxyContainer = new ToxiProxyWrapper(network) ) { + CompletableFuture.allOf( + CompletableFuture.runAsync(() -> esSourceContainer.start()), + CompletableFuture.runAsync(() -> osTargetContainer.start()), + CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT)) + ).join(); + + // Populate the source cluster with data + var client = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + var generator = new WorkloadGenerator(client); + generator.generate(new WorkloadOptions()); - CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> { - esSourceContainer.start(); - return null; - }), CompletableFuture.supplyAsync(() -> { - osTargetContainer.start(); - return null; - }), CompletableFuture.supplyAsync(() -> { - proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT); - return null; - })).join(); - + // Create the snapshot from the source cluster var args = new CreateSnapshot.Args(); args.snapshotName = SNAPSHOT_NAME; args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java index 9e003706c..b9a1caa35 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java @@ -96,15 +96,20 @@ public static class BulkDocSection { private final String docId; private final String bulkIndex; + public BulkDocSection(String id, String docBody) { + this.docId = id; + this.bulkIndex = createBulkIndex(docId, docBody); + } + public BulkDocSection(RfsLuceneDocument doc) { this.docId = doc.id; - this.bulkIndex = createBulkIndex(docId, doc); + this.bulkIndex = createBulkIndex(docId, doc.source); } @SneakyThrows - private static String createBulkIndex(final 
String docId, final RfsLuceneDocument doc) { + private static String createBulkIndex(final String docId, final String doc) { // For a successful bulk ingestion, we cannot have any leading or trailing whitespace, and must be on a single line. - String trimmedSource = doc.source.trim().replace("\n", ""); + String trimmedSource = doc.trim().replace("\n", ""); return "{\"index\":{\"_id\":\"" + docId + "\"}}" + "\n" + trimmedSource; } diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index 0d01ced82..16c844c1e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -44,7 +44,7 @@ public class OpenSearchClient { .maxBackoff(DEFAULT_MAX_BACKOFF) .filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception - private static final int BULK_MAX_RETRY_ATTEMPTS = 15; + private static final int BULK_MAX_RETRY_ATTEMPTS = 2; private static final Duration BULK_BACKOFF = Duration.ofSeconds(2); private static final Duration BULK_MAX_BACKOFF = Duration.ofSeconds(60); /** Retries for up to 10 minutes */ @@ -192,7 +192,8 @@ private Optional<ObjectNode> createObjectIdempotent( ) { var objectDoesNotExist = !hasObjectCheck(objectPath, context); if (objectDoesNotExist) { - client.putAsync(objectPath, settings.toString(), context.createCheckRequestContext()).flatMap(resp -> { + var putRequestContext = context == null ? null : context.createCheckRequestContext(); + client.putAsync(objectPath, settings.toString(), putRequestContext).flatMap(resp -> { if (resp.statusCode == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else if (resp.statusCode == HttpURLConnection.HTTP_BAD_REQUEST) { diff --git a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java index 439862882..fbbd580af 100644 --- a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java +++ b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java @@ -52,7 +52,8 @@ public void createSnapshotRepository(final String repoPath) throws IOException { } } - public void createDocument(final String index, final String docId, final String body) throws IOException { + @SneakyThrows + public void createDocument(final String index, final String docId, final String body) { var indexDocumentRequest = new HttpPut(clusterUrl + "/" + index + "/_doc/" + docId); indexDocumentRequest.setEntity(new StringEntity(body)); indexDocumentRequest.setHeader("Content-Type", "application/json"); @@ -70,6 +71,30 @@ public void deleteDocument(final String index, final String docId) throws IOExce } } + public void createIndex(final String index) { + var body = "{" + // + " \"settings\": {" + // + " \"index\": {" + // + " \"number_of_shards\": 5," + // + " \"number_of_replicas\": 0" + // + " }" + // + " }" + // + "}"; + createIndex(index, body); + } + + @SneakyThrows + public void createIndex(final String index, final String body) { + var createIndexRequest = new HttpPut(clusterUrl + "/" + index); + createIndexRequest.setEntity(new StringEntity(body)); + createIndexRequest.setHeader("Content-Type", "application/json"); + + try (var response = httpClient.execute(createIndexRequest)) { + var responseBody =
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
index 0d01ced82..16c844c1e 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
@@ -44,7 +44,7 @@ public class OpenSearchClient {
         .maxBackoff(DEFAULT_MAX_BACKOFF)
         .filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception
 
-    private static final int BULK_MAX_RETRY_ATTEMPTS = 15;
+    private static final int BULK_MAX_RETRY_ATTEMPTS = 2;
     private static final Duration BULK_BACKOFF = Duration.ofSeconds(2);
     private static final Duration BULK_MAX_BACKOFF = Duration.ofSeconds(60);
     /** Retries for up 10 minutes */
@@ -192,7 +192,8 @@ private Optional createObjectIdempotent(
     ) {
         var objectDoesNotExist = !hasObjectCheck(objectPath, context);
         if (objectDoesNotExist) {
-            client.putAsync(objectPath, settings.toString(), context.createCheckRequestContext()).flatMap(resp -> {
+            var putRequestContext = context == null ? null : context.createCheckRequestContext();
+            client.putAsync(objectPath, settings.toString(), putRequestContext).flatMap(resp -> {
                 if (resp.statusCode == HttpURLConnection.HTTP_OK) {
                     return Mono.just(resp);
                 } else if (resp.statusCode == HttpURLConnection.HTTP_BAD_REQUEST) {
diff --git a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java
index 439862882..fbbd580af 100644
--- a/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java
+++ b/RFS/src/testFixtures/java/org/opensearch/migrations/bulkload/http/ClusterOperations.java
@@ -52,7 +52,8 @@ public void createSnapshotRepository(final String repoPath) throws IOException {
         }
     }
 
-    public void createDocument(final String index, final String docId, final String body) throws IOException {
+    @SneakyThrows
+    public void createDocument(final String index, final String docId, final String body) {
         var indexDocumentRequest = new HttpPut(clusterUrl + "/" + index + "/_doc/" + docId);
         indexDocumentRequest.setEntity(new StringEntity(body));
         indexDocumentRequest.setHeader("Content-Type", "application/json");
@@ -70,6 +71,30 @@ public void deleteDocument(final String index, final String docId) throws IOExce
         }
     }
 
+    public void createIndex(final String index) {
+        var body = "{" + //
+            "  \"settings\": {" + //
+            "    \"index\": {" + //
+            "      \"number_of_shards\": 5," + //
+            "      \"number_of_replicas\": 0" + //
+            "    }" + //
+            "  }" + //
+            "}";
+        createIndex(index, body);
+    }
+
+    @SneakyThrows
+    public void createIndex(final String index, final String body) {
+        var createIndexRequest = new HttpPut(clusterUrl + "/" + index);
+        createIndexRequest.setEntity(new StringEntity(body));
+        createIndexRequest.setHeader("Content-Type", "application/json");
+
+        try (var response = httpClient.execute(createIndexRequest)) {
+            var responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
+            assertThat(responseBody, response.getCode(), anyOf(equalTo(201), equalTo(200)));
+        }
+    }
+
     @SneakyThrows
     public Map.Entry get(final String path) {
         final var getRequest = new HttpGet(clusterUrl + path);
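The new `createIndex` overloads above give test fixtures a one-line way to provision indices. A usage sketch, assuming a `ClusterOperations` instance named `ops` that points at a test cluster:

```java
// Uses the built-in settings body: 5 shards, 0 replicas
ops.createIndex("test-index");

// Or supply explicit settings as the request body
ops.createIndex("single-shard-index",
    "{ \"settings\": { \"index\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } } }");
```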
diff --git a/settings.gradle b/settings.gradle
index a518c8316..1712d3abb 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -48,6 +48,7 @@ include 'testHelperFixtures'
 include 'RFS'
 include 'CreateSnapshot'
 include 'dashboardsSanitizer'
+include 'DataGenerator'
 include 'MetadataMigration'
 include 'DocumentsFromSnapshotMigration'
 include 'TrafficCapture:captureKafkaOffloader'

From fe01c799ca7ff9615bff198a7434999900f3413c Mon Sep 17 00:00:00 2001
From: Peter Nied
Date: Fri, 11 Oct 2024 20:45:12 +0000
Subject: [PATCH 2/2] PR feedback updates

Signed-off-by: Peter Nied
---
 DataGenerator/README.md                            | 46 ++++++++++++++----
 .../migrations/data/WorkloadGenerator.java         |  6 +--
 .../migrations/data/WorkloadOptions.java           |  4 +-
 .../migrations/DataGeneratorEndToEnd.java          |  3 +-
 .../bulkload/common/OpenSearchClient.java          |  2 +-
 5 files changed, 43 insertions(+), 18 deletions(-)

diff --git a/DataGenerator/README.md b/DataGenerator/README.md
index 37e570b2f..298736248 100644
--- a/DataGenerator/README.md
+++ b/DataGenerator/README.md
@@ -1,22 +1,47 @@
 # Data Generator
 
-This tool is used to generate data for testing of a search cluster. The workloads are similar to those of [OpenSearch Benchmark](https://github.com/opensearch-project/OpenSearch-Benchmark).
+This tool is used to generate data for testing a search cluster. The workloads are similar to those of [OpenSearch Benchmark](https://github.com/opensearch-project/OpenSearch-Benchmark).
 
 > **⚠️ Test Infrastructure**
 > This tool is for test infrastructure. Features may change without notice, and backward compatibility is not guaranteed.
 
 - [Data Generator](#data-generator)
+  - [Workloads](#workloads)
+    - [HttpLogs](#httplogs)
+    - [Geonames](#geonames)
+    - [Nested](#nested)
+    - [NycTaxis](#nyctaxis)
   - [Run Data Generator](#run-data-generator)
     - [Run workloads programmatically](#run-workloads-programmatically)
-    - [Generate data via gradle](#generate-data-via-gradle)
+    - [Generate data via Gradle](#generate-data-via-gradle)
+
+## Workloads
+
+The following workloads are supported and can be selected with the `--workloads [workload1] [workload2] [...]` option: `HttpLogs`, `Geonames`, `Nested`, `NycTaxis`.
+
+### HttpLogs
+
+Multiple indices with HTTP request log file entries that include client IP, timestamp, and request details.
+
+### Geonames
+
+A single index containing a list of geographic features from all over the world.
+
+### Nested
+
+A single index of Stack Overflow questions with user IDs and timestamps, along with answers containing user IDs and timestamps, using the nested mapping type.
+
+### NycTaxis
+
+A single index of taxi trip records, one for each time a taxi dropped off a fare in the New York City area.
 
 ## Run Data Generator
 
-This tool can be used from the command line and programmatically. Programmatic is recommended approach.
+This tool can be used from the command line and programmatically. The programmatic approach is recommended.
 
 ### Run workloads programmatically
 
-Insert the following code into the test case.
+Insert the following code into the test case:
 
 ```java
 import org.opensearch.migrations.data.WorkloadGenerator;
 import org.opensearch.migrations.data.WorkloadOptions;
@@ -32,10 +57,9 @@ var generator = new WorkloadGenerator(client);
 generator.generate(new WorkloadOptions());
 ```
 
+### Generate data via Gradle
 
-### Generate data via gradle
-
-To upload data onto a test cluster the following
+To upload data onto a test cluster, use the following command:
 
 ```shell
 ./gradlew DataGenerator:run --args='--target-host http://hostname:9200'
 ```
@@ -47,14 +71,14 @@
 Example command output
 
 ```
-$ ./gradlew DataGenerator:run --args=' --target-host https://172.18.0.1:19200 --target-insecure --target-username admin --target-password admin --docs-per-workload 1000'
+$ ./gradlew DataGenerator:run --args=' --target-host https://172.18.0.1:19200 --target-insecure --target-username admin --target-password admin --docs-per-workload-count 1000'
 
 > Task :DataGenerator:run
 2024-10-10 17:33:01,247 INFO o.o.m.u.ProcessHelpers [main] getNodeInstanceName()=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef
-2024-10-10 17:33:01,249 INFO o.o.m.DataGenerator [main] Starting DataGenerator with workerId =generated_d0bf496d-1b80-4316-bf38-e3315321a3ef
+2024-10-10 17:33:01,249 INFO o.o.m.DataGenerator [main] Starting DataGenerator with workerId=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef
 2024-10-10 17:33:01,552 INFO o.o.m.d.WorkloadGenerator [main] Starting document creation
-2024-10-10 17:33:02,858 INFO o.o.m.d.WorkloadGenerator [main] All document queued
+2024-10-10 17:33:02,858 INFO o.o.m.d.WorkloadGenerator [main] All documents queued
-2024-10-10 17:33:02,981 INFO o.o.m.d.WorkloadGenerator [main] All document completed
+2024-10-10 17:33:02,981 INFO o.o.m.d.WorkloadGenerator [main] All documents completed
 2024-10-10 17:33:02,981 INFO o.o.m.DataGenerator [main] Generation complete, took 1,429.00ms
 
 Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
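The renamed flags map directly onto public fields of `WorkloadOptions`, so programmatic callers can set the same values without going through the CLI. A sketch using the fields visible in the `WorkloadOptions` hunk below; the `Workloads` enum constant name is an assumption, not confirmed by this patch:

```java
var options = new WorkloadOptions();
options.workloads = List.of(Workloads.HttpLogs); // hypothetical constant name
options.totalDocs = 500;        // what --docs-per-workload-count sets
options.maxBulkBatchSize = 25;  // what --max-bulk-request-batch-count sets
generator.generate(options);    // generator as created in the README example
```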
diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
index 5787e00bb..f1257b37e 100644
--- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
+++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
@@ -22,7 +22,7 @@ public class WorkloadGenerator {
     public void generate(WorkloadOptions options) {
         log.info("Starting document creation");
 
-        // This workflow creates ALL documents in memory, schedules them and waits for completion.
+        // This workload creates ALL documents in memory, schedules them and waits for completion.
         // If larger scale is needed remove the toList() calls and stream all data.
         var allDocs = new ArrayList<CompletableFuture<?>>();
         for (var workload : options.workloads) {
@@ -36,9 +36,9 @@ public void generate(WorkloadOptions options) {
             allDocs.addAll(docs);
         }
 
-        log.info("All document queued");
+        log.info("All documents queued");
         CompletableFuture.allOf(allDocs.toArray(new CompletableFuture[0])).join();
-        log.info("All document completed");
+        log.info("All documents completed");
     }
 
     private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
index 04555a991..6335d8a26 100644
--- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
+++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
@@ -11,10 +11,10 @@ public class WorkloadOptions {
     @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false)
     public List<Workloads> workloads = Arrays.asList(Workloads.values());
 
-    @Parameter(names = { "--docs-per-workload" }, description = "The number of documents per each workload")
+    @Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload")
     public int totalDocs = 1000;
 
-    @Parameter(names = { "--max-bulk-request-batch-size "}, description = "For bulk requests the larger batch size")
+    @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
     public int maxBulkBatchSize = 50;
 
     public IndexOptions index = new IndexOptions();
diff --git a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java
index 17d370c8f..3b6cd58db 100644
--- a/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java
+++ b/DataGenerator/src/test/java/org/opensearch/migrations/DataGeneratorEndToEnd.java
@@ -9,6 +9,7 @@
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -18,7 +19,7 @@
 /**
  * Tests focused on running end to end test cases for Data Generator
  */
-// @Tag("isolatedTest")
+@Tag("isolatedTest")
 @Slf4j
 class DataGeneratorEndToEnd {
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
index 16c844c1e..cb0cd40d8 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java
@@ -44,7 +44,7 @@ public class OpenSearchClient {
         .maxBackoff(DEFAULT_MAX_BACKOFF)
         .filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception
 
-    private static final int BULK_MAX_RETRY_ATTEMPTS = 2;
+    private static final int BULK_MAX_RETRY_ATTEMPTS = 15;
     private static final Duration BULK_BACKOFF = Duration.ofSeconds(2);
     private static final Duration BULK_MAX_BACKOFF = Duration.ofSeconds(60);
     /** Retries for up 10 minutes */
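This last hunk reverts the temporary retry reduction made in the first patch, restoring the original budget of 15 bulk retry attempts. With exponential backoff starting at 2 seconds and capped at 60 seconds, the cumulative wait is roughly 2 + 4 + 8 + 16 + 32 + 10 × 60 ≈ 662 seconds, in line with the "Retries for up 10 minutes" comment (jitter shifts the exact figure). A sketch of the corresponding retry spec, assuming reactor-core's `Retry` utility as already used for the client-level retries in this class; how the constants are wired into the actual bulk call is not shown in this patch:

```java
import java.time.Duration;
import reactor.util.retry.Retry;

// 15 attempts with exponential backoff from 2s, capped at 60s, mirroring
// BULK_MAX_RETRY_ATTEMPTS, BULK_BACKOFF, and BULK_MAX_BACKOFF above
Retry bulkRetrySketch = Retry.backoff(15, Duration.ofSeconds(2))
    .maxBackoff(Duration.ofSeconds(60));
```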