diff --git a/cmd/event-reporter-server/commands/event_reporter_server.go b/cmd/event-reporter-server/commands/event_reporter_server.go index 43b41695f3d44..bc2e5960a224f 100644 --- a/cmd/event-reporter-server/commands/event_reporter_server.go +++ b/cmd/event-reporter-server/commands/event_reporter_server.go @@ -1,254 +1,263 @@ package commands import ( - "context" - "fmt" - "math" - "time" - - "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - - "github.com/argoproj/argo-cd/v2/event_reporter" - appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" - "github.com/argoproj/argo-cd/v2/pkg/apiclient" - "github.com/argoproj/argo-cd/v2/pkg/codefresh" - - "github.com/argoproj/pkg/stats" - "github.com/redis/go-redis/v9" - log "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/clientcmd" - - cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" - "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - cacheutil "github.com/argoproj/argo-cd/v2/util/cache" - "github.com/argoproj/argo-cd/v2/util/cli" - "github.com/argoproj/argo-cd/v2/util/env" - "github.com/argoproj/argo-cd/v2/util/errors" - "github.com/argoproj/argo-cd/v2/util/kube" - "github.com/argoproj/argo-cd/v2/util/tls" + "context" + "fmt" + "github.com/argoproj/argo-cd/v2/pkg/sources_server_client" + "math" + "time" + + "github.com/argoproj/argo-cd/v2/event_reporter/reporter" + + "github.com/argoproj/argo-cd/v2/event_reporter" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" + "github.com/argoproj/argo-cd/v2/pkg/apiclient" + "github.com/argoproj/argo-cd/v2/pkg/codefresh" + + "github.com/argoproj/pkg/stats" + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + + cmdutil "github.com/argoproj/argo-cd/v2/cmd/util" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" + repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + cacheutil "github.com/argoproj/argo-cd/v2/util/cache" + "github.com/argoproj/argo-cd/v2/util/cli" + "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/errors" + "github.com/argoproj/argo-cd/v2/util/kube" + "github.com/argoproj/argo-cd/v2/util/tls" ) const ( - failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT" - failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS" + failureRetryCountEnv = "EVENT_REPORTER_K8S_RETRY_COUNT" + failureRetryPeriodMilliSecondsEnv = "EVENT_REPORTE_K8S_RETRY_DURATION_MILLISECONDS" ) var ( - failureRetryCount = 0 - failureRetryPeriodMilliSeconds = 100 + failureRetryCount = 0 + failureRetryPeriodMilliSeconds = 100 ) func init() { - failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10) - failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) + failureRetryCount = env.ParseNumFromEnv(failureRetryCountEnv, failureRetryCount, 0, 10) + failureRetryPeriodMilliSeconds = env.ParseNumFromEnv(failureRetryPeriodMilliSecondsEnv, failureRetryPeriodMilliSeconds, 0, 1000) } func getApplicationClient(useGrpc bool, address, token string, path string) appclient.ApplicationClient { - if useGrpc { - applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ - ServerAddr: address, - Insecure: true, - GRPCWeb: true, - PlainText: true, - AuthToken: token, - GRPCWebRootPath: path, - }) + if useGrpc { + applicationClientSet, err := apiclient.NewClient(&apiclient.ClientOptions{ + ServerAddr: address, + Insecure: true, + GRPCWeb: true, + PlainText: true, + AuthToken: token, + GRPCWebRootPath: path, + }) - errors.CheckError(err) + errors.CheckError(err) - _, applicationClient, err := applicationClientSet.NewApplicationClient() + _, applicationClient, err := applicationClientSet.NewApplicationClient() - errors.CheckError(err) + errors.CheckError(err) - return applicationClient - } - return appclient.NewHttpApplicationClient(token, address, path) + return applicationClient + } + return appclient.NewHttpApplicationClient(token, address, path) } // NewCommand returns a new instance of an event reporter command func NewCommand() *cobra.Command { - var ( - redisClient *redis.Client - insecure bool - listenHost string - listenPort int - metricsHost string - metricsPort int - glogLevel int - clientConfig clientcmd.ClientConfig - repoServerTimeoutSeconds int - repoServerAddress string - applicationServerAddress string - cacheSrc func() (*servercache.Cache, error) - contentSecurityPolicy string - repoServerPlaintext bool - repoServerStrictTLS bool - applicationNamespaces []string - argocdToken string - codefreshTlsInsecure bool - codefreshTlsCertPath string - codefreshUrl string - codefreshToken string - shardingAlgorithm string - runtimeVersion string - rootpath string - useGrpc bool - - rateLimiterEnabled bool - rateLimiterBucketSize int - rateLimiterDuration time.Duration - rateLimiterLearningMode bool - ) - command := &cobra.Command{ - Use: cliName, - Short: "Run the Event Reporter server", - Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.", - DisableAutoGenTag: true, - Run: func(c *cobra.Command, args []string) { - ctx := c.Context() - - vers := common.GetVersion() - namespace, _, err := clientConfig.Namespace() - errors.CheckError(err) - vers.LogStartupInfo( - "Event Reporter Server", - map[string]any{ - "namespace": namespace, - "port": listenPort, - }, - ) - - cli.SetLogFormat(cmdutil.LogFormat) - cli.SetLogLevel(cmdutil.LogLevel) - cli.SetGLogLevel(glogLevel) - - config, err := clientConfig.ClientConfig() - errors.CheckError(err) - errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) - - cache, err := cacheSrc() - errors.CheckError(err) - - kubeclientset := kubernetes.NewForConfigOrDie(config) - - appclientsetConfig, err := clientConfig.ClientConfig() - errors.CheckError(err) - errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig)) - config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform) - - if failureRetryCount > 0 { - appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds) - } - appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig) - tlsConfig := repoapiclient.TLSConfiguration{ - DisableTLS: repoServerPlaintext, - StrictValidation: repoServerStrictTLS, - } - - // Load CA information to use for validating connections to the - // repository server, if strict TLS validation was requested. - if !repoServerPlaintext && repoServerStrictTLS { - pool, err := tls.LoadX509CertPool( - fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), - fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), - ) - if err != nil { - log.Fatalf("%v", err) - } - tlsConfig.Certificates = pool - } - - repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) - - eventReporterServerOpts := event_reporter.EventReporterServerOpts{ - ListenPort: listenPort, - ListenHost: listenHost, - MetricsPort: metricsPort, - MetricsHost: metricsHost, - Namespace: namespace, - KubeClientset: kubeclientset, - AppClientset: appClientSet, - RepoClientset: repoclientset, - Cache: cache, - RedisClient: redisClient, - ApplicationNamespaces: applicationNamespaces, - ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken, rootpath), - CodefreshConfig: &codefresh.CodefreshConfig{ - BaseURL: codefreshUrl, - AuthToken: codefreshToken, - TlsInsecure: codefreshTlsInsecure, - CaCertPath: codefreshTlsCertPath, - RuntimeVersion: runtimeVersion, - }, - RateLimiterOpts: &reporter.RateLimiterOpts{ - Enabled: rateLimiterEnabled, - Rate: rateLimiterDuration, - Capacity: rateLimiterBucketSize, - LearningMode: rateLimiterLearningMode, - }, - } - - log.Infof("Starting event reporter server with grpc transport %v", useGrpc) - - stats.RegisterStackDumper() - stats.StartStatsTicker(10 * time.Minute) - stats.RegisterHeapDumper("memprofile") - eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts) - eventReporterServer.Init(ctx) - lns, err := eventReporterServer.Listen() - errors.CheckError(err) - for { - var closer func() - ctx, cancel := context.WithCancel(ctx) - eventReporterServer.Run(ctx, lns) - cancel() - if closer != nil { - closer() - } - } - }, - } - - clientConfig = cli.AddKubectlFlagsToCmd(command) - command.Flags().StringVar(&rootpath, "argocd-server-path", env.StringFromEnv("ARGOCD_SERVER_ROOTPATH", ""), "Used if Argo CD is running behind reverse proxy under subpath different from /") - command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS") - command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json") - command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error") - command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") - command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address") - command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token") - command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address") - command.AddCommand(cli.NewVersionCmd(cliName)) - command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address") - command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port") - command.Flags().StringVar(&metricsHost, env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", "metrics-address"), common.DefaultAddressEventReporterServerMetrics, "Listen for metrics on given address") - command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port") - command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.") - command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy", env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".") - command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server") - command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server") - command.Flags().StringVar(&codefreshTlsCertPath, "codefresh-tls-cert-path", env.StringFromEnv("CODEFRESH_SSL_CERT_PATH", ""), "Codefresh TLS CA cert file path") - command.Flags().BoolVar(&codefreshTlsInsecure, "codefresh-tls-insecure", env.ParseBoolFromEnv("CODEFRESH_TLS_INSECURE", false), "Codefresh TLS insecure") - command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") - command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ") - command.Flags().StringVar(&runtimeVersion, "codefresh-runtime-version", env.StringFromEnv("CODEFRESH_RUNTIME_VERSION", ""), "Codefresh runtime version to be reported with each event to platform") - command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces where application resources can be managed in") - command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", false), "Use grpc for interact with argocd server") - command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed") - command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.") - command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.") - command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )") - cacheSrc = servercache.AddCacheFlagsToCmd(command, cacheutil.Options{ - OnClientCreated: func(client *redis.Client) { - redisClient = client - }, - }) - return command + var ( + redisClient *redis.Client + insecure bool + listenHost string + listenPort int + metricsHost string + metricsPort int + glogLevel int + clientConfig clientcmd.ClientConfig + repoServerTimeoutSeconds int + repoServerAddress string + applicationServerAddress string + cacheSrc func() (*servercache.Cache, error) + contentSecurityPolicy string + repoServerPlaintext bool + repoServerStrictTLS bool + applicationNamespaces []string + argocdToken string + codefreshTlsInsecure bool + codefreshTlsCertPath string + codefreshUrl string + codefreshToken string + shardingAlgorithm string + runtimeVersion string + rootpath string + useGrpc bool + + rateLimiterEnabled bool + rateLimiterBucketSize int + rateLimiterDuration time.Duration + rateLimiterLearningMode bool + useSourcesServer bool + sourcesServerBaseURL string + ) + command := &cobra.Command{ + Use: cliName, + Short: "Run the Event Reporter server", + Long: "The Event reporter is a server that listens to Kubernetes events and reports them to the Codefresh server.", + DisableAutoGenTag: true, + Run: func(c *cobra.Command, args []string) { + ctx := c.Context() + + vers := common.GetVersion() + namespace, _, err := clientConfig.Namespace() + errors.CheckError(err) + vers.LogStartupInfo( + "Event Reporter Server", + map[string]any{ + "namespace": namespace, + "port": listenPort, + }, + ) + + cli.SetLogFormat(cmdutil.LogFormat) + cli.SetLogLevel(cmdutil.LogLevel) + cli.SetGLogLevel(glogLevel) + + config, err := clientConfig.ClientConfig() + errors.CheckError(err) + errors.CheckError(v1alpha1.SetK8SConfigDefaults(config)) + + cache, err := cacheSrc() + errors.CheckError(err) + + kubeclientset := kubernetes.NewForConfigOrDie(config) + + appclientsetConfig, err := clientConfig.ClientConfig() + errors.CheckError(err) + errors.CheckError(v1alpha1.SetK8SConfigDefaults(appclientsetConfig)) + config.UserAgent = fmt.Sprintf("argocd-server/%s (%s)", vers.Version, vers.Platform) + + if failureRetryCount > 0 { + appclientsetConfig = kube.AddFailureRetryWrapper(appclientsetConfig, failureRetryCount, failureRetryPeriodMilliSeconds) + } + appClientSet := appclientset.NewForConfigOrDie(appclientsetConfig) + tlsConfig := repoapiclient.TLSConfiguration{ + DisableTLS: repoServerPlaintext, + StrictValidation: repoServerStrictTLS, + } + + // Load CA information to use for validating connections to the + // repository server, if strict TLS validation was requested. + if !repoServerPlaintext && repoServerStrictTLS { + pool, err := tls.LoadX509CertPool( + fmt.Sprintf("%s/server/tls/tls.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), + fmt.Sprintf("%s/server/tls/ca.crt", env.StringFromEnv(common.EnvAppConfigPath, common.DefaultAppConfigPath)), + ) + if err != nil { + log.Fatalf("%v", err) + } + tlsConfig.Certificates = pool + } + + repoclientset := repoapiclient.NewRepoServerClientset(repoServerAddress, repoServerTimeoutSeconds, tlsConfig) + + eventReporterServerOpts := event_reporter.EventReporterServerOpts{ + ListenPort: listenPort, + ListenHost: listenHost, + MetricsPort: metricsPort, + MetricsHost: metricsHost, + Namespace: namespace, + KubeClientset: kubeclientset, + AppClientset: appClientSet, + RepoClientset: repoclientset, + Cache: cache, + RedisClient: redisClient, + ApplicationNamespaces: applicationNamespaces, + ApplicationServiceClient: getApplicationClient(useGrpc, applicationServerAddress, argocdToken, rootpath), + CodefreshConfig: &codefresh.CodefreshConfig{ + BaseURL: codefreshUrl, + AuthToken: codefreshToken, + TlsInsecure: codefreshTlsInsecure, + CaCertPath: codefreshTlsCertPath, + RuntimeVersion: runtimeVersion, + }, + RateLimiterOpts: &reporter.RateLimiterOpts{ + Enabled: rateLimiterEnabled, + Rate: rateLimiterDuration, + Capacity: rateLimiterBucketSize, + LearningMode: rateLimiterLearningMode, + }, + UseSourcesServer: useSourcesServer, + SourcesServerConfig: &sources_server_client.SourcesServerConfig{ + BaseURL: sourcesServerBaseURL, + }, + } + + log.Infof("Starting event reporter server with grpc transport %v", useGrpc) + + stats.RegisterStackDumper() + stats.StartStatsTicker(10 * time.Minute) + stats.RegisterHeapDumper("memprofile") + eventReporterServer := event_reporter.NewEventReporterServer(ctx, eventReporterServerOpts) + eventReporterServer.Init(ctx) + lns, err := eventReporterServer.Listen() + errors.CheckError(err) + for { + var closer func() + ctx, cancel := context.WithCancel(ctx) + eventReporterServer.Run(ctx, lns) + cancel() + if closer != nil { + closer() + } + } + }, + } + + clientConfig = cli.AddKubectlFlagsToCmd(command) + command.Flags().StringVar(&rootpath, "argocd-server-path", env.StringFromEnv("ARGOCD_SERVER_ROOTPATH", ""), "Used if Argo CD is running behind reverse proxy under subpath different from /") + command.Flags().BoolVar(&insecure, "insecure", env.ParseBoolFromEnv("EVENT_REPORTER_INSECURE", false), "Run server without TLS") + command.Flags().StringVar(&cmdutil.LogFormat, "logformat", env.StringFromEnv("EVENT_REPORTER_LOGFORMAT", "text"), "Set the logging format. One of: text|json") + command.Flags().StringVar(&cmdutil.LogLevel, "loglevel", env.StringFromEnv("EVENT_REPORTER_LOG_LEVEL", "info"), "Set the logging level. One of: debug|info|warn|error") + command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level") + command.Flags().StringVar(&applicationServerAddress, "application-server", env.StringFromEnv("EVENT_REPORTER_APPLICATION_SERVER", common.DefaultApplicationServerAddr), "Application server address") + command.Flags().StringVar(&argocdToken, "argocd-token", env.StringFromEnv("ARGOCD_TOKEN", ""), "ArgoCD server JWT token") + command.Flags().StringVar(&repoServerAddress, "repo-server", env.StringFromEnv("EVENT_REPORTER_REPO_SERVER", common.DefaultRepoServerAddr), "Repo server address") + command.AddCommand(cli.NewVersionCmd(cliName)) + command.Flags().StringVar(&listenHost, "address", env.StringFromEnv("EVENT_REPORTER_LISTEN_ADDRESS", common.DefaultAddressEventReporterServer), "Listen on given address") + command.Flags().IntVar(&listenPort, "port", common.DefaultPortEventReporterServer, "Listen on given port") + command.Flags().StringVar(&metricsHost, env.StringFromEnv("EVENT_REPORTER_METRICS_LISTEN_ADDRESS", "metrics-address"), common.DefaultAddressEventReporterServerMetrics, "Listen for metrics on given address") + command.Flags().IntVar(&metricsPort, "metrics-port", common.DefaultPortEventReporterServerMetrics, "Start metrics on given port") + command.Flags().IntVar(&repoServerTimeoutSeconds, "repo-server-timeout-seconds", env.ParseNumFromEnv("EVENT_REPORTER_REPO_SERVER_TIMEOUT_SECONDS", 60, 0, math.MaxInt64), "Repo server RPC call timeout seconds.") + command.Flags().StringVar(&contentSecurityPolicy, "content-security-policy", env.StringFromEnv("EVENT_REPORTER_CONTENT_SECURITY_POLICY", "frame-ancestors 'self';"), "Set Content-Security-Policy header in HTTP responses to `value`. To disable, set to \"\".") + command.Flags().BoolVar(&repoServerPlaintext, "repo-server-plaintext", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_PLAINTEXT", false), "Use a plaintext client (non-TLS) to connect to repository server") + command.Flags().BoolVar(&repoServerStrictTLS, "repo-server-strict-tls", env.ParseBoolFromEnv("EVENT_REPORTER_REPO_SERVER_STRICT_TLS", false), "Perform strict validation of TLS certificates when connecting to repo server") + command.Flags().StringVar(&codefreshTlsCertPath, "codefresh-tls-cert-path", env.StringFromEnv("CODEFRESH_SSL_CERT_PATH", ""), "Codefresh TLS CA cert file path") + command.Flags().BoolVar(&codefreshTlsInsecure, "codefresh-tls-insecure", env.ParseBoolFromEnv("CODEFRESH_TLS_INSECURE", false), "Codefresh TLS insecure") + command.Flags().StringVar(&codefreshUrl, "codefresh-url", env.StringFromEnv("CODEFRESH_URL", "https://g.codefresh.io"), "Codefresh API url") + command.Flags().StringVar(&codefreshToken, "codefresh-token", env.StringFromEnv("CODEFRESH_TOKEN", ""), "Codefresh token") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvEventReporterShardingAlgorithm, common.DefaultEventReporterShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy] ") + command.Flags().StringVar(&runtimeVersion, "codefresh-runtime-version", env.StringFromEnv("CODEFRESH_RUNTIME_VERSION", ""), "Codefresh runtime version to be reported with each event to platform") + command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces where application resources can be managed in") + command.Flags().BoolVar(&useGrpc, "grpc", env.ParseBoolFromEnv("USE_GRPC", false), "Use grpc for interact with argocd server") + command.Flags().BoolVar(&rateLimiterEnabled, "rate-limiter-enabled", env.ParseBoolFromEnv("RATE_LIMITER_ENABLED", false), "Use rate limiter for prevent queue to be overflowed") + command.Flags().IntVar(&rateLimiterBucketSize, "rate-limiter-bucket-size", env.ParseNumFromEnv("RATE_LIMITER_BUCKET_SIZE", math.MaxInt, 0, math.MaxInt), "The maximum amount of requests allowed per window.") + command.Flags().DurationVar(&rateLimiterDuration, "rate-limiter-period", env.ParseDurationFromEnv("RATE_LIMITER_DURATION", 24*time.Hour, 0, math.MaxInt64), "The rate limit window size.") + command.Flags().BoolVar(&rateLimiterLearningMode, "rate-limiter-learning-mode", env.ParseBoolFromEnv("RATE_LIMITER_LEARNING_MODE_ENABLED", false), "The rate limit enabled in learning mode ( not blocking sending to queue but logging it )") + command.Flags().BoolVar(&useSourcesServer, "use-sources-server", env.ParseBoolFromEnv("SOURCES_SERVER_ENABLED", false), "Use sources-server instead of repo-server fork") + command.Flags().StringVar(&sourcesServerBaseURL, "sources-server-base-url", env.StringFromEnv("SOURCES_SERVER_BASE_URL", common.DefaultSourcesServerAddr), "Sources-server base URL") + cacheSrc = servercache.AddCacheFlagsToCmd(command, cacheutil.Options{ + OnClientCreated: func(client *redis.Client) { + redisClient = client + }, + }) + return command } diff --git a/common/common.go b/common/common.go index d3b43828fe761..d22f5c07dee6b 100644 --- a/common/common.go +++ b/common/common.go @@ -1,431 +1,432 @@ package common import ( - "errors" - "os" - "path/filepath" - "strconv" - "time" - - "github.com/sirupsen/logrus" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "errors" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Component names const ( - ApplicationController = "argocd-application-controller" + ApplicationController = "argocd-application-controller" ) // Default service addresses and URLS of Argo CD internal services const ( - // DefaultRepoServerAddr is the gRPC address of the Argo CD repo server - DefaultRepoServerAddr = "argocd-repo-server:8081" - DefaultApplicationServerAddr = "argo-cd-server:80" - // DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against - DefaultDexServerAddr = "argocd-dex-server:5556" - // DefaultRedisAddr is the default redis address - DefaultRedisAddr = "argocd-redis:6379" + // DefaultRepoServerAddr is the gRPC address of the Argo CD repo server + DefaultRepoServerAddr = "argocd-repo-server:8081" + DefaultApplicationServerAddr = "argo-cd-server:80" + // DefaultDexServerAddr is the HTTP address of the Dex OIDC server, which we run a reverse proxy against + DefaultDexServerAddr = "argocd-dex-server:5556" + // DefaultRedisAddr is the default redis address + DefaultRedisAddr = "argocd-redis:6379" + DefaultSourcesServerAddr = "sources-server:8090" ) // Kubernetes ConfigMap and Secret resource names which hold Argo CD settings const ( - ArgoCDConfigMapName = "argocd-cm" - ArgoCDSecretName = "argocd-secret" - ArgoCDNotificationsConfigMapName = "argocd-notifications-cm" - ArgoCDNotificationsSecretName = "argocd-notifications-secret" - ArgoCDRBACConfigMapName = "argocd-rbac-cm" - // ArgoCDKnownHostsConfigMapName contains SSH known hosts data for connecting repositories. Will get mounted as volume to pods - ArgoCDKnownHostsConfigMapName = "argocd-ssh-known-hosts-cm" - // ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods - ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm" - ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm" - // ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping - ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm" + ArgoCDConfigMapName = "argocd-cm" + ArgoCDSecretName = "argocd-secret" + ArgoCDNotificationsConfigMapName = "argocd-notifications-cm" + ArgoCDNotificationsSecretName = "argocd-notifications-secret" + ArgoCDRBACConfigMapName = "argocd-rbac-cm" + // ArgoCDKnownHostsConfigMapName contains SSH known hosts data for connecting repositories. Will get mounted as volume to pods + ArgoCDKnownHostsConfigMapName = "argocd-ssh-known-hosts-cm" + // ArgoCDTLSCertsConfigMapName contains TLS certificate data for connecting repositories. Will get mounted as volume to pods + ArgoCDTLSCertsConfigMapName = "argocd-tls-certs-cm" + ArgoCDGPGKeysConfigMapName = "argocd-gpg-keys-cm" + // ArgoCDAppControllerShardConfigMapName contains the application controller to shard mapping + ArgoCDAppControllerShardConfigMapName = "argocd-app-controller-shard-cm" ) // Some default configurables const ( - DefaultSystemNamespace = "kube-system" - DefaultRepoType = "git" + DefaultSystemNamespace = "kube-system" + DefaultRepoType = "git" ) // Default listener ports for ArgoCD components const ( - DefaultPortAPIServer = 8080 - DefaultPortRepoServer = 8081 - DefaultPortArgoCDMetrics = 8082 - DefaultPortArgoCDAPIServerMetrics = 8083 - DefaultPortRepoServerMetrics = 8084 + DefaultPortAPIServer = 8080 + DefaultPortRepoServer = 8081 + DefaultPortArgoCDMetrics = 8082 + DefaultPortArgoCDAPIServerMetrics = 8083 + DefaultPortRepoServerMetrics = 8084 - DefaultPortEventReporterServerMetrics = 8087 - DefaultPortEventReporterServer = 8088 + DefaultPortEventReporterServerMetrics = 8087 + DefaultPortEventReporterServer = 8088 - DefaultPortACRServer = 8090 + DefaultPortACRServer = 8090 ) // DefaultAddressAPIServer for ArgoCD components const ( - DefaultAddressAdminDashboard = "localhost" - DefaultAddressAPIServer = "0.0.0.0" - DefaultAddressAPIServerMetrics = "0.0.0.0" - DefaultAddressRepoServer = "0.0.0.0" - DefaultAddressRepoServerMetrics = "0.0.0.0" - - DefaultAddressEventReporterServer = "0.0.0.0" - DefaultAddressACRController = "0.0.0.0" - DefaultAddressEventReporterServerMetrics = "0.0.0.0" + DefaultAddressAdminDashboard = "localhost" + DefaultAddressAPIServer = "0.0.0.0" + DefaultAddressAPIServerMetrics = "0.0.0.0" + DefaultAddressRepoServer = "0.0.0.0" + DefaultAddressRepoServerMetrics = "0.0.0.0" + + DefaultAddressEventReporterServer = "0.0.0.0" + DefaultAddressACRController = "0.0.0.0" + DefaultAddressEventReporterServerMetrics = "0.0.0.0" ) // Default paths on the pod's file system const ( - // DefaultPathTLSConfig is the default path where TLS certificates for repositories are located - DefaultPathTLSConfig = "/app/config/tls" - // DefaultPathSSHConfig is the default path where SSH known hosts are stored - DefaultPathSSHConfig = "/app/config/ssh" - // DefaultSSHKnownHostsName is the Default name for the SSH known hosts file - DefaultSSHKnownHostsName = "ssh_known_hosts" - // DefaultGnuPgHomePath is the Default path to GnuPG home directory - DefaultGnuPgHomePath = "/app/config/gpg/keys" - // DefaultAppConfigPath is the Default path to repo server TLS endpoint config - DefaultAppConfigPath = "/app/config" - // DefaultPluginSockFilePath is the Default path to cmp server plugin socket file - DefaultPluginSockFilePath = "/home/argocd/cmp-server/plugins" - // DefaultPluginConfigFilePath is the Default path to cmp server plugin configuration file - DefaultPluginConfigFilePath = "/home/argocd/cmp-server/config" - // PluginConfigFileName is the Plugin Config File is a ConfigManagementPlugin manifest located inside the plugin container - PluginConfigFileName = "plugin.yaml" + // DefaultPathTLSConfig is the default path where TLS certificates for repositories are located + DefaultPathTLSConfig = "/app/config/tls" + // DefaultPathSSHConfig is the default path where SSH known hosts are stored + DefaultPathSSHConfig = "/app/config/ssh" + // DefaultSSHKnownHostsName is the Default name for the SSH known hosts file + DefaultSSHKnownHostsName = "ssh_known_hosts" + // DefaultGnuPgHomePath is the Default path to GnuPG home directory + DefaultGnuPgHomePath = "/app/config/gpg/keys" + // DefaultAppConfigPath is the Default path to repo server TLS endpoint config + DefaultAppConfigPath = "/app/config" + // DefaultPluginSockFilePath is the Default path to cmp server plugin socket file + DefaultPluginSockFilePath = "/home/argocd/cmp-server/plugins" + // DefaultPluginConfigFilePath is the Default path to cmp server plugin configuration file + DefaultPluginConfigFilePath = "/home/argocd/cmp-server/config" + // PluginConfigFileName is the Plugin Config File is a ConfigManagementPlugin manifest located inside the plugin container + PluginConfigFileName = "plugin.yaml" ) // Argo CD application related constants const ( - // ArgoCDAdminUsername is the username of the 'admin' user - ArgoCDAdminUsername = "admin" - // ArgoCDUserAgentName is the default user-agent name used by the gRPC API client library and grpc-gateway - ArgoCDUserAgentName = "argocd-client" - // ArgoCDSSAManager is the default argocd manager name used by server-side apply syncs - ArgoCDSSAManager = "argocd-controller" - // AuthCookieName is the HTTP cookie name where we store our auth token - AuthCookieName = "argocd.token" - // StateCookieName is the HTTP cookie name that holds temporary nonce tokens for CSRF protection - StateCookieName = "argocd.oauthstate" - // StateCookieMaxAge is the maximum age of the oauth state cookie - StateCookieMaxAge = time.Minute * 5 - - // ChangePasswordSSOTokenMaxAge is the max token age for password change operation - ChangePasswordSSOTokenMaxAge = time.Minute * 5 - // GithubAppCredsExpirationDuration is the default time used to cache the GitHub app credentials - GithubAppCredsExpirationDuration = time.Minute * 60 - - // PasswordPatten is the default password patten - PasswordPatten = `^.{8,32}$` - - // LegacyShardingAlgorithm is the default value for Sharding Algorithm it uses an `uid` based distribution (non-uniform) - LegacyShardingAlgorithm = "legacy" - // RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution across all shards - RoundRobinShardingAlgorithm = "round-robin" - // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller - AppControllerHeartbeatUpdateRetryCount = 3 - - // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across - // all shards but is optimised to handle sharding and/or cluster addition or removal. In case of sharding or - // cluster changes, this algorithm minimises the changes between shard and clusters assignments. - ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" - - DefaultShardingAlgorithm = LegacyShardingAlgorithm + // ArgoCDAdminUsername is the username of the 'admin' user + ArgoCDAdminUsername = "admin" + // ArgoCDUserAgentName is the default user-agent name used by the gRPC API client library and grpc-gateway + ArgoCDUserAgentName = "argocd-client" + // ArgoCDSSAManager is the default argocd manager name used by server-side apply syncs + ArgoCDSSAManager = "argocd-controller" + // AuthCookieName is the HTTP cookie name where we store our auth token + AuthCookieName = "argocd.token" + // StateCookieName is the HTTP cookie name that holds temporary nonce tokens for CSRF protection + StateCookieName = "argocd.oauthstate" + // StateCookieMaxAge is the maximum age of the oauth state cookie + StateCookieMaxAge = time.Minute * 5 + + // ChangePasswordSSOTokenMaxAge is the max token age for password change operation + ChangePasswordSSOTokenMaxAge = time.Minute * 5 + // GithubAppCredsExpirationDuration is the default time used to cache the GitHub app credentials + GithubAppCredsExpirationDuration = time.Minute * 60 + + // PasswordPatten is the default password patten + PasswordPatten = `^.{8,32}$` + + // LegacyShardingAlgorithm is the default value for Sharding Algorithm it uses an `uid` based distribution (non-uniform) + LegacyShardingAlgorithm = "legacy" + // RoundRobinShardingAlgorithm is a flag value that can be opted for Sharding Algorithm it uses an equal distribution across all shards + RoundRobinShardingAlgorithm = "round-robin" + // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller + AppControllerHeartbeatUpdateRetryCount = 3 + + // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across + // all shards but is optimised to handle sharding and/or cluster addition or removal. In case of sharding or + // cluster changes, this algorithm minimises the changes between shard and clusters assignments. + ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" + + DefaultShardingAlgorithm = LegacyShardingAlgorithm ) // Dex related constants const ( - // DexAPIEndpoint is the endpoint where we serve the Dex API server - DexAPIEndpoint = "/api/dex" - // LoginEndpoint is Argo CD's shorthand login endpoint which redirects to dex's OAuth 2.0 provider's consent page - LoginEndpoint = "/auth/login" - // LogoutEndpoint is Argo CD's shorthand logout endpoint which invalidates OIDC session after logout - LogoutEndpoint = "/auth/logout" - // CallbackEndpoint is Argo CD's final callback endpoint we reach after OAuth 2.0 login flow has been completed - CallbackEndpoint = "/auth/callback" - // DexCallbackEndpoint is Argo CD's final callback endpoint when Dex is configured - DexCallbackEndpoint = "/api/dex/callback" - // ArgoCDClientAppName is name of the Oauth client app used when registering our web app to dex - ArgoCDClientAppName = "Argo CD" - // ArgoCDClientAppID is the Oauth client ID we will use when registering our app to dex - ArgoCDClientAppID = "argo-cd" - // ArgoCDCLIClientAppName is name of the Oauth client app used when registering our CLI to dex - ArgoCDCLIClientAppName = "Argo CD CLI" - // ArgoCDCLIClientAppID is the Oauth client ID we will use when registering our CLI to dex - ArgoCDCLIClientAppID = "argo-cd-cli" + // DexAPIEndpoint is the endpoint where we serve the Dex API server + DexAPIEndpoint = "/api/dex" + // LoginEndpoint is Argo CD's shorthand login endpoint which redirects to dex's OAuth 2.0 provider's consent page + LoginEndpoint = "/auth/login" + // LogoutEndpoint is Argo CD's shorthand logout endpoint which invalidates OIDC session after logout + LogoutEndpoint = "/auth/logout" + // CallbackEndpoint is Argo CD's final callback endpoint we reach after OAuth 2.0 login flow has been completed + CallbackEndpoint = "/auth/callback" + // DexCallbackEndpoint is Argo CD's final callback endpoint when Dex is configured + DexCallbackEndpoint = "/api/dex/callback" + // ArgoCDClientAppName is name of the Oauth client app used when registering our web app to dex + ArgoCDClientAppName = "Argo CD" + // ArgoCDClientAppID is the Oauth client ID we will use when registering our app to dex + ArgoCDClientAppID = "argo-cd" + // ArgoCDCLIClientAppName is name of the Oauth client app used when registering our CLI to dex + ArgoCDCLIClientAppName = "Argo CD CLI" + // ArgoCDCLIClientAppID is the Oauth client ID we will use when registering our CLI to dex + ArgoCDCLIClientAppID = "argo-cd-cli" ) // Resource metadata labels and annotations (keys and values) used by Argo CD components const ( - // LabelKeyAppInstance is the label key to use to uniquely identify the instance of an application - // The Argo CD application name is used as the instance name - LabelKeyAppInstance = "app.kubernetes.io/instance" - // LabelKeyAppName is the label key to use to uniquely identify the name of the Kubernetes application - LabelKeyAppName = "app.kubernetes.io/name" - // LabelKeyAutoLabelClusterInfo if set to true will automatically add extra labels from the cluster info (currently it only adds a k8s version label) - LabelKeyAutoLabelClusterInfo = "argocd.argoproj.io/auto-label-cluster-info" - // LabelKeyLegacyApplicationName is the legacy label (v0.10 and below) and is superseded by 'app.kubernetes.io/instance' - LabelKeyLegacyApplicationName = "applications.argoproj.io/app-name" - // LabelKeySecretType contains the type of argocd secret (currently: 'cluster', 'repository', 'repo-config' or 'repo-creds') - LabelKeySecretType = "argocd.argoproj.io/secret-type" - // LabelKeyClusterKubernetesVersion contains the kubernetes version of the cluster secret if it has been enabled - LabelKeyClusterKubernetesVersion = "argocd.argoproj.io/kubernetes-version" - // LabelValueSecretTypeCluster indicates a secret type of cluster - LabelValueSecretTypeCluster = "cluster" - // LabelValueSecretTypeRepository indicates a secret type of repository - LabelValueSecretTypeRepository = "repository" - // LabelValueSecretTypeRepoCreds indicates a secret type of repository credentials - LabelValueSecretTypeRepoCreds = "repo-creds" - - // AnnotationKeyAppInstance is the Argo CD application name is used as the instance name - AnnotationKeyAppInstance = "argocd.argoproj.io/tracking-id" - - // AnnotationCompareOptions is a comma-separated list of options for comparison - AnnotationCompareOptions = "argocd.argoproj.io/compare-options" - - // AnnotationKeyManagedBy is annotation name which indicates that k8s resource is managed by an application. - AnnotationKeyManagedBy = "managed-by" - // AnnotationValueManagedByArgoCD is a 'managed-by' annotation value for resources managed by Argo CD - AnnotationValueManagedByArgoCD = "argocd.argoproj.io" - - // AnnotationKeyLinkPrefix tells the UI to add an external link icon to the application node - // that links to the value given in the annotation. - // The annotation key must be followed by a unique identifier. Ex: link.argocd.argoproj.io/dashboard - // It's valid to have multiple annotations that match the prefix. - // Values can simply be a url or they can have - // an optional link title separated by a "|" - // Ex: "http://grafana.example.com/d/yu5UH4MMz/deployments" - // Ex: "Go to Dashboard|http://grafana.example.com/d/yu5UH4MMz/deployments" - AnnotationKeyLinkPrefix = "link.argocd.argoproj.io/" - - // AnnotationKeyAppSkipReconcile tells the Application to skip the Application controller reconcile. - // Skip reconcile when the value is "true" or any other string values that can be strconv.ParseBool() to be true. - AnnotationKeyAppSkipReconcile = "argocd.argoproj.io/skip-reconcile" - // LabelKeyComponentRepoServer is the label key to identify the component as repo-server - LabelKeyComponentRepoServer = "app.kubernetes.io/component" - // LabelValueComponentRepoServer is the label value for the repo-server component - LabelValueComponentRepoServer = "repo-server" + // LabelKeyAppInstance is the label key to use to uniquely identify the instance of an application + // The Argo CD application name is used as the instance name + LabelKeyAppInstance = "app.kubernetes.io/instance" + // LabelKeyAppName is the label key to use to uniquely identify the name of the Kubernetes application + LabelKeyAppName = "app.kubernetes.io/name" + // LabelKeyAutoLabelClusterInfo if set to true will automatically add extra labels from the cluster info (currently it only adds a k8s version label) + LabelKeyAutoLabelClusterInfo = "argocd.argoproj.io/auto-label-cluster-info" + // LabelKeyLegacyApplicationName is the legacy label (v0.10 and below) and is superseded by 'app.kubernetes.io/instance' + LabelKeyLegacyApplicationName = "applications.argoproj.io/app-name" + // LabelKeySecretType contains the type of argocd secret (currently: 'cluster', 'repository', 'repo-config' or 'repo-creds') + LabelKeySecretType = "argocd.argoproj.io/secret-type" + // LabelKeyClusterKubernetesVersion contains the kubernetes version of the cluster secret if it has been enabled + LabelKeyClusterKubernetesVersion = "argocd.argoproj.io/kubernetes-version" + // LabelValueSecretTypeCluster indicates a secret type of cluster + LabelValueSecretTypeCluster = "cluster" + // LabelValueSecretTypeRepository indicates a secret type of repository + LabelValueSecretTypeRepository = "repository" + // LabelValueSecretTypeRepoCreds indicates a secret type of repository credentials + LabelValueSecretTypeRepoCreds = "repo-creds" + + // AnnotationKeyAppInstance is the Argo CD application name is used as the instance name + AnnotationKeyAppInstance = "argocd.argoproj.io/tracking-id" + + // AnnotationCompareOptions is a comma-separated list of options for comparison + AnnotationCompareOptions = "argocd.argoproj.io/compare-options" + + // AnnotationKeyManagedBy is annotation name which indicates that k8s resource is managed by an application. + AnnotationKeyManagedBy = "managed-by" + // AnnotationValueManagedByArgoCD is a 'managed-by' annotation value for resources managed by Argo CD + AnnotationValueManagedByArgoCD = "argocd.argoproj.io" + + // AnnotationKeyLinkPrefix tells the UI to add an external link icon to the application node + // that links to the value given in the annotation. + // The annotation key must be followed by a unique identifier. Ex: link.argocd.argoproj.io/dashboard + // It's valid to have multiple annotations that match the prefix. + // Values can simply be a url or they can have + // an optional link title separated by a "|" + // Ex: "http://grafana.example.com/d/yu5UH4MMz/deployments" + // Ex: "Go to Dashboard|http://grafana.example.com/d/yu5UH4MMz/deployments" + AnnotationKeyLinkPrefix = "link.argocd.argoproj.io/" + + // AnnotationKeyAppSkipReconcile tells the Application to skip the Application controller reconcile. + // Skip reconcile when the value is "true" or any other string values that can be strconv.ParseBool() to be true. + AnnotationKeyAppSkipReconcile = "argocd.argoproj.io/skip-reconcile" + // LabelKeyComponentRepoServer is the label key to identify the component as repo-server + LabelKeyComponentRepoServer = "app.kubernetes.io/component" + // LabelValueComponentRepoServer is the label value for the repo-server component + LabelValueComponentRepoServer = "repo-server" ) // Environment variables for tuning and debugging Argo CD const ( - // EnvApplicationEventCacheDuration controls the expiration of application events cache - EnvApplicationEventCacheDuration = "ARGOCD_APP_EVENTS_CACHE_DURATION" - // EnvResourceEventCacheDuration controls the expiration of resource events cache - EnvResourceEventCacheDuration = "ARGOCD_RESOURCE_EVENTS_CACHE_DURATION" - // EnvEventReporterShardingAlgorithm is the distribution sharding algorithm to be used: legacy - EnvEventReporterShardingAlgorithm = "EVENT_REPORTER_SHARDING_ALGORITHM" - // EnvEventReporterReplicas is the number of EventReporter replicas - EnvEventReporterReplicas = "EVENT_REPORTER_REPLICAS" - // EnvEventReporterShard is the shard number that should be handled by reporter - EnvEventReporterShard = "EVENT_REPORTER_SHARD" - // EnvVarSSODebug is an environment variable to enable additional OAuth debugging in the API server - EnvVarSSODebug = "ARGOCD_SSO_DEBUG" - // EnvVarRBACDebug is an environment variable to enable additional RBAC debugging in the API server - EnvVarRBACDebug = "ARGOCD_RBAC_DEBUG" - // EnvVarSSHDataPath overrides the location where SSH known hosts for repo access data is stored - EnvVarSSHDataPath = "ARGOCD_SSH_DATA_PATH" - // EnvVarTLSDataPath overrides the location where TLS certificate for repo access data is stored - EnvVarTLSDataPath = "ARGOCD_TLS_DATA_PATH" - // EnvGitAttemptsCount specifies number of git remote operations attempts count - EnvGitAttemptsCount = "ARGOCD_GIT_ATTEMPTS_COUNT" - // EnvGitRetryMaxDuration specifies max duration of git remote operation retry - EnvGitRetryMaxDuration = "ARGOCD_GIT_RETRY_MAX_DURATION" - // EnvGitRetryDuration specifies duration of git remote operation retry - EnvGitRetryDuration = "ARGOCD_GIT_RETRY_DURATION" - // EnvGitRetryFactor specifies fator of git remote operation retry - EnvGitRetryFactor = "ARGOCD_GIT_RETRY_FACTOR" - // EnvGitSubmoduleEnabled overrides git submodule support, true by default - EnvGitSubmoduleEnabled = "ARGOCD_GIT_MODULES_ENABLED" - // EnvGnuPGHome is the path to ArgoCD's GnuPG keyring for signature verification - EnvGnuPGHome = "ARGOCD_GNUPGHOME" - // EnvWatchAPIBufferSize is the buffer size used to transfer K8S watch events to watch API consumer - EnvWatchAPIBufferSize = "ARGOCD_WATCH_API_BUFFER_SIZE" - // EnvPauseGenerationAfterFailedAttempts will pause manifest generation after the specified number of failed generation attempts - EnvPauseGenerationAfterFailedAttempts = "ARGOCD_PAUSE_GEN_AFTER_FAILED_ATTEMPTS" - // EnvPauseGenerationMinutes pauses manifest generation for the specified number of minutes, after sufficient manifest generation failures - EnvPauseGenerationMinutes = "ARGOCD_PAUSE_GEN_MINUTES" - // EnvPauseGenerationRequests pauses manifest generation for the specified number of requests, after sufficient manifest generation failures - EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS" - // EnvControllerReplicas is the number of controller replicas - EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS" - // EnvControllerHeartbeatTime will update the heartbeat for application controller to claim shard - EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME" - // EnvControllerShard is the shard number that should be handled by controller - EnvControllerShard = "ARGOCD_CONTROLLER_SHARD" - // EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin - EnvControllerShardingAlgorithm = "ARGOCD_CONTROLLER_SHARDING_ALGORITHM" - // EnvEnableDynamicClusterDistribution enables dynamic sharding (ALPHA) - EnvEnableDynamicClusterDistribution = "ARGOCD_ENABLE_DYNAMIC_CLUSTER_DISTRIBUTION" - // EnvEnableGRPCTimeHistogramEnv enables gRPC metrics collection - EnvEnableGRPCTimeHistogramEnv = "ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM" - // EnvGithubAppCredsExpirationDuration controls the caching of Github app credentials. This value is in minutes (default: 60) - EnvGithubAppCredsExpirationDuration = "ARGOCD_GITHUB_APP_CREDS_EXPIRATION_DURATION" - // EnvHelmIndexCacheDuration controls how the helm repository index file is cached for (default: 0) - EnvHelmIndexCacheDuration = "ARGOCD_HELM_INDEX_CACHE_DURATION" - // EnvAppConfigPath allows to override the configuration path for repo server - EnvAppConfigPath = "ARGOCD_APP_CONF_PATH" - // EnvLogFormat log format that is defined by `--logformat` option - EnvLogFormat = "ARGOCD_LOG_FORMAT" - // EnvLogLevel log level that is defined by `--loglevel` option - EnvLogLevel = "ARGOCD_LOG_LEVEL" - // EnvLogFormatEnableFullTimestamp enables the FullTimestamp option in logs - EnvLogFormatEnableFullTimestamp = "ARGOCD_LOG_FORMAT_ENABLE_FULL_TIMESTAMP" - // EnvMaxCookieNumber max number of chunks a cookie can be broken into - EnvMaxCookieNumber = "ARGOCD_MAX_COOKIE_NUMBER" - // EnvPluginSockFilePath allows to override the pluginSockFilePath for repo server and cmp server - EnvPluginSockFilePath = "ARGOCD_PLUGINSOCKFILEPATH" - // EnvCMPChunkSize defines the chunk size in bytes used when sending files to the cmp server - EnvCMPChunkSize = "ARGOCD_CMP_CHUNK_SIZE" - // EnvCMPWorkDir defines the full path of the work directory used by the CMP server - EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR" - // EnvGPGDataPath overrides the location where GPG keyring for signature verification is stored - EnvGPGDataPath = "ARGOCD_GPG_DATA_PATH" - // EnvServerName is the name of the Argo CD server component, as specified by the value under the LabelKeyAppName label key. - EnvServerName = "ARGOCD_SERVER_NAME" - // EnvRepoServerName is the name of the Argo CD repo server component, as specified by the value under the LabelKeyAppName label key. - EnvRepoServerName = "ARGOCD_REPO_SERVER_NAME" - // EnvAppControllerName is the name of the Argo CD application controller component, as specified by the value under the LabelKeyAppName label key. - EnvAppControllerName = "ARGOCD_APPLICATION_CONTROLLER_NAME" - // EnvRedisName is the name of the Argo CD redis component, as specified by the value under the LabelKeyAppName label key. - EnvRedisName = "ARGOCD_REDIS_NAME" - // EnvRedisHaProxyName is the name of the Argo CD Redis HA proxy component, as specified by the value under the LabelKeyAppName label key. - EnvRedisHaProxyName = "ARGOCD_REDIS_HAPROXY_NAME" - // EnvGRPCKeepAliveMin defines the GRPCKeepAliveEnforcementMinimum, used in the grpc.KeepaliveEnforcementPolicy. Expects a "Duration" format (e.g. 10s). - EnvGRPCKeepAliveMin = "ARGOCD_GRPC_KEEP_ALIVE_MIN" - // EnvServerSideDiff defines the env var used to enable ServerSide Diff feature. - // If defined, value must be "true" or "false". - EnvServerSideDiff = "ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF" - // EnvGRPCMaxSizeMB is the environment variable to look for a max GRPC message size - EnvGRPCMaxSizeMB = "ARGOCD_GRPC_MAX_SIZE_MB" + // EnvApplicationEventCacheDuration controls the expiration of application events cache + EnvApplicationEventCacheDuration = "ARGOCD_APP_EVENTS_CACHE_DURATION" + // EnvResourceEventCacheDuration controls the expiration of resource events cache + EnvResourceEventCacheDuration = "ARGOCD_RESOURCE_EVENTS_CACHE_DURATION" + // EnvEventReporterShardingAlgorithm is the distribution sharding algorithm to be used: legacy + EnvEventReporterShardingAlgorithm = "EVENT_REPORTER_SHARDING_ALGORITHM" + // EnvEventReporterReplicas is the number of EventReporter replicas + EnvEventReporterReplicas = "EVENT_REPORTER_REPLICAS" + // EnvEventReporterShard is the shard number that should be handled by reporter + EnvEventReporterShard = "EVENT_REPORTER_SHARD" + // EnvVarSSODebug is an environment variable to enable additional OAuth debugging in the API server + EnvVarSSODebug = "ARGOCD_SSO_DEBUG" + // EnvVarRBACDebug is an environment variable to enable additional RBAC debugging in the API server + EnvVarRBACDebug = "ARGOCD_RBAC_DEBUG" + // EnvVarSSHDataPath overrides the location where SSH known hosts for repo access data is stored + EnvVarSSHDataPath = "ARGOCD_SSH_DATA_PATH" + // EnvVarTLSDataPath overrides the location where TLS certificate for repo access data is stored + EnvVarTLSDataPath = "ARGOCD_TLS_DATA_PATH" + // EnvGitAttemptsCount specifies number of git remote operations attempts count + EnvGitAttemptsCount = "ARGOCD_GIT_ATTEMPTS_COUNT" + // EnvGitRetryMaxDuration specifies max duration of git remote operation retry + EnvGitRetryMaxDuration = "ARGOCD_GIT_RETRY_MAX_DURATION" + // EnvGitRetryDuration specifies duration of git remote operation retry + EnvGitRetryDuration = "ARGOCD_GIT_RETRY_DURATION" + // EnvGitRetryFactor specifies fator of git remote operation retry + EnvGitRetryFactor = "ARGOCD_GIT_RETRY_FACTOR" + // EnvGitSubmoduleEnabled overrides git submodule support, true by default + EnvGitSubmoduleEnabled = "ARGOCD_GIT_MODULES_ENABLED" + // EnvGnuPGHome is the path to ArgoCD's GnuPG keyring for signature verification + EnvGnuPGHome = "ARGOCD_GNUPGHOME" + // EnvWatchAPIBufferSize is the buffer size used to transfer K8S watch events to watch API consumer + EnvWatchAPIBufferSize = "ARGOCD_WATCH_API_BUFFER_SIZE" + // EnvPauseGenerationAfterFailedAttempts will pause manifest generation after the specified number of failed generation attempts + EnvPauseGenerationAfterFailedAttempts = "ARGOCD_PAUSE_GEN_AFTER_FAILED_ATTEMPTS" + // EnvPauseGenerationMinutes pauses manifest generation for the specified number of minutes, after sufficient manifest generation failures + EnvPauseGenerationMinutes = "ARGOCD_PAUSE_GEN_MINUTES" + // EnvPauseGenerationRequests pauses manifest generation for the specified number of requests, after sufficient manifest generation failures + EnvPauseGenerationRequests = "ARGOCD_PAUSE_GEN_REQUESTS" + // EnvControllerReplicas is the number of controller replicas + EnvControllerReplicas = "ARGOCD_CONTROLLER_REPLICAS" + // EnvControllerHeartbeatTime will update the heartbeat for application controller to claim shard + EnvControllerHeartbeatTime = "ARGOCD_CONTROLLER_HEARTBEAT_TIME" + // EnvControllerShard is the shard number that should be handled by controller + EnvControllerShard = "ARGOCD_CONTROLLER_SHARD" + // EnvControllerShardingAlgorithm is the distribution sharding algorithm to be used: legacy or round-robin + EnvControllerShardingAlgorithm = "ARGOCD_CONTROLLER_SHARDING_ALGORITHM" + // EnvEnableDynamicClusterDistribution enables dynamic sharding (ALPHA) + EnvEnableDynamicClusterDistribution = "ARGOCD_ENABLE_DYNAMIC_CLUSTER_DISTRIBUTION" + // EnvEnableGRPCTimeHistogramEnv enables gRPC metrics collection + EnvEnableGRPCTimeHistogramEnv = "ARGOCD_ENABLE_GRPC_TIME_HISTOGRAM" + // EnvGithubAppCredsExpirationDuration controls the caching of Github app credentials. This value is in minutes (default: 60) + EnvGithubAppCredsExpirationDuration = "ARGOCD_GITHUB_APP_CREDS_EXPIRATION_DURATION" + // EnvHelmIndexCacheDuration controls how the helm repository index file is cached for (default: 0) + EnvHelmIndexCacheDuration = "ARGOCD_HELM_INDEX_CACHE_DURATION" + // EnvAppConfigPath allows to override the configuration path for repo server + EnvAppConfigPath = "ARGOCD_APP_CONF_PATH" + // EnvLogFormat log format that is defined by `--logformat` option + EnvLogFormat = "ARGOCD_LOG_FORMAT" + // EnvLogLevel log level that is defined by `--loglevel` option + EnvLogLevel = "ARGOCD_LOG_LEVEL" + // EnvLogFormatEnableFullTimestamp enables the FullTimestamp option in logs + EnvLogFormatEnableFullTimestamp = "ARGOCD_LOG_FORMAT_ENABLE_FULL_TIMESTAMP" + // EnvMaxCookieNumber max number of chunks a cookie can be broken into + EnvMaxCookieNumber = "ARGOCD_MAX_COOKIE_NUMBER" + // EnvPluginSockFilePath allows to override the pluginSockFilePath for repo server and cmp server + EnvPluginSockFilePath = "ARGOCD_PLUGINSOCKFILEPATH" + // EnvCMPChunkSize defines the chunk size in bytes used when sending files to the cmp server + EnvCMPChunkSize = "ARGOCD_CMP_CHUNK_SIZE" + // EnvCMPWorkDir defines the full path of the work directory used by the CMP server + EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR" + // EnvGPGDataPath overrides the location where GPG keyring for signature verification is stored + EnvGPGDataPath = "ARGOCD_GPG_DATA_PATH" + // EnvServerName is the name of the Argo CD server component, as specified by the value under the LabelKeyAppName label key. + EnvServerName = "ARGOCD_SERVER_NAME" + // EnvRepoServerName is the name of the Argo CD repo server component, as specified by the value under the LabelKeyAppName label key. + EnvRepoServerName = "ARGOCD_REPO_SERVER_NAME" + // EnvAppControllerName is the name of the Argo CD application controller component, as specified by the value under the LabelKeyAppName label key. + EnvAppControllerName = "ARGOCD_APPLICATION_CONTROLLER_NAME" + // EnvRedisName is the name of the Argo CD redis component, as specified by the value under the LabelKeyAppName label key. + EnvRedisName = "ARGOCD_REDIS_NAME" + // EnvRedisHaProxyName is the name of the Argo CD Redis HA proxy component, as specified by the value under the LabelKeyAppName label key. + EnvRedisHaProxyName = "ARGOCD_REDIS_HAPROXY_NAME" + // EnvGRPCKeepAliveMin defines the GRPCKeepAliveEnforcementMinimum, used in the grpc.KeepaliveEnforcementPolicy. Expects a "Duration" format (e.g. 10s). + EnvGRPCKeepAliveMin = "ARGOCD_GRPC_KEEP_ALIVE_MIN" + // EnvServerSideDiff defines the env var used to enable ServerSide Diff feature. + // If defined, value must be "true" or "false". + EnvServerSideDiff = "ARGOCD_APPLICATION_CONTROLLER_SERVER_SIDE_DIFF" + // EnvGRPCMaxSizeMB is the environment variable to look for a max GRPC message size + EnvGRPCMaxSizeMB = "ARGOCD_GRPC_MAX_SIZE_MB" ) // Config Management Plugin related constants const ( - // DefaultCMPChunkSize defines chunk size in bytes used when sending files to the cmp server - DefaultCMPChunkSize = 1024 + // DefaultCMPChunkSize defines chunk size in bytes used when sending files to the cmp server + DefaultCMPChunkSize = 1024 - // DefaultCMPWorkDirName defines the work directory name used by the cmp-server - DefaultCMPWorkDirName = "_cmp_server" + // DefaultCMPWorkDirName defines the work directory name used by the cmp-server + DefaultCMPWorkDirName = "_cmp_server" - ConfigMapPluginDeprecationWarning = "argocd-cm plugins are deprecated, and support will be removed in v2.7. Upgrade your plugin to be installed via sidecar. https://argo-cd.readthedocs.io/en/stable/user-guide/config-management-plugins/" + ConfigMapPluginDeprecationWarning = "argocd-cm plugins are deprecated, and support will be removed in v2.7. Upgrade your plugin to be installed via sidecar. https://argo-cd.readthedocs.io/en/stable/user-guide/config-management-plugins/" ) const ( - // MinClientVersion is the minimum client version that can interface with this API server. - // When introducing breaking changes to the API or datastructures, this number should be bumped. - // The value here may be lower than the current value in VERSION - MinClientVersion = "1.4.0" - // CacheVersion is a objects version cached using util/cache/cache.go. - // Number should be bumped in case of backward incompatible change to make sure cache is invalidated after upgrade. - CacheVersion = "1.8.3" + // MinClientVersion is the minimum client version that can interface with this API server. + // When introducing breaking changes to the API or datastructures, this number should be bumped. + // The value here may be lower than the current value in VERSION + MinClientVersion = "1.4.0" + // CacheVersion is a objects version cached using util/cache/cache.go. + // Number should be bumped in case of backward incompatible change to make sure cache is invalidated after upgrade. + CacheVersion = "1.8.3" ) // Constants used by util/clusterauth package const ( - ClusterAuthRequestTimeout = 10 * time.Second - BearerTokenTimeout = 30 * time.Second + ClusterAuthRequestTimeout = 10 * time.Second + BearerTokenTimeout = 30 * time.Second ) const ( - DefaultGitRetryMaxDuration time.Duration = time.Second * 5 // 5s - DefaultGitRetryDuration time.Duration = time.Millisecond * 250 // 0.25s - DefaultGitRetryFactor = int64(2) + DefaultGitRetryMaxDuration time.Duration = time.Second * 5 // 5s + DefaultGitRetryDuration time.Duration = time.Millisecond * 250 // 0.25s + DefaultGitRetryFactor = int64(2) ) // Constants represent the pod selector labels of the Argo CD component names. These values are determined by the // installation manifests. const ( - DefaultServerName = "argocd-server" - DefaultRepoServerName = "argocd-repo-server" - DefaultApplicationControllerName = "argocd-application-controller" - DefaultRedisName = "argocd-redis" - DefaultRedisHaProxyName = "argocd-redis-ha-haproxy" + DefaultServerName = "argocd-server" + DefaultRepoServerName = "argocd-repo-server" + DefaultApplicationControllerName = "argocd-application-controller" + DefaultRedisName = "argocd-redis" + DefaultRedisHaProxyName = "argocd-redis-ha-haproxy" ) // GetGnuPGHomePath retrieves the path to use for GnuPG home directory, which is either taken from GNUPGHOME environment or a default value func GetGnuPGHomePath() string { - if gnuPgHome := os.Getenv(EnvGnuPGHome); gnuPgHome == "" { - return DefaultGnuPgHomePath - } else { - return gnuPgHome - } + if gnuPgHome := os.Getenv(EnvGnuPGHome); gnuPgHome == "" { + return DefaultGnuPgHomePath + } else { + return gnuPgHome + } } // GetPluginSockFilePath retrieves the path of plugin sock file, which is either taken from PluginSockFilePath environment or a default value func GetPluginSockFilePath() string { - if pluginSockFilePath := os.Getenv(EnvPluginSockFilePath); pluginSockFilePath == "" { - return DefaultPluginSockFilePath - } else { - return pluginSockFilePath - } + if pluginSockFilePath := os.Getenv(EnvPluginSockFilePath); pluginSockFilePath == "" { + return DefaultPluginSockFilePath + } else { + return pluginSockFilePath + } } // GetCMPChunkSize will return the env var EnvCMPChunkSize value if defined or DefaultCMPChunkSize otherwise. // If EnvCMPChunkSize is defined but not a valid int, DefaultCMPChunkSize will be returned func GetCMPChunkSize() int { - if chunkSizeStr := os.Getenv(EnvCMPChunkSize); chunkSizeStr != "" { - chunkSize, err := strconv.Atoi(chunkSizeStr) - if err != nil { - logrus.Warnf("invalid env var value for %s: not a valid int: %s. Default value will be used.", EnvCMPChunkSize, err) - return DefaultCMPChunkSize - } - return chunkSize - } - return DefaultCMPChunkSize + if chunkSizeStr := os.Getenv(EnvCMPChunkSize); chunkSizeStr != "" { + chunkSize, err := strconv.Atoi(chunkSizeStr) + if err != nil { + logrus.Warnf("invalid env var value for %s: not a valid int: %s. Default value will be used.", EnvCMPChunkSize, err) + return DefaultCMPChunkSize + } + return chunkSize + } + return DefaultCMPChunkSize } // GetCMPWorkDir will return the full path of the work directory used by the CMP server. // This directory and all it's contents will be deleted during CMP bootstrap. func GetCMPWorkDir() string { - if workDir := os.Getenv(EnvCMPWorkDir); workDir != "" { - return filepath.Join(workDir, DefaultCMPWorkDirName) - } - return filepath.Join(os.TempDir(), DefaultCMPWorkDirName) + if workDir := os.Getenv(EnvCMPWorkDir); workDir != "" { + return filepath.Join(workDir, DefaultCMPWorkDirName) + } + return filepath.Join(os.TempDir(), DefaultCMPWorkDirName) } const ( - // AnnotationApplicationSetRefresh is an annotation that is added when an ApplicationSet is requested to be refreshed by a webhook. The ApplicationSet controller will remove this annotation at the end of reconciliation. - AnnotationApplicationSetRefresh = "argocd.argoproj.io/application-set-refresh" + // AnnotationApplicationSetRefresh is an annotation that is added when an ApplicationSet is requested to be refreshed by a webhook. The ApplicationSet controller will remove this annotation at the end of reconciliation. + AnnotationApplicationSetRefresh = "argocd.argoproj.io/application-set-refresh" ) // gRPC settings const ( - defaultGRPCKeepAliveEnforcementMinimum = 10 * time.Second + defaultGRPCKeepAliveEnforcementMinimum = 10 * time.Second ) func GetGRPCKeepAliveEnforcementMinimum() time.Duration { - if GRPCKeepAliveMinStr := os.Getenv(EnvGRPCKeepAliveMin); GRPCKeepAliveMinStr != "" { - GRPCKeepAliveMin, err := time.ParseDuration(GRPCKeepAliveMinStr) - if err != nil { - logrus.Warnf("invalid env var value for %s: cannot parse: %s. Default value %s will be used.", EnvGRPCKeepAliveMin, err, defaultGRPCKeepAliveEnforcementMinimum) - return defaultGRPCKeepAliveEnforcementMinimum - } - return GRPCKeepAliveMin - } - return defaultGRPCKeepAliveEnforcementMinimum + if GRPCKeepAliveMinStr := os.Getenv(EnvGRPCKeepAliveMin); GRPCKeepAliveMinStr != "" { + GRPCKeepAliveMin, err := time.ParseDuration(GRPCKeepAliveMinStr) + if err != nil { + logrus.Warnf("invalid env var value for %s: cannot parse: %s. Default value %s will be used.", EnvGRPCKeepAliveMin, err, defaultGRPCKeepAliveEnforcementMinimum) + return defaultGRPCKeepAliveEnforcementMinimum + } + return GRPCKeepAliveMin + } + return defaultGRPCKeepAliveEnforcementMinimum } func GetGRPCKeepAliveTime() time.Duration { - // GRPCKeepAliveTime is 2x enforcement minimum to ensure network jitter does not introduce ENHANCE_YOUR_CALM errors - return 2 * GetGRPCKeepAliveEnforcementMinimum() + // GRPCKeepAliveTime is 2x enforcement minimum to ensure network jitter does not introduce ENHANCE_YOUR_CALM errors + return 2 * GetGRPCKeepAliveEnforcementMinimum() } // Security severity logging const ( - SecurityField = "security" - // SecurityCWEField is the logs field for the CWE associated with a log line. CWE stands for Common Weakness Enumeration. See https://cwe.mitre.org/ - SecurityCWEField = "CWE" - SecurityCWEIncompleteCleanup = 459 - SecurityCWEMissingReleaseOfFileDescriptor = 775 - SecurityEmergency = 5 // Indicates unmistakably malicious events that should NEVER occur accidentally and indicates an active attack (i.e. brute forcing, DoS) - SecurityCritical = 4 // Indicates any malicious or exploitable event that had a side effect (i.e. secrets being left behind on the filesystem) - SecurityHigh = 3 // Indicates likely malicious events but one that had no side effects or was blocked (i.e. out of bounds symlinks in repos) - SecurityMedium = 2 // Could indicate malicious events, but has a high likelihood of being user/system error (i.e. access denied) - SecurityLow = 1 // Unexceptional entries (i.e. successful access logs) + SecurityField = "security" + // SecurityCWEField is the logs field for the CWE associated with a log line. CWE stands for Common Weakness Enumeration. See https://cwe.mitre.org/ + SecurityCWEField = "CWE" + SecurityCWEIncompleteCleanup = 459 + SecurityCWEMissingReleaseOfFileDescriptor = 775 + SecurityEmergency = 5 // Indicates unmistakably malicious events that should NEVER occur accidentally and indicates an active attack (i.e. brute forcing, DoS) + SecurityCritical = 4 // Indicates any malicious or exploitable event that had a side effect (i.e. secrets being left behind on the filesystem) + SecurityHigh = 3 // Indicates likely malicious events but one that had no side effects or was blocked (i.e. out of bounds symlinks in repos) + SecurityMedium = 2 // Could indicate malicious events, but has a high likelihood of being user/system error (i.e. access denied) + SecurityLow = 1 // Unexceptional entries (i.e. successful access logs) ) // TokenVerificationError is a generic error message for a failure to verify a JWT @@ -437,6 +438,6 @@ var PermissionDeniedAPIError = status.Error(codes.PermissionDenied, "permission // CF Event reporter constants const ( - EventReporterLegacyShardingAlgorithm = "legacy" - DefaultEventReporterShardingAlgorithm = EventReporterLegacyShardingAlgorithm + EventReporterLegacyShardingAlgorithm = "legacy" + DefaultEventReporterShardingAlgorithm = EventReporterLegacyShardingAlgorithm ) diff --git a/event_reporter/controller/controller.go b/event_reporter/controller/controller.go index 262f55a7a7c34..4cfc55f9d697e 100644 --- a/event_reporter/controller/controller.go +++ b/event_reporter/controller/controller.go @@ -1,126 +1,127 @@ package controller import ( - "context" - "math" - "strings" - "time" + "context" + "github.com/argoproj/argo-cd/v2/pkg/sources_server_client" + "math" + "strings" + "time" - "github.com/argoproj/argo-cd/v2/util/db" + "github.com/argoproj/argo-cd/v2/util/db" - appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" - log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/pkg/codefresh" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - argoutil "github.com/argoproj/argo-cd/v2/util/argo" - "github.com/argoproj/argo-cd/v2/util/env" - "github.com/argoproj/argo-cd/v2/util/settings" + argocommon "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + "github.com/argoproj/argo-cd/v2/event_reporter/reporter" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/pkg/codefresh" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + argoutil "github.com/argoproj/argo-cd/v2/util/argo" + "github.com/argoproj/argo-cd/v2/util/env" + "github.com/argoproj/argo-cd/v2/util/settings" ) var ( - watchAPIBufferSize = 1000 - applicationEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvApplicationEventCacheDuration, 20, 0, math.MaxInt32)) + watchAPIBufferSize = 1000 + applicationEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvApplicationEventCacheDuration, 20, 0, math.MaxInt32)) ) type EventReporterController interface { - Run(ctx context.Context) + Run(ctx context.Context) } type eventReporterController struct { - settingsMgr *settings.SettingsManager - appBroadcaster reporter.Broadcaster - applicationEventReporter reporter.ApplicationEventReporter - cache *servercache.Cache - appLister applisters.ApplicationLister - applicationServiceClient appclient.ApplicationClient - metricsServer *metrics.MetricsServer + settingsMgr *settings.SettingsManager + appBroadcaster reporter.Broadcaster + applicationEventReporter reporter.ApplicationEventReporter + cache *servercache.Cache + appLister applisters.ApplicationLister + applicationServiceClient appclient.ApplicationClient + metricsServer *metrics.MetricsServer } -func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB) EventReporterController { - appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts) - _, err := appInformer.AddEventHandler(appBroadcaster) - if err != nil { - log.Error(err) - } - return &eventReporterController{ - appBroadcaster: appBroadcaster, - applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db), - cache: cache, - settingsMgr: settingsMgr, - applicationServiceClient: applicationServiceClient, - appLister: appLister, - metricsServer: metricsServer, - } +func NewEventReporterController(appInformer cache.SharedIndexInformer, cache *servercache.Cache, settingsMgr *settings.SettingsManager, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, featureManager *reporter.FeatureManager, rateLimiterOpts *reporter.RateLimiterOpts, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) EventReporterController { + appBroadcaster := reporter.NewBroadcaster(featureManager, metricsServer, rateLimiterOpts) + _, err := appInformer.AddEventHandler(appBroadcaster) + if err != nil { + log.Error(err) + } + return &eventReporterController{ + appBroadcaster: appBroadcaster, + applicationEventReporter: reporter.NewApplicationEventReporter(cache, applicationServiceClient, appLister, codefreshConfig, metricsServer, db, useSourcesServer, sourcesServerConfig), + cache: cache, + settingsMgr: settingsMgr, + applicationServiceClient: applicationServiceClient, + appLister: appLister, + metricsServer: metricsServer, + } } func (c *eventReporterController) Run(ctx context.Context) { - var logCtx log.FieldLogger = log.StandardLogger() + var logCtx log.FieldLogger = log.StandardLogger() - // sendIfPermitted is a helper to send the application to the client's streaming channel if the - // caller has RBAC privileges permissions to view it - sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, eventProcessingStartedAt string, ignoreResourceCache bool) error { - if eventType == watch.Bookmark { - return nil // ignore this event - } + // sendIfPermitted is a helper to send the application to the client's streaming channel if the + // caller has RBAC privileges permissions to view it + sendIfPermitted := func(ctx context.Context, a appv1.Application, eventType watch.EventType, eventProcessingStartedAt string, ignoreResourceCache bool) error { + if eventType == watch.Bookmark { + return nil // ignore this event + } - appInstanceLabelKey, err := c.settingsMgr.GetAppInstanceLabelKey() - if err != nil { - return err - } - trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr) + appInstanceLabelKey, err := c.settingsMgr.GetAppInstanceLabelKey() + if err != nil { + return err + } + trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr) - err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, &reporter.ArgoTrackingMetadata{ - AppInstanceLabelKey: &appInstanceLabelKey, - TrackingMethod: &trackingMethod, - }) - if err != nil { - return err - } + err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, &reporter.ArgoTrackingMetadata{ + AppInstanceLabelKey: &appInstanceLabelKey, + TrackingMethod: &trackingMethod, + }) + if err != nil { + return err + } - if err := c.cache.SetLastApplicationEvent(&a, applicationEventCacheExpiration); err != nil { - logCtx.WithError(err).Error("failed to cache last sent application event") - return err - } - return nil - } + if err := c.cache.SetLastApplicationEvent(&a, applicationEventCacheExpiration); err != nil { + logCtx.WithError(err).Error("failed to cache last sent application event") + return err + } + return nil + } - // TODO: move to abstraction - eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) - unsubscribe := c.appBroadcaster.Subscribe(eventsChannel) - defer unsubscribe() - for { - select { - case <-ctx.Done(): - return - case event := <-eventsChannel: - logCtx.Infof("channel size is %d", len(eventsChannel)) - c.metricsServer.SetQueueSizeGauge(len(eventsChannel)) - shouldProcess, ignoreResourceCache := c.applicationEventReporter.ShouldSendApplicationEvent(event) - if !shouldProcess { - logCtx.Infof("Skipping event %s/%s", event.Application.Name, event.Type) - c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name) - continue - } - eventProcessingStartedAt := time.Now().Format("2006-01-02T15:04:05.000Z") - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - err := sendIfPermitted(ctx, event.Application, event.Type, eventProcessingStartedAt, ignoreResourceCache) - if err != nil { - logCtx.WithError(err).Error("failed to stream application events") - if strings.Contains(err.Error(), "context deadline exceeded") { - logCtx.Info("Closing event-source connection") - cancel() - } - } - cancel() - } - } + // TODO: move to abstraction + eventsChannel := make(chan *appv1.ApplicationWatchEvent, watchAPIBufferSize) + unsubscribe := c.appBroadcaster.Subscribe(eventsChannel) + defer unsubscribe() + for { + select { + case <-ctx.Done(): + return + case event := <-eventsChannel: + logCtx.Infof("channel size is %d", len(eventsChannel)) + c.metricsServer.SetQueueSizeGauge(len(eventsChannel)) + shouldProcess, ignoreResourceCache := c.applicationEventReporter.ShouldSendApplicationEvent(event) + if !shouldProcess { + logCtx.Infof("Skipping event %s/%s", event.Application.Name, event.Type) + c.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricAppEventType, event.Application.Name) + continue + } + eventProcessingStartedAt := time.Now().Format("2006-01-02T15:04:05.000Z") + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + err := sendIfPermitted(ctx, event.Application, event.Type, eventProcessingStartedAt, ignoreResourceCache) + if err != nil { + logCtx.WithError(err).Error("failed to stream application events") + if strings.Contains(err.Error(), "context deadline exceeded") { + logCtx.Info("Closing event-source connection") + cancel() + } + } + cancel() + } + } } diff --git a/event_reporter/reporter/application_event_reporter.go b/event_reporter/reporter/application_event_reporter.go index 359f694f7af5a..2d9a29f4cd724 100644 --- a/event_reporter/reporter/application_event_reporter.go +++ b/event_reporter/reporter/application_event_reporter.go @@ -1,574 +1,596 @@ package reporter import ( - "context" - "encoding/json" - "fmt" - "math" - "reflect" - "strings" - "time" - - "github.com/argoproj/argo-cd/v2/util/db" - - "github.com/argoproj/argo-cd/v2/event_reporter/utils" - - argoutils "github.com/argoproj/argo-cd/v2/util/argo" - - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - - argocommon "github.com/argoproj/argo-cd/v2/common" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - "github.com/argoproj/argo-cd/v2/pkg/codefresh" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/util/env" - - "github.com/argoproj/gitops-engine/pkg/health" - "github.com/argoproj/gitops-engine/pkg/utils/kube" - "github.com/argoproj/gitops-engine/pkg/utils/text" - log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/watch" - - appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" - metricsUtils "github.com/argoproj/argo-cd/v2/event_reporter/metrics/utils" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" - appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "context" + "encoding/json" + "fmt" + "github.com/argoproj/argo-cd/v2/pkg/sources_server_client" + "math" + "reflect" + "strings" + "time" + + "github.com/argoproj/argo-cd/v2/util/db" + + "github.com/argoproj/argo-cd/v2/event_reporter/utils" + + argoutils "github.com/argoproj/argo-cd/v2/util/argo" + + "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + + argocommon "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/pkg/codefresh" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + "github.com/argoproj/argo-cd/v2/util/env" + + "github.com/argoproj/gitops-engine/pkg/health" + "github.com/argoproj/gitops-engine/pkg/utils/kube" + "github.com/argoproj/gitops-engine/pkg/utils/text" + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/watch" + + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" + metricsUtils "github.com/argoproj/argo-cd/v2/event_reporter/metrics/utils" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/application" + appv1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" ) var resourceEventCacheExpiration = time.Minute * time.Duration(env.ParseNumFromEnv(argocommon.EnvResourceEventCacheDuration, 20, 0, math.MaxInt32)) type applicationEventReporter struct { - cache *servercache.Cache - codefreshClient codefresh.CodefreshClientInterface - appLister applisters.ApplicationLister - applicationServiceClient appclient.ApplicationClient - metricsServer *metrics.MetricsServer - db db.ArgoDB - runtimeVersion string + cache *servercache.Cache + codefreshClient codefresh.CodefreshClientInterface + appLister applisters.ApplicationLister + applicationServiceClient appclient.ApplicationClient + metricsServer *metrics.MetricsServer + db db.ArgoDB + runtimeVersion string + useSourcesServer bool + sourcesServerClient sources_server_client.SourceServerClientInteface } type ApplicationEventReporter interface { - StreamApplicationEvents( - ctx context.Context, - a *appv1.Application, - eventProcessingStartedAt string, - ignoreResourceCache bool, - argoTrackingMetadata *ArgoTrackingMetadata, - ) error - ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) + StreamApplicationEvents( + ctx context.Context, + a *appv1.Application, + eventProcessingStartedAt string, + ignoreResourceCache bool, + argoTrackingMetadata *ArgoTrackingMetadata, + ) error + ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) } -func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB) ApplicationEventReporter { - return &applicationEventReporter{ - cache: cache, - applicationServiceClient: applicationServiceClient, - codefreshClient: codefresh.NewCodefreshClient(codefreshConfig), - appLister: appLister, - metricsServer: metricsServer, - db: db, - runtimeVersion: codefreshConfig.RuntimeVersion, - } +func NewApplicationEventReporter(cache *servercache.Cache, applicationServiceClient appclient.ApplicationClient, appLister applisters.ApplicationLister, codefreshConfig *codefresh.CodefreshConfig, metricsServer *metrics.MetricsServer, db db.ArgoDB, useSourcesServer bool, sourcesServerConfig *sources_server_client.SourcesServerConfig) ApplicationEventReporter { + return &applicationEventReporter{ + cache: cache, + applicationServiceClient: applicationServiceClient, + codefreshClient: codefresh.NewCodefreshClient(codefreshConfig), + appLister: appLister, + metricsServer: metricsServer, + db: db, + runtimeVersion: codefreshConfig.RuntimeVersion, + useSourcesServer: useSourcesServer, + sourcesServerClient: sources_server_client.NewSourceServerClient(sourcesServerConfig), + } } func (s *applicationEventReporter) shouldSendResourceEvent(a *appv1.Application, rs appv1.ResourceStatus) bool { - logCtx := utils.LogWithResourceStatus(log.WithFields(log.Fields{ - "app": a.Name, - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }), rs) - - cachedRes, err := s.cache.GetLastResourceEvent(a, rs, utils.GetApplicationLatestRevision(a)) - if err != nil { - logCtx.Debug("resource not in cache") - return true - } - - if reflect.DeepEqual(&cachedRes, &rs) { - logCtx.Debug("resource status not changed") - - // status not changed - return false - } - - logCtx.Info("resource status changed") - return true + logCtx := utils.LogWithResourceStatus(log.WithFields(log.Fields{ + "app": a.Name, + "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), + "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), + }), rs) + + cachedRes, err := s.cache.GetLastResourceEvent(a, rs, utils.GetApplicationLatestRevision(a)) + if err != nil { + logCtx.Debug("resource not in cache") + return true + } + + if reflect.DeepEqual(&cachedRes, &rs) { + logCtx.Debug("resource status not changed") + + // status not changed + return false + } + + logCtx.Info("resource status changed") + return true } func (r *applicationEventReporter) getDesiredManifests( - ctx context.Context, - logCtx *log.Entry, - a *appv1.Application, - revision *string, - sourcePositions *[]int64, - revisions *[]string, + ctx context.Context, + logCtx *log.Entry, + a *appv1.Application, + revision *string, + sourcePositions *[]int64, + revisions *[]string, ) (*apiclient.ManifestResponse, bool) { - // get the desired state manifests of the application - project := a.Spec.GetProject() - query := application.ApplicationManifestQuery{ - Name: &a.Name, - AppNamespace: &a.Namespace, - Revision: revision, - Project: &project, - } - if sourcePositions != nil && query.Revisions != nil { - query.SourcePositions = *sourcePositions - query.Revisions = *revisions - } - - desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &query) - if err != nil { - // if it's manifest generation error we need to still report the actual state - // of the resources, but since we can't get the desired state, we will report - // each resource with empty desired state - logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only") - desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}} - return desiredManifests, true // will ignore requiresPruning=true to not delete resources with actual state - } - return desiredManifests, false + // get the desired state manifests of the application + project := a.Spec.GetProject() + query := application.ApplicationManifestQuery{ + Name: &a.Name, + AppNamespace: &a.Namespace, + Revision: revision, + Project: &project, + } + if sourcePositions != nil && query.Revisions != nil { + query.SourcePositions = *sourcePositions + query.Revisions = *revisions + } + + desiredManifests, err := r.applicationServiceClient.GetManifests(ctx, &query) + if err != nil { + // if it's manifest generation error we need to still report the actual state + // of the resources, but since we can't get the desired state, we will report + // each resource with empty desired state + logCtx.WithError(err).Warn("failed to get application desired state manifests, reporting actual state only") + desiredManifests = &apiclient.ManifestResponse{Manifests: []*apiclient.Manifest{}} + return desiredManifests, true // will ignore requiresPruning=true to not delete resources with actual state + } + return desiredManifests, false } func (s *applicationEventReporter) StreamApplicationEvents( - ctx context.Context, - a *appv1.Application, - eventProcessingStartedAt string, - ignoreResourceCache bool, - argoTrackingMetadata *ArgoTrackingMetadata, + ctx context.Context, + a *appv1.Application, + eventProcessingStartedAt string, + ignoreResourceCache bool, + argoTrackingMetadata *ArgoTrackingMetadata, ) error { - metricTimer := metricsUtils.NewMetricTimer() - - logCtx := log.WithField("app", a.Name) - logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") - - project := a.Spec.GetProject() - appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{ - ApplicationName: &a.Name, - Project: &project, - AppNamespace: &a.Namespace, - }) - if err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to get application tree: %w", err) - } - - // we still need process app even without tree, it is in case of app yaml originally contain error, - // we still want to show it the errors that related to it on codefresh ui - logCtx.WithError(err).Warn("failed to get application tree, resuming") - } - - logCtx.Info("getting desired manifests") - - desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, a, nil, nil, nil) - - applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx) - - logCtx.Info("getting parent application name") - - parentAppIdentity := utils.GetParentAppIdentity(a, *argoTrackingMetadata.AppInstanceLabelKey, *argoTrackingMetadata.TrackingMethod) - - if utils.IsChildApp(parentAppIdentity) { - logCtx.Info("processing as child application") - parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, &application.ApplicationQuery{ - Name: &parentAppIdentity.Name, - AppNamespace: &parentAppIdentity.Namespace, - }) - if err != nil { - return fmt.Errorf("failed to get parent application entity: %w", err) - } - - rs := utils.GetAppAsResource(a) - utils.SetHealthStatusIfMissing(rs) - - parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, parentApplicationEntity, nil, nil, nil) - - parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity) - if err != nil { - logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming") - } - - validatedDestination := parentApplicationEntity.Spec.Destination.DeepCopy() - _ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing - - err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{ - app: parentApplicationEntity, - appTree: appTree, - revisionsMetadata: parentAppSyncRevisionsMetadata, - validatedDestination: validatedDestination, - }, argoTrackingMetadata) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name) - return err - } - s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration()) - } else { - // will get here only for root applications (not managed as a resource by another application) - logCtx.Info("processing as root application") - appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata, s.runtimeVersion) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name) - return fmt.Errorf("failed to get application event: %w", err) - } - - if appEvent == nil { - // event did not have an OperationState - skip all events - return nil - } - - utils.LogWithAppStatus(a, logCtx, eventProcessingStartedAt).Info("sending root application event") - if err := s.codefreshClient.SendEvent(ctx, a.Name, appEvent); err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name) - return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err) - } - s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, metricTimer.Duration()) - } - - validatedDestination := a.Spec.Destination.DeepCopy() - _ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing - - revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logCtx, a) - // for each resource in the application get desired and actual state, - // then stream the event - for _, rs := range a.Status.Resources { - if utils.IsApp(rs) { - continue - } - utils.SetHealthStatusIfMissing(&rs) - if !ignoreResourceCache && !s.shouldSendResourceEvent(a, rs) { - s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name) - continue - } - err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{ - app: a, - appTree: appTree, - revisionsMetadata: revisionsMetadata, - validatedDestination: validatedDestination, - }, argoTrackingMetadata) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name) - return err - } - } - return nil + metricTimer := metricsUtils.NewMetricTimer() + + logCtx := log.WithField("app", a.Name) + logCtx.WithField("ignoreResourceCache", ignoreResourceCache).Info("streaming application events") + + project := a.Spec.GetProject() + appTree, err := s.applicationServiceClient.ResourceTree(ctx, &application.ResourcesQuery{ + ApplicationName: &a.Name, + Project: &project, + AppNamespace: &a.Namespace, + }) + if err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return fmt.Errorf("failed to get application tree: %w", err) + } + + // we still need process app even without tree, it is in case of app yaml originally contain error, + // we still want to show it the errors that related to it on codefresh ui + logCtx.WithError(err).Warn("failed to get application tree, resuming") + } + + logCtx.Info("getting desired manifests") + + desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, a, nil, nil, nil) + + applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx) + + logCtx.Info("getting parent application name") + + parentAppIdentity := utils.GetParentAppIdentity(a, *argoTrackingMetadata.AppInstanceLabelKey, *argoTrackingMetadata.TrackingMethod) + + if utils.IsChildApp(parentAppIdentity) { + logCtx.Info("processing as child application") + parentApplicationEntity, err := s.applicationServiceClient.Get(ctx, &application.ApplicationQuery{ + Name: &parentAppIdentity.Name, + AppNamespace: &parentAppIdentity.Namespace, + }) + if err != nil { + return fmt.Errorf("failed to get parent application entity: %w", err) + } + + rs := utils.GetAppAsResource(a) + utils.SetHealthStatusIfMissing(rs) + + parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, logCtx, parentApplicationEntity, nil, nil, nil) + + parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity) + if err != nil { + logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming") + } + + validatedDestination := parentApplicationEntity.Spec.Destination.DeepCopy() + _ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing + + err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{ + app: parentApplicationEntity, + appTree: appTree, + revisionsMetadata: parentAppSyncRevisionsMetadata, + validatedDestination: validatedDestination, + }, argoTrackingMetadata) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name) + return err + } + s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration()) + } else { + // will get here only for root applications (not managed as a resource by another application) + logCtx.Info("processing as root application") + appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata, s.runtimeVersion) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name) + return fmt.Errorf("failed to get application event: %w", err) + } + + if appEvent == nil { + // event did not have an OperationState - skip all events + return nil + } + + utils.LogWithAppStatus(a, logCtx, eventProcessingStartedAt).Info("sending root application event") + if err := s.codefreshClient.SendEvent(ctx, a.Name, appEvent); err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventDeliveryErrorType, a.Name) + return fmt.Errorf("failed to send event for root application %s/%s: %w", a.Namespace, a.Name, err) + } + s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricParentAppEventType, metricTimer.Duration()) + } + + validatedDestination := a.Spec.Destination.DeepCopy() + _ = argoutils.ValidateDestination(ctx, validatedDestination, s.db) // resolves server field if missing + + revisionsMetadata, _ := s.getApplicationRevisionsMetadata(ctx, logCtx, a) + // for each resource in the application get desired and actual state, + // then stream the event + for _, rs := range a.Status.Resources { + if utils.IsApp(rs) { + continue + } + utils.SetHealthStatusIfMissing(&rs) + if !ignoreResourceCache && !s.shouldSendResourceEvent(a, rs) { + s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name) + continue + } + err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{ + app: a, + appTree: appTree, + revisionsMetadata: revisionsMetadata, + validatedDestination: validatedDestination, + }, argoTrackingMetadata) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name) + return err + } + } + return nil } // returns appVersion from first non-ref source for multisourced apps func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Context, a *appv1.Application, logCtx *log.Entry) *apiclient.ApplicationVersions { - if a.Spec.HasMultipleSources() { - syncResultRevisions := utils.GetOperationSyncResultRevisions(a) - if syncResultRevisions == nil { - return nil - } - - var sourcePositions []int64 - for i := 0; i < len(*syncResultRevisions); i++ { - sourcePositions = append(sourcePositions, int64(i+1)) - } - - syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, nil, &sourcePositions, syncResultRevisions) - return syncManifests.GetApplicationVersions() - } - - syncResultRevision := utils.GetOperationSyncResultRevision(a) - - if syncResultRevision == nil { - return nil - } - - syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, syncResultRevision, nil, nil) - return syncManifests.GetApplicationVersions() + if a.Spec.HasMultipleSources() { + syncResultRevisions := utils.GetOperationSyncResultRevisions(a) + if syncResultRevisions == nil { + return nil + } + + var sourcePositions []int64 + for i := 0; i < len(*syncResultRevisions); i++ { + sourcePositions = append(sourcePositions, int64(i+1)) + } + + syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, nil, &sourcePositions, syncResultRevisions) + + var applicationVersions *apiclient.ApplicationVersions + if s.useSourcesServer { + appVers := s.sourcesServerClient.GetAppVersion(a) + applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx) + } else { + applicationVersions = syncManifests.GetApplicationVersions() + } + + return applicationVersions + } + + syncResultRevision := utils.GetOperationSyncResultRevision(a) + + if syncResultRevision == nil { + return nil + } + + syncManifests, _ := s.getDesiredManifests(ctx, logCtx, a, syncResultRevision, nil, nil) + + var applicationVersions *apiclient.ApplicationVersions + if s.useSourcesServer { + appVers := s.sourcesServerClient.GetAppVersion(a) + applicationVersions = utils.SourcesAppVersionsToRepo(appVers, logCtx) + } else { + applicationVersions = syncManifests.GetApplicationVersions() + } + return applicationVersions } func (s *applicationEventReporter) getAppForResourceReporting( - rs appv1.ResourceStatus, - ctx context.Context, - logCtx *log.Entry, - a *appv1.Application, - syncRevisionsMetadata *utils.AppSyncRevisionsMetadata, + rs appv1.ResourceStatus, + ctx context.Context, + logCtx *log.Entry, + a *appv1.Application, + syncRevisionsMetadata *utils.AppSyncRevisionsMetadata, ) (*appv1.Application, *utils.AppSyncRevisionsMetadata) { - if rs.Kind != "Rollout" { // for rollout it's crucial to report always correct operationSyncRevision - return a, syncRevisionsMetadata - } + if rs.Kind != "Rollout" { // for rollout it's crucial to report always correct operationSyncRevision + return a, syncRevisionsMetadata + } - latestAppStatus, err := s.appLister.Applications(a.Namespace).Get(a.Name) - if err != nil { - return a, syncRevisionsMetadata - } + latestAppStatus, err := s.appLister.Applications(a.Namespace).Get(a.Name) + if err != nil { + return a, syncRevisionsMetadata + } - revisionMetadataToReport, err := s.getApplicationRevisionsMetadata(ctx, logCtx, latestAppStatus) - if err != nil { - return a, syncRevisionsMetadata - } + revisionMetadataToReport, err := s.getApplicationRevisionsMetadata(ctx, logCtx, latestAppStatus) + if err != nil { + return a, syncRevisionsMetadata + } - return latestAppStatus, revisionMetadataToReport + return latestAppStatus, revisionMetadataToReport } func (s *applicationEventReporter) processResource( - ctx context.Context, - rs appv1.ResourceStatus, - logCtx *log.Entry, - appEventProcessingStartedAt string, - desiredManifests *apiclient.ManifestResponse, - manifestGenErr bool, - originalApplication *appv1.Application, // passed onlu if resource is app - applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app - reportedEntityParentApp *ReportedEntityParentApp, - argoTrackingMetadata *ArgoTrackingMetadata, + ctx context.Context, + rs appv1.ResourceStatus, + logCtx *log.Entry, + appEventProcessingStartedAt string, + desiredManifests *apiclient.ManifestResponse, + manifestGenErr bool, + originalApplication *appv1.Application, // passed only if resource is app + applicationVersions *apiclient.ApplicationVersions, // passed only if resource is app + reportedEntityParentApp *ReportedEntityParentApp, + argoTrackingMetadata *ArgoTrackingMetadata, ) error { - metricsEventType := metrics.MetricResourceEventType - if utils.IsApp(rs) { - metricsEventType = metrics.MetricChildAppEventType - } - - logCtx = logCtx.WithFields(log.Fields{ - "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), - "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), - }) - - // get resource desired state - desiredState, appSourceIdx := getResourceDesiredState(&rs, desiredManifests, logCtx) - - actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication) - if err != nil { - return err - } - if actualState == nil { - return nil - } - - parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, reportedEntityParentApp.app, reportedEntityParentApp.revisionsMetadata) - - var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil - - if originalApplication != nil { - originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication) - } - - ev, err := getResourceEventPayload( - appEventProcessingStartedAt, - &ReportedResource{ - rs: &rs, - actualState: actualState, - desiredState: desiredState, - manifestGenErr: manifestGenErr, - appSourceIdx: appSourceIdx, - rsAsAppInfo: &ReportedResourceAsApp{ - app: originalApplication, - revisionsMetadata: originalAppRevisionMetadata, - applicationVersions: applicationVersions, - }, - }, - &ReportedEntityParentApp{ - app: parentApplicationToReport, - appTree: reportedEntityParentApp.appTree, - revisionsMetadata: revisionMetadataToReport, - validatedDestination: reportedEntityParentApp.validatedDestination, - desiredManifests: reportedEntityParentApp.desiredManifests, - }, - argoTrackingMetadata, - s.runtimeVersion, - ) - if err != nil { - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name) - logCtx.WithError(err).Warn("failed to get event payload, resuming") - return nil - } - - appRes := appv1.Application{} - appName := "" - if utils.IsApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil { - utils.LogWithAppStatus(&appRes, logCtx, appEventProcessingStartedAt).Info("streaming resource event") - appName = appRes.Name - } else { - utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event") - appName = reportedEntityParentApp.app.Name - } - - if err := s.codefreshClient.SendEvent(ctx, appName, ev); err != nil { - if strings.Contains(err.Error(), "context deadline exceeded") { - return fmt.Errorf("failed to send resource event: %w", err) - } - - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventDeliveryErrorType, appName) - logCtx.WithError(err).Warn("failed to send resource event, resuming") - return nil - } - - if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, utils.GetApplicationLatestRevision(parentApplicationToReport)); err != nil { - logCtx.WithError(err).Warn("failed to cache resource event") - } - - return nil + metricsEventType := metrics.MetricResourceEventType + if utils.IsApp(rs) { + metricsEventType = metrics.MetricChildAppEventType + } + + logCtx = logCtx.WithFields(log.Fields{ + "gvk": fmt.Sprintf("%s/%s/%s", rs.Group, rs.Version, rs.Kind), + "resource": fmt.Sprintf("%s/%s", rs.Namespace, rs.Name), + }) + + // get resource desired state + desiredState, appSourceIdx := getResourceDesiredState(&rs, desiredManifests, logCtx) + + actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication) + if err != nil { + return err + } + if actualState == nil { + return nil + } + + parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, reportedEntityParentApp.app, reportedEntityParentApp.revisionsMetadata) + + var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil + + if originalApplication != nil { + originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication) + } + + ev, err := getResourceEventPayload( + appEventProcessingStartedAt, + &ReportedResource{ + rs: &rs, + actualState: actualState, + desiredState: desiredState, + manifestGenErr: manifestGenErr, + appSourceIdx: appSourceIdx, + rsAsAppInfo: &ReportedResourceAsApp{ + app: originalApplication, + revisionsMetadata: originalAppRevisionMetadata, + applicationVersions: applicationVersions, + }, + }, + &ReportedEntityParentApp{ + app: parentApplicationToReport, + appTree: reportedEntityParentApp.appTree, + revisionsMetadata: revisionMetadataToReport, + validatedDestination: reportedEntityParentApp.validatedDestination, + desiredManifests: reportedEntityParentApp.desiredManifests, + }, + argoTrackingMetadata, + s.runtimeVersion, + ) + if err != nil { + s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name) + logCtx.WithError(err).Warn("failed to get event payload, resuming") + return nil + } + + appRes := appv1.Application{} + appName := "" + if utils.IsApp(rs) && actualState.Manifest != nil && json.Unmarshal([]byte(*actualState.Manifest), &appRes) == nil { + utils.LogWithAppStatus(&appRes, logCtx, appEventProcessingStartedAt).Info("streaming resource event") + appName = appRes.Name + } else { + utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event") + appName = reportedEntityParentApp.app.Name + } + + if err := s.codefreshClient.SendEvent(ctx, appName, ev); err != nil { + if strings.Contains(err.Error(), "context deadline exceeded") { + return fmt.Errorf("failed to send resource event: %w", err) + } + + s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventDeliveryErrorType, appName) + logCtx.WithError(err).Warn("failed to send resource event, resuming") + return nil + } + + if err := s.cache.SetLastResourceEvent(parentApplicationToReport, rs, resourceEventCacheExpiration, utils.GetApplicationLatestRevision(parentApplicationToReport)); err != nil { + logCtx.WithError(err).Warn("failed to cache resource event") + } + + return nil } func (s *applicationEventReporter) getResourceActualState(ctx context.Context, logCtx *log.Entry, metricsEventType metrics.MetricEventType, rs appv1.ResourceStatus, parentApplication *appv1.Application, childApplication *appv1.Application) (*application.ApplicationResourceResponse, error) { - if utils.IsApp(rs) { - if childApplication.IsEmptyTypeMeta() { - // make sure there is type meta on object - childApplication.SetDefaultTypeMeta() - } - - manifestBytes, err := json.Marshal(childApplication) - - if err == nil && len(manifestBytes) > 0 { - manifest := string(manifestBytes) - return &application.ApplicationResourceResponse{Manifest: &manifest}, nil - } - } - - // get resource actual state - project := parentApplication.Spec.GetProject() - - actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{ - Name: &parentApplication.Name, - AppNamespace: &parentApplication.Namespace, - Namespace: &rs.Namespace, - ResourceName: &rs.Name, - Version: &rs.Version, - Group: &rs.Group, - Kind: &rs.Kind, - Project: &project, - }) - if err != nil { - if !strings.Contains(err.Error(), "not found") { - // only return error if there is no point in trying to send the - // next resource. For example if the shared context has exceeded - // its deadline - if strings.Contains(err.Error(), "context deadline exceeded") { - return nil, fmt.Errorf("failed to get actual state: %w", err) - } - - s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventUnknownErrorType, parentApplication.Name) - logCtx.WithError(err).Warn("failed to get actual state, resuming") - return nil, nil - } - - manifest := "" - // empty actual state - actualState = &application.ApplicationResourceResponse{Manifest: &manifest} - } - - return actualState, nil + if utils.IsApp(rs) { + if childApplication.IsEmptyTypeMeta() { + // make sure there is type meta on object + childApplication.SetDefaultTypeMeta() + } + + manifestBytes, err := json.Marshal(childApplication) + + if err == nil && len(manifestBytes) > 0 { + manifest := string(manifestBytes) + return &application.ApplicationResourceResponse{Manifest: &manifest}, nil + } + } + + // get resource actual state + project := parentApplication.Spec.GetProject() + + actualState, err := s.applicationServiceClient.GetResource(ctx, &application.ApplicationResourceRequest{ + Name: &parentApplication.Name, + AppNamespace: &parentApplication.Namespace, + Namespace: &rs.Namespace, + ResourceName: &rs.Name, + Version: &rs.Version, + Group: &rs.Group, + Kind: &rs.Kind, + Project: &project, + }) + if err != nil { + if !strings.Contains(err.Error(), "not found") { + // only return error if there is no point in trying to send the + // next resource. For example if the shared context has exceeded + // its deadline + if strings.Contains(err.Error(), "context deadline exceeded") { + return nil, fmt.Errorf("failed to get actual state: %w", err) + } + + s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventUnknownErrorType, parentApplication.Name) + logCtx.WithError(err).Warn("failed to get actual state, resuming") + return nil, nil + } + + manifest := "" + // empty actual state + actualState = &application.ApplicationResourceResponse{Manifest: &manifest} + } + + return actualState, nil } func (s *applicationEventReporter) ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool) { - logCtx := log.WithField("app", ae.Application.Name) - - if ae.Type == watch.Deleted { - logCtx.Info("application deleted") - return true, false - } - - cachedApp, err := s.cache.GetLastApplicationEvent(&ae.Application) - if err != nil || cachedApp == nil { - return true, false - } - - cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff - cachedApp.Spec.Project = ae.Application.Spec.Project // not using GetProject() so that the comparison will be with the real field values - for i := range cachedApp.Status.Conditions { - cachedApp.Status.Conditions[i].LastTransitionTime = nil - } - for i := range ae.Application.Status.Conditions { - ae.Application.Status.Conditions[i].LastTransitionTime = nil - } - - // check if application changed to healthy status - if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { - return true, true - } - - if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { - logCtx.Info("application spec changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { - logCtx.Info("application status changed") - return true, false - } - - if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { - logCtx.Info("application operation changed") - return true, false - } - - metadataChanged := applicationMetadataChanged(ae, cachedApp) - - if metadataChanged { - logCtx.Info("application metadata changed") - return true, false - } - - return false, false + logCtx := log.WithField("app", ae.Application.Name) + + if ae.Type == watch.Deleted { + logCtx.Info("application deleted") + return true, false + } + + cachedApp, err := s.cache.GetLastApplicationEvent(&ae.Application) + if err != nil || cachedApp == nil { + return true, false + } + + cachedApp.Status.ReconciledAt = ae.Application.Status.ReconciledAt // ignore those in the diff + cachedApp.Spec.Project = ae.Application.Spec.Project // not using GetProject() so that the comparison will be with the real field values + for i := range cachedApp.Status.Conditions { + cachedApp.Status.Conditions[i].LastTransitionTime = nil + } + for i := range ae.Application.Status.Conditions { + ae.Application.Status.Conditions[i].LastTransitionTime = nil + } + + // check if application changed to healthy status + if ae.Application.Status.Health.Status == health.HealthStatusHealthy && cachedApp.Status.Health.Status != health.HealthStatusHealthy { + return true, true + } + + if !reflect.DeepEqual(ae.Application.Spec, cachedApp.Spec) { + logCtx.Info("application spec changed") + return true, false + } + + if !reflect.DeepEqual(ae.Application.Status, cachedApp.Status) { + logCtx.Info("application status changed") + return true, false + } + + if !reflect.DeepEqual(ae.Application.Operation, cachedApp.Operation) { + logCtx.Info("application operation changed") + return true, false + } + + metadataChanged := applicationMetadataChanged(ae, cachedApp) + + if metadataChanged { + logCtx.Info("application metadata changed") + return true, false + } + + return false, false } func applicationMetadataChanged(ae *appv1.ApplicationWatchEvent, cachedApp *appv1.Application) (changed bool) { - if ae.Type != watch.Modified { - return false - } + if ae.Type != watch.Modified { + return false + } - cachedAppMeta := cachedApp.ObjectMeta.DeepCopy() - newEventAppMeta := ae.Application.ObjectMeta.DeepCopy() + cachedAppMeta := cachedApp.ObjectMeta.DeepCopy() + newEventAppMeta := ae.Application.ObjectMeta.DeepCopy() - if newEventAppMeta.Annotations != nil { - delete(newEventAppMeta.Annotations, "kubectl.kubernetes.io/last-applied-configuration") - delete(cachedAppMeta.Annotations, "kubectl.kubernetes.io/last-applied-configuration") - } + if newEventAppMeta.Annotations != nil { + delete(newEventAppMeta.Annotations, "kubectl.kubernetes.io/last-applied-configuration") + delete(cachedAppMeta.Annotations, "kubectl.kubernetes.io/last-applied-configuration") + } - cachedAppMeta.ResourceVersion = newEventAppMeta.ResourceVersion // ignore those in the diff - cachedAppMeta.Generation = newEventAppMeta.Generation // ignore those in the diff - cachedAppMeta.GenerateName = newEventAppMeta.GenerateName // ignore those in the diff - newEventAppMeta.ManagedFields = nil // ignore those in the diff - cachedAppMeta.ManagedFields = nil // ignore those in the diff + cachedAppMeta.ResourceVersion = newEventAppMeta.ResourceVersion // ignore those in the diff + cachedAppMeta.Generation = newEventAppMeta.Generation // ignore those in the diff + cachedAppMeta.GenerateName = newEventAppMeta.GenerateName // ignore those in the diff + newEventAppMeta.ManagedFields = nil // ignore those in the diff + cachedAppMeta.ManagedFields = nil // ignore those in the diff - return !reflect.DeepEqual(newEventAppMeta, cachedAppMeta) + return !reflect.DeepEqual(newEventAppMeta, cachedAppMeta) } func getResourceDesiredState(rs *appv1.ResourceStatus, ds *apiclient.ManifestResponse, logger *log.Entry) (manifest *apiclient.Manifest, sourceIdx int32) { - if ds == nil { - return &apiclient.Manifest{}, 0 - } - for idx, m := range ds.Manifests { - u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest) - if err != nil { - logger.WithError(err).Warnf("failed to unmarshal compiled manifest") - continue - } - - if u == nil { - continue - } - - ns := text.FirstNonEmpty(u.GetNamespace(), rs.Namespace) - - if u.GroupVersionKind().String() == rs.GroupVersionKind().String() && - u.GetName() == rs.Name && - ns == rs.Namespace { - if rs.Kind == kube.SecretKind && rs.Version == "v1" { - m.RawManifest = m.CompiledManifest - } - - return m, getResourceSourceIdxFromManifestResponse(idx, ds) - } - } - - // no desired state for resource - // it's probably deleted from git - return &apiclient.Manifest{}, 0 + if ds == nil { + return &apiclient.Manifest{}, 0 + } + for idx, m := range ds.Manifests { + u, err := appv1.UnmarshalToUnstructured(m.CompiledManifest) + if err != nil { + logger.WithError(err).Warnf("failed to unmarshal compiled manifest") + continue + } + + if u == nil { + continue + } + + ns := text.FirstNonEmpty(u.GetNamespace(), rs.Namespace) + + if u.GroupVersionKind().String() == rs.GroupVersionKind().String() && + u.GetName() == rs.Name && + ns == rs.Namespace { + if rs.Kind == kube.SecretKind && rs.Version == "v1" { + m.RawManifest = m.CompiledManifest + } + + return m, getResourceSourceIdxFromManifestResponse(idx, ds) + } + } + + // no desired state for resource + // it's probably deleted from git + return &apiclient.Manifest{}, 0 } func getResourceSourceIdxFromManifestResponse(rsIdx int, ds *apiclient.ManifestResponse) int32 { - if ds.SourcesManifestsStartingIdx == nil { - return -1 - } + if ds.SourcesManifestsStartingIdx == nil { + return -1 + } - sourceIdx := int32(-1) + sourceIdx := int32(-1) - for currentSourceIdx, sourceStartingIdx := range ds.SourcesManifestsStartingIdx { - if int32(rsIdx) >= sourceStartingIdx { - sourceIdx = int32(currentSourceIdx) - } - } + for currentSourceIdx, sourceStartingIdx := range ds.SourcesManifestsStartingIdx { + if int32(rsIdx) >= sourceStartingIdx { + sourceIdx = int32(currentSourceIdx) + } + } - return sourceIdx + return sourceIdx } diff --git a/event_reporter/server.go b/event_reporter/server.go index 6c23dcf099d44..2cd7b9c364a5b 100644 --- a/event_reporter/server.go +++ b/event_reporter/server.go @@ -1,219 +1,222 @@ package event_reporter import ( - "context" - "crypto/tls" - "fmt" - "net" - "net/http" - "os" - "strings" - "time" - - appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" - "github.com/argoproj/argo-cd/v2/event_reporter/reporter" - - "github.com/redis/go-redis/v9" - log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - "github.com/argoproj/argo-cd/v2/common" - event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller" - "github.com/argoproj/argo-cd/v2/event_reporter/handlers" - "github.com/argoproj/argo-cd/v2/event_reporter/metrics" - appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" - applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" - codefresh "github.com/argoproj/argo-cd/v2/pkg/codefresh" - repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" - servercache "github.com/argoproj/argo-cd/v2/server/cache" - "github.com/argoproj/argo-cd/v2/server/rbacpolicy" - "github.com/argoproj/argo-cd/v2/server/repository" - "github.com/argoproj/argo-cd/v2/util/assets" - "github.com/argoproj/argo-cd/v2/util/db" - errorsutil "github.com/argoproj/argo-cd/v2/util/errors" - "github.com/argoproj/argo-cd/v2/util/healthz" - "github.com/argoproj/argo-cd/v2/util/io" - "github.com/argoproj/argo-cd/v2/util/rbac" - settings_util "github.com/argoproj/argo-cd/v2/util/settings" + "context" + "crypto/tls" + "fmt" + "github.com/argoproj/argo-cd/v2/pkg/sources_server_client" + "net" + "net/http" + "os" + "strings" + "time" + + appclient "github.com/argoproj/argo-cd/v2/event_reporter/application" + "github.com/argoproj/argo-cd/v2/event_reporter/reporter" + + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "github.com/argoproj/argo-cd/v2/common" + event_reporter "github.com/argoproj/argo-cd/v2/event_reporter/controller" + "github.com/argoproj/argo-cd/v2/event_reporter/handlers" + "github.com/argoproj/argo-cd/v2/event_reporter/metrics" + appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" + appinformer "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions" + applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + codefresh "github.com/argoproj/argo-cd/v2/pkg/codefresh" + repoapiclient "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + servercache "github.com/argoproj/argo-cd/v2/server/cache" + "github.com/argoproj/argo-cd/v2/server/rbacpolicy" + "github.com/argoproj/argo-cd/v2/server/repository" + "github.com/argoproj/argo-cd/v2/util/assets" + "github.com/argoproj/argo-cd/v2/util/db" + errorsutil "github.com/argoproj/argo-cd/v2/util/errors" + "github.com/argoproj/argo-cd/v2/util/healthz" + "github.com/argoproj/argo-cd/v2/util/io" + "github.com/argoproj/argo-cd/v2/util/rbac" + settings_util "github.com/argoproj/argo-cd/v2/util/settings" ) const ( - // catches corrupted informer state; see https://github.com/argoproj/argo-cd/issues/4960 for more information - notObjectErrMsg = "object does not implement the Object interfaces" + // catches corrupted informer state; see https://github.com/argoproj/argo-cd/issues/4960 for more information + notObjectErrMsg = "object does not implement the Object interfaces" ) var backoff = wait.Backoff{ - Steps: 5, - Duration: 500 * time.Millisecond, - Factor: 1.0, - Jitter: 0.1, + Steps: 5, + Duration: 500 * time.Millisecond, + Factor: 1.0, + Jitter: 0.1, } type EventReporterServer struct { - EventReporterServerOpts - - settings *settings_util.ArgoCDSettings - log *log.Entry - settingsMgr *settings_util.SettingsManager - enf *rbac.Enforcer - projInformer cache.SharedIndexInformer - projLister applisters.AppProjectNamespaceLister - policyEnforcer *rbacpolicy.RBACPolicyEnforcer - appInformer cache.SharedIndexInformer - appLister applisters.ApplicationLister - db db.ArgoDB - - // stopCh is the channel which when closed, will shutdown the Event Reporter server - stopCh chan struct{} - serviceSet *EventReporterServerSet - featureManager *reporter.FeatureManager + EventReporterServerOpts + + settings *settings_util.ArgoCDSettings + log *log.Entry + settingsMgr *settings_util.SettingsManager + enf *rbac.Enforcer + projInformer cache.SharedIndexInformer + projLister applisters.AppProjectNamespaceLister + policyEnforcer *rbacpolicy.RBACPolicyEnforcer + appInformer cache.SharedIndexInformer + appLister applisters.ApplicationLister + db db.ArgoDB + + // stopCh is the channel which when closed, will shutdown the Event Reporter server + stopCh chan struct{} + serviceSet *EventReporterServerSet + featureManager *reporter.FeatureManager } type EventReporterServerSet struct { - RepoService *repository.Server - MetricsServer *metrics.MetricsServer + RepoService *repository.Server + MetricsServer *metrics.MetricsServer } type EventReporterServerOpts struct { - ListenPort int - ListenHost string - MetricsPort int - MetricsHost string - Namespace string - KubeClientset kubernetes.Interface - AppClientset appclientset.Interface - RepoClientset repoapiclient.Clientset - ApplicationServiceClient appclient.ApplicationClient - Cache *servercache.Cache - RedisClient *redis.Client - ApplicationNamespaces []string - BaseHRef string - RootPath string - CodefreshConfig *codefresh.CodefreshConfig - RateLimiterOpts *reporter.RateLimiterOpts + ListenPort int + ListenHost string + MetricsPort int + MetricsHost string + Namespace string + KubeClientset kubernetes.Interface + AppClientset appclientset.Interface + RepoClientset repoapiclient.Clientset + ApplicationServiceClient appclient.ApplicationClient + Cache *servercache.Cache + RedisClient *redis.Client + ApplicationNamespaces []string + BaseHRef string + RootPath string + CodefreshConfig *codefresh.CodefreshConfig + RateLimiterOpts *reporter.RateLimiterOpts + UseSourcesServer bool + SourcesServerConfig *sources_server_client.SourcesServerConfig } type handlerSwitcher struct { - handler http.Handler - urlToHandler map[string]http.Handler - contentTypeToHandler map[string]http.Handler + handler http.Handler + urlToHandler map[string]http.Handler + contentTypeToHandler map[string]http.Handler } type Listeners struct { - Main net.Listener - Metrics net.Listener + Main net.Listener + Metrics net.Listener } func (l *Listeners) Close() error { - if l.Main != nil { - if err := l.Main.Close(); err != nil { - return err - } - l.Main = nil - } - if l.Metrics != nil { - if err := l.Metrics.Close(); err != nil { - return err - } - l.Metrics = nil - } - return nil + if l.Main != nil { + if err := l.Main.Close(); err != nil { + return err + } + l.Main = nil + } + if l.Metrics != nil { + if err := l.Metrics.Close(); err != nil { + return err + } + l.Metrics = nil + } + return nil } func (s *handlerSwitcher) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if urlHandler, ok := s.urlToHandler[r.URL.Path]; ok { - urlHandler.ServeHTTP(w, r) - } else if contentHandler, ok := s.contentTypeToHandler[r.Header.Get("content-type")]; ok { - contentHandler.ServeHTTP(w, r) - } else { - s.handler.ServeHTTP(w, r) - } + if urlHandler, ok := s.urlToHandler[r.URL.Path]; ok { + urlHandler.ServeHTTP(w, r) + } else if contentHandler, ok := s.contentTypeToHandler[r.Header.Get("content-type")]; ok { + contentHandler.ServeHTTP(w, r) + } else { + s.handler.ServeHTTP(w, r) + } } func (a *EventReporterServer) healthCheck(r *http.Request) error { - if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" { - argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) - _, err := argoDB.ListClusters(r.Context()) - if err != nil && strings.Contains(err.Error(), notObjectErrMsg) { - return err - } - } - return nil + if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" { + argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) + _, err := argoDB.ListClusters(r.Context()) + if err != nil && strings.Contains(err.Error(), notObjectErrMsg) { + return err + } + } + return nil } // Init starts informers used by the API server func (a *EventReporterServer) Init(ctx context.Context) { - go a.appInformer.Run(ctx.Done()) - svcSet := newEventReporterServiceSet(a) - a.serviceSet = svcSet + go a.appInformer.Run(ctx.Done()) + svcSet := newEventReporterServiceSet(a) + a.serviceSet = svcSet } func (a *EventReporterServer) RunController(ctx context.Context) { - controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db) - go controller.Run(ctx) + controller := event_reporter.NewEventReporterController(a.appInformer, a.Cache, a.settingsMgr, a.ApplicationServiceClient, a.appLister, a.CodefreshConfig, a.serviceSet.MetricsServer, a.featureManager, a.RateLimiterOpts, a.db, a.UseSourcesServer, a.SourcesServerConfig) + go controller.Run(ctx) } // newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented // using grpc-gateway as a proxy to the gRPC server. func (a *EventReporterServer) newHTTPServer(ctx context.Context, port int) *http.Server { - endpoint := fmt.Sprintf("localhost:%d", port) - mux := http.NewServeMux() - httpS := http.Server{ - Addr: endpoint, - Handler: &handlerSwitcher{ - handler: mux, - }, - } + endpoint := fmt.Sprintf("localhost:%d", port) + mux := http.NewServeMux() + httpS := http.Server{ + Addr: endpoint, + Handler: &handlerSwitcher{ + handler: mux, + }, + } - healthz.ServeHealthCheck(mux, a.healthCheck) + healthz.ServeHealthCheck(mux, a.healthCheck) - rH := handlers.GetRequestHandlers(a.ApplicationServiceClient) - mux.HandleFunc("/app-distribution", rH.GetAppDistribution) + rH := handlers.GetRequestHandlers(a.ApplicationServiceClient) + mux.HandleFunc("/app-distribution", rH.GetAppDistribution) - return &httpS + return &httpS } func (a *EventReporterServer) checkServeErr(name string, err error) { - if err != nil { - if a.stopCh == nil { - // a nil stopCh indicates a graceful shutdown - log.Infof("graceful shutdown %s: %v", name, err) - } else { - log.Fatalf("%s: %v", name, err) - } - } else { - log.Infof("graceful shutdown %s", name) - } + if err != nil { + if a.stopCh == nil { + // a nil stopCh indicates a graceful shutdown + log.Infof("graceful shutdown %s: %v", name, err) + } else { + log.Fatalf("%s: %v", name, err) + } + } else { + log.Infof("graceful shutdown %s", name) + } } func startListener(host string, port int) (net.Listener, error) { - var conn net.Listener - var realErr error - _ = wait.ExponentialBackoff(backoff, func() (bool, error) { - conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) - if realErr != nil { - return false, nil - } - return true, nil - }) - return conn, realErr + var conn net.Listener + var realErr error + _ = wait.ExponentialBackoff(backoff, func() (bool, error) { + conn, realErr = net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if realErr != nil { + return false, nil + } + return true, nil + }) + return conn, realErr } func (a *EventReporterServer) Listen() (*Listeners, error) { - mainLn, err := startListener(a.ListenHost, a.ListenPort) - if err != nil { - return nil, err - } - metricsLn, err := startListener(a.MetricsHost, a.MetricsPort) - if err != nil { - io.Close(mainLn) - return nil, err - } - return &Listeners{Main: mainLn, Metrics: metricsLn}, nil + mainLn, err := startListener(a.ListenHost, a.ListenPort) + if err != nil { + return nil, err + } + metricsLn, err := startListener(a.MetricsHost, a.MetricsPort) + if err != nil { + io.Close(mainLn) + return nil, err + } + return &Listeners{Main: mainLn, Metrics: metricsLn}, nil } // Run runs the API Server @@ -221,82 +224,82 @@ func (a *EventReporterServer) Listen() (*Listeners, error) { // k8s.io/ go-to-protobuf uses protoc-gen-gogo, which comes from gogo/protobuf (a fork of // golang/protobuf). func (a *EventReporterServer) Run(ctx context.Context, lns *Listeners) { - httpS := a.newHTTPServer(ctx, a.ListenPort) - tlsConfig := tls.Config{} - tlsConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { - return a.settings.Certificate, nil - } - go func() { a.checkServeErr("httpS", httpS.Serve(lns.Main)) }() - go func() { a.checkServeErr("metrics", a.serviceSet.MetricsServer.Serve(lns.Metrics)) }() - go a.RunController(ctx) - - if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) { - log.Fatal("Timed out waiting for project cache to sync") - } - - a.stopCh = make(chan struct{}) - <-a.stopCh + httpS := a.newHTTPServer(ctx, a.ListenPort) + tlsConfig := tls.Config{} + tlsConfig.GetCertificate = func(info *tls.ClientHelloInfo) (*tls.Certificate, error) { + return a.settings.Certificate, nil + } + go func() { a.checkServeErr("httpS", httpS.Serve(lns.Main)) }() + go func() { a.checkServeErr("metrics", a.serviceSet.MetricsServer.Serve(lns.Metrics)) }() + go a.RunController(ctx) + + if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) { + log.Fatal("Timed out waiting for project cache to sync") + } + + a.stopCh = make(chan struct{}) + <-a.stopCh } // NewServer returns a new instance of the Event Reporter server func NewEventReporterServer(ctx context.Context, opts EventReporterServerOpts) *EventReporterServer { - settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace) - settings, err := settingsMgr.InitializeSettings(true) - errorsutil.CheckError(err) - - appInformerNs := opts.Namespace - if len(opts.ApplicationNamespaces) > 0 { - appInformerNs = "" - } - projFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(opts.Namespace), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - appFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(appInformerNs), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) - - projInformer := projFactory.Argoproj().V1alpha1().AppProjects().Informer() - projLister := projFactory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace) - - appInformer := appFactory.Argoproj().V1alpha1().Applications().Informer() - appLister := appFactory.Argoproj().V1alpha1().Applications().Lister() - - enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil) - enf.EnableEnforce(false) - err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV) - errorsutil.CheckError(err) - enf.EnableLog(os.Getenv(common.EnvVarRBACDebug) == "1") - - policyEnf := rbacpolicy.NewRBACPolicyEnforcer(enf, projLister) - enf.SetClaimsEnforcerFunc(policyEnf.EnforceClaims) - - dbInstance := db.NewDB(opts.Namespace, settingsMgr, opts.KubeClientset) - - server := &EventReporterServer{ - EventReporterServerOpts: opts, - log: log.NewEntry(log.StandardLogger()), - settings: settings, - settingsMgr: settingsMgr, - enf: enf, - projInformer: projInformer, - projLister: projLister, - appInformer: appInformer, - appLister: appLister, - policyEnforcer: policyEnf, - db: dbInstance, - featureManager: reporter.NewFeatureManager(settingsMgr), - } - - if err != nil { - // Just log. It's not critical. - log.Warnf("Failed to log in-cluster warnings: %v", err) - } - - return server + settingsMgr := settings_util.NewSettingsManager(ctx, opts.KubeClientset, opts.Namespace) + settings, err := settingsMgr.InitializeSettings(true) + errorsutil.CheckError(err) + + appInformerNs := opts.Namespace + if len(opts.ApplicationNamespaces) > 0 { + appInformerNs = "" + } + projFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(opts.Namespace), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) + appFactory := appinformer.NewSharedInformerFactoryWithOptions(opts.AppClientset, 0, appinformer.WithNamespace(appInformerNs), appinformer.WithTweakListOptions(func(options *metav1.ListOptions) {})) + + projInformer := projFactory.Argoproj().V1alpha1().AppProjects().Informer() + projLister := projFactory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace) + + appInformer := appFactory.Argoproj().V1alpha1().Applications().Informer() + appLister := appFactory.Argoproj().V1alpha1().Applications().Lister() + + enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil) + enf.EnableEnforce(false) + err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV) + errorsutil.CheckError(err) + enf.EnableLog(os.Getenv(common.EnvVarRBACDebug) == "1") + + policyEnf := rbacpolicy.NewRBACPolicyEnforcer(enf, projLister) + enf.SetClaimsEnforcerFunc(policyEnf.EnforceClaims) + + dbInstance := db.NewDB(opts.Namespace, settingsMgr, opts.KubeClientset) + + server := &EventReporterServer{ + EventReporterServerOpts: opts, + log: log.NewEntry(log.StandardLogger()), + settings: settings, + settingsMgr: settingsMgr, + enf: enf, + projInformer: projInformer, + projLister: projLister, + appInformer: appInformer, + appLister: appLister, + policyEnforcer: policyEnf, + db: dbInstance, + featureManager: reporter.NewFeatureManager(settingsMgr), + } + + if err != nil { + // Just log. It's not critical. + log.Warnf("Failed to log in-cluster warnings: %v", err) + } + + return server } func newEventReporterServiceSet(a *EventReporterServer) *EventReporterServerSet { - repoService := repository.NewServer(a.RepoClientset, a.db, a.enf, a.Cache, a.appLister, a.projInformer, a.Namespace, a.settingsMgr) - metricsServer := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort) + repoService := repository.NewServer(a.RepoClientset, a.db, a.enf, a.Cache, a.appLister, a.projInformer, a.Namespace, a.settingsMgr) + metricsServer := metrics.NewMetricsServer(a.MetricsHost, a.MetricsPort) - return &EventReporterServerSet{ - RepoService: repoService, - MetricsServer: metricsServer, - } + return &EventReporterServerSet{ + RepoService: repoService, + MetricsServer: metricsServer, + } } diff --git a/event_reporter/utils/app_version.go b/event_reporter/utils/app_version.go index b2396461cfefc..faadc76c310a1 100644 --- a/event_reporter/utils/app_version.go +++ b/event_reporter/utils/app_version.go @@ -1,18 +1,34 @@ package utils import ( - "encoding/json" + "encoding/json" + sourcesServerCommon "github.com/codefresh-io/octopus-argo/sources-server/common" + log "github.com/sirupsen/logrus" - "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" - "github.com/argoproj/argo-cd/v2/reposerver/apiclient" + "github.com/argoproj/argo-cd/v2/pkg/apiclient/events" + "github.com/argoproj/argo-cd/v2/reposerver/apiclient" ) func RepoAppVersionsToEvent(applicationVersions *apiclient.ApplicationVersions) (*events.ApplicationVersions, error) { - applicationVersionsEvents := &events.ApplicationVersions{} - applicationVersionsData, _ := json.Marshal(applicationVersions) - err := json.Unmarshal(applicationVersionsData, applicationVersionsEvents) - if err != nil { - return nil, err - } - return applicationVersionsEvents, nil + applicationVersionsEvents := &events.ApplicationVersions{} + applicationVersionsData, _ := json.Marshal(applicationVersions) + err := json.Unmarshal(applicationVersionsData, applicationVersionsEvents) + if err != nil { + return nil, err + } + return applicationVersionsEvents, nil +} + +func SourcesAppVersionsToRepo(applicationVersions *sourcesServerCommon.AppVersionResult, logCtx *log.Entry) *apiclient.ApplicationVersions { + if applicationVersions == nil { + return nil + } + applicationVersionsRepo := &apiclient.ApplicationVersions{} + applicationVersionsData, _ := json.Marshal(applicationVersions) + err := json.Unmarshal(applicationVersionsData, applicationVersionsRepo) + if err != nil { + logCtx.Errorf("can't unmarshal app version: %v", err) + return nil + } + return applicationVersionsRepo } diff --git a/go.mod b/go.mod index ce2b2c2816fc8..e0b3eb388b7f0 100644 --- a/go.mod +++ b/go.mod @@ -133,6 +133,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect + github.com/codefresh-io/octopus-argo v0.0.0-00010101000000-000000000000 // indirect github.com/davidmz/go-pageant v1.0.2 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/go-fed/httpsig v1.1.0 // indirect @@ -190,7 +191,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/dlclark/regexp2 v1.11.2 + github.com/dlclark/regexp2 v1.11.4 github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/evanphx/json-patch/v5 v5.8.0 // indirect @@ -348,3 +349,6 @@ replace ( k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.29.6 k8s.io/sample-controller => k8s.io/sample-controller v0.29.6 ) + +// TODO:CR-26144: Remove alias +replace github.com/codefresh-io/octopus-argo => /Users/andrii/go/src/github.com/codefresh-io/octopus-argo diff --git a/go.sum b/go.sum index b4557335ca692..8422b0f91a895 100644 --- a/go.sum +++ b/go.sum @@ -850,6 +850,8 @@ github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.11.2 h1:/u628IuisSTwri5/UKloiIsH8+qF2Pu7xEQX+yIKg68= github.com/dlclark/regexp2 v1.11.2/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo= +github.com/dlclark/regexp2 v1.11.4/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= diff --git a/pkg/sources_server_client/client.go b/pkg/sources_server_client/client.go new file mode 100644 index 0000000000000..6a423b41bd3bc --- /dev/null +++ b/pkg/sources_server_client/client.go @@ -0,0 +1,87 @@ +package sources_server_client + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + log "github.com/sirupsen/logrus" + "io" + "net/http" + + sourcesServerCommon "github.com/codefresh-io/octopus-argo/sources-server/common" +) + +type SourcesServerConfig struct { + BaseURL string +} + +type sourceServerClient struct { + clientConfig *SourcesServerConfig +} + +type SourceServerClientInteface interface { + GetAppVersion(app *v1alpha1.Application) *sourcesServerCommon.AppVersionResult +} + +func (c *sourceServerClient) sendRequest(method, url string, payload interface{}) ([]byte, error) { + var requestBody []byte + var err error + if payload != nil { + requestBody, err = json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("error marshalling payload: %v", err) + } + } + + req, err := http.NewRequest(method, fmt.Sprintf("%s%s", c.clientConfig.BaseURL, url), bytes.NewBuffer(requestBody)) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("server responded with status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + return body, nil +} + +func (c *sourceServerClient) GetAppVersion(app *v1alpha1.Application) *sourcesServerCommon.AppVersionResult { + appVersionResult, err := c.sendRequest("POST", "/getAppVersion", app) + if err != nil { + log.Errorf("error getting app version: %v", err) + return nil + } + + var versionStruct sourcesServerCommon.AppVersionResult + err = json.Unmarshal(appVersionResult, &versionStruct) + if err != nil { + log.Errorf("error unmarshaling app version: %v", err) + return nil + } + + // TODO: remove this marker line + versionStruct.AppVersion = versionStruct.AppVersion + "*" + return &versionStruct +} + +func NewSourceServerClient(clientConfig *SourcesServerConfig) SourceServerClientInteface { + return &sourceServerClient{ + clientConfig: clientConfig, + } +}