From 5ecf4b58587abec9a0e1c6b02ab123d9fb91da92 Mon Sep 17 00:00:00 2001 From: Michael Vogt Date: Fri, 15 Nov 2024 15:56:55 +0100 Subject: [PATCH] osbuild: add support for jsonseq monitor/status message This commit adds support for scanning/parsing the jsonseq based monitoring that osbuild provides. This is useful for machine readable status reporting and progress information for e.g. `bootc-image-builder`. The API consists of a new `osbuild.Status` struct that contains parsed details about the osbuild status/progress and a scanner that can scan a stream of osbuild jsonseq based progress and emit `osbuild.Status` structs. Note that the status is broadly of two categories: 1. information amining at a user interface (like `status.Message` or `status.Progress`) 2. low-level details for logging/error reporting (like `status.Trace` and `status.Timestamp` Note that error handline a bit weak right now. The consumer of the API will not get an error via the monitor currently but only know that something went wrong from osbuild exiting and then the consumer needs to reconstruct the error log from the trace messages. This is not a show-stopper but we could improve this by merging https://github.com/osbuild/osbuild/pull/1831 and adding `Result` to the `statusJSON` and `Status` (but this needs more thinking and probably best done in followups). See https://github.com/osbuild/bootc-image-builder/compare/main...mvo5:progress3?expand=1 how this will be used. --- pkg/osbuild/export_test.go | 5 + pkg/osbuild/monitor.go | 180 +++++++++++++++++++++++++++++++ pkg/osbuild/monitor_json_test.go | 56 ++++++++++ pkg/osbuild/monitor_test.go | 118 ++++++++++++++++++++ 4 files changed, 359 insertions(+) create mode 100644 pkg/osbuild/export_test.go create mode 100644 pkg/osbuild/monitor.go create mode 100644 pkg/osbuild/monitor_json_test.go create mode 100644 pkg/osbuild/monitor_test.go diff --git a/pkg/osbuild/export_test.go b/pkg/osbuild/export_test.go new file mode 100644 index 0000000000..8376cc17fa --- /dev/null +++ b/pkg/osbuild/export_test.go @@ -0,0 +1,5 @@ +package osbuild + +type ( + StatusJSON = statusJSON +) diff --git a/pkg/osbuild/monitor.go b/pkg/osbuild/monitor.go new file mode 100644 index 0000000000..72c8fdb612 --- /dev/null +++ b/pkg/osbuild/monitor.go @@ -0,0 +1,180 @@ +package osbuild + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + "time" +) + +// Status is a high level aggregation of the low-level osbuild monitor +// messages. It is more structured and meant to be used by UI frontends. +// +// this is intentionally minimal at the beginning until we figure +// out the best API, exposing the jsonseq direct feels too messy +// and based on what we learn here we may consider tweaking +// the osbuild progress +type Status struct { + // Trace contains a single log line, usually very low-level or + // stage output but useful for e.g. bug reporting. Should in + // general not be displayed to the user but the concatenation + // of all "trace" lines should give the same information as + // running osbuild on a terminal + Trace string + + // Message contains a high level user-visible message about + // e.g. a startus change + Message string + + // Progress contains the current progress. + Progress *Progress + + // Timestamp contains the timestamp the message was recieved in + Timestamp time.Time +} + +// Progress provides progress information from an osbuild build. +// Each progress can have an arbitrary number of sub-progress information +// +// Note while those can be nested arbitrarly deep in practise +// we are at 2 levels currently: +// 1. overall pipeline progress +// 2. stages inside each pipeline +// +// we might get +// 3. stage progress (e.g. rpm install progress) +// +// in the future +type Progress struct { + // A human readable message about what is going on + Message string + // The amount of work already done + Done int + // The total amount of work for this (sub)progress + Total int + + SubProgress *Progress +} + +// NewStatusScanner returns a StatusScanner that can parse osbuild +// jsonseq monitor status messages +func NewStatusScanner(r io.Reader) *StatusScanner { + return &StatusScanner{ + scanner: bufio.NewScanner(r), + contextMap: make(map[string]*contextJSON), + stageContextMap: make(map[string]*stageContextJSON), + } +} + +// StatusScanner scan scan the osbuild jsonseq monitor output +type StatusScanner struct { + scanner *bufio.Scanner + contextMap map[string]*contextJSON + stageContextMap map[string]*stageContextJSON +} + +// Status returns a single status struct from the scanner or nil +// if the end of the status reporting is reached. +func (sr *StatusScanner) Status() (*Status, error) { + if !sr.scanner.Scan() { + return nil, sr.scanner.Err() + } + + var status statusJSON + line := sr.scanner.Bytes() + line = bytes.Trim(line, "\x1e") + if err := json.Unmarshal(line, &status); err != nil { + return nil, fmt.Errorf("cannot scan line %q: %w", line, err) + } + // keep track of the context + id := status.Context.ID + context := sr.contextMap[id] + if context == nil { + sr.contextMap[id] = &status.Context + context = &status.Context + } + ts := time.UnixMilli(int64(status.Timestamp * 1000)) + pipelineName := context.Pipeline.Name + + var trace, msg string + // This is a convention, "osbuild.montior" sends the high level + // status, the other messages contain low-level stdout/stderr + // output from individual stages like "org.osbuild.rpm". + if context.Origin == "osbuild.monitor" { + msg = strings.TrimSpace(status.Message) + } else { + trace = strings.TrimSpace(status.Message) + } + + st := &Status{ + Trace: trace, + Message: msg, + Progress: &Progress{ + Done: status.Progress.Done, + Total: status.Progress.Total, + Message: fmt.Sprintf("Pipeline %s", pipelineName), + }, + Timestamp: ts, + } + + // add subprogress + stageID := context.Pipeline.Stage.ID + stageContext := sr.stageContextMap[stageID] + if stageContext == nil { + sr.stageContextMap[id] = &context.Pipeline.Stage + stageContext = &context.Pipeline.Stage + } + stageName := fmt.Sprintf("Stage %s", stageContext.Name) + prog := st.Progress + for subProg := status.Progress.SubProgress; subProg != nil; subProg = subProg.SubProgress { + prog.SubProgress = &Progress{ + Done: subProg.Done, + Total: subProg.Total, + Message: stageName, + } + prog = prog.SubProgress + } + + return st, nil +} + +// statusJSON is a single status entry from the osbuild monitor +type statusJSON struct { + Context contextJSON `json:"context"` + Progress progressJSON `json:"progress"` + // Add "Result" here once + // https://github.com/osbuild/osbuild/pull/1831 is merged + + Message string `json:"message"` + Timestamp float64 `json:"timestamp"` +} + +// contextJSON is the context for which a status is given. Once a context +// was sent to the user from then on it is only referenced by the ID +type contextJSON struct { + Origin string `json:"origin"` + ID string `json:"id"` + Pipeline struct { + ID string `json:"id"` + Name string `json:"name"` + Stage stageContextJSON `json:"stage"` + } `json:"pipeline"` +} + +type stageContextJSON struct { + Name string `json:"name"` + ID string `json:"id"` +} + +// progress is the progress information associcated with a given status. +// The details about nesting are the same as for "Progress" above. +type progressJSON struct { + Name string `json:"name"` + Total int `json:"total"` + Done int `json:"done"` + + SubProgress *progressJSON `json:"progress"` +} diff --git a/pkg/osbuild/monitor_json_test.go b/pkg/osbuild/monitor_json_test.go new file mode 100644 index 0000000000..821a14a5f0 --- /dev/null +++ b/pkg/osbuild/monitor_json_test.go @@ -0,0 +1,56 @@ +package osbuild_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/osbuild/images/pkg/osbuild" +) + +const osbuildMonitorJSON_1 = ` +{ + "message": "Top level message", + "context": { + "origin": "osbuild.monitor", + "pipeline": { + "name": "source org.osbuild.curl", + "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", + "stage": {} + }, + "id": "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2" + }, + "timestamp": 1731585664.9090264, + "progress": { + "name": "name", + "total": 4, + "done": 1, + "progress": { + "name": "nested-name", + "total": 8, + "done": 2, + "progress": { + "name": "nested-nested-name", + "total": 16, + "done": 4 + } + } + } +}` + +func TestOsbuildStatusNestingWorks(t *testing.T) { + var status osbuild.StatusJSON + + err := json.Unmarshal([]byte(osbuildMonitorJSON_1), &status) + assert.NoError(t, err) + assert.Equal(t, "Top level message", status.Message) + assert.Equal(t, "name", status.Progress.Name) + assert.Equal(t, "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2", status.Context.ID) + assert.Equal(t, 4, status.Progress.Total) + assert.Equal(t, "nested-name", status.Progress.SubProgress.Name) + assert.Equal(t, 8, status.Progress.SubProgress.Total) + assert.Equal(t, "nested-nested-name", status.Progress.SubProgress.SubProgress.Name) + assert.Equal(t, 16, status.Progress.SubProgress.SubProgress.Total) + assert.Nil(t, status.Progress.SubProgress.SubProgress.SubProgress) +} diff --git a/pkg/osbuild/monitor_test.go b/pkg/osbuild/monitor_test.go new file mode 100644 index 0000000000..62a0cba304 --- /dev/null +++ b/pkg/osbuild/monitor_test.go @@ -0,0 +1,118 @@ +package osbuild_test + +import ( + "bytes" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/osbuild/images/pkg/osbuild" +) + +const osbuildMonitorLines_curl = `{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm\n", "context": {"origin": "org.osbuild", "pipeline": {"name": "source org.osbuild.curl", "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", "stage": {}}, "id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8252223} +{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm\n", "context": {"id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8256931} +{"message": "Starting pipeline build", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {}}, "id": "0020bdf60135d4a03d8db333f66d40386278bf55b39fd06ed18839da11d98f96"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 2, "done": 0}}, "timestamp": 1731589407.0338647}` + +func TestScannerSimple(t *testing.T) { + ts1 := 1731589338.8252223 * 1000 + ts2 := 1731589338.8256931 * 1000 + ts3 := 1731589407.0338647 * 1000 + + r := bytes.NewBufferString(osbuildMonitorLines_curl) + scanner := osbuild.NewStatusScanner(r) + // first line + st, err := scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm", + Progress: &osbuild.Progress{ + Done: 0, + Total: 4, + Message: "Pipeline source org.osbuild.curl", + }, + Timestamp: time.UnixMilli(int64(ts1)), + }, st) + // second line + st, err = scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm", + Progress: &osbuild.Progress{ + Done: 0, + Total: 4, + Message: "Pipeline source org.osbuild.curl", + }, + Timestamp: time.UnixMilli(int64(ts2)), + }, st) + // third line + st, err = scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Message: "Starting pipeline build", + Progress: &osbuild.Progress{ + Done: 1, + Total: 4, + Message: "Pipeline build", + SubProgress: &osbuild.Progress{ + Message: "Stage ", + Done: 0, + Total: 2, + }, + }, + Timestamp: time.UnixMilli(int64(ts3)), + }, st) + // end + st, err = scanner.Status() + assert.NoError(t, err) + assert.Nil(t, st) +} + +const osbuildMontiorLines_subprogress = `{"message": "Starting module org.osbuild.rpm", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {"name": "org.osbuild.rpm", "id": "bf00d0e1e216ffb796de06a1a7e9bb947d5a357f3f18ffea41a5611ee3ee0eac"}}, "id": "04c5aad63ba70bc39df10ad208cff66a108e44458e44eea41b305aee7a533877"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 8, "done": 2, "progress": {"name": "sub-sub-progress", "total": 16, "done": 4}}}, "timestamp": 1731600115.148399} +` + +func TestScannerSubprogress(t *testing.T) { + ts1 := 1731600115.14839 * 1000 + + r := bytes.NewBufferString(osbuildMontiorLines_subprogress) + scanner := osbuild.NewStatusScanner(r) + st, err := scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Message: "Starting module org.osbuild.rpm", + Progress: &osbuild.Progress{ + Done: 1, + Total: 4, + Message: "Pipeline build", + SubProgress: &osbuild.Progress{ + Done: 2, + Total: 8, + Message: "Stage org.osbuild.rpm", + SubProgress: &osbuild.Progress{ + Done: 4, + Total: 16, + Message: "Stage org.osbuild.rpm", + }, + }, + }, + Timestamp: time.UnixMilli(int64(ts1)), + }, st) +} + +func TestScannerSmoke(t *testing.T) { + f, err := os.Open("../../test/data/osbuild-monitor-output.json") + require.NoError(t, err) + defer f.Close() + + scanner := osbuild.NewStatusScanner(f) + for { + st, err := scanner.Status() + assert.NoError(t, err) + if st == nil { + break + } + assert.NotEqual(t, time.Time{}, st.Timestamp) + } +}