Skip to content

Commit

Permalink
Add support got packageServerSyncInterval
Browse files Browse the repository at this point in the history
The API was updated to include `packageServerSyncInterval`

In the packageserver, read the OLMConfig at init and set the
sourceProvider timeout from that (or default).

Watch the OLMConfig resource and if the sync interval changes,
restart the sourceProvider.

Signed-off-by: Todd Short <[email protected]>
  • Loading branch information
tmshort authored and stevekuznetsov committed Oct 2, 2023
1 parent a627c78 commit 25ed683
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 4 deletions.
4 changes: 4 additions & 0 deletions deploy/chart/crds/0000_50_olm_00-olmconfigs.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ spec:
disableCopiedCSVs:
description: DisableCopiedCSVs is used to disable OLM's "Copied CSV" feature for operators installed at the cluster scope, where a cluster scoped operator is one that has been installed in an OperatorGroup that targets all namespaces. When reenabled, OLM will recreate the "Copied CSVs" for each cluster scoped operator.
type: boolean
packageServerSyncInterval:
description: PackageServerSyncInterval is used to define the sync interval for packagerserver pods. Packageserver pods periodically check the status of CatalogSources; this specifies the period using duration format (e.g. "60m"). For this parameter, only hours ("h"), minutes ("m"), and seconds ("s") may be specified. When not specified, the period defaults to the value specified within the packageserver.
type: string
pattern: ^([0-9]+(\.[0-9]+)?(s|m|h))+$
status:
description: OLMConfigStatus is the status for an OLMConfig resource.
type: object
Expand Down
76 changes: 72 additions & 4 deletions pkg/package-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,38 @@ import (
"fmt"
"io"
"net"
"os"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
genericserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client"
olminformers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apiserver"
genericpackageserver "github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/apiserver/generic"
"github.com/operator-framework/operator-lifecycle-manager/pkg/package-server/provider"
)

const DefaultWakeupInterval = 5 * time.Minute

type Operator struct {
queueinformer.Operator
olmConfigQueue workqueue.RateLimitingInterface
options *PackageServerOptions
}

// NewCommandStartPackageServer provides a CLI handler for 'start master' command
// with a default PackageServerOptions.
func NewCommandStartPackageServer(ctx context.Context, defaults *PackageServerOptions) *cobra.Command {
Expand All @@ -39,7 +52,7 @@ func NewCommandStartPackageServer(ctx context.Context, defaults *PackageServerOp
}

flags := cmd.Flags()
flags.DurationVar(&defaults.WakeupInterval, "interval", defaults.WakeupInterval, "interval at which to re-sync CatalogSources")
flags.DurationVar(&defaults.SyncInterval, "interval", defaults.SyncInterval, "interval at which to re-sync CatalogSources")
flags.StringVar(&defaults.GlobalNamespace, "global-namespace", defaults.GlobalNamespace, "Name of the namespace where the global CatalogSources are located")
flags.StringVar(&defaults.Kubeconfig, "kubeconfig", defaults.Kubeconfig, "path to the kubeconfig used to connect to the Kubernetes API server and the Kubelets (defaults to in-cluster config)")
flags.BoolVar(&defaults.Debug, "debug", defaults.Debug, "use debug log level")
Expand All @@ -59,7 +72,7 @@ type PackageServerOptions struct {
Features *genericoptions.FeatureOptions

GlobalNamespace string
WakeupInterval time.Duration
SyncInterval time.Duration

Kubeconfig string
RegistryAddr string
Expand All @@ -82,7 +95,7 @@ func NewPackageServerOptions(out, errOut io.Writer) *PackageServerOptions {
Authorization: genericoptions.NewDelegatingAuthorizationOptions(),
Features: genericoptions.NewFeatureOptions(),

WakeupInterval: 5 * time.Minute,
SyncInterval: DefaultWakeupInterval,

DisableAuthForTesting: false,
Debug: false,
Expand Down Expand Up @@ -215,7 +228,41 @@ func (o *PackageServerOptions) Run(ctx context.Context) error {
return err
}

sourceProvider, err := provider.NewRegistryProvider(ctx, crClient, queueOperator, o.WakeupInterval, o.GlobalNamespace)
op := &Operator{
Operator: queueOperator,
olmConfigQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "olmConfig"),
options: o,
}

olmConfigInformer := olminformers.NewSharedInformerFactoryWithOptions(crClient, 0).Operators().V1().OLMConfigs()
olmConfigQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithInformer(olmConfigInformer.Informer()),
queueinformer.WithQueue(op.olmConfigQueue),
queueinformer.WithIndexer(olmConfigInformer.Informer().GetIndexer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOLMConfig).ToSyncer()),
)
if err != nil {
return err
}
if err := op.RegisterQueueInformer(olmConfigQueueInformer); err != nil {
return err
}

// Grab the Sync config
cfg, err := crClient.OperatorsV1().OLMConfigs().Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
log.Warnf("Error retrieving Interval from OLMConfig: '%v'", err)
} else {
if cfg.Spec.Features != nil && cfg.Spec.Features.PackageServerSyncInterval != nil {
o.SyncInterval = cfg.Spec.Features.PackageServerSyncInterval.Duration
log.Infof("Retrieved Interval from OLMConfig: '%v'", o.SyncInterval.String())
} else {
log.Infof("Defaulting Interval to '%v'", DefaultWakeupInterval)
}
}

sourceProvider, err := provider.NewRegistryProvider(ctx, crClient, queueOperator, o.SyncInterval, o.GlobalNamespace)
if err != nil {
return err
}
Expand All @@ -239,3 +286,24 @@ func (o *PackageServerOptions) Run(ctx context.Context) error {

return err
}

func (op *Operator) syncOLMConfig(obj interface{}) error {
olmConfig, ok := obj.(*operatorsv1.OLMConfig)
if !ok {
return fmt.Errorf("casting OLMConfig failed")
}
// restart the pod on change
if olmConfig.Spec.Features == nil || olmConfig.Spec.Features.PackageServerSyncInterval == nil {
if op.options.SyncInterval != DefaultWakeupInterval {
log.Warnf("Change to olmConfig: '%v' != default '%v'", op.options.SyncInterval, DefaultWakeupInterval)
os.Exit(0)
}
} else {
if op.options.SyncInterval != olmConfig.Spec.Features.PackageServerSyncInterval.Duration {
log.Warnf("Change to olmConfig: old '%v' != new '%v'", op.options.SyncInterval, olmConfig.Spec.Features.PackageServerSyncInterval.Duration)
os.Exit(0)
}
}

return nil
}

0 comments on commit 25ed683

Please sign in to comment.