Skip to content

Commit

Permalink
Merge pull request #34716 from quarkusio/dependabot/maven/smallrye-re…
Browse files Browse the repository at this point in the history
…active-messaging.version-4.8.0

Bump smallrye-reactive-messaging.version from 4.7.0 to 4.8.0
  • Loading branch information
cescoffier authored Jul 16, 2023
2 parents b0c2f02 + d3c039b commit 35b1b74
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.5.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.7.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.8.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.3.1</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
85 changes: 84 additions & 1 deletion docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,12 @@ The Quarkus Pulsar extension supports 4 strategies:
The negative acknowledgment can be further configured using `negativeAckRedeliveryDelayMicros` and `negativeAck.redeliveryBackoff` properties.
- `fail` fail the application, no more messages will be processed.
- `ignore` the failure is logged, but the acknowledgement strategy will be applied and the processing will continue.
- `continue` the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with <<ack-timeout>> configuration.
- `reconsume-later` sends the message to the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic[retry letter topic] using the `reconsumeLater` API to be reconsumed with a delay.
The delay can be configured using the `reconsumeLater.delay` property and defaults to 3 seconds.
Custom delay or properties per message can be configured by adding an instance of `io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata` to the failure metadata.

[[ack-timeout]]
==== Acknowledgement timeout

Similar to the negative acknowledgement, with the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#acknowledgment-timeout[acknowledgement timeout] mechanism, the Pulsar client tracks the unacknowledged messages,
Expand All @@ -307,7 +309,7 @@ The `ackTimeout.redeliveryBackoff` value accepts comma separated values of min d

[source, properties]
----
mp.messaging.incoming.out.failure-strategy=ignore
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
----
Expand Down Expand Up @@ -1078,6 +1080,87 @@ Note that the topic configuration needs to reference full name of topics:
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
----

==== Configuring access to StreamNative Cloud

StreamNative Cloud is a fully managed Pulsar-as-a-Service available in different deployment options,
whether it is fully-hosted, on a public cloud but managed by StreamNative or self-managed on Kubernetes.

The StreamNative Pulsar clusters use Oauth2 authentication,
so you need to make sure that a https://docs.streamnative.io/docs/service-account[service account] exists with
required https://docs.streamnative.io/docs/access-control#authorize-namespaces[permissions to the Pulsar namespace/topic] your application is using.

Next, you need to download the **Key file** (which serves as **private key**) of the service account and note the **issuer URL** (typically `https://auth.streamnative.cloud/`)
and the **audience** (for example `urn:sn:pulsar:o-rf3ol:redhat`) for your cluster.
The **Pulsar Clients** page in the **Admin** section in the StreamNative Cloud console helps you with this process.

To configure your application with Pulsar Oauth2 authentication:

[source, properties]
----
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
----

Note that the `pulsar.client.authParams` configuration contains a Json string with `issuerUrl`, `audience` and the `privateKey` in the `data:application/json;base64,<base64-encoded-key-file>` format.

Alternatively you can configure the authentication programmatically:

[source, java]
----
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
----

This assumes that the key file is included to the application classpath as a resource, then the configuration would like the following:

[source, properties]
----
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
----

Note that channels using the client configuration identified with `pulsar-auth` need to set the `client-configuration` attribute.

[[pulsar-health-check]]
== Health Checks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public NativeImageConfigBuildItem pulsarRuntimeInitialized(
.builder(ClientConfigurationData.class.getName(),
ProducerConfigurationData.class.getName(),
ConsumerConfigurationData.class.getName(),
"org.apache.pulsar.client.impl.auth.oauth2.KeyFile",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenError",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest",
"org.apache.pulsar.client.api.url.DataURLStreamHandler",
"com.google.protobuf.GeneratedMessageV3",
"org.apache.pulsar.common.protocol.schema.ProtobufNativeSchemaData",
"org.apache.pulsar.client.impl.schema.ProtobufNativeSchema$ProtoBufParsingInfo",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package io.quarkus.pulsar.runtime.graal;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URISyntaxException;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.impl.auth.oauth2.KeyFile;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.RecomputeFieldValue;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;
import com.scurrilous.circe.checksum.IntHash;
import com.scurrilous.circe.checksum.Java8IntHash;
Expand All @@ -14,3 +25,31 @@ final class Target_com_scurrilous_circe_checksum_Crc32cIntChecksum {
private static IntHash CRC32C_HASH = new Java8IntHash();

}

@TargetClass(className = "org.apache.pulsar.client.impl.auth.oauth2.ClientCredentialsFlow")
final class Target_org_apache_pulsaR_client_impl_auth_oauth2_ClientCredentialsFlow {

@Substitute
private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
try {
URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
try {
String protocol = urlConnection.getURL().getProtocol();
if ("data".equals(protocol) && !"application/json".equals(urlConnection.getContentType())) {
throw new IllegalArgumentException(
"Unsupported media type or encoding format: " + urlConnection.getContentType());
}
KeyFile privateKey;
try (Reader r = new InputStreamReader(urlConnection.getInputStream(), StandardCharsets.UTF_8)) {
privateKey = KeyFile.fromJson(r);
}
return privateKey;
} finally {
IOUtils.close(urlConnection);
}
} catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
throw new IOException("Invalid privateKey format", e);
}
}

}

0 comments on commit 35b1b74

Please sign in to comment.