From 69ebbe3777c8723d80062dc66e95eab766b94db6 Mon Sep 17 00:00:00 2001 From: zhangminglei Date: Fri, 21 Apr 2023 14:16:37 +0800 Subject: [PATCH] [#71][Tested]Create hive metastore client from the shuffled URIs. --- ...taticTokenAwareMetastoreClientFactory.java | 35 +++++++++++++++- ...taticTokenAwareMetastoreClientFactory.java | 42 +++++++++++++------ 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/StaticTokenAwareMetastoreClientFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/StaticTokenAwareMetastoreClientFactory.java index dda309091ef5..8cc2aea2dca6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/StaticTokenAwareMetastoreClientFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/StaticTokenAwareMetastoreClientFactory.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; import com.google.common.net.HostAndPort; import io.airlift.units.Duration; import io.trino.plugin.hive.metastore.thrift.FailureAwareThriftMetastoreClient.Callback; @@ -29,6 +30,9 @@ import java.util.List; import java.util.Optional; import java.util.OptionalLong; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; @@ -104,7 +108,7 @@ public ThriftMetastoreClient createMetastoreClient(Optional delegationTo .collect(toImmutableList()); TException lastException = null; - for (Backoff backoff : backoffsSorted) { + for (Backoff backoff : groupShuffle(backoffsSorted, (left, right) -> comparator.compare(left, right) == 0)) { try { return getClient(backoff.getAddress(), backoff, delegationToken); } @@ -117,6 +121,35 @@ public ThriftMetastoreClient createMetastoreClient(Optional delegationTo throw new TException("Failed connecting to Hive metastore: " + addresses, lastException); } + static List groupShuffle(List backoffs, BiFunction groupFunc) + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + Backoff[] elements = backoffs.toArray(new Backoff[0]); + int groupFirst = 0; + for (int i = 1; i < elements.length; i++) { + if (!groupFunc.apply(elements[i - 1], elements[i])) { + // shuffle in group + shuffle(elements, groupFirst, i - groupFirst, random); + // begin a new group + groupFirst = i; + } + } + if (elements.length - groupFirst > 1) { + shuffle(elements, groupFirst, elements.length - groupFirst, random); + } + return ImmutableList.copyOf(elements); + } + + static void shuffle(Backoff[] elements, int start, int length, Random random) + { + for (int i = length; i > 1; i--) { + int pos = start + random.nextInt(i); + Backoff tmp = elements[pos]; + elements[pos] = elements[start + i - 1]; + elements[start + i - 1] = tmp; + } + } + private ThriftMetastoreClient getClient(HostAndPort address, Backoff backoff, Optional delegationToken) throws TException { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestStaticTokenAwareMetastoreClientFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestStaticTokenAwareMetastoreClientFactory.java index 7e211bee0695..48c65b791cba 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestStaticTokenAwareMetastoreClientFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestStaticTokenAwareMetastoreClientFactory.java @@ -24,10 +24,13 @@ import java.net.SocketTimeoutException; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestStaticTokenAwareMetastoreClientFactory { @@ -103,16 +106,17 @@ public void testMetastoreFailedWithoutFallbackWithHiveUser() public void testFallbackHiveMetastoreOnTimeOut() throws TException { + Set clients = CLIENTS.values().stream().map(Optional::get).collect(Collectors.toSet()); + TokenAwareMetastoreClientFactory clientFactory = createMetastoreClientFactory(CONFIG_WITH_FALLBACK, CLIENTS); ThriftMetastoreClient metastoreClient1 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient1, DEFAULT_CLIENT); + assertContainsHiveClient(clients, metastoreClient1); assertGetTableException(metastoreClient1); ThriftMetastoreClient metastoreClient2 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient2, FALLBACK_CLIENT); - + assertContainsHiveClient(clients, metastoreClient2); assertGetTableException(metastoreClient2); } @@ -120,67 +124,73 @@ public void testFallbackHiveMetastoreOnTimeOut() public void testFallbackHiveMetastoreOnAllTimeOut() throws TException { + Set clients = CLIENTS.values().stream().map(Optional::get).collect(Collectors.toSet()); TokenAwareMetastoreClientFactory clientFactory = createMetastoreClientFactory(CONFIG_WITH_FALLBACK, CLIENTS); - ThriftMetastoreClient metastoreClient1 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient1, DEFAULT_CLIENT); + assertContainsHiveClient(clients, metastoreClient1); for (int i = 0; i < 20; ++i) { assertGetTableException(metastoreClient1); } + clients.remove(((FailureAwareThriftMetastoreClient) metastoreClient1).getDelegate()); + ThriftMetastoreClient metastoreClient2 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient2, FALLBACK_CLIENT); + assertContainsHiveClient(clients, metastoreClient2); assertGetTableException(metastoreClient2); // Still get FALLBACK_CLIENT because DEFAULT_CLIENT failed more times before and therefore longer backoff ThriftMetastoreClient metastoreClient3 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient3, FALLBACK_CLIENT); + assertContainsHiveClient(clients, metastoreClient3); } @Test public void testStickToFallbackAfterBackoff() throws TException { + Set clients = CLIENTS.values().stream().map(Optional::get).collect(Collectors.toSet()); + TestingTicker ticker = new TestingTicker(); TokenAwareMetastoreClientFactory clientFactory = createMetastoreClientFactory(CONFIG_WITH_FALLBACK, CLIENTS, ticker); ticker.increment(10, NANOSECONDS); ThriftMetastoreClient metastoreClient1 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient1, DEFAULT_CLIENT); + assertContainsHiveClient(clients, metastoreClient1); assertGetTableException(metastoreClient1); ticker.increment(10, NANOSECONDS); ThriftMetastoreClient metastoreClient2 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient2, FALLBACK_CLIENT); + assertContainsHiveClient(clients, metastoreClient2); // even after backoff for DEFAULT_CLIENT passes we should stick to client which we saw working correctly most recently ticker.increment(StaticTokenAwareMetastoreClientFactory.Backoff.MAX_BACKOFF, NANOSECONDS); ThriftMetastoreClient metastoreClient3 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient3, FALLBACK_CLIENT); + assertContainsHiveClient(clients, metastoreClient3); } @Test public void testReturnsToDefaultClientAfterErrorOnFallback() throws TException { + Set clients = CLIENTS.values().stream().map(Optional::get).collect(Collectors.toSet()); + TestingTicker ticker = new TestingTicker(); TokenAwareMetastoreClientFactory clientFactory = createMetastoreClientFactory(CONFIG_WITH_FALLBACK, CLIENTS, ticker); ticker.increment(10, NANOSECONDS); ThriftMetastoreClient metastoreClient1 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient1, DEFAULT_CLIENT); + assertContainsHiveClient(clients, metastoreClient1); assertGetTableException(metastoreClient1); ticker.increment(10, NANOSECONDS); ThriftMetastoreClient metastoreClient2 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient2, FALLBACK_CLIENT); + assertContainsHiveClient(clients, metastoreClient2); assertGetTableException(metastoreClient2); ticker.increment(10, NANOSECONDS); ThriftMetastoreClient metastoreClient3 = clientFactory.createMetastoreClient(Optional.empty()); - assertEqualHiveClient(metastoreClient3, DEFAULT_CLIENT); + assertContainsHiveClient(clients, metastoreClient3); } private static void assertGetTableException(ThriftMetastoreClient client) @@ -230,4 +240,10 @@ private static void assertEqualHiveClient(ThriftMetastoreClient actual, ThriftMe } assertEquals(actual, expected); } + + private void assertContainsHiveClient(Set clients, ThriftMetastoreClient expected) + { + ThriftMetastoreClient actual = ((FailureAwareThriftMetastoreClient) expected).getDelegate(); + assertTrue(clients.contains(actual)); + } }