diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index a234af371b..35ecc4d886 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -22,6 +22,8 @@ import ( "context" "fmt" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" + "cloud.google.com/go/pubsub" "go.uber.org/multierr" "go.uber.org/zap" @@ -56,6 +58,8 @@ type Reconciler struct { // pubsubClient is used as the Pubsub client when present. pubsubClient *pubsub.Client + + dataresidencyStore *dataresidency.Store } // Check that Reconciler implements Interface @@ -145,6 +149,12 @@ func (r *Reconciler) reconcileDecouplingTopicAndSubscription(ctx context.Context // Check if topic exists, and if not, create it. topicID := resources.GenerateDecouplingTopicName(b) topicConfig := &pubsub.TopicConfig{Labels: labels} + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 { + topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions + logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig)) + } + } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, b, &b.Status) if err != nil { return err diff --git a/pkg/reconciler/broker/controller.go b/pkg/reconciler/broker/controller.go index e022dc0263..7ba6705583 100644 --- a/pkg/reconciler/broker/controller.go +++ b/pkg/reconciler/broker/controller.go @@ -20,6 +20,8 @@ import ( "context" "os" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" + "cloud.google.com/go/pubsub" "go.uber.org/zap" "k8s.io/apimachinery/pkg/labels" @@ -72,10 +74,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl }() } + dataresidencySingleton := &dataresidency.StoreSingleton{} + r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - brokerCellLister: bcInformer.Lister(), - pubsubClient: client, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + brokerCellLister: bcInformer.Lister(), + pubsubClient: client, + dataresidencyStore: dataresidencySingleton.Store(ctx, cmw), } impl := brokerreconciler.NewImpl(ctx, r, brokerv1beta1.BrokerClass) diff --git a/pkg/reconciler/trigger/controller.go b/pkg/reconciler/trigger/controller.go index 72762341b8..c651bf1a61 100644 --- a/pkg/reconciler/trigger/controller.go +++ b/pkg/reconciler/trigger/controller.go @@ -38,6 +38,7 @@ import ( "knative.dev/pkg/resolver" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" brokerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/broker" triggerinformer "github.com/google/knative-gcp/pkg/client/injection/informers/broker/v1beta1/trigger" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" @@ -90,12 +91,13 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl client.Close() }() } - + dataresidencySingleton := &dataresidency.StoreSingleton{} r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - brokerLister: brokerinformer.Get(ctx).Lister(), - pubsubClient: client, - projectID: projectID, + Base: reconciler.NewBase(ctx, controllerAgentName, cmw), + brokerLister: brokerinformer.Get(ctx).Lister(), + pubsubClient: client, + projectID: projectID, + dataresidencyStore: dataresidencySingleton.Store(ctx, cmw), } impl := triggerreconciler.NewImpl(ctx, r, withAgentAndFinalizer) diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 9a721550ee..7024d66e88 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -36,6 +36,7 @@ import ( "cloud.google.com/go/pubsub" brokerv1beta1 "github.com/google/knative-gcp/pkg/apis/broker/v1beta1" + "github.com/google/knative-gcp/pkg/apis/configs/dataresidency" triggerreconciler "github.com/google/knative-gcp/pkg/client/injection/reconciler/broker/v1beta1/trigger" brokerlisters "github.com/google/knative-gcp/pkg/client/listers/broker/v1beta1" metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" @@ -81,6 +82,8 @@ type Reconciler struct { // pubsubClient is used as the Pubsub client when present. pubsubClient *pubsub.Client + + dataresidencyStore *dataresidency.Store } // Check that TriggerReconciler implements Interface @@ -223,6 +226,12 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri // Check if topic exists, and if not, create it. topicID := resources.GenerateRetryTopicName(trig) topicConfig := &pubsub.TopicConfig{Labels: labels} + if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil { + if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 { + topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions + logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig)) + } + } topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, trig, &trig.Status) if err != nil { return err