Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] change the warehouse properties the type to record #1806

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,209 +24,62 @@

/**
* Data warehouse configuration properties
* @param entrance Data entry configuration properties
* @param store Datastore configuration properties
*/
@ConfigurationProperties(prefix = "warehouse")
public class WarehouseProperties {

/**
* Data entry configuration properties
*/
private EntranceProperties entrance;

/**
* Datastore configuration properties
*/
private StoreProperties store;

public EntranceProperties getEntrance() {
return entrance;
}

public void setEntrance(EntranceProperties entrance) {
this.entrance = entrance;
}

public StoreProperties getStore() {
return store;
}

public void setStore(StoreProperties store) {
this.store = store;
}
public record WarehouseProperties (
EntranceProperties entrance,
StoreProperties store
) {

/**
* Data entry configuration properties
* The entrance can be to obtain data from message middleware such as kafka rabbitmq rocketmq
*/
public static class EntranceProperties {

/**
* kafka configuration information
*/
private KafkaProperties kafka;

public KafkaProperties getKafka() {
return kafka;
}

public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}

public record EntranceProperties(
KafkaProperties kafka
){
/**
* kafka configuration information
*/
public static class KafkaProperties {
/**
* Whether kafka data entry is started
*/
private boolean enabled = true;

/**
* kafka connection server url
*/
private String servers = "127.0.0.1:9092";
/**
* Topic name to receive data
*/
private String topic;
/**
* Consumer group ID
*/
private String groupId;

public boolean isEnabled() {
return enabled;
}

public String getServers() {
return servers;
}

public String getTopic() {
return topic;
}

public String getGroupId() {
return groupId;
}
}

public record KafkaProperties(
@DefaultValue("true") boolean enabled,
@DefaultValue("127.0.0.1:9092") String servers,
String topic,
String groupId
){}
}


/**
* Scheduling data export configuration properties
* @param jpa use mysql/h2 jpa store metrics history data
* @param memory Memory storage configuration information
* @param influxdb influxdb configuration information
* @param redis redis configuration information
* @param victoriaMetrics VictoriaMetrics Properties
* @param tdEngine TdEngine configuration information
* @param iotDb IoTDB configuration information
* @param greptime GrepTimeDB Config
*/
public static class StoreProperties {

/**
* use mysql/h2 jpa store metrics history data
*/
private JpaProperties jpa;

/**
* Memory storage configuration information
*/
private MemoryProperties memory;

/**
* influxdb configuration information
*/
private InfluxdbProperties influxdb;
/**
* redis configuration information
*/
private RedisProperties redis;
/**
* VictoriaMetrics Properties
*/
private VictoriaMetricsProperties victoriaMetrics;
/**
* TdEngine configuration information
*/
private TdEngineProperties tdEngine;
/**
* IoTDB configuration information
*/
private IotDbProperties iotDb;
/**
* GrepTimeDB Config
*/
private GreptimeProperties greptime;

public JpaProperties getJpa() {
return jpa;
}

public void setJpa(JpaProperties jpa) {
this.jpa = jpa;
}

public MemoryProperties getMemory() {
return memory;
}

public void setMemory(MemoryProperties memory) {
this.memory = memory;
}

public InfluxdbProperties getInfluxdb() {
return influxdb;
}

public void setInfluxdb(InfluxdbProperties influxdb) {
this.influxdb = influxdb;
}

public RedisProperties getRedis() {
return redis;
}

public void setRedis(RedisProperties redis) {
this.redis = redis;
}

public VictoriaMetricsProperties getVictoriaMetrics() {
return victoriaMetrics;
}

public void setVictoriaMetrics(VictoriaMetricsProperties victoriaMetrics) {
this.victoriaMetrics = victoriaMetrics;
}

public TdEngineProperties getTdEngine() {
return tdEngine;
}

public void setTdEngine(TdEngineProperties tdEngine) {
this.tdEngine = tdEngine;
}

public IotDbProperties getIotDb() {
return iotDb;
}

public void setIotDb(IotDbProperties iotDb) {
this.iotDb = iotDb;
}

public GreptimeProperties getGreptime() {
return greptime;
}

public void setGreptime(GreptimeProperties greptime) {
this.greptime = greptime;
}

public record StoreProperties(
JpaProperties jpa,
MemoryProperties memory,
InfluxdbProperties influxdb,
RedisProperties redis,
VictoriaMetricsProperties victoriaMetrics,
TdEngineProperties tdEngine,
IotDbProperties iotDb,
GreptimeProperties greptime
){
/**
* Memory storage configuration information
* @param enabled Whether memory data storage is enabled
* @param initSize Memory storage map initialization size
*/
public record MemoryProperties(
@DefaultValue("true") boolean enabled,
@DefaultValue("1024") Integer initSize
@DefaultValue("true") boolean enabled,
@DefaultValue("1024") Integer initSize
){}

/**
Expand All @@ -236,21 +89,21 @@ public record MemoryProperties(
* @param maxHistoryRecordNum The maximum number of history records retained
*/
public record JpaProperties(
@DefaultValue("true") boolean enabled,
@DefaultValue("1h") String expireTime,
@DefaultValue("20000") Integer maxHistoryRecordNum
@DefaultValue("true") boolean enabled,
@DefaultValue("1h") String expireTime,
@DefaultValue("20000") Integer maxHistoryRecordNum
) {}

/**
* Influxdb configuration information
*/
public record InfluxdbProperties(
@DefaultValue("false") boolean enabled,
String serverUrl,
String username,
String password,
@DefaultValue("30d") String expireTime,
@DefaultValue("1") int replication) {}
@DefaultValue("false") boolean enabled,
String serverUrl,
String username,
String password,
@DefaultValue("30d") String expireTime,
@DefaultValue("1") int replication) {}

/**
*
Expand All @@ -262,32 +115,31 @@ public record InfluxdbProperties(
* @param tableStrColumnDefineMaxLength auto create table's string column define max length : NCHAR(200)
*/
public record TdEngineProperties(
@DefaultValue("false") boolean enabled,
@DefaultValue("jdbc:TAOS-RS://localhost:6041/demo") String url,
@DefaultValue("com.taosdata.jdbc.rs.RestfulDriver") String driverClassName,
String username,
String password,
@DefaultValue("200") int tableStrColumnDefineMaxLength) {}
@DefaultValue("false") boolean enabled,
@DefaultValue("jdbc:TAOS-RS://localhost:6041/demo") String url,
@DefaultValue("com.taosdata.jdbc.rs.RestfulDriver") String driverClassName,
String username,
String password,
@DefaultValue("200") int tableStrColumnDefineMaxLength) {}

/**
* Victoriametrics configuration information
*/
public record VictoriaMetricsProperties(
@DefaultValue("false") boolean enabled,
@DefaultValue("http://localhost:8428") String url,
String username,
String password)
{}
@DefaultValue("false") boolean enabled,
@DefaultValue("http://localhost:8428") String url,
String username,
String password) {}

/**
* Redis configuration information
*/
public record RedisProperties(
@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("6379") Integer port,
String password,
@DefaultValue("0") Integer db) {}
@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("6379") Integer port,
String password,
@DefaultValue("0") Integer db) {}

/**
* IotDB configuration information
Expand All @@ -296,31 +148,29 @@ public record RedisProperties(
* @param expireTime save data expire time(ms),-1 means it never expires Data storage time (unit: ms,-1 means never expire)
* Note: Why is String used here instead of Long? At present, the set ttl of IoTDB only supports milliseconds as a unit,
* and other units may be added later, so the String type is used for compatibility Data storage time (unit: ms, -1 means never expires)
*
* Note: Why use String instead of Long here? Currently, IoTDB's set ttl only supports milliseconds as the unit.
* Other units may be added later. In order to be compatible with the future, the String type is used.
*/
public record IotDbProperties(
@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("6667") Integer rpcPort,
String username,
String password,
List<String> nodeUrls,
ZoneId zoneId,
IotDbVersion version,
long queryTimeoutInMs,
String expireTime) {}
@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1") String host,
@DefaultValue("6667") Integer rpcPort,
String username,
String password,
List<String> nodeUrls,
ZoneId zoneId,
IotDbVersion version,
long queryTimeoutInMs,
String expireTime) {}

/**
* GrepTimeDB configuration information
*/
public record GreptimeProperties(
@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1:4001") String endpoint,
String username,
String password) {}

@DefaultValue("false") boolean enabled,
@DefaultValue("127.0.0.1:4001") String endpoint,
String username,
String password) {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class HistoryGrepTimeDbDataStorage extends AbstractHistoryDataStorage {
private GreptimeDB greptimeDb;

public HistoryGrepTimeDbDataStorage(WarehouseProperties properties) {
this.serverAvailable = this.initDbSession(properties.getStore().getGreptime());
this.serverAvailable = this.initDbSession(properties.store().greptime());
}

private boolean initDbSession(WarehouseProperties.StoreProperties.GreptimeProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void initInfluxDb(WarehouseProperties properties) {
client.sslSocketFactory(defaultSslSocketFactory(), defaultTrustManager());
client.hostnameVerifier(noopHostnameVerifier());

WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProperties = properties.getStore().getInfluxdb();
WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProperties = properties.store().influxdb();
this.influxDb = InfluxDBFactory.connect(influxdbProperties.serverUrl(), influxdbProperties.username(), influxdbProperties.password(), client);
// Close it if your application is terminating, or you are not using it anymore.
Runtime.getRuntime().addShutdownHook(new Thread(influxDb::close));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class HistoryIotDbDataStorage extends AbstractHistoryDataStorage {
private long queryTimeoutInMs;

public HistoryIotDbDataStorage(WarehouseProperties properties) {
this.serverAvailable = this.initIotDbSession(properties.getStore().getIotDb());
this.serverAvailable = this.initIotDbSession(properties.store().iotDb());
}

private boolean initIotDbSession(WarehouseProperties.StoreProperties.IotDbProperties properties) {
Expand Down
Loading
Loading