Skip to content

Commit

Permalink
fix(webserver): ensure queues are not closed in nioEventLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jan 27, 2025
1 parent 9273d73 commit b518a7b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down Expand Up @@ -40,6 +41,7 @@ public Flux<Event<LogEntry>> streamExecutionLogs(final String tenantId,
final AtomicReference<Runnable> disposable = new AtomicReference<>();

return Flux.<Event<LogEntry>>create(emitter -> {

// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
emitter.next(Event.of(LogEntry.builder().build()).id("start"));

Expand All @@ -65,9 +67,11 @@ public Flux<Event<LogEntry>> streamExecutionLogs(final String tenantId,
}));
}, FluxSink.OverflowStrategy.BUFFER)
.doFinally(ignored -> {
if (disposable.get() != null) {
disposable.get().run();
}
Schedulers.boundedElastic().schedule(() -> {
if (disposable.get() != null) {
disposable.get().run();
}
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,9 +1518,11 @@ public Flux<Event<Execution>> follow(
cancel.set(receive);
}, FluxSink.OverflowStrategy.BUFFER)
.doFinally(ignored -> {
if (cancel.get() != null) {
cancel.get().run();
}
Schedulers.boundedElastic().schedule(() -> {
if (cancel.get() != null) {
cancel.get().run();
}
});
});
}

Expand Down

0 comments on commit b518a7b

Please sign in to comment.