Skip to content

Commit

Permalink
Removed fs2 jawn
Browse files Browse the repository at this point in the history
It's not updated for 2 years and the usage is minimal, so it can be
replaced by a single function implemented in the codebase.
  • Loading branch information
hnaderi committed Aug 16, 2024
1 parent 8b6f458 commit a353f2d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ lazy val http4s = module("http4s") {
.settings(
description := "http4s based client for kubernetes",
libraryDependencies ++= Seq(
"org.http4s" %%% "http4s-client" % "0.23.27",
"org.typelevel" %%% "jawn-fs2" % "2.4.0"
"org.http4s" %%% "http4s-client" % "0.23.27"
)
)
.dependsOn(client, jawn)
Expand Down
29 changes: 24 additions & 5 deletions modules/http4s/src/main/scala/Http4sBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ final class Http4sBackend[F[_], T] private (client: Client[F])(implicit
)
)

private def parseJsonStream: fs2.Pipe[F, Byte, T] = {
import dev.hnaderi.k8s.jawn
import org.typelevel.jawn._
import fs2.{Pull, Chunk}

implicit val jawnFacade: Facade.SimpleFacade[T] = jawn.jawnFacade[T]
def go(
parser: AsyncParser[T]
)(s: Stream[F, Chunk[Byte]]): Pull[F, T, Unit] = {
def handle(attempt: Either[ParseException, collection.Seq[T]]) =
attempt.fold(Pull.raiseError[F], js => Pull.output(Chunk.from(js)))

s.pull.uncons1.flatMap {
case Some((a, stream)) =>
handle(parser.absorb(a.toByteBuffer)) >> go(parser)(stream)
case None =>
handle(parser.finish()) >> Pull.done
}
}

src => go(AsyncParser[T](AsyncParser.ValueStream))(src.chunks).stream
}

override def connect[O: Decoder](
url: String,
verb: APIVerb,
Expand All @@ -115,15 +138,11 @@ final class Http4sBackend[F[_], T] private (client: Client[F])(implicit
cookies: Seq[(String, String)]
): Stream[F, O] = {
import Stream._
import org.typelevel.jawn.fs2._
import dev.hnaderi.k8s.jawn
import org.typelevel.jawn.Facade
implicit val jawnFacade: Facade.SimpleFacade[T] = jawn.jawnFacade[T]

eval(urlFrom(url, params))
.map(methodFor(verb)(_, Headers(headers) ++ cookiesFor(cookies)))
.flatMap(client.stream(_))
.flatMap(_.body.chunks.parseJsonStream[T])
.flatMap(_.body.through(parseJsonStream))
.flatMap { s =>
s.decodeTo[O]
.fold(err => raiseError[F](new Exception(s"$err\n$s")), emit(_))
Expand Down

0 comments on commit a353f2d

Please sign in to comment.