Skip to content

Commit

Permalink
Provide an error when entity id is missing (cloudstateio#1)
Browse files Browse the repository at this point in the history
* Provide an error when entity id is missing

Provide a more useful error when the entity id is missing from the
incoming command, so we don't know how to route it.

Previously this would lead to a dead letter and a request that
eventually times out.

* Add a test

Verified that it timed out before the changes in this PR
  • Loading branch information
raboof authored Feb 4, 2021
1 parent 7b24da9 commit 26ad80a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
17 changes: 13 additions & 4 deletions proxy/core/src/main/scala/io/cloudstate/proxy/Serve.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import akka.http.scaladsl.model.{
import akka.http.scaladsl.model.Uri.Path
import akka.actor.{ActorRef, ActorSystem}
import akka.event.{Logging, LoggingAdapter}
import akka.grpc.scaladsl.GrpcExceptionHandler
import akka.http.scaladsl.model.headers.RawHeader
import akka.util.ByteString
import akka.stream.Materializer
Expand Down Expand Up @@ -157,10 +158,18 @@ object Serve {
val grpcProxy = createGrpcApi(entities, router, entityDiscoveryClient, emitters)
val grpcHandler = Function.unlift { request: HttpRequest =>
val asResponse = grpcProxy.andThen { futureResult =>
Some(futureResult.map {
case (headers, messages) =>
createResponse(request, headers, messages)
})
Some(
futureResult
.map {
case (headers, messages) =>
createResponse(request, headers, messages)
}
.recoverWith(throwable => {
implicit val writer = GrpcProtocolNative.newWriter(Codecs.negotiate(request))
val exceptionHandler = GrpcExceptionHandler.default
exceptionHandler(throwable)
})
)
}
asResponse.applyOrElse(request, (_: HttpRequest) => None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.cloudstate.proxy
import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.grpc.GrpcServiceException
import akka.stream.scaladsl.Flow
import akka.util.Timeout
import com.google.protobuf.Descriptors.{MethodDescriptor, ServiceDescriptor}
Expand All @@ -29,9 +30,11 @@ import io.cloudstate.legacy_entity_key.LegacyEntityKeyProto
import io.cloudstate.protocol.entity.EntityPassivationStrategy.Strategy
import io.cloudstate.proxy.entity.{EntityCommand, UserFunctionReply}
import io.cloudstate.proxy.protobuf.Options
import io.grpc.Status

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace

trait UserFunctionTypeSupport {

Expand Down Expand Up @@ -128,16 +131,19 @@ final class EntityMethodDescriptor(val method: MethodDescriptor) {

def keyFieldsCount: Int = keyFields.length

def extractId(bytes: ByteString): String =
def extractId(bytes: ByteString): Option[String] =
keyFields.length match {
case 0 =>
""
None
case 1 =>
val dm = DynamicMessage.parseFrom(method.getInputType, bytes)
dm.getField(keyFields.head).toString
dm.getField(keyFields.head).toString match {
case "" => None
case other => Some(other)
}
case _ =>
val dm = DynamicMessage.parseFrom(method.getInputType, bytes)
keyFields.iterator.map(dm.getField).mkString(EntityMethodDescriptor.Separator)
Some(keyFields.iterator.map(dm.getField).mkString(EntityMethodDescriptor.Separator))
}

}
Expand All @@ -147,23 +153,40 @@ private final class EntityUserFunctionTypeSupport(serviceDescriptor: ServiceDesc
entityTypeSupport: EntityTypeSupport)(implicit ec: ExecutionContext)
extends UserFunctionTypeSupport {

val entityIdNotFound = new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("entity id not found"))
with NoStackTrace

override def handler(name: String,
metadata: Metadata): Flow[UserFunctionRouter.Message, UserFunctionReply, NotUsed] = {
val method = methodDescriptor(name)
Flow[UserFunctionRouter.Message].map(ufToEntityCommand(method)).via(entityTypeSupport.handler(method, metadata))
Flow[UserFunctionRouter.Message]
.map { message =>
ufToEntityCommand(method)(message) match {
case Some(command) => command
case None => throw entityIdNotFound
}
}
.via(entityTypeSupport.handler(method, metadata))
}

override def handleUnary(commandName: String, message: UserFunctionRouter.Message): Future[UserFunctionReply] =
entityTypeSupport.handleUnary(ufToEntityCommand(methodDescriptor(commandName))(message))
ufToEntityCommand(methodDescriptor(commandName))(message) match {
case Some(command) => entityTypeSupport.handleUnary(command)
case None => Future.failed(entityIdNotFound)
}

private def ufToEntityCommand(method: EntityMethodDescriptor): UserFunctionRouter.Message => EntityCommand = {
private def ufToEntityCommand(method: EntityMethodDescriptor): UserFunctionRouter.Message => Option[EntityCommand] = {
command =>
val entityId = method.extractId(command.payload.value)
EntityCommand(entityId = entityId,
name = method.method.getName,
payload = Some(command.payload),
streamed = method.method.isServerStreaming,
metadata = Some(command.metadata))
method
.extractId(command.payload.value)
.map(
entityId =>
EntityCommand(entityId = entityId,
name = method.method.getName,
payload = Some(command.payload),
streamed = method.method.isServerStreaming,
metadata = Some(command.metadata))
)
}

private def methodDescriptor(name: String): EntityMethodDescriptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ class ExceptionHandlingSpec extends WordSpec with Matchers with BeforeAndAfterAl

"Cloudstate proxy" should {

"respond with gRPC error for request without entity id" in {
val call = client.get(Key(""))
val error = call.failed.futureValue
error shouldBe a[StatusRuntimeException]
error.getMessage should be("INVALID_ARGUMENT: entity id not found")
}

"respond with gRPC error for action failure in entity" in {
val call = client.get(Key("one"))
val connection = service.eventSourced.expectConnection()
Expand Down

0 comments on commit 26ad80a

Please sign in to comment.