Skip to content

Commit

Permalink
osbuild: add support for jsonseq monitor/status message
Browse files Browse the repository at this point in the history
This commit adds support for scanning/parsing the jsonseq based
monitoring that osbuild provides. This is useful for machine
readable status reporting and progress information for e.g.
`bootc-image-builder`.

The API consists of a new `osbuild.Status` struct that contains
parsed details about the osbuild status/progress and a scanner
that can scan a stream of osbuild jsonseq based progress and
emit `osbuild.Status` structs.

Note that the status is broadly of two categories:
1. information amining at a user interface (like
   `status.Message` or `status.Progress`)
2. low-level details for logging/error reporting (like
   `status.Trace` and `status.Timestamp`

Note that error handline a bit weak right now. The consumer
of the API will not get an error via the monitor currently
but only know that something went wrong from osbuild exiting
and then the consumer needs to reconstruct the error log from
the trace messages. This is not a show-stopper but we could
improve this by merging osbuild/osbuild#1831
and adding `Result` to the `statusJSON` and `Status` (but this
needs more thinking and probably best done in followups).

See https://github.com/osbuild/bootc-image-builder/compare/main...mvo5:progress3?expand=1 how this will be used.
  • Loading branch information
mvo5 committed Dec 5, 2024
1 parent ec1a983 commit 5ecf4b5
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/osbuild/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package osbuild

type (
StatusJSON = statusJSON
)
180 changes: 180 additions & 0 deletions pkg/osbuild/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package osbuild

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"time"
)

// Status is a high level aggregation of the low-level osbuild monitor
// messages. It is more structured and meant to be used by UI frontends.
//
// this is intentionally minimal at the beginning until we figure
// out the best API, exposing the jsonseq direct feels too messy
// and based on what we learn here we may consider tweaking
// the osbuild progress
type Status struct {
// Trace contains a single log line, usually very low-level or
// stage output but useful for e.g. bug reporting. Should in
// general not be displayed to the user but the concatenation
// of all "trace" lines should give the same information as
// running osbuild on a terminal
Trace string

// Message contains a high level user-visible message about
// e.g. a startus change
Message string

// Progress contains the current progress.
Progress *Progress

// Timestamp contains the timestamp the message was recieved in
Timestamp time.Time
}

// Progress provides progress information from an osbuild build.
// Each progress can have an arbitrary number of sub-progress information
//
// Note while those can be nested arbitrarly deep in practise
// we are at 2 levels currently:
// 1. overall pipeline progress
// 2. stages inside each pipeline
//
// we might get
// 3. stage progress (e.g. rpm install progress)
//
// in the future
type Progress struct {
// A human readable message about what is going on
Message string
// The amount of work already done
Done int
// The total amount of work for this (sub)progress
Total int

SubProgress *Progress
}

// NewStatusScanner returns a StatusScanner that can parse osbuild
// jsonseq monitor status messages
func NewStatusScanner(r io.Reader) *StatusScanner {
return &StatusScanner{
scanner: bufio.NewScanner(r),
contextMap: make(map[string]*contextJSON),
stageContextMap: make(map[string]*stageContextJSON),
}
}

// StatusScanner scan scan the osbuild jsonseq monitor output
type StatusScanner struct {
scanner *bufio.Scanner
contextMap map[string]*contextJSON
stageContextMap map[string]*stageContextJSON
}

// Status returns a single status struct from the scanner or nil
// if the end of the status reporting is reached.
func (sr *StatusScanner) Status() (*Status, error) {
if !sr.scanner.Scan() {
return nil, sr.scanner.Err()
}

var status statusJSON
line := sr.scanner.Bytes()
line = bytes.Trim(line, "\x1e")
if err := json.Unmarshal(line, &status); err != nil {
return nil, fmt.Errorf("cannot scan line %q: %w", line, err)
}
// keep track of the context
id := status.Context.ID
context := sr.contextMap[id]
if context == nil {
sr.contextMap[id] = &status.Context
context = &status.Context
}
ts := time.UnixMilli(int64(status.Timestamp * 1000))
pipelineName := context.Pipeline.Name

var trace, msg string
// This is a convention, "osbuild.montior" sends the high level
// status, the other messages contain low-level stdout/stderr
// output from individual stages like "org.osbuild.rpm".
if context.Origin == "osbuild.monitor" {
msg = strings.TrimSpace(status.Message)
} else {
trace = strings.TrimSpace(status.Message)
}

st := &Status{
Trace: trace,
Message: msg,
Progress: &Progress{
Done: status.Progress.Done,
Total: status.Progress.Total,
Message: fmt.Sprintf("Pipeline %s", pipelineName),
},
Timestamp: ts,
}

// add subprogress
stageID := context.Pipeline.Stage.ID
stageContext := sr.stageContextMap[stageID]
if stageContext == nil {
sr.stageContextMap[id] = &context.Pipeline.Stage
stageContext = &context.Pipeline.Stage
}
stageName := fmt.Sprintf("Stage %s", stageContext.Name)
prog := st.Progress
for subProg := status.Progress.SubProgress; subProg != nil; subProg = subProg.SubProgress {
prog.SubProgress = &Progress{
Done: subProg.Done,
Total: subProg.Total,
Message: stageName,
}
prog = prog.SubProgress
}

return st, nil
}

// statusJSON is a single status entry from the osbuild monitor
type statusJSON struct {
Context contextJSON `json:"context"`
Progress progressJSON `json:"progress"`
// Add "Result" here once
// https://github.com/osbuild/osbuild/pull/1831 is merged

Message string `json:"message"`
Timestamp float64 `json:"timestamp"`
}

// contextJSON is the context for which a status is given. Once a context
// was sent to the user from then on it is only referenced by the ID
type contextJSON struct {
Origin string `json:"origin"`
ID string `json:"id"`
Pipeline struct {
ID string `json:"id"`
Name string `json:"name"`
Stage stageContextJSON `json:"stage"`
} `json:"pipeline"`
}

type stageContextJSON struct {
Name string `json:"name"`
ID string `json:"id"`
}

// progress is the progress information associcated with a given status.
// The details about nesting are the same as for "Progress" above.
type progressJSON struct {
Name string `json:"name"`
Total int `json:"total"`
Done int `json:"done"`

SubProgress *progressJSON `json:"progress"`
}
56 changes: 56 additions & 0 deletions pkg/osbuild/monitor_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package osbuild_test

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"

"github.com/osbuild/images/pkg/osbuild"
)

const osbuildMonitorJSON_1 = `
{
"message": "Top level message",
"context": {
"origin": "osbuild.monitor",
"pipeline": {
"name": "source org.osbuild.curl",
"id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c",
"stage": {}
},
"id": "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2"
},
"timestamp": 1731585664.9090264,
"progress": {
"name": "name",
"total": 4,
"done": 1,
"progress": {
"name": "nested-name",
"total": 8,
"done": 2,
"progress": {
"name": "nested-nested-name",
"total": 16,
"done": 4
}
}
}
}`

func TestOsbuildStatusNestingWorks(t *testing.T) {
var status osbuild.StatusJSON

err := json.Unmarshal([]byte(osbuildMonitorJSON_1), &status)
assert.NoError(t, err)
assert.Equal(t, "Top level message", status.Message)
assert.Equal(t, "name", status.Progress.Name)
assert.Equal(t, "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2", status.Context.ID)
assert.Equal(t, 4, status.Progress.Total)
assert.Equal(t, "nested-name", status.Progress.SubProgress.Name)
assert.Equal(t, 8, status.Progress.SubProgress.Total)
assert.Equal(t, "nested-nested-name", status.Progress.SubProgress.SubProgress.Name)
assert.Equal(t, 16, status.Progress.SubProgress.SubProgress.Total)
assert.Nil(t, status.Progress.SubProgress.SubProgress.SubProgress)
}
118 changes: 118 additions & 0 deletions pkg/osbuild/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package osbuild_test

import (
"bytes"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/osbuild/images/pkg/osbuild"
)

const osbuildMonitorLines_curl = `{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm\n", "context": {"origin": "org.osbuild", "pipeline": {"name": "source org.osbuild.curl", "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", "stage": {}}, "id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8252223}
{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm\n", "context": {"id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8256931}
{"message": "Starting pipeline build", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {}}, "id": "0020bdf60135d4a03d8db333f66d40386278bf55b39fd06ed18839da11d98f96"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 2, "done": 0}}, "timestamp": 1731589407.0338647}`

func TestScannerSimple(t *testing.T) {
ts1 := 1731589338.8252223 * 1000
ts2 := 1731589338.8256931 * 1000
ts3 := 1731589407.0338647 * 1000

r := bytes.NewBufferString(osbuildMonitorLines_curl)
scanner := osbuild.NewStatusScanner(r)
// first line
st, err := scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm",
Progress: &osbuild.Progress{
Done: 0,
Total: 4,
Message: "Pipeline source org.osbuild.curl",
},
Timestamp: time.UnixMilli(int64(ts1)),
}, st)
// second line
st, err = scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm",
Progress: &osbuild.Progress{
Done: 0,
Total: 4,
Message: "Pipeline source org.osbuild.curl",
},
Timestamp: time.UnixMilli(int64(ts2)),
}, st)
// third line
st, err = scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Message: "Starting pipeline build",
Progress: &osbuild.Progress{
Done: 1,
Total: 4,
Message: "Pipeline build",
SubProgress: &osbuild.Progress{
Message: "Stage ",
Done: 0,
Total: 2,
},
},
Timestamp: time.UnixMilli(int64(ts3)),
}, st)
// end
st, err = scanner.Status()
assert.NoError(t, err)
assert.Nil(t, st)
}

const osbuildMontiorLines_subprogress = `{"message": "Starting module org.osbuild.rpm", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {"name": "org.osbuild.rpm", "id": "bf00d0e1e216ffb796de06a1a7e9bb947d5a357f3f18ffea41a5611ee3ee0eac"}}, "id": "04c5aad63ba70bc39df10ad208cff66a108e44458e44eea41b305aee7a533877"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 8, "done": 2, "progress": {"name": "sub-sub-progress", "total": 16, "done": 4}}}, "timestamp": 1731600115.148399}
`

func TestScannerSubprogress(t *testing.T) {
ts1 := 1731600115.14839 * 1000

r := bytes.NewBufferString(osbuildMontiorLines_subprogress)
scanner := osbuild.NewStatusScanner(r)
st, err := scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Message: "Starting module org.osbuild.rpm",
Progress: &osbuild.Progress{
Done: 1,
Total: 4,
Message: "Pipeline build",
SubProgress: &osbuild.Progress{
Done: 2,
Total: 8,
Message: "Stage org.osbuild.rpm",
SubProgress: &osbuild.Progress{
Done: 4,
Total: 16,
Message: "Stage org.osbuild.rpm",
},
},
},
Timestamp: time.UnixMilli(int64(ts1)),
}, st)
}

func TestScannerSmoke(t *testing.T) {
f, err := os.Open("../../test/data/osbuild-monitor-output.json")
require.NoError(t, err)
defer f.Close()

scanner := osbuild.NewStatusScanner(f)
for {
st, err := scanner.Status()
assert.NoError(t, err)
if st == nil {
break
}
assert.NotEqual(t, time.Time{}, st.Timestamp)
}
}

0 comments on commit 5ecf4b5

Please sign in to comment.