Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: using "interceptor" to enhance the api(compress) #126

Merged
merged 36 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d6b0d19
init
foreverneverer Aug 25, 2020
9dae4ce
init the api
foreverneverer Aug 25, 2020
a3637aa
add compress interceptor
foreverneverer Aug 26, 2020
cd24eed
fix open
foreverneverer Aug 27, 2020
482e335
move and after
foreverneverer Aug 31, 2020
2edb6a0
move and after
foreverneverer Aug 31, 2020
e7ecefb
move and after
foreverneverer Aug 31, 2020
49ac5fd
move and after
foreverneverer Aug 31, 2020
ef7aa21
merge lastest code
foreverneverer Aug 31, 2020
44eb64a
fix
foreverneverer Sep 1, 2020
e7c98c8
fix
foreverneverer Sep 1, 2020
151dd6a
fix
foreverneverer Sep 1, 2020
e3626d6
merge
foreverneverer Sep 2, 2020
181ed6f
merge
foreverneverer Sep 2, 2020
d8f1daa
merge
foreverneverer Sep 2, 2020
e4d5ba5
merge
foreverneverer Sep 2, 2020
660ac79
merge master
foreverneverer Sep 2, 2020
30e1567
fix
foreverneverer Sep 2, 2020
501a155
fix
foreverneverer Sep 2, 2020
5b4dcc0
fix
foreverneverer Sep 2, 2020
08fd240
fix
foreverneverer Sep 2, 2020
834888e
fix
foreverneverer Sep 2, 2020
3ced11e
add test
foreverneverer Sep 2, 2020
148b584
add test
foreverneverer Sep 2, 2020
4c0998e
add interface
foreverneverer Sep 2, 2020
9850a37
add interface
foreverneverer Sep 2, 2020
3aea6ca
add interface
foreverneverer Sep 2, 2020
ad8140b
add interface
foreverneverer Sep 2, 2020
779a80f
add interface
foreverneverer Sep 2, 2020
7fea1c6
fix options
foreverneverer Sep 3, 2020
e1602fe
fix options
foreverneverer Sep 3, 2020
fa94399
fix options
foreverneverer Sep 3, 2020
88b5e88
fix options
foreverneverer Sep 3, 2020
a098625
fix comment
foreverneverer Sep 4, 2020
2a09369
fix comment
foreverneverer Sep 4, 2020
d9acc5a
Update src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInt…
foreverneverer Sep 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), new TableOptions()));
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions)
throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
Table internalTable = cluster.openTable(tableName, internalTableOptions);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
Expand Down Expand Up @@ -191,7 +191,17 @@ public PegasusTableInterface openTable(String tableName) throws PException {
@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
return getTable(
tableName,
new InternalTableOptions(
new PegasusHasher(),
new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs)));
}

@Override
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException {
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), tableOptions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ public interface PegasusClientInterface {
*/
public PegasusTableInterface openTable(String tableName) throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* @deprecated retaining it only for compatibility, we will remove it later, don't use it any
* more. the latest interface please see {@link PegasusClientInterface#openTable(String,
* TableOptions)}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
*/
@Deprecated
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
Expand All @@ -55,13 +66,13 @@ public interface PegasusClientInterface {
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* administrator
* @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <=
* 0, The backup request is disabled.
* @return the table handler
* @throws PException throws exception if any error occurs.
* * administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
*/
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;

/**
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

/** TableOptions is the internal options for opening a Pegasus table. */
public class TableOptions {
private int backupRequestDelayMs;
private boolean enableCompression;

public TableOptions() {
this.backupRequestDelayMs = 0;
this.enableCompression = false;
}

public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
return this;
}

public TableOptions withCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
return this;
}

public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public boolean enableCompression() {
return enableCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Cluster createCluster(ClientOptions clientOptions)

public abstract String[] getMetaList();

public abstract Table openTable(String name, TableOptions options)
public abstract Table openTable(String name, InternalTableOptions options)
throws ReplicationException, TException;

public abstract void close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.xiaomi.infra.pegasus.rpc;

import com.xiaomi.infra.pegasus.client.TableOptions;

public class InternalTableOptions {
private final KeyHasher keyHasher;
private final TableOptions tableOptions;

public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) {
this.keyHasher = keyHasher;
this.tableOptions = tableOptions;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

public KeyHasher keyHasher() {
return keyHasher;
}

public TableOptions tableOptions() {
return tableOptions;
}

public static InternalTableOptions forTest() {
return new InternalTableOptions(KeyHasher.DEFAULT, new TableOptions());
}
}
31 changes: 0 additions & 31 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.metrics.MetricsManager;
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -126,8 +126,9 @@ public String[] getMetaList() {
}

@Override
public TableHandler openTable(String name, TableOptions options) throws ReplicationException {
return new TableHandler(this, name, options);
public TableHandler openTable(String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
return new TableHandler(this, name, internalTableOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import com.xiaomi.infra.pegasus.replication.partition_configuration;
import com.xiaomi.infra.pegasus.replication.query_cfg_request;
import com.xiaomi.infra.pegasus.replication.query_cfg_response;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
Expand Down Expand Up @@ -53,7 +53,7 @@ static final class TableConfiguration {
int backupRequestDelayMs;
private InterceptorManger interceptorManger;

public TableHandler(ClusterManager mgr, String name, TableOptions options)
public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
int i = 0;
for (; i < name.length(); i++) {
Expand Down Expand Up @@ -93,12 +93,12 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
// superclass members
tableName_ = name;
appID_ = resp.app_id;
hasher_ = options.keyHasher();
hasher_ = internalTableOptions.keyHasher();

// members of this
manager_ = mgr;
executor_ = manager_.getExecutor();
this.backupRequestDelayMs = options.backupRequestDelayMs();
this.backupRequestDelayMs = internalTableOptions.tableOptions().backupRequestDelayMs();
if (backupRequestDelayMs > 0) {
logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs);
}
Expand All @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManger = new InterceptorManger(options);
this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import java.util.concurrent.TimeUnit;

public class BackupRequestInterceptor implements TableInterceptor {
private final long backupRequestDelayMs;

private boolean isOpen;

public BackupRequestInterceptor(boolean isOpen) {
this.isOpen = isOpen;
public BackupRequestInterceptor(long backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
}

@Override
Expand All @@ -33,7 +32,7 @@ public void after(
}

private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) {
if (!clientRequestRound.getOperator().supportBackupRequest()) {
return;
}

Expand All @@ -59,7 +58,7 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl
clientRequestRound.timeoutMs(),
true);
},
tableHandler.backupRequestDelayMs(),
backupRequestDelayMs,
TimeUnit.MILLISECONDS));
}
}
Loading