From a6c9a5aad3208042ee573c3021ca7ca9f6995c1a Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Fri, 3 Dec 2021 15:02:19 +0100
Subject: [PATCH 01/10] make loading more async

---
 freighter-tests/build.gradle                  |   2 +-
 .../testing/NullHolderOnObserverTest.kt       |  10 +-
 .../SelectionUtilities.kt                     |   5 +
 .../memory/config/InMemorySelectionConfig.kt  | 105 +--
 .../memory/services/VaultWatcherService.kt    | 638 ++++++++++--------
 5 files changed, 408 insertions(+), 352 deletions(-)

diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle
index 8c31e3bd..69a1a380 100644
--- a/freighter-tests/build.gradle
+++ b/freighter-tests/build.gradle
@@ -37,7 +37,7 @@ configurations {
 
 dependencies {
     freighterTestCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
-    freighterTestCompile "freighter:freighter-testing-core-junit5:0.7.3-TEST-SNAPSHOT"
+    freighterTestCompile "freighter:freighter-testing-core-junit5:0.9.0-SNAPSHOT"
 
     freighterTestCompile project(":contracts")
     freighterTestCompile project(":workflows")
diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt
index deacd504..ef3bdc87 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/NullHolderOnObserverTest.kt
@@ -67,12 +67,6 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() {
 		runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.MS_SQL)
 	}
 
-	@Test
-	@OracleTest
-	fun `tokens can be observed on node that does not know CI running oracle 12 r2`() {
-		runTokensOnNodeRunningDatabase(DeploymentMachineProvider.DatabaseType.ORACLE_12_R2)
-	}
-
 	private fun runTokensOnNodeRunningDatabase(db: DeploymentMachineProvider.DatabaseType) {
 		val randomString = generateRandomString()
 		val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword)
@@ -83,7 +77,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() {
 				.withCordapp(modernCiV1)
 				.withCordapp(freighterHelperCordapp)
 				.withDatabase(machineProvider.requestDatabase(db))
-		).withVersion(UnitOfDeployment.CORDA_4_6)
+		).withVersion(UnitOfDeployment.CORDA_4_7)
 			.deploy(deploymentContext)
 
 		val node2 = SingleNodeDeployment(
@@ -93,7 +87,7 @@ class NullHolderOnObserverTest : DockerRemoteMachineBasedTest() {
 				.withCordapp(modernCiV1)
 				.withCordapp(freighterHelperCordapp)
 				.withDatabase(machineProvider.requestDatabase(db))
-		).withVersion(UnitOfDeployment.CORDA_4_6)
+		).withVersion(UnitOfDeployment.CORDA_4_7)
 			.deploy(deploymentContext)
 
 		val nodeMachine1 = node1.getOrThrow().nodeMachines.single()
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt
index 4cbea217..405cdfa2 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/SelectionUtilities.kt
@@ -19,6 +19,11 @@ internal fun sortByStateRefAscending(): Sort {
     return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC)))
 }
 
+internal fun sortByTimeStampAscending(): Sort {
+    val sortAttribute = SortAttribute.Standard(Sort.VaultStateAttribute.RECORDED_TIME)
+    return Sort(setOf(Sort.SortColumn(sortAttribute, Sort.Direction.ASC)))
+}
+
 // Returns all held token amounts of a specified token with given issuer.
 // We need to discriminate on the token type as well as the symbol as different tokens might use the same symbols.
 @Suspendable
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
index cf8a1b65..79b814f5 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
@@ -9,63 +9,68 @@ import net.corda.core.cordapp.CordappConfigException
 import net.corda.core.node.ServiceHub
 import org.slf4j.LoggerFactory
 
-const val CACHE_SIZE_DEFAULT = 1024 // TODO Return good default, for now it's not wired, it will be done in separate PR.
+const val CACHE_SIZE_DEFAULT = 1024
+const val PAGE_SIZE_DEFAULT = 1024
 
-data class InMemorySelectionConfig @JvmOverloads constructor(val enabled: Boolean,
-                                   val indexingStrategies: List<VaultWatcherService.IndexingType>,
-                                   val cacheSize: Int = CACHE_SIZE_DEFAULT) : StateSelectionConfig {
-    companion object {
-        private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
+data class InMemorySelectionConfig @JvmOverloads constructor(
+    val enabled: Boolean,
+    val indexingStrategies: List<VaultWatcherService.IndexingType>,
+    val cacheSize: Int = CACHE_SIZE_DEFAULT,
+    val pageSize: Int = 1000
+) : StateSelectionConfig {
+	companion object {
+		private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
 
-        @JvmStatic
-        fun parse(config: CordappConfig): InMemorySelectionConfig {
-            val enabled = if (!config.exists("stateSelection.inMemory.enabled")) {
-                logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this")
-                true
-            } else {
-                config.getBoolean("stateSelection.inMemory.enabled")
-            }
-            val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize")
-                    ?: CACHE_SIZE_DEFAULT
-            val indexingType = try {
-                (config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
-            } catch (e: CordappConfigException) {
-                logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
-                emptyList<VaultWatcherService.IndexingType>()
-            } catch (e: ClassCastException) {
-                logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
-                emptyList<VaultWatcherService.IndexingType>()
-            }
-            logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
-            return InMemorySelectionConfig(enabled, indexingType, cacheSize)
-        }
+		@JvmStatic
+		fun parse(config: CordappConfig): InMemorySelectionConfig {
+			val enabled = if (!config.exists("stateSelection.inMemory.enabled")) {
+				logger.warn("Did not detect a configuration for InMemory selection - enabling memory usage for token indexing. Please set stateSelection.inMemory.enabled to \"false\" to disable this")
+				true
+			} else {
+				config.getBoolean("stateSelection.inMemory.enabled")
+			}
+			val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize")
+				?: CACHE_SIZE_DEFAULT
+            val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.cacheSize")?: PAGE_SIZE_DEFAULT
+			val indexingType = try {
+				(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
+			} catch (e: CordappConfigException) {
+				logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
+				emptyList<VaultWatcherService.IndexingType>()
+			} catch (e: ClassCastException) {
+				logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
+				emptyList<VaultWatcherService.IndexingType>()
+			}
+			logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
+			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize)
+		}
 
-        fun defaultConfig(): InMemorySelectionConfig {
-            return InMemorySelectionConfig(true, emptyList())
-        }
-    }
+		fun defaultConfig(): InMemorySelectionConfig {
+			return InMemorySelectionConfig(true, emptyList())
+		}
+	}
 
-    @Suspendable
-    override fun toSelector(services: ServiceHub): LocalTokenSelector {
-        return try {
-            val vaultObserver = services.cordaService(VaultWatcherService::class.java)
-            LocalTokenSelector(services, vaultObserver, state = null)
-        } catch (e: IllegalArgumentException) {
-            throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.")
-        }
-    }
+	@Suspendable
+	override fun toSelector(services: ServiceHub): LocalTokenSelector {
+		return try {
+			val vaultObserver = services.cordaService(VaultWatcherService::class.java)
+			LocalTokenSelector(services, vaultObserver, state = null)
+		} catch (e: IllegalArgumentException) {
+			throw IllegalArgumentException("Couldn't find VaultWatcherService in CordaServices, please make sure that it was installed in node.")
+		}
+	}
 }
 
 // Helpers for configuration parsing.
 
 fun CordappConfig.getIntOrNull(path: String): Int? {
-    return try {
-        getInt(path)
-    } catch (e: CordappConfigException) {
-        if (exists(path)) {
-            throw IllegalArgumentException("Provide correct database selection configuration for config path: $path")
-        } else {
-            null
-        }
-    }
+	return try {
+		getInt(path)
+	} catch (e: CordappConfigException) {
+		if (exists(path)) {
+			throw IllegalArgumentException("Provide correct database selection configuration for config path: $path")
+		} else {
+			null
+		}
+	}
 }
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
index dad687c0..e8941ff8 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
@@ -4,18 +4,22 @@ import com.r3.corda.lib.tokens.contracts.states.FungibleToken
 import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType
 import com.r3.corda.lib.tokens.contracts.types.TokenType
 import com.r3.corda.lib.tokens.contracts.utilities.withoutIssuer
-import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
 import com.r3.corda.lib.tokens.selection.InsufficientNotLockedBalanceException
+import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.internal.Holder
 import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey
 import com.r3.corda.lib.tokens.selection.sortByStateRefAscending
+import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending
+import io.github.classgraph.ClassGraph
+import io.github.classgraph.ScanResult
 import net.corda.core.contracts.Amount
 import net.corda.core.contracts.StateAndRef
 import net.corda.core.internal.uncheckedCast
 import net.corda.core.node.AppServiceHub
 import net.corda.core.node.services.CordaService
 import net.corda.core.node.services.Vault
+import net.corda.core.node.services.queryBy
 import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
 import net.corda.core.node.services.vault.PageSpecification
 import net.corda.core.node.services.vault.QueryCriteria
@@ -26,312 +30,360 @@ import java.time.Duration
 import java.util.concurrent.*
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Function
+import java.util.function.Supplier
 import kotlin.concurrent.read
 import kotlin.concurrent.write
 
-val UPDATER: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
 val EMPTY_BUCKET = TokenBucket()
 
 const val PLACE_HOLDER: String = "THIS_IS_A_PLACE_HOLDER"
 
 @CordaService
-class VaultWatcherService(private val tokenObserver: TokenObserver,
-                          private val providedConfig: InMemorySelectionConfig) : SingletonSerializeAsToken() {
-
-    private val __backingMap: ConcurrentMap<StateAndRef<FungibleToken>, String> = ConcurrentHashMap()
-    private val __indexed: ConcurrentMap<Class<out Holder>, ConcurrentMap<TokenIndex, TokenBucket>> = ConcurrentHashMap(
-            providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap<TokenIndex, TokenBucket>() }.toMap()
-    )
-
-    private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock()
-
-    enum class IndexingType(val ownerType: Class<out Holder>) {
-
-        EXTERNAL_ID(Holder.MappedIdentity::class.java),
-        PUBLIC_KEY(Holder.KeyIdentity::class.java);
-
-        companion object {
-            fun fromHolder(holder: Class<out Holder>): IndexingType {
-                return when (holder) {
-                    Holder.MappedIdentity::class.java -> {
-                        EXTERNAL_ID
-                    }
-
-                    Holder.KeyIdentity::class.java -> {
-                        PUBLIC_KEY;
-                    }
-                    else -> throw IllegalArgumentException("Unknown Holder type: $holder")
-                }
-            }
-        }
-
-    }
-
-    constructor(appServiceHub: AppServiceHub) : this(getObservableFromAppServiceHub(appServiceHub), InMemorySelectionConfig.parse(appServiceHub.getAppContext().config))
-
-    companion object {
-        val LOG = contextLogger()
-
-        private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
-            val config = appServiceHub.cordappProvider.getAppContext().config
-            val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
-
-            if (!configOptions.enabled) {
-                LOG.info("Disabling inMemory token selection - refer to documentation on how to enable")
-                return TokenObserver(emptyList(), Observable.empty(), { _, _ ->
-                    Holder.UnmappedIdentity()
-                })
-            }
-
-            val ownerProvider: (StateAndRef<FungibleToken>, IndexingType) -> Holder = { token, indexingType ->
-                when (indexingType) {
-                    IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey)
-                    IndexingType.EXTERNAL_ID -> {
-                        val owningKey = token.state.data.holder.owningKey
-                        lookupExternalIdFromKey(owningKey, appServiceHub)
-                    }
-                }
-            }
-
-
-            val pageSize = 1000
-            var currentPage = DEFAULT_PAGE_NUM
-            val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
-                    contractStateType = FungibleToken::class.java,
-                    paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize),
-                    criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL),
-                    sorting = sortByStateRefAscending())
-
-            // we use the UPDATER thread for two reasons
-            // 1 this means we return the service before all states are loaded, and so do not hold up the node startup
-            // 2 because all updates to the cache (addition / removal) are also done via UPDATER, this means that until we have finished loading all updates are buffered preventing out of order updates
-            val asyncLoader = object : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
-                override fun invoke(callback: (Vault.Update<FungibleToken>) -> Unit) {
-                    LOG.info("Starting async token loading from vault")
-                    UPDATER.submit {
-                        try {
-                            var shouldLoop = true
-                            while (shouldLoop) {
-                                val newlyLoadedStates = appServiceHub.vaultService.queryBy(
-                                        contractStateType = FungibleToken::class.java,
-                                        paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize),
-                                        criteria = QueryCriteria.VaultQueryCriteria(),
-                                        sorting = sortByStateRefAscending()
-                                ).states.toSet()
-                                LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
-                                callback(Vault.Update(emptySet(), newlyLoadedStates))
-                                shouldLoop = newlyLoadedStates.isNotEmpty()
-                                LOG.debug("shouldLoop=${shouldLoop}")
-                                currentPage++
-                            }
-                            LOG.info("finished token loading")
-                        } catch (t: Throwable) {
-                            LOG.error("Token Loading Failed due to: ", t)
-                        }
-                    }
-                }
-            }
-            return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
-        }
-    }
-
-    init {
-        addTokensToCache(tokenObserver.initialValues)
-        tokenObserver.source.doOnError {
-            LOG.error("received error from observable", it)
-        }
-        tokenObserver.startLoading(::onVaultUpdate)
-        tokenObserver.source.subscribe(::onVaultUpdate)
-    }
-
-    private fun processToken(token: StateAndRef<FungibleToken>, indexingType: IndexingType): TokenIndex {
-        val owner = tokenObserver.ownerProvider(token, indexingType)
-        val type = token.state.data.amount.token.tokenType.tokenClass
-        val typeId = token.state.data.amount.token.tokenType.tokenIdentifier
-        return TokenIndex(owner, type, typeId)
-    }
-
-    private fun onVaultUpdate(t: Vault.Update<FungibleToken>) {
-        LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states")
-        try {
-            removeTokensFromCache(t.consumed)
-            addTokensToCache(t.produced)
-        } catch (t: Throwable) {
-            //we DO NOT want to kill the observable - as a single exception will terminate the feed
-            LOG.error("Failure during token cache update", t)
-        }
-    }
-
-    private fun removeTokensFromCache(stateAndRefs: Collection<StateAndRef<FungibleToken>>) {
-        indexViewCreationLock.read {
-            for (stateAndRef in stateAndRefs) {
-                val existingMark = __backingMap.remove(stateAndRef)
-                existingMark
-                        ?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours")
-                for (key in __indexed.keys) {
-                    val index = processToken(stateAndRef, IndexingType.fromHolder(key))
-                    val indexedViewForHolder = __indexed[key]
-                    indexedViewForHolder
-                            ?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views")
-
-                    val bucketForIndex: TokenBucket? = indexedViewForHolder?.get(index)
-                    bucketForIndex?.remove(stateAndRef)
-                }
-            }
-        }
-    }
-
-    private fun addTokensToCache(stateAndRefs: Collection<StateAndRef<FungibleToken>>) {
-        indexViewCreationLock.read {
-            for (stateAndRef in stateAndRefs) {
-                val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER)
-                existingMark?.let {
-                    LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours")
-                }
-                for (key in __indexed.keys) {
-                    val index = processToken(stateAndRef, IndexingType.fromHolder(key))
-                    val indexedViewForHolder = __indexed[key]
-                            ?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views")
-                    val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) {
-                        TokenBucket()
-                    }
-                    bucketForIndex.add(stateAndRef)
-                }
-            }
-        }
-    }
-
-    private fun getOrCreateIndexViewForHolderType(holderType: Class<out Holder>): ConcurrentMap<TokenIndex, TokenBucket> {
-        return __indexed[holderType] ?: indexViewCreationLock.write {
-            __indexed[holderType] ?: generateNewIndexedView(holderType)
-        }
-    }
-
-    private fun generateNewIndexedView(holderType: Class<out Holder>): ConcurrentMap<TokenIndex, TokenBucket> {
-        val indexedViewForHolder: ConcurrentMap<TokenIndex, TokenBucket> = ConcurrentHashMap()
-        for (stateAndRef in __backingMap.keys) {
-            val index = processToken(stateAndRef, IndexingType.fromHolder(holderType))
-            val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) {
-                TokenBucket()
-            }
-            bucketForIndex.add(stateAndRef)
-        }
-        __indexed[holderType] = indexedViewForHolder
-        return indexedViewForHolder
-    }
-
-    fun lockTokensExternal(list: List<StateAndRef<FungibleToken>>, knownSelectionId: String) {
-        list.forEach {
-            __backingMap.replace(it, PLACE_HOLDER, knownSelectionId)
-        }
-    }
-
-    fun selectTokens(
-            owner: Holder,
-            requiredAmount: Amount<TokenType>,
-            predicate: ((StateAndRef<FungibleToken>) -> Boolean) = { true },
-            allowShortfall: Boolean = false,
-            autoUnlockDelay: Duration = Duration.ofMinutes(5),
-            selectionId: String
-    ): List<StateAndRef<FungibleToken>> {
-        //we have to handle both cases
-        //1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType
-        //2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer.
-        val enrichedPredicate: AtomicReference<(StateAndRef<FungibleToken>) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) {
-            val issuer = (requiredAmount.token as IssuedTokenType).issuer
-            { token ->
-                predicate(token) && token.state.data.issuer == issuer
-            }
-        } else {
-            predicate
-        })
-
-        val lockedTokens = mutableListOf<StateAndRef<FungibleToken>>()
-        val bucket: Iterable<StateAndRef<FungibleToken>> = if (owner is Holder.TokenOnly) {
-            val currentPredicate = enrichedPredicate.get()
-            //why do we do this? It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault
-            //so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type
-            //we will revisit in future if this assumption turns out to be wrong
-            enrichedPredicate.set {
-                val stateTokenType = it.state.data.tokenType
-                currentPredicate(it) &&
-                        stateTokenType.fractionDigits == requiredAmount.token.fractionDigits &&
-                        requiredAmount.token.tokenClass == stateTokenType.tokenClass &&
-                        requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier
-            }
-            __backingMap.keys
-        } else {
-            val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass)
-            getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView)
-        }
-
-        val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer()
-        var amountLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
-        // this is the running total of soft locked tokens that we encounter until the target token amount is reached
-        var amountAlreadySoftLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
-        val finalPredicate = enrichedPredicate.get()
-        for (tokenStateAndRef in bucket) {
-            // Does the token satisfy the (optional) predicate eg. issuer?
-            if (finalPredicate.invoke(tokenStateAndRef)) {
-                val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer())
-                // if so, race to lock the token, expected oldValue = PLACE_HOLDER
-                if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) {
-                    // we won the race to lock this token
-                    lockedTokens.add(tokenStateAndRef)
-                    amountLocked += tokenAmount
-                    if (amountLocked >= requiredAmountWithoutIssuer) {
-                        break
-                    }
-                } else {
-                    amountAlreadySoftLocked += tokenAmount
-                }
-            }
-        }
-
-        if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) {
-            lockedTokens.forEach {
-                unlockToken(it, selectionId)
-            }
-            if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) {
-                throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
-            } else {
-                throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.")
-            }
-        }
-
-        UPDATER.schedule({
-            lockedTokens.forEach {
-                unlockToken(it, selectionId)
-            }
-        }, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS)
-
-        return uncheckedCast(lockedTokens)
-    }
-
-    fun unlockToken(it: StateAndRef<FungibleToken>, selectionId: String) {
-        __backingMap.replace(it, selectionId, PLACE_HOLDER)
-    }
-
-    private fun getTokenBucket(idx: Holder,
-                               tokenClass: Class<*>,
-                               tokenIdentifier: String,
-                               mapToSelectFrom: ConcurrentMap<TokenIndex, TokenBucket>): TokenBucket {
-        return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET
-    }
+class VaultWatcherService(
+	private val tokenObserver: TokenObserver,
+	private val providedConfig: InMemorySelectionConfig
+) : SingletonSerializeAsToken() {
+
+	private val __backingMap: ConcurrentMap<StateAndRef<FungibleToken>, String> = ConcurrentHashMap()
+	private val __indexed: ConcurrentMap<Class<out Holder>, ConcurrentMap<TokenIndex, TokenBucket>> = ConcurrentHashMap(
+		providedConfig.indexingStrategies.map { it.ownerType to ConcurrentHashMap<TokenIndex, TokenBucket>() }.toMap()
+	)
+
+	private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock()
+	private val UPDATER = Executors.newSingleThreadScheduledExecutor()
+
+	enum class IndexingType(val ownerType: Class<out Holder>) {
+
+		EXTERNAL_ID(Holder.MappedIdentity::class.java),
+		PUBLIC_KEY(Holder.KeyIdentity::class.java);
+
+		companion object {
+			fun fromHolder(holder: Class<out Holder>): IndexingType {
+				return when (holder) {
+					Holder.MappedIdentity::class.java -> {
+						EXTERNAL_ID
+					}
+
+					Holder.KeyIdentity::class.java -> {
+						PUBLIC_KEY
+					}
+					else -> throw IllegalArgumentException("Unknown Holder type: $holder")
+				}
+			}
+		}
+
+	}
+
+	constructor(appServiceHub: AppServiceHub) : this(
+		getObservableFromAppServiceHub(appServiceHub),
+		InMemorySelectionConfig.parse(appServiceHub.getAppContext().config)
+	)
+
+	companion object {
+		val LOG = contextLogger()
+
+		private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
+			val updaterThread = Executors.newSingleThreadScheduledExecutor()
+			val config = appServiceHub.cordappProvider.getAppContext().config
+			val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
+
+			if (!configOptions.enabled) {
+				LOG.info("Disabling inMemory token selection - refer to documentation on how to enable")
+				return TokenObserver(emptyList(), Observable.empty(), { _, _ ->
+					Holder.UnmappedIdentity()
+				})
+			}
+
+			val ownerProvider: (StateAndRef<FungibleToken>, IndexingType) -> Holder = { token, indexingType ->
+				when (indexingType) {
+					IndexingType.PUBLIC_KEY -> Holder.KeyIdentity(token.state.data.holder.owningKey)
+					IndexingType.EXTERNAL_ID -> {
+						val owningKey = token.state.data.holder.owningKey
+						lookupExternalIdFromKey(owningKey, appServiceHub)
+					}
+				}
+			}
+
+
+			val pageSize = configOptions.pageSize
+			var currentPage = DEFAULT_PAGE_NUM
+			val asyncLoader = object : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
+				override fun invoke(callback: (Vault.Update<FungibleToken>) -> Unit) {
+					LOG.info("Starting async token loading from vault")
+
+					val classGraph = ClassGraph()
+					classGraph.enableClassInfo()
+
+					val scanResultFuture = CompletableFuture.supplyAsync(Supplier {
+						classGraph.scan()
+					}, updaterThread)
+
+					scanResultFuture.thenApplyAsync(Function<ScanResult, Unit> { scanResult ->
+						val subclasses : Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
+							.map { it.name }
+							.map { Class.forName(it) as Class<out FungibleToken> }.toSet()
+
+						val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
+						LOG.info("Enriching token query with types: $enrichedClasses")
+						updaterThread.submit {
+							LOG.info("Querying for tokens of types: $subclasses")
+							try {
+								var shouldLoop = true
+								while (shouldLoop) {
+									val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
+										paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize),
+										criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
+										sorting = sortByTimeStampAscending()
+									).states.toSet()
+									callback(Vault.Update(emptySet(), newlyLoadedStates))
+									LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
+									shouldLoop = newlyLoadedStates.isNotEmpty()
+									LOG.debug("shouldLoop=${shouldLoop}")
+									currentPage++
+								}
+								LOG.info("finished token loading")
+							} catch (t: Throwable) {
+								LOG.error("Token Loading Failed due to: ", t)
+							}
+						}
+					}, updaterThread)
+				}
+			}
+
+			val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
+				contractStateType = FungibleToken::class.java,
+				paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1),
+				criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL),
+				sorting = sortByStateRefAscending()
+			)
+
+
+			return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
+		}
+	}
+
+	init {
+		addTokensToCache(tokenObserver.initialValues)
+		tokenObserver.source.doOnError {
+			LOG.error("received error from observable", it)
+		}
+		tokenObserver.startLoading(::onVaultUpdate)
+		tokenObserver.source.subscribe(::onVaultUpdate)
+	}
+
+	private fun processToken(token: StateAndRef<FungibleToken>, indexingType: IndexingType): TokenIndex {
+		val owner = tokenObserver.ownerProvider(token, indexingType)
+		val type = token.state.data.amount.token.tokenType.tokenClass
+		val typeId = token.state.data.amount.token.tokenType.tokenIdentifier
+		return TokenIndex(owner, type, typeId)
+	}
+
+	fun onVaultUpdate(t: Vault.Update<FungibleToken>) {
+		UPDATER.submit {
+			LOG.info("received token vault update with ${t.consumed.size} consumed states and: ${t.produced.size} produced states")
+			try {
+				removeTokensFromCache(t.consumed)
+				addTokensToCache(t.produced)
+			} catch (t: Throwable) {
+				//we DO NOT want to kill the observable - as a single exception will terminate the feed
+				LOG.error("Failure during token cache update", t)
+			}
+		}
+	}
+
+	private fun removeTokensFromCache(stateAndRefs: Collection<StateAndRef<FungibleToken>>) {
+		indexViewCreationLock.read {
+			for (stateAndRef in stateAndRefs) {
+				val existingMark = __backingMap.remove(stateAndRef)
+				existingMark
+					?: LOG.warn("Attempted to remove existing token ${stateAndRef.ref}, but it was not found this suggests incorrect vault behaviours")
+				for (key in __indexed.keys) {
+					val index = processToken(stateAndRef, IndexingType.fromHolder(key))
+					val indexedViewForHolder = __indexed[key]
+					indexedViewForHolder
+						?: LOG.warn("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views")
+
+					val bucketForIndex: TokenBucket? = indexedViewForHolder?.get(index)
+					bucketForIndex?.remove(stateAndRef)
+				}
+			}
+		}
+	}
+
+	private fun addTokensToCache(stateAndRefs: Collection<StateAndRef<FungibleToken>>) {
+		indexViewCreationLock.read {
+			for (stateAndRef in stateAndRefs) {
+				if (stateAndRef.state.encumbrance != null){
+					continue
+				}
+				val existingMark = __backingMap.putIfAbsent(stateAndRef, PLACE_HOLDER)
+				existingMark?.let {
+					LOG.warn("Attempted to overwrite existing token ${stateAndRef.ref}, this suggests incorrect vault behaviours")
+				}
+				for (key in __indexed.keys) {
+					val index = processToken(stateAndRef, IndexingType.fromHolder(key))
+					val indexedViewForHolder = __indexed[key]
+						?: throw IllegalStateException("tried to obtain an indexed view for holder type: $key but was not found in set of indexed views")
+					val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) {
+						TokenBucket()
+					}
+					bucketForIndex.add(stateAndRef)
+				}
+			}
+		}
+	}
+
+	private fun getOrCreateIndexViewForHolderType(holderType: Class<out Holder>): ConcurrentMap<TokenIndex, TokenBucket> {
+		return __indexed[holderType] ?: indexViewCreationLock.write {
+			__indexed[holderType] ?: generateNewIndexedView(holderType)
+		}
+	}
+
+	private fun generateNewIndexedView(holderType: Class<out Holder>): ConcurrentMap<TokenIndex, TokenBucket> {
+		val indexedViewForHolder: ConcurrentMap<TokenIndex, TokenBucket> = ConcurrentHashMap()
+		for (stateAndRef in __backingMap.keys) {
+			val index = processToken(stateAndRef, IndexingType.fromHolder(holderType))
+			val bucketForIndex: TokenBucket = indexedViewForHolder.computeIfAbsent(index) {
+				TokenBucket()
+			}
+			bucketForIndex.add(stateAndRef)
+		}
+		__indexed[holderType] = indexedViewForHolder
+		return indexedViewForHolder
+	}
+
+	fun lockTokensExternal(list: List<StateAndRef<FungibleToken>>, knownSelectionId: String, autoUnlockDelay: Duration? = null) {
+		list.forEach {
+			__backingMap.replace(it, PLACE_HOLDER, knownSelectionId)
+		}
+
+		if (autoUnlockDelay != null) {
+			UPDATER.schedule({
+				list.forEach {
+					unlockToken(it, knownSelectionId)
+				}
+			}, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS)
+		}
+	}
+
+	fun selectTokens(
+		owner: Holder,
+		requiredAmount: Amount<TokenType>,
+		predicate: ((StateAndRef<FungibleToken>) -> Boolean) = { true },
+		allowShortfall: Boolean = false,
+		autoUnlockDelay: Duration = Duration.ofMinutes(5),
+		selectionId: String
+	): List<StateAndRef<FungibleToken>> {
+		//we have to handle both cases
+		//1 when passed a raw TokenType - it's likely that the selecting entity does not care about the issuer and so we cannot constrain all selections to using IssuedTokenType
+		//2 when passed an IssuedTokenType - it's likely that the selecting entity does care about the issuer, and so we must filter all tokens which do not match the issuer.
+		val enrichedPredicate: AtomicReference<(StateAndRef<FungibleToken>) -> Boolean> = AtomicReference(if (requiredAmount.token is IssuedTokenType) {
+			val issuer = (requiredAmount.token as IssuedTokenType).issuer
+			{ token ->
+				predicate(token) && token.state.data.issuer == issuer
+			}
+		} else {
+			predicate
+		})
+
+		val lockedTokens = mutableListOf<StateAndRef<FungibleToken>>()
+		val bucket: Iterable<StateAndRef<FungibleToken>> = if (owner is Holder.TokenOnly) {
+			val currentPredicate = enrichedPredicate.get()
+			//why do we do this? It doesn't really make sense to index on token type, as it's very likely that there will be very few types of tokens in a given vault
+			//so instead of relying on an indexed view, we can create a predicate on the fly which will constrain the selection to the correct token type
+			//we will revisit in future if this assumption turns out to be wrong
+			enrichedPredicate.set {
+				val stateTokenType = it.state.data.tokenType
+				currentPredicate(it) &&
+						stateTokenType.fractionDigits == requiredAmount.token.fractionDigits &&
+						requiredAmount.token.tokenClass == stateTokenType.tokenClass &&
+						requiredAmount.token.tokenIdentifier == stateTokenType.tokenIdentifier
+			}
+			__backingMap.keys
+		} else {
+			val indexedView = getOrCreateIndexViewForHolderType(owner.javaClass)
+			getTokenBucket(owner, requiredAmount.token.tokenClass, requiredAmount.token.tokenIdentifier, indexedView)
+		}
+
+		val requiredAmountWithoutIssuer = requiredAmount.withoutIssuer()
+		var amountLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
+		// this is the running total of soft locked tokens that we encounter until the target token amount is reached
+		var amountAlreadySoftLocked: Amount<TokenType> = requiredAmountWithoutIssuer.copy(quantity = 0)
+		val finalPredicate = enrichedPredicate.get()
+		for (tokenStateAndRef in bucket) {
+			// Does the token satisfy the (optional) predicate eg. issuer?
+			if (finalPredicate.invoke(tokenStateAndRef)) {
+				val tokenAmount = uncheckedCast(tokenStateAndRef.state.data.amount.withoutIssuer())
+				// if so, race to lock the token, expected oldValue = PLACE_HOLDER
+				if (__backingMap.replace(tokenStateAndRef, PLACE_HOLDER, selectionId)) {
+					// we won the race to lock this token
+					lockedTokens.add(tokenStateAndRef)
+					amountLocked += tokenAmount
+					if (amountLocked >= requiredAmountWithoutIssuer) {
+						break
+					}
+				} else {
+					amountAlreadySoftLocked += tokenAmount
+				}
+			}
+		}
+
+		if (!allowShortfall && amountLocked < requiredAmountWithoutIssuer) {
+			lockedTokens.forEach {
+				unlockToken(it, selectionId)
+			}
+			if (amountLocked + amountAlreadySoftLocked < requiredAmountWithoutIssuer) {
+				throw InsufficientBalanceException("Insufficient spendable states identified for $requiredAmount.")
+			} else {
+				throw InsufficientNotLockedBalanceException("Insufficient not-locked spendable states identified for $requiredAmount.")
+			}
+		}
+
+		UPDATER.schedule({
+			lockedTokens.forEach {
+				unlockToken(it, selectionId)
+			}
+		}, autoUnlockDelay.toMillis(), TimeUnit.MILLISECONDS)
+
+		return uncheckedCast(lockedTokens)
+	}
+
+	fun unlockToken(it: StateAndRef<FungibleToken>, selectionId: String) {
+		__backingMap.replace(it, selectionId, PLACE_HOLDER)
+	}
+
+	fun isTokenLocked(it: StateAndRef<FungibleToken>, lockId: String? = null): Boolean {
+		return if (lockId != null) {
+			__backingMap[it] == lockId
+		} else __backingMap[it] != PLACE_HOLDER
+	}
+
+	private fun getTokenBucket(
+		idx: Holder,
+		tokenClass: Class<*>,
+		tokenIdentifier: String,
+		mapToSelectFrom: ConcurrentMap<TokenIndex, TokenBucket>
+	): TokenBucket {
+		return mapToSelectFrom[TokenIndex(idx, tokenClass, tokenIdentifier)] ?: EMPTY_BUCKET
+	}
 
 }
 
-class TokenObserver(val initialValues: List<StateAndRef<FungibleToken>>,
-                    val source: Observable<Vault.Update<FungibleToken>>,
-                    val ownerProvider: ((StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder),
-                    inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }) {
+class TokenObserver(
+	val initialValues: List<StateAndRef<FungibleToken>>,
+	val source: Observable<Vault.Update<FungibleToken>>,
+	val ownerProvider: ((StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder),
+	inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }
+) {
 
-    fun startLoading(loadingCallBack: (Vault.Update<FungibleToken>) -> Unit) {
-        asyncLoader(loadingCallBack)
-    }
+	fun startLoading(loadingCallBack: (Vault.Update<FungibleToken>) -> Unit) {
+		asyncLoader(loadingCallBack)
+	}
 }
 
-class TokenBucket(set: MutableSet<StateAndRef<FungibleToken>> = ConcurrentHashMap<StateAndRef<FungibleToken>, Boolean>().keySet(true)) : MutableSet<StateAndRef<FungibleToken>> by set
+class TokenBucket(set: MutableSet<StateAndRef<FungibleToken>> = ConcurrentHashMap<StateAndRef<FungibleToken>, Boolean>().keySet(true)) :
+	MutableSet<StateAndRef<FungibleToken>> by set
 
 
 data class TokenIndex(val owner: Holder, val tokenClazz: Class<*>, val tokenIdentifier: String)

From 787e4758ac76770805f5dcbb655348c9aff6a1ef Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Mon, 6 Dec 2021 14:32:28 +0100
Subject: [PATCH 02/10] add some testing for huge loading test

---
 freighter-tests/build.gradle                  |   1 +
 .../testing/HugeTokensLoadedOnRestartTest.kt  | 142 +++++++++++
 .../testing/TokensLoadedOnRestartTest.kt      | 138 +++++++++++
 .../memory/config/InMemorySelectionConfig.kt  |   8 +-
 .../memory/services/VaultWatcherService.kt    |  15 +-
 .../tokens/integration/workflows/TestFlows.kt | 221 +++++++++++-------
 6 files changed, 432 insertions(+), 93 deletions(-)
 create mode 100644 freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
 create mode 100644 freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt

diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle
index 69a1a380..7125a8dc 100644
--- a/freighter-tests/build.gradle
+++ b/freighter-tests/build.gradle
@@ -41,6 +41,7 @@ dependencies {
 
     freighterTestCompile project(":contracts")
     freighterTestCompile project(":workflows")
+    freighterTestCompile project(":workflows-integration-test")
 }
 
 
diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
new file mode 100644
index 00000000..e4c9d42e
--- /dev/null
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -0,0 +1,142 @@
+package freighter.testing
+
+import com.r3.corda.lib.tokens.contracts.states.FungibleToken
+import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType
+import com.r3.corda.lib.tokens.contracts.types.TokenType
+import com.r3.corda.lib.tokens.integration.workflows.GetSelectionPageSize
+import com.r3.corda.lib.tokens.integration.workflows.GetSelectionSleepDuration
+import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue
+import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens
+import com.stress.flows.CreateNewCIFlow
+import freighter.deployments.DeploymentContext
+import freighter.deployments.NodeBuilder
+import freighter.deployments.SingleNodeDeployment
+import freighter.deployments.UnitOfDeployment
+import freighter.machine.DeploymentMachineProvider
+import freighter.machine.generateRandomString
+import net.corda.core.contracts.Amount
+import net.corda.core.messaging.startFlow
+import net.corda.core.utilities.getOrThrow
+import org.hamcrest.MatcherAssert
+import org.hamcrest.Matchers.`is`
+import org.junit.jupiter.api.Test
+import utility.getOrThrow
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicLong
+import java.util.stream.StreamSupport
+import kotlin.streams.toList
+
+class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
+
+	val tokenCurrentContracts =
+		NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey()
+
+	val testFlows =
+		NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey()
+
+	val tokenCurrentWorkflows =
+		NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig(
+			"""
+				stateSelection.inMemory.enabled=true
+				stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY]
+				stateSelection.inMemory.pageSize=5000
+				stateSelection.inMemory.loadingSleepSeconds=-1
+			""".trimIndent().byteInputStream()
+		)
+
+	val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact(
+		group = "com.r3.corda.lib.ci",
+		artifact = "ci-workflows",
+		version = "1.0"
+	)
+
+	val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows")
+
+	@Test
+	fun `tokens can be loaded async during node startup on postgres 9_6`() {
+		run(DeploymentMachineProvider.DatabaseType.PG_9_6)
+	}
+
+	@Test
+	fun `tokens can be loaded async during node startup on H2`() {
+		run(DeploymentMachineProvider.DatabaseType.H2)
+	}
+
+	private fun run(db: DeploymentMachineProvider.DatabaseType) {
+		val randomString = generateRandomString()
+		val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword)
+		val node1 = SingleNodeDeployment(
+			NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString")
+				.withCordapp(tokenCurrentContracts)
+				.withCordapp(tokenCurrentWorkflows)
+				.withCordapp(modernCiV1)
+				.withCordapp(freighterHelperCordapp)
+				.withCordapp(testFlows)
+				.withDatabase(machineProvider.requestDatabase(db))
+		).withVersion(UnitOfDeployment.CORDA_4_7)
+			.deploy(deploymentContext)
+
+		val nodeMachine1 = node1.getOrThrow().nodeMachines.single()
+
+		val createdCi = nodeMachine1.rpc {
+			startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also {
+				println("Successfully created CI: $it")
+			}
+		}
+
+		val tokenType = TokenType("StefCoin", 2)
+		val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType)
+		val amount = Amount(1, issuedTokenType)
+		val tokenToIssue = (0..99).map { FungibleToken(amount, createdCi) }.toList()
+
+		val loadingPageSize = nodeMachine1.rpc {
+			startFlow(
+				::GetSelectionPageSize
+			).returnValue.getOrThrow()
+		}
+
+		MatcherAssert.assertThat(loadingPageSize, `is`(5000))
+
+		val loadingSleepDuration = nodeMachine1.rpc {
+			startFlow(
+				::GetSelectionSleepDuration
+			).returnValue.getOrThrow()
+		}
+
+		MatcherAssert.assertThat(loadingSleepDuration, `is`(-1))
+
+		val issuedNumber = AtomicLong(0)
+
+		val numberIssued = StreamSupport.stream((0..9999).chunked(1000).spliterator(), true).map { toIssue ->
+			repeat(toIssue.size) {
+				nodeMachine1.rpc {
+					startFlow(
+						::IssueTokens,
+						tokenToIssue, listOf()
+					).returnValue.getOrThrow(Duration.ofMinutes(1))
+				}
+				println("Total number issued: ${ issuedNumber.addAndGet(tokenToIssue.size * 1L) }")
+			}
+
+			toIssue.size
+		}.toList().sum()
+
+		//1 million states
+		nodeMachine1.stopNode()
+		println()
+		nodeMachine1.startNode()
+		(0..100).forEach { _ ->
+			val tokenValueLoadedInCache = nodeMachine1.rpc {
+				startFlow(
+					::LockEverythingGetValue,
+					tokenType
+				).returnValue.getOrThrow(Duration.ofMinutes(1))
+			}
+			println(tokenValueLoadedInCache)
+			Thread.sleep(1000)
+		}
+
+	}
+
+
+}
\ No newline at end of file
diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt
new file mode 100644
index 00000000..506d0745
--- /dev/null
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/TokensLoadedOnRestartTest.kt
@@ -0,0 +1,138 @@
+package freighter.testing
+
+import com.r3.corda.lib.tokens.contracts.states.FungibleToken
+import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType
+import com.r3.corda.lib.tokens.contracts.types.TokenType
+import com.r3.corda.lib.tokens.integration.workflows.GetSelectionPageSize
+import com.r3.corda.lib.tokens.integration.workflows.GetSelectionSleepDuration
+import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue
+import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens
+import com.stress.flows.CreateNewCIFlow
+import freighter.deployments.DeploymentContext
+import freighter.deployments.NodeBuilder
+import freighter.deployments.SingleNodeDeployment
+import freighter.deployments.UnitOfDeployment
+import freighter.machine.DeploymentMachineProvider
+import freighter.machine.generateRandomString
+import net.corda.core.contracts.Amount
+import net.corda.core.internal.stream
+import net.corda.core.messaging.startFlow
+import net.corda.core.utilities.getOrThrow
+import org.hamcrest.MatcherAssert
+import org.hamcrest.Matchers.`is`
+import org.junit.jupiter.api.Test
+import utility.getOrThrow
+import java.time.Duration
+import kotlin.streams.toList
+
+class TokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
+
+	val tokenCurrentContracts =
+		NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey()
+
+	val testFlows =
+		NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey()
+
+	val tokenCurrentWorkflows =
+		NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig(
+			"""
+				stateSelection.inMemory.enabled=true
+				stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY]
+				stateSelection.inMemory.pageSize=5
+				stateSelection.inMemory.loadingSleepSeconds=600
+			""".trimIndent().byteInputStream()
+		)
+
+	val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact(
+		group = "com.r3.corda.lib.ci",
+		artifact = "ci-workflows",
+		version = "1.0"
+	)
+
+	val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows")
+
+	@Test
+	fun `tokens can be loaded async during node startup on postgres 9_6`() {
+		run(DeploymentMachineProvider.DatabaseType.PG_9_6)
+	}
+
+	@Test
+	fun `tokens can be loaded async during node startup on H2`() {
+		run(DeploymentMachineProvider.DatabaseType.H2)
+	}
+
+	private fun run(db: DeploymentMachineProvider.DatabaseType) {
+		val randomString = generateRandomString()
+		val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword)
+		val node1 = SingleNodeDeployment(
+			NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString")
+				.withCordapp(tokenCurrentContracts)
+				.withCordapp(tokenCurrentWorkflows)
+				.withCordapp(modernCiV1)
+				.withCordapp(freighterHelperCordapp)
+				.withCordapp(testFlows)
+				.withDatabase(machineProvider.requestDatabase(db))
+		).withVersion(UnitOfDeployment.CORDA_4_7)
+			.deploy(deploymentContext)
+
+		val nodeMachine1 = node1.getOrThrow().nodeMachines.single()
+
+		val createdCi = nodeMachine1.rpc {
+			startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also {
+				println("Successfully created CI: $it")
+			}
+		}
+
+		val tokenType = TokenType("StefCoin", 2)
+		val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType)
+		val amount = Amount(1, issuedTokenType)
+		val tokenToIssue1 = (0..100).map { FungibleToken(amount, createdCi) }.toList()
+
+		val loadingPageSize = nodeMachine1.rpc {
+			startFlow(
+				::GetSelectionPageSize
+			).returnValue.getOrThrow()
+		}
+
+		MatcherAssert.assertThat(loadingPageSize, `is`(5))
+
+		val loadingSleepDuration = nodeMachine1.rpc {
+			startFlow(
+				::GetSelectionSleepDuration
+			).returnValue.getOrThrow()
+		}
+
+		MatcherAssert.assertThat(loadingSleepDuration, `is`(600))
+
+
+		val issueTXs = (0..100).stream(true).mapToObj {
+			val issueTx = nodeMachine1.rpc {
+				startFlow(
+					::IssueTokens,
+					tokenToIssue1, listOf()
+				).returnValue.getOrThrow(Duration.ofMinutes(1))
+			}
+			issueTx
+		}.toList()
+
+
+		print(issueTXs)
+
+		nodeMachine1.stopNode()
+
+		println()
+
+		nodeMachine1.startNode()
+
+		val tokenValueLoadedInCache = nodeMachine1.rpc {
+			startFlow(
+				::LockEverythingGetValue,
+				tokenType
+			).returnValue.getOrThrow(Duration.ofMinutes(1))
+		}
+
+		MatcherAssert.assertThat(tokenValueLoadedInCache, `is`(5L))
+	}
+
+
+}
\ No newline at end of file
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
index 79b814f5..d662ddfe 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
@@ -16,7 +16,8 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
     val enabled: Boolean,
     val indexingStrategies: List<VaultWatcherService.IndexingType>,
     val cacheSize: Int = CACHE_SIZE_DEFAULT,
-    val pageSize: Int = 1000
+    val pageSize: Int = 1000,
+	val sleep: Int = 0
 ) : StateSelectionConfig {
 	companion object {
 		private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
@@ -31,7 +32,8 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
 			}
 			val cacheSize = config.getIntOrNull("stateSelection.inMemory.cacheSize")
 				?: CACHE_SIZE_DEFAULT
-            val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.cacheSize")?: PAGE_SIZE_DEFAULT
+            val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.pageSize")?: PAGE_SIZE_DEFAULT
+            val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0
 			val indexingType = try {
 				(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
 			} catch (e: CordappConfigException) {
@@ -42,7 +44,7 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
 				emptyList<VaultWatcherService.IndexingType>()
 			}
 			logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
-			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize)
+			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep)
 		}
 
 		fun defaultConfig(): InMemorySelectionConfig {
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
index e8941ff8..eefb9eaa 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
@@ -42,7 +42,7 @@ const val PLACE_HOLDER: String = "THIS_IS_A_PLACE_HOLDER"
 @CordaService
 class VaultWatcherService(
 	private val tokenObserver: TokenObserver,
-	private val providedConfig: InMemorySelectionConfig
+	val providedConfig: InMemorySelectionConfig
 ) : SingletonSerializeAsToken() {
 
 	private val __backingMap: ConcurrentMap<StateAndRef<FungibleToken>, String> = ConcurrentHashMap()
@@ -84,7 +84,7 @@ class VaultWatcherService(
 		val LOG = contextLogger()
 
 		private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
-			val updaterThread = Executors.newSingleThreadScheduledExecutor()
+			val loadingThread = Executors.newSingleThreadScheduledExecutor()
 			val config = appServiceHub.cordappProvider.getAppContext().config
 			val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
 
@@ -117,7 +117,7 @@ class VaultWatcherService(
 
 					val scanResultFuture = CompletableFuture.supplyAsync(Supplier {
 						classGraph.scan()
-					}, updaterThread)
+					}, loadingThread)
 
 					scanResultFuture.thenApplyAsync(Function<ScanResult, Unit> { scanResult ->
 						val subclasses : Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
@@ -126,7 +126,7 @@ class VaultWatcherService(
 
 						val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
 						LOG.info("Enriching token query with types: $enrichedClasses")
-						updaterThread.submit {
+						loadingThread.submit {
 							LOG.info("Querying for tokens of types: $subclasses")
 							try {
 								var shouldLoop = true
@@ -141,13 +141,18 @@ class VaultWatcherService(
 									shouldLoop = newlyLoadedStates.isNotEmpty()
 									LOG.debug("shouldLoop=${shouldLoop}")
 									currentPage++
+
+									if (configOptions.sleep > 0){
+										Thread.sleep(configOptions.sleep.toLong() * 1000)
+									}
+
 								}
 								LOG.info("finished token loading")
 							} catch (t: Throwable) {
 								LOG.error("Token Loading Failed due to: ", t)
 							}
 						}
-					}, updaterThread)
+					}, loadingThread)
 				}
 			}
 
diff --git a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
index 1f41aa9e..801bf32f 100644
--- a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
+++ b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
@@ -8,7 +8,9 @@ import com.r3.corda.lib.tokens.contracts.types.TokenPointer
 import com.r3.corda.lib.tokens.contracts.types.TokenType
 import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
 import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelection
+import com.r3.corda.lib.tokens.selection.memory.internal.Holder
 import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
+import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import com.r3.corda.lib.tokens.testing.states.House
 import com.r3.corda.lib.tokens.workflows.flows.move.addMoveNonFungibleTokens
 import com.r3.corda.lib.tokens.workflows.flows.move.addMoveTokens
@@ -32,6 +34,8 @@ import net.corda.core.utilities.seconds
 import net.corda.core.utilities.unwrap
 import java.time.Duration
 import java.time.temporal.ChronoUnit
+import java.util.*
+import javax.swing.plaf.nimbus.State
 
 // This is very simple test flow for DvP.
 @CordaSerializable
@@ -40,121 +44,168 @@ private class DvPNotification(val amount: Amount<TokenType>)
 @StartableByRPC
 @InitiatingFlow
 class DvPFlow(val house: House, val newOwner: Party) : FlowLogic<SignedTransaction>() {
-    @Suspendable
-    override fun call(): SignedTransaction {
-        val txBuilder = TransactionBuilder(notary = getPreferredNotary(serviceHub))
-        addMoveNonFungibleTokens(txBuilder, serviceHub, house.toPointer<House>(), newOwner)
-        val session = initiateFlow(newOwner)
-        // Ask for input stateAndRefs - send notification with the amount to exchange.
-        session.send(DvPNotification(house.valuation))
-        // TODO add some checks for inputs and outputs
-        val inputs = subFlow(ReceiveStateAndRefFlow<FungibleToken>(session))
-        // Receive outputs (this is just quick and dirty, we could calculate them on our side of the flow).
-        val outputs = session.receive<List<FungibleToken>>().unwrap { it }
-        addMoveTokens(txBuilder, inputs, outputs)
-        // Synchronise any confidential identities
-        subFlow(SyncKeyMappingFlow(session, txBuilder.toWireTransaction(serviceHub)))
-        val ourSigningKeys = txBuilder.toLedgerTransaction(serviceHub).ourSigningKeys(serviceHub)
-        val initialStx = serviceHub.signInitialTransaction(txBuilder, signingPubKeys = ourSigningKeys)
-        val stx = subFlow(CollectSignaturesFlow(initialStx, listOf(session), ourSigningKeys))
-        // Update distribution list.
-        subFlow(UpdateDistributionListFlow(stx))
-        return subFlow(ObserverAwareFinalityFlow(stx, listOf(session)))
-    }
+	@Suspendable
+	override fun call(): SignedTransaction {
+		val txBuilder = TransactionBuilder(notary = getPreferredNotary(serviceHub))
+		addMoveNonFungibleTokens(txBuilder, serviceHub, house.toPointer<House>(), newOwner)
+		val session = initiateFlow(newOwner)
+		// Ask for input stateAndRefs - send notification with the amount to exchange.
+		session.send(DvPNotification(house.valuation))
+		// TODO add some checks for inputs and outputs
+		val inputs = subFlow(ReceiveStateAndRefFlow<FungibleToken>(session))
+		// Receive outputs (this is just quick and dirty, we could calculate them on our side of the flow).
+		val outputs = session.receive<List<FungibleToken>>().unwrap { it }
+		addMoveTokens(txBuilder, inputs, outputs)
+		// Synchronise any confidential identities
+		subFlow(SyncKeyMappingFlow(session, txBuilder.toWireTransaction(serviceHub)))
+		val ourSigningKeys = txBuilder.toLedgerTransaction(serviceHub).ourSigningKeys(serviceHub)
+		val initialStx = serviceHub.signInitialTransaction(txBuilder, signingPubKeys = ourSigningKeys)
+		val stx = subFlow(CollectSignaturesFlow(initialStx, listOf(session), ourSigningKeys))
+		// Update distribution list.
+		subFlow(UpdateDistributionListFlow(stx))
+		return subFlow(ObserverAwareFinalityFlow(stx, listOf(session)))
+	}
 }
 
 @InitiatedBy(DvPFlow::class)
 class DvPFlowHandler(val otherSession: FlowSession) : FlowLogic<Unit>() {
-    @Suspendable
-    override fun call() {
-        // Receive notification with house price.
-        val dvPNotification = otherSession.receive<DvPNotification>().unwrap { it }
-        // Chose state and refs to send back.
-        // TODO This is API pain, we assumed that we could just modify TransactionBuilder, but... it cannot be sent over the wire, because non-serializable
-        // We need custom serializer and some custom flows to do checks.
-        val changeHolder = serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false).party.anonymise()
-        val (inputs, outputs) = DatabaseTokenSelection(serviceHub).generateMove(
-                lockId = runId.uuid,
-                partiesAndAmounts = listOf(Pair(otherSession.counterparty, dvPNotification.amount)),
-                changeHolder = changeHolder
-        )
-        subFlow(SendStateAndRefFlow(otherSession, inputs))
-        otherSession.send(outputs)
-        subFlow(SyncKeyMappingFlowHandler(otherSession))
-        subFlow(object : SignTransactionFlow(otherSession) {
-            override fun checkTransaction(stx: SignedTransaction) {}
-        }
-        )
-        subFlow(ObserverAwareFinalityFlowHandler(otherSession))
-    }
+	@Suspendable
+	override fun call() {
+		// Receive notification with house price.
+		val dvPNotification = otherSession.receive<DvPNotification>().unwrap { it }
+		// Chose state and refs to send back.
+		// TODO This is API pain, we assumed that we could just modify TransactionBuilder, but... it cannot be sent over the wire, because non-serializable
+		// We need custom serializer and some custom flows to do checks.
+		val changeHolder = serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false).party.anonymise()
+		val (inputs, outputs) = DatabaseTokenSelection(serviceHub).generateMove(
+			lockId = runId.uuid,
+			partiesAndAmounts = listOf(Pair(otherSession.counterparty, dvPNotification.amount)),
+			changeHolder = changeHolder
+		)
+		subFlow(SendStateAndRefFlow(otherSession, inputs))
+		otherSession.send(outputs)
+		subFlow(SyncKeyMappingFlowHandler(otherSession))
+		subFlow(object : SignTransactionFlow(otherSession) {
+			override fun checkTransaction(stx: SignedTransaction) {}
+		}
+		)
+		subFlow(ObserverAwareFinalityFlowHandler(otherSession))
+	}
 }
 
 @StartableByRPC
 class GetDistributionList(val housePtr: TokenPointer<House>) : FlowLogic<List<DistributionRecord>>() {
-    @Suspendable
-    override fun call(): List<DistributionRecord> {
-        return getDistributionList(serviceHub, housePtr.pointer.pointer)
-    }
+	@Suspendable
+	override fun call(): List<DistributionRecord> {
+		return getDistributionList(serviceHub, housePtr.pointer.pointer)
+	}
 }
 
 @StartableByRPC
 class CheckTokenPointer(val housePtr: TokenPointer<House>) : FlowLogic<House>() {
-    @Suspendable
-    override fun call(): House {
-        return housePtr.pointer.resolve(serviceHub).state.data
-    }
+	@Suspendable
+	override fun call(): House {
+		return housePtr.pointer.resolve(serviceHub).state.data
+	}
 }
 
 // TODO This is hack that will be removed after fix in Corda 5. startFlowDynamic doesn't handle type parameters properly.
 @StartableByRPC
 class RedeemNonFungibleHouse(
-        val housePtr: TokenPointer<House>,
-        val issuerParty: Party
+	val housePtr: TokenPointer<House>,
+	val issuerParty: Party
 ) : FlowLogic<SignedTransaction>() {
-    @Suspendable
-    override fun call(): SignedTransaction {
-        return subFlow(RedeemNonFungibleTokens(housePtr, issuerParty, emptyList()))
-    }
+	@Suspendable
+	override fun call(): SignedTransaction {
+		return subFlow(RedeemNonFungibleTokens(housePtr, issuerParty, emptyList()))
+	}
 }
 
 @StartableByRPC
 class RedeemFungibleGBP(
-        val amount: Amount<TokenType>,
-        val issuerParty: Party
+	val amount: Amount<TokenType>,
+	val issuerParty: Party
 ) : FlowLogic<SignedTransaction>() {
-    @Suspendable
-    override fun call(): SignedTransaction {
-        return subFlow(RedeemFungibleTokens(amount, issuerParty, emptyList(), null))
-    }
+	@Suspendable
+	override fun call(): SignedTransaction {
+		return subFlow(RedeemFungibleTokens(amount, issuerParty, emptyList(), null))
+	}
 }
 
 // Helper flow for selection testing
 @StartableByRPC
 class SelectAndLockFlow(val amount: Amount<TokenType>, val delay: Duration = 1.seconds) : FlowLogic<Unit>() {
-    @Suspendable
-    override fun call() {
-        val selector = LocalTokenSelector(serviceHub)
-        selector.selectTokens(amount)
-        sleep(delay)
-    }
+	@Suspendable
+	override fun call() {
+		val selector = LocalTokenSelector(serviceHub)
+		selector.selectTokens(amount)
+		sleep(delay)
+	}
 }
 
 // Helper flow for selection testing
 @StartableByRPC
-class JustLocalSelect(val amount: Amount<TokenType>, val timeBetweenSelects: Duration = Duration.of(10, ChronoUnit.SECONDS), val maxSelectAttempts: Int = 5) : FlowLogic<List<StateAndRef<FungibleToken>>>() {
-    @Suspendable
-    override fun call(): List<StateAndRef<FungibleToken>> {
-        val selector = LocalTokenSelector(serviceHub)
-        var selectionAttempts = 0
-        while (selectionAttempts < maxSelectAttempts) {
-            try {
-                return selector.selectTokens(amount)
-            } catch (e: InsufficientBalanceException) {
-                logger.error("failed to select", e)
-                sleep(timeBetweenSelects, true)
-                selectionAttempts++
-            }
-        }
-        throw InsufficientBalanceException("Could not select: ${amount}")
-    }
+class JustLocalSelect(val amount: Amount<TokenType>, val timeBetweenSelects: Duration = Duration.of(10, ChronoUnit.SECONDS), val maxSelectAttempts: Int = 5) :
+	FlowLogic<List<StateAndRef<FungibleToken>>>() {
+	@Suspendable
+	override fun call(): List<StateAndRef<FungibleToken>> {
+		val selector = LocalTokenSelector(serviceHub)
+		var selectionAttempts = 0
+		while (selectionAttempts < maxSelectAttempts) {
+			try {
+				return selector.selectTokens(amount)
+			} catch (e: InsufficientBalanceException) {
+				logger.error("failed to select", e)
+				sleep(timeBetweenSelects, true)
+				selectionAttempts++
+			}
+		}
+		throw InsufficientBalanceException("Could not select: ${amount}")
+	}
+}
+
+@StartableByRPC
+class GetSelectionPageSize : FlowLogic<Int>() {
+	@Suspendable
+	override fun call(): Int {
+		val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+		return vaultWatcherService.providedConfig.pageSize
+	}
+}
+
+@StartableByRPC
+class GetSelectionSleepDuration : FlowLogic<Int>() {
+	@Suspendable
+	override fun call(): Int {
+		val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+		return vaultWatcherService.providedConfig.sleep
+	}
+}
+
+@StartableByRPC
+class LockEverythingGetValue(val tokenType: TokenType) : FlowLogic<Long>() {
+	@Suspendable
+	override fun call(): Long {
+		val vaultWatcherService = serviceHub.cordaService(VaultWatcherService::class.java)
+		val amount = Amount(Long.MAX_VALUE, tokenType)
+		val selectionId = UUID.randomUUID().toString()
+		var tokens: List<StateAndRef<FungibleToken>>? = vaultWatcherService.selectTokens(
+			Holder.TokenOnly(), amount,
+			allowShortfall = true,
+			selectionId = selectionId
+		)
+
+		val value = tokens?.map { it.state.data.amount.quantity }?.sum()
+
+		tokens?.forEach {
+			vaultWatcherService.unlockToken(
+				it, selectionId
+			)
+		}
+
+		// just to make sure that tokens is not checkpointed anywhere
+		tokens = null
+
+		return value!!
+	}
 }
\ No newline at end of file

From 308a8f503dada1baba6eb70421b6aef64a777c2d Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Mon, 6 Dec 2021 17:47:59 +0100
Subject: [PATCH 03/10] more testing

---
 .idea/.gitignore                               |  2 ++
 .idea/codeStyles/Project.xml                   | 18 ------------------
 freighter-tests/build.gradle                   |  5 +++++
 .../testing/HugeTokensLoadedOnRestartTest.kt   | 13 +++++++------
 4 files changed, 14 insertions(+), 24 deletions(-)
 create mode 100644 .idea/.gitignore
 delete mode 100644 .idea/codeStyles/Project.xml

diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 00000000..43acdc97
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,2 @@
+# Datasource local storage ignored files
+/dataSources/
diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
deleted file mode 100644
index b9191652..00000000
--- a/.idea/codeStyles/Project.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<component name="ProjectCodeStyleConfiguration">
-  <code_scheme name="Project" version="173">
-    <option name="RIGHT_MARGIN" value="160" />
-    <JavaCodeStyleSettings>
-      <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="500" />
-      <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="300" />
-    </JavaCodeStyleSettings>
-    <JetCodeStyleSettings>
-      <option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
-    </JetCodeStyleSettings>
-    <codeStyleSettings language="kotlin">
-      <option name="CODE_STYLE_DEFAULTS" value="KOTLIN_OFFICIAL" />
-      <indentOptions>
-        <option name="USE_TAB_CHARACTER" value="true" />
-      </indentOptions>
-    </codeStyleSettings>
-  </code_scheme>
-</component>
\ No newline at end of file
diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle
index 7125a8dc..fe1c6bfa 100644
--- a/freighter-tests/build.gradle
+++ b/freighter-tests/build.gradle
@@ -28,6 +28,11 @@ task freighterTest(type: Test, dependsOn: [project(":workflows").jar]) {
         includeTags "DOCKER"
         excludeTags "AZURE", "FULL_LINUX_KERNEL", "ORACLE"
     }
+
+    testLogging {
+        showStandardStreams = true
+    }
+
 }
 
 configurations {
diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
index e4c9d42e..0080d229 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -107,15 +107,16 @@ class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 
 		val issuedNumber = AtomicLong(0)
 
-		val numberIssued = StreamSupport.stream((0..9999).chunked(1000).spliterator(), true).map { toIssue ->
-			repeat(toIssue.size) {
-				nodeMachine1.rpc {
+		val numberIssued = StreamSupport.stream((0..4999).chunked(250).spliterator(), true).map { toIssue ->
+			nodeMachine1.rpc {
+				repeat(toIssue.size) {
 					startFlow(
 						::IssueTokens,
 						tokenToIssue, listOf()
 					).returnValue.getOrThrow(Duration.ofMinutes(1))
+					println("[${Thread.currentThread().name}] Total number issued: ${issuedNumber.addAndGet(tokenToIssue.size * 1L)}")
 				}
-				println("Total number issued: ${ issuedNumber.addAndGet(tokenToIssue.size * 1L) }")
+
 			}
 
 			toIssue.size
@@ -132,8 +133,8 @@ class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 					tokenType
 				).returnValue.getOrThrow(Duration.ofMinutes(1))
 			}
-			println(tokenValueLoadedInCache)
-			Thread.sleep(1000)
+			println("TOTAL TOKEN VALUE IN CACHE: $tokenValueLoadedInCache")
+			Thread.sleep(20000)
 		}
 
 	}

From 4aa3fc36d769724a5d1c0564f746e1fecc46e382 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 14:23:21 +0100
Subject: [PATCH 04/10] final version of huge loading test

---
 .../testing/HugeTokensLoadedOnRestartTest.kt  | 240 ++++++++++--------
 .../memory/config/InMemorySelectionConfig.kt  |   6 +-
 .../memory/services/VaultWatcherService.kt    |  77 +++---
 3 files changed, 174 insertions(+), 149 deletions(-)

diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
index 0080d229..8c586296 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -3,8 +3,6 @@ package freighter.testing
 import com.r3.corda.lib.tokens.contracts.states.FungibleToken
 import com.r3.corda.lib.tokens.contracts.types.IssuedTokenType
 import com.r3.corda.lib.tokens.contracts.types.TokenType
-import com.r3.corda.lib.tokens.integration.workflows.GetSelectionPageSize
-import com.r3.corda.lib.tokens.integration.workflows.GetSelectionSleepDuration
 import com.r3.corda.lib.tokens.integration.workflows.LockEverythingGetValue
 import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens
 import com.stress.flows.CreateNewCIFlow
@@ -17,127 +15,153 @@ import freighter.machine.generateRandomString
 import net.corda.core.contracts.Amount
 import net.corda.core.messaging.startFlow
 import net.corda.core.utilities.getOrThrow
-import org.hamcrest.MatcherAssert
-import org.hamcrest.Matchers.`is`
 import org.junit.jupiter.api.Test
 import utility.getOrThrow
 import java.time.Duration
+import java.util.concurrent.CompletableFuture
 import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.locks.ReentrantLock
 import java.util.stream.StreamSupport
+import kotlin.concurrent.withLock
 import kotlin.streams.toList
 
 class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 
-	val tokenCurrentContracts =
-		NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey()
+    val loadingThreads = 8
+    val pageSize = 10_000
 
-	val testFlows =
-		NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey()
+    val tokenCurrentContracts =
+            NodeBuilder.DeployedCordapp.fromClassPath("tokens-contracts").signedWithFreighterKey()
 
-	val tokenCurrentWorkflows =
-		NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig(
-			"""
+    val testFlows =
+            NodeBuilder.DeployedCordapp.fromClassPath("workflows-integration-test").signedWithFreighterKey()
+
+    val tokenCurrentWorkflows =
+            NodeBuilder.DeployedCordapp.fromClassPath("tokens-workflows").withConfig(
+                    """
 				stateSelection.inMemory.enabled=true
 				stateSelection.inMemory.indexingStrategies=[EXTERNAL_ID, PUBLIC_KEY]
-				stateSelection.inMemory.pageSize=5000
+				stateSelection.inMemory.pageSize=${pageSize}
 				stateSelection.inMemory.loadingSleepSeconds=-1
+				stateSelection.inMemory.loadingThreads=${loadingThreads}
 			""".trimIndent().byteInputStream()
-		)
-
-	val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact(
-		group = "com.r3.corda.lib.ci",
-		artifact = "ci-workflows",
-		version = "1.0"
-	)
-
-	val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows")
-
-	@Test
-	fun `tokens can be loaded async during node startup on postgres 9_6`() {
-		run(DeploymentMachineProvider.DatabaseType.PG_9_6)
-	}
-
-	@Test
-	fun `tokens can be loaded async during node startup on H2`() {
-		run(DeploymentMachineProvider.DatabaseType.H2)
-	}
-
-	private fun run(db: DeploymentMachineProvider.DatabaseType) {
-		val randomString = generateRandomString()
-		val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword)
-		val node1 = SingleNodeDeployment(
-			NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString")
-				.withCordapp(tokenCurrentContracts)
-				.withCordapp(tokenCurrentWorkflows)
-				.withCordapp(modernCiV1)
-				.withCordapp(freighterHelperCordapp)
-				.withCordapp(testFlows)
-				.withDatabase(machineProvider.requestDatabase(db))
-		).withVersion(UnitOfDeployment.CORDA_4_7)
-			.deploy(deploymentContext)
-
-		val nodeMachine1 = node1.getOrThrow().nodeMachines.single()
-
-		val createdCi = nodeMachine1.rpc {
-			startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also {
-				println("Successfully created CI: $it")
-			}
-		}
-
-		val tokenType = TokenType("StefCoin", 2)
-		val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType)
-		val amount = Amount(1, issuedTokenType)
-		val tokenToIssue = (0..99).map { FungibleToken(amount, createdCi) }.toList()
-
-		val loadingPageSize = nodeMachine1.rpc {
-			startFlow(
-				::GetSelectionPageSize
-			).returnValue.getOrThrow()
-		}
-
-		MatcherAssert.assertThat(loadingPageSize, `is`(5000))
-
-		val loadingSleepDuration = nodeMachine1.rpc {
-			startFlow(
-				::GetSelectionSleepDuration
-			).returnValue.getOrThrow()
-		}
-
-		MatcherAssert.assertThat(loadingSleepDuration, `is`(-1))
-
-		val issuedNumber = AtomicLong(0)
-
-		val numberIssued = StreamSupport.stream((0..4999).chunked(250).spliterator(), true).map { toIssue ->
-			nodeMachine1.rpc {
-				repeat(toIssue.size) {
-					startFlow(
-						::IssueTokens,
-						tokenToIssue, listOf()
-					).returnValue.getOrThrow(Duration.ofMinutes(1))
-					println("[${Thread.currentThread().name}] Total number issued: ${issuedNumber.addAndGet(tokenToIssue.size * 1L)}")
-				}
-
-			}
-
-			toIssue.size
-		}.toList().sum()
-
-		//1 million states
-		nodeMachine1.stopNode()
-		println()
-		nodeMachine1.startNode()
-		(0..100).forEach { _ ->
-			val tokenValueLoadedInCache = nodeMachine1.rpc {
-				startFlow(
-					::LockEverythingGetValue,
-					tokenType
-				).returnValue.getOrThrow(Duration.ofMinutes(1))
-			}
-			println("TOTAL TOKEN VALUE IN CACHE: $tokenValueLoadedInCache")
-			Thread.sleep(20000)
-		}
-
-	}
+            )
+
+    val modernCiV1 = NodeBuilder.DeployedCordapp.fromGradleArtifact(
+            group = "com.r3.corda.lib.ci",
+            artifact = "ci-workflows",
+            version = "1.0"
+    )
+
+    val freighterHelperCordapp = NodeBuilder.DeployedCordapp.fromClassPath("freighter-cordapp-flows")
+
+    @Test
+    fun `tokens can be loaded async during node startup on postgres 9_6`() {
+        run(DeploymentMachineProvider.DatabaseType.PG_9_6)
+    }
+
+    @Test
+    fun `tokens can be loaded async during node startup on H2`() {
+        run(DeploymentMachineProvider.DatabaseType.H2)
+    }
+
+    private fun run(db: DeploymentMachineProvider.DatabaseType) {
+        val randomString = generateRandomString()
+        val deploymentContext = DeploymentContext(machineProvider, nms, artifactoryUsername, artifactoryPassword)
+        val node1 = SingleNodeDeployment(
+                NodeBuilder().withX500("O=PartyB, C=GB, L=LONDON, CN=$randomString")
+                        .withCordapp(tokenCurrentContracts)
+                        .withCordapp(tokenCurrentWorkflows)
+                        .withCordapp(modernCiV1)
+                        .withCordapp(freighterHelperCordapp)
+                        .withCordapp(testFlows)
+                        .withDatabase(machineProvider.requestDatabase(db))
+        ).withVersion(UnitOfDeployment.CORDA_4_7)
+                .deploy(deploymentContext)
+
+        val nodeMachine1 = node1.getOrThrow().nodeMachines.single()
+
+        val createdCi = nodeMachine1.rpc {
+            startFlow(::CreateNewCIFlow).returnValue.getOrThrow().also {
+                println("Successfully created CI: $it")
+            }
+        }
+
+        val tokenType = TokenType("StefCoin", 2)
+        val issuedTokenType = IssuedTokenType(nodeMachine1.identity(), tokenType)
+        val issuedTotal = AtomicLong(0)
+
+        val tokenToIssue = (0.until(100)).map { FungibleToken(Amount(1, issuedTokenType), createdCi) }.toList()
+
+        val numberIssued = StreamSupport.stream((0.until(5000)).chunked(200).spliterator(), true).map { toIssue ->
+            nodeMachine1.rpc {
+                repeat(toIssue.size) {
+                    startFlow(
+                            ::IssueTokens,
+                            tokenToIssue, listOf()
+                    ).returnValue.getOrThrow(Duration.ofMinutes(1))
+                    println("[${Thread.currentThread().name}] Total number issued: ${issuedTotal.addAndGet(tokenToIssue.size * 1L)}")
+                }
+
+            }
+
+            toIssue.size
+        }.toList().sum()
+
+        nodeMachine1.stopNode()
+        println()
+        nodeMachine1.startNode()
+
+        val nodeStartTime = System.currentTimeMillis()
+
+        var amountLoaded = 0L
+
+        val lock = ReentrantLock()
+        val condition = lock.newCondition()
+
+
+        //whilst we are loading, issue 500 more tokens to see if they are correctly loaded
+        CompletableFuture.runAsync {
+            lock.withLock {
+                condition.await()
+            }
+            nodeMachine1.rpc {
+                repeat(5) {
+                    startFlow(
+                            ::IssueTokens,
+                            tokenToIssue, listOf()
+                    ).returnValue.getOrThrow(Duration.ofMinutes(1))
+                    println("[${Thread.currentThread().name}] Total number issued: ${issuedTotal.addAndGet(tokenToIssue.size * 1L)}")
+                }
+            }
+        }
+
+        while (amountLoaded != (issuedTotal.toLong() + 500)) {
+            nodeMachine1.rpc {
+                amountLoaded = startFlow(
+                        ::LockEverythingGetValue,
+                        tokenType
+                ).returnValue.getOrThrow(Duration.ofMinutes(1))
+            }
+            println("TOTAL TOKEN VALUE IN CACHE: $amountLoaded")
+
+            if (amountLoaded > 0) {
+                lock.withLock {
+                    condition.signal()
+                }
+            }
+
+            if (amountLoaded != issuedTotal.toLong()) {
+                Thread.sleep(20000)
+            }
+        }
+
+        val loadEndTime = System.currentTimeMillis()
+
+        println("it took: ${(loadEndTime - nodeStartTime) / 1000} seconds to populate $amountLoaded states using $loadingThreads loading threads and pageSize: $pageSize")
+
+
+    }
 
 
 }
\ No newline at end of file
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
index d662ddfe..12ab2b92 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
@@ -17,7 +17,8 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
     val indexingStrategies: List<VaultWatcherService.IndexingType>,
     val cacheSize: Int = CACHE_SIZE_DEFAULT,
     val pageSize: Int = 1000,
-	val sleep: Int = 0
+	val sleep: Int = 0,
+	val loadingThreads: Int = 4
 ) : StateSelectionConfig {
 	companion object {
 		private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
@@ -34,6 +35,7 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
 				?: CACHE_SIZE_DEFAULT
             val pageSize: Int = config.getIntOrNull("stateSelection.inMemory.pageSize")?: PAGE_SIZE_DEFAULT
             val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0
+            val loadingThreads: Int = config.getIntOrNull("stateSelection.inMemory.loadingThreads")?: 4
 			val indexingType = try {
 				(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
 			} catch (e: CordappConfigException) {
@@ -44,7 +46,7 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
 				emptyList<VaultWatcherService.IndexingType>()
 			}
 			logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
-			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep)
+			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep, loadingThreads)
 		}
 
 		fun defaultConfig(): InMemorySelectionConfig {
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
index eefb9eaa..cccc7aef 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
@@ -28,6 +28,8 @@ import net.corda.core.utilities.contextLogger
 import rx.Observable
 import java.time.Duration
 import java.util.concurrent.*
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.function.Function
@@ -84,7 +86,6 @@ class VaultWatcherService(
 		val LOG = contextLogger()
 
 		private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
-			val loadingThread = Executors.newSingleThreadScheduledExecutor()
 			val config = appServiceHub.cordappProvider.getAppContext().config
 			val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
 
@@ -105,9 +106,14 @@ class VaultWatcherService(
 				}
 			}
 
+			val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
+					contractStateType = FungibleToken::class.java,
+					paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1),
+					criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL),
+					sorting = sortByStateRefAscending()
+			)
 
 			val pageSize = configOptions.pageSize
-			var currentPage = DEFAULT_PAGE_NUM
 			val asyncLoader = object : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
 				override fun invoke(callback: (Vault.Update<FungibleToken>) -> Unit) {
 					LOG.info("Starting async token loading from vault")
@@ -115,55 +121,48 @@ class VaultWatcherService(
 					val classGraph = ClassGraph()
 					classGraph.enableClassInfo()
 
-					val scanResultFuture = CompletableFuture.supplyAsync(Supplier {
+					val scanResultFuture = CompletableFuture.supplyAsync {
 						classGraph.scan()
-					}, loadingThread)
+					}
 
-					scanResultFuture.thenApplyAsync(Function<ScanResult, Unit> { scanResult ->
-						val subclasses : Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
-							.map { it.name }
-							.map { Class.forName(it) as Class<out FungibleToken> }.toSet()
+					scanResultFuture.thenApplyAsync { scanResult ->
+						val subclasses: Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
+								.map { it.name }
+								.map { Class.forName(it) as Class<out FungibleToken> }.toSet()
 
 						val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
 						LOG.info("Enriching token query with types: $enrichedClasses")
-						loadingThread.submit {
-							LOG.info("Querying for tokens of types: $subclasses")
-							try {
-								var shouldLoop = true
-								while (shouldLoop) {
-									val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
-										paging = PageSpecification(pageNumber = currentPage, pageSize = pageSize),
-										criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
-										sorting = sortByTimeStampAscending()
-									).states.toSet()
-									callback(Vault.Update(emptySet(), newlyLoadedStates))
-									LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
-									shouldLoop = newlyLoadedStates.isNotEmpty()
-									LOG.debug("shouldLoop=${shouldLoop}")
-									currentPage++
-
-									if (configOptions.sleep > 0){
-										Thread.sleep(configOptions.sleep.toLong() * 1000)
-									}
 
+						val shouldLoop = AtomicBoolean(true)
+						val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1)
+						val loadingFutures: List<CompletableFuture<Void>> = 0.until(configOptions.loadingThreads).map {
+							CompletableFuture.runAsync {
+								try {
+									while (shouldLoop.get()) {
+										val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
+												paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = pageSize),
+												criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
+												sorting = sortByTimeStampAscending()
+										).states.toSet()
+										callback(Vault.Update(emptySet(), newlyLoadedStates))
+										LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
+										shouldLoop.compareAndSet(newlyLoadedStates.isNotEmpty(), true)
+										LOG.debug("shouldLoop=${shouldLoop}")
+										if (configOptions.sleep > 0) {
+											Thread.sleep(configOptions.sleep.toLong() * 1000)
+										}
+
+									}
+									LOG.info("finished token loading")
+								} catch (t: Throwable) {
+									LOG.error("Token Loading Failed due to: ", t)
 								}
-								LOG.info("finished token loading")
-							} catch (t: Throwable) {
-								LOG.error("Token Loading Failed due to: ", t)
 							}
 						}
-					}, loadingThread)
+					}
 				}
 			}
 
-			val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
-				contractStateType = FungibleToken::class.java,
-				paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1),
-				criteria = QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL),
-				sorting = sortByStateRefAscending()
-			)
-
-
 			return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
 		}
 	}

From 768a2b0f78f1fa9f97114a58e02a337a2d45decd Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 14:47:49 +0100
Subject: [PATCH 05/10] solve halting problem

---
 .../kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
index 8c586296..f55cfb9e 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -136,7 +136,7 @@ class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
             }
         }
 
-        while (amountLoaded != (issuedTotal.toLong() + 500)) {
+        while (amountLoaded != (issuedTotal.toLong())) {
             nodeMachine1.rpc {
                 amountLoaded = startFlow(
                         ::LockEverythingGetValue,

From db195dd5c4fca2b35cff9680fae89da208534929 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 15:22:26 +0100
Subject: [PATCH 06/10] add sleep on restart lock test as token loading is now
 async

---
 build.gradle                                                | 2 +-
 .../freighter/testing/HugeTokensLoadedOnRestartTest.kt      | 5 +++++
 .../r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt  | 6 ++++--
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5e806e4d..004f1af2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -5,7 +5,7 @@ buildscript {
         corda_release_group = 'net.corda'
         corda_release_version = '4.3'
         tokens_release_group = "com.r3.corda.lib.tokens"
-        tokens_release_version = "1.2.4-SNAPSHOT"
+        tokens_release_version = "1.2.5-SNAPSHOT"
         corda_gradle_plugins_version = '5.0.12'
         kotlin_version = '1.2.71'
         junit_version = '4.12'
diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
index f55cfb9e..23627df3 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -15,6 +15,7 @@ import freighter.machine.generateRandomString
 import net.corda.core.contracts.Amount
 import net.corda.core.messaging.startFlow
 import net.corda.core.utilities.getOrThrow
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.Test
 import utility.getOrThrow
 import java.time.Duration
@@ -25,6 +26,10 @@ import java.util.stream.StreamSupport
 import kotlin.concurrent.withLock
 import kotlin.streams.toList
 
+@Tag("LARGE_TEST")
+annotation class LargeTest
+
+@LargeTest
 class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 
     val loadingThreads = 8
diff --git a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
index fdd5080d..66214385 100644
--- a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
+++ b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
@@ -292,11 +292,11 @@ class TokenDriverTest {
                     listOf(50.USD issuedBy nodeParty heldBy nodeParty),
                     emptyList<Party>()
             ).returnValue.getOrThrow()
-            // Run select and lock tokens flow with 5 seconds sleep in it.
+            // Run select and lock tokens flow
             node.rpc.startFlowDynamic(
                     SelectAndLockFlow::class.java,
                     50.GBP,
-                    5.seconds
+                    50.seconds
             )
             // Stop node
             (node as OutOfProcess).process.destroyForcibly()
@@ -305,6 +305,8 @@ class TokenDriverTest {
             // Restart the node
             val restartedNode = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to "localhost:30000")).getOrThrow()
             // Try to spend same states, they should be locked after restart, so we expect insufficient not locked balance exception to be thrown.
+            Thread.sleep(15000) // because token loading is now async, we must wait a bit of time before we can attempt to select.
+
             assertFailsWith<InsufficientNotLockedBalanceException> {
                 restartedNode.rpc.startFlowDynamic(
                     SelectAndLockFlow::class.java,

From ff99df716ce0a75539e357953a4c57d1512aeaa3 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 16:01:07 +0100
Subject: [PATCH 07/10] refactor some code out of VaultWatcherService.kt

---
 .../memory/config/InMemorySelectionConfig.kt  |  19 +--
 .../memory/services/IndexingType.kt           |  25 ++++
 .../memory/services/ServiceHubAsyncLoader.kt  |  75 ++++++++++++
 .../memory/services/VaultWatcherService.kt    | 114 +++---------------
 .../tokens/integration/workflows/TestFlows.kt |   1 -
 .../tokens/workflows/ConfigSelectionTest.kt   |   9 +-
 .../workflows/VaultWatcherServiceTest.kt      |   9 +-
 7 files changed, 138 insertions(+), 114 deletions(-)
 create mode 100644 modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt
 create mode 100644 modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt

diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
index 12ab2b92..17826001 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/config/InMemorySelectionConfig.kt
@@ -3,6 +3,7 @@ package com.r3.corda.lib.tokens.selection.memory.config
 import co.paralleluniverse.fibers.Suspendable
 import com.r3.corda.lib.tokens.selection.api.StateSelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
+import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
 import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import net.corda.core.cordapp.CordappConfig
 import net.corda.core.cordapp.CordappConfigException
@@ -13,12 +14,12 @@ const val CACHE_SIZE_DEFAULT = 1024
 const val PAGE_SIZE_DEFAULT = 1024
 
 data class InMemorySelectionConfig @JvmOverloads constructor(
-    val enabled: Boolean,
-    val indexingStrategies: List<VaultWatcherService.IndexingType>,
-    val cacheSize: Int = CACHE_SIZE_DEFAULT,
-    val pageSize: Int = 1000,
-	val sleep: Int = 0,
-	val loadingThreads: Int = 4
+        val enabled: Boolean,
+        val indexingStrategies: List<IndexingType>,
+        val cacheSize: Int = CACHE_SIZE_DEFAULT,
+        val pageSize: Int = 1000,
+        val sleep: Int = 0,
+        val loadingThreads: Int = 4
 ) : StateSelectionConfig {
 	companion object {
 		private val logger = LoggerFactory.getLogger("inMemoryConfigSelectionLogger")
@@ -37,13 +38,13 @@ data class InMemorySelectionConfig @JvmOverloads constructor(
             val loadingSleep: Int = config.getIntOrNull("stateSelection.inMemory.loadingSleepSeconds")?: 0
             val loadingThreads: Int = config.getIntOrNull("stateSelection.inMemory.loadingThreads")?: 4
 			val indexingType = try {
-				(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { VaultWatcherService.IndexingType.valueOf(it.toString()) }
+				(config.get("stateSelection.inMemory.indexingStrategies") as List<Any>).map { IndexingType.valueOf(it.toString()) }
 			} catch (e: CordappConfigException) {
 				logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
-				emptyList<VaultWatcherService.IndexingType>()
+				emptyList<IndexingType>()
 			} catch (e: ClassCastException) {
 				logger.warn("No indexing method specified. Indexes will be created at run-time for each invocation of selectTokens")
-				emptyList<VaultWatcherService.IndexingType>()
+				emptyList<IndexingType>()
 			}
 			logger.info("Found in memory token selection configuration with values indexing strategy: $indexingType, cacheSize: $cacheSize")
 			return InMemorySelectionConfig(enabled, indexingType, cacheSize, pageSize, loadingSleep, loadingThreads)
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt
new file mode 100644
index 00000000..2bec1ccb
--- /dev/null
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/IndexingType.kt
@@ -0,0 +1,25 @@
+package com.r3.corda.lib.tokens.selection.memory.services
+
+import com.r3.corda.lib.tokens.selection.memory.internal.Holder
+
+enum class IndexingType(val ownerType: Class<out Holder>) {
+
+    EXTERNAL_ID(Holder.MappedIdentity::class.java),
+    PUBLIC_KEY(Holder.KeyIdentity::class.java);
+
+    companion object {
+        fun fromHolder(holder: Class<out Holder>): IndexingType {
+            return when (holder) {
+                Holder.MappedIdentity::class.java -> {
+                    EXTERNAL_ID
+                }
+
+                Holder.KeyIdentity::class.java -> {
+                    PUBLIC_KEY
+                }
+                else -> throw IllegalArgumentException("Unknown Holder type: $holder")
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt
new file mode 100644
index 00000000..c5f9568a
--- /dev/null
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt
@@ -0,0 +1,75 @@
+package com.r3.corda.lib.tokens.selection.memory.services
+
+import com.r3.corda.lib.tokens.contracts.states.FungibleToken
+import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
+import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending
+import io.github.classgraph.ClassGraph
+import net.corda.core.node.AppServiceHub
+import net.corda.core.node.services.Vault
+import net.corda.core.node.services.queryBy
+import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
+import net.corda.core.node.services.vault.PageSpecification
+import net.corda.core.node.services.vault.QueryCriteria
+import net.corda.core.utilities.contextLogger
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+
+class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub,
+                            private val configOptions: InMemorySelectionConfig) : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
+
+
+    override fun invoke(
+            onVaultUpdate: (Vault.Update<FungibleToken>) -> Unit
+    ) {
+        LOG.info("Starting async token loading from vault")
+
+        val classGraph = ClassGraph()
+        classGraph.enableClassInfo()
+
+        val scanResultFuture = CompletableFuture.supplyAsync {
+            classGraph.scan()
+        }
+
+        scanResultFuture.thenApplyAsync { scanResult ->
+            val subclasses: Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
+                    .map { it.name }
+                    .map { Class.forName(it) as Class<out FungibleToken> }.toSet()
+
+            val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
+            LOG.info("Enriching token query with types: $enrichedClasses")
+
+            val shouldLoop = AtomicBoolean(true)
+            val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1)
+            val loadingFutures: List<CompletableFuture<Void>> = 0.until(configOptions.loadingThreads).map {
+                CompletableFuture.runAsync {
+                    try {
+                        while (shouldLoop.get()) {
+                            val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
+                                    paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = configOptions.pageSize),
+                                    criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
+                                    sorting = sortByTimeStampAscending()
+                            ).states.toSet()
+                            onVaultUpdate(Vault.Update(emptySet(), newlyLoadedStates))
+                            LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
+                            shouldLoop.compareAndSet(true, newlyLoadedStates.isNotEmpty())
+                            LOG.debug("shouldLoop=${shouldLoop}")
+                            if (configOptions.sleep > 0) {
+                                Thread.sleep(configOptions.sleep.toLong() * 1000)
+                            }
+                        }
+                    } catch (t: Throwable) {
+                        LOG.error("Token Loading Failed due to: ", t)
+                    }
+                }
+            }
+            CompletableFuture.allOf(*loadingFutures.toTypedArray()).thenRunAsync {
+                LOG.info("finished token loading")
+            }
+        }
+    }
+    
+    companion object {
+        val LOG = contextLogger()
+    }
+}
\ No newline at end of file
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
index cccc7aef..0a48da64 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
@@ -10,16 +10,12 @@ import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.internal.Holder
 import com.r3.corda.lib.tokens.selection.memory.internal.lookupExternalIdFromKey
 import com.r3.corda.lib.tokens.selection.sortByStateRefAscending
-import com.r3.corda.lib.tokens.selection.sortByTimeStampAscending
-import io.github.classgraph.ClassGraph
-import io.github.classgraph.ScanResult
 import net.corda.core.contracts.Amount
 import net.corda.core.contracts.StateAndRef
 import net.corda.core.internal.uncheckedCast
 import net.corda.core.node.AppServiceHub
 import net.corda.core.node.services.CordaService
 import net.corda.core.node.services.Vault
-import net.corda.core.node.services.queryBy
 import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
 import net.corda.core.node.services.vault.PageSpecification
 import net.corda.core.node.services.vault.QueryCriteria
@@ -28,12 +24,8 @@ import net.corda.core.utilities.contextLogger
 import rx.Observable
 import java.time.Duration
 import java.util.concurrent.*
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import java.util.function.Function
-import java.util.function.Supplier
 import kotlin.concurrent.read
 import kotlin.concurrent.write
 
@@ -55,41 +47,28 @@ class VaultWatcherService(
 	private val indexViewCreationLock: ReentrantReadWriteLock = ReentrantReadWriteLock()
 	private val UPDATER = Executors.newSingleThreadScheduledExecutor()
 
-	enum class IndexingType(val ownerType: Class<out Holder>) {
-
-		EXTERNAL_ID(Holder.MappedIdentity::class.java),
-		PUBLIC_KEY(Holder.KeyIdentity::class.java);
-
-		companion object {
-			fun fromHolder(holder: Class<out Holder>): IndexingType {
-				return when (holder) {
-					Holder.MappedIdentity::class.java -> {
-						EXTERNAL_ID
-					}
-
-					Holder.KeyIdentity::class.java -> {
-						PUBLIC_KEY
-					}
-					else -> throw IllegalArgumentException("Unknown Holder type: $holder")
-				}
-			}
-		}
-
-	}
-
 	constructor(appServiceHub: AppServiceHub) : this(
 		getObservableFromAppServiceHub(appServiceHub),
 		InMemorySelectionConfig.parse(appServiceHub.getAppContext().config)
 	)
 
+	init {
+		addTokensToCache(tokenObserver.initialValues)
+		tokenObserver.source.doOnError {
+			LOG.error("received error from observable", it)
+		}
+		tokenObserver.source.subscribe(::onVaultUpdate)
+		tokenObserver.startLoading(::onVaultUpdate)
+	}
+
 	companion object {
 		val LOG = contextLogger()
 
 		private fun getObservableFromAppServiceHub(appServiceHub: AppServiceHub): TokenObserver {
-			val config = appServiceHub.cordappProvider.getAppContext().config
-			val configOptions: InMemorySelectionConfig = InMemorySelectionConfig.parse(config)
+			val rawConfig = appServiceHub.cordappProvider.getAppContext().config
+			val parsedConfig: InMemorySelectionConfig = InMemorySelectionConfig.parse(rawConfig)
 
-			if (!configOptions.enabled) {
+			if (!parsedConfig.enabled) {
 				LOG.info("Disabling inMemory token selection - refer to documentation on how to enable")
 				return TokenObserver(emptyList(), Observable.empty(), { _, _ ->
 					Holder.UnmappedIdentity()
@@ -105,7 +84,6 @@ class VaultWatcherService(
 					}
 				}
 			}
-
 			val (_, vaultObservable) = appServiceHub.vaultService.trackBy(
 					contractStateType = FungibleToken::class.java,
 					paging = PageSpecification(pageNumber = DEFAULT_PAGE_NUM, pageSize = 1),
@@ -113,68 +91,12 @@ class VaultWatcherService(
 					sorting = sortByStateRefAscending()
 			)
 
-			val pageSize = configOptions.pageSize
-			val asyncLoader = object : ((Vault.Update<FungibleToken>) -> Unit) -> Unit {
-				override fun invoke(callback: (Vault.Update<FungibleToken>) -> Unit) {
-					LOG.info("Starting async token loading from vault")
-
-					val classGraph = ClassGraph()
-					classGraph.enableClassInfo()
-
-					val scanResultFuture = CompletableFuture.supplyAsync {
-						classGraph.scan()
-					}
-
-					scanResultFuture.thenApplyAsync { scanResult ->
-						val subclasses: Set<Class<out FungibleToken>> = scanResult.getSubclasses(FungibleToken::class.java.canonicalName)
-								.map { it.name }
-								.map { Class.forName(it) as Class<out FungibleToken> }.toSet()
-
-						val enrichedClasses = (subclasses - setOf(FungibleToken::class.java))
-						LOG.info("Enriching token query with types: $enrichedClasses")
-
-						val shouldLoop = AtomicBoolean(true)
-						val pageNumber = AtomicInteger(DEFAULT_PAGE_NUM - 1)
-						val loadingFutures: List<CompletableFuture<Void>> = 0.until(configOptions.loadingThreads).map {
-							CompletableFuture.runAsync {
-								try {
-									while (shouldLoop.get()) {
-										val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
-												paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = pageSize),
-												criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
-												sorting = sortByTimeStampAscending()
-										).states.toSet()
-										callback(Vault.Update(emptySet(), newlyLoadedStates))
-										LOG.info("publishing ${newlyLoadedStates.size} to async state loading callback")
-										shouldLoop.compareAndSet(newlyLoadedStates.isNotEmpty(), true)
-										LOG.debug("shouldLoop=${shouldLoop}")
-										if (configOptions.sleep > 0) {
-											Thread.sleep(configOptions.sleep.toLong() * 1000)
-										}
-
-									}
-									LOG.info("finished token loading")
-								} catch (t: Throwable) {
-									LOG.error("Token Loading Failed due to: ", t)
-								}
-							}
-						}
-					}
-				}
-			}
-
+			val asyncLoader = ServiceHubAsyncLoader(appServiceHub, parsedConfig)
 			return TokenObserver(emptyList(), uncheckedCast(vaultObservable), ownerProvider, asyncLoader)
 		}
 	}
 
-	init {
-		addTokensToCache(tokenObserver.initialValues)
-		tokenObserver.source.doOnError {
-			LOG.error("received error from observable", it)
-		}
-		tokenObserver.startLoading(::onVaultUpdate)
-		tokenObserver.source.subscribe(::onVaultUpdate)
-	}
+
 
 	private fun processToken(token: StateAndRef<FungibleToken>, indexingType: IndexingType): TokenIndex {
 		val owner = tokenObserver.ownerProvider(token, indexingType)
@@ -375,10 +297,10 @@ class VaultWatcherService(
 }
 
 class TokenObserver(
-	val initialValues: List<StateAndRef<FungibleToken>>,
-	val source: Observable<Vault.Update<FungibleToken>>,
-	val ownerProvider: ((StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder),
-	inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }
+		val initialValues: List<StateAndRef<FungibleToken>>,
+		val source: Observable<Vault.Update<FungibleToken>>,
+		val ownerProvider: ((StateAndRef<FungibleToken>, IndexingType) -> Holder),
+		inline val asyncLoader: ((Vault.Update<FungibleToken>) -> Unit) -> Unit = { _ -> }
 ) {
 
 	fun startLoading(loadingCallBack: (Vault.Update<FungibleToken>) -> Unit) {
diff --git a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
index 801bf32f..16723123 100644
--- a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
+++ b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
@@ -35,7 +35,6 @@ import net.corda.core.utilities.unwrap
 import java.time.Duration
 import java.time.temporal.ChronoUnit
 import java.util.*
-import javax.swing.plaf.nimbus.State
 
 // This is very simple test flow for DvP.
 @CordaSerializable
diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
index d598dd02..151f64f4 100644
--- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
+++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/ConfigSelectionTest.kt
@@ -8,6 +8,7 @@ import com.r3.corda.lib.tokens.selection.database.selector.DatabaseTokenSelectio
 import com.r3.corda.lib.tokens.selection.memory.config.CACHE_SIZE_DEFAULT
 import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.selector.LocalTokenSelector
+import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
 import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import com.typesafe.config.ConfigFactory
 import net.corda.core.identity.CordaX500Name
@@ -58,14 +59,14 @@ class ConfigSelectionTest {
         val config = ConfigFactory.parseString("stateSelection {\n" +
                 "inMemory {\n" +
                 "cacheSize: 9000\n" +
-                "indexingStrategies: [${VaultWatcherService.IndexingType.PUBLIC_KEY}]\n" +
+                "indexingStrategies: [${IndexingType.PUBLIC_KEY}]\n" +
                 "}\n" +
                 "}")
         val cordappConfig = TypesafeCordappConfig(config)
 
         val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
         assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
-        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.PUBLIC_KEY))
+        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.PUBLIC_KEY))
 
         val selection = ConfigSelection.getPreferredSelection(services, cordappConfig)
         assertThat(selection).isInstanceOf(LocalTokenSelector::class.java)
@@ -76,13 +77,13 @@ class ConfigSelectionTest {
         val config = ConfigFactory.parseString("stateSelection {\n" +
                 "inMemory {\n" +
                 "cacheSize: 9000\n" +
-                "indexingStrategies: [\"${VaultWatcherService.IndexingType.EXTERNAL_ID}\", \"${VaultWatcherService.IndexingType.PUBLIC_KEY}\"]\n" +
+                "indexingStrategies: [\"${IndexingType.EXTERNAL_ID}\", \"${IndexingType.PUBLIC_KEY}\"]\n" +
                 "}\n" +
                 "}")
         val cordappConfig = TypesafeCordappConfig(config)
         val inMemoryConfig = InMemorySelectionConfig.parse(cordappConfig)
         assertThat(inMemoryConfig.cacheSize).isEqualTo(9000)
-        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(VaultWatcherService.IndexingType.EXTERNAL_ID, VaultWatcherService.IndexingType.PUBLIC_KEY))
+        assertThat(inMemoryConfig.indexingStrategies).isEqualTo(listOf(IndexingType.EXTERNAL_ID, IndexingType.PUBLIC_KEY))
     }
 
     @Test
diff --git a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
index bc8bdb48..c16b1088 100644
--- a/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
+++ b/workflows/src/test/kotlin/com/r3/corda/lib/tokens/workflows/VaultWatcherServiceTest.kt
@@ -12,6 +12,7 @@ import com.r3.corda.lib.tokens.money.GBP
 import com.r3.corda.lib.tokens.selection.InsufficientBalanceException
 import com.r3.corda.lib.tokens.selection.memory.config.InMemorySelectionConfig
 import com.r3.corda.lib.tokens.selection.memory.internal.Holder
+import com.r3.corda.lib.tokens.selection.memory.services.IndexingType
 import com.r3.corda.lib.tokens.selection.memory.services.TokenObserver
 import com.r3.corda.lib.tokens.selection.memory.services.VaultWatcherService
 import com.r3.corda.lib.tokens.workflows.flows.rpc.IssueTokens
@@ -205,14 +206,14 @@ class VaultWatcherServiceTest {
             }
         }.toMap()
 
-        val ownerProvider = object : (StateAndRef<FungibleToken>, VaultWatcherService.IndexingType) -> Holder {
-            override fun invoke(tokenState: StateAndRef<FungibleToken>, indexingType: VaultWatcherService.IndexingType): Holder {
+        val ownerProvider = object : (StateAndRef<FungibleToken>, IndexingType) -> Holder {
+            override fun invoke(tokenState: StateAndRef<FungibleToken>, indexingType: IndexingType): Holder {
                 return when (indexingType) {
-                    VaultWatcherService.IndexingType.EXTERNAL_ID -> {
+                    IndexingType.EXTERNAL_ID -> {
                         Holder.MappedIdentity(keyToAccount[tokenState.state.data.holder.owningKey]
                                 ?: error("should never happen"))
                     }
-                    VaultWatcherService.IndexingType.PUBLIC_KEY -> {
+                    IndexingType.PUBLIC_KEY -> {
                         Holder.KeyIdentity(tokenState.state.data.holder.owningKey)
                     }
                 }

From 4a693b507ce36a42867b5948a004f32558545c45 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 16:31:39 +0100
Subject: [PATCH 08/10] add signing key

---
 contracts/build.gradle               |   9 ++++++++-
 contracts/signing-infra/keyStore.jks | Bin 0 -> 2633 bytes
 2 files changed, 8 insertions(+), 1 deletion(-)
 create mode 100644 contracts/signing-infra/keyStore.jks

diff --git a/contracts/build.gradle b/contracts/build.gradle
index 21e20c66..a75354a3 100644
--- a/contracts/build.gradle
+++ b/contracts/build.gradle
@@ -47,7 +47,14 @@ cordapp {
         versionId 2
     }
     signing {
-        enabled false
+        enabled true
+        options {
+            keystore "./signing-infra/keyStore.jks"
+            alias "cordapp_signing"
+            storepass "monkey"
+            keypass "monkey"
+            storetype "PKCS12"
+        }
     }
 }
 
diff --git a/contracts/signing-infra/keyStore.jks b/contracts/signing-infra/keyStore.jks
new file mode 100644
index 0000000000000000000000000000000000000000..8542d6dbef43fe53e8ccfeb15a0e24df2582b202
GIT binary patch
literal 2633
zcmY+Ec{CJ?7sqGD7|mD;Gm&IZmcd}i5L5OLvb?8Pma)Xx*Dx|;--=QsV<)n2S;xLS
zQ%G4tqVl3)>`N5J{QQ3B_j~XB&OP_ubH3m2y?=gCQ7}CakOdV5J%&JK;tb>VIe~1z
z{3z%KI10LQBGXV&tYQCWv3i1|Slv!!=aY7Tu>bdp0|o@;M}ZGfQQ&=)Gz9iP`Q-U|
zFr50_5zinG7WHrBi!DrY7w0M!?1ESTedgdO@M|skjNApoq=mrO-!CI>tI-|Z6JPT$
zSlG7^*6>Qw{>9Feh8D8OLiag?HKFU`5J5C@VzNf^7}84!owpuhNGV@HR%p(U-;r2o
zg^xwTGJ-}q&Xi4@a>81h%oG;dl0&;QQ}0)SNJo$`YVHkhyEmyURiCr&BKr6@XQq@_
zWo^_}v)|Y?rX?icVYWM0IznNc+Jw#LcEw+p5)<zc>)Cxk5nc3WU6%CVs>{U1=1A9Y
za5k}-$cxHfJ6$w>>Q<ZXg%ZLe3B6ehcm0`jT)yXhF?8p>$ct9Tsfo@|V$Q|oIsBc_
zWwbx{sRpUg{BE$pyx-Cd2Ys$DdTUC7kDASO%oH+eXUeZH#VLvY0$kM5l<3_wE%9vQ
zR==3TX)R&1;~ehk>JY4$GOoFMw>tZQYF&MB@}S<F=t4y`&}!YI?rl)q=h3R$?Pvb%
zSyb`;?3OPT(VqtA=jvRNrI*ETG<|COhNuM)aYuqyNiE25f+BVtcLQN+Rz|32E=O1!
zB`>m7@zvx8<T17qw?MjP4W3rAQ+X@57iJlhOQ=jzsjV^gm2T+Aijb)K!6~2ABl%}F
z-J7!Ued$h>-jDc^C6T`a3E2kQeJg_2cSA4f-<O@B08uJH1g|$z7oZ0}!$2D~XHPKt
zA4r9@m-AbFogQgy=JTd$eXr>$*|YciIZa!y96J_H)^?ZZo6Fg?DJLbMQ$|3(P*=b4
z&M!CAMXpu6Dh@Nac0>a|h!44SU9#ExOmK;?aPB@PuIe73pGQr#kC$>eq)>!fcA@c5
zK@Yw(Ky`gCzw+4B`7n^<IjZ=+`nKmespIH9>}R`Wmu^bxR0Lp{+p6XDxrqp&0umNa
zFaveKtBV8_Uo=t}f_LOF*5PGmzlB&Usl4<qtIsu5>JhoEa?Drd&7)yV$wn3JspJg4
ztr6a2EBj2NalaVMAEF{PT^Jrb&)EEv5s2-MX`Dl=q1Kxq=_DmT!u&0VJGUUoALG&^
zRvhc`U5!<ZsOz@Afd*dXfzim6fY`p%w~X+CWBUm$>un<On_=n14KaE4i3>Zr&{-Rr
zHMz$v*Io3Fvekx<Gx{HW3MXO7?O33hVcvW+MZ+sI>G(%N`iv+hu|#Lc)H2JOJ7clA
zW=k@IcV6-b$6scMvmLk?CnMWqKS2SBs+hO;UbtO*zjWxbneAjr&VEbjg0!jV0yo4T
z0t<$D0+OWDEw4Oy96okkl3h?NrF1lR4v<G?(_Y$FOM)HT^gG((a>*!&jEG5C^pYqD
zme7FiOQx2@E3|DH#s^0CO!hy!XtJ%|+au0;*&x}TGl3p&WkQ-Urp(1l*{Y1ci(d5g
z{U!%N@-B1U8RrbRI#PK4rb%jolkHLBcCAOcJr90iaU2HPMTgjc`6fmd?0sNt5kD+w
zZfp~4Lrud!iJGHF_ro#+C`fR5)VZ$ho*cRTftA23Xbw^!-`P_e1OUebA2eOuvPshr
zH@U9yBx8qOq3Yn-&(lbgZR(2~sQGn3FPW}^gW-C5w2HkJ*#oRJVt|be-TUZFzdh4K
z6skMlmk7O@%yZC>qlbt$AjNe))T*A{(-uCO{EnN~ucLlaBaA+=xXq*A_3dJ2OhN5?
z7GWtx3zXd7sD#25MTCR^P5>W(KfoD)1NfelJs<#p2e<;fPo_7(6(#yl9H9sY^H|}%
zTtpOA)RfUE4C*opqpEa*Q|G@!Y>*SeTAxTaK|sLCGWy2>{vUH8|7NZx2A!$E&fQ>w
zU7w*E))B9x&4&KN+=wXf8G*G64}YOOu|LfaFI)6}$?bkR-O?a9>p1FemMR~;cU;N&
zBsev<wVIMG&HT}2wqy<H5LGrWfAstlD_n}-sy%jB+z&I0PIRdx<~(9lNFVao>Y-<W
z=~;PK8vX5xeDHg%eDb8Ev7OMdfDUKm+%Ga-dQ|8kgj+o9(=AoKZ$j5nE-QN7@8lY;
z_AlN7;vzrCO%PN^+<q@Y?i5(MCAMY#CgAmQeOzt#r9vXtlJYa_9~$Q5^eRdTtXje^
z#&xA_qn_Jr1G8)&oyquegPE_}$_LZ9lBS<1#L9{6zuQjmeX!ZI?My_>N*RhdVm`vf
zLX+w}x7i&*7Lu_ekg&3cR*C@nyN*A9D!w%t&GA+@#oSXI&xwfPR%c2Dj#1fOFnc8Z
zc{%>rVwH^~K`L{L{$g)W^-Vt8oV5PM*rj}9y0*OMDPe-d-20qR2f!YP{0P9${5)mu
zoqN6XO%OLWUVA~4XF%~q?{55@zRUN!vreuuFKbkaA3~WlQm41g$5eK}6TYWL3_3VZ
zD7IkAzb9~Y`pt-v4@>T1gJ~=S7m*{%es!10Me%QS!xYqD>C#`As{$G~<r?Gt27(Mq
zu?I0OBvddmPfav)xm+9L{d9m$YdDBQ`TekAfBtV38>S1L+2;us4T%m!QkDkk5U~H7
z`k(4+5urA9#ihpe0;XZU!AxbcSAn?Z*DMd5Lbn4Mdj$<dROQ>bpr+p_wQ^t>?_!Lc
zFJprftb?J3Ifw2Je+gu>53?ZJJ?XEgSA4uIb-vK<)P0@R5%LtFa8WB|*)>^3QXb}!
zChL=^*(?fnGOxe)j_8={>V2@6TP^s-uiZ3HSJvWVLZKmv+&-&~lDb=MUf|r~6ayYX
zfx^R^`-M+xy2j<64%Dal#&?j8e{sRI$olv5fhv&)-imv5L%H0-eq5`Q#Nie7N@I~r
zLq42-?!02;M-1bH?~X`|B=r|AI_)_Z%u81%8-lDIWa!jdFn@C9sZ&-S#U@)+dE5j1
zA1Rxmg>|U>1K&^8nldmLyG<|hxEM=j_C#fQX$>AAC7TY7zWiR=#t`vg4|Pc6xX+Pk
z5w4(Hija?scxM|9BRn63`E?a=necWC3aYoj`@~!K<eV<?(<&`Q$K(#wIe}cNq4V)m
zILC~hZ*|W|L8(&?xb6m-4i4AZPF8f&y)Mw6iqX!wmJchNvZf{l9|GP(ubwS``M57d
zKXko_A3CfggA7s<b;o}f+7u({<xEZ3KZ!RnaxlQ_qV!88Z>o#AM&=1vy?0Zj-ibZ!
z)Vn`s>Ty~tva%sE?L%O|3u@N@XygIKJT02!VM;Gn&0uO;Xv1Si#u{IBAZlvt!;3H=
zi^n``Lmr2-Ey!p;@_GqM3nhwzLRe+FSU>`-05F_|7FOSd+v;(Vs}|EE8gC3778Qcw
d${|$wAzbtfkR;;1+8+rrV5-~%u>j-p{{___$sPaz

literal 0
HcmV?d00001


From 5cd86241e402aed5190d3a4079b6bdb4d4b8d114 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Tue, 7 Dec 2021 16:34:08 +0100
Subject: [PATCH 09/10] change version number

---
 build.gradle                 | 2 +-
 freighter-tests/build.gradle | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/build.gradle b/build.gradle
index 004f1af2..93614188 100644
--- a/build.gradle
+++ b/build.gradle
@@ -5,7 +5,7 @@ buildscript {
         corda_release_group = 'net.corda'
         corda_release_version = '4.3'
         tokens_release_group = "com.r3.corda.lib.tokens"
-        tokens_release_version = "1.2.5-SNAPSHOT"
+        tokens_release_version = "1.2.5-ASYNC-LOADING-SNAPSHOT"
         corda_gradle_plugins_version = '5.0.12'
         kotlin_version = '1.2.71'
         junit_version = '4.12'
diff --git a/freighter-tests/build.gradle b/freighter-tests/build.gradle
index fe1c6bfa..3868afbc 100644
--- a/freighter-tests/build.gradle
+++ b/freighter-tests/build.gradle
@@ -26,7 +26,7 @@ task freighterTest(type: Test, dependsOn: [project(":workflows").jar]) {
     classpath = sourceSets.freighterTest.runtimeClasspath
     useJUnitPlatform {
         includeTags "DOCKER"
-        excludeTags "AZURE", "FULL_LINUX_KERNEL", "ORACLE"
+        excludeTags "AZURE", "FULL_LINUX_KERNEL", "ORACLE", "LARGE_TEST"
     }
 
     testLogging {

From 8925697fc2aa63bf9b973a51055b0ed3298c0fc1 Mon Sep 17 00:00:00 2001
From: Stefano Franz <roastario@gmail.com>
Date: Wed, 8 Dec 2021 12:14:09 +0100
Subject: [PATCH 10/10] attempt to fix IT now that tokens are loaded async

---
 .../testing/HugeTokensLoadedOnRestartTest.kt        |  6 ++----
 .../memory/services/ServiceHubAsyncLoader.kt        |  3 ++-
 .../memory/services/VaultWatcherService.kt          |  5 +++++
 .../lib/tokens/integrationTest/TokenDriverTest.kt   | 10 +++++++---
 .../lib/tokens/integration/workflows/TestFlows.kt   | 13 +++++++++++++
 5 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
index 23627df3..0f4145a2 100644
--- a/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
+++ b/freighter-tests/src/freighterTest/kotlin/freighter/testing/HugeTokensLoadedOnRestartTest.kt
@@ -29,7 +29,6 @@ import kotlin.streams.toList
 @Tag("LARGE_TEST")
 annotation class LargeTest
 
-@LargeTest
 class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 
     val loadingThreads = 8
@@ -98,7 +97,7 @@ class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
 
         val tokenToIssue = (0.until(100)).map { FungibleToken(Amount(1, issuedTokenType), createdCi) }.toList()
 
-        val numberIssued = StreamSupport.stream((0.until(5000)).chunked(200).spliterator(), true).map { toIssue ->
+        val numberIssued = StreamSupport.stream((0.until(500)).chunked(200).spliterator(), true).map { toIssue ->
             nodeMachine1.rpc {
                 repeat(toIssue.size) {
                     startFlow(
@@ -107,10 +106,9 @@ class HugeTokensLoadedOnRestartTest : DockerRemoteMachineBasedTest() {
                     ).returnValue.getOrThrow(Duration.ofMinutes(1))
                     println("[${Thread.currentThread().name}] Total number issued: ${issuedTotal.addAndGet(tokenToIssue.size * 1L)}")
                 }
-
             }
 
-            toIssue.size
+            toIssue.size * tokenToIssue.size
         }.toList().sum()
 
         nodeMachine1.stopNode()
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt
index c5f9568a..62fab99f 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/ServiceHubAsyncLoader.kt
@@ -45,6 +45,7 @@ class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub,
                 CompletableFuture.runAsync {
                     try {
                         while (shouldLoop.get()) {
+                            LOG.info("loading page: ${pageNumber.get() + 1}, should loop: ${shouldLoop.get()}")
                             val newlyLoadedStates = appServiceHub.vaultService.queryBy<FungibleToken>(
                                     paging = PageSpecification(pageNumber = pageNumber.addAndGet(1), pageSize = configOptions.pageSize),
                                     criteria = QueryCriteria.VaultQueryCriteria(contractStateTypes = subclasses),
@@ -68,7 +69,7 @@ class ServiceHubAsyncLoader(private val appServiceHub: AppServiceHub,
             }
         }
     }
-    
+
     companion object {
         val LOG = contextLogger()
     }
diff --git a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
index 0a48da64..0c01b700 100644
--- a/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
+++ b/modules/selection/src/main/kotlin/com.r3.corda.lib.tokens.selection/memory/services/VaultWatcherService.kt
@@ -180,6 +180,11 @@ class VaultWatcherService(
 	}
 
 	fun lockTokensExternal(list: List<StateAndRef<FungibleToken>>, knownSelectionId: String, autoUnlockDelay: Duration? = null) {
+
+		list.forEach {
+			__backingMap.putIfAbsent(it, PLACE_HOLDER)
+		}
+
 		list.forEach {
 			__backingMap.replace(it, PLACE_HOLDER, knownSelectionId)
 		}
diff --git a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
index 66214385..78c4503d 100644
--- a/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
+++ b/workflows-integration-test/src/integrationTest/kotlin/com/r3/corda/lib/tokens/integrationTest/TokenDriverTest.kt
@@ -55,6 +55,7 @@ import org.hamcrest.CoreMatchers.`is`
 import org.hamcrest.CoreMatchers.equalTo
 import org.junit.Assert
 import org.junit.Test
+import java.util.concurrent.CountDownLatch
 import kotlin.test.assertFailsWith
 
 class TokenDriverTest {
@@ -264,7 +265,7 @@ class TokenDriverTest {
         }
     }
 
-    @Test
+    @Test(timeout = 300_000)
     fun `tokens locked in memory are still locked after restart`() {
         driver(DriverParameters(
                 inMemoryDB = false,
@@ -293,11 +294,14 @@ class TokenDriverTest {
                     emptyList<Party>()
             ).returnValue.getOrThrow()
             // Run select and lock tokens flow
-            node.rpc.startFlowDynamic(
+            val pt = node.rpc.startTrackedFlowDynamic(
                     SelectAndLockFlow::class.java,
                     50.GBP,
                     50.seconds
-            )
+            ).returnValue
+
+            Thread.sleep(10_000)
+
             // Stop node
             (node as OutOfProcess).process.destroyForcibly()
             node.stop()
diff --git a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
index 16723123..cc32e840 100644
--- a/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
+++ b/workflows-integration-test/src/main/kotlin/com/r3/corda/lib/tokens/integration/workflows/TestFlows.kt
@@ -30,6 +30,7 @@ import net.corda.core.identity.Party
 import net.corda.core.serialization.CordaSerializable
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.transactions.TransactionBuilder
+import net.corda.core.utilities.ProgressTracker
 import net.corda.core.utilities.seconds
 import net.corda.core.utilities.unwrap
 import java.time.Duration
@@ -134,11 +135,23 @@ class RedeemFungibleGBP(
 // Helper flow for selection testing
 @StartableByRPC
 class SelectAndLockFlow(val amount: Amount<TokenType>, val delay: Duration = 1.seconds) : FlowLogic<Unit>() {
+
+
+	companion object {
+		val SELECTED =  ProgressTracker.Step("SELECTED")
+	}
+
+	override val progressTracker: ProgressTracker?
+		get() = ProgressTracker(ProgressTracker.STARTING, SELECTED, ProgressTracker.DONE)
+
 	@Suspendable
 	override fun call() {
+		progressTracker?.currentStep = ProgressTracker.STARTING
 		val selector = LocalTokenSelector(serviceHub)
 		selector.selectTokens(amount)
+		progressTracker?.currentStep = SELECTED
 		sleep(delay)
+		progressTracker?.currentStep = ProgressTracker.DONE
 	}
 }