From 8523aebb1a8ef80e506b19a9547e62e4b7b4ef8d Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Wed, 31 May 2023 23:12:31 +0200 Subject: [PATCH] Bugfix/202 transform date add date conversion (#622) * 202: Added format property when specifying the date histogram Signed-off-by: Stevan Buzejic * 202: Added component responsible for building the target index mapping once the transform is being triggered. Signed-off-by: Stevan Buzejic * 202: date_histogram considered in the case of the creating the target index for the date fields when transform is executed Signed-off-by: Stevan Buzejic * 202: Enabled target index date field mappings if those fields are used in aggregations or as a term aggregation for defining the buckets Signed-off-by: Stevan Buzejic * Updated code according to comments. Added targetIndexMapping when transform preview action is triggered Signed-off-by: Stevan Buzejic * Updated schema versions Signed-off-by: Stevan Buzejic * Addressed the comments Signed-off-by: Stevan Buzejic * Refactored transform tests related with aggregation based on a date field. Updated transform preview action to consider target index mapping when using a date field. Kept formatting of the date field in target index. Signed-off-by: Stevan Buzejic * detekt fix Signed-off-by: Stevan Buzejic * Added zone in IT Signed-off-by: Stevan Buzejic * Added function for creating target index mapping that considers transform mapping json. Target date field mappings are generated after transform validation when running transform. Removed target index date field values formatting. emoved default format for date_histogram because of the rollup. Updated schema version in test. Signed-off-by: Stevan Buzejic --------- Signed-off-by: Stevan Buzejic (cherry picked from commit 42833b12b292a291f56bc4f94264d32f33317bf2) --- .../indexmanagement/IndexManagementPlugin.kt | 3 + .../common/model/dimension/DateHistogram.kt | 13 +- .../transform/TargetIndexMappingService.kt | 250 +++++++++++++++ .../transform/TransformIndexer.kt | 20 +- .../transform/TransformRunner.kt | 14 +- .../transform/TransformSearchService.kt | 31 +- .../TransportPreviewTransformAction.kt | 68 ++-- .../transform/util/TransformContext.kt | 10 +- .../transform/util/TransformUtils.kt | 18 ++ .../mappings/opendistro-ism-config.json | 5 +- .../IndexManagementRestTestCase.kt | 2 +- .../TargetIndexMappingServiceTests.kt | 27 ++ .../transform/TransformRunnerIT.kt | 298 ++++++++++++++++++ .../RestPreviewTransformActionIT.kt | 56 ++++ .../transform/util/TransformContextTests.kt | 1 + .../cached-opendistro-ism-config.json | 5 +- 16 files changed, 772 insertions(+), 49 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index f3e03226d..a7b34f727 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -150,6 +150,7 @@ import org.opensearch.indexmanagement.spi.IndexManagementExtension import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformRunner import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction import org.opensearch.indexmanagement.transform.action.delete.TransportDeleteTransformsAction @@ -470,6 +471,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin indexNameExpressionResolver, ) + TargetIndexMappingService.initialize(client) + return listOf( managedIndexRunner, rollupRunner, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt index 2c9dda200..73de49726 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/common/model/dimension/DateHistogram.kt @@ -28,7 +28,8 @@ data class DateHistogram( override val targetField: String = sourceField, val fixedInterval: String? = null, val calendarInterval: String? = null, - val timezone: ZoneId = ZoneId.of(UTC) + val timezone: ZoneId = ZoneId.of(UTC), + val format: String? = null ) : Dimension(Type.DATE_HISTOGRAM, sourceField, targetField) { init { @@ -54,6 +55,7 @@ data class DateHistogram( return builder.field(DIMENSION_SOURCE_FIELD_FIELD, sourceField) .field(DIMENSION_TARGET_FIELD_FIELD, targetField) .field(DATE_HISTOGRAM_TIMEZONE_FIELD, timezone.id) + .field(FORMAT, format) .endObject() .endObject() } @@ -82,6 +84,9 @@ data class DateHistogram( fixedInterval?.let { this.fixedInterval(DateHistogramInterval(it)) } + format?.let { + this.format(it) + } } } @@ -128,6 +133,7 @@ data class DateHistogram( const val FIXED_INTERVAL_FIELD = "fixed_interval" const val CALENDAR_INTERVAL_FIELD = "calendar_interval" const val DATE_HISTOGRAM_TIMEZONE_FIELD = "timezone" + const val FORMAT = "format" @Suppress("ComplexMethod", "LongMethod") @JvmStatic @@ -138,6 +144,7 @@ data class DateHistogram( var fixedInterval: String? = null var calendarInterval: String? = null var timezone = ZoneId.of(UTC) + var format: String? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -150,6 +157,7 @@ data class DateHistogram( DATE_HISTOGRAM_TIMEZONE_FIELD -> timezone = ZoneId.of(xcp.text()) DIMENSION_SOURCE_FIELD_FIELD -> sourceField = xcp.text() DIMENSION_TARGET_FIELD_FIELD -> targetField = xcp.text() + FORMAT -> format = xcp.textOrNull() else -> throw IllegalArgumentException("Invalid field [$fieldName] found in date histogram") } } @@ -159,7 +167,8 @@ data class DateHistogram( targetField = requireNotNull(targetField) { "Target field must not be null" }, fixedInterval = fixedInterval, calendarInterval = calendarInterval, - timezone = timezone + timezone = timezone, + format = format ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt new file mode 100644 index 000000000..0a5954a94 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -0,0 +1,250 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse +import org.opensearch.client.Client +import org.opensearch.common.bytes.BytesReference +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.IndexNotFoundException +import org.opensearch.indexmanagement.IndexManagementIndices +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.opensearchapi.string +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT +import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets + +/** + * Service designed for creating dynamic target index mapping based on the date field types of the source index. + * Creates target index date properties based on the date properties of the source index + * (ie. if the term grouping is applied on a date field of source index, target index field will have date type also) + */ +object TargetIndexMappingService { + private val logger = LogManager.getLogger(javaClass) + private lateinit var client: Client + + private const val TYPE = "type" + private const val PROPERTIES = "properties" + private val DATE_FIELD_TYPES = setOf("date", "date_nanos") + + fun initialize(client: Client) { + this.client = client + } + + /** + * + * Check if the source index contains date fields and returns target index mapping for date fields by using default date format + * Example: + * input map: [tpep_pickup_datetime, [type: date]] + * target index mapping: "tpep_pickup_datetime": { + * "type": "date", + * "format": "strict_date_optional_time||epoch_millis" + * } + * @return map of the date properties + * + */ + suspend fun getTargetMappingsForDates(transform: Transform): Map { + val sourceIndex = transform.sourceIndex + try { + val result: GetMappingsResponse = client.admin().indices().suspendUntil { + getMappings(GetMappingsRequest().indices(sourceIndex), it) + } ?: error("GetMappingResponse for [$transform.sourceIndex] was null") + + val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap + + val targetIndexDateFieldMappings = mutableMapOf() + if (!sourceIndexMapping.isNullOrEmpty()) { + mapDateTermAggregation(transform, sourceIndexMapping, targetIndexDateFieldMappings) + mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, targetIndexDateFieldMappings, null) + } + return targetIndexDateFieldMappings + } catch (ex: IndexNotFoundException) { + logger.error("Index $sourceIndex doesn't exist") + return emptyMap() + } + } + + private fun mapDateTermAggregation( + transform: Transform, + sourceIndexMapping: MutableMap, + dateFieldMappings: MutableMap, + ) { + transform.groups.forEach { dimension -> + if (!isFieldInMappings(dimension.sourceField, sourceIndexMapping)) { + throw TransformIndexException("Missing field ${dimension.sourceField} in source index") + } + val sourceFieldType = IndexUtils.getFieldFromMappings(dimension.sourceField, sourceIndexMapping) + // Consider only date fields as relevant for building the target index mapping + // Excluding date histogram since user can define format in it + if (dimension !is DateHistogram && isSourceFieldDate(sourceFieldType)) { + // Taking the source field settings (type, format etc.) + val dateTypeTargetMapping = mapOf("type" to sourceFieldType!![TYPE], "format" to DEFAULT_DATE_FORMAT) + dateFieldMappings[dimension.targetField] = dateTypeTargetMapping + } + } + } + + private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) = + sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE]) + + /** + * Loads transform target index mappings from json and adds date properties mapping + * + * @param dateFieldMappings target index date fields mappings + */ + fun createTargetIndexMapping(dateFieldMappings: Map): String { + val builder = XContentFactory.jsonBuilder() + val dynamicMappings = IndexManagementIndices.transformTargetMappings + val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8)) + val bytesReference = BytesReference.fromByteBuffer(byteBuffer) + + val xcp = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + bytesReference, + XContentType.JSON + ) + loop@while (!xcp.isClosed) { + val token = xcp.currentToken() + val fieldName = xcp.currentName() + + when (token) { + XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue()) + XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text()) + XContentParser.Token.START_OBJECT -> { + if (fieldName != null) { + builder.startObject(fieldName) + } else { + builder.startObject() + } + } + XContentParser.Token.END_OBJECT -> builder.endObject() + XContentParser.Token.START_ARRAY -> builder.startArray(fieldName) + XContentParser.Token.END_ARRAY -> { + builder.endArray() + // Add target index date fields mappings only if the date field mappings are present + if (dateFieldMappings.isNotEmpty()) { + builder.startObject(PROPERTIES) + mapCompositeAggregation(dateFieldMappings, builder) + builder.endObject() + } + } + else -> { + xcp.nextToken() + continue@loop + } + } + xcp.nextToken() + } + return builder.string() + } + + @Suppress("UNCHECKED_CAST") + private fun mapCompositeAggregation( + compositeAggregation: Map, + builder: XContentBuilder, + ) { + val iterator = compositeAggregation.entries.iterator() + while (iterator.hasNext()) { + val it = iterator.next() + if (it.value is Map<*, *>) { + builder.startObject(it.key) + // Start object until reaching the "leaf"; leaf is the last key value pair, where value is not a map + mapCompositeAggregation(it.value as Map, builder) + builder.endObject() + } else { + if (DATE_FIELD_TYPES.contains(it.value)) { + builder.field(it.key, it.value.toString()) + } + } + } + } + + /** + * Creates properties section in target index mappings based on the given date fields + * Parses target index mapping as a string - instead of using XContentBuilder + */ + @Suppress("UNUSED_PARAMETER") + private fun createTargetIndexMappingsAsString( + dateFieldMappings: Map, + dynamicMappings: String, + ): String { + val compositeAgg = mapCompositeAggregationToString(dateFieldMappings) + return dynamicMappings.trimIndent().dropLast(1) + ", \n \"properties\" : \n { \n $compositeAgg \n } \n }" + } + + @Suppress("UNCHECKED_CAST") + private fun mapCompositeAggregationToString( + compositeAggregation: Map, + ): String { + return buildString { + var isFirst = true + val iterator = compositeAggregation.entries.iterator() + while (iterator.hasNext()) { + val it = iterator.next() + if (!isFirst) { + append(",") + } + isFirst = false + if (it.value is Map<*, *>) { + append("\"${it.key}\" : {") + append(mapCompositeAggregationToString(it.value as Map)) + append("\n }") + } else { + append("\n") + append("\"${it.key}\" : \"${it.value}\"") + } + } + } + } + + private fun mapDateAggregation( + aggBuilders: Collection, + sourceIndexMapping: Map, + targetIndexMapping: MutableMap, + parentPath: String?, + ) { + val iterator = aggBuilders.iterator() + while (iterator.hasNext()) { + + val aggBuilder = iterator.next() + val targetIdxFieldName = aggBuilder.name + val fullPath = parentPath?.plus(".")?.plus(targetIdxFieldName) ?: targetIdxFieldName + // In the case of a date field used in aggregation - MIN, MAX or COUNT + if (aggBuilder is ValuesSourceAggregationBuilder<*>) { + val sourceIdxFieldName = aggBuilder.field() + + val sourceFieldType = IndexUtils.getFieldFromMappings(sourceIdxFieldName, sourceIndexMapping) + // Consider only aggregations on date field type + if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") { + val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT) + // In the case if sub-aggregation exist + targetIndexMapping[fullPath] = dateTypeTargetMapping + } + } + if (aggBuilder.subAggregations.isNullOrEmpty()) { + continue + } + // Do the same for all sub-aggregations + mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, fullPath) + } + } + private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 568be5fde..8f5805f0f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -19,11 +19,11 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings -import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException import org.opensearch.indexmanagement.transform.settings.TransformSettings +import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.rest.RestStatus import org.opensearch.transport.RemoteTransportException @@ -36,7 +36,8 @@ class TransformIndexer( private val logger = LogManager.getLogger(javaClass) - @Volatile private var backoffPolicy = BackoffPolicy.constantBackoff( + @Volatile + private var backoffPolicy = BackoffPolicy.constantBackoff( TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS.get(settings), TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT.get(settings) ) @@ -51,21 +52,21 @@ class TransformIndexer( } } - private suspend fun createTargetIndex(index: String) { - if (!clusterService.state().routingTable.hasIndex(index)) { - val request = CreateIndexRequest(index) - .mapping(IndexManagementIndices.transformTargetMappings) + private suspend fun createTargetIndex(targetIndex: String, targetFieldMappings: Map) { + if (!clusterService.state().routingTable.hasIndex(targetIndex)) { + val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings) + val request = CreateIndexRequest(targetIndex).mapping(transformTargetIndexMapping) // TODO: Read in the actual mappings from the source index and use that val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } if (!response.isAcknowledged) { - logger.error("Failed to create the target index $index") + logger.error("Failed to create the target index $targetIndex") throw TransformIndexException("Failed to create the target index") } } } @Suppress("ThrowsCount", "RethrowCaughtException") - suspend fun index(docsToIndex: List>): Long { + suspend fun index(transformTargetIndex: String, docsToIndex: List>, transformContext: TransformContext): Long { var updatableDocsToIndex = docsToIndex var indexTimeInMillis = 0L val nonRetryableFailures = mutableListOf() @@ -73,7 +74,8 @@ class TransformIndexer( if (updatableDocsToIndex.isNotEmpty()) { val targetIndex = updatableDocsToIndex.first().index() logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex") - createTargetIndex(targetIndex) + + createTargetIndex(transformTargetIndex, transformContext.getTargetIndexDateFieldMappings()) backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { val bulkRequest = BulkRequest().add(updatableDocsToIndex) val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index abf94a161..0cf658a29 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -72,6 +72,7 @@ object TransformRunner : this.transformMetadataService = TransformMetadataService(client, xContentRegistry) this.transformIndexer = TransformIndexer(settings, clusterService, client) this.transformValidator = TransformValidator(indexNameExpressionResolver, clusterService, client, settings, jvmService) + this.threadPool = threadPool return this } @@ -108,7 +109,10 @@ object TransformRunner : val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) - val transformContext = TransformContext(TransformLockManager(transform, context)) + val transformContext = TransformContext( + TransformLockManager(transform, context) + ) + // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform val transformLockManager = transformContext.transformLockManager transformLockManager.acquireLockForScheduledJob() @@ -129,6 +133,10 @@ object TransformRunner : currentMetadata = validatedMetadata return } + // If date was used in term query generate target date field mapping and store it in transform context + val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) + transformContext.setTargetDateFieldMappings(targetIndexDateFieldMappings) + if (transform.continuous) { // If we have not populated the list of shards to search, do so now if (bucketsToTransform.shardsToSearch == null) { @@ -271,7 +279,7 @@ object TransformRunner : ) } val indexTimeInMillis = withTransformSecurityContext(transform) { - transformIndexer.index(transformSearchResult.docsToIndex) + transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) } val afterKey = transformSearchResult.afterKey val stats = transformSearchResult.stats @@ -298,7 +306,7 @@ object TransformRunner : transformSearchService.executeCompositeSearch(transform, null, modifiedBuckets, transformContext) } val indexTimeInMillis = withTransformSecurityContext(transform) { - transformIndexer.index(transformSearchResult.docsToIndex) + transformIndexer.index(transform.targetIndex, transformSearchResult.docsToIndex, transformContext) } val stats = transformSearchResult.stats val updatedStats = stats.copy( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 0330aa952..6afa93637 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -206,7 +206,12 @@ class TransformSearchService( // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) - return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets) + return convertResponse( + transform, + searchResponse, + modifiedBuckets = modifiedBuckets, + targetIndexDateFieldMappings = transformContext.getTargetIndexDateFieldMappings() + ) } catch (e: TransformSearchServiceException) { throw e } catch (e: RemoteTransportException) { @@ -333,7 +338,8 @@ class TransformSearchService( transform: Transform, searchResponse: SearchResponse, waterMarkDocuments: Boolean = true, - modifiedBuckets: MutableSet>? = null + modifiedBuckets: MutableSet>? = null, + targetIndexDateFieldMappings: Map, ): TransformSearchResult { val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets @@ -348,8 +354,12 @@ class TransformSearchService( val hashedId = hashToFixedSize(id) val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments) - aggregatedBucket.key.entries.forEach { bucket -> document[bucket.key] = bucket.value } - aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation) } + aggregatedBucket.key.entries.forEach { bucket -> + document[bucket.key] = bucket.value + } + aggregatedBucket.aggregations.forEach { aggregation -> + document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings) + } val indexRequest = IndexRequest(transform.targetIndex) .id(hashedId) @@ -370,11 +380,20 @@ class TransformSearchService( return BucketSearchResult(modifiedBuckets, aggs.afterKey(), searchResponse.took.millis) } - private fun getAggregationValue(aggregation: Aggregation): Any { + private fun getAggregationValue(aggregation: Aggregation, targetIndexDateFieldMappings: Map): Any { return when (aggregation) { is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> { val agg = aggregation as NumericMetricsAggregation.SingleValue - agg.value() + /** + * When date filed is used in transform aggregation (min, max avg), the value of the field is in exponential format + * which is not allowed since the target index mapping for date field is strict_date_optional_time||epoch_millis + * That's why the exponential value is transformed to long: agg.value().toLong() + */ + if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) { + agg.value() + } else { + agg.value().toLong() + } } is Percentiles -> { val percentiles = mutableMapOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt index ccd4e0518..aef1859bd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement.transform.action.preview +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException @@ -21,10 +24,16 @@ import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.commons.ConfigConstants +import org.opensearch.indexmanagement.opensearchapi.IndexManagementSecurityContext +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.opensearchapi.withClosableContext +import org.opensearch.indexmanagement.transform.TargetIndexMappingService import org.opensearch.indexmanagement.transform.TransformSearchService import org.opensearch.indexmanagement.transform.TransformValidator import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -32,6 +41,7 @@ import org.opensearch.transport.TransportService class TransportPreviewTransformAction @Inject constructor( transportService: TransportService, actionFilters: ActionFilters, + val settings: Settings, private val client: Client, private val clusterService: ClusterService, private val indexNameExpressionResolver: IndexNameExpressionResolver @@ -69,7 +79,15 @@ class TransportPreviewTransformAction @Inject constructor( return } val searchRequest = TransformSearchService.getSearchServiceRequest(transform = transform, pageSize = 10) - executeSearch(searchRequest, transform, listener) + val user = SecurityUtils.buildUser(client.threadPool().threadContext) + + CoroutineScope(Dispatchers.IO).launch { + withClosableContext( + IndexManagementSecurityContext("PreviewTransformHandler", settings, client.threadPool().threadContext, user) + ) { + executeSearch(searchRequest, transform, listener) + } + } } override fun onFailure(e: Exception) { @@ -87,31 +105,31 @@ class TransportPreviewTransformAction @Inject constructor( return issues } + suspend fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener) { + val response = try { + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + searchResponse + } catch (e: Exception) { + listener.onFailure(e) + return + } - fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener) { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - try { - val transformSearchResult = TransformSearchService.convertResponse( - transform = transform, searchResponse = response, waterMarkDocuments = false - ) - val formattedResult = transformSearchResult.docsToIndex.map { - it.sourceAsMap() - } - listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK)) - } catch (e: Exception) { - listener.onFailure( - OpenSearchStatusException( - "Failed to parse the transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e) - ) - ) - } - } - - override fun onFailure(e: Exception) = listener.onFailure(e) + try { + val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform) + val transformSearchResult = TransformSearchService.convertResponse( + transform = transform, searchResponse = response, waterMarkDocuments = false, + targetIndexDateFieldMappings = targetIndexDateFieldMappings + ) + val formattedResult = transformSearchResult.docsToIndex.map { + it.sourceAsMap() } - ) + listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK)) + } catch (e: Exception) { + listener.onFailure( + OpenSearchStatusException( + "Failed to parse the transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e) + ) + ) + } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt index 8c674724e..3c7627232 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt @@ -10,8 +10,10 @@ package org.opensearch.indexmanagement.transform.util */ class TransformContext( val transformLockManager: TransformLockManager, - var lastSuccessfulPageSize: Int? = null + var lastSuccessfulPageSize: Int? = null, ) { + private lateinit var targetDateFieldMappings: Map + fun getMaxRequestTimeoutInSeconds(): Long? { // Lock timeout must be greater than LOCK_BUFFER var maxRequestTimeout = transformLockManager.lockExpirationInSeconds()?.minus(LOCK_BUFFER_SECONDS) @@ -22,6 +24,12 @@ class TransformContext( return maxRequestTimeout } + fun getTargetIndexDateFieldMappings() = targetDateFieldMappings + + fun setTargetDateFieldMappings(dateFieldMappings: Map) { + this.targetDateFieldMappings = dateFieldMappings + } + suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { transformLockManager.renewLockForLongSearch(timeSpentOnSearch) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt new file mode 100644 index 000000000..17f7577b9 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.util + +import org.opensearch.common.time.DateFormatter +import java.time.ZoneId + +const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis" +private const val DATE_FORMAT = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ" +private val timezone: ZoneId = ZoneId.of("UTC") +private val dateFormatter = DateFormatter.forPattern(DATE_FORMAT).withZone(timezone) + +fun formatMillis( + dateTimeInMillis: Long, +): String = dateFormatter.formatMillis(dateTimeInMillis) diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 92084c3ad..e82a1937a 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1197,6 +1197,9 @@ }, "timezone": { "type": "keyword" + }, + "format": { + "type": "keyword" } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 4162f9964..cc1691428 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -37,7 +37,7 @@ import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 17 + val configSchemaVersion = 18 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt new file mode 100644 index 000000000..46416b4e1 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingServiceTests.kt @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform + +import org.junit.Assert +import org.opensearch.test.OpenSearchTestCase + +class TargetIndexMappingServiceTests : OpenSearchTestCase() { + + fun `test create target index mapping fields mapped correctly`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}""" + val dateFieldMap = mapOf("tpep_pickup_datetime" to mapOf("type" to "date")) + val result = TargetIndexMappingService.createTargetIndexMapping(dateFieldMap) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } + + fun `test create target index mapping empty map`() { + val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}""" + val result = TargetIndexMappingService.createTargetIndexMapping(emptyMap()) + Assert.assertNotNull(result) + assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 73cca0ef1..9ba16a52c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity import org.opensearch.client.Request import org.opensearch.client.RequestOptions import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.XContentType import org.opensearch.index.query.TermQueryBuilder import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Histogram @@ -284,6 +285,303 @@ class TransformRunnerIT : TransformRestTestCase() { } } + @Suppress("UNCHECKED_CAST") + fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() { + val sourceIdxTestName = "source_idx_test_14" + val targetIdxTestName = "target_idx_test_14" + + val pickupDateTime = "tpep_pickup_datetime" + + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val transform = Transform( + id = "id_14", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + + assertEquals(sourcePickupDate, targetPickupDate) + + val pickupDateTimeTerm = "pickupDateTerm14" + + val request = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "avgFareAmount": { "avg": { "field": "$fareAmount" } } } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + // Verify the values of keys and metrics in all buckets + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform max aggregation on date field verify search request term aggregation on store_and_fwd_flag field`() { + val sourceIdxTestName = "source_idx_test_15" + val targetIdxTestName = "target_idx_test_15" + + val storeAndForward = "store_and_fwd_flag" + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val maxDateAggBuilder = AggregationBuilders.max(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_15", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = storeAndForward, targetField = storeAndForward) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(maxDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(timeout = Instant.ofEpochSecond(30)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + waitFor(Instant.ofEpochSecond(30)) { + val storeAndForwardTerm = "storeAndForwardTerm" + val request = """ + { + "size": 0, + "aggs": { + "$storeAndForwardTerm": { + "terms": { + "field": "$storeAndForward", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"max": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! + + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Avg Fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Max pickup date times are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) + } + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform term on date field and aggregation on date field`() { + val sourceIdxTestName = "source_idx_test_16" + val targetIdxTestName = "target_idx_test_16" + + val pickupDateTime = "tpep_pickup_datetime" + val pickupDateTimeTerm = pickupDateTime.plus("_term") + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val countDateAggBuilder = AggregationBuilders.count(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_16", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTimeTerm) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(countDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor(Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourceProperties = ((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map) + val targetProperties = ((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map) + + val sourcePickupDate = (sourceProperties [pickupDateTime] as Map)["type"] + val targetPickupDateTerm = (targetProperties [pickupDateTimeTerm] as Map)["type"] + + assertEquals("date", targetPickupDateTerm) + assertEquals(sourcePickupDate, targetPickupDateTerm) + + val targetPickupDate = (targetProperties [pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + val sourceRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + val targetRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTimeTerm", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """ + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(sourceRequest, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(targetRequest, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Count pickup dates are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) + } + } + fun `test transform with failure during indexing`() { validateSourceIndex("transform-source-index") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt index 30fa45c1f..d3239edaa 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/resthandler/RestPreviewTransformActionIT.kt @@ -8,15 +8,22 @@ package org.opensearch.indexmanagement.transform.resthandler import org.junit.AfterClass import org.junit.Before import org.opensearch.client.ResponseException +import org.opensearch.common.time.DateFormatter import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.transform.TransformRestTestCase +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.randomTransform +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestStatus import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories +import java.time.Instant +import java.time.ZoneId +import java.time.format.DateTimeParseException +import java.time.temporal.ChronoUnit @Suppress("UNCHECKED_CAST") class RestPreviewTransformActionIT : TransformRestTestCase() { @@ -68,6 +75,46 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) } + fun `test preview with term aggregation on date field`() { + val targetIdxTestName = "target_idx_test_14" + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + val transform = Transform( + id = "id_14", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIndex, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) + ).let { createTransform(it, it.id) } + + val response = client().makeRequest( + "POST", + "$TRANSFORM_BASE_URI/_preview", + emptyMap(), + transform.toHttpEntity() + ) + val expectedKeys = setOf("fare_amount", "tpep_pickup_datetime", "transform._doc_count", "_doc_count") + assertEquals("Preview transform failed", RestStatus.OK, response.restStatus()) + val transformedDocs = response.asMap()["documents"] as List> + assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys) + val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC")) + for (doc in transformedDocs) { + assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter)) + } + } + fun `test mismatched columns`() { val factories = AggregatorFactories.builder() .addAggregator(AggregationBuilders.sum("revenue").field("total_amountdzdfd")) @@ -102,4 +149,13 @@ class RestPreviewTransformActionIT : TransformRestTestCase() { assertEquals("Unexpected failure code", RestStatus.NOT_FOUND, e.response.restStatus()) } } + + private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean { + try { + dateFormatter.formatMillis(date) + } catch (e: DateTimeParseException) { + return false + } + return true + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt index 9a7b050ff..988033123 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt @@ -19,6 +19,7 @@ class TransformContextTests : OpenSearchTestCase() { fun setup() { transformLockManager = Mockito.mock(TransformLockManager::class.java) transformContext = TransformContext(transformLockManager) + transformContext.setTargetDateFieldMappings(emptyMap()) } fun `test getMaxRequestTimeoutInSeconds`() { diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 92084c3ad..e82a1937a 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1197,6 +1197,9 @@ }, "timezone": { "type": "keyword" + }, + "format": { + "type": "keyword" } } },