Skip to content

Commit

Permalink
[TH2-5141] Publish warning event with details about internal exception
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Jan 15, 2024
1 parent 54318d8 commit f5adbdc
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 9 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2-codec-oracle-log-miner (0.0.1)
# th2-codec-oracle-log-miner (0.0.2)

## Description

Expand Down Expand Up @@ -135,6 +135,9 @@ spec:
## Release notes
### 0.0.2
+ Publish warning event with details about internal exception.
### 0.0.1
+ Parse `INSERT` / `UPDATE` / `DELETE` SQL queries from the `SQL_REDO` field.
+ Copy specified fields from source to output parsed message.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release_version=0.0.1
release_version=0.0.2
app_main_class=com.exactpro.th2.codec.MainKt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ class LogMinerTransformer(private val config: LogMinerConfiguration) : IPipeline
else -> error("Unsupported operation kind '$operation'")
}
}.getOrElse { e ->
LOGGER.error(e) { "Message transformation failure, id: ${message.id.logId}" }
val text = "Message transformation failure, id: ${message.id.logId}"
LOGGER.error(e) { text }

context.warning("$text ${e.message?.lines()?.first()}")
message.toBuilderWithoutBody().apply {
setType(ERROR_TYPE_MESSAGE)
bodyBuilder().put(ERROR_CONTENT_FIELD, e.message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,14 +20,19 @@ import com.exactpro.th2.codec.api.IReportingContext
import com.exactpro.th2.codec.oracle.logminer.LogMinerTransformer.Companion.toReadable
import com.exactpro.th2.codec.oracle.logminer.cfg.LogMinerConfiguration
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage
import com.exactpro.th2.common.utils.message.transport.logId
import com.exactpro.th2.common.utils.message.transport.toGroup
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import net.sf.jsqlparser.parser.CCJSqlParserUtil
import net.sf.jsqlparser.statement.insert.Insert
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.verifyNoMoreInteractions
import strikt.api.expectThat
import strikt.assertions.filterIsInstance
Expand All @@ -39,7 +44,12 @@ import strikt.assertions.withElementAt
import java.time.Instant

class LogMinerTransformerTest {
private val reportingContext: IReportingContext = mock { }
private val reportingContext: IReportingContext = mock { }

@AfterEach
fun afterEach() {
verifyNoMoreInteractions(reportingContext)
}

@Test
fun decodesDataUsingDefaultHeader() {
Expand Down Expand Up @@ -108,16 +118,103 @@ class LogMinerTransformerTest {
}
}
}
verifyNoMoreInteractions(reportingContext)

verify(reportingContext).warning(eq("""Message transformation failure, id: ${sourceMessages[0].id.logId} Unsupported operation kind 'DDL'"""))
}

@Test
fun `decode broken row message test`() {
val config = LogMinerConfiguration()
val codec = LogMinerTransformer(config)
val source: ParsedMessage = ParsedMessage.builder()
.setId(
MessageId.builder()
.setSessionAlias("test-session-alias")
.setDirection(Direction.OUTGOING)
.setTimestamp(Instant.now())
.setSequence(1L)
.build()
)
.setBody(
mapOf(
"OPERATION" to "INSERT",
"SQL_REDO" to "broken query",
"ROW_ID" to 1,
"TIMESTAMP" to Instant.now().toString(),
"TABLE_NAME" to "test-table",
)
)
.setType("test-message")
.setProtocol("")
.build()

expectThat(codec.decode(source.toGroup(), reportingContext).messages).hasSize(1)
.filterIsInstance<ParsedMessage>()
.single().and {
get { id }.isEqualTo(source.id)
get { eventId }.isEqualTo(source.eventId)
get { type }.isEqualTo("th2-codec-error")
get { protocol }.isEqualTo("[csv,oracle-log-miner]")
get { body }.isEqualTo(
mapOf(
"content" to """
net.sf.jsqlparser.parser.ParseException: Encountered unexpected token: "broken" <S_IDENTIFIER>
at line 1, column 1.
Was expecting one of:
"("
"ALTER"
"ANALYZE"
"BEGIN"
"CALL"
"COMMENT"
"COMMIT"
"CREATE"
"DECLARE"
"DELETE"
"DESCRIBE"
"DROP"
"EXEC"
"EXECUTE"
"EXPLAIN"
"GRANT"
"IF"
"INSERT"
"MERGE"
"PURGE"
"RENAME"
"REPLACE"
"RESET"
"ROLLBACK"
"SAVEPOINT"
"SET"
"SHOW"
"TRUNCATE"
"UPDATE"
"UPSERT"
"USE"
"VALUE"
"VALUES"
"WITH"
<K_SELECT>
""".trimIndent()
) + source.body.filterKeys { config.saveColumns.contains(it) }
)
}
verify(reportingContext).warning(eq("""Message transformation failure, id: ${source.id.logId} net.sf.jsqlparser.parser.ParseException: Encountered unexpected token: "broken" <S_IDENTIFIER>"""))
}

@Test
fun `toReadable test`() {
val onWarning: (String) -> Unit = mock { }
val insert = CCJSqlParserUtil.parse("""
val onWarning: (String) -> Unit = mock { }
val insert = CCJSqlParserUtil.parse(
"""
insert into "OWNER"."table"("NAME","TIMESTAMP","DATE","NUMBER","NULL")
values ('An',TO_TIMESTAMP('12-DEC-23 02.55.01 PM'), TO_DATE('12-DEC-23', 'DD-MON-RR'), 8, NULL);
""".trimIndent()) as Insert
""".trimIndent()
) as Insert
val result: List<Any?> = insert.select.values.expressions.map { it.toReadable(onWarning) }
expectThat(result) {
hasSize(5)
Expand Down

0 comments on commit f5adbdc

Please sign in to comment.