diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 7805f77e30e1..d62994723e2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -62,7 +62,10 @@
public interface BufferedMutator extends Closeable {
/**
* Key to use setting non-default BufferedMutator implementation in Configuration.
+ *
+ * @deprecated For internal test use only, will be removed in the future, do not use it any more.
*/
+ @Deprecated
String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
/**
@@ -179,12 +182,16 @@ default long getWriteBufferPeriodicFlushTimerTickMs() {
/**
* Set rpc timeout for this mutator instance
+ * @deprecated Please set this through the {@link BufferedMutatorParams}.
*/
+ @Deprecated
void setRpcTimeout(int timeout);
/**
* Set operation timeout for this mutator instance
+ * @deprecated Please set this through the {@link BufferedMutatorParams}.
*/
+ @Deprecated
void setOperationTimeout(int timeout);
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
new file mode 100644
index 000000000000..a7d4595c4756
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}.
+ */
+@InterfaceAudience.Private
+class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator {
+
+ private final AsyncBufferedMutator mutator;
+
+ private final ExceptionListener listener;
+
+ private List> futures = new ArrayList<>();
+
+ private final ConcurrentLinkedQueue> errors =
+ new ConcurrentLinkedQueue<>();
+
+ private final static int BUFFERED_FUTURES_THRESHOLD = 1024;
+
+ BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator,
+ ExceptionListener listener) {
+ this.mutator = mutator;
+ this.listener = listener;
+ }
+
+ @Override
+ public TableName getName() {
+ return mutator.getName();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return mutator.getConfiguration();
+ }
+
+ @Override
+ public void mutate(Mutation mutation) throws IOException {
+ mutate(Collections.singletonList(mutation));
+ }
+
+ private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed");
+
+ // not always work, so may return an empty string
+ private String getHostnameAndPort(Throwable error) {
+ Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage());
+ if (matcher.matches()) {
+ return matcher.group(1);
+ } else {
+ return "";
+ }
+ }
+
+ private RetriesExhaustedWithDetailsException makeError() {
+ List rows = new ArrayList<>();
+ List throwables = new ArrayList<>();
+ List hostnameAndPorts = new ArrayList<>();
+ for (;;) {
+ Pair pair = errors.poll();
+ if (pair == null) {
+ break;
+ }
+ rows.add(pair.getFirst());
+ throwables.add(pair.getSecond());
+ hostnameAndPorts.add(getHostnameAndPort(pair.getSecond()));
+ }
+ return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts);
+ }
+
+ @Override
+ public void mutate(List extends Mutation> mutations) throws IOException {
+ List> toBuffered = new ArrayList<>();
+ List> fs = mutator.mutate(mutations);
+ for (int i = 0, n = fs.size(); i < n; i++) {
+ CompletableFuture toComplete = new CompletableFuture<>();
+ final int index = i;
+ addListener(fs.get(index), (r, e) -> {
+ if (e != null) {
+ errors.add(Pair.newPair(mutations.get(index), e));
+ toComplete.completeExceptionally(e);
+ } else {
+ toComplete.complete(r);
+ }
+ });
+ toBuffered.add(toComplete);
+ }
+ synchronized (this) {
+ futures.addAll(toBuffered);
+ if (futures.size() > BUFFERED_FUTURES_THRESHOLD) {
+ tryCompleteFuture();
+ }
+ if (!errors.isEmpty()) {
+ RetriesExhaustedWithDetailsException error = makeError();
+ listener.onException(error, this);
+ }
+ }
+ }
+
+ private void tryCompleteFuture() {
+ futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ mutator.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ mutator.flush();
+ synchronized (this) {
+ List> toComplete = this.futures;
+ this.futures = new ArrayList<>();
+ try {
+ CompletableFuture.allOf(toComplete.toArray(new CompletableFuture>[toComplete.size()]))
+ .join();
+ } catch (CompletionException e) {
+ // just ignore, we will record the actual error in the errors field
+ }
+ if (!errors.isEmpty()) {
+ RetriesExhaustedWithDetailsException error = makeError();
+ listener.onException(error, this);
+ }
+ }
+ }
+
+ @Override
+ public long getWriteBufferSize() {
+ return mutator.getWriteBufferSize();
+ }
+
+ @Override
+ public void setRpcTimeout(int timeout) {
+ // no effect
+ }
+
+ @Override
+ public void setOperationTimeout(int timeout) {
+ // no effect
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 3f6c56570e73..02bdf0ecd07b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -101,13 +101,21 @@ public BufferedMutatorParams setWriteBufferPeriodicFlushTimeoutMs(long timeoutMs
return this;
}
+ /**
+ * @deprecated We use a common timer in the whole client implementation so you can not set it any
+ * more.
+ */
+ @Deprecated
public long getWriteBufferPeriodicFlushTimerTickMs() {
return writeBufferPeriodicFlushTimerTickMs;
}
/**
* Set the TimerTick how often the buffer timeout if checked.
+ * @deprecated We use a common timer in the whole client implementation so you can not set it any
+ * more.
*/
+ @Deprecated
public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) {
this.writeBufferPeriodicFlushTimerTickMs = timerTickMs;
return this;
@@ -141,9 +149,12 @@ public BufferedMutatorParams pool(ExecutorService pool) {
}
/**
- * @return Name of the class we will use when we construct a
- * {@link BufferedMutator} instance or null if default implementation.
+ * @return Name of the class we will use when we construct a {@link BufferedMutator} instance or
+ * null if default implementation.
+ * @deprecated You can not set it any more as the implementation has to use too many internal
+ * stuffs in HBase.
*/
+ @Deprecated
public String getImplementationClassName() {
return this.implementationClassName;
}
@@ -151,7 +162,10 @@ public String getImplementationClassName() {
/**
* Specify a BufferedMutator implementation other than the default.
* @param implementationClassName Name of the BufferedMutator implementation class
+ * @deprecated You can not set it any more as the implementation has to use too many internal
+ * stuffs in HBase.
*/
+ @Deprecated
public BufferedMutatorParams implementationClassName(String implementationClassName) {
this.implementationClassName = implementationClassName;
return this;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index dfe7d8fb085b..8ec7ab83504b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -87,7 +87,24 @@ public Configuration getConfiguration() {
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
- return oldConn.getBufferedMutator(params);
+ AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
+ if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) {
+ builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) {
+ builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS);
+ }
+ if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) {
+ builder.setWriteBufferSize(params.getWriteBufferSize());
+ }
+ if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) {
+ builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
+ builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
+ }
+ return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
}
@Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
deleted file mode 100644
index 96bb8461284e..000000000000
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-@Category({ SmallTests.class, ClientTests.class })
-public class TestBufferedMutator {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestBufferedMutator.class);
-
- @Rule
- public TestName name = new TestName();
-
- /**
- * My BufferedMutator. Just to prove that I can insert a BM other than default.
- */
- public static class MyBufferedMutator extends BufferedMutatorImpl {
- MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory,
- RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
- super(conn, rpcCallerFactory, rpcFactory, params);
- }
- }
-
- @Test
- public void testAlternateBufferedMutatorImpl() throws IOException {
- BufferedMutatorParams params =
- new BufferedMutatorParams(TableName.valueOf(name.getMethodName()));
- Configuration conf = HBaseConfiguration.create();
- conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName());
- try (ConnectionImplementation connection = ConnectionFactory.createConnectionImpl(conf, null,
- UserProvider.instantiate(conf).getCurrent())) {
- BufferedMutator bm = connection.getBufferedMutator(params);
- // Assert we get default BM if nothing specified.
- assertTrue(bm instanceof BufferedMutatorImpl);
- // Now try and set my own BM implementation.
- params.implementationClassName(MyBufferedMutator.class.getName());
- bm = connection.getBufferedMutator(params);
- assertTrue(bm instanceof MyBufferedMutator);
- }
- // Now try creating a Connection after setting an alterate BufferedMutator into
- // the configuration and confirm we get what was expected.
- conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
- try (Connection connection = ConnectionFactory.createConnectionImpl(conf, null,
- UserProvider.instantiate(conf).getCurrent())) {
- BufferedMutator bm = connection.getBufferedMutator(params);
- assertTrue(bm instanceof MyBufferedMutator);
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
new file mode 100644
index 000000000000..23e69ee82b1f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestBufferedMutator {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBufferedMutator.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("test");
+
+ private static byte[] CF = Bytes.toBytes("cf");
+
+ private static byte[] CQ = Bytes.toBytes("cq");
+
+ private static int COUNT = 1024;
+
+ private static byte[] VALUE = new byte[1024];
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.createTable(TABLE_NAME, CF);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws Exception {
+ try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) {
+ mutator.mutate(IntStream.range(0, COUNT / 2)
+ .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+ .collect(Collectors.toList()));
+ mutator.flush();
+ mutator.mutate(IntStream.range(COUNT / 2, COUNT)
+ .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+ .collect(Collectors.toList()));
+ mutator.close();
+ verifyData();
+ }
+ }
+
+ private void verifyData() throws IOException {
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ for (int i = 0; i < COUNT; i++) {
+ Result r = table.get(new Get(Bytes.toBytes(i)));
+ assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ));
+ }
+ }
+ }
+}