Added function for creating target index mapping that considers transform mapping json. Added unit tests. Removed target index date field values formatting.

Signed-off-by: Stevan Buzejic <[email protected]>
stevanbz committed May 31, 2023
1 parent 4516b52 commit c39b680
Showing 5 changed files with 104 additions and 52 deletions.
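
For orientation, a usage sketch of the new mapping helper (hedged: it mirrors the TargetIndexMappingServiceTests case added below; the package in the import is inferred from that test file, which references the service without an import, and the printed JSON is the test's expected result, not additional committed code):

import org.opensearch.indexmanagement.transform.TargetIndexMappingService

fun main() {
    // Usage sketch only - mirrors the unit test added in this commit.
    // Date fields detected on the source index are passed in; the returned JSON is the
    // bundled transform target mapping with a "properties" section appended for those fields.
    val dateFields = mapOf("tpep_pickup_datetime" to mapOf("type" to "date"))
    val mappingJson = TargetIndexMappingService.createTargetIndexMapping(dateFields)
    println(mappingJson)
    // {"_meta":{"schema_version":1},
    //  "dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],
    //  "properties":{"tpep_pickup_datetime":{"type":"date"}}}
}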
TargetIndexMappingService.kt
@@ -9,9 +9,16 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
@@ -21,6 +28,8 @@ import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

/**
* Service designed for creating dynamic target index mapping based on the date field types of the source index.
@@ -33,11 +42,7 @@ object TargetIndexMappingService {

private const val TYPE = "type"
private const val PROPERTIES = "properties"
private const val METADATA = "_meta"
private const val SCHEMA_VERSION = "schema_version"
private const val DYNAMIC_TEMPLATE = "dynamic_templates"
private const val MATCH_MAPPING_TYPE = "match_mapping_type"
private const val MAPPING = "mapping"
private val DATE_FIELD_TYPES = setOf("date", "date_nanos")

fun initialize(client: Client) {
this.client = client
@@ -97,35 +102,58 @@
}

private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) =
sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos")
sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE])

/**
* Loads transform target index mappings from json and adds date properties mapping
*
* @param dateFieldMappings target index date fields mappings
*/
fun createTargetIndexMapping(dateFieldMappings: Map<String, Any>): String {
/** TODO - Check if the mappings from file can be loaded into the XContentBuilder
* val dynamicMappings = IndexManagementIndices.transformTargetMappings
* val mappings = createTargetIndexMappingsAsString(dateFieldMappings, dynamicMappings)
*/
// Build static properties
val builder = XContentFactory.jsonBuilder().startObject()
.startObject(METADATA)
.field(SCHEMA_VERSION, 1)
.endObject()
.startArray(DYNAMIC_TEMPLATE)
.startObject()
.startObject("strings")
.field(MATCH_MAPPING_TYPE, "string")
.startObject(MAPPING)
.field(TYPE, "keyword")
.endObject()
.endObject()
.endObject()
.endArray()
.startObject(PROPERTIES)

// Dynamically build composite aggregation mapping
mapCompositeAggregation(dateFieldMappings, builder)
return builder.endObject()
.endObject()
.string()
val builder = XContentFactory.jsonBuilder()
val dynamicMappings = IndexManagementIndices.transformTargetMappings
val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8))
val bytesReference = BytesReference.fromByteBuffer(byteBuffer)

val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
bytesReference,
XContentType.JSON
)
loop@while (!xcp.isClosed) {
val token = xcp.currentToken()
val fieldName = xcp.currentName()

when (token) {
XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue())
XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text())
XContentParser.Token.START_OBJECT -> {
if (fieldName != null) {
builder.startObject(fieldName)
} else {
builder.startObject()
}
}
XContentParser.Token.END_OBJECT -> builder.endObject()
XContentParser.Token.START_ARRAY -> builder.startArray(fieldName)
XContentParser.Token.END_ARRAY -> {
builder.endArray()
// Add target index date field mappings only if the date field mappings are present
if (dateFieldMappings.isNotEmpty()) {
builder.startObject(PROPERTIES)
mapCompositeAggregation(dateFieldMappings, builder)
builder.endObject()
}
}
else -> {
xcp.nextToken()
continue@loop
}
}
xcp.nextToken()
}
return builder.string()
}

@Suppress("UNCHECKED_CAST")
@@ -142,7 +170,9 @@
mapCompositeAggregation(it.value as Map<String, Any>, builder)
builder.endObject()
} else {
builder.field(it.key, it.value.toString())
if (DATE_FIELD_TYPES.contains(it.value)) {
builder.field(it.key, it.value.toString())
}
}
}
}
TransformSearchService.kt
@@ -5,7 +5,6 @@

package org.opensearch.indexmanagement.transform

import org.opensearch.indexmanagement.transform.util.formatMillis
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
@@ -356,14 +355,7 @@ class TransformSearchService(

val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments)
aggregatedBucket.key.entries.forEach { bucket ->
// Check if the date is used as a term (exists in conversion list) - if it is,
// convert the date (which is in epoch time millis format to ISO 8601)
// document[bucket.key] = bucket.value
if (targetIndexDateFieldMappings.isNullOrEmpty() || !targetIndexDateFieldMappings.containsKey(bucket.key)) {
document[bucket.key] = bucket.value
} else {
document[bucket.key] = formatMillis(bucket.value as Long)
}
document[bucket.key] = bucket.value
}
aggregatedBucket.aggregations.forEach { aggregation ->
document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings)
@@ -392,10 +384,15 @@
return when (aggregation) {
is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> {
val agg = aggregation as NumericMetricsAggregation.SingleValue
/**
* When a date field is used in a transform aggregation (min, max, avg), the value of the field is in exponential format,
* which is not allowed since the target index mapping for a date field is strict_date_optional_time||epoch_millis.
* That's why the exponential value is transformed to long: agg.value().toLong()
*/
if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) {
agg.value()
} else {
formatMillis(agg.value().toLong())
agg.value().toLong()
}
}
is Percentiles -> {
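
The kdoc added above is the reason formatMillis is dropped here: min/max/avg over a date field come back as a Double whose string form uses scientific notation, which the strict_date_optional_time||epoch_millis mapping rejects. A small illustration (the sample value is invented, not taken from the commit):

fun main() {
    // Illustration only - the sample value is invented.
    val aggValue = 1.6838496E12            // what SingleValue.value() can return for a date max
    println(aggValue.toString())           // "1.6838496E12"  -> not a valid epoch_millis literal
    println(aggValue.toLong().toString())  // "1683849600000" -> indexable as epoch_millis
}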
TargetIndexMappingServiceTests.kt (new file)
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform

import org.junit.Assert
import org.opensearch.test.OpenSearchTestCase

class TargetIndexMappingServiceTests : OpenSearchTestCase() {

fun `test create target index mapping fields mapped correctly`() {
val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}"""
val dateFieldMap = mapOf("tpep_pickup_datetime" to mapOf("type" to "date"))
val result = TargetIndexMappingService.createTargetIndexMapping(dateFieldMap)
Assert.assertNotNull(result)
assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent())
}

fun `test create target index mapping empty map`() {
val expectedResult = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}"""
val result = TargetIndexMappingService.createTargetIndexMapping(emptyMap())
Assert.assertNotNull(result)
assertEquals("Target index mapping with date fields not correct", expectedResult.trimIndent(), result.trimIndent())
}
}
TransformRunnerIT.kt
@@ -286,7 +286,7 @@ class TransformRunnerIT : TransformRestTestCase() {
}

@Suppress("UNCHECKED_CAST")
fun `test transform term aggregation on date field generate target mapping same as source mapping date field`() {
fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() {
val sourceIdxTestName = "source_idx_test_14"
val targetIdxTestName = "target_idx_test_14"

@@ -411,7 +411,7 @@
assertTrue("Target transform index was not created", indexExists(transform.targetIndex))
}

waitFor {
waitFor(timeout = Instant.ofEpochSecond(30)) {
val transformJob = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
RestPreviewTransformActionIT.kt
@@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.resthandler
import org.junit.AfterClass
import org.junit.Before
import org.opensearch.client.ResponseException
import org.opensearch.common.time.DateFormatter
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI
import org.opensearch.indexmanagement.common.model.dimension.Terms
@@ -20,12 +21,9 @@ import org.opensearch.rest.RestStatus
import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.aggregations.AggregatorFactories
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoUnit
import java.util.Locale

@Suppress("UNCHECKED_CAST")
class RestPreviewTransformActionIT : TransformRestTestCase() {
@@ -111,9 +109,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() {
assertEquals("Preview transform failed", RestStatus.OK, response.restStatus())
val transformedDocs = response.asMap()["documents"] as List<Map<String, Any>>
assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys)
val dateFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT).withZone(ZoneId.of("UTC"))
val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC"))
for (doc in transformedDocs) {
assertTrue(isValid(doc["tpep_pickup_datetime"] as? String, dateFormatter))
assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter))
}
}

@@ -152,9 +150,9 @@
}
}

private fun isValid(dateStr: String?, dateFormatter: DateTimeFormatter): Boolean {
private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean {
try {
LocalDate.parse(dateStr, dateFormatter)
dateFormatter.formatMillis(date)
} catch (e: DateTimeParseException) {
return false
}
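
With the preview now returning the pickup time as epoch millis, the test formats the Long through OpenSearch's DateFormatter (the class imported in the diff above) instead of parsing an ISO-8601 string. A minimal sketch of that call (the millis value and printed output are illustrative):

import org.opensearch.common.time.DateFormatter
import java.time.ZoneId

fun main() {
    // Sketch only - the sample millis value is illustrative.
    val formatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC"))
    println(formatter.formatMillis(1683849600000L))   // e.g. "2023-05-12T00:00:00.000+0000"
}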
