diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt index 2eec5f041..2820d96a9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TargetIndexMappingService.kt @@ -5,18 +5,21 @@ package org.opensearch.indexmanagement.transform +import org.apache.logging.log4j.LogManager import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse import org.opensearch.client.Client import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory -import org.opensearch.indexmanagement.IndexManagementIndices +import org.opensearch.index.IndexNotFoundException import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.search.aggregations.AggregationBuilder +import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder /** * Service designed for creating dynamic target index mapping based on the date field types of the source index. @@ -24,6 +27,7 @@ import org.opensearch.indexmanagement.util.IndexUtils * (ie. 
if the term grouping is applied on a date field of source index, target index field will have date type also) */ class TargetIndexMappingService(private val client: Client) { + private val logger = LogManager.getLogger(javaClass) companion object { private const val TYPE = "type" private const val PROPERTIES = "properties" @@ -32,22 +36,34 @@ class TargetIndexMappingService(private val client: Client) { private const val DYNAMIC_TEMPLATE = "dynamic_templates" private const val MATCH_MAPPING_TYPE = "match_mapping_type" private const val MAPPING = "mapping" + private const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis" } + suspend fun getTargetMappingsForDates(transform: Transform): Map { + val sourceIndex = transform.sourceIndex + try { + val result: GetMappingsResponse = client.admin().indices().suspendUntil { + getMappings(GetMappingsRequest().indices(sourceIndex), it) + } ?: error("GetMappingResponse for [$sourceIndex] was null") - suspend fun buildTargetIndexMapping(transform: Transform): Pair?> { - val request = GetMappingsRequest().indices(transform.sourceIndex) + val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap - val result: GetMappingsResponse = client.admin().indices().suspendUntil { getMappings(request, it) } - ?: error("GetMappingResponse for [$transform.sourceIndex] was null") - - if (result.mappings[transform.sourceIndex] == null) { - return Pair(IndexManagementIndices.transformTargetMappings, null) + val dateFieldMappings = mutableMapOf() + if (!sourceIndexMapping.isNullOrEmpty()) { + mapDateTermAggregation(transform, sourceIndexMapping, dateFieldMappings) + mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, dateFieldMappings, null) + } + return dateFieldMappings + } catch (ex: IndexNotFoundException) { + logger.error("Index $sourceIndex doesn't exist") + return emptyMap() } + } - val sourceIndexMapping = result.mappings[transform.sourceIndex].sourceAsMap - - val
dateCompositeAggregations = mutableMapOf() - val mappedDateFields = mutableSetOf() + private fun mapDateTermAggregation( + transform: Transform, + sourceIndexMapping: MutableMap, + dateFieldMappings: MutableMap, + ) { transform.groups.forEach { dimension -> if (!isFieldInMappings(dimension.sourceField, sourceIndexMapping)) { throw TransformIndexException("Missing field ${dimension.sourceField} in source index") @@ -56,18 +72,13 @@ class TargetIndexMappingService(private val client: Client) { // Consider only date fields as relevant for building the target index mapping if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && sourceFieldType[TYPE] == "date") { // Taking the source field settings (type, format etc.) - // 1213213213213 - val dateTypeTargetMapping = mapOf("type" to "date", "format" to "strict_date_optional_time||epoch_millis") - // sourceIndex.dateField.format[0] --> - dateCompositeAggregations[dimension.targetField] = dateTypeTargetMapping - mappedDateFields.add(dimension.targetField) + val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT) + dateFieldMappings[dimension.targetField] = dateTypeTargetMapping } } - - return Pair(mapTargetIndex(dateCompositeAggregations), mappedDateFields) } - private fun mapTargetIndex(dateCompositeAggregations: MutableMap): String { + fun createTargetIndexMapping(dateCompositeAggregations: Map): String { // Build static properties val builder = XContentFactory.jsonBuilder().startObject() .startObject(METADATA) @@ -113,5 +124,37 @@ class TargetIndexMappingService(private val client: Client) { } } + @Suppress("UNCHECKED_CAST") + private fun mapDateAggregation( + aggBuilders: Collection, + sourceIndexMapping: Map, + targetIndexMapping: MutableMap, + parentPath: String? 
+ ) { + val iterator = aggBuilders.iterator() + while (iterator.hasNext()) { + + val aggBuilder = iterator.next() + val targetIdxFieldName = aggBuilder.name + // In the case of a date field used in aggregation - MIN, MAX or COUNT + if (aggBuilder is ValuesSourceAggregationBuilder<*>) { + val sourceIdxFieldName = aggBuilder.field() + + val sourceFieldType = IndexUtils.getFieldFromMappings(sourceIdxFieldName, sourceIndexMapping) + // Consider only aggregations on date field type + if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") { + val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT) + // In case this is a sub-aggregation, prefix the field with its parent aggregation path + val fullPath = parentPath?.plus(".")?.plus(targetIdxFieldName) ?: targetIdxFieldName + targetIndexMapping[fullPath] = dateTypeTargetMapping + } + } + if (aggBuilder.subAggregations.isNullOrEmpty()) { + continue + } + // Do the same for all sub-aggregations + mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, targetIdxFieldName) + } + } private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 391e86336..60881d4d1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -5,7 +5,6 @@ package org.opensearch.indexmanagement.transform -import formatMillis import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException @@ -55,13 +54,10 @@ class TransformIndexer( } } - private suspend fun createTargetIndex(transform: Transform): Set?
{ + private suspend fun createTargetIndex(transform: Transform, targetFieldMappings: Map) { val index = transform.targetIndex - var transformDateMappedFields: Set? = null - if (!clusterService.state().routingTable.hasIndex(index)) { - val transformMappings = targetIndexMappingService.buildTargetIndexMapping(transform) - val transformTargetIndexMapping = transformMappings.first + val transformTargetIndexMapping = targetIndexMappingService.createTargetIndexMapping(targetFieldMappings) val request = CreateIndexRequest(index).mapping(transformTargetIndexMapping) // TODO: Read in the actual mappings from the source index and use that val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) } @@ -69,9 +65,7 @@ class TransformIndexer( logger.error("Failed to create the target index $index") throw TransformIndexException("Failed to create the target index") } - transformDateMappedFields = transformMappings.second } - return transformDateMappedFields } @Suppress("ThrowsCount", "RethrowCaughtException") @@ -84,11 +78,7 @@ class TransformIndexer( val targetIndex = updatableDocsToIndex.first().index() logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex") - val dateMappedFields = createTargetIndex(transform) - dateMappedFields?.let { transformContext.updateMappedDateFields(it) } - - updateTargetDateFieldValues(updatableDocsToIndex, transformContext.getMappedTargetDateFields()) - + createTargetIndex(transform, transformContext.getMappedTargetDateFields()) backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { val bulkRequest = BulkRequest().add(updatableDocsToIndex) val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) } @@ -125,21 +115,4 @@ class TransformIndexer( throw TransformIndexException("Failed to index the documents", e) } } - - private fun updateTargetDateFieldValues(updatableDocsToIndex: List>, mappedTargetDateFields: Set?) 
{ - if (mappedTargetDateFields.isNullOrEmpty()) { - return - } - - for (docToBeWritten in updatableDocsToIndex) { - val targetValueMap = (docToBeWritten as IndexRequest).sourceAsMap() - for (mappedDateField in mappedTargetDateFields) { - if (targetValueMap[mappedDateField] == null) { - throw TransformIndexException("Missing field $mappedDateField in target index") - } - targetValueMap[mappedDateField] = formatMillis(targetValueMap, mappedDateField) - docToBeWritten.source(targetValueMap) - } - } - } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt index 9aa02fc69..49492f452 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformRunner.kt @@ -53,6 +53,7 @@ object TransformRunner : private lateinit var transformSearchService: TransformSearchService private lateinit var transformIndexer: TransformIndexer private lateinit var transformValidator: TransformValidator + private lateinit var targetIndexMappingService: TargetIndexMappingService private lateinit var threadPool: ThreadPool fun initialize( @@ -66,12 +67,14 @@ object TransformRunner : ): TransformRunner { this.clusterService = clusterService this.client = client + this.targetIndexMappingService = TargetIndexMappingService(client) this.xContentRegistry = xContentRegistry this.settings = settings this.transformSearchService = TransformSearchService(settings, clusterService, client) this.transformMetadataService = TransformMetadataService(client, xContentRegistry) - this.transformIndexer = TransformIndexer(settings, clusterService, client, TargetIndexMappingService(client)) + this.transformIndexer = TransformIndexer(settings, clusterService, client, targetIndexMappingService) this.transformValidator = TransformValidator(indexNameExpressionResolver, clusterService, client, settings, jvmService) 
+ this.threadPool = threadPool return this } @@ -108,7 +111,13 @@ object TransformRunner : val transformProcessedBucketLog = TransformProcessedBucketLog() var bucketsToTransform = BucketsToTransform(HashSet(), metadata) - val transformContext = TransformContext(TransformLockManager(transform, context)) + // If date was used in term query generate target date field mapping and store it in transform context + val targetDateFieldMapping = targetIndexMappingService.getTargetMappingsForDates(transform) + val transformContext = TransformContext( + TransformLockManager(transform, context), + targetDateFieldMapping + ) + // Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform val transformLockManager = transformContext.transformLockManager transformLockManager.acquireLockForScheduledJob() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index ae659623f..b13c8849a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -5,6 +5,7 @@ package org.opensearch.indexmanagement.transform +import formatMillis import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException @@ -31,7 +32,6 @@ import org.opensearch.index.query.RangeQueryBuilder import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.index.shard.ShardId import org.opensearch.indexmanagement.common.model.dimension.Dimension -import org.opensearch.indexmanagement.opensearchapi.convertToMap import org.opensearch.indexmanagement.opensearchapi.retry import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.transform.exceptions.TransformSearchServiceException @@ -207,7 +207,12 @@ 
class TransformSearchService( // If the request was successful, update page size transformContext.lastSuccessfulPageSize = pageSize transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart) - return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets) + return convertResponse( + transform, + searchResponse, + modifiedBuckets = modifiedBuckets, + mappedTargetDateFields = transformContext.getMappedTargetDateFields() + ) } catch (e: TransformSearchServiceException) { throw e } catch (e: RemoteTransportException) { @@ -334,7 +339,8 @@ class TransformSearchService( transform: Transform, searchResponse: SearchResponse, waterMarkDocuments: Boolean = true, - modifiedBuckets: MutableSet>? = null + modifiedBuckets: MutableSet>? = null, + mappedTargetDateFields: Map, ): TransformSearchResult { val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets @@ -349,8 +355,18 @@ class TransformSearchService( val hashedId = hashToFixedSize(id) val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments) - aggregatedBucket.key.entries.forEach { bucket -> document[bucket.key] = bucket.value } - aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation) } + aggregatedBucket.key.entries.forEach { bucket -> + // Check if the date is used as a term (exists in conversion list) - if it is, + // convert the date (which is in epoch time millis format to ISO 8601) + if (mappedTargetDateFields.isNullOrEmpty() || !mappedTargetDateFields.containsKey(bucket.key)) { + document[bucket.key] = bucket.value + } else { + document[bucket.key] = formatMillis(bucket.value as Long) + } + } + aggregatedBucket.aggregations.forEach { aggregation -> + document[aggregation.name] = getAggregationValue(aggregation, mappedTargetDateFields) + } val 
indexRequest = IndexRequest(transform.targetIndex) .id(hashedId) @@ -371,16 +387,14 @@ class TransformSearchService( return BucketSearchResult(modifiedBuckets, aggs.afterKey(), searchResponse.took.millis) } - private fun getAggregationValue(aggregation: Aggregation): Any { + private fun getAggregationValue(aggregation: Aggregation, mappedTargetDateFields: Map): Any { return when (aggregation) { is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> { val agg = aggregation as NumericMetricsAggregation.SingleValue - // If value and value_as_string differs (ie. for date fields), transform should be aware of - if (agg.value().toString() == agg.valueAsString) { + if (mappedTargetDateFields.isEmpty() || !mappedTargetDateFields.containsKey(agg.name)) { agg.value() } else { - // example of agg - {"min_timestamp":{"value":1.662856742912E12,"value_as_string":"2022-09-11T00:39:02.912Z"}} - agg.convertToMap()[agg.name] ?: agg.value() + formatMillis(agg.value().toLong()) } } is Percentiles -> { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt index ccd4e0518..e99c53587 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/action/preview/TransportPreviewTransformAction.kt @@ -40,7 +40,6 @@ class TransportPreviewTransformAction @Inject constructor( ) { private val log = LogManager.getLogger(javaClass) - @Suppress("SpreadOperator") override fun doExecute(task: Task, request: PreviewTransformRequest, listener: ActionListener) { log.debug( @@ -87,7 +86,6 @@ class TransportPreviewTransformAction @Inject constructor( return issues } - fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener) { client.search( 
searchRequest, @@ -95,7 +93,8 @@ class TransportPreviewTransformAction @Inject constructor( override fun onResponse(response: SearchResponse) { try { val transformSearchResult = TransformSearchService.convertResponse( - transform = transform, searchResponse = response, waterMarkDocuments = false + transform = transform, searchResponse = response, waterMarkDocuments = false, + mappedTargetDateFields = emptyMap() ) val formattedResult = transformSearchResult.docsToIndex.map { it.sourceAsMap() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt index adcea9b9d..29a00f9a9 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformContext.kt @@ -10,8 +10,8 @@ package org.opensearch.indexmanagement.transform.util */ class TransformContext( val transformLockManager: TransformLockManager, + val targetDateFieldMapping: Map, var lastSuccessfulPageSize: Int? = null, - private var mappedTargetDateFields: Set? = null ) { fun getMaxRequestTimeoutInSeconds(): Long? 
{ // Lock timeout must be greater than LOCK_BUFFER @@ -23,11 +23,7 @@ class TransformContext( return maxRequestTimeout } - fun updateMappedDateFields(mappedDateFields: Set) { - mappedTargetDateFields = mappedDateFields - } - - fun getMappedTargetDateFields() = mappedTargetDateFields + fun getMappedTargetDateFields() = targetDateFieldMapping suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) { transformLockManager.renewLockForLongSearch(timeSpentOnSearch) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt index 74861642d..c9880ea59 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/util/TransformUtils.kt @@ -6,11 +6,10 @@ import java.time.ZoneId * SPDX-License-Identifier: Apache-2.0 */ -private const val dateFormat = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ" +private const val DATE_FORMAT = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ" private val timezone: ZoneId = ZoneId.of("UTC") -private val dateFormatter = DateFormatter.forPattern(dateFormat).withZone(timezone) +private val dateFormatter = DateFormatter.forPattern(DATE_FORMAT).withZone(timezone) fun formatMillis( - targetValueMap: MutableMap, - mappedDateField: String, -) = dateFormatter.formatMillis(targetValueMap[mappedDateField] as Long) + dateTimeInMillis: Long, +): String = dateFormatter.formatMillis(dateTimeInMillis) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 43f26ccc7..bb2d3c82e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -286,7 +286,7 @@ class TransformRunnerIT : TransformRestTestCase() { } @Suppress("UNCHECKED_CAST") - fun `test 
transform target mapping date field same as source mapping date field`() { + fun `test transform term aggregation on date field generate target mapping same as source mapping date field`() { val sourceIdxTestName = "source_idx_test" val targetIdxTestName = "target_idx_test" @@ -297,7 +297,7 @@ class TransformRunnerIT : TransformRestTestCase() { validateSourceIndex(sourceIdxTestName) val transform = Transform( - id = "id_13", + id = "id_14", schemaVersion = 1L, enabled = true, enabledAt = Instant.now(), @@ -334,12 +334,12 @@ class TransformRunnerIT : TransformRestTestCase() { val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> - val sourcePickupType = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] - val targetPickupType = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] - assertEquals(sourcePickupType, targetPickupType) + assertEquals(sourcePickupDate, targetPickupDate) - val pickupDateTimeTerm = "pickupDateTerm" + val pickupDateTimeTerm = "pickupDateTerm14" val request = """ { @@ -369,8 +369,218 @@ class TransformRunnerIT : TransformRestTestCase() { // Verify the values of keys and metrics in all buckets for (i in rawAggBuckets.indices) { - assertEquals(rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) - assertEquals(rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + assertEquals("Term pickup date bucket keys are not 
the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + } + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform aggregation on date field`() { + val sourceIdxTestName = "source_idx_test" + val targetIdxTestName = "target_idx_test" + + val storeAndForward = "store_and_fwd_flag" + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val maxDateAggBuilder = AggregationBuilders.max(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_15", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = storeAndForward, targetField = storeAndForward) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(maxDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + waitFor { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) 
+ assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map)[pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + val storeAndForwardTerm = "storeAndForwardTerm" + + val request = """ + { + "size": 0, + "aggs": { + "$storeAndForwardTerm": { + "terms": { + "field": "$storeAndForward", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"max": {"field": "$pickupDateTime"}} + } + } + } + } + """.trimIndent() + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[storeAndForwardTerm]!!["buckets"]!! 
+ + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Avg Fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Max pickup date times are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) + } + } + } + + @Suppress("UNCHECKED_CAST") + fun `test transform term on date field and aggregation on date field`() { + val sourceIdxTestName = "source_idx_test" + val targetIdxTestName = "target_idx_test" + + val pickupDateTime = "tpep_pickup_datetime" + val pickupDateTimeTerm = pickupDateTime.plus("_term") + val fareAmount = "fare_amount" + + validateSourceIndex(sourceIdxTestName) + + val avgFareAmountAgg = AggregationBuilders.avg(fareAmount).field(fareAmount) + val countDateAggBuilder = AggregationBuilders.count(pickupDateTime).field(pickupDateTime) + + val transform = Transform( + id = "id_16", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform doc values must be the same", + metadataId = null, + sourceIndex = sourceIdxTestName, + targetIndex = targetIdxTestName, + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = pickupDateTime, targetField = pickupDateTimeTerm) + ), + aggregations = AggregatorFactories.builder().addAggregator(avgFareAmountAgg).addAggregator(countDateAggBuilder) + ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) + + waitFor { + assertTrue("Target transform index was not created", indexExists(transform.targetIndex)) + } + + Thread.sleep(30000) + + waitFor { + val transformJob = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = 
getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> + val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + val sourceProperties = ((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map)["properties"] as Map) + val targetProperties = ((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map)["properties"] as Map) + + val sourcePickupDate = (sourceProperties [pickupDateTime] as Map)["type"] + val targetPickupDateTerm = (targetProperties [pickupDateTimeTerm] as Map)["type"] + + assertEquals("date", targetPickupDateTerm) + assertEquals(sourcePickupDate, targetPickupDateTerm) + + val targetPickupDate = (targetProperties [pickupDateTime] as Map)["type"] + + assertEquals("date", targetPickupDate) + assertEquals(sourcePickupDate, targetPickupDate) + + val sourceRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """.trimIndent() + + val targetRequest = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTimeTerm", "order": { "_key": "asc" } + }, + "aggs": { + "$fareAmount": { "avg": { "field": "$fareAmount" } }, + "$pickupDateTime": {"value_count": {"field": "$pickupDateTime"}} + } + } + } + } + """.trimIndent() + + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", 
emptyMap(), StringEntity(sourceRequest, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(targetRequest, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["fareAmount"], transformAggBuckets[i]["fareAmount"]) + assertEquals("Count pickup dates are not the same", rawAggBuckets[i][pickupDateTime]!!["value"], transformAggBuckets[i][pickupDateTime]!!["value"]) } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt index 9a7b050ff..ced54b281 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/util/TransformContextTests.kt @@ -18,7 +18,7 @@ class TransformContextTests : OpenSearchTestCase() { @Throws(Exception::class) fun setup() { transformLockManager = Mockito.mock(TransformLockManager::class.java) - transformContext = TransformContext(transformLockManager) + transformContext = TransformContext(transformLockManager, emptyMap()) } fun `test getMaxRequestTimeoutInSeconds`() {