From 3dce59ffed507d7bd3eae998d252acef3d2164c1 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Sat, 4 Jan 2025 10:12:13 +0700 Subject: [PATCH 01/22] Adding admin protocol version into the admintopicmetadata --- .../java/com/linkedin/venice/AdminTool.java | 13 +++ .../main/java/com/linkedin/venice/Arg.java | 5 +- .../java/com/linkedin/venice/Command.java | 5 ++ .../com/linkedin/venice/TestAdminTool.java | 12 +++ .../AdminTopicMetadataResponse.java | 13 +++ .../controllerapi/ControllerApiConstants.java | 1 + .../controllerapi/ControllerClient.java | 9 ++ .../venice/controllerapi/ControllerRoute.java | 5 ++ .../controller/TestAdminToolEndToEnd.java | 44 ++++++++++ .../AdminConsumptionTaskIntegrationTest.java | 47 +++++++++++ .../com/linkedin/venice/controller/Admin.java | 2 + .../AdminTopicMetadataAccessor.java | 41 +++++++-- .../venice/controller/VeniceHelixAdmin.java | 25 ++++-- .../controller/VeniceParentHelixAdmin.java | 12 ++- .../ZkAdminTopicMetadataAccessor.java | 13 ++- .../kafka/consumer/AdminConsumerService.java | 20 ++++- .../controller/server/AdminSparkServer.java | 7 +- .../server/AdminTopicMetadataRoutes.java | 45 ++++++++++ .../server/ClusterAdminOpsRequestHandler.java | 2 + .../controller/TestVeniceHelixAdmin.java | 84 +++++++++++++++++++ 20 files changed, 388 insertions(+), 17 deletions(-) diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index aa3ce1857d2..e1bca6e6eb0 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -597,6 +597,9 @@ public static void main(String[] args) throws Exception { case CLUSTER_BATCH_TASK: clusterBatchTask(cmd); break; + case UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION: + updateAdminOperationProtocolVersion(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -3270,6 +3273,16 @@ private static void dumpHostHeartbeat(CommandLine cmd) throws Exception { } } + private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws Exception { + String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + String protocolVersionInString = + getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION); + long protocolVersion = + Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name()); + ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion); + printObject(response); + } + private static void migrateVeniceZKPaths(CommandLine cmd) throws Exception { Set clusterNames = Utils.parseCommaSeparatedStringToSet(getRequiredArgument(cmd, Arg.CLUSTER_LIST)); String srcZKUrl = getRequiredArgument(cmd, Arg.SRC_ZOOKEEPER_URL); diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 05603f884c4..4cda268e058 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -297,7 +297,10 @@ public enum Arg { ), DAVINCI_HEARTBEAT_REPORTED( "dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats" - ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"); + ), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"), + ADMIN_OPERATION_PROTOCOL_VERSION( + "admin-operation-protocol-version", "aopv", true, "Admin operation protocol version" + ); private final String argName; private final String first; diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index abdf6089278..fb77d377ff0 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.Arg.ACCESS_CONTROL; import static com.linkedin.venice.Arg.ACL_PERMS; import static com.linkedin.venice.Arg.ACTIVE_ACTIVE_REPLICATION_ENABLED; +import static com.linkedin.venice.Arg.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.Arg.ALLOW_STORE_MIGRATION; import static com.linkedin.venice.Arg.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED; import static com.linkedin.venice.Arg.BACKUP_FOLDER; @@ -582,6 +583,10 @@ public enum Command { "dump-host-heartbeat", "Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.", new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED } + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "update-admin-operation-protocol-version", "Update the admin operation protocol version", + new Arg[] { URL, CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION } ); private final String commandName; diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index 17e72096942..58bf4f692bd 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -412,4 +412,16 @@ public void testAdminConfigureView() throws ParseException, IOException { CommandLine finalCommandLine = commandLine; Assert.assertThrows(() -> AdminTool.getConfigureStoreViewQueryParams(finalCommandLine)); } + + @Test + public void testUpdateAdminOperationProtocolVersion() throws ParseException, IOException { + String[] args = { "--update-admin-operation-protocol-version", "--url", "http://localhost:7036", "--cluster", + "test-cluster", "--admin-operation-protocol-version", "1" }; + + try { + AdminTool.main(args); + } catch (Exception e) { + Assert.fail("AdminTool should allow admin topic metadata to be updated admin operation version", e); + } + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java index 34264f1afd4..e4b7ab49d82 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java @@ -18,6 +18,11 @@ public class AdminTopicMetadataResponse extends ControllerResponse { */ private long upstreamOffset = -1; + /** + * The current admin operation protocol version, which is cluster-level and be SOT for serialize/deserialize admin operation message + */ + private long adminOperationProtocolVersion = -1; + public long getExecutionId() { return executionId; } @@ -41,4 +46,12 @@ public void setOffset(long offset) { public void setUpstreamOffset(long upstreamOffset) { this.upstreamOffset = upstreamOffset; } + + public void setAdminOperationProtocolVersion(long adminOperationProtocolVersion) { + this.adminOperationProtocolVersion = adminOperationProtocolVersion; + } + + public Long getAdminOperationProtocolVersion() { + return adminOperationProtocolVersion; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index 7dddb841afe..498efdb7a96 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -225,6 +225,7 @@ public class ControllerApiConstants { public static final String KAFKA_TOPIC_RETENTION_IN_MS = "kafka.topic.retention.in.ms"; public static final String KAFKA_TOPIC_MIN_IN_SYNC_REPLICA = "kafka.topic.min.in.sync.replica"; public static final String UPSTREAM_OFFSET = "upstream_offset"; + public static final String ADMIN_OPERATION_PROTOCOL_VERSION = "admin_operation_protocol_version"; public static final String PERSONA_NAME = "persona_name"; public static final String PERSONA_OWNERS = "persona_owners"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index 11452d95fe3..bc49f285c9c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controllerapi; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AMPLIFICATION_FACTOR; import static com.linkedin.venice.controllerapi.ControllerApiConstants.BATCH_JOB_HEARTBEAT_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; @@ -1362,6 +1363,14 @@ public ControllerResponse updateAdminTopicMetadata( return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class); } + public ControllerResponse updateAdminOperationProtocolVersion( + String clusterName, + Long adminOperationProtocolVersion) { + QueryParams params = + newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion); + return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class); + } + public ControllerResponse deleteKafkaTopic(String topicName) { QueryParams params = newParams().add(TOPIC, topicName); return request(ControllerRoute.DELETE_KAFKA_TOPIC, params, ControllerResponse.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java index 8ddc43c8d5b..9e203916bba 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerRoute.java @@ -2,6 +2,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_CONTROLLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.ACCESS_PERMISSION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AMPLIFICATION_FACTOR; import static com.linkedin.venice.controllerapi.ControllerApiConstants.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.BACKUP_STRATEGY; @@ -284,6 +285,10 @@ public enum ControllerRoute { UPDATE_ADMIN_TOPIC_METADATA( "/update_admin_topic_metadata", HttpMethod.POST, Arrays.asList(CLUSTER, EXECUTION_ID), NAME, OFFSET, UPSTREAM_OFFSET + ), + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION( + "/update_admin_operation_protocol_version", HttpMethod.POST, + Arrays.asList(CLUSTER, ADMIN_OPERATION_PROTOCOL_VERSION) ), DELETE_KAFKA_TOPIC("/delete_kafka_topic", HttpMethod.POST, Arrays.asList(CLUSTER, TOPIC)), CREATE_STORAGE_PERSONA( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java index 31e8e18b315..0c92c91bae3 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java @@ -7,6 +7,7 @@ import com.linkedin.venice.AdminTool; import com.linkedin.venice.Arg; +import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.controllerapi.MultiStoreResponse; import com.linkedin.venice.controllerapi.NewStoreResponse; @@ -19,7 +20,10 @@ import com.linkedin.venice.integration.utils.ServiceFactory; import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceControllerWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions; import com.linkedin.venice.integration.utils.VeniceServerWrapper; +import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -27,6 +31,7 @@ import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.helix.zookeeper.impl.client.ZkClient; @@ -186,4 +191,43 @@ public void testNodeReplicasReadinessCommand() throws Exception { clusterName, "--storage-node", Utils.getHelixNodeIdentifier(Utils.getHostName(), server.getPort()) }; AdminTool.main(nodeReplicasReadinessArgs); } + + @Test(timeOut = 4 * TEST_TIMEOUT) + public void testUpdateAdminOperationVersion() throws Exception { + Long currentVersion = -1L; + Long newVersion = 80L; + try (VeniceTwoLayerMultiRegionMultiClusterWrapper venice = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) + .numberOfClusters(1) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .build());) { + String clusterName = venice.getClusterNames()[0]; + + // Get the parent con†roller + VeniceControllerWrapper controller = venice.getParentControllers().get(0); + ControllerClient controllerClient = new ControllerClient(clusterName, controller.getControllerUrl()); + + // Setup the original metadata + AdminTopicMetadataResponse originalMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); + Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), currentVersion); + + // Update the admin operation version to newVersion - 80 + String[] updateAdminOperationVersionArgs = + { "--update-admin-operation-protocol-version", "--url", controller.getControllerUrl(), "--cluster", + clusterName, "--admin-operation-protocol-version", newVersion.toString() }; + + AdminTool.main(updateAdminOperationVersionArgs); + + // Verify the admin operation metadata version is updated + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + AdminTopicMetadataResponse updatedMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); + Assert.assertEquals(updatedMetadata.getAdminOperationProtocolVersion(), newVersion); + }); + } + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java index a95ebd8bec3..1e55abc3aa9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.java @@ -32,6 +32,8 @@ import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.io.IOException; +import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -196,6 +198,51 @@ public void testParallelAdminExecutionTasks() throws IOException, InterruptedExc } } + @Test(timeOut = 2 * TIMEOUT) + public void testUpdateAdminOperationVersion() { + Long currentVersion = -1L; + Long newVersion = 18L; + try (VeniceTwoLayerMultiRegionMultiClusterWrapper venice = + ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( + new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) + .numberOfClusters(1) + .numberOfParentControllers(1) + .numberOfChildControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .build())) { + + String clusterName = venice.getClusterNames()[0]; + + // Get the child controller + VeniceControllerWrapper controller = venice.getChildRegions().get(0).getLeaderController(clusterName); + Admin admin = controller.getVeniceAdmin(); + + AdminConsumerService adminConsumerService = controller.getAdminConsumerServiceByCluster(clusterName); + + // Setup the original metadata + adminConsumerService.updateAdminOperationProtocolVersion(clusterName, currentVersion); + + // Verify that the original metadata is correct + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map adminTopicMetadata = admin.getAdminTopicMetadata(clusterName, Optional.empty()); + Assert.assertTrue(adminTopicMetadata.containsKey("adminOperationProtocolVersion")); + Assert.assertEquals(adminTopicMetadata.get("adminOperationProtocolVersion"), currentVersion); + }); + + // Update the admin operation version + admin.updateAdminOperationProtocolVersion(clusterName, newVersion); + + // Verify the admin operation metadata version is updated + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { + Map adminTopicMetadata = admin.getAdminTopicMetadata(clusterName, Optional.empty()); + Assert.assertTrue(adminTopicMetadata.containsKey("adminOperationProtocolVersion")); + Assert.assertEquals(adminTopicMetadata.get("adminOperationProtocolVersion"), newVersion); + }); + } + } + private Runnable getRunnable( VeniceTwoLayerMultiRegionMultiClusterWrapper venice, String storeName, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 11298442e95..053c448bf97 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -953,6 +953,8 @@ void updateAdminTopicMetadata( Optional offset, Optional upstreamOffset); + void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion); + void createStoragePersona( String clusterName, String name, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java index 66c7b75daee..3f9a3386b43 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java @@ -3,6 +3,7 @@ import com.linkedin.venice.utils.Pair; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public abstract class AdminTopicMetadataAccessor { @@ -14,20 +15,46 @@ public abstract class AdminTopicMetadataAccessor { */ private static final String UPSTREAM_OFFSET_KEY = "upstreamOffset"; private static final String EXECUTION_ID_KEY = "executionId"; + private static final String ADMIN_OPERATION_PROTOCOL_VERSION_KEY = "adminOperationProtocolVersion"; private static final long UNDEFINED_VALUE = -1; /** * @return a map with {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}, {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}, - * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY} specified to input values. + * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY} specified to input values. */ - public static Map generateMetadataMap(long localOffset, long upstreamOffset, long executionId) { + public static Map generateMetadataMap( + Optional localOffset, + Optional upstreamOffset, + Optional executionId, + Optional adminOperationProtocolVersion) { Map metadata = new HashMap<>(); - metadata.put(OFFSET_KEY, localOffset); - metadata.put(UPSTREAM_OFFSET_KEY, upstreamOffset); - metadata.put(EXECUTION_ID_KEY, executionId); + localOffset.ifPresent(offset -> metadata.put(OFFSET_KEY, offset)); + upstreamOffset.ifPresent(offset -> metadata.put(UPSTREAM_OFFSET_KEY, offset)); + executionId.ifPresent(id -> metadata.put(EXECUTION_ID_KEY, id)); + adminOperationProtocolVersion.ifPresent(version -> metadata.put(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, version)); return metadata; } + public static Map generateMetadataMap(long localOffset, long upstreamOffset, long executionId) { + return generateMetadataMap( + Optional.of(localOffset), + Optional.of(upstreamOffset), + Optional.of(executionId), + Optional.empty()); + } + + public static Map generateMetadataMap( + long localOffset, + long upstreamOffset, + long executionId, + long adminOperationProtocolVersion) { + return generateMetadataMap( + Optional.of(localOffset), + Optional.of(upstreamOffset), + Optional.of(executionId), + Optional.of(adminOperationProtocolVersion)); + } + /** * @return a pair of values to which the specified keys are mapped to {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY} * and {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}. @@ -45,6 +72,10 @@ public static long getExecutionId(Map metadata) { return metadata.getOrDefault(EXECUTION_ID_KEY, UNDEFINED_VALUE); } + public static long getAdminOperationProtocolVersion(Map metadata) { + return metadata.getOrDefault(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, UNDEFINED_VALUE); + } + /** * Update all relevant metadata for a given cluster in a single transaction. * @param clusterName of the cluster at interest. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 45e6cb50510..0936368b69e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7501,13 +7501,16 @@ public Optional getAdminCommandExecutionTracker(St * @return cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id. */ public Map getAdminTopicMetadata(String clusterName, Optional storeName) { + Map metadata = getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName); + if (storeName.isPresent()) { - Long executionId = executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).get(storeName.get()); + Long executionId = getExecutionIdAccessor().getLastSucceededExecutionIdMap(clusterName).get(storeName.get()); + Long adminOperationProtocolVersion = AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata); return executionId == null ? Collections.emptyMap() - : AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, executionId); + : AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, executionId, adminOperationProtocolVersion); } - return adminConsumerServices.get(clusterName).getAdminTopicMetadata(clusterName); + return metadata; } /** @@ -7521,16 +7524,24 @@ public void updateAdminTopicMetadata( Optional offset, Optional upstreamOffset) { if (storeName.isPresent()) { - executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId); + getExecutionIdAccessor().updateLastSucceededExecutionIdMap(clusterName, storeName.get(), executionId); } else { if (!offset.isPresent() || !upstreamOffset.isPresent()) { throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata"); } - adminConsumerServices.get(clusterName) + getAdminConsumerService(clusterName) .updateAdminTopicMetadata(clusterName, executionId, offset.get(), upstreamOffset.get()); } } + /** + * Update the version of admin operation protocol in admin topic metadata + */ + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { + getAdminConsumerService(clusterName) + .updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } + /** * @see Admin#getRoutersClusterConfig(String) */ @@ -7670,6 +7681,10 @@ public VeniceProperties getPubSubSSLProperties(String pubSubBrokerAddress) { return this.getPubSubSSLPropertiesFromControllerConfig(pubSubBrokerAddress); } + public AdminConsumerService getAdminConsumerService(String clusterName) { + return adminConsumerServices.get(clusterName); + } + private void startMonitorOfflinePush( String clusterName, String kafkaTopic, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 2e680a42fb8..0f895efcec8 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -4268,11 +4268,11 @@ public Optional getAdminCommandExecutionTracker(St } /** - * Unsupported operation in the parent controller. + * Get AdminTopicMetadata from parent controller */ @Override public Map getAdminTopicMetadata(String clusterName, Optional storeName) { - throw new VeniceUnsupportedOperationException("getAdminTopicMetadata"); + return getVeniceHelixAdmin().getAdminTopicMetadata(clusterName, storeName); } /** @@ -4288,6 +4288,14 @@ public void updateAdminTopicMetadata( throw new VeniceUnsupportedOperationException("updateAdminTopicMetadata"); } + /** + * Update AdminOperationProtocolVersion in metadata + */ + @Override + public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) { + getVeniceHelixAdmin().updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } + /** * Unsupported operation in the parent controller. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index 0f892f654c5..72380910ffb 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -35,11 +35,20 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad zkMapAccessor = new ZkBaseDataAccessor<>(zkClient); } + public synchronized void partialUpdateMetadata(String clusterName, Map metadata) { + Map currentMetadata = getMetadata(clusterName); + + // Update only the keys present in the metadata map. + metadata.forEach(currentMetadata::put); + + updateMetadata(clusterName, currentMetadata); + } + /** * @see AdminTopicMetadataAccessor#updateMetadata(String, Map) */ @Override - public void updateMetadata(String clusterName, Map metadata) { + public synchronized void updateMetadata(String clusterName, Map metadata) { String path = getAdminTopicMetadataNodePath(clusterName); HelixUtils.update(zkMapAccessor, path, metadata, ZK_UPDATE_RETRY); LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata); @@ -49,7 +58,7 @@ public void updateMetadata(String clusterName, Map metadata) { * @see AdminTopicMetadataAccessor#getMetadata(String) */ @Override - public Map getMetadata(String clusterName) { + public synchronized Map getMetadata(String clusterName) { int retry = ZK_UPDATE_RETRY; String path = getAdminTopicMetadataNodePath(clusterName); while (retry > 0) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java index abceff1af2e..fcf779f5ad0 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java @@ -188,7 +188,7 @@ public Map getAdminTopicMetadata(String clusterName) { public void updateAdminTopicMetadata(String clusterName, long executionId, long offset, long upstreamOffset) { if (clusterName.equals(config.getClusterName())) { Map metadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); - adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata); } else { throw new VeniceException( "This AdminConsumptionService is for cluster: " + config.getClusterName() @@ -196,6 +196,24 @@ public void updateAdminTopicMetadata(String clusterName, long executionId, long } } + /** + * Update the admin operation protocol version for the given cluster. + */ + public void updateAdminOperationProtocolVersion(String clusterName, long adminOperationProtocolVersion) { + if (clusterName.equals(config.getClusterName())) { + Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(adminOperationProtocolVersion)); + adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata); + } else { + throw new VeniceException( + "This AdminConsumptionService is for cluster: " + config.getClusterName() + + ". Cannot update the version for cluster: " + clusterName); + } + } + private PubSubConsumerAdapter createKafkaConsumer(String clusterName) { String pubSubServerUrl = remoteConsumptionEnabled ? remoteKafkaServerUrl.get() : localKafkaServerUrl; Properties kafkaConsumerProperties = admin.getPubSubSSLProperties(pubSubServerUrl).toProperties(); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java index eec4fc44207..8b3d8d58ffa 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminSparkServer.java @@ -95,6 +95,7 @@ import static com.linkedin.venice.controllerapi.ControllerRoute.STORE; import static com.linkedin.venice.controllerapi.ControllerRoute.STORE_MIGRATION_ALLOWED; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ACL; +import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_CLUSTER_CONFIG; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_KAFKA_TOPIC_LOG_COMPACTION; @@ -636,7 +637,11 @@ public boolean startInner() throws Exception { admin, adminTopicMetadataRoutes .updateAdminTopicMetadata(admin, requestHandler.getClusterAdminOpsRequestHandler()))); - + httpService.post( + UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(), + new VeniceParentControllerRegionStateHandler( + admin, + adminTopicMetadataRoutes.updateAdminOperationProtocolVersion(admin))); httpService.post( DELETE_KAFKA_TOPIC.getPath(), new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteKafkaTopic(admin))); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index d854a9d5fa8..e183ff9b6cb 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -1,11 +1,13 @@ package com.linkedin.venice.controller.server; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.EXECUTION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OFFSET; import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPSTREAM_OFFSET; import static com.linkedin.venice.controllerapi.ControllerRoute.GET_ADMIN_TOPIC_METADATA; +import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA; import com.linkedin.venice.HttpConstants; @@ -51,6 +53,7 @@ public Route getAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler re responseObject.setCluster(clusterName); storeName.ifPresent(responseObject::setName); responseObject.setExecutionId(adminTopicMetadata.getExecutionId()); + responseObject.setAdminOperationProtocolVersion(adminTopicMetadata.getAdminOperationProtocolVersion()); if (!storeName.isPresent()) { responseObject.setOffset(adminTopicMetadata.getOffset()); responseObject.setUpstreamOffset(adminTopicMetadata.getUpstreamOffset()); @@ -94,6 +97,48 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder).build()); responseObject.setCluster(internalResponse.getClusterName()); responseObject.setName(internalResponse.hasStoreName() ? internalResponse.getStoreName() : null); + if (storeName.isPresent()) { + if (offset.isPresent() || upstreamOffset.isPresent()) { + throw new VeniceException("There is no store-level offsets to be updated"); + } + } else { + if (!offset.isPresent() || !upstreamOffset.isPresent()) { + throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata"); + } + } + + responseObject.setCluster(clusterName); + storeName.ifPresent(responseObject::setName); + + admin.updateAdminTopicMetadata(clusterName, executionId, storeName, offset, upstreamOffset); + } catch (Throwable e) { + responseObject.setError(e); + AdminSparkServer.handleError(new VeniceException(e), request, response); + } + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + }; + } + + public Route updateAdminOperationProtocolVersion(Admin admin) { + return (request, response) -> { + ControllerResponse responseObject = new ControllerResponse(); + response.type(HttpConstants.JSON); + try { + if (!isAllowListUser(request)) { + response.status(HttpStatus.SC_FORBIDDEN); + responseObject.setError("Only admin users are allowed to run " + request.url()); + responseObject.setErrorType(ErrorType.BAD_REQUEST); + return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject); + } + + AdminSparkServer.validateParams(request, UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getParams(), admin); + String clusterName = request.queryParams(CLUSTER); + Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)); + + responseObject.setCluster(clusterName); + + admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); + } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index 8a6d0c425b9..f9fb1556a91 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -105,6 +105,8 @@ public AdminTopicMetadataGrpcResponse getAdminTopicMetadata(AdminTopicMetadataGr } else { adminMetadataBuilder.setStoreName(storeName); } + adminMetadataBuilder + .setAdminOperationProtocolVersion(AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata)); return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build(); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 1ee4f23849a..855310108d8 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -25,6 +25,7 @@ import static org.testng.Assert.expectThrows; import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.controller.kafka.consumer.AdminConsumerService; import com.linkedin.venice.controller.stats.DisabledPartitionStats; import com.linkedin.venice.controller.stats.VeniceAdminStats; import com.linkedin.venice.exceptions.VeniceException; @@ -955,4 +956,87 @@ public void testCleanupWhenPushCompleteWithViewConfigs() { assertEquals(pubSubTopics.get(i).getName(), expectedUpdateCompactionTopics.get(i)); } } + + @Test + public void testGetAdminTopicMetadata() { + String clusterName = "test-cluster"; + String storeName = "test-store"; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.of(storeName)); + doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.empty()); + + // Case 1: Not store name provided + Map remoteMetadata = AdminTopicMetadataAccessor.generateMetadataMap(10, -1, 1, 1); + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); + + Map metadata = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.empty()); + assertEquals(metadata, remoteMetadata); + + // Case 2: Store name is provided + ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class); + Map executionIdMap = new HashMap<>(); + executionIdMap.put(storeName, 10L); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + when(executionIdAccessor.getLastSucceededExecutionIdMap(anyString())).thenReturn(executionIdMap); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); + + Map expectedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, 10, 1); + Map metadataForStore = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.of(storeName)); + assertEquals(metadataForStore, expectedMetadata); + } + + @Test + public void testUpdateAdminTopicMetadata() { + String clusterName = "test-cluster"; + String storeName = "test-store"; + long executionId = 10L; + Long offset = 10L; + Long upstreamOffset = 1L; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin) + .updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty()); + doCallRealMethod().when(veniceHelixAdmin) + .updateAdminTopicMetadata( + clusterName, + executionId, + Optional.empty(), + Optional.of(offset), + Optional.of(upstreamOffset)); + + // Case 1: Store name is provided + ExecutionIdAccessor executionIdAccessor = mock(ExecutionIdAccessor.class); + when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); + + veniceHelixAdmin + .updateAdminTopicMetadata(clusterName, executionId, Optional.of(storeName), Optional.empty(), Optional.empty()); + verify(executionIdAccessor, times(1)).updateLastSucceededExecutionIdMap(clusterName, storeName, executionId); + + // Case 2: Store name is not provided + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + veniceHelixAdmin.updateAdminTopicMetadata( + clusterName, + executionId, + Optional.empty(), + Optional.of(offset), + Optional.of(upstreamOffset)); + verify(executionIdAccessor, never()).updateLastSucceededExecutionId(anyString(), anyLong()); + verify(adminConsumerService, times(1)).updateAdminTopicMetadata(clusterName, executionId, offset, upstreamOffset); + } + + @Test + public void testUpdateAdminOperationProtocolVersion() { + String clusterName = "test-cluster"; + Long adminProtocolVersion = 10L; + VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class); + doCallRealMethod().when(veniceHelixAdmin).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); + when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); + + veniceHelixAdmin.updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + verify(adminConsumerService, times(1)).updateAdminOperationProtocolVersion(clusterName, adminProtocolVersion); + } } From 61e19c8fa07a2bfe3cf2a3770bfa0acecfe2e395 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 7 Feb 2025 19:21:50 -0800 Subject: [PATCH 02/22] Convert partialUpdate to updateMetadata and using compareAndUpdate --- .../AdminTopicMetadataAccessor.java | 8 +++++-- .../ZkAdminTopicMetadataAccessor.java | 22 +++++++---------- .../kafka/consumer/AdminConsumerService.java | 24 ++++++++++++------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java index 3f9a3386b43..ab30b1f4fa7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java @@ -20,7 +20,8 @@ public abstract class AdminTopicMetadataAccessor { /** * @return a map with {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}, {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}, - * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY} specified to input values. + * {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY} + * specified to input values. */ public static Map generateMetadataMap( Optional localOffset, @@ -72,12 +73,15 @@ public static long getExecutionId(Map metadata) { return metadata.getOrDefault(EXECUTION_ID_KEY, UNDEFINED_VALUE); } + /** + * @return the value to which the specified key is mapped to {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY}. + */ public static long getAdminOperationProtocolVersion(Map metadata) { return metadata.getOrDefault(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, UNDEFINED_VALUE); } /** - * Update all relevant metadata for a given cluster in a single transaction. + * Update all relevant metadata for a given cluster in a single transaction with information provided in metadata. * @param clusterName of the cluster at interest. * @param metadata map containing relevant information. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index 72380910ffb..8f879a8d0ec 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -35,30 +35,24 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad zkMapAccessor = new ZkBaseDataAccessor<>(zkClient); } - public synchronized void partialUpdateMetadata(String clusterName, Map metadata) { - Map currentMetadata = getMetadata(clusterName); - - // Update only the keys present in the metadata map. - metadata.forEach(currentMetadata::put); - - updateMetadata(clusterName, currentMetadata); - } - /** - * @see AdminTopicMetadataAccessor#updateMetadata(String, Map) + * Update the upstream metadata map for the given cluster with specific information provided in newMetadata */ @Override - public synchronized void updateMetadata(String clusterName, Map metadata) { + public void updateMetadata(String clusterName, Map newMetadata) { String path = getAdminTopicMetadataNodePath(clusterName); - HelixUtils.update(zkMapAccessor, path, metadata, ZK_UPDATE_RETRY); - LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata); + HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> { + currentMetadataMap.putAll(newMetadata); + return currentMetadataMap; + }); + LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, newMetadata); } /** * @see AdminTopicMetadataAccessor#getMetadata(String) */ @Override - public synchronized Map getMetadata(String clusterName) { + public Map getMetadata(String clusterName) { int retry = ZK_UPDATE_RETRY; String path = getAdminTopicMetadataNodePath(clusterName); while (retry > 0) { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java index fcf779f5ad0..948f498402e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumerService.java @@ -17,6 +17,7 @@ import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.utils.DaemonThreadFactory; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.locks.AutoCloseableLock; import io.tehuti.metrics.MetricsRepository; import java.util.Map; import java.util.Optional; @@ -187,8 +188,12 @@ public Map getAdminTopicMetadata(String clusterName) { */ public void updateAdminTopicMetadata(String clusterName, long executionId, long offset, long upstreamOffset) { if (clusterName.equals(config.getClusterName())) { - Map metadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); - adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata); + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = + AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + } } else { throw new VeniceException( "This AdminConsumptionService is for cluster: " + config.getClusterName() @@ -201,12 +206,15 @@ public void updateAdminTopicMetadata(String clusterName, long executionId, long */ public void updateAdminOperationProtocolVersion(String clusterName, long adminOperationProtocolVersion) { if (clusterName.equals(config.getClusterName())) { - Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.of(adminOperationProtocolVersion)); - adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata); + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(adminOperationProtocolVersion)); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + } } else { throw new VeniceException( "This AdminConsumptionService is for cluster: " + config.getClusterName() From 5ad7bda7c1cf8f4f321bb7f0e6dad6179398ecef Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Fri, 7 Feb 2025 20:31:21 -0800 Subject: [PATCH 03/22] Increase test coverage --- .../AdminTopicMetadataResponse.java | 2 +- .../controller/TestAdminToolEndToEnd.java | 4 +- .../server/AdminTopicMetadataRoutes.java | 17 +---- .../server/AdminTopicMetadataRoutesTest.java | 69 +++++++++++++++++++ 4 files changed, 74 insertions(+), 18 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java index e4b7ab49d82..c014ef863f6 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/AdminTopicMetadataResponse.java @@ -51,7 +51,7 @@ public void setAdminOperationProtocolVersion(long adminOperationProtocolVersion) this.adminOperationProtocolVersion = adminOperationProtocolVersion; } - public Long getAdminOperationProtocolVersion() { + public long getAdminOperationProtocolVersion() { return adminOperationProtocolVersion; } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java index 0c92c91bae3..531fb5f65c5 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java @@ -214,7 +214,7 @@ public void testUpdateAdminOperationVersion() throws Exception { // Setup the original metadata AdminTopicMetadataResponse originalMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); - Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), currentVersion); + Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), (long) currentVersion); // Update the admin operation version to newVersion - 80 String[] updateAdminOperationVersionArgs = @@ -226,7 +226,7 @@ public void testUpdateAdminOperationVersion() throws Exception { // Verify the admin operation metadata version is updated TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { AdminTopicMetadataResponse updatedMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); - Assert.assertEquals(updatedMetadata.getAdminOperationProtocolVersion(), newVersion); + Assert.assertEquals(updatedMetadata.getAdminOperationProtocolVersion(), (long) newVersion); }); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index e183ff9b6cb..a904c7e1c49 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -97,19 +97,6 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder).build()); responseObject.setCluster(internalResponse.getClusterName()); responseObject.setName(internalResponse.hasStoreName() ? internalResponse.getStoreName() : null); - if (storeName.isPresent()) { - if (offset.isPresent() || upstreamOffset.isPresent()) { - throw new VeniceException("There is no store-level offsets to be updated"); - } - } else { - if (!offset.isPresent() || !upstreamOffset.isPresent()) { - throw new VeniceException("Offsets must be provided to update cluster-level admin topic metadata"); - } - } - - responseObject.setCluster(clusterName); - storeName.ifPresent(responseObject::setName); - admin.updateAdminTopicMetadata(clusterName, executionId, storeName, offset, upstreamOffset); } catch (Throwable e) { responseObject.setError(e); @@ -121,7 +108,7 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler public Route updateAdminOperationProtocolVersion(Admin admin) { return (request, response) -> { - ControllerResponse responseObject = new ControllerResponse(); + AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse(); response.type(HttpConstants.JSON); try { if (!isAllowListUser(request)) { @@ -136,9 +123,9 @@ public Route updateAdminOperationProtocolVersion(Admin admin) { Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)); responseObject.setCluster(clusterName); + responseObject.setAdminOperationProtocolVersion(adminOperationProtocolVersion); admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion); - } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java index 39e7b3a263a..863f50f54b0 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutesTest.java @@ -1,6 +1,7 @@ package com.linkedin.venice.controller.server; import static com.linkedin.venice.VeniceConstants.CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.ADMIN_OPERATION_PROTOCOL_VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER; import static com.linkedin.venice.controllerapi.ControllerApiConstants.EXECUTION_ID; import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_NAME; @@ -215,4 +216,72 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception { assertNotNull(responseObject.getError()); assertTrue(responseObject.getError().contains("Internal error")); } + + @Test + public void testUpdateAdminOperationProtocolVersion() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + String adminOperationProtocolVersion = "1"; + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER); + when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); + + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + assertEquals(responseObject.getCluster(), TEST_CLUSTER); + assertEquals(responseObject.getAdminOperationProtocolVersion(), 1L); + assertNull(responseObject.getError()); + } + + @Test + public void testUpdateAdminOperationProtocolVersionHandlesUnauthorizedAccess() throws Exception { + DynamicAccessController accessController = mock(DynamicAccessController.class); + when(accessController.isAllowlistUsers(any(), any(), any())).thenReturn(false); + HttpServletRequest httpServletRequest = mock(HttpServletRequest.class); + when(request.raw()).thenReturn(httpServletRequest); + X509Certificate certificate = mock(X509Certificate.class); + X500Principal principal = new X500Principal("CN=foo"); + X509Certificate[] certificates = new X509Certificate[] { mock(X509Certificate.class) }; + when(httpServletRequest.getAttribute(CONTROLLER_SSL_CERTIFICATE_ATTRIBUTE_NAME)).thenReturn(certificates); + doReturn(principal).when(certificate).getSubjectX500Principal(); + doReturn(httpServletRequest).when(request).raw(); + + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + String adminOperationProtocolVersion = "1"; + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER); + when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion); + + Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController)) + .updateAdminOperationProtocolVersion(mockAdmin); + + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + assertNotNull(responseObject.getError()); + assertTrue(responseObject.getError().contains("Only admin users are allowed")); + } + + @Test + public void testUpdateAdminOperationProtocolVersionHandlesMissingParams() throws Exception { + QueryParamsMap paramsMap = mock(QueryParamsMap.class); + doReturn(new HashMap<>()).when(paramsMap).toMap(); + doReturn(paramsMap).when(request).queryMap(); + + when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter + + Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin); + AdminTopicMetadataResponse responseObject = + OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class); + + verify(requestHandler, never()).getAdminTopicMetadata(any()); + assertNotNull(responseObject.getError()); + assertTrue(responseObject.getError().contains("cluster_name is a required parameter")); + } } From eb9c94ebaf4d31f8cba855a9597c096c07136f45 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 10 Feb 2025 10:47:48 -0800 Subject: [PATCH 04/22] Fix updateMetadata when currentMetadataMap is null --- .../venice/controller/ZkAdminTopicMetadataAccessor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index 8f879a8d0ec..9f6b97568f6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -42,6 +42,9 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad public void updateMetadata(String clusterName, Map newMetadata) { String path = getAdminTopicMetadataNodePath(clusterName); HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> { + if (currentMetadataMap == null) { + currentMetadataMap = new HashMap<>(); + } currentMetadataMap.putAll(newMetadata); return currentMetadataMap; }); From 68b9037706e1ebbeb93fbf32c92901d0f77185e3 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 10 Feb 2025 14:51:27 -0800 Subject: [PATCH 05/22] Add inline mock extensions --- .../resources/mockito-extensions/org.mockito.plugins.MockMaker | 1 + 1 file changed, 1 insertion(+) create mode 100644 services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..ca6ee9cea8e --- /dev/null +++ b/services/venice-controller/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file From 6eaf3c818f40de95b0818471a989301df1badfd7 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 10 Feb 2025 14:54:24 -0800 Subject: [PATCH 06/22] Update comments --- .../venice/admin/InMemoryAdminTopicMetadataAccessor.java | 2 +- .../linkedin/venice/controller/AdminTopicMetadataAccessor.java | 2 +- .../venice/controller/ZkAdminTopicMetadataAccessor.java | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java index 087b359bde7..4890db30b8e 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/admin/InMemoryAdminTopicMetadataAccessor.java @@ -16,7 +16,7 @@ public class InMemoryAdminTopicMetadataAccessor extends AdminTopicMetadataAccess @Override public void updateMetadata(String clusterName, Map metadata) { - inMemoryMetadata = metadata; + inMemoryMetadata.putAll(metadata); LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java index ab30b1f4fa7..0f3018598d9 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java @@ -81,7 +81,7 @@ public static long getAdminOperationProtocolVersion(Map metadata) } /** - * Update all relevant metadata for a given cluster in a single transaction with information provided in metadata. + * Update specific metadata for a given cluster in a single transaction with information provided in metadata. * @param clusterName of the cluster at interest. * @param metadata map containing relevant information. */ diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index 9f6b97568f6..ef0435bddc3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -37,6 +37,8 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad /** * Update the upstream metadata map for the given cluster with specific information provided in newMetadata + * + * @see AdminTopicMetadataAccessor#updateMetadata(String, Map) */ @Override public void updateMetadata(String clusterName, Map newMetadata) { From 7c55981e7618c7afe3b5b72ad2d632bb74542c5a Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 10 Feb 2025 14:54:48 -0800 Subject: [PATCH 07/22] Add tests for ZkAdminTopicMetadataAccessor --- .../TestZkAdminTopicMetadataAccessor.java | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java new file mode 100644 index 00000000000..9674fe2833d --- /dev/null +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -0,0 +1,82 @@ +package com.linkedin.venice.controller; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.venice.helix.HelixAdapterSerializer; +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.zookeeper.impl.client.ZkClient; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.DataTree; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class TestZkAdminTopicMetadataAccessor { + private ZkClient zkClient; + private HelixAdapterSerializer adapterSerializer; + private ZkAdminTopicMetadataAccessor zkAdminTopicMetadataAccessor; + + @BeforeMethod + public void setUp() { + zkClient = mock(ZkClient.class); + adapterSerializer = mock(HelixAdapterSerializer.class); + zkAdminTopicMetadataAccessor = new ZkAdminTopicMetadataAccessor(zkClient, adapterSerializer); + } + + @Test + public void testUpdateMetadata() { + String clusterName = "test-cluster"; + + // Original metadata + Map currentMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1, 18); + + // New metadata + Map newMetadata = new HashMap<>(); + newMetadata.put("offset", 100L); + + // Updated metadata with new metadata + Map updatedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(100, -1, 1, 18); + + String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); + try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class, RETURNS_SMART_NULLS)) { + dataTreeMockedStatic.when(() -> DataTree.copyStat(any(), any())).thenAnswer(invocation -> null); + Stat readStat = new Stat(); + + when(zkClient.readData(metadataPath, readStat)).thenReturn(null) // Case 1: when there is no metadata + .thenReturn(currentMetadata); // Case 2: the metadata is not null + + // Case 1: when there is no metadata - null + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); + verify(zkClient, times(1)).writeDataGetStat(metadataPath, newMetadata, 0); + + // Case 2: the metadata is not null + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); + verify(zkClient, times(1)).writeDataGetStat(metadataPath, updatedMetadata, 0); + + // Verify that the metadata path got read 2 times + verify(zkClient, times(2)).readData(metadataPath, readStat); + } + } + + @Test + public void testGetMetadata() { + String clusterName = "test-cluster"; + Map currentMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1, 18); + String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); + + when(zkClient.readData(metadataPath, null)).thenReturn(null).thenReturn(currentMetadata); + + // Case 1: when there is no metadata + Map metadata = zkAdminTopicMetadataAccessor.getMetadata(clusterName); + assertEquals(metadata, new HashMap<>()); + + // Case 2: the metadata is not null + metadata = zkAdminTopicMetadataAccessor.getMetadata(clusterName); + assertEquals(metadata, currentMetadata); + } +} From 91b2604339afdc3f2f7d51f93881a16211f82289 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Mon, 10 Feb 2025 16:40:06 -0800 Subject: [PATCH 08/22] Add lock for updateMetadata inside AdminConsumptionTask, updated corresponding tests --- .../kafka/consumer/AdminConsumptionTask.java | 24 ++++++++++------- .../consumer/AdminConsumptionTaskTest.java | 26 +++++++------------ 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index c294fc1a4e2..1443e361fdb 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -34,6 +34,7 @@ import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.utils.locks.AutoCloseableLock; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -896,16 +897,19 @@ private void persistAdminTopicMetadata() { // Skip since there are no new admin messages processed. return; } - Map metadata = remoteConsumptionEnabled - ? AdminTopicMetadataAccessor - .generateMetadataMap(localOffsetCheckpointAtStartTime, lastOffset, lastDelegatedExecutionId) - : AdminTopicMetadataAccessor - .generateMetadataMap(lastOffset, upstreamOffsetCheckpointAtStartTime, lastDelegatedExecutionId); - adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); - lastPersistedOffset = lastOffset; - lastPersistedExecutionId = lastDelegatedExecutionId; - LOGGER.info("Updated lastPersistedOffset to {}", lastPersistedOffset); - stats.setAdminConsumptionCheckpointOffset(lastPersistedOffset); + try (AutoCloseableLock ignore = + admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { + Map metadata = remoteConsumptionEnabled + ? AdminTopicMetadataAccessor + .generateMetadataMap(localOffsetCheckpointAtStartTime, lastOffset, lastDelegatedExecutionId) + : AdminTopicMetadataAccessor + .generateMetadataMap(lastOffset, upstreamOffsetCheckpointAtStartTime, lastDelegatedExecutionId); + adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); + lastPersistedOffset = lastOffset; + lastPersistedExecutionId = lastDelegatedExecutionId; + LOGGER.info("Updated lastPersistedOffset to {}", lastPersistedOffset); + stats.setAdminConsumptionCheckpointOffset(lastPersistedOffset); + } } void skipMessageWithOffset(long offset) { diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 71fcaa5034f..8d1a7908a06 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -19,29 +19,14 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.anyDouble; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.linkedin.venice.admin.InMemoryAdminTopicMetadataAccessor; import com.linkedin.venice.admin.InMemoryExecutionIdAccessor; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controller.AdminTopicMetadataAccessor; import com.linkedin.venice.controller.ExecutionIdAccessor; +import com.linkedin.venice.controller.HelixVeniceClusterResources; import com.linkedin.venice.controller.VeniceHelixAdmin; import com.linkedin.venice.controller.kafka.AdminTopicUtils; import com.linkedin.venice.controller.kafka.protocol.admin.AddVersion; @@ -97,6 +82,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.locks.ClusterLockManager; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterOptions; import java.io.IOException; @@ -189,6 +175,12 @@ public void methodSetup() { doReturn(new HashSet<>(Arrays.asList(pubSubTopic))).when(topicManager).listTopics(); doReturn(topicManager).when(admin).getTopicManager(); doReturn(true).when(topicManager).containsTopicAndAllPartitionsAreOnline(pubSubTopic); + + HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class, RETURNS_DEEP_STUBS); + ClusterLockManager lockManager = new ClusterLockManager(clusterName); + doReturn(resources).when(admin).getHelixVeniceClusterResources(clusterName); + doReturn(lockManager).when(resources).getClusterLockManager(); + doCallRealMethod().when(resources).getStoreMetadataRepository(); } @AfterMethod From ddb25938f51e8bc8e1e3cef92aed64dbf0c5789d Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Tue, 11 Feb 2025 13:26:26 -0800 Subject: [PATCH 09/22] Specific the imports --- .../TestZkAdminTopicMetadataAccessor.java | 9 ++++++--- .../consumer/AdminConsumptionTaskTest.java | 20 ++++++++++++++++++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java index 9674fe2833d..20a6f8434b4 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -1,8 +1,11 @@ package com.linkedin.venice.controller; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; -import static org.testng.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import com.linkedin.venice.helix.HelixAdapterSerializer; import java.util.HashMap; @@ -43,7 +46,7 @@ public void testUpdateMetadata() { Map updatedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(100, -1, 1, 18); String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); - try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class, RETURNS_SMART_NULLS)) { + try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { dataTreeMockedStatic.when(() -> DataTree.copyStat(any(), any())).thenAnswer(invocation -> null); Stat readStat = new Stat(); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 8d1a7908a06..2f385fe4337 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -19,7 +19,25 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyDouble; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.linkedin.venice.admin.InMemoryAdminTopicMetadataAccessor; import com.linkedin.venice.admin.InMemoryExecutionIdAccessor; From 788a2f8bf4b804478e42377b4accbf73e97f76ef Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Tue, 11 Feb 2025 13:28:16 -0800 Subject: [PATCH 10/22] doc: add comments --- .../java/com/linkedin/venice/controller/VeniceHelixAdmin.java | 1 + 1 file changed, 1 insertion(+) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 0936368b69e..42d15a84c59 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7681,6 +7681,7 @@ public VeniceProperties getPubSubSSLProperties(String pubSubBrokerAddress) { return this.getPubSubSSLPropertiesFromControllerConfig(pubSubBrokerAddress); } + // public for testing purpose public AdminConsumerService getAdminConsumerService(String clusterName) { return adminConsumerServices.get(clusterName); } From 1bf784bfd958cb563697db03d9f985f76bf8ba82 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Tue, 11 Feb 2025 13:49:43 -0800 Subject: [PATCH 11/22] Update comment --- .../java/com/linkedin/venice/controller/VeniceHelixAdmin.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 42d15a84c59..acfa37e9216 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7498,7 +7498,8 @@ public Optional getAdminCommandExecutionTracker(St } /** - * @return cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id. + * @return cluster-level execution id, offset, upstream offset, and admin operation protocol version. + * If store name is specified, it returns store-level execution id. */ public Map getAdminTopicMetadata(String clusterName, Optional storeName) { Map metadata = getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName); From a8676506bce736658ef9a828397ea8980f4e9e25 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 10:15:37 -0800 Subject: [PATCH 12/22] Remove redundant call --- .../venice/controller/server/AdminTopicMetadataRoutes.java | 1 - 1 file changed, 1 deletion(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index a904c7e1c49..e1575f78899 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -97,7 +97,6 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler UpdateAdminTopicMetadataGrpcRequest.newBuilder().setMetadata(adminMetadataBuilder).build()); responseObject.setCluster(internalResponse.getClusterName()); responseObject.setName(internalResponse.hasStoreName() ? internalResponse.getStoreName() : null); - admin.updateAdminTopicMetadata(clusterName, executionId, storeName, offset, upstreamOffset); } catch (Throwable e) { responseObject.setError(e); AdminSparkServer.handleError(new VeniceException(e), request, response); From 9191ed5662614158eb6c2c49431598a7ea179165 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 10:37:52 -0800 Subject: [PATCH 13/22] Rename newMetadata and add log inside dataUpdater --- .../controller/ZkAdminTopicMetadataAccessor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java index ef0435bddc3..cda6fbff92c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java @@ -36,21 +36,25 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad } /** - * Update the upstream metadata map for the given cluster with specific information provided in newMetadata + * Update the upstream metadata map for the given cluster with specific information provided in metadataDelta * * @see AdminTopicMetadataAccessor#updateMetadata(String, Map) */ @Override - public void updateMetadata(String clusterName, Map newMetadata) { + public void updateMetadata(String clusterName, Map metadataDelta) { String path = getAdminTopicMetadataNodePath(clusterName); HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> { if (currentMetadataMap == null) { currentMetadataMap = new HashMap<>(); } - currentMetadataMap.putAll(newMetadata); + LOGGER.info( + "Updating AdminTopicMetadata map for cluster: {}. Current metadata: {}. New delta metadata: {}", + clusterName, + currentMetadataMap, + metadataDelta); + currentMetadataMap.putAll(metadataDelta); return currentMetadataMap; }); - LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, newMetadata); } /** From acbfcf366c8c569c005e0013ad37b4753cd85e1b Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 11:19:14 -0800 Subject: [PATCH 14/22] Remove overload generateMetadataMap --- .../AdminTopicMetadataAccessor.java | 20 ---- .../venice/controller/VeniceHelixAdmin.java | 6 +- .../kafka/consumer/AdminConsumerService.java | 7 +- .../kafka/consumer/AdminConsumptionTask.java | 14 ++- .../controller/TestVeniceHelixAdmin.java | 6 +- .../TestVeniceParentHelixAdmin.java | 97 ++++++++++++++----- .../TestVeniceParentHelixAdminWithAcl.java | 16 ++- .../TestZkAdminTopicMetadataAccessor.java | 10 +- .../consumer/AdminConsumptionTaskTest.java | 12 ++- 9 files changed, 124 insertions(+), 64 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java index 0f3018598d9..51f77b45b76 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java @@ -36,26 +36,6 @@ public static Map generateMetadataMap( return metadata; } - public static Map generateMetadataMap(long localOffset, long upstreamOffset, long executionId) { - return generateMetadataMap( - Optional.of(localOffset), - Optional.of(upstreamOffset), - Optional.of(executionId), - Optional.empty()); - } - - public static Map generateMetadataMap( - long localOffset, - long upstreamOffset, - long executionId, - long adminOperationProtocolVersion) { - return generateMetadataMap( - Optional.of(localOffset), - Optional.of(upstreamOffset), - Optional.of(executionId), - Optional.of(adminOperationProtocolVersion)); - } - /** * @return a pair of values to which the specified keys are mapped to {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY} * and {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index acfa37e9216..cb5b4876439 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7509,7 +7509,11 @@ public Map getAdminTopicMetadata(String clusterName, Optional metadata = - AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId); + Map metadata = AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(offset), + Optional.of(upstreamOffset), + Optional.of(executionId), + Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); } } else { diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java index 1443e361fdb..2fef1d90311 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java @@ -900,10 +900,16 @@ private void persistAdminTopicMetadata() { try (AutoCloseableLock ignore = admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) { Map metadata = remoteConsumptionEnabled - ? AdminTopicMetadataAccessor - .generateMetadataMap(localOffsetCheckpointAtStartTime, lastOffset, lastDelegatedExecutionId) - : AdminTopicMetadataAccessor - .generateMetadataMap(lastOffset, upstreamOffsetCheckpointAtStartTime, lastDelegatedExecutionId); + ? AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(localOffsetCheckpointAtStartTime), + Optional.of(lastOffset), + Optional.of(lastDelegatedExecutionId), + Optional.empty()) + : AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(lastOffset), + Optional.of(upstreamOffsetCheckpointAtStartTime), + Optional.of(lastDelegatedExecutionId), + Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, metadata); lastPersistedOffset = lastOffset; lastPersistedExecutionId = lastDelegatedExecutionId; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 855310108d8..6b4d33490ff 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -966,7 +966,8 @@ public void testGetAdminTopicMetadata() { doCallRealMethod().when(veniceHelixAdmin).getAdminTopicMetadata(clusterName, Optional.empty()); // Case 1: Not store name provided - Map remoteMetadata = AdminTopicMetadataAccessor.generateMetadataMap(10, -1, 1, 1); + Map remoteMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(10L), Optional.of(-1L), Optional.of(1L), Optional.of(1L)); AdminConsumerService adminConsumerService = mock(AdminConsumerService.class); when(veniceHelixAdmin.getAdminConsumerService(clusterName)).thenReturn(adminConsumerService); when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); @@ -983,7 +984,8 @@ public void testGetAdminTopicMetadata() { when(veniceHelixAdmin.getExecutionIdAccessor()).thenReturn(executionIdAccessor); when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); - Map expectedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(-1, -1, 10, 1); + Map expectedMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(10L), Optional.of(1L)); Map metadataForStore = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.of(storeName)); assertEquals(metadataForStore, expectedMetadata); } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index d1c1adfc4be..9f355ffdf29 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -299,7 +299,9 @@ public void testAddStore() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); @@ -378,8 +380,9 @@ public void testCreateStoreForMultiCluster() { String valueSchemaStr = "\"string\""; when(veniceWriter.put(any(), any(), anyInt())).then(invocation -> { // Once we send message to topic through venice writer, return offset 1 - when(zkClient.readData(metadataPath, null)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + when(zkClient.readData(metadataPath, null)).thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); CompletableFuture future = mock(CompletableFuture.class); doReturn(new SimplePubSubProduceResultImpl(adminTopic, partitionId, 1, -1)).when(future).get(); return future; @@ -491,7 +494,9 @@ public void testAddValueSchema() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.addValueSchema(clusterName, storeName, valueSchemaStr, DirectionalSchemaCompatibilityType.FULL); @@ -543,7 +548,9 @@ public void testAddDerivedSchema() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.addDerivedSchema(clusterName, storeName, valueSchemaId, derivedSchemaStr); @@ -570,7 +577,9 @@ public void testDisableStoreRead() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -606,7 +615,9 @@ public void testDisableStoreWrite() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -647,7 +658,9 @@ public void testDisableStoreWriteWhenStoreDoesNotExist() { when(zkClient.readData(zkMetadataNodePath, null)) .thenReturn(new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer())) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); assertThrows(VeniceNoStoreException.class, () -> parentAdmin.setStoreWriteability(clusterName, storeName, false)); @@ -660,7 +673,9 @@ public void testEnableStoreRead() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -696,7 +711,9 @@ public void testEnableStoreWrite() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); String storeName = "test-store"; parentAdmin.initStorageCluster(clusterName); @@ -735,7 +752,9 @@ public void testKillOfflinePushJob() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); Store store = mock(Store.class); doReturn(store).when(internalAdmin).getStore(clusterName, pubSubTopic.getStoreName()); @@ -798,7 +817,9 @@ public void testIdempotentIncrementVersionWhenNoPreviousTopics() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); partialMockParentAdmin.incrementVersionIdempotent(clusterName, storeName, pushJobId, 1, 1); verify(internalAdmin).addVersionAndTopicOnly( clusterName, @@ -929,7 +950,9 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistAndOfflineJobIs .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); Version newVersion = partialMockParentAdmin.incrementVersionIdempotent( clusterName, storeName, @@ -1019,7 +1042,9 @@ public void testIdempotentIncrementVersionWhenPreviousTopicsExistButTruncated() .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); partialMockParentAdmin.incrementVersionIdempotent( clusterName, storeName, @@ -1783,7 +1808,9 @@ public void testUpdateStore() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); UpdateStoreQueryParams storeQueryParams1 = new UpdateStoreQueryParams().setBlobTransferEnabled(true); parentAdmin.initStorageCluster(clusterName); @@ -1913,7 +1940,9 @@ public void testUpdateStoreNativeReplicationSourceFabric() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin @@ -1938,7 +1967,9 @@ public void testUpdateStoreTargetSwapRegion() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); UpdateStoreQueryParams updateStoreQueryParams = new UpdateStoreQueryParams().setTargetRegionSwap("prod") .setTargetRegionSwapWaitTime(100) @@ -1976,7 +2007,9 @@ public void testDisableHybridConfigWhenActiveActiveOrIncPushConfigIsEnabled() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); // When user disable hybrid but also try to manually turn on A/A or Incremental Push, update operation should fail @@ -2147,7 +2180,9 @@ public void testRemoveStoreViewConfig() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.updateStore( @@ -2178,7 +2213,9 @@ public void testUpdateStoreWithBadPartitionerConfigs() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); assertThrows( @@ -2230,7 +2267,9 @@ public void testDeleteStore() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.deleteStore(clusterName, storeName, false, 0, true); @@ -2569,7 +2608,9 @@ public void testAdminCanKillLingeringVersion(boolean isIncrementalPush) { .put(any(), any(), anyInt()); mockControllerClients(storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); if (isIncrementalPush) { /** @@ -2727,7 +2768,9 @@ public void testHybridAndIncrementalUpdateStoreCommands(boolean aaEnabled) { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); parentAdmin.updateStore( @@ -2792,7 +2835,9 @@ public void testSendAdminMessageAcquiresClusterReadLock() { .when(veniceWriter) .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); String storeName = "test-store"; String owner = "test-owner"; @@ -2967,7 +3012,9 @@ private Store setupForStoreViewConfigUpdateTest(String storeName) { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); parentAdmin.initStorageCluster(clusterName); return store; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java index ba63bf15e3c..36618899709 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java @@ -126,7 +126,9 @@ public void testDeleteStoreWithAuthorization() { .put(any(), any(), anyInt()); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(null) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); parentAdmin.initStorageCluster(clusterName); parentAdmin.deleteStore(clusterName, storeName, false, 0, true); @@ -175,7 +177,9 @@ public void testUpdateAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows( VeniceNoStoreException.class, @@ -193,7 +197,9 @@ public void testGetAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows(VeniceNoStoreException.class, () -> parentAdmin.getAclForStore(clusterName, storeName)); } @@ -208,7 +214,9 @@ public void testDeleteAclException() { .checkPreConditionForAclOp(clusterName, storeName); when(zkClient.readData(zkMetadataNodePath, null)).thenReturn(new OffsetRecord(partitionStateSerializer)) - .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); + .thenReturn( + AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty())); initializeParentAdmin(Optional.of(authorizerService)); Assert.assertThrows(VeniceNoStoreException.class, () -> parentAdmin.deleteAclForStore(clusterName, storeName)); Assert.assertEquals(0, authorizerService.clearAclCounter); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java index 20a6f8434b4..aedb176710b 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -10,6 +10,7 @@ import com.linkedin.venice.helix.HelixAdapterSerializer; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.DataTree; @@ -36,14 +37,16 @@ public void testUpdateMetadata() { String clusterName = "test-cluster"; // Original metadata - Map currentMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1, 18); + Map currentMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); // New metadata Map newMetadata = new HashMap<>(); newMetadata.put("offset", 100L); // Updated metadata with new metadata - Map updatedMetadata = AdminTopicMetadataAccessor.generateMetadataMap(100, -1, 1, 18); + Map updatedMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(100L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { @@ -69,7 +72,8 @@ public void testUpdateMetadata() { @Test public void testGetMetadata() { String clusterName = "test-cluster"; - Map currentMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1, 18); + Map currentMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); when(zkClient.readData(metadataPath, null)).thenReturn(null).thenReturn(currentMetadata); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 2f385fe4337..6c6e7244c67 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -744,7 +744,11 @@ public void testRunWithFalsePositiveMissingMessagesWhenFirstBecomeLeaderControll AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); adminTopicMetadataAccessor.updateMetadata( clusterName, - AdminTopicMetadataAccessor.generateMetadataMap(metadataForStoreName0Future.get().getOffset(), -1, 1)); + AdminTopicMetadataAccessor.generateMetadataMap( + Optional.of(metadataForStoreName0Future.get().getOffset()), + Optional.of(-1L), + Optional.of(1L), + Optional.empty())); // Write a message with a skipped execution id but a different producer metadata. veniceWriter.put( @@ -830,7 +834,8 @@ public void testRunWithBiggerStartingOffset() throws InterruptedException, IOExc // The store doesn't exist doReturn(false).when(admin).hasStore(clusterName, storeName1); doReturn(false).when(admin).hasStore(clusterName, storeName2); - Map newMetadata = AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1); + Map newMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false); @@ -1119,7 +1124,8 @@ public void testResubscribe() throws IOException, InterruptedException, TimeoutE getKillOfflinePushJobMessage(clusterName, storeTopicName, 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION); long offset = future.get(TIMEOUT, TimeUnit.MILLISECONDS).getOffset(); - Map newMetadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, -1, 4L); + Map newMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(offset), Optional.of(-1L), Optional.of(4L), Optional.empty()); adminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName, 4L); // Resubscribe to the admin topic and make sure it can still process new admin messages From 7cb1099cb92a9e285d0429d1d5222d7b3e71575f Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 14:53:41 -0800 Subject: [PATCH 15/22] Update integration test to test updateAdminOperationProtocolVersion --- .../controller/TestAdminToolEndToEnd.java | 54 +++++++++++++++---- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java index 531fb5f65c5..c76af357746 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestAdminToolEndToEnd.java @@ -9,9 +9,11 @@ import com.linkedin.venice.Arg; import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse; import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; import com.linkedin.venice.controllerapi.MultiStoreResponse; import com.linkedin.venice.controllerapi.NewStoreResponse; import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.controllerapi.VersionCreationResponse; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixAdapterSerializer; @@ -194,8 +196,9 @@ public void testNodeReplicasReadinessCommand() throws Exception { @Test(timeOut = 4 * TEST_TIMEOUT) public void testUpdateAdminOperationVersion() throws Exception { - Long currentVersion = -1L; + Long defaultVersion = -1L; Long newVersion = 80L; + String storeName = Utils.getUniqueString("test-store"); try (VeniceTwoLayerMultiRegionMultiClusterWrapper venice = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper( new VeniceMultiRegionClusterCreateOptions.Builder().numberOfRegions(1) @@ -209,24 +212,57 @@ public void testUpdateAdminOperationVersion() throws Exception { String clusterName = venice.getClusterNames()[0]; // Get the parent con†roller - VeniceControllerWrapper controller = venice.getParentControllers().get(0); - ControllerClient controllerClient = new ControllerClient(clusterName, controller.getControllerUrl()); + VeniceControllerWrapper parentController = venice.getParentControllers().get(0); + ControllerClient parentControllerClient = new ControllerClient(clusterName, parentController.getControllerUrl()); - // Setup the original metadata - AdminTopicMetadataResponse originalMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); - Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), (long) currentVersion); + // Verify the original metadata - default value + AdminTopicMetadataResponse originalMetadata = parentControllerClient.getAdminTopicMetadata(Optional.empty()); + Assert.assertEquals(originalMetadata.getAdminOperationProtocolVersion(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getExecutionId(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getOffset(), (long) defaultVersion); + Assert.assertEquals(originalMetadata.getUpstreamOffset(), (long) defaultVersion); + + // Create store + NewStoreResponse newStoreResponse = + parentControllerClient.createNewStore(storeName, "test", "\"string\"", "\"string\""); + Assert.assertFalse(newStoreResponse.isError()); + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(storeName, Utils.getUniqueString("empty-push-1"), 1L); + Assert.assertFalse(versionCreationResponse.isError()); + + // Update store config + ControllerResponse updateStore = + parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setBatchGetLimit(100)); + Assert.assertFalse(updateStore.isError()); + + // Check the baseline metadata + AdminTopicMetadataResponse metdataAfterStoreCreation = + parentControllerClient.getAdminTopicMetadata(Optional.empty()); + long baselineExecutionId = metdataAfterStoreCreation.getExecutionId(); + long baselineOffset = metdataAfterStoreCreation.getOffset(); + long baselineUpstreamOffset = metdataAfterStoreCreation.getUpstreamOffset(); + long baselineAdminVersion = metdataAfterStoreCreation.getAdminOperationProtocolVersion(); + + // Execution id and offset should be positive now since we have created a store and updated the store config + Assert.assertEquals(baselineAdminVersion, (long) defaultVersion); + Assert.assertTrue(baselineExecutionId > 0); + Assert.assertTrue(baselineOffset > 0); + Assert.assertEquals(baselineUpstreamOffset, (long) defaultVersion); // Update the admin operation version to newVersion - 80 String[] updateAdminOperationVersionArgs = - { "--update-admin-operation-protocol-version", "--url", controller.getControllerUrl(), "--cluster", + { "--update-admin-operation-protocol-version", "--url", parentController.getControllerUrl(), "--cluster", clusterName, "--admin-operation-protocol-version", newVersion.toString() }; AdminTool.main(updateAdminOperationVersionArgs); - // Verify the admin operation metadata version is updated + // Verify the admin operation metadata version is updated and the remaining data is unchanged TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> { - AdminTopicMetadataResponse updatedMetadata = controllerClient.getAdminTopicMetadata(Optional.empty()); + AdminTopicMetadataResponse updatedMetadata = parentControllerClient.getAdminTopicMetadata(Optional.empty()); Assert.assertEquals(updatedMetadata.getAdminOperationProtocolVersion(), (long) newVersion); + Assert.assertEquals(updatedMetadata.getExecutionId(), baselineExecutionId); + Assert.assertEquals(updatedMetadata.getOffset(), baselineOffset); + Assert.assertEquals(updatedMetadata.getUpstreamOffset(), baselineUpstreamOffset); }); } } From 3d2cd67745e1ef4d4a94652262d54d98ef7406d9 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 15:22:00 -0800 Subject: [PATCH 16/22] Modify unittest to test the input --- .../test/java/com/linkedin/venice/TestAdminTool.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index 58bf4f692bd..17fa568a152 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -414,14 +414,9 @@ public void testAdminConfigureView() throws ParseException, IOException { } @Test - public void testUpdateAdminOperationProtocolVersion() throws ParseException, IOException { + public void testUpdateAdminOperationProtocolVersionWithInvalidInput() { String[] args = { "--update-admin-operation-protocol-version", "--url", "http://localhost:7036", "--cluster", - "test-cluster", "--admin-operation-protocol-version", "1" }; - - try { - AdminTool.main(args); - } catch (Exception e) { - Assert.fail("AdminTool should allow admin topic metadata to be updated admin operation version", e); - } + "test-cluster", "--admin-operation-protocol-version", "many" }; + Assert.assertThrows(VeniceException.class, () -> AdminTool.main(args)); } } From b5bacdec456e6f2bfea929bbd93801199a55cfd9 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 15:24:40 -0800 Subject: [PATCH 17/22] Change the value --- .../src/test/java/com/linkedin/venice/TestAdminTool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index 17fa568a152..9069052669c 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -416,7 +416,7 @@ public void testAdminConfigureView() throws ParseException, IOException { @Test public void testUpdateAdminOperationProtocolVersionWithInvalidInput() { String[] args = { "--update-admin-operation-protocol-version", "--url", "http://localhost:7036", "--cluster", - "test-cluster", "--admin-operation-protocol-version", "many" }; + "test-cluster", "--admin-operation-protocol-version", "thisShouldBeLongValue" }; Assert.assertThrows(VeniceException.class, () -> AdminTool.main(args)); } } From 31ded8b5997bdf4cf681dfc97fc0a34ef560fa7d Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 15:58:14 -0800 Subject: [PATCH 18/22] Remove admin operation version from getAdminTopicMetadata when storeName is provided --- .../linkedin/venice/controller/VeniceHelixAdmin.java | 12 +++--------- .../controller/server/AdminTopicMetadataRoutes.java | 2 +- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index cb5b4876439..db1ce315564 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -7502,20 +7502,14 @@ public Optional getAdminCommandExecutionTracker(St * If store name is specified, it returns store-level execution id. */ public Map getAdminTopicMetadata(String clusterName, Optional storeName) { - Map metadata = getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName); - if (storeName.isPresent()) { Long executionId = getExecutionIdAccessor().getLastSucceededExecutionIdMap(clusterName).get(storeName.get()); - Long adminOperationProtocolVersion = AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata); return executionId == null ? Collections.emptyMap() - : AdminTopicMetadataAccessor.generateMetadataMap( - Optional.of(-1L), - Optional.of(-1L), - Optional.of(executionId), - Optional.of(adminOperationProtocolVersion)); + : AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(executionId), Optional.of(-1L)); } - return metadata; + return getAdminConsumerService(clusterName).getAdminTopicMetadata(clusterName); } /** diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java index e1575f78899..fa2a6300e04 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/AdminTopicMetadataRoutes.java @@ -53,10 +53,10 @@ public Route getAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler re responseObject.setCluster(clusterName); storeName.ifPresent(responseObject::setName); responseObject.setExecutionId(adminTopicMetadata.getExecutionId()); - responseObject.setAdminOperationProtocolVersion(adminTopicMetadata.getAdminOperationProtocolVersion()); if (!storeName.isPresent()) { responseObject.setOffset(adminTopicMetadata.getOffset()); responseObject.setUpstreamOffset(adminTopicMetadata.getUpstreamOffset()); + responseObject.setAdminOperationProtocolVersion(adminTopicMetadata.getAdminOperationProtocolVersion()); } } catch (Throwable e) { responseObject.setError(e); From 2292bb4843d2a5aeb52e293f6d5f88db423d9fed Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Wed, 12 Feb 2025 16:01:03 -0800 Subject: [PATCH 19/22] Fix test --- .../com/linkedin/venice/controller/TestVeniceHelixAdmin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 6b4d33490ff..5e91eb7d45e 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -985,7 +985,7 @@ public void testGetAdminTopicMetadata() { when(adminConsumerService.getAdminTopicMetadata(anyString())).thenReturn(remoteMetadata); Map expectedMetadata = AdminTopicMetadataAccessor - .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(10L), Optional.of(1L)); + .generateMetadataMap(Optional.of(-1L), Optional.of(-1L), Optional.of(10L), Optional.of(-1L)); Map metadataForStore = veniceHelixAdmin.getAdminTopicMetadata(clusterName, Optional.of(storeName)); assertEquals(metadataForStore, expectedMetadata); } From 32bb1daf90a3355adac1a697bbe02d97a48bc0e8 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Thu, 13 Feb 2025 13:39:55 -0800 Subject: [PATCH 20/22] Move the admin version inside the response when store name is not provided --- .../controller/server/ClusterAdminOpsRequestHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java index f9fb1556a91..ccdcdb097b3 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/ClusterAdminOpsRequestHandler.java @@ -102,11 +102,11 @@ public AdminTopicMetadataGrpcResponse getAdminTopicMetadata(AdminTopicMetadataGr Pair offsets = AdminTopicMetadataAccessor.getOffsets(metadata); adminMetadataBuilder.setOffset(offsets.getFirst()); adminMetadataBuilder.setUpstreamOffset(offsets.getSecond()); + adminMetadataBuilder + .setAdminOperationProtocolVersion(AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata)); } else { adminMetadataBuilder.setStoreName(storeName); } - adminMetadataBuilder - .setAdminOperationProtocolVersion(AdminTopicMetadataAccessor.getAdminOperationProtocolVersion(metadata)); return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build(); } From 59ba89be2e8166af725e3128fa9a9172a0af5814 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Thu, 13 Feb 2025 16:41:25 -0800 Subject: [PATCH 21/22] Break the test into 2 smaller tests --- .../TestZkAdminTopicMetadataAccessor.java | 59 +++++++++++++------ 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java index aedb176710b..ebc9f957b5c 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -33,39 +33,64 @@ public void setUp() { } @Test - public void testUpdateMetadata() { + public void testUpdateMetadataWhenMetadataIsEmpty() { + String clusterName = "test-cluster"; + + // metadata that we are trying to update + Map metadataDelta = new HashMap<>(); + metadataDelta.put("offset", 100L); + + String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); + try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { + dataTreeMockedStatic.when(() -> DataTree.copyStat(any(), any())).thenAnswer(invocation -> null); + Stat readStat = new Stat(); + + // Mock the metadata on prod - null + when(zkClient.readData(metadataPath, readStat)).thenReturn(null); + + // Update the metadata + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, metadataDelta); + + // Verify that the metadata path got read 1 time + verify(zkClient, times(1)).readData(metadataPath, readStat); + + // Verify that the metadata path got read 1 time with the metadataDelta map + // When the metadata is empty, the metadataDelta should be written as is + verify(zkClient, times(1)).writeDataGetStat(metadataPath, metadataDelta, 0); + } + } + + @Test + public void testUpdateMetadataWithFullMetadata() { String clusterName = "test-cluster"; // Original metadata Map currentMetadata = AdminTopicMetadataAccessor .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); - // New metadata - Map newMetadata = new HashMap<>(); - newMetadata.put("offset", 100L); - - // Updated metadata with new metadata - Map updatedMetadata = AdminTopicMetadataAccessor - .generateMetadataMap(Optional.of(100L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + // metadata that we are trying to update + Map metadataDelta = new HashMap<>(); + metadataDelta.put("offset", 100L); String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { dataTreeMockedStatic.when(() -> DataTree.copyStat(any(), any())).thenAnswer(invocation -> null); Stat readStat = new Stat(); - when(zkClient.readData(metadataPath, readStat)).thenReturn(null) // Case 1: when there is no metadata - .thenReturn(currentMetadata); // Case 2: the metadata is not null + when(zkClient.readData(metadataPath, readStat)).thenReturn(currentMetadata); // Case 2: the metadata is not null - // Case 1: when there is no metadata - null - zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); - verify(zkClient, times(1)).writeDataGetStat(metadataPath, newMetadata, 0); + // Update the metadata on prod with new offset + zkAdminTopicMetadataAccessor.updateMetadata(clusterName, metadataDelta); - // Case 2: the metadata is not null - zkAdminTopicMetadataAccessor.updateMetadata(clusterName, newMetadata); - verify(zkClient, times(1)).writeDataGetStat(metadataPath, updatedMetadata, 0); + // The updated metadata should be the original metadata with the offset updated + Map updatedMetadata = AdminTopicMetadataAccessor + .generateMetadataMap(Optional.of(100L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); // Verify that the metadata path got read 2 times - verify(zkClient, times(2)).readData(metadataPath, readStat); + verify(zkClient, times(1)).readData(metadataPath, readStat); + + // Verify that the metadata path got written with the correct updated metadata + verify(zkClient, times(1)).writeDataGetStat(metadataPath, updatedMetadata, 0); } } From d78773a76084ce124fc4255c88557a9415995384 Mon Sep 17 00:00:00 2001 From: Minh Nguyen Date: Thu, 13 Feb 2025 16:49:36 -0800 Subject: [PATCH 22/22] Fix comments and add variables --- .../controller/TestZkAdminTopicMetadataAccessor.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java index ebc9f957b5c..78eceb70cf5 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkAdminTopicMetadataAccessor.java @@ -63,14 +63,16 @@ public void testUpdateMetadataWhenMetadataIsEmpty() { @Test public void testUpdateMetadataWithFullMetadata() { String clusterName = "test-cluster"; + Long originalOffset = 1L; + Long newOffset = 100L; // Original metadata Map currentMetadata = AdminTopicMetadataAccessor - .generateMetadataMap(Optional.of(1L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + .generateMetadataMap(Optional.of(originalOffset), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); // metadata that we are trying to update Map metadataDelta = new HashMap<>(); - metadataDelta.put("offset", 100L); + metadataDelta.put("offset", newOffset); String metadataPath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName); try (MockedStatic dataTreeMockedStatic = Mockito.mockStatic(DataTree.class)) { @@ -84,9 +86,9 @@ public void testUpdateMetadataWithFullMetadata() { // The updated metadata should be the original metadata with the offset updated Map updatedMetadata = AdminTopicMetadataAccessor - .generateMetadataMap(Optional.of(100L), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); + .generateMetadataMap(Optional.of(newOffset), Optional.of(-1L), Optional.of(1L), Optional.of(18L)); - // Verify that the metadata path got read 2 times + // Verify that the metadata path got read 1 times verify(zkClient, times(1)).readData(metadataPath, readStat); // Verify that the metadata path got written with the correct updated metadata