Skip to content

Commit

Permalink
various test and lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stoksc committed May 28, 2024
1 parent 0e52fed commit 8806bff
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 37 deletions.
22 changes: 12 additions & 10 deletions master/internal/rm/kubernetesrm/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/determined-ai/determined/master/pkg/tasks"
)

func createPod(
func createJob(
allocationID model.AllocationID,
resourceHandler *requestQueue,
task tasks.TaskSpec,
Expand Down Expand Up @@ -82,6 +82,7 @@ func createUser() *model.User {
func createJobWithMockQueue(t *testing.T, k8sRequestQueue *requestQueue) (
*job,
model.AllocationID,
*tasks.TaskSpec,
*sproto.ResourcesSubscription,
) {
commandSpec := tasks.GenericCommandSpec{
Expand Down Expand Up @@ -114,19 +115,20 @@ func createJobWithMockQueue(t *testing.T, k8sRequestQueue *requestQueue) (

aID := model.AllocationID(uuid.NewString())
sub := rmevents.Subscribe(aID)
newPod := createPod(
spec := commandSpec.ToTaskSpec()
newPod := createJob(
aID,
k8sRequestQueue,
commandSpec.ToTaskSpec(),
spec,
)

go consumeResourceRequestFailures(ctx, failures, newPod)

err := newPod.createSpecAndSubmit(&tasks.TaskSpec{})
err := newPod.createSpecAndSubmit(&spec)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)

return newPod, aID, sub
return newPod, aID, &spec, sub
}

func setupEntrypoint(t *testing.T) {
Expand Down Expand Up @@ -173,12 +175,12 @@ func TestResourceCreationFailed(t *testing.T) {

const correctMsg = "already exists"

ref, aID, sub := createJobWithMockQueue(t, nil)
ref, aID, spec, sub := createJobWithMockQueue(t, nil)

purge(aID, sub)
require.Zero(t, sub.Len())
// Send a second start message to trigger an additional resource creation failure.
err := ref.createSpecAndSubmit(&tasks.TaskSpec{})
err := ref.createSpecAndSubmit(spec)
require.NoError(t, err)
time.Sleep(time.Second)

Expand All @@ -199,7 +201,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {

t.Run("job deleting, but in pending state", func(t *testing.T) {
t.Logf("Testing PodPending status")
ref, aID, sub := createJobWithMockQueue(t, nil)
ref, aID, _, sub := createJobWithMockQueue(t, nil)
purge(aID, sub)
require.Zero(t, sub.Len())

Expand All @@ -226,7 +228,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {

t.Run("job failed", func(t *testing.T) {
t.Logf("Testing PodFailed status")
ref, aID, sub := createJobWithMockQueue(t, nil)
ref, aID, _, sub := createJobWithMockQueue(t, nil)
purge(aID, sub)
require.Zero(t, sub.Len())

Expand All @@ -244,7 +246,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {
})

t.Run("pod succeeded", func(t *testing.T) {
ref, aID, sub := createJobWithMockQueue(t, nil)
ref, aID, _, sub := createJobWithMockQueue(t, nil)
purge(aID, sub)
require.Zero(t, sub.Len())

Expand Down
11 changes: 8 additions & 3 deletions master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kubernetesrm
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -12,6 +11,7 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
batchV1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -356,6 +356,7 @@ func (j *jobsService) deleteDoomedKubernetesResources() error {
for _, alloc := range openAllocations {
openAllocationIDs.Insert(alloc.AllocationID)
}
j.syslog.Infof("found open allocations %s", openAllocationIDs)

listOptions := metaV1.ListOptions{LabelSelector: determinedLabel}
jobs, err := j.listJobsInAllNamespaces(context.TODO(), listOptions)
Expand All @@ -377,7 +378,7 @@ func (j *jobsService) deleteDoomedKubernetesResources() error {
continue
}

allocationIDStr := job.Labels[determinedLabel]
allocationIDStr := job.Labels[allocationIDLabel]
if allocationIDStr == "" {
j.syslog.Warnf("deleting job '%s' without determined label (whose value is the allocation ID)", job.Name)
toKillJobs.Items = append(toKillJobs.Items, job)
Expand All @@ -386,7 +387,9 @@ func (j *jobsService) deleteDoomedKubernetesResources() error {
allocationID := model.AllocationID(allocationIDStr)

if !openAllocationIDs.Contains(allocationID) {
j.syslog.Warnf("deleting job '%s', did not find an open allocation for it", allocationID)
j.syslog.
WithField("allocation-id", allocationID).
Warnf("deleting job '%s', did not find an open allocation for it", job.Name)
toKillJobs.Items = append(toKillJobs.Items, job)
continue
}
Expand Down Expand Up @@ -669,6 +672,7 @@ func (j *jobsService) refreshJobState(allocationID model.AllocationID) error {
if _, ok := j.namespaceToPoolName[job.Namespace]; !ok {
continue
}
job := job
j.jobUpdatedCallback(&job)
}
return nil
Expand All @@ -690,6 +694,7 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
if _, ok := j.namespaceToPoolName[pod.Namespace]; !ok {
continue
}
pod := pod
j.podStatusCallback(&pod)
}
return nil
Expand Down
11 changes: 8 additions & 3 deletions master/internal/rm/kubernetesrm/jobs_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//go:build integration

package kubernetesrm

import (
"context"
"errors"
"fmt"
"os"
"runtime/debug"
"strings"
"testing"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -72,7 +75,7 @@ func TestJobWorkflows(t *testing.T) {
slots: 1,
wantFailure: &sproto.ResourcesFailedError{
FailureType: sproto.ResourcesFailed,
ErrMsg: "unrecoverable image pull errors in pod",
ErrMsg: "unrecoverable image pull errors",
},
},
{
Expand Down Expand Up @@ -224,7 +227,7 @@ func testLaunch(
}

func TestPodLogStreamerReattach(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

j := newTestJobsService(t)
Expand Down Expand Up @@ -319,6 +322,7 @@ func TestPodLogStreamerReattach(t *testing.T) {
for {
log := poll[*sproto.ContainerLog](ctx, t, sub)
if strings.Contains(log.Message(), secret) {
t.Logf("saw one log: %s", log.Message())
seen++
}
if seen == 2 {
Expand Down Expand Up @@ -892,6 +896,7 @@ func poll[T sproto.ResourcesEvent](ctx context.Context, t *testing.T, sub *sprot
if err != nil {
var typed T
t.Errorf("failed to receive %T in time: %s", typed, err)
t.Error(string(debug.Stack()))
t.FailNow()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

func TestMain(m *testing.M) {
// Need to set up the DB for TestJobQueueStats
pgDB, _, err := db.ResolveTestPostgres()
pgDB, _, err := db.ResolveNewPostgresDatabase()
if err != nil {
log.Panicln(err)
}
Expand Down
3 changes: 2 additions & 1 deletion master/internal/rm/kubernetesrm/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func startPodLogStreamer(
return errors.Wrapf(err, "failed to initialize log stream for pod: %s", podName)
}
syslog := logrus.WithField("podName", podName)
logger := &podLogStreamer{callback}

logger := &podLogStreamer{callback}
go logger.receiveStreamLogs(syslog, logReader)

return nil
Expand All @@ -50,6 +50,7 @@ func (p *podLogStreamer) receiveStreamLogs(
syslog *logrus.Entry,
logReader io.ReadCloser,
) {
syslog.Debug("starting pod log streamer")
_, err := io.Copy(p, logReader)
if err != nil {
syslog.WithError(err).Debug("error reading logs")
Expand Down
43 changes: 25 additions & 18 deletions master/internal/rm/kubernetesrm/request_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"context"
"fmt"

batchV1 "k8s.io/client-go/kubernetes/typed/batch/v1"

"github.com/sirupsen/logrus"

"github.com/determined-ai/determined/master/pkg/ptrs"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchV1 "k8s.io/client-go/kubernetes/typed/batch/v1"
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"

"github.com/determined-ai/determined/master/pkg/ptrs"
)

type requestProcessingWorker struct {
Expand Down Expand Up @@ -101,32 +100,40 @@ func (r *requestProcessingWorker) receiveDeleteKubernetesResources(
GracePeriodSeconds: &gracePeriod,
PropagationPolicy: ptrs.Ptr(metaV1.DeletePropagationBackground),
})
if err != nil {
r.syslog.WithError(err).Errorf("failed to delete pod %s", msg.jobName)
} else {
switch {
case k8serrors.IsNotFound(err):
r.syslog.Infof("job %s is already deleted", msg.jobName)
case err != nil:
r.syslog.WithError(err).Errorf("failed to delete job %s", msg.jobName)
default:
r.syslog.Infof("deleted job %s", msg.jobName)
}
}

if len(msg.podName) > 0 {
err = r.podInterface[msg.namespace].Delete(
context.TODO(), msg.podName, metaV1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
if err != nil {
r.syslog.WithError(err).Errorf("failed to delete pod %s", msg.podName)
} else {
r.syslog.Infof("deleted pod %s", msg.podName)
switch {
case k8serrors.IsNotFound(err):
r.syslog.Infof("pod %s is already deleted", msg.jobName)
case err != nil:
r.syslog.WithError(err).Errorf("failed to delete pod %s", msg.jobName)
default:
r.syslog.Infof("deleted pod %s", msg.jobName)
}
}

if len(msg.configMapName) > 0 {
errDeletingConfigMap := r.configMapInterfaces[msg.namespace].Delete(
err = r.configMapInterfaces[msg.namespace].Delete(
context.TODO(), msg.configMapName,
metaV1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
if errDeletingConfigMap != nil {
r.syslog.WithError(err).Errorf("failed to delete configMap %s", msg.configMapName)
err = errDeletingConfigMap
} else {
r.syslog.Infof("deleted configMap %s", msg.configMapName)
switch {
case k8serrors.IsNotFound(err):
r.syslog.Infof("configMap %s is already deleted", msg.jobName)
case err != nil:
r.syslog.WithError(err).Errorf("failed to delete configMap %s", msg.jobName)
default:
r.syslog.Infof("deleted configMap %s", msg.jobName)
}
}

Expand Down
15 changes: 14 additions & 1 deletion master/internal/rm/kubernetesrm/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,9 +626,22 @@ func (j *job) createSpec(scheduler string, taskSpec *tasks.TaskSpec) (*batchV1.J
}

// configureUniqueName returns the task's description, prefixed with at most the first 8
// characters of its cluster ID when the cluster ID is non-empty.
func configureUniqueName(t tasks.TaskSpec) string {
	// Prefix with a cluster ID so multiple Determined installations can coexist within a cluster.
	// But limit to the first 8 chars of the cluster ID to avoid the 63 character limit (this is
	// ~53).
	//
	// Cluster IDs shorter than 8 characters (e.g. in tests) are used whole; slicing them with
	// [:8] unconditionally would panic.
	clusterIDPrefix := t.ClusterID
	if len(clusterIDPrefix) > 8 {
		clusterIDPrefix = clusterIDPrefix[:8]
	}

	// With no cluster ID at all, skip the prefix rather than emitting a leading "-".
	if clusterIDPrefix == "" {
		return t.Description
	}
	return fmt.Sprintf("%s-%s", clusterIDPrefix, t.Description)
}

func configureSecurityContext(agentUserGroup *model.AgentUserGroup) *k8sV1.SecurityContext {
Expand Down

0 comments on commit 8806bff

Please sign in to comment.