Skip to content

Commit

Permalink
Use auto-commit flag in Hive connector
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Nov 17, 2021
1 parent d945f97 commit 536a63e
Show file tree
Hide file tree
Showing 12 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ public boolean isSingleStatementWritesOnly()
}

@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
checkConnectorSupports(READ_UNCOMMITTED, isolationLevel);
ConnectorTransactionHandle transaction = new HiveTransactionHandle();
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
transactionManager.put(transaction, metadataFactory.create());
transactionManager.put(transaction, metadataFactory.create(autoCommit));
}
return transaction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public class HiveMetadata

private final CatalogName catalogName;
private final SemiTransactionalHiveMetastore metastore;
private final boolean autoCommit;
private final HdfsEnvironment hdfsEnvironment;
private final HivePartitionManager partitionManager;
private final TypeManager typeManager;
Expand All @@ -353,6 +354,7 @@ public class HiveMetadata
public HiveMetadata(
CatalogName catalogName,
SemiTransactionalHiveMetastore metastore,
boolean autoCommit,
HdfsEnvironment hdfsEnvironment,
HivePartitionManager partitionManager,
boolean writesToNonManagedTablesEnabled,
Expand All @@ -371,6 +373,7 @@ public HiveMetadata(
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.autoCommit = autoCommit;
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand Down Expand Up @@ -1698,7 +1701,7 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
}
// This check is required to prevent using partition overwrite operation during user managed transactions
// Partition overwrite operation is nonatomic thus can't and shouldn't be used in non autocommit context.
if (!session.isAutoCommitContext()) {
if (!autoCommit) {
throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in non auto commit context doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public HiveMetadataFactory(
}

@Override
public TransactionalMetadata create()
public TransactionalMetadata create(boolean autoCommit)
{
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(
memoizeMetastore(metastore, perTransactionCacheMaximumSize)); // per-transaction cache
Expand All @@ -192,6 +192,7 @@ public TransactionalMetadata create()
return new HiveMetadata(
catalogName,
metastore,
autoCommit,
hdfsEnvironment,
partitionManager,
writesToNonManagedTablesEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@

public interface TransactionalMetadataFactory
{
TransactionalMetadata create();
TransactionalMetadata create(boolean autoCommit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void createEmptyPartition(ConnectorSession session, ConnectorAccessContro

private void doCreateEmptyPartition(ConnectorSession session, ConnectorAccessControl accessControl, String schemaName, String tableName, List<String> partitionColumnNames, List<String> partitionValues)
{
TransactionalMetadata hiveMetadata = hiveMetadataFactory.create();
TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(true);
HiveTableHandle tableHandle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schemaName, tableName));
if (tableHandle == null) {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schemaName, tableName)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon

private void doDropStats(ConnectorSession session, ConnectorAccessControl accessControl, String schema, String table, List<?> partitionValues)
{
TransactionalMetadata hiveMetadata = hiveMetadataFactory.create();
TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(true);
HiveTableHandle handle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schema, table));
if (handle == null) {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schema, table)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private void doRegisterPartition(ConnectorSession session, ConnectorAccessContro
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Partition location does not exist: " + partitionLocation);
}

SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create().getMetastore();
SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(true).getMetastore();

metastore.addPartition(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void doSyncPartitionMetadata(ConnectorSession session, ConnectorAccessCo
SyncMode syncMode = toSyncMode(mode);
HdfsContext hdfsContext = new HdfsContext(session);
HiveIdentity identity = new HiveIdentity(session);
SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create().getMetastore();
SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(true).getMetastore();
SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName);

Table table = metastore.getTable(identity, schemaName, tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void doUnregisterPartition(ConnectorSession session, ConnectorAccessCont
Partition partition = metastore.getPartition(new HiveIdentity(session), schemaName, tableName, partitionValues)
.orElseThrow(() -> new TrinoException(NOT_FOUND, format("Partition '%s' does not exist", partitionName)));

SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create().getMetastore();
SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(true).getMetastore();

metastore.dropPartition(
session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ protected ConnectorSession newSession(Map<String, Object> propertyValues)

protected Transaction newTransaction()
{
return new HiveTransaction(transactionManager, (HiveMetadata) metadataFactory.create());
return new HiveTransaction(transactionManager, (HiveMetadata) metadataFactory.create(false));
}

protected interface Transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ protected ConnectorSession newSession()

protected Transaction newTransaction()
{
return new HiveTransaction(transactionManager, (HiveMetadata) metadataFactory.create());
return new HiveTransaction(transactionManager, (HiveMetadata) metadataFactory.create(false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private static void assertCreateConnector(String metastoreUri)
.build();

Connector connector = new HiveConnectorFactory("hive").create("hive-test", config, new TestingConnectorContext());
ConnectorTransactionHandle transaction = connector.beginTransaction(READ_UNCOMMITTED, true);
ConnectorTransactionHandle transaction = connector.beginTransaction(READ_UNCOMMITTED, true, true);
assertInstanceOf(connector.getMetadata(transaction), ClassLoaderSafeConnectorMetadata.class);
assertInstanceOf(connector.getSplitManager(), ClassLoaderSafeConnectorSplitManager.class);
assertInstanceOf(connector.getPageSourceProvider(), ConnectorPageSourceProvider.class);
Expand Down

0 comments on commit 536a63e

Please sign in to comment.