Commit ab5e75a
* log-exporter: remove trace exporting
Signed-off-by: neo <[email protected]>
neowu committed Jan 14, 2025
1 parent 49c7135 commit ab5e75a
Showing 6 changed files with 11 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,8 @@
### 9.1.5 (11/11/2024 - )

* log-exporter: change gsutil to "gcloud storage"
* log-exporter: remove trace exporting
> trace is only for troubleshooting, unnecessary for long term
* mongo: update driver to 5.2.1

> gsutil is deprecated and requires old version of python
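For reference, the gsutil note above concerns the archive upload step; a minimal sketch of the command swap, run from Java via ProcessBuilder — the bucket and file path are illustrative, not taken from the exporter:

```java
import java.io.IOException;

public class UploadExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        // before: gsutil -q cp <file> gs://log-bucket/action/   (gsutil is deprecated and needs a legacy Python)
        // after: the "gcloud storage" equivalent of the same copy
        Process process = new ProcessBuilder(
            "gcloud", "storage", "cp",
            "/var/log/app/action-2025-01-14.ndjson",   // local archive file (illustrative)
            "gs://log-bucket/action/")                 // destination bucket (illustrative)
            .inheritIO()
            .start();
        int exit = process.waitFor();
        if (exit != 0) throw new IOException("gcloud storage cp failed, exit=" + exit);
    }
}
```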
13 changes: 5 additions & 8 deletions TODO.md
@@ -5,17 +5,10 @@
* impl own json bind by referring to https://github.com/json-iterator/java and https://github.com/ngs-doo/dsl-json, with an annotation processor?

* framework error (queue listener, background task error, custom scheduler trigger) forward to kafka?
* redis using ping/pong to validate connection? for zero downtime upgrading e.g. with gcloud memory store
* use adminClient to check kafka ready? or retry sending message?

* kafka: is static membership (group.instance.id) useful within a stateful set? (see the consumer-config sketch after this list)

* log: use es data stream + ILM to rotate index? and is time series data stream (TSDS) useful (only for metrics data)
> not able to close index, only delete, and can simplify log processor and ES interface (no need to support "index" param in all requests)
> to use TSDS, convert statMessage into pure metrics, and make error/info into action?
> https://www.elastic.co/guide/en/elasticsearch/reference/current/tsds.html
> or framework should manage time based index by itself?
* db: update "on duplicate key" VALUES() syntax (see the row-alias sketch after this list)
> The use of VALUES() to refer to the new row and columns is deprecated beginning with MySQL 8.0.20, and is subject to removal in a future version of MySQL.
@@ -30,4 +23,8 @@
https://bugs.openjdk.org/browse/JDK-8335181

* log diagram: fix d3 tooltip (generate separate json, and make the d3 tooltip show other non-HTML info)
* change kafka compression to zstd? it is production ready and widely used (see the producer-config sketch after this list)

* redis: use ping/pong to validate connections? for zero-downtime upgrades, e.g. with gcloud memory store (see the PING sketch after this list)
* migrate to dragonflydb and support RESP3 (cluster / MOVED handling)?
* migrate to opensearch?
* log exporter, save in parquet format, or reimplement in rust?
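Sketches for the TODO items above follow. First, static membership: a minimal consumer-config sketch — the group id and the HOSTNAME-derived instance id are assumptions, not framework code. A pod restarted with the same `group.instance.id` rejoins within `session.timeout.ms` without forcing a rebalance, which is exactly what a stateful set's stable hostnames provide:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class StaticMembershipConfig {
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dev.internal:9092");  // matches the compose file below
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-processor");               // illustrative group
        // stable per-pod id, e.g. the StatefulSet hostname "log-processor-0" (assumption)
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, System.getenv("HOSTNAME"));
        return props;
    }
}
```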
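For the "on duplicate key" item: MySQL 8.0.20 deprecates `VALUES()` in the UPDATE clause in favor of a row alias. A plain-JDBC sketch; the `stat` table and counter column are illustrative:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertExample {
    // deprecated form:
    //   INSERT INTO stat (id, count) VALUES (?, ?) ON DUPLICATE KEY UPDATE count = count + VALUES(count)
    static void upsert(Connection connection, String id, long count) throws SQLException {
        String sql = "INSERT INTO stat (id, count) VALUES (?, ?) AS new "
                     + "ON DUPLICATE KEY UPDATE count = count + new.count";  // 8.0.20+ row alias
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, id);
            statement.setLong(2, count);
            statement.executeUpdate();
        }
    }
}
```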
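For the zstd item: on the producer side this is a single setting; zstd has been supported since Kafka 2.1, so brokers and consumers must be at least that version. Bootstrap address illustrative:

```java
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class ZstdProducerConfig {
    public static Properties producerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "dev.internal:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");  // replaces gzip/lz4/snappy/none
        return props;
    }
}
```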
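For the redis ping/pong item: a minimal borrow-time validation sketch using a raw RESP inline `PING`; a real client would fold this into its connection pool rather than use a bare Socket:

```java
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class RedisConnectionValidator {
    // send an inline RESP PING and expect "+PONG\r\n"; false means the connection
    // is stale, e.g. the server side closed during a memory store upgrade
    static boolean validate(Socket socket) {
        try {
            OutputStream out = socket.getOutputStream();
            out.write("PING\r\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
            byte[] reply = new byte[7];
            int read = socket.getInputStream().read(reply);
            return read >= 5 && new String(reply, 0, read, StandardCharsets.UTF_8).startsWith("+PONG");
        } catch (Exception e) {
            return false;
        }
    }
}
```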
6 changes: 4 additions & 2 deletions docker/kafka/docker-compose.yml
@@ -1,13 +1,15 @@
services:
kafka:
image: apache/kafka:3.9.0
hostname: dev.internal
ports:
- 9092:9092
- 1099:1099
environment:
KAFKA_NODE_ID: 1
CLUSTER_ID: lK_g8qooQNOD9klGoxLojA
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=dev.internal
KAFKA_JMX_PORT: 1099
KAFKA_JMX_HOSTNAME: dev.internal
KAFKA_JVM_PERFORMANCE_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true
KAFKA_HEAP_OPTS: -Xms1G -Xmx1G
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
@@ -24,7 +26,7 @@ services:
volumes:
- data:/var/lib/kafka/data
# mm:
# image: neowu/kafka:3.6.0
# image: apache/kafka:3.9.0
# volumes:
# - ./connect-mirror-maker.properties:/opt/kafka/config/connect-mirror-maker.properties
# entrypoint: ["/bin/bash", "-c", "/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/connect-mirror-maker.properties"]
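The JMX options above expose an unauthenticated endpoint on dev.internal:1099 (port, RMI port, and hostname all set explicitly so RMI resolves across the container boundary). A minimal sketch of attaching to it; everything here comes from the compose file except the class name:

```java
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class KafkaJmxProbe {
    public static void main(String[] args) throws Exception {
        // dev.internal:1099 matches KAFKA_JMX_HOSTNAME / jmxremote.port in the compose file
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://dev.internal:1099/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            System.out.println("registered mbeans: " + server.getMBeanCount());
        }
    }
}
```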
@@ -10,7 +10,6 @@

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
@@ -37,22 +36,12 @@ public void handle(List<Message<ActionLogMessage>> messages) throws IOException
for (Message<ActionLogMessage> message : messages) {
ActionLogEntry entry = entry(message.value);

if (message.value.traceLog != null) {
entry.traceLogPath = archiveService.traceLogPath(now, entry.app, entry.id);
writeTraceLog(entry.traceLogPath, message.value.traceLog);
}

stream.write(writer.toJSON(entry));
stream.write('\n');
}
}
}

private void writeTraceLog(String traceLogPath, String content) throws IOException {
Path path = archiveService.initializeLogFilePath(traceLogPath);
Files.writeString(path, content, StandardCharsets.UTF_8, CREATE, APPEND);
}

private ActionLogEntry entry(ActionLogMessage message) {
var entry = new ActionLogEntry();
entry.id = message.id;
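With trace writing removed, the handler above is reduced to appending one JSON document per line (stream.write(writer.toJSON(entry)); stream.write('\n')). A self-contained sketch of that NDJSON append pattern — the class name and the `documents` list are illustrative stand-ins for the handler's writer.toJSON(entry) results:

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

public class NdjsonAppender {
    // append one serialized JSON document per line, the shape bulk loaders expect
    static void append(Path file, List<byte[]> documents) throws IOException {
        try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(file, CREATE, APPEND))) {
            for (byte[] json : documents) {
                stream.write(json);
                stream.write('\n');
            }
        }
    }
}
```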
@@ -2,17 +2,14 @@

import core.framework.crypto.Hash;
import core.framework.inject.Inject;
import core.framework.util.Files;
import core.framework.util.Network;
import core.framework.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.LocalDateTime;

import static java.nio.file.Files.createDirectories;
import static java.nio.file.Files.exists;
@@ -45,20 +42,6 @@ public void uploadArchive(LocalDate date) {
uploadService.upload(eventFilePath, eventPath);
}

Path traceLogDirPath = Path.of(logDir.toString(), Strings.format("/trace/{}", date));
if (exists(traceLogDirPath)) {
File[] appDirs = traceLogDirPath.toFile().listFiles(File::isDirectory);
if (appDirs != null) {
for (File appDir : appDirs) {
String app = appDir.getName();
String traceLogPath = Strings.format("/trace/{}/{}-{}-{}.tar.gz", date, app, date, hash);
Path traceLogFilePath = Path.of(logDir.toString(), traceLogPath);
shell.execute("tar", "-czf", traceLogFilePath.toString(), "-C", logDir.toString(), Strings.format("trace/{}/{}", date, app));
uploadService.upload(traceLogFilePath, traceLogPath);
}
}
}

logger.info("uploading end, date={}", date);
}

@@ -70,13 +53,6 @@ public void cleanupArchive(LocalDate date) {

Path eventFilePath = Path.of(logDir.toString(), eventPath(date));
shell.execute("rm", "-f", eventFilePath.toString());

Path traceLogDirPath = Path.of(logDir.toString(), Strings.format("/trace/{}", date));
if (exists(traceLogDirPath)) {
logger.info("delete trace logs, path={}", traceLogDirPath);
// use shell (rm -rf or find) may cause pod terminate with error code 137 on mounted disk
Files.deleteDir(traceLogDirPath);
}
}

public String actionLogPath(LocalDate date) {
@@ -87,10 +63,6 @@ public String eventPath(LocalDate date) {
return Strings.format("/event/{}/event-{}-{}.ndjson", date.getYear(), date, hash);
}

public String traceLogPath(LocalDateTime now, String app, String id) {
return Strings.format("/trace/{}/{}/{}/{}.txt", now.toLocalDate(), app, now.getHour(), id);
}

public Path initializeLogFilePath(String logPath) throws IOException {
Path path = Path.of(logDir.toString(), logPath);
Path parent = path.getParent();
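The deleted cleanup branch above deliberately avoided shelling out — the comment notes that `rm -rf`/`find` on a mounted disk could terminate the pod with exit code 137 — and called the framework's Files.deleteDir instead. A sketch of an equivalent in-process recursive delete with java.nio; this is an assumption about what deleteDir does, not the framework source:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class DeleteDir {
    // delete children before parents, entirely in-process (no child shell to be killed)
    static void deleteDir(Path dir) throws IOException {
        try (Stream<Path> tree = Files.walk(dir)) {
            tree.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```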
@@ -8,7 +8,6 @@
import org.junit.jupiter.api.condition.OS;

import java.time.LocalDate;
import java.time.LocalDateTime;

import static org.assertj.core.api.Assertions.assertThat;

@@ -41,13 +40,6 @@ void eventPath() {
.matches("/event/2022/event-2022-11-03-[a-z0-9]*.ndjson");
}

@Test
void traceArchivePath() {
LocalDateTime now = LocalDateTime.parse("2022-11-03T02:00:00");
assertThat(archiveService.traceLogPath(now, "service", "id"))
.isEqualTo("/trace/2022-11-03/service/2/id.txt");
}

@Test
@EnabledOnOs({OS.MAC, OS.LINUX})
void cleanupArchive() {
