Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: acr controller in 2.12 #336

Merged
merged 18 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ USER root
RUN ln -s /usr/local/bin/argocd /usr/local/bin/argocd-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-repo-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/event-reporter-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-change-revision-controller && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-cmp-server && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-application-controller && \
ln -s /usr/local/bin/argocd /usr/local/bin/argocd-dex && \
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ define run-in-test-client
-e GITHUB_TOKEN \
-e GOCACHE=/tmp/go-build-cache \
-e ARGOCD_LINT_GOGC=$(ARGOCD_LINT_GOGC) \
-e GOSUMDB=off \
-v ${DOCKER_SRC_MOUNT} \
-v ${GOPATH}/pkg/mod:/go/pkg/mod${VOLUME_MOUNT} \
-v ${GOCACHE}:/tmp/go-build-cache${VOLUME_MOUNT} \
Expand Down
98 changes: 98 additions & 0 deletions acr_controller/application/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package application_change_revision_controller

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"google.golang.org/grpc"

appclient "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
)

//go:generate go run github.com/vektra/mockery/[email protected] --name=ApplicationClient

type ApplicationClient interface {
GetChangeRevision(ctx context.Context, in *appclient.ChangeRevisionRequest, opts ...grpc.CallOption) (*appclient.ChangeRevisionResponse, error)
}

type httpApplicationClient struct {
httpClient *http.Client
baseUrl string
token string
rootpath string
}

func NewHttpApplicationClient(token string, address string, rootpath string) ApplicationClient {
if rootpath != "" && !strings.HasPrefix(rootpath, "/") {
rootpath = "/" + rootpath
}

if !strings.Contains(address, "http") {
address = "http://" + address
}

if rootpath != "" {
address = address + rootpath
}

return &httpApplicationClient{
httpClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
// Support for insecure connections
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
baseUrl: address,
token: token,
rootpath: rootpath,
}
}

func (c *httpApplicationClient) execute(ctx context.Context, url string, result interface{}, printBody ...bool) error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+c.token)

res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

b, _ := io.ReadAll(res.Body)

isStatusOK := res.StatusCode >= 200 && res.StatusCode < 300
if !isStatusOK {
return fmt.Errorf("argocd server respond with code %d, msg is: %s", res.StatusCode, string(b))
}

err = json.Unmarshal(b, &result)
if err != nil {
return err
}
return nil
}

func (c *httpApplicationClient) GetChangeRevision(ctx context.Context, in *appclient.ChangeRevisionRequest, opts ...grpc.CallOption) (*appclient.ChangeRevisionResponse, error) {
params := fmt.Sprintf("?appName=%s&namespace=%s&currentRevision=%s&previousRevision=%s", in.GetAppName(), in.GetNamespace(), in.GetCurrentRevision(), in.GetPreviousRevision())

url := fmt.Sprintf("%s/api/v1/application/changeRevision%s", c.baseUrl, params)

changeRevisionResponse := &appclient.ChangeRevisionResponse{}
err := c.execute(ctx, url, changeRevisionResponse)
if err != nil {
return nil, err
}
return changeRevisionResponse, nil
}
69 changes: 69 additions & 0 deletions acr_controller/application/mocks/ApplicationClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions acr_controller/controller/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package application_change_revision_controller

import (
"sync"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
)

type subscriber struct {
ch chan *appv1.ApplicationWatchEvent
filters []func(*appv1.ApplicationWatchEvent) bool
}

func (s *subscriber) matches(event *appv1.ApplicationWatchEvent) bool {
for i := range s.filters {
if !s.filters[i](event) {
return false
}
}
return true
}

// Broadcaster is an interface for broadcasting application informer watch events to multiple subscribers.
type Broadcaster interface {
Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func()
OnAdd(interface{}, bool)
OnUpdate(interface{}, interface{})
OnDelete(interface{})
}

type broadcasterHandler struct {
lock sync.Mutex
subscribers []*subscriber
}

func NewBroadcaster() Broadcaster {
return &broadcasterHandler{}
}

func (b *broadcasterHandler) notify(event *appv1.ApplicationWatchEvent) {
val, ok := event.Application.Annotations[appv1.AnnotationKeyManifestGeneratePaths]
if !ok || val == "" {
return
}

// Make a local copy of b.subscribers, then send channel events outside the lock,
// to avoid data race on b.subscribers changes
subscribers := []*subscriber{}
b.lock.Lock()
subscribers = append(subscribers, b.subscribers...)
b.lock.Unlock()

for _, s := range subscribers {
if s.matches(event) {
select {
case s.ch <- event:
{
// log.Infof("adding application '%s' to channel", event.Application.Name)
}
default:
// drop event if cannot send right away
log.WithField("application", event.Application.Name).Warn("unable to send event notification")
}
}
}
}

// Subscribe forward application informer watch events to the provided channel.
// The watch events are dropped if no receives are reading events from the channel so the channel must have
// buffer if dropping events is not acceptable.
func (b *broadcasterHandler) Subscribe(ch chan *appv1.ApplicationWatchEvent, filters ...func(event *appv1.ApplicationWatchEvent) bool) func() {
b.lock.Lock()
defer b.lock.Unlock()
subscriber := &subscriber{ch, filters}
b.subscribers = append(b.subscribers, subscriber)
return func() {
b.lock.Lock()
defer b.lock.Unlock()
for i := range b.subscribers {
if b.subscribers[i] == subscriber {
b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
break
}
}
}
}

func (b *broadcasterHandler) OnAdd(obj interface{}, isInInitialList bool) {
if app, ok := obj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Added})
}
}

func (b *broadcasterHandler) OnUpdate(_, newObj interface{}) {
if app, ok := newObj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Modified})
}
}

func (b *broadcasterHandler) OnDelete(obj interface{}) {
if app, ok := obj.(*appv1.Application); ok {
b.notify(&appv1.ApplicationWatchEvent{Application: *app, Type: watch.Deleted})
}
}
85 changes: 85 additions & 0 deletions acr_controller/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package application_change_revision_controller

import (
"context"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

appclient "github.com/argoproj/argo-cd/v2/acr_controller/application"
"github.com/argoproj/argo-cd/v2/acr_controller/service"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"

appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
"github.com/argoproj/argo-cd/v2/util/settings"
)

var watchAPIBufferSize = 1000

type ACRController interface {
Run(ctx context.Context)
}

type applicationChangeRevisionController struct {
settingsMgr *settings.SettingsManager
appBroadcaster Broadcaster
cache *servercache.Cache
appLister applisters.ApplicationLister
applicationServiceClient appclient.ApplicationClient
acrService service.ACRService
applicationClientset appclientset.Interface
}

func NewApplicationChangeRevisionController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, applicationClientset appclientset.Interface) ACRController {
appBroadcaster := NewBroadcaster()
_, err := appInformer.AddEventHandler(appBroadcaster)
if err != nil {
log.Error(err)
}
return &applicationChangeRevisionController{
appBroadcaster: appBroadcaster,
cache: cache,
settingsMgr: settingsMgr,
applicationServiceClient: applicationServiceClient,
appLister: appLister,
applicationClientset: applicationClientset,
acrService: service.NewACRService(applicationClientset, applicationServiceClient),
}
}

func (c *applicationChangeRevisionController) Run(ctx context.Context) {
var logCtx log.FieldLogger = log.StandardLogger()

calculateIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, ts string) error {
if eventType == watch.Bookmark || eventType == watch.Deleted {
return nil // ignore this event
}

return c.acrService.ChangeRevision(ctx, &a)
}

// TODO: move to abstraction
eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize)
unsubscribe := c.appBroadcaster.Subscribe(eventsChannel)
defer unsubscribe()
for {
select {
case <-ctx.Done():
return
case event := <-eventsChannel:
// logCtx.Infof("channel size is %d", len(eventsChannel))

ts := time.Now().Format("2006-01-02T15:04:05.000Z")
ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
err := calculateIfPermitted(ctx, event.Application, event.Type, ts)
if err != nil {
logCtx.WithError(err).Error("failed to calculate change revision")
}
cancel()
}
}
}
Loading
Loading