Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Move the common AllowedRegionPolicy computation to dataresidency/defa…
Browse files Browse the repository at this point in the history
…ult.go
  • Loading branch information
Jimmy Lin committed Sep 16, 2020
1 parent 2a5488d commit a61b0d5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 11 deletions.
21 changes: 21 additions & 0 deletions pkg/apis/configs/dataresidency/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package dataresidency

import (
"cloud.google.com/go/pubsub"
)

// Defaults includes the default values to be populated by the Webhook.
type Defaults struct {
// ClusterDefaults are the data residency defaults to use for all namepaces
Expand All @@ -39,6 +43,23 @@ func (d *Defaults) scoped() *ScopedDefaults {
return scopedDefaults
}

// AllowedPersistenceRegions gets the AllowedPersistenceRegions setting in the default.
func (d *Defaults) AllowedPersistenceRegions() []string {
return d.scoped().AllowedPersistenceRegions
}

// ComputeAllowedPersistenceRegions computes the final message storage policy in
// topicConfig. Return true if the topicConfig is updated.
func (d *Defaults) ComputeAllowedPersistenceRegions(topicConfig *pubsub.TopicConfig) bool {
// We can do subset of both in the future, but for now, we just overwrite the
// configuration as the relationship between region and zones are not clear to handle,
// eg. us-east1 vs us-east1-a. Important note: setting the AllowedPersistenceRegions
// to empty string slice is an error, should set it to nil for all regions.
allowedRegions := d.AllowedPersistenceRegions()
if allowedRegions == nil || len(allowedRegions) == 0 {
return false
}

topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
return true
}
9 changes: 5 additions & 4 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,11 @@ func (r *Reconciler) reconcileDecouplingTopicAndSubscription(ctx context.Context
// Check if topic exists, and if not, create it.
topicID := resources.GenerateDecouplingTopicName(b)
topicConfig := &pubsub.TopicConfig{Labels: labels}
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 {
topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig))
if r.dataresidencyStore != nil {
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) {
logging.FromContext(ctx).Info("Updated Topic Config for Broker", zap.Any("topicConfig", *topicConfig))
}
}
}
topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, b, &b.Status)
Expand Down
4 changes: 1 addition & 3 deletions pkg/reconciler/intevents/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,7 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, topic *v1.Topic) error
topicConfig := &pubsub.TopicConfig{}
if r.dataresidencyStore != nil {
if dataresidencyCfg := r.dataresidencyStore.Load(); dataresidencyCfg != nil {
// Empty region list is an error in pubsub, we will left it unset in this case which means all regions
if allowedRegions := dataresidencyCfg.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 {
topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
if dataresidencyCfg.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) {
r.Logger.Infow("Updated Topic Config", zap.Any("topicConfig", *topicConfig))
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,11 @@ func (r *Reconciler) reconcileRetryTopicAndSubscription(ctx context.Context, tri
// Check if topic exists, and if not, create it.
topicID := resources.GenerateRetryTopicName(trig)
topicConfig := &pubsub.TopicConfig{Labels: labels}
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if allowedRegions := dataresidencyConfig.DataResidencyDefaults.AllowedPersistenceRegions(); len(allowedRegions) != 0 {
topicConfig.MessageStoragePolicy.AllowedPersistenceRegions = allowedRegions
logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig))
if r.dataresidencyStore != nil {
if dataresidencyConfig := r.dataresidencyStore.Load(); dataresidencyConfig != nil {
if dataresidencyConfig.DataResidencyDefaults.ComputeAllowedPersistenceRegions(topicConfig) {
logging.FromContext(ctx).Info("Updated Topic Config for Trigger", zap.Any("topicConfig", *topicConfig))
}
}
}
topic, err := pubsubReconciler.ReconcileTopic(ctx, topicID, topicConfig, trig, &trig.Status)
Expand Down

0 comments on commit a61b0d5

Please sign in to comment.