Prevent ThriftMetastore lock release failure from corrupting Iceberg table

Before the change, failure to release the metastore lock resulted in the newly
committed metadata file being deleted, rendering the table unreadable.
osscm authored and findepi committed Oct 10, 2022
1 parent 96a009e commit 2c68ff2
Showing 3 changed files with 194 additions and 1 deletion.
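
The failure mode, in plain Java: the metastore commit inside the try block succeeds, but the unguarded releaseTableLock call in the finally block throws, so the caller sees the whole commit as failed and deletes the freshly written metadata file that the metastore already references. The sketch below only illustrates that mechanism; it is not Trino or Iceberg code. The method names mirror the ones in the patch, but the class and everything else is a simplified stand-in.

// LockReleaseFailureSketch.java: a simplified, self-contained illustration; not Trino code.
import java.util.concurrent.atomic.AtomicBoolean;

public class LockReleaseFailureSketch
{
    public static void main(String[] args)
    {
        // Pretend the commit wrote a new metadata file and the metastore now points at it.
        AtomicBoolean newMetadataFileExists = new AtomicBoolean(true);

        try {
            commitToExistingTable();
        }
        catch (RuntimeException e) {
            // The caller (for Trino, Iceberg's commit machinery) only sees an exception, so it
            // assumes the commit failed and deletes the "uncommitted" metadata file, even though
            // the metastore update actually succeeded.
            newMetadataFileExists.set(false);
        }

        // The metastore still references the deleted file, so the table is now unreadable.
        System.out.println("new metadata file exists: " + newMetadataFileExists.get());
    }

    private static void commitToExistingTable()
    {
        long lockId = 100; // pretend a metastore lock was acquired
        try {
            // The metastore update succeeds here: the table now points at the new metadata file.
        }
        finally {
            // Before the fix, this call was not wrapped in try/catch, so its exception propagated
            // out of commitToExistingTable even though the commit had already succeeded.
            releaseTableLock(lockId);
        }
    }

    private static void releaseTableLock(long lockId)
    {
        throw new RuntimeException("Release table lock has failed!");
    }
}

With the fix, the lock-release failure is logged instead of rethrown, so Iceberg never sees a spurious commit failure and the new metadata file stays in place.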
HiveMetastoreTableOperations.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.iceberg.catalog.hms;
 
+import io.airlift.log.Logger;
 import io.trino.plugin.hive.metastore.AcidTransactionOwner;
 import io.trino.plugin.hive.metastore.MetastoreUtil;
 import io.trino.plugin.hive.metastore.PrincipalPrivileges;
@@ -41,6 +42,7 @@
 public class HiveMetastoreTableOperations
         extends AbstractMetastoreTableOperations
 {
+    private static final Logger log = Logger.get(HiveMetastoreTableOperations.class);
     private final ThriftMetastore thriftMetastore;
 
     public HiveMetastoreTableOperations(
@@ -97,7 +99,16 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
             }
         }
         finally {
-            thriftMetastore.releaseTableLock(lockId);
+            try {
+                thriftMetastore.releaseTableLock(lockId);
+            }
+            catch (RuntimeException e) {
+                // Releasing the lock failed after the commit already succeeded. Do not rethrow: the
+                // exception would make the underlying Iceberg API clean up the newly committed metadata
+                // file, leaving the table unusable. If configured and supported, the metastore releases the
+                // stale lock once its heartbeat times out; otherwise delete it manually from the metastore backend storage.
+                log.error(e, "Failed to release lock %s when committing to table %s", lockId, tableName);
+            }
         }
 
         shouldRefresh = true;
TestIcebergHiveMetastoreTableOperationsReleaseLockFailure.java
@@ -0,0 +1,132 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.iceberg.catalog.hms;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.plugin.hive.metastore.AcidTransactionOwner;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.InMemoryThriftMetastore;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastore;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreFactory;
import io.trino.plugin.iceberg.TestingIcebergConnectorFactory;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.security.PrincipalType;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.LocalQueryRunner;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Files;
import java.util.Optional;

import static com.google.inject.util.Modules.EMPTY_MODULE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;

public class TestIcebergHiveMetastoreTableOperationsReleaseLockFailure
        extends AbstractTestQueryFramework
{
    private static final String ICEBERG_CATALOG = "iceberg";
    private static final String SCHEMA_NAME = "test_schema";
    private File baseDir;

    @Override
    protected LocalQueryRunner createQueryRunner() throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog(ICEBERG_CATALOG)
                .setSchema(SCHEMA_NAME)
                .build();

        baseDir = Files.createTempDirectory(null).toFile();
        baseDir.deleteOnExit();

        LocalQueryRunner queryRunner = LocalQueryRunner.create(session);

        ThriftMetastore thriftMetastore = createMetastoreWithReleaseLockFailure();
        HiveMetastore hiveMetastore = new BridgingHiveMetastore(thriftMetastore);
        TestingIcebergHiveMetastoreCatalogModule testModule = new TestingIcebergHiveMetastoreCatalogModule(hiveMetastore, buildThriftMetastoreFactory(thriftMetastore));

        queryRunner.createCatalog(
                ICEBERG_CATALOG,
                new TestingIcebergConnectorFactory(Optional.of(testModule), Optional.empty(), EMPTY_MODULE),
                ImmutableMap.of());

        Database database = Database.builder()
                .setDatabaseName(SCHEMA_NAME)
                .setOwnerName(Optional.of("public"))
                .setOwnerType(Optional.of(PrincipalType.ROLE))
                .build();
        hiveMetastore.createDatabase(database);

        return queryRunner;
    }

    @Test
    public void testReleaseLockFailureDoesNotCorruptTheTable()
    {
        String tableName = "test_release_lock_failure";
        query(format("CREATE TABLE %s (a_varchar) AS VALUES ('Trino')", tableName));
        query(format("INSERT INTO %s VALUES 'rocks'", tableName));
        assertQuery("SELECT * FROM " + tableName, "VALUES 'Trino', 'rocks'");
    }

    private InMemoryThriftMetastore createMetastoreWithReleaseLockFailure()
    {
        return new InMemoryThriftMetastore(new File(baseDir + "/metastore"), new ThriftMetastoreConfig()) {
            @Override
            public long acquireTableExclusiveLock(AcidTransactionOwner transactionOwner, String queryId, String dbName, String tableName)
            {
                // returning dummy lock
                return 100;
            }

            @Override
            public void releaseTableLock(long lockId)
            {
                throw new RuntimeException("Release table lock has failed!");
            }

            @Override
            public synchronized void createTable(org.apache.hadoop.hive.metastore.api.Table table)
            {
                // InMemoryThriftMetastore throws an exception if the table has any privileges set
                table.setPrivileges(null);
                super.createTable(table);
            }
        };
    }

    private static ThriftMetastoreFactory buildThriftMetastoreFactory(ThriftMetastore thriftMetastore)
    {
        return new ThriftMetastoreFactory()
        {
            @Override
            public boolean isImpersonationEnabled()
            {
                return false;
            }

            @Override
            public ThriftMetastore createMetastore(Optional<ConnectorIdentity> identity)
            {
                return thriftMetastore;
            }
        };
    }
}
TestingIcebergHiveMetastoreCatalogModule.java
@@ -0,0 +1,50 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.iceberg.catalog.hms;

import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.metastore.DecoratedHiveMetastoreModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreFactory;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;

import static java.util.Objects.requireNonNull;

public class TestingIcebergHiveMetastoreCatalogModule
        extends AbstractConfigurationAwareModule
{
    private final HiveMetastore hiveMetastore;
    private final ThriftMetastoreFactory thriftMetastoreFactory;

    public TestingIcebergHiveMetastoreCatalogModule(HiveMetastore hiveMetastore, ThriftMetastoreFactory thriftMetastoreFactory)
    {
        this.hiveMetastore = requireNonNull(hiveMetastore, "hiveMetastore is null");
        this.thriftMetastoreFactory = requireNonNull(thriftMetastoreFactory, "thriftMetastoreFactory is null");
    }

    @Override
    protected void setup(Binder binder)
    {
        install(new DecoratedHiveMetastoreModule());
        binder.bind(ThriftMetastoreFactory.class).toInstance(this.thriftMetastoreFactory);
        binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(this.hiveMetastore));
        binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
        binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
    }
}
