ARROW-15745: [Java] Deprecate redundant iterable of ScanTask (apache#14168)

Deprecate the redundant iterable of ScanTask, since there are no more ScanTasks on the C++ side.

Authored-by: david dali susanibar arce <[email protected]>
Signed-off-by: David Li <[email protected]>
davisusanibar authored and fatemehp committed Oct 17, 2022
1 parent 4ae97de · commit 85a73aa
Showing 7 changed files with 51 additions and 56 deletions.
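The change replaces the two-step scan()/ScanTask#execute() flow with a single Scanner#scanBatches() call that returns an ArrowReader. A minimal migration sketch, assuming a Parquet input; the file URI and batch size are illustrative, and the setup otherwise mirrors the test code in this diff:

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

public class ScanBatchesExample {
  public static void main(String[] args) throws Exception {
    String uri = "file:///tmp/users.parquet";  // hypothetical input file
    try (BufferAllocator allocator = new RootAllocator();
         DatasetFactory factory = new FileSystemDatasetFactory(
             allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
         Dataset dataset = factory.finish();
         Scanner scanner = dataset.newScan(new ScanOptions(/*batchSize*/ 100));
         // Before this change: iterate scanner.scan() and call execute() on each ScanTask.
         // After: one ArrowReader over all record batches.
         ArrowReader reader = scanner.scanBatches()) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        System.out.println("batch rows: " + root.getRowCount());
      }
    }
  }
}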
NativeScanTask.java
@@ -25,6 +25,7 @@
* id via {@link JniWrapper}, thus we allow only one-time execution of method {@link #execute()}. If a re-scan
* operation is expected, call {@link NativeDataset#newScan} to create a new scanner instance.
*/
@Deprecated
public class NativeScanTask implements ScanTask {
private final NativeScanner scanner;

NativeScanner.java
@@ -68,6 +68,19 @@ ArrowReader execute() {
}

@Override
public ArrowReader scanBatches() {
if (closed) {
throw new NativeInstanceReleasedException();
}
if (!executed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("NativeScanner can only be executed once. Create a " +
"new scanner instead");
}
return new NativeReader(context.getAllocator());
}

@Override
@Deprecated
public Iterable<? extends NativeScanTask> scan() {
if (closed) {
throw new NativeInstanceReleasedException();
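Note the compareAndSet guard above: a NativeScanner is one-shot, so a second scanBatches() call fails fast instead of silently re-scanning, and a re-scan requires a fresh scanner from NativeDataset#newScan. A short sketch of that contract, assuming dataset and options objects set up as in the tests further down:

NativeScanner scanner = dataset.newScan(options);
try (ArrowReader reader = scanner.scanBatches()) {  // first call: flips `executed`
  while (reader.loadNextBatch()) {
    // consume batches
  }
}
// scanner.scanBatches();  // second call would throw UnsupportedOperationException
scanner.close();
try (NativeScanner rescan = dataset.newScan(options);  // re-scan via a new scanner
     ArrowReader reader = rescan.scanBatches()) {
  while (reader.loadNextBatch()) {
    // consume batches
  }
}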
ScanTask.java
@@ -26,6 +26,7 @@
* ScanTask is meant to be a unit of work to be dispatched. The implementation
* must be thread and concurrent safe.
*/
@Deprecated
public interface ScanTask extends AutoCloseable {

/**
Scanner.java
@@ -17,19 +17,29 @@

package org.apache.arrow.dataset.scanner;

import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* A high level interface for scanning data over dataset.
*/
public interface Scanner extends AutoCloseable {

/**
* Read the dataset as a stream of record batches.
*
* @return a {@link ArrowReader}.
*/
ArrowReader scanBatches();

/**
* Perform the scan operation.
*
* @return a iterable set of {@link ScanTask}s. Each task is considered independent and it is allowed
* to execute the tasks concurrently to gain better performance.
* @deprecated use {@link #scanBatches()} instead.
*/
@Deprecated
Iterable<? extends ScanTask> scan();

/**
TestDataset.java
@@ -28,7 +28,6 @@
import java.util.stream.StreamSupport;

import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
@@ -63,9 +62,7 @@ protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
try {
final List<ArrowRecordBatch> ret = stream(scanner.scan())
.flatMap(t -> stream(collectTaskData(t)))
.collect(Collectors.toList());
final List<ArrowRecordBatch> ret = collectTaskData(scanner);
AutoCloseables.close(scanner, dataset);
return ret;
} catch (RuntimeException e) {
@@ -75,8 +72,8 @@ protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory
}
}

protected List<ArrowRecordBatch> collectTaskData(ScanTask scanTask) {
try (ArrowReader reader = scanTask.execute()) {
protected List<ArrowRecordBatch> collectTaskData(Scanner scan) {
try (ArrowReader reader = scan.scanBatches()) {
List<ArrowRecordBatch> batches = new ArrayList<>();
while (reader.loadNextBatch()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
TestFileSystemDataset.java
@@ -42,7 +42,6 @@
import org.apache.arrow.dataset.jni.NativeDataset;
import org.apache.arrow.dataset.jni.NativeInstanceReleasedException;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.jni.NativeScanTask;
import org.apache.arrow.dataset.jni.NativeScanner;
import org.apache.arrow.dataset.jni.TestNativeDataset;
import org.apache.arrow.dataset.scanner.ScanOptions;
@@ -88,7 +87,7 @@ public void testBaseParquetRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(2, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
@@ -112,7 +111,7 @@ public void testParquetProjectSingleColumn() throws Exception {
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(1, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
@@ -139,7 +138,7 @@ public void testParquetBatchSize() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(3, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
@@ -163,7 +162,7 @@ public void testParquetDirectoryRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(7, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, expectedJsonUnordered, datum);
@@ -182,7 +181,7 @@ public void testEmptyProjectSelectsZeroColumns() throws Exception {
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList());

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(0, schema.getFields().size());
assertEquals(1, datum.size());
checkParquetReadResult(schema,
@@ -204,7 +203,7 @@ public void testNullProjectSelectsAllColumns() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(2, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
@@ -233,33 +232,26 @@ public void testNoErrorWhenCloseAgain() throws Exception {
}

@Test
public void testErrorThrownWhenScanAgain() throws Exception {
public void testErrorThrownWhenScanBatchesAgain() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> taskList1 = collect(scanner.scan());
List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
NativeScanTask task1 = taskList1.get(0);
NativeScanTask task2 = taskList2.get(0);
List<ArrowRecordBatch> datum = collectTaskData(task1);

List<ArrowRecordBatch> datum = collectTaskData(scanner);
AutoCloseables.close(datum);

UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute);
Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class,
scanner::scanBatches);
Assertions.assertEquals("NativeScanner can only be executed once. Create a new scanner instead",
uoe.getMessage());

AutoCloseables.close(taskList1);
AutoCloseables.close(taskList2);
AutoCloseables.close(scanner, dataset, factory);
}

@Test
public void testScanInOtherThread() throws Exception {
public void testScanBatchesInOtherThread() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

@@ -268,17 +260,14 @@ public void testScanInOtherThread() throws Exception {
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> taskList = collect(scanner.scan());
NativeScanTask task = taskList.get(0);
List<ArrowRecordBatch> datum = executor.submit(() -> collectTaskData(task)).get();
List<ArrowRecordBatch> datum = executor.submit(() -> collectTaskData(scanner)).get();

AutoCloseables.close(datum);
AutoCloseables.close(taskList);
AutoCloseables.close(scanner, dataset, factory);
}

@Test
public void testErrorThrownWhenScanAfterScannerClose() throws Exception {
public void testErrorThrownWhenScanBatchesAfterScannerClose() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
@@ -287,40 +276,24 @@ public void testErrorThrownWhenScanAfterScannerClose() throws Exception {
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
scanner.close();
assertThrows(NativeInstanceReleasedException.class, scanner::scan);
AutoCloseables.close(factory);
}

@Test
public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
assertThrows(NativeInstanceReleasedException.class, scanner::scanBatches);

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> tasks = collect(scanner.scan());
NativeScanTask task = tasks.get(0);
task.close();
assertThrows(NativeInstanceReleasedException.class, task::execute);
AutoCloseables.close(factory);
}

@Test
public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exception {
public void testErrorThrownWhenReadAfterNativeReaderClose() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> tasks = collect(scanner.scan());
NativeScanTask task = tasks.get(0);
ArrowReader reader = task.execute();
task.close();
ArrowReader reader = scanner.scanBatches();
scanner.close();
assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch);

AutoCloseables.close(factory);
}

@@ -348,7 +321,7 @@ public void testBaseArrowIpcRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(1, schema.getFields().size());
assertEquals("ints", schema.getFields().get(0).getName());
@@ -376,7 +349,7 @@ public void testBaseOrcRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

assertSingleTaskProduced(factory, options);
assertScanBatchesProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(1, schema.getFields().size());
assertEquals("ints", schema.getFields().get(0).getName());
TestNativeDataset.java
@@ -25,9 +25,9 @@
import org.junit.Assert;

public abstract class TestNativeDataset extends TestDataset {
protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) {
protected void assertScanBatchesProduced(DatasetFactory factory, ScanOptions options) {
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
Assert.assertEquals(1L, stream(scanner.scan()).count());
Assert.assertNotNull(scanner.scanBatches());
}
}
