Skip to content

Commit

Permalink
Fix the output when canceling the build and allow fast cancelations
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Nov 10, 2020
1 parent 98c91af commit 3a38265
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.jboss.fuse.mvnd.common.DaemonException.StaleAddressException;
import org.jboss.fuse.mvnd.common.DaemonInfo;
import org.jboss.fuse.mvnd.common.Message;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,6 +85,14 @@ public void dispatch(Message message) throws DaemonException.ConnectException {
}
throw new DaemonException.ConnectException("Could not dispatch a message to the daemon.", e);
}
// in case we dispatch a cancelation request, also forward it to the main thread to exit asap
try {
if (message == Message.SimpleMessage.CANCEL_BUILD_SINGLETON) {
queue.put(message);
}
} catch (InterruptedException e) {
throw new DaemonException.InterruptedException(e);
}
}

public List<Message> receive() throws ConnectException, StaleAddressException {
Expand Down Expand Up @@ -130,10 +137,6 @@ protected void doReceive() {
if (m == null) {
break;
}
if (m.getType() == Message.BUILD_STARTED) {
final BuildStarted bs = (BuildStarted) m;
m = bs.withDaemonDispatch(this::dispatch);
}
queue.put(m);
}
} catch (Exception e) {
Expand Down
30 changes: 14 additions & 16 deletions client/src/main/java/org/jboss/fuse/mvnd/client/DefaultClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.jboss.fuse.mvnd.common.OsUtils;
import org.jboss.fuse.mvnd.common.logging.ClientOutput;
import org.jboss.fuse.mvnd.common.logging.TerminalOutput;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -64,7 +66,9 @@ public static void main(String[] argv) throws Exception {
try {
new DefaultClient(new DaemonParameters()).execute(output, args);
} catch (DaemonException.InterruptedException e) {
output.accept(Message.log(System.lineSeparator() + "The build was canceled"));
final AttributedStyle s = new AttributedStyle().bold().foreground(AttributedStyle.RED);
String str = new AttributedString(System.lineSeparator() + "Canceled by user", s).toAnsi();
output.accept(Message.display(str));
}
}
}
Expand Down Expand Up @@ -160,7 +164,7 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {
if (stop) {
DaemonInfo[] dis = registry.getAll().toArray(new DaemonInfo[0]);
if (dis.length > 0) {
output.accept(Message.log("Stopping " + dis.length + " running daemons"));
output.accept(Message.display("Stopping " + dis.length + " running daemons"));
for (DaemonInfo di : dis) {
try {
ProcessHandle.of(di.getPid()).ifPresent(ProcessHandle::destroyForcibly);
Expand Down Expand Up @@ -194,36 +198,30 @@ public ExecutionResult execute(ClientOutput output, List<String> argv) {

final DaemonConnector connector = new DaemonConnector(parameters, registry);
try (DaemonClientConnection daemon = connector.connect(output)) {
output.setSink(daemon::dispatch);
output.accept(Message.buildStatus("Connected to daemon"));

daemon.dispatch(new Message.BuildRequest(
args,
parameters.userDir().toString(),
parameters.multiModuleProjectDirectory().toString(),
System.getenv()));

output.accept(Message.buildStatus("Build request sent"));

while (true) {
final List<Message> messages = daemon.receive();
for (int i = 0; i < messages.size(); i++) {
Message m = messages.get(i);
output.accept(messages);
for (Message m : messages) {
switch (m.getType()) {
case Message.BUILD_EXCEPTION: {
output.accept(messages.subList(0, i + 1));
case Message.CANCEL_BUILD:
return new DefaultResult(argv,
new InterruptedException("The build was canceled"));
case Message.BUILD_EXCEPTION:
final BuildException e = (BuildException) m;
return new DefaultResult(argv,
new Exception(e.getClassName() + ": " + e.getMessage() + "\n" + e.getStackTrace()));
}
case Message.BUILD_STOPPED: {
output.accept(messages.subList(0, i));
case Message.BUILD_STOPPED:
return new DefaultResult(argv, null);
}
default:
break;
}
}
output.accept(messages);
}
}
}
Expand Down
24 changes: 6 additions & 18 deletions common/src/main/java/org/jboss/fuse/mvnd/common/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public abstract class Message {
public static final int BUILD_REQUEST = 0;
Expand Down Expand Up @@ -417,7 +416,6 @@ public static class BuildStarted extends Message {
final String projectId;
final int projectCount;
final int maxThreads;
final Consumer<Message> daemonDispatch;

public static BuildStarted read(DataInputStream input) throws IOException {
final String projectId = readUTF(input);
Expand All @@ -427,15 +425,10 @@ public static BuildStarted read(DataInputStream input) throws IOException {
}

public BuildStarted(String projectId, int projectCount, int maxThreads) {
this(projectId, projectCount, maxThreads, null);
}

public BuildStarted(String projectId, int projectCount, int maxThreads, Consumer<Message> daemonDispatch) {
super(BUILD_STARTED);
this.projectId = projectId;
this.projectCount = projectCount;
this.maxThreads = maxThreads;
this.daemonDispatch = daemonDispatch;
}

public String getProjectId() {
Expand Down Expand Up @@ -465,13 +458,6 @@ public void write(DataOutputStream output) throws IOException {
output.writeInt(maxThreads);
}

public BuildStarted withDaemonDispatch(Consumer<Message> daemonDispatch) {
return new BuildStarted(projectId, projectCount, maxThreads, daemonDispatch);
}

public Consumer<Message> getDaemonDispatch() {
return daemonDispatch;
}
}

public static class BuildMessage extends Message {
Expand Down Expand Up @@ -529,6 +515,8 @@ public String toString() {
return "BuildStopped";
case STOP:
return "Stop";
case CANCEL_BUILD:
return "BuildCanceled";
default:
throw new IllegalStateException("Unexpected type " + type);
}
Expand Down Expand Up @@ -741,12 +729,12 @@ public static StringMessage buildStatus(String payload) {
return new StringMessage(BUILD_STATUS, payload);
}

public static BuildMessage log(String message) {
return new BuildMessage(null, message);
public static Display display(String message) {
return new Display(null, message);
}

public static BuildMessage log(String projectId, String message) {
return new BuildMessage(projectId, message);
public static BuildMessage log(String message) {
return new BuildMessage(null, message);
}

public static StringMessage keyboardInput(char keyStroke) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
package org.jboss.fuse.mvnd.common.logging;

import java.util.List;
import java.util.function.Consumer;
import org.jboss.fuse.mvnd.common.Message;

/**
* A sink for various kinds of events sent by the daemon.
*/
public interface ClientOutput extends AutoCloseable {

void setSink(Consumer<Message> sink);

void accept(Message message);

void accept(List<Message> messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.jboss.fuse.mvnd.common.Message.BuildException;
import org.jboss.fuse.mvnd.common.Message.BuildMessage;
import org.jboss.fuse.mvnd.common.Message.BuildStarted;
import org.jboss.fuse.mvnd.common.Message.SimpleMessage;
import org.jboss.fuse.mvnd.common.Message.StringMessage;
import org.jline.terminal.Size;
import org.jline.terminal.Terminal;
Expand Down Expand Up @@ -70,8 +69,8 @@ public class TerminalOutput implements ClientOutput {
private final CountDownLatch closed = new CountDownLatch(1);
private final long start;
private final ReadWriteLock readInput = new ReentrantReadWriteLock();
private final DaemonDispatch dispatch = new DaemonDispatch();

private volatile Consumer<Message> sink;
private volatile String name;
private volatile int totalProjects;
private volatile int maxThreads;
Expand Down Expand Up @@ -99,18 +98,26 @@ public TerminalOutput(Path logFile) throws IOException {
this.start = System.currentTimeMillis();
this.terminal = TerminalBuilder.terminal();
terminal.enterRawMode();
Thread mainThread = Thread.currentThread();
sink = m -> {
if (m == Message.CANCEL_BUILD_SINGLETON) {
mainThread.interrupt();
}
};
this.previousIntHandler = terminal.handle(Terminal.Signal.INT,
sig -> {
accept(Message.buildStatus("Cancelling..."));
dispatch.dispatch(Message.CANCEL_BUILD_SINGLETON);
});
sig -> sink.accept(Message.CANCEL_BUILD_SINGLETON));
this.display = new Display(terminal, false);
this.log = logFile == null ? new MessageCollector() : new FileLog(logFile);
final Thread r = new Thread(this::readInputLoop);
r.start();
this.reader = r;
}

@Override
public void setSink(Consumer<Message> sink) {
this.sink = sink;
}

@Override
public void accept(Message entry) {
assert "main".equals(Thread.currentThread().getName());
Expand All @@ -122,13 +129,12 @@ public void accept(Message entry) {
@Override
public void accept(List<Message> entries) {
assert "main".equals(Thread.currentThread().getName());
boolean update = true;
for (Message entry : entries) {
update &= doAccept(entry);
}
if (update) {
update();
if (!doAccept(entry)) {
return;
}
}
update();
}

private boolean doAccept(Message entry) {
Expand All @@ -138,9 +144,21 @@ private boolean doAccept(Message entry) {
this.name = bs.getProjectId();
this.totalProjects = bs.getProjectCount();
this.maxThreads = bs.getMaxThreads();
this.dispatch.setSink(bs.getDaemonDispatch());
break;
}
case Message.CANCEL_BUILD: {
projects.values().stream().flatMap(p -> p.log.stream()).forEach(log);
clearDisplay();
try {
log.close();
} catch (IOException e1) {
throw new RuntimeException(e1);
}
final AttributedStyle s = new AttributedStyle().bold().foreground(AttributedStyle.RED);
new AttributedString("The build was canceled", s).println(terminal);
terminal.flush();
return false;
}
case Message.BUILD_EXCEPTION: {
final BuildException e = (BuildException) entry;
final String msg;
Expand Down Expand Up @@ -200,7 +218,11 @@ private boolean doAccept(Message entry) {
case Message.DISPLAY: {
Message.Display d = (Message.Display) entry;
display.update(Collections.emptyList(), 0);
terminal.writer().printf("[%s] %s%n", d.getProjectId(), d.getMessage());
if (d.getProjectId() != null) {
terminal.writer().printf("[%s] %s%n", d.getProjectId(), d.getMessage());
} else {
terminal.writer().printf("%s%n", d.getMessage());
}
break;
}
case Message.PROMPT: {
Expand All @@ -217,7 +239,7 @@ private boolean doAccept(Message entry) {
break;
} else if (c == '\n' || c == '\r') {
terminal.writer().println();
dispatch.dispatch(prompt.response(sb.toString()));
sink.accept(prompt.response(sb.toString()));
break;
} else if (c == 127) {
if (sb.length() > 0) {
Expand All @@ -240,20 +262,11 @@ private boolean doAccept(Message entry) {
}
case Message.BUILD_MESSAGE: {
BuildMessage bm = (BuildMessage) entry;
if (closing) {
try {
closed.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.err.println(bm.getMessage());
if (bm.getProjectId() != null) {
Project prj = projects.computeIfAbsent(bm.getProjectId(), Project::new);
prj.log.add(bm.getMessage());
} else {
if (bm.getProjectId() != null) {
Project prj = projects.computeIfAbsent(bm.getProjectId(), Project::new);
prj.log.add(bm.getMessage());
} else {
log.accept(bm.getMessage());
}
log.accept(bm.getMessage());
}
break;
}
Expand Down Expand Up @@ -334,7 +347,7 @@ private void displayDone() {
public void close() throws Exception {
closing = true;
reader.interrupt();
accept(SimpleMessage.BUILD_STOPPED_SINGLETON);
log.close();
reader.join();
terminal.handle(Terminal.Signal.INT, previousIntHandler);
terminal.close();
Expand Down Expand Up @@ -522,42 +535,6 @@ public void close() throws IOException {

}

public static class DaemonDispatch {
private final Object lock = new Object();
private List<Message> queue;
private Consumer<Message> sink;

public void dispatch(Message m) {
synchronized (lock) {
if (sink != null) {
if (queue != null) {
for (Message msg : queue) {
sink.accept(msg);
}
}
sink.accept(m);
} else {
if (queue == null) {
queue = new ArrayList<Message>();
}
queue.add(m);
}
}
}

public void setSink(Consumer<Message> sink) {
synchronized (lock) {
this.sink = sink;
if (queue != null) {
for (Message msg : queue) {
sink.accept(msg);
}
queue = null;
}
}
}
}

/**
* A {@link ClientLog} that first collects all incoming messages in a {@link List} and outputs them to a JLine
* {@link Terminal} upon {@link #close()}.
Expand Down
Loading

0 comments on commit 3a38265

Please sign in to comment.