Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

osbuildmonitor: add new package to monitor osbuild output #1047

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/osbuild/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package osbuild

type (
StatusJSON = statusJSON
)
180 changes: 180 additions & 0 deletions pkg/osbuild/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package osbuild

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"time"
)

// Status is a high level aggregation of the low-level osbuild monitor
// messages. It is more structured and meant to be used by UI frontends.
//
// this is intentionally minimal at the beginning until we figure
// out the best API, exposing the jsonseq direct feels too messy
// and based on what we learn here we may consider tweaking
// the osbuild progress
type Status struct {
// Trace contains a single log line, usually very low-level or
// stage output but useful for e.g. bug reporting. Should in
// general not be displayed to the user but the concatenation
// of all "trace" lines should give the same information as
// running osbuild on a terminal
Trace string

// Message contains a high level user-visible message about
// e.g. a startus change
Message string

// Progress contains the current progress.
Progress *Progress

// Timestamp contains the timestamp the message was recieved in
Timestamp time.Time
}

// Progress provides progress information from an osbuild build.
// Each progress can have an arbitrary number of sub-progress information
//
// Note while those can be nested arbitrarly deep in practise
// we are at 2 levels currently:
// 1. overall pipeline progress
// 2. stages inside each pipeline
//
// we might get
// 3. stage progress (e.g. rpm install progress)
//
// in the future
type Progress struct {
// A human readable message about what is going on
Message string
// The amount of work already done
Done int
// The total amount of work for this (sub)progress
Total int

SubProgress *Progress
}

// NewStatusScanner returns a StatusScanner that can parse osbuild
// jsonseq monitor status messages
func NewStatusScanner(r io.Reader) *StatusScanner {
return &StatusScanner{
scanner: bufio.NewScanner(r),
contextMap: make(map[string]*contextJSON),
stageContextMap: make(map[string]*stageContextJSON),
}
}

// StatusScanner scan scan the osbuild jsonseq monitor output
type StatusScanner struct {
scanner *bufio.Scanner
contextMap map[string]*contextJSON
stageContextMap map[string]*stageContextJSON
}

// Status returns a single status struct from the scanner or nil
// if the end of the status reporting is reached.
func (sr *StatusScanner) Status() (*Status, error) {
if !sr.scanner.Scan() {
return nil, sr.scanner.Err()
}

var status statusJSON
line := sr.scanner.Bytes()
line = bytes.Trim(line, "\x1e")
if err := json.Unmarshal(line, &status); err != nil {
return nil, fmt.Errorf("cannot scan line %q: %w", line, err)
}
// keep track of the context
id := status.Context.ID
context := sr.contextMap[id]
if context == nil {
sr.contextMap[id] = &status.Context
context = &status.Context
}
ts := time.UnixMilli(int64(status.Timestamp * 1000))
pipelineName := context.Pipeline.Name

var trace, msg string
// This is a convention, "osbuild.montior" sends the high level
// status, the other messages contain low-level stdout/stderr
// output from individual stages like "org.osbuild.rpm".
if context.Origin == "osbuild.monitor" {
msg = strings.TrimSpace(status.Message)
} else {
trace = strings.TrimSpace(status.Message)
}

st := &Status{
Trace: trace,
Message: msg,
Progress: &Progress{
Done: status.Progress.Done,
Total: status.Progress.Total,
Message: fmt.Sprintf("Pipeline %s", pipelineName),
},
Timestamp: ts,
}

// add subprogress
stageID := context.Pipeline.Stage.ID
stageContext := sr.stageContextMap[stageID]
if stageContext == nil {
sr.stageContextMap[id] = &context.Pipeline.Stage
stageContext = &context.Pipeline.Stage
}
stageName := fmt.Sprintf("Stage %s", stageContext.Name)
prog := st.Progress
for subProg := status.Progress.SubProgress; subProg != nil; subProg = subProg.SubProgress {
prog.SubProgress = &Progress{
Done: subProg.Done,
Total: subProg.Total,
Message: stageName,
}
prog = prog.SubProgress
}

return st, nil
}

// statusJSON is a single status entry from the osbuild monitor
type statusJSON struct {
Context contextJSON `json:"context"`
Progress progressJSON `json:"progress"`
// Add "Result" here once
// https://github.com/osbuild/osbuild/pull/1831 is merged

Message string `json:"message"`
Timestamp float64 `json:"timestamp"`
}

// contextJSON is the context for which a status is given. Once a context
// was sent to the user from then on it is only referenced by the ID
type contextJSON struct {
Origin string `json:"origin"`
ID string `json:"id"`
Pipeline struct {
ID string `json:"id"`
Name string `json:"name"`
Stage stageContextJSON `json:"stage"`
} `json:"pipeline"`
}

type stageContextJSON struct {
Name string `json:"name"`
ID string `json:"id"`
}

// progress is the progress information associcated with a given status.
// The details about nesting are the same as for "Progress" above.
type progressJSON struct {
Name string `json:"name"`
Total int `json:"total"`
Done int `json:"done"`

SubProgress *progressJSON `json:"progress"`
}
56 changes: 56 additions & 0 deletions pkg/osbuild/monitor_json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package osbuild_test

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"

"github.com/osbuild/images/pkg/osbuild"
)

const osbuildMonitorJSON_1 = `
{
"message": "Top level message",
"context": {
"origin": "osbuild.monitor",
"pipeline": {
"name": "source org.osbuild.curl",
"id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c",
"stage": {}
},
"id": "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2"
},
"timestamp": 1731585664.9090264,
"progress": {
"name": "name",
"total": 4,
"done": 1,
"progress": {
"name": "nested-name",
"total": 8,
"done": 2,
"progress": {
"name": "nested-nested-name",
"total": 16,
"done": 4
}
}
}
}`

func TestOsbuildStatusNestingWorks(t *testing.T) {
var status osbuild.StatusJSON

err := json.Unmarshal([]byte(osbuildMonitorJSON_1), &status)
assert.NoError(t, err)
assert.Equal(t, "Top level message", status.Message)
assert.Equal(t, "name", status.Progress.Name)
assert.Equal(t, "69816755441434713b7567970edfdd42d58193f163e1fdd506274d52246e87f2", status.Context.ID)
assert.Equal(t, 4, status.Progress.Total)
assert.Equal(t, "nested-name", status.Progress.SubProgress.Name)
assert.Equal(t, 8, status.Progress.SubProgress.Total)
assert.Equal(t, "nested-nested-name", status.Progress.SubProgress.SubProgress.Name)
assert.Equal(t, 16, status.Progress.SubProgress.SubProgress.Total)
assert.Nil(t, status.Progress.SubProgress.SubProgress.SubProgress)
}
118 changes: 118 additions & 0 deletions pkg/osbuild/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package osbuild_test

import (
"bytes"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/osbuild/images/pkg/osbuild"
)

const osbuildMonitorLines_curl = `{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm\n", "context": {"origin": "org.osbuild", "pipeline": {"name": "source org.osbuild.curl", "id": "598849389c35f93efe2412446f5ca6919434417b9bcea040ea5f9203de81db2c", "stage": {}}, "id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8252223}
{"message": "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm\n", "context": {"id": "7355d3857aa5c7b3a0c476c13d4b242a625fe190f2e7796df2335f3a34429db3"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 0}, "timestamp": 1731589338.8256931}
{"message": "Starting pipeline build", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {}}, "id": "0020bdf60135d4a03d8db333f66d40386278bf55b39fd06ed18839da11d98f96"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 2, "done": 0}}, "timestamp": 1731589407.0338647}`

func TestScannerSimple(t *testing.T) {
ts1 := 1731589338.8252223 * 1000
ts2 := 1731589338.8256931 * 1000
ts3 := 1731589407.0338647 * 1000

r := bytes.NewBufferString(osbuildMonitorLines_curl)
scanner := osbuild.NewStatusScanner(r)
// first line
st, err := scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/k/kpartx-0.9.5-2.fc39.x86_64.rpm",
Progress: &osbuild.Progress{
Done: 0,
Total: 4,
Message: "Pipeline source org.osbuild.curl",
},
Timestamp: time.UnixMilli(int64(ts1)),
}, st)
// second line
st, err = scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Trace: "source/org.osbuild.curl (org.osbuild.curl): Downloaded https://rpmrepo.osbuild.org/v2/mirror/public/f39/f39-x86_64-fedora-20231109/Packages/l/langpacks-fonts-en-4.0-9.fc39.noarch.rpm",
Progress: &osbuild.Progress{
Done: 0,
Total: 4,
Message: "Pipeline source org.osbuild.curl",
},
Timestamp: time.UnixMilli(int64(ts2)),
}, st)
// third line
st, err = scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Message: "Starting pipeline build",
Progress: &osbuild.Progress{
Done: 1,
Total: 4,
Message: "Pipeline build",
SubProgress: &osbuild.Progress{
Message: "Stage ",
Done: 0,
Total: 2,
},
},
Timestamp: time.UnixMilli(int64(ts3)),
}, st)
// end
st, err = scanner.Status()
assert.NoError(t, err)
assert.Nil(t, st)
}

const osbuildMontiorLines_subprogress = `{"message": "Starting module org.osbuild.rpm", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "build", "id": "32e87da44d9a519e89770723a33b7ecdd4ab85b872ae6ab8aaa94bdef9a275c7", "stage": {"name": "org.osbuild.rpm", "id": "bf00d0e1e216ffb796de06a1a7e9bb947d5a357f3f18ffea41a5611ee3ee0eac"}}, "id": "04c5aad63ba70bc39df10ad208cff66a108e44458e44eea41b305aee7a533877"}, "progress": {"name": "pipelines/sources", "total": 4, "done": 1, "progress": {"name": "pipeline: build", "total": 8, "done": 2, "progress": {"name": "sub-sub-progress", "total": 16, "done": 4}}}, "timestamp": 1731600115.148399}
`

func TestScannerSubprogress(t *testing.T) {
ts1 := 1731600115.14839 * 1000

r := bytes.NewBufferString(osbuildMontiorLines_subprogress)
scanner := osbuild.NewStatusScanner(r)
st, err := scanner.Status()
assert.NoError(t, err)
assert.Equal(t, &osbuild.Status{
Message: "Starting module org.osbuild.rpm",
Progress: &osbuild.Progress{
Done: 1,
Total: 4,
Message: "Pipeline build",
SubProgress: &osbuild.Progress{
Done: 2,
Total: 8,
Message: "Stage org.osbuild.rpm",
SubProgress: &osbuild.Progress{
Done: 4,
Total: 16,
Message: "Stage org.osbuild.rpm",
},
},
},
Timestamp: time.UnixMilli(int64(ts1)),
}, st)
}

func TestScannerSmoke(t *testing.T) {
f, err := os.Open("../../test/data/osbuild-monitor-output.json")
require.NoError(t, err)
defer f.Close()

scanner := osbuild.NewStatusScanner(f)
for {
st, err := scanner.Status()
assert.NoError(t, err)
if st == nil {
break
}
assert.NotEqual(t, time.Time{}, st.Timestamp)
}
}
Loading
Loading