diff --git a/pkg/osbuild/export_test.go b/pkg/osbuild/export_test.go new file mode 100644 index 0000000000..8376cc17fa --- /dev/null +++ b/pkg/osbuild/export_test.go @@ -0,0 +1,5 @@ +package osbuild + +type ( + StatusJSON = statusJSON +) diff --git a/pkg/osbuild/monitor.go b/pkg/osbuild/monitor.go new file mode 100644 index 0000000000..72c8fdb612 --- /dev/null +++ b/pkg/osbuild/monitor.go @@ -0,0 +1,180 @@ +package osbuild + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "strings" + "time" +) + +// Status is a high level aggregation of the low-level osbuild monitor +// messages. It is more structured and meant to be used by UI frontends. +// +// this is intentionally minimal at the beginning until we figure +// out the best API, exposing the jsonseq direct feels too messy +// and based on what we learn here we may consider tweaking +// the osbuild progress +type Status struct { + // Trace contains a single log line, usually very low-level or + // stage output but useful for e.g. bug reporting. Should in + // general not be displayed to the user but the concatenation + // of all "trace" lines should give the same information as + // running osbuild on a terminal + Trace string + + // Message contains a high level user-visible message about + // e.g. a startus change + Message string + + // Progress contains the current progress. + Progress *Progress + + // Timestamp contains the timestamp the message was recieved in + Timestamp time.Time +} + +// Progress provides progress information from an osbuild build. +// Each progress can have an arbitrary number of sub-progress information +// +// Note while those can be nested arbitrarly deep in practise +// we are at 2 levels currently: +// 1. overall pipeline progress +// 2. stages inside each pipeline +// +// we might get +// 3. stage progress (e.g. rpm install progress) +// +// in the future +type Progress struct { + // A human readable message about what is going on + Message string + // The amount of work already done + Done int + // The total amount of work for this (sub)progress + Total int + + SubProgress *Progress +} + +// NewStatusScanner returns a StatusScanner that can parse osbuild +// jsonseq monitor status messages +func NewStatusScanner(r io.Reader) *StatusScanner { + return &StatusScanner{ + scanner: bufio.NewScanner(r), + contextMap: make(map[string]*contextJSON), + stageContextMap: make(map[string]*stageContextJSON), + } +} + +// StatusScanner scan scan the osbuild jsonseq monitor output +type StatusScanner struct { + scanner *bufio.Scanner + contextMap map[string]*contextJSON + stageContextMap map[string]*stageContextJSON +} + +// Status returns a single status struct from the scanner or nil +// if the end of the status reporting is reached. +func (sr *StatusScanner) Status() (*Status, error) { + if !sr.scanner.Scan() { + return nil, sr.scanner.Err() + } + + var status statusJSON + line := sr.scanner.Bytes() + line = bytes.Trim(line, "\x1e") + if err := json.Unmarshal(line, &status); err != nil { + return nil, fmt.Errorf("cannot scan line %q: %w", line, err) + } + // keep track of the context + id := status.Context.ID + context := sr.contextMap[id] + if context == nil { + sr.contextMap[id] = &status.Context + context = &status.Context + } + ts := time.UnixMilli(int64(status.Timestamp * 1000)) + pipelineName := context.Pipeline.Name + + var trace, msg string + // This is a convention, "osbuild.montior" sends the high level + // status, the other messages contain low-level stdout/stderr + // output from individual stages like "org.osbuild.rpm". + if context.Origin == "osbuild.monitor" { + msg = strings.TrimSpace(status.Message) + } else { + trace = strings.TrimSpace(status.Message) + } + + st := &Status{ + Trace: trace, + Message: msg, + Progress: &Progress{ + Done: status.Progress.Done, + Total: status.Progress.Total, + Message: fmt.Sprintf("Pipeline %s", pipelineName), + }, + Timestamp: ts, + } + + // add subprogress + stageID := context.Pipeline.Stage.ID + stageContext := sr.stageContextMap[stageID] + if stageContext == nil { + sr.stageContextMap[id] = &context.Pipeline.Stage + stageContext = &context.Pipeline.Stage + } + stageName := fmt.Sprintf("Stage %s", stageContext.Name) + prog := st.Progress + for subProg := status.Progress.SubProgress; subProg != nil; subProg = subProg.SubProgress { + prog.SubProgress = &Progress{ + Done: subProg.Done, + Total: subProg.Total, + Message: stageName, + } + prog = prog.SubProgress + } + + return st, nil +} + +// statusJSON is a single status entry from the osbuild monitor +type statusJSON struct { + Context contextJSON `json:"context"` + Progress progressJSON `json:"progress"` + // Add "Result" here once + // https://github.com/osbuild/osbuild/pull/1831 is merged + + Message string `json:"message"` + Timestamp float64 `json:"timestamp"` +} + +// contextJSON is the context for which a status is given. Once a context +// was sent to the user from then on it is only referenced by the ID +type contextJSON struct { + Origin string `json:"origin"` + ID string `json:"id"` + Pipeline struct { + ID string `json:"id"` + Name string `json:"name"` + Stage stageContextJSON `json:"stage"` + } `json:"pipeline"` +} + +type stageContextJSON struct { + Name string `json:"name"` + ID string `json:"id"` +} + +// progress is the progress information associcated with a given status. +// The details about nesting are the same as for "Progress" above. +type progressJSON struct { + Name string `json:"name"` + Total int `json:"total"` + Done int `json:"done"` + + SubProgress *progressJSON `json:"progress"` +} diff --git a/pkg/osbuild/monitor_json_test.go b/pkg/osbuild/monitor_json_test.go new file mode 100644 index 0000000000..821a14a5f0 --- /dev/null +++ b/pkg/osbuild/monitor_json_test.go @@ -0,0 +1,56 @@ +package osbuild_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/osbuild/images/pkg/osbuild" +) + +const osbuildMonitorJSON_1 = ` +{ + "message": "Top level message", + "context": { + "origin": "osbuild.monitor", + "pipeline": { + "name": "source org.osbuild.curl", + "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", + "stage": {} + }, + "id": "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2" + }, + "timestamp": 1731585664.9090264, + "progress": { + "name": "name", + "total": 4, + "done": 1, + "progress": { + "name": "nested-name", + "total": 8, + "done": 2, + "progress": { + "name": "nested-nested-name", + "total": 16, + "done": 4 + } + } + } +}` + +func TestOsbuildStatusNestingWorks(t *testing.T) { + var status osbuild.StatusJSON + + err := json.Unmarshal([]byte(osbuildMonitorJSON_1), &status) + assert.NoError(t, err) + assert.Equal(t, "Top level message", status.Message) + assert.Equal(t, "name", status.Progress.Name) + assert.Equal(t, "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2", status.Context.ID) + assert.Equal(t, 4, status.Progress.Total) + assert.Equal(t, "nested-name", status.Progress.SubProgress.Name) + assert.Equal(t, 8, status.Progress.SubProgress.Total) + assert.Equal(t, "nested-nested-name", status.Progress.SubProgress.SubProgress.Name) + assert.Equal(t, 16, status.Progress.SubProgress.SubProgress.Total) + assert.Nil(t, status.Progress.SubProgress.SubProgress.SubProgress) +} diff --git a/pkg/osbuild/monitor_test.go b/pkg/osbuild/monitor_test.go new file mode 100644 index 0000000000..62a0cba304 --- /dev/null +++ b/pkg/osbuild/monitor_test.go @@ -0,0 +1,118 @@ +package osbuild_test + +import ( + "bytes" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/osbuild/images/pkg/osbuild" +) + +const osbuildMonitorLines_curl = `{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm\n", "context": {"origin": "org.osbuild", "pipeline": {"name": "source org.osbuild.curl", "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", "stage": {}}, "id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8252223} +{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm\n", "context": {"id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8256931} +{"message": "Starting pipeline build", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {}}, "id": "0020bdf60135d4a03d8db333f66d40386278bf55b39fd06ed18839da11d98f96"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 2, "done": 0}}, "timestamp": 1731589407.0338647}` + +func TestScannerSimple(t *testing.T) { + ts1 := 1731589338.8252223 * 1000 + ts2 := 1731589338.8256931 * 1000 + ts3 := 1731589407.0338647 * 1000 + + r := bytes.NewBufferString(osbuildMonitorLines_curl) + scanner := osbuild.NewStatusScanner(r) + // first line + st, err := scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm", + Progress: &osbuild.Progress{ + Done: 0, + Total: 4, + Message: "Pipeline source org.osbuild.curl", + }, + Timestamp: time.UnixMilli(int64(ts1)), + }, st) + // second line + st, err = scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm", + Progress: &osbuild.Progress{ + Done: 0, + Total: 4, + Message: "Pipeline source org.osbuild.curl", + }, + Timestamp: time.UnixMilli(int64(ts2)), + }, st) + // third line + st, err = scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Message: "Starting pipeline build", + Progress: &osbuild.Progress{ + Done: 1, + Total: 4, + Message: "Pipeline build", + SubProgress: &osbuild.Progress{ + Message: "Stage ", + Done: 0, + Total: 2, + }, + }, + Timestamp: time.UnixMilli(int64(ts3)), + }, st) + // end + st, err = scanner.Status() + assert.NoError(t, err) + assert.Nil(t, st) +} + +const osbuildMontiorLines_subprogress = `{"message": "Starting module org.osbuild.rpm", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {"name": "org.osbuild.rpm", "id": "bf00d0e1e216ffb796de06a1a7e9bb947d5a357f3f18ffea41a5611ee3ee0eac"}}, "id": "04c5aad63ba70bc39df10ad208cff66a108e44458e44eea41b305aee7a533877"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 8, "done": 2, "progress": {"name": "sub-sub-progress", "total": 16, "done": 4}}}, "timestamp": 1731600115.148399} +` + +func TestScannerSubprogress(t *testing.T) { + ts1 := 1731600115.14839 * 1000 + + r := bytes.NewBufferString(osbuildMontiorLines_subprogress) + scanner := osbuild.NewStatusScanner(r) + st, err := scanner.Status() + assert.NoError(t, err) + assert.Equal(t, &osbuild.Status{ + Message: "Starting module org.osbuild.rpm", + Progress: &osbuild.Progress{ + Done: 1, + Total: 4, + Message: "Pipeline build", + SubProgress: &osbuild.Progress{ + Done: 2, + Total: 8, + Message: "Stage org.osbuild.rpm", + SubProgress: &osbuild.Progress{ + Done: 4, + Total: 16, + Message: "Stage org.osbuild.rpm", + }, + }, + }, + Timestamp: time.UnixMilli(int64(ts1)), + }, st) +} + +func TestScannerSmoke(t *testing.T) { + f, err := os.Open("../../test/data/osbuild-monitor-output.json") + require.NoError(t, err) + defer f.Close() + + scanner := osbuild.NewStatusScanner(f) + for { + st, err := scanner.Status() + assert.NoError(t, err) + if st == nil { + break + } + assert.NotEqual(t, time.Time{}, st.Timestamp) + } +}