Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] Allow setting static external coordinator #242

Merged
merged 2 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ ExternalCoordinatorConfig defines parameters for using an external coordinator t
| Field | Description | Scheme | Required |
| ----- | ----------- | ------ | -------- |
| selector | | map[string]string | true |
| serviceEndpoint | | string | false |

[Back to TOC](#table-of-contents)

Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/m3dboperator/v1alpha1/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ type ClusterSpec struct {
// Setup this db cluster, but do not assume a co-located coordinator. Instead
// provide a selector here so we can point to a separate coordinator service.
type ExternalCoordinatorConfig struct {
Selector map[string]string `json:"selector,omityempty"`
Selector map[string]string `json:"selector,omityempty"`
ServiceEndpoint string `json:"serviceEndpoint,omitempty"`
}

// NodeAffinityTerm represents a node label and a set of label values, any of
Expand Down
38 changes: 19 additions & 19 deletions pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"sync"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/k8sops"
"github.com/m3db/m3db-operator/pkg/k8sops/m3db"
"github.com/m3db/m3db-operator/pkg/m3admin"
Expand All @@ -37,8 +38,6 @@ import (
m3topic "github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"go.uber.org/zap"
)

Expand All @@ -54,26 +53,28 @@ type multiAdminClient struct {
plClientFn func(...placement.Option) (placement.Client, error)
tpClientFn func(...topic.Option) (topic.Client, error)

clusterKeyFn func(metav1.ObjectMetaAccessor, string) string
clusterURLFn func(metav1.ObjectMetaAccessor) string
clusterKeyFn func(*myspec.M3DBCluster, string) string
clusterURLFn func(*myspec.M3DBCluster) string

adminClientFn func(...m3admin.Option) m3admin.Client
adminOpts []m3admin.Option
logger *zap.Logger
}

// clusterKey returns a map key for a given cluster.
func clusterKey(obj metav1.ObjectMetaAccessor, url string) string {
m := obj.GetObjectMeta()
return m.GetNamespace() + "/" + m.GetName() + "/" + url
func clusterKey(cluster *myspec.M3DBCluster, url string) string {
return cluster.Namespace + "/" + cluster.Name + "/" + url
}

// clusterURL returns the URL to hit
func clusterURL(cluster metav1.ObjectMetaAccessor) string {
m := cluster.GetObjectMeta()
serviceName := m3db.CoordinatorServiceName(m.GetName())
func clusterURL(cluster *myspec.M3DBCluster) string {
if cluster.Spec.ExternalCoordinator != nil && cluster.Spec.ExternalCoordinator.ServiceEndpoint != "" {
return "http://" + cluster.Spec.ExternalCoordinator.ServiceEndpoint
}

serviceName := m3db.CoordinatorServiceName(cluster.Name)
urlFmt := "http://%s.%s:%d"
url := fmt.Sprintf(urlFmt, serviceName, m.GetNamespace(), m3db.PortM3Coordinator)
url := fmt.Sprintf(urlFmt, serviceName, cluster.Namespace, m3db.PortM3Coordinator)
return url
}

Expand All @@ -83,11 +84,10 @@ func newAdminClient(opts ...m3admin.Option) m3admin.Client {

// clusterURLProxy returns a URL for communicating with a cluster via an
// intermediary kubectl proxy.
func clusterURLProxy(cluster metav1.ObjectMetaAccessor) string {
m := cluster.GetObjectMeta()
serviceName := m3db.CoordinatorServiceName(m.GetName())
func clusterURLProxy(cluster *myspec.M3DBCluster) string {
serviceName := m3db.CoordinatorServiceName(cluster.Name)
urlFmt := "http://localhost:8001/api/v1/namespaces/%s/services/%s:coordinator/proxy"
url := fmt.Sprintf(urlFmt, m.GetNamespace(), serviceName)
url := fmt.Sprintf(urlFmt, cluster.Namespace, serviceName)
return url
}

Expand All @@ -107,13 +107,13 @@ func newMultiAdminClient(adminOpts []m3admin.Option, logger *zap.Logger) *multiA
}
}

func (m *multiAdminClient) adminClientForCluster(cluster metav1.ObjectMetaAccessor) m3admin.Client {
func (m *multiAdminClient) adminClientForCluster(cluster *myspec.M3DBCluster) m3admin.Client {
env := k8sops.DefaultM3ClusterEnvironmentName(cluster)
opts := append(m.adminOpts, m3admin.WithEnvironment(env))
return m.adminClientFn(opts...)
}

func (m *multiAdminClient) namespaceClientForCluster(cluster metav1.ObjectMetaAccessor) namespace.Client {
func (m *multiAdminClient) namespaceClientForCluster(cluster *myspec.M3DBCluster) namespace.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down Expand Up @@ -147,7 +147,7 @@ func (m *multiAdminClient) namespaceClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAccessor) placement.Client {
func (m *multiAdminClient) placementClientForCluster(cluster *myspec.M3DBCluster) placement.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down Expand Up @@ -180,7 +180,7 @@ func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) topicClientForCluster(cluster metav1.ObjectMetaAccessor) topic.Client {
func (m *multiAdminClient) topicClientForCluster(cluster *myspec.M3DBCluster) topic.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ import (

func newTestAdminClient(cl m3admin.Client, url string) *multiAdminClient {
m := newMultiAdminClient(nil, zap.NewNop())
m.clusterKeyFn = func(cl metav1.ObjectMetaAccessor, url string) string {
m.clusterKeyFn = func(cl *myspec.M3DBCluster, url string) string {
return cl.GetObjectMeta().GetName()
}
m.clusterURLFn = func(metav1.ObjectMetaAccessor) string {
m.clusterURLFn = func(*myspec.M3DBCluster) string {
return url
}
m.adminClientFn = func(...m3admin.Option) m3admin.Client {
Expand All @@ -65,6 +65,12 @@ func TestClusterURL(t *testing.T) {
cluster.Namespace = "foo"
url := clusterURL(cluster)
assert.Equal(t, "http://m3coordinator-a.foo:7201", url)

cluster.Spec.ExternalCoordinator = &myspec.ExternalCoordinatorConfig{
ServiceEndpoint: "my-custom-coordinator.other-namespace:1234",
}
url = clusterURL(cluster)
assert.Equal(t, "http://my-custom-coordinator.other-namespace:1234", url)
}

func TestClusterURLProxy(t *testing.T) {
Expand Down