From fa7bb5043bdf78f93bab59e4481a1e9435d8e9b7 Mon Sep 17 00:00:00 2001 From: James Roper Date: Tue, 30 Jun 2020 16:19:38 +1000 Subject: [PATCH] Adds Java Controller and Metadata support I've only added metadata support inputs for event sourcing and crdts, I haven't added it for outputs yet. Controller support is in full. --- build.sbt | 5 +- .../paradox/user/lang/java/gettingstarted.md | 3 +- .../io/cloudstate/javasupport/CloudEvent.java | 171 +++++++ .../io/cloudstate/javasupport/CloudState.java | 110 ++++- .../io/cloudstate/javasupport/Metadata.java | 254 ++++++++++ .../javasupport/MetadataContext.java | 7 + .../cloudstate/javasupport/ServiceCall.java | 7 + .../javasupport/ServiceCallRef.java | 14 +- .../javasupport/controller/CallHandler.java | 49 ++ .../javasupport/controller/Controller.java | 14 + .../controller/ControllerContext.java | 19 + .../controller/ControllerHandler.java | 55 +++ .../controller/ControllerReply.java | 75 +++ .../javasupport/controller/Effect.java | 35 ++ .../javasupport/controller/ForwardReply.java | 16 + .../controller/MessageEnvelope.java | 42 ++ .../javasupport/controller/MessageReply.java | 23 + .../javasupport/crdt/CommandContext.java | 5 +- .../crdt/StreamCancelledContext.java | 3 +- .../crdt/StreamedCommandContext.java | 4 +- .../eventsourced/CommandContext.java | 4 +- .../javasupport/CloudStateRunner.scala | 27 +- .../impl/EntityDiscoveryImpl.scala | 4 +- .../javasupport/impl/MetadataImpl.scala | 173 +++++++ .../javasupport/impl/ReflectionHelper.scala | 167 +++++-- .../impl/ResolvedServiceCallFactory.scala | 4 +- .../impl/ResolvedServiceMethod.scala | 13 +- .../AnnotationBasedControllerSupport.scala | 462 ++++++++++++++++++ .../impl/controller/ControllerImpl.scala | 286 +++++++++++ .../crdt/AnnotationBasedCrdtSupport.scala | 20 +- .../javasupport/impl/crdt/CrdtImpl.scala | 19 +- .../AnnotationBasedEventSourcedSupport.scala | 13 +- .../impl/eventsourced/EventSourcedImpl.scala | 8 +- .../javasupport/controllerspec.proto | 18 + ...AnnotationBasedControllerSupportSpec.scala | 401 +++++++++++++++ .../controller/ControllerServiceSpec.scala | 203 ++++++++ ...notationBasedEventSourcedSupportSpec.scala | 2 + 37 files changed, 2636 insertions(+), 99 deletions(-) create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/Metadata.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/CallHandler.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/Controller.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerContext.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerHandler.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerReply.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/Effect.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/ForwardReply.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/MessageEnvelope.java create mode 100644 java-support/src/main/java/io/cloudstate/javasupport/controller/MessageReply.java create mode 100644 java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala create mode 100644 java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupport.scala create mode 100644 java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/ControllerImpl.scala create mode 100644 java-support/src/test/proto/cloudstate/javasupport/controllerspec.proto create mode 100644 java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupportSpec.scala create mode 100644 java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/ControllerServiceSpec.scala diff --git a/build.sbt b/build.sbt index 27a36cebc..788edf4bb 100644 --- a/build.sbt +++ b/build.sbt @@ -613,7 +613,7 @@ lazy val `java-support` = (project in file("java-support")) "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.9.3" ), javacOptions in Compile ++= Seq("-encoding", "UTF-8"), - javacOptions in (Compile, compile) ++= Seq("-source", "1.8", "-target", "1.8"), + javacOptions in (Compile, compile) ++= Seq("-source", "11", "-target", "11"), akkaGrpcGeneratedSources in Compile := Seq(AkkaGrpc.Server), akkaGrpcGeneratedLanguages in Compile := Seq(AkkaGrpc.Scala), // FIXME should be Java, but here be dragons @@ -635,7 +635,8 @@ lazy val `java-support` = (project in file("java-support")) sbtprotoc.ProtocPlugin.protobufConfigSettings ++ Seq( PB.protoSources ++= { val baseDir = (baseDirectory in ThisBuild).value / "protocols" - Seq(baseDir / "example") + val testSources = sourceDirectory.value / "proto" + Seq(baseDir / "example", testSources) }, PB.targets := Seq( PB.gens.java -> crossTarget.value / "akka-grpc" / "test" diff --git a/docs/src/main/paradox/user/lang/java/gettingstarted.md b/docs/src/main/paradox/user/lang/java/gettingstarted.md index fca5b1aa0..893a1a5b7 100644 --- a/docs/src/main/paradox/user/lang/java/gettingstarted.md +++ b/docs/src/main/paradox/user/lang/java/gettingstarted.md @@ -180,4 +180,5 @@ Exactly which context parameters are available depend on the type of entity and |--------------------------------------------------------|-----------------------|-----------------------| | @javadoc[`Context`](io.cloudstate.javasupport.Context) | | The super type of all Cloudstate contexts. Every invoker makes a subtype of this available for injection, and method or constructor may accept that sub type, or any super type of that subtype that is a subtype of `Context`. | | `java.lang.String` | @javadoc[`@EntityId`](io.cloudstate.javasupport.EntityId) | The ID of the entity. | - +| @javadoc[`Metadata`](io.cloudstate.javasupport.Metadata) | | The metadata associated with the command. | +| @javadoc[`CloudEvent`](io.cloudstate.javasupport.CloudEvent) | | The CloudEvent metadata associated with the command. May be wrapped in `java.util.Optional`. | diff --git a/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java b/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java new file mode 100644 index 000000000..754d64a1f --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/CloudEvent.java @@ -0,0 +1,171 @@ +package io.cloudstate.javasupport; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.Optional; + +/** CloudEvent representation of Metadata. */ +public interface CloudEvent { + + /** + * The CloudEvent spec version. + * + * @return The spec version. + */ + String specversion(); + + /** + * The id of this CloudEvent. + * + * @return The id. + */ + String id(); + + /** + * Return a new CloudEvent with the given id. + * + * @param id The id to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withId(String id); + + /** + * The source of this CloudEvent. + * + * @return The source. + */ + URI source(); + + /** + * Return a new CloudEvent with the given source. + * + * @param source The source to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withSource(URI source); + + /** + * The type of this CloudEvent. + * + * @return The type. + */ + String type(); + + /** + * Return a new CloudEvent with the given type. + * + * @param type The type to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withType(String type); + + /** + * The data content type of this CloudEvent. + * + * @return The data content type, if set. + */ + Optional datacontenttype(); + + /** + * Return a new CloudEvent with the given data content type. + * + * @param datacontenttype The data content type to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withDatacontenttype(String datacontenttype); + + /** + * Clear the data content type of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearDatacontenttype(); + + /** + * The data schema of this CloudEvent. + * + * @return The data schema, if set. + */ + Optional dataschema(); + + /** + * Return a new CloudEvent with the given data schema. + * + * @param dataschema The data schema to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withDataschema(URI dataschema); + + /** + * Clear the data schema of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearDataschema(); + + /** + * The subject of this CloudEvent. + * + * @return The subject, if set. + */ + Optional subject(); + + /** + * Return a new CloudEvent with the given subject. + * + * @param subject The subject to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withSubject(String subject); + + /** + * Clear the subject of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearSubject(); + + /** + * The time of this CloudEvent. + * + * @return The time, if set. + */ + Optional time(); + + /** + * Return a new CloudEvent with the given time. + * + * @param time The time to set. + * @return A copy of this CloudEvent. + */ + CloudEvent withTime(ZonedDateTime time); + + /** + * Clear the time of this CloudEvent, if set. + * + * @return A copy of this CloudEvent. + */ + CloudEvent clearTime(); + + /** + * Return this CloudEvent represented as Metadata. + * + *

If this CloudEvent was created by {{@link Metadata#asCloudEvent()}}, then any non CloudEvent + * metadata that was present will still be present. + * + * @return This CloudEvent expressed as Cloudstate metadata. + */ + Metadata asMetadata(); + + /** + * Create a CloudEvent. + * + * @param id The id of the CloudEvent. + * @param source The source of the CloudEvent. + * @param type The type of the CloudEvent. + * @return The newly created CloudEvent. + */ + static CloudEvent of(String id, URI source, String type) { + return Metadata.EMPTY.asCloudEvent(id, source, type); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java b/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java index 53f57c884..106b79769 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/CloudState.java @@ -1,12 +1,18 @@ package io.cloudstate.javasupport; +import akka.actor.ActorSystem; +import akka.stream.Materializer; import com.typesafe.config.Config; import com.google.protobuf.Descriptors; +import io.cloudstate.javasupport.controller.Controller; +import io.cloudstate.javasupport.controller.ControllerHandler; import io.cloudstate.javasupport.crdt.CrdtEntity; import io.cloudstate.javasupport.crdt.CrdtEntityFactory; import io.cloudstate.javasupport.eventsourced.EventSourcedEntity; import io.cloudstate.javasupport.eventsourced.EventSourcedEntityFactory; import io.cloudstate.javasupport.impl.AnySupport; +import io.cloudstate.javasupport.impl.controller.AnnotationBasedControllerSupport; +import io.cloudstate.javasupport.impl.controller.ControllerService; import io.cloudstate.javasupport.impl.crdt.AnnotationBasedCrdtSupport; import io.cloudstate.javasupport.impl.crdt.CrdtStatefulService; import io.cloudstate.javasupport.impl.eventsourced.AnnotationBasedEventSourcedSupport; @@ -16,13 +22,14 @@ import java.util.concurrent.CompletionStage; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; /** * The CloudState class is the main interface to configuring entities to deploy, and subsequently * starting a local server which will expose these entities to the CloudState Proxy Sidecar. */ public final class CloudState { - private final Map services = new HashMap<>(); + private final Map> services = new HashMap<>(); private ClassLoader classLoader = getClass().getClassLoader(); private String typeUrlPrefix = AnySupport.DefaultTypeUrlPrefix(); private AnySupport.Prefer prefer = AnySupport.PREFER_JAVA(); @@ -108,14 +115,15 @@ public CloudState registerEventSourcedEntity( final AnySupport anySupport = newAnySupport(additionalDescriptors); - services.put( - descriptor.getFullName(), + EventSourcedStatefulService service = new EventSourcedStatefulService( new AnnotationBasedEventSourcedSupport(entityClass, anySupport, descriptor), descriptor, anySupport, persistenceId, - snapshotEvery)); + snapshotEvery); + + services.put(descriptor.getFullName(), system -> service); return this; } @@ -144,12 +152,13 @@ public CloudState registerEventSourcedEntity( Descriptors.FileDescriptor... additionalDescriptors) { services.put( descriptor.getFullName(), - new EventSourcedStatefulService( - factory, - descriptor, - newAnySupport(additionalDescriptors), - persistenceId, - snapshotEvery)); + service -> + new EventSourcedStatefulService( + factory, + descriptor, + newAnySupport(additionalDescriptors), + persistenceId, + snapshotEvery)); return this; } @@ -178,18 +187,19 @@ public CloudState registerCrdtEntity( final AnySupport anySupport = newAnySupport(additionalDescriptors); - services.put( - descriptor.getFullName(), + CrdtStatefulService service = new CrdtStatefulService( new AnnotationBasedCrdtSupport(entityClass, anySupport, descriptor), descriptor, - anySupport)); + anySupport); + + services.put(descriptor.getFullName(), system -> service); return this; } /** - * Register an CRDt entity factory. + * Register a CRDT entity factory. * *

This is a low level API intended for custom (eg, non reflection based) mechanisms for * implementing the entity. @@ -206,7 +216,77 @@ public CloudState registerCrdtEntity( Descriptors.FileDescriptor... additionalDescriptors) { services.put( descriptor.getFullName(), - new CrdtStatefulService(factory, descriptor, newAnySupport(additionalDescriptors))); + system -> + new CrdtStatefulService(factory, descriptor, newAnySupport(additionalDescriptors))); + + return this; + } + + /** + * Register an annotated Controller service. + * + *

The controller class must be annotated with {@link + * io.cloudstate.javasupport.controller.Controller}. + * + * @param controller The controller object. + * @param descriptor The descriptor for the service that this controller implements. + * @param additionalDescriptors Any additional descriptors that should be used to look up protobuf + * types when needed. + * @return This Cloudstate builder. + */ + public CloudState registerController( + Object controller, + Descriptors.ServiceDescriptor descriptor, + Descriptors.FileDescriptor... additionalDescriptors) { + + Controller controllerAnnotation = controller.getClass().getAnnotation(Controller.class); + if (controllerAnnotation == null) { + throw new IllegalArgumentException( + controller.getClass() + " does not declare an " + Controller.class + " annotation!"); + } + + final AnySupport anySupport = newAnySupport(additionalDescriptors); + + services.put( + descriptor.getFullName(), + system -> + new ControllerService( + new AnnotationBasedControllerSupport( + controller, anySupport, descriptor, Materializer.createMaterializer(system)), + descriptor, + anySupport)); + + return this; + } + + /** + * Register a Controller handler. + * + *

This is a low level API intended for custom (eg, non reflection based) mechanisms for + * implementing the controller. + * + * @param controller The controller handler. + * @param descriptor The descriptor for the service that this controller implements. + * @param additionalDescriptors Any additional descriptors that should be used to look up protobuf + * types when needed. + * @return This Cloudstate builder. + */ + public CloudState registerController( + ControllerHandler controller, + Descriptors.ServiceDescriptor descriptor, + Descriptors.FileDescriptor... additionalDescriptors) { + + Controller controllerAnnotation = controller.getClass().getAnnotation(Controller.class); + if (controllerAnnotation == null) { + throw new IllegalArgumentException( + controller.getClass() + " does not declare an " + Controller.class + " annotation!"); + } + + final AnySupport anySupport = newAnySupport(additionalDescriptors); + + ControllerService service = new ControllerService(controller, descriptor, anySupport); + + services.put(descriptor.getFullName(), system -> service); return this; } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java b/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java new file mode 100644 index 000000000..2bff0ab6c --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/Metadata.java @@ -0,0 +1,254 @@ +package io.cloudstate.javasupport; + +import io.cloudstate.javasupport.impl.MetadataImpl; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; + +/** + * Transport specific metadata. + * + *

The exact semantics of how metadata is handled depends on the underlying transport. This API + * exposes case insensitive lookups on metadata, but maintains the original case of the keys as + * received or inserted. If case matters, the iterator should be used to access elements. + * + *

Multiple values are also supported per key, if the underlying transport does not support + * multiple values per key, which value will be used is undefined. + * + *

Metadata can either have a string or a binary value. If the underlying transport doesn't + * support one or the other, how those values are handled is undefined - eg, text values may be + * UTF-8 encoded in binary, or binary values may be Base64 encoded, it depends on the transport. + * + *

This API maintains the order of entries, but the underlying transport may not. + * + *

Implementations of this class should be immutable, all update operations should return a copy + * of the metadata. + */ +public interface Metadata extends Iterable { + + /** + * Get the string value for the given key, if found. + * + *

If the entry is a binary entry, nothing will be returned. + * + *

The key lookup is case insensitive. If multiple entries with the same key are present, the + * first string entry will be returned. + * + * @param key The key to lookup. + * @return The value, if found. + */ + Optional get(String key); + + /** + * Get all the string values for a given key. + * + *

Binary values will be ignored. The key lookup is case insensitive. + * + * @param key The key to lookup. + * @return A list of all the string values for the given key. + */ + List getAll(String key); + + /** + * Get the binary value for the given key, if found. + * + *

If the entry is a string entry, nothing will be returned. + * + *

The key lookup is case insensitive. If multiple entries with the same key are present, the + * first binary entry will be returned. + * + * @param key The key to lookup. + * @return The value, if found. + */ + Optional getBinary(String key); + + /** + * Get all the binary values for a given key. + * + *

String values will be ignored. The key lookup is case insensitive. + * + * @param key The key to lookup. + * @return A list of all the binary values for the given key. + */ + List getBinaryAll(String key); + + /** + * Check whether this metadata has a entry for the given key. + * + *

The key lookup will be case insensitive. + * + * @param key The key to lookup. + * @return True if an entry for the given key exists, otherwise false. + */ + boolean has(String key); + + /** + * Get all the keys for all the entries. + * + *

This list may contain duplicate keys if there are multiple entries with the same key. + * + *

The case of the keys will be the case as passed from the proxy or from other APIs. + * + * @return A list of all the keys in this metadata. + */ + List getAllKeys(); + + /** + * Set the string value for the given key. + * + *

This will replace any existing entries that case insensitively match the given key. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry set. + * + * @param key The key to set. + * @param value The value to set. + * @return A copy of this Metadata object with the entry set. + */ + Metadata set(String key, String value); + + /** + * Set the binary value for the given key. + * + *

This will replace any existing entries that case insensitively match the given key. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry set. + * + * @param key The key to set. + * @param value The value to set. + * @return A copy of this Metadata object with the entry set. + */ + Metadata setBinary(String key, ByteBuffer value); + + /** + * Add the string value for the given key. + * + *

This will not replace any existing entries, it will simply append the entry to the end of + * the list. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry added. + * + * @param key The key to add. + * @param value The value to add. + * @return A copy of this Metadata object with the entry added. + */ + Metadata add(String key, String value); + + /** + * Add the binary value for the given key. + * + *

This will not replace any existing entries, it will simply append the entry to the end of + * the list. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entry added. + * + * @param key The key to add. + * @param value The value to add. + * @return A copy of this Metadata object with the entry added. + */ + Metadata addBinary(String key, ByteBuffer value); + + /** + * Remove all metadata entries with the given key. + * + *

The key will be matched against entries case insensitively. + * + *

This method does not modify this Metadata object, it returns a copy of this Metadata object + * with the entries removed. + * + * @param key The key to remove. + * @return A copy of this Metadata object with the entries removed. + */ + Metadata remove(String key); + + /** + * Clear all metadata entries. + * + *

This method does not modify this Metadata object, it returns an empty Metadata object. + * + * @return An empty metadata object. + */ + Metadata clear(); + + /** + * Whether this metadata is also a CloudEvent. + * + *

This will return true if all of the required CloudEvent fields are set, that is, the + * specversion, id, source and type fields. + * + * @return True if the CloudEvent required attributes are set in this Metadata. + */ + boolean isCloudEvent(); + + /** + * Return a CloudEvent representation of this Metadata. + * + *

Note that the CloudEvent representation will retain any non CloudEvent metadata when + * converted back to Metadata. + * + * @return This Metadata expressed as CloudEvent metadata. + * @throws IllegalStateException If this metadata is not a CloudEvent, that is, if it doesn't have + * any of specversion, id, source or type CloudEvent fields defined. + */ + CloudEvent asCloudEvent(); + + /** + * Convert this metadata to a CloudEvent, adding the given required CloudEvent fields. + * + *

Any metadata in this Metadata object will be left intact when asMetadata is called + * + * @param id The id of the CloudEvent. + * @param source The source of the CloudEvent. + * @param type The type of the CloudEvent. + * @return This metadata, represented as a CloudEvent with the specified fields. + */ + CloudEvent asCloudEvent(String id, URI source, String type); + + /** A metadata entry. */ + interface MetadataEntry { + /** + * The key for the metadata entry. + * + *

The key will be in the original case it was inserted or sent as. + * + * @return The key. + */ + String getKey(); + + /** + * The string value for the metadata entry. + * + * @return The string value, or null if this entry is not a string Metadata entry. + */ + String getValue(); + + /** + * The binary value for the metadata entry. + * + * @return The binary value, or null if this entry is not a string Metadata entry. + */ + ByteBuffer getBinaryValue(); + + /** + * Whether this entry is a text entry. + * + * @return True if this entry is a text entry. + */ + boolean isText(); + + /** + * Whether this entry is a binary entry. + * + * @return True if this entry is a binary entry. + */ + boolean isBinary(); + } + + /** An empty Metadata object. */ + Metadata EMPTY = MetadataImpl.Empty(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java b/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java new file mode 100644 index 000000000..cd01e098f --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/MetadataContext.java @@ -0,0 +1,7 @@ +package io.cloudstate.javasupport; + +/** Context that provides access to metadata. */ +public interface MetadataContext extends Context { + /** Get the metadata associated with this context. */ + Metadata metadata(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java index da11cbd16..5e2277f39 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCall.java @@ -19,4 +19,11 @@ public interface ServiceCall { * @return The message to pass to the call, serialized as an {@link Any}. */ Any message(); + + /** + * The metadata to pass with the message when the call is invoked. + * + * @return The metadata. + */ + Metadata metadata(); } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java index 63b499466..3e25a75af 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/ServiceCallRef.java @@ -22,5 +22,17 @@ public interface ServiceCallRef { * @param message The message to pass to the method. * @return A service call that can be used as a forward or effect. */ - ServiceCall createCall(T message); + default ServiceCall createCall(T message) { + return createCall(message, Metadata.EMPTY); + } + + /** + * Create a call from this reference, using the given message as the message to pass to it when + * it's invoked. + * + * @param message The message to pass to the method. + * @param metadata The Metadata to send. + * @return A service call that can be used as a forward or effect. + */ + ServiceCall createCall(T message, Metadata metadata); } diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/CallHandler.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/CallHandler.java new file mode 100644 index 000000000..f992dca36 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/CallHandler.java @@ -0,0 +1,49 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.impl.CloudStateAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * A controller call handler. + * + *

This annotation should be placed on methods that handle Controller service calls. + * + *

The types of the input and output parameters for these methods depend on whether the call is a + * unary or streamed call. + * + *

Calls with a unary in argument may accept the protobuf type of the call, either bare, or + * wrapped in {@link MessageEnvelope}. + * + *

Calls with a streamed in argument may accept either a {@link akka.stream.javadsl.Source}, + * {@link org.reactivestreams.Publisher} or a {@link java.util.concurrent.Flow.Publisher}. The + * element type may either be the bare protobuf type of the call, or that type wrapped in {@link + * MessageEnvelope}. + * + *

Calls with a unary out argument may either return synchronously, or return a {@link + * java.util.concurrent.CompletionStage}. The argument return type may either be the raw protobuf + * output type of the call, or wrapped in {@link MessageEnvelope} or {@link ControllerReply}. + * + *

Calls with a streamed out argument may either return a {@link akka.stream.javadsl.Source}, + * {@link org.reactivestreams.Publisher} or a {@link java.util.concurrent.Flow.Publisher}. The + * element type of these may either be the raw protobuf output type of the call, or wrapped in + * {@link MessageEnvelope} or {@link ControllerReply}. + */ +@CloudStateAnnotation +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface CallHandler { + + /** + * The name of the command to handle. + * + *

If not specified, the name of the method will be used as the command name, with the first + * letter capitalized to match the gRPC convention of capitalizing rpc method names. + * + * @return The command name. + */ + String name() default ""; +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/Controller.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/Controller.java new file mode 100644 index 000000000..2f28eecd7 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/Controller.java @@ -0,0 +1,14 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.impl.CloudStateAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** A controller. */ +@CloudStateAnnotation +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Controller {} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerContext.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerContext.java new file mode 100644 index 000000000..12403cb14 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerContext.java @@ -0,0 +1,19 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.MetadataContext; + +/** Context for controller calls. */ +public interface ControllerContext extends MetadataContext { + /** + * Get the metadata associated with this call. + * + *

Note, this only returns call level associated metadata. For unary in calls, this will be the + * same as the message metadata, but for streamed calls, it will contain metadata associated with + * the whole stream, so for example if this was a gRPC call, it will contain the HTTP headers for + * that gRPC call. + * + * @return The call level metadata. + */ + Metadata metadata(); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerHandler.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerHandler.java new file mode 100644 index 000000000..d84d25002 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerHandler.java @@ -0,0 +1,55 @@ +package io.cloudstate.javasupport.controller; + +import akka.NotUsed; +import akka.stream.javadsl.Source; +import com.google.protobuf.Any; + +import java.util.concurrent.CompletionStage; + +/** Low level interface for handling for controller calls. */ +public interface ControllerHandler { + + /** + * Handle a unary call. + * + * @param commandName The name of the command this call is for. + * @param message The message envelope of the message. + * @param context The controller context. + * @return A future of the message to return. + */ + CompletionStage> handleUnary( + String commandName, MessageEnvelope message, ControllerContext context); + + /** + * Handle a streamed out call call. + * + * @param commandName The name of the command this call is for. + * @param message The message envelope of the message. + * @param context The controller context. + * @return The stream of messages to return. + */ + Source, NotUsed> handleStreamedOut( + String commandName, MessageEnvelope message, ControllerContext context); + + /** + * Handle a streamed in call. + * + * @param commandName The name of the command this call is for. + * @param stream The stream of messages to handle. + * @param context The controller context. + * @return A future of the message to return. + */ + CompletionStage> handleStreamedIn( + String commandName, Source, NotUsed> stream, ControllerContext context); + + /** + * Handle a full duplex streamed in call. + * + * @param commandName The name of the command this call is for. + * @param stream The stream of messages to handle. + * @param context The controller context. + * @return The stream of messages to return. + */ + Source, NotUsed> handleStreamed( + String commandName, Source, NotUsed> stream, ControllerContext context); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerReply.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerReply.java new file mode 100644 index 000000000..2da23961d --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/ControllerReply.java @@ -0,0 +1,75 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.ServiceCall; +import io.cloudstate.javasupport.impl.controller.ForwardReplyImpl; +import io.cloudstate.javasupport.impl.controller.MessageReplyImpl; +import io.cloudstate.javasupport.impl.controller.NoReply; + +import java.util.Collection; + +/** + * A controller reply. + * + *

Controller replies allow returning forwards and attaching effects to messages. + * + * @param The type of the message that must be returned by this controller call. + */ +public interface ControllerReply { + /** + * The effects attached to this reply. + * + * @return The effects. + */ + Collection effects(); + + /** + * Attach the given effects to this reply. + * + * @param effects The effects to attach. + * @return A new reply with the attached effects. + */ + ControllerReply withEffects(Effect... effects); + + /** + * Create a message reply. + * + * @param payload The payload of the reply. + * @return A message reply. + */ + static MessageReply message(T payload) { + return message(payload, Metadata.EMPTY); + } + + /** + * Create a message reply. + * + * @param payload The payload of the reply. + * @param metadata The metadata for the message. + * @return A message reply. + */ + static MessageReply message(T payload, Metadata metadata) { + return new MessageReplyImpl<>(payload, metadata); + } + + /** + * Create a forward reply. + * + * @param serviceCall The service call representing the forward. + * @return A forward reply. + */ + static ForwardReply forward(ServiceCall serviceCall) { + return new ForwardReplyImpl<>(serviceCall); + } + + /** + * Create a reply that contains neither a message nor a forward. + * + *

This may be useful for emitting effects without sending a message. + * + * @return The reply. + */ + static ControllerReply noReply() { + return NoReply.apply(); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/Effect.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/Effect.java new file mode 100644 index 000000000..90881bb4f --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/Effect.java @@ -0,0 +1,35 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.ServiceCall; +import io.cloudstate.javasupport.impl.controller.EffectImpl; + +/** An effect. */ +public interface Effect { + + /** The service call that is executed as this effect. */ + ServiceCall serviceCall(); + + /** Whether this effect should be executed synchronously or not. */ + boolean synchronous(); + + /** + * Create an effect of the given service call. + * + * @param serviceCall The service call to effect. + * @param synchronous Whether this effect should be executed synchronously. + * @return The effect. + */ + static Effect of(ServiceCall serviceCall, boolean synchronous) { + return new EffectImpl(serviceCall, synchronous); + } + + /** + * Create an effect of the given service call. + * + * @param serviceCall The service call to effect. + * @return The effect. + */ + static Effect of(ServiceCall serviceCall) { + return new EffectImpl(serviceCall, false); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/ForwardReply.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/ForwardReply.java new file mode 100644 index 000000000..ad3a38be1 --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/ForwardReply.java @@ -0,0 +1,16 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.ServiceCall; + +/** A forward reply. */ +public interface ForwardReply extends ControllerReply { + + /** + * The service call that is being forwarded to. + * + * @return The service call. + */ + ServiceCall serviceCall(); + + ForwardReply withEffects(Effect... effect); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageEnvelope.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageEnvelope.java new file mode 100644 index 000000000..cbaba686e --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageEnvelope.java @@ -0,0 +1,42 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.Metadata; +import io.cloudstate.javasupport.impl.controller.MessageEnvelopeImpl; + +/** A message envelope. */ +public interface MessageEnvelope { + /** + * The metadata associated with the message. + * + * @return The metadata. + */ + Metadata metadata(); + + /** + * The payload of the message. + * + * @return The payload. + */ + T payload(); + + /** + * Create a message. + * + * @param payload The payload of the message. + * @return The message. + */ + static MessageEnvelope of(T payload) { + return new MessageEnvelopeImpl<>(payload, Metadata.EMPTY); + } + + /** + * Create a message. + * + * @param payload The payload of the message. + * @param metadata The metadata associated with the message. + * @return The message. + */ + static MessageEnvelope of(T payload, Metadata metadata) { + return new MessageEnvelopeImpl<>(payload, metadata); + } +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageReply.java b/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageReply.java new file mode 100644 index 000000000..bb886886c --- /dev/null +++ b/java-support/src/main/java/io/cloudstate/javasupport/controller/MessageReply.java @@ -0,0 +1,23 @@ +package io.cloudstate.javasupport.controller; + +import io.cloudstate.javasupport.Metadata; + +/** A message reply. */ +public interface MessageReply extends ControllerReply { + + /** + * The payload of the message reply. + * + * @return The payload. + */ + T payload(); + + /** + * The metadata associated with the message. + * + * @return The metadata. + */ + Metadata metadata(); + + MessageReply withEffects(Effect... effect); +} diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java index 048cd8b7d..9f0cccc12 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/CommandContext.java @@ -2,8 +2,7 @@ import io.cloudstate.javasupport.ClientActionContext; import io.cloudstate.javasupport.EffectContext; - -import java.util.Optional; +import io.cloudstate.javasupport.MetadataContext; /** * Context for handling a command. @@ -11,7 +10,7 @@ *

This may be passed to any {@link CommandHandler} annotated element. */ public interface CommandContext - extends CrdtContext, CrdtFactory, EffectContext, ClientActionContext { + extends CrdtContext, CrdtFactory, EffectContext, ClientActionContext, MetadataContext { /** * The id of the command. This is an internal ID generated by the proxy, and is unique to a given * entity stream. It may be used for debugging purposes. diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java index 4816d9694..582f2ebfc 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamCancelledContext.java @@ -1,6 +1,7 @@ package io.cloudstate.javasupport.crdt; import io.cloudstate.javasupport.EffectContext; +import io.cloudstate.javasupport.MetadataContext; import java.util.function.Consumer; @@ -9,7 +10,7 @@ * *

This is sent to callbacks registered by {@link StreamedCommandContext#onCancel(Consumer)}. */ -public interface StreamCancelledContext extends CrdtContext, EffectContext { +public interface StreamCancelledContext extends CrdtContext, EffectContext, MetadataContext { /** * The id of the command that the stream was for. * diff --git a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java index eb8dfa71e..ebf6933d4 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/crdt/StreamedCommandContext.java @@ -1,5 +1,7 @@ package io.cloudstate.javasupport.crdt; +import io.cloudstate.javasupport.MetadataContext; + import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -10,7 +12,7 @@ *

This may be passed to any {@link CommandHandler} annotated element that corresponds to a * command whose output is streamed. */ -public interface StreamedCommandContext extends CommandContext { +public interface StreamedCommandContext extends CommandContext, MetadataContext { /** * Whether the call is actually streamed. * diff --git a/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java b/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java index f8c0b5ccf..726b0a416 100644 --- a/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java +++ b/java-support/src/main/java/io/cloudstate/javasupport/eventsourced/CommandContext.java @@ -2,6 +2,7 @@ import io.cloudstate.javasupport.ClientActionContext; import io.cloudstate.javasupport.EffectContext; +import io.cloudstate.javasupport.MetadataContext; /** * An event sourced command context. @@ -10,7 +11,8 @@ * new events in response to a command, along with forwarding the result to other entities, and * performing side effects on other entities. */ -public interface CommandContext extends EventSourcedContext, ClientActionContext, EffectContext { +public interface CommandContext + extends EventSourcedContext, ClientActionContext, EffectContext, MetadataContext { /** * The current sequence number of events in this entity. * diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala index eb0e2bae7..39e7a658f 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/CloudStateRunner.scala @@ -25,16 +25,19 @@ import akka.http.scaladsl._ import akka.http.scaladsl.model._ import akka.stream.{ActorMaterializer, Materializer} import com.google.protobuf.Descriptors +import io.cloudstate.javasupport.impl.controller.{ControllerService, StatelessFunctionImpl} import io.cloudstate.javasupport.impl.eventsourced.{EventSourcedImpl, EventSourcedStatefulService} import io.cloudstate.javasupport.impl.{EntityDiscoveryImpl, ResolvedServiceCallFactory, ResolvedServiceMethod} import io.cloudstate.javasupport.impl.crdt.{CrdtImpl, CrdtStatefulService} import io.cloudstate.protocol.crdt.CrdtHandler import io.cloudstate.protocol.entity.EntityDiscoveryHandler import io.cloudstate.protocol.event_sourced.EventSourcedHandler +import io.cloudstate.protocol.function.StatelessFunctionHandler import scala.compat.java8.FutureConverters import scala.concurrent.Future import scala.collection.JavaConverters._ +import scala.collection.immutable object CloudStateRunner { final case class Configuration(userFunctionInterface: String, userFunctionPort: Int, snapshotEvery: Int) { @@ -56,20 +59,27 @@ object CloudStateRunner { /** * The CloudStateRunner is responsible for handle the bootstrap of entities, - * and is used by [[io.cloudstate.javasupport.CloudState.start()]] to set up the local + * and is used by [[io.cloudstate.javasupport.CloudState#start()]] to set up the local * server with the given configuration. * - * CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState.start()]] isn't enough. + * CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState#start()]] isn't enough. */ -final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[String, StatefulService]) { +final class CloudStateRunner private[this] ( + _system: ActorSystem, + serviceFactories: Map[String, java.util.function.Function[ActorSystem, Service]] +) { private[this] implicit final val system = _system private[this] implicit final val materializer: Materializer = ActorMaterializer() private[this] final val configuration = new CloudStateRunner.Configuration(system.settings.config.getConfig("cloudstate")) + private val services = serviceFactories.toSeq.map { + case (serviceName, factory) => serviceName -> factory(system) + }.toMap + // TODO JavaDoc - def this(services: java.util.Map[String, StatefulService]) { + def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]]) { this(ActorSystem("StatefulService", { val conf = ConfigFactory.load() conf.getConfig("cloudstate.system").withFallback(conf) @@ -77,7 +87,7 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ } // TODO JavaDoc - def this(services: java.util.Map[String, StatefulService], config: Config) { + def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]], config: Config) { this(ActorSystem("StatefulService", config), services.asScala.toMap) } @@ -100,6 +110,11 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ val crdtImpl = new CrdtImpl(system, crdtServices, rootContext) route orElse CrdtHandler.partial(crdtImpl) + case (route, (serviceClass, controllerServices: Map[String, ControllerService] @unchecked)) + if serviceClass == classOf[ControllerService] => + val controllerImpl = new StatelessFunctionImpl(system, controllerServices, rootContext) + route orElse StatelessFunctionHandler.partial(controllerImpl) + case (_, (serviceClass, _)) => sys.error(s"Unknown StatefulService: $serviceClass") } @@ -144,7 +159,7 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[ * StatefulService describes an entitiy type in a way which makes it possible * to deploy. */ -trait StatefulService { +trait Service { /** * @return a Protobuf ServiceDescriptor of its externally accessible gRPC API diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala index a5b84c739..a9437623f 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/EntityDiscoveryImpl.scala @@ -21,9 +21,9 @@ import io.cloudstate.protocol.entity._ import scala.concurrent.Future import akka.actor.ActorSystem import com.google.protobuf.DescriptorProtos -import io.cloudstate.javasupport.{BuildInfo, StatefulService} +import io.cloudstate.javasupport.{BuildInfo, Service} -class EntityDiscoveryImpl(system: ActorSystem, services: Map[String, StatefulService]) extends EntityDiscovery { +class EntityDiscoveryImpl(system: ActorSystem, services: Map[String, Service]) extends EntityDiscovery { private def configuredOrElse(key: String, default: String): String = if (system.settings.config.hasPath(key)) system.settings.config.getString(key) else default diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala new file mode 100644 index 000000000..a9a757798 --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/MetadataImpl.scala @@ -0,0 +1,173 @@ +package io.cloudstate.javasupport.impl + +import java.net.URI +import java.nio.ByteBuffer +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util +import java.util.{Objects, Optional} + +import com.google.protobuf.ByteString +import io.cloudstate.javasupport.{CloudEvent, Metadata} +import io.cloudstate.protocol.entity.MetadataEntry + +import scala.collection.immutable +import scala.compat.java8.OptionConverters._ +import scala.collection.JavaConverters._ + +private[impl] class MetadataImpl(val entries: immutable.Seq[MetadataEntry]) extends Metadata with CloudEvent { + + override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key)) + + override def get(key: String): Optional[String] = + entries.collectFirst { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + }.asJava + + override def getAll(key: String): util.List[String] = + entries.collect { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + }.asJava + + override def getBinary(key: String): Optional[ByteBuffer] = + entries.collectFirst { + case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => + value.asReadOnlyByteBuffer() + }.asJava + + override def getBinaryAll(key: String): util.List[ByteBuffer] = + entries.collect { + case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => + value.asReadOnlyByteBuffer() + }.asJava + + override def getAllKeys: util.List[String] = entries.map(_.key).asJava + + override def set(key: String, value: String): MetadataImpl = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + } + + override def setBinary(key: String, value: ByteBuffer): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + } + + override def add(key: String, value: String): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + } + + override def addBinary(key: String, value: ByteBuffer): Metadata = { + Objects.requireNonNull(key, "Key must not be null") + Objects.requireNonNull(value, "Value must not be null") + new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + } + + override def remove(key: String): MetadataImpl = new MetadataImpl(removeKey(key)) + + override def clear(): Metadata = MetadataImpl.Empty + + override def iterator(): util.Iterator[Metadata.MetadataEntry] = + entries.iterator.map { entry => + new Metadata.MetadataEntry { + override def getKey: String = entry.key + override def getValue: String = entry.value.stringValue.orNull + override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull + override def isText: Boolean = entry.value.isStringValue + override def isBinary: Boolean = entry.value.isBytesValue + } + }.asJava + + private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key)) + + def isCloudEvent: Boolean = MetadataImpl.CeRequired.forall(h => has(h)) + + override def asCloudEvent(): CloudEvent = + if (!isCloudEvent) { + throw new IllegalStateException("Metadata is not a CloudEvent!") + } else this + + override def asCloudEvent(id: String, source: URI, `type`: String): CloudEvent = + new MetadataImpl( + entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++ + Seq( + MetadataEntry(MetadataImpl.CeSpecversion, MetadataEntry.Value.StringValue(MetadataImpl.CeSpecversionValue)), + MetadataEntry(MetadataImpl.CeId, MetadataEntry.Value.StringValue(id)), + MetadataEntry(MetadataImpl.CeSource, MetadataEntry.Value.StringValue(source.toString)), + MetadataEntry(MetadataImpl.CeType, MetadataEntry.Value.StringValue(`type`)) + ) + ) + + private def getRequiredCloudEventField(key: String) = + entries + .collectFirst { + case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => + value + } + .getOrElse { + throw new IllegalStateException(s"Metadata is not a CloudEvent because it does not have required field $key") + } + + override def specversion(): String = getRequiredCloudEventField(MetadataImpl.CeSpecversion) + + override def id(): String = getRequiredCloudEventField(MetadataImpl.CeId) + + override def withId(id: String): CloudEvent = set(MetadataImpl.CeId, id) + + override def source(): URI = URI.create(getRequiredCloudEventField(MetadataImpl.CeSource)) + + override def withSource(source: URI): CloudEvent = set(MetadataImpl.CeSource, source.toString) + + override def `type`(): String = getRequiredCloudEventField(MetadataImpl.CeType) + + override def withType(`type`: String): CloudEvent = set(MetadataImpl.CeType, `type`) + + override def datacontenttype(): Optional[String] = get(MetadataImpl.CeDatacontenttype) + + override def withDatacontenttype(datacontenttype: String): CloudEvent = + set(MetadataImpl.CeDatacontenttype, datacontenttype) + + override def clearDatacontenttype(): CloudEvent = remove(MetadataImpl.CeDatacontenttype) + + override def dataschema(): Optional[URI] = get(MetadataImpl.CeDataschema).map(URI.create(_)) + + override def withDataschema(dataschema: URI): CloudEvent = set(MetadataImpl.CeDataschema, dataschema.toString) + + override def clearDataschema(): CloudEvent = remove(MetadataImpl.CeDataschema) + + override def subject(): Optional[String] = get(MetadataImpl.CeSubject) + + override def withSubject(subject: String): CloudEvent = set(MetadataImpl.CeSubject, subject) + + override def clearSubject(): CloudEvent = remove(MetadataImpl.CeSubject) + + override def time(): Optional[ZonedDateTime] = get(MetadataImpl.CeTime).map(ZonedDateTime.parse(_)) + + override def withTime(time: ZonedDateTime): CloudEvent = + set(MetadataImpl.CeTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time)) + + override def clearTime(): CloudEvent = remove(MetadataImpl.CeTime) + + override def asMetadata(): Metadata = this +} + +object MetadataImpl { + val CeSpecversion = "ce-specversion" + val CeSpecversionValue = "1.0" + val CeId = "ce-id" + val CeSource = "ce-source" + val CeType = "ce-type" + val CeDatacontenttype = "ce-datacontenttype" + val CeDataschema = "ce-dataschema" + val CeSubject = "ce-subject" + val CeTime = "ce-time" + val CeRequired: Set[String] = Set(CeSpecversion, CeId, CeSource, CeType) + + val Empty = new MetadataImpl(Vector.empty) +} diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala index 08a8e11ee..b994337a4 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ReflectionHelper.scala @@ -4,11 +4,53 @@ import java.lang.annotation.Annotation import java.lang.reflect.{AccessibleObject, Executable, Member, Method, ParameterizedType, Type, WildcardType} import java.util.Optional -import io.cloudstate.javasupport.{Context, EntityContext, EntityId, ServiceCallFactory} +import io.cloudstate.javasupport.{ + CloudEvent, + Context, + EntityContext, + EntityId, + Metadata, + MetadataContext, + ServiceCallFactory +} import com.google.protobuf.{Any => JavaPbAny} import scala.reflect.ClassTag +/** + * How we do reflection: + * + * Where possible, all reflection should be done up front, parameter handlers should be calculated, return type + * mappers should be calculated, and everything stored in maps for fast lookup in request hot paths. + * + * Where this isn't possible, eg because some things may be routed based on type, and supertypes may be supported, + * and the full type hierarchy isn't known up front, then the results of reflection should be cached. + * + * The general approach to reflective invocations is that each type of method (eg, command handler, event handler, + * etc) should have an invoker defined for it. This invoker is responsible for working out how to invoke the method, + * given a set of input parameters, and what to do with its result. + * + * Each invoker should store an array of parameter handlers. A parameter handler takes an input context, and converts + * it to the thing that needs to be passed to the method. When invoking the method, this array of handlers is mapped + * to the array of parameters, to be used in the reflective invocation. Determining the right parameter handler for + * a given parameter type is done by partial functions, the case statements check if the parameter type is of a + * particular type or has a particular annotation, and if it does, returns the handler for that. If nothing matches, + * the fall back is to treat that parameter as the "main argument", this is the command message or event message that + * is being handled by the method. If possible, validation is done on the main argument to ensure it is of the + * expected type. + * + * An invoker may also need to do some processing on the return type. It should, up front, define a mapping function + * up front that converts the type returned by the method to the type that the invoker needs to return. + * + * Invokers themselves are stored in a map - the key of the map depends on the type of invoker, so for example event + * handlers are looked up by type, so the key of the map will be the type of event that the invoker handles. Command + * handlers though are looked up by command name, so the key of the map will be the name of the command that the + * the handler handles. + * + * This helper class provides shared functionality for achieving the above, including some shared parameter handlers, + * and the common logic for command invokers. The helper methods in here are used by the various service types + * annotation support classes. + */ private[impl] object ReflectionHelper { def getAllDeclaredMethods(clazz: Class[_]): Seq[Method] = @@ -35,19 +77,36 @@ private[impl] object ReflectionHelper { member.getName.charAt(0).toUpper + member.getName.drop(1) } else member.getName - final case class InvocationContext[+C <: Context](mainArgument: AnyRef, context: C) - trait ParameterHandler[-C <: Context] extends (InvocationContext[C] => AnyRef) - case object ContextParameterHandler extends ParameterHandler[Context] { - override def apply(ctx: InvocationContext[Context]): AnyRef = ctx.context.asInstanceOf[AnyRef] + final case class InvocationContext[M, +C <: Context](mainArgument: M, context: C) + trait ParameterHandler[M, -C <: Context] extends (InvocationContext[M, C] => AnyRef) + case object ContextParameterHandler extends ParameterHandler[Nothing, Context] { + override def apply(ctx: InvocationContext[Nothing, Context]): AnyRef = ctx.context.asInstanceOf[AnyRef] + } + final case class MainArgumentParameterHandler[M <: AnyRef, C <: Context](argumentType: Class[M]) + extends ParameterHandler[M, C] { + override def apply(ctx: InvocationContext[M, C]): AnyRef = ctx.mainArgument + } + final case object EntityIdParameterHandler extends ParameterHandler[Nothing, EntityContext] { + override def apply(ctx: InvocationContext[Nothing, EntityContext]): AnyRef = ctx.context.entityId() + } + final case object ServiceCallFactoryParameterHandler extends ParameterHandler[Nothing, Context] { + override def apply(ctx: InvocationContext[Nothing, Context]): AnyRef = ctx.context.serviceCallFactory() } - final case class MainArgumentParameterHandler[C <: Context](argumentType: Class[_]) extends ParameterHandler[C] { - override def apply(ctx: InvocationContext[C]): AnyRef = ctx.mainArgument + final case object MetadataParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + ctx.context.metadata } - final case object EntityIdParameterHandler extends ParameterHandler[EntityContext] { - override def apply(ctx: InvocationContext[EntityContext]): AnyRef = ctx.context.entityId() + final case object CloudEventParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + ctx.context.metadata.asCloudEvent } - final case object ServiceCallFactoryParameterHandler extends ParameterHandler[Context] { - override def apply(ctx: InvocationContext[Context]): AnyRef = ctx.context.serviceCallFactory() + final case object OptionalCloudEventParameterHandler extends ParameterHandler[Nothing, MetadataContext] { + override def apply(ctx: InvocationContext[Nothing, MetadataContext]): AnyRef = + if (ctx.context.metadata.isCloudEvent) { + Optional.of(ctx.context.metadata.asCloudEvent) + } else { + Optional.empty() + } } final case class MethodParameter(method: Executable, param: Int) { @@ -59,14 +118,24 @@ private[impl] object ReflectionHelper { .find(a => implicitly[ClassTag[A]].runtimeClass.isInstance(a)) } - def getParameterHandlers[C <: Context: ClassTag](method: Executable)( - extras: PartialFunction[MethodParameter, ParameterHandler[C]] = PartialFunction.empty - ): Array[ParameterHandler[C]] = { - val handlers = Array.ofDim[ParameterHandler[_]](method.getParameterCount) + /** + * Determine the parameter handler for the given method. + * + * @param method The method (or constructor). + * @param extras A partial function for any additional argument handlers other than the default one. + * @tparam M The type of the main argument. + * @tparam C The context type for this method. + * @return An array of parameter handlers the same length as the number of parameters accepted by this method. + */ + def getParameterHandlers[M <: AnyRef, C <: Context: ClassTag](method: Executable)( + extras: PartialFunction[MethodParameter, ParameterHandler[M, C]] = PartialFunction.empty + ): Array[ParameterHandler[M, C]] = { + val handlers = Array.ofDim[ParameterHandler[_, _]](method.getParameterCount) + val contextClass = implicitly[ClassTag[C]].runtimeClass + val metadataContext = classOf[MetadataContext].isAssignableFrom(contextClass) for (i <- 0 until method.getParameterCount) { val parameter = MethodParameter(method, i) // First match things that we can be specific about - val contextClass = implicitly[ClassTag[C]].runtimeClass handlers(i) = if (isWithinBounds(parameter.parameterType, classOf[Context], contextClass)) ContextParameterHandler @@ -77,35 +146,52 @@ private[impl] object ReflectionHelper { ) else if (parameter.parameterType == classOf[ServiceCallFactory]) ServiceCallFactoryParameterHandler - else if (parameter.annotation[EntityId].isDefined) { + else if (parameter.annotation[EntityId].isDefined && classOf[EntityContext].isAssignableFrom(contextClass)) { if (parameter.parameterType != classOf[String]) { throw new RuntimeException( s"@EntityId annotated parameter on method ${method.getName} has type ${parameter.parameterType}, must be String." ) } EntityIdParameterHandler - } else - extras.applyOrElse(parameter, (p: MethodParameter) => MainArgumentParameterHandler(p.parameterType)) + } else if (metadataContext && parameter.parameterType == classOf[Metadata]) + MetadataParameterHandler + else if (metadataContext && parameter.parameterType == classOf[CloudEvent]) + CloudEventParameterHandler + else if (metadataContext && parameter.parameterType == classOf[Optional[_]] && + getFirstParameter(parameter.genericParameterType) == classOf[CloudEvent]) + OptionalCloudEventParameterHandler + else + extras.applyOrElse( + parameter, + (p: MethodParameter) => MainArgumentParameterHandler(p.parameterType.asInstanceOf[Class[M]]) + ) } - handlers.asInstanceOf[Array[ParameterHandler[C]]] + handlers.asInstanceOf[Array[ParameterHandler[M, C]]] } + def verifyAtMostOneMainArgument[M, C <: Context](name: String, + method: Method, + parameters: Array[ParameterHandler[M, C]]) = + if (parameters.count(_.isInstanceOf[MainArgumentParameterHandler[_, _]]) > 1) { + throw new RuntimeException( + s"$name method $method must defined at most one non context parameter to handle commands, the parameters defined were: ${parameters + .collect { case MainArgumentParameterHandler(clazz) => clazz.getName } + .mkString(",")}" + ) + } + final class CommandHandlerInvoker[CommandContext <: Context: ClassTag]( val method: Method, val serviceMethod: ResolvedServiceMethod[_, _], - extraParameters: PartialFunction[MethodParameter, ParameterHandler[CommandContext]] = PartialFunction.empty + extraParameters: PartialFunction[MethodParameter, ParameterHandler[AnyRef, CommandContext]] = + PartialFunction.empty ) { private val name = serviceMethod.descriptor.getFullName - private val parameters = ReflectionHelper.getParameterHandlers[CommandContext](method)(extraParameters) + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, CommandContext](method)(extraParameters) + + verifyAtMostOneMainArgument("CommandHandler", method, parameters) - if (parameters.count(_.isInstanceOf[MainArgumentParameterHandler[_]]) > 1) { - throw new RuntimeException( - s"CommandHandler method $method must defined at most one non context parameter to handle commands, the parameters defined were: ${parameters - .collect { case MainArgumentParameterHandler(clazz) => clazz.getName } - .mkString(",")}" - ) - } parameters.foreach { case MainArgumentParameterHandler(inClass) if !inClass.isAssignableFrom(serviceMethod.inputType.typeClass) => throw new RuntimeException( @@ -154,21 +240,36 @@ private[impl] object ReflectionHelper { } } - private def getRawType(t: Type): Class[_] = t match { + def getRawType(t: Type): Class[_] = t match { case clazz: Class[_] => clazz case pt: ParameterizedType => getRawType(pt.getRawType) case wct: WildcardType => getRawType(wct.getUpperBounds.headOption.getOrElse(classOf[Object])) case _ => classOf[Object] } - def getFirstParameter(t: Type): Class[_] = + /** + * Get the type of the first parameter of this parameterized type. + * + * If it's not a parameterized type, AnyRef is returned. + */ + def getGenericFirstParameter(t: Type): Type = t match { case pt: ParameterizedType => - getRawType(pt.getActualTypeArguments()(0)) + pt.getActualTypeArguments()(0) case _ => classOf[AnyRef] } + /** + * Get the class of the first parameter of this parameterized type. + * + * If it's not a parameterized type, AnyRef is returned. + * + * This is useful if, for example, you have a type who's raw type equals say java.util.Optional, + * and you want to find out what it's an optional of. + */ + def getFirstParameter(t: Type): Class[_] = getRawType(getGenericFirstParameter(t)) + /** * Verifies that none of the given methods have CloudState annotations that are not allowed. * @@ -185,7 +286,7 @@ private[impl] object ReflectionHelper { val maybeAlternative = allowed.find(_.getSimpleName == annotation.annotationType().getSimpleName) throw new RuntimeException( s"Annotation @${annotation.annotationType().getName} on method ${method.getDeclaringClass.getName}." + - s"${method.getName} not allowed in @${entity.getName} annotated entity." + + s"${method.getName} not allowed in @${entity.getName} annotated service." + maybeAlternative.fold("")(alterative => s" Did you mean to use @${alterative.getName}?") ) } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala index bad047d15..d21039f28 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceCallFactory.scala @@ -1,8 +1,8 @@ package io.cloudstate.javasupport.impl -import io.cloudstate.javasupport.{ServiceCallFactory, ServiceCallRef, StatefulService} +import io.cloudstate.javasupport.{Service, ServiceCallFactory, ServiceCallRef} -class ResolvedServiceCallFactory(services: Map[String, StatefulService]) extends ServiceCallFactory { +class ResolvedServiceCallFactory(services: Map[String, Service]) extends ServiceCallFactory { override def lookup[T](serviceName: String, methodName: String, methodType: Class[T]): ServiceCallRef[T] = services.get(serviceName) match { case Some(service) => diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala index c9b907aab..230e4a206 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/ResolvedServiceMethod.scala @@ -6,10 +6,10 @@ import com.google.protobuf.{ Descriptors, Parser, UnsafeByteOperations, - Message => JavaMessage, - Any => JavaPbAny + Any => JavaPbAny, + Message => JavaMessage } -import io.cloudstate.javasupport.{ServiceCall, ServiceCallRef} +import io.cloudstate.javasupport.{Metadata, ServiceCall, ServiceCallRef} /** * A resolved service method. @@ -24,16 +24,17 @@ final case class ResolvedServiceMethod[I, O](descriptor: Descriptors.MethodDescr override def method(): Descriptors.MethodDescriptor = descriptor - override def createCall(message: I): ServiceCall = + override def createCall(message: I, metadata: Metadata): ServiceCall = ResolvedServiceCall(this, JavaPbAny .newBuilder() .setTypeUrl(inputType.typeUrl) .setValue(inputType.toByteString(message)) - .build()) + .build(), + metadata) } -final case class ResolvedServiceCall(ref: ServiceCallRef[_], message: JavaPbAny) extends ServiceCall +final case class ResolvedServiceCall(ref: ServiceCallRef[_], message: JavaPbAny, metadata: Metadata) extends ServiceCall /** * A resolved type diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupport.scala new file mode 100644 index 000000000..46ddf65b6 --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupport.scala @@ -0,0 +1,462 @@ +package io.cloudstate.javasupport.impl.controller + +import java.lang.reflect.{InvocationTargetException, Method, Type} +import java.util.concurrent.{CompletableFuture, CompletionStage} + +import akka.NotUsed +import akka.stream.{javadsl, Materializer} +import akka.stream.javadsl.{AsPublisher, Source} +import akka.stream.scaladsl.{JavaFlowSupport, Sink} +import com.google.protobuf.{Descriptors, Any => JavaPbAny} +import io.cloudstate.javasupport.controller._ +import io.cloudstate.javasupport.impl.ReflectionHelper.{InvocationContext, ParameterHandler} +import io.cloudstate.javasupport.impl.{ + AnySupport, + ReflectionHelper, + ResolvedEntityFactory, + ResolvedServiceMethod, + ResolvedType +} +import io.cloudstate.javasupport.Metadata + +/** + * Annotation based implementation of the [[ControllerHandler]]. + */ +private[impl] class AnnotationBasedControllerSupport( + controller: AnyRef, + anySupport: AnySupport, + override val resolvedMethods: Map[String, ResolvedServiceMethod[_, _]] +)(implicit mat: Materializer) + extends ControllerHandler + with ResolvedEntityFactory { + + def this(controller: AnyRef, anySupport: AnySupport, serviceDescriptor: Descriptors.ServiceDescriptor)( + implicit mat: Materializer + ) = + this(controller, anySupport, anySupport.resolveServiceDescriptor(serviceDescriptor)) + + private val behavior = ControllerReflection(controller.getClass, resolvedMethods) + + override def handleUnary(commandName: String, + message: MessageEnvelope[JavaPbAny], + context: ControllerContext): CompletionStage[ControllerReply[JavaPbAny]] = unwrap { + behavior.unaryHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(controller, message, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${controller.getClass.getName}" + ) + } + } + + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[JavaPbAny], + context: ControllerContext): Source[ControllerReply[JavaPbAny], NotUsed] = unwrap { + behavior.serverStreamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(controller, message, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${controller.getClass.getName}" + ) + } + } + + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ControllerContext): CompletionStage[ControllerReply[JavaPbAny]] = + behavior.clientStreamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(controller, stream, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${controller.getClass.getName}" + ) + } + + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ControllerContext): Source[ControllerReply[JavaPbAny], NotUsed] = + behavior.streamedHandlers.get(commandName) match { + case Some(handler) => + handler.invoke(controller, stream, context) + case None => + throw new RuntimeException( + s"No call handler found for call $commandName on ${controller.getClass.getName}" + ) + } + + private def unwrap[T](block: => T): T = + try { + block + } catch { + case ite: InvocationTargetException if ite.getCause != null => + throw ite.getCause + } +} + +private class ControllerReflection( + val unaryHandlers: Map[String, UnaryCallInvoker], + val serverStreamedHandlers: Map[String, ServerStreamedCallInvoker], + val clientStreamedHandlers: Map[String, ClientStreamedCallInvoker], + val streamedHandlers: Map[String, StreamedCallInvoker] +) + +private object ControllerReflection { + def apply(behaviorClass: Class[_], serviceMethods: Map[String, ResolvedServiceMethod[_, _]])( + implicit mat: Materializer + ): ControllerReflection = { + + val allMethods = ReflectionHelper.getAllDeclaredMethods(behaviorClass) + + // First, find all the call handler methods, and match them with corresponding service methods + val allCallHandlers = allMethods + .filter(_.getAnnotation(classOf[CallHandler]) != null) + .map { method => + method.setAccessible(true) + val annotation = method.getAnnotation(classOf[CallHandler]) + val name: String = if (annotation.name().isEmpty) { + ReflectionHelper.getCapitalizedName(method) + } else annotation.name() + + val serviceMethod = serviceMethods.getOrElse(name, { + throw new RuntimeException( + s"Command handler method ${method.getName} for command $name found, but the service has no command by that name." + ) + }) + + (method, serviceMethod) + } + .groupBy(_._2.name) + .map { + case (commandName, Seq((method, serviceMethod))) => (commandName, method, serviceMethod) + case (commandName, many) => + throw new RuntimeException( + s"Multiple methods found for handling command of name $commandName: ${many.map(_._1.getName).mkString(", ")}" + ) + } + + val unaryCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if !serviceMethod.descriptor.isClientStreaming && !serviceMethod.descriptor.isServerStreaming => + commandName -> new UnaryCallInvoker(method, serviceMethod) + }.toMap + + val serverStreamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if !serviceMethod.descriptor.isClientStreaming && serviceMethod.descriptor.isServerStreaming => + commandName -> new ServerStreamedCallInvoker(method, serviceMethod) + }.toMap + + val clientStreamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if serviceMethod.descriptor.isClientStreaming && !serviceMethod.descriptor.isServerStreaming => + commandName -> new ClientStreamedCallInvoker(method, serviceMethod, mat) + }.toMap + + val streamedCallHandlers = allCallHandlers.collect { + case (commandName, method, serviceMethod) + if serviceMethod.descriptor.isClientStreaming && serviceMethod.descriptor.isServerStreaming => + commandName -> new StreamedCallInvoker(method, serviceMethod, mat) + }.toMap + + ReflectionHelper.validateNoBadMethods( + allMethods, + classOf[Controller], + Set(classOf[CallHandler]) + ) + + new ControllerReflection(unaryCallHandlers, + serverStreamedCallHandlers, + clientStreamedCallHandlers, + streamedCallHandlers) + } + + def getOutputParameterMapper[T](method: String, + resolvedType: ResolvedType[T], + returnType: Type): Any => ControllerReply[JavaPbAny] = { + val (payloadClass, mapper) = ReflectionHelper.getRawType(returnType) match { + case envelope if envelope == classOf[MessageEnvelope[_]] => + val payload = ReflectionHelper.getFirstParameter(returnType) + (payload, { any: Any => + val envelope = any.asInstanceOf[MessageEnvelope[T]] + ControllerReply.message(JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(envelope.payload)) + .setTypeUrl(resolvedType.typeUrl) + .build(), + envelope.metadata) + }) + case message if message == classOf[ControllerReply[_]] => + val payload = ReflectionHelper.getFirstParameter(returnType) + (payload, { any: Any => + val message = any.asInstanceOf[ControllerReply[T]] + message match { + case envelope: MessageReply[T] => + ControllerReply.message(JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(envelope.payload)) + .setTypeUrl(resolvedType.typeUrl) + .build(), + envelope.metadata) + case other => other.asInstanceOf[ControllerReply[JavaPbAny]] + } + }) + case payload => + (payload, { any: Any => + ControllerReply.message( + JavaPbAny + .newBuilder() + .setValue(resolvedType.toByteString(any.asInstanceOf[T])) + .setTypeUrl(resolvedType.typeUrl) + .build() + ) + }) + } + + if (payloadClass != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible return type $payloadClass for call $method, expected ${resolvedType.typeClass}" + ) + } + mapper + } + + def getInputParameterMapper(method: String, + resolvedType: ResolvedType[_], + parameterType: Type): MessageEnvelope[JavaPbAny] => AnyRef = + ReflectionHelper.getRawType(parameterType) match { + case envelope if envelope == classOf[MessageEnvelope[_]] => + val messageType = ReflectionHelper.getFirstParameter(parameterType) + if (messageType != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible message class $messageType for call $method, expected ${resolvedType.typeClass}" + ) + } else { envelope => + MessageEnvelope.of( + resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef], + envelope.metadata + ) + } + case payload => + if (payload != resolvedType.typeClass) { + throw new RuntimeException( + s"Incompatible message class $payload for call $method, expected ${resolvedType.typeClass}" + ) + } else { envelope => + resolvedType.parseFrom(envelope.payload.getValue).asInstanceOf[AnyRef] + } + } +} + +private class PayloadParameterHandler(mapper: MessageEnvelope[JavaPbAny] => AnyRef) + extends ParameterHandler[MessageEnvelope[JavaPbAny], ControllerContext] { + override def apply(ctx: InvocationContext[MessageEnvelope[JavaPbAny], ControllerContext]): AnyRef = + mapper(ctx.mainArgument) +} + +private class StreamedPayloadParameterHandler(mapper: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed] => AnyRef) + extends ParameterHandler[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ControllerContext] { + override def apply( + ctx: InvocationContext[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ControllerContext] + ): AnyRef = + mapper(ctx.mainArgument) +} + +private trait UnaryInSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val parameters: Array[ParameterHandler[MessageEnvelope[JavaPbAny], ControllerContext]] = + ReflectionHelper.getParameterHandlers[MessageEnvelope[JavaPbAny], ControllerContext](method) { + case payload => + new PayloadParameterHandler( + ControllerReflection + .getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, payload.genericParameterType) + ) + } +} + +private trait UnaryOutSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val outputMapper: Any => CompletionStage[ControllerReply[JavaPbAny]] = method.getReturnType match { + case cstage if cstage == classOf[CompletionStage[_]] => + val cstageType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper = + ControllerReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, cstageType) + + any: Any => any.asInstanceOf[CompletionStage[Any]].thenApply(mapper.apply) + case _ => + val mapper = ControllerReflection.getOutputParameterMapper(serviceMethod.name, + serviceMethod.outputType, + method.getGenericReturnType) + + any: Any => CompletableFuture.completedFuture(mapper(any)) + } +} + +private trait StreamedInSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + implicit protected val materializer: Materializer + + protected val parameters + : Array[ParameterHandler[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ControllerContext]] = + ReflectionHelper.getParameterHandlers[javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], ControllerContext]( + method + ) { + case source if source.parameterType == classOf[javadsl.Source[_, _]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(source.genericParameterType) + val mapper = + ControllerReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, sourceType) + + new StreamedPayloadParameterHandler(source => source.map(mapper.apply)) + + case rsPublisher if rsPublisher.parameterType == classOf[org.reactivestreams.Publisher[_]] => + val publisherType = ReflectionHelper.getGenericFirstParameter(rsPublisher.genericParameterType) + val mapper = + ControllerReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, publisherType) + + new StreamedPayloadParameterHandler( + source => + source.asScala + .map(mapper.apply) + .runWith(Sink.asPublisher(false)) + ) + + case jdkPublisher if jdkPublisher.parameterType == classOf[java.util.concurrent.Flow.Publisher[_]] => + val publisherType = ReflectionHelper.getGenericFirstParameter(jdkPublisher.genericParameterType) + val mapper = + ControllerReflection.getInputParameterMapper(serviceMethod.name, serviceMethod.inputType, publisherType) + + new StreamedPayloadParameterHandler( + source => + source.asScala + .map(mapper.apply) + .runWith(JavaFlowSupport.Sink.asPublisher(false)) + ) + + case other => + throw new RuntimeException( + s"Unknown input parameter of type $other. Streamed call ${serviceMethod.name} must accept a ${classOf[ + javadsl.Source[_, _] + ]} or ${classOf[org.reactivestreams.Publisher[_]]}." + ) + } + + if (parameters.count(_.isInstanceOf[StreamedPayloadParameterHandler]) != 1) { + throw new RuntimeException( + s"Streamed call ${serviceMethod.name} must accept exactly one parameter of type ${classOf[javadsl.Source[_, _]]} or ${classOf[org.reactivestreams.Publisher[_]]}" + ) + } +} + +private trait StreamedOutSupport { + protected val method: Method + protected val serviceMethod: ResolvedServiceMethod[_, _] + + protected val outputMapper: Any => javadsl.Source[ControllerReply[JavaPbAny], NotUsed] = method.getReturnType match { + case source if source == classOf[javadsl.Source[_, _]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ControllerReply[JavaPbAny] = + ControllerReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => + any + .asInstanceOf[javadsl.Source[Any, _]] + .map(mapper.apply) + .mapMaterializedValue(_ => NotUsed) + + case rsPublisher if rsPublisher == classOf[org.reactivestreams.Publisher[_]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ControllerReply[JavaPbAny] = + ControllerReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => { + javadsl.Source + .fromPublisher(any.asInstanceOf[org.reactivestreams.Publisher[Any]]) + .map(mapper.apply) + } + + case jdkPublisher if jdkPublisher == classOf[java.util.concurrent.Flow.Publisher[_]] => + val sourceType = ReflectionHelper.getGenericFirstParameter(method.getGenericReturnType) + val mapper: Any => ControllerReply[JavaPbAny] = + ControllerReflection.getOutputParameterMapper(serviceMethod.name, serviceMethod.outputType, sourceType) + + any: Any => { + JavaFlowSupport.Source + .fromPublisher(any.asInstanceOf[java.util.concurrent.Flow.Publisher[Any]]) + .map(mapper.apply) + .asJava + } + + case _ => + throw new RuntimeException( + s"Streamed call ${serviceMethod.name} must return a ${classOf[javadsl.Source[_, _]]} or ${classOf[org.reactivestreams.Publisher[_]]}." + ) + } +} + +private class UnaryCallInvoker(protected val method: Method, protected val serviceMethod: ResolvedServiceMethod[_, _]) + extends UnaryInSupport + with UnaryOutSupport { + + def invoke(controller: AnyRef, + message: MessageEnvelope[JavaPbAny], + context: ControllerContext): CompletionStage[ControllerReply[JavaPbAny]] = { + val ctx = InvocationContext(message, context) + val result = method.invoke(controller, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class ServerStreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _]) + extends UnaryInSupport + with StreamedOutSupport { + + def invoke(controller: AnyRef, + message: MessageEnvelope[JavaPbAny], + context: ControllerContext): javadsl.Source[ControllerReply[JavaPbAny], NotUsed] = { + val ctx = InvocationContext(message, context) + val result = method.invoke(controller, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class ClientStreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _], + protected val materializer: Materializer) + extends UnaryOutSupport + with StreamedInSupport { + + def invoke(controller: AnyRef, + stream: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ControllerContext): CompletionStage[ControllerReply[JavaPbAny]] = { + val ctx = InvocationContext(stream, context) + val result = method.invoke(controller, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} + +private class StreamedCallInvoker(protected val method: Method, + protected val serviceMethod: ResolvedServiceMethod[_, _], + protected val materializer: Materializer) + extends StreamedOutSupport + with StreamedInSupport { + + def invoke(controller: AnyRef, + stream: javadsl.Source[MessageEnvelope[JavaPbAny], NotUsed], + context: ControllerContext): javadsl.Source[ControllerReply[JavaPbAny], NotUsed] = { + val ctx = InvocationContext(stream, context) + val result = method.invoke(controller, parameters.map(_.apply(ctx)): _*) + outputMapper(result) + } + +} diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/ControllerImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/ControllerImpl.scala new file mode 100644 index 000000000..25275029e --- /dev/null +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/controller/ControllerImpl.scala @@ -0,0 +1,286 @@ +/* + * Copyright 2019 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.cloudstate.javasupport.impl.controller + +import java.util + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Sink, Source} +import com.google.protobuf.any.{Any => ScalaPbAny} +import com.google.protobuf.{Descriptors, Any => JavaPbAny} +import io.cloudstate.javasupport.CloudStateRunner.Configuration +import io.cloudstate.javasupport.controller._ +import io.cloudstate.javasupport.impl._ +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCall, ServiceCallFactory} +import io.cloudstate.protocol.entity.{Failure, Forward, Reply, SideEffect, Metadata => PbMetadata} +import io.cloudstate.protocol.function.{FunctionCommand, FunctionReply, StatelessFunction} + +import scala.concurrent.Future +import scala.compat.java8.FutureConverters._ +import scala.collection.JavaConverters._ + +final class ControllerService(val controllerHandler: ControllerHandler, + override val descriptor: Descriptors.ServiceDescriptor, + val anySupport: AnySupport) + extends Service { + + override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = + controllerHandler match { + case resolved: ResolvedEntityFactory => Some(resolved.resolvedMethods) + case _ => None + } + + override final val entityType = StatelessFunction.name +} + +final class StatelessFunctionImpl(_system: ActorSystem, services: Map[String, ControllerService], rootContext: Context) + extends StatelessFunction { + + import _system.dispatcher + implicit val system: ActorSystem = _system + + private def toJavaPbAny(any: Option[ScalaPbAny]) = + any.fold(JavaPbAny.getDefaultInstance)(ScalaPbAny.toJavaProto) + + private def toOptionPbMetadata(metadata: Metadata) = + metadata match { + case impl: MetadataImpl if impl.entries.nonEmpty => + Some(PbMetadata(impl.entries)) + case _: MetadataImpl => None + case other => throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send") + } + + private def controllerMessageToReply(msg: ControllerReply[JavaPbAny]) = { + val response = msg match { + case message: MessageReply[JavaPbAny] => + FunctionReply.Response.Reply( + Reply( + Some(ScalaPbAny.fromJavaProto(message.payload())), + toOptionPbMetadata(message.metadata()) + ) + ) + case forward: ForwardReply[JavaPbAny] => + FunctionReply.Response.Forward( + Forward( + forward.serviceCall().ref().method().getService.getFullName, + forward.serviceCall().ref().method().getName, + Some(ScalaPbAny.fromJavaProto(forward.serviceCall().message())), + toOptionPbMetadata(forward.serviceCall().metadata()) + ) + ) + // ie, NoReply + case _ => FunctionReply.Response.Empty + } + + val effects = msg match { + case impl: ControllerReplyImpl[_] => + impl._effects + case other => + other.effects().asScala.toList + } + val encodedEffects = effects.map { effect => + SideEffect( + effect.serviceCall().ref().method().getService.getFullName, + effect.serviceCall().ref().method().getName, + Some(ScalaPbAny.fromJavaProto(effect.serviceCall().message())), + effect.synchronous(), + toOptionPbMetadata(effect.serviceCall().metadata()) + ) + } + + FunctionReply(response, encodedEffects) + } + + /** + * Handle a unary command. + * The input command will contain the service name, command name, request metadata and the command + * payload. The reply may contain a direct reply, a forward or a failure, and it may contain many + * side effects. + */ + override def handleUnary(in: FunctionCommand): Future[FunctionReply] = + services.get(in.serviceName) match { + case Some(service) => + val context = createContext(in) + service.controllerHandler + .handleUnary(in.name, MessageEnvelope.of(toJavaPbAny(in.payload), context.metadata()), context) + .toScala + .map(controllerMessageToReply) + case None => + Future.successful( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + in.serviceName))) + ) + } + + /** + * Handle a streamed in command. + * The first message in will contain the request metadata, including the service name and command + * name. It will not have an associated payload set. This will be followed by zero to many messages + * in with a payload, but no service name or command name set. + * The semantics of stream closure in this protocol map 1:1 with the semantics of gRPC stream closure, + * that is, when the client closes the stream, the stream is considered half closed, and the server + * should eventually, but not necessarily immediately, send a response message with a status code and + * trailers. + * If however the server sends a response message before the client closes the stream, the stream is + * completely closed, and the client should handle this and stop sending more messages. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamedIn(in: Source[FunctionCommand, NotUsed]): Future[FunctionReply] = + in.prefixAndTail(1) + .runWith(Sink.head) + .flatMap { + case (Nil, _) => + Future.successful( + FunctionReply( + FunctionReply.Response.Failure( + Failure( + 0, + "Cloudstate protocol failure: expected command message with service name and command name, but got empty stream" + ) + ) + ) + ) + case (Seq(call), messages) => + services.get(call.serviceName) match { + case Some(service) => + service.controllerHandler + .handleStreamedIn( + call.name, + messages.map { message => + val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil)) + MessageEnvelope.of(toJavaPbAny(message.payload), metadata) + }.asJava, + createContext(call) + ) + .toScala + .map(controllerMessageToReply) + case None => + Future.successful( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + call.serviceName))) + ) + } + } + + /** + * Handle a streamed out command. + * The input command will contain the service name, command name, request metadata and the command + * payload. Zero or more replies may be sent, each containing either a direct reply, a forward or a + * failure, and each may contain many side effects. The stream to the client will be closed when the + * this stream is closed, with the same status as this stream is closed with. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamedOut(in: FunctionCommand): Source[FunctionReply, NotUsed] = + services.get(in.serviceName) match { + case Some(service) => + val context = createContext(in) + service.controllerHandler + .handleStreamedOut(in.name, MessageEnvelope.of(toJavaPbAny(in.payload), context.metadata()), context) + .asScala + .map(controllerMessageToReply) + case None => + Source.single(FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + in.serviceName)))) + } + + /** + * Handle a full duplex streamed command. + * The first message in will contain the request metadata, including the service name and command + * name. It will not have an associated payload set. This will be followed by zero to many messages + * in with a payload, but no service name or command name set. + * Zero or more replies may be sent, each containing either a direct reply, a forward or a failure, + * and each may contain many side effects. + * The semantics of stream closure in this protocol map 1:1 with the semantics of gRPC stream closure, + * that is, when the client closes the stream, the stream is considered half closed, and the server + * should eventually, but not necessarily immediately, close the streamage with a status code and + * trailers. + * If however the server closes the stream with a status code and trailers, the stream is immediately + * considered completely closed, and no further messages sent by the client will be handled by the + * server. + * Either the client or the server may cancel the stream at any time, cancellation is indicated + * through an HTTP2 stream RST message. + */ + override def handleStreamed(in: Source[FunctionCommand, NotUsed]): Source[FunctionReply, NotUsed] = + in.prefixAndTail(1) + .flatMapConcat { + case (Nil, _) => + Source.single( + FunctionReply( + FunctionReply.Response.Failure( + Failure( + 0, + "Cloudstate protocol failure: expected command message with service name and command name, but got empty stream" + ) + ) + ) + ) + case (Seq(call), messages) => + services.get(call.serviceName) match { + case Some(service) => + service.controllerHandler + .handleStreamed( + call.name, + messages.map { message => + val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil)) + MessageEnvelope.of(toJavaPbAny(message.payload), metadata) + }.asJava, + createContext(call) + ) + .asScala + .map(controllerMessageToReply) + case None => + Source.single( + FunctionReply(FunctionReply.Response.Failure(Failure(0, "Unknown service: " + call.serviceName))) + ) + } + } + + private def createContext(in: FunctionCommand): ControllerContext = { + val metadata = new MetadataImpl(in.metadata.map(_.entries.toVector).getOrElse(Nil)) + new ControllerContextImpl(metadata) + } + + class ControllerContextImpl(override val metadata: Metadata) extends ControllerContext { + override val serviceCallFactory: ServiceCallFactory = rootContext.serviceCallFactory() + } +} + +trait ControllerReplyImpl[T] extends ControllerReply[T] { + def _effects: List[Effect] + override def effects(): util.Collection[Effect] = _effects.asJava +} +case class MessageEnvelopeImpl[T](payload: T, metadata: Metadata) extends MessageEnvelope[T] +case class MessageReplyImpl[T](payload: T, metadata: Metadata, _effects: List[Effect]) + extends MessageReply[T] + with ControllerReplyImpl[T] { + def this(payload: T, metadata: Metadata) = this(payload, metadata, Nil) + override def withEffects(effect: Effect*): MessageReply[T] = MessageReplyImpl(payload, metadata, _effects ++ effect) +} +case class ForwardReplyImpl[T](serviceCall: ServiceCall, _effects: List[Effect]) + extends ForwardReply[T] + with ControllerReplyImpl[T] { + def this(serviceCall: ServiceCall) = this(serviceCall, Nil) + override def withEffects(effect: Effect*): ForwardReply[T] = ForwardReplyImpl(serviceCall, _effects ++ effect) +} +case class NoReply[T](_effects: List[Effect]) extends ControllerReplyImpl[T] { + override def withEffects(effect: Effect*): ControllerReply[T] = NoReply(_effects ++ effect) +} +object NoReply { + private val instance = NoReply[Any](Nil) + def apply[T]: ControllerReply[T] = instance.asInstanceOf[NoReply[T]] +} +case class EffectImpl(serviceCall: ServiceCall, synchronous: Boolean) extends Effect diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala index 0ff4db870..8fe97dc7d 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/AnnotationBasedCrdtSupport.scala @@ -5,9 +5,8 @@ import java.util.{function, Optional} import java.util.function.Consumer import scala.annotation.unchecked - import com.google.protobuf.{Descriptors, Any => JavaPbAny} -import io.cloudstate.javasupport.{Context, EntityFactory, ServiceCall, ServiceCallFactory} +import io.cloudstate.javasupport.{Context, EntityFactory, Metadata, ServiceCall, ServiceCallFactory} import io.cloudstate.javasupport.crdt.{ CommandContext, CommandHandler, @@ -194,7 +193,7 @@ private object CrdtAnnotationHelper { } def crdtParameterHandlers[C <: CrdtContext with CrdtFactory] - : PartialFunction[MethodParameter, ParameterHandler[C]] = { + : PartialFunction[MethodParameter, ParameterHandler[AnyRef, C]] = { case crdt if injectorMap.contains(crdt.parameterType) => new CrdtParameterHandler[C, Crdt, AnyRef](injectorMap(crdt.parameterType), crdt.method) case crdt @@ -208,8 +207,8 @@ private object CrdtAnnotationHelper { private class CrdtParameterHandler[C <: CrdtContext with CrdtFactory, D <: Crdt, T](injector: CrdtInjector[D, T], method: Executable) - extends ParameterHandler[C] { - override def apply(ctx: InvocationContext[C]): AnyRef = { + extends ParameterHandler[AnyRef, C] { + override def apply(ctx: InvocationContext[AnyRef, C]): AnyRef = { val state = ctx.context.state(injector.crdtClass) if (state.isPresent) { injector.wrap(state.get()).asInstanceOf[AnyRef] @@ -220,10 +219,10 @@ private object CrdtAnnotationHelper { } private class OptionalCrdtParameterHandler[C <: Crdt, T](injector: CrdtInjector[C, T], method: Executable) - extends ParameterHandler[CrdtContext] { + extends ParameterHandler[AnyRef, CrdtContext] { import scala.compat.java8.OptionConverters._ - override def apply(ctx: InvocationContext[CrdtContext]): AnyRef = + override def apply(ctx: InvocationContext[AnyRef, CrdtContext]): AnyRef = ctx.context.state(injector.crdtClass).asScala.map(injector.wrap).asJava } @@ -256,6 +255,7 @@ private final class AdaptedStreamedCommandContext(val delegate: StreamedCommandC override def entityId(): String = delegate.entityId() override def commandId(): Long = delegate.commandId() override def commandName(): String = delegate.commandName() + override def metadata(): Metadata = delegate.metadata() override def state[T <: Crdt](crdtClass: Class[T]): Optional[T] = delegate.state(crdtClass) override def delete(): Unit = delegate.delete() @@ -276,7 +276,9 @@ private final class AdaptedStreamedCommandContext(val delegate: StreamedCommandC private final class EntityConstructorInvoker(constructor: Constructor[_]) extends (CrdtCreationContext => AnyRef) { private val parameters = - ReflectionHelper.getParameterHandlers[CrdtCreationContext](constructor)(CrdtAnnotationHelper.crdtParameterHandlers) + ReflectionHelper.getParameterHandlers[AnyRef, CrdtCreationContext](constructor)( + CrdtAnnotationHelper.crdtParameterHandlers + ) parameters.foreach { case MainArgumentParameterHandler(clazz) => throw new RuntimeException(s"Don't know how to handle argument of type $clazz in constructor") @@ -284,7 +286,7 @@ private final class EntityConstructorInvoker(constructor: Constructor[_]) extend } def apply(context: CrdtCreationContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) constructor.newInstance(parameters.map(_.apply(ctx)): _*).asInstanceOf[AnyRef] } } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala index dd3ff2466..15d23f292 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/crdt/CrdtImpl.scala @@ -23,7 +23,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.{Flow, Source} import com.google.protobuf.Descriptors -import io.cloudstate.javasupport.{Context, ServiceCallFactory, StatefulService} +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCallFactory} import io.cloudstate.javasupport.crdt.{ CommandContext, CrdtContext, @@ -39,6 +39,7 @@ import io.cloudstate.javasupport.impl.{ ActivatableContext, AnySupport, FailInvoked, + MetadataImpl, ResolvedEntityFactory, ResolvedServiceMethod } @@ -48,13 +49,12 @@ import io.cloudstate.protocol.entity.{Command, Failure, StreamCancelled} import com.google.protobuf.any.{Any => ScalaPbAny} import com.google.protobuf.{Any => JavaPbAny} -import scala.compat.java8.OptionConverters._ import scala.collection.JavaConverters._ final class CrdtStatefulService(val factory: CrdtEntityFactory, override val descriptor: Descriptors.ServiceDescriptor, val anySupport: AnySupport) - extends StatefulService { + extends Service { override final val entityType = Crdt.name override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = @@ -135,7 +135,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], private var crdtIsNew = false private var subscribers = Map.empty[Long, function.Function[SubscriptionContext, Optional[JavaPbAny]]] - private var cancelListeners = Map.empty[Long, function.Consumer[StreamCancelledContext]] + private var cancelListeners = Map.empty[Long, (function.Consumer[StreamCancelledContext], Metadata)] private val entity = { val ctx = new CrdtCreationContext with CapturingCrdtFactory with ActivatableContext try { @@ -241,9 +241,9 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], def handleStreamCancelled(cancelled: StreamCancelled): List[CrdtStreamOut] = { subscribers -= cancelled.id cancelListeners.get(cancelled.id) match { - case Some(onCancel) => + case Some((onCancel, metadata)) => cancelListeners -= cancelled.id - val ctx = new CrdtStreamCancelledContext(cancelled) + val ctx = new CrdtStreamCancelledContext(cancelled, metadata) try { onCancel.accept(ctx) } finally { @@ -345,7 +345,7 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], subscribers = subscribers.updated(command.id, onChange) } cancelCallback.foreach { onCancel => - cancelListeners = cancelListeners.updated(command.id, onCancel) + cancelListeners = cancelListeners.updated(command.id, (onCancel, metadata)) } changeCallback.isDefined || cancelCallback.isDefined } @@ -363,9 +363,12 @@ class CrdtImpl(system: ActorSystem, services: Map[String, CrdtStatefulService], override final def commandId: Long = command.id override final def commandName(): String = command.name + + override val metadata: Metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + } - class CrdtStreamCancelledContext(cancelled: StreamCancelled) + class CrdtStreamCancelledContext(cancelled: StreamCancelled, override val metadata: Metadata) extends StreamCancelledContext with CapturingCrdtFactory with AbstractEffectContext diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala index d3b132b66..0e18c8a98 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupport.scala @@ -238,7 +238,8 @@ private object EventBehaviorReflection { private class EntityConstructorInvoker(constructor: Constructor[_]) extends (EventSourcedEntityCreationContext => AnyRef) { - private val parameters = ReflectionHelper.getParameterHandlers[EventSourcedEntityCreationContext](constructor)() + private val parameters = + ReflectionHelper.getParameterHandlers[AnyRef, EventSourcedEntityCreationContext](constructor)() parameters.foreach { case MainArgumentParameterHandler(clazz) => throw new RuntimeException(s"Don't know how to handle argument of type $clazz in constructor") @@ -246,7 +247,7 @@ private class EntityConstructorInvoker(constructor: Constructor[_]) } def apply(context: EventSourcedEntityCreationContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) constructor.newInstance(parameters.map(_.apply(ctx)): _*).asInstanceOf[AnyRef] } } @@ -255,7 +256,7 @@ private class EventHandlerInvoker(val method: Method) { private val annotation = method.getAnnotation(classOf[EventHandler]) - private val parameters = ReflectionHelper.getParameterHandlers[EventContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, EventContext](method)() private def annotationEventClass = annotation.eventClass() match { case obj if obj == classOf[Object] => None @@ -293,7 +294,7 @@ private class EventHandlerInvoker(val method: Method) { private class SnapshotHandlerInvoker(val method: Method) { private val annotation = method.getAnnotation(classOf[SnapshotHandler]) - private val parameters = ReflectionHelper.getParameterHandlers[SnapshotContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, SnapshotContext](method)() // Verify that there is at most one event handler val snapshotClass: Class[_] = parameters.collect { @@ -315,7 +316,7 @@ private class SnapshotHandlerInvoker(val method: Method) { private class SnapshotInvoker(val method: Method) { - private val parameters = ReflectionHelper.getParameterHandlers[SnapshotContext](method)() + private val parameters = ReflectionHelper.getParameterHandlers[AnyRef, SnapshotContext](method)() parameters.foreach { case MainArgumentParameterHandler(clazz) => @@ -326,7 +327,7 @@ private class SnapshotInvoker(val method: Method) { } def invoke(obj: AnyRef, context: SnapshotContext): AnyRef = { - val ctx = InvocationContext("", context) + val ctx = InvocationContext(null.asInstanceOf[AnyRef], context) method.invoke(obj, parameters.map(_.apply(ctx)): _*) } diff --git a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala index bbabd0151..12bc97702 100644 --- a/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala +++ b/java-support/src/main/scala/io/cloudstate/javasupport/impl/eventsourced/EventSourcedImpl.scala @@ -24,7 +24,7 @@ import akka.stream.scaladsl.Flow import com.google.protobuf.{Descriptors, Any => JavaPbAny} import com.google.protobuf.any.{Any => ScalaPbAny} import io.cloudstate.javasupport.CloudStateRunner.Configuration -import io.cloudstate.javasupport.{Context, ServiceCallFactory, StatefulService} +import io.cloudstate.javasupport.{Context, Metadata, Service, ServiceCallFactory} import io.cloudstate.javasupport.eventsourced._ import io.cloudstate.javasupport.impl.{ AbstractClientActionContext, @@ -32,6 +32,7 @@ import io.cloudstate.javasupport.impl.{ ActivatableContext, AnySupport, FailInvoked, + MetadataImpl, ResolvedEntityFactory, ResolvedServiceMethod } @@ -49,7 +50,7 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory, val anySupport: AnySupport, override val persistenceId: String, val snapshotEvery: Int) - extends StatefulService { + extends Service { override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] = factory match { @@ -139,10 +140,12 @@ final class EventSourcedImpl(_system: ActorSystem, if (thisEntityId != command.entityId) throw new IllegalStateException("Receiving entity is not the intended recipient of command") val cmd = ScalaPbAny.toJavaProto(command.payload.get) + val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil)) val context = new CommandContextImpl(thisEntityId, sequence, command.name, command.id, + metadata, service.anySupport, handler, service.snapshotEvery) @@ -212,6 +215,7 @@ final class EventSourcedImpl(_system: ActorSystem, override val sequenceNumber: Long, override val commandName: String, override val commandId: Long, + override val metadata: Metadata, val anySupport: AnySupport, val handler: EventSourcedEntityHandler, val snapshotEvery: Int) diff --git a/java-support/src/test/proto/cloudstate/javasupport/controllerspec.proto b/java-support/src/test/proto/cloudstate/javasupport/controllerspec.proto new file mode 100644 index 000000000..bfc3fb9ca --- /dev/null +++ b/java-support/src/test/proto/cloudstate/javasupport/controllerspec.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package cloudstate.javasupport; + +message In { + string field = 1; +} + +message Out { + string field = 1; +} + +service ControllerSpec { + rpc Unary(In) returns (Out); + rpc StreamedIn(stream In) returns (Out); + rpc StreamedOut(In) returns (stream Out); + rpc Streamed(stream In) returns (stream Out); +} \ No newline at end of file diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupportSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupportSpec.scala new file mode 100644 index 000000000..d3191561d --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/AnnotationBasedControllerSupportSpec.scala @@ -0,0 +1,401 @@ +package io.cloudstate.javasupport.impl.controller + +import java.util.Optional +import java.util.concurrent.{CompletableFuture, CompletionStage, TimeUnit} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.javadsl.Source +import akka.stream.scaladsl.{JavaFlowSupport, Sink} +import cloudstate.javasupport.Controllerspec +import cloudstate.javasupport.Controllerspec.{In, Out} +import com.google.protobuf +import io.cloudstate.javasupport.{Metadata, ServiceCallFactory} +import io.cloudstate.javasupport.controller.{ + CallHandler, + Controller, + ControllerContext, + ControllerHandler, + ControllerReply, + MessageEnvelope, + MessageReply +} +import io.cloudstate.javasupport.impl.AnySupport +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.compat.java8.FutureConverters._ + +class AnnotationBasedControllerSupportSpec extends WordSpec with Matchers with BeforeAndAfterAll { + + private implicit val sys = ActorSystem("AnnotationBasedControllerSupportSpec") + + import sys.dispatcher + + override protected def afterAll(): Unit = { + super.afterAll() + sys.terminate() + } + + private val anySupport = new AnySupport(Array(Controllerspec.getDescriptor), this.getClass.getClassLoader) + + private object ctx extends ControllerContext { + override def metadata(): Metadata = Metadata.EMPTY.add("scope", "call") + + override def serviceCallFactory(): ServiceCallFactory = ??? + } + + private def create(handler: AnyRef): ControllerHandler = + new AnnotationBasedControllerSupport(handler, + anySupport, + Controllerspec.getDescriptor.findServiceByName("ControllerSpec")) + + "Annotation based controller support" should { + + "support invoking unary commands" when { + def test(handler: AnyRef) = { + val reply = create(handler) + .handleUnary("Unary", createInEnvelope("in"), ctx) + .toCompletableFuture + .get(10, TimeUnit.SECONDS) + assertIsOutReplyWithField(reply, "out: in") + } + + def inToOut(in: In): Out = + Out.newBuilder().setField("out: " + in.getField).build() + + "synchronous" in test(new { + @CallHandler + def unary(in: In): Out = inToOut(in) + }) + + "asynchronous" in test(new { + @CallHandler + def unary(in: In): CompletionStage[Out] = CompletableFuture.completedFuture(inToOut(in)) + }) + + "in wrapped in envelope" in test(new { + @CallHandler + def unary(in: MessageEnvelope[In]): Out = { + in.metadata().get("scope") should ===(Optional.of("message")) + inToOut(in.payload()) + } + }) + + "synchronous out wrapped in envelope" in test(new { + @CallHandler + def unary(in: In): MessageEnvelope[Out] = MessageEnvelope.of(inToOut(in)) + }) + + "asynchronous out wrapped in envelope" in test(new { + @CallHandler + def unary(in: In): CompletionStage[MessageEnvelope[Out]] = + CompletableFuture.completedFuture(MessageEnvelope.of(inToOut(in))) + }) + + "synchronous out wrapped in reply" in test(new { + @CallHandler + def unary(in: In): ControllerReply[Out] = ControllerReply.message(inToOut(in)) + }) + + "asynchronous out wrapped in reply" in test(new { + @CallHandler + def unary(in: In): CompletionStage[ControllerReply[Out]] = + CompletableFuture.completedFuture(ControllerReply.message(inToOut(in))) + }) + + "with metadata parameter" in test(new { + @CallHandler + def unary(in: In, metadata: Metadata): Out = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in) + } + }) + + "with context parameter" in test(new { + @CallHandler + def unary(in: In, context: ControllerContext): Out = inToOut(in) + }) + + } + + "support invoking streamed out commands" when { + def test(handler: AnyRef) = { + val replies = Await.result( + create(handler) + .handleStreamedOut("StreamedOut", createInEnvelope("in"), ctx) + .asScala + .runWith(Sink.seq), + 10.seconds + ) + replies should have size 3 + replies.zipWithIndex.foreach { + case (reply, idx) => + assertIsOutReplyWithField(reply, s"out ${idx + 1}: in") + } + } + + def inToOut(in: In): akka.stream.scaladsl.Source[Out, NotUsed] = + akka.stream.scaladsl + .Source(1 to 3) + .map { idx => + Out.newBuilder().setField(s"out $idx: " + in.getField).build() + } + + "source" in test(new { + @CallHandler + def streamedOut(in: In): Source[Out, NotUsed] = inToOut(in).asJava + }) + + "reactive streams publisher" in test(new { + @CallHandler + def streamedOut(in: In): org.reactivestreams.Publisher[Out] = + inToOut(in).runWith(Sink.asPublisher(false)) + }) + + "jdk publisher" in test(new { + @CallHandler + def streamedOut(in: In): java.util.concurrent.Flow.Publisher[Out] = + inToOut(in).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "message envelope" in test(new { + @CallHandler + def streamedOut(in: MessageEnvelope[In]): Source[Out, NotUsed] = inToOut(in.payload()).asJava + }) + + "source wrapped in envelope" in test(new { + @CallHandler + def streamedOut(in: In): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in).map(MessageEnvelope.of(_)).asJava + }) + + "source wrapped in reply" in test(new { + @CallHandler + def streamedOut(in: In): Source[ControllerReply[Out], NotUsed] = + inToOut(in).map[ControllerReply[Out]](ControllerReply.message(_)).asJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamedOut(in: In, metadata: Metadata): Source[Out, NotUsed] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in).asJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamedOut(in: In, metadata: Metadata): Source[Out, NotUsed] = inToOut(in).asJava + }) + + } + + "support invoking streamed in commands" when { + def test(handler: AnyRef) = { + val reply = create(handler) + .handleStreamedIn( + "StreamedIn", + akka.stream.scaladsl + .Source(1 to 3) + .map(idx => createInEnvelope("in " + idx)) + .asJava, + ctx + ) + .toCompletableFuture + .get(10, TimeUnit.SECONDS) + + assertIsOutReplyWithField(reply, "out: in 1, in 2, in 3") + } + + def inToOut(in: akka.stream.scaladsl.Source[In, NotUsed]): Future[Out] = + in.runWith(Sink.seq).map { ins => + Out.newBuilder().setField("out: " + ins.map(_.getField).mkString(", ")).build() + } + + "source" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[Out] = inToOut(in.asScala).toJava + }) + + "reactive streams publisher" in test(new { + @CallHandler + def streamedIn(in: org.reactivestreams.Publisher[In]): CompletionStage[Out] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).toJava + }) + + "jdk publisher" in test(new { + @CallHandler + def streamedIn(in: java.util.concurrent.Flow.Publisher[In]): CompletionStage[Out] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).toJava + }) + + "source wrapped in envelope" in test(new { + @CallHandler + def streamedIn(in: Source[MessageEnvelope[In], NotUsed]): CompletionStage[Out] = + inToOut(in.asScala.map(_.payload)).toJava + }) + + "returns envelope" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[MessageEnvelope[Out]] = + inToOut(in.asScala).map(MessageEnvelope.of(_)).toJava + }) + + "returns reply" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed]): CompletionStage[ControllerReply[Out]] = + inToOut(in.asScala).map[ControllerReply[Out]](ControllerReply.message(_)).toJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed], metadata: Metadata): CompletionStage[Out] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in.asScala).toJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamedIn(in: Source[In, NotUsed], context: ControllerContext): CompletionStage[Out] = + inToOut(in.asScala).toJava + }) + + } + + "support invoking streamed commands" when { + def test(handler: AnyRef) = { + val replies = Await.result( + create(handler) + .handleStreamed( + "Streamed", + akka.stream.scaladsl + .Source(1 to 3) + .map(idx => createInEnvelope("in " + idx)) + .asJava, + ctx + ) + .asScala + .runWith(Sink.seq), + 10.seconds + ) + + replies should have size 3 + replies.zipWithIndex.foreach { + case (reply, idx) => + assertIsOutReplyWithField(reply, s"out: in ${idx + 1}") + } + } + + def inToOut(stream: akka.stream.scaladsl.Source[In, NotUsed]): akka.stream.scaladsl.Source[Out, NotUsed] = + stream.map { in => + Out.newBuilder().setField("out: " + in.getField).build() + } + + "source in source out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[Out, NotUsed] = inToOut(in.asScala).asJava + }) + + "reactive streams publisher in source out" in test(new { + @CallHandler + def streamed(in: org.reactivestreams.Publisher[In]): Source[Out, NotUsed] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).asJava + }) + + "source in reactive streams publisher out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): org.reactivestreams.Publisher[Out] = + inToOut(in.asScala).runWith(Sink.asPublisher(false)) + }) + + "reactive streams publisher in reactive streams publisher out" in test(new { + @CallHandler + def streamed(in: org.reactivestreams.Publisher[In]): org.reactivestreams.Publisher[Out] = + inToOut(akka.stream.scaladsl.Source.fromPublisher(in)).runWith(Sink.asPublisher(false)) + }) + + "jdk publisher in source out" in test(new { + @CallHandler + def streamed(in: java.util.concurrent.Flow.Publisher[In]): Source[Out, NotUsed] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).asJava + }) + + "source in jdk publisher out" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): java.util.concurrent.Flow.Publisher[Out] = + inToOut(in.asScala).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "jdk publisher in jdk publisher out" in test(new { + @CallHandler + def streamed(in: java.util.concurrent.Flow.Publisher[In]): java.util.concurrent.Flow.Publisher[Out] = + inToOut(JavaFlowSupport.Source.fromPublisher(in)).runWith(JavaFlowSupport.Sink.asPublisher(false)) + }) + + "in wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[Out, NotUsed] = + inToOut(in.asScala.map(_.payload)).asJava + }) + + "out wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in.asScala).map(MessageEnvelope.of(_)).asJava + }) + + "in and out wrapped in envelope" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[MessageEnvelope[Out], NotUsed] = + inToOut(in.asScala.map(_.payload())).map(MessageEnvelope.of(_)).asJava + }) + + "out wrapped in reply" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed]): Source[ControllerReply[Out], NotUsed] = + inToOut(in.asScala).map[ControllerReply[Out]](ControllerReply.message(_)).asJava + }) + + "in wrapped in envelope out wrapped in reply" in test(new { + @CallHandler + def streamed(in: Source[MessageEnvelope[In], NotUsed]): Source[ControllerReply[Out], NotUsed] = + inToOut(in.asScala.map(_.payload())).map[ControllerReply[Out]](ControllerReply.message(_)).asJava + }) + + "with metadata parameter" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed], metadata: Metadata): Source[Out, NotUsed] = { + metadata.get("scope") should ===(Optional.of("call")) + inToOut(in.asScala).asJava + } + }) + + "with context parameter" in test(new { + @CallHandler + def streamed(in: Source[In, NotUsed], context: ControllerContext): Source[Out, NotUsed] = + inToOut(in.asScala).asJava + }) + + } + + } + + private def createInEnvelope(field: String) = + MessageEnvelope.of( + protobuf.Any.pack(In.newBuilder().setField(field).build()), + Metadata.EMPTY.add("scope", "message") + ) + + private def assertIsOutReplyWithField(reply: ControllerReply[protobuf.Any], field: String) = + reply match { + case message: MessageReply[protobuf.Any] => + val out = message.payload().unpack(classOf[Out]) + out.getField should ===(field) + case other => + fail(s"$reply is not a MessageReply") + } + +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/ControllerServiceSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/ControllerServiceSpec.scala new file mode 100644 index 000000000..44ca623b9 --- /dev/null +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/controller/ControllerServiceSpec.scala @@ -0,0 +1,203 @@ +package io.cloudstate.javasupport.impl.controller + +import java.util.concurrent.{CompletableFuture, CompletionStage} + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.javadsl.Source +import akka.stream.scaladsl.Sink +import cloudstate.javasupport.Controllerspec +import cloudstate.javasupport.Controllerspec.{In, Out} +import com.google.protobuf +import com.google.protobuf.any.{Any => ScalaPbAny} +import io.cloudstate.javasupport.{Context, ServiceCallFactory} +import io.cloudstate.javasupport.controller.{ + ControllerContext, + ControllerHandler, + ControllerReply, + Effect, + MessageEnvelope +} +import io.cloudstate.javasupport.impl.{AnySupport, ResolvedServiceCallFactory} +import io.cloudstate.protocol.entity.{Forward, Reply} +import io.cloudstate.protocol.function.{FunctionCommand, FunctionReply, StatelessFunction} +import org.scalatest.{BeforeAndAfterAll, Inside, Matchers, OptionValues, WordSpec} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.compat.java8.FutureConverters._ + +class ControllerServiceSpec extends WordSpec with Matchers with BeforeAndAfterAll with Inside with OptionValues { + + private implicit val system = ActorSystem("ControllerServiceSpec") + + import system.dispatcher + + private val serviceDescriptor = + cloudstate.javasupport.Controllerspec.getDescriptor.findServiceByName("ControllerSpec") + private val serviceName = serviceDescriptor.getFullName + + override protected def afterAll(): Unit = { + super.afterAll() + system.terminate() + } + + def create(handler: ControllerHandler): StatelessFunction = { + val service = new ControllerService( + handler, + serviceDescriptor, + new AnySupport(Array(Controllerspec.getDescriptor), this.getClass.getClassLoader) + ) + + val services = Map(serviceName -> service) + val scf = new ResolvedServiceCallFactory(services) + + new StatelessFunctionImpl(system, services, new Context() { + override def serviceCallFactory(): ServiceCallFactory = scf + }) + } + + "The controller service" should { + "invoke unary commands" in { + val service = create(new AbstractHandler { + override def handleUnary(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ControllerContext): CompletionStage[ControllerReply[protobuf.Any]] = + CompletableFuture.completedFuture(createOutReply("out: " + extractInField(message))) + }) + + val reply = Await.result(service.handleUnary( + FunctionCommand(serviceName, "Unary", createInPayload("in")) + ), + 10.seconds) + + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===("out: in") + } + } + + "invoke streamed in commands" in { + val service = create(new AbstractHandler { + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ControllerContext): CompletionStage[ControllerReply[protobuf.Any]] = + stream.asScala + .map(extractInField) + .runWith(Sink.seq) + .map(ins => createOutReply("out: " + ins.mkString(", "))) + .toJava + }) + + val reply = Await.result( + service.handleStreamedIn( + akka.stream.scaladsl.Source + .single(FunctionCommand(serviceName, "StreamedIn")) + .concat( + akka.stream.scaladsl.Source(1 to 3).map(idx => FunctionCommand(payload = createInPayload(s"in $idx"))) + ) + ), + 10.seconds + ) + + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===("out: in 1, in 2, in 3") + } + } + + "invoke streamed out commands" in { + val service = create(new AbstractHandler { + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ControllerContext): Source[ControllerReply[protobuf.Any], NotUsed] = { + val in = extractInField(message) + akka.stream.scaladsl.Source(1 to 3).map(idx => createOutReply(s"out $idx: $in")).asJava + } + }) + + val replies = Await.result(service + .handleStreamedOut( + FunctionCommand(serviceName, "Unary", createInPayload("in")) + ) + .runWith(Sink.seq), + 10.seconds) + + replies.zipWithIndex.foreach { + case (reply, idx) => + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===(s"out ${idx + 1}: in") + } + } + } + + "invoke streamed commands" in { + val service = create(new AbstractHandler { + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ControllerContext): Source[ControllerReply[protobuf.Any], NotUsed] = + stream.asScala + .map(extractInField) + .map(in => createOutReply(s"out: $in")) + .asJava + }) + + val replies = Await.result( + service + .handleStreamed( + akka.stream.scaladsl.Source + .single(FunctionCommand(serviceName, "StreamedIn")) + .concat( + akka.stream.scaladsl.Source(1 to 3).map(idx => FunctionCommand(payload = createInPayload(s"in $idx"))) + ) + ) + .runWith(Sink.seq), + 10.seconds + ) + + replies.zipWithIndex.foreach { + case (reply, idx) => + inside(reply.response) { + case FunctionReply.Response.Reply(Reply(payload, _, _)) => + extractOutField(payload) should ===(s"out: in ${idx + 1}") + } + } + } + + } + + private def createOutAny(field: String) = + protobuf.Any.pack(Out.newBuilder().setField(field).build()) + + private def createOutReply(field: String): ControllerReply[protobuf.Any] = + ControllerReply.message(createOutAny(field)) + + private def extractInField(message: MessageEnvelope[protobuf.Any]) = + message.payload().unpack(classOf[In]).getField + + private def createInPayload(field: String) = + Some(ScalaPbAny.fromJavaProto(protobuf.Any.pack(In.newBuilder().setField(field).build()))) + + private def extractOutField(payload: Option[ScalaPbAny]) = + ScalaPbAny.toJavaProto(payload.value).unpack(classOf[Out]).getField + + private trait AbstractHandler extends ControllerHandler { + override def handleUnary(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ControllerContext): CompletionStage[ControllerReply[protobuf.Any]] = ??? + + override def handleStreamedOut(commandName: String, + message: MessageEnvelope[protobuf.Any], + context: ControllerContext): Source[ControllerReply[protobuf.Any], NotUsed] = ??? + + override def handleStreamedIn(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ControllerContext): CompletionStage[ControllerReply[protobuf.Any]] = ??? + + override def handleStreamed(commandName: String, + stream: Source[MessageEnvelope[protobuf.Any], NotUsed], + context: ControllerContext): Source[ControllerReply[protobuf.Any], NotUsed] = ??? + } + +} diff --git a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala index c5939b9b9..88aad46ca 100644 --- a/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala +++ b/java-support/src/test/scala/io/cloudstate/javasupport/impl/eventsourced/AnnotationBasedEventSourcedSupportSpec.scala @@ -9,6 +9,7 @@ import io.cloudstate.javasupport.{ EntityContext, EntityFactory, EntityId, + Metadata, ServiceCall, ServiceCallFactory, ServiceCallRef @@ -41,6 +42,7 @@ class AnnotationBasedEventSourcedSupportSpec extends WordSpec with Matchers { override def fail(errorMessage: String): RuntimeException = ??? override def forward(to: ServiceCall): Unit = ??? override def effect(effect: ServiceCall, synchronous: Boolean): Unit = ??? + override def metadata(): Metadata = Metadata.EMPTY } val eventCtx = new EventContext with BaseContext {