Skip to content

Commit

Permalink
feat: Add support for using JDBC Catalog for Iceberg testing (#6144)
Browse files Browse the repository at this point in the history
Fixes #6029
  • Loading branch information
devinrsmith authored Sep 27, 2024
1 parent e596689 commit ff1621a
Show file tree
Hide file tree
Showing 26 changed files with 739 additions and 28 deletions.
125 changes: 125 additions & 0 deletions extensions/iceberg/TESTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# sqlite catalogs

The sqlite JDBC catalog is able to support multiple catalogs through a single database file.
As such, the following convention has been established for testing purposes:

* database file: `<rootDir>/dh-iceberg-test.db`
* warehouse directory: `<rootDir>/catalogs/<catalogName>/`

Both writers and readers of this catalog need to be setup to support relative metadata locations to ensure portability.

A root directory for extension-iceberg testing has been established at `extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource`.

## Usage

### Java

```java
import org.apache.iceberg.catalog.Catalog;
import io.deephaven.iceberg.sqlite.DbResource;

Catalog catalog = DbResource.openCatalog("<catalogName>");
```

### pyiceberg

To setup in [pyiceberg](https://py.iceberg.apache.org/):

```python
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
"<catalogName>",
**{
"uri": f"sqlite:///dh-iceberg-test.db",
"warehouse": f"catalogs/<catalogName>",
},
)
```

## Generating data

Note that any scripts that write data should be run relative to
[db_resource](src/test/resources/io/deephaven/iceberg/sqlite/db_resource) working directory to ensure unit testability.

### pyiceberg-1

Here's an example of what was needed to generate this data:

```shell
$ cd extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource

# Note: 3.10 explicitly chosen b/c it doesn't seem like pyiceberg produces 3.12 wheels yet
$ python3.10 -m venv /tmp/iceberg

$ source /tmp/iceberg/bin/activate

$ pip install --only-binary=":all:" "pyiceberg[sql-sqlite, pyarrow]"

$ pip freeze
annotated-types==0.7.0
certifi==2024.8.30
charset-normalizer==3.3.2
click==8.1.7
fsspec==2024.9.0
greenlet==3.1.1
idna==3.10
markdown-it-py==3.0.0
mdurl==0.1.2
mmh3==4.1.0
numpy==1.26.4
pyarrow==17.0.0
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.18.0
pyiceberg==0.7.1
pyparsing==3.1.4
python-dateutil==2.9.0.post0
requests==2.32.3
rich==13.8.1
six==1.16.0
sortedcontainers==2.4.0
SQLAlchemy==2.0.35
strictyaml==1.7.3
tenacity==8.5.0
typing_extensions==4.12.2
urllib3==2.2.3

$ python generate-pyiceberg-1.py

$ sqlite3 dh-iceberg-test.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .dump
PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE iceberg_tables (
catalog_name VARCHAR(255) NOT NULL,
table_namespace VARCHAR(255) NOT NULL,
table_name VARCHAR(255) NOT NULL,
metadata_location VARCHAR(1000),
previous_metadata_location VARCHAR(1000),
PRIMARY KEY (catalog_name, table_namespace, table_name)
);
INSERT INTO iceberg_tables VALUES('pyiceberg-1','dh-default','cities','catalogs/pyiceberg-1/dh-default.db/cities/metadata/00003-68091f71-d3c5-42bb-8161-e2e187dece14.metadata.json','catalogs/pyiceberg-1/dh-default.db/cities/metadata/00002-106b37f8-8818-439d-87c5-3cae608d1972.metadata.json');
CREATE TABLE iceberg_namespace_properties (
catalog_name VARCHAR(255) NOT NULL,
namespace VARCHAR(255) NOT NULL,
property_key VARCHAR(255) NOT NULL,
property_value VARCHAR(1000) NOT NULL,
PRIMARY KEY (catalog_name, namespace, property_key)
);
INSERT INTO iceberg_namespace_properties VALUES('pyiceberg-1','dh-default','exists','true');
COMMIT;
sqlite>
```

### sqlite

If we add a lot of catalogs to the database, we may want to look into vacuuming the database to keep the file size small.

`sqlite3 dh-iceberg-test.db 'VACUUM;'`

Ideally, the sqlite database can be a small collection of catalogs that were created via external tooling to verify that
we can integrate with them successfully.

18 changes: 13 additions & 5 deletions extensions/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,20 @@ dependencies {
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher

testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.localstack
testImplementation libs.testcontainers.minio

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

testRuntimeOnly libs.xerial.sqlite.jdbc

// NOTE: we need to pull down more hadoop dependencies,
// buildSrc/src/main/groovy/io.deephaven.hadoop-common-dependencies.gradle excludes some stuff we need to do
// Hadoop FileIO.
testRuntimeOnly libs.hadoop.common

testImplementation project(':engine-test-utils')
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
Expand Down Expand Up @@ -130,6 +131,15 @@ public IcebergBaseLayout(

abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);

@NotNull
private URI dataFileUri(@NotNull DataFile df) {
String path = df.path().toString();
if (fileIO instanceof RelativeFileIO) {
path = ((RelativeFileIO) fileIO).absoluteLocation(path);
}
return FileUtils.convertToURI(path, false);
}

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
try {
Expand All @@ -144,7 +154,7 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, fileIO)) {
for (DataFile df : reader) {
final URI fileUri = FileUtils.convertToURI(df.path().toString(), false);
final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.relative;

import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;

import java.util.Map;
import java.util.function.Function;

/**
* While this class is in the public source set, it is meant to support testing use cases only and should not be used in
* production.
*
* @see <a href="https://github.com/apache/iceberg/issues/1617">Support relative paths in Table Metadata</a>
*/
@VisibleForTesting
public final class RelativeFileIO implements HadoopConfigurable, DelegateFileIO {
public static final String BASE_PATH = "relative.base-path";
private static final String IO_DEFAULT_IMPL = ResolvingFileIO.class.getName();

private String basePath;

private DelegateFileIO io;

private SerializableSupplier<Configuration> hadoopConf;

public RelativeFileIO() {}

public RelativeFileIO(Configuration hadoopConf) {
this(new SerializableConfiguration(hadoopConf)::get);
}

public RelativeFileIO(SerializableSupplier<Configuration> hadoopConf) {
this.hadoopConf = hadoopConf;
}

@Override
public Configuration getConf() {
return hadoopConf.get();
}

@Override
public void setConf(Configuration conf) {
this.hadoopConf = new SerializableConfiguration(conf)::get;
}

@Override
public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
this.hadoopConf = confSerializer.apply(getConf());
}

public String absoluteLocation(String location) {
return basePath + location;
}

private String relativeLocation(String location) {
if (!location.startsWith(basePath)) {
throw new IllegalStateException();
}
return location.substring(basePath.length());
}

@Override
public void initialize(Map<String, String> properties) {
this.basePath = StringUtils.appendIfMissing(properties.get(BASE_PATH), "/");
// We can add a property here later if we need to override the default
// final String impl = properties.getOrDefault(IO_IMPL, IO_DEFAULT_IMPL);
final FileIO fileIO = CatalogUtil.loadFileIO(IO_DEFAULT_IMPL, properties, hadoopConf.get());
if (!(fileIO instanceof DelegateFileIO)) {
throw new IllegalArgumentException("filoIO must be DelegateFileIO, " + fileIO.getClass());
}
this.io = (DelegateFileIO) fileIO;
}

@Override
public Map<String, String> properties() {
return io.properties();
}

@Override
public InputFile newInputFile(String path) {
return new RelativeInputFile(path, io.newInputFile(absoluteLocation(path)));
}

@Override
public InputFile newInputFile(String path, long length) {
return new RelativeInputFile(path, io.newInputFile(absoluteLocation(path), length));
}

@Override
public OutputFile newOutputFile(String path) {
return new RelativeOutputFile(path, io.newOutputFile(absoluteLocation(path)));
}

@Override
public void deleteFiles(Iterable<String> iterable) throws BulkDeletionFailureException {
io.deleteFiles(Streams.stream(iterable).map(this::absoluteLocation)::iterator);
}

@Override
public Iterable<FileInfo> listPrefix(String s) {
return Streams.stream(io.listPrefix(absoluteLocation(s)))
.map(x -> new FileInfo(relativeLocation(x.location()), x.size(), x.createdAtMillis()))::iterator;
}

@Override
public void deletePrefix(String s) {
io.deletePrefix(absoluteLocation(s));
}

@Override
public void deleteFile(String path) {
io.deleteFile(absoluteLocation(path));
}

@Override
public void close() {
if (io != null) {
io.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.relative;

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.util.Objects;

final class RelativeInputFile implements InputFile {
private final String location;
private final InputFile file;

public RelativeInputFile(String location, InputFile file) {
this.location = Objects.requireNonNull(location);
this.file = Objects.requireNonNull(file);
}

@Override
public long getLength() {
return file.getLength();
}

@Override
public SeekableInputStream newStream() {
return file.newStream();
}

@Override
public String location() {
return location;
}

@Override
public boolean exists() {
return file.exists();
}
}
Loading

0 comments on commit ff1621a

Please sign in to comment.