From ab5e75a29f15534325f4b7e881c51cce6e94a9f1 Mon Sep 17 00:00:00 2001 From: neo <1100909+neowu@users.noreply.github.com> Date: Mon, 13 Jan 2025 21:55:36 -0500 Subject: [PATCH] * log-exporter: remove trace exporting Signed-off-by: neo <1100909+neowu@users.noreply.github.com> --- CHANGELOG.md | 2 ++ TODO.md | 13 ++++----- docker/kafka/docker-compose.yml | 6 ++-- .../log/kafka/ActionLogMessageHandler.java | 11 -------- .../java/core/log/service/ArchiveService.java | 28 ------------------- .../core/log/service/ArchiveServiceTest.java | 8 ------ 6 files changed, 11 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3df415550..71f11c419 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ### 9.1.5 (11/11/2024 - ) * log-exporter: change gsutil to "gcloud storage" +* log-exporter: remove trace exporting + > trace is only for troubleshooting, unnecessary for long term * mongo: update driver to 5.2.1 > gsutil is deprecated and requires old version of python diff --git a/TODO.md b/TODO.md index f49867af0..03d7bbf18 100644 --- a/TODO.md +++ b/TODO.md @@ -5,17 +5,10 @@ * impl own json bind by referring https://github.com/json-iterator/java and https://github.com/ngs-doo/dsl-json with annotation processor? * framework error (queue listener, background task error, custom scheduler trigger) forward to kafka? -* redis using ping/pong to validate connection? for zero downtime upgrading e.g. with gcloud memory store * use adminClient to check kafka ready? or retry sending message? * kafka: is static membership (group.instance.id) useful within stateful set? -* log: use es data stream + ILM to rotate index? and is time series data stream (TSDS) useful (only for metrics data) - > not able to close index, only delete, and can simplify log processor and ES interface (no need to support "index" param in all requests) - > to use TSDS, convert statMessage into pure metrics, and make error/info into action? - > https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html - > or framework should manage time based index by itself? - * db: update "on duplicated key" values() syntax, > The use of VALUES() to refer to the new row and columns is deprecated beginning with MySQL 8.0.20, and is subject to removal in a future version of MySQL. @@ -30,4 +23,8 @@ https://bugs.openjdk.org/browse/JDK-8335181 * log diagram, fix d3 tooltip (generate separated json, and make d3 tooltip show other non-HTML info) -* change kafka compression to zstd? as it is production ready and widely used + +* redis using ping/pong to validate connection? for zero downtime upgrading e.g. with gcloud memory store +* migrate to dragonflydb and support RESP3 (cluster / MOVED handling) ? +* migrate to opensearch ? +* log exporter, save in parquet format, or reimplement in rust? diff --git a/docker/kafka/docker-compose.yml b/docker/kafka/docker-compose.yml index dd99c26b8..91cc80b72 100644 --- a/docker/kafka/docker-compose.yml +++ b/docker/kafka/docker-compose.yml @@ -1,13 +1,15 @@ services: kafka: image: apache/kafka:3.9.0 + hostname: dev.internal ports: - 9092:9092 - 1099:1099 environment: KAFKA_NODE_ID: 1 CLUSTER_ID: lK_g8qooQNOD9klGoxLojA - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=dev.internal + KAFKA_JMX_PORT: 1099 + KAFKA_JMX_HOSTNAME: dev.internal KAFKA_JVM_PERFORMANCE_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true KAFKA_HEAP_OPTS: -Xms1G -Xmx1G KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT @@ -24,7 +26,7 @@ services: volumes: - data:/var/lib/kafka/data # mm: -# image: neowu/kafka:3.6.0 +# image: apache/kafka:3.9.0 # volumes: # - ./connect-mirror-maker.properties:/opt/kafka/config/connect-mirror-maker.properties # entrypoint: ["/bin/bash", "-c", "/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/connect-mirror-maker.properties"] diff --git a/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java b/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java index 0f6d4e2f2..d69ec7bc2 100644 --- a/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java +++ b/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java @@ -10,7 +10,6 @@ import java.io.BufferedOutputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.LocalDateTime; @@ -37,22 +36,12 @@ public void handle(List> messages) throws IOException for (Message message : messages) { ActionLogEntry entry = entry(message.value); - if (message.value.traceLog != null) { - entry.traceLogPath = archiveService.traceLogPath(now, entry.app, entry.id); - writeTraceLog(entry.traceLogPath, message.value.traceLog); - } - stream.write(writer.toJSON(entry)); stream.write('\n'); } } } - private void writeTraceLog(String traceLogPath, String content) throws IOException { - Path path = archiveService.initializeLogFilePath(traceLogPath); - Files.writeString(path, content, StandardCharsets.UTF_8, CREATE, APPEND); - } - private ActionLogEntry entry(ActionLogMessage message) { var entry = new ActionLogEntry(); entry.id = message.id; diff --git a/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java b/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java index bf696920c..e138bac8d 100644 --- a/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java +++ b/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java @@ -2,17 +2,14 @@ import core.framework.crypto.Hash; import core.framework.inject.Inject; -import core.framework.util.Files; import core.framework.util.Network; import core.framework.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.time.LocalDate; -import java.time.LocalDateTime; import static java.nio.file.Files.createDirectories; import static java.nio.file.Files.exists; @@ -45,20 +42,6 @@ public void uploadArchive(LocalDate date) { uploadService.upload(eventFilePath, eventPath); } - Path traceLogDirPath = Path.of(logDir.toString(), Strings.format("/trace/{}", date)); - if (exists(traceLogDirPath)) { - File[] appDirs = traceLogDirPath.toFile().listFiles(File::isDirectory); - if (appDirs != null) { - for (File appDir : appDirs) { - String app = appDir.getName(); - String traceLogPath = Strings.format("/trace/{}/{}-{}-{}.tar.gz", date, app, date, hash); - Path traceLogFilePath = Path.of(logDir.toString(), traceLogPath); - shell.execute("tar", "-czf", traceLogFilePath.toString(), "-C", logDir.toString(), Strings.format("trace/{}/{}", date, app)); - uploadService.upload(traceLogFilePath, traceLogPath); - } - } - } - logger.info("uploading end, date={}", date); } @@ -70,13 +53,6 @@ public void cleanupArchive(LocalDate date) { Path eventFilePath = Path.of(logDir.toString(), eventPath(date)); shell.execute("rm", "-f", eventFilePath.toString()); - - Path traceLogDirPath = Path.of(logDir.toString(), Strings.format("/trace/{}", date)); - if (exists(traceLogDirPath)) { - logger.info("delete trace logs, path={}", traceLogDirPath); - // use shell (rm -rf or find) may cause pod terminate with error code 137 on mounted disk - Files.deleteDir(traceLogDirPath); - } } public String actionLogPath(LocalDate date) { @@ -87,10 +63,6 @@ public String eventPath(LocalDate date) { return Strings.format("/event/{}/event-{}-{}.ndjson", date.getYear(), date, hash); } - public String traceLogPath(LocalDateTime now, String app, String id) { - return Strings.format("/trace/{}/{}/{}/{}.txt", now.toLocalDate(), app, now.getHour(), id); - } - public Path initializeLogFilePath(String logPath) throws IOException { Path path = Path.of(logDir.toString(), logPath); Path parent = path.getParent(); diff --git a/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java b/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java index d4f3e9bce..1a3a900b1 100644 --- a/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java +++ b/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java @@ -8,7 +8,6 @@ import org.junit.jupiter.api.condition.OS; import java.time.LocalDate; -import java.time.LocalDateTime; import static org.assertj.core.api.Assertions.assertThat; @@ -41,13 +40,6 @@ void eventPath() { .matches("/event/2022/event-2022-11-03-[a-z0-9]*.ndjson"); } - @Test - void traceArchivePath() { - LocalDateTime now = LocalDateTime.parse("2022-11-03T02:00:00"); - assertThat(archiveService.traceLogPath(now, "service", "id")) - .isEqualTo("/trace/2022-11-03/service/2/id.txt"); - } - @Test @EnabledOnOs({OS.MAC, OS.LINUX}) void cleanupArchive() {