Skip to content

Commit

Permalink
Convert partialUpdate to updateMetadata and using compareAndUpdate
Browse files Browse the repository at this point in the history
  • Loading branch information
Minh Nguyen committed Feb 8, 2025
1 parent 00a12ae commit f1479e6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public abstract class AdminTopicMetadataAccessor {

/**
* @return a map with {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}, {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY},
* {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY} specified to input values.
* {@linkplain AdminTopicMetadataAccessor#EXECUTION_ID_KEY}, {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY}
* specified to input values.
*/
public static Map<String, Long> generateMetadataMap(
Optional<Long> localOffset,
Expand Down Expand Up @@ -72,12 +73,15 @@ public static long getExecutionId(Map<String, Long> metadata) {
return metadata.getOrDefault(EXECUTION_ID_KEY, UNDEFINED_VALUE);
}

/**
* @return the value to which the specified key is mapped to {@linkplain AdminTopicMetadataAccessor#ADMIN_OPERATION_PROTOCOL_VERSION_KEY}.
*/
public static long getAdminOperationProtocolVersion(Map<String, Long> metadata) {
return metadata.getOrDefault(ADMIN_OPERATION_PROTOCOL_VERSION_KEY, UNDEFINED_VALUE);
}

/**
* Update all relevant metadata for a given cluster in a single transaction.
* Update all relevant metadata for a given cluster in a single transaction with information provided in metadata.
* @param clusterName of the cluster at interest.
* @param metadata map containing relevant information.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,24 @@ public ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer ad
zkMapAccessor = new ZkBaseDataAccessor<>(zkClient);
}

public synchronized void partialUpdateMetadata(String clusterName, Map<String, Long> metadata) {
Map<String, Long> currentMetadata = getMetadata(clusterName);

// Update only the keys present in the metadata map.
metadata.forEach(currentMetadata::put);

updateMetadata(clusterName, currentMetadata);
}

/**
* @see AdminTopicMetadataAccessor#updateMetadata(String, Map)
* Update the upstream metadata map for the given cluster with specific information provided in newMetadata
*/
@Override
public synchronized void updateMetadata(String clusterName, Map<String, Long> metadata) {
public void updateMetadata(String clusterName, Map<String, Long> newMetadata) {
String path = getAdminTopicMetadataNodePath(clusterName);
HelixUtils.update(zkMapAccessor, path, metadata, ZK_UPDATE_RETRY);
LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, metadata);
HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> {
currentMetadataMap.putAll(newMetadata);
return currentMetadataMap;
});
LOGGER.info("Persisted admin topic metadata map for cluster: {}, map: {}", clusterName, newMetadata);
}

/**
* @see AdminTopicMetadataAccessor#getMetadata(String)
*/
@Override
public synchronized Map<String, Long> getMetadata(String clusterName) {
public Map<String, Long> getMetadata(String clusterName) {
int retry = ZK_UPDATE_RETRY;
String path = getAdminTopicMetadataNodePath(clusterName);
while (retry > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -187,8 +188,12 @@ public Map<String, Long> getAdminTopicMetadata(String clusterName) {
*/
public void updateAdminTopicMetadata(String clusterName, long executionId, long offset, long upstreamOffset) {
if (clusterName.equals(config.getClusterName())) {
Map<String, Long> metadata = AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId);
adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata);
try (AutoCloseableLock ignore =
admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) {
Map<String, Long> metadata =
AdminTopicMetadataAccessor.generateMetadataMap(offset, upstreamOffset, executionId);
adminTopicMetadataAccessor.updateMetadata(clusterName, metadata);
}
} else {
throw new VeniceException(
"This AdminConsumptionService is for cluster: " + config.getClusterName()
Expand All @@ -201,12 +206,15 @@ public void updateAdminTopicMetadata(String clusterName, long executionId, long
*/
public void updateAdminOperationProtocolVersion(String clusterName, long adminOperationProtocolVersion) {
if (clusterName.equals(config.getClusterName())) {
Map<String, Long> metadata = AdminTopicMetadataAccessor.generateMetadataMap(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(adminOperationProtocolVersion));
adminTopicMetadataAccessor.partialUpdateMetadata(clusterName, metadata);
try (AutoCloseableLock ignore =
admin.getHelixVeniceClusterResources(clusterName).getClusterLockManager().createClusterWriteLock()) {
Map<String, Long> metadata = AdminTopicMetadataAccessor.generateMetadataMap(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(adminOperationProtocolVersion));
adminTopicMetadataAccessor.updateMetadata(clusterName, metadata);
}
} else {
throw new VeniceException(
"This AdminConsumptionService is for cluster: " + config.getClusterName()
Expand Down

0 comments on commit f1479e6

Please sign in to comment.