From 0b95c4e4e86b1cab2843513ca6177de7c8721009 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Tue, 6 Feb 2024 20:11:19 +0200 Subject: [PATCH 1/2] OAP-228 Move oap-statsdb into the oap project --- oap-statsdb/oap-statsdb-master/pom.xml | 47 +++ .../src/main/java/oap/statsdb/JsonCodec.java | 52 +++ .../src/main/java/oap/statsdb/MongoNode.java | 25 ++ .../main/java/oap/statsdb/StatsDBMaster.java | 161 +++++++++ .../oap/statsdb/StatsDBMessageListener.java | 43 +++ .../main/java/oap/statsdb/StatsDBStorage.java | 40 +++ .../java/oap/statsdb/StatsDBStorageMongo.java | 142 ++++++++ .../java/oap/statsdb/StatsDBStorageNull.java | 25 ++ .../oap/statsdb/StatsDBTransportMock.java | 45 +++ .../META-INF/oap-messages.properties | 1 + .../test/java/oap/statsdb/StatsDBTest.java | 328 ++++++++++++++++++ .../META-INF/json-mapping.properties | 3 + .../src/test/resources/logback-test.xml | 43 +++ oap-statsdb/oap-statsdb/pom.xml | 35 ++ .../src/main/java/oap/statsdb/IStatsDB.java | 39 +++ .../src/main/java/oap/statsdb/Node.java | 129 +++++++ .../src/main/java/oap/statsdb/NodeId.java | 68 ++++ .../src/main/java/oap/statsdb/NodeSchema.java | 82 +++++ .../main/java/oap/statsdb/RemoteStatsDB.java | 70 ++++ .../src/main/java/oap/statsdb/StatsDB.java | 263 ++++++++++++++ .../main/java/oap/statsdb/StatsDBNode.java | 112 ++++++ .../java/oap/statsdb/StatsDBTransport.java | 8 + .../oap/statsdb/StatsDBTransportMessage.java | 22 ++ .../META-INF/json-mapping.properties | 2 + oap-statsdb/pom.xml | 18 + pom.xml | 1 + 26 files changed, 1804 insertions(+) create mode 100644 oap-statsdb/oap-statsdb-master/pom.xml create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/JsonCodec.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/MongoNode.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMaster.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMessageListener.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorage.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageMongo.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageNull.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBTransportMock.java create mode 100644 oap-statsdb/oap-statsdb-master/src/main/resources/META-INF/oap-messages.properties create mode 100644 oap-statsdb/oap-statsdb-master/src/test/java/oap/statsdb/StatsDBTest.java create mode 100644 oap-statsdb/oap-statsdb-master/src/test/resources/META-INF/json-mapping.properties create mode 100644 oap-statsdb/oap-statsdb-master/src/test/resources/logback-test.xml create mode 100644 oap-statsdb/oap-statsdb/pom.xml create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/IStatsDB.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/Node.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeId.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeSchema.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/RemoteStatsDB.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDB.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBNode.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransport.java create mode 100644 oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransportMessage.java create mode 100644 oap-statsdb/oap-statsdb/src/main/resources/META-INF/json-mapping.properties create mode 100644 oap-statsdb/pom.xml diff --git a/oap-statsdb/oap-statsdb-master/pom.xml b/oap-statsdb/oap-statsdb-master/pom.xml new file mode 100644 index 0000000000..eb4a96fe76 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + oap-stats-db-master + oap-stats-db-master + + + oap + oap-statsdb-parent + ${oap.project.version} + + + + + oap + oap-statsdb + ${project.version} + + + + oap + oap-stdlib + ${project.version} + + + oap + oap-storage-mongo + ${project.version} + + + oap + oap-storage-mongo-test + ${project.version} + test + + + + org.projectlombok + lombok + ${oap.deps.lombok.version} + provided + + + diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/JsonCodec.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/JsonCodec.java new file mode 100644 index 0000000000..df0b578355 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/JsonCodec.java @@ -0,0 +1,52 @@ +package oap.statsdb; + +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import lombok.SneakyThrows; +import oap.json.Binder; +import oap.reflect.TypeRef; +import org.bson.BsonReader; +import org.bson.BsonWriter; +import org.bson.Document; +import org.bson.codecs.Codec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.DocumentCodec; +import org.bson.codecs.EncoderContext; + +/** + * Created by igor.petrenko on 26.03.2019. + */ +class JsonNodeCodec implements Codec { + private final DocumentCodec documentCodec; + private ObjectWriter fileWriter; + private ObjectReader fileReader; + + JsonNodeCodec() { + this.documentCodec = new DocumentCodec(); + var ref = new TypeRef() { + }; + this.fileReader = Binder.json.readerFor( ref ); + this.fileWriter = Binder.json.writerFor( ref ); + } + + @SneakyThrows + @Override + public MongoNode decode( BsonReader bsonReader, DecoderContext decoderContext ) { + var doc = documentCodec.decode( bsonReader, decoderContext ); + + return fileReader.readValue( Binder.json.marshal( doc ) ); + } + + @SneakyThrows + @Override + public void encode( BsonWriter bsonWriter, MongoNode data, EncoderContext encoderContext ) { + var doc = Document.parse( fileWriter.writeValueAsString( data ) ); + + documentCodec.encode( bsonWriter, doc, encoderContext ); + } + + @Override + public Class getEncoderClass() { + return MongoNode.class; + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/MongoNode.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/MongoNode.java new file mode 100644 index 0000000000..809f51f2ce --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/MongoNode.java @@ -0,0 +1,25 @@ +package oap.statsdb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import java.util.Map; + +/** + * Created by igor.petrenko on 26.03.2019. + */ +@ToString +@EqualsAndHashCode( of = { "_id" } ) +public class MongoNode { + @SuppressWarnings( "checkstyle:MemberName" ) + public final Map _id; + public final Node n; + + @JsonCreator + @SuppressWarnings( "checkstyle:ParameterName" ) + public MongoNode( Map _id, Node n ) { + this._id = _id; + this.n = n; + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMaster.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMaster.java new file mode 100644 index 0000000000..e917625475 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMaster.java @@ -0,0 +1,161 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import lombok.extern.slf4j.Slf4j; +import oap.statsdb.RemoteStatsDB.Sync; +import oap.util.Lists; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Slf4j +public class StatsDBMaster extends StatsDB implements Closeable, Runnable { + private final StatsDBStorage storage; + + public StatsDBMaster( NodeSchema schema, StatsDBStorage storage ) { + super( schema ); + this.storage = storage; + + db.putAll( storage.load( schema ) ); + init( db.values() ); + } + + private void merge( String key, Node masterNode, Node rNode, List> retList, int level ) { + if( log.isTraceEnabled() ) + log.trace( "merge {}/{}[{}]::{}", schema.get( level ).key, schema.get( level ).clazz, level, key ); + assert Objects.equals( masterNode.v.getClass(), rNode.v.getClass() ) + : "[" + level + "]/" + key + "::" + masterNode.v.getClass() + " vs " + rNode.v.getClass(); + + var list = merge( masterNode.db, rNode.db, retList, level ); + list.forEach( l -> l.add( 0, key ) ); + + retList.addAll( list ); + + var ret = masterNode.merge( rNode ); + if( !ret ) { + var k = new ArrayList(); + k.add( key ); + retList.add( k ); + } + } + + private List> merge( Map masterDB, Map remoteDB, List> retList, int level ) { + for( var entry : remoteDB.entrySet() ) { + var key = entry.getKey(); + var rNode = entry.getValue(); + + var masterNode = masterDB.computeIfAbsent( key, k -> new Node( schema.get( level + 1 ).newInstance() ) ); + + merge( key, masterNode, rNode, retList, level + 1 ); + } + + return retList; + } + + private List> merge( ArrayList remoteDB ) { + assert remoteDB != null; + + var retList = new ArrayList>(); + + var remoteDbTree = toTree( remoteDB ); + + remoteDbTree.forEach( ( key, rnode ) -> { + var mnode = db.computeIfAbsent( key, k -> new Node( schema.get( 0 ).newInstance() ) ); + + merge( key, mnode, rnode, retList, 0 ); + updateAggregates( mnode ); + } ); + + return retList; + } + + private Map toTree( ArrayList remoteDB ) { + var ret = new HashMap(); + + for( var nodeIdNode : remoteDB ) { + var node = nodeIdNode.node; + var nodeId = nodeIdNode.nodeId; + + Node treeNode = null; + for( var i = 0; i < nodeId.size(); i++ ) { + var key = nodeId.get( i ); + var finalI = i; + treeNode = ( treeNode != null ? treeNode.db + : ret ).computeIfAbsent( key, k -> new Node( schema.get( finalI ).newInstance() ) ); + } + + treeNode.set( node ); + } + + return ret; + } + + @SuppressWarnings( "unchecked" ) + private void init( Collection nodes ) { + nodes.forEach( node -> { + if( node.v instanceof Node.Container ) { + init( node.db.values() ); + ( ( Node.Container ) node.v ).aggregate( Lists.map( node.db.values(), b -> b.v ) ); + } + } ); + } + + public boolean update( Sync sync, String host ) { + assert sync != null; + assert sync.data != null; + + synchronized( host.intern() ) { + var failedKeys = merge( sync.data ); + + if( !failedKeys.isEmpty() ) { + log.error( "failed keys:" ); + failedKeys.forEach( key -> log.error( "[{}]: {}", host, key ) ); + } + + return true; + } + } + + public void reset() { + removeAll(); + storage.removeAll(); + } + + @Override + public void close() { + storage.store( schema, db ); + } + + @Override + public void run() { + storage.store( schema, db ); + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMessageListener.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMessageListener.java new file mode 100644 index 0000000000..a7b0cf07bd --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBMessageListener.java @@ -0,0 +1,43 @@ +package oap.statsdb; + +import lombok.extern.slf4j.Slf4j; +import oap.json.Binder; +import oap.message.MessageListener; +import oap.message.MessageProtocol; + +import java.io.ByteArrayInputStream; + +import static oap.statsdb.StatsDBTransportMessage.MESSAGE_TYPE; + +/** + * Created by igor.petrenko on 2019-12-17. + */ +@Slf4j +public class StatsDBMessageListener implements MessageListener { + private final StatsDBMaster master; + + public StatsDBMessageListener( StatsDBMaster master ) { + this.master = master; + } + + @Override + public byte getId() { + return MESSAGE_TYPE; + } + + @Override + public String getInfo() { + return "stats-db"; + } + + @Override + public short run( int version, String hostName, int size, byte[] data, String md5 ) { + log.trace( "new stats version {} hostName {} size {} md5 {} data '{}'", + version, hostName, size, md5, new String( data ) ); + + var sync = Binder.json.unmarshal( RemoteStatsDB.Sync.class, new ByteArrayInputStream( data ) ); + master.update( sync, hostName ); + + return MessageProtocol.STATUS_OK; + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorage.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorage.java new file mode 100644 index 0000000000..617010aea1 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorage.java @@ -0,0 +1,40 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import java.util.Map; + +/** + * Created by igor.petrenko on 26.03.2019. + */ +public interface StatsDBStorage { + StatsDBStorage NULL = new StatsDBStorageNull(); + + Map load( NodeSchema schema ); + + void store( NodeSchema schema, Map db ); + + void removeAll(); +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageMongo.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageMongo.java new file mode 100644 index 0000000000..89ade974be --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageMongo.java @@ -0,0 +1,142 @@ +package oap.statsdb; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.ReplaceOneModel; +import com.mongodb.client.model.ReplaceOptions; +import com.mongodb.client.model.WriteModel; +import lombok.extern.slf4j.Slf4j; +import oap.reflect.TypeRef; +import oap.storage.mongo.MongoClient; +import org.apache.commons.lang3.mutable.MutableInt; +import org.bson.BsonDocument; +import org.bson.codecs.configuration.CodecRegistries; +import org.joda.time.DateTimeUtils; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static com.mongodb.client.model.Filters.eq; + +/** + * Created by igor.petrenko on 26.03.2019. + */ +@Slf4j +public class StatsDBStorageMongo implements StatsDBStorage, Closeable { + private static final ReplaceOptions REPLACE_OPTIONS_UPSERT = new ReplaceOptions().upsert( true ); + + private final MongoCollection collection; + public int bulkSize = 1000; + private long lastFsync = -1; + + public StatsDBStorageMongo( MongoClient mongoClient, String table ) { + var ref = new TypeRef() { + }; + + var codecRegistry = CodecRegistries.fromRegistries( + CodecRegistries.fromCodecs( new JsonNodeCodec() ), + mongoClient.getCodecRegistry() + ); + + this.collection = mongoClient + .getCollection( table, ref.clazz() ) + .withCodecRegistry( codecRegistry ); + + } + + @Override + public Map load( NodeSchema schema ) { + log.debug( "load {}", schema ); + final Map db = new HashMap<>(); + + final Consumer cons = node -> { + assert node.n.db.isEmpty(); + + var cdb = db; + for( int i = 0; i < node._id.size() - 1; i++ ) { + var nc = schema.get( i ); + var key = node._id.get( nc.key ); + cdb = cdb.computeIfAbsent( key, k -> new Node( nc.newInstance() ) ).db; + } + + var lastId = node._id.get( schema.get( node._id.size() - 1 ).key ); + var lastNode = cdb.get( lastId ); + if( lastNode == null ) { + cdb.put( lastId, node.n ); + } else { + cdb.put( lastId, node.n ); + node.n.db.putAll( lastNode.db ); + } + }; + + collection.find().forEach( cons ); + + lastFsync = DateTimeUtils.currentTimeMillis(); + + return db; + } + + @Override + public void store( NodeSchema schema, Map db ) { + log.debug( "store {}", schema ); + var count = 0; + + var now = DateTimeUtils.currentTimeMillis(); + + var bulk = new ArrayList>(); + count += store( schema, 0, new HashMap<>(), db, bulk ); + if( !bulk.isEmpty() ) { + collection.bulkWrite( bulk ); + count += bulk.size(); + } + + lastFsync = now; + + log.debug( "[{}] fsync modified: {}", collection.getNamespace(), count ); + } + + private int store( NodeSchema schema, int index, Map id, + Map db, ArrayList> bulk ) { + if( db.isEmpty() ) return 0; + + if( index < 0 || index >= schema.size() ) { + throw new IllegalArgumentException( "index '" + index + "' is out of bounds [0.." + schema.size() + ")" ); + } + + var count = new MutableInt(); + + db.forEach( ( key, value ) -> { + var newId = new HashMap<>( id ); + newId.put( schema.get( index ).key, key ); + + if( value.mt >= lastFsync ) { + bulk.add( new ReplaceOneModel<>( eq( "_id", newId ), new MongoNode( newId, new Node( value.ct, value.mt, value.v ) ), REPLACE_OPTIONS_UPSERT ) ); + if( bulk.size() >= bulkSize ) { + collection.bulkWrite( bulk ); + count.add( bulk.size() ); + bulk.clear(); + } + } + + count.add( store( schema, index + 1, newId, value.db, bulk ) ); + } ); + + return count.intValue(); + } + + @Override + public void removeAll() { + collection.deleteMany( new BsonDocument() ); + } + + @Override + public void close() { + } + + public void insertMany( List stats ) { + collection.insertMany( stats ); + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageNull.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageNull.java new file mode 100644 index 0000000000..8afeb2cf9d --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBStorageNull.java @@ -0,0 +1,25 @@ +package oap.statsdb; + +import java.util.Map; + +import static java.util.Collections.emptyMap; + +/** + * Created by igor.petrenko on 26.03.2019. + */ +public class StatsDBStorageNull implements StatsDBStorage { + @Override + public Map load( NodeSchema schema ) { + return emptyMap(); + } + + @Override + public void store( NodeSchema schema, Map db ) { + + } + + @Override + public void removeAll() { + + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBTransportMock.java b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBTransportMock.java new file mode 100644 index 0000000000..1d30459435 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/java/oap/statsdb/StatsDBTransportMock.java @@ -0,0 +1,45 @@ +package oap.statsdb; + +import oap.net.Inet; + +import java.util.ArrayList; +import java.util.function.Function; + +/** + * Created by igor.petrenko on 2019-12-18. + */ +public class StatsDBTransportMock implements StatsDBTransport { + public final ArrayList syncs = new ArrayList<>(); + private final StatsDBMaster master; + private Function exceptionFunc; + + public StatsDBTransportMock() { + this( null ); + } + + public StatsDBTransportMock( StatsDBMaster master ) { + this.master = master; + } + + @Override + public void sendAsync( RemoteStatsDB.Sync sync ) { + if( exceptionFunc != null ) throw exceptionFunc.apply( sync ); + + syncs.add( sync ); + + if( master != null ) master.update( sync, Inet.HOSTNAME ); + } + + public void syncWithException( Function exceptionFunc ) { + this.exceptionFunc = exceptionFunc; + } + + public void syncWithoutException() { + this.exceptionFunc = null; + } + + public void reset() { + syncs.clear(); + exceptionFunc = null; + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/main/resources/META-INF/oap-messages.properties b/oap-statsdb/oap-statsdb-master/src/main/resources/META-INF/oap-messages.properties new file mode 100644 index 0000000000..850d318478 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/main/resources/META-INF/oap-messages.properties @@ -0,0 +1 @@ +type.STATS = 10 diff --git a/oap-statsdb/oap-statsdb-master/src/test/java/oap/statsdb/StatsDBTest.java b/oap-statsdb/oap-statsdb-master/src/test/java/oap/statsdb/StatsDBTest.java new file mode 100644 index 0000000000..a50f8e4431 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/test/java/oap/statsdb/StatsDBTest.java @@ -0,0 +1,328 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import oap.http.server.nio.NioHttpServer; +import oap.message.MessageHttpHandler; +import oap.message.MessageSender; +import oap.message.MessageSenderUtils; +import oap.storage.mongo.MongoFixture; +import oap.testng.EnvFixture; +import oap.testng.Fixtures; +import oap.testng.SystemTimerFixture; +import oap.testng.TestDirectoryFixture; +import oap.util.Cuid; +import org.joda.time.DateTimeUtils; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; + +import static oap.statsdb.NodeSchema.nc; +import static oap.testng.TestDirectoryFixture.testPath; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Created by igor.petrenko on 08.09.2017. + */ +@Test +public class StatsDBTest extends Fixtures { + + static final NodeSchema schema2 = new NodeSchema( + nc( "n1", MockChild2.class ), + nc( "n2", MockValue.class ) ); + static final NodeSchema schema3 = new NodeSchema( + nc( "n1", MockChild1.class ), + nc( "n2", MockChild2.class ), + nc( "n3", MockValue.class ) ); + private static final MongoFixture MONGO_FIXTURE = new MongoFixture(); + private final EnvFixture envFixture; + + { + envFixture = new EnvFixture(); + + fixture( MONGO_FIXTURE ); + fixture( TestDirectoryFixture.FIXTURE ); + fixture( SystemTimerFixture.FIXTURE ); + fixture( envFixture ); + } + + @Test + public void testEmptySync() { + try( var master = new StatsDBMaster( schema3, StatsDBStorage.NULL ); + var node = new StatsDBNode( schema3, new StatsDBTransportMock( master ), null ) ) { + + assertThat( node.lastSyncSuccess ).isFalse(); + node.sync(); + assertThat( node.lastSyncSuccess ).isTrue(); + } + } + + @Test + public void children() { + try( var master = new StatsDBMaster( schema2, StatsDBStorage.NULL ) ) { + master.update( "k1", "k2", c -> c.v = 10 ); + master.update( "k1", "k3", c -> c.v = 3 ); + master.update( "k2", "k4", c -> c.v = 4 ); + master.update( "k1", c -> c.vc = 10 ); + + + assertThat( master.children( "k1" ) ) + .hasSize( 2 ) + .contains( new MockValue( 10 ) ) + .contains( new MockValue( 3 ) ); + + assertThat( master.children( "k2" ) ) + .hasSize( 1 ) + .contains( new MockValue( 4 ) ); + + assertThat( master.children( "unknown" ) ).isEmpty(); + assertThat( master.children( "k1", "k2" ) ).isEmpty(); + } + } + + @Test + public void mergeChild() { + try( var master = new StatsDBMaster( schema3, StatsDBStorage.NULL ); + var node = new StatsDBNode( schema3, new StatsDBTransportMock( master ) ) ) { + + node.update( "p1", p -> p.vc += 1 ); + node.update( "p1", "c2", c -> c.vc += 1 ); + node.update( "p1", "c2", "c3", c -> c.v += 2 ); + node.sync(); + + assertThat( master.get( "p1" ).vc ).isEqualTo( 1 ); + assertThat( master.get( "p1" ).sum ).isEqualTo( 2 ); + assertThat( master.get( "p1" ).sum2 ).isEqualTo( 1 ); + + node.update( "p1", p -> p.vc += 1 ); + node.update( "p1", "c2", c -> c.vc += 2 ); + node.sync(); + + node.update( "p1", "c2", "c3", c -> c.v += 2 ); + node.sync(); + + assertThat( master.get( "p1" ).vc ).isEqualTo( 2 ); + assertThat( master.get( "p1" ).sum ).isEqualTo( 4 ); + assertThat( master.get( "p1" ).sum2 ).isEqualTo( 3 ); + + assertThat( master.get( "p1", "c2" ).vc ).isEqualTo( 3 ); + assertThat( master.get( "p1", "c2" ).sum ).isEqualTo( 4 ); + + assertThat( master.get( "p1", "c2", "c3" ).v ).isEqualTo( 4 ); + } + } + + @Test + public void persistMaster() { + try( var masterStorage = new StatsDBStorageMongo( MONGO_FIXTURE.client(), "test" ); + StatsDBMaster master = new StatsDBMaster( schema3, masterStorage ) ) { + master.update( "k1", "k2", "k3", c -> c.v += 8 ); + master.update( "k1", "k2", "k3", c -> c.v += 2 ); + master.update( "k1", "k2", "k33", c -> c.v += 1 ); + master.update( "k1", c -> c.vc += 111 ); + } + + try( var masterStorage = new StatsDBStorageMongo( MONGO_FIXTURE.client(), "test" ); + StatsDBMaster master = new StatsDBMaster( schema3, masterStorage ) ) { + assertThat( master.get( "k1", "k2", "k3" ).v ).isEqualTo( 10 ); + + assertThat( master.get( "k1" ).sum ).isEqualTo( 11L ); + assertThat( master.get( "k1" ).sum2 ).isEqualTo( 0L ); + } + } + + @Test + public void sync() { + try( var masterStorage = new StatsDBStorageMongo( MONGO_FIXTURE.client(), "test" ); + var master = new StatsDBMaster( schema2, masterStorage ); + var node = new StatsDBNode( schema2, new StatsDBTransportMock( master ) ) ) { + node.sync(); + + node.update( "k1", "k2", c -> c.v += 10 ); + node.update( "k1", "k3", c -> c.v += 1 ); + node.update( "k1", c -> c.vc += 20 ); + + node.sync(); + assertThat( node.get( "k1", "k2" ) ).isNull(); + assertThat( master.get( "k1", "k2" ).v ).isEqualTo( 10L ); + assertThat( master.get( "k1" ).vc ).isEqualTo( 20L ); + assertThat( master.get( "k1" ).sum ).isEqualTo( 11L ); + + node.update( "k1", "k2", c -> c.v += 10 ); + node.update( "k1", c -> c.vc += 21 ); + + node.sync(); + assertThat( node.get( "k1", "k2" ) ).isNull(); + assertThat( master.get( "k1", "k2" ).v ).isEqualTo( 20 ); + assertThat( master.get( "k1" ).vc ).isEqualTo( 41 ); + assertThat( master.get( "k1" ).sum ).isEqualTo( 21L ); + } + } + + @Test + public void calculatedValuesAfterRestart() { + try( var masterStorage = new StatsDBStorageMongo( MONGO_FIXTURE.client(), "test" ); + var master = new StatsDBMaster( schema2, masterStorage ); + var node = new StatsDBNode( schema2, new StatsDBTransportMock( master ) ) ) { + node.sync(); + + node.update( "k1", "k2", c -> c.v += 10 ); + node.update( "k1", "k3", c -> c.v += 1 ); + node.update( "k1", c -> c.vc += 20 ); + } + + try( var masterStorage = new StatsDBStorageMongo( MONGO_FIXTURE.client(), "test" ); + var master = new StatsDBMaster( schema2, masterStorage ) ) { + assertThat( master.get( "k1" ).sum ).isEqualTo( 11L ); + } + } + + @Test + public void syncFailed() { + var transport = new StatsDBTransportMock(); + + try( var node = new StatsDBNode( schema2, transport ) ) { + transport.syncWithException( sync -> new RuntimeException( "sync" ) ); + node.update( "k1", "k2", c -> c.v += 10 ); + node.sync(); + assertThat( node.get( "k1", "k2" ) ).isNull(); + transport.syncWithoutException(); + node.update( "k1", "k2", c -> c.v += 10 ); + } + + assertThat( transport.syncs ).hasSize( 1 ); + } + + @Test + public void version() throws IOException { + int port = envFixture.portFor( getClass() ); + Path controlStatePath = testPath( "controlStatePath.st" ); + + DateTimeUtils.setCurrentMillisFixed( 100 ); + + var uid = Cuid.incremental( 0 ); + try( var master = new StatsDBMaster( schema2, StatsDBStorage.NULL ); + var server = new NioHttpServer( new NioHttpServer.DefaultPort( port ) ); + var messageHttpHandler = new MessageHttpHandler( server, "/messages", controlStatePath, List.of( new StatsDBMessageListener( master ) ), -1 ); + var client = new MessageSender( "localhost", port, "/messages", TestDirectoryFixture.testPath( "msend" ), -1 ); + var node = new StatsDBNode( schema2, new StatsDBTransportMessage( client ), uid ) ) { + server.bind( "/messages", messageHttpHandler ); + client.start(); + server.start(); + messageHttpHandler.preStart(); + + uid.reset( 0 ); + + node.update( "k1", c -> c.vc += 20 ); + node.sync(); + client.syncMemory(); + MessageSenderUtils.waitSendAll( client, 5000, 50 ); + assertThat( master.get( "k1" ).vc ).isEqualTo( 20L ); + + uid.reset( 0 ); + node.update( "k1", c -> c.vc += 20 ); + node.sync(); + client.syncMemory(); + MessageSenderUtils.waitSendAll( client, 5000, 50 ); + assertThat( master.get( "k1" ).vc ).isEqualTo( 20L ); + } + } + + @ToString + @EqualsAndHashCode + public static class MockValue implements Node.Value { + public long v; + + public MockValue() { + } + + public MockValue( long v ) { + this.v = v; + } + + @Override + public MockValue merge( MockValue other ) { + v += other.v; + + return this; + } + } + + @ToString + @EqualsAndHashCode + public static class MockChild2 implements Node.Container { + public long vc; + @JsonIgnore + public long sum; + + public MockChild2() { + } + + @Override + public MockChild2 merge( MockChild2 other ) { + vc += other.vc; + + return this; + } + + @Override + public MockChild2 aggregate( List children ) { + sum = children.stream().mapToLong( c -> c.v ).sum(); + return this; + } + } + + @ToString + @EqualsAndHashCode + public static class MockChild1 implements Node.Container { + public long vc; + @JsonIgnore + public long sum; + @JsonIgnore + public long sum2; + + public MockChild1() { + } + + @Override + public MockChild1 merge( MockChild1 other ) { + vc += other.vc; + + return this; + } + + @Override + public MockChild1 aggregate( List children ) { + sum = children.stream().mapToLong( c -> c.sum ).sum(); + sum2 = children.stream().mapToLong( c -> c.vc ).sum(); + return this; + } + } +} diff --git a/oap-statsdb/oap-statsdb-master/src/test/resources/META-INF/json-mapping.properties b/oap-statsdb/oap-statsdb-master/src/test/resources/META-INF/json-mapping.properties new file mode 100644 index 0000000000..cfdbabdd35 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/test/resources/META-INF/json-mapping.properties @@ -0,0 +1,3 @@ +mock-value=oap.statsdb.StatsDBTest$MockValue +mock-child2=oap.statsdb.StatsDBTest$MockChild2 +mock-child1=oap.statsdb.StatsDBTest$MockChild1 diff --git a/oap-statsdb/oap-statsdb-master/src/test/resources/logback-test.xml b/oap-statsdb/oap-statsdb-master/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..d3a103b366 --- /dev/null +++ b/oap-statsdb/oap-statsdb-master/src/test/resources/logback-test.xml @@ -0,0 +1,43 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + diff --git a/oap-statsdb/oap-statsdb/pom.xml b/oap-statsdb/oap-statsdb/pom.xml new file mode 100644 index 0000000000..92f9453089 --- /dev/null +++ b/oap-statsdb/oap-statsdb/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + oap-statsdb + oap-statsdb + + + oap + oap-statsdb-parent + ${oap.project.version} + + + + + oap + oap-stdlib + ${project.version} + + + oap + oap-message + ${project.version} + + + + org.projectlombok + lombok + ${oap.deps.lombok.version} + provided + + + diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/IStatsDB.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/IStatsDB.java new file mode 100644 index 0000000000..4ef6003d04 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/IStatsDB.java @@ -0,0 +1,39 @@ +package oap.statsdb; + +import java.util.function.Consumer; + +/** + * Created by igor.petrenko on 2021-02-22. + */ +@SuppressWarnings( "checkstyle:AbstractClassName" ) +public abstract class IStatsDB { + public abstract void removeAll(); + + protected > void update( String key1, Consumer update ) { + update( new String[] { key1 }, update ); + } + + protected > void update( String key1, String key2, Consumer update ) { + update( new String[] { key1, key2 }, update ); + } + + protected > void update( String key1, String key2, String key3, Consumer update ) { + update( new String[] { key1, key2, key3 }, update ); + } + + protected > void update( String key1, String key2, String key3, String key4, Consumer update ) { + update( new String[] { key1, key2, key3, key4 }, update ); + } + + protected > void update( String key1, String key2, String key3, String key4, String key5, Consumer update ) { + update( new String[] { key1, key2, key3, key4, key5 }, update ); + } + + protected > void update( String key1, String key2, String key3, String key4, String key5, String key6, Consumer update ) { + update( new String[] { key1, key2, key3, key4, key5, key6 }, update ); + } + + protected abstract > void update( String[] keys, Consumer update ); + + public abstract > V get( String... key ); +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/Node.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/Node.java new file mode 100644 index 0000000000..6f0412f5c8 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/Node.java @@ -0,0 +1,129 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import com.google.common.base.Preconditions; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import oap.json.TypeIdFactory; +import oap.util.Mergeable; +import org.joda.time.DateTimeUtils; + +import javax.annotation.Nonnull; +import java.io.Serial; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +@EqualsAndHashCode +@ToString +@Slf4j +public class Node implements Serializable { + @Serial + private static final long serialVersionUID = 4194048067764234L; + + public volatile ConcurrentHashMap db = new ConcurrentHashMap<>(); + @JsonTypeIdResolver( TypeIdFactory.class ) + @JsonTypeInfo( use = JsonTypeInfo.Id.CUSTOM, property = "o:t" ) + public Value v; + public long ct; + public long mt; + + public Node( Value v ) { + this( DateTimeUtils.currentTimeMillis(), v ); + } + + public Node( long ct, Value v ) { + this( ct, ct, v ); + } + + @JsonCreator + public Node( long mt, long ct, Value v ) { + this.mt = mt; + this.ct = ct; + this.v = v; + } + + public void set( @Nonnull Node node ) { + Preconditions.checkNotNull( node ); + + this.mt = node.mt; + this.ct = node.ct; + this.v = node.v; + } + + @SuppressWarnings( "unchecked" ) + synchronized > void updateValue( Consumer update ) { + assert v != null; + update.accept( ( V ) v ); + this.mt = DateTimeUtils.currentTimeMillis(); + } + + @SuppressWarnings( "unchecked" ) + public > V get( Iterator key ) { + Node obj = this; + + while( key.hasNext() ) { + var item = key.next(); + + if( obj == null ) return null; + + obj = obj.db.get( item ); + } + + if( obj == null ) return null; + + return ( V ) obj.v; + } + + @SuppressWarnings( "unchecked" ) + public boolean merge( Node node ) { + mt = DateTimeUtils.currentTimeMillis(); + if( v == null ) v = node.v; + else { + try { + if( node.v != null ) v.merge( node.v ); + } catch( Throwable t ) { + log.error( t.getMessage(), t ); + + return false; + } + } + return true; + } + + public interface Value> extends Mergeable, Serializable { + } + + public interface Container, TChild extends Value> extends Value { + T aggregate( List children ); + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeId.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeId.java new file mode 100644 index 0000000000..0b7f87a733 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeId.java @@ -0,0 +1,68 @@ +package oap.statsdb; + +import com.google.common.base.Preconditions; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import javax.annotation.Nonnull; +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.function.Consumer; + +/** + * Created by igor.petrenko on 2021-02-22. + */ +@ToString +@EqualsAndHashCode +public class NodeId implements Serializable, Iterable { + @Serial + private static final long serialVersionUID = 5556915438010788701L; + private final ArrayList keys = new ArrayList<>(); + + public NodeId() { + } + + public NodeId( List keys ) { + this.keys.addAll( keys ); + + validate( this.keys ); + } + + public NodeId( String... keys ) { + Collections.addAll( this.keys, keys ); + validate( this.keys ); + } + + private static void validate( ArrayList keys ) { + for( var key : keys ) Preconditions.checkNotNull( key ); + } + + @Override + @Nonnull + public Iterator iterator() { + return keys.iterator(); + } + + @Override + public void forEach( Consumer action ) { + keys.forEach( action ); + } + + @Override + public Spliterator spliterator() { + return keys.spliterator(); + } + + public int size() { + return keys.size(); + } + + public String get( int index ) { + return keys.get( index ); + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeSchema.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeSchema.java new file mode 100644 index 0000000000..1e9d349342 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/NodeSchema.java @@ -0,0 +1,82 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import lombok.SneakyThrows; +import lombok.ToString; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; + +@ToString( callSuper = true ) +public class NodeSchema extends ArrayList implements Serializable { + @Serial + private static final long serialVersionUID = 1625813602788861879L; + + private final Map> cons = new HashMap<>(); + + public NodeSchema() { + } + + @SafeVarargs + public NodeSchema( NodeConfiguration... confs ) { + this( asList( confs ) ); + } + + public NodeSchema( List> confs ) { + super( confs ); + for( var c : confs ) { + cons.put( c.key, c.clazz ); + } + } + + public static NodeConfiguration nc( String key, Class clazz ) { + return new NodeConfiguration<>( key, clazz ); + } + + @ToString + public static class NodeConfiguration implements Serializable { + private static final long serialVersionUID = -2296344454378267699L; + + public final String key; + public final Class clazz; + + public NodeConfiguration( String key, Class clazz ) { + this.key = key; + this.clazz = clazz; + } + + @SneakyThrows + public Node.Value newInstance() { + return clazz.getDeclaredConstructor().newInstance(); + } + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/RemoteStatsDB.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/RemoteStatsDB.java new file mode 100644 index 0000000000..b3fb495dd9 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/RemoteStatsDB.java @@ -0,0 +1,70 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.ToString; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; + +public interface RemoteStatsDB { + @ToString + class Sync implements Serializable { + @Serial + private static final long serialVersionUID = 6835215675536753051L; + + public final ArrayList data; + public final String id; + + @JsonCreator + public Sync( ArrayList data, String id ) { + this.data = data; + this.id = id; + } + + @JsonIgnore + public final boolean isEmpty() { + return data.isEmpty(); + } + + @ToString + static class NodeIdNode implements Serializable { + @Serial + private static final long serialVersionUID = 1612321099236706698L; + + public final NodeId nodeId; + public final Node node; + + @JsonCreator + NodeIdNode( NodeId nodeId, Node node ) { + this.nodeId = nodeId; + this.node = node; + } + } + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDB.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDB.java new file mode 100644 index 0000000000..6606d9caf8 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDB.java @@ -0,0 +1,263 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; + +@Slf4j +@SuppressWarnings( "checkstyle:AbstractClassName" ) +public abstract class StatsDB extends IStatsDB { + protected final NodeSchema schema; + protected volatile ConcurrentHashMap db = new ConcurrentHashMap<>(); + + public StatsDB( NodeSchema schema ) { + this.schema = schema; + } + + @SuppressWarnings( "unchecked" ) + protected static void updateAggregates( Node mnode ) { + for( var node : mnode.db.values() ) { + updateAggregates( node ); + } + + var value = mnode.v; + if( value instanceof Node.Container ) { + ( ( Node.Container ) value ).aggregate( mnode.db.values().stream() + .map( n -> n.v ) + .filter( Objects::nonNull ) + .collect( toList() ) + ); + } + } + + protected > void update( String[] key, Consumer update ) { + assert key != null; + assert key.length > 0; + + var rootKey = key[0]; + + db.compute( rootKey, ( k, n ) -> { + Node newNode = n; + if( newNode == null ) { + newNode = new Node( schema.get( 0 ).newInstance() ); + } + + updateNode( key, update, newNode, schema ); + + return newNode; + } ); + } + + protected > + void update( String key1, Consumer update ) { + update( new String[] { key1 }, update ); + } + + protected > + void update( String key1, String key2, Consumer update ) { + update( new String[] { key1, key2 }, update ); + } + + protected > + void update( String key1, String key2, String key3, Consumer update ) { + update( new String[] { key1, key2, key3 }, update ); + } + + protected > + void update( String key1, String key2, String key3, String key4, Consumer update ) { + update( new String[] { key1, key2, key3, key4 }, update ); + } + + protected > + void update( String key1, String key2, String key3, String key4, String key5, Consumer update ) { + update( new String[] { key1, key2, key3, key4, key5 }, update ); + } + + @SuppressWarnings( "unchecked" ) + public > V get( String... key ) { + var node = getNode( key ); + return node != null ? ( V ) node.v : null; + } + + @SuppressWarnings( "checkstyle:MethodName" ) + public Node _getNode( String[] key, int position, Node node ) { + if( node == null ) return null; + if( position >= key.length ) return node; + + return _getNode( key, position + 1, node.db.get( key[position] ) ); + } + + protected Node getNode( String... key ) { + if( key.length == 0 ) return null; + + return _getNode( key, 1, db.get( key[0] ) ); + } + + public > Stream children( String... key ) { + if( key.length == 0 ) return Stream.empty(); + + return _children( key, 1, db.get( key[0] ) ); + } + + @SuppressWarnings( { "unchecked", "checkstyle:MethodName" } ) + private > Stream _children( String[] key, int position, Node node ) { + if( node == null ) return Stream.empty(); + if( position >= key.length ) return node.db.values().stream().map( n -> ( V ) n.v ); + + return _children( key, position + 1, node.db.get( key[position] ) ); + } + + public > N updateNode( String[] key, + Consumer update, + N node, + NodeSchema schema ) { + Node tNode = node; + + for( int i = 1; i < key.length; i++ ) { + var keyItem = key[i]; + var finalI = i; + tNode = tNode.db.computeIfAbsent( keyItem, k -> new Node( schema.get( finalI ).newInstance() ) ); + } + + tNode.updateValue( update ); + + return node; + } + + public synchronized void removeAll() { + db.clear(); + } + + @SuppressWarnings( "unchecked" ) + public , T2 extends Node.Value> Stream> select2() { + return + db.entrySet().stream() + .flatMap( e1 -> e1.getValue().db.entrySet().stream().map( + e2 -> new Select2<>( e1.getKey(), ( T1 ) e1.getValue().v, e2.getKey(), ( T2 ) e2.getValue().v ) ) ); + } + + @SuppressWarnings( "unchecked" ) + public , T2 extends Node.Value, T3 extends Node.Value> Stream> select3() { + return + db.entrySet().stream() + .flatMap( e1 -> e1.getValue().db.entrySet().stream().flatMap( + e2 -> e2.getValue().db.entrySet().stream().map( + e3 -> new Select3<>( e1.getKey(), ( T1 ) e1.getValue().v, + e2.getKey(), ( T2 ) e2.getValue().v, + e3.getKey(), ( T3 ) e3.getValue().v ) ) ) ); + } + + @SuppressWarnings( "unchecked" ) + public , T2 extends Node.Value, T3 extends Node.Value, T4 extends Node.Value> Stream> select4() { + return + db.entrySet().stream() + .flatMap( e1 -> e1.getValue().db.entrySet().stream().flatMap( + e2 -> e2.getValue().db.entrySet().stream().flatMap( + e3 -> e3.getValue().db.entrySet().stream().map( + e4 -> new Select4<>( e1.getKey(), ( T1 ) e1.getValue().v, + e2.getKey(), ( T2 ) e2.getValue().v, + e3.getKey(), ( T3 ) e3.getValue().v, + e4.getKey(), ( T4 ) e4.getValue().v ) ) ) ) ); + } + + @SuppressWarnings( "unchecked" ) + public , T2 extends Node.Value, T3 extends Node.Value, T4 extends Node.Value, T5 extends Node.Value> Stream> select5() { + return + db.entrySet().stream() + .flatMap( e1 -> e1.getValue().db.entrySet().stream().flatMap( + e2 -> e2.getValue().db.entrySet().stream().flatMap( + e3 -> e3.getValue().db.entrySet().stream().flatMap( + e4 -> e4.getValue().db.entrySet().stream().map( + e5 -> new Select5<>( e1.getKey(), ( T1 ) e1.getValue().v, + e2.getKey(), ( T2 ) e2.getValue().v, + e3.getKey(), ( T3 ) e3.getValue().v, + e4.getKey(), ( T4 ) e4.getValue().v, + e5.getKey(), ( T5 ) e5.getValue().v ) ) ) ) ) ); + } + + @ToString + @AllArgsConstructor + public static class Select2, T2 extends Node.Value> { + public final String id1; + public final T1 v1; + public final String id2; + public final T2 v2; + } + + @ToString + @AllArgsConstructor + public static class Select3, T2 extends Node.Value, T3 extends Node.Value> implements Serializable { + private static final long serialVersionUID = 3812951337765151702L; + + public final String id1; + public final T1 v1; + public final String id2; + public final T2 v2; + public final String id3; + public final T3 v3; + } + + @ToString + @AllArgsConstructor + public static class Select4, T2 extends Node.Value, T3 extends Node.Value, T4 extends Node.Value> implements Serializable { + private static final long serialVersionUID = 7466796137360157099L; + + public final String id1; + public final T1 v1; + public final String id2; + public final T2 v2; + public final String id3; + public final T3 v3; + public final String id4; + public final T4 v4; + } + + @ToString + @AllArgsConstructor + public static class Select5, T2 extends Node.Value, T3 extends Node.Value, T4 extends Node.Value, T5 extends Node.Value> implements Serializable { + private static final long serialVersionUID = -8184723490764842795L; + + public final String id1; + public final T1 v1; + public final String id2; + public final T2 v2; + public final String id3; + public final T3 v3; + public final String id4; + public final T4 v4; + public final String id5; + public final T5 v5; + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBNode.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBNode.java new file mode 100644 index 0000000000..2e3811e099 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBNode.java @@ -0,0 +1,112 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.statsdb; + +import lombok.extern.slf4j.Slf4j; +import oap.statsdb.RemoteStatsDB.Sync; +import oap.util.Cuid; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +@Slf4j +public class StatsDBNode extends IStatsDB implements Runnable, Closeable { + public final ConcurrentHashMap nodes = new ConcurrentHashMap<>(); + protected final NodeSchema schema; + private final StatsDBTransport transport; + private final Cuid timestamp; + protected boolean lastSyncSuccess = false; + + public StatsDBNode( NodeSchema schema, StatsDBTransport transport ) { + this( schema, transport, Cuid.UNIQUE ); + } + + public StatsDBNode( NodeSchema schema, StatsDBTransport transport, Cuid timestamp ) { + this.schema = schema; + this.transport = transport; + this.timestamp = timestamp; + } + + public synchronized void sync() { + try { + var snapshot = snapshot(); + if( !snapshot.isEmpty() ) { + var sync = new Sync( snapshot, timestamp.next() ); + transport.sendAsync( sync ); + } + + lastSyncSuccess = true; + } catch( Exception e ) { + lastSyncSuccess = false; + log.error( e.getMessage(), e ); + } + } + + private ArrayList snapshot() { + var ret = new ArrayList(); + for( var entry : new ArrayList<>( nodes.entrySet() ) ) { + ret.add( new Sync.NodeIdNode( entry.getKey(), entry.getValue() ) ); + nodes.remove( entry.getKey() ); + } + + return ret; + } + + @Override + public void run() { + sync(); + } + + @Override + public synchronized void removeAll() { + nodes.clear(); + } + + @Override + protected > void update( String[] keys, Consumer update ) { + nodes.compute( new NodeId( keys ), ( nid, n ) -> { + Node newNode = n; + if( newNode == null ) newNode = new Node( schema.get( keys.length - 1 ).newInstance() ); + newNode.updateValue( update ); + + return newNode; + } ); + } + + @Override + @SuppressWarnings( "unchecked" ) + public > V get( String... key ) { + var node = nodes.get( new NodeId( key ) ); + return node != null ? ( V ) node.v : null; + } + + @Override + public void close() { + log.info( "close" ); + sync(); + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransport.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransport.java new file mode 100644 index 0000000000..5f1cb5fd2b --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransport.java @@ -0,0 +1,8 @@ +package oap.statsdb; + +/** + * Created by igor.petrenko on 2019-12-17. + */ +public interface StatsDBTransport { + void sendAsync( RemoteStatsDB.Sync sync ); +} diff --git a/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransportMessage.java b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransportMessage.java new file mode 100644 index 0000000000..28df223e49 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/java/oap/statsdb/StatsDBTransportMessage.java @@ -0,0 +1,22 @@ +package oap.statsdb; + +import oap.io.content.ContentWriter; +import oap.message.MessageSender; + +/** + * Created by igor.petrenko on 2019-12-17. + */ +public class StatsDBTransportMessage implements StatsDBTransport { + public static final byte MESSAGE_TYPE = 10; + + private final MessageSender sender; + + public StatsDBTransportMessage( MessageSender sender ) { + this.sender = sender; + } + + @Override + public void sendAsync( RemoteStatsDB.Sync sync ) { + sender.send( MESSAGE_TYPE, sync, ContentWriter.ofJson() ); + } +} diff --git a/oap-statsdb/oap-statsdb/src/main/resources/META-INF/json-mapping.properties b/oap-statsdb/oap-statsdb/src/main/resources/META-INF/json-mapping.properties new file mode 100644 index 0000000000..0f9bb65369 --- /dev/null +++ b/oap-statsdb/oap-statsdb/src/main/resources/META-INF/json-mapping.properties @@ -0,0 +1,2 @@ +node=oap.statsdb.Node +nid=oap.statsdb.NodeId diff --git a/oap-statsdb/pom.xml b/oap-statsdb/pom.xml new file mode 100644 index 0000000000..3a57166b6e --- /dev/null +++ b/oap-statsdb/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + + + oap + oap + ${oap.project.version} + + + pom + oap-statsdb-parent + + + oap-statsdb + oap-statsdb-master + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index f2b3626b66..1cd705e0bb 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ oap-formats oap-storage oap-mail + oap-statsdb oap-highload oap-maven-plugin From d83a2a15a1f7f7f073f51095d54dfa33be9441a7 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Wed, 7 Feb 2024 07:35:29 +0200 Subject: [PATCH 2/2] OAP-228 Move oap-statsdb into the oap project --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1cd705e0bb..596992b964 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ - 21.9.1 + 21.10.0 21.0.0 21.0.1