Skip to content

Commit

Permalink
feat: add property to customize universe domain in Pub/Sub (#3348)
Browse files Browse the repository at this point in the history
* feat: add property to customize universe domain in Pub/Sub
  • Loading branch information
mpeddada1 authored Nov 1, 2024
1 parent f2212d3 commit 9cf2145
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ public PublisherFactory defaultPublisherFactory(
batchingSettings.ifAvailable(factory::setBatchingSettings);
factory.setEnableMessageOrdering(gcpPubSubProperties.getPublisher().getEnableMessageOrdering());
factory.setEndpoint(gcpPubSubProperties.getPublisher().getEndpoint());
factory.setUniverseDomain(gcpPubSubProperties.getPublisher().getUniverseDomain());

List<PublisherCustomizer> customizers = customizersProvider.orderedStream()
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,56 @@ void flowControlSettings_multipleKeysForSameSubscription_firstOneUsed(CapturedOu
});
}

@Test
void subscriberUniverseDomain_selectiveConfigurationSet() {
contextRunner
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);

assertThat(
gcpPubSubProperties.computeSubscriberUniverseDomain(
"subscription-name", projectIdProvider.getProjectId()))
.isEqualTo("example.com");
});
}

@Test
void subscriberUniverseDomain_globalAndSelectiveConfigurationSet_selectiveTakesPrecedence() {
contextRunner
.withPropertyValues(
"spring.cloud.gcp.pubsub.subscriber.universe-domain=example1.com",
"spring.cloud.gcp.pubsub.subscription.subscription-name.universe-domain=example2.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
GcpProjectIdProvider projectIdProvider = ctx.getBean(GcpProjectIdProvider.class);

assertThat(
gcpPubSubProperties.computeSubscriberUniverseDomain(
"subscription-name", projectIdProvider.getProjectId()))
.isEqualTo("example2.com");
});
}

@Test
void publisherUniverseDomain() {
contextRunner
.withPropertyValues("spring.cloud.gcp.pubsub.publisher.universe-domain=example.com")
.run(
ctx -> {
GcpPubSubProperties gcpPubSubProperties = ctx.getBean(GcpPubSubProperties.class);
CachingPublisherFactory publisherFactory =
ctx.getBean("defaultPublisherFactory", CachingPublisherFactory.class);
assertThat(gcpPubSubProperties.getPublisher().getUniverseDomain())
.isEqualTo("example.com");
assertThat(publisherFactory)
.hasFieldOrPropertyWithValue("delegate.universeDomain", "example.com");
});
}

@Configuration
static class CustomizerConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ public String computePullEndpoint(String subscriptionName, String projectId) {
return pullEndpoint != null ? pullEndpoint : this.globalSubscriber.getPullEndpoint();
}

/**
* Returns the universe domain. The subscription-specific property takes precedence if both global
* and subscription-specific properties are set. If subscription-specific configuration is not set
* then the global configuration is picked.
*
* @param subscriptionName subscription name
* @param projectId project id
* @return pull endpoint
*/
public String computeSubscriberUniverseDomain(String subscriptionName, String projectId) {
String universeDomain =
getSubscriptionProperties(PubSubSubscriptionUtils.toProjectSubscriptionName(subscriptionName, projectId))
.getUniverseDomain();
return universeDomain != null ? universeDomain : this.globalSubscriber.getUniverseDomain();
}

/**
* Computes the retry settings. The subscription-specific property takes precedence if both global
* and subscription-specific properties are set. If subscription-specific settings are not set
Expand Down Expand Up @@ -384,6 +400,8 @@ public static class Publisher {
/** Set publisher endpoint. Example: "us-east1-pubsub.googleapis.com:443". */
private String endpoint;

private String universeDomain;

public Batching getBatching() {
return this.batching;
}
Expand Down Expand Up @@ -441,6 +459,14 @@ public String getEndpoint() {
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public String getUniverseDomain() {
return universeDomain;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}
}

/** Subscriber settings. */
Expand Down Expand Up @@ -487,6 +513,12 @@ public static class Subscriber {
/** RPC status codes that should be retried when pulling messages. */
private Code[] retryableCodes = null;

/**
* Universe domain of the client which is part of the endpoint that is formatted as
* `${service}.${universeDomain}:${port}`.
*/
private String universeDomain;

public String getFullyQualifiedName() {
return fullyQualifiedName;
}
Expand Down Expand Up @@ -571,6 +603,14 @@ public int getMaxAcknowledgementThreads() {
public void setMaxAcknowledgementThreads(int maxAcknowledgementThreads) {
this.maxAcknowledgementThreads = maxAcknowledgementThreads;
}

public String getUniverseDomain() {
return universeDomain;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}
}

/** Health Check settings. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class DefaultPublisherFactory implements PublisherFactory {

private String endpoint;

private String universeDomain;

private List<PublisherCustomizer> customizers;

/**
Expand Down Expand Up @@ -151,6 +153,10 @@ public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}

/**
* Accepts a list of {@link Publisher.Builder} customizers.
* The customizers are applied in the order provided, so the later customizers can override
Expand Down Expand Up @@ -222,6 +228,10 @@ void applyPublisherSettings(Publisher.Builder publisherBuilder) {
if (this.endpoint != null) {
publisherBuilder.setEndpoint(this.endpoint);
}

if (this.universeDomain != null) {
publisherBuilder.setUniverseDomain(this.universeDomain);
}
}

void applyCustomizers(Publisher.Builder publisherBuilder, String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class DefaultSubscriberFactory implements SubscriberFactory {

private String pullEndpoint;

private String universeDomain;

private ApiClock apiClock;

private RetrySettings subscriberStubRetrySettings;
Expand Down Expand Up @@ -300,6 +302,12 @@ public Subscriber createSubscriber(String subscriptionName, MessageReceiver rece
subscriberBuilder.setParallelPullCount(pullCount);
}

String universeDomain = getUniverseDomain(subscriptionName);
if (universeDomain != null) {
subscriberBuilder.setUniverseDomain(universeDomain);
}


Subscriber subscriber = subscriberBuilder.build();

if (shouldAddToHealthCheck) {
Expand Down Expand Up @@ -557,6 +565,13 @@ public Code[] getRetryableCodes(String subscriptionName) {
return this.pubSubConfiguration.computeRetryableCodes(subscriptionName, projectId);
}

String getUniverseDomain(String subscriptionName) {
if (this.universeDomain != null) {
return this.universeDomain;
}
return this.pubSubConfiguration.computeSubscriberUniverseDomain(subscriptionName, projectId);
}

public void setExecutorProviderMap(Map<ProjectSubscriptionName, ExecutorProvider> executorProviderMap) {
this.executorProviderMap = executorProviderMap;
}
Expand All @@ -582,6 +597,10 @@ public void setRetrySettingsMap(Map<ProjectSubscriptionName, RetrySettings> retr
this.retrySettingsMap = retrySettingsMap;
}

public void setUniverseDomain(String universeDomain) {
this.universeDomain = universeDomain;
}

public void setGlobalRetrySettings(RetrySettings retrySettings) {
this.globalRetrySettings = retrySettings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ void testCreateSubscriber_validateSetProperties() {
when(mockPubSubConfiguration.computePullEndpoint(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn("test.endpoint");
when(mockPubSubConfiguration.computeSubscriberUniverseDomain(
"defaultSubscription", projectIdProvider.getProjectId()))
.thenReturn("example.com");

Subscriber expectedSubscriber =
factory.createSubscriber("defaultSubscription", (message, consumer) -> {});
Expand All @@ -385,7 +388,8 @@ void testCreateSubscriber_validateSetProperties() {
.hasFieldOrPropertyWithValue("minDurationPerAckExtension", Duration.ofSeconds(3L))
.hasFieldOrPropertyWithValue("maxDurationPerAckExtension", Duration.ofSeconds(4L))
.hasFieldOrPropertyWithValue("numPullers", 2)
.hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint");
.hasFieldOrPropertyWithValue("subStubSettings.endpoint", "test.endpoint")
.hasFieldOrPropertyWithValue("subStubSettings.universeDomain", "example.com");
}

@Test
Expand Down

0 comments on commit 9cf2145

Please sign in to comment.