Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update endpoint k8s new #337

Closed
wants to merge 11 commits into from
46 changes: 43 additions & 3 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/google/uuid"
"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
"github.com/kube-tarian/kad/capten/agent/internal/temporalclient"
"github.com/kube-tarian/kad/capten/agent/internal/workers"

"github.com/kube-tarian/kad/capten/agent/internal/pb/captenpluginspb"

Expand Down Expand Up @@ -37,15 +39,25 @@ var (

type ClusterClaimSyncHandler struct {
log logging.Logger
tc *temporalclient.Client
dbStore *captenstore.Store
}

func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) *ClusterClaimSyncHandler {
return &ClusterClaimSyncHandler{log: log, dbStore: dbStore}
func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*ClusterClaimSyncHandler, error) {
tc, err := temporalclient.NewClient(log)
if err != nil {
return nil, err
}

return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil
}

func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
return k8s.RegisterDynamicInformers(NewClusterClaimSyncHandler(log, dbStore), dynamicClient, cgvk)
obj, err := NewClusterClaimSyncHandler(log, dbStore)
if err != nil {
return err
}
return k8s.RegisterDynamicInformers(obj, dynamicClient, cgvk)
}

func getClusterClaimObj(obj any) (*model.ClusterClaim, error) {
Expand Down Expand Up @@ -199,6 +211,17 @@ func (h *ClusterClaimSyncHandler) updateManagedClusters(clusterCliams []model.Cl
h.log.Info("failed to update information to db, %v", err)
continue
}

if managedCluster.ClusterDeployStatus == clusterReadyStatus {
// call config-worker.
err = h.triggerClusterUpdates(clusterCliam.Spec.Id, managedCluster.Id)
if err != nil {
h.log.Info("failed to update cluster endpoint information %v", err)
continue
}

}

h.log.Infof("updated the cluster claim %s with status %s", managedCluster.ClusterName, managedCluster.ClusterDeployStatus)
}
}
Expand All @@ -217,3 +240,20 @@ func (h *ClusterClaimSyncHandler) getManagedClusters() (map[string]*captenplugin
}
return clusterEndpointMap, nil
}

func (h *ClusterClaimSyncHandler) triggerClusterUpdates(clusterName, managedClusterID string) error {
proj, err := h.dbStore.GetCrossplaneProject()
if err != nil {
return err
}
ci := model.CrossplaneClusterUpdate{RepoURL: proj.GitProjectUrl, GitProjectId: proj.GitProjectId, Name: clusterName, ManagedK8SId: managedClusterID}

wd := workers.NewConfig(h.tc, h.log)

_, err = wd.SendEvent(context.TODO(), &model.ConfigureParameters{Resource: model.CrossPlaneResource, Action: model.CrossPlaneClusterUpdate}, ci)
if err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions capten/agent/internal/crossplane/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ func RegisterK8SWatcher(log logging.Logger, dbStore *captenstore.Store) error {
if err != nil {
return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err)
}

return nil
}
6 changes: 5 additions & 1 deletion capten/agent/internal/job/crossplane_resources_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ type CrossplaneResourcesSync struct {
}

func NewCrossplaneResourcesSync(log logging.Logger, frequency string, dbStore *captenstore.Store) (*CrossplaneResourcesSync, error) {
ccObj, err := crossplane.NewClusterClaimSyncHandler(log, dbStore)
if err != nil {
return nil, err
}
return &CrossplaneResourcesSync{
log: log,
frequency: frequency,
dbStore: dbStore,
clusterHandler: crossplane.NewClusterClaimSyncHandler(log, dbStore),
clusterHandler: ccObj,
providerHandler: crossplane.NewProvidersSyncHandler(log, dbStore),
}, nil
}
Expand Down
9 changes: 9 additions & 0 deletions capten/common-pkg/k8s/dynamic_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ func ConvertYamlToJson(data []byte) ([]byte, error) {
return jsonData, nil
}

func ConvertJsonToYaml(data []byte) ([]byte, error) {
yamlData, err := yaml.JSONToYAML(data)
if err != nil {
return nil, err
}

return yamlData, nil
}

func (dc *DynamicClientSet) GetNameNamespace(jsonByte []byte) (string, string, error) {
var keyValue map[string]interface{}
if err := json.Unmarshal(jsonByte, &keyValue); err != nil {
Expand Down
42 changes: 37 additions & 5 deletions capten/config-worker/internal/app_config/app_git_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ const (
tmpGitProjectCloneStr = "clone*"
gitProjectAccessTokenAttribute = "accessToken"
gitUrlSuffix = ".git"
kubeConfig = "kubeconfig"
k8sEndpoint = "endpoint"
k8sClusterCA = "clusterCA"
)

type Config struct {
GitDefaultCommiterName string `envconfig:"GIT_COMMIT_NAME" default:"capten-bot"`
GitDefaultCommiterEmail string `envconfig:"GIT_COMMIT_EMAIL" default:"[email protected]"`
GitVaultEntityName string `envconfig:"GIT_VAULT_ENTITY_NAME" default:"git-project"`
GitCloneDir string `envconfig:"GIT_CLONE_DIR" default:"/gitCloneDir"`
GitBranchName string `envconfig:"GIT_BRANCH_NAME" default:"capten-template-bot"`
GitDefaultCommiterName string `envconfig:"GIT_COMMIT_NAME" default:"capten-bot"`
GitDefaultCommiterEmail string `envconfig:"GIT_COMMIT_EMAIL" default:"[email protected]"`
GitVaultEntityName string `envconfig:"GIT_VAULT_ENTITY_NAME" default:"git-project"`
GitCloneDir string `envconfig:"GIT_CLONE_DIR" default:"/gitCloneDir"`
GitBranchName string `envconfig:"GIT_BRANCH_NAME" default:"capten-template-bot"`
ManagedClusterEntityName string `envconfig:"MANAGED_CLUSER_VAULT_ENTITY_NAME" default:"managedcluster"`
}

var logger = logging.NewLogger()
Expand Down Expand Up @@ -125,6 +129,34 @@ func (ca *AppGitConfigHelper) SyncArgoCDApp(ctx context.Context, ns, resName str
return nil
}

func (ca *AppGitConfigHelper) CreateCluster(ctx context.Context, id, clusterName string) (string, error) {
credReader, err := credentials.NewCredentialReader(ctx)
if err != nil {
err = errors.WithMessage(err, "error in initializing credential reader")
return "", err
}

cred, err := credReader.GetCredential(ctx, credentials.GenericCredentialType,
ca.cfg.ManagedClusterEntityName, id)
if err != nil {
err = errors.WithMessagef(err, "error while reading credential %s/%s from the vault",
ca.cfg.GitVaultEntityName, id)
return "", err
}

client, err := argocd.NewClient(logger)
if err != nil {
return "", err
}

err = client.CreateOrUpdateCluster(ctx, clusterName, cred[kubeConfig])
if err != nil {
return "", err
}

return cred[k8sEndpoint], nil
}

func (ca *AppGitConfigHelper) WaitForArgoCDToSync(ctx context.Context, ns, resName string) error {
client, err := argocd.NewClient(logger)
if err != nil {
Expand Down
46 changes: 27 additions & 19 deletions capten/config-worker/internal/crossplane/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package crossplane
import (
"context"
"encoding/json"
"fmt"

"github.com/intelops/go-common/logging"
"github.com/kube-tarian/kad/capten/model"
Expand All @@ -14,15 +15,6 @@ var logger = logging.NewLogger()

func (c *CrossPlaneActivities) ConfigurationActivity(ctx context.Context, params model.ConfigureParameters, payload json.RawMessage) (model.ResponsePayload, error) {
logger.Infof("Activity: %s, %s", params.Resource, params.Action)

req := &model.CrossplaneUseCase{}
if err := json.Unmarshal(payload, req); err != nil {
return model.ResponsePayload{
Status: string(model.WorkFlowStatusFailed),
Message: json.RawMessage("{\"error\": \"failed to read payload\"}"),
}, err
}

config, err := NewCrossPlaneApp()
if err != nil {
return model.ResponsePayload{
Expand All @@ -31,16 +23,32 @@ func (c *CrossPlaneActivities) ConfigurationActivity(ctx context.Context, params
}, err
}

status, err := config.Configure(ctx, req)
if err != nil {
logger.Errorf("crossplane plugin configure failed, %v", err)
return model.ResponsePayload{
Status: status,
Message: json.RawMessage("{\"error\": \"failed to configure crossplane plugin\"}"),
}, err
status := ""

switch params.Action {
case model.CrossPlaneClusterUpdate:
reqLocal := &model.CrossplaneClusterUpdate{}
if err = json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req for %s, %v", model.CrossPlaneClusterUpdate, err)
err = fmt.Errorf("failed to unmarshall the crossplane req for %s", model.CrossPlaneClusterUpdate)
}
status, err = config.configureClusterUpdate(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project for %s, %v", model.CrossPlaneClusterUpdate, err)
err = fmt.Errorf("failed to configure crossplane project for %s", model.CrossPlaneClusterUpdate)
}
default:
reqLocal := &model.CrossplaneUseCase{}
if err = json.Unmarshal(payload, reqLocal); err != nil {
logger.Errorf("failed to unmarshall the crossplane req, %v", err)
err = fmt.Errorf("failed to unmarshall the crossplane req")
}
status, err = config.configureProjectAndApps(ctx, reqLocal)
if err != nil {
logger.Errorf("failed to configure crossplane project, %v", err)
err = fmt.Errorf("failed to configure crossplane project")
}
}
logger.Infof("crossplane plugin configured")
return model.ResponsePayload{
Status: status,
}, err
return model.ResponsePayload{Status: status}, err
}
153 changes: 153 additions & 0 deletions capten/config-worker/internal/crossplane/config_cluster_updates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package crossplane

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/intelops/go-common/logging"
"github.com/kube-tarian/kad/capten/common-pkg/k8s"
"github.com/kube-tarian/kad/capten/model"
agentmodel "github.com/kube-tarian/kad/capten/model"
"github.com/pkg/errors"
)

func getAppNameNamespace(ctx context.Context, fileName string) (string, string, error) {
k8sclient, err := k8s.NewK8SClient(logging.NewLogger())
if err != nil {
return "", "", fmt.Errorf("failed to initalize k8s client: %v", err)
}

data, err := os.ReadFile(fileName)
if err != nil {
return "", "", err
}

jsonData, err := k8s.ConvertYamlToJson(data)
if err != nil {
return "", "", err
}

// For the testing change the reqrepo to template one
ns, resName, err := k8sclient.DynamicClient.GetNameNamespace(jsonData)
if err != nil {
return "", "", fmt.Errorf("failed to create the k8s custom resource: %v", err)
}

return ns, resName, nil

}

func (cp *CrossPlaneApp) configureClusterUpdate(ctx context.Context, req *model.CrossplaneClusterUpdate) (status string, err error) {
logger.Infof("configuring the cluster endpoint for %s", req.RepoURL)
endpoint, err := cp.helper.CreateCluster(ctx, req.ManagedK8SId, req.Name)
// if err != nil {
// return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to CreateCluster in argocd app")
// }

logger.Infof("CreateCluster argocd err: ", err)
accessToken, err := cp.helper.GetAccessToken(ctx, req.GitProjectId)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to get token from vault")
}

logger.Infof("cloning default templates %s to project %s", cp.pluginConfig.TemplateGitRepo, req.RepoURL)
templateRepo, customerRepo, err := cp.helper.CloneRepos(ctx, cp.pluginConfig.TemplateGitRepo, req.RepoURL, accessToken)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to clone repos")
}
logger.Infof("cloned default templates to project %s", req.RepoURL)

defer os.RemoveAll(templateRepo)
defer os.RemoveAll(customerRepo)

fileName := filepath.Join(customerRepo, cp.pluginConfig.ClusterEndpointUpdates.File)
// replace cluster endpoint
err = updateClusterEndpointDetials(fileName, req.Name, endpoint, cp.cfg.ClusterDefaultAppsFile)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to replace the file")
}

err = cp.helper.AddToGit(ctx, model.CrossPlaneClusterUpdate, req.RepoURL, accessToken)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to add git repo")
}

logger.Infof("added cloned project %s changed to git", req.RepoURL)
ns, resName, err := getAppNameNamespace(ctx, filepath.Join(customerRepo, cp.pluginConfig.ClusterEndpointUpdates.MainAppGitPath))
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to get name and namespace from")
}

err = cp.helper.SyncArgoCDApp(ctx, ns, resName)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to sync argocd app")
}
logger.Infof("synched provider config main-app %s", resName)

err = cp.helper.WaitForArgoCDToSync(ctx, ns, resName)
if err != nil {
return string(agentmodel.WorkFlowStatusFailed), errors.WithMessage(err, "failed to fetch argocd app")
}

return string(agentmodel.WorkFlowStatusCompleted), nil
}

func updateClusterEndpointDetials(filename, clusterName, clusterEndpoint, defaultAppFile string) error {
data, err := os.ReadFile(filename)
if err != nil {
return err
}

jsonData, err := k8s.ConvertYamlToJson(data)
if err != nil {
return err
}

var argoCDAppValue model.ArgoCDAppValue

err = json.Unmarshal(jsonData, &argoCDAppValue)
if err != nil {
return err
}

clusters := *argoCDAppValue.Clusters
for index := range clusters {
cluster := &clusters[index]
if cluster.Name == clusterName {
defaultApps, err := readClusterDefaultApps(defaultAppFile)
if err != nil {
return err
}

for index := range defaultApps {
localObj := &defaultApps[index]
strings.ReplaceAll(localObj.AppConfigPath, clusterNameSub, clusterName)
}

logger.Infof("udpated the req endpoint details to %s for name %s ", clusterEndpoint, clusterName)
cluster.Server = clusterEndpoint

break
}
}

argoCDAppValue.Clusters = &clusters

jsonBytes, err := json.Marshal(argoCDAppValue)
if err != nil {
return err
}

yamlBytes, err := k8s.ConvertJsonToYaml(jsonBytes)
if err != nil {
return err
}

err = os.WriteFile(filename, yamlBytes, os.ModeAppend)

return err
}
Loading
Loading