From 8b015fd682fb89f93bcef9608328768d1f4ed95a Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Sat, 5 Feb 2022 15:52:26 -0800 Subject: [PATCH] Merge session unprocessed catalog properties into normal properties --- .../src/main/java/io/trino/Session.java | 59 ++++++------------- .../java/io/trino/SessionRepresentation.java | 15 ----- .../java/io/trino/event/QueryMonitor.java | 7 --- .../metadata/SessionPropertyManager.java | 2 +- .../io/trino/testing/LocalQueryRunner.java | 3 +- .../src/test/java/io/trino/TestSession.java | 30 +--------- .../execution/TestQueryStateMachine.java | 2 +- .../server/TestSessionPropertyDefaults.java | 4 +- .../plugin/hive/BaseHiveConnectorTest.java | 5 +- .../testing/AbstractTestingTrinoClient.java | 7 +-- .../trino/testing/TestingSessionContext.java | 2 +- .../tests/AbstractTestEngineOnlyQueries.java | 1 - 12 files changed, 30 insertions(+), 107 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/Session.java b/core/trino-main/src/main/java/io/trino/Session.java index 08f01151bf54..f1ecab9ff677 100644 --- a/core/trino-main/src/main/java/io/trino/Session.java +++ b/core/trino-main/src/main/java/io/trino/Session.java @@ -77,9 +77,7 @@ public final class Session private final Instant start; private final Map systemProperties; // TODO use Table - private final Map> connectorProperties; - // TODO use Table - private final Map> unprocessedCatalogProperties; + private final Map> catalogProperties; private final SessionPropertyManager sessionPropertyManager; private final Map preparedStatements; private final ProtocolHeaders protocolHeaders; @@ -104,8 +102,7 @@ public Session( ResourceEstimates resourceEstimates, Instant start, Map systemProperties, - Map> connectorProperties, - Map> unprocessedCatalogProperties, + Map> catalogProperties, SessionPropertyManager sessionPropertyManager, Map preparedStatements, ProtocolHeaders protocolHeaders) @@ -133,19 +130,12 @@ public Session( this.preparedStatements = requireNonNull(preparedStatements, "preparedStatements is null"); this.protocolHeaders = requireNonNull(protocolHeaders, "protocolHeaders is null"); + requireNonNull(catalogProperties, "catalogProperties is null"); ImmutableMap.Builder> catalogPropertiesBuilder = ImmutableMap.builder(); - connectorProperties.entrySet().stream() + catalogProperties.entrySet().stream() .map(entry -> Maps.immutableEntry(entry.getKey(), ImmutableMap.copyOf(entry.getValue()))) .forEach(catalogPropertiesBuilder::put); - this.connectorProperties = catalogPropertiesBuilder.buildOrThrow(); - - ImmutableMap.Builder> unprocessedCatalogPropertiesBuilder = ImmutableMap.builder(); - unprocessedCatalogProperties.entrySet().stream() - .map(entry -> Maps.immutableEntry(entry.getKey(), ImmutableMap.copyOf(entry.getValue()))) - .forEach(unprocessedCatalogPropertiesBuilder::put); - this.unprocessedCatalogProperties = unprocessedCatalogPropertiesBuilder.buildOrThrow(); - - checkArgument(transactionId.isEmpty() || unprocessedCatalogProperties.isEmpty(), "Catalog session properties cannot be set if there is an open transaction"); + this.catalogProperties = catalogPropertiesBuilder.buildOrThrow(); checkArgument(catalog.isPresent() || schema.isEmpty(), "schema is set but catalog is not"); } @@ -256,19 +246,14 @@ public T getSystemProperty(String name, Class type) return sessionPropertyManager.decodeSystemPropertyValue(name, systemProperties.get(name), type); } - public Map> getConnectorProperties() + public Map> getCatalogProperties() { - return connectorProperties; + return catalogProperties; } - public Map getConnectorProperties(String catalogName) + public Map getCatalogProperties(String catalogName) { - return connectorProperties.getOrDefault(catalogName, ImmutableMap.of()); - } - - public Map> getUnprocessedCatalogProperties() - { - return unprocessedCatalogProperties; + return catalogProperties.getOrDefault(catalogName, ImmutableMap.of()); } public Map getSystemProperties() @@ -309,7 +294,7 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage // Now that there is a transaction, the catalog name can be resolved to a connector, and the catalog properties can be validated ImmutableMap.Builder> connectorProperties = ImmutableMap.builder(); - for (Entry> catalogEntry : unprocessedCatalogProperties.entrySet()) { + for (Entry> catalogEntry : this.catalogProperties.entrySet()) { String catalogName = catalogEntry.getKey(); Map catalogProperties = catalogEntry.getValue(); if (catalogProperties.isEmpty()) { @@ -359,7 +344,6 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage start, systemProperties, connectorProperties.buildOrThrow(), - ImmutableMap.of(), sessionPropertyManager, preparedStatements, protocolHeaders); @@ -371,22 +355,18 @@ public Session withDefaultProperties(Map systemPropertyDefaults, requireNonNull(catalogPropertyDefaults, "catalogPropertyDefaults is null"); checkState(transactionId.isEmpty(), "property defaults can not be added to a transaction already in progress"); - checkState(connectorProperties.isEmpty(), "catalog properties have already been processed"); // NOTE: properties should not be validated here and instead will be validated in beginTransactionId Map systemProperties = new HashMap<>(); systemProperties.putAll(systemPropertyDefaults); systemProperties.putAll(this.systemProperties); - Map> unprocessedCatalogProperties = catalogPropertyDefaults.entrySet().stream() + Map> catalogProperties = catalogPropertyDefaults.entrySet().stream() .map(entry -> Maps.immutableEntry(entry.getKey(), new HashMap<>(entry.getValue()))) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - for (Entry> catalogProperties : this.unprocessedCatalogProperties.entrySet()) { - String catalog = catalogProperties.getKey(); - for (Entry entry : catalogProperties.getValue().entrySet()) { - unprocessedCatalogProperties.computeIfAbsent(catalog, id -> new HashMap<>()) - .put(entry.getKey(), entry.getValue()); - } + for (Entry> catalogEntry : this.catalogProperties.entrySet()) { + catalogProperties.computeIfAbsent(catalogEntry.getKey(), id -> new HashMap<>()) + .putAll(catalogEntry.getValue()); } return new Session( @@ -409,8 +389,7 @@ public Session withDefaultProperties(Map systemPropertyDefaults, resourceEstimates, start, systemProperties, - ImmutableMap.of(), - unprocessedCatalogProperties, + catalogProperties, sessionPropertyManager, preparedStatements, protocolHeaders); @@ -433,7 +412,7 @@ public ConnectorSession toConnectorSession(CatalogName catalogName) return new FullConnectorSession( this, identity.toConnectorIdentity(catalogName.getCatalogName()), - connectorProperties.getOrDefault(catalogName.getCatalogName(), ImmutableMap.of()), + catalogProperties.getOrDefault(catalogName.getCatalogName(), ImmutableMap.of()), catalogName, catalogName.getCatalogName(), sessionPropertyManager); @@ -464,8 +443,7 @@ public SessionRepresentation toSessionRepresentation() resourceEstimates, start, systemProperties, - connectorProperties, - unprocessedCatalogProperties, + catalogProperties, identity.getCatalogRoles(), preparedStatements, protocolHeaders.getProtocolName()); @@ -590,7 +568,7 @@ private SessionBuilder(Session session) this.clientTags = ImmutableSet.copyOf(session.clientTags); this.start = session.start; this.systemProperties.putAll(session.systemProperties); - session.unprocessedCatalogProperties + session.catalogProperties .forEach((catalog, properties) -> catalogSessionProperties.put(catalog, new HashMap<>(properties))); this.preparedStatements.putAll(session.preparedStatements); this.protocolHeaders = session.protocolHeaders; @@ -819,7 +797,6 @@ public Session build() Optional.ofNullable(resourceEstimates).orElse(new ResourceEstimateBuilder().build()), start, systemProperties, - ImmutableMap.of(), catalogSessionProperties, sessionPropertyManager, preparedStatements, diff --git a/core/trino-main/src/main/java/io/trino/SessionRepresentation.java b/core/trino-main/src/main/java/io/trino/SessionRepresentation.java index 5a8705b57585..9f46bb7e38ed 100644 --- a/core/trino-main/src/main/java/io/trino/SessionRepresentation.java +++ b/core/trino-main/src/main/java/io/trino/SessionRepresentation.java @@ -63,7 +63,6 @@ public final class SessionRepresentation private final ResourceEstimates resourceEstimates; private final Map systemProperties; private final Map> catalogProperties; - private final Map> unprocessedCatalogProperties; private final Map catalogRoles; private final Map preparedStatements; private final String protocolName; @@ -93,7 +92,6 @@ public SessionRepresentation( @JsonProperty("start") Instant start, @JsonProperty("systemProperties") Map systemProperties, @JsonProperty("catalogProperties") Map> catalogProperties, - @JsonProperty("unprocessedCatalogProperties") Map> unprocessedCatalogProperties, @JsonProperty("catalogRoles") Map catalogRoles, @JsonProperty("preparedStatements") Map preparedStatements, @JsonProperty("protocolName") String protocolName) @@ -129,12 +127,6 @@ public SessionRepresentation( catalogPropertiesBuilder.put(entry.getKey(), ImmutableMap.copyOf(entry.getValue())); } this.catalogProperties = catalogPropertiesBuilder.buildOrThrow(); - - ImmutableMap.Builder> unprocessedCatalogPropertiesBuilder = ImmutableMap.builder(); - for (Entry> entry : unprocessedCatalogProperties.entrySet()) { - unprocessedCatalogPropertiesBuilder.put(entry.getKey(), ImmutableMap.copyOf(entry.getValue())); - } - this.unprocessedCatalogProperties = unprocessedCatalogPropertiesBuilder.buildOrThrow(); } @JsonProperty @@ -275,12 +267,6 @@ public Map> getCatalogProperties() return catalogProperties; } - @JsonProperty - public Map> getUnprocessedCatalogProperties() - { - return unprocessedCatalogProperties; - } - @JsonProperty public Map getCatalogRoles() { @@ -349,7 +335,6 @@ public Session toSession(SessionPropertyManager sessionPropertyManager, Map mergeSessionAndCatalogProperties(SessionRepre { Map mergedProperties = new LinkedHashMap<>(session.getSystemProperties()); - // Either processed or unprocessed catalog properties, but not both. Instead of trying to enforces this while - // firing events, allow both to be set and if there is a duplicate favor the processed properties. - for (Map.Entry> catalogEntry : session.getUnprocessedCatalogProperties().entrySet()) { - for (Map.Entry entry : catalogEntry.getValue().entrySet()) { - mergedProperties.put(catalogEntry.getKey() + "." + entry.getKey(), entry.getValue()); - } - } for (Map.Entry> catalogEntry : session.getCatalogProperties().entrySet()) { for (Map.Entry entry : catalogEntry.getValue().entrySet()) { mergedProperties.put(catalogEntry.getKey() + "." + entry.getKey(), entry.getValue()); diff --git a/core/trino-main/src/main/java/io/trino/metadata/SessionPropertyManager.java b/core/trino-main/src/main/java/io/trino/metadata/SessionPropertyManager.java index 5afb42b91425..63811e3c6a10 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/SessionPropertyManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/SessionPropertyManager.java @@ -164,7 +164,7 @@ public List getAllSessionProperties(Session session, Map entry : new TreeMap<>(catalogs).entrySet()) { String catalog = entry.getKey(); CatalogName catalogName = entry.getValue(); - Map connectorProperties = session.getConnectorProperties(catalog); + Map connectorProperties = session.getCatalogProperties(catalog); for (PropertyMetadata property : new TreeMap<>(connectorSessionProperties.get(catalogName)).values()) { String defaultValue = firstNonNull(property.getDefaultValue(), "").toString(); diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index 9c3d5faa16f7..5e04d00fe6c5 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -475,8 +475,7 @@ private LocalQueryRunner( defaultSession.getResourceEstimates(), defaultSession.getStart(), defaultSession.getSystemProperties(), - defaultSession.getConnectorProperties(), - defaultSession.getUnprocessedCatalogProperties(), + defaultSession.getCatalogProperties(), sessionPropertyManager, defaultSession.getPreparedStatements(), defaultSession.getProtocolHeaders()); diff --git a/core/trino-main/src/test/java/io/trino/TestSession.java b/core/trino-main/src/test/java/io/trino/TestSession.java index 4b9f65fdab1b..620a8730439f 100644 --- a/core/trino-main/src/test/java/io/trino/TestSession.java +++ b/core/trino-main/src/test/java/io/trino/TestSession.java @@ -29,16 +29,8 @@ public void testSetCatalogProperty() .setCatalogSessionProperty("some_catalog", "first_property", "some_value") .build(); - assertThat(session.getUnprocessedCatalogProperties()) + assertThat(session.getCatalogProperties()) .isEqualTo(Map.of("some_catalog", Map.of("first_property", "some_value"))); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties()) - .isEqualTo(Map.of()); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties("some_catalog")) - .isEqualTo(Map.of()); } @Test @@ -50,16 +42,8 @@ public void testBuildWithCatalogProperty() session = Session.builder(session) .build(); - assertThat(session.getUnprocessedCatalogProperties()) + assertThat(session.getCatalogProperties()) .isEqualTo(Map.of("some_catalog", Map.of("first_property", "some_value"))); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties()) - .isEqualTo(Map.of()); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties("some_catalog")) - .isEqualTo(Map.of()); } @Test @@ -72,17 +56,9 @@ public void testAddSecondCatalogProperty() .setCatalogSessionProperty("some_catalog", "second_property", "another_value") .build(); - assertThat(session.getUnprocessedCatalogProperties()) + assertThat(session.getCatalogProperties()) .isEqualTo(Map.of("some_catalog", Map.of( "first_property", "some_value", "second_property", "another_value"))); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties()) - .isEqualTo(Map.of()); - - // empty, will be populated at transaction start - assertThat(session.getConnectorProperties("some_catalog")) - .isEqualTo(Map.of()); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java index 801b13b6298b..07d4659de248 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java @@ -555,6 +555,6 @@ private static void assertEqualSessionsWithoutTransactionId(Session actual, Sess assertEquals(actual.getUserAgent(), expected.getUserAgent()); assertEquals(actual.getStart(), expected.getStart()); assertEquals(actual.getSystemProperties(), expected.getSystemProperties()); - assertEquals(actual.getConnectorProperties(), expected.getConnectorProperties()); + assertEquals(actual.getCatalogProperties(), expected.getCatalogProperties()); } } diff --git a/core/trino-main/src/test/java/io/trino/server/TestSessionPropertyDefaults.java b/core/trino-main/src/test/java/io/trino/server/TestSessionPropertyDefaults.java index 157e3ababc55..9854cc741a2e 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestSessionPropertyDefaults.java +++ b/core/trino-main/src/test/java/io/trino/server/TestSessionPropertyDefaults.java @@ -80,7 +80,7 @@ public void testApplyDefaultProperties() .put(HASH_PARTITION_COUNT, "43") .buildOrThrow()); assertEquals( - session.getUnprocessedCatalogProperties(), + session.getCatalogProperties(), ImmutableMap.of( "testCatalog", ImmutableMap.builder() @@ -96,7 +96,7 @@ public void testApplyDefaultProperties() .put(QUERY_MAX_TOTAL_MEMORY, "2GB") // Default value is used .buildOrThrow()); assertEquals( - session.getUnprocessedCatalogProperties(), + session.getCatalogProperties(), ImmutableMap.of( "testCatalog", ImmutableMap.builder() diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 3f865323b161..1bfb5ee39071 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -8429,15 +8429,14 @@ private static void testWithStorageFormat(TestingHiveStorageFormat storageFormat test.accept(session, storageFormat.getFormat()); } catch (Exception | AssertionError e) { - fail(format("Failure for format %s with properties %s / %s", storageFormat.getFormat(), session.getConnectorProperties(), session.getUnprocessedCatalogProperties()), e); + fail(format("Failure for format %s with properties %s", storageFormat.getFormat(), session.getCatalogProperties()), e); } } private boolean isNativeParquetWriter(Session session, HiveStorageFormat storageFormat) { return storageFormat == HiveStorageFormat.PARQUET && - ("true".equals(session.getConnectorProperties("hive").get("experimental_parquet_optimized_writer_enabled")) || - "true".equals(session.getUnprocessedCatalogProperties().getOrDefault("hive", Map.of()).get("experimental_parquet_optimized_writer_enabled"))); + "true".equals(session.getCatalogProperties("hive").get("experimental_parquet_optimized_writer_enabled")); } private List getAllTestingHiveStorageFormat() diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java index b158ffe037a9..a1765283245b 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestingTrinoClient.java @@ -133,17 +133,12 @@ private static ClientSession toClientSession(Session session, URI server, Durati { ImmutableMap.Builder properties = ImmutableMap.builder(); properties.putAll(session.getSystemProperties()); - for (Entry> catalogAndConnectorProperties : session.getConnectorProperties().entrySet()) { + for (Entry> catalogAndConnectorProperties : session.getCatalogProperties().entrySet()) { for (Entry connectorProperties : catalogAndConnectorProperties.getValue().entrySet()) { String catalogName = catalogAndConnectorProperties.getKey(); properties.put(catalogName + "." + connectorProperties.getKey(), connectorProperties.getValue()); } } - for (Entry> connectorProperties : session.getUnprocessedCatalogProperties().entrySet()) { - for (Entry entry : connectorProperties.getValue().entrySet()) { - properties.put(connectorProperties.getKey() + "." + entry.getKey(), entry.getValue()); - } - } ImmutableMap.Builder resourceEstimates = ImmutableMap.builder(); ResourceEstimates estimates = session.getResourceEstimates(); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/TestingSessionContext.java b/testing/trino-testing/src/main/java/io/trino/testing/TestingSessionContext.java index 74de9d66b93a..71df76db4051 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/TestingSessionContext.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/TestingSessionContext.java @@ -61,7 +61,7 @@ else if (enabledRoles.size() == 1) { session.getClientCapabilities(), session.getResourceEstimates(), session.getSystemProperties(), - session.getConnectorProperties(), + session.getCatalogProperties(), session.getPreparedStatements(), session.getTransactionId(), session.isClientTransactionSupport(), diff --git a/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java index 3578ba09bb46..2c9e29019d54 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/AbstractTestEngineOnlyQueries.java @@ -5286,7 +5286,6 @@ public void testShowSession() .put("test_string", "foo string") .put("test_long", "424242") .buildOrThrow(), - ImmutableMap.of(), ImmutableMap.of(TESTING_CATALOG, ImmutableMap.builder() .put("connector_string", "bar string") .put("connector_long", "11")