Skip to content

Commit

Permalink
Bugfix/202 transform date add date conversion (#622)
Browse files Browse the repository at this point in the history
* 202: Added format property when specifying the date histogram

Signed-off-by: Stevan Buzejic <[email protected]>

* 202: Added component responsible for building the target index mapping once the transform is being triggered.

Signed-off-by: Stevan Buzejic <[email protected]>

* 202: date_histogram considered in the case of the creating the target index for the date fields when transform is executed

Signed-off-by: Stevan Buzejic <[email protected]>

* 202: Enabled target index date field mappings if those fields are used in aggregations or as a term aggregation for defining the buckets

Signed-off-by: Stevan Buzejic <[email protected]>

* Updated code according to comments. Added targetIndexMapping when transform preview action is triggered

Signed-off-by: Stevan Buzejic <[email protected]>

* Updated schema versions

Signed-off-by: Stevan Buzejic <[email protected]>

* Addressed the comments

Signed-off-by: Stevan Buzejic <[email protected]>

* Refactored transform tests related with aggregation based on a date field.
Updated transform preview action to consider target index mapping when using a date field. Kept formatting of the date field in target index.

Signed-off-by: Stevan Buzejic <[email protected]>

* detekt fix

Signed-off-by: Stevan Buzejic <[email protected]>

* Added zone in IT

Signed-off-by: Stevan Buzejic <[email protected]>

* Added function for creating target index mapping that considers transform mapping json. Target date field mappings are generated after transform validation when running transform. Removed target index date field values formatting. emoved default format for date_histogram because of the rollup. Updated schema version in test.

Signed-off-by: Stevan Buzejic <[email protected]>

---------

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored May 31, 2023
1 parent 6eaaf53 commit 42833b1
Show file tree
Hide file tree
Showing 16 changed files with 772 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.transform.TargetIndexMappingService
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction
import org.opensearch.indexmanagement.transform.action.delete.TransportDeleteTransformsAction
Expand Down Expand Up @@ -467,6 +468,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
indexNameExpressionResolver,
)

TargetIndexMappingService.initialize(client)

return listOf(
managedIndexRunner,
rollupRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ data class DateHistogram(
override val targetField: String = sourceField,
val fixedInterval: String? = null,
val calendarInterval: String? = null,
val timezone: ZoneId = ZoneId.of(UTC)
val timezone: ZoneId = ZoneId.of(UTC),
val format: String? = null
) : Dimension(Type.DATE_HISTOGRAM, sourceField, targetField) {

init {
Expand All @@ -54,6 +55,7 @@ data class DateHistogram(
return builder.field(DIMENSION_SOURCE_FIELD_FIELD, sourceField)
.field(DIMENSION_TARGET_FIELD_FIELD, targetField)
.field(DATE_HISTOGRAM_TIMEZONE_FIELD, timezone.id)
.field(FORMAT, format)
.endObject()
.endObject()
}
Expand Down Expand Up @@ -82,6 +84,9 @@ data class DateHistogram(
fixedInterval?.let {
this.fixedInterval(DateHistogramInterval(it))
}
format?.let {
this.format(it)
}
}
}

Expand Down Expand Up @@ -128,6 +133,7 @@ data class DateHistogram(
const val FIXED_INTERVAL_FIELD = "fixed_interval"
const val CALENDAR_INTERVAL_FIELD = "calendar_interval"
const val DATE_HISTOGRAM_TIMEZONE_FIELD = "timezone"
const val FORMAT = "format"

@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
Expand All @@ -138,6 +144,7 @@ data class DateHistogram(
var fixedInterval: String? = null
var calendarInterval: String? = null
var timezone = ZoneId.of(UTC)
var format: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -150,6 +157,7 @@ data class DateHistogram(
DATE_HISTOGRAM_TIMEZONE_FIELD -> timezone = ZoneId.of(xcp.text())
DIMENSION_SOURCE_FIELD_FIELD -> sourceField = xcp.text()
DIMENSION_TARGET_FIELD_FIELD -> targetField = xcp.text()
FORMAT -> format = xcp.textOrNull()
else -> throw IllegalArgumentException("Invalid field [$fieldName] found in date histogram")
}
}
Expand All @@ -159,7 +167,8 @@ data class DateHistogram(
targetField = requireNotNull(targetField) { "Target field must not be null" },
fixedInterval = fixedInterval,
calendarInterval = calendarInterval,
timezone = timezone
timezone = timezone,
format = format
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.transform.util.DEFAULT_DATE_FORMAT
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

/**
* Service designed for creating dynamic target index mapping based on the date field types of the source index.
* Creates target index date properties based on the date properties of the source index
* (ie. if the term grouping is applied on a date field of source index, target index field will have date type also)
*/
object TargetIndexMappingService {
private val logger = LogManager.getLogger(javaClass)
private lateinit var client: Client

private const val TYPE = "type"
private const val PROPERTIES = "properties"
private val DATE_FIELD_TYPES = setOf("date", "date_nanos")

fun initialize(client: Client) {
this.client = client
}

/**
*
* Check if the source index contains date fields and returns target index mapping for date fields by using default date format
* Example:
* input map: [tpep_pickup_datetime, [type: date]]
* target index mapping: "tpep_pickup_datetime": {
* "type": "date",
* "format": "strict_date_optional_time||epoch_millis"
* }
* @return map of the date properties
*
*/
suspend fun getTargetMappingsForDates(transform: Transform): Map<String, Any> {
val sourceIndex = transform.sourceIndex
try {
val result: GetMappingsResponse = client.admin().indices().suspendUntil {
getMappings(GetMappingsRequest().indices(sourceIndex), it)
} ?: error("GetMappingResponse for [$transform.sourceIndex] was null")

val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap

val targetIndexDateFieldMappings = mutableMapOf<String, Any>()
if (!sourceIndexMapping.isNullOrEmpty()) {
mapDateTermAggregation(transform, sourceIndexMapping, targetIndexDateFieldMappings)
mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, targetIndexDateFieldMappings, null)
}
return targetIndexDateFieldMappings
} catch (ex: IndexNotFoundException) {
logger.error("Index $sourceIndex doesn't exist")
return emptyMap()
}
}

private fun mapDateTermAggregation(
transform: Transform,
sourceIndexMapping: MutableMap<String, Any>,
dateFieldMappings: MutableMap<String, Any>,
) {
transform.groups.forEach { dimension ->
if (!isFieldInMappings(dimension.sourceField, sourceIndexMapping)) {
throw TransformIndexException("Missing field ${dimension.sourceField} in source index")
}
val sourceFieldType = IndexUtils.getFieldFromMappings(dimension.sourceField, sourceIndexMapping)
// Consider only date fields as relevant for building the target index mapping
// Excluding date histogram since user can define format in it
if (dimension !is DateHistogram && isSourceFieldDate(sourceFieldType)) {
// Taking the source field settings (type, format etc.)
val dateTypeTargetMapping = mapOf("type" to sourceFieldType!![TYPE], "format" to DEFAULT_DATE_FORMAT)
dateFieldMappings[dimension.targetField] = dateTypeTargetMapping
}
}
}

private fun isSourceFieldDate(sourceFieldType: Map<*, *>?) =
sourceFieldType?.get(TYPE) != null && DATE_FIELD_TYPES.contains(sourceFieldType[TYPE])

/**
* Loads transform target index mappings from json and adds date properties mapping
*
* @param dateFieldMappings target index date fields mappings
*/
fun createTargetIndexMapping(dateFieldMappings: Map<String, Any>): String {
val builder = XContentFactory.jsonBuilder()
val dynamicMappings = IndexManagementIndices.transformTargetMappings
val byteBuffer = ByteBuffer.wrap(dynamicMappings.toByteArray(StandardCharsets.UTF_8))
val bytesReference = BytesReference.fromByteBuffer(byteBuffer)

val xcp = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
bytesReference,
XContentType.JSON
)
loop@while (!xcp.isClosed) {
val token = xcp.currentToken()
val fieldName = xcp.currentName()

when (token) {
XContentParser.Token.VALUE_NUMBER -> builder.field(fieldName, xcp.intValue())
XContentParser.Token.VALUE_STRING -> builder.field(fieldName, xcp.text())
XContentParser.Token.START_OBJECT -> {
if (fieldName != null) {
builder.startObject(fieldName)
} else {
builder.startObject()
}
}
XContentParser.Token.END_OBJECT -> builder.endObject()
XContentParser.Token.START_ARRAY -> builder.startArray(fieldName)
XContentParser.Token.END_ARRAY -> {
builder.endArray()
// Add target index date fields mappings only if the date field mappings are present
if (dateFieldMappings.isNotEmpty()) {
builder.startObject(PROPERTIES)
mapCompositeAggregation(dateFieldMappings, builder)
builder.endObject()
}
}
else -> {
xcp.nextToken()
continue@loop
}
}
xcp.nextToken()
}
return builder.string()
}

@Suppress("UNCHECKED_CAST")
private fun mapCompositeAggregation(
compositeAggregation: Map<String, Any>,
builder: XContentBuilder,
) {
val iterator = compositeAggregation.entries.iterator()
while (iterator.hasNext()) {
val it = iterator.next()
if (it.value is Map<*, *>) {
builder.startObject(it.key)
// Start object until reaching the "leaf"; leaf is the last key value pair, where value is not a map
mapCompositeAggregation(it.value as Map<String, Any>, builder)
builder.endObject()
} else {
if (DATE_FIELD_TYPES.contains(it.value)) {
builder.field(it.key, it.value.toString())
}
}
}
}

/**
* Creates properties section in target index mappings based on the given date fields
* Parses target index mapping as a string - instead of using XContentBuilder
*/
@Suppress("UNUSED_PARAMETER")
private fun createTargetIndexMappingsAsString(
dateFieldMappings: Map<String, Any>,
dynamicMappings: String,
): String {
val compositeAgg = mapCompositeAggregationToString(dateFieldMappings)
return dynamicMappings.trimIndent().dropLast(1) + ", \n \"properties\" : \n { \n $compositeAgg \n } \n }"
}

@Suppress("UNCHECKED_CAST")
private fun mapCompositeAggregationToString(
compositeAggregation: Map<String, Any>,
): String {
return buildString {
var isFirst = true
val iterator = compositeAggregation.entries.iterator()
while (iterator.hasNext()) {
val it = iterator.next()
if (!isFirst) {
append(",")
}
isFirst = false
if (it.value is Map<*, *>) {
append("\"${it.key}\" : {")
append(mapCompositeAggregationToString(it.value as Map<String, Any>))
append("\n }")
} else {
append("\n")
append("\"${it.key}\" : \"${it.value}\"")
}
}
}
}

private fun mapDateAggregation(
aggBuilders: Collection<AggregationBuilder>,
sourceIndexMapping: Map<String, Any>,
targetIndexMapping: MutableMap<String, Any>,
parentPath: String?,
) {
val iterator = aggBuilders.iterator()
while (iterator.hasNext()) {

val aggBuilder = iterator.next()
val targetIdxFieldName = aggBuilder.name
val fullPath = parentPath?.plus(".")?.plus(targetIdxFieldName) ?: targetIdxFieldName
// In the case of a date field used in aggregation - MIN, MAX or COUNT
if (aggBuilder is ValuesSourceAggregationBuilder<*>) {
val sourceIdxFieldName = aggBuilder.field()

val sourceFieldType = IndexUtils.getFieldFromMappings(sourceIdxFieldName, sourceIndexMapping)
// Consider only aggregations on date field type
if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") {
val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)
// In the case if sub-aggregation exist
targetIndexMapping[fullPath] = dateTypeTargetMapping
}
}
if (aggBuilder.subAggregations.isNullOrEmpty()) {
continue
}
// Do the same for all sub-aggregations
mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, fullPath)
}
}
private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.opensearchapi.retry
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.indexmanagement.transform.util.TransformContext
import org.opensearch.rest.RestStatus
import org.opensearch.transport.RemoteTransportException

Expand All @@ -36,7 +36,8 @@ class TransformIndexer(

private val logger = LogManager.getLogger(javaClass)

@Volatile private var backoffPolicy = BackoffPolicy.constantBackoff(
@Volatile
private var backoffPolicy = BackoffPolicy.constantBackoff(
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS.get(settings),
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT.get(settings)
)
Expand All @@ -51,29 +52,30 @@ class TransformIndexer(
}
}

private suspend fun createTargetIndex(index: String) {
if (!clusterService.state().routingTable.hasIndex(index)) {
val request = CreateIndexRequest(index)
.mapping(IndexManagementIndices.transformTargetMappings)
private suspend fun createTargetIndex(targetIndex: String, targetFieldMappings: Map<String, Any>) {
if (!clusterService.state().routingTable.hasIndex(targetIndex)) {
val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings)
val request = CreateIndexRequest(targetIndex).mapping(transformTargetIndexMapping)
// TODO: Read in the actual mappings from the source index and use that
val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
if (!response.isAcknowledged) {
logger.error("Failed to create the target index $index")
logger.error("Failed to create the target index $targetIndex")
throw TransformIndexException("Failed to create the target index")
}
}
}

@Suppress("ThrowsCount", "RethrowCaughtException")
suspend fun index(docsToIndex: List<DocWriteRequest<*>>): Long {
suspend fun index(transformTargetIndex: String, docsToIndex: List<DocWriteRequest<*>>, transformContext: TransformContext): Long {
var updatableDocsToIndex = docsToIndex
var indexTimeInMillis = 0L
val nonRetryableFailures = mutableListOf<BulkItemResponse>()
try {
if (updatableDocsToIndex.isNotEmpty()) {
val targetIndex = updatableDocsToIndex.first().index()
logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")
createTargetIndex(targetIndex)

createTargetIndex(transformTargetIndex, transformContext.getTargetIndexDateFieldMappings())
backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(updatableDocsToIndex)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
Expand Down
Loading

0 comments on commit 42833b1

Please sign in to comment.