Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Add data residency support to broker and trigger, not using injection
Browse files Browse the repository at this point in the history
  • Loading branch information
Jimmy Lin committed Sep 16, 2020
1 parent 6afc6d5 commit 2a5488d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
10 changes: 10 additions & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"fmt"

"github.com/google/knative-gcp/pkg/apis/configs/dataresidency"

"cloud.google.com/go/pubsub"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -56,6 +58,8 @@ type Reconciler struct {

// pubsubClient is used as the Pubsub client when present.
pubsubClient *pubsub.Client

dataresidencyStore *dataresidency.Store
}

// Check that Reconciler implements Interface
Expand Down Expand Up @@ -145,6 +149,12 @@ func (r *Reconciler) reconcileDecouplingTopicAndSubscription(ctx context.Context
// Check if topic exists, and if not, create it.
topicID := resources.GenerateDecouplingTopicName(b)
topicConfig := &pubsub.TopicConfig{Labels: labels}
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 {
topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig))
}
}
topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, b, &b.Status)
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"os"

"github.com/google/knative-gcp/pkg/apis/configs/dataresidency"

"cloud.google.com/go/pubsub"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -72,10 +74,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
}()
}

dataresidencySingleton := &dataresidency.StoreSingleton{}

r := &Reconciler{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
brokerCellLister: bcInformer.Lister(),
pubsubClient: client,
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
brokerCellLister: bcInformer.Lister(),
pubsubClient: client,
dataresidencyStore: dataresidencySingleton.Store(ctx, cmw),
}

impl := brokerreconciler.NewImpl(ctx, r, brokerv1beta1.BrokerClass)
Expand Down
12 changes: 7 additions & 5 deletions pkg/reconciler/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/pkg/resolver"

brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
"github.com/google/knative-gcp/pkg/apis/configs/dataresidency"
brokerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/broker"
triggerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/trigger"
triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger"
Expand Down Expand Up @@ -90,12 +91,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
client.Close()
}()
}

dataresidencySingleton := &dataresidency.StoreSingleton{}
r := &Reconciler{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
brokerLister: brokerinformer.Get(ctx).Lister(),
pubsubClient: client,
projectID: projectID,
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
brokerLister: brokerinformer.Get(ctx).Lister(),
pubsubClient: client,
projectID: projectID,
dataresidencyStore: dataresidencySingleton.Store(ctx, cmw),
}

impl := triggerreconciler.NewImpl(ctx, r, withAgentAndFinalizer)
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"cloud.google.com/go/pubsub"
brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1"
"github.com/google/knative-gcp/pkg/apis/configs/dataresidency"
triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger"
brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1"
metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
Expand Down Expand Up @@ -81,6 +82,8 @@ type Reconciler struct {

// pubsubClient is used as the Pubsub client when present.
pubsubClient *pubsub.Client

dataresidencyStore *dataresidency.Store
}

// Check that TriggerReconciler implements Interface
Expand Down Expand Up @@ -223,6 +226,12 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri
// Check if topic exists, and if not, create it.
topicID := resources.GenerateRetryTopicName(trig)
topicConfig := &pubsub.TopicConfig{Labels: labels}
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 {
topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig))
}
}
topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, trig, &trig.Status)
if err != nil {
return err
Expand Down

0 comments on commit 2a5488d

Please sign in to comment.