Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat: using "interceptor" to enhance the api(compress) (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored Sep 4, 2020
1 parent 5d60bd1 commit ed7d4c7
Show file tree
Hide file tree
Showing 21 changed files with 291 additions and 76 deletions.
20 changes: 15 additions & 5 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ public long hash(byte[] key) {
}

private PegasusTable getTable(String tableName) throws PException {
return getTable(tableName, 0);
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), new TableOptions()));
}

private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws PException {
private PegasusTable getTable(String tableName, InternalTableOptions internalTableOptions)
throws PException {
PegasusTable table = tableMap.get(tableName);
if (table == null) {
synchronized (tableMapLock) {
table = tableMap.get(tableName);
if (table == null) {
try {
TableOptions options = new TableOptions(new PegasusHasher(), backupRequestDelayMs);
Table internalTable = cluster.openTable(tableName, options);
Table internalTable = cluster.openTable(tableName, internalTableOptions);
table = new PegasusTable(this, internalTable);
} catch (Throwable e) {
throw new PException(e);
Expand Down Expand Up @@ -191,7 +191,17 @@ public PegasusTableInterface openTable(String tableName) throws PException {
@Override
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException {
return getTable(tableName, backupRequestDelayMs);
return getTable(
tableName,
new InternalTableOptions(
new PegasusHasher(),
new TableOptions().withBackupRequestDelayMs(backupRequestDelayMs)));
}

@Override
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException {
return getTable(tableName, new InternalTableOptions(new PegasusHasher(), tableOptions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ public interface PegasusClientInterface {
*/
public PegasusTableInterface openTable(String tableName) throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
* @deprecated Retained only for backward compatibility, will be removed later. Don't use it any
* more. The latest interface please see {@link PegasusClientInterface#openTable(String,
* TableOptions)}
*/
@Deprecated
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
throws PException;

/**
* Open a table, and prepare the sessions and route-table to the replica-servers.
*
Expand All @@ -55,13 +66,13 @@ public interface PegasusClientInterface {
* 4. You can't specify a per-operation timeout. So we recommend you to use the table-interface.
*
* @param tableName the table should be exist on the server, which is created before by the system
* administrator
* @param backupRequestDelayMs the delay time to send backup request. If backupRequestDelayMs <=
* 0, The backup request is disabled.
* @return the table handler
* @throws PException throws exception if any error occurs.
* * administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
*/
public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs)
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;

/**
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/TableOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

/** TableOptions is the internal options for opening a Pegasus table. */
public class TableOptions {
private int backupRequestDelayMs;
private boolean enableCompression;

public TableOptions() {
this.backupRequestDelayMs = 0;
this.enableCompression = false;
}

public TableOptions withBackupRequestDelayMs(int backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
return this;
}

public TableOptions withCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
return this;
}

public int backupRequestDelayMs() {
return this.backupRequestDelayMs;
}

public boolean enableBackupRequest() {
return backupRequestDelayMs > 0;
}

public boolean enableCompression() {
return enableCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_mutate_response get_response() {
return resp;
}

public check_and_mutate_request get_request() {
return request;
}

private check_and_mutate_request request;
private check_and_mutate_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public check_and_set_response get_response() {
return resp;
}

public check_and_set_request get_request() {
return request;
}

private check_and_set_request request;
private check_and_set_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public update_response get_response() {
return resp;
}

public multi_put_request get_request() {
return request;
}

private multi_put_request request;
private update_response resp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public update_response get_response() {
return resp;
}

public update_request get_request() {
return request;
}

private update_request request;
private update_response resp;
}
2 changes: 1 addition & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static Cluster createCluster(ClientOptions clientOptions)

public abstract String[] getMetaList();

public abstract Table openTable(String name, TableOptions options)
public abstract Table openTable(String name, InternalTableOptions options)
throws ReplicationException, TException;

public abstract void close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.xiaomi.infra.pegasus.rpc;

import com.xiaomi.infra.pegasus.client.TableOptions;

public class InternalTableOptions {
private final KeyHasher keyHasher;
private final TableOptions tableOptions;

public InternalTableOptions(KeyHasher keyHasher, TableOptions tableOptions) {
this.keyHasher = keyHasher;
this.tableOptions = tableOptions;
}

public KeyHasher keyHasher() {
return keyHasher;
}

public TableOptions tableOptions() {
return tableOptions;
}

public static InternalTableOptions forTest() {
return new InternalTableOptions(KeyHasher.DEFAULT, new TableOptions());
}
}
31 changes: 0 additions & 31 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/TableOptions.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.metrics.MetricsManager;
import com.xiaomi.infra.pegasus.rpc.Cluster;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -126,8 +126,9 @@ public String[] getMetaList() {
}

@Override
public TableHandler openTable(String name, TableOptions options) throws ReplicationException {
return new TableHandler(this, name, options);
public TableHandler openTable(String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
return new TableHandler(this, name, internalTableOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import com.xiaomi.infra.pegasus.replication.partition_configuration;
import com.xiaomi.infra.pegasus.replication.query_cfg_request;
import com.xiaomi.infra.pegasus.replication.query_cfg_response;
import com.xiaomi.infra.pegasus.rpc.InternalTableOptions;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.TableOptions;
import com.xiaomi.infra.pegasus.rpc.interceptor.InterceptorManger;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.EventExecutor;
Expand Down Expand Up @@ -53,7 +53,7 @@ static final class TableConfiguration {
int backupRequestDelayMs;
private InterceptorManger interceptorManger;

public TableHandler(ClusterManager mgr, String name, TableOptions options)
public TableHandler(ClusterManager mgr, String name, InternalTableOptions internalTableOptions)
throws ReplicationException {
int i = 0;
for (; i < name.length(); i++) {
Expand Down Expand Up @@ -93,12 +93,12 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
// superclass members
tableName_ = name;
appID_ = resp.app_id;
hasher_ = options.keyHasher();
hasher_ = internalTableOptions.keyHasher();

// members of this
manager_ = mgr;
executor_ = manager_.getExecutor();
this.backupRequestDelayMs = options.backupRequestDelayMs();
this.backupRequestDelayMs = internalTableOptions.tableOptions().backupRequestDelayMs();
if (backupRequestDelayMs > 0) {
logger.info("the delay time of backup request is \"{}\"", backupRequestDelayMs);
}
Expand All @@ -109,7 +109,7 @@ public TableHandler(ClusterManager mgr, String name, TableOptions options)
inQuerying_ = new AtomicBoolean(false);
lastQueryTime_ = 0;

this.interceptorManger = new InterceptorManger(options);
this.interceptorManger = new InterceptorManger(internalTableOptions.tableOptions());
}

public ReplicaConfiguration getReplicaConfig(int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
import java.util.concurrent.TimeUnit;

public class BackupRequestInterceptor implements TableInterceptor {
private final long backupRequestDelayMs;

private boolean isOpen;

public BackupRequestInterceptor(boolean isOpen) {
this.isOpen = isOpen;
public BackupRequestInterceptor(long backupRequestDelayMs) {
this.backupRequestDelayMs = backupRequestDelayMs;
}

@Override
Expand All @@ -33,7 +32,7 @@ public void after(
}

private void backupCall(ClientRequestRound clientRequestRound, TableHandler tableHandler) {
if (!isOpen || !clientRequestRound.getOperator().supportBackupRequest()) {
if (!clientRequestRound.getOperator().supportBackupRequest()) {
return;
}

Expand All @@ -59,7 +58,7 @@ private void backupCall(ClientRequestRound clientRequestRound, TableHandler tabl
clientRequestRound.timeoutMs(),
true);
},
tableHandler.backupRequestDelayMs(),
backupRequestDelayMs,
TimeUnit.MILLISECONDS));
}
}
Loading

0 comments on commit ed7d4c7

Please sign in to comment.