Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add logs and stats to sync loop #573

Merged
merged 2 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string
// a) read template file
// b) substitute templatized variables with their resolved values
// 2. create the resource on the kubernetes cluster and cache successful outcomes
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
templateValues, customTemplateValues templateValuesType) error {
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain,
namespace NamespaceName, templateValues, customTemplateValues templateValuesType) (ResourceSyncStats, error) {
templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath()
if c.lastAppliedTemplateDir != templateDir {
// Invalidate all caches
Expand All @@ -283,12 +283,13 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
}
templateFiles, err := ioutil.ReadDir(templateDir)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal,
return ResourceSyncStats{}, errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to read config template dir [%s] for namespace [%s] with err: %v",
namespace, templateDir, err)
}

collectedErrs := make([]error, 0)
stats := ResourceSyncStats{}
for _, templateFile := range templateFiles {
templateFileName := templateFile.Name()
if filepath.Ext(templateFileName) != ".yaml" {
Expand All @@ -309,6 +310,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
if c.templateAlreadyApplied(namespace, templateFileName, checksum) {
// nothing to do.
logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name())
stats.AlreadyThere++
continue
}

Expand All @@ -320,6 +322,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
"into a dynamic unstructured mapping with err: %v, manifest: %v", namespace, err, k8sManifest)
collectedErrs = append(collectedErrs, err)
c.metrics.KubernetesResourcesCreateErrors.Inc()
stats.Errored++
continue
}

Expand All @@ -341,6 +344,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to get current resource from server [%+v] in namespace [%s] with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

Expand All @@ -350,6 +354,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to marshal resource [%+v] in namespace [%s] to json with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

Expand All @@ -359,12 +364,14 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to create patch for resource [%+v] in namespace [%s] err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

if string(patch) == noChange {
logger.Infof(ctx, "Resource [%+v] in namespace [%s] is not modified",
dynamicObj.obj.GetKind(), namespace)
stats.AlreadyThere++
continue
}

Expand All @@ -375,9 +382,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to patch resource [%+v] in namespace [%s] with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

stats.Updated++
logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
dynamicObj.obj.GetKind(), namespace)
c.setTemplateChecksum(namespace, templateFileName, checksum)
Expand All @@ -389,9 +398,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
err := errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to create kubernetes object from config template [%s] for namespace [%s] with err: %v",
templateFileName, namespace, err)
stats.Errored++
collectedErrs = append(collectedErrs, err)
}
} else {
stats.Created++
logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes",
dynamicObj.obj.GetKind(), namespace)
c.metrics.KubernetesResourcesCreated.Inc()
Expand All @@ -400,9 +411,10 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
}
}
if len(collectedErrs) > 0 {
return errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs)
return stats, errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs)
}
return nil

return stats, nil
}

var metadataAccessor = meta.NewAccessor()
Expand Down Expand Up @@ -573,40 +585,52 @@ func (c *controller) Sync(ctx context.Context) error {
errs = append(errs, err)
}

stats := ResourceSyncStats{}

for _, project := range projects.Projects {
for _, domain := range project.Domains {
namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Id, domain.Name)
customTemplateValues, err := c.getCustomTemplateValues(
ctx, project.Id, domain.Id, domainTemplateValues[domain.Id])
if err != nil {
logger.Warningf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err)
logger.Errorf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err)
errs = append(errs, err)
}
err = c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues)

newStats, err := c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues)
if err != nil {
logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err)
c.metrics.ResourceAddErrors.Inc()
errs = append(errs, err)
} else {
c.metrics.ResourcesAdded.Inc()
logger.Debugf(ctx, "Successfully created kubernetes resources for [%s]", namespace)
stats.Add(newStats)
}

logger.Infof(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, newStats)
}
}

logger.Infof(ctx, "Completed cluster resource creation loop with stats: [%+v]", stats)

if len(errs) > 0 {
return errors.NewCollectedFlyteAdminError(codes.Internal, errs)
}

return nil
}

func (c *controller) Run() {
ctx := context.Background()
logger.Debugf(ctx, "Running ClusterResourceController")
logger.Info(ctx, "Running ClusterResourceController")
interval := c.config.ClusterResourceConfiguration().GetRefreshInterval()
wait.Forever(func() {
err := c.Sync(ctx)
if err != nil {
logger.Warningf(ctx, "Failed cluster resource creation loop with: %v", err)
logger.Errorf(ctx, "Failed cluster resource creation loop with: %v", err)
} else {
logger.Infof(ctx, "Successfully completed cluster resource creation loop")
}
}, interval)
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/clusterresource/sync_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package clusterresource

// ResourceSyncStats holds four independent counters describing how resources
// fared during a sync: how many were created, how many were updated, how many
// were already there, and how many errored. The zero value is an empty tally.
type ResourceSyncStats struct {
	Created, Updated, AlreadyThere, Errored int
}

// Add folds the counters of other into the receiver, field by field. The
// argument is taken by value and is left untouched.
func (s *ResourceSyncStats) Add(other ResourceSyncStats) {
	*s = ResourceSyncStats{
		Created:      s.Created + other.Created,
		Updated:      s.Updated + other.Updated,
		AlreadyThere: s.AlreadyThere + other.AlreadyThere,
		Errored:      s.Errored + other.Errored,
	}
}