From 96af8b67b21d0fcf32e33e7b5139d84b371e120a Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 15 Sep 2022 13:20:55 +0200 Subject: [PATCH] Added function for creating target index mapping that considers transform mapping json. Target date field mappings are generated after transform validation when running transform. Removed target index date field values formatting. Removed default format for date_histogram because of the rollup. Updated schema version in test. Signed-off-by: Stevan Buzejic --- .../common/model/dimension/DateHistogram.kt | 3 +- .../transform/TargetIndexMappingService.kt | 96 ++++++++++++------- .../transform/TransformRunner.kt | 9 +- .../transform/TransformSearchService.kt | 17 ++-- .../transform/util/TransformContext.kt | 7 +- .../IndexManagementRestTestCase.kt | 2 +- .../TargetIndexMappingServiceTests.kt | 27 ++++++ .../transform/TransformRunnerIT.kt | 4 +- .../RestPreviewTransformActionIT.kt | 12 +-- .../transform/util/TransformContextTests.kt | 3 +- 10 files changed, 119 insertions(+), 61 deletions(-) create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt index 84f5b15c0..73de49726 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt @@ -14,7 +14,6 @@ import org.opensearch.core.xcontent.XContentParser.Token import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.index.query.AbstractQueryBuilder import org.opensearch.index.query.RangeQueryBuilder -import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings import 
org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder @@ -30,7 +29,7 @@ data class DateHistogram( val fixedInterval: String? = null, val calendarInterval: String? = null, val timezone: ZoneId = ZoneId.of(UTC), - val format: String? = DEFAULT_DATE_FORMAT + val format: String? = null ) : Dimension(Type.DATE_HISTOGRAM, sourceField, targetField) { init { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt index 292c194e6..0a5954a94 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -9,9 +9,16 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse import org.opensearch.client.Client +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.opensearchapi.suspendUntil @@ -21,6 +28,8 @@ import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT import org.opensearch.indexmanagement.util.IndexUtils import 
org.opensearch.search.aggregations.AggregationBuilder import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets /** * Service designed for creating dynamic target index mapping based on the date field types of the source index. @@ -33,11 +42,7 @@ object TargetIndexMappingService { private const val TYPE = "type" private const val PROPERTIES = "properties" - private const val METADATA = "_meta" - private const val SCHEMA_VERSION = "schema_version" - private const val DYNAMIC_TEMPLATE = "dynamic_templates" - private const val MATCH_MAPPING_TYPE = "match_mapping_type" - private const val MAPPING = "mapping" + private val DATE_FIELD_TYPES = setOf("date", "date_nanos") fun initialize(client: Client) { this.client = client @@ -97,35 +102,58 @@ object TargetIndexMappingService { } private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) = - sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos") + sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE]) + /** + * Loads transform target index mappings from json and adds date properties mapping + * + * @param dateFieldMappings target index date fields mappings + */ fun createTargetIndexMapping(dateFieldMappings: Map): String { - /** TODO - Check if the mappings from file can be loaded into the XContentBuilder - * val dynamicMappings = IndexManagementIndices.transformTargetMappings - * val mappings = createTargetIndexMappingsAsString(dateFieldMappings, dynamicMappings) - */ - // Build static properties - val builder = XContentFactory.jsonBuilder().startObject() - .startObject(METADATA) - .field(SCHEMA_VERSION, 1) - .endObject() - .startArray(DYNAMIC_TEMPLATE) - .startObject() - .startObject("strings") - .field(MATCH_MAPPING_TYPE, "string") - .startObject(MAPPING) - .field(TYPE, "keyword") - .endObject() - .endObject() - .endObject() - .endArray() 
- .startObject(PROPERTIES) - - // Dynamically build composite aggregation mapping - mapCompositeAggregation(dateFieldMappings, builder) - return builder.endObject() - .endObject() - .string() + val builder = XContentFactory.jsonBuilder() + val dynamicMappings = IndexManagementIndices.transformTargetMappings + val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8)) + val bytesReference = BytesReference.fromByteBuffer(byteBuffer) + + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + XContentType.JSON + ) + loop@while (!xcp.isClosed) { + val token = xcp.currentToken() + val fieldName = xcp.currentName() + + when (token) { + XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue()) + XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text()) + XContentParser.Token.START_OBJECT -> { + if (fieldName != null) { + builder.startObject(fieldName) + } else { + builder.startObject() + } + } + XContentParser.Token.END_OBJECT -> builder.endObject() + XContentParser.Token.START_ARRAY -> builder.startArray(fieldName) + XContentParser.Token.END_ARRAY -> { + builder.endArray() + // Add target index date fields mappings only if the date field mappings are present + if (dateFieldMappings.isNotEmpty()) { + builder.startObject(PROPERTIES) + mapCompositeAggregation(dateFieldMappings, builder) + builder.endObject() + } + } + else -> { + xcp.nextToken() + continue@loop + } + } + xcp.nextToken() + } + return builder.string() } @Suppress("UNCHECKED_CAST") @@ -142,7 +170,9 @@ object TargetIndexMappingService { mapCompositeAggregation(it.value as Map, builder) builder.endObject() } else { - builder.field(it.key, it.value.toString()) + if (DATE_FIELD_TYPES.contains(it.value)) { + builder.field(it.key, it.value.toString()) + } } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt 
b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 895d2e7ba..0cf658a29 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -109,11 +109,8 @@ object TransformRunner : val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) - // If date was used in term query generate target date field mapping and store it in transform context - val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) val transformContext = TransformContext( - TransformLockManager(transform, context), - targetIndexDateFieldMappings + TransformLockManager(transform, context) ) // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform @@ -136,6 +133,10 @@ object TransformRunner : currentMetadata = validatedMetadata return } + // If date was used in term query generate target date field mapping and store it in transform context + val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) + transformContext.setTargetDateFieldMappings(targetIndexDateFieldMappings) + if (transform.continuous) { // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index daa940678..6afa93637 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.transform -import org.opensearch.indexmanagement.transform.util.formatMillis import 
org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException @@ -356,14 +355,7 @@ class TransformSearchService( val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments) aggregatedBucket.key.entries.forEach { bucket -> - // Check if the date is used as a term (exists in conversion list) - if it is, - // convert the date (which is in epoch time millis format to ISO 8601) -// document[bucket.key] = bucket.value - if (targetIndexDateFieldMappings.isNullOrEmpty() || !targetIndexDateFieldMappings.containsKey(bucket.key)) { - document[bucket.key] = bucket.value - } else { - document[bucket.key] = formatMillis(bucket.value as Long) - } + document[bucket.key] = bucket.value } aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings) @@ -392,10 +384,15 @@ class TransformSearchService( return when (aggregation) { is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> { val agg = aggregation as NumericMetricsAggregation.SingleValue + /** + * When date field is used in transform aggregation (min, max, avg), the value of the field is in exponential format + * which is not allowed since the target index mapping for date field is strict_date_optional_time||epoch_millis + * That's why the exponential value is transformed to long: agg.value().toLong() + */ if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) { agg.value() } else { - formatMillis(agg.value().toLong()) + agg.value().toLong() } } is Percentiles -> { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt index 85e263ead..3c7627232 100644 --- 
a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt @@ -10,9 +10,10 @@ package org.opensearch.indexmanagement.transform.util */ class TransformContext( val transformLockManager: TransformLockManager, - val targetDateFieldMappings: Map, var lastSuccessfulPageSize: Int? = null, ) { + private lateinit var targetDateFieldMappings: Map + fun getMaxRequestTimeoutInSeconds(): Long? { // Lock timeout must be greater than LOCK_BUFFER var maxRequestTimeout = transformLockManager.lockExpirationInSeconds()?.minus(LOCK_BUFFER_SECONDS) @@ -25,6 +26,10 @@ class TransformContext( fun getTargetIndexDateFieldMappings() = targetDateFieldMappings + fun setTargetDateFieldMappings(dateFieldMappings: Map) { + this.targetDateFieldMappings = dateFieldMappings + } + suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { transformLockManager.renewLockForLongSearch(timeSpentOnSearch) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index b98fbb125..02c5ca4c1 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -37,7 +37,7 @@ import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 17 + val configSchemaVersion = 18 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt new file mode 100644 index 000000000..46416b4e1 --- /dev/null +++ 
b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.junit.Assert +import org.opensearch.test.OpenSearchTestCase + +class TargetIndexMappingServiceTests : OpenSearchTestCase() { + + fun `test create target index mapping fields mapped correctly`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}""" + val dateFieldMap = mapOf("tpep_pickup_datetime" to mapOf("type" to "date")) + val result = TargetIndexMappingService.createTargetIndexMapping(dateFieldMap) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } + + fun `test create target index mapping empty map`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}""" + val result = TargetIndexMappingService.createTargetIndexMapping(emptyMap()) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index ec60e589d..f41e2e935 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -286,7 +286,7 @@ class TransformRunnerIT : TransformRestTestCase() { } @Suppress("UNCHECKED_CAST") - fun `test transform term aggregation on date field generate target mapping same as source mapping 
date field`() { + fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() { val sourceIdxTestName = "source_idx_test_14" val targetIdxTestName = "target_idx_test_14" @@ -411,7 +411,7 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) } - waitFor { + waitFor(timeout = Instant.ofEpochSecond(30)) { val transformJob = getTransform(transformId = transform.id) assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) val transformMetadata = getTransformMetadata(transformJob.metadataId!!) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt index d6aadb951..d3239edaa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.resthandler import org.junit.AfterClass import org.junit.Before import org.opensearch.client.ResponseException +import org.opensearch.common.time.DateFormatter import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.Terms @@ -20,12 +21,9 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import java.time.Instant -import java.time.LocalDate import java.time.ZoneId -import java.time.format.DateTimeFormatter import java.time.format.DateTimeParseException import java.time.temporal.ChronoUnit -import java.util.Locale @Suppress("UNCHECKED_CAST") 
class RestPreviewTransformActionIT : TransformRestTestCase() { @@ -111,9 +109,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { assertEquals("Preview transform failed", RestStatus.OK, response.restStatus()) val transformedDocs = response.asMap()["documents"] as List> assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) - val dateFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT).withZone(ZoneId.of("UTC")) + val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC")) for (doc in transformedDocs) { - assertTrue(isValid(doc["tpep_pickup_datetime"] as? String, dateFormatter)) + assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter)) } } @@ -152,9 +150,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { } } - private fun isValid(dateStr: String?, dateFormatter: DateTimeFormatter): Boolean { + private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean { try { - LocalDate.parse(dateStr, dateFormatter) + dateFormatter.formatMillis(date) } catch (e: DateTimeParseException) { return false } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt index ced54b281..988033123 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt @@ -18,7 +18,8 @@ class TransformContextTests : OpenSearchTestCase() { @Throws(Exception::class) fun setup() { transformLockManager = Mockito.mock(TransformLockManager::class.java) - transformContext = TransformContext(transformLockManager, emptyMap()) + transformContext = TransformContext(transformLockManager) + transformContext.setTargetDateFieldMappings(emptyMap()) } fun `test 
getMaxRequestTimeoutInSeconds`() {