diff --git a/README.md b/README.md
index c74e77759..1412051bb 100644
--- a/README.md
+++ b/README.md
@@ -240,6 +240,8 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m
| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
+| Native Image Pub Sub Sample | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) |
+| Publish Operations | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/utilities/PublishOperations.java) |
| Create Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) |
| Create Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) |
| Create Pull Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) |
diff --git a/samples/native-image-sample/README.md b/samples/native-image-sample/README.md
new file mode 100644
index 000000000..ace93bd8e
--- /dev/null
+++ b/samples/native-image-sample/README.md
@@ -0,0 +1,77 @@
+# Pub/Sub Sample Application with Native Image
+
+The Pub/Sub sample application demonstrates some common operations with Pub/Sub and is compatible with Native Image compilation.
+
+## Setup Instructions
+
+You will need to follow these prerequisite steps in order to run the samples:
+
+1. If you have not already, [create a Google Cloud Platform Project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#creating_a_project).
+
+2. Install the [Google Cloud SDK](https://cloud.google.com/sdk/) which will allow you to run the sample with your project's credentials.
+
+ Once installed, log in with Application Default Credentials using the following command:
+
+ ```
+ gcloud auth application-default login
+ ```
+
+ **Note:** Authenticating with Application Default Credentials is convenient to use during development, but we recommend [alternate methods of authentication](https://cloud.google.com/docs/authentication/production) during production use.
+
+3. Install the GraalVM compiler.
+
+ You can follow the [official installation instructions](https://www.graalvm.org/docs/getting-started/#install-graalvm) from the GraalVM website.
+ After following the instructions, ensure that you install the native image extension installed by running:
+
+ ```
+ gu install native-image
+ ```
+
+ Once you finish following the instructions, verify that the default version of Java is set to the GraalVM version by running `java -version` in a terminal.
+
+ You will see something similar to the below output:
+
+ ```
+ $ java -version
+
+ openjdk version "11.0.7" 2020-04-14
+ OpenJDK Runtime Environment GraalVM CE 20.1.0 (build 11.0.7+10-jvmci-20.1-b02)
+ OpenJDK 64-Bit Server VM GraalVM CE 20.1.0 (build 11.0.7+10-jvmci-20.1-b02, mixed mode, sharing)
+ ```
+
+4. [Enable the Pub/Sub APIs](https://console.cloud.google.com/apis/api/pubsub.googleapis.com).
+
+### Sample
+
+Navigate to this directory in a new terminal.
+
+1. Compile the application using the Native Image Compiler. This step may take a few minutes.
+
+ ```
+ mvn package -P native -DskipTests
+ ```
+
+2. Run the application:
+
+ ```
+ ./target/native-image-sample
+ ```
+
+3. The application will create a new Pub/Sub topic, send and receive a message from it, and then delete the topic.
+
+ ```
+ Created topic: projects/YOUR_PROJECT_ID/topics/graal-pubsub-test-00e72640-4e36-4aff-84d2-13b7569b2289 under project: YOUR_PROJECT_ID
+ Created pull subscription: projects/YOUR_PROJECT_ID/subscriptions/graal-pubsub-test-sub2fb5e3f3-cb26-439b-b88c-9cb0cfca9e45
+ Published message with ID: 457327433078420
+ Received Payload: Pub/Sub Native Image Test published message at timestamp: 2020-09-23T19:45:42.746514Z
+ Deleted topic projects/YOUR_PROJECT_ID/topics/graal-pubsub-test-00e72640-4e36-4aff-84d2-13b7569b2289
+ Deleted subscription projects/YOUR_PROJECT_ID/subscriptions/graal-pubsub-test-sub2fb5e3f3-cb26-439b-b88c-9cb0cfca9e45
+ ```
+
+## Sample Integration Test with native Image Support
+
+In order to run the sample integration test as a native image, call the following command:
+
+ ```
+ mvn test -Pnative
+ ```
\ No newline at end of file
diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml
new file mode 100644
index 000000000..afdfa8e56
--- /dev/null
+++ b/samples/native-image-sample/pom.xml
@@ -0,0 +1,175 @@
+
+
+ 4.0.0
+ com.google.cloud
+ native-image-sample
+ Native Image Sample
+ https://github.com/googleapis/java-pubsub
+
+
+
+ com.google.cloud.samples
+ shared-configuration
+ 1.2.0
+
+
+
+ 11
+ 11
+ UTF-8
+
+
+
+
+
+ com.google.cloud
+ libraries-bom
+ 24.3.0
+ pom
+ import
+
+
+
+
+
+
+ com.google.cloud
+ google-cloud-core
+
+
+ com.google.cloud
+ google-cloud-pubsub
+
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+ com.google.truth
+ truth
+ 1.1.3
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ true
+ dependency-jars/
+ pubsub.NativeImagePubSubSample
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.2.0
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+
+ ${project.build.directory}/dependency-jars/
+
+
+
+
+
+
+
+
+
+
+
+ native
+
+
+
+ com.google.cloud
+ native-image-support
+ 0.12.4
+
+
+ org.junit.vintage
+ junit-vintage-engine
+ 5.8.2
+ test
+
+
+ org.graalvm.buildtools
+ junit-platform-native
+ 0.9.9
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ 2.22.2
+
+
+ **/*IT
+
+
+
+
+ org.graalvm.buildtools
+ native-maven-plugin
+ 0.9.9
+ true
+
+ pubsub.NativeImagePubSubSample
+
+
+ --no-fallback
+ --no-server
+ --features=com.google.cloud.nativeimage.features.ProtobufMessageFeature
+
+
+
+
+ build-native
+
+ build
+ test
+
+ package
+
+
+ test-native
+
+ test
+
+ test
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java b/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java
new file mode 100644
index 000000000..2e96091a6
--- /dev/null
+++ b/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java
@@ -0,0 +1,385 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsub;
+
+import com.google.api.gax.rpc.NotFoundException;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.iam.v1.GetIamPolicyRequest;
+import com.google.iam.v1.Policy;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.iam.v1.TestIamPermissionsResponse;
+import com.google.protobuf.FieldMask;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.DeadLetterPolicy;
+import com.google.pubsub.v1.DetachSubscriptionRequest;
+import com.google.pubsub.v1.ProjectName;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.ReceivedMessage;
+import com.google.pubsub.v1.Subscription;
+import com.google.pubsub.v1.Topic;
+import com.google.pubsub.v1.TopicName;
+import com.google.pubsub.v1.UpdateSubscriptionRequest;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import utilities.PublishOperations;
+
+/** Pub/Sub sample application compiled with Native Image. */
+public class NativeImagePubSubSample {
+
+ /** Driver for the Pub/Sub Sample application which publishes a message to a specified topic. */
+ public static void main(String[] args) throws Exception {
+ Instant startTime = Instant.now();
+ String projectId = ServiceOptions.getDefaultProjectId();
+
+ String topicId = "native-pubsub-test-" + UUID.randomUUID().toString();
+ String pullSubId = "native-pubsub-test-sub" + UUID.randomUUID().toString();
+ String pushSubId = "native-pubsub-test-sub" + UUID.randomUUID().toString();
+
+ try {
+ // Topic management operations
+ createTopic(projectId, topicId);
+ createPullSubscription(projectId, pullSubId, topicId);
+ createPushSubscription(projectId, pushSubId, topicId);
+ detachSubscription(projectId, pushSubId);
+ getTopicPolicy(projectId, topicId);
+ getSubscriptionPolicy(projectId, pullSubId);
+ listSubscriptionInProject(projectId);
+ listSubscriptionInTopic(projectId, topicId);
+ listTopics(projectId);
+ updateSubscriptionDeadLetterTopic(projectId, pushSubId, topicId, topicId);
+ testTopicPermissions(projectId, topicId);
+ testSubscriptionPermissions(projectId, pushSubId);
+
+ // Publish Operations
+ PublishOperations.publishMessage(projectId, topicId);
+ PublishOperations.publishWithBatchSettings(projectId, topicId);
+ PublishOperations.publishWithCustomAttributes(projectId, topicId);
+ PublishOperations.publishWithErrorHandler(projectId, topicId);
+
+ // Receive messages
+ subscribeSync(projectId, pullSubId);
+ receiveMessagesWithDeliveryAttempts(projectId, pullSubId);
+ } finally {
+ deleteTopic(projectId, topicId);
+ deleteSubscription(projectId, pullSubId);
+ deleteSubscription(projectId, pushSubId);
+ }
+ Instant endTime = Instant.now();
+ Duration duration = Duration.between(startTime, endTime);
+ System.out.println("Duration: " + duration.toString());
+ }
+
+ static void createTopic(String projectId, String topicId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ Topic topic = topicAdminClient.createTopic(topicName);
+ System.out.println("Created topic: " + topic.getName() + " under project: " + projectId);
+ }
+ }
+
+ static void createPullSubscription(String projectId, String subscriptionId, String topicId)
+ throws IOException {
+
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+ Subscription subscription =
+ subscriptionAdminClient.createSubscription(
+ subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
+ System.out.println("Created pull subscription: " + subscription.getName());
+ }
+ }
+
+ static void createPushSubscription(String projectId, String subscriptionId, String topicId)
+ throws IOException {
+
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ // Intentionally set pushEndpoint empty just to exercise API call
+ PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint("").build();
+
+ Subscription subscription =
+ subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10);
+ System.out.println("Created push subscription: " + subscription.getName());
+ }
+ }
+
+ static void detachSubscription(String projectId, String subscriptionId) throws IOException {
+
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ topicAdminClient.detachSubscription(
+ DetachSubscriptionRequest.newBuilder()
+ .setSubscription(subscriptionName.toString())
+ .build());
+ }
+
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ Subscription subscription = subscriptionAdminClient.getSubscription(subscriptionName);
+ if (subscription.getDetached()) {
+ System.out.println("Subscription is detached.");
+ } else {
+ throw new RuntimeException("Subscription detachment was not successful.");
+ }
+ }
+ }
+
+ static void getSubscriptionPolicy(String projectId, String subscriptionId) throws IOException {
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+ GetIamPolicyRequest getIamPolicyRequest =
+ GetIamPolicyRequest.newBuilder().setResource(subscriptionName.toString()).build();
+ Policy policy = subscriptionAdminClient.getIamPolicy(getIamPolicyRequest);
+ System.out.println("Subscription policy: " + policy.toString().trim());
+ }
+ }
+
+ static void getTopicPolicy(String projectId, String topicId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ GetIamPolicyRequest getIamPolicyRequest =
+ GetIamPolicyRequest.newBuilder().setResource(topicName.toString()).build();
+ Policy policy = topicAdminClient.getIamPolicy(getIamPolicyRequest);
+ System.out.println("Topic policy: " + policy.toString().trim());
+ }
+ }
+
+ static void listSubscriptionInProject(String projectId) throws IOException {
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ ProjectName projectName = ProjectName.of(projectId);
+ int count = 0;
+ for (Subscription subscription :
+ subscriptionAdminClient.listSubscriptions(projectName).iterateAll()) {
+ count += 1;
+ }
+ System.out.println("Subscriptions in project count: " + count);
+ }
+ }
+
+ static void listSubscriptionInTopic(String projectId, String topicId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ int count = 0;
+ for (String subscription : topicAdminClient.listTopicSubscriptions(topicName).iterateAll()) {
+ count += 1;
+ }
+ System.out.println("Subscriptions under topic: " + count);
+ }
+ }
+
+ static void listTopics(String projectId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ ProjectName projectName = ProjectName.of(projectId);
+ int count = 0;
+ for (Topic topic : topicAdminClient.listTopics(projectName).iterateAll()) {
+ count += 1;
+ }
+ System.out.println("Topic count under project: " + count);
+ }
+ }
+
+ static void receiveMessagesWithDeliveryAttempts(String projectId, String subscriptionId) {
+
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ // Instantiate an asynchronous message receiver.
+ MessageReceiver receiver =
+ new MessageReceiver() {
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ consumer.ack();
+ }
+ };
+
+ Subscriber subscriber = null;
+ try {
+ subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
+ // Start the subscriber.
+ subscriber.startAsync().awaitRunning();
+ System.out.println("Successfully started an async message receiver.");
+ } finally {
+ // Shut down the subscriber after 10s. Stop receiving messages.
+ subscriber.stopAsync();
+ }
+ }
+
+ static void subscribeSync(String projectId, String subscriptionId) throws IOException {
+ SubscriberStubSettings subscriberStubSettings =
+ SubscriberStubSettings.newBuilder()
+ .setTransportChannelProvider(
+ SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
+ .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
+ .build())
+ .build();
+
+ try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
+ String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
+ PullRequest pullRequest =
+ PullRequest.newBuilder().setMaxMessages(1).setSubscription(subscriptionName).build();
+
+ PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
+ List ackIds = new ArrayList<>();
+ for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
+ String payload = message.getMessage().getData().toStringUtf8();
+ ackIds.add(message.getAckId());
+ System.out.println("Received Payload: " + payload);
+ }
+
+ AcknowledgeRequest acknowledgeRequest =
+ AcknowledgeRequest.newBuilder()
+ .setSubscription(subscriptionName)
+ .addAllAckIds(ackIds)
+ .build();
+
+ subscriber.acknowledgeCallable().call(acknowledgeRequest);
+ }
+ }
+
+ static void updateSubscriptionDeadLetterTopic(
+ String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
+ throws IOException {
+
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ TopicName topicName = TopicName.of(projectId, topicId);
+ TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);
+
+ DeadLetterPolicy deadLetterPolicy =
+ DeadLetterPolicy.newBuilder()
+ .setDeadLetterTopic(deadLetterTopicName.toString())
+ .setMaxDeliveryAttempts(20)
+ .build();
+
+ Subscription subscription =
+ Subscription.newBuilder()
+ .setName(subscriptionName.toString())
+ .setTopic(topicName.toString())
+ .setDeadLetterPolicy(deadLetterPolicy)
+ .build();
+
+ FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy").build();
+
+ UpdateSubscriptionRequest request =
+ UpdateSubscriptionRequest.newBuilder()
+ .setSubscription(subscription)
+ .setUpdateMask(updateMask)
+ .build();
+
+ Subscription response = subscriptionAdminClient.updateSubscription(request);
+ System.out.println("Updated subscription " + response.getName());
+ }
+ }
+
+ static void testSubscriptionPermissions(String projectId, String subscriptionId)
+ throws IOException {
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+
+ List permissions = new ArrayList<>();
+ permissions.add("pubsub.subscriptions.consume");
+ permissions.add("pubsub.subscriptions.update");
+
+ TestIamPermissionsRequest testIamPermissionsRequest =
+ TestIamPermissionsRequest.newBuilder()
+ .setResource(subscriptionName.toString())
+ .addAllPermissions(permissions)
+ .build();
+
+ TestIamPermissionsResponse testedPermissionsResponse =
+ subscriptionAdminClient.testIamPermissions(testIamPermissionsRequest);
+
+ System.out.println(
+ "Tested PubSub subscription permissions\n" + testedPermissionsResponse.toString().trim());
+ }
+ }
+
+ static void testTopicPermissions(String projectId, String topicId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
+
+ List permissions = new ArrayList<>();
+ permissions.add("pubsub.topics.attachSubscription");
+ permissions.add("pubsub.topics.publish");
+ permissions.add("pubsub.topics.update");
+
+ TestIamPermissionsRequest testIamPermissionsRequest =
+ TestIamPermissionsRequest.newBuilder()
+ .setResource(topicName.toString())
+ .addAllPermissions(permissions)
+ .build();
+
+ TestIamPermissionsResponse testedPermissionsResponse =
+ topicAdminClient.testIamPermissions(testIamPermissionsRequest);
+
+ System.out.println(
+ "Tested topic permissions\n" + testedPermissionsResponse.toString().trim());
+ }
+ }
+
+ static void deleteTopic(String projectId, String topicId) throws IOException {
+ try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
+ TopicName topicName = TopicName.of(projectId, topicId);
+ try {
+ topicAdminClient.deleteTopic(topicName);
+ System.out.println("Deleted topic " + topicName);
+ } catch (NotFoundException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+
+ static void deleteSubscription(String projectId, String subscriptionId) throws IOException {
+ try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+ try {
+ subscriptionAdminClient.deleteSubscription(subscriptionName);
+ System.out.println("Deleted subscription " + subscriptionName);
+ } catch (NotFoundException e) {
+ System.out.println(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/samples/native-image-sample/src/main/java/utilities/PublishOperations.java b/samples/native-image-sample/src/main/java/utilities/PublishOperations.java
new file mode 100644
index 000000000..068312025
--- /dev/null
+++ b/samples/native-image-sample/src/main/java/utilities/PublishOperations.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utilities;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/** Sample methods for Publishing messages to a topic in Pub/Sub. */
+public class PublishOperations {
+
+ public static void publishMessage(String projectId, String topicId) throws Exception {
+
+ Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId)).build();
+
+ try {
+ String message = "Pub/Sub Native Image Test published message at timestamp: " + Instant.now();
+ ByteString data = ByteString.copyFromUtf8(message);
+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+
+ publisher.publish(pubsubMessage);
+
+ ApiFuture messageIdFuture = publisher.publish(pubsubMessage);
+ String messageId = messageIdFuture.get();
+
+ System.out.println("Published message with ID: " + messageId);
+ } finally {
+ publisher.shutdown();
+ }
+ }
+
+ public static void publishWithCustomAttributes(String projectId, String topicId)
+ throws Exception {
+
+ TopicName topicName = TopicName.of(projectId, topicId);
+ Publisher publisher = Publisher.newBuilder(topicName).build();
+
+ try {
+ String message = "first message";
+ ByteString data = ByteString.copyFromUtf8(message);
+ PubsubMessage pubsubMessage =
+ PubsubMessage.newBuilder()
+ .setData(data)
+ .putAllAttributes(Collections.singletonMap("year", "2020"))
+ .build();
+
+ // Once published, returns a server-assigned message id (unique within the topic)
+ ApiFuture messageIdFuture = publisher.publish(pubsubMessage);
+ String messageId = messageIdFuture.get();
+ System.out.println("Published a message with custom attributes: " + messageId);
+ } finally {
+ publisher.shutdown();
+ }
+ }
+
+ public static void publishWithBatchSettings(String projectId, String topicId)
+ throws IOException, ExecutionException, InterruptedException {
+
+ TopicName topicName = TopicName.of(projectId, topicId);
+ Publisher publisher = Publisher.newBuilder(topicName).build();
+ List> messageIdFutures = new ArrayList<>();
+
+ try {
+ // schedule publishing one message at a time : messages get automatically batched
+ for (int i = 0; i < 100; i++) {
+ String message = "message " + i;
+ ByteString data = ByteString.copyFromUtf8(message);
+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+
+ // Once published, returns a server-assigned message id (unique within the topic)
+ ApiFuture messageIdFuture = publisher.publish(pubsubMessage);
+ messageIdFutures.add(messageIdFuture);
+ }
+ } finally {
+ // Wait on any pending publish requests.
+ List messageIds = ApiFutures.allAsList(messageIdFutures).get();
+ System.out.println("Published " + messageIds.size() + " messages with batch settings.");
+
+ publisher.shutdown();
+ }
+ }
+
+ public static void publishWithErrorHandler(String projectId, String topicId) throws IOException {
+
+ TopicName topicName = TopicName.of(projectId, topicId);
+ Publisher publisher = null;
+
+ try {
+ // Create a publisher instance with default settings bound to the topic
+ publisher = Publisher.newBuilder(topicName).build();
+
+ List messages = Arrays.asList("first message", "second message");
+
+ for (final String message : messages) {
+ ByteString data = ByteString.copyFromUtf8(message);
+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
+
+ // Once published, returns a server-assigned message id (unique within the topic)
+ ApiFuture future = publisher.publish(pubsubMessage);
+
+ // Add an asynchronous callback to handle success / failure
+ ApiFutures.addCallback(
+ future,
+ new ApiFutureCallback() {
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ if (throwable instanceof ApiException) {
+ ApiException apiException = ((ApiException) throwable);
+ // details on the API exception
+ System.out.println(apiException.getStatusCode().getCode());
+ System.out.println(apiException.isRetryable());
+ }
+ System.out.println("Error publishing message : " + message);
+ }
+
+ @Override
+ public void onSuccess(String messageId) {
+ // Once published, returns server-assigned message ids (unique within the topic)
+ System.out.println("Success Callback: Published message " + messageId);
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+ } finally {
+ if (publisher != null) {
+ // When finished with the publisher, shutdown to free up resources.
+ publisher.shutdown();
+ }
+ }
+ }
+}
diff --git a/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java b/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java
new file mode 100644
index 000000000..c221d735a
--- /dev/null
+++ b/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pubsub;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.ServiceOptions;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import utilities.PublishOperations;
+
+public class NativeImagePubSubSampleIT {
+
+ private static String TOPIC_ID = "native-pubsub-test-" + UUID.randomUUID();
+ private static String PULL_SUB_ID = "native-pubsub-test-sub" + UUID.randomUUID();
+ private static String PUSH_SUB_ID = "native-pubsub-test-sub" + UUID.randomUUID();
+ private static String PROJECT_ID = ServiceOptions.getDefaultProjectId();
+ private static final TopicName TOPIC_NAME = TopicName.of(PROJECT_ID, TOPIC_ID);
+ private static final SubscriptionName PULL_SUBSCRIPTION_NAME =
+ SubscriptionName.of(PROJECT_ID, PULL_SUB_ID);
+ private static final SubscriptionName PUSH_SUBSCRIPTION_NAME =
+ SubscriptionName.of(PROJECT_ID, PUSH_SUB_ID);
+
+ private ByteArrayOutputStream bout;
+ private PrintStream out;
+
+ @Before
+ public void setUp() {
+ bout = new ByteArrayOutputStream();
+ out = new PrintStream(bout);
+ System.setOut(out);
+ }
+
+ @After
+ public void cleanUp() throws IOException {
+ NativeImagePubSubSample.deleteTopic(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.deleteSubscription(PROJECT_ID, PULL_SUB_ID);
+ NativeImagePubSubSample.deleteSubscription(PROJECT_ID, PUSH_SUB_ID);
+ }
+
+ @Test
+ public void testRunTopicManagementOperations() throws IOException {
+ // Topic management operations
+ NativeImagePubSubSample.createTopic(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.createPullSubscription(PROJECT_ID, PULL_SUB_ID, TOPIC_ID);
+ NativeImagePubSubSample.createPushSubscription(PROJECT_ID, PUSH_SUB_ID, TOPIC_ID);
+ NativeImagePubSubSample.detachSubscription(PROJECT_ID, PUSH_SUB_ID);
+ NativeImagePubSubSample.getTopicPolicy(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.getSubscriptionPolicy(PROJECT_ID, PULL_SUB_ID);
+ NativeImagePubSubSample.listSubscriptionInProject(PROJECT_ID);
+ NativeImagePubSubSample.listSubscriptionInTopic(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.listTopics(PROJECT_ID);
+ NativeImagePubSubSample.updateSubscriptionDeadLetterTopic(
+ PROJECT_ID, PUSH_SUB_ID, TOPIC_ID, TOPIC_ID);
+ NativeImagePubSubSample.testTopicPermissions(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.testSubscriptionPermissions(PROJECT_ID, PUSH_SUB_ID);
+
+ // Verify create topic and subscriptions
+ assertThat(bout.toString())
+ .contains("Created topic: " + TOPIC_NAME.toString() + " under project: " + PROJECT_ID);
+ assertThat(bout.toString())
+ .contains("Created pull subscription: " + PULL_SUBSCRIPTION_NAME.toString());
+ assertThat(bout.toString())
+ .contains("Created push subscription: " + PUSH_SUBSCRIPTION_NAME.toString());
+
+ // Verify detach subscription
+ assertThat(bout.toString()).contains("Subscription is detached");
+
+ // Verify topic and subscription IAM policy
+ assertThat(bout.toString()).contains("Topic policy: etag: \"\\000 \\001");
+ assertThat(bout.toString()).contains("Subscription policy: etag: \"\\000 \\001\"");
+
+ // Verify listing of subscriptions and topics
+ assertThat(bout.toString()).contains("Subscriptions in project count:");
+ assertThat(bout.toString()).contains("Subscriptions under topic:");
+ assertThat(bout.toString()).contains("Topic count under project:");
+
+ // Verify update of subscription
+ assertThat(bout.toString()).contains("Updated subscription " + PUSH_SUBSCRIPTION_NAME);
+
+ // Verify topic permissions
+ assertThat(bout.toString()).contains("Tested topic permissions");
+ assertThat(bout.toString()).contains("permissions: \"pubsub.topics.attachSubscription\"");
+ assertThat(bout.toString()).contains("permissions: \"pubsub.topics.publish\"");
+ assertThat(bout.toString()).contains("permissions: \"pubsub.topics.update\"");
+
+ // Verify subscription permissions
+ assertThat(bout.toString()).contains("Tested PubSub subscription permissions");
+ assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.consume\"");
+ assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.update\"");
+ }
+
+ @Test
+ public void testPublishAndSubscribe() throws Exception {
+ NativeImagePubSubSample.createTopic(PROJECT_ID, TOPIC_ID);
+ NativeImagePubSubSample.createPullSubscription(PROJECT_ID, PULL_SUB_ID, TOPIC_ID);
+
+ bout.reset();
+
+ // Publish
+ PublishOperations.publishMessage(PROJECT_ID, TOPIC_ID);
+ PublishOperations.publishWithBatchSettings(PROJECT_ID, TOPIC_ID);
+ PublishOperations.publishWithCustomAttributes(PROJECT_ID, TOPIC_ID);
+ PublishOperations.publishWithErrorHandler(PROJECT_ID, TOPIC_ID);
+
+ // Subscribe
+ NativeImagePubSubSample.subscribeSync(PROJECT_ID, PULL_SUB_ID);
+ NativeImagePubSubSample.receiveMessagesWithDeliveryAttempts(PROJECT_ID, PULL_SUB_ID);
+
+ assertThat(bout.toString()).contains("Published message with ID");
+ assertThat(bout.toString()).contains("Published 100 messages with batch settings.");
+ assertThat(bout.toString()).contains("Published a message with custom attributes");
+ assertThat(bout.toString()).contains("Success Callback: Published message");
+ assertThat(bout.toString()).contains("Success Callback: Published message");
+ assertThat(bout.toString()).contains("Received Payload");
+ assertThat(bout.toString()).contains("Successfully started an async message receiver");
+ }
+}
diff --git a/samples/pom.xml b/samples/pom.xml
index c3399fdec..e138ae3bc 100644
--- a/samples/pom.xml
+++ b/samples/pom.xml
@@ -46,6 +46,7 @@
install-without-bom
snapshot
snippets
+ native-image-sample