Skip to content

Commit

Permalink
feat: parse JSON body from pod log files (#122)
Browse files Browse the repository at this point in the history
Co-authored-by: CircleCI <[email protected]>
  • Loading branch information
harelmo-lumigo and CircleCI authored Nov 11, 2024
1 parent efbf81c commit 9bdd228
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 121 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
{{- /*
clusterAgent.otelCollectorConfig renders the OpenTelemetry Collector
configuration used by the cluster-agent DaemonSet to tail pod log files,
parse them (Docker / CRI-O / containerd line formats, plus a best-effort
JSON body parse), enrich them with Kubernetes metadata extracted from the
file path, and ship them to Lumigo via OTLP/HTTP.
Debug exporter/telemetry are enabled when .Values.debug.enabled is true.
*/ -}}
{{- define "clusterAgent.otelCollectorConfig" -}}
receivers:
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    exclude:
      - /var/log/pods/kube-system_*/*/*.log
      - /var/log/pods/lumigo-system_*/*/*.log
      # Exclude logs from the otlp-sink in tests (Kind cluster)
      - /var/log/pods/local-path-storage_*/*/*.log
      - /var/log/pods/otlp-sink_*/*/*.log
    start_at: end
    include_file_path: true
    include_file_name: false
    operators:
      # Find out which format is used by kubernetes
      - type: router
        id: get-format
        routes:
          - output: parser-docker
            expr: 'body matches "^\\{"'
          - output: parser-crio
            expr: 'body matches "^[^ Z]+ "'
          - output: parser-containerd
            expr: 'body matches "^[^ Z]+Z"'
      # Parse CRI-O format
      - type: regex_parser
        id: parser-crio
        regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
        output: overwrite-log-body
        timestamp:
          parse_from: attributes.time
          layout_type: gotime
          layout: '2006-01-02T15:04:05.999999999Z07:00'
      # Parse CRI-Containerd format
      - type: regex_parser
        id: parser-containerd
        regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
        output: overwrite-log-body
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      # Parse Docker format
      - type: json_parser
        id: parser-docker
        output: overwrite-log-body
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
      # Replace the record body with the extracted container log line
      - type: move
        id: overwrite-log-body
        from: attributes["log"]
        to: body
      # best effort attempt to parse the body as a stringified JSON object
      # (send_quiet keeps non-JSON lines flowing through without error noise)
      - type: json_parser
        parse_to: body
        on_error: send_quiet
      # Extract metadata from file path
      - type: regex_parser
        id: extract-metadata-from-filepath
        regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
        parse_from: attributes["log.file.path"]
        cache:
          size: 128 # default maximum amount of Pods per Node is 110
      # Rename attributes
      - type: move
        from: attributes.stream
        to: attributes["log.iostream"]
      - type: move
        from: attributes.container_name
        to: resource["k8s.container.name"]
      - type: move
        from: attributes.namespace
        to: resource["k8s.namespace.name"]
      - type: move
        from: attributes.pod_name
        to: resource["k8s.pod.name"]
      - type: move
        from: attributes.restart_count
        to: resource["k8s.container.restart_count"]
      - type: move
        from: attributes.uid
        to: resource["k8s.pod.uid"]
exporters:
  otlphttp:
    endpoint: ${env:LUMIGO_LOGS_ENDPOINT}
    headers:
      Authorization: LumigoToken ${env:LUMIGO_TOKEN}
{{- if .Values.debug.enabled }}
  debug:
    verbosity: detailed
{{- end }}
processors:
  transform:
    log_statements:
      - context: log
        statements:
          - set(instrumentation_scope.name, "lumigo-operator.log_file_collector")
          - set(resource.attributes["k8s.cluster.name"], "${env:KUBERNETES_CLUSTER_NAME}")
  batch:
service:
{{- if .Values.debug.enabled }}
  telemetry:
    logs:
      level: debug
{{- end }}
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - transform
        - batch
      exporters:
        - otlphttp
{{- if .Values.debug.enabled }}
        - debug
{{- end }}
{{- end }}
118 changes: 2 additions & 116 deletions charts/lumigo-operator/templates/cluster-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,122 +4,8 @@ kind: ConfigMap
metadata:
name: {{ include "helm.fullname" . }}-cluster-agent-config
data:
otel-collector-config.yaml: |
receivers:
filelog:
include:
- /var/log/pods/*/*/*.log
exclude:
- /var/log/pods/kube-system_*/*/*.log
- /var/log/pods/lumigo-system_*/*/*.log
# Exclude logs from the otlp-sink in tests (Kind cluster)
- /var/log/pods/local-path-storage_*/*/*.log
- /var/log/pods/otlp-sink_*/*/*.log
start_at: end
include_file_path: true
include_file_name: false
operators:
# Find out which format is used by kubernetes
- type: router
id: get-format
routes:
- output: parser-docker
expr: 'body matches "^\\{"'
- output: parser-crio
expr: 'body matches "^[^ Z]+ "'
- output: parser-containerd
expr: 'body matches "^[^ Z]+Z"'
# Parse CRI-O format
- type: regex_parser
id: parser-crio
regex:
'^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
output: overwrite-log-body
timestamp:
parse_from: attributes.time
layout_type: gotime
layout: '2006-01-02T15:04:05.999999999Z07:00'
# Parse CRI-Containerd format
- type: regex_parser
id: parser-containerd
regex:
'^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
output: overwrite-log-body
timestamp:
parse_from: attributes.time
layout: '%Y-%m-%dT%H:%M:%S.%LZ'
# Parse Docker format
- type: json_parser
id: parser-docker
output: overwrite-log-body
timestamp:
parse_from: attributes.time
layout: '%Y-%m-%dT%H:%M:%S.%LZ'
- type: move
id: overwrite-log-body
from: attributes["log"]
to: body
# Extract metadata from file path
- type: regex_parser
id: extract-metadata-from-filepath
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
parse_from: attributes["log.file.path"]
cache:
size: 128 # default maximum amount of Pods per Node is 110
# Rename attributes
- type: move
from: attributes.stream
to: attributes["log.iostream"]
- type: move
from: attributes.container_name
to: resource["k8s.container.name"]
- type: move
from: attributes.namespace
to: resource["k8s.namespace.name"]
- type: move
from: attributes.pod_name
to: resource["k8s.pod.name"]
- type: move
from: attributes.restart_count
to: resource["k8s.container.restart_count"]
- type: move
from: attributes.uid
to: resource["k8s.pod.uid"]
exporters:
otlphttp:
endpoint: ${env:LUMIGO_LOGS_ENDPOINT}
headers:
Authorization: LumigoToken ${env:LUMIGO_TOKEN}
{{- if .Values.debug.enabled }}
debug:
verbosity: detailed
{{- end }}
processors:
transform:
log_statements:
- context: log
statements:
- set(instrumentation_scope.name, "lumigo-operator.log_file_collector")
- set(resource.attributes["k8s.cluster.name"], "${env:KUBERNETES_CLUSTER_NAME}")
batch:
service:
{{- if .Values.debug.enabled }}
telemetry:
logs:
level: debug
{{- end }}
pipelines:
logs:
receivers:
- filelog
processors:
- transform
- batch
exporters:
- otlphttp
{{- if .Values.debug.enabled }}
- debug
{{- end }}
otel-collector-config.yaml:
{{ include "clusterAgent.otelCollectorConfig" . | toYaml | nindent 4 }}
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
2 changes: 1 addition & 1 deletion tests/kubernetes-distros/kind/apps/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ COPY ./app.py /code/app.py

LABEL distro-version='DEV'

CMD ["python", "app.py", "something to say to the logs"]
CMD ["python", "app.py", "something to say to the logs", "json"]
5 changes: 4 additions & 1 deletion tests/kubernetes-distros/kind/apps/python/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
import time
import logging
import json
from lumigo_opentelemetry import logger_provider

logger = logging.getLogger("test")
Expand All @@ -12,6 +13,8 @@
logger.addHandler(console_handler)

while True:
logger.info(sys.argv[1] if len(sys.argv) > 1 else "Hello, World!")
message = sys.argv[1] if len(sys.argv) > 1 else "Hello, World!"
formatter = json.dumps if len(sys.argv) > 2 and sys.argv[2] == "json" else str
logger.info(formatter({"message": message}))
logger_provider.force_flush()
time.sleep(5)
17 changes: 14 additions & 3 deletions tests/kubernetes-distros/kind/lumigooperator_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

operatorv1alpha1 "github.com/lumigo-io/lumigo-kubernetes-operator/api/v1alpha1"
"github.com/lumigo-io/lumigo-kubernetes-operator/tests/kubernetes-distros/kind/internal"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
apimachinerywait "k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -522,17 +523,27 @@ func TestLumigoOperatorLogsEventsAndObjects(t *testing.T) {
return false, nil
}

foundFileLogRecord := false
for _, appLog := range applicationLogs {
value, found := appLog.Attributes().Get("log.file.path")
// Make sure the log came from log files and not the distro,
// as the logs.json file is shared between several tests reporting to the same OTLP sink
value, found := appLog.Attributes().Get("log.file.path")
if found && strings.HasPrefix(value.AsString(), "/var/log/pods/") {
foundFileLogRecord = true
t.Logf("Found application log: %s", appLog.Body().AsString())
return true, nil
// Make sure the log body was parsed as JSON by the cluster-agent collector
if appLog.Body().Type() == pcommon.ValueTypeMap {
t.Logf("Found a JSON-object application log: %v", appLog.Body().Map())
return true, nil
}
}
}

t.Fatal("No application logs found matching the 'log.file.path' attribute")
if (foundFileLogRecord) {
t.Fatalf("No application logs found with a JSON-object body")
} else {
t.Fatalf("No application logs found matching the 'log.file.path' attribute")
}

return false, nil
}); err != nil {
Expand Down

0 comments on commit 9bdd228

Please sign in to comment.