diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/OpTopicManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/OpTopicManager.java index 5c3fa742a..90b7e241f 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/OpTopicManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/OpTopicManager.java @@ -19,4 +19,9 @@ public interface OpTopicManager { * 扩分区 */ Result expandTopic(TopicExpansionDTO dto, String operator); + + /** + * 清空Topic + */ + Result truncateTopic(Long clusterPhyId, String topicName, String operator); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java index 5d27ed746..22d204ea5 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java @@ -10,10 +10,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; @@ -156,6 +158,16 @@ public Result expandTopic(TopicExpansionDTO dto, String operator) { return rv; } + @Override + public Result truncateTopic(Long clusterPhyId, String topicName, String operator) { + // 清空Topic + Result rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator); + if (rv.failed()) { + return rv; + } + + return Result.buildSuc(); + } /**************************************************** private method ****************************************************/ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicTruncateParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicTruncateParam.java new file mode 100644 index 000000000..8186b3e5f --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/topic/TopicTruncateParam.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TopicTruncateParam extends ClusterPhyParam { + protected String topicName; + protected long offset; + + public TopicTruncateParam(Long clusterPhyId, String topicName, long offset) { + super(clusterPhyId); + this.topicName = topicName; + this.offset = offset; + } + + @Override + public String toString() { + return "TopicParam{" + + "clusterPhyId=" + clusterPhyId + + ", topicName='" + topicName + '\'' + + ", offset='" + offset + '\'' + + '}'; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java index 465f6f8ac..a3f959bf3 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/KafkaConstant.java @@ -49,6 +49,8 @@ public class KafkaConstant { public static final Map KAFKA_ALL_CONFIG_DEF_MAP = new ConcurrentHashMap<>(); + public static final Integer TOPICK_TRUNCATE_DEFAULT_OFFSET = -1; + static { try { KAFKA_ALL_CONFIG_DEF_MAP.putAll(CollectionConverters.asJava(LogConfig$.MODULE$.configKeys())); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/OperationEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/OperationEnum.java index 302cb38b0..da25bc14f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/OperationEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/operaterecord/OperationEnum.java @@ -32,6 +32,8 @@ public enum OperationEnum { RESTART(11, "重启"), + TRUNCATE(12, "清空"), + ; OperationEnum(int code, String desc) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/OpTopicService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/OpTopicService.java index 3b529f817..1f656a6e1 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/OpTopicService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/OpTopicService.java @@ -3,6 +3,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; @@ -21,4 +22,9 @@ public interface OpTopicService { * 扩分区 */ Result expandTopic(TopicPartitionExpandParam expandParam, String operator); + + /** + * 清空topic消息 + */ + Result truncateTopic(TopicTruncateParam param, String operator); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java index 466f7a2fa..bb3e553d6 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java @@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; @@ -33,6 +34,7 @@ import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Option; @@ -57,6 +59,7 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement private static final String TOPIC_CREATE = "createTopic"; private static final String TOPIC_DELETE = "deleteTopic"; private static final String TOPIC_EXPAND = "expandTopic"; + private static final String TOPIC_TRUNCATE = "truncateTopic"; @Autowired private TopicService topicService; @@ -92,6 +95,8 @@ private void init() { registerVCHandler(TOPIC_EXPAND, V_0_10_0_0, V_0_11_0_3, "expandTopicByZKClient", this::expandTopicByZKClient); registerVCHandler(TOPIC_EXPAND, V_0_11_0_3, V_MAX, "expandTopicByKafkaClient", this::expandTopicByKafkaClient); + + registerVCHandler(TOPIC_TRUNCATE, V_0_11_0_0, V_MAX, "truncateTopicByKafkaClient", this::truncateTopicByKafkaClient); } @Override @@ -203,9 +208,58 @@ public Result expandTopic(TopicPartitionExpandParam expandParam, String op return rv; } + @Override + public Result truncateTopic(TopicTruncateParam param, String operator) { + try { + // 清空topic数据 + Result rv = (Result) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param); + + if (rv == null || rv.failed()) { + return rv; + } + + // 记录操作 + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.TRUNCATE.getDesc(), + ModuleEnum.KAFKA_TOPIC.getDesc(), + MsgConstant.getTopicBizStr(param.getClusterPhyId(), param.getTopicName()), + String.format("清空Topic:[%s]", param.toString())); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + return rv; + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } /**************************************************** private method ****************************************************/ + private Result truncateTopicByKafkaClient(VersionItemParam itemParam) { + TopicTruncateParam param = (TopicTruncateParam) itemParam; + try { + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); + //获取topic的分区信息 + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(param.getTopicName()), new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)); + Map descriptionMap = describeTopicsResult.all().get(); + + Map recordsToDelete = new HashMap<>(); + RecordsToDelete recordsToDeleteOffset = RecordsToDelete.beforeOffset(param.getOffset()); + + descriptionMap.forEach((topicName, topicDescription) -> { + for (TopicPartitionInfo topicPartition : topicDescription.partitions()) { + recordsToDelete.put(new TopicPartition(topicName, topicPartition.partition()), recordsToDeleteOffset); + } + }); + + DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete, new DeleteRecordsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)); + deleteRecordsResult.all().get(); + } catch (Exception e) { + log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{} offset:{}", param.getClusterPhyId(), param.getTopicName(), param.getOffset(), e); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + } + + return Result.buildSuc(); + } private Result deleteByKafkaClient(VersionItemParam itemParam) { TopicParam param = (TopicParam) itemParam; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java index 3f4c0a687..db8baa4aa 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java @@ -36,6 +36,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { private static final String FE_HA_CREATE_MIRROR_TOPIC = "FEHaCreateMirrorTopic"; private static final String FE_HA_DELETE_MIRROR_TOPIC = "FEHaDeleteMirrorTopic"; + private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic"; + public FrontEndControlVersionItems(){} @Override @@ -89,6 +91,10 @@ public List init(){ itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX) .name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制")); + //truncate topic + itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX) + .name(FE_TRUNCATE_TOPIC).desc("清空topic")); + return itemList; } } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicController.java index a2022b8a5..9eaee2df4 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/topic/TopicController.java @@ -61,6 +61,13 @@ public Result expandTopics(@Validated @RequestBody TopicExpansionDTO dto) return opTopicManager.expandTopic(dto, HttpRequestUtil.getOperator()); } + @ApiOperation(value = "Topic数据清空", notes = "") + @PostMapping(value = "topics/truncate-topic") + @ResponseBody + public Result truncateTopic(@Validated @RequestBody ClusterTopicDTO dto) { + return opTopicManager.truncateTopic(dto.getClusterId(), dto.getTopicName(), HttpRequestUtil.getOperator()); + } + @ApiOperation(value = "Topic元信息", notes = "带是否存在信息") @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/metadata-combine-exist") @ResponseBody