Skip to content

Commit

Permalink
HBASE-21725 Implement BufferedMutator Based on AsyncBufferedMutator
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 12, 2019
1 parent f47f6df commit 4210ca9
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@
public interface BufferedMutator extends Closeable {
/**
* Key to use setting non-default BufferedMutator implementation in Configuration.
* <p/>
* @deprecated For internal test use only, will be removed in the future, do not use it any more.
*/
@Deprecated
String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";

/**
Expand Down Expand Up @@ -179,12 +182,16 @@ default long getWriteBufferPeriodicFlushTimerTickMs() {

/**
* Set rpc timeout for this mutator instance
* @deprecated Please set this through the {@link BufferedMutatorParams}.
*/
@Deprecated
void setRpcTimeout(int timeout);

/**
* Set operation timeout for this mutator instance
* @deprecated Please set this through the {@link BufferedMutatorParams}.
*/
@Deprecated
void setOperationTimeout(int timeout);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

/**
* {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
*/
@InterfaceAudience.Private
class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {

private final AsyncBufferedMutator mutator;

private final ExceptionListener listener;

private List<CompletableFuture<Void>> futures = new ArrayList<>();

private final ConcurrentLinkedQueue<Pair<Mutation, Throwable>> errors =
new ConcurrentLinkedQueue<>();

private final static int BUFFERED_FUTURES_THRESHOLD = 1024;

BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
ExceptionListener listener) {
this.mutator = mutator;
this.listener = listener;
}

@Override
public TableName getName() {
return mutator.getName();
}

@Override
public Configuration getConfiguration() {
return mutator.getConfiguration();
}

@Override
public void mutate(Mutation mutation) throws IOException {
mutate(Collections.singletonList(mutation));
}

private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");

// not always work, so may return an empty string
private String getHostnameAndPort(Throwable error) {
Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage());
if (matcher.matches()) {
return matcher.group(1);
} else {
return "";
}
}

private RetriesExhaustedWithDetailsException makeError() {
List<Row> rows = new ArrayList<>();
List<Throwable> throwables = new ArrayList<>();
List<String> hostnameAndPorts = new ArrayList<>();
for (;;) {
Pair<Mutation, Throwable> pair = errors.poll();
if (pair == null) {
break;
}
rows.add(pair.getFirst());
throwables.add(pair.getSecond());
hostnameAndPorts.add(getHostnameAndPort(pair.getSecond()));
}
return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
}

@Override
public void mutate(List<? extends Mutation> mutations) throws IOException {
List<CompletableFuture<Void>> toBuffered = new ArrayList<>();
List<CompletableFuture<Void>> fs = mutator.mutate(mutations);
for (int i = 0, n = fs.size(); i < n; i++) {
CompletableFuture<Void> toComplete = new CompletableFuture<>();
final int index = i;
addListener(fs.get(index), (r, e) -> {
if (e != null) {
errors.add(Pair.newPair(mutations.get(index), e));
toComplete.completeExceptionally(e);
} else {
toComplete.complete(r);
}
});
toBuffered.add(toComplete);
}
synchronized (this) {
futures.addAll(toBuffered);
if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
tryCompleteFuture();
}
if (!errors.isEmpty()) {
RetriesExhaustedWithDetailsException error = makeError();
listener.onException(error, this);
}
}
}

private void tryCompleteFuture() {
futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
}

@Override
public void close() throws IOException {
flush();
mutator.close();
}

@Override
public void flush() throws IOException {
mutator.flush();
synchronized (this) {
List<CompletableFuture<Void>> toComplete = this.futures;
this.futures = new ArrayList<>();
try {
CompletableFuture.allOf(toComplete.toArray(new CompletableFuture<?>[toComplete.size()]))
.join();
} catch (CompletionException e) {
// just ignore, we will record the actual error in the errors field
}
if (!errors.isEmpty()) {
RetriesExhaustedWithDetailsException error = makeError();
listener.onException(error, this);
}
}
}

@Override
public long getWriteBufferSize() {
return mutator.getWriteBufferSize();
}

@Override
public void setRpcTimeout(int timeout) {
// no effect
}

@Override
public void setOperationTimeout(int timeout) {
// no effect
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,21 @@ public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs
return this;
}

/**
* @deprecated We use a common timer in the whole client implementation so you can not set it any
* more.
*/
@Deprecated
public long getWriteBufferPeriodicFlushTimerTickMs() {
return writeBufferPeriodicFlushTimerTickMs;
}

/**
* Set the TimerTick how often the buffer timeout if checked.
* @deprecated We use a common timer in the whole client implementation so you can not set it any
* more.
*/
@Deprecated
public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
return this;
Expand Down Expand Up @@ -141,17 +149,23 @@ public BufferedMutatorParams pool(ExecutorService pool) {
}

/**
* @return Name of the class we will use when we construct a
* {@link BufferedMutator} instance or null if default implementation.
* @return Name of the class we will use when we construct a {@link BufferedMutator} instance or
* null if default implementation.
* @deprecated You can not set it any more as the implementation has to use too many internal
* stuffs in HBase.
*/
@Deprecated
public String getImplementationClassName() {
return this.implementationClassName;
}

/**
* Specify a BufferedMutator implementation other than the default.
* @param implementationClassName Name of the BufferedMutator implementation class
* @deprecated You can not set it any more as the implementation has to use too many internal
* stuffs in HBase.
*/
@Deprecated
public BufferedMutatorParams implementationClassName(String implementationClassName) {
this.implementationClassName = implementationClassName;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,24 @@ public Configuration getConfiguration() {

@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
return oldConn.getBufferedMutator(params);
AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
}
if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
builder.setWriteBufferSize(params.getWriteBufferSize());
}
if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
TimeUnit.MILLISECONDS);
}
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
}
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
}

@Override
Expand Down

This file was deleted.

Loading

0 comments on commit 4210ca9

Please sign in to comment.