Skip to content

Commit

Permalink
Merge pull request #301 from depot/feat/background-status-send
Browse files Browse the repository at this point in the history
feat: send --load messages in background task
  • Loading branch information
goller authored Oct 10, 2024
2 parents 6749db8 + 48d8af3 commit 54ffc06
Show file tree
Hide file tree
Showing 12 changed files with 712 additions and 388 deletions.
4 changes: 0 additions & 4 deletions pkg/buildx/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,10 +756,6 @@ type DepotNodeResponse struct {
SolveResponse *client.SolveResponse
}

func Build(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer, dockerfileCallback DockerfileCallback, build *depotbuild.Build) (resp []DepotBuildResponse, err error) {
return BuildWithResultHandler(ctx, nodes, opt, docker, configDir, w, dockerfileCallback, nil, false, build)
}

func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[string]Options, docker *dockerutil.Client, configDir string, w progress.Writer, dockerfileCallback DockerfileCallback, resultHandleFunc func(driverIndex int, rCtx *ResultContext), allowNoOutput bool, build *depotbuild.Build) (resp []DepotBuildResponse, err error) {
if len(nodes) == 0 {
return nil, errors.Errorf("driver required for build")
Expand Down
3 changes: 2 additions & 1 deletion pkg/buildx/commands/bake.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func RunBake(dockerCli command.Cli, in BakeOptions, validator BakeValidator, pri
var err error
// Only load images from requested targets to avoid pulling unnecessary images.
if slices.Contains(requestedTargets, resp[i].Name) {
reportingPrinter := progresshelper.NewReportingWriter(printer, in.buildID, in.token)
reportingPrinter := progresshelper.NewReporter(ctx2, printer, in.buildID, in.token)
defer reportingPrinter.Close()
err = load.DepotFastLoad(ctx2, dockerCli.Client(), depotResponses, pullOpts, reportingPrinter)
}
load.DeleteExportLeases(ctx2, depotResponses)
Expand Down
4 changes: 3 additions & 1 deletion pkg/buildx/commands/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, nodes []builder.No
}

// NOTE: the err is returned at the end of this function after the final prints.
reportingPrinter := progresshelper.NewReportingWriter(printer, depotOpts.buildID, depotOpts.token)
reportingPrinter := progresshelper.NewReporter(ctx, printer, depotOpts.buildID, depotOpts.token)
err = load.DepotFastLoad(ctx, dockerCli.Client(), resp, pullOpts, reportingPrinter)
if err != nil && !errors.Is(err, context.Canceled) {
// For now, we will fallback by rebuilding with load.
Expand All @@ -342,6 +342,8 @@ func buildTargets(ctx context.Context, dockerCli command.Cli, nodes []builder.No
}
}
}
reportingPrinter.Close()

load.DeleteExportLeases(ctx, resp)

if err := printer.Wait(); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/buildxdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (d *Driver) Bootstrap(ctx context.Context, reporter progress.Logger) error
}
d.cfg.Auth = depotbuild.NewAuthProvider(credentials, d.cfg.Auth)
}

reportingLogger := progresshelper.NewReportingLogger(reporter, buildID, token)
reportingLogger := progresshelper.NewReporterFromLogger(ctx, reporter, buildID, token)
defer reportingLogger.Close()

message := "[depot] launching " + platform + " machine"

Expand Down Expand Up @@ -137,10 +137,10 @@ func (d *Driver) Version(ctx context.Context) (string, error) {
return "", nil
}

func StartLog(message string, logger progress.Logger) func(err error) {
func StartLog(message string, logger *progresshelper.Reporter) func(err error) {
dgst := digest.FromBytes([]byte(identity.NewID()))
tm := time.Now()
logger(&client.SolveStatus{
logger.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{{
Digest: dgst,
Name: message,
Expand All @@ -154,7 +154,7 @@ func StartLog(message string, logger progress.Logger) func(err error) {
if err != nil {
errMsg = err.Error()
}
logger(&client.SolveStatus{
logger.Write(&client.SolveStatus{
Vertexes: []*client.Vertex{{
Digest: dgst,
Name: message,
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/buildctl/dial-stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func run() error {
}
state.Reporter = progresshelper.Tee(state.Reporter, status)

reportingWriter := progresshelper.NewReportingWriter(state.Reporter, build.ID, build.Token)
reportingWriter := progresshelper.NewReporter(ctx2, state.Reporter, build.ID, build.Token)
defer reportingWriter.Close()

state.SummaryURL = build.BuildURL
buildFinish = build.Finish
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewCmdExec(dockerCli command.Cli) *cobra.Command {
return buildErr
}

reportingWriter := progresshelper.NewReportingWriter(printer, build.ID, build.Token)
reportingWriter := progresshelper.NewReporter(printCtx, printer, build.ID, build.Token)

var builder *machine.Machine
buildErr = progresshelper.WithLog(reportingWriter, fmt.Sprintf("[depot] launching %s machine", platform), func() error {
Expand All @@ -99,6 +99,7 @@ func NewCmdExec(dockerCli command.Cli) *cobra.Command {
})
if buildErr != nil {
cancel()
reportingWriter.Close()
return buildErr
}

Expand All @@ -115,6 +116,7 @@ func NewCmdExec(dockerCli command.Cli) *cobra.Command {
return nil
})
cancel()
reportingWriter.Close()

listener, localAddr, buildErr := connection.LocalListener()
if buildErr != nil {
Expand Down
21 changes: 0 additions & 21 deletions pkg/progresshelper/api.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
package progresshelper

import (
"context"
"time"

"connectrpc.com/connect"
depotapi "github.com/depot/cli/pkg/api"
cliv1 "github.com/depot/cli/pkg/proto/depot/cli/v1"
cliv1connect "github.com/depot/cli/pkg/proto/depot/cli/v1/cliv1connect"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/client"
)

func reportToAPI(client cliv1connect.BuildServiceClient, status *client.SolveStatus, buildID, token string) {
if buildID != "" && token != "" {
req := &cliv1.ReportStatusRequest{
BuildId: buildID,
Statuses: []*controlapi.StatusResponse{toStatusResponse(status)},
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, _ = client.ReportStatus(ctx, depotapi.WithAuthentication(connect.NewRequest(req), token))
}
}

func toStatusResponse(status *client.SolveStatus) *controlapi.StatusResponse {
vertexes := make([]*controlapi.Vertex, 0, len(status.Vertexes))
for _, v := range status.Vertexes {
Expand Down
15 changes: 0 additions & 15 deletions pkg/progresshelper/reportinglogger.go

This file was deleted.

176 changes: 167 additions & 9 deletions pkg/progresshelper/reportingwriter.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,190 @@
package progresshelper

import (
"context"
"errors"
"sync"
"time"

depotapi "github.com/depot/cli/pkg/api"
"github.com/depot/cli/pkg/debuglog"
cliv1 "github.com/depot/cli/pkg/proto/depot/cli/v1"
cliv1connect "github.com/depot/cli/pkg/proto/depot/cli/v1/cliv1connect"
"github.com/docker/buildx/util/progress"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/client"
"github.com/opencontainers/go-digest"
)

var _ progress.Writer = (*reportingWriter)(nil)
var _ progress.Writer = (*Reporter)(nil)

type reportingWriter struct {
progress.Writer
type Reporter struct {
// Using a function so we can support oth progress.Writer and progress.Logger.
writer func(status *client.SolveStatus)
validate func(digest.Digest, interface{}) bool
clear func(interface{})

buildID string
token string
client cliv1connect.BuildServiceClient

ch chan *client.SolveStatus

closed bool
mu sync.Mutex
}

func NewReporter(ctx context.Context, w progress.Writer, buildID, token string) *Reporter {
r := &Reporter{
writer: w.Write,
validate: w.ValidateLogSource,
clear: w.ClearLogSource,
buildID: buildID,
token: token,
client: depotapi.NewBuildClient(),
ch: make(chan *client.SolveStatus, 16384),
}
go r.Run(ctx)

return r
}

func NewReportingWriter(w progress.Writer, buildID, token string) progress.Writer {
return &reportingWriter{
Writer: w,
func NewReporterFromLogger(ctx context.Context, w progress.Logger, buildID, token string) *Reporter {
r := &Reporter{
writer: w,
buildID: buildID,
token: token,
client: depotapi.NewBuildClient(),
ch: make(chan *client.SolveStatus, 16384),
}
go r.Run(ctx)

return r
}

func (s *reportingWriter) Write(status *client.SolveStatus) {
s.Writer.Write(status)
reportToAPI(s.client, status, s.buildID, s.token)
func (r *Reporter) Write(status *client.SolveStatus) {
r.writer(status)

r.mu.Lock()
defer r.mu.Unlock()
if r.closed {
return
}

select {
case r.ch <- status:
default:
}
}

// Make sure to call Close() after any call to Write.
func (r *Reporter) Close() {
r.mu.Lock()
r.closed = true
r.mu.Unlock()

close(r.ch)
}

func (r *Reporter) Run(ctx context.Context) {
sender := r.client.ReportStatusStream(ctx)
sender.RequestHeader().Add("Authorization", "Bearer "+r.token)
defer func() {
_, _ = sender.CloseAndReceive()
}()

// Buffer 1 second before sending build timings to the server
const (
bufferTimeout = time.Second
)

// I'm using a timer here because I may need to retry sending data to the server.
// With a retry I need to track what data needs to be sent, however, because I
// may not get more data for a "while" I set it on a timer to force a delivery.
ticker := time.NewTicker(bufferTimeout)
defer ticker.Stop()
statuses := []*controlapi.StatusResponse{}

for {
select {
case status := <-r.ch:
if status == nil {
continue
}

statuses = append(statuses, toStatusResponse(status))
case <-ticker.C:
if len(statuses) == 0 {
ticker.Reset(bufferTimeout)
continue
}

req := &cliv1.ReportStatusStreamRequest{
BuildId: r.buildID,
Statuses: statuses,
}

err := sender.Send(req)
if err == nil {
statuses = statuses[:0]
ticker.Reset(bufferTimeout)
break
}
if errors.Is(err, context.Canceled) {
// This means we got a context cancel while sending the data.
// We loop again and will go to the ctx.Done() case.
continue
}

debuglog.Log("unable to send status: %v", err)

// Reconnect if the connection is broken.
_, _ = sender.CloseAndReceive()
sender = r.client.ReportStatusStream(ctx)
sender.RequestHeader().Add("Authorization", "Bearer "+r.token)
ticker.Reset(bufferTimeout)
case <-ctx.Done():
// Attempt to send any remaining statuses. This is best effort. If it fails, we'll just give up.
for status := range r.ch {
if status == nil {
continue
}
statuses = append(statuses, toStatusResponse(status))
}

if len(statuses) == 0 {
return
}

_, _ = sender.CloseAndReceive()

// Requires a new context because the previous one was canceled.
ctx2, cancel := context.WithTimeout(context.Background(), 5*time.Second)
sender = r.client.ReportStatusStream(ctx2)
sender.RequestHeader().Add("Authorization", "Bearer "+r.token)

req := &cliv1.ReportStatusStreamRequest{
BuildId: r.buildID,
Statuses: statuses,
}

_ = sender.Send(req)
_, _ = sender.CloseAndReceive()
cancel()
return
}
}
}

func (r *Reporter) ValidateLogSource(dgst digest.Digest, src interface{}) bool {
if r.validate == nil {
return true
}
return r.validate(dgst, src)
}

func (r *Reporter) ClearLogSource(src interface{}) {
if r.clear != nil {
r.clear(src)
}
}
Loading

0 comments on commit 54ffc06

Please sign in to comment.