Skip to content

Commit

Permalink
fix: Returns empty lag info for a dead host rather than last received…
Browse files Browse the repository at this point in the history
… lags (#4837)
  • Loading branch information
AlanConfluent authored Mar 23, 2020
1 parent a66e489 commit 3d98527
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ClusterStatusResource {
private final KsqlEngine engine;
private final HeartbeatAgent heartbeatAgent;
private final Optional<LagReportingAgent> lagReportingAgent;
private static final HostStoreLags EMPTY_HOST_STORE_LAGS =
static final HostStoreLags EMPTY_HOST_STORE_LAGS =
new HostStoreLags(ImmutableMap.of(), 0);

public ClusterStatusResource(
Expand Down Expand Up @@ -86,13 +86,19 @@ private ClusterStatusResponse getResponse() {
entry -> new HostStatusEntity(entry.getValue().isHostAlive(),
entry.getValue().getLastStatusUpdateMs(),
getActiveStandbyInformation(entry.getKey()),
getHostStoreLags(entry.getKey()))
getHostStoreLags(entry.getKey(),
entry.getValue().isHostAlive()))
));

return new ClusterStatusResponse(response);
}

private HostStoreLags getHostStoreLags(final KsqlHostInfo ksqlHostInfo) {
private HostStoreLags getHostStoreLags(final KsqlHostInfo ksqlHostInfo, final boolean isAlive) {
// The lag reporting agent currently caches lag info without regard to the current alive status,
// so don't show lags for hosts we consider down.
if (!isAlive) {
return EMPTY_HOST_STORE_LAGS;
}
return lagReportingAgent
.flatMap(agent -> agent.getLagPerHost(ksqlHostInfo))
.orElse(EMPTY_HOST_STORE_LAGS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,29 @@

package io.confluent.ksql.rest.server.resources;

import static io.confluent.ksql.rest.server.resources.ClusterStatusResource.EMPTY_HOST_STORE_LAGS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.rest.entity.ClusterStatusResponse;
import io.confluent.ksql.rest.entity.HostStoreLags;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.LagInfoEntity;
import io.confluent.ksql.rest.entity.QueryStateStoreId;
import io.confluent.ksql.rest.entity.StateStoreLags;
import io.confluent.ksql.rest.server.HeartbeatAgent;
import io.confluent.ksql.rest.server.LagReportingAgent;
import io.confluent.ksql.util.HostStatus;
import io.confluent.ksql.util.KsqlHostInfo;
import java.util.Map;
import java.util.Optional;
import javax.ws.rs.core.Response;
import org.junit.Before;
Expand All @@ -33,9 +49,26 @@
@RunWith(MockitoJUnitRunner.class)
public class ClusterStatusResourceTest {

private static final HostStoreLags HOST_STORE_LAGS = new HostStoreLags(
ImmutableMap.of(QueryStateStoreId.of("a", "b"), new StateStoreLags(
ImmutableMap.of(1, new LagInfoEntity(2, 10, 8)))
), 20L);

private static final KsqlHostInfo HOST1 = new KsqlHostInfo("host1", 8088);
private static final KsqlHostInfo HOST2 = new KsqlHostInfo("host2", 8089);
private static final KsqlHostInfoEntity HOST1_ENTITY = new KsqlHostInfoEntity("host1", 8088);
private static final KsqlHostInfoEntity HOST2_ENTITY = new KsqlHostInfoEntity("host2", 8089);

private static final Map<KsqlHostInfo, HostStatus> HOSTS = ImmutableMap.of(
HOST1, new HostStatus(true, 30L),
HOST2, new HostStatus(false, 30L)
);

@Mock
private HeartbeatAgent heartbeatAgent;
@Mock
private LagReportingAgent lagReportingAgent;
@Mock
private KsqlEngine ksqlEngine;

private ClusterStatusResource clusterStatusResource;
Expand All @@ -45,16 +78,44 @@ public void setUp() {
clusterStatusResource = new ClusterStatusResource(
ksqlEngine,
heartbeatAgent,
Optional.empty());
Optional.of(lagReportingAgent));
}

@Test
public void shouldReturnClusterStatus() {
// Given:
when(heartbeatAgent.getHostsStatus()).thenReturn(HOSTS);
when(lagReportingAgent.getLagPerHost(any())).thenReturn(Optional.empty());

// When:
final Response response = clusterStatusResource.checkClusterStatus();

// Then:
assertThat(response.getStatus(), is(200));
assertThat(response.getEntity(), instanceOf(ClusterStatusResponse.class));

ClusterStatusResponse clusterStatusResponse = (ClusterStatusResponse) response.getEntity();
assertTrue(clusterStatusResponse.getClusterStatus().get(HOST1_ENTITY).getHostAlive());
assertFalse(clusterStatusResponse.getClusterStatus().get(HOST2_ENTITY).getHostAlive());
}

@Test
public void shouldReturnEmptyLagsForDeadHost() {
// Given:
when(heartbeatAgent.getHostsStatus()).thenReturn(HOSTS);
when(lagReportingAgent.getLagPerHost(any())).thenReturn(Optional.of(HOST_STORE_LAGS));

// When:
final Response response = clusterStatusResource.checkClusterStatus();

// Then:
assertThat(response.getStatus(), is(200));
assertThat(response.getEntity(), instanceOf(ClusterStatusResponse.class));

ClusterStatusResponse clusterStatusResponse = (ClusterStatusResponse) response.getEntity();
assertEquals(HOST_STORE_LAGS,
clusterStatusResponse.getClusterStatus().get(HOST1_ENTITY).getHostStoreLags());
assertEquals(EMPTY_HOST_STORE_LAGS,
clusterStatusResponse.getClusterStatus().get(HOST2_ENTITY).getHostStoreLags());
}
}

0 comments on commit 3d98527

Please sign in to comment.