Skip to content

Commit

Permalink
chore(api): Update go dependencies in merlin api server (#551)
Browse files Browse the repository at this point in the history
# Description
This PR bumps several Go package dependencies used by the Merlin API
server. The Merlin API server is in turn imported by the Turing API
server, whose own dependencies will be updated in a separate PR (the
updates in this PR allow the dependencies of the Turing API server to
be updated without having to be pinned to the versions used in the
Merlin API server). The version of Go used has also been bumped up from
1.20 to 1.22.

Main packages updated:
```
- github.com/GoogleCloudPlatform/spark-on-k8s-operator
- github.com/google/go-containerregistry 
- github.com/kserve/kserve
- k8s.io/api 
- k8s.io/apimachinery
- k8s.io/client-go
- knative.dev/networking
- knative.dev/pkg
- knative.dev/serving
- sigs.k8s.io/controller-runtime
```

Some minor refactoring was needed where upstream structs have changed
or where called functions now expect different arguments - these
changes are annotated directly on the diffs.

# Modifications
- `api/cluster/controller.go` - Updated the KServe controller functions
as they now require a context
- `api/cluster/resource/templater.go` - Updated the `Handler` field of a
`Probe` struct to a `ProbeHandler` field
- `api/go.mod` - Updated various packages here

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [x] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [x] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note
NONE
```
  • Loading branch information
deadlycoconuts authored Apr 8, 2024
1 parent 1b8e3f5 commit 2beab49
Show file tree
Hide file tree
Showing 28 changed files with 505 additions and 1,387 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/merlin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ env:
ARTIFACT_RETENTION_DAYS: 7
DOCKER_BUILDKIT: 1
DOCKER_REGISTRY: ghcr.io
GO_VERSION: "1.20"
GO_VERSION: "1.22"

jobs:
create-version:
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
# Ensure the same version as the one defined in Makefile
version: v1.51.2
version: v1.56.2
working-directory: api

test-api:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ============================================================
# Build stage 1: Build API
# ============================================================
FROM golang:1.20-alpine as go-builder
FROM golang:1.22-alpine as go-builder

RUN apk update && apk add --no-cache git ca-certificates bash
RUN mkdir -p src/api
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ VERSION := $(or ${VERSION}, $(shell git describe --tags --always --first-parent)
LOG_URL?=localhost:8002
TEST_TAGS?=

GOLANGCI_LINT_VERSION="v1.51.2"
GOLANGCI_LINT_VERSION="v1.56.2"
PROTOC_GEN_GO_JSON_VERSION="v1.1.0"
PROTOC_GEN_GO_VERSION="v1.26"
PYTHON_VERSION ?= "39" #set as 38 39 310 for 3.8-3.10 respectively
Expand Down
9 changes: 6 additions & 3 deletions api/batch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewController(
manifestManager ManifestManager,
envMetaData cluster.Metadata,
batchJobTemplater *BatchJobTemplater,
) Controller {
) (Controller, error) {
informerFactory := externalversions.NewSharedInformerFactory(sparkClient, resyncPeriod)
informer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
Expand All @@ -130,10 +130,13 @@ func NewController(
ContainerFetcher: cluster.NewContainerFetcher(kubeClient.CoreV1(), envMetaData),
}

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.onUpdate,
})
return controller
if err != nil {
return nil, err
}
return controller, nil
}

func (c *controller) Submit(ctx context.Context, predictionJob *models.PredictionJob, namespace string) error {
Expand Down
34 changes: 24 additions & 10 deletions api/batch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,10 @@ func TestSubmit(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
ctl, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

mockKubeClient.PrependReactor("get", "namespaces", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, kerrors.NewNotFound(schema.GroupResource{}, action.(ktesting.GetAction).GetName())
Expand Down Expand Up @@ -358,7 +360,7 @@ func TestSubmit(t *testing.T) {
mockManifestManager.On("DeleteJobSpec", context.Background(), jobName, defaultNamespace).Return(nil)
}

err := ctl.Submit(context.Background(), predictionJob, test.namespace)
err = ctl.Submit(context.Background(), predictionJob, test.namespace)
if test.wantError {
assert.Error(t, err)
assert.Equal(t, test.wantErrorMsg, err.Error())
Expand Down Expand Up @@ -396,13 +398,14 @@ func TestCleanupAfterSubmitFailed(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
ctl, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

mockManifestManager.On("DeleteSecret", context.Background(), jobName, defaultNamespace).Return(nil)
mockManifestManager.On("DeleteJobSpec", context.Background(), jobName, defaultNamespace).Return(nil)

err := ctl.Submit(context.Background(), predictionJob, defaultNamespace)
err = ctl.Submit(context.Background(), predictionJob, defaultNamespace)
assert.Error(t, err)
mockManifestManager.AssertExpectations(t)
}
Expand All @@ -422,8 +425,12 @@ func TestOnUpdate(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater).(*controller)
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
stopCh := make(chan struct{})
defer close(stopCh)
go ctl.Run(stopCh)
Expand Down Expand Up @@ -499,8 +506,12 @@ func TestUpdateStatus(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
clusterMetadata, batchJobTemplater).(*controller)
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
stopCh := make(chan struct{})
defer close(stopCh)
go ctl.Run(stopCh)
Expand Down Expand Up @@ -597,9 +608,12 @@ func TestStop(t *testing.T) {
mockManifestManager := &batchMock.ManifestManager{}
clusterMetadata := cluster.Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}
batchJobTemplater := NewBatchJobTemplater(defaultBatchConfig)
ctl := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient, mockManifestManager,
newController, err := NewController(mockStorage, mockMlpAPIClient, mockSparkClient, mockKubeClient,
mockManifestManager,
clusterMetadata, batchJobTemplater)
assert.NoError(t, err)

ctl := newController.(*controller)
mockKubeClient.PrependReactor("get", "namespaces", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, kerrors.NewNotFound(schema.GroupResource{}, action.(ktesting.GetAction).GetName())
})
Expand All @@ -616,7 +630,7 @@ func TestStop(t *testing.T) {
})
mockStorage.On("Delete", predictionJob).Return(nil)

err := ctl.Stop(context.Background(), predictionJob, namespace.Name)
err = ctl.Stop(context.Background(), predictionJob, namespace.Name)
if test.wantError {
assert.Error(t, err)
assert.Equal(t, test.wantErrorMsg, err.Error())
Expand Down
23 changes: 13 additions & 10 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
if modelService.CurrentIsvcName != "" {
if modelService.DeploymentMode == deployment.ServerlessDeploymentMode ||
modelService.DeploymentMode == deployment.EmptyDeploymentMode {
currentIsvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.CurrentIsvcName, metav1.GetOptions{})
currentIsvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx,
modelService.CurrentIsvcName, metav1.GetOptions{})
if err != nil && !kerrors.IsNotFound(err) {
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToGetInferenceServiceStatus, isvcName))
}
Expand All @@ -234,10 +235,10 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
}

// check the cluster to see if the inference service has already been deployed
s, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.Name, metav1.GetOptions{})
s, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx, modelService.Name, metav1.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
s, err = c.kserveClient.InferenceServices(modelService.Namespace).Create(spec)
s, err = c.kserveClient.InferenceServices(modelService.Namespace).Create(ctx, spec, metav1.CreateOptions{})
if err != nil {
log.Errorf("unable to create inference service %s: %v", isvcName, err)
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToCreateInferenceService, isvcName))
Expand All @@ -260,7 +261,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
s, err = c.waitInferenceServiceReady(s)
if err != nil {
// remove created inferenceservice when got error
if err := c.deleteInferenceService(isvcName, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, isvcName, modelService.Namespace); err != nil {
log.Errorf("unable to delete inference service %s with error %v", isvcName, err)
}

Expand Down Expand Up @@ -288,7 +289,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (

// Delete previous inference service
if modelService.CurrentIsvcName != "" {
if err := c.deleteInferenceService(modelService.CurrentIsvcName, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, modelService.CurrentIsvcName, modelService.Namespace); err != nil {
log.Errorf("unable to delete prevision revision %s with error %v", modelService.CurrentIsvcName, err)
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToDeletePreviousInferenceService, modelService.CurrentIsvcName))
}
Expand All @@ -305,15 +306,16 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
}

func (c *controller) Delete(ctx context.Context, modelService *models.Service) (*models.Service, error) {
infSvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(modelService.Name, metav1.GetOptions{})
infSvc, err := c.kserveClient.InferenceServices(modelService.Namespace).Get(ctx, modelService.Name,
metav1.GetOptions{})
if err != nil {
if !kerrors.IsNotFound(err) {
return nil, errors.Wrapf(err, "unable to check status of inference service: %s", infSvc.Name)
}
return modelService, nil
}

if err := c.deleteInferenceService(modelService.Name, modelService.Namespace); err != nil {
if err := c.deleteInferenceService(ctx, modelService.Name, modelService.Namespace); err != nil {
return nil, err
}

Expand All @@ -336,9 +338,10 @@ func (c *controller) Delete(ctx context.Context, modelService *models.Service) (
return modelService, nil
}

func (c *controller) deleteInferenceService(serviceName string, namespace string) error {
func (c *controller) deleteInferenceService(ctx context.Context, serviceName string, namespace string) error {
gracePeriod := int64(deletionGracePeriodSecond)
err := c.kserveClient.InferenceServices(namespace).Delete(serviceName, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
err := c.kserveClient.InferenceServices(namespace).Delete(ctx, serviceName,
metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod})
if client.IgnoreNotFound(err) != nil {
return errors.Wrapf(err, "unable to delete inference service: %s %v", serviceName, err)
}
Expand Down Expand Up @@ -373,7 +376,7 @@ func (c *controller) waitInferenceServiceReady(service *kservev1beta1.InferenceS
}
}()

isvcWatcher, err := c.kserveClient.InferenceServices(service.Namespace).Watch(metav1.ListOptions{
isvcWatcher, err := c.kserveClient.InferenceServices(service.Namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", service.Name),
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions api/cluster/resource/templater.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (t *InferenceServiceTemplater) enrichStandardTransformerEnvVars(modelServic

func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: httpPath,
Scheme: "HTTP",
Expand All @@ -575,7 +575,7 @@ func createHTTPGetLivenessProbe(httpPath string, port int) *corev1.Probe {

func createGRPCLivenessProbe(port int) *corev1.Probe {
return &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{grpcHealthProbeCommand, fmt.Sprintf("-addr=:%d", port)},
},
Expand Down
7 changes: 5 additions & 2 deletions api/cmd/api/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gorm.io/gorm"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/clock"

"github.com/caraml-dev/merlin/api"
"github.com/caraml-dev/merlin/batch"
Expand Down Expand Up @@ -403,8 +403,11 @@ func initBatchControllers(cfg *config.Config, db *gorm.DB, mlpAPIClient mlp.APIC

batchJobTemplator := batch.NewBatchJobTemplater(cfg.BatchConfig)

ctl := batch.NewController(predictionJobStorage, mlpAPIClient, sparkClient, kubeClient, manifestManager,
ctl, err := batch.NewController(predictionJobStorage, mlpAPIClient, sparkClient, kubeClient, manifestManager,
envMetadata, batchJobTemplator)
if err != nil {
log.Panicf("unable to create batch controller: %v", err)
}
stopCh := make(chan struct{})
go ctl.Run(stopCh)

Expand Down
17 changes: 8 additions & 9 deletions api/cmd/inference-logger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ import (
"strings"
"time"

"github.com/caraml-dev/merlin/pkg/inference-logger/liveness"
merlinlogger "github.com/caraml-dev/merlin/pkg/inference-logger/logger"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/kelseyhightower/envconfig"
nrconfig "github.com/newrelic/newrelic-client-go/v2/pkg/config"
nrlog "github.com/newrelic/newrelic-client-go/v2/pkg/logs"
"github.com/pkg/errors"
"go.uber.org/zap"
network "knative.dev/networking/pkg"
"knative.dev/networking/pkg/http/header"
"knative.dev/networking/pkg/http/proxy"
pkgnet "knative.dev/pkg/network"
pkghandler "knative.dev/pkg/network/handlers"
"knative.dev/pkg/signals"
"knative.dev/serving/pkg/queue"
"knative.dev/serving/pkg/queue/health"
"knative.dev/serving/pkg/queue/readiness"

"github.com/caraml-dev/merlin/pkg/inference-logger/liveness"
merlinlogger "github.com/caraml-dev/merlin/pkg/inference-logger/logger"
)

var (
Expand Down Expand Up @@ -216,10 +216,9 @@ func buildServer(target *url.URL, dispatcher *merlinlogger.Dispatcher, loggingMo

httpProxy := httputil.NewSingleHostReverseProxy(target)
httpProxy.Transport = pkgnet.NewAutoTransport(maxIdleConns /* max-idle */, maxIdleConns /* max-idle-per-host */)
// nolint:staticcheck
httpProxy.ErrorHandler = pkgnet.ErrorHandler(log)
httpProxy.BufferPool = network.NewBufferPool()
httpProxy.FlushInterval = network.FlushInterval
httpProxy.ErrorHandler = pkghandler.Error(log)
httpProxy.BufferPool = proxy.NewBufferPool()
httpProxy.FlushInterval = proxy.FlushInterval

var composedHandler http.Handler = httpProxy
composedHandler = merlinlogger.NewLoggerHandler(dispatcher, loggingMode, composedHandler, log)
Expand All @@ -230,7 +229,7 @@ func buildServer(target *url.URL, dispatcher *merlinlogger.Dispatcher, loggingMo
drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
HealthCheckUAPrefixes: []string{network.ActivatorUserAgent},
HealthCheckUAPrefixes: []string{header.ActivatorUserAgent},
Inner: composedHandler,
HealthCheck: health.ProbeHandler(probe, false),
}
Expand Down
2 changes: 1 addition & 1 deletion api/cmd/transformer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/kelseyhightower/envconfig"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"github.com/prometheus/client_golang/prometheus/collectors/version"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
Expand Down
Loading

0 comments on commit 2beab49

Please sign in to comment.