Skip to content

Commit

Permalink
+ reusing same bytes buffer instance for out stream flush
Browse files Browse the repository at this point in the history
+ added delay while stopping output thread
  • Loading branch information
q3769 committed May 3, 2023
1 parent 5a7153c commit 6986409
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

<groupId>io.github.elf4j</groupId>
<artifactId>elf4j-engine</artifactId>
<version>7.0.1</version>
<version>7.0.2</version>
<packaging>jar</packaging>
<name>elf4j-engine</name>
<description>A stand-alone Java log engine implementing the ELF4J (Easy Logging Facade for Java) API</description>
Expand Down Expand Up @@ -67,7 +67,7 @@
<dependency>
<groupId>io.github.elf4j</groupId>
<artifactId>elf4j</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.dslplatform</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,47 @@

package elf4j.engine.service.util;

import elf4j.Level;
import elf4j.util.InternalLogger;
import lombok.NonNull;
import org.awaitility.Awaitility;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
*/
public class MoreAwaitility {
private static final ScheduledExecutorService delayer =
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1, r -> {
Thread daemonThread = new Thread(r, "more-awaitility-util-thread" + UUID.randomUUID());
daemonThread.setDaemon(true);
return daemonThread;
});
private MoreAwaitility() {
}

public static void block(@NonNull Duration duration) {
block(duration, null);
/**
* @param duration
* to suspend the current thread for
*/
public static void suspend(@NonNull Duration duration) {
suspend(duration, null);
}

public static void block(@NonNull Duration duration, String message) {
/**
* @param duration
* to suspend the current thread for
* @param message
* intended for internal logging which goes out to stderr
*/
public static void suspend(@NonNull Duration duration, String message) {
if (message != null) {
System.out.println(message + " - blocking for " + duration);
InternalLogger.INSTANCE.log(Level.INFO,
message + " - suspending current thread " + Thread.currentThread() + " for " + duration);
}
AtomicBoolean resume = new AtomicBoolean(false);
ScheduledExecutorService delayer = Executors.newSingleThreadScheduledExecutor();
delayer.schedule(() -> resume.set(true), duration.toMillis(), TimeUnit.MILLISECONDS);
Awaitility.await().untilTrue(resume);
delayer.shutdown();
Awaitility.with().pollInterval(duration.dividedBy(10)).await().untilTrue(resume);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,21 @@

import elf4j.Level;
import elf4j.engine.service.LogServiceManager;
import elf4j.engine.service.util.MoreAwaitility;
import elf4j.util.InternalLogger;
import lombok.ToString;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -50,6 +52,7 @@ public class BufferedStandardOutput implements StandardOutput {
private static final int DEFAULT_BACK_BUFFER_CAPACITY = 256;
private final OutStreamType outStreamType;
private final BlockingQueue<byte[]> buffer;
private final PollingBytesWriter pollingBytesWriter;
private boolean stopped;

/**
Expand All @@ -62,15 +65,19 @@ public BufferedStandardOutput(String stream, Integer bufferCapacity) {
this.outStreamType = stream == null ? OutStreamType.STDOUT : OutStreamType.valueOf(stream.toUpperCase());
bufferCapacity = bufferCapacity == null ? DEFAULT_BACK_BUFFER_CAPACITY : bufferCapacity;
InternalLogger.INSTANCE.log(Level.INFO, "Standard stream buffer capacity: " + bufferCapacity);
this.buffer = bufferCapacity == 0 ? new SynchronousQueue<>() : new ArrayBlockingQueue<>(bufferCapacity);
this.buffer = new ArrayBlockingQueue<>(bufferCapacity == 0 ? 1 : bufferCapacity);
this.stopped = false;
new Thread(new PollingBytesWriter()).start();
pollingBytesWriter = new PollingBytesWriter();
new Thread(pollingBytesWriter).start();
LogServiceManager.INSTANCE.registerStop(this);
}

@Override
public void stop() {
Awaitility.with().timeout(30, TimeUnit.MINUTES).await().until(this.buffer::isEmpty);
MoreAwaitility.suspend(Duration.ofMillis(100));
ConditionFactory await = Awaitility.with().timeout(30, TimeUnit.MINUTES).await();
await.until(this.buffer::isEmpty);
await.until(this.pollingBytesWriter::isBufferEmpty);
this.stopped = true;
}

Expand All @@ -92,13 +99,15 @@ enum OutStreamType {
}

private class PollingBytesWriter implements Runnable {
private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

@Override
public void run() {
while (!stopped) {
List<byte[]> poll = new LinkedList<>();
buffer.drainTo(poll);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(poll.size() * 2048);
poll.forEach(bytes -> {
List<byte[]> pollBatch = new LinkedList<>();
buffer.drainTo(pollBatch);
byteArrayOutputStream.reset();
pollBatch.forEach(bytes -> {
try {
byteArrayOutputStream.write(bytes);
} catch (IOException e) {
Expand All @@ -113,5 +122,9 @@ public void run() {
}
}
}

public boolean isBufferEmpty() {
return this.byteArrayOutputStream.size() == 0;
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/elf4j/engine/IntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
class IntegrationTest {
@AfterEach
void afterEach() {
MoreAwaitility.block(Duration.ofMillis(500));
MoreAwaitility.suspend(Duration.ofMillis(500));
}

@Nested
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/elf4j/engine/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) {
.atDebug()
.log("Not a practical example but now the severity level is DEBUG");

MoreAwaitility.block(Duration.ofMillis(200), "Making sure console streams show up");
MoreAwaitility.suspend(Duration.ofMillis(200), "Making sure console streams show up");
LogServiceManager.INSTANCE.stopAll();
}
}
2 changes: 1 addition & 1 deletion src/test/java/elf4j/engine/SampleUsageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SampleUsageTest {

@AfterEach
void afterEach() {
MoreAwaitility.block(Duration.ofMillis(500));
MoreAwaitility.suspend(Duration.ofMillis(500));
}

@Nested
Expand Down

0 comments on commit 6986409

Please sign in to comment.