Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.appengine.pubsub; + +/** + * A message captures information from the Pubsub message received over the push endpoint and is + * persisted in storage. + */ +public class Message { + private String messageId; + private String publishTime; + private String data; + + public Message(String messageId) { + this.messageId = messageId; + } + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getPublishTime() { + return publishTime; + } + + public void setPublishTime(String publishTime) { + this.publishTime = publishTime; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } +} diff --git a/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/MessageRepository.java b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/MessageRepository.java new file mode 100644 index 00000000000..d68e210f560 --- /dev/null +++ b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/MessageRepository.java @@ -0,0 +1,30 @@ +/** + * Copyright 2017 Google Inc. + * + *
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *
http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.appengine.pubsub;
+
+import java.util.List;
+
+public interface MessageRepository {
+
+ /** Save message to persistent storage. */
+ void save(Message message);
+
+ /**
+ * Retrieve most recent stored messages.
+ * @param limit number of messages
+ * @return list of messages
+ */
+ List Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.appengine.pubsub;
+
+import com.google.cloud.datastore.Datastore;
+import com.google.cloud.datastore.DatastoreOptions;
+import com.google.cloud.datastore.Entity;
+import com.google.cloud.datastore.Key;
+import com.google.cloud.datastore.KeyFactory;
+import com.google.cloud.datastore.Query;
+import com.google.cloud.datastore.QueryResults;
+import com.google.cloud.datastore.StructuredQuery;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Storage for Message objects using Cloud Datastore. */
+public class MessageRepositoryImpl implements MessageRepository {
+
+ private static MessageRepositoryImpl instance;
+
+ private String messagesKind = "messages";
+ private KeyFactory keyFactory = getDatastoreInstance().newKeyFactory().setKind(messagesKind);
+
+ @Override
+ public void save(Message message) {
+ // Save message to "messages"
+ Datastore datastore = getDatastoreInstance();
+ Key key = datastore.allocateId(keyFactory.newKey());
+
+ Entity.Builder messageEntityBuilder = Entity.newBuilder(key)
+ .set("messageId", message.getMessageId());
+
+ if (message.getData() != null) {
+ messageEntityBuilder = messageEntityBuilder.set("data", message.getData());
+ }
+
+ if (message.getPublishTime() != null) {
+ messageEntityBuilder = messageEntityBuilder.set("publishTime", message.getPublishTime());
+ }
+ datastore.put(messageEntityBuilder.build());
+ }
+
+ @Override
+ public List Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.appengine.pubsub;
+
+import java.util.List;
+
+public class PubSubHome {
+
+ private static MessageRepository messageRepository = MessageRepositoryImpl.getInstance();
+ private static int MAX_MESSAGES = 10;
+
+ /**
+ * Retrieve received messages in html.
+ *
+ * @return html representation of messages (one per row)
+ */
+ public static String getReceivedMessages() {
+ List Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.appengine.pubsub;
+
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.http.HttpStatus;
+
+@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish")
+public class PubSubPublish extends HttpServlet {
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException, ServletException {
+ Publisher publisher = this.publisher;
+ try {
+ String topicId = System.getenv("PUBSUB_TOPIC");
+ // create a publisher on the topic
+ if (publisher == null) {
+ publisher = Publisher.defaultBuilder(
+ TopicName.create(ServiceOptions.getDefaultProjectId(), topicId))
+ .build();
+ }
+ // construct a pubsub message from the payload
+ final String payload = req.getParameter("payload");
+ PubsubMessage pubsubMessage =
+ PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();
+
+ publisher.publish(pubsubMessage);
+ // redirect to home page
+ resp.sendRedirect("/");
+ } catch (Exception e) {
+ resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ }
+
+ private Publisher publisher;
+
+ public PubSubPublish() { }
+
+ PubSubPublish(Publisher publisher) {
+ this.publisher = publisher;
+ }
+}
diff --git a/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPush.java b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPush.java
new file mode 100644
index 00000000000..225757a8744
--- /dev/null
+++ b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPush.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2017 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example.appengine.pubsub;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.stream.Collectors;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@WebServlet(value = "/pubsub/push")
+public class PubSubPush extends HttpServlet {
+
+ @Override
+ public void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws IOException, ServletException {
+ String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN");
+ // Do not process message if request token does not match pubsubVerificationToken
+ if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) {
+ resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ return;
+ }
+ // parse message object from "message" field in the request body json
+ // decode message data from base64
+ Message message = getMessage(req);
+ try {
+ messageRepository.save(message);
+ // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system
+ resp.setStatus(102);
+ } catch (Exception e) {
+ resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private Message getMessage(HttpServletRequest request) throws IOException {
+ String requestBody = request.getReader().lines().collect(Collectors.joining("\n"));
+ JsonElement jsonRoot = jsonParser.parse(requestBody);
+ String messageStr = jsonRoot.getAsJsonObject().get("message").toString();
+ Message message = gson.fromJson(messageStr, Message.class);
+ // decode from base64
+ String decoded = decode(message.getData());
+ message.setData(decoded);
+ return message;
+ }
+
+ private String decode(String data) {
+ return new String(Base64.getDecoder().decode(data));
+ }
+
+ private final Gson gson = new Gson();
+ private final JsonParser jsonParser = new JsonParser();
+ private MessageRepository messageRepository;
+
+ PubSubPush(MessageRepository messageRepository) {
+ this.messageRepository = messageRepository;
+ }
+
+ public PubSubPush() {
+ this.messageRepository = MessageRepositoryImpl.getInstance();
+ }
+}
diff --git a/appengine-java8/pubsub/src/main/webapp/WEB-INF/appengine-web.xml b/appengine-java8/pubsub/src/main/webapp/WEB-INF/appengine-web.xml
new file mode 100644
index 00000000000..13cefc05511
--- /dev/null
+++ b/appengine-java8/pubsub/src/main/webapp/WEB-INF/appengine-web.xml
@@ -0,0 +1,9 @@
+");
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+
+ private PubSubHome() { }
+}
diff --git a/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPublish.java b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPublish.java
new file mode 100644
index 00000000000..72f15161702
--- /dev/null
+++ b/appengine-java8/pubsub/src/main/java/com/example/appengine/pubsub/PubSubPublish.java
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2017 Google Inc.
+ *
+ * " + message.getMessageId() + " ");
+ sb.append("" + message.getData() + " ");
+ sb.append("" + message.getPublishTime() + " ");
+ sb.append(" Publish a message
+
+ Last received messages
+
+
+
+
+
+ <%= PubSubHome.getReceivedMessages() %>
+ Id
+ Data
+ PublishTime
+