Skip to content

Commit

Permalink
Adding pubsub routing support (dapr#631)
Browse files Browse the repository at this point in the history
* Adding pubsub routing support

Signed-off-by: Artur Souza <[email protected]>

* remove deprecated classes (dapr#704)

* remove deprecated classes

Signed-off-by: Mukundan Sundararajan <[email protected]>

* remove builder class ref in ITs

Signed-off-by: Mukundan Sundararajan <[email protected]>

* fix setter in IT

Signed-off-by: Mukundan Sundararajan <[email protected]>
Signed-off-by: Artur Souza <[email protected]>

* Address PR comments.

Signed-off-by: Artur Souza <[email protected]>

Co-authored-by: Mukundan Sundararajan <[email protected]>
Co-authored-by: Artur Souza <[email protected]>
Signed-off-by: naah69 <[email protected]>
  • Loading branch information
3 people authored and naah69 committed Jul 7, 2022
1 parent 704fc20 commit 6358b27
Show file tree
Hide file tree
Showing 17 changed files with 523 additions and 26 deletions.
14 changes: 14 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ public class SubscriberController {
});
}

@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.examples.pubsub.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down Expand Up @@ -47,4 +48,23 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
});
}

/**
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
Expand All @@ -26,7 +27,6 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -85,21 +85,23 @@ private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embedde
continue;
}


Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
String match = embeddedValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route,
metadata);
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}

} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
throw new IllegalArgumentException("Error while parsing metadata: " + e);
}
}
}
Expand Down
42 changes: 28 additions & 14 deletions sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

package io.dapr.springboot;

import java.util.ArrayList;
import java.util.HashSet;
import io.dapr.Rule;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Internal Singleton to handle Dapr configuration.
Expand All @@ -30,14 +31,9 @@ class DaprRuntime {
private static volatile DaprRuntime instance;

/**
* List of subscribed topics.
* Map of subscription builders.
*/
private final Set<String> subscribedTopics = new HashSet<>();

/**
* List of subscriptions.
*/
private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();

/**
* Private constructor to make this singleton.
Expand Down Expand Up @@ -67,20 +63,38 @@ public static DaprRuntime getInstance() {
*
* @param pubsubName Pubsub name to subcribe to.
* @param topicName Name of the topic being subscribed to.
* @param rule The optional rule for this route.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);

DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}

if (match.length() > 0) {
builder.addRule(route, match, priority);
} else {
builder.setDefaultPath(route);
}

if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
}

public synchronized DaprTopicSubscription[] listSubscribedTopics() {
return this.subscriptions.toArray(new DaprTopicSubscription[0]);
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DaprSubscriptionBuilder {
private final String pubsubName;
private final String topic;
private final List<TopicRule> rules;
private String defaultPath;
private Map<String, String> metadata;

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
*/
DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
this.rules = new ArrayList<>();
this.defaultPath = null;
this.metadata = Collections.emptyMap();
}

/**
* Sets the default path for the subscription.
* @param path The default path.
* @return this instance.
*/
DaprSubscriptionBuilder setDefaultPath(String path) {
if (defaultPath != null) {
throw new RuntimeException(
String.format(
"a default route is already set for topic %s on pubsub %s",
this.topic, this.pubsubName));
}
defaultPath = path;
return this;
}

/**
* Adds a rule to the subscription.
* @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule.
* @return this instance.
*/
public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException(
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
}
rules.add(new TopicRule(path, match, priority));
return this;
}

/**
* Sets the metadata for the subscription.
* @param metadata The metadata.
* @return this instance.
*/
public DaprSubscriptionBuilder setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}

/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
* @return The DaprTopicSubscription.
*/
public DaprTopicSubscription build() {
String route = null;
DaprTopicRoutes routes = null;

if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath);
} else {
route = defaultPath;
}

return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata);
}

private static class TopicRule {
private final String path;
private final String match;
private final int priority;

public TopicRule(String path, String match, int priority) {
this.path = path;
this.match = match;
this.priority = priority;
}

public int getPriority() {
return priority;
}
}
}
51 changes: 51 additions & 0 deletions sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import java.util.Objects;

class DaprTopicKey {
private final String pubsubName;
private final String topic;

DaprTopicKey(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
}

public String getPubsubName() {
return pubsubName;
}

public String getTopic() {
return topic;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DaprTopicKey that = (DaprTopicKey) o;
return pubsubName.equals(that.pubsubName) && topic.equals(that.topic);
}

@Override
public int hashCode() {
return Objects.hash(pubsubName, topic);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Optional;

class DaprTopicRoutes {
private final List<DaprTopicRule> rules;
@JsonProperty("default")
private final String defaultRoute;

DaprTopicRoutes(List<DaprTopicRule> rules, String defaultRoute) {
this.rules = rules;
this.defaultRoute = defaultRoute;
}

public List<DaprTopicRule> getRules() {
return rules;
}

public String getDefaultRoute() {
return defaultRoute;
}
}
Loading

0 comments on commit 6358b27

Please sign in to comment.