Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]增加Truncate数据功能(didi#1043) #1062

Merged
merged 2 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public interface OpTopicManager {
* 扩分区
*/
Result<Void> expandTopic(TopicExpansionDTO dto, String operator);

/**
* 清空Topic
*/
Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ public Result<Void> expandTopic(TopicExpansionDTO dto, String operator) {
return rv;
}

@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
// 清空Topic
Result<Void> rv = opTopicService.truncateTopic(new TopicParam(clusterPhyId, topicName), operator);
ZQKC marked this conversation as resolved.
Show resolved Hide resolved
if (rv.failed()) {
return rv;
}

return Result.buildSuc();
}

/**************************************************** private method ****************************************************/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum OperationEnum {

RESTART(11, "重启"),

TRUNCATE(12, "清空"),

;

OperationEnum(int code, String desc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ public interface OpTopicService {
* 扩分区
*/
Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String operator);

/**
* 清空topic消息
*/
Result<Void> truncateTopic(TopicParam param, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Option;
Expand All @@ -57,6 +58,7 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
private static final String TOPIC_CREATE = "createTopic";
private static final String TOPIC_DELETE = "deleteTopic";
private static final String TOPIC_EXPAND = "expandTopic";
private static final String TOPIC_TRUNCATE = "truncateTopic";

@Autowired
private TopicService topicService;
Expand Down Expand Up @@ -92,6 +94,8 @@ private void init() {

registerVCHandler(TOPIC_EXPAND, V_0_10_0_0, V_0_11_0_3, "expandTopicByZKClient", this::expandTopicByZKClient);
registerVCHandler(TOPIC_EXPAND, V_0_11_0_3, V_MAX, "expandTopicByKafkaClient", this::expandTopicByKafkaClient);

registerVCHandler(TOPIC_TRUNCATE, V_0_11_0_0, V_MAX, "truncateTopicByKafkaClient", this::truncateTopicByKafkaClient);
}

@Override
Expand Down Expand Up @@ -203,9 +207,58 @@ public Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String op
return rv;
}

@Override
public Result<Void> truncateTopic(TopicParam param, String operator) {
try {
// 清空topic数据
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param);

if (rv == null || rv.failed()) {
return rv;
}

// 记录操作
OplogDTO oplogDTO = new OplogDTO(operator,
OperationEnum.TRUNCATE.getDesc(),
ModuleEnum.KAFKA_TOPIC.getDesc(),
MsgConstant.getTopicBizStr(param.getClusterPhyId(), param.getTopicName()),
String.format("清空Topic:[%s]", param.toString()));
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
return rv;
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}

/**************************************************** private method ****************************************************/

private Result<Void> truncateTopicByKafkaClient(VersionItemParam itemParam) {
ZQKC marked this conversation as resolved.
Show resolved Hide resolved
TopicParam param = (TopicParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
//获取topic的分区信息
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(param.getTopicName()));
Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();

Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
RecordsToDelete recordsToDeleteOffset = RecordsToDelete.beforeOffset(-1);

descriptionMap.forEach((topicName, topicDescription) -> {
for (TopicPartitionInfo topicPartition : topicDescription.partitions()) {
recordsToDelete.put(new TopicPartition(topicName, topicPartition.partition()), recordsToDeleteOffset);
}
});

DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
deleteRecordsResult.all().get();
} catch (Exception e) {
log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{}", param.getClusterPhyId(), param.getTopicName(), e);

return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
}

return Result.buildSuc();
}

private Result<Void> deleteByKafkaClient(VersionItemParam itemParam) {
TopicParam param = (TopicParam) itemParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public Result<Void> expandTopics(@Validated @RequestBody TopicExpansionDTO dto)
return opTopicManager.expandTopic(dto, HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Topic数据清空", notes = "")
@PostMapping(value = "topics/truncate-topic")
@ResponseBody
public Result<Void> truncateTopic(@Validated @RequestBody ClusterTopicDTO dto) {
return opTopicManager.truncateTopic(dto.getClusterId(), dto.getTopicName(), HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Topic元信息", notes = "带是否存在信息")
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/metadata-combine-exist")
@ResponseBody
Expand Down