Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not report exceptions on long running Excel reads #11916

Merged
merged 7 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/language-server/protocol-language-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ interface ExpressionUpdate {
An information about the computed value.

```typescript
type ExpressionUpdatePayload = Value | DatafalowError | Panic | Pending;
type ExpressionUpdatePayload = Value | DataflowError | Panic | Pending;

/** Indicates that the expression was computed to a value. */
interface Value {
Expand Down Expand Up @@ -424,6 +424,8 @@ interface Pending {
/** Optional amount of already done work as a number between `0.0` to `1.0`.
*/
progress?: number;
/** Indicates whether the computation of the expression has been interrupted and will be retried. */
wasInterrupted: boolean;
}

/** Information about warnings associated with the value. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ final class ContextEventsListener(
functionSchema.map(toProtocolFunctionSchema)
)

case Api.ExpressionUpdate.Payload.Pending(m, p) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Pending(m, p)
case Api.ExpressionUpdate.Payload.Pending(m, p, i) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.Pending(m, p, i)

case Api.ExpressionUpdate.Payload.DataflowError(trace) =>
ContextRegistryProtocol.ExpressionUpdate.Payload.DataflowError(trace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,17 @@ object ContextRegistryProtocol {
)
}

case class Pending(message: Option[String], progress: Option[Double])
extends Payload
/** Indicates that an expression is pending a computation
*/
case class Pending(
message: Option[String],
progress: Option[Double],
wasInterrupted: Boolean
) extends Payload

/** Indicates that an expression's computation has been interrupted and shall be retried.
*/
case object PendingInterrupted extends Payload

/** Indicates that the expression was computed to an error.
*
Expand All @@ -258,6 +267,8 @@ object ContextRegistryProtocol {

val Pending = "Pending"

val PendingInterrupted = "PendingInterrupted"

val DataflowError = "DataflowError"

val Panic = "Panic"
Expand Down Expand Up @@ -291,6 +302,14 @@ object ContextRegistryProtocol {
.deepMerge(
Json.obj(CodecField.Type -> PayloadType.Pending.asJson)
)
case m: Payload.PendingInterrupted.type =>
Encoder[Payload.PendingInterrupted.type]
.apply(m)
.deepMerge(
Json.obj(
CodecField.Type -> PayloadType.PendingInterrupted.asJson
)
)
}

implicit val decoder: Decoder[Payload] =
Expand All @@ -307,6 +326,9 @@ object ContextRegistryProtocol {

case PayloadType.Pending =>
Decoder[Payload.Pending].tryDecode(cursor)

case PayloadType.PendingInterrupted =>
Decoder[Payload.PendingInterrupted.type].tryDecode(cursor)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ object Runtime {
)
}

/** TBD
/** Indicates that an expression is pending a computation
*/
@named("expressionUpdatePayloadPending")
case class Pending(message: Option[String], progress: Option[Double])
extends Payload;
case class Pending(
message: Option[String],
progress: Option[Double],
wasInterrupted: Boolean = false
) extends Payload

/** Indicates that the expression was computed to an error.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ object ProgramExecutionSupport {
val onComputedValueCallback: Consumer[ExpressionValue] = { value =>
if (callStack.isEmpty) {
logger.log(Level.FINEST, s"ON_COMPUTED ${value.getExpressionId}")

if (VisualizationResult.isInterruptedException(value.getValue)) {
value.getValue match {
case e: AbstractTruffleException =>
sendInterruptedExpressionUpdate(
contextId,
executionFrame.syncState,
value
)
// Bail out early. Any references to this value that do not expect
// Interrupted error will likely return `No_Such_Method` otherwise.
throw new ThreadInterruptedException(e);
case _ =>
}
}
sendExpressionUpdate(contextId, executionFrame.syncState, value)
sendVisualizationUpdates(
contextId,
Expand Down Expand Up @@ -377,6 +392,50 @@ object ProgramExecutionSupport {
Api.ExecutionResult.Failure(ex.getMessage, None)
}

private def sendInterruptedExpressionUpdate(
contextId: ContextId,
syncState: UpdatesSynchronizationState,
value: ExpressionValue
)(implicit ctx: RuntimeContext): Unit = {
val expressionId = value.getExpressionId
val methodCall = toMethodCall(value)
if (
!syncState.isExpressionSync(expressionId) ||
(methodCall.isDefined && !syncState.isMethodPointerSync(
expressionId
))
) {
val payload =
Api.ExpressionUpdate.Payload.Pending(None, None, wasInterrupted = true)
ctx.endpoint.sendToClient(
Api.Response(
Api.ExpressionUpdates(
contextId,
Set(
Api.ExpressionUpdate(
value.getExpressionId,
Option(value.getTypes).map(_.toVector),
methodCall,
value.getProfilingInfo.map { case e: ExecutionTime =>
Api.ProfilingInfo.ExecutionTime(e.getNanoTimeElapsed)
}.toVector,
value.wasCached(),
value.isTypeChanged || value.isFunctionCallChanged,
payload
)
)
)
)
)

syncState.setExpressionSync(expressionId)
ctx.state.expressionExecutionState.setExpressionExecuted(expressionId)
if (methodCall.isDefined) {
syncState.setMethodPointerSync(expressionId)
}
}
}

private def sendExpressionUpdate(
contextId: ContextId,
syncState: UpdatesSynchronizationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,16 +500,15 @@ class RuntimeAsyncCommandsTest

responses should contain theSameElementsAs Seq(
Api.Response(requestId, Api.RecomputeContextResponse(contextId)),
TestMessages.update(
TestMessages.pendingInterrupted(
contextId,
vId,
ConstantsGen.INTEGER,
methodCall = Some(
MethodCall(
MethodPointer("Enso_Test.Test.Main", "Enso_Test.Test.Main", "loop"),
Vector(1)
)
)
),
vId
),
context.executionComplete(contextId)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,33 @@ object TestMessages {
)
)

/** Create an pending interrupted response.
*
* @param contextId an identifier of the context
* @param expressionIds a list of pending expressions
* @return the expression update response
*/
def pendingInterrupted(
contextId: UUID,
methodCall: Option[Api.MethodCall],
expressionIds: UUID*
): Api.Response =
Api.Response(
Api.ExpressionUpdates(
contextId,
expressionIds.toSet.map { expressionId =>
Api.ExpressionUpdate(
expressionId,
None,
methodCall,
Vector(Api.ProfilingInfo.ExecutionTime(0)),
false,
true,
Api.ExpressionUpdate.Payload
.Pending(None, None, wasInterrupted = true)
)
}
)
)

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
package org.enso.interpreter.runtime.control;

/** Thrown when guest code discovers a thread interrupt. */
public class ThreadInterruptedException extends RuntimeException {}
public class ThreadInterruptedException extends RuntimeException {
public ThreadInterruptedException() {}

public ThreadInterruptedException(Throwable e) {
super(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.enso.base.cache.ReloadDetector;
import org.enso.table.excel.xssfreader.XSSFReaderWorkbook;
import org.enso.table.util.FunctionWithException;

public class ExcelConnectionPool {
public static final ExcelConnectionPool INSTANCE = new ExcelConnectionPool();

private ExcelConnectionPool() {}

public ReadOnlyExcelConnection openReadOnlyConnection(File file, ExcelFileFormat format)
throws IOException {
throws IOException, InterruptedException {
synchronized (this) {
if (isCurrentlyWriting) {
throw new IllegalStateException(
Expand Down Expand Up @@ -134,7 +135,7 @@ public <R> R writeWorkbook(File file, Function<Workbook, R> writeAction) throws
*/
public <R> R lockForWriting(
File file, ExcelFileFormat format, File[] accompanyingFiles, Function<WriteHelper, R> action)
throws IOException {
throws IOException, InterruptedException {
synchronized (this) {
if (isCurrentlyWriting) {
throw new IllegalStateException(
Expand Down Expand Up @@ -242,7 +243,8 @@ static class ConnectionRecord {
private ExcelWorkbook workbook;
private IOException initializationException = null;

<T> T withWorkbook(Function<ExcelWorkbook, T> action) throws IOException {
<T> T withWorkbook(FunctionWithException<ExcelWorkbook, T, InterruptedException> action)
throws IOException, InterruptedException {
synchronized (this) {
return action.apply(accessCurrentWorkbook());
}
Expand All @@ -258,7 +260,7 @@ public void close() throws IOException {
}
}

void reopen(boolean throwOnFailure) throws IOException {
void reopen(boolean throwOnFailure) throws IOException, InterruptedException {
synchronized (this) {
if (workbook != null) {
throw new IllegalStateException("The workbook is already open.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public static ExcelRange forRows(String sheetName, int topRow, int bottomRow) {
* @param sheet ExcelSheet containing the range refers to.
* @return Expanded range covering the connected table of cells.
*/
public static ExcelRange expandSingleCell(ExcelRange excelRange, ExcelSheet sheet) {
public static ExcelRange expandSingleCell(ExcelRange excelRange, ExcelSheet sheet)
throws InterruptedException {
ExcelRow currentRow = sheet.get(excelRange.getTopRow());
if (currentRow == null || currentRow.isEmpty(excelRange.getLeftColumn())) {
return new ExcelRange(
Expand Down Expand Up @@ -337,7 +338,7 @@ public int getRowCount() {
return isWholeColumn() ? Integer.MAX_VALUE : bottomRow - topRow + 1;
}

public int getLastNonEmptyRow(ExcelSheet sheet) {
public int getLastNonEmptyRow(ExcelSheet sheet) throws InterruptedException {
int lastRow =
Math.min(sheet.getLastRow(), isWholeColumn() ? sheet.getLastRow() : bottomRow) + 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ public interface ExcelSheet {
String getName();

/** Gets the initial row index within the sheet (1-based). */
int getFirstRow();
int getFirstRow() throws InterruptedException;

/** Gets the final row index within the sheet (1-based). */
int getLastRow();
int getLastRow() throws InterruptedException;

/**
* Gets the row at the given index within the sheet (1-based)
*
* @param row the row index (1-based)/
* @return the row object or null if the row index is out of range or doesn't exist.
*/
ExcelRow get(int row);
ExcelRow get(int row) throws InterruptedException;

/** Gets the underlying Apache POI Sheet object - may be null. Provided for Writer use only. */
Sheet getSheet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.enso.table.excel;

import java.io.IOException;
import java.util.function.Function;
import org.enso.table.util.FunctionWithException;

public class ReadOnlyExcelConnection implements AutoCloseable {

Expand All @@ -27,7 +27,9 @@ public synchronized void close() throws IOException {
record = null;
}

public synchronized <T> T withWorkbook(Function<ExcelWorkbook, T> f) throws IOException {
public synchronized <T> T withWorkbook(
FunctionWithException<ExcelWorkbook, T, InterruptedException> f)
throws IOException, InterruptedException {
if (record == null) {
throw new IllegalStateException("ReadOnlyExcelConnection is being used after it was closed.");
}
Expand Down
Loading
Loading