Skip to content

Commit

Permalink
feat: initial spotless integration dataset module
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jun 11, 2024
1 parent 12e32f5 commit c6f76b6
Show file tree
Hide file tree
Showing 37 changed files with 1,116 additions and 846 deletions.
3 changes: 3 additions & 0 deletions java/dataset/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ under the License.
<packaging>jar</packaging>
<name>Arrow Java Dataset</name>
<description>Java implementation of Arrow Dataset API/Framework</description>

<properties>
<arrow.cpp.build.dir>../../../cpp/release-build/</arrow.cpp.build.dir>
<parquet.version>1.13.1</parquet.version>
<avro.version>1.11.3</avro.version>
<checkstyle.config.location>dev/checkstyle/checkstyle-spotless.xml</checkstyle.config.location>
<spotless.java.excludes>none</spotless.java.excludes>
</properties>

<dependencies>
Expand Down
30 changes: 15 additions & 15 deletions java/dataset/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

open module org.apache.arrow.dataset {
exports org.apache.arrow.dataset.file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.c.ArrowArrayStream;
Expand All @@ -36,15 +35,27 @@ public class DatasetFileWriter {
* @param uri target file uri
* @param maxPartitions maximum partitions to be included in written files
* @param partitionColumns columns used to partition output files. Empty to disable partitioning
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", where i is the
* current partition ID across all written files.
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", where i is
* the current partition ID across all written files.
*/
public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
public static void write(
BufferAllocator allocator,
ArrowReader reader,
FileFormat format,
String uri,
String[] partitionColumns,
int maxPartitions,
String baseNameTemplate) {
try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader, stream);
JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
JniWrapper.get()
.writeFromScannerToFile(
stream.memoryAddress(),
format.id(),
uri,
partitionColumns,
maxPartitions,
baseNameTemplate);
}
}

Expand All @@ -55,7 +66,8 @@ public static void write(BufferAllocator allocator, ArrowReader reader, FileForm
* @param format target file format
* @param uri target file uri
*/
public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) {
public static void write(
BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) {
write(allocator, reader, format, uri, new String[0], 1024, "data_{i}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

/**
* File format definitions.
*/
/** File format definitions. */
public enum FileFormat {
PARQUET(0),
ARROW_IPC(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.NativeDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.memory.BufferAllocator;

/**
* Java binding of the C++ FileSystemDatasetFactory.
*/
/** Java binding of the C++ FileSystemDatasetFactory. */
public class FileSystemDatasetFactory extends NativeDatasetFactory {

public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
String uri) {
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) {
super(allocator, memoryPool, createNative(format, uri));
}

public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
String[] uris) {
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) {
super(allocator, memoryPool, createNative(format, uris));
}

Expand All @@ -43,5 +40,4 @@ private static long createNative(FileFormat format, String uri) {
private static long createNative(FileFormat format, String[] uris) {
return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.dataset.jni.JniLoader;
Expand All @@ -31,8 +30,7 @@ public static JniWrapper get() {
return INSTANCE;
}

private JniWrapper() {
}
private JniWrapper() {}

/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
Expand Down Expand Up @@ -65,14 +63,14 @@ private JniWrapper() {
* @param uri target file uri
* @param partitionColumns columns used to partition output files
* @param maxPartitions maximum partitions to be included in written files
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", where i is the
* current partition ID across all written files.
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", where i is
* the current partition ID across all written files.
*/
public native void writeFromScannerToFile(long streamAddress,
long fileFormat,
String uri,
String[] partitionColumns,
int maxPartitions,
String baseNameTemplate);

public native void writeFromScannerToFile(
long streamAddress,
long fileFormat,
String uri,
String[] partitionColumns,
int maxPartitions,
String baseNameTemplate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.jni;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.util.VisibleForTesting;

/**
* Reserving Java direct memory bytes from java.nio.Bits. Used by Java Dataset API's C++ memory
* pool implementation. This makes memory allocated by the pool to be controlled by JVM option
* Reserving Java direct memory bytes from java.nio.Bits. Used by Java Dataset API's C++ memory pool
* implementation. This makes memory allocated by the pool to be controlled by JVM option
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
Expand All @@ -40,10 +38,13 @@ private DirectReservationListener() {
methodUnreserve = this.getDeclaredMethodBaseOnJDKVersion(classBits, "unreserveMemory");
methodUnreserve.setAccessible(true);
} catch (Exception e) {
final RuntimeException failure = new RuntimeException(
"Failed to initialize DirectReservationListener. When starting Java you must include " +
"`--add-opens=java.base/java.nio=org.apache.arrow.dataset,org.apache.arrow.memory.core,ALL-UNNAMED` " +
"(See https://arrow.apache.org/docs/java/install.html)", e);
final RuntimeException failure =
new RuntimeException(
"Failed to initialize DirectReservationListener. When starting Java you must include "
+ "`--add-opens=java.base/java.nio=org.apache.arrow.dataset,"
+ "org.apache.arrow.memory.core,ALL-UNNAMED` "
+ "(See https://arrow.apache.org/docs/java/install.html)",
e);
failure.printStackTrace();
throw failure;
}
Expand All @@ -55,39 +56,35 @@ public static DirectReservationListener instance() {
return INSTANCE;
}

/**
* Reserve bytes by invoking java.nio.Bits#reserveMemory.
*/
/** Reserve bytes by invoking java.nio.Bits#reserveMemory. */
@Override
public void reserve(long size) {
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
throw new IllegalArgumentException(
"reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodReserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Unreserve bytes by invoking java.nio.Bits#unreserveMemory.
*/
/** Unreserve bytes by invoking java.nio.Bits#unreserveMemory. */
@Override
public void unreserve(long size) {
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
throw new IllegalArgumentException(
"unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodUnreserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Get current reservation of JVM direct memory. Visible for testing.
*/
/** Get current reservation of JVM direct memory. Visible for testing. */
@VisibleForTesting
public long getCurrentDirectMemReservation() {
try {
Expand All @@ -110,7 +107,9 @@ public long getCurrentDirectMemReservation() {
}

/**
* Get the given method via reflection, searching for different signatures based on the Java version.
* Get the given method via reflection, searching for different signatures based on the Java
* version.
*
* @param classBits The java.nio.Bits class.
* @param name The method being requested.
* @return The method object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.jni;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
* For native code to invoke to convert a java/lang/Throwable to jstring.
*/
/** For native code to invoke to convert a java/lang/Throwable to jstring. */
class JniExceptionDescriber {
private JniExceptionDescriber() {
}
private JniExceptionDescriber() {}

/**
* Convert a java/lang/Throwable to jstring. See the code in arrow::dataset::jni::CheckException
* for more details.
* Convert a java/lang/Throwable to jstring. See the code in arrow::dataset::jni::CheckException for
* more details.
*
* @param throwable the exception instance.
* @return a String including error message and stack trace of the exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.jni;

import java.io.File;
Expand All @@ -30,12 +29,11 @@
import java.util.Locale;
import java.util.Set;

/**
* The JniLoader for Dataset API's native implementation.
*/
/** The JniLoader for Dataset API's native implementation. */
public final class JniLoader {

private static final JniLoader INSTANCE = new JniLoader(Collections.singletonList("arrow_dataset_jni"));
private static final JniLoader INSTANCE =
new JniLoader(Collections.singletonList("arrow_dataset_jni"));

public static JniLoader get() {
return INSTANCE;
Expand All @@ -51,9 +49,7 @@ private boolean finished() {
return librariesToLoad.isEmpty();
}

/**
* If required JNI libraries are not loaded, then load them.
*/
/** If required JNI libraries are not loaded, then load them. */
public void ensureLoaded() {
if (finished()) {
return;
Expand All @@ -65,7 +61,8 @@ public void ensureLoaded() {
private synchronized void loadRemaining() {
// The method is protected by a mutex via synchronized: if more than one thread races to call
// loadRemaining at the same time, only one will do the actual loading and the others will wait for
// the mutex to be acquired then check on the remaining list: if there are libraries that were not
// the mutex to be acquired then check on the remaining list: if there are libraries that were
// not
// successfully loaded then the mutex owner will try to load them again.
if (finished()) {
return;
Expand All @@ -81,10 +78,11 @@ private void load(String name) {
final String libraryToLoad =
name + "/" + getNormalizedArch() + "/" + System.mapLibraryName(name);
try {
File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir")));
File temp =
File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir")));
temp.deleteOnExit();
try (final InputStream is
= JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
try (final InputStream is =
JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
if (is == null) {
throw new FileNotFoundException(libraryToLoad);
}
Expand Down Expand Up @@ -112,6 +110,11 @@ private String getNormalizedArch() {
}

private void ensureS3FinalizedOnShutdown() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> { JniWrapper.get().ensureS3Finalized(); }));
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
JniWrapper.get().ensureS3Finalized();
}));
}
}
Loading

0 comments on commit c6f76b6

Please sign in to comment.