Skip to content

Commit

Permalink
+ added Stoppable.isStopped API
Browse files Browse the repository at this point in the history
+ made current impls of Stoppable.stop non-blocking
+ added LogServiceManager.getShutdownHookThread
  • Loading branch information
q3769 committed May 11, 2023
1 parent 3fd7da1 commit 860e939
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 64 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# elf4j-engine

[![Maven Central](https://img.shields.io/maven-central/v/io.github.elf4j/elf4j-engine.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22io.github.elf4j%22%20AND%20a:%22elf4j-engine%22)

# elf4j-engine

An asynchronous Java log engine.

Implementing the [ELF4J](https://github.com/elf4j/elf4j) (Easy Logging Facade for Java ) API, this is the log engine
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

<groupId>io.github.elf4j</groupId>
<artifactId>elf4j-engine</artifactId>
<version>8.0.7</version>
<version>8.1.0</version>
<packaging>jar</packaging>
<name>elf4j-engine</name>
<description>A stand-alone Java log engine implementing the ELF4J (Easy Logging Facade for Java) API</description>
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/elf4j/engine/service/ConseqLogEventProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import elf4j.engine.service.writer.LogWriter;
import elf4j.util.InternalLogger;
import lombok.NonNull;
import org.awaitility.Awaitility;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
* Log events are asynchronously processed, optionally by multiple concurrent threads. However, events issued by the
Expand Down Expand Up @@ -78,19 +76,16 @@ public static ConseqLogEventProcessor from(@NonNull LogServiceConfiguration logS
}

private static int getConcurrency(Properties properties) {
Integer concurrency = PropertiesUtils.getAsInteger("concurrency", properties);
concurrency = concurrency == null ? DEFAULT_CONCURRENCY : concurrency;
int concurrency = PropertiesUtils.getIntOrDefault("concurrency", properties, DEFAULT_CONCURRENCY);
if (concurrency < 1) {
throw new IllegalArgumentException("Unexpected concurrency: " + concurrency + ", cannot be less than 1");
}
return concurrency;
}

private static int getWorkQueueCapacity(Properties properties) {
Integer workQueueCapacity = PropertiesUtils.getAsInteger("buffer.front", properties);
if (workQueueCapacity == null) {
return DEFAULT_FRONT_BUFFER_CAPACITY;
}
int workQueueCapacity =
PropertiesUtils.getIntOrDefault("buffer.front", properties, DEFAULT_FRONT_BUFFER_CAPACITY);
if (workQueueCapacity < 1) {
throw new IllegalArgumentException(
"Unexpected buffer.front: " + workQueueCapacity + ", cannot be less than 1");
Expand All @@ -106,6 +101,10 @@ public void process(LogEvent logEvent) {
@Override
public void stop() {
this.conseqExecutor.shutdown();
Awaitility.with().timeout(30, TimeUnit.MINUTES).await().until(this.conseqExecutor::isTerminated);
}

@Override
public boolean isStopped() {
return this.conseqExecutor.isTerminated();
}
}
33 changes: 28 additions & 5 deletions src/main/java/elf4j/engine/service/LogServiceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
package elf4j.engine.service;

import elf4j.engine.service.configuration.Refreshable;
import lombok.NonNull;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

/**
*
Expand All @@ -42,6 +45,11 @@ public enum LogServiceManager {

private final Set<Refreshable> refreshables = new HashSet<>();
private final Set<Stoppable> stoppables = new HashSet<>();
private final ConditionFactory await = Awaitility.with().timeout(Duration.ofMinutes(10));

private static boolean allStopped(@NonNull Collection<Stoppable> stoppables) {
return stoppables.stream().allMatch(Stoppable::isStopped);
}

/**
* @param refreshable
Expand Down Expand Up @@ -83,11 +91,26 @@ public void stopAll() {
stopBackend();
}

/**
* @return a thread that orderly stops the entire log service. As an alternative to calling the {@link #stopAll()},
* the returned thread can be registered as a JVM shutdown hook.
*/
@NonNull
public Thread getShutdownHookThread() {
return new Thread(this::stopAll);
}

private void stopBackend() {
stoppables.stream().filter(s -> !(s instanceof LogEventProcessor)).parallel().forEach(Stoppable::stop);
List<Stoppable> backend =
stoppables.stream().filter(s -> !(s instanceof LogEventProcessor)).collect(Collectors.toList());
backend.stream().parallel().forEach(Stoppable::stop);
this.await.until(() -> allStopped(backend));
}

private void stopFrontend() {
stoppables.stream().filter(LogEventProcessor.class::isInstance).parallel().forEach(Stoppable::stop);
List<Stoppable> frontend =
stoppables.stream().filter(LogEventProcessor.class::isInstance).collect(Collectors.toList());
frontend.stream().parallel().forEach(Stoppable::stop);
this.await.until(() -> allStopped(frontend));
}
}
5 changes: 5 additions & 0 deletions src/main/java/elf4j/engine/service/Stoppable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public interface Stoppable {
*
*/
void stop();

/**
* @return true if the instance has stopped
*/
boolean isStopped();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import elf4j.engine.service.ConseqLogEventProcessor;
import elf4j.engine.service.LogEventProcessor;
import elf4j.engine.service.LogServiceManager;
import elf4j.engine.service.util.PropertiesUtils;
import elf4j.engine.service.writer.BufferedStandardOutput;
import elf4j.engine.service.writer.CooperatingWriterGroup;
import elf4j.engine.service.writer.LogWriter;
Expand Down Expand Up @@ -126,8 +125,7 @@ private void parse(@Nullable Properties properties) {
}
this.callerLevels = CallerLevels.from(properties);
this.loggerEnablementCache = new ConcurrentHashMap<>();
this.standardOutput = new BufferedStandardOutput(properties.getProperty("stream"),
PropertiesUtils.getAsInteger("buffer.back", properties));
this.standardOutput = BufferedStandardOutput.from(this);
this.logServiceWriter = CooperatingWriterGroup.from(this);
this.logEventProcessor = ConseqLogEventProcessor.from(this);
}
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/elf4j/engine/service/util/PropertiesUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public static List<Map<String, String>> getPropertiesGroupOfType(String type, @N
}

/**
* Takes only digits from the value to form a sequence, and tries to parse the sequence as an {@link Integer}
*
* @param name
* full key in properties
* @param properties
Expand All @@ -94,4 +96,21 @@ public static Integer getAsInteger(String name, @NonNull Properties properties)
}
return value.startsWith("-") ? -Integer.parseInt(digits) : Integer.parseInt(digits);
}

/**
* @param name
* full key in properties
* @param properties
* to look up in
* @param defaultValue
* the default value to return if the delegate method {@link #getAsInteger} returns null
* @return result of the delegate method {@link #getAsInteger} or, if that is null, the specified defaultValue
*/
public static int getIntOrDefault(String name, @NonNull Properties properties, int defaultValue) {
Integer value = getAsInteger(name, properties);
if (value == null) {
return defaultValue;
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import elf4j.Level;
import elf4j.engine.service.LogServiceManager;
import elf4j.engine.service.Stoppable;
import elf4j.engine.service.util.MoreAwaitility;
import elf4j.engine.service.configuration.LogServiceConfiguration;
import elf4j.engine.service.util.PropertiesUtils;
import elf4j.util.InternalLogger;
import lombok.ToString;
import org.awaitility.Awaitility;
Expand All @@ -38,13 +39,10 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.Properties;
import java.util.concurrent.*;

/**
*
Expand All @@ -59,30 +57,57 @@ public class BufferedStandardOutput implements StandardOutput, Stoppable {
private boolean stopped;

/**
* @param stream
* @param outStreamType
* standard out stream type, stdout or stderr, default to stdout
* @param bufferCapacity
* buffer capacity queued on standard out stream
*/
public BufferedStandardOutput(String stream, Integer bufferCapacity) {
this.outStreamType = stream == null ? DEFAULT_OUT_STREAM_TYPE : OutStreamType.valueOf(stream.toUpperCase());
bufferCapacity = bufferCapacity == null ? DEFAULT_BACK_BUFFER_CAPACITY : bufferCapacity;
InternalLogger.INSTANCE.log(Level.INFO, "Buffer back: " + bufferCapacity);
this.buffer = bufferCapacity == 0 ? new SynchronousQueue<>() : new ArrayBlockingQueue<>(bufferCapacity);
private BufferedStandardOutput(OutStreamType outStreamType, int bufferCapacity) {
this.outStreamType = outStreamType;
this.buffer = new ArrayBlockingQueue<>(bufferCapacity);
this.pollingBytesWriter = new PollingBytesWriter(bufferCapacity);
LogServiceManager.INSTANCE.registerStop(this);
this.stopped = false;
pollingBytesWriter = new PollingBytesWriter(bufferCapacity);
new Thread(pollingBytesWriter).start();
LogServiceManager.INSTANCE.registerStop(this);
}

/**
* @param logServiceConfiguration
* entire service configuration
* @return the {@link BufferedStandardOutput} per the specified configuration
*/
public static BufferedStandardOutput from(LogServiceConfiguration logServiceConfiguration) {
Properties properties = logServiceConfiguration.getProperties();
return new BufferedStandardOutput(getOutStreamType(properties), getBufferCapacity(properties));
}

private static int getBufferCapacity(Properties properties) {
int bufferCapacity = PropertiesUtils.getIntOrDefault("buffer.back", properties, DEFAULT_BACK_BUFFER_CAPACITY);
InternalLogger.INSTANCE.log(Level.INFO, "Buffer back: " + bufferCapacity);
return bufferCapacity;
}

private static OutStreamType getOutStreamType(Properties properties) {
String stream = properties.getProperty("stream");
return stream == null ? DEFAULT_OUT_STREAM_TYPE : OutStreamType.valueOf(stream.toUpperCase());
}

@Override
public void stop() {
MoreAwaitility.suspend(Duration.ofMillis(50));
ConditionFactory await = Awaitility.with().timeout(30, TimeUnit.MINUTES).await();
await.until(this.buffer::isEmpty);
MoreAwaitility.suspend(Duration.ofMillis(50));
await.until(this.pollingBytesWriter::isBufferEmpty);
this.stopped = true;
long longMinutes = 10;
ExecutorService shutdownThread = Executors.newSingleThreadExecutor();
shutdownThread.execute(() -> {
ConditionFactory await = Awaitility.with().timeout(longMinutes, TimeUnit.MINUTES).await();
await.until(this.buffer::isEmpty);
await.until(this.pollingBytesWriter::isBufferEmpty);
this.stopped = true;
});
shutdownThread.shutdown();
}

@Override
public boolean isStopped() {
return this.stopped;
}

@Override
Expand Down
41 changes: 23 additions & 18 deletions src/test/java/elf4j/engine/service/DispatchingLogServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@

@ExtendWith(MockitoExtension.class)
class DispatchingLogServiceTest {
static class StubLogEventProcessor implements LogEventProcessor {
final LogWriter logWriter;

StubLogEventProcessor(LogWriter logWriter) {
this.logWriter = logWriter;
}

@Override
public void process(LogEvent logEvent) {
logWriter.write(logEvent);
}

@Override
public void stop() {

}

@Override
public boolean isStopped() {
return false;
}
}

@Nested
class isEnabled {
NativeLogger stubLogger;
Expand Down Expand Up @@ -154,22 +177,4 @@ void onlyLogWhenEnabled() {
then(mockLogServiceConfiguration).should(never()).getLogServiceWriter();
}
}

class StubLogEventProcessor implements LogEventProcessor {
final LogWriter logWriter;

StubLogEventProcessor(LogWriter logWriter) {
this.logWriter = logWriter;
}

@Override
public void process(LogEvent logEvent) {
logWriter.write(logEvent);
}

@Override
public void stop() {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import elf4j.engine.NativeLogger;
import elf4j.engine.NativeLoggerFactory;
import elf4j.engine.service.LogEvent;
import elf4j.engine.service.LogService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -42,8 +41,6 @@

@ExtendWith(MockitoExtension.class)
class JsonPatternTest {
@Mock LogService stubLogService;

@Mock NativeLoggerFactory mockNativeLoggerFactory;
LogEvent mockLogEvent;
String mockMessage = "testLogMessage {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import elf4j.engine.NativeLogger;
import elf4j.engine.NativeLoggerFactory;
import elf4j.engine.service.LogEvent;
import elf4j.engine.service.LogService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand All @@ -42,8 +41,6 @@

@ExtendWith(MockitoExtension.class)
class MessageAndExceptionPatternTest {
@Mock LogService stubLogService;

@Mock NativeLoggerFactory mockNativeLoggerFactory;
LogEvent mockLogEvent;
String mockMessage = "testLogMessage {}";
Expand Down

0 comments on commit 860e939

Please sign in to comment.