Skip to content

Commit

Permalink
Add Validator (#500)
Browse files Browse the repository at this point in the history
* Resolve conflicts

Signed-off-by: mramotar <[email protected]>

* Fix rebase issues

Signed-off-by: mramotar <[email protected]>

* Fix tests

Signed-off-by: mramotar <[email protected]>

Signed-off-by: mramotar <[email protected]>
  • Loading branch information
matt-ramotar authored and aclassen committed Jan 16, 2023
1 parent ae07e06 commit 01cfe83
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,46 @@ package org.mobilenativefoundation.store.store5

interface Converter<Network : Any, Output : Any, Local : Any> {
fun fromNetworkToOutput(network: Network): Output?
fun fromOutputToLocal(common: Output): Local?
fun fromLocalToOutput(sourceOfTruth: Local): Output?
fun fromOutputToLocal(output: Output): Local?
fun fromLocalToOutput(local: Local): Output?

class Builder<Network : Any, Output : Any, Local : Any> {

private var fromOutputToLocal: ((value: Output) -> Local)? = null
private var fromNetworkToOutput: ((value: Network) -> Output)? = null
private var fromLocalToOutput: ((value: Local) -> Output)? = null
private var fromOutputToLocal: ((output: Output) -> Local)? = null
private var fromNetworkToOutput: ((network: Network) -> Output)? = null
private var fromLocalToOutput: ((local: Local) -> Output)? = null

fun build(): Converter<Network, Output, Local> =
RealConverter(fromOutputToLocal, fromNetworkToOutput, fromLocalToOutput)

fun fromOutputToLocal(converter: (value: Output) -> Local): Builder<Network, Output, Local> {
fun fromOutputToLocal(converter: (output: Output) -> Local): Builder<Network, Output, Local> {
fromOutputToLocal = converter
return this
}

fun fromLocalToOutput(converter: (value: Local) -> Output): Builder<Network, Output, Local> {
fun fromLocalToOutput(converter: (local: Local) -> Output): Builder<Network, Output, Local> {
fromLocalToOutput = converter
return this
}

fun fromNetworkToOutput(converter: (value: Network) -> Output): Builder<Network, Output, Local> {
fun fromNetworkToOutput(converter: (network: Network) -> Output): Builder<Network, Output, Local> {
fromNetworkToOutput = converter
return this
}
}
}

private class RealConverter<Network : Any, Output : Any, Local : Any>(
private val fromOutputToLocal: ((value: Output) -> Local)?,
private val fromNetworkToOutput: ((value: Network) -> Output)?,
private val fromLocalToOutput: ((value: Local) -> Output)?,
private val fromOutputToLocal: ((output: Output) -> Local)?,
private val fromNetworkToOutput: ((network: Network) -> Output)?,
private val fromLocalToOutput: ((local: Local) -> Output)?,
) : Converter<Network, Output, Local> {
override fun fromNetworkToOutput(network: Network): Output? =
fromNetworkToOutput?.invoke(network)

override fun fromOutputToLocal(common: Output): Local? =
fromOutputToLocal?.invoke(common)
override fun fromOutputToLocal(output: Output): Local? =
fromOutputToLocal?.invoke(output)

override fun fromLocalToOutput(sourceOfTruth: Local): Output? =
fromLocalToOutput?.invoke(sourceOfTruth)
override fun fromLocalToOutput(local: Local): Output? =
fromLocalToOutput?.invoke(local)
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ interface StoreBuilder<Key : Any, Network : Any, Output : Any, Local : Any> {
fun converter(converter: Converter<Network, Output, Local>):
StoreBuilder<Key, Network, Output, Local>

fun validator(validator: Validator<Output>): StoreBuilder<Key, Network, Output, Local>

companion object {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,9 @@ internal class FetcherController<Key : Any, Network : Any, Output : Any, Local :
piggybackingDownstream = true,
onEach = { response ->
response.dataOrNull()?.let { network ->
val input =
network as? Output ?: converter?.fromNetworkToOutput(network)
if (input != null) {
sourceOfTruth?.write(key, input)
}
val output = converter?.fromNetworkToOutput(network)
val input = output ?: network
sourceOfTruth?.write(key, input as Output)
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,7 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
return lastFailedSync != null || writeRequestsQueueIsEmpty(key).not()
}

@AnyThread
private suspend fun writeRequestsQueueIsEmpty(key: Key): Boolean = withThreadSafety(key) {
keyToWriteRequestQueue[key].isNullOrEmpty()
}
private fun writeRequestsQueueIsEmpty(key: Key): Boolean = keyToWriteRequestQueue[key].isNullOrEmpty()

private suspend fun <Response : Any> addWriteRequestToQueue(writeRequest: StoreWriteRequest<Key, Output, Response>) =
withWriteRequestQueueLock(writeRequest.key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import org.mobilenativefoundation.store.store5.Validator
import org.mobilenativefoundation.store.store5.impl.operators.Either
import org.mobilenativefoundation.store.store5.impl.operators.merge
import org.mobilenativefoundation.store.store5.internal.result.StoreDelegateWriteResult
Expand All @@ -44,7 +45,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
scope: CoroutineScope,
fetcher: Fetcher<Key, Network>,
sourceOfTruth: SourceOfTruth<Key, Local>? = null,
converter: Converter<Network, Output, Local>? = null,
private val converter: Converter<Network, Output, Local>? = null,
private val validator: Validator<Output>?,
private val memoryPolicy: MemoryPolicy<Key, Output>?
) : Store<Key, Output> {
/**
Expand Down Expand Up @@ -88,12 +90,18 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
converter = converter
)

@Suppress("UNCHECKED_CAST")
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
flow {
val cachedToEmit = if (request.shouldSkipCache(CacheType.MEMORY)) {
null
} else {
memCache?.getIfPresent(request.key)
val output = memCache?.getIfPresent(request.key)
when {
output == null -> null
validator?.isValid(output) == false -> null
else -> output
}
}

cachedToEmit?.let {
Expand All @@ -114,27 +122,43 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
diskNetworkCombined(request, sourceOfTruth)
}
emitAll(
stream.transform {
emit(it)
if (it is StoreReadResponse.NoNewData && cachedToEmit == null) {
// In the special case where fetcher returned no new data we actually want to
// serve cache data (even if the request specified skipping cache and/or SoT)
//
// For stream(Request.cached(key, refresh=true)) we will return:
// Cache
// Source of truth
// Fetcher - > Loading
// Fetcher - > NoNewData
// (future Source of truth updates)
//
// For stream(Request.fresh(key)) we will return:
// Fetcher - > Loading
// Fetcher - > NoNewData
// Cache
// Source of truth
// (future Source of truth updates)
memCache?.getIfPresent(request.key)?.let {
emit(StoreReadResponse.Data(value = it, origin = StoreReadResponseOrigin.Cache))
stream.transform { output ->
val data = output.dataOrNull()
val shouldSkipValidation = validator == null || data == null || output.origin == StoreReadResponseOrigin.Fetcher
if (data != null && !shouldSkipValidation && validator?.isValid(data) == false) {
fetcherController.getFetcher(request.key, false).collect { storeReadResponse ->
val network = storeReadResponse.dataOrNull()
if (network != null) {
val newOutput = converter?.fromNetworkToOutput(network) ?: network as? Output
if (newOutput != null) {
emit(StoreReadResponse.Data(newOutput, origin = StoreReadResponseOrigin.Fetcher))
} else {
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Fetcher))
}
}
}
} else {
emit(output)
if (output is StoreReadResponse.NoNewData && cachedToEmit == null) {
// In the special case where fetcher returned no new data we actually want to
// serve cache data (even if the request specified skipping cache and/or SoT)
//
// For stream(Request.cached(key, refresh=true)) we will return:
// Cache
// Source of truth
// Fetcher - > Loading
// Fetcher - > NoNewData
// (future Source of truth updates)
//
// For stream(Request.fresh(key)) we will return:
// Fetcher - > Loading
// Fetcher - > NoNewData
// Cache
// Source of truth
// (future Source of truth updates)
memCache?.getIfPresent(request.key)?.let {
emit(StoreReadResponse.Data(value = it, origin = StoreReadResponseOrigin.Cache))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreBuilder
import org.mobilenativefoundation.store.store5.StoreDefaults
import org.mobilenativefoundation.store.store5.Updater
import org.mobilenativefoundation.store.store5.Validator
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore

fun <Key : Any, Network : Any, Output : Any> storeBuilderFromFetcher(
Expand All @@ -31,6 +32,7 @@ internal class RealStoreBuilder<Key : Any, Network : Any, Output : Any, Local :
private var scope: CoroutineScope? = null
private var cachePolicy: MemoryPolicy<Key, Output>? = StoreDefaults.memoryPolicy
private var converter: Converter<Network, Output, Local>? = null
private var validator: Validator<Output>? = null

override fun scope(scope: CoroutineScope): StoreBuilder<Key, Network, Output, Local> {
this.scope = scope
Expand All @@ -47,6 +49,11 @@ internal class RealStoreBuilder<Key : Any, Network : Any, Output : Any, Local :
return this
}

override fun validator(validator: Validator<Output>): StoreBuilder<Key, Network, Output, Local> {
this.validator = validator
return this
}

override fun converter(converter: Converter<Network, Output, Local>): StoreBuilder<Key, Network, Output, Local> {
this.converter = converter
return this
Expand All @@ -57,7 +64,8 @@ internal class RealStoreBuilder<Key : Any, Network : Any, Output : Any, Local :
sourceOfTruth = sourceOfTruth,
fetcher = fetcher,
memoryPolicy = cachePolicy,
converter = converter
converter = converter,
validator = validator
)

override fun <UpdaterResult : Any> build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ internal class SourceOfTruthWithBarrier<Key : Any, Network : Any, Output : Any,
}
val readFlow: Flow<StoreReadResponse<Output?>> = when (barrierMessage) {
is BarrierMsg.Open ->
delegate.reader(key).mapIndexed { index, sourceOfTruth ->
delegate.reader(key).mapIndexed { index, local ->
if (index == 0 && messageArrivedAfterMe) {
val firstMsgOrigin = if (writeError == null) {
// restarted barrier without an error means write succeeded
Expand All @@ -89,22 +89,24 @@ internal class SourceOfTruthWithBarrier<Key : Any, Network : Any, Output : Any,
StoreReadResponseOrigin.SourceOfTruth
}

val value = sourceOfTruth as? Output ?: if (sourceOfTruth != null) {
converter?.fromLocalToOutput(sourceOfTruth)
} else {
null
val output = when {
local != null -> converter?.fromLocalToOutput(local) ?: local as? Output
else -> null
}

StoreReadResponse.Data(
origin = firstMsgOrigin,
value = value
value = output
)
} else {
val output = when {
local != null -> converter?.fromLocalToOutput(local) ?: local as? Output
else -> null
}

StoreReadResponse.Data(
origin = StoreReadResponseOrigin.SourceOfTruth,
value = sourceOfTruth as? Output
?: if (sourceOfTruth != null) converter?.fromLocalToOutput(
sourceOfTruth
) else null
value = output
) as StoreReadResponse<Output?>
}
}.catch { throwable ->
Expand Down Expand Up @@ -151,10 +153,9 @@ internal class SourceOfTruthWithBarrier<Key : Any, Network : Any, Output : Any,
try {
barrier.emit(BarrierMsg.Blocked(versionCounter.incrementAndGet()))
val writeError = try {
val input = value as? Local ?: converter?.fromOutputToLocal(value)
if (input != null) {
delegate.write(key, input)
}
val local = converter?.fromOutputToLocal(value)
val input = local ?: value
delegate.write(key, input as Local)
null
} catch (throwable: Throwable) {
if (throwable !is CancellationException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.mobilenativefoundation.store.store5.impl.extensions

import kotlinx.datetime.Clock
import kotlin.time.Duration.Companion.hours

internal fun now() = Clock.System.now().toEpochMilliseconds()
internal fun inHours(n: Int) = Clock.System.now().plus(n.hours).toEpochMilliseconds()
Loading

0 comments on commit 01cfe83

Please sign in to comment.