diff --git a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java index 3eb446e39c04c..b7e144bbbdd4f 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/console/AeshConsole.java @@ -86,6 +86,7 @@ public void run() { } }, "Console Shutdown Hook")); prompt = registerStatusLine(0); + } private void updatePromptOnChange(StringBuilder buffer, int newLines) { @@ -222,6 +223,26 @@ public void run() { }); // Keyboard handling conn.setStdinHandler(keys -> { + + QuarkusConsole.StateChangeInputStream redirectIn = QuarkusConsole.REDIRECT_IN; + //see if the users application wants to read the keystrokes: + int pos = 0; + while (pos < keys.length) { + if (!redirectIn.acceptInput(keys[pos])) { + break; + } + ++pos; + } + if (pos > 0) { + if (pos == keys.length) { + return; + } + //the app only consumed some keys + //stick the rest in a new array + int[] newKeys = new int[keys.length - pos]; + System.arraycopy(keys, pos, newKeys, 0, newKeys.length); + keys = newKeys; + } try { if (delegateConnection != null) { //console mode diff --git a/core/deployment/src/main/java/io/quarkus/deployment/console/ConsoleHelper.java b/core/deployment/src/main/java/io/quarkus/deployment/console/ConsoleHelper.java index b9710dc97a47a..06c2e5746427b 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/console/ConsoleHelper.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/console/ConsoleHelper.java @@ -47,8 +47,11 @@ public void accept(Connection connection) { connection.setStdinHandler(new Consumer() { @Override public void accept(int[] ints) { + QuarkusConsole.StateChangeInputStream redirectIn = QuarkusConsole.REDIRECT_IN; for (int i : ints) { - queue.add(i); + if (redirectIn != null && !redirectIn.acceptInput(i)) { + queue.add(i); + } } } }); diff --git a/core/devmode-spi/src/main/java/io/quarkus/dev/console/QuarkusConsole.java b/core/devmode-spi/src/main/java/io/quarkus/dev/console/QuarkusConsole.java index bc6b3766e88e5..faca9f2dc0e8c 100644 --- a/core/devmode-spi/src/main/java/io/quarkus/dev/console/QuarkusConsole.java +++ b/core/devmode-spi/src/main/java/io/quarkus/dev/console/QuarkusConsole.java @@ -1,9 +1,13 @@ package io.quarkus.dev.console; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.PrintStream; import java.util.List; import java.util.Locale; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -52,9 +56,12 @@ public abstract class QuarkusConsole { public final static PrintStream ORIGINAL_OUT = System.out; public final static PrintStream ORIGINAL_ERR = System.err; + public final static InputStream ORIGINAL_IN = System.in; public static PrintStream REDIRECT_OUT = null; public static PrintStream REDIRECT_ERR = null; + public static StateChangeInputStream REDIRECT_IN; + protected volatile boolean userReadInProgress; public synchronized static void installRedirects() { if (redirectsInstalled) { @@ -67,8 +74,10 @@ public synchronized static void installRedirects() { QuarkusConsole.INSTANCE.isInputSupported(); REDIRECT_OUT = new RedirectPrintStream(false); REDIRECT_ERR = new RedirectPrintStream(true); + REDIRECT_IN = new StateChangeInputStream(); System.setOut(REDIRECT_OUT); System.setErr(REDIRECT_ERR); + System.setIn(REDIRECT_IN); } public synchronized static void uninstallRedirects() { @@ -86,8 +95,10 @@ public synchronized static void uninstallRedirects() { REDIRECT_ERR.close(); REDIRECT_ERR = null; } + REDIRECT_IN = null; System.setOut(ORIGINAL_OUT); System.setErr(ORIGINAL_ERR); + System.setIn(ORIGINAL_IN); redirectsInstalled = false; } @@ -176,4 +187,69 @@ public boolean isAnsiSupported() { return false; } + protected void userReadStart() { + + } + + protected void userReadStop() { + + } + + public static class StateChangeInputStream extends InputStream { + + private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(); + + private volatile boolean reading; + + public synchronized boolean acceptInput(int input) { + if (reading) { + queue.add(input); + notifyAll(); + return true; + } + return false; + } + + @Override + public synchronized int read() throws IOException { + reading = true; + try { + while (queue.isEmpty()) { + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + return queue.pollFirst(); + } finally { + reading = false; + } + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + reading = true; + int read = 0; + try { + while (read < len) { + while (queue.isEmpty()) { + try { + wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + byte byteValue = queue.poll().byteValue(); + b[read++] = byteValue; + if (byteValue == '\n' || byteValue == '\r') { + return read; + } + } + return read; + } finally { + reading = false; + } + } + } }