Commit

feat: add create dynamic client by config. (#521)
* fix: refactor SetMaxConcurrent method in ParallelTasks. (#502)

* fix: refactor SetMaxConcurrent method in ParallelTasks.

* fix: add SetConcurrent annotation

* feat: add create dynamic client by config. (#506)

* fix: refactor SetMaxConcurrent method in ParallelTasks.

fix: add SetConcurrent annotation

feat: adjust the struct of multicluster interface

fix: remove testing and import cycle.

* fix: rename get multi cluster.
kycheng authored Jan 8, 2024
1 parent 4a30a1d commit 2a0bac4
Showing 9 changed files with 106 additions and 81 deletions.
2 changes: 1 addition & 1 deletion client/context_test.go
@@ -66,7 +66,7 @@ func TestManagerContext(t *testing.T) {
clt := ManagerCtx(ctx)
g.Expect(clt).To(BeNil())

mgr := NewManager(ctx, nil, nil)
mgr := NewManager(ctx, nil, nil, nil)
ctx = WithManager(ctx, mgr)
g.Expect(ManagerCtx(ctx)).To(Equal(mgr))
}
31 changes: 25 additions & 6 deletions client/manager.go
@@ -21,6 +21,7 @@ import (
"strings"
"time"

"github.com/katanomi/pkg/multicluster"
apiserverrequest "k8s.io/apiserver/pkg/endpoints/request"

"k8s.io/apiserver/pkg/authentication/user"
@@ -45,10 +46,17 @@ type GetBaseConfigFunc func() (*rest.Config, error)
// GetConfigFunc retrieves a configuration based on a request
type GetConfigFunc func(req *restful.Request, baseConfig GetBaseConfigFunc) (*rest.Config, error)

// NewClusterRegistryClientFunc constructs a multi cluster client based on the specified config.
type NewClusterRegistryClientFunc func(*rest.Config) (multicluster.Interface, error)

// Manager dynamically generates client based on user requests
type Manager struct {
GetConfig GetConfigFunc
// GetConfig retrieves a configuration based on a request
GetConfig GetConfigFunc
// GetBasicConfig retrieves a configuration based on a request
GetBasicConfig GetBaseConfigFunc
// NewClusterRegistryClient constructs a multi cluster client based on the specified config.
NewClusterRegistryClient NewClusterRegistryClientFunc
*zap.SugaredLogger
}

@@ -67,8 +75,7 @@ func WithCtxManagerFilters(ctx context.Context, ws *restful.WebService) error {
}

// NewManager initializes a new manager based on func
func NewManager(ctx context.Context, get GetConfigFunc, baseConfig GetBaseConfigFunc) *Manager {

func NewManager(ctx context.Context, get GetConfigFunc, baseConfig GetBaseConfigFunc, newClusterRegistryClient NewClusterRegistryClientFunc) *Manager {
if get == nil {
get = FromBearerToken
}
@@ -83,9 +90,10 @@ func NewManager(ctx context.Context, get GetConfigFunc, baseConfig GetBaseConfig
baseConfig = config.GetConfig
}
return &Manager{
SugaredLogger: logging.FromContext(ctx),
GetConfig: configGetter,
GetBasicConfig: baseConfig,
SugaredLogger: logging.FromContext(ctx),
GetConfig: configGetter,
GetBasicConfig: baseConfig,
NewClusterRegistryClient: newClusterRegistryClient,
}
}

@@ -157,6 +165,17 @@ func ManagerFilter(ctx context.Context, mgr *Manager) restful.FilterFunction {
}
reqCtx = WithDynamicClient(reqCtx, dynamicClient)

if mgr.NewClusterRegistryClient != nil {
multiClusterClient, err := mgr.NewClusterRegistryClient(config)
if err != nil {
log.Errorw("cannot get multi cluster client", "err", err)
kerrors.HandleError(req, resp, err)
return
}
reqCtx = multicluster.WithMultiCluster(reqCtx, multiClusterClient)
log.Debugw("get multi cluster", "totalElapsed", time.Since(start).String())
}

req.Request = req.Request.WithContext(reqCtx)

log.Debugw("config,client,dynamicclient context done", "totalElapsed", time.Since(start).String())
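
For orientation, here is a minimal sketch of how a caller might supply the new fourth argument to NewManager. Only the NewManager signature and its nil-fallback behaviour come from this change; the kclient alias, the import path, and the registry parameter are assumptions for illustration.

package main

import (
	"context"

	"k8s.io/client-go/rest"

	kclient "github.com/katanomi/pkg/client"
	"github.com/katanomi/pkg/multicluster"
)

// newManagerWithRegistry wires a NewClusterRegistryClientFunc into NewManager.
// The registry parameter stands in for whatever multicluster.Interface
// implementation a deployment actually provides.
func newManagerWithRegistry(ctx context.Context, registry multicluster.Interface) *kclient.Manager {
	factory := func(cfg *rest.Config) (multicluster.Interface, error) {
		// A real factory would usually derive the client from the per-request
		// rest.Config resolved by ManagerFilter; returning a prebuilt instance
		// keeps the sketch minimal.
		return registry, nil
	}
	// Passing nil for GetConfigFunc and GetBaseConfigFunc keeps the defaults
	// (FromBearerToken and config.GetConfig) applied inside NewManager.
	return kclient.NewManager(ctx, nil, nil, factory)
}
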
42 changes: 30 additions & 12 deletions client/manager_test.go
@@ -24,48 +24,64 @@ import (
"testing"

"github.com/emicklei/go-restful/v3"
"github.com/golang/mock/gomock"
"github.com/katanomi/pkg/multicluster"
multiclustermock "github.com/katanomi/pkg/testing/mock/github.com/katanomi/pkg/multicluster"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

// . "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"knative.dev/pkg/injection"
)

const (
mockToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJlbWFpbCI6ImRldiJ9.v5leOJQ8mxkOzWW-dWWFfPGPn__0eYUGtDCdwx1LWkM"
)

func EmptyHandler(rq *restful.Request, rp *restful.Response) {
}

func TestManagerFilter(t *testing.T) {
// TODO: Find a better and more reliable way to do these
t.Skip()
os.Setenv("KUBERNETES_MASTER", "127.0.0.1:16003")
target := func(req *restful.Request, resp *restful.Response) {}
chain := &restful.FilterChain{Target: target}

t.Run("should succeed", func(t *testing.T) {
g := NewGomegaWithT(t)
ctx := context.TODO()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
dynamic := multiclustermock.NewMockInterface(ctrl)

mgr := NewManager(ctx, FromBearerToken, func() (*rest.Config, error) {
return &rest.Config{
Host: "https://127.0.0.1:6443",
Username: "abc",
Password: "def",
}, nil
})
}, func(config *rest.Config) (multicluster.Interface, error) { return dynamic, nil })
ws := new(restful.WebService)
ws.Consumes(restful.MIME_JSON)
clt := fake.NewClientBuilder().Build()
ctx = WithClient(ctx, clt)
ws.Route(ws.GET("/config").Filter(ManagerFilter(ctx, mgr)).To(EmptyHandler))
restful.Add(ws)
testReq := httptest.NewRequest(http.MethodGet, "/config", nil)
req := restful.NewRequest(testReq)
req.Request.Header.Set("Authorization", "Bearer 0123456789")
req.Request.Header.Set("Authorization", "Bearer "+mockToken)
req.Request = req.Request.WithContext(ctx)
resp := restful.NewResponse(httptest.NewRecorder())

restful.DefaultContainer.ServeHTTP(resp, testReq)
// ManagerFilter(ctx, mgr)(req, resp, chain)
// restful.DefaultContainer.ServeHTTP(resp, testReq)
ManagerFilter(ctx, mgr)(req, resp, chain)

config := injection.GetConfig(testReq.Context())
config := injection.GetConfig(req.Request.Context())
g.Expect(config).ToNot(BeNil())
dynamicClient := multicluster.MultiCluster(req.Request.Context())
g.Expect(dynamicClient).ToNot(BeNil())
g.Expect(resp.StatusCode()).ToNot(Equal(http.StatusInternalServerError))
g.Expect(config.BearerToken).To(Equal("0123456789"))
g.Expect(config.BearerToken).To(Equal(mockToken))
})

t.Run("should return error", func(t *testing.T) {
@@ -74,21 +90,23 @@ func TestManagerFilter(t *testing.T) {
mgr := NewManager(ctx, FromBearerToken, func() (*rest.Config, error) {
// will return an error
return &rest.Config{}, nil
}, func(c *rest.Config) (multicluster.Interface, error) {
return nil, nil
})
req := restful.NewRequest(httptest.NewRequest(http.MethodGet, "http://example.com", nil))
req.Request = req.Request.WithContext(ctx)
resp := restful.NewResponse(httptest.NewRecorder())
ManagerFilter(ctx, mgr)(req, resp, chain)
g.Expect(resp.StatusCode()).To(Equal(http.StatusUnauthorized))
g.Expect(resp.StatusCode()).To(Equal(http.StatusNotAcceptable))
config := injection.GetConfig(req.Request.Context())
g.Expect(config).To(BeNil())
dynamic := multicluster.MultiCluster(req.Request.Context())
g.Expect(dynamic).To(BeNil())
})

}

func TestUserFromBearerToken(t *testing.T) {
token := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJlbWFpbCI6ImRldiJ9.v5leOJQ8mxkOzWW-dWWFfPGPn__0eYUGtDCdwx1LWkM"
info, err := UserFromBearerToken(token)
info, err := UserFromBearerToken(mockToken)
if err != nil {
t.Errorf("error: %s", err)
}
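
For completeness, a sketch of stubbing the generated multicluster mock with gomock before handing it to a manager in a test; the expectation values are illustrative, while NewMockInterface and the mock package come from the test above.

package client

import (
	"testing"

	"github.com/golang/mock/gomock"
	"k8s.io/client-go/rest"

	multiclustermock "github.com/katanomi/pkg/testing/mock/github.com/katanomi/pkg/multicluster"
)

// newStubbedRegistry returns a mock multi-cluster client whose GetConfig
// always succeeds with a placeholder config.
func newStubbedRegistry(t *testing.T) *multiclustermock.MockInterface {
	ctrl := gomock.NewController(t)
	mock := multiclustermock.NewMockInterface(ctrl)
	mock.EXPECT().
		GetConfig(gomock.Any(), gomock.Any()).
		Return(&rest.Config{Host: "https://127.0.0.1:6443"}, nil).
		AnyTimes()
	return mock
}
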
11 changes: 9 additions & 2 deletions multicluster/interface.go
@@ -32,9 +32,7 @@ import (

// Interface interface for a multi-cluster functionality
type Interface interface {
GetConfig(ctx context.Context, clusterRef *corev1.ObjectReference) (config *rest.Config, err error)
GetDynamic(ctx context.Context, clusterRef *corev1.ObjectReference) (dyn dynamic.Interface, err error)
GetConfigFromCluster(ctx context.Context, cluster *unstructured.Unstructured) (config *rest.Config, err error)

// ListClustersNamespaces lists all namespaces in all clusters
// TODO: add this method to the interface and implementation
@@ -47,6 +45,9 @@ type Interface interface {

// NamespaceClustersGetter for getting list of clusters related by special namespace
NamespaceClustersGetter

// ConfigGetter for getting config for a clusterRef
ConfigGetter
}

// NamespaceClustersGetter interface get list of clusters related by special namespace
@@ -58,3 +59,9 @@ type NamespaceClustersGetter interface {
type ClientGetter interface {
GetClient(ctx context.Context, clusterRef *corev1.ObjectReference, scheme *runtime.Scheme) (clt client.Client, err error)
}

// ConfigGetter interface get config for a clusterRef
type ConfigGetter interface {
GetConfig(ctx context.Context, clusterRef *corev1.ObjectReference) (config *rest.Config, err error)
GetConfigFromCluster(ctx context.Context, cluster *unstructured.Unstructured) (config *rest.Config, err error)
}
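
To show how the regrouped interface is consumed, here is a rough sketch of a handler reading the multi-cluster client that ManagerFilter stores via WithMultiCluster and calling the ConfigGetter part of the interface. It assumes MultiCluster returns the stored Interface; the ObjectReference fields are placeholders.

package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"github.com/katanomi/pkg/multicluster"
)

// resolveClusterConfig reads the client injected by ManagerFilter and asks
// it for the rest.Config of a referenced cluster.
func resolveClusterConfig(ctx context.Context) error {
	clusters := multicluster.MultiCluster(ctx)
	if clusters == nil {
		// The manager was built without a NewClusterRegistryClientFunc.
		return nil
	}
	clusterRef := &corev1.ObjectReference{Namespace: "default", Name: "managed-cluster"}
	cfg, err := clusters.GetConfig(ctx, clusterRef)
	if err != nil {
		return err
	}
	_ = cfg // e.g. build a typed or dynamic client for the target cluster
	return nil
}
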
12 changes: 12 additions & 0 deletions parallel/task.go
@@ -138,11 +138,23 @@ func (p *ParallelTasks) FailFast() *ParallelTasks {
return p
}

// SetConcurrent set the number of concurrency
func (p *ParallelTasks) SetConcurrent(count int) *ParallelTasks {
p.Options.ConcurrencyCount = count
return p
}

// SetMaxConcurrent set the number of concurrency.
// if count is greater than max, max is used.
func (p *ParallelTasks) SetMaxConcurrent(count int, max int) *ParallelTasks {
if count > max {
count = max
}

p.Options.ConcurrencyCount = count
return p
}

// Context will set context , up to now , task is not support to cancel
// if you cancel from context, wait will return immediately
func (p *ParallelTasks) Context(ctx context.Context) *ParallelTasks {
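
A small usage sketch contrasting the existing SetConcurrent with the new SetMaxConcurrent; the no-op zap logger and task names are just for illustration, while P, the setters, and Options.ConcurrencyCount appear in the package and its tests.

package main

import (
	"go.uber.org/zap"

	"github.com/katanomi/pkg/parallel"
)

// concurrencyOptions contrasts the two setters: SetConcurrent applies the
// count as given, while SetMaxConcurrent clamps it to max.
func concurrencyOptions() {
	log := zap.NewNop().Sugar()

	direct := parallel.P(log, "direct").SetConcurrent(10)
	_ = direct // Options.ConcurrencyCount is 10

	capped := parallel.P(log, "capped").SetMaxConcurrent(10, 5)
	_ = capped // Options.ConcurrencyCount is 5, because 10 > max
}
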
29 changes: 28 additions & 1 deletion parallel/task_test.go
@@ -247,7 +247,6 @@ var _ = Describe("P().Do().Wait()", func() {
})

Context("when set conccurrent", func() {

flag4 := &executeFlag{}
flag5 := &executeFlag{}
flag6 := &executeFlag{}
@@ -289,5 +288,33 @@ var _ = Describe("P().Do().Wait()", func() {
}
})
})
})

var _ = Describe("SetMaxConcurrent", func() {
var (
ptasks *parallel.ParallelTasks
zapLog, _ = zap.NewDevelopment()
log = zapLog.Sugar()
)

BeforeEach(func() {
ptasks = parallel.P(log, "custom case")
})

Context("SetMaxConcurrent", func() {
It("when max great concurrent", func() {
ptasks.SetMaxConcurrent(5, 6)
Expect(ptasks.Options.ConcurrencyCount).To(BeEquivalentTo(5))
})

It("when max less concurrent", func() {
ptasks.SetMaxConcurrent(5, 4)
Expect(ptasks.Options.ConcurrencyCount).To(BeEquivalentTo(4))
})

It("when max equal concurrent", func() {
ptasks.SetMaxConcurrent(5, 5)
Expect(ptasks.Options.ConcurrencyCount).To(BeEquivalentTo(5))
})
})
})
2 changes: 1 addition & 1 deletion sharedmain/main.go
@@ -52,7 +52,7 @@ import (
func GetClientManager(ctx context.Context) (context.Context, *kclient.Manager) {
clientManager := kclient.ManagerCtx(ctx)
if clientManager == nil {
clientManager = kclient.NewManager(ctx, nil, nil)
clientManager = kclient.NewManager(ctx, nil, nil, nil)
ctx = kclient.WithManager(ctx, clientManager)
}
return ctx, clientManager
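
Because GetClientManager now passes nil as the cluster-registry constructor, multi-cluster injection stays disabled by default. A sketch, under assumed import paths, of how a service could seed the context with its own Manager first so that GetClientManager reuses it via ManagerCtx:

package main

import (
	"context"

	kclient "github.com/katanomi/pkg/client"
)

// withMultiClusterManager registers a Manager that carries a cluster-registry
// constructor; a later GetClientManager call finds it through ManagerCtx and
// skips building the default manager with nil.
func withMultiClusterManager(ctx context.Context, newRegistry kclient.NewClusterRegistryClientFunc) context.Context {
	mgr := kclient.NewManager(ctx, nil, nil, newRegistry)
	return kclient.WithManager(ctx, mgr)
}
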
23 changes: 0 additions & 23 deletions testing/config_manager.go

This file was deleted.

35 changes: 0 additions & 35 deletions testing/config_manager_test.go

This file was deleted.
