From 4dfcd23a8dd81d0d802fc26cd8b60570ebc0591b Mon Sep 17 00:00:00 2001 From: "taekyu.kang" Date: Thu, 25 Apr 2024 23:39:41 +0900 Subject: [PATCH 1/2] feature. change apply rule logic to reload --- cmd/server/appgroup_status.go | 2 +- cmd/server/cloud_account_status.go | 2 +- cmd/server/cluster_byoh.go | 2 +- cmd/server/cluster_status.go | 2 +- cmd/server/main.go | 5 +- cmd/server/organization_status.go | 2 +- cmd/server/system_notification_rule.go | 21 ++-- cmd/server/thanos_ruler.go | 102 ++++++++++++++++++ internal/organization/organization.go | 18 +++- .../syste-notification-rule.go | 22 +++- 10 files changed, 157 insertions(+), 21 deletions(-) create mode 100644 cmd/server/thanos_ruler.go diff --git a/cmd/server/appgroup_status.go b/cmd/server/appgroup_status.go index 3f91cf2..d9bcb57 100644 --- a/cmd/server/appgroup_status.go +++ b/cmd/server/appgroup_status.go @@ -18,7 +18,7 @@ func processAppGroupStatus() error { if len(appGroups) == 0 { return nil } - log.Info(context.TODO(), "appGroups : ", appGroups) + log.Info(context.TODO(), "[processAppGroupStatus] appGroups : ", appGroups) for i := range appGroups { appGroup := appGroups[i] diff --git a/cmd/server/cloud_account_status.go b/cmd/server/cloud_account_status.go index 35cf1af..0cdb0b4 100644 --- a/cmd/server/cloud_account_status.go +++ b/cmd/server/cloud_account_status.go @@ -17,7 +17,7 @@ func processCloudAccountStatus() error { if len(cloudAccounts) == 0 { return nil } - log.Info(context.TODO(), "cloudAccounts : ", cloudAccounts) + log.Info(context.TODO(), "[processCloudAccountStatus] cloudAccounts : ", cloudAccounts) for i := range cloudAccounts { cloudaccount := cloudAccounts[i] diff --git a/cmd/server/cluster_byoh.go b/cmd/server/cluster_byoh.go index da5555a..6f113fd 100644 --- a/cmd/server/cluster_byoh.go +++ b/cmd/server/cluster_byoh.go @@ -22,7 +22,7 @@ func processClusterByoh() error { if len(clusters) == 0 { return nil } - log.Info(context.TODO(), "byoh clusters : ", clusters) + log.Info(context.TODO(), "[processClusterByoh] byoh clusters : ", clusters) token = getTksApiToken() if token != "" { diff --git a/cmd/server/cluster_status.go b/cmd/server/cluster_status.go index 9c533d1..33e4652 100644 --- a/cmd/server/cluster_status.go +++ b/cmd/server/cluster_status.go @@ -17,7 +17,7 @@ func processClusterStatus() error { if len(clusters) == 0 { return nil } - log.Info(context.TODO(), "clusters : ", clusters) + log.Info(context.TODO(), "[processClusterStatus] clusters : ", clusters) for i := range clusters { cluster := clusters[i] diff --git a/cmd/server/main.go b/cmd/server/main.go index 32aca54..65d2d53 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -109,7 +109,10 @@ func main() { if err != nil { log.Error(context.TODO(), err) } - + err = processReloadThanosRules() + if err != nil { + log.Error(context.TODO(), err) + } time.Sleep(time.Second * INTERVAL_SEC) } diff --git a/cmd/server/organization_status.go b/cmd/server/organization_status.go index 25ce95c..3e03ab0 100644 --- a/cmd/server/organization_status.go +++ b/cmd/server/organization_status.go @@ -17,7 +17,7 @@ func processOrganizationStatus() error { if len(organizations) == 0 { return nil } - log.Info(context.TODO(), "organizations : ", organizations) + log.Info(context.TODO(), "[processOrganizationStatus] organizations : ", organizations) for i := range organizations { organization := organizations[i] diff --git a/cmd/server/system_notification_rule.go b/cmd/server/system_notification_rule.go index e0b14d9..962bcc9 100644 --- a/cmd/server/system_notification_rule.go +++ b/cmd/server/system_notification_rule.go @@ -55,7 +55,7 @@ func processSystemNotificationRule() error { if len(rules) == 0 { return nil } - log.Info(context.TODO(), "incompleted rules : ", len(rules)) + log.Info(context.TODO(), "[processSystemNotificationRule] incompleted rules : ", len(rules)) incompletedOrganizations := []string{} @@ -219,14 +219,17 @@ func applyRules(organizationId string, primaryClusterId string, rc RulerConfig) } // restart thanos-ruler - deletePolicy := metav1.DeletePropagationForeground - err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{ - PropagationPolicy: &deletePolicy, - }) - if err != nil { - log.Error(context.TODO(), err) - return err - } + // thanos-ruler reload 방식으로 변경했으나, 혹시 몰라 일단 코드는 주석처리해둠 + /* + deletePolicy := metav1.DeletePropagationForeground + err = clientset.CoreV1().Pods("lma").Delete(context.TODO(), "thanos-ruler-0", metav1.DeleteOptions{ + PropagationPolicy: &deletePolicy, + }) + if err != nil { + log.Error(context.TODO(), err) + return err + } + */ // update status err = systemNotificationRuleAccessor.UpdateSystemNotificationRuleStatus(organizationId, domain.SystemNotificationRuleStatus_APPLIED) diff --git a/cmd/server/thanos_ruler.go b/cmd/server/thanos_ruler.go new file mode 100644 index 0000000..fb2ab43 --- /dev/null +++ b/cmd/server/thanos_ruler.go @@ -0,0 +1,102 @@ +package main + +import ( + "context" + "fmt" + "io" + "net/http" + "strconv" + + "github.com/openinfradev/tks-api/pkg/kubernetes" + "github.com/openinfradev/tks-api/pkg/log" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const LAST_UPDATED_MIN = 1 + +func processReloadThanosRules() error { + organizationIds, err := systemNotificationRuleAccessor.GetRecentlyUpdatedOrganizations(LAST_UPDATED_MIN) + if err != nil { + return err + } + if len(organizationIds) == 0 { + return nil + } + log.Info(context.TODO(), "[processReloadThanosRules] new updated organizationIds : ", organizationIds) + + for _, organizationId := range organizationIds { + organization, err := organizationAccessor.Get(organizationId) + if err != nil { + log.Error(context.TODO(), err) + continue + } + + url, err := GetThanosRulerUrl(organization.PrimaryClusterId) + if err != nil { + log.Error(context.TODO(), err) + continue + } + + if err = Reload(url); err != nil { + log.Error(context.TODO(), err) + continue + } + } + + return nil +} + +func GetThanosRulerUrl(primaryClusterId string) (url string, err error) { + clientset_admin, err := kubernetes.GetClientAdminCluster(context.TODO()) + if err != nil { + return url, errors.Wrap(err, "Failed to get client set for user cluster") + } + + secrets, err := clientset_admin.CoreV1().Secrets(primaryClusterId).Get(context.TODO(), "tks-endpoint-secret", metav1.GetOptions{}) + if err != nil { + log.Info(context.TODO(), "cannot found tks-endpoint-secret. so use LoadBalancer...") + + clientset_user, err := kubernetes.GetClientFromClusterId(context.TODO(), primaryClusterId) + if err != nil { + return url, errors.Wrap(err, "Failed to get client set for user cluster") + } + + service, err := clientset_user.CoreV1().Services("lma").Get(context.TODO(), "thanos-ruler", metav1.GetOptions{}) + if err != nil { + return url, errors.Wrap(err, "Failed to get services.") + } + + // LoadBalaner 일경우, aws address 형태의 경우만 가정한다. + if service.Spec.Type != "LoadBalancer" { + return url, fmt.Errorf("Service type is not LoadBalancer. [%s] ", service.Spec.Type) + } + + lbs := service.Status.LoadBalancer.Ingress + ports := service.Spec.Ports + if len(lbs) > 0 && len(ports) > 0 { + url = ports[0].TargetPort.StrVal + "://" + lbs[0].Hostname + ":" + strconv.Itoa(int(ports[0].Port)) + } + } else { + url = "http://" + string(secrets.Data["thanos-ruler"]) + } + return url, nil +} + +func Reload(thanosRulerUrl string) (err error) { + reqUrl := thanosRulerUrl + "/-/reload" + + log.Info(context.TODO(), "url : ", reqUrl) + resp, err := http.Post(reqUrl, "text/plain", nil) + if err != nil { + return err + } + + defer resp.Body.Close() + + _, err = io.ReadAll(resp.Body) + if err != nil { + return err + } + return nil +} diff --git a/internal/organization/organization.go b/internal/organization/organization.go index adde61a..a35cd77 100644 --- a/internal/organization/organization.go +++ b/internal/organization/organization.go @@ -12,10 +12,11 @@ import ( // Organization represents a kubernetes organization information. type Organization struct { - ID string `gorm:"primarykey"` - WorkflowId string - Status domain.OrganizationStatus - StatusDesc string + ID string `gorm:"primarykey"` + WorkflowId string + Status domain.OrganizationStatus + StatusDesc string + PrimaryClusterId string } // Accessor accesses organization info in DB. @@ -49,6 +50,15 @@ func (x *OrganizationAccessor) GetIncompleteOrganizations() ([]Organization, err return organizations, nil } +func (x *OrganizationAccessor) Get(id string) (organization Organization, err error) { + res := x.db.Where("id = ?", id).First(&organization) + if res.Error != nil { + return organization, res.Error + } + + return +} + func (x *OrganizationAccessor) UpdateOrganizationStatus(organizationId string, status domain.OrganizationStatus, statusDesc string, workflowId string) error { log.Info(context.TODO(), fmt.Sprintf("UpdateOrganizationStatus. organizationId[%s], status[%d], statusDesc[%s], workflowId[%s]", organizationId, status, statusDesc, workflowId)) res := x.db.Model(Organization{}). diff --git a/internal/system-notification-rule/syste-notification-rule.go b/internal/system-notification-rule/syste-notification-rule.go index 9dda3f0..5067fda 100644 --- a/internal/system-notification-rule/syste-notification-rule.go +++ b/internal/system-notification-rule/syste-notification-rule.go @@ -99,8 +99,6 @@ func (x *SystemNotificationAccessor) GetIncompletedRules() ([]SystemNotification Joins("join clusters on clusters.id = organizations.primary_cluster_id AND clusters.status = ?", domain.ClusterStatus_RUNNING). Joins("join app_groups on app_groups.cluster_id = clusters.id AND app_groups.status = ?", domain.AppGroupStatus_RUNNING). Where("system_notification_rules.status = ?", domain.SystemNotificationRuleStatus_PENDING). - //Where("system_notification_rules.is_system = false"). - Order("system_notification_rules.organization_id"). Unscoped(). Find(&rules) @@ -111,6 +109,26 @@ func (x *SystemNotificationAccessor) GetIncompletedRules() ([]SystemNotification return rules, nil } +func (x *SystemNotificationAccessor) GetRecentlyUpdatedOrganizations(lastUpdateMin int) ([]string, error) { + var organizationIds []string + + res := x.db.Model(&SystemNotificationRule{}). + Select("system_notification_rules.organization_id"). + Joins("join organizations on organizations.id = system_notification_rules.organization_id"). + Joins("join clusters on clusters.id = organizations.primary_cluster_id AND clusters.status = ?", domain.ClusterStatus_RUNNING). + Joins("join app_groups on app_groups.cluster_id = clusters.id AND app_groups.status = ?", domain.AppGroupStatus_RUNNING). + Where("system_notification_rules.status = ?", domain.SystemNotificationRuleStatus_APPLIED). + Where(fmt.Sprintf("system_notification_rules.updated_at between now()-interval '%d minutes' and now() OR system_notification_rules.deleted_at between now()-interval '%d minutes' and now()", lastUpdateMin, lastUpdateMin)). + Group("system_notification_rules.organization_id"). + Unscoped(). + Find(&organizationIds) + + if res.Error != nil { + return nil, res.Error + } + return organizationIds, nil +} + func (x *SystemNotificationAccessor) GetRules(organizationId string) ([]SystemNotificationRule, error) { var rules []SystemNotificationRule From ae1571f7125453d282fb2d8b906479049f1702f2 Mon Sep 17 00:00:00 2001 From: "taekyu.kang" Date: Fri, 26 Apr 2024 10:52:58 +0900 Subject: [PATCH 2/2] feature. add cache feature --- .github/workflows/golangcic-lint.yml | 50 ++++++++++++++-------------- cmd/server/main.go | 9 +++-- cmd/server/thanos_ruler.go | 12 ++++++- go.mod | 5 +-- go.sum | 4 +-- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/.github/workflows/golangcic-lint.yml b/.github/workflows/golangcic-lint.yml index cdd16fb..f889b7a 100644 --- a/.github/workflows/golangcic-lint.yml +++ b/.github/workflows/golangcic-lint.yml @@ -1,35 +1,35 @@ name: Lint on: push: + tags: + - v* + branches: + - main + - develop + - release pull_request: - + branches: + - main + - develop + - release jobs: golangci: name: lint runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: golangci-lint - uses: golangci/golangci-lint-action@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 with: - # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: latest - args: --timeout=5m - - # Optional: working directory, useful for monorepos - # working-directory: somedir - - # Optional: golangci-lint command line arguments. - # args: --issues-exit-code=0 - - # Optional: show only new issues if it's a pull request. The default value is `false`. - # only-new-issues: true - - # Optional: if set to true then the action will use pre-installed Go. - # skip-go-installation: true - - # Optional: if set to true then the action don't cache or restore ~/go/pkg. - # skip-pkg-cache: true - - # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. - # skip-build-cache: true + go-version: "1.21" + cache: false + - name: Install golangci-lint + # Install golangci-lint from source instead of using + # golangci-lint-action to ensure the golangci-lint binary is built with + # the same Go version we're targeting. + # Avoids incompatibility issues such as: + # - https://github.com/golangci/golangci-lint/issues/2922 + # - https://github.com/golangci/golangci-lint/issues/2673 + # - https://github.com/golangci/golangci-lint-action/issues/442 + run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.56.2 + - name: Run golangci-lint + run: golangci-lint run --verbose --out-format=github-actions diff --git a/cmd/server/main.go b/cmd/server/main.go index 65d2d53..da7a59f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -9,15 +9,15 @@ import ( _apiClient "github.com/openinfradev/tks-api/pkg/api-client" argo "github.com/openinfradev/tks-api/pkg/argo-client" "github.com/openinfradev/tks-api/pkg/log" - "github.com/spf13/pflag" - "github.com/spf13/viper" - "github.com/openinfradev/tks-batch/internal/application" cloudAccount "github.com/openinfradev/tks-batch/internal/cloud-account" "github.com/openinfradev/tks-batch/internal/cluster" "github.com/openinfradev/tks-batch/internal/database" "github.com/openinfradev/tks-batch/internal/organization" systemNotificationRule "github.com/openinfradev/tks-batch/internal/system-notification-rule" + gcache "github.com/patrickmn/go-cache" + "github.com/spf13/pflag" + "github.com/spf13/viper" ) const INTERVAL_SEC = 5 @@ -30,6 +30,7 @@ var ( organizationAccessor *organization.OrganizationAccessor systemNotificationRuleAccessor *systemNotificationRule.SystemNotificationAccessor apiClient _apiClient.ApiClient + cache *gcache.Cache ) func init() { @@ -84,6 +85,8 @@ func main() { log.Fatal(context.TODO(), "failed to create tks-api client : ", err) } + cache = gcache.New(5*time.Minute, 10*time.Minute) + for { err = processClusterStatus() if err != nil { diff --git a/cmd/server/thanos_ruler.go b/cmd/server/thanos_ruler.go index fb2ab43..72ccfce 100644 --- a/cmd/server/thanos_ruler.go +++ b/cmd/server/thanos_ruler.go @@ -9,11 +9,12 @@ import ( "github.com/openinfradev/tks-api/pkg/kubernetes" "github.com/openinfradev/tks-api/pkg/log" + gcache "github.com/patrickmn/go-cache" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const LAST_UPDATED_MIN = 1 +const LAST_UPDATED_MIN = 2 func processReloadThanosRules() error { organizationIds, err := systemNotificationRuleAccessor.GetRecentlyUpdatedOrganizations(LAST_UPDATED_MIN) @@ -48,6 +49,13 @@ func processReloadThanosRules() error { } func GetThanosRulerUrl(primaryClusterId string) (url string, err error) { + const prefix = "CACHE_KEY_THANOS_RULER_URL" + value, found := cache.Get(prefix + primaryClusterId) + if found { + log.Info(context.TODO(), "Cache HIT [CACHE_KEY_THANOS_RULER_URL] ", value) + return value.(string), nil + } + clientset_admin, err := kubernetes.GetClientAdminCluster(context.TODO()) if err != nil { return url, errors.Wrap(err, "Failed to get client set for user cluster") @@ -80,6 +88,8 @@ func GetThanosRulerUrl(primaryClusterId string) (url string, err error) { } else { url = "http://" + string(secrets.Data["thanos-ruler"]) } + + cache.Set(prefix+primaryClusterId, url, gcache.DefaultExpiration) return url, nil } diff --git a/go.mod b/go.mod index f6b78f4..33f8f35 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,9 @@ toolchain go1.21.7 require ( github.com/gofrs/uuid v4.0.0+incompatible + github.com/openinfradev/tks-api v0.0.0-20240411053710-5b8a434e8797 + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pkg/errors v0.9.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.18.2 gopkg.in/yaml.v2 v2.4.0 @@ -47,9 +50,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/openinfradev/tks-api v0.0.0-20240411053710-5b8a434e8797 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/go.sum b/go.sum index 1235b18..dc73524 100644 --- a/go.sum +++ b/go.sum @@ -142,10 +142,10 @@ github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo= github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys= github.com/onsi/gomega v1.23.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= -github.com/openinfradev/tks-api v0.0.0-20240409091158-eff7241c1731 h1:gmVBHSDzJGdf9p4wm28bDFcA3yFU6QjZl4prCd2fvIg= -github.com/openinfradev/tks-api v0.0.0-20240409091158-eff7241c1731/go.mod h1:OGfXiL0YRby+OzOm+OI0d+wtPkOj3SMCiAv3lvpmaiU= github.com/openinfradev/tks-api v0.0.0-20240411053710-5b8a434e8797 h1:DQ5naso3RdA0XxQ2Fj70xZ4O3vBhtWYR9Kpdy7LQqRE= github.com/openinfradev/tks-api v0.0.0-20240411053710-5b8a434e8797/go.mod h1:Ph4lPgdWg06R1GUNCtmXfzHNlNCW/XjUAvei+m5DD2o= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=