Skip to content

Commit

Permalink
Add unittest for venice helix admin
Browse files Browse the repository at this point in the history
  • Loading branch information
Minh Nguyen committed Feb 7, 2025
1 parent b845b20 commit dfade0e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7501,10 +7501,10 @@ public Optional<AdminCommandExecutionTracker> getAdminCommandExecutionTracker(St
* @return cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id.
*/
public Map<String, Long> getAdminTopicMetadata(String clusterName, Optional<String> storeName) {
Map<String, Long> metadata = adminConsumerServices.get(clusterName).getAdminTopicMetadata(clusterName);
Map<String, Long> metadata = getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName);

if (storeName.isPresent()) {
Long executionId = executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).get(storeName.get());
Long executionId = getExecutionIdAccessor().getLastSucceededExecutionIdMap(clusterName).get(storeName.get());
Long adminOperationProtocolVersion = AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata);
return executionId == null
? Collections.emptyMap()
Expand All @@ -7524,12 +7524,12 @@ public void updateAdminTopicMetadata(
Optional<Long> offset,
Optional<Long> upstreamOffset) {
if (storeName.isPresent()) {
executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId);
getExecutionIdAccessor().updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId);
} else {
if (!offset.isPresent() || !upstreamOffset.isPresent()) {
throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata");
}
adminConsumerServices.get(clusterName)
getAdminConsumerService(clusterName)
.updateAdminTopicMetadata(clusterName, executionId, offset.get(), upstreamOffset.get());
}
}
Expand All @@ -7538,7 +7538,7 @@ public void updateAdminTopicMetadata(
* Update the version of admin operation protocol in admin topic metadata
*/
public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) {
adminConsumerServices.get(clusterName)
getAdminConsumerService(clusterName)
.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion);
}

Expand Down Expand Up @@ -7681,6 +7681,10 @@ public VeniceProperties getPubSubSSLProperties(String pubSubBrokerAddress) {
return this.getPubSubSSLPropertiesFromControllerConfig(pubSubBrokerAddress);
}

public AdminConsumerService getAdminConsumerService(String clusterName) {
return adminConsumerServices.get(clusterName);
}

private void startMonitorOfflinePush(
String clusterName,
String kafkaTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,6 @@ private void persistAdminTopicMetadata() {
// Skip since there are no new admin messages processed.
return;
}

Map<String, Long> metadata = remoteConsumptionEnabled
? AdminTopicMetadataAccessor
.generateMetadataMap(localOffsetCheckpointAtStartTime, lastOffset, lastDelegatedExecutionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.expectThrows;

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService;
import com.linkedin.venice.controller.stats.DisabledPartitionStats;
import com.linkedin.venice.controller.stats.VeniceAdminStats;
import com.linkedin.venice.exceptions.VeniceException;
Expand Down Expand Up @@ -955,4 +956,87 @@ public void testCleanupWhenPushCompleteWithViewConfigs() {
assertEquals(pubSubTopics.get(i).getName(), expectedUpdateCompactionTopics.get(i));
}
}

@Test
public void testGetAdminTopicMetadata() {
String clusterName = "test-cluster";
String storeName = "test-store";
VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);
doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.of(storeName));
doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.empty());

// Case 1: Not store name provided
Map<String, Long> remoteMetadata = AdminTopicMetadataAccessor.generateMetadataMap(10, -1, 1, 1);
AdminConsumerService adminConsumerService = mock(AdminConsumerService.class);
when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService);
when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata);

Map<String, Long> metadata = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.empty());
assertEquals(metadata, remoteMetadata);

// Case 2: Store name is provided
ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class);
Map<String, Long> executionIdMap = new HashMap<>();
executionIdMap.put(storeName, 10L);
when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor);
when(executionIdAccessor.getLastSucceededExecutionIdMap(anyString())).thenReturn(executionIdMap);
when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor);
when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata);

Map<String, Long> expectedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, 10, 1);
Map<String, Long> metadataForStore = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.of(storeName));
assertEquals(metadataForStore, expectedMetadata);
}

@Test
public void testUpdateAdminTopicMetadata() {
String clusterName = "test-cluster";
String storeName = "test-store";
long executionId = 10L;
Long offset = 10L;
Long upstreamOffset = 1L;
VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);
doCallRealMethod().when(veniceHelixAdmin)
.updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty());
doCallRealMethod().when(veniceHelixAdmin)
.updateAdminTopicMetadata(
clusterName,
executionId,
Optional.empty(),
Optional.of(offset),
Optional.of(upstreamOffset));

// Case 1: Store name is provided
ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class);
when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor);

veniceHelixAdmin
.updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty());
verify(executionIdAccessor, times(1)).updateLastSucceededExecutionIdMap(clusterName, storeName, executionId);

// Case 2: Store name is not provided
AdminConsumerService adminConsumerService = mock(AdminConsumerService.class);
when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService);
veniceHelixAdmin.updateAdminTopicMetadata(
clusterName,
executionId,
Optional.empty(),
Optional.of(offset),
Optional.of(upstreamOffset));
verify(executionIdAccessor, never()).updateLastSucceededExecutionId(anyString(), anyLong());
verify(adminConsumerService, times(1)).updateAdminTopicMetadata(clusterName, executionId, offset, upstreamOffset);
}

@Test
public void testUpdateAdminOperationProtocolVersion() {
String clusterName = "test-cluster";
Long adminProtocolVersion = 10L;
VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);
doCallRealMethod().when(veniceHelixAdmin).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion);
AdminConsumerService adminConsumerService = mock(AdminConsumerService.class);
when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService);

veniceHelixAdmin.updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion);
verify(adminConsumerService, times(1)).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion);
}
}

0 comments on commit dfade0e

Please sign in to comment.