diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java index 6c6416c3e0d..a5b19159c48 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/controller/MetricsDataController.java @@ -37,7 +37,7 @@ import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader; import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage; -import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader; import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -56,13 +56,13 @@ public class MetricsDataController { private static final Integer METRIC_FULL_LENGTH = 3; - private final List realTimeDataStorages; + private final List realTimeDataReaders; private final List historyDataReaders; - public MetricsDataController(List realTimeDataStorages, + public MetricsDataController(List realTimeDataReaders, List historyDataReaders) { - this.realTimeDataStorages = realTimeDataStorages; + this.realTimeDataReaders = realTimeDataReaders; this.historyDataReaders = historyDataReaders; } @@ -87,8 +87,8 @@ public ResponseEntity> getMetricsData( @PathVariable Long monitorId, @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics) { - AbstractRealTimeDataStorage realTimeDataStorage = realTimeDataStorages.stream() - .filter(AbstractRealTimeDataStorage::isServerAvailable) + RealTimeDataReader realTimeDataReader = realTimeDataReaders.stream() + .filter(RealTimeDataReader::isServerAvailable) .max((o1, o2) -> { if (o1 instanceof MemoryDataStorage) { return -1; @@ -98,10 +98,10 @@ public ResponseEntity> getMetricsData( return 0; } }).orElse(null); - if (realTimeDataStorage == null) { + if (realTimeDataReader == null) { return ResponseEntity.ok(Message.fail(FAIL_CODE, "real time store not available")); } - CollectRep.MetricsData storageData = realTimeDataStorage.getCurrentMetricsData(monitorId, metrics); + CollectRep.MetricsData storageData = realTimeDataReader.getCurrentMetricsData(monitorId, metrics); if (storageData == null) { return ResponseEntity.ok(Message.success("query metrics data is empty")); } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java index f5977545948..0451200c6ab 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/DataStorageDispatch.java @@ -24,7 +24,7 @@ import org.apache.hertzbeat.warehouse.WarehouseWorkerPool; import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter; import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage; -import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter; import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage; import org.springframework.stereotype.Component; @@ -38,27 +38,27 @@ public class DataStorageDispatch { private final CommonDataQueue commonDataQueue; private final WarehouseWorkerPool workerPool; private final List historyDataWriters; - private final List realTimeDataStorages; + private final List realTimeDataWriters; public DataStorageDispatch(CommonDataQueue commonDataQueue, WarehouseWorkerPool workerPool, List historyDataWriters, - List realTimeDataStorages) { + List realTimeDataWriters) { this.commonDataQueue = commonDataQueue; this.workerPool = workerPool; this.historyDataWriters = historyDataWriters; - this.realTimeDataStorages = realTimeDataStorages; + this.realTimeDataWriters = realTimeDataWriters; startPersistentDataStorage(); startRealTimeDataStorage(); } private void startRealTimeDataStorage() { - if (realTimeDataStorages == null || realTimeDataStorages.isEmpty()) { + if (realTimeDataWriters == null || realTimeDataWriters.isEmpty()) { log.info("no real time data storage start"); return; } - if (realTimeDataStorages.size() > 1) { - realTimeDataStorages.removeIf(MemoryDataStorage.class::isInstance); + if (realTimeDataWriters.size() > 1) { + realTimeDataWriters.removeIf(MemoryDataStorage.class::isInstance); } Runnable runnable = () -> { Thread.currentThread().setName("warehouse-realtime-data-storage"); @@ -68,8 +68,8 @@ private void startRealTimeDataStorage() { if (metricsData == null) { continue; } - for (AbstractRealTimeDataStorage realTimeDataStorage : realTimeDataStorages) { - realTimeDataStorage.saveData(metricsData); + for (RealTimeDataWriter realTimeDataWriter : realTimeDataWriters) { + realTimeDataWriter.saveData(metricsData); } } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java index d5f631c5369..04eb33b33ad 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/AbstractRealTimeDataStorage.java @@ -17,45 +17,22 @@ package org.apache.hertzbeat.warehouse.store.realtime; -import java.util.List; import lombok.extern.slf4j.Slf4j; -import org.apache.hertzbeat.common.entity.message.CollectRep; import org.springframework.beans.factory.DisposableBean; -import org.springframework.lang.NonNull; /** * Real-time data storage abstract class */ @Slf4j -public abstract class AbstractRealTimeDataStorage implements DisposableBean { +public abstract class AbstractRealTimeDataStorage implements RealTimeDataReader, RealTimeDataWriter, DisposableBean { protected boolean serverAvailable; /** * @return data Whether the storage is available */ + @Override public boolean isServerAvailable() { return serverAvailable; } - - /** - * save collect metrics data - * @param metricsData metrics data - */ - public abstract void saveData(CollectRep.MetricsData metricsData); - - /** - * query real-time last metrics data - * @param monitorId monitorId - * @param metric metric name - * @return metrics data - */ - public abstract CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId, @NonNull String metric); - - /** - * query real-time last metrics data - * @param monitorId monitor id - * @return metrics data - */ - public abstract List getCurrentMetricsData(@NonNull Long monitorId); } diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java new file mode 100644 index 00000000000..3d01dc67859 --- /dev/null +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataReader.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.warehouse.store.realtime; + +import java.util.List; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.springframework.lang.NonNull; + + +/** + * Real-time data reading class + */ +public interface RealTimeDataReader { + + /** + * @return data storage available + */ + boolean isServerAvailable(); + + /** + * query real-time last metrics data + * @param monitorId monitorId + * @param metric metric name + * @return metrics data + */ + CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId, @NonNull String metric); + + /** + * query real-time last metrics data + * @param monitorId monitor id + * @return metrics data + */ + List getCurrentMetricsData(@NonNull Long monitorId); + +} diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java new file mode 100644 index 00000000000..46eaeed0491 --- /dev/null +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/realtime/RealTimeDataWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.warehouse.store.realtime; + +import org.apache.hertzbeat.common.entity.message.CollectRep; + +/** + * Real-time data writing class + */ +public interface RealTimeDataWriter { + + /** + * @return data storage available + */ + boolean isServerAvailable(); + + /** + * save metrics data + * @param metricsData metrics data + */ + void saveData(CollectRep.MetricsData metricsData); + +} diff --git a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java index baf99b0ef31..0a2fdfb6017 100644 --- a/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java +++ b/warehouse/src/test/java/org/apache/hertzbeat/warehouse/controller/MetricsDataControllerTest.java @@ -34,7 +34,7 @@ import org.apache.hertzbeat.common.entity.dto.Value; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader; -import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage; +import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -62,17 +62,17 @@ class MetricsDataControllerTest { HistoryDataReader historyDataReader; @Mock - AbstractRealTimeDataStorage realTimeDataStorage; + RealTimeDataReader realTimeDataReader; private List historyDataReaders = new LinkedList<>(); - private List realTimeDataStorages = new LinkedList<>(); + private List realTimeDataReaders = new LinkedList<>(); @BeforeEach void setUp() { historyDataReaders.add(historyDataReader); - realTimeDataStorages.add(realTimeDataStorage); - metricsDataController = new MetricsDataController(realTimeDataStorages, historyDataReaders); + realTimeDataReaders.add(realTimeDataReader); + metricsDataController = new MetricsDataController(realTimeDataReaders, historyDataReaders); this.mockMvc = MockMvcBuilders.standaloneSetup(metricsDataController).build(); } @@ -101,8 +101,8 @@ void getMetricsData() throws Exception { final long time = System.currentTimeMillis(); final String getUrl = "/api/monitor/" + monitorId + "/metrics/" + metric; - when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(null); - when(realTimeDataStorage.isServerAvailable()).thenReturn(true); + when(realTimeDataReader.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(null); + when(realTimeDataReader.isServerAvailable()).thenReturn(true); this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl)) .andExpect(status().isOk()) .andExpect(jsonPath("$.code").value((int) CommonConstants.SUCCESS_CODE)) @@ -116,8 +116,8 @@ void getMetricsData() throws Exception { .setMetrics(metric) .setTime(time) .build(); - when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(metricsData); - when(realTimeDataStorage.isServerAvailable()).thenReturn(true); + when(realTimeDataReader.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(metricsData); + when(realTimeDataReader.isServerAvailable()).thenReturn(true); this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl)) .andExpect(status().isOk()) .andExpect(jsonPath("$.code").value((int) CommonConstants.SUCCESS_CODE))