Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding pubsub routing support #631

Merged
merged 4 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ public class SubscriberController {
});
}

@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.examples.pubsub.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.PostMapping;
Expand Down Expand Up @@ -47,4 +48,23 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
});
}

/**
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
Expand All @@ -26,7 +27,6 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -85,21 +85,23 @@ private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embedde
continue;
}


Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
String match = embeddedValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route,
metadata);
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}

} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
throw new IllegalArgumentException("Error while parsing metadata: " + e);
}
}
}
Expand Down
42 changes: 28 additions & 14 deletions sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@

package io.dapr.springboot;

import java.util.ArrayList;
import java.util.HashSet;
import io.dapr.Rule;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Internal Singleton to handle Dapr configuration.
Expand All @@ -30,14 +31,9 @@ class DaprRuntime {
private static volatile DaprRuntime instance;

/**
* List of subscribed topics.
* Map of subscription builders.
*/
private final Set<String> subscribedTopics = new HashSet<>();

/**
* List of subscriptions.
*/
private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();

/**
* Private constructor to make this singleton.
Expand Down Expand Up @@ -67,20 +63,38 @@ public static DaprRuntime getInstance() {
*
* @param pubsubName Pubsub name to subcribe to.
* @param topicName Name of the topic being subscribed to.
* @param rule The optional rule for this route.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);

DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}

if (match.length() > 0) {
builder.addRule(route, match, priority);
} else {
builder.setDefaultPath(route);
}

if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
}

public synchronized DaprTopicSubscription[] listSubscribedTopics() {
return this.subscriptions.toArray(new DaprTopicSubscription[0]);
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DaprSubscriptionBuilder {
private final String pubsubName;
private final String topic;
private final List<TopicRule> rules;
private String defaultPath;
private Map<String, String> metadata;

/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
*/
DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
this.rules = new ArrayList<>();
this.defaultPath = null;
this.metadata = Collections.emptyMap();
}

/**
* Sets the default path for the subscription.
* @param path The default path.
* @return this instance.
*/
DaprSubscriptionBuilder setDefaultPath(String path) {
if (defaultPath != null) {
throw new RuntimeException(
String.format(
"a default route is already set for topic %s on pubsub %s",
this.topic, this.pubsubName));
}
defaultPath = path;
return this;
}

/**
* Adds a rule to the subscription.
* @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule.
* @return this instance.
*/
public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException(
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
}
rules.add(new TopicRule(path, match, priority));
return this;
}

/**
* Sets the metadata for the subscription.
* @param metadata The metadata.
* @return this instance.
*/
public DaprSubscriptionBuilder setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}

/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
* @return The DaprTopicSubscription.
*/
public DaprTopicSubscription build() {
String route = null;
DaprTopicRoutes routes = null;

if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath);
} else {
route = defaultPath;
}

return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata);
}

private static class TopicRule {
private final String path;
private final String match;
private final int priority;

public TopicRule(String path, String match, int priority) {
this.path = path;
this.match = match;
this.priority = priority;
}

public int getPriority() {
return priority;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import java.util.Objects;

class DaprTopicKey {
private final String pubsubName;
private final String topic;

DaprTopicKey(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
}

public String getPubsubName() {
return pubsubName;
}

public String getTopic() {
return topic;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DaprTopicKey that = (DaprTopicKey) o;
return pubsubName.equals(that.pubsubName) && topic.equals(that.topic);
}

@Override
public int hashCode() {
return Objects.hash(pubsubName, topic);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.springboot;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Optional;

class DaprTopicRoutes {
private final List<DaprTopicRule> rules;
@JsonProperty("default")
private final String defaultRoute;

DaprTopicRoutes(List<DaprTopicRule> rules, String defaultRoute) {
this.rules = rules;
this.defaultRoute = defaultRoute;
}

public List<DaprTopicRule> getRules() {
return rules;
}

public String getDefaultRoute() {
return defaultRoute;
}
}
Loading