Fix bug: Backend metrics of rocksdb disk usage can't work #326

Merged 1 commit on Feb 12, 2019
RocksDBMetrics.java

@@ -19,46 +19,71 @@
 package com.baidu.hugegraph.backend.store.rocksdb;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 
 import com.baidu.hugegraph.backend.store.BackendMetrics;
 import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions.Session;
 import com.baidu.hugegraph.util.Bytes;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
 
 public class RocksDBMetrics implements BackendMetrics {
 
-    private static final String INDEX_FILTER =
-            "rocksdb.estimate-table-readers-mem";
-    private static final String MEM_TABLE = "rocksdb.cur-size-all-mem-tables";
-    private static final String R_DATA_SIZE = "rocksdb.estimate-live-data-size";
+    public static final String BLOCK_CACHE = "rocksdb.block-cache-usage";
+    public static final String INDEX_FILTER =
+            "rocksdb.estimate-table-readers-mem";
+    public static final String MEM_TABLE = "rocksdb.cur-size-all-mem-tables";
+    public static final String DISK_USAGE = "rocksdb.disk-usage";
 
-    private final Session session;
+    private final List<RocksDBSessions> dbs;
+    private final RocksDBSessions.Session session;
 
-    public RocksDBMetrics(Session session) {
+    public RocksDBMetrics(List<RocksDBSessions> dbs,
+                          RocksDBSessions.Session session) {
+        this.dbs = dbs;
         this.session = session;
     }
 
     @Override
     public Map<String, Object> getMetrics() {
         Map<String, Object> metrics = InsertionOrderUtil.newMap();
-        // NOTE: the unit of rocksdb mem property is kb
-        metrics.put(MEM_USED, this.getMemUsed() / Bytes.BASE);
+        // NOTE: the unit of rocksdb mem property is bytes
+        metrics.put(MEM_USED, this.getMemUsed() / Bytes.MB);
         metrics.put(MEM_UNIT, "MB");
         String size = FileUtils.byteCountToDisplaySize(this.getDataSize());
         metrics.put(DATA_SIZE, size);
         return metrics;
     }
 
-    private long getMemUsed() {
-        long indexFilter = Long.parseLong(this.session.property(INDEX_FILTER));
-        long memtable = Long.parseLong(this.session.property(MEM_TABLE));
-        return indexFilter + memtable;
+    private double getMemUsed() {
+        double blockCache = this.sum(this.session, BLOCK_CACHE);
+        double indexFilter = this.sum(this.session, INDEX_FILTER);
+        double memtable = this.sum(this.session, MEM_TABLE);
+        return blockCache + indexFilter + memtable;
     }
 
     private long getDataSize() {
-        return Long.parseLong(this.session.property(R_DATA_SIZE));
+        return (long) this.sum(DISK_USAGE);
     }
+
+    private double sum(RocksDBSessions.Session session, String property) {
+        double total = 0;
+        for (RocksDBSessions db : this.dbs) {
+            total += Double.parseDouble(db.property(property));
+            for (String table : db.openedTables()) {
+                total += Double.parseDouble(session.property(table, property));
+            }
+        }
+        return total;
+    }
+
+    private double sum(String property) {
+        double total = 0;
+        for (RocksDBSessions db : this.dbs) {
+            total += Double.parseDouble(db.property(property));
+        }
+        return total;
+    }
 }
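For reference, a minimal standalone sketch (not HugeGraph code) of reading the same three memory properties that the new getMemUsed() aggregates. The database path is hypothetical; all three properties report bytes, which is why the patch divides by Bytes.MB instead of Bytes.BASE:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class MemPropsDemo {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-mem-demo")) {
                // The three properties summed by the new getMemUsed();
                // each value is a numeric string in bytes
                long blockCache = Long.parseLong(
                        db.getProperty("rocksdb.block-cache-usage"));
                long indexFilter = Long.parseLong(
                        db.getProperty("rocksdb.estimate-table-readers-mem"));
                long memtable = Long.parseLong(
                        db.getProperty("rocksdb.cur-size-all-mem-tables"));
                System.out.printf("mem used: %.2f MB%n",
                                  (blockCache + indexFilter + memtable) /
                                  (1024.0 * 1024.0));
            }
        }
    }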
RocksDBSessions.java

@@ -38,6 +38,8 @@ public RocksDBSessions(String database, String store) {
     public abstract void createTable(String table) throws RocksDBException;
     public abstract void dropTable(String table) throws RocksDBException;
 
+    public abstract String property(String property);
+
     @Override
     public abstract Session session();
 
@@ -54,7 +56,6 @@ public static abstract class Session extends BackendSession {
         public static final int SCAN_LT_END = 0x10;
         public static final int SCAN_LTE_END = 0x30;
 
-        public abstract String property(String property);
         public abstract String property(String table, String property);
 
         public abstract void put(String table, byte[] key, byte[] value);
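Net effect of this hunk: property(String) becomes a per-database operation on RocksDBSessions, so DB-wide values such as disk usage are answered by the DB owner itself, while the per-table overload property(String table, String property) stays on Session — exactly the split that the two sum() overloads in RocksDBMetrics rely on.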
RocksDBStdSessions.java

@@ -38,13 +38,15 @@
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.DBOptionsInterface;
+import org.rocksdb.Env;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.MutableColumnFamilyOptionsInterface;
 import org.rocksdb.Options;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.SstFileManager;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
@@ -64,6 +66,7 @@ public class RocksDBStdSessions extends RocksDBSessions {
 
     private final HugeConfig conf;
     private final RocksDB rocksdb;
+    private final SstFileManager sstFileManager;
 
     public RocksDBStdSessions(HugeConfig config, String dataPath,
                               String walPath, String database, String store)
@@ -77,6 +80,9 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
         RocksDBStdSessions.initOptions(this.conf, options, options, options);
         options.setWalDir(walPath);
 
+        this.sstFileManager = new SstFileManager(Env.getDefault());
+        options.setSstFileManager(this.sstFileManager);
+
         /*
          * Open RocksDB at the first time
          * Don't merge old CFs, we expect a clear DB when using this one
@@ -108,6 +114,9 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
         RocksDBStdSessions.initOptions(this.conf, options, null, null);
         options.setWalDir(walPath);
 
+        this.sstFileManager = new SstFileManager(Env.getDefault());
+        options.setSstFileManager(this.sstFileManager);
+
         // Open RocksDB with CFs
         List<ColumnFamilyHandle> cfhs = new ArrayList<>();
         this.rocksdb = RocksDB.open(options, dataPath, cfds, cfhs);
@@ -164,6 +173,18 @@ public void dropTable(String table) throws RocksDBException {
         this.cfs.remove(table);
     }
 
+    @Override
+    public String property(String property) {
+        try {
+            if (property.equals(RocksDBMetrics.DISK_USAGE)) {
+                return String.valueOf(this.sstFileManager.getTotalSize());
+            }
+            return rocksdb().getProperty(property);
+        } catch (RocksDBException e) {
+            throw new BackendException(e);
+        }
+    }
+
     @Override
     public final synchronized Session session() {
         return (Session) super.getOrNewSession();
@@ -398,18 +419,6 @@ public boolean hasChanges() {
             return this.batch.count() > 0;
         }
 
-        /**
-         * Get property value
-         */
-        @Override
-        public String property(String property) {
-            try {
-                return rocksdb().getProperty(property);
-            } catch (RocksDBException e) {
-                throw new BackendException(e);
-            }
-        }
-
         /**
         * Get property value by name from specified table
         */
@@ -450,7 +459,11 @@ public Integer commit() {
         */
        @Override
        public void put(String table, byte[] key, byte[] value) {
-           this.batch.put(cf(table), key, value);
+           try {
+               this.batch.put(cf(table), key, value);
+           } catch (RocksDBException e) {
+               throw new BackendException(e);
+           }
        }
 
        /**
@@ -460,7 +473,11 @@ public void put(String table, byte[] key, byte[] value) {
         */
        @Override
        public void merge(String table, byte[] key, byte[] value) {
-           this.batch.merge(cf(table), key, value);
+           try {
+               this.batch.merge(cf(table), key, value);
+           } catch (RocksDBException e) {
+               throw new BackendException(e);
+           }
        }
 
        /**
@@ -480,7 +497,11 @@ public void increase(String table, byte[] key, byte[] value) {
         */
        @Override
        public void remove(String table, byte[] key) {
-           this.batch.remove(cf(table), key);
+           try {
+               this.batch.singleDelete(cf(table), key);
+           } catch (RocksDBException e) {
+               throw new BackendException(e);
+           }
        }
 
        /**
@@ -491,15 +512,23 @@ public void delete(String table, byte[] key) {
            byte[] keyFrom = key;
            byte[] keyTo = Arrays.copyOf(key, key.length);
            keyTo = BinarySerializer.increaseOne(keyTo);
-           this.batch.deleteRange(cf(table), keyFrom, keyTo);
+           try {
+               this.batch.deleteRange(cf(table), keyFrom, keyTo);
+           } catch (RocksDBException e) {
+               throw new BackendException(e);
+           }
        }
 
        /**
         * Delete a range of keys from a table
         */
        @Override
        public void delete(String table, byte[] keyFrom, byte[] keyTo) {
-           this.batch.deleteRange(cf(table), keyFrom, keyTo);
+           try {
+               this.batch.deleteRange(cf(table), keyFrom, keyTo);
+           } catch (RocksDBException e) {
+               throw new BackendException(e);
+           }
        }
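Two things happen in this file. First, an SstFileManager is attached to the DB options so that RocksDB itself tracks the total size of all SST files, which backs the new "rocksdb.disk-usage" pseudo-property. A minimal standalone sketch of that mechanism (the path is hypothetical, not HugeGraph code):

    import org.rocksdb.Env;
    import org.rocksdb.FlushOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileManager;

    public class DiskUsageDemo {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            // Track every SST file the DB creates; getTotalSize() is the
            // basis of the "rocksdb.disk-usage" pseudo-property above
            SstFileManager sstFileManager = new SstFileManager(Env.getDefault());
            try (Options options = new Options().setCreateIfMissing(true)
                                                .setSstFileManager(sstFileManager);
                 RocksDB db = RocksDB.open(options, "/tmp/rocksdb-disk-demo")) {
                db.put("key".getBytes(), "value".getBytes());
                // Force a flush so the memtable is written to an SST file
                db.flush(new FlushOptions().setWaitForFlush(true));
                System.out.println("disk usage (bytes): " +
                                   sstFileManager.getTotalSize());
            }
        }
    }

Second, the WriteBatch mutators declare RocksDBException in newer RocksJava releases, hence the try/catch wrappers, and remove() is swapped for singleDelete(), RocksDB's faster deletion path for keys that are written at most once and not overwritten.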
RocksDBStore.java

@@ -49,6 +49,7 @@
 import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
 import com.baidu.hugegraph.util.Log;
 import com.google.common.collect.ImmutableList;
 
@@ -86,7 +87,11 @@ public RocksDBStore(final BackendStoreProvider provider,
 
     private void registerMetaHandlers() {
         this.registerMetaHandler("metrics", (session, meta, args) -> {
-            RocksDBMetrics metrics = new RocksDBMetrics(session);
+            List<RocksDBSessions> dbs = new ArrayList<>();
+            dbs.add(sessions);
+            dbs.addAll(tableDBMapping().values());
+
+            RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
             return metrics.getMetrics();
         });
     }
@@ -243,6 +248,16 @@ protected String wrapPath(String path) {
         return Paths.get(path, this.store).toString();
     }
 
+    protected Map<String, RocksDBSessions> tableDBMapping() {
+        Map<String, RocksDBSessions> tableDBMap = InsertionOrderUtil.newMap();
+        for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
+            String table = this.table(e.getKey()).table();
+            RocksDBSessions db = db(e.getValue());
+            tableDBMap.put(table, db);
+        }
+        return tableDBMap;
+    }
+
     @Override
     public void close() {
         LOG.debug("Store close: {}", this.store);
@@ -305,10 +320,9 @@ public void init() {
         }
 
         // Create table with optimized disk
-        for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
-            String table = this.table(e.getKey()).table();
-            RocksDBSessions db = db(e.getValue());
-            this.createTable(db, table);
+        Map<String, RocksDBSessions> tableDBMap = this.tableDBMapping();
+        for (Map.Entry<String, RocksDBSessions> e : tableDBMap.entrySet()) {
+            this.createTable(e.getValue(), e.getKey());
         }
 
         LOG.debug("Store initialized: {}", this.store);
@@ -332,10 +346,9 @@ public void clear() {
         }
 
         // Drop table with optimized disk
-        for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
-            String table = this.table(e.getKey()).table();
-            RocksDBSessions db = db(e.getValue());
-            this.dropTable(db, table);
+        Map<String, RocksDBSessions> tableDBMap = this.tableDBMapping();
+        for (Map.Entry<String, RocksDBSessions> e : tableDBMap.entrySet()) {
+            this.dropTable(e.getValue(), e.getKey());
         }
 
         LOG.debug("Store cleared: {}", this.store);
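With the handler above, metrics are aggregated across the default DB plus every DB opened through the table-to-disk mapping, so stores that spread tables over multiple disks report complete figures. The resulting map (key names per the BackendMetrics constants, values hypothetical) looks roughly like: {mem_used=12.5, mem_unit=MB, data_size=1 GB}.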
RocksDBSstSessions.java

@@ -106,6 +106,11 @@ public void dropTable(String table) throws RocksDBException {
         this.tables.remove(table);
     }
 
+    @Override
+    public String property(String property) {
+        throw new NotSupportException("RocksDBSstStore property()");
+    }
+
     private SstFileWriter table(String table) {
         SstFileWriter sst = this.tables.get(table);
         if (sst == null) {
@@ -217,14 +222,6 @@ public Integer commit() {
             return count;
         }
 
-        /**
-         * Get property value
-         */
-        @Override
-        public String property(String property) {
-            throw new NotSupportException("RocksDBSstStore property()");
-        }
-
         /**
         * Get property value by name from specified table
         */
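The SST-export backend writes SstFileWriter files without a live RocksDB instance to query, so the NotSupportException simply moves with the method from the Session level up to the per-DB sessions level introduced in RocksDBSessions.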
pom.xml (1 addition, 1 deletion)

@@ -104,7 +104,7 @@
     <httpclient.version>4.5.2</httpclient.version>
     <datastax.cassandra.version>3.2.0</datastax.cassandra.version>
     <apache.cassandra.version>3.10</apache.cassandra.version>
-    <rocksdb.version>5.8.6</rocksdb.version>
+    <rocksdb.version>5.17.2</rocksdb.version>
     <hbase.client.version>2.0.0</hbase.client.version>
     <mysql.driver.version>5.1.45</mysql.driver.version>
     <jersey.version>2.25.1</jersey.version>
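This version bump enables the rest of the PR: the SstFileManager Java binding and the WriteBatch signatures that declare RocksDBException (including singleDelete) first appeared in later 5.x RocksJava releases, so the code above presumably requires 5.17.x rather than 5.8.6.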