Updated code according to comments. Added targetIndexMapping when transform preview action is triggered

Signed-off-by: Stevan Buzejic <[email protected]>
stevanbz authored and stevanbuzejic committed May 11, 2023
1 parent 130a533 commit d630fe9
Showing 9 changed files with 134 additions and 63 deletions.
@@ -136,6 +136,7 @@ import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.transform.TargetIndexMappingService
import org.opensearch.indexmanagement.transform.TransformRunner
import org.opensearch.indexmanagement.transform.action.delete.DeleteTransformsAction
import org.opensearch.indexmanagement.transform.action.delete.TransportDeleteTransformsAction
@@ -439,6 +440,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val pluginVersionSweepCoordinator = PluginVersionSweepCoordinator(skipFlag, settings, threadPool, clusterService)

TargetIndexMappingService.initialize(client)

return listOf(
managedIndexRunner,
rollupRunner,
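The plugin hunk above wires the new singleton up at startup: `TargetIndexMappingService.initialize(client)` runs inside `createComponents` before the runners are returned. Below is a minimal sketch of that initialization pattern with a generic object name standing in for the real service; only the `Client` import is the actual OpenSearch type, everything else is illustrative.

```kotlin
import org.opensearch.client.Client

// Sketch of the pattern used above: a Kotlin `object` holds a lateinit Client
// that the plugin assigns exactly once while building its components.
object ExampleMappingService {
    private lateinit var client: Client

    fun initialize(client: Client) {
        this.client = client
    }

    // Guard callers can check before touching the client, e.g. in unit tests.
    fun isInitialized(): Boolean = ::client.isInitialized
}
```

The trade-off of the `object` form is that the dependency is set through mutable singleton state rather than the constructor, so tests have to call `initialize` with a stub client before exercising the service.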
@@ -26,18 +26,23 @@ import org.opensearch.search.aggregations.support.ValuesSourceAggregationBuilder
* Creates target index date properties based on the date properties of the source index
* (ie. if the term grouping is applied on a date field of source index, target index field will have date type also)
*/
class TargetIndexMappingService(private val client: Client) {
object TargetIndexMappingService {
private val logger = LogManager.getLogger(javaClass)
companion object {
private const val TYPE = "type"
private const val PROPERTIES = "properties"
private const val METADATA = "_meta"
private const val SCHEMA_VERSION = "schema_version"
private const val DYNAMIC_TEMPLATE = "dynamic_templates"
private const val MATCH_MAPPING_TYPE = "match_mapping_type"
private const val MAPPING = "mapping"
private const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis"
private lateinit var client: Client

private const val TYPE = "type"
private const val PROPERTIES = "properties"
private const val METADATA = "_meta"
private const val SCHEMA_VERSION = "schema_version"
private const val DYNAMIC_TEMPLATE = "dynamic_templates"
private const val MATCH_MAPPING_TYPE = "match_mapping_type"
private const val MAPPING = "mapping"
private const val DEFAULT_DATE_FORMAT = "strict_date_optional_time||epoch_millis"

fun initialize(client: Client) {
this.client = client
}

suspend fun getTargetMappingsForDates(transform: Transform): Map<String, Any> {
val sourceIndex = transform.sourceIndex
try {
@@ -47,12 +52,12 @@ class TargetIndexMappingService(private val client: Client) {

val sourceIndexMapping = result.mappings[sourceIndex]?.sourceAsMap

val dateFieldMappings = mutableMapOf<String, Any>()
val targetIndexDateFieldMappings = mutableMapOf<String, Any>()
if (!sourceIndexMapping.isNullOrEmpty()) {
mapDateTermAggregation(transform, sourceIndexMapping, dateFieldMappings)
mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, dateFieldMappings, null)
mapDateTermAggregation(transform, sourceIndexMapping, targetIndexDateFieldMappings)
mapDateAggregation(transform.aggregations.aggregatorFactories, sourceIndexMapping, targetIndexDateFieldMappings, null)
}
return dateFieldMappings
return targetIndexDateFieldMappings
} catch (ex: IndexNotFoundException) {
logger.error("Index $sourceIndex doesn't exist")
return emptyMap()
@@ -124,7 +129,6 @@ }
}
}

@Suppress("UNCHECKED_CAST")
private fun mapDateAggregation(
aggBuilders: Collection<AggregationBuilder>,
sourceIndexMapping: Map<String, Any>,
@@ -136,6 +140,7 @@

val aggBuilder = iterator.next()
val targetIdxFieldName = aggBuilder.name
val fullPath = parentPath?.plus(".")?.plus(targetIdxFieldName) ?: targetIdxFieldName
// In the case of a date field used in aggregation - MIN, MAX or COUNT
if (aggBuilder is ValuesSourceAggregationBuilder<*>) {
val sourceIdxFieldName = aggBuilder.field()
@@ -145,15 +150,14 @@
if (!sourceFieldType.isNullOrEmpty() && sourceFieldType[TYPE] == "date") {
val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)
// In the case if sub-aggregation exist
val fullPath = parentPath?.plus(".")?.plus(targetIndexMapping) ?: targetIdxFieldName
targetIndexMapping[fullPath] = dateTypeTargetMapping
}
}
if (aggBuilder.subAggregations.isNullOrEmpty()) {
continue
}
// Do the same for all sub-aggregations
mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, targetIdxFieldName)
mapDateAggregation(aggBuilder.subAggregations, sourceIndexMapping, targetIndexMapping, fullPath)
}
}
private fun isFieldInMappings(fieldName: String, mappings: Map<*, *>) = IndexUtils.getFieldFromMappings(fieldName, mappings) != null
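The substantive fix in the TargetIndexMappingService hunk above is that `fullPath` is now computed once per aggregation, from `parentPath` plus the aggregation name, and that same path is passed down when recursing into sub-aggregations, so nested date aggregations land under a dotted path in the target mapping instead of only the leaf name. A small self-contained illustration of the path rule; the field names are invented:

```kotlin
// Illustration of the dotted-path rule for nested aggregations: each level
// appends its own aggregation name to the path inherited from its parent.
fun buildPath(parentPath: String?, aggName: String): String =
    parentPath?.plus(".")?.plus(aggName) ?: aggName

fun main() {
    println(buildPath(null, "pickup_date"))               // pickup_date
    println(buildPath("pickup_stats", "max_pickup_date")) // pickup_stats.max_pickup_date
    println(buildPath(buildPath(null, "a"), "b"))         // a.b
}
```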
@@ -32,8 +32,7 @@ import org.opensearch.transport.RemoteTransportException
class TransformIndexer(
settings: Settings,
private val clusterService: ClusterService,
private val client: Client,
private val targetIndexMappingService: TargetIndexMappingService
private val client: Client
) {

private val logger = LogManager.getLogger(javaClass)
@@ -57,7 +56,7 @@ class TransformIndexer(
private suspend fun createTargetIndex(transform: Transform, targetFieldMappings: Map<String, Any>) {
val index = transform.targetIndex
if (!clusterService.state().routingTable.hasIndex(index)) {
val transformTargetIndexMapping = targetIndexMappingService.createTargetIndexMapping(targetFieldMappings)
val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings)
val request = CreateIndexRequest(index).mapping(transformTargetIndexMapping)
// TODO: Read in the actual mappings from the source index and use that
val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
@@ -78,7 +77,7 @@ class TransformIndexer(
val targetIndex = updatableDocsToIndex.first().index()
logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")

createTargetIndex(transform, transformContext.getMappedTargetDateFields())
createTargetIndex(transform, transformContext.getTargetIndexDateFieldMappings())
backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(updatableDocsToIndex)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
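TransformIndexer above now calls the singleton directly and, as before, awaits the admin-client call through the repository's `suspendUntil` extension. For readers unfamiliar with that idiom, this is roughly the shape of such an ActionListener-to-coroutine bridge; it is an approximation under the assumption that the real `suspendUntil` in `org.opensearch.indexmanagement.opensearchapi` differs in detail, and the `ActionListener` package is the one in use at the time of this commit.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opensearch.action.ActionListener
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Approximate listener-to-coroutine bridge: the receiver's callback-style call
// is wrapped so the coroutine resumes with either the response or the failure.
suspend fun <C, R> C.awaitAction(block: C.(ActionListener<R>) -> Unit): R =
    suspendCancellableCoroutine { cont ->
        block(object : ActionListener<R> {
            override fun onResponse(response: R) = cont.resume(response)
            override fun onFailure(e: Exception) = cont.resumeWithException(e)
        })
    }
```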
@@ -53,7 +53,6 @@ object TransformRunner :
private lateinit var transformSearchService: TransformSearchService
private lateinit var transformIndexer: TransformIndexer
private lateinit var transformValidator: TransformValidator
private lateinit var targetIndexMappingService: TargetIndexMappingService
private lateinit var threadPool: ThreadPool

fun initialize(
@@ -67,12 +66,11 @@ object TransformRunner :
): TransformRunner {
this.clusterService = clusterService
this.client = client
this.targetIndexMappingService = TargetIndexMappingService(client)
this.xContentRegistry = xContentRegistry
this.settings = settings
this.transformSearchService = TransformSearchService(settings, clusterService, client)
this.transformMetadataService = TransformMetadataService(client, xContentRegistry)
this.transformIndexer = TransformIndexer(settings, clusterService, client, targetIndexMappingService)
this.transformIndexer = TransformIndexer(settings, clusterService, client)
this.transformValidator = TransformValidator(indexNameExpressionResolver, clusterService, client, settings, jvmService)

this.threadPool = threadPool
@@ -112,10 +110,10 @@ object TransformRunner :
var bucketsToTransform = BucketsToTransform(HashSet(), metadata)

// If date was used in term query generate target date field mapping and store it in transform context
val targetDateFieldMapping = targetIndexMappingService.getTargetMappingsForDates(transform)
val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform)
val transformContext = TransformContext(
TransformLockManager(transform, context),
targetDateFieldMapping
targetIndexDateFieldMappings
)

// Acquires the lock if there is no running job execution for the given transform; Lock is acquired per transform
@@ -5,7 +5,7 @@

package org.opensearch.indexmanagement.transform

import formatMillis
import org.opensearch.indexmanagement.transform.util.formatMillis
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
@@ -211,7 +211,7 @@ class TransformSearchService(
transform,
searchResponse,
modifiedBuckets = modifiedBuckets,
mappedTargetDateFields = transformContext.getMappedTargetDateFields()
targetIndexDateFieldMappings = transformContext.getTargetIndexDateFieldMappings()
)
} catch (e: TransformSearchServiceException) {
throw e
@@ -340,7 +340,7 @@ class TransformSearchService(
searchResponse: SearchResponse,
waterMarkDocuments: Boolean = true,
modifiedBuckets: MutableSet<Map<String, Any>>? = null,
mappedTargetDateFields: Map<String, Any>,
targetIndexDateFieldMappings: Map<String, Any>,
): TransformSearchResult {
val aggs = searchResponse.aggregations.get(transform.id) as CompositeAggregation
val buckets = if (modifiedBuckets != null) aggs.buckets.filter { modifiedBuckets.contains(it.key) } else aggs.buckets
@@ -358,14 +358,14 @@ class TransformSearchService(
aggregatedBucket.key.entries.forEach { bucket ->
// Check if the date is used as a term (exists in conversion list) - if it is,
// convert the date (which is in epoch time millis format to ISO 8601)
if (mappedTargetDateFields.isNullOrEmpty() || !mappedTargetDateFields.containsKey(bucket.key)) {
if (targetIndexDateFieldMappings.isNullOrEmpty() || !targetIndexDateFieldMappings.containsKey(bucket.key)) {
document[bucket.key] = bucket.value
} else {
document[bucket.key] = formatMillis(bucket.value as Long)
}
}
aggregatedBucket.aggregations.forEach { aggregation ->
document[aggregation.name] = getAggregationValue(aggregation, mappedTargetDateFields)
document[aggregation.name] = getAggregationValue(aggregation, targetIndexDateFieldMappings)
}

val indexRequest = IndexRequest(transform.targetIndex)
@@ -387,11 +387,11 @@ class TransformSearchService(
return BucketSearchResult(modifiedBuckets, aggs.afterKey(), searchResponse.took.millis)
}

private fun getAggregationValue(aggregation: Aggregation, mappedTargetDateFields: Map<String, Any>): Any {
private fun getAggregationValue(aggregation: Aggregation, targetIndexDateFieldMappings: Map<String, Any>): Any {
return when (aggregation) {
is InternalSum, is InternalMin, is InternalMax, is InternalAvg, is InternalValueCount -> {
val agg = aggregation as NumericMetricsAggregation.SingleValue
if (mappedTargetDateFields.isEmpty() || !mappedTargetDateFields.containsKey(agg.name)) {
if (targetIndexDateFieldMappings.isEmpty() || !targetIndexDateFieldMappings.containsKey(agg.name)) {
agg.value()
} else {
formatMillis(agg.value().toLong())
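Most of the TransformSearchService diff above is the rename from `mappedTargetDateFields` to `targetIndexDateFieldMappings`, but it also makes the conversion rule explicit: a composite bucket key or single-value aggregation is reformatted from epoch milliseconds to ISO 8601 only when that field is present in the target-index date-field mappings; everything else passes through unchanged. A simplified, self-contained stand-in for that decision, using plain java.time instead of the repository's `formatMillis` helper and invented field names:

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Simplified stand-in for the per-field decision: only fields listed in the
// target-index date-field mappings are rendered as ISO 8601 strings.
private val isoUtc: DateTimeFormatter =
    DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC)

fun renderValue(field: String, value: Any, dateFieldMappings: Map<String, Any>): Any =
    if (dateFieldMappings.containsKey(field) && value is Long) {
        isoUtc.format(Instant.ofEpochMilli(value))
    } else {
        value
    }

fun main() {
    val mappings = mapOf("pickup_date" to mapOf("type" to "date"))
    println(renderValue("pickup_date", 1_683_806_400_000L, mappings)) // 2023-05-11T12:00:00.000Z
    println(renderValue("passenger_count", 3L, mappings))             // 3, passed through unchanged
}
```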
@@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.transform.action.preview

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
@@ -22,6 +25,8 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.commons.ConfigConstants
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.transform.TargetIndexMappingService
import org.opensearch.indexmanagement.transform.TransformSearchService
import org.opensearch.indexmanagement.transform.TransformValidator
import org.opensearch.indexmanagement.transform.model.Transform
@@ -69,7 +74,9 @@ class TransportPreviewTransformAction @Inject constructor(
return
}
val searchRequest = TransformSearchService.getSearchServiceRequest(transform = transform, pageSize = 10)
executeSearch(searchRequest, transform, listener)
CoroutineScope(Dispatchers.IO).launch {
executeSearch(searchRequest, transform, listener)
}
}

override fun onFailure(e: Exception) {
@@ -87,31 +94,31 @@

return issues
}
fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener<PreviewTransformResponse>) {
client.search(
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
try {
val transformSearchResult = TransformSearchService.convertResponse(
transform = transform, searchResponse = response, waterMarkDocuments = false,
mappedTargetDateFields = emptyMap()
)
val formattedResult = transformSearchResult.docsToIndex.map {
it.sourceAsMap()
}
listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK))
} catch (e: Exception) {
listener.onFailure(
OpenSearchStatusException(
"Failed to parse the transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e)
)
)
}
}
suspend fun executeSearch(searchRequest: SearchRequest, transform: Transform, listener: ActionListener<PreviewTransformResponse>) {
val response = try {
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
searchResponse
} catch (e: Exception) {
listener.onFailure(e)
return
}

override fun onFailure(e: Exception) = listener.onFailure(e)
try {
val targetIndexDateFieldMappings = TargetIndexMappingService.getTargetMappingsForDates(transform)
val transformSearchResult = TransformSearchService.convertResponse(
transform = transform, searchResponse = response, waterMarkDocuments = false,
targetIndexDateFieldMappings = targetIndexDateFieldMappings
)
val formattedResult = transformSearchResult.docsToIndex.map {
it.sourceAsMap()
}
)
listener.onResponse(PreviewTransformResponse(formattedResult, RestStatus.OK))
} catch (e: Exception) {
listener.onFailure(
OpenSearchStatusException(
"Failed to parse the transformed results", RestStatus.INTERNAL_SERVER_ERROR, ExceptionsHelper.unwrapCause(e)
)
)
}
}
}
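The preview transport action above now launches a coroutine on `Dispatchers.IO` so it can call the suspending `TargetIndexMappingService.getTargetMappingsForDates` and the `suspendUntil`-based search, then report back through the original `ActionListener`. A stripped-down sketch of that hand-off pattern; the listener interface here is a stand-in so the example stays self-contained, not the OpenSearch type:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

// Stand-in listener so the sketch compiles on its own.
interface ResultListener<T> {
    fun onResponse(result: T)
    fun onFailure(e: Exception)
}

// Callback-style entry point that hands the work to a coroutine and funnels
// both success and failure back through the listener.
fun <T> runOnIo(listener: ResultListener<T>, block: suspend () -> T) {
    CoroutineScope(Dispatchers.IO).launch {
        try {
            listener.onResponse(block())
        } catch (e: Exception) {
            listener.onFailure(e)
        }
    }
}
```

Creating a fresh `CoroutineScope` per request keeps the change small; the cost is that the launched job has no structured parent, so nothing cancels it if the surrounding action is torn down mid-flight.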
@@ -10,7 +10,7 @@ package org.opensearch.indexmanagement.transform.util
*/
class TransformContext(
val transformLockManager: TransformLockManager,
val targetDateFieldMapping: Map<String, Any>,
val targetDateFieldMappings: Map<String, Any>,
var lastSuccessfulPageSize: Int? = null,
) {
fun getMaxRequestTimeoutInSeconds(): Long? {
@@ -23,7 +23,7 @@ class TransformContext(
return maxRequestTimeout
}

fun getMappedTargetDateFields() = targetDateFieldMapping
fun getTargetIndexDateFieldMappings() = targetDateFieldMappings

suspend fun renewLockForLongSearch(timeSpentOnSearch: Long) {
transformLockManager.renewLockForLongSearch(timeSpentOnSearch)
@@ -1,11 +1,13 @@
import org.opensearch.common.time.DateFormatter
import java.time.ZoneId

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.transform.util

import org.opensearch.common.time.DateFormatter
import java.time.ZoneId

private const val DATE_FORMAT = "uuuu-MM-dd'T'HH:mm:ss.SSSZZ"
private val timezone: ZoneId = ZoneId.of("UTC")
private val dateFormatter = DateFormatter.forPattern(DATE_FORMAT).withZone(timezone)
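The final hunk above only moves this utility file's imports below the license header and places it in `org.opensearch.indexmanagement.transform.util`; the body of `formatMillis` sits outside the lines shown. Purely as an assumption, given the pattern and UTC zone declared in the file, a plausible shape for it is sketched below, with the constants restated so the snippet stands alone.

```kotlin
import org.opensearch.common.time.DateFormatter
import java.time.Instant
import java.time.ZoneId

// Assumption: formatMillis is not visible in this hunk. With the file's pattern
// and UTC zone, it plausibly renders epoch milliseconds as an ISO 8601 string.
private val utc: ZoneId = ZoneId.of("UTC")
private val formatter = DateFormatter.forPattern("uuuu-MM-dd'T'HH:mm:ss.SSSZZ").withZone(utc)

fun formatMillis(epochMillis: Long): String =
    formatter.format(Instant.ofEpochMilli(epochMillis).atZone(utc))
```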