Skip to content

Commit

Permalink
202: Enabled target index date field mappings if those fields are use…
Browse files Browse the repository at this point in the history
…d in aggregations or as a term aggregation for defining the buckets

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored and stevanbuzejic committed May 11, 2023
1 parent ec1b9f8 commit 130a533
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@

package org.opensearch.indexmanagement.transform

import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.client.Client
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.string
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.exceptions.TransformIndexException
import org.opensearch.indexmanagement.transform.model.Transform
import org.opensearch.indexmanagement.util.IndexUtils
import org.opensearch.search.aggregations.AggregationBuilder
import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder

/**
* Service designed for creating dynamic target index mapping based on the date field types of the source index.
* Creates target index date properties based on the date properties of the source index
* (ie. if the term grouping is applied on a date field of source index, target index field will have date type also)
*/
class TargetIndexMappingService(private val client: Client) {
private val logger = LogManager.getLogger(javaClass)
companion object {
private const val TYPE = "type"
private const val PROPERTIES = "properties"
Expand All @@ -32,22 +36,34 @@ class TargetIndexMappingService(private val client: Client) {
private const val DYNAMIC_TEMPLATE = "dynamic_templates"
private const val MATCH_MAPPING_TYPE = "match_mapping_type"
private const val MAPPING = "mapping"
private const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis"
}
suspend fun getTargetMappingsForDates(transform: Transform): Map<String, Any> {
val sourceIndex = transform.sourceIndex
try {
val result: GetMappingsResponse = client.admin().indices().suspendUntil {
getMappings(GetMappingsRequest().indices(sourceIndex), it)
} ?: error("GetMappingResponse for [$transform.sourceIndex] was null")

suspend fun buildTargetIndexMapping(transform: Transform): Pair<String, Set<String>?> {
val request = GetMappingsRequest().indices(transform.sourceIndex)
val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap

val result: GetMappingsResponse = client.admin().indices().suspendUntil { getMappings(request, it) }
?: error("GetMappingResponse for [$transform.sourceIndex] was null")

if (result.mappings[transform.sourceIndex] == null) {
return Pair(IndexManagementIndices.transformTargetMappings, null)
val dateFieldMappings = mutableMapOf<String, Any>()
if (!sourceIndexMapping.isNullOrEmpty()) {
mapDateTermAggregation(transform, sourceIndexMapping, dateFieldMappings)
mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, dateFieldMappings, null)
}
return dateFieldMappings
} catch (ex: IndexNotFoundException) {
logger.error("Index $sourceIndex doesn't exist")
return emptyMap()
}
}

val sourceIndexMapping = result.mappings[transform.sourceIndex].sourceAsMap

val dateCompositeAggregations = mutableMapOf<String, Any>()
val mappedDateFields = mutableSetOf<String>()
private fun mapDateTermAggregation(
transform: Transform,
sourceIndexMapping: MutableMap<String, Any>,
dateFieldMappings: MutableMap<String, Any>,
) {
transform.groups.forEach { dimension ->
if (!isFieldInMappings(dimension.sourceField, sourceIndexMapping)) {
throw TransformIndexException("Missing field ${dimension.sourceField} in source index")
Expand All @@ -56,18 +72,13 @@ class TargetIndexMappingService(private val client: Client) {
// Consider only date fields as relevant for building the target index mapping
if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && sourceFieldType[TYPE] == "date") {
// Taking the source field settings (type, format etc.)
// 1213213213213
val dateTypeTargetMapping = mapOf("type" to "date", "format" to "strict_date_optional_time||epoch_millis")
// sourceIndex.dateField.format[0] -->
dateCompositeAggregations[dimension.targetField] = dateTypeTargetMapping
mappedDateFields.add(dimension.targetField)
val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)
dateFieldMappings[dimension.targetField] = dateTypeTargetMapping
}
}

return Pair(mapTargetIndex(dateCompositeAggregations), mappedDateFields)
}

private fun mapTargetIndex(dateCompositeAggregations: MutableMap<String, Any>): String {
fun createTargetIndexMapping(dateCompositeAggregations: Map<String, Any>): String {
// Build static properties
val builder = XContentFactory.jsonBuilder().startObject()
.startObject(METADATA)
Expand Down Expand Up @@ -113,5 +124,37 @@ class TargetIndexMappingService(private val client: Client) {
}
}

@Suppress("UNCHECKED_CAST")
private fun mapDateAggregation(
aggBuilders: Collection<AggregationBuilder>,
sourceIndexMapping: Map<String, Any>,
targetIndexMapping: MutableMap<String, Any>,
parentPath: String?
) {
val iterator = aggBuilders.iterator()
while (iterator.hasNext()) {

val aggBuilder = iterator.next()
val targetIdxFieldName = aggBuilder.name
// In the case of a date field used in aggregation - MIN, MAX or COUNT
if (aggBuilder is ValuesSourceAggregationBuilder<*>) {
val sourceIdxFieldName = aggBuilder.field()

val sourceFieldType = IndexUtils.getFieldFromMappings(sourceIdxFieldName, sourceIndexMapping)
// Consider only aggregations on date field type
if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") {
val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)
// In the case if sub-aggregation exist
val fullPath = parentPath?.plus(".")?.plus(targetIndexMapping) ?: targetIdxFieldName
targetIndexMapping[fullPath] = dateTypeTargetMapping
}
}
if (aggBuilder.subAggregations.isNullOrEmpty()) {
continue
}
// Do the same for all sub-aggregations
mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, targetIdxFieldName)
}
}
private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.indexmanagement.transform

import formatMillis
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
Expand Down Expand Up @@ -55,23 +54,18 @@ class TransformIndexer(
}
}

private suspend fun createTargetIndex(transform: Transform): Set<String>? {
private suspend fun createTargetIndex(transform: Transform, targetFieldMappings: Map<String, Any>) {
val index = transform.targetIndex
var transformDateMappedFields: Set<String>? = null

if (!clusterService.state().routingTable.hasIndex(index)) {
val transformMappings = targetIndexMappingService.buildTargetIndexMapping(transform)
val transformTargetIndexMapping = transformMappings.first
val transformTargetIndexMapping = targetIndexMappingService.createTargetIndexMapping(targetFieldMappings)
val request = CreateIndexRequest(index).mapping(transformTargetIndexMapping)
// TODO: Read in the actual mappings from the source index and use that
val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
if (!response.isAcknowledged) {
logger.error("Failed to create the target index $index")
throw TransformIndexException("Failed to create the target index")
}
transformDateMappedFields = transformMappings.second
}
return transformDateMappedFields
}

@Suppress("ThrowsCount", "RethrowCaughtException")
Expand All @@ -84,11 +78,7 @@ class TransformIndexer(
val targetIndex = updatableDocsToIndex.first().index()
logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")

val dateMappedFields = createTargetIndex(transform)
dateMappedFields?.let { transformContext.updateMappedDateFields(it) }

updateTargetDateFieldValues(updatableDocsToIndex, transformContext.getMappedTargetDateFields())

createTargetIndex(transform, transformContext.getMappedTargetDateFields())
backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(updatableDocsToIndex)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
Expand Down Expand Up @@ -125,21 +115,4 @@ class TransformIndexer(
throw TransformIndexException("Failed to index the documents", e)
}
}

private fun updateTargetDateFieldValues(updatableDocsToIndex: List<DocWriteRequest<*>>, mappedTargetDateFields: Set<String>?) {
if (mappedTargetDateFields.isNullOrEmpty()) {
return
}

for (docToBeWritten in updatableDocsToIndex) {
val targetValueMap = (docToBeWritten as IndexRequest).sourceAsMap()
for (mappedDateField in mappedTargetDateFields) {
if (targetValueMap[mappedDateField] == null) {
throw TransformIndexException("Missing field $mappedDateField in target index")
}
targetValueMap[mappedDateField] = formatMillis(targetValueMap, mappedDateField)
docToBeWritten.source(targetValueMap)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ object TransformRunner :
private lateinit var transformSearchService: TransformSearchService
private lateinit var transformIndexer: TransformIndexer
private lateinit var transformValidator: TransformValidator
private lateinit var targetIndexMappingService: TargetIndexMappingService
private lateinit var threadPool: ThreadPool

fun initialize(
Expand All @@ -66,12 +67,14 @@ object TransformRunner :
): TransformRunner {
this.clusterService = clusterService
this.client = client
this.targetIndexMappingService = TargetIndexMappingService(client)
this.xContentRegistry = xContentRegistry
this.settings = settings
this.transformSearchService = TransformSearchService(settings, clusterService, client)
this.transformMetadataService = TransformMetadataService(client, xContentRegistry)
this.transformIndexer = TransformIndexer(settings, clusterService, client, TargetIndexMappingService(client))
this.transformIndexer = TransformIndexer(settings, clusterService, client, targetIndexMappingService)
this.transformValidator = TransformValidator(indexNameExpressionResolver, clusterService, client, settings, jvmService)

this.threadPool = threadPool
return this
}
Expand Down Expand Up @@ -108,7 +111,13 @@ object TransformRunner :
val transformProcessedBucketLog = TransformProcessedBucketLog()
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)

val transformContext = TransformContext(TransformLockManager(transform, context))
// If date was used in term query generate target date field mapping and store it in transform context
val targetDateFieldMapping = targetIndexMappingService.getTargetMappingsForDates(transform)
val transformContext = TransformContext(
TransformLockManager(transform, context),
targetDateFieldMapping
)

// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
val transformLockManager = transformContext.transformLockManager
transformLockManager.acquireLockForScheduledJob()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.transform

import formatMillis
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
Expand All @@ -31,7 +32,6 @@ import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.index.shard.ShardId
import org.opensearch.indexmanagement.common.model.dimension.Dimension
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.opensearchapi.retry
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.exceptions.TransformSearchServiceException
Expand Down Expand Up @@ -207,7 +207,12 @@ class TransformSearchService(
// If the request was successful, update page size
transformContext.lastSuccessfulPageSize = pageSize
transformContext.renewLockForLongSearch(Instant.now().epochSecond - searchStart)
return convertResponse(transform, searchResponse, modifiedBuckets = modifiedBuckets)
return convertResponse(
transform,
searchResponse,
modifiedBuckets = modifiedBuckets,
mappedTargetDateFields = transformContext.getMappedTargetDateFields()
)
} catch (e: TransformSearchServiceException) {
throw e
} catch (e: RemoteTransportException) {
Expand Down Expand Up @@ -334,7 +339,8 @@ class TransformSearchService(
transform: Transform,
searchResponse: SearchResponse,
waterMarkDocuments: Boolean = true,
modifiedBuckets: MutableSet<Map<String, Any>>? = null
modifiedBuckets: MutableSet<Map<String, Any>>? = null,
mappedTargetDateFields: Map<String, Any>,
): TransformSearchResult {
val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation
val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets
Expand All @@ -349,8 +355,18 @@ class TransformSearchService(
val hashedId = hashToFixedSize(id)

val document = transform.convertToDoc(aggregatedBucket.docCount, waterMarkDocuments)
aggregatedBucket.key.entries.forEach { bucket -> document[bucket.key] = bucket.value }
aggregatedBucket.aggregations.forEach { aggregation -> document[aggregation.name] = getAggregationValue(aggregation) }
aggregatedBucket.key.entries.forEach { bucket ->
// Check if the date is used as a term (exists in conversion list) - if it is,
// convert the date (which is in epoch time millis format to ISO 8601)
if (mappedTargetDateFields.isNullOrEmpty() || !mappedTargetDateFields.containsKey(bucket.key)) {
document[bucket.key] = bucket.value
} else {
document[bucket.key] = formatMillis(bucket.value as Long)
}
}
aggregatedBucket.aggregations.forEach { aggregation ->
document[aggregation.name] = getAggregationValue(aggregation, mappedTargetDateFields)
}

val indexRequest = IndexRequest(transform.targetIndex)
.id(hashedId)
Expand All @@ -371,16 +387,14 @@ class TransformSearchService(
return BucketSearchResult(modifiedBuckets, aggs.afterKey(), searchResponse.took.millis)
}

private fun getAggregationValue(aggregation: Aggregation): Any {
private fun getAggregationValue(aggregation: Aggregation, mappedTargetDateFields: Map<String, Any>): Any {
return when (aggregation) {
is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> {
val agg = aggregation as NumericMetricsAggregation.SingleValue
// If value and value_as_string differs (ie. for date fields), transform should be aware of
if (agg.value().toString() == agg.valueAsString) {
if (mappedTargetDateFields.isEmpty() || !mappedTargetDateFields.containsKey(agg.name)) {
agg.value()
} else {
// example of agg - {"min_timestamp":{"value":1.662856742912E12,"value_as_string":"2022-09-11T00:39:02.912Z"}}
agg.convertToMap()[agg.name] ?: agg.value()
formatMillis(agg.value().toLong())
}
}
is Percentiles -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ class TransportPreviewTransformAction @Inject constructor(

return issues
}

fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener<PreviewTransformResponse>) {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
try {
val transformSearchResult = TransformSearchService.convertResponse(
transform = transform, searchResponse = response, waterMarkDocuments = false
transform = transform, searchResponse = response, waterMarkDocuments = false,
mappedTargetDateFields = emptyMap()
)
val formattedResult = transformSearchResult.docsToIndex.map {
it.sourceAsMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ package org.opensearch.indexmanagement.transform.util
*/
class TransformContext(
val transformLockManager: TransformLockManager,
val targetDateFieldMapping: Map<String, Any>,
var lastSuccessfulPageSize: Int? = null,
private var mappedTargetDateFields: Set<String>? = null
) {
fun getMaxRequestTimeoutInSeconds(): Long? {
// Lock timeout must be greater than LOCK_BUFFER
Expand All @@ -23,11 +23,7 @@ class TransformContext(
return maxRequestTimeout
}

fun updateMappedDateFields(mappedDateFields: Set<String>) {
mappedTargetDateFields = mappedDateFields
}

fun getMappedTargetDateFields() = mappedTargetDateFields
fun getMappedTargetDateFields() = targetDateFieldMapping

suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) {
transformLockManager.renewLockForLongSearch(timeSpentOnSearch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import java.time.ZoneId
* SPDX-License-Identifier: Apache-2.0
*/

private const val dateFormat = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ"
private const val DATE_FORMAT = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ"
private val timezone: ZoneId = ZoneId.of("UTC")
private val dateFormatter = DateFormatter.forPattern(dateFormat).withZone(timezone)
private val dateFormatter = DateFormatter.forPattern(DATE_FORMAT).withZone(timezone)

fun formatMillis(
targetValueMap: MutableMap<String, Any>,
mappedDateField: String,
) = dateFormatter.formatMillis(targetValueMap[mappedDateField] as Long)
dateTimeInMillis: Long,
): String = dateFormatter.formatMillis(dateTimeInMillis)
Loading

0 comments on commit 130a533

Please sign in to comment.