Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactoring] Split AbstractRealTimeDataStorage class #1935

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -56,13 +56,13 @@ public class MetricsDataController {

private static final Integer METRIC_FULL_LENGTH = 3;

private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
private final List<RealTimeDataReader> realTimeDataReaders;

private final List<HistoryDataReader> historyDataReaders;

public MetricsDataController(List<AbstractRealTimeDataStorage> realTimeDataStorages,
public MetricsDataController(List<RealTimeDataReader> realTimeDataReaders,
List<HistoryDataReader> historyDataReaders) {
this.realTimeDataStorages = realTimeDataStorages;
this.realTimeDataReaders = realTimeDataReaders;
this.historyDataReaders = historyDataReaders;
}

Expand All @@ -87,8 +87,8 @@ public ResponseEntity<Message<MetricsData>> getMetricsData(
@PathVariable Long monitorId,
@Parameter(description = "Metrics Name", example = "cpu")
@PathVariable String metrics) {
AbstractRealTimeDataStorage realTimeDataStorage = realTimeDataStorages.stream()
.filter(AbstractRealTimeDataStorage::isServerAvailable)
RealTimeDataReader realTimeDataReader = realTimeDataReaders.stream()
.filter(RealTimeDataReader::isServerAvailable)
.max((o1, o2) -> {
if (o1 instanceof MemoryDataStorage) {
return -1;
Expand All @@ -98,10 +98,10 @@ public ResponseEntity<Message<MetricsData>> getMetricsData(
return 0;
}
}).orElse(null);
if (realTimeDataStorage == null) {
if (realTimeDataReader == null) {
return ResponseEntity.ok(Message.fail(FAIL_CODE, "real time store not available"));
}
CollectRep.MetricsData storageData = realTimeDataStorage.getCurrentMetricsData(monitorId, metrics);
CollectRep.MetricsData storageData = realTimeDataReader.getCurrentMetricsData(monitorId, metrics);
if (storageData == null) {
return ResponseEntity.ok(Message.success("query metrics data is empty"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.hertzbeat.warehouse.WarehouseWorkerPool;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataWriter;
import org.apache.hertzbeat.warehouse.store.history.jpa.JpaDatabaseDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataWriter;
import org.apache.hertzbeat.warehouse.store.realtime.memory.MemoryDataStorage;
import org.springframework.stereotype.Component;

Expand All @@ -38,27 +38,27 @@ public class DataStorageDispatch {
private final CommonDataQueue commonDataQueue;
private final WarehouseWorkerPool workerPool;
private final List<HistoryDataWriter> historyDataWriters;
private final List<AbstractRealTimeDataStorage> realTimeDataStorages;
private final List<RealTimeDataWriter> realTimeDataWriters;

public DataStorageDispatch(CommonDataQueue commonDataQueue,
WarehouseWorkerPool workerPool,
List<HistoryDataWriter> historyDataWriters,
List<AbstractRealTimeDataStorage> realTimeDataStorages) {
List<RealTimeDataWriter> realTimeDataWriters) {
this.commonDataQueue = commonDataQueue;
this.workerPool = workerPool;
this.historyDataWriters = historyDataWriters;
this.realTimeDataStorages = realTimeDataStorages;
this.realTimeDataWriters = realTimeDataWriters;
startPersistentDataStorage();
startRealTimeDataStorage();
}

private void startRealTimeDataStorage() {
if (realTimeDataStorages == null || realTimeDataStorages.isEmpty()) {
if (realTimeDataWriters == null || realTimeDataWriters.isEmpty()) {
log.info("no real time data storage start");
return;
}
if (realTimeDataStorages.size() > 1) {
realTimeDataStorages.removeIf(MemoryDataStorage.class::isInstance);
if (realTimeDataWriters.size() > 1) {
realTimeDataWriters.removeIf(MemoryDataStorage.class::isInstance);
}
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-realtime-data-storage");
Expand All @@ -68,8 +68,8 @@ private void startRealTimeDataStorage() {
if (metricsData == null) {
continue;
}
for (AbstractRealTimeDataStorage realTimeDataStorage : realTimeDataStorages) {
realTimeDataStorage.saveData(metricsData);
for (RealTimeDataWriter realTimeDataWriter : realTimeDataWriters) {
realTimeDataWriter.saveData(metricsData);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,22 @@

package org.apache.hertzbeat.warehouse.store.realtime;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;

/**
* Real-time data storage abstract class
*/
@Slf4j
public abstract class AbstractRealTimeDataStorage implements DisposableBean {
public abstract class AbstractRealTimeDataStorage implements RealTimeDataReader, RealTimeDataWriter, DisposableBean {

protected boolean serverAvailable;

/**
* @return data Whether the storage is available
*/
@Override
public boolean isServerAvailable() {
return serverAvailable;
}

/**
* save collect metrics data
* @param metricsData metrics data
*/
public abstract void saveData(CollectRep.MetricsData metricsData);

/**
* query real-time last metrics data
* @param monitorId monitorId
* @param metric metric name
* @return metrics data
*/
public abstract CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId, @NonNull String metric);

/**
* query real-time last metrics data
* @param monitorId monitor id
* @return metrics data
*/
public abstract List<CollectRep.MetricsData> getCurrentMetricsData(@NonNull Long monitorId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.warehouse.store.realtime;

import java.util.List;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.springframework.lang.NonNull;


/**
* Real-time data reading class
*/
public interface RealTimeDataReader {

/**
* @return data storage available
*/
boolean isServerAvailable();

/**
* query real-time last metrics data
* @param monitorId monitorId
* @param metric metric name
* @return metrics data
*/
CollectRep.MetricsData getCurrentMetricsData(@NonNull Long monitorId, @NonNull String metric);

/**
* query real-time last metrics data
* @param monitorId monitor id
* @return metrics data
*/
List<CollectRep.MetricsData> getCurrentMetricsData(@NonNull Long monitorId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.warehouse.store.realtime;

import org.apache.hertzbeat.common.entity.message.CollectRep;

/**
* Real-time data writing class
*/
public interface RealTimeDataWriter {

/**
* @return data storage available
*/
boolean isServerAvailable();

/**
* save metrics data
* @param metricsData metrics data
*/
void saveData(CollectRep.MetricsData metricsData);

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.hertzbeat.common.entity.dto.Value;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.warehouse.store.history.HistoryDataReader;
import org.apache.hertzbeat.warehouse.store.realtime.AbstractRealTimeDataStorage;
import org.apache.hertzbeat.warehouse.store.realtime.RealTimeDataReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -62,17 +62,17 @@ class MetricsDataControllerTest {
HistoryDataReader historyDataReader;

@Mock
AbstractRealTimeDataStorage realTimeDataStorage;
RealTimeDataReader realTimeDataReader;

private List<HistoryDataReader> historyDataReaders = new LinkedList<>();

private List<AbstractRealTimeDataStorage> realTimeDataStorages = new LinkedList<>();
private List<RealTimeDataReader> realTimeDataReaders = new LinkedList<>();

@BeforeEach
void setUp() {
historyDataReaders.add(historyDataReader);
realTimeDataStorages.add(realTimeDataStorage);
metricsDataController = new MetricsDataController(realTimeDataStorages, historyDataReaders);
realTimeDataReaders.add(realTimeDataReader);
metricsDataController = new MetricsDataController(realTimeDataReaders, historyDataReaders);
this.mockMvc = MockMvcBuilders.standaloneSetup(metricsDataController).build();
}

Expand Down Expand Up @@ -101,8 +101,8 @@ void getMetricsData() throws Exception {
final long time = System.currentTimeMillis();
final String getUrl = "/api/monitor/" + monitorId + "/metrics/" + metric;

when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(null);
when(realTimeDataStorage.isServerAvailable()).thenReturn(true);
when(realTimeDataReader.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(null);
when(realTimeDataReader.isServerAvailable()).thenReturn(true);
this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl))
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int) CommonConstants.SUCCESS_CODE))
Expand All @@ -116,8 +116,8 @@ void getMetricsData() throws Exception {
.setMetrics(metric)
.setTime(time)
.build();
when(realTimeDataStorage.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(metricsData);
when(realTimeDataStorage.isServerAvailable()).thenReturn(true);
when(realTimeDataReader.getCurrentMetricsData(eq(monitorId), eq(metric))).thenReturn(metricsData);
when(realTimeDataReader.isServerAvailable()).thenReturn(true);
this.mockMvc.perform(MockMvcRequestBuilders.get(getUrl))
.andExpect(status().isOk())
.andExpect(jsonPath("$.code").value((int) CommonConstants.SUCCESS_CODE))
Expand Down
Loading