diff --git a/.gitignore b/.gitignore
index 3a855b4..a787546 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
target/**
+rpm/target/**
.idea/**
dependency-reduced-pom.xml
var/
diff --git a/Dockerfile b/Dockerfile
index b713e25..e49e880 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
FROM rockylinux:8
COPY rpm/target/rpm/com.teragrep-k8s_01/RPMS/noarch/com.teragrep-k8s_01-*.rpm /rpm/
-RUN dnf -y install jq java-1.8.0-headless /rpm/*.rpm && yum clean all
+RUN dnf -y install jq /rpm/*.rpm && yum clean all
VOLUME /opt/teragrep/k8s_01/var
VOLUME /opt/teragrep/k8s_01/etc
WORKDIR /opt/teragrep/k8s_01
diff --git a/etc/config.json b/etc/config.json
index 5f5b57d..96e32fd 100644
--- a/etc/config.json
+++ b/etc/config.json
@@ -1,4 +1,7 @@
{
+ "metrics": {
+ "port": 12345
+ },
"kubernetes": {
"logdir": "/var/log/containers",
"url": "https://127.0.0.1:8443",
diff --git a/example/combined.yaml b/example/combined.yaml
index 2eb925a..c1711fd 100644
--- a/example/combined.yaml
+++ b/example/combined.yaml
@@ -36,6 +36,9 @@ apiVersion: v1
data:
config.json: |
{
+ "metrics": {
+ "port": 12345
+ },
"kubernetes": {
"logdir": "/var/log/containers",
"url": "https://127.0.0.1:8443",
@@ -98,7 +101,7 @@ data:
kind: ConfigMap
metadata:
- name: app-config-42gthtbf4f
+ name: app-config-td2t2mhm2c
---
apiVersion: v1
kind: Secret
@@ -221,7 +224,7 @@ spec:
terminationGracePeriodSeconds: 0
volumes:
- configMap:
- name: app-config-42gthtbf4f
+ name: app-config-td2t2mhm2c
name: app-config
- hostPath:
path: /var/log/containers
diff --git a/example/config/config.json b/example/config/config.json
index 264c49e..9d7cf49 100644
--- a/example/config/config.json
+++ b/example/config/config.json
@@ -1,4 +1,7 @@
{
+ "metrics": {
+ "port": 12345
+ },
"kubernetes": {
"logdir": "/var/log/containers",
"url": "https://127.0.0.1:8443",
diff --git a/pom.xml b/pom.xml
index 4a10dba..b98cc30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,6 +17,9 @@
0.0.1
-SNAPSHOT
+ 0.16.0
+ 9.4.51.v20230217
+ 4.2.18
@@ -130,6 +133,50 @@
2.0.7
+
+
+ io.prometheus
+ simpleclient
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_dropwizard
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_servlet
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${prometheus-simpleclient.version}
+
+
+
+ org.eclipse.jetty
+ jetty-servlet
+ ${prometheus-jettyservlet.version}
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ ${dropwizard-metrics.version}
+
+
+ io.dropwizard.metrics
+ metrics-jmx
+ ${dropwizard-metrics.version}
+
+
+ io.dropwizard.metrics
+ metrics-jvm
+ ${dropwizard-metrics.version}
+
+
diff --git a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
index 9780a34..acb36be 100644
--- a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
+++ b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
@@ -17,6 +17,7 @@
package com.teragrep.k8s_01;
+import com.codahale.metrics.MetricRegistry;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import com.teragrep.k8s_01.config.AppConfig;
@@ -35,7 +36,7 @@
public class KubernetesLogReader {
private static final Logger LOGGER = LoggerFactory.getLogger(KubernetesLogReader.class);
-
+ private static final MetricRegistry metricRegistry = new MetricRegistry();
static Gson gson = new Gson();
public static void main(String[] args) throws IOException {
AppConfig appConfig;
@@ -64,6 +65,7 @@ public static void main(String[] args) throws IOException {
return;
}
KubernetesCachingAPIClient cacheClient = new KubernetesCachingAPIClient(appConfig.getKubernetes());
+ PrometheusMetrics prometheusMetrics = new PrometheusMetrics(appConfig.getMetrics().getPort());
// Pool of Relp output threads to be shared by every consumer
BlockingQueue relpOutputPool = new LinkedBlockingDeque<>(appConfig.getRelp().getOutputThreads());
@@ -79,7 +81,7 @@ public static void main(String[] args) throws IOException {
"Adding RelpOutput thread #{}",
i
);
- relpOutputPool.put(new RelpOutput(appConfig.getRelp(), i));
+ relpOutputPool.put(new RelpOutput(appConfig.getRelp(), i, metricRegistry));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -171,5 +173,6 @@ public static void main(String[] args) throws IOException {
throw new RuntimeException(e);
}
}
+ prometheusMetrics.close();
}
}
diff --git a/src/main/java/com/teragrep/k8s_01/PrometheusMetrics.java b/src/main/java/com/teragrep/k8s_01/PrometheusMetrics.java
new file mode 100644
index 0000000..6544cce
--- /dev/null
+++ b/src/main/java/com/teragrep/k8s_01/PrometheusMetrics.java
@@ -0,0 +1,95 @@
+/*
+ Kubernetes log forwarder k8s_01
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.k8s_01;
+
+import com.codahale.metrics.*;
+import com.codahale.metrics.jmx.JmxReporter;
+import com.codahale.metrics.jvm.*;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.hotspot.DefaultExports;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+public class PrometheusMetrics {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusMetrics.class);
+ private final Server jettyServer;
+ public PrometheusMetrics(int port) {
+ LOGGER.info("Starting prometheus metrics server on port {}", port);
+ // prometheus-exporter
+ jettyServer = new Server(port);
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ jettyServer.setHandler(context);
+
+ MetricsServlet metricsServlet = new MetricsServlet();
+ ServletHolder servletHolder = new ServletHolder(metricsServlet);
+ context.addServlet(servletHolder, "/metrics");
+ setupDropWizard();
+ // Add metrics about CPU, JVM memory etc.
+ DefaultExports.initialize();
+ // Start the webserver.
+ try {
+ jettyServer.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static void setupDropWizard() {
+ MetricRegistry metricRegistry = new MetricRegistry();
+
+ // Totals
+ metricRegistry.register(name("total", "reconnects"), new Counter());
+ metricRegistry.register(name("total", "connections"), new Counter());
+
+ // Throughput meters
+ metricRegistry.register(name("throughput", "bytes"), new Meter(new SlidingTimeWindowMovingAverages()));
+ metricRegistry.register(name("throughput", "records"), new Meter(new SlidingTimeWindowMovingAverages()));
+ metricRegistry.register(name("throughput", "errors"), new Meter(new SlidingTimeWindowMovingAverages()));
+
+ // Misc
+ metricRegistry.register(name("jvm", "vm"), new JvmAttributeGaugeSet());
+ metricRegistry.register(name("jvm", "memory"), new MemoryUsageGaugeSet());
+ metricRegistry.register(name("jvm", "threads"), new ThreadStatesGaugeSet());
+ metricRegistry.register(name("jvm", "gc"), new GarbageCollectorMetricSet());
+ SharedMetricRegistries.add("default", metricRegistry);
+
+ // Add to Prometheus metrics
+ CollectorRegistry.defaultRegistry.register(new DropwizardExports(metricRegistry));
+
+ // Enable JMX listener
+ JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+ jmxReporter.start();
+ }
+
+ public void close() {
+ LOGGER.info("Closing prometheus metrics server");
+ try {
+ jettyServer.stop();
+ } catch (Exception e) {
+ LOGGER.error("Failed to stop jettyServer:", e);
+ }
+ }
+}
diff --git a/src/main/java/com/teragrep/k8s_01/RelpOutput.java b/src/main/java/com/teragrep/k8s_01/RelpOutput.java
index 04fd5be..97bfe54 100644
--- a/src/main/java/com/teragrep/k8s_01/RelpOutput.java
+++ b/src/main/java/com/teragrep/k8s_01/RelpOutput.java
@@ -18,6 +18,7 @@
package com.teragrep.k8s_01;
import com.cloudbees.syslog.SyslogMessage;
+import com.codahale.metrics.*;
import com.teragrep.k8s_01.config.AppConfigRelp;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.RelpConnection;
@@ -28,13 +29,19 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
+import static com.codahale.metrics.MetricRegistry.name;
+
public class RelpOutput {
private static final Logger LOGGER = LoggerFactory.getLogger(RelpOutput.class);
private final RelpConnection relpConnection;
private final AppConfigRelp relpConfig;
private final int id;
-
- RelpOutput(AppConfigRelp appConfigRelp, int threadId) {
+ private final Counter totalReconnects;
+ private final Counter totalConnections;
+ private final Meter throughputBytes;
+ private final Meter throughputRecords;
+ private final Meter throughputErrors;
+ RelpOutput(AppConfigRelp appConfigRelp, int threadId, MetricRegistry metricRegistry) {
relpConfig = appConfigRelp;
id = threadId;
if(LOGGER.isDebugEnabled()) {
@@ -55,6 +62,14 @@ public class RelpOutput {
relpConnection.setConnectionTimeout(relpConfig.getConnectionTimeout());
relpConnection.setReadTimeout(relpConfig.getReadTimeout());
relpConnection.setWriteTimeout(relpConfig.getWriteTimeout());
+ // Throughput
+ throughputBytes = metricRegistry.meter(name("throughput", "bytes"));
+ throughputRecords = metricRegistry.meter(name("throughput", "records"));
+ throughputErrors = metricRegistry.meter(name("throughput", "errors"));
+
+ // Totals
+ totalConnections = metricRegistry.counter(name("total", "connections"));
+ totalReconnects = metricRegistry.counter(name("total", "reconnects"));
connect();
}
@@ -71,14 +86,18 @@ private void connect() {
);
}
connected = relpConnection.connect(relpConfig.getTarget(), relpConfig.getPort());
+ totalConnections.inc();
} catch (IOException | TimeoutException e) {
LOGGER.error(
"[#{}] Can't connect to Relp server:",
getId(),
e
);
+ throughputErrors.mark();
+ totalConnections.dec();
}
if (!connected) {
+ totalReconnects.inc();
try {
LOGGER.info(
"[#{}] Attempting to reconnect in {}ms.",
@@ -88,6 +107,7 @@ private void connect() {
Thread.sleep(relpConfig.getReconnectInterval());
} catch (InterruptedException e) {
e.printStackTrace();
+ throughputErrors.mark();
}
}
}
@@ -99,6 +119,7 @@ public void disconnect() {
getId()
);
try {
+ totalConnections.dec();
relpConnection.disconnect();
} catch (IOException | TimeoutException e) {
LOGGER.debug(
@@ -106,6 +127,7 @@ public void disconnect() {
getId()
);
relpConnection.tearDown();
+ throughputErrors.mark();
throw new RuntimeException(e);
}
}
@@ -145,6 +167,7 @@ public void send(SyslogMessage syslogMessage) {
getId(),
e
);
+ throughputErrors.mark();
}
// Check if everything has been sent, retry and reconnect if not.
if (!batch.verifyTransactionAll()) {
@@ -154,9 +177,12 @@ public void send(SyslogMessage syslogMessage) {
);
batch.retryAllFailed();
relpConnection.tearDown();
+ totalConnections.dec();
connect();
} else {
allSent = true;
+ throughputBytes.mark(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8).length);
+ throughputRecords.mark();
}
}
}
diff --git a/src/main/java/com/teragrep/k8s_01/config/AppConfig.java b/src/main/java/com/teragrep/k8s_01/config/AppConfig.java
index 2542f42..f55a192 100644
--- a/src/main/java/com/teragrep/k8s_01/config/AppConfig.java
+++ b/src/main/java/com/teragrep/k8s_01/config/AppConfig.java
@@ -22,6 +22,12 @@
/* POJO representing the main config.json */
public class AppConfig {
private AppConfigKubernetes kubernetes;
+
+ public AppConfigMetrics getMetrics() {
+ return metrics;
+ }
+
+ private AppConfigMetrics metrics;
private AppConfigRelp relp;
public AppConfigKubernetes getKubernetes() {
diff --git a/src/main/java/com/teragrep/k8s_01/config/AppConfigMetrics.java b/src/main/java/com/teragrep/k8s_01/config/AppConfigMetrics.java
new file mode 100644
index 0000000..1b80966
--- /dev/null
+++ b/src/main/java/com/teragrep/k8s_01/config/AppConfigMetrics.java
@@ -0,0 +1,33 @@
+/*
+ Kubernetes log forwarder k8s_01
+ Copyright (C) 2023 Suomen Kanuuna Oy
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package com.teragrep.k8s_01.config;
+
+import com.google.gson.Gson;
+
+public class AppConfigMetrics {
+ public int getPort() {
+ return port;
+ }
+
+ private int port;
+
+ @Override
+ public String toString() {
+ return new Gson().toJson(this);
+ }
+}