Skip to content

Commit

Permalink
Integrate :tool:execution:parallel with :tool:log.
Browse files Browse the repository at this point in the history
Add function for creating Parallel.Type object dynamically.
Improve error message for Parallel.Context.lazyProperty.
Move implementations to internal package.
Add function for checking returned state.
  • Loading branch information
jan-goral authored and mergify-bot committed Jul 9, 2021
1 parent d8a97ea commit 458477d
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 64 deletions.
1 change: 1 addition & 0 deletions tool/execution/parallel/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repositories {
tasks.withType<KotlinCompile> { kotlinOptions.jvmTarget = "1.8" }

dependencies {
api(project(":tool:log"))
implementation(Dependencies.KOTLIN_COROUTINES_CORE)
testImplementation(Dependencies.JUNIT)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flank.exection.parallel

import flank.exection.parallel.internal.EagerProperties
import flank.exection.parallel.internal.dynamicType

// ======================= Signature =======================

Expand Down Expand Up @@ -34,9 +35,16 @@ infix fun <R : Any> Parallel.Type<R>.using(

/**
* Factory function for creating special task that can validate arguments before execution.
* Typically the [Parallel.Context] with added [EagerProperties] is used to validate initial state.
* Typically, the [Parallel.Context] with added [EagerProperties] is used to validate initial state.
*/
internal fun <C : Parallel.Context> validator(
context: (() -> C)
): Parallel.Task<Unit> =
context() using { context().also { it.state = this }.run { validate() } }

// ======================= Type =======================

/**
* Factory function for creating dynamic [Parallel.Type].
*/
inline fun <reified T : Any> type(): Parallel.Type<T> = dynamicType(T::class.java)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flank.exection.parallel

import flank.exection.parallel.internal.Execution
import flank.exection.parallel.internal.invoke
import flank.exection.parallel.internal.minusContextValidators
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -12,7 +13,7 @@ import kotlinx.coroutines.flow.Flow
infix operator fun Tasks.invoke(
args: ParallelState
): Flow<ParallelState> =
Execution(this, args).invoke()
Execution(minusContextValidators(), args).invoke()

// ======================= Extensions =======================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flank.exection.parallel
import flank.exection.parallel.internal.ContextProvider
import flank.exection.parallel.internal.EagerProperties
import flank.exection.parallel.internal.lazyProperty
import java.lang.System.currentTimeMillis
import flank.log.Output

// ======================= Types =======================

Expand All @@ -13,7 +13,7 @@ import java.lang.System.currentTimeMillis
object Parallel {

/**
* Abstraction for execution data provider which is also an context for task execution.
* Abstraction for execution data provider which is also a context for task execution.
* For initialization purpose some properties are exposed as variable.
*/
open class Context : Type<Unit> {
Expand Down Expand Up @@ -67,7 +67,7 @@ object Parallel {
/**
* The task signature.
*
* @param type A return type of a task
* @param type A return a type of task
* @param args A set of types for arguments
*/
data class Signature<R : Any>(
Expand All @@ -81,15 +81,6 @@ object Parallel {
*/
class Function<X : Context>(override val context: () -> X) : ContextProvider<X>()

data class Event internal constructor(
val type: Type<*>,
val data: Any,
val timestamp: Long = currentTimeMillis(),
) {
object Start
object Stop
}

object Logger : Type<Output>

/**
Expand All @@ -111,11 +102,6 @@ object Parallel {
*/
typealias ExecuteTask<R> = suspend ParallelState.() -> R

/**
* Common signature for structural log output.
*/
typealias Output = Any.() -> Unit

/**
* Immutable state for parallel execution.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package flank.exection.parallel

import flank.exection.parallel.internal.contextValidators
import flank.exection.parallel.internal.contextValidatorTypes
import flank.exection.parallel.internal.reduceTo
import flank.exection.parallel.internal.type

/**
* Reduce given [Tasks] by [expected] types to remove unneeded tasks from the graph.
* The returned graph will hold only tasks that are returning selected types, their dependencies and derived dependencies.
* Additionally this is keeping also the validators for initial state.
* Additionally, this is keeping also the validators for initial state.
*
* @return Reduced [Tasks]
*/
operator fun Tasks.invoke(
expected: Set<Parallel.Type<*>>
): Tasks =
reduceTo(expected + contextValidators())
reduceTo(expected + contextValidatorTypes())

/**
* Shortcut for tasks reducing.
*/
operator fun Tasks.invoke(
vararg expected: Parallel.Type<*>
): Tasks = invoke(expected.toSet())

/**
* Remove the [Tasks] by given [types].
*/
operator fun Tasks.minus(
types: Set<Parallel.Type<*>>
): Tasks =
filterNot { task -> task.type in types }.toSet()
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package flank.exection.parallel

/**
* Select value by type.
*/
@Suppress("UNCHECKED_CAST")
fun <T : Any> ParallelState.select(type: Parallel.Type<T>) = get(type) as T
Original file line number Diff line number Diff line change
@@ -1,42 +1,12 @@
package flank.exection.parallel

import flank.exection.parallel.internal.args
import flank.exection.parallel.internal.graph.findCycles
import flank.exection.parallel.internal.graph.findDuplicatedDependencies
import flank.exection.parallel.internal.graph.findMissingDependencies
import flank.exection.parallel.internal.type
import kotlinx.coroutines.runBlocking
import flank.exection.parallel.internal.validateExecutionGraphs

/**
* Validate the given [Tasks] and [ParallelState] for finding missing dependencies or broken paths.
*
* @param initial The initial arguments for tasks execution.
* @return Valid [Tasks] if graph has no broken paths or missing dependencies.
*/
fun Tasks.validate(initial: ParallelState = emptyMap()): Tasks = run {
// Separate initial validators from tasks. Validators are important now but not during the execution.
val (validators, tasks) = splitTasks()

// check if initial state is providing all required values specified in context.
runBlocking { validators.forEach { check -> check.execute(initial) } }

map(Parallel.Task<*>::type).findDuplicatedDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Duplicate(this)
}

val graph = associate { task -> task.type to task.args }

graph.findMissingDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Missing(this)
}

graph.findCycles().run {
if (isNotEmpty()) throw Parallel.DependenciesError.Cycles(this)
}

tasks.toSet()
}

private fun Iterable<Parallel.Task<*>>.splitTasks() = this
.groupBy { task -> task.type is Parallel.Context }
.run { getOrDefault(true, emptyList()) to getOrDefault(false, emptyList()) }
fun Tasks.validate(initial: ParallelState = emptyMap()): Tasks =
validateExecutionGraphs(initial)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package flank.exection.parallel

import flank.exection.parallel.internal.checkThrowableValues

/**
* Verify given [ParallelState] has no errors.
*/
fun ParallelState.verify(): ParallelState = checkThrowableValues()
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Output
import flank.exection.parallel.Parallel
import flank.exection.parallel.Parallel.Event
import flank.exection.parallel.Parallel.Logger
import flank.exection.parallel.Parallel.Task
import flank.exection.parallel.Parallel.Type
import flank.exection.parallel.ParallelState
import flank.exection.parallel.Property
import flank.exection.parallel.Tasks
import flank.log.Event
import flank.log.Output
import flank.log.event
import flank.log.normalize
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -221,7 +223,7 @@ private suspend fun Execution.execute(
jobs += type to scope.launch {

// Extend root output for adding additional data.
val out: Output = output?.let { { it(Event(type, this)) } } ?: {}
val out: Output = output?.normalize { type event it } ?: {}

// Log the task was started
Event.Start.out()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import flank.exection.parallel.Tasks

/**
* Get initial state validators.
*
* This is necessary to perform validations of initial state before the execution.
*/
internal fun Tasks.contextValidators(): List<Parallel.Context> =
internal fun Tasks.contextValidatorTypes(): List<Parallel.Context> =
mapNotNull { task -> task.type as? Parallel.Context }

/**
* Return graph without context validation tasks.
*
* Typically, context validation tasks should be used for testing purposes so running them on production is redundant.
* This function can be used to filter them out.
*/
internal fun Tasks.minusContextValidators(): Tasks =
filterNot { task -> task.type is Parallel.Context }.toSet()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import flank.exection.parallel.ParallelState
*/
internal fun <T : Any> Parallel.Context.lazyProperty(type: Parallel.Type<T>) = lazy {
@Suppress("UNCHECKED_CAST")
state[type] as T
state[type] as? T ?: throw IllegalStateException(
"Cannot resolve dependency of type: $type. Make sure is specified as argument"
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Parallel

fun <T : Any> dynamicType(type: Class<T>): Parallel.Type<T> = DynamicType(type)

internal class DynamicType<T : Any>(val type: Class<T>) : Parallel.Type<T> {
override fun equals(other: Any?): Boolean = (other as? DynamicType<*>)?.type == type
override fun hashCode(): Int = type.hashCode() + javaClass.hashCode()
override fun toString(): String = type.canonicalName
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Parallel
import flank.exection.parallel.ParallelState
import flank.exection.parallel.Tasks
import flank.exection.parallel.internal.graph.findCycles
import flank.exection.parallel.internal.graph.findDuplicatedDependencies
import flank.exection.parallel.internal.graph.findMissingDependencies
import kotlinx.coroutines.runBlocking

// TODO: Check all cases and collect results, instead of throwing the first encountered error.
internal fun Tasks.validateExecutionGraphs(initial: ParallelState = emptyMap()): Tasks = run {
// Separate initial validators from tasks. Validators are important now but not during the execution.
val (validators, tasks) = this
.groupBy { task -> task.type is Parallel.Context }
.run { getOrDefault(true, emptyList()) to getOrDefault(false, emptyList()) }

// check if initial state is providing all required values specified in context.
runBlocking { validators.forEach { check -> check.execute(initial) } }

map(Parallel.Task<*>::type).findDuplicatedDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Duplicate(this)
}

val graph = associate { task -> task.type to task.args }

graph.findMissingDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Missing(this)
}

graph.findCycles().run {
if (isNotEmpty()) throw Parallel.DependenciesError.Cycles(this)
}

tasks.toSet()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package flank.exection.parallel.internal

import flank.exection.parallel.ParallelState

internal fun ParallelState.checkThrowableValues(): ParallelState =
onEach { (_, value) -> if (value is Throwable) throw value }
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class ExecuteKtTest {
)
val actual = runBlocking { execute(args).last() }

assert(actual[A] is NullPointerException)
assert(actual[A] is IllegalStateException)
}

/**
Expand Down
7 changes: 5 additions & 2 deletions tool/log/src/main/kotlin/flank/log/Event.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package flank.log
/**
* Structural log representation
*
* @property context Any object that can be treated as a context for a group of events.
* @property type Unique identifier for value. Could be a KClass of [Event.Type] depending on value type.
* @property value Logged value
* @property value Logged value.
* @property timestamp Time when the event occurs. Unix Timestamp in milliseconds.
*/
data class Event<V : Any> internal constructor(
val context: Any,
val type: Any,
val value: V,
val timestamp: Long = System.currentTimeMillis(),
) {
/**
* Interface of event data identified by KClass
Expand Down Expand Up @@ -41,5 +44,5 @@ infix fun Any.event(any: Any): Event<out Any> = when (any) {
is Pair<*, *> -> Event(this, any.first!!, any.second!!)
is Event.Data -> Event(this, any::class.java, any)
is Event.Type<*> -> Event(this, any, Unit)
else -> Event(this, any::class.java, any) // Consider to disallow anonymous types
else -> Event(this, any::class.java, any) // Consider disallow unknown types.
}
2 changes: 1 addition & 1 deletion tool/log/src/main/kotlin/flank/log/Logger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface Logger {
/**
* Logging function signature.
*/
typealias Output = Any.() -> Unit
typealias Output = GenericOutput<Any>

/**
* Generic logging function signature.
Expand Down

0 comments on commit 458477d

Please sign in to comment.