Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine should send notification about node status #3729

Merged
merged 23 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
76b5a89
Collect invalidated cached nodes and tell IDE about them
JaroslavTulach Sep 22, 2022
7a7c11d
Deliver Pending even to the IDE
JaroslavTulach Sep 22, 2022
aedd98e
Some tests may ignore expression update messages
JaroslavTulach Sep 23, 2022
ee488ae
Make sure setExpressionUnsync is called all the time
JaroslavTulach Sep 23, 2022
2c7652d
notifyPendingCacheItems in ProgramExecutionSupport before executionSe…
JaroslavTulach Sep 26, 2022
c471606
Merging with develop branch
JaroslavTulach Sep 26, 2022
91d37d4
Down to just 20 Runtime*Test failures
JaroslavTulach Sep 26, 2022
33b6f47
RuntimeErrorsTest passes OK
JaroslavTulach Sep 26, 2022
eb6e55d
RuntimeVisualizationsTest passes OK
JaroslavTulach Sep 27, 2022
8e056c6
IncrementalUpdatesTest passes OK
JaroslavTulach Sep 27, 2022
42bc4c9
BuiltinTypesTest passes OK
JaroslavTulach Sep 27, 2022
83cec5c
Just five failing tests in RuntimeServerTest
JaroslavTulach Sep 27, 2022
195ca33
Give important metadata items recognizable UUIDs
JaroslavTulach Sep 27, 2022
228567e
RuntimeServerTest passes OK
JaroslavTulach Sep 27, 2022
b1631e6
All runtime-with-instruments tests are fixed
JaroslavTulach Sep 27, 2022
c4e818a
misc: no collection converters
4e6 Sep 27, 2022
ffd0384
Documenting the protocol change
JaroslavTulach Sep 27, 2022
875ee4d
Applying sbt scalafmtAll
JaroslavTulach Sep 27, 2022
0ec1a2f
Merging with Dmitry's changes
JaroslavTulach Sep 27, 2022
f358d8f
Fixing typos in the documentation
JaroslavTulach Sep 27, 2022
a847e25
Merge with develop branch and Marcin's statics
JaroslavTulach Sep 28, 2022
134909a
Better syntax and formatting for the Rust files
JaroslavTulach Sep 28, 2022
a1165aa
Disabling the IDE rendering changes
JaroslavTulach Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ pub enum ExpressionUpdatePayload {
message: String,
trace: Vec<ExpressionId>,
},
#[serde(rename_all = "camelCase")]
Pending {
message: Option<String>,
progress:Option<f64>,
},
}


Expand Down
7 changes: 4 additions & 3 deletions app/gui/src/presenter/graph/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl Expressions {
///
/// This structure keeps the information how the particular graph elements received from controllers
/// are represented in the view. It also handles updates from the controllers and
/// the view in `update_from_controller` and `update_from_view` respectively.
/// the view in `update_from_controller` and `update_from_view` respectively.
#[derive(Clone, Debug, Default)]
pub struct State {
nodes: RefCell<Nodes>,
Expand Down Expand Up @@ -492,6 +492,7 @@ impl<'a> ControllerChange<'a> {
None | Some(Value) => None,
Some(DataflowError { trace }) => Some((Kind::Dataflow, None, trace)),
Some(Panic { message, trace }) => Some((Kind::Panic, Some(message), trace)),
Some(Pending { message: _, progress: _ }) => Some((Kind::Panic, Some("Pending...".to_string()), Vec::new())),
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
}?;
let propagated = if kind == Kind::Panic {
let nodes = self.nodes.borrow();
Expand Down Expand Up @@ -678,15 +679,15 @@ impl<'a> ViewChange<'a> {

impl<'a> ViewChange<'a> {
/// If the connections does not already exist, it is created and corresponding to-be-created
/// Ast connection is returned.
/// Ast connection is returned.
pub fn create_connection(&self, connection: view::graph_editor::Edge) -> Option<AstConnection> {
let source = connection.source()?;
let target = connection.target()?;
self.create_connection_from_endpoints(connection.id(), source, target)
}

/// If the connections with provided endpoints does not already exist, it is created and
/// corresponding to-be-created Ast connection is returned.
/// corresponding to-be-created Ast connection is returned.
pub fn create_connection_from_endpoints(
&self,
connection: ViewConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ final class ContextEventsListener(
case Api.ExpressionUpdate.Payload.Value() =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Value

case Api.ExpressionUpdate.Payload.Pending(m, p) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Pending(m, p)

case Api.ExpressionUpdate.Payload.DataflowError(trace) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.DataflowError(trace)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ object ContextRegistryProtocol {
/** An information about computed expression. */
case object Value extends Payload

case class Pending(message: Option[String], progress: Option[Double]) extends Payload;

/** Indicates that the expression was computed to an error.
*
* @param trace the list of expressions leading to the root error.
Expand All @@ -186,6 +188,8 @@ object ContextRegistryProtocol {

val Value = "Value"

val Pending = "Pending"

val DataflowError = "DataflowError"

val Panic = "Panic"
Expand All @@ -210,6 +214,13 @@ object ContextRegistryProtocol {
.deepMerge(
Json.obj(CodecField.Type -> PayloadType.Panic.asJson)
)

case m: Payload.Pending =>
Encoder[Payload.Pending]
.apply(m)
.deepMerge(
Json.obj(CodecField.Type -> PayloadType.Pending.asJson)
)
}

implicit val decoder: Decoder[Payload] =
Expand All @@ -223,6 +234,9 @@ object ContextRegistryProtocol {

case PayloadType.Panic =>
Decoder[Payload.Panic].tryDecode(cursor)

case PayloadType.Pending =>
Decoder[Payload.Pending].tryDecode(cursor)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,10 @@ object Runtime {
new JsonSubTypes.Type(
value = classOf[Payload.Panic],
name = "expressionUpdatePayloadPanic"
),
new JsonSubTypes.Type(
value = classOf[Payload.Pending],
name = "expressionUpdatePayloadPending"
)
)
)
Expand All @@ -378,6 +382,11 @@ object Runtime {
*/
case class Value() extends Payload

/** TBD
*/
case class Pending(message: Option[String], progress: Option[Double])
extends Payload;

/** Indicates that the expression was computed to an error.
*
* @param trace the list of expressions leading to the root error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ object CacheInvalidation {
*/
def runAll(
stack: Iterable[InstrumentFrame],
instructions: Iterable[CacheInvalidation]
instructions: Iterable[CacheInvalidation],
invalidatedKeys: java.util.Set[UUID]
): Unit =
instructions.foreach(run(stack, _))
instructions.foreach(run(stack, _, invalidatedKeys))

/** Run a sequence of invalidation instructions on all visualisations.
*
Expand Down Expand Up @@ -157,13 +158,14 @@ object CacheInvalidation {
*/
def run(
stack: Iterable[InstrumentFrame],
instruction: CacheInvalidation
instruction: CacheInvalidation,
invalidatedKeys: java.util.Set[UUID]
): Unit = {
val frames = instruction.elements match {
case StackSelector.All => stack
case StackSelector.Top => stack.headOption.toSeq
}
run(frames, instruction.command, instruction.indexes)
run(frames, instruction.command, instruction.indexes, invalidatedKeys)
}

/** Run cache invalidation of a multiple instrument frames.
Expand All @@ -175,9 +177,12 @@ object CacheInvalidation {
private def run(
frames: Iterable[InstrumentFrame],
command: Command,
indexes: Set[IndexSelector]
indexes: Set[IndexSelector],
invalidatedKeys: java.util.Set[UUID]
): Unit = {
frames.foreach(frame => run(frame.cache, frame.syncState, command, indexes))
frames.foreach(frame =>
run(frame.cache, frame.syncState, command, indexes, invalidatedKeys)
)
}

/** Run cache invalidation of a single instrument frame.
Expand Down Expand Up @@ -221,21 +226,25 @@ object CacheInvalidation {
cache: RuntimeCache,
syncState: UpdatesSynchronizationState,
command: Command,
indexes: Set[IndexSelector]
indexes: Set[IndexSelector],
invalidatedKeys: java.util.Set[UUID]
): Unit =
command match {
case Command.InvalidateAll =>
invalidatedKeys.addAll(cache.getKeys)
cache.clear()
indexes.foreach(clearIndex(_, cache))
case Command.InvalidateKeys(keys) =>
keys.foreach { key =>
cache.remove(key)
invalidatedKeys.add(key)
indexes.foreach(clearIndexKey(key, _, cache))
}
case Command.InvalidateStale(scope) =>
val staleKeys = cache.getKeys.asScala.diff(scope.toSet)
staleKeys.foreach { key =>
cache.remove(key)
invalidatedKeys.add(key)
indexes.foreach(clearIndexKey(key, _, cache))
syncState.invalidate(key)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ class RecomputeContextCmd(
val cacheInvalidationCommands = request.expressions.toSeq
.map(CacheInvalidation.Command(_))
.map(CacheInvalidation(CacheInvalidation.StackSelector.Top, _))
CacheInvalidation.runAll(stack, cacheInvalidationCommands)
CacheInvalidation.runAll(
stack,
cacheInvalidationCommands,
new java.util.HashSet[java.util.UUID]()
)
reply(Api.RecomputeContextResponse(request.contextId))
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import java.util.logging.Level

import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import java.util.UUID

/** A job that ensures that specified files are compiled.
*
Expand All @@ -45,7 +46,7 @@ final class EnsureCompiledJob(protected val files: Iterable[File])

try {
val compilationResult = ensureCompiledFiles(files)
setCacheWeights()
setCacheWeights(new java.util.HashSet[java.util.UUID]())
compilationResult
} finally {
ctx.locking.releaseWriteCompilationLock()
Expand Down Expand Up @@ -305,17 +306,52 @@ final class EnsureCompiledJob(protected val files: Iterable[File])
changeset,
module.getSource.getCharacters
)
val invalidatedKeys = new java.util.HashSet[java.util.UUID]();
ctx.contextManager.getAllContexts.values
.foreach { stack =>
if (stack.nonEmpty && isStackInModule(module.getName, stack)) {
CacheInvalidation.runAll(stack, invalidationCommands)
CacheInvalidation.runAll(stack, invalidationCommands, invalidatedKeys)
}
}
CacheInvalidation.runAllVisualisations(
ctx.contextManager.getVisualisations(module.getName),
invalidationCommands
)

if (!invalidatedKeys.isEmpty()) {
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
System.err.println("Invalidated: " + invalidatedKeys)
val invalidatedKeysScala = invalidatedKeys.asScala.toSet
ctx.contextManager.getAllContexts.foreachEntry((contextId, stack) => {
val knownKeys = stack.top.cache.getWeights.entrySet
val cachedKeys = stack.top.cache.getKeys
val pendingKeys = new java.util.HashSet[UUID]()
knownKeys.forEach(e => {
if (e.getValue > 0) {
if (!cachedKeys.contains(e.getKey)) {
pendingKeys.add(e.getKey)
}
}
});
val ids = invalidatedKeysScala.map { key =>
// pendingKeys.asScala.toSet.map { key =>
Api.ExpressionUpdate(
key,
None,
None,
Vector.empty,
true,
Api.ExpressionUpdate.Payload.Pending(None, None)
)
}

System.err.println(" ignore pendingKeys: " + pendingKeys)
val msg = Api.Response(
Api.ExpressionUpdates(contextId, ids)
)
ctx.endpoint.sendToClient(msg)
})
}

val invalidatedVisualisations =
ctx.contextManager.getInvalidatedVisualisations(
module.getName,
Expand Down Expand Up @@ -371,15 +407,18 @@ final class EnsureCompiledJob(protected val files: Iterable[File])
else
CompilationStatus.Success

private def setCacheWeights()(implicit ctx: RuntimeContext): Unit = {
private def setCacheWeights(
invalidatedKeys: java.util.Set[java.util.UUID]
)(implicit ctx: RuntimeContext): Unit = {
ctx.contextManager.getAllContexts.values.foreach { stack =>
getCacheMetadata(stack).foreach { metadata =>
CacheInvalidation.run(
stack,
CacheInvalidation(
CacheInvalidation.StackSelector.Top,
CacheInvalidation.Command.SetMetadata(metadata)
)
),
invalidatedKeys
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ object ProgramExecutionSupport {
case _ =>
Api.ExpressionUpdate.Payload.Value()
}
System.out.println(
"Expression update: " + value.getExpressionId + " and " + value.getValue
)
ctx.endpoint.sendToClient(
Api.Response(
Api.ExpressionUpdates(
Expand Down