Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of remote exec in runnerv2 #720

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/beta/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func runCodeBlock(
KnownName: block.Name(),
KnownID: block.ID(),
}
ctx = rcontext.ContextWithExecutionInfo(ctx, execInfo)
ctx = rcontext.WithExecutionInfo(ctx, execInfo)

cmd, err := factory.Build(cfg, options)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/command/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func redactConfig(cfg *ProgramConfig) *ProgramConfig {
}

func isShell(cfg *ProgramConfig) bool {
return IsShellProgram(filepath.Base(cfg.ProgramName)) || IsShellLanguage(cfg.LanguageId)
return isShellProgram(filepath.Base(cfg.ProgramName)) || IsShellLanguage(cfg.LanguageId)
}

func IsShellProgram(programName string) bool {
func isShellProgram(programName string) bool {
switch strings.ToLower(programName) {
case "sh", "bash", "zsh", "ksh", "shell":
return true
Expand Down
2 changes: 1 addition & 1 deletion internal/config/autoconfig/autoconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func getLogger(c *config.Config) (*zap.Logger, error) {
}

if c.Log.Verbose {
zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
zapConfig.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
zapConfig.Development = true
zapConfig.Encoding = "console"
zapConfig.EncoderConfig = zap.NewDevelopmentEncoderConfig()
Expand Down
4 changes: 2 additions & 2 deletions internal/owl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,12 +698,12 @@ func (s *Store) LoadEnvs(source string, envs ...string) error {
return nil
}

func (s *Store) Update(context context.Context, newOrUpdated, deleted []string) error {
func (s *Store) Update(ctx context.Context, newOrUpdated, deleted []string) error {
s.mu.Lock()
defer s.mu.Unlock()

execRef := "[execution]"
if execInfo, ok := context.Value(rcontext.ExecutionInfoKey).(*rcontext.ExecutionInfo); ok {
if execInfo, ok := rcontext.ExecutionInfoFromContext(ctx); ok {
execRef = fmt.Sprintf("#%s", execInfo.KnownID)
if execInfo.KnownName != "" {
execRef = fmt.Sprintf("#%s", execInfo.KnownName)
Expand Down
13 changes: 9 additions & 4 deletions internal/runner/context/exec_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package runner

import "context"

type runnerContextKey struct{}
type contextKey struct{ string }

var ExecutionInfoKey = &runnerContextKey{}
var executionInfoKey = &contextKey{"ExecutionInfo"}

type ExecutionInfo struct {
ExecContext string
Expand All @@ -13,6 +13,11 @@ type ExecutionInfo struct {
RunID string
}

func ContextWithExecutionInfo(ctx context.Context, execInfo *ExecutionInfo) context.Context {
return context.WithValue(ctx, ExecutionInfoKey, execInfo)
func WithExecutionInfo(ctx context.Context, execInfo *ExecutionInfo) context.Context {
return context.WithValue(ctx, executionInfoKey, execInfo)
}

func ExecutionInfoFromContext(ctx context.Context) (*ExecutionInfo, bool) {
execInfo, ok := ctx.Value(executionInfoKey).(*ExecutionInfo)
return execInfo, ok
}
4 changes: 2 additions & 2 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
KnownName: req.GetKnownName(),
KnownID: req.GetKnownId(),
}
ctx := rcontext.ContextWithExecutionInfo(srv.Context(), execInfo)
ctx := rcontext.WithExecutionInfo(srv.Context(), execInfo)

if req.KnownId != "" {
logger = logger.With(zap.String("knownID", req.KnownId))
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
cmdCtx := ctx

if req.Background {
cmdCtx = rcontext.ContextWithExecutionInfo(context.Background(), execInfo)
cmdCtx = rcontext.WithExecutionInfo(context.Background(), execInfo)
}

if err := cmd.StartWithOpts(cmdCtx, &startOpts{}); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/runnerv2client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,23 @@ import (
runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2"
)

const maxMsgSize = 32 * 1024 * 1024 // 32 MiB

type Client struct {
runnerv2.RunnerServiceClient
conn *grpc.ClientConn
logger *zap.Logger
}

func New(target string, logger *zap.Logger, opts ...grpc.DialOption) (*Client, error) {
opts = append(
// default options
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
},
opts...,
)

client, err := grpc.NewClient(target, opts...)
if err != nil {
return nil, errors.WithStack(err)
Expand Down
30 changes: 30 additions & 0 deletions internal/runnerv2service/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package runnerv2service

import (
"github.com/stateful/runme/v3/internal/session"
runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2"
"github.com/stateful/runme/v3/pkg/project"
)

func convertSessionToProtoSession(sess *session.Session) *runnerv2.Session {
return &runnerv2.Session{
Id: sess.ID,
Env: sess.GetAllEnv(),
// Metadata: sess.Metadata,
}
}

// TODO(adamb): this function should not return nil project and nil error at the same time.
func convertProtoProjectToProject(runnerProj *runnerv2.Project) (*project.Project, error) {
if runnerProj == nil {
return nil, nil
}

opts := project.DefaultProjectOptions[:]

if runnerProj.EnvLoadOrder != nil {
opts = append(opts, project.WithEnvFilesReadOrder(runnerProj.EnvLoadOrder))
}

return project.NewDirProject(runnerProj.Root, opts...)
}
152 changes: 74 additions & 78 deletions internal/runnerv2service/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,80 +22,13 @@ import (
"github.com/stateful/runme/v3/pkg/project"
)

const (
// msgBufferSize limits the size of data chunks
// sent by the handler to clients. It's smaller
// intentionally as typically the messages are
// small.
// In the future, it might be worth to implement
// variable-sized buffers.
msgBufferSize = 2 * 1024 * 1024 // 2 MiB
)

var opininatedEnvVarNamingRegexp = regexp.MustCompile(`^[A-Z_][A-Z0-9_]{1}[A-Z0-9_]*[A-Z][A-Z0-9_]*$`)

type buffer struct {
mu *sync.Mutex
// +checklocks:mu
b *bytes.Buffer
closed *atomic.Bool
close chan struct{}
more chan struct{}
}

var _ io.WriteCloser = (*buffer)(nil)

func newBuffer() *buffer {
return &buffer{
mu: &sync.Mutex{},
b: bytes.NewBuffer(make([]byte, 0, msgBufferSize)),
closed: &atomic.Bool{},
close: make(chan struct{}),
more: make(chan struct{}),
}
}

func (b *buffer) Write(p []byte) (int, error) {
if b.closed.Load() {
return 0, errors.New("closed")
}

b.mu.Lock()
n, err := b.b.Write(p)
b.mu.Unlock()

select {
case b.more <- struct{}{}:
default:
}

return n, err
}

func (b *buffer) Close() error {
if b.closed.CompareAndSwap(false, true) {
close(b.close)
}
return nil
}

func (b *buffer) Read(p []byte) (int, error) {
b.mu.Lock()
n, err := b.b.Read(p)
b.mu.Unlock()

if err != nil && errors.Is(err, io.EOF) && !b.closed.Load() {
select {
case <-b.more:
case <-b.close:
return n, io.EOF
}
return n, nil
}

return n, err
func matchesOpinionatedEnvVarNaming(knownName string) bool {
return opininatedEnvVarNamingRegexp.MatchString(knownName)
}

//lint:ignore U1000 Used in A/B testing
type execution struct {
Cmd command.Command
knownName string
Expand All @@ -108,6 +41,7 @@ type execution struct {
storeStdoutInEnv bool
}

//lint:ignore U1000 Used in A/B testing
func newExecution(
cfg *command.ProgramConfig,
proj *project.Project,
Expand All @@ -121,8 +55,8 @@ func newExecution(
)

stdin, stdinWriter := io.Pipe()
stdout := newBuffer()
stderr := newBuffer()
stdout := newBuffer(msgBufferSize)
stderr := newBuffer(msgBufferSize)

cmdOptions := command.CommandOptions{
EnableEcho: true,
Expand Down Expand Up @@ -153,7 +87,7 @@ func newExecution(
return exec, nil
}

func (e *execution) Wait(ctx context.Context, sender sender) (int, error) {
func (e *execution) Wait(ctx context.Context, sender runnerv2.RunnerService_ExecuteServer) (int, error) {
lastStdout := io.Discard
if e.storeStdoutInEnv {
b := rbuffer.NewRingBuffer(session.MaxEnvSizeInBytes - len(command.StoreStdoutEnvName) - 1)
Expand Down Expand Up @@ -337,10 +271,6 @@ func (e *execution) storeOutputInEnv(ctx context.Context, r io.Reader) {
}
}

func matchesOpinionatedEnvVarNaming(knownName string) bool {
return opininatedEnvVarNamingRegexp.MatchString(knownName)
}

type sender interface {
Send(*runnerv2.ExecuteResponse) error
}
Expand All @@ -363,7 +293,7 @@ func readSendLoop(
eof = true
}

logger.Info("readSendLoop", zap.Int("n", n))
logger.Debug("readSendLoop", zap.Int("n", n))

if n == 0 && eof {
return nil
Expand Down Expand Up @@ -400,3 +330,69 @@ func exitCodeFromErr(err error) int {
}
return -1
}

type buffer struct {
mu *sync.Mutex
// +checklocks:mu
b *bytes.Buffer
closed *atomic.Bool
close chan struct{}
more chan struct{}
}

var _ io.WriteCloser = (*buffer)(nil)

func newBuffer(size int) *buffer {
return &buffer{
mu: &sync.Mutex{},
b: bytes.NewBuffer(make([]byte, 0, size)),
closed: &atomic.Bool{},
close: make(chan struct{}),
more: make(chan struct{}),
}
}

func (b *buffer) Write(p []byte) (int, error) {
if b.closed.Load() {
return 0, errors.New("closed")
}

b.mu.Lock()
n, err := b.b.Write(p)
b.mu.Unlock()

select {
case b.more <- struct{}{}:
default:
}

return n, err
}

func (b *buffer) Close() error {
if b.closed.CompareAndSwap(false, true) {
close(b.close)
}
return nil
}

func (b *buffer) Read(p []byte) (int, error) {
b.mu.Lock()
n, err := b.b.Read(p)
b.mu.Unlock()

if err != nil && errors.Is(err, io.EOF) && !b.closed.Load() {
if n > 0 {
return n, nil
}

select {
case <-b.more:
return b.Read(p)
case <-b.close:
return 0, io.EOF
}
}

return n, err
}
Loading
Loading