Skip to content

Commit

Permalink
[api] Allow setting static external coordinator (#242)
Browse files Browse the repository at this point in the history
We allow changing the selector that the coordinator service uses,
enabling users to control their M3DB clusters with coordinators that
live outside the cluster. This improves the ability to use
out-of-cluster coordinators, but has the limitation that the coordinator
must be in the same namespace as the cluster.

This PR extends external coordinator support to allow configuring a
static endpoint. This allows the user to manage the cluster with a
coordinator in a different namespace, i.e.
`global-coordinator.$OTHER_NS:7201`.
  • Loading branch information
schallert authored Oct 8, 2020
1 parent 27c3036 commit 1432e04
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ ExternalCoordinatorConfig defines parameters for using an external coordinator t
| Field | Description | Scheme | Required |
| ----- | ----------- | ------ | -------- |
| selector | | map[string]string | true |
| serviceEndpoint | | string | false |

[Back to TOC](#table-of-contents)

Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ type ClusterSpec struct {
// Setup this db cluster, but do not assume a co-located coordinator. Instead
// provide a selector here so we can point to a separate coordinator service.
type ExternalCoordinatorConfig struct {
Selector map[string]string `json:"selector,omityempty"`
Selector map[string]string `json:"selector,omityempty"`
ServiceEndpoint string `json:"serviceEndpoint,omitempty"`
}

// NodeAffinityTerm represents a node label and a set of label values, any of
Expand Down
38 changes: 19 additions & 19 deletions pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"sync"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/k8sops"
"github.com/m3db/m3db-operator/pkg/k8sops/m3db"
"github.com/m3db/m3db-operator/pkg/m3admin"
Expand All @@ -37,8 +38,6 @@ import (
m3topic "github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
)

Expand All @@ -54,26 +53,28 @@ type multiAdminClient struct {
plClientFn func(...placement.Option) (placement.Client, error)
tpClientFn func(...topic.Option) (topic.Client, error)

clusterKeyFn func(metav1.ObjectMetaAccessor, string) string
clusterURLFn func(metav1.ObjectMetaAccessor) string
clusterKeyFn func(*myspec.M3DBCluster, string) string
clusterURLFn func(*myspec.M3DBCluster) string

adminClientFn func(...m3admin.Option) m3admin.Client
adminOpts []m3admin.Option
logger *zap.Logger
}

// clusterKey returns a map key for a given cluster.
func clusterKey(obj metav1.ObjectMetaAccessor, url string) string {
m := obj.GetObjectMeta()
return m.GetNamespace() + "/" + m.GetName() + "/" + url
func clusterKey(cluster *myspec.M3DBCluster, url string) string {
return cluster.Namespace + "/" + cluster.Name + "/" + url
}

// clusterURL returns the URL to hit
func clusterURL(cluster metav1.ObjectMetaAccessor) string {
m := cluster.GetObjectMeta()
serviceName := m3db.CoordinatorServiceName(m.GetName())
func clusterURL(cluster *myspec.M3DBCluster) string {
if cluster.Spec.ExternalCoordinator != nil && cluster.Spec.ExternalCoordinator.ServiceEndpoint != "" {
return "http://" + cluster.Spec.ExternalCoordinator.ServiceEndpoint
}

serviceName := m3db.CoordinatorServiceName(cluster.Name)
urlFmt := "http://%s.%s:%d"
url := fmt.Sprintf(urlFmt, serviceName, m.GetNamespace(), m3db.PortM3Coordinator)
url := fmt.Sprintf(urlFmt, serviceName, cluster.Namespace, m3db.PortM3Coordinator)
return url
}

Expand All @@ -83,11 +84,10 @@ func newAdminClient(opts ...m3admin.Option) m3admin.Client {

// clusterURLProxy returns a URL for communicating with a cluster via an
// intermediary kubectl proxy.
func clusterURLProxy(cluster metav1.ObjectMetaAccessor) string {
m := cluster.GetObjectMeta()
serviceName := m3db.CoordinatorServiceName(m.GetName())
func clusterURLProxy(cluster *myspec.M3DBCluster) string {
serviceName := m3db.CoordinatorServiceName(cluster.Name)
urlFmt := "http://localhost:8001/api/v1/namespaces/%s/services/%s:coordinator/proxy"
url := fmt.Sprintf(urlFmt, m.GetNamespace(), serviceName)
url := fmt.Sprintf(urlFmt, cluster.Namespace, serviceName)
return url
}

Expand All @@ -107,13 +107,13 @@ func newMultiAdminClient(adminOpts []m3admin.Option, logger *zap.Logger) *multiA
}
}

func (m *multiAdminClient) adminClientForCluster(cluster metav1.ObjectMetaAccessor) m3admin.Client {
func (m *multiAdminClient) adminClientForCluster(cluster *myspec.M3DBCluster) m3admin.Client {
env := k8sops.DefaultM3ClusterEnvironmentName(cluster)
opts := append(m.adminOpts, m3admin.WithEnvironment(env))
return m.adminClientFn(opts...)
}

func (m *multiAdminClient) namespaceClientForCluster(cluster metav1.ObjectMetaAccessor) namespace.Client {
func (m *multiAdminClient) namespaceClientForCluster(cluster *myspec.M3DBCluster) namespace.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down Expand Up @@ -147,7 +147,7 @@ func (m *multiAdminClient) namespaceClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAccessor) placement.Client {
func (m *multiAdminClient) placementClientForCluster(cluster *myspec.M3DBCluster) placement.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down Expand Up @@ -180,7 +180,7 @@ func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) topicClientForCluster(cluster metav1.ObjectMetaAccessor) topic.Client {
func (m *multiAdminClient) topicClientForCluster(cluster *myspec.M3DBCluster) topic.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ import (

func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient {
m := newMultiAdminClient(nil, zap.NewNop())
m.clusterKeyFn = func(cl metav1.ObjectMetaAccessor, url string) string {
m.clusterKeyFn = func(cl *myspec.M3DBCluster, url string) string {
return cl.GetObjectMeta().GetName()
}
m.clusterURLFn = func(metav1.ObjectMetaAccessor) string {
m.clusterURLFn = func(*myspec.M3DBCluster) string {
return url
}
m.adminClientFn = func(...m3admin.Option) m3admin.Client {
Expand All @@ -65,6 +65,12 @@ func TestClusterURL(t *testing.T) {
cluster.Namespace = "foo"
url := clusterURL(cluster)
assert.Equal(t, "http://m3coordinator-a.foo:7201", url)

cluster.Spec.ExternalCoordinator = &myspec.ExternalCoordinatorConfig{
ServiceEndpoint: "my-custom-coordinator.other-namespace:1234",
}
url = clusterURL(cluster)
assert.Equal(t, "http://my-custom-coordinator.other-namespace:1234", url)
}

func TestClusterURLProxy(t *testing.T) {
Expand Down

0 comments on commit 1432e04

Please sign in to comment.