Skip to content

Commit

Permalink
[#5204] feat(iceberg): support more table event for Iceberg REST serv…
Browse files Browse the repository at this point in the history
…er (#5156)

### What changes were proposed in this pull request?

support more table event for Iceberg REST server

### Why are the changes needed?

Fix: #5204 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests
  • Loading branch information
FANNG1 authored Oct 22, 2024
1 parent ab3e7cd commit 6a9a77b
Show file tree
Hide file tree
Showing 26 changed files with 1,024 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public IcebergCatalogWrapper getCatalogWrapper(String catalogName) {
return catalogWrapper;
}

public CredentialProvider getCredentialProvider(String prefix) {
String catalogName = IcebergRestUtils.getCatalogName(prefix);
public CredentialProvider getCredentialProvider(String catalogName) {
return credentialProviderManager.getCredentialProvider(catalogName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.responses.ErrorResponse;

Expand Down Expand Up @@ -114,6 +115,13 @@ public static <T> T cloneIcebergRESTObject(Object message, Class<T> className) {
}
}

public static NameIdentifier getGravitinoNameIdentifier(
String metalakeName, String catalogName, Namespace namespace) {
Stream<String> catalogNS =
Stream.concat(Stream.of(metalakeName, catalogName), Arrays.stream(namespace.levels()));
return NameIdentifier.of(catalogNS.toArray(String[]::new));
}

// remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to
// 'iceberg_catalog'
private static String normalizePrefix(String rawPrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,31 @@
import org.apache.gravitino.listener.api.event.IcebergCreateTableEvent;
import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergDropTableEvent;
import org.apache.gravitino.listener.api.event.IcebergDropTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergDropTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergListTableEvent;
import org.apache.gravitino.listener.api.event.IcebergListTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergListTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergTableExistsEvent;
import org.apache.gravitino.listener.api.event.IcebergTableExistsFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergTableExistsPreEvent;
import org.apache.gravitino.listener.api.event.IcebergUpdateTableEvent;
import org.apache.gravitino.listener.api.event.IcebergUpdateTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergUpdateTablePreEvent;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;

/**
Expand Down Expand Up @@ -67,7 +88,7 @@ public LoadTableResponse createTable(
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergCreateTableFailureEvent(
PrincipalUtils.getCurrentUserName(), nameIdentifier, e));
PrincipalUtils.getCurrentUserName(), nameIdentifier, createTableRequest, e));
throw e;
}
eventBus.dispatchEvent(
Expand All @@ -78,4 +99,137 @@ public LoadTableResponse createTable(
loadTableResponse));
return loadTableResponse;
}

@Override
public LoadTableResponse updateTable(
String catalogName, TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier);
eventBus.dispatchEvent(
new IcebergUpdateTablePreEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, updateTableRequest));
LoadTableResponse loadTableResponse;
try {
loadTableResponse =
icebergTableOperationDispatcher.updateTable(
catalogName, tableIdentifier, updateTableRequest);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergUpdateTableFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, updateTableRequest, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergUpdateTableEvent(
PrincipalUtils.getCurrentUserName(),
gravitinoNameIdentifier,
updateTableRequest,
loadTableResponse));
return loadTableResponse;
}

@Override
public void dropTable(
String catalogName, TableIdentifier tableIdentifier, boolean purgeRequested) {
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier);
eventBus.dispatchEvent(
new IcebergDropTablePreEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, purgeRequested));
try {
icebergTableOperationDispatcher.dropTable(catalogName, tableIdentifier, purgeRequested);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergDropTableFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, purgeRequested, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergDropTableEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, purgeRequested));
}

@Override
public LoadTableResponse loadTable(String catalogName, TableIdentifier tableIdentifier) {
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier);
eventBus.dispatchEvent(
new IcebergLoadTablePreEvent(PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier));
LoadTableResponse loadTableResponse;
try {
loadTableResponse = icebergTableOperationDispatcher.loadTable(catalogName, tableIdentifier);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergLoadTableFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergLoadTableEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, loadTableResponse));
return loadTableResponse;
}

@Override
public ListTablesResponse listTable(String catalogName, Namespace namespace) {
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, namespace);
eventBus.dispatchEvent(
new IcebergListTablePreEvent(PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier));
ListTablesResponse listTablesResponse;
try {
listTablesResponse = icebergTableOperationDispatcher.listTable(catalogName, namespace);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergListTableFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergListTableEvent(PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier));
return listTablesResponse;
}

@Override
public boolean tableExists(String catalogName, TableIdentifier tableIdentifier) {
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier);
eventBus.dispatchEvent(
new IcebergTableExistsPreEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier));
boolean isExists;
try {
isExists = icebergTableOperationDispatcher.tableExists(catalogName, tableIdentifier);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergTableExistsFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergTableExistsEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, isExists));
return isExists;
}

@Override
public void renameTable(String catalogName, RenameTableRequest renameTableRequest) {
TableIdentifier sourceTable = renameTableRequest.source();
NameIdentifier gravitinoNameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, sourceTable);
eventBus.dispatchEvent(
new IcebergRenameTablePreEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, renameTableRequest));
try {
icebergTableOperationDispatcher.renameTable(catalogName, renameTableRequest);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergRenameTableFailureEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, renameTableRequest, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergRenameTableEvent(
PrincipalUtils.getCurrentUserName(), gravitinoNameIdentifier, renameTableRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
package org.apache.gravitino.iceberg.service.dispatcher;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;

/**
* The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg
* tables.
*/
public interface IcebergTableOperationDispatcher {

/**
* Creates a new Iceberg table.
*
Expand All @@ -38,4 +43,59 @@ public interface IcebergTableOperationDispatcher {
*/
LoadTableResponse createTable(
String catalogName, Namespace namespace, CreateTableRequest createTableRequest);

/**
* Updates an Iceberg table.
*
* @param catalogName The catalog name when updating the table.
* @param tableIdentifier The Iceberg table identifier.
* @param updateTableRequest The request object containing the details for updating the table.
* @return A {@link LoadTableResponse} object containing the result of the operation.
*/
LoadTableResponse updateTable(
String catalogName, TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest);

/**
* Drops an Iceberg table.
*
* @param catalogName The catalog name when dropping the table.
* @param tableIdentifier The Iceberg table identifier.
* @param purgeRequested Whether to purge the table.
*/
void dropTable(String catalogName, TableIdentifier tableIdentifier, boolean purgeRequested);

/**
* Loads an Iceberg table.
*
* @param catalogName The catalog name when dropping the table.
* @param tableIdentifier The Iceberg table identifier.
* @return A {@link LoadTableResponse} object containing the result of the operation.
*/
LoadTableResponse loadTable(String catalogName, TableIdentifier tableIdentifier);

/**
* Lists Iceberg tables.
*
* @param catalogName The catalog name when dropping the table.
* @param namespace The Iceberg namespace.
* @return A {@link ListTablesResponse} object containing the list of table identifiers.
*/
ListTablesResponse listTable(String catalogName, Namespace namespace);

/**
* Check whether an Iceberg table exists.
*
* @param catalogName The catalog name when dropping the table.
* @param tableIdentifier The Iceberg table identifier.
* @return Whether table exists.
*/
boolean tableExists(String catalogName, TableIdentifier tableIdentifier);

/**
* Rename an Iceberg table.
*
* @param catalogName The catalog name when dropping the table.
* @param renameTableRequest Rename table request information.
*/
void renameTable(String catalogName, RenameTableRequest renameTableRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;

public class IcebergTableOperationExecutor implements IcebergTableOperationDispatcher {
Expand All @@ -39,4 +43,42 @@ public LoadTableResponse createTable(
.getCatalogWrapper(catalogName)
.createTable(namespace, createTableRequest);
}

@Override
public LoadTableResponse updateTable(
String catalogName, TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
return icebergCatalogWrapperManager
.getCatalogWrapper(catalogName)
.updateTable(tableIdentifier, updateTableRequest);
}

@Override
public void dropTable(
String catalogName, TableIdentifier tableIdentifier, boolean purgeRequested) {
if (purgeRequested) {
icebergCatalogWrapperManager.getCatalogWrapper(catalogName).purgeTable(tableIdentifier);
} else {
icebergCatalogWrapperManager.getCatalogWrapper(catalogName).dropTable(tableIdentifier);
}
}

@Override
public LoadTableResponse loadTable(String catalogName, TableIdentifier tableIdentifier) {
return icebergCatalogWrapperManager.getCatalogWrapper(catalogName).loadTable(tableIdentifier);
}

@Override
public ListTablesResponse listTable(String catalogName, Namespace namespace) {
return icebergCatalogWrapperManager.getCatalogWrapper(catalogName).listTable(namespace);
}

@Override
public boolean tableExists(String catalogName, TableIdentifier tableIdentifier) {
return icebergCatalogWrapperManager.getCatalogWrapper(catalogName).tableExists(tableIdentifier);
}

@Override
public void renameTable(String catalogName, RenameTableRequest renameTableRequest) {
icebergCatalogWrapperManager.getCatalogWrapper(catalogName).renameTable(renameTableRequest);
}
}
Loading

0 comments on commit 6a9a77b

Please sign in to comment.