Skip to content

Commit

Permalink
Propagate gRPC status code when not ok (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows authored Oct 24, 2023
1 parent b4ce39c commit f1985a3
Show file tree
Hide file tree
Showing 23 changed files with 767 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,9 @@ protected void onKafkaData(
if (grpcStatus != null &&
!HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value()))
{
OctetsFW value = grpcStatus.value();
String16FW status = statusRW
.set(grpcStatus.value().buffer(), grpcStatus.offset(), grpcStatus.sizeof())
.set(value.buffer(), value.offset(), value.sizeof())
.build();
doGrpcAbort(traceId, authorization, status);
}
Expand Down Expand Up @@ -1410,7 +1411,8 @@ private void doGrpcAbort(
long authorization,
String16FW status)
{
if (GrpcKafkaState.replyOpened(state) && !GrpcKafkaState.replyClosed(state))
if (GrpcKafkaState.replyOpening(state) &&
!GrpcKafkaState.replyClosed(state))
{
replySeq = correlater.replySeq;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public void shouldRejectUnaryRpc() throws Exception
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
"${grpc}/unary.rpc.error/client",
"${kafka}/unary.rpc.error/server"})
public void shouldRejectUnaryRpcWithError() throws Exception
{
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class GrpcClientFactory implements GrpcStreamFactory
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcMessageFW.Builder grpcMessageRW = new GrpcMessageFW.Builder();

private final GrpcAbortExFW grpcAbortedStatusRO;

private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer metadataBuffer;
private final MutableDirectBuffer extBuffer;
Expand All @@ -139,6 +141,11 @@ public GrpcClientFactory(
this.grpcTypeId = context.supplyTypeId(GrpcBinding.NAME);
this.bindings = new Long2ObjectHashMap<>();
this.helper = new HttpGrpcResponseHeaderHelper();

this.grpcAbortedStatusRO = grpcAbortExRW.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();
}

@Override
Expand Down Expand Up @@ -234,6 +241,7 @@ private final class GrpcClient
private int replyPad;

private int state;
private String grpcStatus;

private GrpcClient(
MessageConsumer application,
Expand Down Expand Up @@ -403,7 +411,14 @@ private void onAppWindow(
replyPad = padding;
state = GrpcState.openReply(state);

delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
if (GrpcState.replyClosing(state))
{
doAppAbortDeferred(traceId, authorization);
}
else
{
delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
}

assert replyAck <= replySeq;

Expand Down Expand Up @@ -450,21 +465,49 @@ private void doAppData(
assert replySeq <= replyAck + replyMax;
}

private void doAppAbort(
private void doAppAbortDeferring(
long traceId,
long authorization,
String16FW grpcStatus)
{
this.grpcStatus = grpcStatus != null ? grpcStatus.asString() : null;
this.state = GrpcState.closingReply(state);

if (GrpcState.replyOpened(state))
{
doAppAbortDeferred(traceId, authorization);
}
}

private void doAppAbortDeferred(
long traceId,
long authorization)
{
if (!GrpcState.replyClosed(state))
GrpcAbortExFW abortEx = grpcStatus != null
? grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(grpcStatus)
.build()
: grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_INTERNAL_ERROR)
.build();

doAppAbort(traceId, authorization, abortEx);
}

private void doAppAbort(
long traceId,
long authorization,
Flyweight extension)
{
if (GrpcState.replyOpening(state) &&
!GrpcState.replyClosed(state))
{
state = GrpcState.closeReply(state);

GrpcAbortExFW abortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, abortEx);
traceId, authorization, extension);
}
}

Expand Down Expand Up @@ -500,15 +543,19 @@ private void doAppWindow(

private void doAppReset(
long traceId,
long authorization,
Flyweight extension)
long authorization)
{
if (!GrpcState.initialClosed(state))
{
state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, extension);
traceId, authorization, resetEx);
}
}
}
Expand Down Expand Up @@ -725,21 +772,13 @@ private void onNetBegin(
replyMax = maximum;
state = GrpcState.openingReply(state);

delegate.doAppBegin(traceId, authorization, affinity);

if (!HTTP_HEADER_VALUE_STATUS_200.equals(status) ||
grpcStatus != null && !HEADER_VALUE_GRPC_OK.equals(grpcStatus))
{
final String16FW newGrpcStatus = grpcStatus == null ? HEADER_VALUE_GRPC_INTERNAL_ERROR : grpcStatus;
GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(newGrpcStatus)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
doNetAbort(traceId, authorization);
}
else
{
delegate.doAppBegin(traceId, authorization, affinity);
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus);
doNetReset(traceId, authorization);
}
}

Expand Down Expand Up @@ -823,10 +862,9 @@ private void onNetEnd(
}
else
{
delegate.doAppAbort(traceId, authorization);
delegate.doAppAbortDeferring(traceId, authorization,
grpcStatus != null ? grpcStatus.value() : HEADER_VALUE_GRPC_INTERNAL_ERROR);
}


}

private void onNetAbort(
Expand All @@ -846,7 +884,7 @@ private void onNetAbort(

state = GrpcState.closeReply(state);

delegate.doAppAbort(traceId, authorization);
delegate.doAppAbort(traceId, authorization, grpcAbortedStatusRO);
}

private void onNetReset(
Expand All @@ -857,12 +895,7 @@ private void onNetReset(

state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
delegate.doAppReset(traceId, authorization);
}

private void onNetWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,25 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception
}


@Test
@Configuration("client.when.yaml")
@Specification({
"${app}/server.send.write.abort.on.open.response/client",
"${net}/response.with.grpc.error/server"
})
public void shouldAbortResponseWithGrpcError() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.when.yaml")
@Specification({
"${app}/response.missing.grpc.status/client",
"${net}/response.missing.grpc.status/server",
})
public void shouldAbortResponseMissingGrpcStatus() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,9 @@ private void doKafkaEnd(
if (KafkaGrpcState.initialOpened(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, HEADER_VALUE_GRPC_OK);
Expand All @@ -806,9 +806,9 @@ private void doKafkaAbort(
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, status);
Expand Down Expand Up @@ -1453,7 +1453,6 @@ private void onGrpcAbort(
final String16FW status = abortEx != null ? abortEx.status() : HEADER_VALUE_GRPC_ABORTED;

correlater.doKafkaAbort(traceId, authorization, status);

}

private void onGrpcReset(
Expand All @@ -1464,6 +1463,7 @@ private void onGrpcReset(
final int maximum = reset.maximum();
final long traceId = reset.traceId();
final long authorization = reset.authorization();
final OctetsFW extension = reset.extension();

assert acknowledge <= sequence;
assert sequence <= initialSeq;
Expand All @@ -1472,7 +1472,7 @@ private void onGrpcReset(

initialAck = acknowledge;
initialMax = maximum;
state = KafkaGrpcState.closingInitial(state);
state = KafkaGrpcState.closeInitial(state);

cleanup(traceId, authorization);

Expand Down Expand Up @@ -1501,7 +1501,7 @@ private void onGrpcWindow(
initialBud = budgetId;
initialPad = padding;
initialCap = capabilities;
state = KafkaGrpcState.openReply(state);
state = KafkaGrpcState.openInitial(state);

assert initialAck <= initialSeq;

Expand Down Expand Up @@ -1592,7 +1592,7 @@ private void doGrpcBegin(
OctetsFW service,
OctetsFW method)
{
state = KafkaGrpcState.openingReply(state);
state = KafkaGrpcState.openingInitial(state);

grpc = newGrpcStream(this::onGrpcMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, server.condition.scheme(), server.condition.authority(),
Expand Down Expand Up @@ -1621,7 +1621,8 @@ private void doGrpcAbort(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpened(state) && !KafkaGrpcState.replyClosed(state))
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
final GrpcAbortExFW grpcAbortEx =
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
Expand All @@ -1639,9 +1640,9 @@ private void doGrpcEnd(
long traceId,
long authorization)
{
if (!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.initialClosed(state))
{
state = KafkaGrpcState.closeReply(state);
state = KafkaGrpcState.closeInitial(state);

doEnd(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization);
Expand All @@ -1666,8 +1667,7 @@ private void doGrpcReset(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpening(state) &&
!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.replyClosed(state))
{
state = KafkaGrpcState.closeReply(state);

Expand All @@ -1682,6 +1682,7 @@ private void doGrpcReset(
}
}
}

private void doBegin(
MessageConsumer receiver,
long originId,
Expand Down
Loading

0 comments on commit f1985a3

Please sign in to comment.