Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbanda committed Dec 19, 2024
1 parent 38cbced commit 60a7796
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ object AwsEventStreamBytesDecoder {
// eventJson might look like:
// { ":message-type":"event", ":event-type":"...", "bytes":"base64string" }

val base64Str = (eventJson \ "bytes").asOpt[String]
base64Str match {
case Some(encoded) =>
(eventJson \ "bytes")
.asOpt[String]
.map { encoded =>
val decoded = Base64.getDecoder.decode(encoded)
Json.parse(decoded)
case None =>
}
.getOrElse(
// If there's no "bytes" field, return the original JSON (or handle differently)
eventJson
}
)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ object AwsEventStreamEventParser {
None
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,33 @@ class AwsEventStreamFrameDecoder extends GraphStage[FlowShape[ByteString, ByteSt
override def createLogic(attrs: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
var buffer = ByteString.empty

setHandler(in, new InHandler {
override def onPush(): Unit = {
buffer ++= grab(in)
emitFrames()
}
override def onUpstreamFinish(): Unit = {
emitFrames()
if (buffer.isEmpty) completeStage()
else failStage(new RuntimeException("Truncated frame at stream end"))
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
buffer ++= grab(in)
emitFrames()
}
override def onUpstreamFinish(): Unit = {
emitFrames()
if (buffer.isEmpty) completeStage()
else failStage(new RuntimeException("Truncated frame at stream end"))
}
}
})
)

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (!hasBeenPulled(in)) pull(in)
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
if (!hasBeenPulled(in)) pull(in)
}
}
})
)

def emitFrames(): Unit = {
while (buffer.size >= 4) {
val totalLength = buffer.iterator.getInt
println("buffer size: " + buffer.size)
println("total length: " + totalLength)
println("buffer: " + buffer.utf8String)

if (buffer.size < 4 + totalLength) {
// not enough data yet
Expand All @@ -53,4 +56,4 @@ class AwsEventStreamFrameDecoder extends GraphStage[FlowShape[ByteString, ByteSt
}
}
}
}
}

0 comments on commit 60a7796

Please sign in to comment.