[improve](routine-load) add retry when get Kafka meta info (#35376)
If a BE is down when the FE sends the `getInfo` RPC, or the BE hits a network error when sending the request to Kafka, the routine load job is paused.
To keep routine load stable, this change adds a retry when fetching Kafka meta info.
sollhui authored Jun 5, 2024
1 parent 5b31cea commit f732052
Showing 1 changed file with 66 additions and 105 deletions.
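Before the diff, here is a minimal, self-contained sketch of the retry pattern this commit introduces: pick a random alive backend, send the meta-info RPC, and retry (up to 3 times) on transport errors or a non-OK status. The names `MetaClient`, `Result`, and `fetchFromBackend` are hypothetical stand-ins for illustration only; the actual committed helper is the `getInfoRequest` method shown in the diff below.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RetrySketch {

    static final int MAX_RETRY_TIMES = 3;

    // Hypothetical RPC client abstraction; may throw on network errors.
    interface MetaClient {
        Result fetchFromBackend(String backendAddress) throws Exception;
    }

    record Result(boolean ok, String payload, String error) {}

    static String getInfoWithRetry(List<String> aliveBackends, MetaClient client) throws Exception {
        if (aliveBackends.isEmpty()) {
            throw new Exception("Failed to get info. No alive backends");
        }
        int retryTimes = 0;
        while (retryTimes < MAX_RETRY_TIMES) {
            // Choose a backend at random so retries are spread over different nodes.
            String address = aliveBackends.get(ThreadLocalRandom.current().nextInt(aliveBackends.size()));
            try {
                Result result = client.fetchFromBackend(address);
                if (result.ok()) {
                    return result.payload();
                }
                System.err.println("non-OK status from " + address + ": " + result.error());
            } catch (Exception e) {
                System.err.println("RPC to " + address + " failed: " + e.getMessage());
            }
            retryTimes++;
        }
        throw new Exception("Failed to get info after " + MAX_RETRY_TIMES + " attempts");
    }

    public static void main(String[] args) throws Exception {
        // Toy usage: the first backend always fails, the second succeeds.
        MetaClient client = address -> address.startsWith("bad")
                ? new Result(false, null, "simulated failure")
                : new Result(true, "partitions=[0,1,2]", null);
        System.out.println(getInfoWithRetry(List.of("bad-be:8060", "good-be:8060"), client));
    }
}
```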
@@ -43,24 +43,11 @@
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;

public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
TNetworkAddress address = null;
Backend be = null;
long beId = -1L;
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get all partitions. No alive backends");
}
Collections.shuffle(backendIds);
beId = backendIds.get(0);
be = Env.getCurrentSystemInfo().getBackend(beId);
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -72,21 +59,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
)
)
).build();

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
} else {
return result.getKafkaMetaResult().getPartitionIdsList();
}
return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
LOG.warn("failed to get partitions from backend[{}].", beId, e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
+ "]. error: " + e.getMessage());
"Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
}
}

@@ -96,20 +72,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
throws LoadException {
TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
}
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get offset for times. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -131,24 +97,17 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St

InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
}
return partitionOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
}
return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
@@ -159,21 +118,11 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Integer> partitionIds) throws LoadException {
TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionIds, topic, taskId, jobId);
}
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get latest offsets. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -193,25 +142,18 @@ public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionOffsets, topic, taskId, jobId);
}
return partitionOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionOffsets, topic, taskId, jobId);
}
return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
@@ -239,17 +181,7 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String
return offsets;
}

TNetworkAddress address = null;
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get real offsets. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -270,27 +202,56 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
realOffsets.addAll(partitionOffsets);
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
return realOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
realOffsets.addAll(partitionOffsets);
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
return realOffsets;
} catch (Exception e) {
LOG.warn("failed to get real offsets.", e);
throw new LoadException(
"Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
}
}

private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
}
}

throw new LoadException("Failed to get info");
}
}
