Skip to content

Commit

Permalink
surface Start split to Connect and WaitForCompletion to follower and …
Browse files Browse the repository at this point in the history
…its consumers

allow for optimize / eliminate wait/sleep/timeout intervals in follower unit test and follower consumer unit tests
remove follower initialization in unit test
split follower setup where Complete() is for creation and Run() for running follower
add buildrun name setter to facilitate Complete() vs. Run() split
add multi thread coordination via FollowerReady to make sure pod_watcher is up
  • Loading branch information
gabemontero committed Apr 29, 2022
1 parent 0dc7c18 commit 600428d
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 46 deletions.
48 changes: 33 additions & 15 deletions pkg/shp/cmd/build/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
type RunCommand struct {
cmd *cobra.Command // cobra command instance

buildName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
follow bool // flag to tail pod logs
follower *follower.Follower
buildName string
namespace string
buildRunSpec *buildv1alpha1.BuildRunSpec // stores command-line flags
follow bool // flag to tail pod logs
follower *follower.Follower
followerReady chan bool
}

const buildRunLongDesc = `
Expand All @@ -52,6 +53,15 @@ func (r *RunCommand) Complete(params *params.Params, ioStreams *genericclioption

r.namespace = params.Namespace()

if r.follow {
var err error
// provide empty build run name; will be set in Run()
r.follower, err = params.NewFollower(r.cmd.Context(), types.NamespacedName{}, ioStreams)
if err != nil {
return err
}
r.followerReady = make(chan bool, 1)
}
// overwriting build-ref name to use what's on arguments
return r.Cmd().Flags().Set(flags.BuildrefNameFlag, r.buildName)
}
Expand All @@ -64,6 +74,16 @@ func (r *RunCommand) Validate() error {
return nil
}

// FollowerReady blocks until the any log following connections are established in the Run call.
// Useful if you have code that calls Run on a separate thread and coordination is needed.
func (r *RunCommand) FollowerReady() bool {
if !r.follow {
return false
}
_, closed := <-r.followerReady
return !closed
}

// Run creates a BuildRun resource based on Build's name informed on arguments.
func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOStreams) error {
// resource using GenerateName, which will provide a unique instance
Expand All @@ -90,15 +110,8 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
return nil
}

// during unit-testing the follower instance will be injected directly, which makes possible to
// simulate the pod events without creating a race condition
if r.follower == nil {
buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
r.follower, err = params.NewFollower(ctx, buildRun, ioStreams)
if err != nil {
return err
}
}
buildRun := types.NamespacedName{Namespace: r.namespace, Name: br.GetName()}
r.follower.SetBuildRunName(buildRun)

// instantiating a pod watcher with a specific label-selector to find the indented pod where the
// actual build started by this subcommand is being executed, including the randomized buildrun
Expand All @@ -108,7 +121,12 @@ func (r *RunCommand) Run(params *params.Params, ioStreams *genericclioptions.IOS
r.buildName,
br.GetName(),
)}
_, err = r.follower.Start(listOpts)
err = r.follower.Connect(listOpts)
if err != nil {
return err
}
close(r.followerReady)
_, err = r.follower.WaitForCompletion()
return err
}

Expand Down
30 changes: 18 additions & 12 deletions pkg/shp/cmd/build/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"strings"
"testing"
"time"

buildv1alpha1 "github.com/shipwright-io/build/pkg/apis/build/v1alpha1"
shpfake "github.com/shipwright-io/build/pkg/client/clientset/versioned/fake"
Expand All @@ -15,7 +16,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/fake"
fakekubetesting "k8s.io/client-go/testing"
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestStartBuildRunFollowLog(t *testing.T) {
},
{
name: "timeout",
to: "1s",
to: "1ms",
logText: reactor.RequestTimeoutMessage,
},
{
Expand Down Expand Up @@ -146,18 +146,10 @@ func TestStartBuildRunFollowLog(t *testing.T) {
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)
failureDuration := 1 * time.Millisecond
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, &failureDuration, &failureDuration)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()
var err error
cmd.follower, err = param.NewFollower(
cmd.Cmd().Context(),
types.NamespacedName{Namespace: br.GetNamespace(), Name: br.GetName()},
&ioStreams,
)
if err != nil {
t.Fatalf("error instantiating follower: %q", err)
}

switch {
case test.cancelled:
Expand All @@ -184,6 +176,12 @@ func TestStartBuildRunFollowLog(t *testing.T) {
Status: corev1.ConditionFalse,
},
}
case test.phase == corev1.PodRunning:
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, corev1.ContainerStatus{
State: corev1.ContainerState{
Running: &corev1.ContainerStateRunning{StartedAt: metav1.Now()},
},
})
}

cmd.Complete(param, &ioStreams, []string{name})
Expand All @@ -200,6 +198,14 @@ func TestStartBuildRunFollowLog(t *testing.T) {
}
}()

// when employing the Run() method in a multi-threaded capacity, we must make sure
// the underlying Follower/PodWatcher watches are sync'ed and ready for use before
// we start populating the event queue
ready := cmd.FollowerReady()
if !ready {
t.Errorf("%s follower no ready", test.name)
}

if !test.noPodYet {
// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
pod.Status.Phase = test.phase
Expand Down
2 changes: 1 addition & 1 deletion pkg/shp/cmd/buildrun/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestCancelBuildRun(t *testing.T) {

// set up context
cmd.Cmd().ExecuteC()
param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault)
param := params.NewParamsForTest(nil, clientset, nil, metav1.NamespaceDefault, nil, nil)

ioStreams, _, _, _ := genericclioptions.NewTestIOStreams()
err := cmd.Run(param, &ioStreams)
Expand Down
4 changes: 2 additions & 2 deletions pkg/shp/cmd/buildrun/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestStreamBuildLogs(t *testing.T) {

clientset := fake.NewSimpleClientset(pod)
ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()
param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault)
param := params.NewParamsForTest(clientset, nil, nil, metav1.NamespaceDefault, nil, nil)
err := cmd.Run(param, &ioStreams)
if err != nil {
t.Fatalf("%s", err.Error())
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestStreamBuildRunFollowLogs(t *testing.T) {
if len(test.to) > 0 {
pm.Timeout = &test.to
}
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault)
param := params.NewParamsForTest(kclientset, shpclientset, pm, metav1.NamespaceDefault, nil, nil)

ioStreams, _, out, _ := genericclioptions.NewTestIOStreams()

Expand Down
63 changes: 51 additions & 12 deletions pkg/shp/cmd/follower/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type Follower struct {

logLock sync.Mutex // avoiding race condition to print logs
enteredRunningState bool // target pod is running

failPollInterval time.Duration // for use in the PollInterval call when processing failed pods
failPollTimeout time.Duration // for use in the PollInterval call when processing failed pods
}

// NewFollower returns a Follower instance.
Expand All @@ -56,9 +59,11 @@ func NewFollower(
clientset: clientset,
buildClientset: buildClientset,

logTail: tail.NewTail(ctx, clientset),
logLock: sync.Mutex{},
tailLogsStarted: map[string]bool{},
logTail: tail.NewTail(ctx, clientset),
logLock: sync.Mutex{},
tailLogsStarted: map[string]bool{},
failPollInterval: 1 * time.Second,
failPollTimeout: 15 * time.Second,
}

f.pw.WithOnPodModifiedFn(f.OnEvent)
Expand All @@ -68,6 +73,23 @@ func NewFollower(
return f
}

// SetBuildRunName allows for setting of the BuildRun name after to call to NewFollower. This help service
// auto generation of the BuildRun name from the Build. NOTE, if the BuildRun name
// is not set prior to the call to WaitForCompletion, the Follower will not function fully once events arrive.
func (f *Follower) SetBuildRunName(brName types.NamespacedName) {
f.buildRun = brName
}

// SetFailPollInterval overrides the default value used in polling calls
func (f *Follower) SetFailPollInterval(t time.Duration) {
f.failPollInterval = t
}

// SetFailPollTimeout overrides the default value used in polling calls
func (f *Follower) SetFailPollTimeout(t time.Duration) {
f.failPollTimeout = t
}

// GetLogLock returns the mutex used for coordinating access to log buffers.
func (f *Follower) GetLogLock() *sync.Mutex {
return &f.logLock
Expand Down Expand Up @@ -106,16 +128,20 @@ func (f *Follower) OnEvent(pod *corev1.Pod) error {
case corev1.PodRunning:
if !f.enteredRunningState {
f.Log(fmt.Sprintf("Pod %q in %q state, starting up log tail", pod.GetName(), corev1.PodRunning))
f.enteredRunningState = true
// graceful time to wait for container start
time.Sleep(3 * time.Second)
// start tailing container logs
f.tailLogs(pod)
for _, c := range pod.Status.ContainerStatuses {
if c.State.Running != nil && !c.State.Running.StartedAt.IsZero() {
f.enteredRunningState = true
break
}
}
if f.enteredRunningState {
f.tailLogs(pod)
}
}
case corev1.PodFailed:
msg := ""
var br *buildv1alpha1.BuildRun
err := wait.PollImmediate(1*time.Second, 15*time.Second, func() (done bool, err error) {
err := wait.PollImmediate(f.failPollInterval, f.failPollTimeout, func() (done bool, err error) {
brClient := f.buildClientset.ShipwrightV1alpha1().BuildRuns(pod.Namespace)
br, err = brClient.Get(f.ctx, f.buildRun.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -235,7 +261,20 @@ func (f *Follower) OnNoPodEventsYet(podList *corev1.PodList) {
}
}

// Start initiates the log following for the referenced BuildRun's Pod
func (f *Follower) Start(lo metav1.ListOptions) (*corev1.Pod, error) {
return f.pw.Start(lo)
func (f *Follower) Connect(lo metav1.ListOptions) error {
return f.pw.Connect(lo)
}

// WaitForCompletion initiates the log following for the referenced BuildRun's Pod
func (f *Follower) WaitForCompletion() (*corev1.Pod, error) {
return f.pw.WaitForCompletion()
}

// Start is a convenience method for capturing the use of both Connect and WaitForCompletion
func (f *Follower) Start(listOpts metav1.ListOptions) (*corev1.Pod, error) {
err := f.Connect(listOpts)
if err != nil {
return nil, err
}
return f.WaitForCompletion()
}
22 changes: 18 additions & 4 deletions pkg/shp/params/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Params struct {

configFlags *genericclioptions.ConfigFlags
namespace string

failPollInterval *time.Duration
failPollTimeout *time.Duration
}

// AddFlags accepts flags and adds program global flags to it
Expand Down Expand Up @@ -181,6 +184,12 @@ func (p *Params) NewFollower(
}

p.follower = follower.NewFollower(ctx, br, ioStreams, pw, clientset, buildClientset)
if p.failPollTimeout != nil {
p.follower.SetFailPollTimeout(*p.failPollTimeout)
}
if p.failPollInterval != nil {
p.follower.SetFailPollInterval(*p.failPollInterval)
}
return p.follower, nil
}

Expand All @@ -198,11 +207,16 @@ func NewParamsForTest(clientset kubernetes.Interface,
shpClientset buildclientset.Interface,
configFlags *genericclioptions.ConfigFlags,
namespace string,
failPollInterval *time.Duration,
failPollTimeout *time.Duration,

) *Params {
return &Params{
clientset: clientset,
buildClientset: shpClientset,
configFlags: configFlags,
namespace: namespace,
clientset: clientset,
buildClientset: shpClientset,
configFlags: configFlags,
namespace: namespace,
failPollInterval: failPollInterval,
failPollTimeout: failPollTimeout,
}
}

0 comments on commit 600428d

Please sign in to comment.