diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..43acdc97 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,2 @@ +# Datasource local storage ignored files +/dataSources/ diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml deleted file mode 100644 index b9191652..00000000 --- a/.idea/codeStyles/Project.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - - \ No newline at end of file diff --git a/build.gradle b/build.gradle index 5e806e4d..93614188 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ buildscript { corda_release_group = 'net.corda' corda_release_version = '4.3' tokens_release_group = "com.r3.corda.lib.tokens" - tokens_release_version = "1.2.4-SNAPSHOT" + tokens_release_version = "1.2.5-ASYNC-LOADING-SNAPSHOT" corda_gradle_plugins_version = '5.0.12' kotlin_version = '1.2.71' junit_version = '4.12' diff --git a/contracts/build.gradle b/contracts/build.gradle index 21e20c66..a75354a3 100644 --- a/contracts/build.gradle +++ b/contracts/build.gradle @@ -47,7 +47,14 @@ cordapp { versionId 2 } signing { - enabled false + enabled true + options { + keystore "./signing-infra/keyStore.jks" + alias "cordapp_signing" + storepass "monkey" + keypass "monkey" + storetype "PKCS12" + } } } diff --git a/contracts/signing-infra/keyStore.jks b/contracts/signing-infra/keyStore.jks new file mode 100644 index 00000000..8542d6db Binary files /dev/null and b/contracts/signing-infra/keyStore.jks differ diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle index 8c31e3bd..3868afbc 100644 --- a/freighter-tests/build.gradle +++ b/freighter-tests/build.gradle @@ -26,8 +26,13 @@ task freighterTest(type: Test, dependsOn: [project(":workflows").jar]) { classpath = sourceSets.freighterTest.runtimeClasspath useJUnitPlatform { includeTags "DOCKER" - excludeTags "AZURE", "FULL_LINUX_KERNEL", "ORACLE" + excludeTags "AZURE", "FULL_LINUX_KERNEL", "ORACLE", "LARGE_TEST" } + + testLogging { + showStandardStreams = true + } + } configurations { @@ -37,10 +42,11 @@ configurations { dependencies { freighterTestCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" - freighterTestCompile "freighter:freighter-testing-core-junit5:0.7.3-TEST-SNAPSHOT" + freighterTestCompile "freighter:freighter-testing-core-junit5:0.9.0-SNAPSHOT" freighterTestCompile project(":contracts") freighterTestCompile project(":workflows") + freighterTestCompile project(":workflows-integration-test") } diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt new file mode 100644 index 00000000..0f4145a2 --- /dev/null +++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt @@ -0,0 +1,170 @@ +package freighter.testing + +import com.r3.corda.lib.tokens.contracts.states.FungibleToken +import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType +import com.r3.corda.lib.tokens.contracts.types.TokenType +import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue +import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens +import com.stress.flows.CreateNewCIFlow +import freighter.deployments.DeploymentContext +import freighter.deployments.NodeBuilder +import freighter.deployments.SingleNodeDeployment +import freighter.deployments.UnitOfDeployment +import freighter.machine.DeploymentMachineProvider +import freighter.machine.generateRandomString +import 
net.corda.core.contracts.Amount +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import org.junit.jupiter.api.Tag +import org.junit.jupiter.api.Test +import utility.getOrThrow +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantLock +import java.util.stream.StreamSupport +import kotlin.concurrent.withLock +import kotlin.streams.toList + +@Tag("LARGE_TEST") +annotation class LargeTest + +class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() { + + val loadingThreads = 8 + val pageSize = 10_000 + + val tokenCurrentContracts = + NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey() + + val testFlows = + NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey() + + val tokenCurrentWorkflows = + NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig( + """ + stateSelection.inMemory.enabled=true + stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY] + stateSelection.inMemory.pageSize=${pageSize} + stateSelection.inMemory.loadingSleepSeconds=-1 + stateSelection.inMemory.loadingThreads=${loadingThreads} + """.trimIndent().byteInputStream() + ) + + val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact( + group = "com.r3.corda.lib.ci", + artifact = "ci-workflows", + version = "1.0" + ) + + val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows") + + @Test + fun `tokens can be loaded async during node startup on postgres 9_6`() { + run(DeploymentMachineProvider.DatabaseType.PG_9_6) + } + + @Test + fun `tokens can be loaded async during node startup on H2`() { + run(DeploymentMachineProvider.DatabaseType.H2) + } + + private fun run(db: DeploymentMachineProvider.DatabaseType) { + val randomString = generateRandomString() + val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword) + val node1 = SingleNodeDeployment( + NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString") + .withCordapp(tokenCurrentContracts) + .withCordapp(tokenCurrentWorkflows) + .withCordapp(modernCiV1) + .withCordapp(freighterHelperCordapp) + .withCordapp(testFlows) + .withDatabase(machineProvider.requestDatabase(db)) + ).withVersion(UnitOfDeployment.CORDA_4_7) + .deploy(deploymentContext) + + val nodeMachine1 = node1.getOrThrow().nodeMachines.single() + + val createdCi = nodeMachine1.rpc { + startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also { + println("Successfully created CI: $it") + } + } + + val tokenType = TokenType("StefCoin", 2) + val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType) + val issuedTotal = AtomicLong(0) + + val tokenToIssue = (0.until(100)).map { FungibleToken(Amount(1, issuedTokenType), createdCi) }.toList() + + val numberIssued = StreamSupport.stream((0.until(500)).chunked(200).spliterator(), true).map { toIssue -> + nodeMachine1.rpc { + repeat(toIssue.size) { + startFlow( + ::IssueTokens, + tokenToIssue, listOf() + ).returnValue.getOrThrow(Duration.ofMinutes(1)) + println("[${Thread.currentThread().name}] Total number issued: ${issuedTotal.addAndGet(tokenToIssue.size * 1L)}") + } + } + + toIssue.size * tokenToIssue.size + }.toList().sum() + + nodeMachine1.stopNode() + println() + nodeMachine1.startNode() + + val nodeStartTime = System.currentTimeMillis() + + var amountLoaded = 0L + + val lock = 
ReentrantLock() + val condition = lock.newCondition() + + + //whilst we are loading, issue 500 more tokens to see if they are correctly loaded + CompletableFuture.runAsync { + lock.withLock { + condition.await() + } + nodeMachine1.rpc { + repeat(5) { + startFlow( + ::IssueTokens, + tokenToIssue, listOf() + ).returnValue.getOrThrow(Duration.ofMinutes(1)) + println("[${Thread.currentThread().name}] Total number issued: ${issuedTotal.addAndGet(tokenToIssue.size * 1L)}") + } + } + } + + while (amountLoaded != (issuedTotal.toLong())) { + nodeMachine1.rpc { + amountLoaded = startFlow( + ::LockEverythingGetValue, + tokenType + ).returnValue.getOrThrow(Duration.ofMinutes(1)) + } + println("TOTAL TOKEN VALUE IN CACHE: $amountLoaded") + + if (amountLoaded > 0) { + lock.withLock { + condition.signal() + } + } + + if (amountLoaded != issuedTotal.toLong()) { + Thread.sleep(20000) + } + } + + val loadEndTime = System.currentTimeMillis() + + println("it took: ${(loadEndTime - nodeStartTime) / 1000} seconds to populate $amountLoaded states using $loadingThreads loading threads and pageSize: $pageSize") + + + } + + +} \ No newline at end of file diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt index deacd504..ef3bdc87 100644 --- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt +++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt @@ -67,12 +67,6 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.MS_SQL) } - @Test - @OracleTest - fun `tokens can be observed on node that does not know CI running oracle 12 r2`() { - runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.ORACLE_12_R2) - } - private fun runTokensOnNodeRunningDatabase(db: DeploymentMachineProvider.DatabaseType) { val randomString = generateRandomString() val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword) @@ -83,7 +77,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { .withCordapp(modernCiV1) .withCordapp(freighterHelperCordapp) .withDatabase(machineProvider.requestDatabase(db)) - ).withVersion(UnitOfDeployment.CORDA_4_6) + ).withVersion(UnitOfDeployment.CORDA_4_7) .deploy(deploymentContext) val node2 = SingleNodeDeployment( @@ -93,7 +87,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() { .withCordapp(modernCiV1) .withCordapp(freighterHelperCordapp) .withDatabase(machineProvider.requestDatabase(db)) - ).withVersion(UnitOfDeployment.CORDA_4_6) + ).withVersion(UnitOfDeployment.CORDA_4_7) .deploy(deploymentContext) val nodeMachine1 = node1.getOrThrow().nodeMachines.single() diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt new file mode 100644 index 00000000..506d0745 --- /dev/null +++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt @@ -0,0 +1,138 @@ +package freighter.testing + +import com.r3.corda.lib.tokens.contracts.states.FungibleToken +import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType +import com.r3.corda.lib.tokens.contracts.types.TokenType +import com.r3.corda.lib.tokens.integration.workflows.GetSelectionPageSize +import 
com.r3.corda.lib.tokens.integration.workflows.GetSelectionSleepDuration +import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue +import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens +import com.stress.flows.CreateNewCIFlow +import freighter.deployments.DeploymentContext +import freighter.deployments.NodeBuilder +import freighter.deployments.SingleNodeDeployment +import freighter.deployments.UnitOfDeployment +import freighter.machine.DeploymentMachineProvider +import freighter.machine.generateRandomString +import net.corda.core.contracts.Amount +import net.corda.core.internal.stream +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import org.hamcrest.MatcherAssert +import org.hamcrest.Matchers.`is` +import org.junit.jupiter.api.Test +import utility.getOrThrow +import java.time.Duration +import kotlin.streams.toList + +class TokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() { + + val tokenCurrentContracts = + NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey() + + val testFlows = + NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey() + + val tokenCurrentWorkflows = + NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig( + """ + stateSelection.inMemory.enabled=true + stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY] + stateSelection.inMemory.pageSize=5 + stateSelection.inMemory.loadingSleepSeconds=600 + """.trimIndent().byteInputStream() + ) + + val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact( + group = "com.r3.corda.lib.ci", + artifact = "ci-workflows", + version = "1.0" + ) + + val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows") + + @Test + fun `tokens can be loaded async during node startup on postgres 9_6`() { + run(DeploymentMachineProvider.DatabaseType.PG_9_6) + } + + @Test + fun `tokens can be loaded async during node startup on H2`() { + run(DeploymentMachineProvider.DatabaseType.H2) + } + + private fun run(db: DeploymentMachineProvider.DatabaseType) { + val randomString = generateRandomString() + val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword) + val node1 = SingleNodeDeployment( + NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString") + .withCordapp(tokenCurrentContracts) + .withCordapp(tokenCurrentWorkflows) + .withCordapp(modernCiV1) + .withCordapp(freighterHelperCordapp) + .withCordapp(testFlows) + .withDatabase(machineProvider.requestDatabase(db)) + ).withVersion(UnitOfDeployment.CORDA_4_7) + .deploy(deploymentContext) + + val nodeMachine1 = node1.getOrThrow().nodeMachines.single() + + val createdCi = nodeMachine1.rpc { + startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also { + println("Successfully created CI: $it") + } + } + + val tokenType = TokenType("StefCoin", 2) + val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType) + val amount = Amount(1, issuedTokenType) + val tokenToIssue1 = (0..100).map { FungibleToken(amount, createdCi) }.toList() + + val loadingPageSize = nodeMachine1.rpc { + startFlow( + ::GetSelectionPageSize + ).returnValue.getOrThrow() + } + + MatcherAssert.assertThat(loadingPageSize, `is`(5)) + + val loadingSleepDuration = nodeMachine1.rpc { + startFlow( + ::GetSelectionSleepDuration + ).returnValue.getOrThrow() + } + + MatcherAssert.assertThat(loadingSleepDuration, `is`(600)) + + + val issueTXs = 
(0..100).stream(true).mapToObj { + val issueTx = nodeMachine1.rpc { + startFlow( + ::IssueTokens, + tokenToIssue1, listOf() + ).returnValue.getOrThrow(Duration.ofMinutes(1)) + } + issueTx + }.toList() + + + print(issueTXs) + + nodeMachine1.stopNode() + + println() + + nodeMachine1.startNode() + + val tokenValueLoadedInCache = nodeMachine1.rpc { + startFlow( + ::LockEverythingGetValue, + tokenType + ).returnValue.getOrThrow(Duration.ofMinutes(1)) + } + + MatcherAssert.assertThat(tokenValueLoadedInCache, `is`(5L)) + } + + +} \ No newline at end of file diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt index 4cbea217..405cdfa2 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt @@ -19,6 +19,11 @@ internal fun sortByStateRefAscending(): Sort { return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))) } +internal fun sortByTimeStampAscending(): Sort { + val sortAttribute = SortAttribute.Standard(Sort.VaultStateAttribute.RECORDED_TIME) + return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC))) +} + // Returns all held token amounts of a specified token with given issuer. // We need to discriminate on the token type as well as the symbol as different tokens might use the same symbols. @Suspendable diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt index cf8a1b65..17826001 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt @@ -3,69 +3,79 @@ package com.r3.corda.lib.tokens.selection.memory.config import co.paralleluniverse.fibers.Suspendable import com.r3.corda.lib.tokens.selection.api.StateSelectionConfig import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector +import com.r3.corda.lib.tokens.selection.memory.services.IndexingType import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService import net.corda.core.cordapp.CordappConfig import net.corda.core.cordapp.CordappConfigException import net.corda.core.node.ServiceHub import org.slf4j.LoggerFactory -const val CACHE_SIZE_DEFAULT = 1024 // TODO Return good default, for now it's not wired, it will be done in separate PR. 
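For reference, a minimal sketch of the CorDapp config block that drives the new selection options introduced below. The key names are the ones read by InMemorySelectionConfig.parse(); the values are illustrative and mirror the freighter tests above:

    stateSelection.inMemory.enabled=true
    stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY]
    stateSelection.inMemory.cacheSize=1024
    stateSelection.inMemory.pageSize=10000
    stateSelection.inMemory.loadingSleepSeconds=-1
    stateSelection.inMemory.loadingThreads=8

pageSize and loadingThreads control how ServiceHubAsyncLoader pages the vault and how many loader threads it starts, and loadingSleepSeconds adds an optional pause between pages (values of zero or below disable the pause, matching the loader's sleep check).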
+const val CACHE_SIZE_DEFAULT = 1024 +const val PAGE_SIZE_DEFAULT = 1024 -data class InMemorySelectionConfig @JvmOverloads constructor(val enabled: Boolean, - val indexingStrategies: List, - val cacheSize: Int = CACHE_SIZE_DEFAULT) : StateSelectionConfig { - companion object { - private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger") +data class InMemorySelectionConfig @JvmOverloads constructor( + val enabled: Boolean, + val indexingStrategies: List, + val cacheSize: Int = CACHE_SIZE_DEFAULT, + val pageSize: Int = 1000, + val sleep: Int = 0, + val loadingThreads: Int = 4 +) : StateSelectionConfig { + companion object { + private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger") - @JvmStatic - fun parse(config: CordappConfig): InMemorySelectionConfig { - val enabled = if (!config.exists("stateSelection.inMemory.enabled")) { - logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this") - true - } else { - config.getBoolean("stateSelection.inMemory.enabled") - } - val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize") - ?: CACHE_SIZE_DEFAULT - val indexingType = try { - (config.get("stateSelection.inMemory.indexingStrategies") as List).map { VaultWatcherService.IndexingType.valueOf(it.toString()) } - } catch (e: CordappConfigException) { - logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() - } catch (e: ClassCastException) { - logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") - emptyList() - } - logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize") - return InMemorySelectionConfig(enabled, indexingType, cacheSize) - } + @JvmStatic + fun parse(config: CordappConfig): InMemorySelectionConfig { + val enabled = if (!config.exists("stateSelection.inMemory.enabled")) { + logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this") + true + } else { + config.getBoolean("stateSelection.inMemory.enabled") + } + val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize") + ?: CACHE_SIZE_DEFAULT + val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.pageSize")?: PAGE_SIZE_DEFAULT + val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0 + val loadingThreads: Int = config.getIntOrNull("stateSelection.inMemory.loadingThreads")?: 4 + val indexingType = try { + (config.get("stateSelection.inMemory.indexingStrategies") as List).map { IndexingType.valueOf(it.toString()) } + } catch (e: CordappConfigException) { + logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens") + emptyList() + } catch (e: ClassCastException) { + logger.warn("No indexing method specified. 
Indexes will be created at run-time for each invocation of selectTokens") + emptyList() + } + logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize") + return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep, loadingThreads) + } - fun defaultConfig(): InMemorySelectionConfig { - return InMemorySelectionConfig(true, emptyList()) - } - } + fun defaultConfig(): InMemorySelectionConfig { + return InMemorySelectionConfig(true, emptyList()) + } + } - @Suspendable - override fun toSelector(services: ServiceHub): LocalTokenSelector { - return try { - val vaultObserver = services.cordaService(VaultWatcherService::class.java) - LocalTokenSelector(services, vaultObserver, state = null) - } catch (e: IllegalArgumentException) { - throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.") - } - } + @Suspendable + override fun toSelector(services: ServiceHub): LocalTokenSelector { + return try { + val vaultObserver = services.cordaService(VaultWatcherService::class.java) + LocalTokenSelector(services, vaultObserver, state = null) + } catch (e: IllegalArgumentException) { + throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.") + } + } } // Helpers for configuration parsing. fun CordappConfig.getIntOrNull(path: String): Int? { - return try { - getInt(path) - } catch (e: CordappConfigException) { - if (exists(path)) { - throw IllegalArgumentException("Provide correct database selection configuration for config path: $path") - } else { - null - } - } + return try { + getInt(path) + } catch (e: CordappConfigException) { + if (exists(path)) { + throw IllegalArgumentException("Provide correct database selection configuration for config path: $path") + } else { + null + } + } } diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt new file mode 100644 index 00000000..2bec1ccb --- /dev/null +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt @@ -0,0 +1,25 @@ +package com.r3.corda.lib.tokens.selection.memory.services + +import com.r3.corda.lib.tokens.selection.memory.internal.Holder + +enum class IndexingType(val ownerType: Class) { + + EXTERNAL_ID(Holder.MappedIdentity::class.java), + PUBLIC_KEY(Holder.KeyIdentity::class.java); + + companion object { + fun fromHolder(holder: Class): IndexingType { + return when (holder) { + Holder.MappedIdentity::class.java -> { + EXTERNAL_ID + } + + Holder.KeyIdentity::class.java -> { + PUBLIC_KEY + } + else -> throw IllegalArgumentException("Unknown Holder type: $holder") + } + } + } + +} \ No newline at end of file diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt new file mode 100644 index 00000000..62fab99f --- /dev/null +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt @@ -0,0 +1,76 @@ +package com.r3.corda.lib.tokens.selection.memory.services + +import com.r3.corda.lib.tokens.contracts.states.FungibleToken +import 
com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig +import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending +import io.github.classgraph.ClassGraph +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.Vault +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.utilities.contextLogger +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger + +class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub, + private val configOptions: InMemorySelectionConfig) : ((Vault.Update) -> Unit) -> Unit { + + + override fun invoke( + onVaultUpdate: (Vault.Update) -> Unit + ) { + LOG.info("Starting async token loading from vault") + + val classGraph = ClassGraph() + classGraph.enableClassInfo() + + val scanResultFuture = CompletableFuture.supplyAsync { + classGraph.scan() + } + + scanResultFuture.thenApplyAsync { scanResult -> + val subclasses: Set> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName) + .map { it.name } + .map { Class.forName(it) as Class }.toSet() + + val enrichedClasses = (subclasses - setOf(FungibleToken::class.java)) + LOG.info("Enriching token query with types: $enrichedClasses") + + val shouldLoop = AtomicBoolean(true) + val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1) + val loadingFutures: List> = 0.until(configOptions.loadingThreads).map { + CompletableFuture.runAsync { + try { + while (shouldLoop.get()) { + LOG.info("loading page: ${pageNumber.get() + 1}, should loop: ${shouldLoop.get()}") + val newlyLoadedStates = appServiceHub.vaultService.queryBy( + paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = configOptions.pageSize), + criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses), + sorting = sortByTimeStampAscending() + ).states.toSet() + onVaultUpdate(Vault.Update(emptySet(), newlyLoadedStates)) + LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") + shouldLoop.compareAndSet(true, newlyLoadedStates.isNotEmpty()) + LOG.debug("shouldLoop=${shouldLoop}") + if (configOptions.sleep > 0) { + Thread.sleep(configOptions.sleep.toLong() * 1000) + } + } + } catch (t: Throwable) { + LOG.error("Token Loading Failed due to: ", t) + } + } + } + CompletableFuture.allOf(*loadingFutures.toTypedArray()).thenRunAsync { + LOG.info("finished token loading") + } + } + } + + companion object { + val LOG = contextLogger() + } +} \ No newline at end of file diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt index dad687c0..0c01b700 100644 --- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt +++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt @@ -4,9 +4,9 @@ import com.r3.corda.lib.tokens.contracts.states.FungibleToken import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType import com.r3.corda.lib.tokens.contracts.types.TokenType import com.r3.corda.lib.tokens.contracts.utilities.withoutIssuer -import 
com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.InsufficientBalanceException import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException +import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig import com.r3.corda.lib.tokens.selection.memory.internal.Holder import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey import com.r3.corda.lib.tokens.selection.sortByStateRefAscending @@ -29,309 +29,292 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import kotlin.concurrent.read import kotlin.concurrent.write -val UPDATER: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor() val EMPTY_BUCKET = TokenBucket() const val PLACE_HOLDER: String = "THIS_IS_A_PLACE_HOLDER" @CordaService -class VaultWatcherService(private val tokenObserver: TokenObserver, - private val providedConfig: InMemorySelectionConfig) : SingletonSerializeAsToken() { - - private val __backingMap: ConcurrentMap, String> = ConcurrentHashMap() - private val __indexed: ConcurrentMap, ConcurrentMap> = ConcurrentHashMap( - providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap() }.toMap() - ) - - private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock() - - enum class IndexingType(val ownerType: Class) { - - EXTERNAL_ID(Holder.MappedIdentity::class.java), - PUBLIC_KEY(Holder.KeyIdentity::class.java); - - companion object { - fun fromHolder(holder: Class): IndexingType { - return when (holder) { - Holder.MappedIdentity::class.java -> { - EXTERNAL_ID - } - - Holder.KeyIdentity::class.java -> { - PUBLIC_KEY; - } - else -> throw IllegalArgumentException("Unknown Holder type: $holder") - } - } - } - - } - - constructor(appServiceHub: AppServiceHub) : this(getObservableFromAppServiceHub(appServiceHub), InMemorySelectionConfig.parse(appServiceHub.getAppContext().config)) - - companion object { - val LOG = contextLogger() - - private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver { - val config = appServiceHub.cordappProvider.getAppContext().config - val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config) - - if (!configOptions.enabled) { - LOG.info("Disabling inMemory token selection - refer to documentation on how to enable") - return TokenObserver(emptyList(), Observable.empty(), { _, _ -> - Holder.UnmappedIdentity() - }) - } - - val ownerProvider: (StateAndRef, IndexingType) -> Holder = { token, indexingType -> - when (indexingType) { - IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey) - IndexingType.EXTERNAL_ID -> { - val owningKey = token.state.data.holder.owningKey - lookupExternalIdFromKey(owningKey, appServiceHub) - } - } - } - - - val pageSize = 1000 - var currentPage = DEFAULT_PAGE_NUM - val (_, vaultObservable) = appServiceHub.vaultService.trackBy( - contractStateType = FungibleToken::class.java, - paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize), - criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL), - sorting = sortByStateRefAscending()) - - // we use the UPDATER thread for two reasons - // 1 this means we return the service before all states are loaded, and so do not hold up the node startup - // 2 because all updates to the cache (addition / removal) are also done via UPDATER, this means that until we have finished loading all updates are buffered preventing out of order updates - val 
asyncLoader = object : ((Vault.Update) -> Unit) -> Unit { - override fun invoke(callback: (Vault.Update) -> Unit) { - LOG.info("Starting async token loading from vault") - UPDATER.submit { - try { - var shouldLoop = true - while (shouldLoop) { - val newlyLoadedStates = appServiceHub.vaultService.queryBy( - contractStateType = FungibleToken::class.java, - paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize), - criteria = QueryCriteria.VaultQueryCriteria(), - sorting = sortByStateRefAscending() - ).states.toSet() - LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback") - callback(Vault.Update(emptySet(), newlyLoadedStates)) - shouldLoop = newlyLoadedStates.isNotEmpty() - LOG.debug("shouldLoop=${shouldLoop}") - currentPage++ - } - LOG.info("finished token loading") - } catch (t: Throwable) { - LOG.error("Token Loading Failed due to: ", t) - } - } - } - } - return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader) - } - } - - init { - addTokensToCache(tokenObserver.initialValues) - tokenObserver.source.doOnError { - LOG.error("received error from observable", it) - } - tokenObserver.startLoading(::onVaultUpdate) - tokenObserver.source.subscribe(::onVaultUpdate) - } - - private fun processToken(token: StateAndRef, indexingType: IndexingType): TokenIndex { - val owner = tokenObserver.ownerProvider(token, indexingType) - val type = token.state.data.amount.token.tokenType.tokenClass - val typeId = token.state.data.amount.token.tokenType.tokenIdentifier - return TokenIndex(owner, type, typeId) - } - - private fun onVaultUpdate(t: Vault.Update) { - LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states") - try { - removeTokensFromCache(t.consumed) - addTokensToCache(t.produced) - } catch (t: Throwable) { - //we DO NOT want to kill the observable - as a single exception will terminate the feed - LOG.error("Failure during token cache update", t) - } - } - - private fun removeTokensFromCache(stateAndRefs: Collection>) { - indexViewCreationLock.read { - for (stateAndRef in stateAndRefs) { - val existingMark = __backingMap.remove(stateAndRef) - existingMark - ?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours") - for (key in __indexed.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(key)) - val indexedViewForHolder = __indexed[key] - indexedViewForHolder - ?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") - - val bucketForIndex: TokenBucket? 
= indexedViewForHolder?.get(index) - bucketForIndex?.remove(stateAndRef) - } - } - } - } - - private fun addTokensToCache(stateAndRefs: Collection>) { - indexViewCreationLock.read { - for (stateAndRef in stateAndRefs) { - val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER) - existingMark?.let { - LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours") - } - for (key in __indexed.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(key)) - val indexedViewForHolder = __indexed[key] - ?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") - val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { - TokenBucket() - } - bucketForIndex.add(stateAndRef) - } - } - } - } - - private fun getOrCreateIndexViewForHolderType(holderType: Class): ConcurrentMap { - return __indexed[holderType] ?: indexViewCreationLock.write { - __indexed[holderType] ?: generateNewIndexedView(holderType) - } - } - - private fun generateNewIndexedView(holderType: Class): ConcurrentMap { - val indexedViewForHolder: ConcurrentMap = ConcurrentHashMap() - for (stateAndRef in __backingMap.keys) { - val index = processToken(stateAndRef, IndexingType.fromHolder(holderType)) - val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { - TokenBucket() - } - bucketForIndex.add(stateAndRef) - } - __indexed[holderType] = indexedViewForHolder - return indexedViewForHolder - } - - fun lockTokensExternal(list: List>, knownSelectionId: String) { - list.forEach { - __backingMap.replace(it, PLACE_HOLDER, knownSelectionId) - } - } - - fun selectTokens( - owner: Holder, - requiredAmount: Amount, - predicate: ((StateAndRef) -> Boolean) = { true }, - allowShortfall: Boolean = false, - autoUnlockDelay: Duration = Duration.ofMinutes(5), - selectionId: String - ): List> { - //we have to handle both cases - //1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType - //2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer. - val enrichedPredicate: AtomicReference<(StateAndRef) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) { - val issuer = (requiredAmount.token as IssuedTokenType).issuer - { token -> - predicate(token) && token.state.data.issuer == issuer - } - } else { - predicate - }) - - val lockedTokens = mutableListOf>() - val bucket: Iterable> = if (owner is Holder.TokenOnly) { - val currentPredicate = enrichedPredicate.get() - //why do we do this? 
It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault - //so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type - //we will revisit in future if this assumption turns out to be wrong - enrichedPredicate.set { - val stateTokenType = it.state.data.tokenType - currentPredicate(it) && - stateTokenType.fractionDigits == requiredAmount.token.fractionDigits && - requiredAmount.token.tokenClass == stateTokenType.tokenClass && - requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier - } - __backingMap.keys - } else { - val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass) - getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView) - } - - val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer() - var amountLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) - // this is the running total of soft locked tokens that we encounter until the target token amount is reached - var amountAlreadySoftLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) - val finalPredicate = enrichedPredicate.get() - for (tokenStateAndRef in bucket) { - // Does the token satisfy the (optional) predicate eg. issuer? - if (finalPredicate.invoke(tokenStateAndRef)) { - val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer()) - // if so, race to lock the token, expected oldValue = PLACE_HOLDER - if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) { - // we won the race to lock this token - lockedTokens.add(tokenStateAndRef) - amountLocked += tokenAmount - if (amountLocked >= requiredAmountWithoutIssuer) { - break - } - } else { - amountAlreadySoftLocked += tokenAmount - } - } - } - - if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) { - lockedTokens.forEach { - unlockToken(it, selectionId) - } - if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) { - throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.") - } else { - throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.") - } - } - - UPDATER.schedule({ - lockedTokens.forEach { - unlockToken(it, selectionId) - } - }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) - - return uncheckedCast(lockedTokens) - } - - fun unlockToken(it: StateAndRef, selectionId: String) { - __backingMap.replace(it, selectionId, PLACE_HOLDER) - } - - private fun getTokenBucket(idx: Holder, - tokenClass: Class<*>, - tokenIdentifier: String, - mapToSelectFrom: ConcurrentMap): TokenBucket { - return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET - } +class VaultWatcherService( + private val tokenObserver: TokenObserver, + val providedConfig: InMemorySelectionConfig +) : SingletonSerializeAsToken() { + + private val __backingMap: ConcurrentMap, String> = ConcurrentHashMap() + private val __indexed: ConcurrentMap, ConcurrentMap> = ConcurrentHashMap( + providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap() }.toMap() + ) + + private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock() + private val UPDATER = Executors.newSingleThreadScheduledExecutor() + + constructor(appServiceHub: AppServiceHub) : this( + getObservableFromAppServiceHub(appServiceHub), + 
InMemorySelectionConfig.parse(appServiceHub.getAppContext().config) + ) + + init { + addTokensToCache(tokenObserver.initialValues) + tokenObserver.source.doOnError { + LOG.error("received error from observable", it) + } + tokenObserver.source.subscribe(::onVaultUpdate) + tokenObserver.startLoading(::onVaultUpdate) + } + + companion object { + val LOG = contextLogger() + + private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver { + val rawConfig = appServiceHub.cordappProvider.getAppContext().config + val parsedConfig: InMemorySelectionConfig = InMemorySelectionConfig.parse(rawConfig) + + if (!parsedConfig.enabled) { + LOG.info("Disabling inMemory token selection - refer to documentation on how to enable") + return TokenObserver(emptyList(), Observable.empty(), { _, _ -> + Holder.UnmappedIdentity() + }) + } + + val ownerProvider: (StateAndRef, IndexingType) -> Holder = { token, indexingType -> + when (indexingType) { + IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey) + IndexingType.EXTERNAL_ID -> { + val owningKey = token.state.data.holder.owningKey + lookupExternalIdFromKey(owningKey, appServiceHub) + } + } + } + val (_, vaultObservable) = appServiceHub.vaultService.trackBy( + contractStateType = FungibleToken::class.java, + paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1), + criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL), + sorting = sortByStateRefAscending() + ) + + val asyncLoader = ServiceHubAsyncLoader(appServiceHub, parsedConfig) + return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader) + } + } + + + + private fun processToken(token: StateAndRef, indexingType: IndexingType): TokenIndex { + val owner = tokenObserver.ownerProvider(token, indexingType) + val type = token.state.data.amount.token.tokenType.tokenClass + val typeId = token.state.data.amount.token.tokenType.tokenIdentifier + return TokenIndex(owner, type, typeId) + } + + fun onVaultUpdate(t: Vault.Update) { + UPDATER.submit { + LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states") + try { + removeTokensFromCache(t.consumed) + addTokensToCache(t.produced) + } catch (t: Throwable) { + //we DO NOT want to kill the observable - as a single exception will terminate the feed + LOG.error("Failure during token cache update", t) + } + } + } + + private fun removeTokensFromCache(stateAndRefs: Collection>) { + indexViewCreationLock.read { + for (stateAndRef in stateAndRefs) { + val existingMark = __backingMap.remove(stateAndRef) + existingMark + ?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours") + for (key in __indexed.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(key)) + val indexedViewForHolder = __indexed[key] + indexedViewForHolder + ?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") + + val bucketForIndex: TokenBucket? 
= indexedViewForHolder?.get(index) + bucketForIndex?.remove(stateAndRef) + } + } + } + } + + private fun addTokensToCache(stateAndRefs: Collection>) { + indexViewCreationLock.read { + for (stateAndRef in stateAndRefs) { + if (stateAndRef.state.encumbrance != null){ + continue + } + val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER) + existingMark?.let { + LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours") + } + for (key in __indexed.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(key)) + val indexedViewForHolder = __indexed[key] + ?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views") + val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { + TokenBucket() + } + bucketForIndex.add(stateAndRef) + } + } + } + } + + private fun getOrCreateIndexViewForHolderType(holderType: Class): ConcurrentMap { + return __indexed[holderType] ?: indexViewCreationLock.write { + __indexed[holderType] ?: generateNewIndexedView(holderType) + } + } + + private fun generateNewIndexedView(holderType: Class): ConcurrentMap { + val indexedViewForHolder: ConcurrentMap = ConcurrentHashMap() + for (stateAndRef in __backingMap.keys) { + val index = processToken(stateAndRef, IndexingType.fromHolder(holderType)) + val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) { + TokenBucket() + } + bucketForIndex.add(stateAndRef) + } + __indexed[holderType] = indexedViewForHolder + return indexedViewForHolder + } + + fun lockTokensExternal(list: List>, knownSelectionId: String, autoUnlockDelay: Duration? = null) { + + list.forEach { + __backingMap.putIfAbsent(it, PLACE_HOLDER) + } + + list.forEach { + __backingMap.replace(it, PLACE_HOLDER, knownSelectionId) + } + + if (autoUnlockDelay != null) { + UPDATER.schedule({ + list.forEach { + unlockToken(it, knownSelectionId) + } + }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) + } + } + + fun selectTokens( + owner: Holder, + requiredAmount: Amount, + predicate: ((StateAndRef) -> Boolean) = { true }, + allowShortfall: Boolean = false, + autoUnlockDelay: Duration = Duration.ofMinutes(5), + selectionId: String + ): List> { + //we have to handle both cases + //1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType + //2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer. + val enrichedPredicate: AtomicReference<(StateAndRef) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) { + val issuer = (requiredAmount.token as IssuedTokenType).issuer + { token -> + predicate(token) && token.state.data.issuer == issuer + } + } else { + predicate + }) + + val lockedTokens = mutableListOf>() + val bucket: Iterable> = if (owner is Holder.TokenOnly) { + val currentPredicate = enrichedPredicate.get() + //why do we do this? 
It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault + //so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type + //we will revisit in future if this assumption turns out to be wrong + enrichedPredicate.set { + val stateTokenType = it.state.data.tokenType + currentPredicate(it) && + stateTokenType.fractionDigits == requiredAmount.token.fractionDigits && + requiredAmount.token.tokenClass == stateTokenType.tokenClass && + requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier + } + __backingMap.keys + } else { + val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass) + getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView) + } + + val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer() + var amountLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) + // this is the running total of soft locked tokens that we encounter until the target token amount is reached + var amountAlreadySoftLocked: Amount = requiredAmountWithoutIssuer.copy(quantity = 0) + val finalPredicate = enrichedPredicate.get() + for (tokenStateAndRef in bucket) { + // Does the token satisfy the (optional) predicate eg. issuer? + if (finalPredicate.invoke(tokenStateAndRef)) { + val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer()) + // if so, race to lock the token, expected oldValue = PLACE_HOLDER + if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) { + // we won the race to lock this token + lockedTokens.add(tokenStateAndRef) + amountLocked += tokenAmount + if (amountLocked >= requiredAmountWithoutIssuer) { + break + } + } else { + amountAlreadySoftLocked += tokenAmount + } + } + } + + if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) { + lockedTokens.forEach { + unlockToken(it, selectionId) + } + if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) { + throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.") + } else { + throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.") + } + } + + UPDATER.schedule({ + lockedTokens.forEach { + unlockToken(it, selectionId) + } + }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS) + + return uncheckedCast(lockedTokens) + } + + fun unlockToken(it: StateAndRef, selectionId: String) { + __backingMap.replace(it, selectionId, PLACE_HOLDER) + } + + fun isTokenLocked(it: StateAndRef, lockId: String? 
= null): Boolean { + return if (lockId != null) { + __backingMap[it] == lockId + } else __backingMap[it] != PLACE_HOLDER + } + + private fun getTokenBucket( + idx: Holder, + tokenClass: Class<*>, + tokenIdentifier: String, + mapToSelectFrom: ConcurrentMap + ): TokenBucket { + return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET + } } -class TokenObserver(val initialValues: List>, - val source: Observable>, - val ownerProvider: ((StateAndRef, VaultWatcherService.IndexingType) -> Holder), - inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> }) { +class TokenObserver( + val initialValues: List>, + val source: Observable>, + val ownerProvider: ((StateAndRef, IndexingType) -> Holder), + inline val asyncLoader: ((Vault.Update) -> Unit) -> Unit = { _ -> } +) { - fun startLoading(loadingCallBack: (Vault.Update) -> Unit) { - asyncLoader(loadingCallBack) - } + fun startLoading(loadingCallBack: (Vault.Update) -> Unit) { + asyncLoader(loadingCallBack) + } } -class TokenBucket(set: MutableSet> = ConcurrentHashMap, Boolean>().keySet(true)) : MutableSet> by set +class TokenBucket(set: MutableSet> = ConcurrentHashMap, Boolean>().keySet(true)) : + MutableSet> by set data class TokenIndex(val owner: Holder, val tokenClazz: Class<*>, val tokenIdentifier: String) diff --git a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt index fdd5080d..78c4503d 100644 --- a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt +++ b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt @@ -55,6 +55,7 @@ import org.hamcrest.CoreMatchers.`is` import org.hamcrest.CoreMatchers.equalTo import org.junit.Assert import org.junit.Test +import java.util.concurrent.CountDownLatch import kotlin.test.assertFailsWith class TokenDriverTest { @@ -264,7 +265,7 @@ class TokenDriverTest { } } - @Test + @Test(timeout = 300_000) fun `tokens locked in memory are still locked after restart`() { driver(DriverParameters( inMemoryDB = false, @@ -292,12 +293,15 @@ class TokenDriverTest { listOf(50.USD issuedBy nodeParty heldBy nodeParty), emptyList() ).returnValue.getOrThrow() - // Run select and lock tokens flow with 5 seconds sleep in it. - node.rpc.startFlowDynamic( + // Run select and lock tokens flow + val pt = node.rpc.startTrackedFlowDynamic( SelectAndLockFlow::class.java, 50.GBP, - 5.seconds - ) + 50.seconds + ).returnValue + + Thread.sleep(10_000) + // Stop node (node as OutOfProcess).process.destroyForcibly() node.stop() @@ -305,6 +309,8 @@ class TokenDriverTest { // Restart the node val restartedNode = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to "localhost:30000")).getOrThrow() // Try to spend same states, they should be locked after restart, so we expect insufficient not locked balance exception to be thrown. + Thread.sleep(15000) // because token loading is now async, we must wait a bit of time before we can attempt to select. 
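The fixed 15-second sleep above gives ServiceHubAsyncLoader time to repopulate the cache, but it is timing-dependent. A sketch of a polling alternative, modelled on the wait loop in HugeTokensLoadedOnRestartTest; the helper name and expectedTotal are assumptions for illustration, not part of this change:

    // Assumes java.time.Instant/Duration plus the RPC and flow imports already used in these tests.
    // Poll the in-memory cache via LockEverythingGetValue until it reports the expected value,
    // instead of guessing a fixed sleep duration.
    fun waitForTokenCache(rpc: CordaRPCOps, tokenType: TokenType, expectedTotal: Long, timeout: Duration) {
        val deadline = Instant.now().plus(timeout)
        while (Instant.now().isBefore(deadline)) {
            val loaded = rpc.startFlowDynamic(LockEverythingGetValue::class.java, tokenType)
                    .returnValue.getOrThrow(Duration.ofMinutes(1))
            if (loaded >= expectedTotal) return
            Thread.sleep(1_000)
        }
        throw IllegalStateException("token cache did not reach $expectedTotal within $timeout")
    }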
+ assertFailsWith { restartedNode.rpc.startFlowDynamic( SelectAndLockFlow::class.java, diff --git a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt index 1f41aa9e..cc32e840 100644 --- a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt +++ b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt @@ -8,7 +8,9 @@ import com.r3.corda.lib.tokens.contracts.types.TokenPointer import com.r3.corda.lib.tokens.contracts.types.TokenType import com.r3.corda.lib.tokens.selection.InsufficientBalanceException import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelection +import com.r3.corda.lib.tokens.selection.memory.internal.Holder import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector +import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService import com.r3.corda.lib.tokens.testing.states.House import com.r3.corda.lib.tokens.workflows.flows.move.addMoveNonFungibleTokens import com.r3.corda.lib.tokens.workflows.flows.move.addMoveTokens @@ -28,10 +30,12 @@ import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap import java.time.Duration import java.time.temporal.ChronoUnit +import java.util.* // This is very simple test flow for DvP. @CordaSerializable @@ -40,121 +44,180 @@ private class DvPNotification(val amount: Amount) @StartableByRPC @InitiatingFlow class DvPFlow(val house: House, val newOwner: Party) : FlowLogic() { - @Suspendable - override fun call(): SignedTransaction { - val txBuilder = TransactionBuilder(notary = getPreferredNotary(serviceHub)) - addMoveNonFungibleTokens(txBuilder, serviceHub, house.toPointer(), newOwner) - val session = initiateFlow(newOwner) - // Ask for input stateAndRefs - send notification with the amount to exchange. - session.send(DvPNotification(house.valuation)) - // TODO add some checks for inputs and outputs - val inputs = subFlow(ReceiveStateAndRefFlow(session)) - // Receive outputs (this is just quick and dirty, we could calculate them on our side of the flow). - val outputs = session.receive>().unwrap { it } - addMoveTokens(txBuilder, inputs, outputs) - // Synchronise any confidential identities - subFlow(SyncKeyMappingFlow(session, txBuilder.toWireTransaction(serviceHub))) - val ourSigningKeys = txBuilder.toLedgerTransaction(serviceHub).ourSigningKeys(serviceHub) - val initialStx = serviceHub.signInitialTransaction(txBuilder, signingPubKeys = ourSigningKeys) - val stx = subFlow(CollectSignaturesFlow(initialStx, listOf(session), ourSigningKeys)) - // Update distribution list. - subFlow(UpdateDistributionListFlow(stx)) - return subFlow(ObserverAwareFinalityFlow(stx, listOf(session))) - } + @Suspendable + override fun call(): SignedTransaction { + val txBuilder = TransactionBuilder(notary = getPreferredNotary(serviceHub)) + addMoveNonFungibleTokens(txBuilder, serviceHub, house.toPointer(), newOwner) + val session = initiateFlow(newOwner) + // Ask for input stateAndRefs - send notification with the amount to exchange. 
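// The responder selects states for exactly this amount using DatabaseTokenSelection, replies with
// the chosen input StateAndRefs (SendStateAndRefFlow) and then the output tokens; both are combined
// into this builder by addMoveTokens below.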
+ session.send(DvPNotification(house.valuation)) + // TODO add some checks for inputs and outputs + val inputs = subFlow(ReceiveStateAndRefFlow(session)) + // Receive outputs (this is just quick and dirty, we could calculate them on our side of the flow). + val outputs = session.receive>().unwrap { it } + addMoveTokens(txBuilder, inputs, outputs) + // Synchronise any confidential identities + subFlow(SyncKeyMappingFlow(session, txBuilder.toWireTransaction(serviceHub))) + val ourSigningKeys = txBuilder.toLedgerTransaction(serviceHub).ourSigningKeys(serviceHub) + val initialStx = serviceHub.signInitialTransaction(txBuilder, signingPubKeys = ourSigningKeys) + val stx = subFlow(CollectSignaturesFlow(initialStx, listOf(session), ourSigningKeys)) + // Update distribution list. + subFlow(UpdateDistributionListFlow(stx)) + return subFlow(ObserverAwareFinalityFlow(stx, listOf(session))) + } } @InitiatedBy(DvPFlow::class) class DvPFlowHandler(val otherSession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - // Receive notification with house price. - val dvPNotification = otherSession.receive().unwrap { it } - // Chose state and refs to send back. - // TODO This is API pain, we assumed that we could just modify TransactionBuilder, but... it cannot be sent over the wire, because non-serializable - // We need custom serializer and some custom flows to do checks. - val changeHolder = serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false).party.anonymise() - val (inputs, outputs) = DatabaseTokenSelection(serviceHub).generateMove( - lockId = runId.uuid, - partiesAndAmounts = listOf(Pair(otherSession.counterparty, dvPNotification.amount)), - changeHolder = changeHolder - ) - subFlow(SendStateAndRefFlow(otherSession, inputs)) - otherSession.send(outputs) - subFlow(SyncKeyMappingFlowHandler(otherSession)) - subFlow(object : SignTransactionFlow(otherSession) { - override fun checkTransaction(stx: SignedTransaction) {} - } - ) - subFlow(ObserverAwareFinalityFlowHandler(otherSession)) - } + @Suspendable + override fun call() { + // Receive notification with house price. + val dvPNotification = otherSession.receive().unwrap { it } + // Chose state and refs to send back. + // TODO This is API pain, we assumed that we could just modify TransactionBuilder, but... it cannot be sent over the wire, because non-serializable + // We need custom serializer and some custom flows to do checks. 
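// generateMove returns a Pair of the selected input StateAndRef<FungibleToken>s and the output
// FungibleTokens (the amount moved to the counterparty plus any change assigned to changeHolder);
// the inputs are sent with SendStateAndRefFlow so the initiator can resolve their backing transactions.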
+        val changeHolder = serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false).party.anonymise()
+        val (inputs, outputs) = DatabaseTokenSelection(serviceHub).generateMove(
+            lockId = runId.uuid,
+            partiesAndAmounts = listOf(Pair(otherSession.counterparty, dvPNotification.amount)),
+            changeHolder = changeHolder
+        )
+        subFlow(SendStateAndRefFlow(otherSession, inputs))
+        otherSession.send(outputs)
+        subFlow(SyncKeyMappingFlowHandler(otherSession))
+        subFlow(object : SignTransactionFlow(otherSession) {
+            override fun checkTransaction(stx: SignedTransaction) {}
+        }
+        )
+        subFlow(ObserverAwareFinalityFlowHandler(otherSession))
+    }
 }
 
 @StartableByRPC
 class GetDistributionList(val housePtr: TokenPointer<House>) : FlowLogic<List<DistributionRecord>>() {
-    @Suspendable
-    override fun call(): List<DistributionRecord> {
-        return getDistributionList(serviceHub, housePtr.pointer.pointer)
-    }
+    @Suspendable
+    override fun call(): List<DistributionRecord> {
+        return getDistributionList(serviceHub, housePtr.pointer.pointer)
+    }
 }
 
 @StartableByRPC
 class CheckTokenPointer(val housePtr: TokenPointer<House>) : FlowLogic<House>() {
-    @Suspendable
-    override fun call(): House {
-        return housePtr.pointer.resolve(serviceHub).state.data
-    }
+    @Suspendable
+    override fun call(): House {
+        return housePtr.pointer.resolve(serviceHub).state.data
+    }
 }
 
 // TODO This is hack that will be removed after fix in Corda 5. startFlowDynamic doesn't handle type parameters properly.
 @StartableByRPC
 class RedeemNonFungibleHouse(
-    val housePtr: TokenPointer<House>,
-    val issuerParty: Party
+    val housePtr: TokenPointer<House>,
+    val issuerParty: Party
 ) : FlowLogic<SignedTransaction>() {
-    @Suspendable
-    override fun call(): SignedTransaction {
-        return subFlow(RedeemNonFungibleTokens(housePtr, issuerParty, emptyList()))
-    }
+    @Suspendable
+    override fun call(): SignedTransaction {
+        return subFlow(RedeemNonFungibleTokens(housePtr, issuerParty, emptyList()))
+    }
 }
 
 @StartableByRPC
 class RedeemFungibleGBP(
-    val amount: Amount<TokenType>,
-    val issuerParty: Party
+    val amount: Amount<TokenType>,
+    val issuerParty: Party
 ) : FlowLogic<SignedTransaction>() {
-    @Suspendable
-    override fun call(): SignedTransaction {
-        return subFlow(RedeemFungibleTokens(amount, issuerParty, emptyList(), null))
-    }
+    @Suspendable
+    override fun call(): SignedTransaction {
+        return subFlow(RedeemFungibleTokens(amount, issuerParty, emptyList(), null))
+    }
 }
 
 // Helper flow for selection testing
 @StartableByRPC
 class SelectAndLockFlow(val amount: Amount<TokenType>, val delay: Duration = 1.seconds) : FlowLogic<Unit>() {
-    @Suspendable
-    override fun call() {
-        val selector = LocalTokenSelector(serviceHub)
-        selector.selectTokens(amount)
-        sleep(delay)
-    }
+
+
+    companion object {
+        val SELECTED = ProgressTracker.Step("SELECTED")
+    }
+
+    override val progressTracker: ProgressTracker?
+        get() = ProgressTracker(ProgressTracker.STARTING, SELECTED, ProgressTracker.DONE)
+
+    @Suspendable
+    override fun call() {
+        progressTracker?.currentStep = ProgressTracker.STARTING
+        val selector = LocalTokenSelector(serviceHub)
+        selector.selectTokens(amount)
+        progressTracker?.currentStep = SELECTED
+        sleep(delay)
+        progressTracker?.currentStep = ProgressTracker.DONE
+    }
 }
 
 // Helper flow for selection testing
 @StartableByRPC
-class JustLocalSelect(val amount: Amount<TokenType>, val timeBetweenSelects: Duration = Duration.of(10, ChronoUnit.SECONDS), val maxSelectAttempts: Int = 5) : FlowLogic<List<StateAndRef<FungibleToken>>>() {
-    @Suspendable
-    override fun call(): List<StateAndRef<FungibleToken>> {
-        val selector = LocalTokenSelector(serviceHub)
-        var selectionAttempts = 0
-        while (selectionAttempts < maxSelectAttempts) {
-            try {
-                return selector.selectTokens(amount)
-            } catch (e: InsufficientBalanceException) {
-                logger.error("failed to select", e)
-                sleep(timeBetweenSelects, true)
-                selectionAttempts++
-            }
-        }
-        throw InsufficientBalanceException("Could not select: ${amount}")
-    }
+class JustLocalSelect(val amount: Amount<TokenType>, val timeBetweenSelects: Duration = Duration.of(10, ChronoUnit.SECONDS), val maxSelectAttempts: Int = 5) :
+    FlowLogic<List<StateAndRef<FungibleToken>>>() {
+    @Suspendable
+    override fun call(): List<StateAndRef<FungibleToken>> {
+        val selector = LocalTokenSelector(serviceHub)
+        var selectionAttempts = 0
+        while (selectionAttempts < maxSelectAttempts) {
+            try {
+                return selector.selectTokens(amount)
+            } catch (e: InsufficientBalanceException) {
+                logger.error("failed to select", e)
+                sleep(timeBetweenSelects, true)
+                selectionAttempts++
+            }
+        }
+        throw InsufficientBalanceException("Could not select: ${amount}")
+    }
+}
+
+@StartableByRPC
+class GetSelectionPageSize : FlowLogic<Int>() {
+    @Suspendable
+    override fun call(): Int {
+        val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+        return vaultWatcherService.providedConfig.pageSize
+    }
+}
+
+@StartableByRPC
+class GetSelectionSleepDuration : FlowLogic<Int>() {
+    @Suspendable
+    override fun call(): Int {
+        val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+        return vaultWatcherService.providedConfig.sleep
+    }
+}
+
+@StartableByRPC
+class LockEverythingGetValue(val tokenType: TokenType) : FlowLogic<Long>() {
+    @Suspendable
+    override fun call(): Long {
+        val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+        val amount = Amount(Long.MAX_VALUE, tokenType)
+        val selectionId = UUID.randomUUID().toString()
+        var tokens: List<StateAndRef<FungibleToken>>? = vaultWatcherService.selectTokens(
+            Holder.TokenOnly(), amount,
+            allowShortfall = true,
+            selectionId = selectionId
+        )
+
+        val value = tokens?.map { it.state.data.amount.quantity }?.sum()
+
+        tokens?.forEach {
+            vaultWatcherService.unlockToken(
+                it, selectionId
+            )
+        }
+
+        // just to make sure that tokens is not checkpointed anywhere
+        tokens = null
+
+        return value!!
+    }
 }
\ No newline at end of file
diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
index d598dd02..151f64f4 100644
--- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
+++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
@@ -8,6 +8,7 @@ import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelectio
 import com.r3.corda.lib.tokens.selection.memory.config.CACHE_SIZE_DEFAULT
 import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
+import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
 import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import com.typesafe.config.ConfigFactory
 import net.corda.core.identity.CordaX500Name
@@ -58,14 +59,14 @@ class ConfigSelectionTest {
         val config = ConfigFactory.parseString("stateSelection {\n" +
                 "inMemory {\n" +
                 "cacheSize: 9000\n" +
-                "indexingStrategies: [${VaultWatcherService.IndexingType.PUBLIC_KEY}]\n" +
+                "indexingStrategies: [${IndexingType.PUBLIC_KEY}]\n" +
                 "}\n" +
                 "}")
         val cordappConfig = TypesafeCordappConfig(config)
 
         val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
         assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
-        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.PUBLIC_KEY))
+        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.PUBLIC_KEY))
 
         val selection = ConfigSelection.getPreferredSelection(services, cordappConfig)
         assertThat(selection).isInstanceOf(LocalTokenSelector::class.java)
@@ -76,13 +77,13 @@ class ConfigSelectionTest {
         val config = ConfigFactory.parseString("stateSelection {\n" +
                 "inMemory {\n" +
                 "cacheSize: 9000\n" +
-                "indexingStrategies: [\"${VaultWatcherService.IndexingType.EXTERNAL_ID}\", \"${VaultWatcherService.IndexingType.PUBLIC_KEY}\"]\n" +
+                "indexingStrategies: [\"${IndexingType.EXTERNAL_ID}\", \"${IndexingType.PUBLIC_KEY}\"]\n" +
                 "}\n" +
                 "}")
         val cordappConfig = TypesafeCordappConfig(config)
         val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
         assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
-        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.EXTERNAL_ID, VaultWatcherService.IndexingType.PUBLIC_KEY))
+        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.EXTERNAL_ID, IndexingType.PUBLIC_KEY))
     }
 
     @Test
diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
index bc8bdb48..c16b1088 100644
--- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
+++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
@@ -12,6 +12,7 @@ import com.r3.corda.lib.tokens.money.GBP
 import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
 import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.internal.Holder
+import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
 import com.r3.corda.lib.tokens.selection.memory.services.TokenObserver
 import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens
@@ -205,14 +206,14 @@ class VaultWatcherServiceTest {
             }
         }.toMap()
 
-        val ownerProvider = object : (StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder {
-            override fun invoke(tokenState: StateAndRef<FungibleToken>, indexingType: VaultWatcherService.IndexingType): Holder {
+        val ownerProvider = object : (StateAndRef<FungibleToken>, IndexingType) -> Holder {
+            override fun invoke(tokenState: StateAndRef<FungibleToken>, indexingType: IndexingType): Holder {
                 return when (indexingType) {
-                    VaultWatcherService.IndexingType.EXTERNAL_ID -> {
+                    IndexingType.EXTERNAL_ID -> {
                         Holder.MappedIdentity(keyToAccount[tokenState.state.data.holder.owningKey] ?: error("should never happen"))
                     }
-                    VaultWatcherService.IndexingType.PUBLIC_KEY -> {
+                    IndexingType.PUBLIC_KEY -> {
                         Holder.KeyIdentity(tokenState.state.data.holder.owningKey)
                     }
                 }
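
The RPC-startable helpers added to TestFlows.kt above (GetSelectionPageSize, GetSelectionSleepDuration and LockEverythingGetValue) are intended to be driven from a test harness over Corda RPC. The Kotlin sketch below shows one way that could look; the RPC address, credentials and the "TEST" token type are illustrative assumptions, not values taken from this change set.

import com.r3.corda.lib.tokens.contracts.types.TokenType
import com.r3.corda.lib.tokens.integration.workflows.GetSelectionPageSize
import com.r3.corda.lib.tokens.integration.workflows.GetSelectionSleepDuration
import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow

fun main() {
    // Hypothetical RPC endpoint and credentials for a node running the CorDapps built above.
    val rpc = CordaRPCClient(NetworkHostAndPort("localhost", 10006)).start("user1", "test").proxy

    // Read back the in-memory selection configuration the VaultWatcherService was started with.
    val pageSize = rpc.startFlow(::GetSelectionPageSize).returnValue.getOrThrow()
    val sleepSeconds = rpc.startFlow(::GetSelectionSleepDuration).returnValue.getOrThrow()
    println("in-memory selection config: pageSize=$pageSize, loadingSleepSeconds=$sleepSeconds")

    // Soft-lock every token of an (assumed) "TEST" token type, report the total quantity the
    // in-memory selector can see, then release the locks - this is what LockEverythingGetValue does.
    val total = rpc.startFlow(::LockEverythingGetValue, TokenType("TEST", 2)).returnValue.getOrThrow()
    println("total quantity visible to the in-memory selector: $total")
}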