Skip to content

Commit

Permalink
add logging (#58)
Browse files Browse the repository at this point in the history
* add logging

* add logging
  • Loading branch information
kortemik authored Jan 23, 2025
1 parent ebe3fb0 commit cf80428
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 19 deletions.
26 changes: 20 additions & 6 deletions src/main/java/com/teragrep/aer_02/DefaultOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,31 @@
import com.teragrep.rlp_01.client.*;
import com.teragrep.rlp_01.pool.Pool;
import com.teragrep.rlp_01.pool.UnboundPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.logging.Logger;

/**
* Implementation of an shareable output. Required to be thread-safe.
*/
final class DefaultOutput implements Output {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);

private final Pool<IManagedRelpConnection> relpConnectionPool;
private final String relpAddress;
private final int relpPort;
private final Logger logger;

DefaultOutput(
Logger logger,
String name,
RelpConnectionConfig relpConnectionConfig,
MetricRegistry metricRegistry,
SSLContextSupplier sslContextSupplier
) {
this(
logger,
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
relpConnectionConfig.asRelpConfig(),
name,
metricRegistry,
Expand All @@ -86,11 +87,18 @@ final class DefaultOutput implements Output {
);
}

DefaultOutput(String name, RelpConnectionConfig relpConnectionConfig, MetricRegistry metricRegistry) {
DefaultOutput(
Logger logger,
String name,
RelpConnectionConfig relpConnectionConfig,
MetricRegistry metricRegistry
) {
this(
logger,
relpConnectionConfig,
new UnboundPool<>(
new ManagedRelpConnectionWithMetricsFactory(
logger,
relpConnectionConfig.asRelpConfig(),
name,
metricRegistry,
Expand All @@ -101,11 +109,17 @@ final class DefaultOutput implements Output {
);
}

DefaultOutput(RelpConnectionConfig relpConnectionConfig, Pool<IManagedRelpConnection> relpConnectionPool) {
DefaultOutput(
Logger logger,
RelpConnectionConfig relpConnectionConfig,
Pool<IManagedRelpConnection> relpConnectionPool
) {
this.relpAddress = relpConnectionConfig.relpAddress();
this.relpPort = relpConnectionConfig.relpPort();

this.relpConnectionPool = relpConnectionPool;
this.logger = logger;
logger.info("DefaultOutput constructor done");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import static com.codahale.metrics.MetricRegistry.name;

public class ManagedRelpConnectionWithMetrics implements IManagedRelpConnection {

private final Logger logger;
private final IRelpConnection relpConnection;
private boolean hasConnected;

Expand All @@ -70,11 +72,13 @@ public class ManagedRelpConnectionWithMetrics implements IManagedRelpConnection
private final Timer connectLatency;

public ManagedRelpConnectionWithMetrics(
Logger logger,
IRelpConnection relpConnection,
String name,
MetricRegistry metricRegistry
) {
this(
logger,
relpConnection,
name,
metricRegistry,
Expand All @@ -84,12 +88,14 @@ public ManagedRelpConnectionWithMetrics(
}

public ManagedRelpConnectionWithMetrics(
Logger logger,
IRelpConnection relpConnection,
String name,
MetricRegistry metricRegistry,
Reservoir sendReservoir,
Reservoir connectReservoir
) {
this.logger = logger;
this.relpConnection = relpConnection;

this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
Expand Down Expand Up @@ -131,8 +137,8 @@ public void connect() {
connects.inc();
}
catch (IOException | TimeoutException e) {
System.err
.println(
logger
.warning(
"Failed to connect to relp server <[" + relpConnection.relpConfig().relpTarget + "]>:<["
+ relpConnection.relpConfig().relpPort + "]>: <" + e.getMessage() + ">"
);
Expand All @@ -142,7 +148,7 @@ public void connect() {
retriedConnects.inc();
}
catch (InterruptedException exception) {
System.err.println("Reconnection timer interrupted, reconnecting now");
logger.warning("Reconnection timer interrupted, reconnecting now");
}
}
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public void ensureSent(byte[] bytes) {
bytesCnt.inc(bytes.length);
}
catch (IllegalStateException | IOException | TimeoutException e) {
System.err.println("Exception <" + e.getMessage() + "> while sending relpBatch. Will retry");
logger.warning("Exception <" + e.getMessage() + "> while sending relpBatch. Will retry");
}
if (!relpBatch.verifyTransactionAll()) {
relpBatch.retryAllFailed();
Expand All @@ -203,7 +209,7 @@ public void close() {
this.relpConnection.disconnect();
}
catch (IllegalStateException | IOException | TimeoutException e) {
System.err.println("Forcefully closing connection due to exception <" + e.getMessage() + ">");
logger.warning("Forcefully closing connection due to exception <" + e.getMessage() + ">");
}
finally {
tearDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.codahale.metrics.MetricRegistry;
import com.teragrep.rlp_01.RelpConnection;
import com.teragrep.rlp_01.client.*;
import java.util.logging.Logger;

import java.util.function.Supplier;

Expand All @@ -58,36 +59,46 @@ public class ManagedRelpConnectionWithMetricsFactory implements Supplier<IManage
private final SSLContextSupplier sslContextSupplier;
private final String name;
private final MetricRegistry metricRegistry;
private final Logger logger;

public ManagedRelpConnectionWithMetricsFactory(String name, MetricRegistry metricRegistry, RelpConfig relpConfig) {
this(relpConfig, name, metricRegistry, new SocketConfigDefault());
public ManagedRelpConnectionWithMetricsFactory(
Logger logger,
String name,
MetricRegistry metricRegistry,
RelpConfig relpConfig
) {
this(logger, relpConfig, name, metricRegistry, new SocketConfigDefault());
}

public ManagedRelpConnectionWithMetricsFactory(
Logger logger,
RelpConfig relpConfig,
String name,
MetricRegistry metricRegistry,
SocketConfig socketConfig
) {
this(relpConfig, name, metricRegistry, socketConfig, new SSLContextSupplierStub());
this(logger, relpConfig, name, metricRegistry, socketConfig, new SSLContextSupplierStub());
}

public ManagedRelpConnectionWithMetricsFactory(
Logger logger,
RelpConfig relpConfig,
String name,
MetricRegistry metricRegistry,
SSLContextSupplier sslContextSupplier
) {
this(relpConfig, name, metricRegistry, new SocketConfigDefault(), sslContextSupplier);
this(logger, relpConfig, name, metricRegistry, new SocketConfigDefault(), sslContextSupplier);
}

public ManagedRelpConnectionWithMetricsFactory(
Logger logger,
RelpConfig relpConfig,
String name,
MetricRegistry metricRegistry,
SocketConfig socketConfig,
SSLContextSupplier sslContextSupplier
) {
this.logger = logger;
this.relpConfig = relpConfig;
this.name = name;
this.metricRegistry = metricRegistry;
Expand All @@ -97,6 +108,7 @@ public ManagedRelpConnectionWithMetricsFactory(

@Override
public IManagedRelpConnection get() {
logger.info("get() called for new IManagedRelpConnection");
IRelpConnection relpConnection;
if (sslContextSupplier.isStub()) {
relpConnection = new RelpConnectionWithConfig(new RelpConnection(), relpConfig);
Expand All @@ -114,6 +126,7 @@ public IManagedRelpConnection get() {
relpConnection.setKeepAlive(socketConfig.keepAlive());

IManagedRelpConnection managedRelpConnection = new ManagedRelpConnectionWithMetrics(
logger,
relpConnection,
name,
metricRegistry
Expand All @@ -126,7 +139,7 @@ public IManagedRelpConnection get() {
if (relpConfig.maxIdleEnabled) {
managedRelpConnection = new RenewableRelpConnection(managedRelpConnection, relpConfig.maxIdle);
}

logger.info("returning new managedRelpConnection");
return managedRelpConnection;
}
}
8 changes: 8 additions & 0 deletions src/main/java/com/teragrep/aer_02/SyslogBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class SyslogBridge {
private DefaultOutput defaultOutput = null;
private boolean initialized = false;

public SyslogBridge() {

}

@FunctionName("metrics")
public HttpResponseMessage metrics(
@HttpTrigger(
Expand Down Expand Up @@ -131,6 +135,7 @@ public void eventHubTriggerToSyslog(
try {
initLock.lock();
if (!initialized) {
context.getLogger().info("initializing at " + this);
final Report report = new JmxReport(
new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry), metricRegistry
);
Expand All @@ -139,6 +144,7 @@ public void eventHubTriggerToSyslog(
if (configSource.source("relp.tls.mode", "none").equals("keyVault")) {

defaultOutput = new DefaultOutput(
context.getLogger(),
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry,
Expand All @@ -147,6 +153,7 @@ public void eventHubTriggerToSyslog(
}
else {
defaultOutput = new DefaultOutput(
context.getLogger(),
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry
Expand All @@ -161,6 +168,7 @@ public void eventHubTriggerToSyslog(
Runtime.getRuntime().addShutdownHook(shutdownHook);

initialized = true;
context.getLogger().info("initialized at " + this);
}
}
finally {
Expand Down
28 changes: 25 additions & 3 deletions src/test/java/com/teragrep/aer_02/DefaultOutputTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.junit.jupiter.api.TestInstance;

import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;

import static com.codahale.metrics.MetricRegistry.name;

Expand All @@ -88,6 +89,7 @@ public void testSendLatencyMetricIsCapped() { // Should only keep information on

UnboundPool<IManagedRelpConnection> pool = new UnboundPool<>(
() -> new ManagedRelpConnectionWithMetrics(
Logger.getAnonymousLogger(),
new RelpConnectionWithConfig(
new RelpConnectionFake(),
new RelpConnectionConfig(new PropertySource()).asRelpConfig()
Expand All @@ -100,7 +102,13 @@ public void testSendLatencyMetricIsCapped() { // Should only keep information on
new ManagedRelpConnectionStub()
);

try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {

for (int i = 0; i < measurementLimit + 100; i++) { // send more messages than the limit is
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -131,12 +139,19 @@ public void testConnectionLatencyMetricIsCapped() { // Should take information o

UnboundPool<IManagedRelpConnection> pool = new UnboundPool<>(
() -> new ManagedRelpConnectionWithMetrics(
Logger.getAnonymousLogger(),
new RelpConnectionWithConfig(new ConnectionlessRelpConnectionFake(reconnections), new RelpConnectionConfig(new PropertySource()).asRelpConfig()), "defaultOutput", metricRegistry, sendReservoir, connectReservoir
),
new ManagedRelpConnectionStub()
);

try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}

Expand All @@ -161,12 +176,19 @@ public void testConnectionLatencyMetricWithException() { // should not update va
MetricRegistry metricRegistry = new MetricRegistry();
UnboundPool<IManagedRelpConnection> pool = new UnboundPool<>(
() -> new ManagedRelpConnectionWithMetrics(
Logger.getAnonymousLogger(),
new RelpConnectionWithConfig(new ThrowingRelpConnectionFake(reconnections), new RelpConnectionConfig(new PropertySource()).asRelpConfig()), "defaultOutput", metricRegistry
),
new ManagedRelpConnectionStub()
);

try (DefaultOutput output = new DefaultOutput(new RelpConnectionConfig(new PropertySource()), pool)) {
try (
DefaultOutput output = new DefaultOutput(
Logger.getAnonymousLogger(),
new RelpConnectionConfig(new PropertySource()),
pool
)
) {
output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Logger;

public class EventDataConsumerTlsTest {

Expand Down Expand Up @@ -176,6 +177,7 @@ public boolean isStub() {
Sourceable configSource = new EnvironmentSource();

DefaultOutput defaultOutput = new DefaultOutput(
Logger.getAnonymousLogger(),
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry,
Expand Down

0 comments on commit cf80428

Please sign in to comment.