From d28061032e2af1190b2c6a5910081d80e6781709 Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Mon, 19 Sep 2022 10:59:59 -0500 Subject: [PATCH 1/4] Deprecate redundant iterable ScanTask since there are no more ScanTasks on the C++ side --- .../arrow/dataset/jni/NativeScanner.java | 9 ++++ .../apache/arrow/dataset/scanner/Scanner.java | 11 +++++ .../org/apache/arrow/dataset/TestDataset.java | 4 +- .../dataset/file/TestFileSystemDataset.java | 45 +++++++++---------- .../arrow/dataset/jni/TestNativeDataset.java | 4 +- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index de18f9e5e0bcb..5e4b949927338 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -68,6 +68,15 @@ ArrowReader execute() { } @Override + public NativeScanTask scanTask() { + if (closed) { + throw new NativeInstanceReleasedException(); + } + return new NativeScanTask(this); + } + + @Override + @Deprecated public Iterable scan() { if (closed) { throw new NativeInstanceReleasedException(); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java index 93a1b08f36609..f562065481162 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -24,12 +24,23 @@ */ public interface Scanner extends AutoCloseable { + /** + * Perform the scan operation. + * + * @return a {@link ScanTask}. Each task is considered independent and it is allowed + * to execute the tasks concurrently to gain better performance. + */ + ScanTask scanTask(); + + /** * Perform the scan operation. * * @return a iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed * to execute the tasks concurrently to gain better performance. + * @deprecated use {@link #scanTask()} instead. */ + @Deprecated Iterable scan(); /** diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java index 15224534d2873..220ceff8a330e 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -63,9 +63,7 @@ protected List collectResultFromFactory(DatasetFactory factory final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); try { - final List ret = stream(scanner.scan()) - .flatMap(t -> stream(collectTaskData(t))) - .collect(Collectors.toList()); + final List ret = collectTaskData(scanner.scanTask()); AutoCloseables.close(scanner, dataset); return ret; } catch (RuntimeException e) { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index b8d51a3edb169..9d249dadda984 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -88,7 +88,7 @@ public void testBaseParquetRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(1, datum.size()); assertEquals(2, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); @@ -112,7 +112,7 @@ public void testParquetProjectSingleColumn() throws Exception { List datum = collectResultFromFactory(factory, options); org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(1, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType()); @@ -139,7 +139,7 @@ public void testParquetBatchSize() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(3, datum.size()); datum.forEach(batch -> assertEquals(1, batch.getLength())); checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); @@ -163,7 +163,7 @@ public void testParquetDirectoryRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(7, datum.size()); datum.forEach(batch -> assertEquals(1, batch.getLength())); checkParquetReadResult(schema, expectedJsonUnordered, datum); @@ -182,7 +182,7 @@ public void testEmptyProjectSelectsZeroColumns() throws Exception { List datum = collectResultFromFactory(factory, options); org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList()); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(0, schema.getFields().size()); assertEquals(1, datum.size()); checkParquetReadResult(schema, @@ -204,7 +204,7 @@ public void testNullProjectSelectsAllColumns() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(1, datum.size()); assertEquals(2, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); @@ -233,7 +233,7 @@ public void testNoErrorWhenCloseAgain() throws Exception { } @Test - public void testErrorThrownWhenScanAgain() throws Exception { + public void testErrorThrownWhenScanTaskAgain() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -241,10 +241,8 @@ public void testErrorThrownWhenScanAgain() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - List taskList1 = collect(scanner.scan()); - List taskList2 = collect(scanner.scan()); - NativeScanTask task1 = taskList1.get(0); - NativeScanTask task2 = taskList2.get(0); + NativeScanTask task1 = scanner.scanTask(); + NativeScanTask task2 = scanner.scanTask(); List datum = collectTaskData(task1); AutoCloseables.close(datum); @@ -253,13 +251,13 @@ public void testErrorThrownWhenScanAgain() throws Exception { Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead", uoe.getMessage()); - AutoCloseables.close(taskList1); - AutoCloseables.close(taskList2); + AutoCloseables.close(task1); + AutoCloseables.close(task2); AutoCloseables.close(scanner, dataset, factory); } @Test - public void testScanInOtherThread() throws Exception { + public void testScanTaskInOtherThread() throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); @@ -268,12 +266,11 @@ public void testScanInOtherThread() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - List taskList = collect(scanner.scan()); - NativeScanTask task = taskList.get(0); + NativeScanTask task = scanner.scanTask(); List datum = executor.submit(() -> collectTaskData(task)).get(); AutoCloseables.close(datum); - AutoCloseables.close(taskList); + AutoCloseables.close(task); AutoCloseables.close(scanner, dataset, factory); } @@ -292,7 +289,7 @@ public void testErrorThrownWhenScanAfterScannerClose() throws Exception { } @Test - public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception { + public void testErrorThrownWhenExecuteScanTaskAfterScanTaskClose() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -300,15 +297,14 @@ public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - List tasks = collect(scanner.scan()); - NativeScanTask task = tasks.get(0); + NativeScanTask task = scanner.scanTask(); task.close(); assertThrows(NativeInstanceReleasedException.class, task::execute); AutoCloseables.close(factory); } @Test - public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exception { + public void testErrorThrownWhenIterateOnIteratorAfterScanTaskClose() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -316,8 +312,7 @@ public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exceptio NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - List tasks = collect(scanner.scan()); - NativeScanTask task = tasks.get(0); + NativeScanTask task = scanner.scanTask(); ArrowReader reader = task.execute(); task.close(); assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch); @@ -348,7 +343,7 @@ public void testBaseArrowIpcRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(1, datum.size()); assertEquals(1, schema.getFields().size()); assertEquals("ints", schema.getFields().get(0).getName()); @@ -376,7 +371,7 @@ public void testBaseOrcRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertSingleTaskProduced(factory, options); + assertScanTaskProduced(factory, options); assertEquals(1, datum.size()); assertEquals(1, schema.getFields().size()); assertEquals("ints", schema.getFields().get(0).getName()); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java index 2a86a25688309..318dea86aa9e6 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java @@ -25,9 +25,9 @@ import org.junit.Assert; public abstract class TestNativeDataset extends TestDataset { - protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) { + protected void assertScanTaskProduced(DatasetFactory factory, ScanOptions options) { final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); - Assert.assertEquals(1L, stream(scanner.scan()).count()); + Assert.assertNotNull(scanner.scanTask()); } } From 130ce9821f28f15d9dfc664de2a05c8eacabbadf Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Mon, 19 Sep 2022 17:53:19 -0500 Subject: [PATCH 2/4] Added Scanner method to scan unordered batches --- .../arrow/dataset/jni/NativeScanTask.java | 1 + .../arrow/dataset/jni/NativeScanner.java | 8 ++- .../arrow/dataset/scanner/ScanTask.java | 1 + .../apache/arrow/dataset/scanner/Scanner.java | 11 ++-- .../org/apache/arrow/dataset/TestDataset.java | 7 +-- .../dataset/file/TestFileSystemDataset.java | 62 ++++++------------- .../arrow/dataset/jni/TestNativeDataset.java | 4 +- 7 files changed, 38 insertions(+), 56 deletions(-) diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java index e4764236dadbd..7747dd6034016 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java @@ -25,6 +25,7 @@ * id via {@link JniWrapper}, thus we allow only one-time execution of method {@link #execute()}. If a re-scan * operation is expected, call {@link NativeDataset#newScan} to create a new scanner instance. */ +@Deprecated public class NativeScanTask implements ScanTask { private final NativeScanner scanner; diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 5e4b949927338..ad718c0a69b1e 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -68,11 +68,15 @@ ArrowReader execute() { } @Override - public NativeScanTask scanTask() { + public ArrowReader scanBatchesUnordered() { if (closed) { throw new NativeInstanceReleasedException(); } - return new NativeScanTask(this); + if (!executed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " + + "new scanner instead"); + } + return new NativeReader(context.getAllocator()); } @Override diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java index 434f5c9a6fa5a..16b8aeefb61f9 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java @@ -26,6 +26,7 @@ * ScanTask is meant to be a unit of work to be dispatched. The implementation * must be thread and concurrent safe. */ +@Deprecated public interface ScanTask extends AutoCloseable { /** diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java index f562065481162..d013d7db3e76c 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -17,6 +17,7 @@ package org.apache.arrow.dataset.scanner; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.types.pojo.Schema; /** @@ -25,20 +26,18 @@ public interface Scanner extends AutoCloseable { /** - * Perform the scan operation. + * Read record batches from the URI resources filtering the batch size configured. * - * @return a {@link ScanTask}. Each task is considered independent and it is allowed - * to execute the tasks concurrently to gain better performance. + * @return a {@link ArrowReader}. */ - ScanTask scanTask(); - + ArrowReader scanBatchesUnordered(); /** * Perform the scan operation. * * @return a iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed * to execute the tasks concurrently to gain better performance. - * @deprecated use {@link #scanTask()} instead. + * @deprecated use {@link #scanBatchesUnordered()} instead. */ @Deprecated Iterable scan(); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java index 220ceff8a330e..62f79daeba870 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -28,7 +28,6 @@ import java.util.stream.StreamSupport; import org.apache.arrow.dataset.scanner.ScanOptions; -import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.dataset.source.Dataset; import org.apache.arrow.dataset.source.DatasetFactory; @@ -63,7 +62,7 @@ protected List collectResultFromFactory(DatasetFactory factory final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); try { - final List ret = collectTaskData(scanner.scanTask()); + final List ret = collectTaskData(scanner); AutoCloseables.close(scanner, dataset); return ret; } catch (RuntimeException e) { @@ -73,8 +72,8 @@ protected List collectResultFromFactory(DatasetFactory factory } } - protected List collectTaskData(ScanTask scanTask) { - try (ArrowReader reader = scanTask.execute()) { + protected List collectTaskData(Scanner scan) { + try (ArrowReader reader = scan.scanBatchesUnordered()) { List batches = new ArrayList<>(); while (reader.loadNextBatch()) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index 9d249dadda984..f1d0659328a21 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -42,7 +42,6 @@ import org.apache.arrow.dataset.jni.NativeDataset; import org.apache.arrow.dataset.jni.NativeInstanceReleasedException; import org.apache.arrow.dataset.jni.NativeMemoryPool; -import org.apache.arrow.dataset.jni.NativeScanTask; import org.apache.arrow.dataset.jni.NativeScanner; import org.apache.arrow.dataset.jni.TestNativeDataset; import org.apache.arrow.dataset.scanner.ScanOptions; @@ -88,7 +87,7 @@ public void testBaseParquetRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(1, datum.size()); assertEquals(2, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); @@ -112,7 +111,7 @@ public void testParquetProjectSingleColumn() throws Exception { List datum = collectResultFromFactory(factory, options); org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(1, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType()); @@ -139,7 +138,7 @@ public void testParquetBatchSize() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(3, datum.size()); datum.forEach(batch -> assertEquals(1, batch.getLength())); checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum); @@ -163,7 +162,7 @@ public void testParquetDirectoryRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(7, datum.size()); datum.forEach(batch -> assertEquals(1, batch.getLength())); checkParquetReadResult(schema, expectedJsonUnordered, datum); @@ -182,7 +181,7 @@ public void testEmptyProjectSelectsZeroColumns() throws Exception { List datum = collectResultFromFactory(factory, options); org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList()); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(0, schema.getFields().size()); assertEquals(1, datum.size()); checkParquetReadResult(schema, @@ -204,7 +203,7 @@ public void testNullProjectSelectsAllColumns() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(1, datum.size()); assertEquals(2, schema.getFields().size()); assertEquals("id", schema.getFields().get(0).getName()); @@ -233,7 +232,7 @@ public void testNoErrorWhenCloseAgain() throws Exception { } @Test - public void testErrorThrownWhenScanTaskAgain() throws Exception { + public void testErrorThrownWhenScanBatchesAgain() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -241,23 +240,18 @@ public void testErrorThrownWhenScanTaskAgain() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - NativeScanTask task1 = scanner.scanTask(); - NativeScanTask task2 = scanner.scanTask(); - List datum = collectTaskData(task1); - + List datum = collectTaskData(scanner); AutoCloseables.close(datum); - - UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute); + UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, + scanner::scanBatchesUnordered); Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead", uoe.getMessage()); - AutoCloseables.close(task1); - AutoCloseables.close(task2); AutoCloseables.close(scanner, dataset, factory); } @Test - public void testScanTaskInOtherThread() throws Exception { + public void testScanBatchesInOtherThread() throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); @@ -266,16 +260,14 @@ public void testScanTaskInOtherThread() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - NativeScanTask task = scanner.scanTask(); - List datum = executor.submit(() -> collectTaskData(task)).get(); + List datum = executor.submit(() -> collectTaskData(scanner)).get(); AutoCloseables.close(datum); - AutoCloseables.close(task); AutoCloseables.close(scanner, dataset, factory); } @Test - public void testErrorThrownWhenScanAfterScannerClose() throws Exception { + public void testErrorThrownWhenScanBatchesAfterScannerClose() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -284,27 +276,13 @@ public void testErrorThrownWhenScanAfterScannerClose() throws Exception { ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); scanner.close(); - assertThrows(NativeInstanceReleasedException.class, scanner::scan); - AutoCloseables.close(factory); - } - - @Test - public void testErrorThrownWhenExecuteScanTaskAfterScanTaskClose() throws Exception { - ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); + assertThrows(NativeInstanceReleasedException.class, scanner::scanBatchesUnordered); - FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), - FileFormat.PARQUET, writeSupport.getOutputURI()); - NativeDataset dataset = factory.finish(); - ScanOptions options = new ScanOptions(100); - NativeScanner scanner = dataset.newScan(options); - NativeScanTask task = scanner.scanTask(); - task.close(); - assertThrows(NativeInstanceReleasedException.class, task::execute); AutoCloseables.close(factory); } @Test - public void testErrorThrownWhenIterateOnIteratorAfterScanTaskClose() throws Exception { + public void testErrorThrownWhenReadAfterNativeReaderClose() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), @@ -312,10 +290,10 @@ public void testErrorThrownWhenIterateOnIteratorAfterScanTaskClose() throws Exce NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - NativeScanTask task = scanner.scanTask(); - ArrowReader reader = task.execute(); - task.close(); + ArrowReader reader = scanner.scanBatchesUnordered(); + scanner.close(); assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch); + AutoCloseables.close(factory); } @@ -343,7 +321,7 @@ public void testBaseArrowIpcRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(1, datum.size()); assertEquals(1, schema.getFields().size()); assertEquals("ints", schema.getFields().get(0).getName()); @@ -371,7 +349,7 @@ public void testBaseOrcRead() throws Exception { Schema schema = inferResultSchemaFromFactory(factory, options); List datum = collectResultFromFactory(factory, options); - assertScanTaskProduced(factory, options); + assertScanBatchesProduced(factory, options); assertEquals(1, datum.size()); assertEquals(1, schema.getFields().size()); assertEquals("ints", schema.getFields().get(0).getName()); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java index 318dea86aa9e6..ccf22a133c842 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java @@ -25,9 +25,9 @@ import org.junit.Assert; public abstract class TestNativeDataset extends TestDataset { - protected void assertScanTaskProduced(DatasetFactory factory, ScanOptions options) { + protected void assertScanBatchesProduced(DatasetFactory factory, ScanOptions options) { final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); - Assert.assertNotNull(scanner.scanTask()); + Assert.assertNotNull(scanner.scanBatchesUnordered()); } } From 639fc4f22ca7d4b2840078bfa4a4a3b7d1fbc33d Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Tue, 20 Sep 2022 15:45:52 -0500 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: David Li --- .../main/java/org/apache/arrow/dataset/jni/NativeScanner.java | 2 +- .../src/main/java/org/apache/arrow/dataset/scanner/Scanner.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index ad718c0a69b1e..293ee7b838d86 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -73,7 +73,7 @@ public ArrowReader scanBatchesUnordered() { throw new NativeInstanceReleasedException(); } if (!executed.compareAndSet(false, true)) { - throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " + + throw new UnsupportedOperationException("NativeScanner can only be executed once. Create a " + "new scanner instead"); } return new NativeReader(context.getAllocator()); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java index d013d7db3e76c..4828c2954c39d 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -26,7 +26,7 @@ public interface Scanner extends AutoCloseable { /** - * Read record batches from the URI resources filtering the batch size configured. + * Read the dataset as a stream of record batches. * * @return a {@link ArrowReader}. */ From 7f20e3e847a8fc48cb610290c21ecd40a337c54b Mon Sep 17 00:00:00 2001 From: david dali susanibar arce Date: Tue, 20 Sep 2022 16:10:40 -0500 Subject: [PATCH 4/4] Change scanBatchesUnordered to scanBatches to be related with C++ implementation --- .../java/org/apache/arrow/dataset/jni/NativeScanner.java | 2 +- .../java/org/apache/arrow/dataset/scanner/Scanner.java | 4 ++-- .../test/java/org/apache/arrow/dataset/TestDataset.java | 2 +- .../apache/arrow/dataset/file/TestFileSystemDataset.java | 8 ++++---- .../org/apache/arrow/dataset/jni/TestNativeDataset.java | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 293ee7b838d86..8ca8e5cf50eac 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -68,7 +68,7 @@ ArrowReader execute() { } @Override - public ArrowReader scanBatchesUnordered() { + public ArrowReader scanBatches() { if (closed) { throw new NativeInstanceReleasedException(); } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java index 4828c2954c39d..43749b7db8ec2 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -30,14 +30,14 @@ public interface Scanner extends AutoCloseable { * * @return a {@link ArrowReader}. */ - ArrowReader scanBatchesUnordered(); + ArrowReader scanBatches(); /** * Perform the scan operation. * * @return a iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed * to execute the tasks concurrently to gain better performance. - * @deprecated use {@link #scanBatchesUnordered()} instead. + * @deprecated use {@link #scanBatches()} instead. */ @Deprecated Iterable scan(); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java index 62f79daeba870..2516c409593ba 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java @@ -73,7 +73,7 @@ protected List collectResultFromFactory(DatasetFactory factory } protected List collectTaskData(Scanner scan) { - try (ArrowReader reader = scan.scanBatchesUnordered()) { + try (ArrowReader reader = scan.scanBatches()) { List batches = new ArrayList<>(); while (reader.loadNextBatch()) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index f1d0659328a21..9dc5f2b655a83 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -243,8 +243,8 @@ public void testErrorThrownWhenScanBatchesAgain() throws Exception { List datum = collectTaskData(scanner); AutoCloseables.close(datum); UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, - scanner::scanBatchesUnordered); - Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead", + scanner::scanBatches); + Assertions.assertEquals("NativeScanner can only be executed once. Create a new scanner instead", uoe.getMessage()); AutoCloseables.close(scanner, dataset, factory); @@ -276,7 +276,7 @@ public void testErrorThrownWhenScanBatchesAfterScannerClose() throws Exception { ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); scanner.close(); - assertThrows(NativeInstanceReleasedException.class, scanner::scanBatchesUnordered); + assertThrows(NativeInstanceReleasedException.class, scanner::scanBatches); AutoCloseables.close(factory); } @@ -290,7 +290,7 @@ public void testErrorThrownWhenReadAfterNativeReaderClose() throws Exception { NativeDataset dataset = factory.finish(); ScanOptions options = new ScanOptions(100); NativeScanner scanner = dataset.newScan(options); - ArrowReader reader = scanner.scanBatchesUnordered(); + ArrowReader reader = scanner.scanBatches(); scanner.close(); assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch); diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java index ccf22a133c842..d0f91769096d8 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/TestNativeDataset.java @@ -28,6 +28,6 @@ public abstract class TestNativeDataset extends TestDataset { protected void assertScanBatchesProduced(DatasetFactory factory, ScanOptions options) { final Dataset dataset = factory.finish(); final Scanner scanner = dataset.newScan(options); - Assert.assertNotNull(scanner.scanBatchesUnordered()); + Assert.assertNotNull(scanner.scanBatches()); } }