diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt index 292c194e6..0a5954a94 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -9,9 +9,16 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse import org.opensearch.client.Client +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -21,6 +28,8 @@ import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.search.aggregations.AggregationBuilder import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets /** * Service designed for creating dynamic target index mapping based on the date field types of the source index. @@ -33,11 +42,7 @@ object TargetIndexMappingService { private const val TYPE = "type" private const val PROPERTIES = "properties" - private const val METADATA = "_meta" - private const val SCHEMA_VERSION = "schema_version" - private const val DYNAMIC_TEMPLATE = "dynamic_templates" - private const val MATCH_MAPPING_TYPE = "match_mapping_type" - private const val MAPPING = "mapping" + private val DATE_FIELD_TYPES = setOf("date", "date_nanos") fun initialize(client: Client) { this.client = client @@ -97,35 +102,58 @@ object TargetIndexMappingService { } private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) = - sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos") + sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE]) + /** + * Loads transform target index mappings from json and adds date properties mapping + * + * @param dateFieldMappings target index date fields mappings + */ fun createTargetIndexMapping(dateFieldMappings: Map): String { - /** TODO - Check if the mappings from file can be loaded into the XContentBuilder - * val dynamicMappings = IndexManagementIndices.transformTargetMappings - * val mappings = createTargetIndexMappingsAsString(dateFieldMappings, dynamicMappings) - */ - // Build static properties - val builder = XContentFactory.jsonBuilder().startObject() - .startObject(METADATA) - .field(SCHEMA_VERSION, 1) - .endObject() - .startArray(DYNAMIC_TEMPLATE) - .startObject() - .startObject("strings") - .field(MATCH_MAPPING_TYPE, "string") - .startObject(MAPPING) - .field(TYPE, "keyword") - .endObject() - .endObject() - .endObject() - .endArray() - .startObject(PROPERTIES) - - // Dynamically build composite aggregation mapping - mapCompositeAggregation(dateFieldMappings, builder) - return builder.endObject() - .endObject() - .string() + val builder = XContentFactory.jsonBuilder() + val dynamicMappings = IndexManagementIndices.transformTargetMappings + val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8)) + val bytesReference = BytesReference.fromByteBuffer(byteBuffer) + + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + XContentType.JSON + ) + loop@while (!xcp.isClosed) { + val token = xcp.currentToken() + val fieldName = xcp.currentName() + + when (token) { + XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue()) + XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text()) + XContentParser.Token.START_OBJECT -> { + if (fieldName != null) { + builder.startObject(fieldName) + } else { + builder.startObject() + } + } + XContentParser.Token.END_OBJECT -> builder.endObject() + XContentParser.Token.START_ARRAY -> builder.startArray(fieldName) + XContentParser.Token.END_ARRAY -> { + builder.endArray() + // Add target index date fields mappings only if the date field mappings are present + if (dateFieldMappings.isNotEmpty()) { + builder.startObject(PROPERTIES) + mapCompositeAggregation(dateFieldMappings, builder) + builder.endObject() + } + } + else -> { + xcp.nextToken() + continue@loop + } + } + xcp.nextToken() + } + return builder.string() } @Suppress("UNCHECKED_CAST") @@ -142,7 +170,9 @@ object TargetIndexMappingService { mapCompositeAggregation(it.value as Map, builder) builder.endObject() } else { - builder.field(it.key, it.value.toString()) + if (DATE_FIELD_TYPES.contains(it.value)) { + builder.field(it.key, it.value.toString()) + } } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index daa940678..6afa93637 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.transform -import org.opensearch.indexmanagement.transform.util.formatMillis import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException @@ -356,14 +355,7 @@ class TransformSearchService( val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments) aggregatedBucket.key.entries.forEach { bucket -> - // Check if the date is used as a term (exists in conversion list) - if it is, - // convert the date (which is in epoch time millis format to ISO 8601) -// document[bucket.key] = bucket.value - if (targetIndexDateFieldMappings.isNullOrEmpty() || !targetIndexDateFieldMappings.containsKey(bucket.key)) { - document[bucket.key] = bucket.value - } else { - document[bucket.key] = formatMillis(bucket.value as Long) - } + document[bucket.key] = bucket.value } aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings) @@ -392,10 +384,15 @@ class TransformSearchService( return when (aggregation) { is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> { val agg = aggregation as NumericMetricsAggregation.SingleValue + /** + * When date filed is used in transform aggregation (min, max avg), the value of the field is in exponential format + * which is not allowed since the target index mapping for date field is strict_date_optional_time||epoch_millis + * That's why the exponential value is transformed to long: agg.value().toLong() + */ if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) { agg.value() } else { - formatMillis(agg.value().toLong()) + agg.value().toLong() } } is Percentiles -> { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt new file mode 100644 index 000000000..46416b4e1 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.junit.Assert +import org.opensearch.test.OpenSearchTestCase + +class TargetIndexMappingServiceTests : OpenSearchTestCase() { + + fun `test create target index mapping fields mapped correctly`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}""" + val dateFieldMap = mapOf("tpep_pickup_datetime" to mapOf("type" to "date")) + val result = TargetIndexMappingService.createTargetIndexMapping(dateFieldMap) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } + + fun `test create target index mapping empty map`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}""" + val result = TargetIndexMappingService.createTargetIndexMapping(emptyMap()) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index ec60e589d..f41e2e935 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -286,7 +286,7 @@ class TransformRunnerIT : TransformRestTestCase() { } @Suppress("UNCHECKED_CAST") - fun `test transform term aggregation on date field generate target mapping same as source mapping date field`() { + fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() { val sourceIdxTestName = "source_idx_test_14" val targetIdxTestName = "target_idx_test_14" @@ -411,7 +411,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } - waitFor { + waitFor(timeout = Instant.ofEpochSecond(30)) { val transformJob = getTransform(transformId = transform.id) assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) val transformMetadata = getTransformMetadata(transformJob.metadataId!!) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt index d6aadb951..d3239edaa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.resthandler import org.junit.AfterClass import org.junit.Before import org.opensearch.client.ResponseException +import org.opensearch.common.time.DateFormatter import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.Terms @@ -20,12 +21,9 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import java.time.Instant -import java.time.LocalDate import java.time.ZoneId -import java.time.format.DateTimeFormatter import java.time.format.DateTimeParseException import java.time.temporal.ChronoUnit -import java.util.Locale @Suppress("UNCHECKED_CAST") class RestPreviewTransformActionIT : TransformRestTestCase() { @@ -111,9 +109,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { assertEquals("Preview transform failed", RestStatus.OK, response.restStatus()) val transformedDocs = response.asMap()["documents"] as List> assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) - val dateFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT).withZone(ZoneId.of("UTC")) + val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC")) for (doc in transformedDocs) { - assertTrue(isValid(doc["tpep_pickup_datetime"] as? String, dateFormatter)) + assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter)) } } @@ -152,9 +150,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { } } - private fun isValid(dateStr: String?, dateFormatter: DateTimeFormatter): Boolean { + private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean { try { - LocalDate.parse(dateStr, dateFormatter) + dateFormatter.formatMillis(date) } catch (e: DateTimeParseException) { return false }