Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publiser meldinger på Kafka sammen med nøkkel (i rivers) #824

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class HentArbeidsforholdRiver(
)
}

override fun HentArbeidsforholdMelding.bestemNoekkel(): KafkaKey? = svarKafkaKey

override fun HentArbeidsforholdMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val arbeidsforhold =
Metrics.aaregRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class AltinnRiver(
)
}

override fun Melding.bestemNoekkel(): KafkaKey? = svarKafkaKey

override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val rettigheterForenklet =
Metrics.altinnRequest.recordTime(altinnClient::hentRettighetOrganisasjoner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.metrics.Metrics
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -53,6 +54,8 @@ class TilgangRiver(
)
}

override fun TilgangMelding.bestemNoekkel(): KafkaKey = KafkaKey(fnr)

override fun TilgangMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val harTilgang =
Metrics.altinnRequest.recordTime(altinnClient::harRettighetForOrganisasjon) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -47,6 +48,8 @@ class HentEksternImRiver(
)
}

override fun HentEksternImMelding.bestemNoekkel(): KafkaKey = KafkaKey(forespoerselId)

override fun HentEksternImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement>? {
logger.info("Henter ekstern inntektsmelding med ID '$spinnImId' fra Spinn.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class HentVirksomhetNavnRiver(
)
}

override fun HentVirksomhetMelding.bestemNoekkel(): KafkaKey? = svarKafkaKey

override fun HentVirksomhetMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val orgnrMedNavn =
if (isPreProd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class HentLagretImRiver(
)
}

override fun HentLagretImMelding.bestemNoekkel(): KafkaKey? = svarKafkaKey

override fun HentLagretImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val (
skjema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -48,6 +49,8 @@ class HentSelvbestemtImRiver(
)
}

override fun HentSelvbestemtImMelding.bestemNoekkel(): KafkaKey = KafkaKey(selvbestemtId)

override fun HentSelvbestemtImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
"Skal hente selvbestemt inntektsmelding.".also {
logger.info(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -45,6 +46,8 @@ class LagreEksternImRiver(
)
}

override fun LagreEksternImMelding.bestemNoekkel(): KafkaKey = KafkaKey(forespoerselId)

override fun LagreEksternImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
imRepo.lagreEksternInntektsmelding(forespoerselId, eksternInntektsmelding)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -52,6 +53,8 @@ class LagreImRiver(
)
}

override fun LagreImMelding.bestemNoekkel(): KafkaKey = KafkaKey(inntektsmelding.type.id)

override fun LagreImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val inntektsmeldingGammeltFormat = inntektsmelding.convert()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -58,6 +59,8 @@ class LagreImSkjemaRiver(
)
}

override fun LagreImSkjemaMelding.bestemNoekkel(): KafkaKey = KafkaKey(skjema.forespoerselId)

override fun LagreImSkjemaMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
val sisteImSkjema = repository.hentNyesteInntektsmeldingSkjema(skjema.forespoerselId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toPretty
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -49,6 +50,8 @@ class LagreJournalpostIdRiver(
)
}

override fun LagreJournalpostIdMelding.bestemNoekkel(): KafkaKey = KafkaKey(inntektsmelding.type.id)

override fun LagreJournalpostIdMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement>? {
logger.info("Mottok melding.")
sikkerLogger.info("Mottok melding:\n${json.toPretty()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -50,6 +51,8 @@ class LagreSelvbestemtImRiver(
)
}

override fun LagreSelvbestemtImMelding.bestemNoekkel(): KafkaKey = KafkaKey(selvbestemtInntektsmelding.type.id)

override fun LagreSelvbestemtImMelding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
"Skal lagre selvbestemt inntektsmelding.".also {
logger.info(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import no.nav.helsearbeidsgiver.felles.json.krev
import no.nav.helsearbeidsgiver.felles.json.les
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toPretty
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -49,6 +50,8 @@ class DistribusjonRiver(
)
}

override fun Melding.bestemNoekkel(): KafkaKey = KafkaKey(inntektsmelding.type.id)

override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
"Forsøker å distribuere IM med journalpost-ID '$journalpostId'.".also {
logger.info(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import no.nav.helsearbeidsgiver.felles.json.lesOrNull
import no.nav.helsearbeidsgiver.felles.json.toJson
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.json.toPretty
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.model.Fail
import no.nav.helsearbeidsgiver.felles.rapidsrivers.river.ObjectRiver
import no.nav.helsearbeidsgiver.felles.utils.Log
Expand Down Expand Up @@ -54,6 +55,9 @@ class FeilLytter(
fail = Key.FAIL.les(Fail.serializer(), json),
)

// Riveren publiserer ikke via ObjectRiver, så denne nøkkelen blir ikke brukt
override fun Melding.bestemNoekkel(): KafkaKey = KafkaKey(UUID.randomUUID())

override fun Melding.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement>? {
logger.info("Mottok feil.")
sikkerLogger.info("Mottok feil.\n${json.toPretty()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonNull
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.json.toMap
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.utils.log.MdcUtils
import no.nav.helsearbeidsgiver.utils.log.logger
import no.nav.helsearbeidsgiver.utils.log.sikkerLogger
Expand Down Expand Up @@ -39,6 +40,8 @@ import no.nav.helsearbeidsgiver.utils.log.sikkerLogger
* height = Key.HEIGHT.lesOrNull(Int.serializer(), json)
* )
*
* override fun LotrCharacter.bestemNoekkel(): KafkaKey = KafkaKey(name)
*
* override fun LotrCharacter.haandter(json: Map<Key, JsonElement>): Map<Key, JsonElement> {
* val favouriteFood = when (name) {
* "Frodo" -> "\uD83C\uDF53"
Expand Down Expand Up @@ -93,6 +96,15 @@ abstract class ObjectRiver<Melding : Any> {
*/
protected abstract fun les(json: Map<Key, JsonElement>): Melding?

/**
* @receiver [Melding] - output fra [les].

* @return
* Nøkkel som utgående melding sendes sammen med.
* Meldinger sendt med samme nøkkel vil opprettholde rekkefølgen mellom dem (og konsumeres av samme pod).
*/
protected abstract fun Melding.bestemNoekkel(): KafkaKey?

/**
* Riverens hovedfunksjon. Agerer på innkommende melding.
* Kastede exceptions håndteres i [haandterFeil].
Expand Down Expand Up @@ -126,7 +138,7 @@ abstract class ObjectRiver<Melding : Any> {
protected abstract fun Melding.loggfelt(): Map<String, String>

/** Brukes av [OpenRiver]. */
private fun lesOgHaandter(json: JsonElement): Map<Key, JsonElement>? {
private fun lesOgHaandter(json: JsonElement): Pair<KafkaKey?, Map<Key, JsonElement>>? {
val jsonMap = json.toMap().filterValues { it !is JsonNull }

val innkommende = runCatching { les(jsonMap) }.getOrNull()
Expand All @@ -145,16 +157,33 @@ abstract class ObjectRiver<Melding : Any> {
.orEmpty()

return MdcUtils.withLogFields(*loggfelt) {
val utgaaende =
innkommende?.let {
if (innkommende == null) {
null
} else {
val key =
runCatching {
it.haandter(jsonMap)
innkommende.bestemNoekkel()
}.getOrElse { e ->
it.haandterFeil(jsonMap, e)
"Klarte ikke lage Kafka-nøkkel.".also {
logger.error(it)
sikkerLogger.error(it, e)
}
null
}
}

utgaaende?.takeIf { it.isNotEmpty() }
val msg =
runCatching {
innkommende.haandter(jsonMap)
}.getOrElse { e ->
innkommende.haandterFeil(jsonMap, e)
}

if (msg.isNullOrEmpty()) {
null
} else {
key to msg
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.github.navikt.tbd_libs.rapids_and_rivers_api.MessageContext
import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import kotlinx.serialization.json.JsonElement
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.publish
import no.nav.helsearbeidsgiver.utils.json.parseJson

Expand All @@ -17,7 +18,7 @@ import no.nav.helsearbeidsgiver.utils.json.parseJson
*/
internal class OpenRiver(
rapid: RapidsConnection,
private val haandterMelding: JsonElement.() -> Map<Key, JsonElement>?,
private val haandterMelding: JsonElement.() -> Pair<KafkaKey?, Map<Key, JsonElement>>?,
) : River.PacketListener {
init {
River(rapid).register(this)
Expand All @@ -31,6 +32,9 @@ internal class OpenRiver(
.toJson()
.parseJson()
.haandterMelding()
?.also { context.publish(null, it) }
?.also { (kafkaKey, melding) ->
// TODO gjør key non-nullable
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO skal tas i kommende PR.

context.publish(kafkaKey?.key, melding)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.github.navikt.tbd_libs.rapids_and_rivers_api.RapidsConnection
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.JsonNull
import no.nav.helsearbeidsgiver.felles.Key
import no.nav.helsearbeidsgiver.felles.rapidsrivers.KafkaKey
import no.nav.helsearbeidsgiver.felles.rapidsrivers.pritopic.Pri
import no.nav.helsearbeidsgiver.utils.json.fromJsonMapFiltered
import no.nav.helsearbeidsgiver.utils.log.MdcUtils
Expand All @@ -26,6 +27,8 @@ abstract class PriObjectRiver<Melding : Any> {

protected abstract fun les(json: Map<Pri.Key, JsonElement>): Melding?

protected abstract fun Melding.bestemNoekkel(): KafkaKey?

protected abstract fun Melding.haandter(json: Map<Pri.Key, JsonElement>): Map<Key, JsonElement>?

protected abstract fun Melding.haandterFeil(
Expand All @@ -35,7 +38,7 @@ abstract class PriObjectRiver<Melding : Any> {

protected abstract fun Melding.loggfelt(): Map<String, String>

private fun lesOgHaandter(json: JsonElement): Map<Key, JsonElement>? {
private fun lesOgHaandter(json: JsonElement): Pair<KafkaKey?, Map<Key, JsonElement>>? {
val jsonMap = json.fromJsonMapFiltered(Pri.Key.serializer()).filterValues { it !is JsonNull }

val innkommende = runCatching { les(jsonMap) }.getOrNull()
Expand All @@ -54,16 +57,33 @@ abstract class PriObjectRiver<Melding : Any> {
.orEmpty()

return MdcUtils.withLogFields(*loggfelt) {
val utgaaende =
innkommende?.let {
if (innkommende == null) {
null
} else {
val key =
runCatching {
it.haandter(jsonMap)
innkommende.bestemNoekkel()
}.getOrElse { e ->
it.haandterFeil(jsonMap, e)
"Klarte ikke lage Kafka-nøkkel.".also {
logger.error(it)
sikkerLogger.error(it, e)
}
null
}

val msg =
runCatching {
innkommende.haandter(jsonMap)
}.getOrElse { e ->
innkommende.haandterFeil(jsonMap, e)
}
}

utgaaende?.takeIf { it.isNotEmpty() }
if (msg.isNullOrEmpty()) {
null
} else {
key to msg
}
}
}
}
}
Loading
Loading