Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordering of patches with cycles during upload #2524

Merged
merged 25 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
05873b1
Ordering of patches with cyclic during upload
aditya-07 Apr 23, 2024
c9e01ea
Ignored failing test
aditya-07 Apr 23, 2024
ed07d2f
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 14, 2024
9354c15
Refactored and added tests
aditya-07 May 16, 2024
fb10a0f
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 16, 2024
6575fae
Refactored
aditya-07 May 16, 2024
9a9f79d
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 27, 2024
471e5ef
Updated code to use strongly connected resources
aditya-07 May 30, 2024
d7524d6
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 May 30, 2024
e796958
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 3, 2024
500e7e8
Review comments: Seperated the code for finding ssc. Added some tets …
aditya-07 Jun 5, 2024
76c4840
Refactored code and added tests.
aditya-07 Jun 6, 2024
d346f0e
Refactored code and added tests.
aditya-07 Jun 6, 2024
4fc3e35
Updated StronglyConnectedPatches to compute node size
aditya-07 Jun 6, 2024
0100dca
Added comments
aditya-07 Jun 6, 2024
b3d87c5
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 6, 2024
46fd7b4
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 6, 2024
7415b62
Updated tarjan
aditya-07 Jun 10, 2024
190fda2
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 10, 2024
31d8098
Review comments: Updated docs
aditya-07 Jun 11, 2024
e2d95fa
Merge branch 'master' into ak/patchOrderWithCycles
aditya-07 Jun 11, 2024
bfcd0c9
Review comments: refactored code and renamed some classes
aditya-07 Jun 13, 2024
39c0173
Refactor
aditya-07 Jun 13, 2024
0de1b7a
Updated the code to use arrays
aditya-07 Jun 13, 2024
e320d00
Fixed failing test
aditya-07 Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.google.android.fhir.LocalChange
import com.google.android.fhir.db.Database

/**
* Generates [Patch]es from [LocalChange]s and output [List<[PatchMapping]>] to keep a mapping of
* Generates [Patch]es from [LocalChange]s and output [List<[OrderedMapping]>] to keep a mapping of
* the [LocalChange]s to their corresponding generated [Patch]
*
* INTERNAL ONLY. This interface should NEVER been exposed as an external API because it works
Expand All @@ -35,7 +35,7 @@ internal interface PatchGenerator {
* NOTE: different implementations may have requirements on the size of [localChanges] and output
* certain numbers of [Patch]es.
*/
suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping>
suspend fun generate(localChanges: List<LocalChange>): List<OrderedMapping>
}

internal object PatchGeneratorFactory {
Expand Down Expand Up @@ -67,3 +67,20 @@ internal data class PatchMapping(
val localChanges: List<LocalChange>,
val generatedPatch: Patch,
)

/** Structure to help describe the cyclic nature of ordered [PatchMapping]. */
internal sealed interface OrderedMapping {
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved

/**
* [IndividualMapping] doesn't have a cyclic dependency on other [IndividualMapping] /
* [PatchMapping]. It may however have an acyclic dependency on other [IndividualMapping]s /
* [PatchMapping]s.
*/
data class IndividualMapping(val patchMapping: PatchMapping) : OrderedMapping

/**
* [CombinedMapping] contains weakly connected [PatchMapping]s where one or more [PatchMapping]s
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
* have a cyclic dependency between each other.
*/
data class CombinedMapping(val patchMappings: List<PatchMapping>) : OrderedMapping
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package com.google.android.fhir.sync.upload.patch
import androidx.annotation.VisibleForTesting
import com.google.android.fhir.db.Database
import com.google.android.fhir.db.LocalChangeResourceReference
import kotlin.math.min

private typealias Node = String
typealias Node = String

typealias Graph = Map<Node, List<Node>>
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Orders the [PatchMapping]s to maintain referential integrity during upload.
Expand Down Expand Up @@ -53,15 +56,16 @@ internal object PatchOrdering {
* {D} (UPDATE), then B,C needs to go before the resource A so that referential integrity is
* retained. Order of D shouldn't matter for the purpose of referential integrity.
*
* @return - A ordered list of the [PatchMapping]s based on the references to other [PatchMapping]
* if the mappings are acyclic
* - throws [IllegalStateException] otherwise
* @return A ordered list of the [OrderedMapping] containing:
* - [OrderedMapping.IndividualMapping] for the [PatchMapping] based on the references to other
* [PatchMapping] if the mappings are acyclic
* - [OrderedMapping.CombinedMapping] for [PatchMapping]s based on the references to other
* [PatchMapping]s if the mappings are cyclic.
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
*/
suspend fun List<PatchMapping>.orderByReferences(
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
database: Database,
): List<PatchMapping> {
): List<OrderedMapping> {
val resourceIdToPatchMapping = associateBy { patchMapping -> patchMapping.resourceTypeAndId }

/* Get LocalChangeResourceReferences for all the local changes. A single LocalChange may have
multiple LocalChangeResourceReference, one for each resource reference in the
LocalChange.payload.*/
Expand All @@ -71,7 +75,20 @@ internal object PatchOrdering {
.groupBy { it.localChangeId }

val adjacencyList = createAdjacencyListForCreateReferences(localChangeIdToResourceReferenceMap)
return createTopologicalOrderedList(adjacencyList).mapNotNull { resourceIdToPatchMapping[it] }
val stronglyConnected: List<List<Node>> =
stronglyConnectedComponents(adjacencyList, resourceIdToPatchMapping.size)
val mapping = reduceToSuperNode(stronglyConnected)
val updatedGraph = transformGraph(adjacencyList, mapping)
val orderedNodes = createTopologicalOrderedList(updatedGraph)
val orderedStronglyConnected = superNodeToSSC(orderedNodes, mapping.first)
return orderedStronglyConnected.map {
val mappings = it.mapNotNull { resourceIdToPatchMapping[it] }
if (mappings.size == 1) {
OrderedMapping.IndividualMapping(mappings.first())
} else {
OrderedMapping.CombinedMapping(mappings)
}
}
}

/**
Expand Down Expand Up @@ -139,4 +156,138 @@ internal object PatchOrdering {
adjacencyList.keys.forEach { dfs(it) }
return stack.reversed()
}

private fun stronglyConnectedComponents(diGraph: Graph, nodesCount: Int): List<List<Node>> {
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
return sscTarzan(diGraph, nodesCount)
}

private fun reduceToSuperNode(
ssc: List<List<Node>>,
): Pair<Map<Node, List<Node>>, Map<Node, Node>> {
val superNodesMap = mutableMapOf<Node, List<Node>>()
val nodeToSuperNode = mutableMapOf<Node, String>()

var counter = 0
ssc.forEach {
superNodesMap[(++counter).toString()] = it
it.forEach { nodeToSuperNode[it] = (counter).toString() }
}

return superNodesMap to nodeToSuperNode
}

private fun transformGraph(
oldGraph: Graph,
nodeMapping: Pair<Map<Node, List<Node>>, Map<Node, Node>>,
): Graph {
val newGraph = mutableMapOf<Node, List<Node>>()
val (superNodeToNodes, nodeToSuperNode) = nodeMapping
// Remove any cyclic dependency from connected components.
// reduce them to a single node
// replace the ssc nodes with super node
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
oldGraph.forEach {
// println(it.key)
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
val superNode = nodeToSuperNode[it.key]!!

if (newGraph.containsKey(superNode)) {
newGraph[superNode] =
(newGraph[superNode]!! + it.value.mapNotNull { nodeToSuperNode[it] })
.distinct()
.filterNot { superNode == it }
} else {
newGraph[superNode] = it.value.mapNotNull { nodeToSuperNode[it] }
}
}

return newGraph
}

private fun superNodeToSSC(
orderedNodes: List<Node>,
mapping: Map<Node, List<Node>>,
): List<List<Node>> {
return orderedNodes.mapNotNull { mapping[it] }
}

private fun sscTarzan(diGraph: Graph, nodesCount: Int): List<List<Node>> {
// Code inspired from
// https://github.com/williamfiset/Algorithms/blob/master/src/main/java/com/williamfiset/algorithms/graphtheory/TarjanSccSolverAdjacencyList.java
var counter = -1
val intToNode = mutableMapOf<Int, String>()
val nodeToInt = mutableMapOf<String, Int>()

val graph = MutableList<MutableList<Int>>(nodesCount) { mutableListOf() }

diGraph.forEach { (key, value) ->
val intForKey =
nodeToInt[key]
?: let {
intToNode[++counter] = key
nodeToInt[key] = counter
counter
}

value.forEach { node ->
val intForValue =
nodeToInt[node]
?: let {
intToNode[++counter] = node
nodeToInt[node] = counter
counter
}
graph[intForKey].add(intForValue)
}
}

var id = 0
var sccCount = 0
val visited = BooleanArray(graph.size)
val ids = IntArray(graph.size) { -1 }
val low = IntArray(graph.size)
val sccs = IntArray(graph.size)
val stack = ArrayDeque<Int>()

fun dfs(at: Int) {
low[at] = id++
ids[at] = low[at]
stack.addFirst(at)
visited[at] = true
for (to in graph[at]) {
if (ids[to] == -1) {
dfs(to)
}
if (visited[to]) {
low[at] = min(low[at], low[to])
}
}

// On recursive callback, if we're at the root node (start of SCC)
// empty the seen stack until back to root.
if (ids[at] == low[at]) {
var node: Int = stack.removeFirst()
while (true) {
visited[node] = false
sccs[node] = sccCount
if (node == at) break
node = stack.removeFirst()
}
sccCount++
}
}

for (i in 0 until nodesCount) {
if (ids[i] == -1) {
dfs(i)
}
}

val lowLinkToConnectedMap = mutableMapOf<Int, MutableList<Int>>()
for (i in 0 until nodesCount) {
if (!lowLinkToConnectedMap.containsKey(sccs[i])) {
lowLinkToConnectedMap[sccs[i]] = mutableListOf()
}
lowLinkToConnectedMap[sccs[i]]!!.add(i)
}
return lowLinkToConnectedMap.values.map { it.mapNotNull { intToNode[it] } }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ import com.google.android.fhir.LocalChange
* maintain an audit trail.
*/
internal object PerChangePatchGenerator : PatchGenerator {
override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> =
localChanges.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
override suspend fun generate(localChanges: List<LocalChange>): List<OrderedMapping> =
localChanges
.map {
PatchMapping(
localChanges = listOf(it),
generatedPatch =
Patch(
resourceType = it.resourceType,
resourceId = it.resourceId,
versionId = it.versionId,
timestamp = it.timestamp,
type = it.type.toPatchType(),
payload = it.payload,
),
)
}
.map { OrderedMapping.IndividualMapping(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.google.android.fhir.sync.upload.patch.PatchOrdering.orderByReferences
internal class PerResourcePatchGenerator private constructor(val database: Database) :
PatchGenerator {

override suspend fun generate(localChanges: List<LocalChange>): List<PatchMapping> {
override suspend fun generate(localChanges: List<LocalChange>): List<OrderedMapping> {
return generateSquashedChangesMapping(localChanges).orderByReferences(database)
}

Expand Down
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Google LLC
* Copyright 2023-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.google.android.fhir.sync.upload.request

import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.OrderedMapping
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import org.hl7.fhir.r4.model.Bundle
Expand All @@ -29,10 +30,52 @@ internal class TransactionBundleGenerator(
(patch: Patch, useETagForUpload: Boolean) -> BundleEntryComponentGenerator,
) : UploadRequestGenerator {

/**
* In order to accommodate cyclic dependencies between [PatchMapping]s and maintain referential
* integrity on the server, the [PatchMapping]s in a [OrderedMapping.CombinedMapping] are all put
* in a single [BundleUploadRequestMapping]. Based on the [generatedBundleSize], the remaining
* space of the [BundleUploadRequestMapping] maybe filled with other
* [OrderedMapping.CombinedMapping] or [OrderedMapping.IndividualMapping] mappings.
*
* In case a single [OrderedMapping.CombinedMapping] has more [PatchMapping]s than the
* [generatedBundleSize], [generatedBundleSize] will be ignored so that all of the dependent
* mappings in [OrderedMapping.CombinedMapping] can be sent in a single [Bundle].
*
* **NOTE: The order of the [OrderedMapping.IndividualMapping] is always maintained and the order
* of [OrderedMapping.CombinedMapping] doesn't matter since it contain all the required
* [PatchMapping] inside the same [Bundle].**
*/
override fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<OrderedMapping>,
): List<BundleUploadRequestMapping> {
return mappedPatches.chunked(generatedBundleSize).map { patchList ->
val mappingsPerBundle = mutableListOf<List<PatchMapping>>()

var bundle = mutableListOf<PatchMapping>()
mappedPatches.forEach {
when (it) {
is OrderedMapping.IndividualMapping -> {
if (bundle.size < generatedBundleSize) {
bundle.add(it.patchMapping)
} else {
mappingsPerBundle.add(bundle)
bundle = mutableListOf(it.patchMapping)
}
}
is OrderedMapping.CombinedMapping -> {
if ((bundle.size + it.patchMappings.size) <= generatedBundleSize) {
bundle.addAll(it.patchMappings)
} else {
mappingsPerBundle.add(bundle)
bundle = mutableListOf()
bundle.addAll(it.patchMappings)
}
}
}
}

if (bundle.isNotEmpty()) mappingsPerBundle.add(bundle)

return mappingsPerBundle.map { patchList ->
generateBundleRequest(patchList).let { mappedBundleRequest ->
BundleUploadRequestMapping(
splitLocalChanges = mappedBundleRequest.first,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
package com.google.android.fhir.sync.upload.request

import com.google.android.fhir.LocalChange
import com.google.android.fhir.sync.upload.patch.OrderedMapping
import com.google.android.fhir.sync.upload.patch.Patch
import com.google.android.fhir.sync.upload.patch.PatchMapping
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.codesystems.HttpVerb

/**
* Generator that generates [UploadRequest]s from the [Patch]es present in the
* [List<[PatchMapping]>]. Any implementation of this generator is expected to output
* [List<[OrderedMapping]>]. Any implementation of this generator is expected to output
* [List<[UploadRequestMapping]>] which maps [UploadRequest] to the corresponding [LocalChange]s it
* was generated from.
*/
internal interface UploadRequestGenerator {
/** Generates a list of [UploadRequestMapping] from the [PatchMapping]s */
fun generateUploadRequests(
mappedPatches: List<PatchMapping>,
mappedPatches: List<OrderedMapping>,
): List<UploadRequestMapping>
}

Expand Down
Loading
Loading