Skip to content

Commit

Permalink
Addressed the comments
Browse files Browse the repository at this point in the history
Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbuzejic committed May 11, 2023
1 parent 1117ec6 commit a755e51
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.client.Client
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.opensearchapi.string
Expand Down Expand Up @@ -75,15 +75,15 @@ object TargetIndexMappingService {
}
val sourceFieldType = IndexUtils.getFieldFromMappings(dimension.sourceField, sourceIndexMapping)
// Consider only date fields as relevant for building the target index mapping
if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && sourceFieldType[TYPE] == "date") {
if (dimension !is DateHistogram && sourceFieldType?.get(TYPE) != null && (sourceFieldType[TYPE] == "date" || sourceFieldType[TYPE] == "date_nanos")) {
// Taking the source field settings (type, format etc.)
val dateTypeTargetMapping = mapOf("type" to "date", "format" to DEFAULT_DATE_FORMAT)
val dateTypeTargetMapping = mapOf("type" to sourceFieldType[TYPE], "format" to DEFAULT_DATE_FORMAT)
dateFieldMappings[dimension.targetField] = dateTypeTargetMapping
}
}
}

fun createTargetIndexMapping(dateCompositeAggregations: Map<String, Any>): String {
fun createTargetIndexMapping(dateFieldMappings: Map<String, Any>): String {
// Build static properties
val builder = XContentFactory.jsonBuilder().startObject()
.startObject(METADATA)
Expand All @@ -102,7 +102,7 @@ object TargetIndexMappingService {
.startObject(PROPERTIES)

// Dynamically build composite aggregation mapping
mapCompositeAggregation(dateCompositeAggregations, builder)
mapCompositeAggregation(dateFieldMappings, builder)

// Close the object and return as a string
return builder.endObject()
Expand Down Expand Up @@ -133,7 +133,7 @@ object TargetIndexMappingService {
aggBuilders: Collection<AggregationBuilder>,
sourceIndexMapping: Map<String, Any>,
targetIndexMapping: MutableMap<String, Any>,
parentPath: String?
parentPath: String?,
) {
val iterator = aggBuilders.iterator()
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ class TransformIndexer(
}
}

private suspend fun createTargetIndex(transform: Transform, targetFieldMappings: Map<String, Any>) {
val index = transform.targetIndex
if (!clusterService.state().routingTable.hasIndex(index)) {
private suspend fun createTargetIndex(targetIndex: String, targetFieldMappings: Map<String, Any>) {
if (!clusterService.state().routingTable.hasIndex(targetIndex)) {
val transformTargetIndexMapping = TargetIndexMappingService.createTargetIndexMapping(targetFieldMappings)
val request = CreateIndexRequest(index).mapping(transformTargetIndexMapping)
val request = CreateIndexRequest(targetIndex).mapping(transformTargetIndexMapping)
// TODO: Read in the actual mappings from the source index and use that
val response: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
if (!response.isAcknowledged) {
logger.error("Failed to create the target index $index")
logger.error("Failed to create the target index $targetIndex")
throw TransformIndexException("Failed to create the target index")
}
}
Expand All @@ -77,7 +76,7 @@ class TransformIndexer(
val targetIndex = updatableDocsToIndex.first().index()
logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")

createTargetIndex(transform, transformContext.getTargetIndexDateFieldMappings())
createTargetIndex(transform.targetIndex, transformContext.getTargetIndexDateFieldMappings())
backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(updatableDocsToIndex)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,27 +321,26 @@ class TransformRunnerIT : TransformRestTestCase() {
assertTrue("Target transform index was not created", indexExists(transform.targetIndex))
}

Thread.sleep(60000)

waitFor {
waitFor(Instant.ofEpochSecond(60)) {
val transformJob = getTransform(transformId = transform.id)
assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)
}

val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping")
val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping")
val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping")
val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping")
val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map<String, Map<String, Any>>

val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]
val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]
val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]
val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)["tpep_pickup_datetime"] as Map<String, Any>)["type"]

assertEquals(sourcePickupDate, targetPickupDate)
assertEquals(sourcePickupDate, targetPickupDate)

val pickupDateTimeTerm = "pickupDateTerm14"
val pickupDateTimeTerm = "pickupDateTerm14"

val request = """
val request = """
{
"size": 0,
"aggs": {
Expand All @@ -354,24 +353,23 @@ class TransformRunnerIT : TransformRestTestCase() {
}
}
}
""".trimIndent()
"""

var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)

var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(transformRes.restStatus() == RestStatus.OK)
var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(transformRes.restStatus() == RestStatus.OK)

val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!
val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!
val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!
val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map<String, Map<String, List<Map<String, Map<String, Any>>>>>)[pickupDateTimeTerm]!!["buckets"]!!

assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size)
assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size)

// Verify the values of keys and metrics in all buckets
for (i in rawAggBuckets.indices) {
assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"])
assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"])
}
// Verify the values of keys and metrics in all buckets
for (i in rawAggBuckets.indices) {
assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"])
assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"])
}
}

Expand Down Expand Up @@ -418,20 +416,21 @@ class TransformRunnerIT : TransformRestTestCase() {
assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId)
val transformMetadata = getTransformMetadata(transformJob.metadataId!!)
assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status)
}

val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping")
val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping")
val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val sourceIndexMapping = client().makeRequest("GET", "/$sourceIdxTestName/_mapping")
val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map<String, Map<String, Any>>
val targetIndexMapping = client().makeRequest("GET", "/$targetIdxTestName/_mapping")
val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map<String, Map<String, Any>>

val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)[pickupDateTime] as Map<String, Any>)["type"]
val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)[pickupDateTime] as Map<String, Any>)["type"]
val sourcePickupDate = (((sourceIndexParserMap[sourceIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)[pickupDateTime] as Map<String, Any>)["type"]
val targetPickupDate = (((targetIndexParserMap[targetIdxTestName]?.get("mappings") as Map<String, Any>)["properties"] as Map<String, Any>)[pickupDateTime] as Map<String, Any>)["type"]

assertEquals("date", targetPickupDate)
assertEquals(sourcePickupDate, targetPickupDate)
assertEquals("date", targetPickupDate)
assertEquals(sourcePickupDate, targetPickupDate)

waitFor(Instant.ofEpochSecond(30)) {
val storeAndForwardTerm = "storeAndForwardTerm"

val request = """
{
"size": 0,
Expand All @@ -447,7 +446,7 @@ class TransformRunnerIT : TransformRestTestCase() {
}
}
}
""".trimIndent()
"""

var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
Expand Down

0 comments on commit a755e51

Please sign in to comment.