From d622eccb845698c245340556b6b4965fec075305 Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Fri, 29 May 2020 11:44:04 -0700 Subject: [PATCH] feat: add extra log messages for pull queries (#4909) * extra log messages * fixed test * forgot to add --- .../io/confluent/ksql/rest/server/HeartbeatAgent.java | 8 ++++++++ .../ksql/rest/server/execution/PullQueryExecutor.java | 10 +++++++--- .../integration/PullQueryRoutingFunctionalTest.java | 2 +- .../streams/materialization/ks/KsLocator.java | 3 +++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java index 2429fd33fd48..aea5a822b02b 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java @@ -241,6 +241,8 @@ private void processHeartbeats(final long windowStart, final long windowEnd) { // 1. remove heartbeats older than window heartbeats.headMap(windowStart).clear(); copy = new TreeMap<>(heartbeats.subMap(windowStart, true, windowEnd, true)); + LOG.debug("Process heartbeats: {} of host: {}, window start: {}, window end: {}", + copy, ksqlHostInfo, windowStart, windowEnd); } // 2. count consecutive missed heartbeats and mark as alive or dead final boolean isAlive = decideStatus(ksqlHostInfo, windowStart, windowEnd, copy); @@ -285,6 +287,9 @@ private boolean decideStatus( } if (ts - config.heartbeatSendIntervalMs > prev) { missedCount = (ts - prev - 1) / config.heartbeatSendIntervalMs; + LOG.debug("Host: {} missed: {} heartbeats, current heartbeat: {}, previous heartbeat: {}," + + " send interval: {}.", + ksqlHostInfo, missedCount, ts, prev, config.heartbeatSendIntervalMs); } else { //Reset missed count when we receive heartbeat missedCount = 0; @@ -294,6 +299,9 @@ private boolean decideStatus( // Check frame from last received heartbeat to window end if (windowEnd - prev - 1 > 0) { missedCount = (windowEnd - prev - 1) / config.heartbeatSendIntervalMs; + LOG.debug("Host: {} missed: {} heartbeats, window end: {}, previous heartbeat: {}," + + " send interval: {}.", + ksqlHostInfo, missedCount, windowEnd, prev, config.heartbeatSendIntervalMs); } LOG.debug("Host: {} has {} missing heartbeats", ksqlHostInfo, missedCount); return (missedCount < config.heartbeatMissedThreshold); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index a22b9cceb261..c08efaa07ee9 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -259,7 +259,11 @@ private TableRowsEntity handlePullQuery( ); if (filteredAndOrderedNodes.isEmpty()) { - throw new MaterializationException("All nodes are dead or exceed max allowed lag."); + LOG.debug("Unable to execute pull query: {}. All nodes are dead or exceed max allowed lag.", + statement.getStatementText()); + throw new MaterializationException(String.format( + "Unable to execute pull query %s. All nodes are dead or exceed max allowed lag.", + statement.getStatementText())); } // Nodes are ordered by preference: active is first if alive then standby nodes in @@ -268,8 +272,8 @@ private TableRowsEntity handlePullQuery( try { return routeQuery(node, statement, executionContext, serviceContext, pullQueryContext); } catch (Exception t) { - LOG.error("Error routing query {} to host {} at timestamp {}", - statement.getStatementText(), node, System.currentTimeMillis(), t); + LOG.debug("Error routing query {} to host {} at timestamp {} with exception {}", + statement.getStatementText(), node, System.currentTimeMillis(), t); } } throw new MaterializationException(String.format( diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java index 2ba34d33040b..a0128524459d 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -332,7 +332,7 @@ public void shouldFilterLaggyServers() throws Exception { KsqlErrorMessage errorMessage = makePullQueryRequestWithError(clusterFormation.router.right, sql, LAG_FILTER_25); Assert.assertEquals(40001, errorMessage.getErrorCode()); - Assert.assertEquals("All nodes are dead or exceed max allowed lag.", errorMessage.getMessage()); + Assert.assertTrue(errorMessage.getMessage().contains("All nodes are dead or exceed max allowed lag.")); } private void sendLagReportingMessages( diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java index 058675715f7c..d00c13084f4d 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java @@ -84,6 +84,9 @@ public List locate( "KeyQueryMetadata not available for state store %s and key %s", stateStoreName, key)); } + LOG.debug("Handling pull query for key {} in partition {} of state store {}.", + key, metadata.getPartition(), stateStoreName); + final HostInfo activeHost = metadata.getActiveHost(); final Set standByHosts = metadata.getStandbyHosts();