Skip to content

Commit

Permalink
HBASE-23146 Support CheckAndMutate with multiple conditions (#1114)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
brfrn169 authored Feb 25, 2020
1 parent 9f223c2 commit ecbed33
Show file tree
Hide file tree
Showing 23 changed files with 1,454 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -289,6 +290,60 @@ default CheckAndMutateBuilder ifEquals(byte[] value) {
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}

/**
* Atomically checks if a row matches the specified filter. If it does, it adds the
* Put/Delete/RowMutations.
* <p>
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
* execute it. This is a fluent style API, the code is like:
*
* <pre>
* <code>
* table.checkAndMutate(row, filter).thenPut(put)
* .thenAccept(succ -> {
* if (succ) {
* System.out.println("Check and put succeeded");
* } else {
* System.out.println("Check and put failed");
* }
* });
* </code>
* </pre>
*/
CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter);

/**
* A helper class for sending checkAndMutate request with a filter.
*/
interface CheckAndMutateWithFilterBuilder {

/**
* @param timeRange time range to check.
*/
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);

/**
* @param put data to put if check succeeds
* @return {@code true} if the new put was executed, {@code false} otherwise. The return value
* will be wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenPut(Put put);

/**
* @param delete data to delete if check succeeds
* @return {@code true} if the new delete was executed, {@code false} otherwise. The return
* value will be wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenDelete(Delete delete);

/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise. The return value will be
* wrapped by a {@link CompletableFuture}.
*/
CompletableFuture<Boolean> thenMutate(RowMutations mutation);
}

/**
* Performs multiple mutations atomically on a single row. Currently {@link Put} and
* {@link Delete} are supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -173,6 +174,36 @@ public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value)
};
}

@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilder() {

private final CheckAndMutateWithFilterBuilder builder =
rawTable.checkAndMutate(row, filter);

@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
builder.timeRange(timeRange);
return this;
}

@Override
public CompletableFuture<Boolean> thenPut(Put put) {
return wrap(builder.thenPut(put));
}

@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
return wrap(builder.thenDelete(delete));
}

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
return wrap(builder.thenMutate(mutation));
}
};
}

@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return wrap(rawTable.mutateRow(mutation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -65,7 +65,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;

/**
* The implementation of RawAsyncTable.
Expand Down Expand Up @@ -320,10 +319,10 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
Expand All @@ -332,23 +331,23 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm),
resp -> resp.getExists()))
.call();
}
Expand All @@ -359,6 +358,68 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new CheckAndMutateBuilderImpl(row, family);
}


private final class CheckAndMutateWithFilterBuilderImpl
implements CheckAndMutateWithFilterBuilder {

private final byte[] row;

private final Filter filter;

private TimeRange timeRange;

public CheckAndMutateWithFilterBuilderImpl(byte[] row, Filter filter) {
this.row = Preconditions.checkNotNull(row, "row is null");
this.filter = Preconditions.checkNotNull(filter, "filter is null");
}

@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

@Override
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
return RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}

@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
return RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}

@Override
public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
return RawAsyncTableImpl.this.<Boolean> newCaller(row, mutation.getMaxPriority(),
rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, null, null, null, null,
filter, timeRange, rm),
resp -> resp.getExists()))
.call();
}
}

@Override
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilderImpl(row, filter);
}

// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so here I write a new method as I do not want to change the abstraction of call method.
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -355,6 +356,53 @@ default CheckAndMutateBuilder ifEquals(byte[] value) {
* @return {@code true} if the new delete was executed, {@code false} otherwise.
*/
boolean thenDelete(Delete delete) throws IOException;

/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise.
*/
boolean thenMutate(RowMutations mutation) throws IOException;
}

/**
* Atomically checks if a row matches the specified filter. If it does, it adds the
* Put/Delete/RowMutations.
* <p>
* Use the returned {@link CheckAndMutateWithFilterBuilder} to construct your request and then
* execute it. This is a fluent style API, the code is like:
*
* <pre>
* <code>
* table.checkAndMutate(row, filter).thenPut(put);
* </code>
* </pre>
*/
default CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
throw new NotImplementedException("Add an implementation!");
}

/**
* A helper class for sending checkAndMutate request with a filter.
*/
interface CheckAndMutateWithFilterBuilder {

/**
* @param timeRange timeRange to check
*/
CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange);

/**
* @param put data to put if check succeeds
* @return {@code true} if the new put was executed, {@code false} otherwise.
*/
boolean thenPut(Put put) throws IOException;

/**
* @param delete data to delete if check succeeds
* @return {@code true} if the new delete was executed, {@code false} otherwise.
*/
boolean thenDelete(Delete delete) throws IOException;

/**
* @param mutation mutations to perform if check succeeds
* @return true if the new mutation was executed, false otherwise.
Expand Down
Loading

0 comments on commit ecbed33

Please sign in to comment.