Skip to content

Commit

Permalink
Do not report exceptions on long running Excel reads (#11916)
Browse files Browse the repository at this point in the history
* Do not report exceptions on long running Excel reads

This change introduces two modifications:
- `ClosedByInterruptException` is wrapped in `InterruptedException`
  instead of `RuntimeException`
- when instrumentation encounters `InterruptedException` it bails early

Having `ClosedByInterruptException` wrapped in `RuntimeException` meant
that it is being reported as a regular `HostException` in the engine and
to the user. Instead it should be treated specially since we know that
it is caused by cancelling a long-running job. Since it is a statically
checked exception it has to be declared and the information has to be
propagated through various lambda constructs (thanks Java!).

The above change alone meant that an error is not reported for
`Data.read` nodes but any values dependent on it would still report
`No_Such_Method` error when the exception is reported as a value. Hence
the early bail out mechanism.

* Send `PendingInterrupted` on interrupt

The information could be used in GUI to indicate pending execution that
will take tad longer.

* Prettify

* Test `PendingInterrupted` payload

* Add `wasInterrupted` flag to `Pending`

Reduce `PendingInterrupted` to a flag in `Pending`

* fmt
  • Loading branch information
hubertp authored Dec 20, 2024
1 parent e5a1c5a commit d87484b
Show file tree
Hide file tree
Showing 18 changed files with 299 additions and 56 deletions.
4 changes: 3 additions & 1 deletion docs/language-server/protocol-language-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ interface ExpressionUpdate {
An information about the computed value.

```typescript
type ExpressionUpdatePayload = Value | DatafalowError | Panic | Pending;
type ExpressionUpdatePayload = Value | DataflowError | Panic | Pending;

/** Indicates that the expression was computed to a value. */
interface Value {
Expand Down Expand Up @@ -424,6 +424,8 @@ interface Pending {
/** Optional amount of already done work as a number between `0.0` to `1.0`.
*/
progress?: number;
/** Indicates whether the computation of the expression has been interrupted and will be retried. */
wasInterrupted: boolean;
}

/** Information about warnings associated with the value. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ final class ContextEventsListener(
functionSchema.map(toProtocolFunctionSchema)
)

case Api.ExpressionUpdate.Payload.Pending(m, p) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Pending(m, p)
case Api.ExpressionUpdate.Payload.Pending(m, p, i) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Pending(m, p, i)

case Api.ExpressionUpdate.Payload.DataflowError(trace) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.DataflowError(trace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,17 @@ object ContextRegistryProtocol {
)
}

case class Pending(message: Option[String], progress: Option[Double])
extends Payload
/** Indicates that an expression is pending a computation
*/
case class Pending(
message: Option[String],
progress: Option[Double],
wasInterrupted: Boolean
) extends Payload

/** Indicates that an expression's computation has been interrupted and shall be retried.
*/
case object PendingInterrupted extends Payload

/** Indicates that the expression was computed to an error.
*
Expand All @@ -258,6 +267,8 @@ object ContextRegistryProtocol {

val Pending = "Pending"

val PendingInterrupted = "PendingInterrupted"

val DataflowError = "DataflowError"

val Panic = "Panic"
Expand Down Expand Up @@ -291,6 +302,14 @@ object ContextRegistryProtocol {
.deepMerge(
Json.obj(CodecField.Type -> PayloadType.Pending.asJson)
)
case m: Payload.PendingInterrupted.type =>
Encoder[Payload.PendingInterrupted.type]
.apply(m)
.deepMerge(
Json.obj(
CodecField.Type -> PayloadType.PendingInterrupted.asJson
)
)
}

implicit val decoder: Decoder[Payload] =
Expand All @@ -307,6 +326,9 @@ object ContextRegistryProtocol {

case PayloadType.Pending =>
Decoder[Payload.Pending].tryDecode(cursor)

case PayloadType.PendingInterrupted =>
Decoder[Payload.PendingInterrupted.type].tryDecode(cursor)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ object Runtime {
)
}

/** TBD
/** Indicates that an expression is pending a computation
*/
@named("expressionUpdatePayloadPending")
case class Pending(message: Option[String], progress: Option[Double])
extends Payload;
case class Pending(
message: Option[String],
progress: Option[Double],
wasInterrupted: Boolean = false
) extends Payload

/** Indicates that the expression was computed to an error.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ object ProgramExecutionSupport {
val onComputedValueCallback: Consumer[ExpressionValue] = { value =>
if (callStack.isEmpty) {
logger.log(Level.FINEST, s"ON_COMPUTED ${value.getExpressionId}")

if (VisualizationResult.isInterruptedException(value.getValue)) {
value.getValue match {
case e: AbstractTruffleException =>
sendInterruptedExpressionUpdate(
contextId,
executionFrame.syncState,
value
)
// Bail out early. Any references to this value that do not expect
// Interrupted error will likely return `No_Such_Method` otherwise.
throw new ThreadInterruptedException(e);
case _ =>
}
}
sendExpressionUpdate(contextId, executionFrame.syncState, value)
sendVisualizationUpdates(
contextId,
Expand Down Expand Up @@ -377,6 +392,50 @@ object ProgramExecutionSupport {
Api.ExecutionResult.Failure(ex.getMessage, None)
}

private def sendInterruptedExpressionUpdate(
contextId: ContextId,
syncState: UpdatesSynchronizationState,
value: ExpressionValue
)(implicit ctx: RuntimeContext): Unit = {
val expressionId = value.getExpressionId
val methodCall = toMethodCall(value)
if (
!syncState.isExpressionSync(expressionId) ||
(methodCall.isDefined && !syncState.isMethodPointerSync(
expressionId
))
) {
val payload =
Api.ExpressionUpdate.Payload.Pending(None, None, wasInterrupted = true)
ctx.endpoint.sendToClient(
Api.Response(
Api.ExpressionUpdates(
contextId,
Set(
Api.ExpressionUpdate(
value.getExpressionId,
Option(value.getTypes).map(_.toVector),
methodCall,
value.getProfilingInfo.map { case e: ExecutionTime =>
Api.ProfilingInfo.ExecutionTime(e.getNanoTimeElapsed)
}.toVector,
value.wasCached(),
value.isTypeChanged || value.isFunctionCallChanged,
payload
)
)
)
)
)

syncState.setExpressionSync(expressionId)
ctx.state.expressionExecutionState.setExpressionExecuted(expressionId)
if (methodCall.isDefined) {
syncState.setMethodPointerSync(expressionId)
}
}
}

private def sendExpressionUpdate(
contextId: ContextId,
syncState: UpdatesSynchronizationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,15 @@ class RuntimeAsyncCommandsTest

responses should contain theSameElementsAs Seq(
Api.Response(requestId, Api.RecomputeContextResponse(contextId)),
TestMessages.update(
TestMessages.pendingInterrupted(
contextId,
vId,
ConstantsGen.INTEGER,
methodCall = Some(
MethodCall(
MethodPointer("Enso_Test.Test.Main", "Enso_Test.Test.Main", "loop"),
Vector(1)
)
)
),
vId
),
context.executionComplete(contextId)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,33 @@ object TestMessages {
)
)

/** Create an pending interrupted response.
*
* @param contextId an identifier of the context
* @param expressionIds a list of pending expressions
* @return the expression update response
*/
def pendingInterrupted(
contextId: UUID,
methodCall: Option[Api.MethodCall],
expressionIds: UUID*
): Api.Response =
Api.Response(
Api.ExpressionUpdates(
contextId,
expressionIds.toSet.map { expressionId =>
Api.ExpressionUpdate(
expressionId,
None,
methodCall,
Vector(Api.ProfilingInfo.ExecutionTime(0)),
false,
true,
Api.ExpressionUpdate.Payload
.Pending(None, None, wasInterrupted = true)
)
}
)
)

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
package org.enso.interpreter.runtime.control;

/** Thrown when guest code discovers a thread interrupt. */
public class ThreadInterruptedException extends RuntimeException {}
public class ThreadInterruptedException extends RuntimeException {
public ThreadInterruptedException() {}

public ThreadInterruptedException(Throwable e) {
super(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.enso.base.cache.ReloadDetector;
import org.enso.table.excel.xssfreader.XSSFReaderWorkbook;
import org.enso.table.util.FunctionWithException;

public class ExcelConnectionPool {
public static final ExcelConnectionPool INSTANCE = new ExcelConnectionPool();

private ExcelConnectionPool() {}

public ReadOnlyExcelConnection openReadOnlyConnection(File file, ExcelFileFormat format)
throws IOException {
throws IOException, InterruptedException {
synchronized (this) {
if (isCurrentlyWriting) {
throw new IllegalStateException(
Expand Down Expand Up @@ -134,7 +135,7 @@ public <R> R writeWorkbook(File file, Function<Workbook, R> writeAction) throws
*/
public <R> R lockForWriting(
File file, ExcelFileFormat format, File[] accompanyingFiles, Function<WriteHelper, R> action)
throws IOException {
throws IOException, InterruptedException {
synchronized (this) {
if (isCurrentlyWriting) {
throw new IllegalStateException(
Expand Down Expand Up @@ -242,7 +243,8 @@ static class ConnectionRecord {
private ExcelWorkbook workbook;
private IOException initializationException = null;

<T> T withWorkbook(Function<ExcelWorkbook, T> action) throws IOException {
<T> T withWorkbook(FunctionWithException<ExcelWorkbook, T, InterruptedException> action)
throws IOException, InterruptedException {
synchronized (this) {
return action.apply(accessCurrentWorkbook());
}
Expand All @@ -258,7 +260,7 @@ public void close() throws IOException {
}
}

void reopen(boolean throwOnFailure) throws IOException {
void reopen(boolean throwOnFailure) throws IOException, InterruptedException {
synchronized (this) {
if (workbook != null) {
throw new IllegalStateException("The workbook is already open.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public static ExcelRange forRows(String sheetName, int topRow, int bottomRow) {
* @param sheet ExcelSheet containing the range refers to.
* @return Expanded range covering the connected table of cells.
*/
public static ExcelRange expandSingleCell(ExcelRange excelRange, ExcelSheet sheet) {
public static ExcelRange expandSingleCell(ExcelRange excelRange, ExcelSheet sheet)
throws InterruptedException {
ExcelRow currentRow = sheet.get(excelRange.getTopRow());
if (currentRow == null || currentRow.isEmpty(excelRange.getLeftColumn())) {
return new ExcelRange(
Expand Down Expand Up @@ -337,7 +338,7 @@ public int getRowCount() {
return isWholeColumn() ? Integer.MAX_VALUE : bottomRow - topRow + 1;
}

public int getLastNonEmptyRow(ExcelSheet sheet) {
public int getLastNonEmptyRow(ExcelSheet sheet) throws InterruptedException {
int lastRow =
Math.min(sheet.getLastRow(), isWholeColumn() ? sheet.getLastRow() : bottomRow) + 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ public interface ExcelSheet {
String getName();

/** Gets the initial row index within the sheet (1-based). */
int getFirstRow();
int getFirstRow() throws InterruptedException;

/** Gets the final row index within the sheet (1-based). */
int getLastRow();
int getLastRow() throws InterruptedException;

/**
* Gets the row at the given index within the sheet (1-based)
*
* @param row the row index (1-based)/
* @return the row object or null if the row index is out of range or doesn't exist.
*/
ExcelRow get(int row);
ExcelRow get(int row) throws InterruptedException;

/** Gets the underlying Apache POI Sheet object - may be null. Provided for Writer use only. */
Sheet getSheet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.enso.table.excel;

import java.io.IOException;
import java.util.function.Function;
import org.enso.table.util.FunctionWithException;

public class ReadOnlyExcelConnection implements AutoCloseable {

Expand All @@ -27,7 +27,9 @@ public synchronized void close() throws IOException {
record = null;
}

public synchronized <T> T withWorkbook(Function<ExcelWorkbook, T> f) throws IOException {
public synchronized <T> T withWorkbook(
FunctionWithException<ExcelWorkbook, T, InterruptedException> f)
throws IOException, InterruptedException {
if (record == null) {
throw new IllegalStateException("ReadOnlyExcelConnection is being used after it was closed.");
}
Expand Down
Loading

0 comments on commit d87484b

Please sign in to comment.