From a755e5179d8cd4c8df15739b61f804284711e1c7 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 11 May 2023 20:24:38 +0200 Subject: [PATCH] Addressed the comments Signed-off-by: Stevan Buzejic --- .../transform/TargetIndexMappingService.kt | 12 ++-- .../transform/TransformIndexer.kt | 11 ++- .../transform/TransformRunnerIT.kt | 69 +++++++++---------- 3 files changed, 45 insertions(+), 47 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt index bb2f4ac39..6fcf2c03b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -9,8 +9,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse import org.opensearch.client.Client -import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.opensearchapi.string @@ -75,15 +75,15 @@ object TargetIndexMappingService { } val sourceFieldType = IndexUtils.getFieldFromMappings(dimension.sourceField, sourceIndexMapping) // Consider only date fields as relevant for building the target index mapping - if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && sourceFieldType[TYPE] == "date") { + if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos")) { // Taking the source field settings (type, format etc.) - val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT) + val dateTypeTargetMapping = mapOf("type" to sourceFieldType[TYPE], "format" to DEFAULT_DATE_FORMAT) dateFieldMappings[dimension.targetField] = dateTypeTargetMapping } } } - fun createTargetIndexMapping(dateCompositeAggregations: Map): String { + fun createTargetIndexMapping(dateFieldMappings: Map): String { // Build static properties val builder = XContentFactory.jsonBuilder().startObject() .startObject(METADATA) @@ -102,7 +102,7 @@ object TargetIndexMappingService { .startObject(PROPERTIES) // Dynamically build composite aggregation mapping - mapCompositeAggregation(dateCompositeAggregations, builder) + mapCompositeAggregation(dateFieldMappings, builder) // Close the object and return as a string return builder.endObject() @@ -133,7 +133,7 @@ object TargetIndexMappingService { aggBuilders: Collection, sourceIndexMapping: Map, targetIndexMapping: MutableMap, - parentPath: String? + parentPath: String?, ) { val iterator = aggBuilders.iterator() while (iterator.hasNext()) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 2187219d1..ac49eb8ac 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -53,15 +53,14 @@ class TransformIndexer( } } - private suspend fun createTargetIndex(transform: Transform, targetFieldMappings: Map) { - val index = transform.targetIndex - if (!clusterService.state().routingTable.hasIndex(index)) { + private suspend fun createTargetIndex(targetIndex: String, targetFieldMappings: Map) { + if (!clusterService.state().routingTable.hasIndex(targetIndex)) { val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings) - val request = CreateIndexRequest(index).mapping(transformTargetIndexMapping) + val request = CreateIndexRequest(targetIndex).mapping(transformTargetIndexMapping) // TODO: Read in the actual mappings from the source index and use that val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } if (!response.isAcknowledged) { - logger.error("Failed to create the target index $index") + logger.error("Failed to create the target index $targetIndex") throw TransformIndexException("Failed to create the target index") } } @@ -77,7 +76,7 @@ class TransformIndexer( val targetIndex = updatableDocsToIndex.first().index() logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex") - createTargetIndex(transform, transformContext.getTargetIndexDateFieldMappings()) + createTargetIndex(transform.targetIndex, transformContext.getTargetIndexDateFieldMappings()) backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { val bulkRequest = BulkRequest().add(updatableDocsToIndex) val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 6b9242fb5..9ac82554d 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -321,27 +321,26 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } - Thread.sleep(60000) - - waitFor { + waitFor(Instant.ofEpochSecond(60)) { val transformJob = getTransform(transformId = transform.id) assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) val transformMetadata = getTransformMetadata(transformJob.metadataId!!) assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } - val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") - val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> - val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") - val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> - val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] - val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] - assertEquals(sourcePickupDate, targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) - val pickupDateTimeTerm = "pickupDateTerm14" + val pickupDateTimeTerm = "pickupDateTerm14" - val request = """ + val request = """ { "size": 0, "aggs": { @@ -354,24 +353,23 @@ class TransformRunnerIT : TransformRestTestCase() { } } } - """.trimIndent() + """ - var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) - assertTrue(rawRes.restStatus() == RestStatus.OK) + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) - var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) - assertTrue(transformRes.restStatus() == RestStatus.OK) + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) - val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! - val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! - assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) - // Verify the values of keys and metrics in all buckets - for (i in rawAggBuckets.indices) { - assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) - assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) - } + // Verify the values of keys and metrics in all buckets + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) } } @@ -418,20 +416,21 @@ class TransformRunnerIT : TransformRestTestCase() { assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) val transformMetadata = getTransformMetadata(transformJob.metadataId!!) assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } - val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") - val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> - val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") - val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> - val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] - val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] - assertEquals("date", targetPickupDate) - assertEquals(sourcePickupDate, targetPickupDate) + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + waitFor(Instant.ofEpochSecond(30)) { val storeAndForwardTerm = "storeAndForwardTerm" - val request = """ { "size": 0, @@ -447,7 +446,7 @@ class TransformRunnerIT : TransformRestTestCase() { } } } - """.trimIndent() + """ var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) assertTrue(rawRes.restStatus() == RestStatus.OK)