Skip to content

Commit

Permalink
Added function for creating target index mapping that considers trans…
Browse files Browse the repository at this point in the history
…form mapping json. Target date field mappings are generated after transform validation when running transform. Removed target index date field values formatting. Removed default format for date_histogram because of the rollup. Updated schema version in test.

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz committed May 31, 2023
1 parent 4516b52 commit 96af8b6
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.index.query.AbstractQueryBuilder
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder
Expand All @@ -30,7 +29,7 @@ data class DateHistogram(
val fixedInterval: String? = null,
val calendarInterval: String? = null,
val timezone: ZoneId = ZoneId.of(UTC),
val format: String? = DEFAULT_DATE_FORMAT
val format: String? = null
) : Dimension(Type.DATE_HISTOGRAM, sourceField, targetField) {

init {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
Expand All @@ -21,6 +28,8 @@ import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

/**
* Service designed for creating dynamic target index mapping based on the date field types of the source index.
Expand All @@ -33,11 +42,7 @@ object TargetIndexMappingService {

private const val TYPE = "type"
private const val PROPERTIES = "properties"
private const val METADATA = "_meta"
private const val SCHEMA_VERSION = "schema_version"
private const val DYNAMIC_TEMPLATE = "dynamic_templates"
private const val MATCH_MAPPING_TYPE = "match_mapping_type"
private const val MAPPING = "mapping"
private val DATE_FIELD_TYPES = setOf("date", "date_nanos")

fun initialize(client: Client) {
this.client = client
Expand Down Expand Up @@ -97,35 +102,58 @@ object TargetIndexMappingService {
}

private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) =
sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos")
sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE])

/**
* Loads transform target index mappings from json and adds date properties mapping
*
* @param dateFieldMappings target index date fields mappings
*/
fun createTargetIndexMapping(dateFieldMappings: Map<String, Any>): String {
/** TODO - Check if the mappings from file can be loaded into the XContentBuilder
* val dynamicMappings = IndexManagementIndices.transformTargetMappings
* val mappings = createTargetIndexMappingsAsString(dateFieldMappings, dynamicMappings)
*/
// Build static properties
val builder = XContentFactory.jsonBuilder().startObject()
.startObject(METADATA)
.field(SCHEMA_VERSION, 1)
.endObject()
.startArray(DYNAMIC_TEMPLATE)
.startObject()
.startObject("strings")
.field(MATCH_MAPPING_TYPE, "string")
.startObject(MAPPING)
.field(TYPE, "keyword")
.endObject()
.endObject()
.endObject()
.endArray()
.startObject(PROPERTIES)

// Dynamically build composite aggregation mapping
mapCompositeAggregation(dateFieldMappings, builder)
return builder.endObject()
.endObject()
.string()
val builder = XContentFactory.jsonBuilder()
val dynamicMappings = IndexManagementIndices.transformTargetMappings
val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8))
val bytesReference = BytesReference.fromByteBuffer(byteBuffer)

val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
bytesReference,
XContentType.JSON
)
loop@while (!xcp.isClosed) {
val token = xcp.currentToken()
val fieldName = xcp.currentName()

when (token) {
XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue())
XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text())
XContentParser.Token.START_OBJECT -> {
if (fieldName != null) {
builder.startObject(fieldName)
} else {
builder.startObject()
}
}
XContentParser.Token.END_OBJECT -> builder.endObject()
XContentParser.Token.START_ARRAY -> builder.startArray(fieldName)
XContentParser.Token.END_ARRAY -> {
builder.endArray()
// Add target index date fields mappings only if the date field mappings are present
if (dateFieldMappings.isNotEmpty()) {
builder.startObject(PROPERTIES)
mapCompositeAggregation(dateFieldMappings, builder)
builder.endObject()
}
}
else -> {
xcp.nextToken()
continue@loop
}
}
xcp.nextToken()
}
return builder.string()
}

@Suppress("UNCHECKED_CAST")
Expand All @@ -142,7 +170,9 @@ object TargetIndexMappingService {
mapCompositeAggregation(it.value as Map<String, Any>, builder)
builder.endObject()
} else {
builder.field(it.key, it.value.toString())
if (DATE_FIELD_TYPES.contains(it.value)) {
builder.field(it.key, it.value.toString())
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,8 @@ object TransformRunner :
val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)

// If date was used in term query generate target date field mapping and store it in transform context
val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform)
val transformContext = TransformContext(
TransformLockManager(transform, context),
targetIndexDateFieldMappings
TransformLockManager(transform, context)
)

// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
Expand All @@ -136,6 +133,10 @@ object TransformRunner :
currentMetadata = validatedMetadata
return
}
// If date was used in term query generate target date field mapping and store it in transform context
val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform)
transformContext.setTargetDateFieldMappings(targetIndexDateFieldMappings)

if (transform.continuous) {
// If we have not populated the list of shards to search, do so now
if (bucketsToTransform.shardsToSearch == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.indexmanagement.transform

import org.opensearch.indexmanagement.transform.util.formatMillis
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
Expand Down Expand Up @@ -356,14 +355,7 @@ class TransformSearchService(

val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments)
aggregatedBucket.key.entries.forEach { bucket ->
// Check if the date is used as a term (exists in conversion list) - if it is,
// convert the date (which is in epoch time millis format to ISO 8601)
// document[bucket.key] = bucket.value
if (targetIndexDateFieldMappings.isNullOrEmpty() || !targetIndexDateFieldMappings.containsKey(bucket.key)) {
document[bucket.key] = bucket.value
} else {
document[bucket.key] = formatMillis(bucket.value as Long)
}
document[bucket.key] = bucket.value
}
aggregatedBucket.aggregations.forEach { aggregation ->
document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings)
Expand Down Expand Up @@ -392,10 +384,15 @@ class TransformSearchService(
return when (aggregation) {
is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> {
val agg = aggregation as NumericMetricsAggregation.SingleValue
/**
* When date field is used in transform aggregation (min, max avg), the value of the field is in exponential format
* which is not allowed since the target index mapping for date field is strict_date_optional_time||epoch_millis
* That's why the exponential value is transformed to long: agg.value().toLong()
*/
if (aggregation is InternalValueCount || aggregation is InternalSum || !targetIndexDateFieldMappings.containsKey(agg.name)) {
agg.value()
} else {
formatMillis(agg.value().toLong())
agg.value().toLong()
}
}
is Percentiles -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ package org.opensearch.indexmanagement.transform.util
*/
class TransformContext(
val transformLockManager: TransformLockManager,
val targetDateFieldMappings: Map<String, Any>,
var lastSuccessfulPageSize: Int? = null,
) {
private lateinit var targetDateFieldMappings: Map<String, Any>

fun getMaxRequestTimeoutInSeconds(): Long? {
// Lock timeout must be greater than LOCK_BUFFER
var maxRequestTimeout = transformLockManager.lockExpirationInSeconds()?.minus(LOCK_BUFFER_SECONDS)
Expand All @@ -25,6 +26,10 @@ class TransformContext(

fun getTargetIndexDateFieldMappings() = targetDateFieldMappings

fun setTargetDateFieldMappings(dateFieldMappings: Map<String, Any>) {
this.targetDateFieldMappings = dateFieldMappings
}

suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) {
transformLockManager.renewLockForLongSearch(timeSpentOnSearch)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 17
val configSchemaVersion = 18
val historySchemaVersion = 5

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform

import org.junit.Assert
import org.opensearch.test.OpenSearchTestCase

class TargetIndexMappingServiceTests : OpenSearchTestCase() {

    fun `test create target index mapping fields mapped correctly`() {
        // One source date field should surface as a "properties" entry alongside the static
        // _meta / dynamic_templates skeleton of the target index mapping.
        val dateFieldsByName = mapOf("tpep_pickup_datetime" to mapOf("type" to "date"))
        val expected = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"tpep_pickup_datetime":{"type":"date"}}}"""

        val actual = TargetIndexMappingService.createTargetIndexMapping(dateFieldsByName)

        Assert.assertNotNull(actual)
        Assert.assertEquals("Target index mapping with date fields not correct", expected.trimIndent(), actual.trimIndent())
    }

    fun `test create target index mapping empty map`() {
        // With no date fields the generated mapping must omit the "properties" object entirely.
        val expected = """{"_meta":{"schema_version":1},"dynamic_templates":[{"strings":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}]}"""

        val actual = TargetIndexMappingService.createTargetIndexMapping(emptyMap())

        Assert.assertNotNull(actual)
        Assert.assertEquals("Target index mapping with date fields not correct", expected.trimIndent(), actual.trimIndent())
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class TransformRunnerIT : TransformRestTestCase() {
}

@Suppress("UNCHECKED_CAST")
fun `test transform term aggregation on date field generate target mapping same as source mapping date field`() {
fun `test transform term aggregation on date field generate target mapping same as source mapping for date field`() {
val sourceIdxTestName = "source_idx_test_14"
val targetIdxTestName = "target_idx_test_14"

Expand Down Expand Up @@ -411,7 +411,7 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Target transform index was not created", indexExists(transform.targetIndex))
}

waitFor {
waitFor(timeout = Instant.ofEpochSecond(30)) {
val transformJob = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.resthandler
import org.junit.AfterClass
import org.junit.Before
import org.opensearch.client.ResponseException
import org.opensearch.common.time.DateFormatter
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.TRANSFORM_BASE_URI
import org.opensearch.indexmanagement.common.model.dimension.Terms
Expand All @@ -20,12 +21,9 @@ import org.opensearch.rest.RestStatus
import org.opensearch.search.aggregations.AggregationBuilders
import org.opensearch.search.aggregations.AggregatorFactories
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoUnit
import java.util.Locale

@Suppress("UNCHECKED_CAST")
class RestPreviewTransformActionIT : TransformRestTestCase() {
Expand Down Expand Up @@ -111,9 +109,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() {
assertEquals("Preview transform failed", RestStatus.OK, response.restStatus())
val transformedDocs = response.asMap()["documents"] as List<Map<String, Any>>
assertEquals("Transformed docs have unexpected schema", expectedKeys, transformedDocs.first().keys)
val dateFormatter = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ", Locale.ROOT).withZone(ZoneId.of("UTC"))
val dateFormatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(ZoneId.of("UTC"))
for (doc in transformedDocs) {
assertTrue(isValid(doc["tpep_pickup_datetime"] as? String, dateFormatter))
assertTrue(isValid(doc["tpep_pickup_datetime"].toString().toLong(), dateFormatter))
}
}

Expand Down Expand Up @@ -152,9 +150,9 @@ class RestPreviewTransformActionIT : TransformRestTestCase() {
}
}

private fun isValid(dateStr: String?, dateFormatter: DateTimeFormatter): Boolean {
private fun isValid(date: Long, dateFormatter: DateFormatter): Boolean {
try {
LocalDate.parse(dateStr, dateFormatter)
dateFormatter.formatMillis(date)
} catch (e: DateTimeParseException) {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class TransformContextTests : OpenSearchTestCase() {
@Throws(Exception::class)
fun setup() {
transformLockManager = Mockito.mock(TransformLockManager::class.java)
transformContext = TransformContext(transformLockManager, emptyMap())
transformContext = TransformContext(transformLockManager)
transformContext.setTargetDateFieldMappings(emptyMap())
}

fun `test getMaxRequestTimeoutInSeconds`() {
Expand Down

0 comments on commit 96af8b6

Please sign in to comment.