Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate gRPC status code when not ok #519

Merged
merged 11 commits into from
Oct 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,9 @@ protected void onKafkaData(
if (grpcStatus != null &&
!HEADER_VALUE_GRPC_OK.value().equals(grpcStatus.value().value()))
{
OctetsFW value = grpcStatus.value();
String16FW status = statusRW
.set(grpcStatus.value().buffer(), grpcStatus.offset(), grpcStatus.sizeof())
.set(value.buffer(), value.offset(), value.sizeof())
.build();
doGrpcAbort(traceId, authorization, status);
}
Expand Down Expand Up @@ -1410,7 +1411,8 @@ private void doGrpcAbort(
long authorization,
String16FW status)
{
if (GrpcKafkaState.replyOpened(state) && !GrpcKafkaState.replyClosed(state))
if (GrpcKafkaState.replyOpening(state) &&
!GrpcKafkaState.replyClosed(state))
{
replySeq = correlater.replySeq;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public void shouldRejectUnaryRpc() throws Exception
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
"${grpc}/unary.rpc.error/client",
"${kafka}/unary.rpc.error/server"})
public void shouldRejectUnaryRpcWithError() throws Exception
{
k3po.finish();
}

@Test
@Configuration("produce.proxy.rpc.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class GrpcClientFactory implements GrpcStreamFactory
private final GrpcResetExFW.Builder grpcResetExRW = new GrpcResetExFW.Builder();
private final GrpcMessageFW.Builder grpcMessageRW = new GrpcMessageFW.Builder();

private final GrpcAbortExFW grpcAbortedStatusRO;

private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer metadataBuffer;
private final MutableDirectBuffer extBuffer;
Expand All @@ -139,6 +141,11 @@ public GrpcClientFactory(
this.grpcTypeId = context.supplyTypeId(GrpcBinding.NAME);
this.bindings = new Long2ObjectHashMap<>();
this.helper = new HttpGrpcResponseHeaderHelper();

this.grpcAbortedStatusRO = grpcAbortExRW.wrap(new UnsafeBuffer(new byte[32]), 0, 32)
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();
}

@Override
Expand Down Expand Up @@ -234,6 +241,7 @@ private final class GrpcClient
private int replyPad;

private int state;
private String grpcStatus;

private GrpcClient(
MessageConsumer application,
Expand Down Expand Up @@ -403,7 +411,14 @@ private void onAppWindow(
replyPad = padding;
state = GrpcState.openReply(state);

delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
if (GrpcState.replyClosing(state))
{
doAppAbortDeferred(traceId, authorization);
}
else
{
delegate.doNetWindow(traceId, authorization, budgetId, padding, replyAck, replyMax);
}

assert replyAck <= replySeq;

Expand Down Expand Up @@ -450,21 +465,49 @@ private void doAppData(
assert replySeq <= replyAck + replyMax;
}

private void doAppAbort(
private void doAppAbortDeferring(
long traceId,
long authorization,
String16FW grpcStatus)
{
this.grpcStatus = grpcStatus != null ? grpcStatus.asString() : null;
this.state = GrpcState.closingReply(state);

if (GrpcState.replyOpened(state))
jfallows marked this conversation as resolved.
Show resolved Hide resolved
{
doAppAbortDeferred(traceId, authorization);
}
}

private void doAppAbortDeferred(
long traceId,
long authorization)
{
if (!GrpcState.replyClosed(state))
GrpcAbortExFW abortEx = grpcStatus != null
? grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(grpcStatus)
.build()
: grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_INTERNAL_ERROR)
.build();
jfallows marked this conversation as resolved.
Show resolved Hide resolved

doAppAbort(traceId, authorization, abortEx);
}

private void doAppAbort(
long traceId,
long authorization,
Flyweight extension)
{
if (GrpcState.replyOpening(state) &&
!GrpcState.replyClosed(state))
{
state = GrpcState.closeReply(state);

GrpcAbortExFW abortEx = grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doAbort(application, originId, routedId, replyId, replySeq, replyAck, replyMax,
traceId, authorization, abortEx);
traceId, authorization, extension);
}
}

Expand Down Expand Up @@ -500,15 +543,19 @@ private void doAppWindow(

private void doAppReset(
long traceId,
long authorization,
Flyweight extension)
long authorization)
{
if (!GrpcState.initialClosed(state))
{
state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

doReset(application, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, extension);
traceId, authorization, resetEx);
}
}
}
Expand Down Expand Up @@ -725,21 +772,13 @@ private void onNetBegin(
replyMax = maximum;
state = GrpcState.openingReply(state);

delegate.doAppBegin(traceId, authorization, affinity);

if (!HTTP_HEADER_VALUE_STATUS_200.equals(status) ||
grpcStatus != null && !HEADER_VALUE_GRPC_OK.equals(grpcStatus))
{
final String16FW newGrpcStatus = grpcStatus == null ? HEADER_VALUE_GRPC_INTERNAL_ERROR : grpcStatus;
GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(newGrpcStatus)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
doNetAbort(traceId, authorization);
}
else
{
delegate.doAppBegin(traceId, authorization, affinity);
delegate.doAppAbortDeferring(traceId, authorization, grpcStatus);
doNetReset(traceId, authorization);
}
}

Expand Down Expand Up @@ -823,10 +862,9 @@ private void onNetEnd(
}
else
{
delegate.doAppAbort(traceId, authorization);
delegate.doAppAbortDeferring(traceId, authorization,
grpcStatus != null ? grpcStatus.value() : HEADER_VALUE_GRPC_INTERNAL_ERROR);
}


}

private void onNetAbort(
Expand All @@ -846,7 +884,7 @@ private void onNetAbort(

state = GrpcState.closeReply(state);

delegate.doAppAbort(traceId, authorization);
delegate.doAppAbort(traceId, authorization, grpcAbortedStatusRO);
}

private void onNetReset(
Expand All @@ -857,12 +895,7 @@ private void onNetReset(

state = GrpcState.closeInitial(state);

GrpcResetExFW resetEx = grpcResetExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(grpcTypeId)
.status(HEADER_VALUE_GRPC_ABORTED)
.build();

delegate.doAppReset(traceId, authorization, resetEx);
delegate.doAppReset(traceId, authorization);
}

private void onNetWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,25 @@ public void serverSendsWriteAbortOnOpenRequestResponse() throws Exception
}


@Test
@Configuration("client.when.yaml")
@Specification({
"${app}/server.send.write.abort.on.open.response/client",
"${net}/response.with.grpc.error/server"
})
public void shouldAbortResponseWithGrpcError() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.when.yaml")
@Specification({
"${app}/response.missing.grpc.status/client",
"${net}/response.missing.grpc.status/server",
})
public void shouldAbortResponseMissingGrpcStatus() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -786,9 +786,9 @@ private void doKafkaEnd(
if (KafkaGrpcState.initialOpened(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, HEADER_VALUE_GRPC_OK);
Expand All @@ -806,9 +806,9 @@ private void doKafkaAbort(
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
initialSeq = delegate.replySeq;
initialAck = delegate.replyAck;
initialMax = delegate.replyMax;
state = KafkaGrpcState.closeInitial(state);

doKafkaTombstone(traceId, authorization, status);
Expand Down Expand Up @@ -1453,7 +1453,6 @@ private void onGrpcAbort(
final String16FW status = abortEx != null ? abortEx.status() : HEADER_VALUE_GRPC_ABORTED;

correlater.doKafkaAbort(traceId, authorization, status);

}

private void onGrpcReset(
Expand All @@ -1464,6 +1463,7 @@ private void onGrpcReset(
final int maximum = reset.maximum();
final long traceId = reset.traceId();
final long authorization = reset.authorization();
final OctetsFW extension = reset.extension();

assert acknowledge <= sequence;
assert sequence <= initialSeq;
Expand All @@ -1472,7 +1472,7 @@ private void onGrpcReset(

initialAck = acknowledge;
initialMax = maximum;
state = KafkaGrpcState.closingInitial(state);
state = KafkaGrpcState.closeInitial(state);

cleanup(traceId, authorization);

Expand Down Expand Up @@ -1501,7 +1501,7 @@ private void onGrpcWindow(
initialBud = budgetId;
initialPad = padding;
initialCap = capabilities;
state = KafkaGrpcState.openReply(state);
state = KafkaGrpcState.openInitial(state);

assert initialAck <= initialSeq;

Expand Down Expand Up @@ -1592,7 +1592,7 @@ private void doGrpcBegin(
OctetsFW service,
OctetsFW method)
{
state = KafkaGrpcState.openingReply(state);
state = KafkaGrpcState.openingInitial(state);

grpc = newGrpcStream(this::onGrpcMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, affinity, server.condition.scheme(), server.condition.authority(),
Expand Down Expand Up @@ -1621,7 +1621,8 @@ private void doGrpcAbort(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpened(state) && !KafkaGrpcState.replyClosed(state))
if (KafkaGrpcState.initialOpening(state) &&
!KafkaGrpcState.initialClosed(state))
{
final GrpcAbortExFW grpcAbortEx =
grpcAbortExRW.wrap(extBuffer, 0, extBuffer.capacity())
Expand All @@ -1639,9 +1640,9 @@ private void doGrpcEnd(
long traceId,
long authorization)
{
if (!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.initialClosed(state))
{
state = KafkaGrpcState.closeReply(state);
state = KafkaGrpcState.closeInitial(state);

doEnd(grpc, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization);
Expand All @@ -1666,8 +1667,7 @@ private void doGrpcReset(
long traceId,
long authorization)
{
if (KafkaGrpcState.replyOpening(state) &&
!KafkaGrpcState.replyClosed(state))
if (!KafkaGrpcState.replyClosed(state))
{
state = KafkaGrpcState.closeReply(state);

Expand All @@ -1682,6 +1682,7 @@ private void doGrpcReset(
}
}
}

private void doBegin(
MessageConsumer receiver,
long originId,
Expand Down
Loading