Skip to content

Commit

Permalink
Refactor csr logic to context source caller (#1311)
Browse files Browse the repository at this point in the history
* wip: refacto csr logic to context source caller

* fix: tests

* feat: add missing tests

* feat: add missing tests

* feat: renaming to DistributedEntityConsumptionService

* feat: renaming to DistributedEntityConsumptionService

* feat: rename variable to DistributedEntityConsumptionService

* feat: add timeout
  • Loading branch information
thomasBousselin authored Jan 15, 2025
1 parent 6924af3 commit 991c7bb
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 156 deletions.
4 changes: 2 additions & 2 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityEventService.kt$EntityEventService$private fun publishAttributeChangeEvent( sub: String?, tenantName: String, entityId: URI, entityTypesAndPayload: Pair&lt;List&lt;ExpandedTerm&gt;, String&gt;, attributeOperationResult: SucceededAttributeOperationResult )</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.FORMAT, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.FORMAT, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand All @@ -29,6 +27,8 @@
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityUri: URI, ngsiLdAttributes: List&lt;NgsiLdAttribute&gt;, expandedAttributes: ExpandedAttributes, disallowOverwrite: Boolean, createdAt: ZonedDateTime, sub: Sub? )</ID>
<ID>LongParameterList:TemporalEntityHandler.kt$TemporalEntityHandler$( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @PathVariable attrId: String, @PathVariable instanceId: URI, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters(notImplemented = [QP.LOCAL, QP.VIA]) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; )</ID>
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>MaxLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$fun</ID>
<ID>MaximumLineLength:DistributedEntityConsumptionServiceTests.kt$DistributedEntityConsumptionServiceTests$</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
<ID>SwallowedException:TemporalQueryUtils.kt$e: IllegalArgumentException</ID>
</CurrentIssues>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import arrow.core.getOrNone
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import arrow.core.separateEither
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.MiscellaneousPersistentWarning
import com.egm.stellio.search.csr.model.MiscellaneousWarning
import com.egm.stellio.search.csr.model.NGSILDWarning
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.RevalidationFailedWarning
import com.egm.stellio.search.entity.model.EntitiesQueryFromGet
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.queryparameter.QueryParameter
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
Expand All @@ -20,6 +25,7 @@ import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.stereotype.Service
import org.springframework.util.CollectionUtils
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.ClientResponse
Expand All @@ -30,10 +36,48 @@ import java.net.URI

typealias QueryEntitiesResponse = Pair<List<CompactedEntity>, Int?>

object ContextSourceCaller {
@Service
class DistributedEntityConsumptionService(
private val contextSourceRegistrationService: ContextSourceRegistrationService,
) {

val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun retrieveContextSourceEntity(
suspend fun distributeRetrieveEntityOperation(
id: URI,
httpHeaders: HttpHeaders,
queryParams: MultiValueMap<String, String>
): Pair<List<NGSILDWarning>, List<CompactedEntityWithCSR>> {
val csrFilters =
CSRFilters(
ids = setOf(id),
operations = listOf(
Operation.RETRIEVE_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
return matchingCSR.parMap { csr ->
val response = retrieveEntityFromContextSource(
httpHeaders,
csr,
id,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { it?.let { it to csr } }
}.separateEither()
.let { (warnings, maybeResponses) ->
warnings.toMutableList() to maybeResponses.filterNotNull()
}
}

suspend fun retrieveEntityFromContextSource(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
id: URI,
Expand All @@ -55,7 +99,45 @@ object ContextSourceCaller {
)
}

suspend fun queryContextSourceEntities(
suspend fun distributeQueryEntitiesOperation(
entitiesQuery: EntitiesQueryFromGet,
httpHeaders: HttpHeaders,
queryParams: MultiValueMap<String, String>
): Triple<List<NGSILDWarning>, List<CompactedEntitiesWithCSR>, List<Int?>> {
val csrFilters =
CSRFilters(
ids = entitiesQuery.ids,
idPattern = entitiesQuery.idPattern,
typeSelection = entitiesQuery.typeSelection,
operations = listOf(
Operation.QUERY_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

return matchingCSR.parMap { csr ->
val response = queryEntitiesFromContextSource(
httpHeaders,
csr,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { (entities, count) -> Triple(entities, csr, count) }
}.separateEither()
.let { (warnings, response) ->
Triple(
warnings.toMutableList(),
response.map { (entities, csr, _) -> entities to csr },
response.map { (_, _, counts) -> counts }
)
}
}

suspend fun queryEntitiesFromContextSource(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
params: MultiValueMap<String, String>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@ import arrow.core.getOrElse
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import arrow.core.separateEither
import arrow.fx.coroutines.parMap
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.addWarnings
import com.egm.stellio.search.csr.service.ContextSourceCaller
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.ContextSourceUtils
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
import com.egm.stellio.search.entity.service.LinkedEntityService
Expand Down Expand Up @@ -76,7 +71,7 @@ class EntityHandler(
private val applicationProperties: ApplicationProperties,
private val entityService: EntityService,
private val entityQueryService: EntityQueryService,
private val contextSourceRegistrationService: ContextSourceRegistrationService,
private val distributedEntityConsumptionService: DistributedEntityConsumptionService,
private val linkedEntityService: LinkedEntityService
) : BaseHandler() {

Expand Down Expand Up @@ -206,28 +201,9 @@ class EntityHandler(
val sub = getSubFromSecurityContext()

val contexts = getContextFromLinkHeaderOrDefault(httpHeaders, applicationProperties.contexts.core).bind()
val entitiesQuery = composeEntitiesQueryFromGet(
applicationProperties.pagination,
queryParams,
contexts
).bind()
val entitiesQuery = composeEntitiesQueryFromGet(applicationProperties.pagination, queryParams, contexts).bind()
.validateMinimalQueryEntitiesParameters().bind()

val csrFilters =
CSRFilters(
ids = entitiesQuery.ids,
idPattern = entitiesQuery.idPattern,
typeSelection = entitiesQuery.typeSelection,
operations = listOf(
Operation.QUERY_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

val (entities, localCount) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind()

val filteredEntities = entities.filterAttributes(entitiesQuery.attrs, entitiesQuery.datasetId)
Expand All @@ -237,31 +213,21 @@ class EntityHandler(
linkedEntityService.processLinkedEntities(it, entitiesQuery, sub.getOrNull()).bind()
}

val (warnings, remoteEntitiesWithCSR, remoteCounts) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.queryContextSourceEntities(
val (queryWarnings, remoteEntitiesWithCSR, remoteCounts) =
distributedEntityConsumptionService.distributeQueryEntitiesOperation(
entitiesQuery,
httpHeaders,
csr,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { (entities, count) -> Triple(entities, csr, count) }
}.separateEither()
.let { (warnings, response) ->
Triple(
warnings.toMutableList(),
response.map { (entities, csr, _) -> entities to csr },
response.map { (_, _, counts) -> counts }
)
}

val maxCount = (remoteCounts + localCount).maxBy { it ?: 0 } ?: 0

val mergedEntities = ContextSourceUtils.mergeEntitiesLists(
val (warnings, mergedEntities) = ContextSourceUtils.mergeEntitiesLists(
localEntities,
remoteEntitiesWithCSR
).toPair().let { (mergeWarnings, mergedEntities) ->
mergeWarnings?.let { warnings.addAll(it) }
mergedEntities ?: emptyList()
val warnings = mergeWarnings?.let { queryWarnings + it } ?: queryWarnings
warnings to (mergedEntities ?: emptyList())
}

val ngsiLdDataRepresentation = parseRepresentations(queryParams, mediaType).bind()
Expand Down Expand Up @@ -305,19 +271,6 @@ class EntityHandler(
contexts
).bind()

val csrFilters =
CSRFilters(
ids = setOf(entityId),
operations = listOf(
Operation.RETRIEVE_ENTITY,
Operation.FEDERATION_OPS,
Operation.RETRIEVE_OPS,
Operation.REDIRECTION_OPS
)
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters)

val localEntity = either {
val expandedEntity = entityQueryService.queryEntity(entityId, sub.getOrNull()).bind()
expandedEntity.checkContainsAnyOf(entitiesQuery.attrs).bind()
Expand All @@ -328,20 +281,11 @@ class EntityHandler(
compactEntity(filteredExpandedEntity, contexts)
}

// we can add parMap(concurrency = X) if this trigger too much http connexion at the same time
val (warnings, remoteEntitiesWithCSR) = matchingCSR.parMap { csr ->
val response = ContextSourceCaller.retrieveContextSourceEntity(
httpHeaders,
csr,
entityId,
queryParams
)
contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight())
response.map { it?.let { it to csr } }
}.separateEither()
.let { (warnings, maybeResponses) ->
warnings.toMutableList() to maybeResponses.filterNotNull()
}
val (warnings, remoteEntitiesWithCSR) = distributedEntityConsumptionService.distributeRetrieveEntityOperation(
entityId,
httpHeaders,
queryParams
).let { (warnings, it) -> warnings.toMutableList() to it }

val (mergeWarnings, mergedEntity) = ContextSourceUtils.mergeEntities(
localEntity.getOrNull(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.egm.stellio.search.authorization.web

import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.csr.service.ContextSourceRegistrationService
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.entity.service.EntityEventService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
Expand Down Expand Up @@ -42,7 +42,7 @@ class AnonymousUserHandlerTests {
private lateinit var entityQueryService: EntityQueryService

@MockkBean
private lateinit var contextSourceRegistrationService: ContextSourceRegistrationService
private lateinit var distributedEntityConsumptionService: DistributedEntityConsumptionService

@MockkBean(relaxed = true)
private lateinit var linkedEntityService: LinkedEntityService
Expand Down
Loading

0 comments on commit 991c7bb

Please sign in to comment.