Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(compress) #126

Merged
merged 36 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
7fea1c6
fix options
foreverneverer Sep 3, 2020
e1602fe
fix options
foreverneverer Sep 3, 2020
fa94399
fix options
foreverneverer Sep 3, 2020
88b5e88
fix options
foreverneverer Sep 3, 2020
a098625
fix comment
foreverneverer Sep 4, 2020
2a09369
fix comment
foreverneverer Sep 4, 2020
d9acc5a
Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInt…
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class PegasusClient implements PegasusClientInterface {
private final Object tableMapLock;
private Cluster cluster;

private static class PegasusHasher implements KeyHasher {
public static class PegasusHasher implements KeyHasher {
@Override
public long hash(byte[] key) {
Validate.isTrue(key != null && key.length >= 2);
Expand All @@ -43,18 +43,17 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
return getTable(tableName, new TableOptions());
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
private PegasusTable getTable(String tableName, TableOptions tableOptions) throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
Table internalTable = cluster.openTable(tableName, tableOptions);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
Expand Down Expand Up @@ -191,7 +190,13 @@ public PegasusTableInterface openTable(String tableName) throws PException {
@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
return getTable(tableName, new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs));
}

@Override
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException {
return getTable(tableName, tableOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.rpc.TableOptions;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
import java.util.*;
import org.apache.commons.lang3.tuple.Pair;

Expand Down Expand Up @@ -44,6 +45,9 @@ public interface PegasusClientInterface {
/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* <p>Note: this interface is deprecated, retaining it only for compatibility, please see {@link
* PegasusClientInterface#openTable(String, TableOptions)}
*
* <p>Please notice that pegasus support two kinds of API: 1. the client-interface way, which is
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
* provided in this class. 2. the table-interface way, which is provided by {@link
* PegasusTableInterface}. With the client-interface, you don't need to create
Expand All @@ -61,9 +65,33 @@ public interface PegasusClientInterface {
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
@Deprecated
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* <p>Please notice that pegasus support two kinds of API: 1. the client-interface way, which is
* provided in this class. 2. the table-interface way, which is provided by {@link
* PegasusTableInterface}. With the client-interface, you don't need to create
* PegasusTableInterface by openTable, so you can access the pegasus cluster conveniently.
* However, the client-interface's api also has some restrictions: 1. we don't provide async
* methods in client-interface. 2. the timeout in client-interface isn't as accurate as the
* table-interface. 3. the client-interface may throw an exception when open table fails. It means
* that you may need to handle this exception in every data access operation, which is annoying.
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* * administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
*/
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;

/**
* Check value exist by key from the cluster
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
41 changes: 32 additions & 9 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,34 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc;

import com.xiaomi.infra.pegasus.client.PegasusClient.PegasusHasher;

/** TableOptions is the internal options for opening a Pegasus table. */
public class TableOptions {
private final KeyHasher keyHasher;
private final int backupRequestDelayMs;
private KeyHasher keyHasher;
private int backupRequestDelayMs;
private boolean enableCompression;

public TableOptions() {
this.keyHasher = new PegasusHasher();
this.backupRequestDelayMs = 0;
this.enableCompression = false;
}

public TableOptions withKeyHasher(KeyHasher keyHasher) {
this.keyHasher = keyHasher;
return this;
}

public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
return this;
}

public TableOptions withCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
return this;
}

public KeyHasher keyHasher() {
return this.keyHasher;
Expand All @@ -16,16 +40,15 @@ public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}

public static TableOptions forTest() {
return new TableOptions(KeyHasher.DEFAULT, 0);
public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public TableOptions(KeyHasher h, int backupRequestDelay) {
this.keyHasher = h;
this.backupRequestDelayMs = backupRequestDelay;
public boolean enableCompression() {
return enableCompression;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
public static TableOptions forTest() {
return new TableOptions().withKeyHasher(KeyHasher.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@

public class BackupRequestInterceptor implements TableInterceptor {

private boolean isOpen;

public BackupRequestInterceptor(boolean isOpen) {
this.isOpen = isOpen;
}

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
backupCall(clientRequestRound, tableHandler);
Expand All @@ -33,7 +27,7 @@ public void after(
}

private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) {
if (!clientRequestRound.getOperator().supportBackupRequest()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.xiaomi.infra.pegasus.rpc.interceptor;

import com.xiaomi.infra.pegasus.apps.key_value;
import com.xiaomi.infra.pegasus.apps.mutate;
import com.xiaomi.infra.pegasus.base.error_code.error_types;
import com.xiaomi.infra.pegasus.operator.client_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_multi_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_put_operator;
import com.xiaomi.infra.pegasus.operator.rrdb_scan_operator;
import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.tools.ZstdWrapper;
import java.util.List;

public class CompressionInterceptor implements TableInterceptor {

@Override
public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
tryCompress(clientRequestRound);
}

@Override
public void after(
ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) {
if (errno != error_types.ERR_OK) {
return;
}
tryDecompress(clientRequestRound);
}

private void tryCompress(ClientRequestRound clientRequestRound) {
client_operator operator = clientRequestRound.getOperator();
if (operator instanceof rrdb_put_operator) {
rrdb_put_operator put = (rrdb_put_operator) operator;
put.get_request().value.data = ZstdWrapper.compress(put.get_request().value.data);
return;
}

if (operator instanceof rrdb_multi_put_operator) {
List<key_value> kvs = ((rrdb_multi_put_operator) operator).get_request().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.compress(kv.value.data);
}
return;
}

if (operator instanceof rrdb_check_and_set_operator) {
rrdb_check_and_set_operator check_and_set = (rrdb_check_and_set_operator) operator;
check_and_set.get_request().set_value.data =
ZstdWrapper.compress(check_and_set.get_request().set_value.data);
return;
}

if (operator instanceof rrdb_check_and_mutate_operator) {
List<mutate> mutates = ((rrdb_check_and_mutate_operator) operator).get_request().mutate_list;
for (mutate mu : mutates) {
mu.value.data = ZstdWrapper.compress(mu.value.data);
}
}
}

private void tryDecompress(ClientRequestRound clientRequestRound) {
client_operator operator = clientRequestRound.getOperator();

if (operator instanceof rrdb_get_operator) {
rrdb_get_operator get = (rrdb_get_operator) operator;
get.get_response().value.data = ZstdWrapper.tryDecompress(get.get_response().value.data);
return;
}

if (operator instanceof rrdb_multi_get_operator) {
List<key_value> kvs = ((rrdb_multi_get_operator) operator).get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
return;
}

if (operator instanceof rrdb_scan_operator) {
List<key_value> kvs = ((rrdb_scan_operator) operator).get_response().kvs;
for (key_value kv : kvs) {
kv.value.data = ZstdWrapper.tryDecompress(kv.value.data);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ public class InterceptorManger {
private List<TableInterceptor> interceptors = new ArrayList<>();

public InterceptorManger(TableOptions options) {
register(new BackupRequestInterceptor(options.enableBackupRequest()));
this.register(new BackupRequestInterceptor(), options.enableBackupRequest())
.register(new CompressionInterceptor(), options.enableCompression());
}

private InterceptorManger register(TableInterceptor interceptor) {
interceptors.add(interceptor);
private InterceptorManger register(TableInterceptor interceptor, boolean enable) {
if (enable) {
interceptors.add(interceptor);
}
return this;
}

Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ public class ZstdWrapper {

private ZstdWrapper() {}

/**
* try decompress the `src`, return `src` directly if failed
*
* @param src the origin sending value
* @return the decompressed value.
*/
public static byte[] tryDecompress(byte[] src) {
byte[] decompressedValue;
try {
decompressedValue = decompress(src);
} catch (PException e) {
// decompress fail
decompressedValue = src;
}
return decompressedValue;
}

/**
* Compresses the `src` and returns the compressed.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.xiaomi.infra.pegasus.rpc.async;

import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.client.PException;
import com.xiaomi.infra.pegasus.client.PegasusClientFactory;
import com.xiaomi.infra.pegasus.client.PegasusTableInterface;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

public class InterceptorTest {
@Test
public void testCompressionInterceptor() throws PException {
PegasusTableInterface commonTable =
PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp");
PegasusTableInterface compressTable =
PegasusClientFactory.createClient(ClientOptions.create())
.openTable("temp", new TableOptions().withCompression(true));

byte[] hashKey = "hashKey".getBytes();
byte[] sortKey = "sortKey".getBytes();
byte[] commonValue = "commonValue".getBytes();
byte[] compressionValue = "compressionValue".getBytes();

// if origin value was not compressed, both commonTable and compressTable can read origin value
commonTable.set(hashKey, sortKey, commonValue, 10000);
Assertions.assertEquals(
new String(commonTable.get(hashKey, sortKey, 10000)), new String(commonValue));
Assertions.assertEquals(
new String(compressTable.get(hashKey, sortKey, 10000)), new String(commonValue));

// if origin value was compressed, only compressTable can read successfully
compressTable.set(hashKey, sortKey, compressionValue, 10000);
Assertions.assertNotEquals(
new String(commonTable.get(hashKey, sortKey, 10000)), new String(compressionValue));
Assertions.assertEquals(
new String(compressTable.get(hashKey, sortKey, 10000)), new String(compressionValue));
}
}