From ed7d4c7e4175603467bb6a9c6e22ee84cafc2ac9 Mon Sep 17 00:00:00 2001 From: Shuo Date: Fri, 4 Sep 2020 16:33:22 +0800 Subject: [PATCH] feat: using "interceptor" to enhance the api(compress) (#126) --- .../infra/pegasus/client/PegasusClient.java | 20 +++-- .../client/PegasusClientInterface.java | 23 +++-- .../infra/pegasus/client/TableOptions.java | 37 ++++++++ .../rrdb_check_and_mutate_operator.java | 4 + .../operator/rrdb_check_and_set_operator.java | 4 + .../operator/rrdb_multi_put_operator.java | 4 + .../pegasus/operator/rrdb_put_operator.java | 4 + .../com/xiaomi/infra/pegasus/rpc/Cluster.java | 2 +- .../pegasus/rpc/InternalTableOptions.java | 25 ++++++ .../infra/pegasus/rpc/TableOptions.java | 31 ------- .../pegasus/rpc/async/ClusterManager.java | 7 +- .../infra/pegasus/rpc/async/TableHandler.java | 10 +-- .../interceptor/BackupRequestInterceptor.java | 11 ++- .../interceptor/CompressionInterceptor.java | 90 +++++++++++++++++++ .../rpc/interceptor/InterceptorManger.java | 13 +-- .../infra/pegasus/tools/ZstdWrapper.java | 17 ++++ .../infra/pegasus/client/TestPException.java | 6 +- .../pegasus/rpc/async/ClusterManagerTest.java | 8 +- .../pegasus/rpc/async/InterceptorTest.java | 39 ++++++++ .../pegasus/rpc/async/TableHandlerTest.java | 8 +- .../pegasus/rpc/async/TimeoutBenchmark.java | 4 +- 21 files changed, 291 insertions(+), 76 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java create mode 100644 src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 73c56b7e..f80dc8b6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -43,18 +43,18 @@ public long hash(byte[] key) { } private PegasusTable getTable(String tableName) throws PException { - return getTable(tableName, 0); + return getTable(tableName, new InternalTableOptions(new PegasusHasher(), new TableOptions())); } - private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException { + private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions) + throws PException { PegasusTable table = tableMap.get(tableName); if (table == null) { synchronized (tableMapLock) { table = tableMap.get(tableName); if (table == null) { try { - TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs); - Table internalTable = cluster.openTable(tableName, options); + Table internalTable = cluster.openTable(tableName, internalTableOptions); table = new PegasusTable(this, internalTable); } catch (Throwable e) { throw new PException(e); @@ -191,7 +191,17 @@ public PegasusTableInterface openTable(String tableName) throws PException { @Override public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException { - return getTable(tableName, backupRequestDelayMs); + return getTable( + tableName, + new InternalTableOptions( + new PegasusHasher(), + new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs))); + } + + @Override + public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) + throws PException { + return getTable(tableName, new InternalTableOptions(new PegasusHasher(), tableOptions)); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index f28f9a26..7988638f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -41,6 +41,17 @@ public interface PegasusClientInterface { */ public PegasusTableInterface openTable(String tableName) throws PException; + /** + * Open a table, and prepare the sessions and route-table to the replica-servers. + * + * @deprecated Retained only for backward compatibility, will be removed later. Don't use it any + * more. The latest interface please see {@link PegasusClientInterface#openTable(String, + * TableOptions)} + */ + @Deprecated + public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + throws PException; + /** * Open a table, and prepare the sessions and route-table to the replica-servers. * @@ -55,13 +66,13 @@ public interface PegasusClientInterface { * 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface. * * @param tableName the table should be exist on the server, which is created before by the system - * administrator - * @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <= - * 0, The backup request is disabled. - * @return the table handler - * @throws PException throws exception if any error occurs. + * * administrator + * @param tableOptions control the table feature, such as open backup-request, compress and etc, + * see {@link TableOptions} + * @return + * @throws PException */ - public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) + public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) throws PException; /** diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java new file mode 100644 index 00000000..8fd59d17 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java @@ -0,0 +1,37 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client; + +/** TableOptions is the internal options for opening a Pegasus table. */ +public class TableOptions { + private int backupRequestDelayMs; + private boolean enableCompression; + + public TableOptions() { + this.backupRequestDelayMs = 0; + this.enableCompression = false; + } + + public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) { + this.backupRequestDelayMs = backupRequestDelayMs; + return this; + } + + public TableOptions withCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + return this; + } + + public int backupRequestDelayMs() { + return this.backupRequestDelayMs; + } + + public boolean enableBackupRequest() { + return backupRequestDelayMs > 0; + } + + public boolean enableCompression() { + return enableCompression; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java index ed8a61a1..2c724e21 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_mutate_operator.java @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() { return resp; } + public check_and_mutate_request get_request() { + return request; + } + private check_and_mutate_request request; private check_and_mutate_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java index 4d167b9e..3e04f636 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_check_and_set_operator.java @@ -47,6 +47,10 @@ public check_and_set_response get_response() { return resp; } + public check_and_set_request get_request() { + return request; + } + private check_and_set_request request; private check_and_set_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java index b81e01db..c8399aaa 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_multi_put_operator.java @@ -47,6 +47,10 @@ public update_response get_response() { return resp; } + public multi_put_request get_request() { + return request; + } + private multi_put_request request; private update_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java index 20532851..1b06d2fc 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/rrdb_put_operator.java @@ -46,6 +46,10 @@ public update_response get_response() { return resp; } + public update_request get_request() { + return request; + } + private update_request request; private update_response resp; } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java index 0ec9e59b..719883f1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java @@ -17,7 +17,7 @@ public static Cluster createCluster(ClientOptions clientOptions) public abstract String[] getMetaList(); - public abstract Table openTable(String name, TableOptions options) + public abstract Table openTable(String name, InternalTableOptions options) throws ReplicationException, TException; public abstract void close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java new file mode 100644 index 00000000..20107df9 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/InternalTableOptions.java @@ -0,0 +1,25 @@ +package com.xiaomi.infra.pegasus.rpc; + +import com.xiaomi.infra.pegasus.client.TableOptions; + +public class InternalTableOptions { + private final KeyHasher keyHasher; + private final TableOptions tableOptions; + + public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) { + this.keyHasher = keyHasher; + this.tableOptions = tableOptions; + } + + public KeyHasher keyHasher() { + return keyHasher; + } + + public TableOptions tableOptions() { + return tableOptions; + } + + public static InternalTableOptions forTest() { + return new InternalTableOptions(KeyHasher.DEFAULT, new TableOptions()); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java deleted file mode 100644 index a780959a..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. -package com.xiaomi.infra.pegasus.rpc; - -/** TableOptions is the internal options for opening a Pegasus table. */ -public class TableOptions { - private final KeyHasher keyHasher; - private final int backupRequestDelayMs; - - public KeyHasher keyHasher() { - return this.keyHasher; - } - - public int backupRequestDelayMs() { - return this.backupRequestDelayMs; - } - - public static TableOptions forTest() { - return new TableOptions(KeyHasher.DEFAULT, 0); - } - - public TableOptions(KeyHasher h, int backupRequestDelay) { - this.keyHasher = h; - this.backupRequestDelayMs = backupRequestDelay; - } - - public boolean enableBackupRequest() { - return backupRequestDelayMs > 0; - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 79302b5d..3682d68b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -9,8 +9,8 @@ import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.metrics.MetricsManager; import com.xiaomi.infra.pegasus.rpc.Cluster; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -126,8 +126,9 @@ public String[] getMetaList() { } @Override - public TableHandler openTable(String name, TableOptions options) throws ReplicationException { - return new TableHandler(this, name, options); + public TableHandler openTable(String name, InternalTableOptions internalTableOptions) + throws ReplicationException { + return new TableHandler(this, name, internalTableOptions); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index b2bff7a0..21a24a29 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -13,9 +13,9 @@ import com.xiaomi.infra.pegasus.replication.partition_configuration; import com.xiaomi.infra.pegasus.replication.query_cfg_request; import com.xiaomi.infra.pegasus.replication.query_cfg_response; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.EventExecutor; @@ -53,7 +53,7 @@ static final class TableConfiguration { int backupRequestDelayMs; private InterceptorManger interceptorManger; - public TableHandler(ClusterManager mgr, String name, TableOptions options) + public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions) throws ReplicationException { int i = 0; for (; i < name.length(); i++) { @@ -93,12 +93,12 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) // superclass members tableName_ = name; appID_ = resp.app_id; - hasher_ = options.keyHasher(); + hasher_ = internalTableOptions.keyHasher(); // members of this manager_ = mgr; executor_ = manager_.getExecutor(); - this.backupRequestDelayMs = options.backupRequestDelayMs(); + this.backupRequestDelayMs = internalTableOptions.tableOptions().backupRequestDelayMs(); if (backupRequestDelayMs > 0) { logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs); } @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options) inQuerying_ = new AtomicBoolean(false); lastQueryTime_ = 0; - this.interceptorManger = new InterceptorManger(options); + this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions()); } public ReplicaConfiguration getReplicaConfig(int index) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java index 272d5a5c..c4c39eac 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/BackupRequestInterceptor.java @@ -10,11 +10,10 @@ import java.util.concurrent.TimeUnit; public class BackupRequestInterceptor implements TableInterceptor { + private final long backupRequestDelayMs; - private boolean isOpen; - - public BackupRequestInterceptor(boolean isOpen) { - this.isOpen = isOpen; + public BackupRequestInterceptor(long backupRequestDelayMs) { + this.backupRequestDelayMs = backupRequestDelayMs; } @Override @@ -33,7 +32,7 @@ public void after( } private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) { - if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) { + if (!clientRequestRound.getOperator().supportBackupRequest()) { return; } @@ -59,7 +58,7 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl clientRequestRound.timeoutMs(), true); }, - tableHandler.backupRequestDelayMs(), + backupRequestDelayMs, TimeUnit.MILLISECONDS)); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java new file mode 100644 index 00000000..fb47bd0d --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/CompressionInterceptor.java @@ -0,0 +1,90 @@ +package com.xiaomi.infra.pegasus.rpc.interceptor; + +import com.xiaomi.infra.pegasus.apps.key_value; +import com.xiaomi.infra.pegasus.apps.mutate; +import com.xiaomi.infra.pegasus.base.error_code.error_types; +import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_check_and_mutate_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_check_and_set_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_get_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_multi_get_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_multi_put_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; +import com.xiaomi.infra.pegasus.operator.rrdb_scan_operator; +import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; +import com.xiaomi.infra.pegasus.tools.ZstdWrapper; +import java.util.List; + +public class CompressionInterceptor implements TableInterceptor { + + @Override + public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { + tryCompress(clientRequestRound); + } + + @Override + public void after( + ClientRequestRound clientRequestRound, error_types errno, TableHandler tableHandler) { + if (errno != error_types.ERR_OK) { + return; + } + tryDecompress(clientRequestRound); + } + + private void tryCompress(ClientRequestRound clientRequestRound) { + client_operator operator = clientRequestRound.getOperator(); + if (operator instanceof rrdb_put_operator) { + rrdb_put_operator put = (rrdb_put_operator) operator; + put.get_request().value.data = ZstdWrapper.compress(put.get_request().value.data); + return; + } + + if (operator instanceof rrdb_multi_put_operator) { + List kvs = ((rrdb_multi_put_operator) operator).get_request().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.compress(kv.value.data); + } + return; + } + + if (operator instanceof rrdb_check_and_set_operator) { + rrdb_check_and_set_operator check_and_set = (rrdb_check_and_set_operator) operator; + check_and_set.get_request().set_value.data = + ZstdWrapper.compress(check_and_set.get_request().set_value.data); + return; + } + + if (operator instanceof rrdb_check_and_mutate_operator) { + List mutates = ((rrdb_check_and_mutate_operator) operator).get_request().mutate_list; + for (mutate mu : mutates) { + mu.value.data = ZstdWrapper.compress(mu.value.data); + } + } + } + + private void tryDecompress(ClientRequestRound clientRequestRound) { + client_operator operator = clientRequestRound.getOperator(); + + if (operator instanceof rrdb_get_operator) { + rrdb_get_operator get = (rrdb_get_operator) operator; + get.get_response().value.data = ZstdWrapper.tryDecompress(get.get_response().value.data); + return; + } + + if (operator instanceof rrdb_multi_get_operator) { + List kvs = ((rrdb_multi_get_operator) operator).get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + return; + } + + if (operator instanceof rrdb_scan_operator) { + List kvs = ((rrdb_scan_operator) operator).get_response().kvs; + for (key_value kv : kvs) { + kv.value.data = ZstdWrapper.tryDecompress(kv.value.data); + } + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java index 8535330f..cc1a27e9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/InterceptorManger.java @@ -1,7 +1,7 @@ package com.xiaomi.infra.pegasus.rpc.interceptor; import com.xiaomi.infra.pegasus.base.error_code.error_types; -import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.client.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClientRequestRound; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import java.util.ArrayList; @@ -12,12 +12,13 @@ public class InterceptorManger { private List interceptors = new ArrayList<>(); public InterceptorManger(TableOptions options) { - register(new BackupRequestInterceptor(options.enableBackupRequest())); - } + if (options.enableBackupRequest()) { + interceptors.add(new BackupRequestInterceptor(options.backupRequestDelayMs())); + } - private InterceptorManger register(TableInterceptor interceptor) { - interceptors.add(interceptor); - return this; + if (options.enableCompression()) { + interceptors.add(new CompressionInterceptor()); + } } public void before(ClientRequestRound clientRequestRound, TableHandler tableHandler) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java index eace0af8..004b24b6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java @@ -12,6 +12,23 @@ public class ZstdWrapper { private ZstdWrapper() {} + /** + * try decompress the `src`, return `src` directly if failed + * + * @param src the origin sending value + * @return the decompressed value. + */ + public static byte[] tryDecompress(byte[] src) { + byte[] decompressedValue; + try { + decompressedValue = decompress(src); + } catch (PException e) { + // decompress fail + decompressedValue = src; + } + return decompressedValue; + } + /** * Compresses the `src` and returns the compressed. * diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index 3b529b00..dc5d0e51 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -11,7 +11,7 @@ import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.client.PegasusTable.Request; import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; -import com.xiaomi.infra.pegasus.rpc.TableOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import io.netty.util.concurrent.DefaultPromise; @@ -59,7 +59,7 @@ public void testHandleReplicationException() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", TableOptions.forTest()); + TableHandler table = manager.openTable("temp", InternalTableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); @@ -97,7 +97,7 @@ public void testTimeOutIsZero() throws Exception { String metaList = "127.0.0.1:34601,127.0.0.1:34602, 127.0.0.1:34603"; ClusterManager manager = new ClusterManager(ClientOptions.builder().metaServers(metaList).build()); - TableHandler table = manager.openTable("temp", TableOptions.forTest()); + TableHandler table = manager.openTable("temp", InternalTableOptions.forTest()); DefaultPromise promise = table.newPromise(); update_request req = new update_request(new blob(), new blob(), 100); gpid gpid = table.getGpidByHash(1); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java index 8e9ec9ca..3a232e21 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManagerTest.java @@ -6,8 +6,8 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,7 +50,7 @@ public void testOpenTable() throws Exception { TableHandler result = null; try { - result = testManager.openTable("testName", TableOptions.forTest()); + result = testManager.openTable("testName", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_SESSION_RESET, e.getErrorType()); } finally { @@ -62,7 +62,7 @@ public void testOpenTable() throws Exception { String address_list2 = "127.0.0.1:123,127.0.0.1:34603,127.0.0.1:34601,127.0.0.1:34602"; testManager = new ClusterManager(ClientOptions.builder().metaServers(address_list2).build()); try { - result = testManager.openTable("hehe", TableOptions.forTest()); + result = testManager.openTable("hehe", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.assertEquals(error_code.error_types.ERR_OBJECT_NOT_FOUND, e.getErrorType()); } finally { @@ -71,7 +71,7 @@ public void testOpenTable() throws Exception { // test open an valid table try { - result = testManager.openTable("temp", TableOptions.forTest()); + result = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } finally { diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java new file mode 100644 index 00000000..e53fdeec --- /dev/null +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/InterceptorTest.java @@ -0,0 +1,39 @@ +package com.xiaomi.infra.pegasus.rpc.async; + +import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.client.PException; +import com.xiaomi.infra.pegasus.client.PegasusClientFactory; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface; +import com.xiaomi.infra.pegasus.client.TableOptions; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class InterceptorTest { + @Test + public void testCompressionInterceptor() throws PException { + PegasusTableInterface commonTable = + PegasusClientFactory.createClient(ClientOptions.create()).openTable("temp"); + PegasusTableInterface compressTable = + PegasusClientFactory.createClient(ClientOptions.create()) + .openTable("temp", new TableOptions().withCompression(true)); + + byte[] hashKey = "hashKey".getBytes(); + byte[] sortKey = "sortKey".getBytes(); + byte[] commonValue = "commonValue".getBytes(); + byte[] compressionValue = "compressionValue".getBytes(); + + // if origin value was not compressed, both commonTable and compressTable can read origin value + commonTable.set(hashKey, sortKey, commonValue, 10000); + Assertions.assertEquals( + new String(commonTable.get(hashKey, sortKey, 10000)), new String(commonValue)); + Assertions.assertEquals( + new String(compressTable.get(hashKey, sortKey, 10000)), new String(commonValue)); + + // if origin value was compressed, only compressTable can read successfully + compressTable.set(hashKey, sortKey, compressionValue, 10000); + Assertions.assertNotEquals( + new String(commonTable.get(hashKey, sortKey, 10000)), new String(compressionValue)); + Assertions.assertEquals( + new String(compressTable.get(hashKey, sortKey, 10000)), new String(compressionValue)); + } +} diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java index 66a41311..714c2053 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TableHandlerTest.java @@ -8,8 +8,8 @@ import com.xiaomi.infra.pegasus.base.rpc_address; import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.operator.client_operator; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; import com.xiaomi.infra.pegasus.tools.Toollet; import java.util.ArrayList; @@ -61,7 +61,7 @@ public void testOperateOp() throws Exception { System.out.println("TableHandlerTest#testOperateOp"); TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } @@ -148,7 +148,7 @@ public void testTryQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } @@ -192,7 +192,7 @@ public void testConnectAfterQueryMeta() throws Exception { TableHandler table = null; try { - table = testManager.openTable("temp", TableOptions.forTest()); + table = testManager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { Assert.fail(); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java index 656bb85d..9c1bbe7c 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/TimeoutBenchmark.java @@ -6,8 +6,8 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.InternalTableOptions; import com.xiaomi.infra.pegasus.rpc.ReplicationException; -import com.xiaomi.infra.pegasus.rpc.TableOptions; import com.xiaomi.infra.pegasus.tools.Toollet; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -68,7 +68,7 @@ public void timeoutChecker() { TableHandler handle; try { - handle = manager.openTable("temp", TableOptions.forTest()); + handle = manager.openTable("temp", InternalTableOptions.forTest()); } catch (ReplicationException e) { e.printStackTrace(); Assert.fail();