Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(turborepo): properly tag errors when running in GH Actions #5435

Merged
merged 5 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 86 additions & 36 deletions cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,68 @@ func (tsob *threadsafeOutputBuffer) Bytes() []byte {
return tsob.buf.Bytes()
}

type logLine struct {
isStdout bool
line []byte
}

// logBuffer holds the log lines for a task, tagged to stdout or stderr
type logBuffer struct {
mu sync.Mutex
lines []logLine
}

// LogLine appends a log line to the log buffer
func (lb *logBuffer) LogLine(line []byte, isStdout bool) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.lines = append(lb.lines, logLine{isStdout, line})
}

// Drain writes the contents of the logBuffer to the appropriate output stream
func (lb *logBuffer) Drain(stdout io.Writer, stderr io.Writer) error {
for _, line := range lb.lines {
if line.isStdout {
if _, err := stdout.Write(line.line); err != nil {
return err
}
} else {
if _, err := stderr.Write(line.line); err != nil {
return err
}
}
}
return nil
}

// StdoutWriter returns a writer tagged to stdout
func (lb *logBuffer) StdoutWriter() *logBufferWriter {
return &logBufferWriter{
isStdout: true,
logBuffer: lb,
}
}

// StderrWriter returns a writer tagged to stderr
func (lb *logBuffer) StderrWriter() *logBufferWriter {
return &logBufferWriter{
isStdout: false,
logBuffer: lb,
}
}

type logBufferWriter struct {
isStdout bool
logBuffer *logBuffer
}

// Write implements io.Writer.Write for logBufferWriter
func (lbw *logBufferWriter) Write(bytes []byte) (int, error) {
n := len(bytes)
lbw.logBuffer.LogLine(bytes, lbw.isStdout)
return n, nil
}

// RealRun executes a set of tasks
func RealRun(
ctx gocontext.Context,
Expand Down Expand Up @@ -124,25 +186,19 @@ func RealRun(
}

taskCount := len(engine.TaskGraph.Vertices())
logChan := make(chan taskLogContext, taskCount)
logChan := make(chan *logBuffer, taskCount)
logWaitGroup := sync.WaitGroup{}
isGrouped := rs.Opts.runOpts.LogOrder == "grouped"

if isGrouped {
logWaitGroup.Add(1)
go func() {
for logContext := range logChan {

outBytes := logContext.outBuf.Bytes()
errBytes := logContext.errBuf.Bytes()
for logBuffer := range logChan {

_, errOut := os.Stdout.Write(outBytes)
_, errErr := os.Stderr.Write(errBytes)

if errOut != nil || errErr != nil {
ec.ui.Error("Failed to output some of the logs.")
err := logBuffer.Drain(os.Stdout, os.Stderr)
if err != nil {
ec.ui.Error(fmt.Sprintf("Failed to output some of the logs: %v", err))
}

}
logWaitGroup.Done()
}()
Expand All @@ -151,15 +207,14 @@ func RealRun(
taskSummaryMutex := sync.Mutex{}
taskSummaries := []*runsummary.TaskSummary{}
execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
outBuf := &bytes.Buffer{}
errBuf := &bytes.Buffer{}

var outWriter io.Writer = os.Stdout
var errWriter io.Writer = os.Stderr

logBuffer := &logBuffer{}

if isGrouped {
outWriter = outBuf
errWriter = errBuf
outWriter = logBuffer.StdoutWriter()
errWriter = logBuffer.StderrWriter()
}

var spacesLogBuffer *threadsafeOutputBuffer
Expand Down Expand Up @@ -194,10 +249,7 @@ func RealRun(
runSummary.CloseTask(taskSummary, logBytes)
}
if isGrouped {
logChan <- taskLogContext{
outBuf: outBuf,
errBuf: errBuf,
}
logChan <- logBuffer
}

// Return the error when there is one
Expand All @@ -214,6 +266,10 @@ func RealRun(

visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, rs.Opts.runOpts.FrameworkInference, globalEnvMode, getArgs, base.Logger, execFunc)
errs := engine.Execute(visitorFn, execOpts)
if isGrouped {
close(logChan)
logWaitGroup.Wait()
}

// Track if we saw any child with a non-zero exit code
exitCode := 0
Expand Down Expand Up @@ -257,11 +313,6 @@ func RealRun(
}
}

if isGrouped {
close(logChan)
logWaitGroup.Wait()
}

if err := runSummary.Close(ctx, exitCode, g.WorkspaceInfos, base.UI); err != nil {
// We don't need to throw an error, but we can warn on this.
// Note: this method doesn't actually return an error for Real Runs at the time of writing.
Expand All @@ -276,11 +327,6 @@ func RealRun(
return nil
}

type taskLogContext struct {
outBuf *bytes.Buffer
errBuf *bytes.Buffer
}

type execContext struct {
colorCache *colorcache.ColorCache
runSummary runsummary.Meta
Expand Down Expand Up @@ -357,6 +403,13 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

if ec.rs.Opts.runOpts.IsGithubActions {
ui.Output(fmt.Sprintf("::group::%s", packageTask.OutputPrefix(ec.isSinglePackage)))
prefixedUI.WarnPrefix = "::warn::"
prefixedUI.ErrorPrefix = "::error::"
defer func() {
// We don't use the prefixedUI here because the prefix in this case would include
// the ::group::<taskID>, and we explicitly want to close the github group
ui.Output("::endgroup::")
}()
}

cacheStatus, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
Expand Down Expand Up @@ -447,11 +500,6 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

closeOutputs := func() error {
var closeErrors []error
if ec.rs.Opts.runOpts.IsGithubActions {
// We don't use the prefixedUI here because the prefix in this case would include
// the ::group::<taskID>, and we explicitly want to close the github group
ui.Output("::endgroup::")
}

if err := logStreamerOut.Close(); err != nil {
closeErrors = append(closeErrors, errors.Wrap(err, "log stdout"))
Expand All @@ -475,7 +523,9 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas

// Run the command
if err := ec.processes.Exec(cmd); err != nil {
// close off our outputs. We errored, so we mostly don't care if we fail to close
// ensure we close off our outputs. We errored, so we mostly don't care if we fail to close
// We don't close them directly because we're potentially going to output some errors or
// warnings that we want grouped with the task output.
_ = closeOutputs()
// if we already know we're in the process of exiting,
// we don't need to record an error to that effect.
Expand Down
102 changes: 101 additions & 1 deletion cli/internal/ui/ui_factory.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package ui

import (
"bufio"
"errors"
"fmt"
"io"
"os"
"os/signal"
"strings"

"github.com/bgentry/speakeasy"
"github.com/fatih/color"
"github.com/mattn/go-isatty"
"github.com/mitchellh/cli"
)

Expand All @@ -16,9 +24,101 @@ type Factory interface {
type BasicUIFactory struct {
}

// basicUI is an implementation of Ui that just outputs to the given
// writer. This UI is not threadsafe by default, but you can wrap it
// in a ConcurrentUi to make it safe.
//
// Inlined from cli.Ui to fuse newlines to lines being logged. This is
// probably not the optimal way to do it, but it works for now.
type basicUI struct {
Reader io.Reader
Writer io.Writer
ErrorWriter io.Writer
}

// Ask implements ui.Cli.Ask for BasicUi
func (u *basicUI) Ask(query string) (string, error) {
return u.ask(query, false)
}

// AskSecret implements ui.Cli.AskSecret for BasicUi
func (u *basicUI) AskSecret(query string) (string, error) {
return u.ask(query, true)
}

func (u *basicUI) ask(query string, secret bool) (string, error) {
if _, err := fmt.Fprint(u.Writer, query+" "); err != nil {
return "", err
}

// Register for interrupts so that we can catch it and immediately
// return...
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
defer signal.Stop(sigCh)

// Ask for input in a go-routine so that we can ignore it.
errCh := make(chan error, 1)
lineCh := make(chan string, 1)
go func() {
var line string
var err error
if secret && isatty.IsTerminal(os.Stdin.Fd()) {
line, err = speakeasy.Ask("")
} else {
r := bufio.NewReader(u.Reader)
line, err = r.ReadString('\n')
}
if err != nil {
errCh <- err
return
}

lineCh <- strings.TrimRight(line, "\r\n")
}()

select {
case err := <-errCh:
return "", err
case line := <-lineCh:
return line, nil
case <-sigCh:
// Print a newline so that any further output starts properly
// on a new line.
fmt.Fprintln(u.Writer)

return "", errors.New("interrupted")
}
}

// Error implements ui.Cli.Error for BasicUi
func (u *basicUI) Error(message string) {
w := u.Writer
if u.ErrorWriter != nil {
w = u.ErrorWriter
}

fmt.Fprintf(w, "%v\n", message)
}

// Info implements ui.Cli.Info for BasicUi
func (u *basicUI) Info(message string) {
u.Output(message)
}

// Output implements ui.Cli.Output for BasicUi
func (u *basicUI) Output(message string) {
fmt.Fprintf(u.Writer, "%v\n", message)
}

// Warn implements ui.Cli.Warn for BasicUi
func (u *basicUI) Warn(message string) {
u.Error(message)
}

// Build builds a cli.BasicUi from input, output and error IOs
func (factory *BasicUIFactory) Build(in io.Reader, out io.Writer, err io.Writer) cli.Ui {
return &cli.BasicUi{
return &basicUI{
Reader: in,
Writer: out,
ErrorWriter: err,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "util",
"scripts": {
"build": "sleep 0.5 && echo 'building' && sleep 1 && echo 'completed'"
"build": "sleep 0.5 && echo 'building' && sleep 1 && echo 'completed'",
"fail": "echo 'failing'; exit 1"
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://turbo.build/schema.json",
"pipeline": {
"build": {
}
"build": {},
"fail": {}
}
}
32 changes: 31 additions & 1 deletion turborepo-tests/integration/tests/ordered/github.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ because otherwise prysk interprets them as multiline commands
done
::endgroup::
::group::util:build
cache bypass, force executing 0bc36a8a234e31d4
cache bypass, force executing 90d7154e362e3386

>\sbuild (re)
>\ssleep 0.5 && echo 'building' && sleep 1 && echo 'completed' (re)
Expand All @@ -33,3 +33,33 @@ because otherwise prysk interprets them as multiline commands
Cached: 0 cached, 2 total
Time:\s*[\.0-9]+m?s (re)

Verify that errors are grouped properly
$ ${TURBO} run fail
\xe2\x80\xa2 Packages in scope: my-app, util (esc)
\xe2\x80\xa2 Running fail in 2 packages (esc)
\xe2\x80\xa2 Remote caching disabled (esc)
::group::util:fail
cache miss, executing 9bf2c727d81cf834

\> fail (re)
\> echo 'failing'; exit 1 (re)

failing
npm ERR! Lifecycle script `fail` failed with error:
npm ERR! Error: command failed
npm ERR! in workspace: util
npm ERR\! at location: (.*)/packages/util (re)
::error::ERROR: command finished with error: command \((.*)/packages/util\) npm run fail exited \(1\) (re)
::endgroup::
command \(.*/packages/util\) npm run fail exited \(1\) (re)

Tasks: 0 successful, 1 total
Cached: 0 cached, 1 total
Time:\s*[\.0-9]+m?s (re)
Failed: util#fail

ERROR run failed: command exited (1)
[1]