Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump smallrye-reactive-messaging.version from 4.7.0 to 4.8.0 #34716

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.5.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.7.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.8.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.3.1</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
85 changes: 84 additions & 1 deletion docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,12 @@ The Quarkus Pulsar extension supports 4 strategies:
The negative acknowledgment can be further configured using `negativeAckRedeliveryDelayMicros` and `negativeAck.redeliveryBackoff` properties.
- `fail` fail the application, no more messages will be processed.
- `ignore` the failure is logged, but the acknowledgement strategy will be applied and the processing will continue.
- `continue` the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with <<ack-timeout>> configuration.
- `reconsume-later` sends the message to the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic[retry letter topic] using the `reconsumeLater` API to be reconsumed with a delay.
The delay can be configured using the `reconsumeLater.delay` property and defaults to 3 seconds.
Custom delay or properties per message can be configured by adding an instance of `io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata` to the failure metadata.

[[ack-timeout]]
==== Acknowledgement timeout

Similar to the negative acknowledgement, with the https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#acknowledgment-timeout[acknowledgement timeout] mechanism, the Pulsar client tracks the unacknowledged messages,
Expand All @@ -307,7 +309,7 @@ The `ackTimeout.redeliveryBackoff` value accepts comma separated values of min d

[source, properties]
----
mp.messaging.incoming.out.failure-strategy=ignore
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
----
Expand Down Expand Up @@ -1078,6 +1080,87 @@ Note that the topic configuration needs to reference full name of topics:
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
----

==== Configuring access to StreamNative Cloud

StreamNative Cloud is a fully managed Pulsar-as-a-Service available in different deployment options,
whether it is fully-hosted, on a public cloud but managed by StreamNative or self-managed on Kubernetes.

The StreamNative Pulsar clusters use Oauth2 authentication,
so you need to make sure that a https://docs.streamnative.io/docs/service-account[service account] exists with
required https://docs.streamnative.io/docs/access-control#authorize-namespaces[permissions to the Pulsar namespace/topic] your application is using.

Next, you need to download the **Key file** (which serves as **private key**) of the service account and note the **issuer URL** (typically `https://auth.streamnative.cloud/`)
and the **audience** (for example `urn:sn:pulsar:o-rf3ol:redhat`) for your cluster.
The **Pulsar Clients** page in the **Admin** section in the StreamNative Cloud console helps you with this process.

To configure your application with Pulsar Oauth2 authentication:

[source, properties]
----
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
----

Note that the `pulsar.client.authParams` configuration contains a Json string with `issuerUrl`, `audience` and the `privateKey` in the `data:application/json;base64,<base64-encoded-key-file>` format.

Alternatively you can configure the authentication programmatically:

[source, java]
----
package org.acme.pulsar;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
@ApplicationScoped
public class PulsarAuth {
@ConfigProperty(name = "pulsar.issuerUrl")
String issuerUrl;
@ConfigProperty(name = "pulsar.credentials")
String credentials;
@ConfigProperty(name = "pulsar.audience")
String audience;
@Produces
@Identifier("pulsar-auth")
public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
var data = new ClientConfigurationData();
data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), PulsarAuth.class.getResource(credentials), audience));
return data;
}
}
----

This assumes that the key file is included to the application classpath as a resource, then the configuration would like the following:

[source, properties]
----
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
----

Note that channels using the client configuration identified with `pulsar-auth` need to set the `client-configuration` attribute.

[[pulsar-health-check]]
== Health Checks

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public NativeImageConfigBuildItem pulsarRuntimeInitialized(
.builder(ClientConfigurationData.class.getName(),
ProducerConfigurationData.class.getName(),
ConsumerConfigurationData.class.getName(),
"org.apache.pulsar.client.impl.auth.oauth2.KeyFile",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenError",
"org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest",
"org.apache.pulsar.client.api.url.DataURLStreamHandler",
"com.google.protobuf.GeneratedMessageV3",
"org.apache.pulsar.common.protocol.schema.ProtobufNativeSchemaData",
"org.apache.pulsar.client.impl.schema.ProtobufNativeSchema$ProtoBufParsingInfo",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
package io.quarkus.pulsar.runtime.graal;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URISyntaxException;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.impl.auth.oauth2.KeyFile;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.RecomputeFieldValue;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;
import com.scurrilous.circe.checksum.IntHash;
import com.scurrilous.circe.checksum.Java8IntHash;
Expand All @@ -14,3 +25,31 @@ final class Target_com_scurrilous_circe_checksum_Crc32cIntChecksum {
private static IntHash CRC32C_HASH = new Java8IntHash();

}

@TargetClass(className = "org.apache.pulsar.client.impl.auth.oauth2.ClientCredentialsFlow")
final class Target_org_apache_pulsaR_client_impl_auth_oauth2_ClientCredentialsFlow {

@Substitute
private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
try {
URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
try {
String protocol = urlConnection.getURL().getProtocol();
if ("data".equals(protocol) && !"application/json".equals(urlConnection.getContentType())) {
throw new IllegalArgumentException(
"Unsupported media type or encoding format: " + urlConnection.getContentType());
}
KeyFile privateKey;
try (Reader r = new InputStreamReader(urlConnection.getInputStream(), StandardCharsets.UTF_8)) {
privateKey = KeyFile.fromJson(r);
}
return privateKey;
} finally {
IOUtils.close(urlConnection);
}
} catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
throw new IOException("Invalid privateKey format", e);
}
}

}