Skip to content

Commit

Permalink
MESH-5198: implement touchpoints (#768)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Tay <[email protected]>
Co-authored-by: Anubhav Aeron <[email protected]>
Co-authored-by: nirvanagit <[email protected]>

Adding operator functionality
  • Loading branch information
Ryan Tay committed Aug 14, 2024
1 parent 033be91 commit c4d191c
Show file tree
Hide file tree
Showing 22 changed files with 695 additions and 165 deletions.
2 changes: 2 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().BoolVar(&params.AdmiralOperatorMode, "admiral_operator_mode", false, "Enable/Disable admiral operator functionality")
rootCmd.PersistentFlags().StringVar(&params.OperatorSyncNamespace, "operator_sync_namespace", "admiral-operator-sync",
"Namespace in which Admiral Operator will put its generated configurations")
rootCmd.PersistentFlags().StringVar(&params.OperatorSecretFilterTags, "operator_secret_filter_tags", "admiral/syncoperator",
"Filter tags for the specific admiral operator namespace secret to watch")
return rootCmd
}

Expand Down
2 changes: 1 addition & 1 deletion admiral/pkg/clusters/configSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func updateRegistryConfigForClusterPerEnvironment(ctxLogger *logrus.Entry, remot
task := "updateRegistryConfigForClusterPerEnvironment"
defer util.LogElapsedTimeForTask(ctxLogger, task, registryConfig.IdentityName, "", "", "processingTime")()
k8sClient, err := remoteRegistry.ClientLoader.LoadKubeClientFromPath(common.GetKubeconfigPath())
if err != nil && common.GetSecretFilterTags() == "admiral/syncrtay" {
if err != nil && common.GetSecretFilterTags() == common.GetOperatorSecretFilterTags() {
ctxLogger.Infof(common.CtxLogFormat, task, registryConfig.IdentityName, "", "", "unable to get kube client")
return errors.Wrap(err, "unable to get kube client")
}
Expand Down
13 changes: 9 additions & 4 deletions admiral/pkg/clusters/configwriter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clusters

import (
"fmt"
"sort"
"strconv"
"strings"
Expand All @@ -12,6 +13,8 @@ import (
networkingV1Alpha3 "istio.io/api/networking/v1alpha3"
)

const typeLabel = "type"

// IstioSEBuilder is an interface to construct Service Entry objects
// from IdentityConfig objects. It can construct multiple Service Entries
// from an IdentityConfig or construct just one given a IdentityConfigEnvironment.
Expand All @@ -36,7 +39,7 @@ func (b *ServiceEntryBuilder) BuildServiceEntriesFromIdentityConfig(ctxLogger *l
)
ctxLogger.Infof(common.CtxLogFormat, "buildServiceEntry", identity, common.GetSyncNamespace(), b.ClientCluster, "Beginning to build the SE spec")
ingressEndpoints, err := getIngressEndpoints(identityConfig.Clusters)
if err != nil {
if err != nil || len(ingressEndpoints) == 0 {
return serviceEntries, err
}
_, isServerOnClientCluster := ingressEndpoints[b.ClientCluster]
Expand Down Expand Up @@ -110,7 +113,10 @@ func getServiceEntryEndpoint(
var err error
endpoint := ingressEndpoints[serverCluster]
tmpEp := endpoint.DeepCopy()
tmpEp.Labels["type"] = identityConfigEnvironment.Type
tmpEp.Labels[typeLabel] = identityConfigEnvironment.Type
if len(identityConfigEnvironment.Services) == 0 {
return tmpEp, fmt.Errorf("there were no services for the asset in namespace %s on cluster %s", identityConfigEnvironment.Namespace, serverCluster)
}
if clientCluster == serverCluster {
for _, service := range identityConfigEnvironment.Services {
if service.Weight == -1 {
Expand Down Expand Up @@ -140,9 +146,8 @@ func getExportTo(ctxLogger *logrus.Entry, registryClient registry.IdentityConfig
// For each client asset of cname, we fetch its identityConfig
clientIdentityConfig, err = registryClient.GetIdentityConfigByIdentityName(clientAsset, ctxLogger)
if err != nil {
// TODO: this should return an error.
ctxLogger.Infof(common.CtxLogFormat, "buildServiceEntry", clientAsset, common.GetSyncNamespace(), "", "could not fetch IdentityConfig: "+err.Error())
continue
return clientNamespaces, err
}
for _, clientIdentityConfigCluster := range clientIdentityConfig.Clusters {
// For each cluster the client asset is deployed on, we check if that cluster is the client cluster we are writing to
Expand Down
43 changes: 39 additions & 4 deletions admiral/pkg/clusters/configwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func createMockServiceEntry(env string, identity string, endpointAddress string,
}

func TestGetIngressEndpoints(t *testing.T) {
identityConfig := registry.GetSampleIdentityConfig()
identityConfig := registry.GetSampleIdentityConfig("sample")
expectedIngressEndpoints := map[string]*networkingV1Alpha3.WorkloadEntry{
"cluster1": {
Address: "abc-elb.us-west-2.elb.amazonaws.com.",
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestGetServiceEntryEndpoint(t *testing.T) {
admiralParams := admiralParamsForConfigWriterTests()
common.ResetSync()
common.InitializeConfig(admiralParams)
e2eEnv := registry.GetSampleIdentityConfigEnvironment("e2e", "ns-1-usw2-e2e")
e2eEnv := registry.GetSampleIdentityConfigEnvironment("e2e", "ns-1-usw2-e2e", "sample")
ingressEndpoints := map[string]*networkingV1Alpha3.WorkloadEntry{"cluster1": {
Address: "abc-elb.us-west-2.elb.amazonaws.com.",
Locality: "us-west-2",
Expand Down Expand Up @@ -226,17 +226,24 @@ func TestBuildServiceEntriesFromIdentityConfig(t *testing.T) {
common.InitializeConfig(admiralParams)
rr, _ := InitAdmiralOperator(context.Background(), admiralParams)
ctxLogger := common.GetCtxLogger(context.Background(), "test", "")
identityConfig := registry.GetSampleIdentityConfig()
identityConfig := registry.GetSampleIdentityConfig("sample")
expectedLocalServiceEntryPRF := createMockServiceEntry("prf", "sample", "app-1-spk-root-service.ns-1-usw2-prf.svc.cluster.local.", 8090, []string{"istio-system", "ns-1-usw2-e2e", "ns-1-usw2-prf", "ns-1-usw2-qal"})
expectedLocalServiceEntryE2E := createMockServiceEntry("e2e", "sample", "app-1-spk-root-service.ns-1-usw2-e2e.svc.cluster.local.", 8090, []string{"istio-system", "ns-1-usw2-e2e", "ns-1-usw2-prf", "ns-1-usw2-qal"})
expectedLocalServiceEntryQAL := createMockServiceEntry("qal", "sample", "app-1-spk-root-service.ns-1-usw2-qal.svc.cluster.local.", 8090, []string{"istio-system", "ns-1-usw2-e2e", "ns-1-usw2-prf", "ns-1-usw2-qal"})
expectedLocalServiceEntries := []*networkingV1Alpha3.ServiceEntry{&expectedLocalServiceEntryQAL, &expectedLocalServiceEntryPRF, &expectedLocalServiceEntryE2E}
identityConfigFailsIngressEndpoints := registry.GetSampleIdentityConfig("sample")
identityConfigFailsIngressEndpoints.Clusters = map[string]*registry.IdentityConfigCluster{}
identityConfigFailsExportTo := registry.GetSampleIdentityConfig("sample")
identityConfigFailsExportTo.ClientAssets["fake"] = "fake"
identityConfigFailsServiceEntryEndpoint := registry.GetSampleIdentityConfig("sample")
identityConfigFailsServiceEntryEndpoint.Clusters["cluster1"].Environment["e2e"].Services = make(map[string]*registry.RegistryServiceConfig)
testCases := []struct {
name string
clientCluster string
event admiral.EventType
identityConfig registry.IdentityConfig
expectedServiceEntries []*networkingV1Alpha3.ServiceEntry
expectedErr bool
}{
{
name: "Given information to build an se, " +
Expand All @@ -246,13 +253,41 @@ func TestBuildServiceEntriesFromIdentityConfig(t *testing.T) {
event: admiral.Add,
identityConfig: identityConfig,
expectedServiceEntries: expectedLocalServiceEntries,
expectedErr: false,
},
{
name: "Given getIngressEndpoints fails with a non-nil error or is empty, " +
"Then there should be an empty array of returned service entries",
clientCluster: "cluster1",
event: admiral.Add,
identityConfig: identityConfigFailsIngressEndpoints,
expectedServiceEntries: make([]*networkingV1Alpha3.ServiceEntry, 0),
expectedErr: false,
},
{
name: "Given getExportTo fails with a non-nil error, " +
"Then there should be an empty array of returned service entries",
clientCluster: "cluster1",
event: admiral.Add,
identityConfig: identityConfigFailsExportTo,
expectedServiceEntries: make([]*networkingV1Alpha3.ServiceEntry, 0),
expectedErr: true,
},
{
name: "Given getServiceEntryEndpoint fails with a non-nil error, " +
"Then there should be an empty array of returned service entries",
clientCluster: "cluster1",
event: admiral.Add,
identityConfig: identityConfigFailsServiceEntryEndpoint,
expectedServiceEntries: make([]*networkingV1Alpha3.ServiceEntry, 0),
expectedErr: true,
},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
serviceEntryBuilder := ServiceEntryBuilder{ClientCluster: c.clientCluster, RemoteRegistry: rr}
serviceEntries, err := serviceEntryBuilder.BuildServiceEntriesFromIdentityConfig(ctxLogger, c.identityConfig)
if err != nil {
if err != nil && !c.expectedErr {
t.Errorf("want=%v, \ngot=%v", nil, err)
}
opts := cmpopts.IgnoreUnexported(networkingV1Alpha3.ServiceEntry{}, networkingV1Alpha3.ServicePort{}, networkingV1Alpha3.WorkloadEntry{})
Expand Down
53 changes: 45 additions & 8 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ func modifySidecarForLocalClusterCommunication(

sidecar, err := sidecarConfig.IstioClient.NetworkingV1alpha3().Sidecars(sidecarNamespace).Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err != nil {
ctxLogger.Warnf(common.CtxLogFormat, "modifySidecarForLocalClusterCommunication", sourceIdentity, sidecarNamespace, rc.ClusterID, err)
return
}
if sidecar == nil || (sidecar.Spec.Egress == nil) {
Expand Down Expand Up @@ -1132,12 +1133,40 @@ func addUpdateSidecar(ctxLogger *logrus.Entry, ctx context.Context, obj *v1alpha
}
_, err = rc.SidecarController.IstioClient.NetworkingV1alpha3().Sidecars(namespace).Update(ctx, obj, v12.UpdateOptions{})
if err != nil {
err = retryUpdatingSidecar(ctxLogger, ctx, obj, exist, namespace, rc, err, "Update")
ctxLogger.Infof(LogErrFormat, "Update", "Sidecar", obj.Name, rc.ClusterID, err)
} else {
ctxLogger.Infof(LogErrFormat, "Update", "Sidecar", obj.Name, rc.ClusterID, "Success")
}
}

func retryUpdatingSidecar(ctxLogger *logrus.Entry, ctx context.Context, obj *v1alpha3.Sidecar, exist *v1alpha3.Sidecar, namespace string, rc *RemoteController, err error, op string) error {
numRetries := 5
if err != nil && (k8sErrors.IsConflict(err) || k8sErrors.IsInvalid(err)) {
for i := 0; i < numRetries; i++ {
ctxLogger.Errorf(common.CtxLogFormat, op, obj.Name, obj.Namespace, rc.ClusterID, err.Error()+". retrying sidecar update.")

updatedSidecar, err := rc.SidecarController.IstioClient.NetworkingV1alpha3().Sidecars(namespace).Get(ctx, exist.Name, v12.GetOptions{})
// if old sidecar not found, just create a new sidecar instead
if err != nil {
ctxLogger.Infof(common.CtxLogFormat, op, exist.Name, exist.Namespace, rc.ClusterID, err.Error()+fmt.Sprintf(". Error getting old sidecar"))
continue
}
existingResourceVersion := updatedSidecar.GetResourceVersion()
ctxLogger.Infof(common.CtxLogFormat, op, obj.Name, obj.Namespace, rc.ClusterID, fmt.Sprintf("existingResourceVersion=%s resourceVersionUsedForUpdate=%s", updatedSidecar.ResourceVersion, obj.ResourceVersion))
updatedSidecar.Spec = obj.Spec
updatedSidecar.Annotations = obj.Annotations
updatedSidecar.Labels = obj.Labels
updatedSidecar.SetResourceVersion(existingResourceVersion)
_, err = rc.SidecarController.IstioClient.NetworkingV1alpha3().Sidecars(namespace).Update(ctx, updatedSidecar, v12.UpdateOptions{})
if err == nil {
return nil
}
}
}
return err
}

func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {
newSidecarObj := &v1alpha3.Sidecar{}
newSidecarObj.Spec.WorkloadSelector = sidecar.Spec.WorkloadSelector
Expand Down Expand Up @@ -1226,7 +1255,6 @@ func AddServiceEntriesWithDrWorker(
//partitionedIdentity holds the originally passed in identity which could have a partition prefix
partitionedIdentity := identityId
//identityId is guaranteed to have the non-partitioned identity
// Operator Branch 1: since partition cache will not be filled, return identityId from getNonPartitionedIdentity
identityId = getNonPartitionedIdentity(rr.AdmiralCache, identityId)
// Operator: When calling this function make a channel with one cluster in it
for cluster := range clusters { // TODO log cluster / service entry
Expand All @@ -1238,6 +1266,10 @@ func AddServiceEntriesWithDrWorker(
addSEorDRToAClusterError error
)

if common.IsAdmiralOperatorMode() {
syncNamespace = common.GetOperatorSyncNamespace()
}

rc := rr.GetRemoteController(cluster)
if rc == nil {
ctxLogger.Warnf(common.CtxLogFormat, "AddServiceEntriesWithDrWorker", "", "", cluster, "remote controller not found for the cluster")
Expand All @@ -1252,7 +1284,6 @@ func AddServiceEntriesWithDrWorker(
}

//this get is within the loop to avoid race condition when one event could update destination rule on stale data
// TODO: Operator: Fill these caches in AdmiralCache in shardHandler
globalTrafficPolicy, err := cache.GlobalTrafficCache.GetFromIdentity(partitionedIdentity, env)
if err != nil {
ctxLogger.Errorf(LogErrFormat, "GlobalTrafficCache", "", "", cluster, err.Error())
Expand Down Expand Up @@ -1452,12 +1483,11 @@ func AddServiceEntriesWithDrWorker(
// build list of gateway clusters
gwClusters := []string{}
for _, gwAlias := range common.GetGatewayAssetAliases() {
// TODO: Operator fills this cache in produceIdentityConfigs
dependents := rr.AdmiralCache.IdentityDependencyCache.Get(partitionedIdentity)
if dependents != nil && dependents.Len() > 0 {
dependents.Range(func(_ string, dependent string) {
if strings.Contains(strings.ToLower(dependent), strings.ToLower(gwAlias)) {
gwClustersMap := getClusters(rr, dependent)
gwClustersMap := getClusters(rr, dependent, ctxLogger)
if gwClustersMap != nil {
for _, cluster := range gwClustersMap.GetKeys() {
gwClusters = append(gwClusters, cluster)
Expand Down Expand Up @@ -1532,7 +1562,6 @@ func AddServiceEntriesWithDrWorker(
}
}
}

errors <- addSEorDRToAClusterError
}
}
Expand All @@ -1547,10 +1576,18 @@ func getClusterRegion(rr *RemoteRegistry, cluster string, rc *RemoteController)
return "", fmt.Errorf("failed to get region of cluster %v", cluster)
}

func getClusters(rr *RemoteRegistry, dependent string) *common.Map {
func getClusters(rr *RemoteRegistry, dependent string, ctxLogger *logrus.Entry) *common.Map {
if common.IsAdmiralOperatorMode() {
// TODO: go through registry client to pull dependent identity clusters and construct map...
return nil
// Any better way than calling service registry here?
dependentIdentityConfig, err := rr.RegistryClient.GetIdentityConfigByIdentityName(dependent, ctxLogger)
if err != nil {
return nil
}
gwClusterMap := common.NewMap()
for _, cluster := range dependentIdentityConfig.Clusters {
gwClusterMap.Put(cluster.Name, cluster.Name)
}
return gwClusterMap
}
return rr.AdmiralCache.IdentityClusterCache.Get(dependent)
}
Expand Down
Loading

0 comments on commit c4d191c

Please sign in to comment.