Skip to content

Commit

Permalink
Add support for service-mirror selectors (#4795)
Browse files Browse the repository at this point in the history
* Add selector support

Signed-off-by: Alex Leong <[email protected]>

* Removed unused labels

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored Jul 30, 2020
1 parent 6307868 commit a1543b3
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 450 deletions.
266 changes: 16 additions & 250 deletions cli/cmd/multicluster.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cmd

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -25,8 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/helm/pkg/chartutil"
Expand Down Expand Up @@ -72,11 +68,7 @@ type (
logLevel string
controlPlaneVersion string
dockerRegistry string
}

exportServiceOptions struct {
gatewayNamespace string
gatewayName string
selector string
}

gatewaysOptions struct {
Expand Down Expand Up @@ -117,6 +109,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) {
dockerRegistry: defaultDockerRegistry,
serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
logLevel: defaults.LogLevel,
selector: k8s.DefaultExportedServiceSelector,
}, nil
}

Expand Down Expand Up @@ -517,11 +510,6 @@ func newLinkCommand() *cobra.Command {
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
Namespace: opts.namespace,
Annotations: map[string]string{
k8s.RemoteClusterNameLabel: opts.clusterName,
k8s.RemoteClusterDomainAnnotation: configMap.Global.ClusterDomain,
k8s.RemoteClusterLinkerdNamespaceAnnotation: controlPlaneNamespace,
},
},
Data: map[string][]byte{
k8s.ConfigKeyName: kubeconfig,
Expand Down Expand Up @@ -561,6 +549,11 @@ func newLinkCommand() *cobra.Command {
return err
}

selector, err := metav1.ParseToLabelSelector(opts.selector)
if err != nil {
return err
}

link := mc.Link{
Name: opts.clusterName,
Namespace: opts.namespace,
Expand All @@ -572,9 +565,14 @@ func newLinkCommand() *cobra.Command {
GatewayPort: gatewayPort,
GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
Selector: *selector,
}

linkOut, err := yaml.Marshal(link.ToUnstructured().Object)
obj, err := link.ToUnstructured()
if err != nil {
return err
}
linkOut, err := yaml.Marshal(obj.Object)
if err != nil {
return err
}
Expand Down Expand Up @@ -630,229 +628,7 @@ func newLinkCommand() *cobra.Command {
cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, "Docker registry to pull service mirror controller image from")

return cmd
}

type exportReport struct {
resourceKind string
resourceName string
exported bool
}

func transform(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) {
var metaType metav1.TypeMeta

if err := yaml.Unmarshal(bytes, &metaType); err != nil {
return nil, nil, err
}

if metaType.Kind == "Service" {
var service corev1.Service
if err := yaml.Unmarshal(bytes, &service); err != nil {
return nil, nil, err
}

if service.Annotations == nil {
service.Annotations = map[string]string{}
}
report := &exportReport{
resourceKind: strings.ToLower(metaType.Kind),
resourceName: service.Name,
}

if service.Labels != nil {
if _, isMirroredResource := service.Labels[k8s.MirroredResourceLabel]; isMirroredResource {
report.exported = false
return bytes, []*exportReport{report}, nil
}
}

service.Annotations[k8s.GatewayNameAnnotation] = gatewayName
service.Annotations[k8s.GatewayNsAnnotation] = gatewayNamespace

transformed, err := yaml.Marshal(service)

if err != nil {
return nil, nil, err
}
report.exported = true
return transformed, []*exportReport{report}, nil
}

report := &exportReport{
resourceKind: strings.ToLower(metaType.Kind),
exported: false,
}

return bytes, []*exportReport{report}, nil
}

func generateReport(reports []*exportReport, reportsOut io.Writer) error {
unexportedResources := map[string]int{}

for _, r := range reports {
if r.exported {
if _, err := reportsOut.Write([]byte(fmt.Sprintf("%s \"%s\" exported\n", r.resourceKind, r.resourceName))); err != nil {
return err
}
} else {
if val, ok := unexportedResources[r.resourceKind]; ok {
unexportedResources[r.resourceKind] = val + 1
} else {
unexportedResources[r.resourceKind] = 1
}
}
}

if len(unexportedResources) > 0 {
reportsOut.Write([]byte("\n"))
reportsOut.Write([]byte("Number of skipped resources:\n"))
}

for res, num := range unexportedResources {
reportsOut.Write([]byte(fmt.Sprintf("%ss: %d\n", res, num)))
}

return nil
}

func transformList(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) {
var sourceList corev1.List
if err := yaml.Unmarshal(bytes, &sourceList); err != nil {
return nil, nil, err
}

reports := []*exportReport{}
items := []runtime.RawExtension{}

for _, item := range sourceList.Items {
result, report, err := transform(item.Raw, gatewayName, gatewayNamespace)
if err != nil {
return nil, nil, err
}

exported, err := yaml.YAMLToJSON(result)
if err != nil {
return nil, nil, err
}

items = append(items, runtime.RawExtension{Raw: exported})
reports = append(reports, report...)
}

sourceList.Items = items
result, err := yaml.Marshal(sourceList)
if err != nil {
return nil, nil, err
}
return result, reports, nil
}

func processExportYaml(in io.Reader, out io.Writer, gatewayName, gatewayNamespace string) ([]*exportReport, error) {
reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))
var reports []*exportReport
// Iterate over all YAML objects in the input
for {
// Read a single YAML object
bytes, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

isList, err := kindIsList(bytes)
if err != nil {
return nil, err
}

var result []byte
var currentReports []*exportReport

if isList {
result, currentReports, err = transformList(bytes, gatewayName, gatewayNamespace)

} else {
result, currentReports, err = transform(bytes, gatewayName, gatewayNamespace)
}

if err != nil {
return nil, err
}

reports = append(reports, currentReports...)
out.Write(result)
out.Write([]byte("---\n"))
}

return reports, nil
}

func transformExportInput(inputs []io.Reader, errWriter, outWriter io.Writer, gatewayName, gatewayNamespace string) int {
postTransformBuf := &bytes.Buffer{}
reportBuf := &bytes.Buffer{}
var finalReports []*exportReport
for _, input := range inputs {
reports, err := processExportYaml(input, postTransformBuf, gatewayName, gatewayNamespace)
if err != nil {
fmt.Fprintf(errWriter, "Error transforming resources: %v\n", err)
return 1
}
_, err = io.Copy(outWriter, postTransformBuf)

if err != nil {
fmt.Fprintf(errWriter, "Error printing YAML: %v\n", err)
return 1
}

finalReports = append(finalReports, reports...)
}

// print error report after yaml output, for better visibility
if err := generateReport(finalReports, reportBuf); err != nil {
fmt.Fprintf(errWriter, "Error generating reports: %v\n", err)
return 1
}
errWriter.Write([]byte("\n"))
io.Copy(errWriter, reportBuf)
errWriter.Write([]byte("\n"))
return 0
}

func newExportServiceCommand() *cobra.Command {
opts := exportServiceOptions{}

cmd := &cobra.Command{
Use: "export-service",
Short: "Exposes a service to be mirrored",
RunE: func(cmd *cobra.Command, args []string) error {

if len(args) < 1 {
return fmt.Errorf("please specify a kubernetes resource file")
}

if opts.gatewayName == "" {
return errors.New("The --gateway-name flag needs to be set")
}

if opts.gatewayNamespace == "" {
return errors.New("The --gateway-namespace flag needs to be set")
}

in, err := read(args[0])
if err != nil {
return err
}
exitCode := transformExportInput(in, stderr, stdout, opts.gatewayName, opts.gatewayNamespace)
os.Exit(exitCode)
return nil
},
}

cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", "linkerd-gateway", "the name of the gateway")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "the namespace of the gateway")
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")

return cmd
}
Expand All @@ -870,24 +646,14 @@ This command provides subcommands to manage the multicluster support
functionality of Linkerd. You can use it to install the service mirror
components on a cluster, manage credentials and link clusters together.`,
Example: ` # Install multicluster addons.
linkerd --context=cluster-a cluster install | kubectl --context=cluster-a apply -f -
linkerd --context=cluster-a multicluster install | kubectl --context=cluster-a apply -f -
# Extract mirroring cluster credentials from cluster A and install them on cluster B
linkerd --context=cluster-a cluster link --cluster-name=target | kubectl apply --context=cluster-b -f -
# Export services from cluster to be available to other clusters
kubectl get svc -o yaml | linkerd export-service - | kubectl apply -f -
# Exporting a file from a remote URL
linkerd export-service http://url.to/yml | kubectl apply -f -
# Exporting all the resources inside a folder and its sub-folders.
linkerd export-service <folder> | kubectl apply -f -`,
linkerd --context=cluster-a multicluster link --cluster-name=target | kubectl apply --context=cluster-b -f -`,
}

multiclusterCmd.AddCommand(newLinkCommand())
multiclusterCmd.AddCommand(newMulticlusterInstallCommand())
multiclusterCmd.AddCommand(newExportServiceCommand())
multiclusterCmd.AddCommand(newGatewaysCommand())
multiclusterCmd.AddCommand(newAllowCommand())
return multiclusterCmd
Expand Down
8 changes: 1 addition & 7 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
// metrics labels
service = "service"
namespace = "namespace"
targetGatewayNamespace = "target_gateway_namespace"
targetGateway = "target_gateway"
targetCluster = "target_cluster"
targetService = "target_service"
targetServiceNamespace = "target_service_namespace"
Expand Down Expand Up @@ -669,16 +667,12 @@ func metricLabels(resource interface{}) map[string]string {

labels := map[string]string{service: serviceName, namespace: ns}

gateway, hasRemoteGateway := resLabels[consts.RemoteGatewayNameLabel]
gatewayNs, hasRemoteGatwayNs := resLabels[consts.RemoteGatewayNsLabel]
remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]

if hasRemoteGateway && hasRemoteGatwayNs && hasRemoteClusterName && hasServiceFqn {
if hasRemoteClusterName && hasServiceFqn {
// this means we are looking at Endpoints created for the purpose of mirroring
// an out of cluster service.
labels[targetGatewayNamespace] = gatewayNs
labels[targetGateway] = gateway
labels[targetCluster] = remoteClusterName

fqParts := strings.Split(serviceFqn, ".")
Expand Down
Loading

0 comments on commit a1543b3

Please sign in to comment.