Skip to content

Commit

Permalink
SSE asyncapi server, client (#1085)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored Jun 18, 2024
1 parent bface68 commit 4000687
Show file tree
Hide file tree
Showing 32 changed files with 9,796 additions and 90 deletions.
12 changes: 10 additions & 2 deletions runtime/binding-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-sse</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-kafka</artifactId>
Expand Down Expand Up @@ -187,7 +193,7 @@
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core mqtt http kafka asyncapi</scopeNames>
<scopeNames>core mqtt http sse kafka asyncapi</scopeNames>
<packageName>io.aklivity.zilla.runtime.binding.asyncapi.internal.types</packageName>
</configuration>
<executions>
Expand Down Expand Up @@ -222,11 +228,13 @@
<includes>io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.schema.patch.json,
io/aklivity/zilla/specs/binding/mqtt/kafka/schema/mqtt.kafka.schema.patch.json,
io/aklivity/zilla/specs/binding/http/schema/http.schema.patch.json,
io/aklivity/zilla/specs/binding/sse/schema/sse.schema.patch.json,
io/aklivity/zilla/specs/binding/kafka/schema/kafka.schema.patch.json,
io/aklivity/zilla/specs/binding/tcp/schema/tcp.schema.patch.json,
io/aklivity/zilla/specs/binding/tls/schema/tls.schema.patch.json,
io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.2.6.0.schema.json,
io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.3.0.1.schema.json
io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.3.0.1.schema.json,
io/aklivity/zilla/specs/binding/asyncapi/schema/asyncapi.3.0.2-zilla.schema.json
</includes>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@

public class AsyncapiParser
{
private static final Pattern VERSION_PATTERN = Pattern.compile("(\\d\\.\\d)\\.\\d+");
private static final Pattern VERSION_PATTERN = Pattern.compile("(?!\\.)(\\d+(\\.\\d+)+)(?:[-.][a-zA-Z]+)?(?![\\d.])$");

private final Map<String, JsonSchema> schemas;

public AsyncapiParser()
{
Map<String, JsonSchema> schemas = new Object2ObjectHashMap<>();
schemas.put("2.6", schema("2.6.0"));
schemas.put("3.0", schema("3.0.1"));
schemas.put("2.6.0", schema("2.6.0"));
schemas.put("3.0.0", schema("3.0.1"));
schemas.put("3.0.2-zilla", schema("3.0.2-zilla"));
this.schemas = unmodifiableMap(schemas);
}

Expand Down Expand Up @@ -115,8 +116,7 @@ private String detectAsyncApiVersion(
final String versionString = json.getString("asyncapi");
final Matcher matcher = VERSION_PATTERN.matcher(versionString);

final String majorMinorVersion = matcher.matches() ? matcher.group(1) : null;
return majorMinorVersion;
return matcher.matches() ? matcher.group(0) : null;
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static io.aklivity.zilla.runtime.engine.config.KindConfig.CACHE_CLIENT;
import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY;
import static io.aklivity.zilla.runtime.engine.config.KindConfig.SERVER;
import static java.util.stream.Collectors.toList;

import java.util.ArrayList;
Expand Down Expand Up @@ -197,11 +198,32 @@ public void attach(
{
Integer k = entry.getKey();
NamespaceConfig v = entry.getValue();
List<BindingConfig> bindings = v.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") ||
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
b.type.equals("http-kafka"))
.collect(toList());
List<BindingConfig> bindings;
boolean containsSse = v.bindings.stream().anyMatch(b -> b.type.equals("sse"));
if (containsSse)
{
if (binding.kind.equals(SERVER))
{
bindings = v.bindings.stream()
.filter(b -> b.type.equals("http") || b.type.equals("http-kafka") || b.type.equals("sse"))
.collect(toList());
}
else
{
bindings = v.bindings.stream()
.filter(b -> b.type.equals("sse"))
.collect(toList());
}
}
else
{
bindings = v.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") || b.type.equals("sse") ||
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
b.type.equals("http-kafka"))
.collect(toList());
}

extractResolveId(k, bindings);
extractNamespace(k, bindings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ public NamespaceConfig generate(
BindingConfig binding,
AsyncapiNamespaceConfig namespaceConfig)
{
List<AsyncapiServerView> servers = namespaceConfig.servers;
AsyncapiOptionsConfig options = binding.options != null ? (AsyncapiOptionsConfig) binding.options : EMPTY_OPTION;
final List<AsyncapiServerView> servers = namespaceConfig.servers;
final AsyncapiOptionsConfig options = binding.options != null ? (AsyncapiOptionsConfig) binding.options : EMPTY_OPTION;
final List<MetricRefConfig> metricRefs = binding.telemetryRef != null ?
binding.telemetryRef.metricRefs : emptyList();

//TODO: keep it until we support different protocols on the same composite binding
AsyncapiServerView serverView = servers.get(0);
this.protocol = serverView.getAsyncapiProtocol();
int[] compositeSecurePorts = resolvePorts(servers, true);
this.isTlsEnabled = compositeSecurePorts.length > 0;

Expand All @@ -48,24 +45,70 @@ public NamespaceConfig generate(
.name(String.format("%s.%s-%s", qname, "$composite", namespace))
.inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty()))
.inject(n -> this.injectCatalog(n, namespaceConfig.asyncapis))
.inject(n -> protocol.injectProtocolClientCache(n, metricRefs))
.binding()
.name(String.format("%s_client0", protocol.scheme))
.type(protocol.scheme)
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, protocol.scheme))
.inject(protocol::injectProtocolClientOptions)
.exit(isTlsEnabled ? "tls_client0" : "tcp_client0")
.build()
.inject(n -> injectTlsClient(n, options, metricRefs))
.inject(n -> this.injectProtocolClients(n, servers, metricRefs))
.inject(n -> this.injectProtocolRelatedBindings(n, servers, metricRefs))
.inject(n -> this.injectTlsClient(n, options, metricRefs))
.inject(n -> this.injectTcpClient(n, servers, options, metricRefs))
.build();
}

private <C> NamespaceConfigBuilder<C> injectTcpClient(
NamespaceConfigBuilder<C> namespace,
List<AsyncapiServerView> servers,
AsyncapiOptionsConfig options,
List<MetricRefConfig> metricRefs)
{
for (AsyncapiServerView server : servers)
{
final AsyncapiProtocol protocol = server.getAsyncapiProtocol();
namespace = namespace
.binding()
.name("tcp_client0")
.type("tcp")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tcp"))
.inject(b -> this.injectMetrics(b, metricRefs))
.options(!protocol.scheme.equals(AyncapiKafkaProtocol.SCHEME) ? options.tcp : null)
.build()
.build();
}

return namespace;
}

private <C> NamespaceConfigBuilder<C> injectProtocolClients(
NamespaceConfigBuilder<C> namespace,
List<AsyncapiServerView> servers,
List<MetricRefConfig> metricRefs)
{
for (AsyncapiServerView server : servers)
{
final AsyncapiProtocol protocol = server.getAsyncapiProtocol();
final String scheme = protocol.scheme;
final String exit = "sse".equals(scheme) ? "http_client0" : isTlsEnabled ? "tls_client0" : "tcp_client0";
namespace = namespace
.inject(n -> protocol.injectProtocolClientCache(n, metricRefs))
.binding()
.name(String.format("%s_client0", scheme))
.type(scheme)
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs))
.inject(protocol::injectProtocolClientOptions)
.exit(exit)
.build();
}
return namespace;
}

protected <C> NamespaceConfigBuilder<C> injectProtocolRelatedBindings(
NamespaceConfigBuilder<C> namespace,
List<AsyncapiServerView> servers,
List<MetricRefConfig> metricRefs)
{
for (AsyncapiServerView server : servers)
{
final AsyncapiProtocol protocol = server.getAsyncapiProtocol();
namespace = protocol.injectProtocolRelatedClientBindings(namespace, metricRefs, isTlsEnabled);
}
return namespace;
}

private <C> NamespaceConfigBuilder<C> injectTlsClient(
Expand All @@ -80,7 +123,7 @@ private <C> NamespaceConfigBuilder<C> injectTlsClient(
.name("tls_client0")
.type("tls")
.kind(CLIENT)
.inject(b -> this.injectMetrics(b, metricRefs, "tls"))
.inject(b -> this.injectMetrics(b, metricRefs))
.options(options.tls)
.vault(String.format("%s:%s", this.namespace, vault))
.exit("tcp_client0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,40 @@ public <C> BindingConfigBuilder<C> injectProtocolServerRoutes(
for (Map.Entry<String, AsyncapiServer> entry : asyncapi.servers.entrySet())
{
AsyncapiServerView server = AsyncapiServerView.of(entry.getValue());
for (String name : asyncapi.operations.keySet())
if ("http".equals(server.protocol()))
{
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address().replaceAll("\\{[^}]+\\}", "*");
String method = operation.bindings.get("http").method;
binding
.route()
.exit(qname)
.when(HttpConditionConfig::builder)
.header(":path", path)
.header(":method", method)
.build()
.inject(route -> injectHttpServerRouteGuarded(route, server))
.build();
for (String name : asyncapi.operations.keySet())
{
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address().replaceAll("\\{[^}]+\\}", "*");
String method = operation.bindings.get("http").method;
binding
.route()
.exit(qname)
.when(HttpConditionConfig::builder)
.header(":path", path)
.header(":method", method)
.build()
.inject(route -> injectHttpServerRouteGuarded(route, server))
.build();
}
}
else if ("sse".equals(server.protocol()))
{
for (String name : asyncapi.operations.keySet())
{
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address().replaceAll("\\{[^}]+\\}", "*");
binding
.route()
.exit("sse_server0")
.when(HttpConditionConfig::builder)
.header(":path", path)
.build()
.build();
}
}
}
}
Expand Down Expand Up @@ -154,10 +173,11 @@ private <C> HttpOptionsConfigBuilder<C> injectHttpServerRequests(
AsyncapiOperation operation = asyncapi.operations.get(name);
AsyncapiChannelView channel = AsyncapiChannelView.of(asyncapi.channels, operation.channel);
String path = channel.address();
Method method = Method.valueOf(operation.bindings.get("http").method);
if (channel.messages() != null && !channel.messages().isEmpty() ||

if (operation.bindings != null && channel.messages() != null && !channel.messages().isEmpty() ||
channel.parameters() != null && !channel.parameters().isEmpty())
{
Method method = Method.valueOf(operation.bindings.get("http").method);
options
.request()
.path(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.MINIMIZE_QUOTES;
import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;
import static io.aklivity.zilla.runtime.common.feature.FeatureFilter.featureEnabled;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

Expand Down Expand Up @@ -67,7 +68,6 @@ public abstract class AsyncapiNamespaceGenerator

protected Map<String, Asyncapi> asyncapis;
protected boolean isTlsEnabled;
protected AsyncapiProtocol protocol;
protected String qname;
protected String namespace;
protected String qvault;
Expand Down Expand Up @@ -104,7 +104,7 @@ protected AsyncapiProtocol resolveProtocol(
List<Asyncapi> asyncapis,
List<AsyncapiServerView> servers)
{
Pattern pattern = Pattern.compile("(http|mqtt|kafka)");
Pattern pattern = Pattern.compile("(http|sse|mqtt|kafka)");
Matcher matcher = pattern.matcher(protocolName);
AsyncapiProtocol protocol = null;
if (matcher.find())
Expand All @@ -114,6 +114,14 @@ protected AsyncapiProtocol resolveProtocol(
case "http":
protocol = new AsyncapiHttpProtocol(qname, asyncapis, options, protocolName);
break;
case "sse":
case "secure-sse":
if (featureEnabled(AsyncapiSseProtocol.class))
{
final boolean httpServerAvailable = servers.stream().anyMatch(s -> "http".equals(s.protocol()));
protocol = new AsyncapiSseProtocol(qname, httpServerAvailable, asyncapis, options, protocolName);
}
break;
case "mqtt":
protocol = new AsyncapiMqttProtocol(qname, asyncapis, options, protocolName, namespace);
break;
Expand Down Expand Up @@ -187,6 +195,20 @@ public int[] resolvePorts(
return ports;
}

public int[] resolvePortForServer(
AsyncapiServerView server,
boolean secure)
{
int[] ports = {};

if (server.getAsyncapiProtocol().isSecure() == secure)
{
ports = new int[] { server.getPort() };
}

return ports;
}

protected <C> NamespaceConfigBuilder<C> injectCatalog(
NamespaceConfigBuilder<C> namespace,
List<Asyncapi> asyncapis)
Expand Down Expand Up @@ -276,8 +298,7 @@ protected static String writeSchemaYaml(

protected <C> BindingConfigBuilder<C> injectMetrics(
BindingConfigBuilder<C> binding,
List<MetricRefConfig> metricRefs,
String protocol)
List<MetricRefConfig> metricRefs)
{
List<MetricRefConfig> metrics = metricRefs.stream()
.filter(m -> m.name.startsWith("stream."))
Expand Down
Loading

0 comments on commit 4000687

Please sign in to comment.