Skip to content

Commit

Permalink
[Feature]增加Truncate数据功能(didi#1043)
Browse files Browse the repository at this point in the history
  • Loading branch information
duanxiaoqiu committed Jun 21, 2023
1 parent e1e02f7 commit 5a38899
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public interface OpTopicManager {
* 扩分区
*/
Result<Void> expandTopic(TopicExpansionDTO dto, String operator);

/**
 * Truncate (clear) all messages of a topic without deleting the topic itself.
 *
 * @param clusterPhyId physical cluster id the topic belongs to
 * @param topicName    name of the topic to truncate
 * @param operator     operator recorded in the audit log
 * @return success result, or a failure result describing why the truncate did not happen
 */
Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ public Result<Void> expandTopic(TopicExpansionDTO dto, String operator) {
return rv;
}

@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
    // Truncate the topic by delegating to the service layer.
    final Result<Void> serviceResult = opTopicService.truncateTopic(new TopicParam(clusterPhyId, topicName), operator);

    // Propagate a failure unchanged; otherwise report a fresh success result.
    return serviceResult.failed() ? serviceResult : Result.buildSuc();
}

/**************************************************** private method ****************************************************/

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum OperationEnum {

RESTART(11, "重启"),

TRUNCATE(12, "清空"),

;

OperationEnum(int code, String desc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ public interface OpTopicService {
* 扩分区
*/
Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String operator);

/**
 * Truncate (clear) all messages of the topic identified by {@code param}.
 *
 * @param param    cluster id + topic name of the topic to truncate
 * @param operator operator recorded in the audit log
 * @return success result, or a failure result describing the error
 */
Result<Void> truncateTopic(TopicParam param, String operator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Option;
Expand All @@ -57,6 +58,7 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
private static final String TOPIC_CREATE = "createTopic";
private static final String TOPIC_DELETE = "deleteTopic";
private static final String TOPIC_EXPAND = "expandTopic";
private static final String TOPIC_TRUNCATE = "truncateTopic";

@Autowired
private TopicService topicService;
Expand Down Expand Up @@ -92,6 +94,8 @@ private void init() {

registerVCHandler(TOPIC_EXPAND, V_0_10_0_0, V_0_11_0_3, "expandTopicByZKClient", this::expandTopicByZKClient);
registerVCHandler(TOPIC_EXPAND, V_0_11_0_3, V_MAX, "expandTopicByKafkaClient", this::expandTopicByKafkaClient);

registerVCHandler(TOPIC_TRUNCATE, V_0_11_0_0, V_MAX, "truncateTopicByKafkaClient", this::truncateTopicByKafkaClient);
}

@Override
Expand Down Expand Up @@ -203,9 +207,78 @@ public Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String op
return rv;
}

@Override
public Result<Void> truncateTopic(TopicParam param, String operator) {
    try {
        // Dispatch to the version-matched handler registered under TOPIC_TRUNCATE
        // (for 0.11.0.0+ this is truncateTopicByKafkaClient, per init()).
        Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param);
        if (rv.failed()) {
            return rv;
        }

        // Remove the topic's row from the local DB.
        // NOTE(review): the DB delete happens after a successful truncate; a failure below
        // leaves the truncate and DB delete already applied — confirm this ordering is intended.
        topicService.deleteTopicInDB(param.getClusterPhyId(), param.getTopicName());

        // Detach any high-availability (active/standby mirror-topic) relations bound to this topic.
        List<HaActiveStandbyRelation> haActiveStandbyRelations = haActiveStandbyRelationService.listByClusterAndType(param.getClusterPhyId(), HaResTypeEnum.MIRROR_TOPIC);
        for (HaActiveStandbyRelation activeStandbyRelation : haActiveStandbyRelations) {
            if (activeStandbyRelation.getResName().equals(param.getTopicName())) {
                try {
                    // Clear the "ha-topics" entity config on the standby cluster's ZooKeeper and
                    // publish a config-change notification so the brokers pick up the change.
                    KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(activeStandbyRelation.getStandbyClusterPhyId());
                    Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", activeStandbyRelation.getResName());
                    if (haTopics.size() != 0) {
                        kafkaZkClient.setOrCreateEntityConfigs("ha-topics", activeStandbyRelation.getResName(), new Properties());
                        kafkaZkClient.createConfigChangeNotification("ha-topics/" + activeStandbyRelation.getResName());
                    }
                    // Drop the active/standby relation record for this topic.
                    haActiveStandbyRelationService.batchDeleteTopicHA(activeStandbyRelation.getActiveClusterPhyId(), activeStandbyRelation.getStandbyClusterPhyId(), Collections.singletonList(activeStandbyRelation.getResName()));
                } catch (Exception e) {
                    // On ZK/HA failure: log, skip the audit-log entry, and surface the error.
                    log.error("method=truncateTopic||topicName:{}||errMsg=exception", activeStandbyRelation.getResName(), e);
                    return Result.buildFailure(e.getMessage());
                }
            }
        }

        // Record the operation in the audit log; the wrapper swallows its own exceptions.
        OplogDTO oplogDTO = new OplogDTO(operator,
                OperationEnum.TRUNCATE.getDesc(),
                ModuleEnum.KAFKA_TOPIC.getDesc(),
                MsgConstant.getTopicBizStr(param.getClusterPhyId(), param.getTopicName()),
                String.format("清空Topic:[%s]", param.toString()));
        opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
        return rv;
    } catch (VCHandlerNotExistException e) {
        // No truncate handler registered for this cluster's Kafka version.
        return Result.buildFailure(VC_HANDLE_NOT_EXIST);
    }
}

/**************************************************** private method ****************************************************/

/**
 * Truncate every partition of the topic via the Kafka AdminClient
 * delete-records API.
 *
 * @param itemParam version-control parameter; actually a {@link TopicParam}
 * @return success result, or KAFKA_OPERATE_FAILED with the error message
 */
private Result<Void> truncateTopicByKafkaClient(VersionItemParam itemParam) {
    TopicParam topicParam = (TopicParam) itemParam;
    try {
        AdminClient adminClient = kafkaAdminClient.getClient(topicParam.getClusterPhyId());

        // Fetch the topic's partition layout so every partition gets truncated.
        DescribeTopicsResult describeResult = adminClient.describeTopics(Arrays.asList(topicParam.getTopicName()));
        Map<String, TopicDescription> topicDescriptions = describeResult.all().get();

        // beforeOffset(-1) requests deletion of all records currently stored in a partition.
        RecordsToDelete purgeAllRecords = RecordsToDelete.beforeOffset(-1);

        Map<TopicPartition, RecordsToDelete> deletionPlan = new HashMap<>();
        for (Map.Entry<String, TopicDescription> descriptionEntry : topicDescriptions.entrySet()) {
            for (TopicPartitionInfo partitionInfo : descriptionEntry.getValue().partitions()) {
                deletionPlan.put(new TopicPartition(descriptionEntry.getKey(), partitionInfo.partition()), purgeAllRecords);
            }
        }

        // Issue the delete-records request and block until the brokers acknowledge it.
        adminClient.deleteRecords(deletionPlan).all().get();
    } catch (Exception e) {
        log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{}", topicParam.getClusterPhyId(), topicParam.getTopicName(), e);

        return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
    }

    return Result.buildSuc();
}

private Result<Void> deleteByKafkaClient(VersionItemParam itemParam) {
TopicParam param = (TopicParam) itemParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public Result<Void> expandTopics(@Validated @RequestBody TopicExpansionDTO dto)
return opTopicManager.expandTopic(dto, HttpRequestUtil.getOperator());
}

@ApiOperation(value = "Topic数据清空", notes = "")
@PostMapping(value = "topics/truncate-topic")
@ResponseBody
public Result<Void> truncateTopic(@Validated @RequestBody ClusterTopicDTO dto) {
    // Resolve the operator from the current request, then delegate to the manager layer.
    String operator = HttpRequestUtil.getOperator();
    return opTopicManager.truncateTopic(dto.getClusterId(), dto.getTopicName(), operator);
}

@ApiOperation(value = "Topic元信息", notes = "带是否存在信息")
@GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/metadata-combine-exist")
@ResponseBody
Expand Down

0 comments on commit 5a38899

Please sign in to comment.