feat: Add support for using JDBC Catalog for Iceberg testing #6144

Merged 4 commits on Sep 27, 2024
125 changes: 125 additions & 0 deletions extensions/iceberg/TESTING.md
@@ -0,0 +1,125 @@
# sqlite catalogs

The sqlite JDBC catalog is able to support multiple catalogs through a single database file.
As such, the following convention has been established for testing purposes:

* database file: `<rootDir>/dh-iceberg-test.db`
* warehouse directory: `<rootDir>/catalogs/<catalogName>/`

Both writers and readers of this catalog need to be set up to support relative metadata locations to ensure portability.

A root directory for extension-iceberg testing has been established at `extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource`.
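
The layout convention above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the helpers `catalog_layout` and `resolve_location` are hypothetical names that do not exist in the repository, and the sample metadata location is taken from the `pyiceberg-1` catalog dump later in this file.

```python
from pathlib import PurePosixPath

# Root directory and database file name as documented above.
ROOT_DIR = "extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource"
DB_FILE_NAME = "dh-iceberg-test.db"


def catalog_layout(root_dir: str, catalog_name: str) -> dict:
    """Return the database file and warehouse directory for one catalog (hypothetical helper)."""
    root = PurePosixPath(root_dir)
    return {
        "database": str(root / DB_FILE_NAME),
        "warehouse": str(root / "catalogs" / catalog_name),
    }


def resolve_location(root_dir: str, relative_location: str) -> str:
    """Resolve a relative metadata location against the root directory (hypothetical helper)."""
    location = PurePosixPath(relative_location)
    if location.is_absolute():
        raise ValueError(f"expected a relative location, got {relative_location!r}")
    return str(PurePosixPath(root_dir) / location)


layout = catalog_layout(ROOT_DIR, "pyiceberg-1")
print(layout["database"])
print(layout["warehouse"])
print(resolve_location(
    ROOT_DIR,
    "catalogs/pyiceberg-1/dh-default.db/cities/metadata/"
    "00003-68091f71-d3c5-42bb-8161-e2e187dece14.metadata.json",
))
```

Because only relative locations are stored, moving the root directory changes the resolved paths without touching the catalog contents.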

## Usage

### Java

```java
import org.apache.iceberg.catalog.Catalog;
import io.deephaven.iceberg.sqlite.DbResource;

Catalog catalog = DbResource.openCatalog("<catalogName>");
```

### pyiceberg

To set up in [pyiceberg](https://py.iceberg.apache.org/):

```python
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    "<catalogName>",
    **{
        "uri": "sqlite:///dh-iceberg-test.db",
        "warehouse": "catalogs/<catalogName>",
    },
)
```

## Generating data

Note that any scripts that write data should be run with
[db_resource](src/test/resources/io/deephaven/iceberg/sqlite/db_resource) as the working directory, so that the recorded locations stay relative and the data remains usable from unit tests.

### pyiceberg-1

Here's an example of what was needed to generate this data:

```shell
$ cd extensions/iceberg/src/test/resources/io/deephaven/iceberg/sqlite/db_resource

# Note: 3.10 explicitly chosen b/c it doesn't seem like pyiceberg produces 3.12 wheels yet
$ python3.10 -m venv /tmp/iceberg

$ source /tmp/iceberg/bin/activate

$ pip install --only-binary=":all:" "pyiceberg[sql-sqlite, pyarrow]"

$ pip freeze
annotated-types==0.7.0
certifi==2024.8.30
charset-normalizer==3.3.2
click==8.1.7
fsspec==2024.9.0
greenlet==3.1.1
idna==3.10
markdown-it-py==3.0.0
mdurl==0.1.2
mmh3==4.1.0
numpy==1.26.4
pyarrow==17.0.0
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.18.0
pyiceberg==0.7.1
pyparsing==3.1.4
python-dateutil==2.9.0.post0
requests==2.32.3
rich==13.8.1
six==1.16.0
sortedcontainers==2.4.0
SQLAlchemy==2.0.35
strictyaml==1.7.3
tenacity==8.5.0
typing_extensions==4.12.2
urllib3==2.2.3

$ python generate-pyiceberg-1.py

$ sqlite3 dh-iceberg-test.db
SQLite version 3.45.1 2024-01-30 16:01:20
Enter ".help" for usage hints.
sqlite> .dump
PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE iceberg_tables (
catalog_name VARCHAR(255) NOT NULL,
table_namespace VARCHAR(255) NOT NULL,
table_name VARCHAR(255) NOT NULL,
metadata_location VARCHAR(1000),
previous_metadata_location VARCHAR(1000),
PRIMARY KEY (catalog_name, table_namespace, table_name)
);
INSERT INTO iceberg_tables VALUES('pyiceberg-1','dh-default','cities','catalogs/pyiceberg-1/dh-default.db/cities/metadata/00003-68091f71-d3c5-42bb-8161-e2e187dece14.metadata.json','catalogs/pyiceberg-1/dh-default.db/cities/metadata/00002-106b37f8-8818-439d-87c5-3cae608d1972.metadata.json');
CREATE TABLE iceberg_namespace_properties (
catalog_name VARCHAR(255) NOT NULL,
namespace VARCHAR(255) NOT NULL,
property_key VARCHAR(255) NOT NULL,
property_value VARCHAR(1000) NOT NULL,
PRIMARY KEY (catalog_name, namespace, property_key)
);
INSERT INTO iceberg_namespace_properties VALUES('pyiceberg-1','dh-default','exists','true');
COMMIT;
sqlite>
```
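
The dump above can also be inspected programmatically. The following is a minimal sketch using Python's standard `sqlite3` module; it recreates the `iceberg_tables` schema and row from the dump in an in-memory database (a stand-in for `dh-iceberg-test.db`) and checks that the recorded metadata locations are relative. The helpers `table_locations` and `absolute_locations` are hypothetical and not part of the repository.

```python
import sqlite3

# Schema copied from the `.dump` output above.
SCHEMA = """
CREATE TABLE iceberg_tables (
    catalog_name VARCHAR(255) NOT NULL,
    table_namespace VARCHAR(255) NOT NULL,
    table_name VARCHAR(255) NOT NULL,
    metadata_location VARCHAR(1000),
    previous_metadata_location VARCHAR(1000),
    PRIMARY KEY (catalog_name, table_namespace, table_name)
);
"""


def table_locations(conn, catalog_name):
    """Map (namespace, table) to metadata_location for one catalog (hypothetical helper)."""
    rows = conn.execute(
        "SELECT table_namespace, table_name, metadata_location "
        "FROM iceberg_tables WHERE catalog_name = ? "
        "ORDER BY table_namespace, table_name",
        (catalog_name,),
    )
    return {(namespace, name): location for namespace, name, location in rows}


def absolute_locations(locations):
    """Return the locations that are absolute paths or URIs, which would break portability."""
    return [
        loc for loc in locations.values()
        if loc is not None and (loc.startswith("/") or "://" in loc)
    ]


conn = sqlite3.connect(":memory:")  # stand-in for dh-iceberg-test.db
conn.executescript(SCHEMA)
conn.execute(
    "INSERT INTO iceberg_tables VALUES (?, ?, ?, ?, ?)",
    (
        "pyiceberg-1",
        "dh-default",
        "cities",
        "catalogs/pyiceberg-1/dh-default.db/cities/metadata/00003-68091f71-d3c5-42bb-8161-e2e187dece14.metadata.json",
        "catalogs/pyiceberg-1/dh-default.db/cities/metadata/00002-106b37f8-8818-439d-87c5-3cae608d1972.metadata.json",
    ),
)

locations = table_locations(conn, "pyiceberg-1")
print(locations)
print("absolute locations:", absolute_locations(locations))
```

To run the same check against the real file, replace `":memory:"` with the path to `dh-iceberg-test.db` and drop the schema and insert statements.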

### sqlite

If we add a lot of catalogs to the database, we may want to look into vacuuming the database to keep the file size small.

`sqlite3 dh-iceberg-test.db 'VACUUM;'`
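
The same vacuum step can be scripted. This is a sketch using Python's standard `sqlite3` module, demonstrated against a throwaway database rather than the checked-in file; the helper `vacuum` and the scratch table `t` are hypothetical.

```python
import os
import sqlite3
import tempfile


def vacuum(db_path: str) -> tuple:
    """Run VACUUM on db_path and return (size_before, size_after) in bytes."""
    size_before = os.path.getsize(db_path)
    # isolation_level=None selects autocommit mode; VACUUM cannot run inside a transaction.
    conn = sqlite3.connect(db_path, isolation_level=None)
    try:
        conn.execute("VACUUM")
    finally:
        conn.close()
    return size_before, os.path.getsize(db_path)


# Demonstration against a scratch database, not dh-iceberg-test.db.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "scratch.db")
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE t (payload TEXT)")
    conn.executemany("INSERT INTO t VALUES (?)", [("x" * 1000,) for _ in range(1000)])
    conn.commit()
    conn.execute("DELETE FROM t")  # frees pages, but the file keeps its size until VACUUM
    conn.commit()
    conn.close()
    before, after = vacuum(path)
    print(f"before={before} bytes, after={after} bytes")
```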

Ideally, the sqlite database can be a small collection of catalogs that were created via external tooling to verify that
we can integrate with them successfully.

18 changes: 13 additions & 5 deletions extensions/iceberg/build.gradle
@@ -44,12 +44,20 @@ dependencies {
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher

testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.localstack
testImplementation libs.testcontainers.minio

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

testRuntimeOnly libs.xerial.sqlite.jdbc

// NOTE: we need to pull in additional Hadoop dependencies;
// buildSrc/src/main/groovy/io.deephaven.hadoop-common-dependencies.gradle excludes some that are required for
// Hadoop FileIO.
testRuntimeOnly libs.hadoop.common

testImplementation project(':engine-test-utils')
}

test {
useJUnitPlatform()
}
@@ -9,6 +9,7 @@
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
@@ -130,6 +131,15 @@ public IcebergBaseLayout(

abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri);

@NotNull
private URI dataFileUri(@NotNull DataFile df) {
String path = df.path().toString();
if (fileIO instanceof RelativeFileIO) {
path = ((RelativeFileIO) fileIO).absoluteLocation(path);
}
return FileUtils.convertToURI(path, false);
}

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
try {
@@ -144,7 +154,7 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
}
try (final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, fileIO)) {
for (DataFile df : reader) {
- final URI fileUri = FileUtils.convertToURI(df.path().toString(), false);
+ final URI fileUri = dataFileUri(df);
final IcebergTableLocationKey locationKey =
cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri));
if (locationKey != null) {
@@ -0,0 +1,137 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.relative;

import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.DelegateFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;

import java.util.Map;
import java.util.function.Function;

/**
* While this class is in the public source set, it is meant to support testing use cases only and should not be used in
* production.
*
* @see <a href="https://github.com/apache/iceberg/issues/1617">Support relative paths in Table Metadata</a>
*/
@VisibleForTesting
public final class RelativeFileIO implements HadoopConfigurable, DelegateFileIO {
public static final String BASE_PATH = "relative.base-path";
private static final String IO_DEFAULT_IMPL = ResolvingFileIO.class.getName();

private String basePath;

private DelegateFileIO io;

private SerializableSupplier<Configuration> hadoopConf;

public RelativeFileIO() {}

public RelativeFileIO(Configuration hadoopConf) {
this(new SerializableConfiguration(hadoopConf)::get);
}

public RelativeFileIO(SerializableSupplier<Configuration> hadoopConf) {
this.hadoopConf = hadoopConf;
}

@Override
public Configuration getConf() {
return hadoopConf.get();
}

@Override
public void setConf(Configuration conf) {
this.hadoopConf = new SerializableConfiguration(conf)::get;
}

@Override
public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
this.hadoopConf = confSerializer.apply(getConf());
}

public String absoluteLocation(String location) {
return basePath + location;
}

private String relativeLocation(String location) {
if (!location.startsWith(basePath)) {
throw new IllegalStateException(
String.format("Location '%s' does not start with base path '%s'", location, basePath));
}
return location.substring(basePath.length());
}

@Override
public void initialize(Map<String, String> properties) {
this.basePath = StringUtils.appendIfMissing(properties.get(BASE_PATH), "/");
// We can add a property here later if we need to override the default
// final String impl = properties.getOrDefault(IO_IMPL, IO_DEFAULT_IMPL);
final FileIO fileIO = CatalogUtil.loadFileIO(IO_DEFAULT_IMPL, properties, hadoopConf.get());
if (!(fileIO instanceof DelegateFileIO)) {
throw new IllegalArgumentException("fileIO must be DelegateFileIO, " + fileIO.getClass());
}
this.io = (DelegateFileIO) fileIO;
}

@Override
public Map<String, String> properties() {
return io.properties();
}

@Override
public InputFile newInputFile(String path) {
return new RelativeInputFile(path, io.newInputFile(absoluteLocation(path)));
}

@Override
public InputFile newInputFile(String path, long length) {
return new RelativeInputFile(path, io.newInputFile(absoluteLocation(path), length));
}

@Override
public OutputFile newOutputFile(String path) {
return new RelativeOutputFile(path, io.newOutputFile(absoluteLocation(path)));
}

@Override
public void deleteFiles(Iterable<String> iterable) throws BulkDeletionFailureException {
io.deleteFiles(Streams.stream(iterable).map(this::absoluteLocation)::iterator);
}

@Override
public Iterable<FileInfo> listPrefix(String s) {
return Streams.stream(io.listPrefix(absoluteLocation(s)))
.map(x -> new FileInfo(relativeLocation(x.location()), x.size(), x.createdAtMillis()))::iterator;
}

@Override
public void deletePrefix(String s) {
io.deletePrefix(absoluteLocation(s));
}

@Override
public void deleteFile(String path) {
io.deleteFile(absoluteLocation(path));
}

@Override
public void close() {
if (io != null) {
io.close();
}
}
}
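
For readers following along from the Python examples in TESTING.md, the base-path mapping performed by `absoluteLocation` and `relativeLocation` above can be sketched as follows. This is a hypothetical Python analogue, not code from the PR: the class name `RelativePaths` is invented, and it raises `ValueError` where the Java code throws `IllegalStateException`.

```python
class RelativePaths:
    """Hypothetical analogue of the base-path mapping in RelativeFileIO."""

    def __init__(self, base_path: str):
        # Mirrors StringUtils.appendIfMissing(basePath, "/") in initialize().
        self.base_path = base_path if base_path.endswith("/") else base_path + "/"

    def absolute_location(self, location: str) -> str:
        """Prefix a relative location with the base path."""
        return self.base_path + location

    def relative_location(self, location: str) -> str:
        """Strip the base path from an absolute location, rejecting anything outside it."""
        if not location.startswith(self.base_path):
            raise ValueError(f"{location!r} does not start with {self.base_path!r}")
        return location[len(self.base_path):]


paths = RelativePaths("/tmp/db_resource")
absolute = paths.absolute_location("catalogs/pyiceberg-1/dh-default.db/cities")
print(absolute)
print(paths.relative_location(absolute))
```

The two methods are inverses for any location under the base path, which is what lets `listPrefix` report relative locations for files that the delegate lists by absolute path.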
@@ -0,0 +1,39 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.iceberg.relative;

import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.util.Objects;

final class RelativeInputFile implements InputFile {
private final String location;
private final InputFile file;

public RelativeInputFile(String location, InputFile file) {
this.location = Objects.requireNonNull(location);
this.file = Objects.requireNonNull(file);
}

@Override
public long getLength() {
return file.getLength();
}

@Override
public SeekableInputStream newStream() {
return file.newStream();
}

@Override
public String location() {
return location;
}

@Override
public boolean exists() {
return file.exists();
}
}