Skip to content

Commit

Permalink
Always return from trigger even if read from output topic times out
Browse files Browse the repository at this point in the history
  • Loading branch information
cdbartholomew authored and zzzming committed Dec 24, 2020
1 parent 8171007 commit d5b70ad
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ public String triggerFunction(final String tenant,
log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName);
throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready");
}

String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
Reader<byte[]> reader = null;
Producer<byte[]> producer = null;
Expand Down Expand Up @@ -1017,25 +1018,22 @@ public String triggerFunction(final String tenant,
if (reader == null) {
return null;
}
long curTime = System.currentTimeMillis();
long maxTime = curTime + 1000;
while (curTime < maxTime) {
Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
if (msg == null)
break;
if (msg.getProperties().containsKey("__pfn_input_msg_id__")
&& msg.getProperties().containsKey("__pfn_input_topic__")) {
MessageId newMsgId = MessageId.fromByteArray(
Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));

if (msgId.equals(newMsgId)
&& msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
return new String(msg.getData());
}

Message msg = reader.readNext(2500, TimeUnit.MILLISECONDS);

if (msg == null) {
return new String("No Message On Output Topic");
}

if (msg.getProperties().containsKey("__pfn_input_msg_id__")
&& msg.getProperties().containsKey("__pfn_input_topic__")) {
MessageId newMsgId = MessageId.fromByteArray(
Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
if (msgId.equals(newMsgId)
&& msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
return new String(msg.getData());
}
curTime = System.currentTimeMillis();
}
throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
} catch (SchemaSerializationException e) {
throw new RestException(Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
} catch (IOException e) {
Expand All @@ -1048,6 +1046,9 @@ public String triggerFunction(final String tenant,
producer.closeAsync();
}
}

return null;

}

public FunctionState getFunctionState(final String tenant,
Expand Down

0 comments on commit d5b70ad

Please sign in to comment.