ARROW-15745: [Java] Deprecate redundant iterable of ScanTask #14168

Merged: 5 commits, Sep 21, 2022
Changes from 2 commits
@@ -68,6 +68,15 @@ ArrowReader execute() {
  }

+  @Override
+  public NativeScanTask scanTask() {
Member:
We just shouldn't have ScanTask at all; there is no such thing on the C++ side anymore.

Contributor Author:
Ok, let me delete that. This scan() method is exposed to the client, though, and deleting it would break compatibility.

Member:
The deprecation makes sense to me, but I think we should consider what the new interface is. I would think something like ArrowReader scanBatchesUnordered().

Member:
That would mirror the C++ API
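
For illustration, a minimal sketch of the replacement being floated in this thread. The method name comes from the comment above and it does mirror the C++ arrow::dataset::Scanner::ScanBatchesUnordered(); the placement and exact signature here are assumptions, not what this PR merged:

  public interface Scanner extends AutoCloseable {
    // Hypothetical follow-up API: expose all batches through a single
    // reader instead of handing out ScanTask objects.
    ArrowReader scanBatchesUnordered();
  }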

Contributor Author:
Changed

+    if (closed) {
+      throw new NativeInstanceReleasedException();
+    }
+    return new NativeScanTask(this);
+  }
+
  @Override
+  @Deprecated
  public Iterable<? extends NativeScanTask> scan() {
    if (closed) {
      throw new NativeInstanceReleasedException();
@@ -24,12 +24,23 @@
 */
public interface Scanner extends AutoCloseable {

+  /**
+   * Perform the scan operation.
+   *
+   * @return a {@link ScanTask}. The task is considered independent and may be
+   * executed concurrently to gain better performance.
+   */
+  ScanTask scanTask();
+
  /**
   * Perform the scan operation.
   *
   * @return an iterable set of {@link ScanTask}s. Each task is considered independent and the tasks
   * may be executed concurrently to gain better performance.
+   * @deprecated use {@link #scanTask()} instead.
   */
+  @Deprecated
  Iterable<? extends ScanTask> scan();

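For illustration, a hedged before/after sketch of the client-side migration this deprecation implies, using only calls shown in this diff (the consumeTask helper is hypothetical, standing in for application code):

  // Before (deprecated): a scanner could hand out several independent tasks.
  for (ScanTask task : scanner.scan()) {
    consumeTask(task);
  }

  // After: a scanner hands out a single task.
  ScanTask task = scanner.scanTask();
  consumeTask(task);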
@@ -63,9 +63,7 @@ protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
try {
-      final List<ArrowRecordBatch> ret = stream(scanner.scan())
-          .flatMap(t -> stream(collectTaskData(t)))
-          .collect(Collectors.toList());
+      final List<ArrowRecordBatch> ret = collectTaskData(scanner.scanTask());
AutoCloseables.close(scanner, dataset);
return ret;
} catch (RuntimeException e) {
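The tests below rely on a collectTaskData helper whose body is not shown in this diff. As context, a hedged sketch of what such a helper might look like; the imports, signature, and body are assumptions, not the PR's actual code:

import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

// Hypothetical sketch: drain one ScanTask into a list of record batches.
static List<ArrowRecordBatch> collectTaskData(ScanTask task) throws Exception {
  List<ArrowRecordBatch> batches = new ArrayList<>();
  try (ArrowReader reader = task.execute()) {
    while (reader.loadNextBatch()) {
      // Unload the reader's current root contents into an immutable batch.
      batches.add(new VectorUnloader(reader.getVectorSchemaRoot()).getRecordBatch());
    }
  }
  return batches;
}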
@@ -88,7 +88,7 @@ public void testBaseParquetRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(2, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
@@ -112,7 +112,7 @@ public void testParquetProjectSingleColumn() throws Exception {
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
org.apache.avro.Schema expectedSchema = truncateAvroSchema(writeSupport.getAvroSchema(), 0, 1);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(1, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
assertEquals(Types.MinorType.INT.getType(), schema.getFields().get(0).getType());
@@ -139,7 +139,7 @@ public void testParquetBatchSize() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(3, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
@@ -163,7 +163,7 @@ public void testParquetDirectoryRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(7, datum.size());
datum.forEach(batch -> assertEquals(1, batch.getLength()));
checkParquetReadResult(schema, expectedJsonUnordered, datum);
@@ -182,7 +182,7 @@ public void testEmptyProjectSelectsZeroColumns() throws Exception {
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
org.apache.avro.Schema expectedSchema = org.apache.avro.Schema.createRecord(Collections.emptyList());

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(0, schema.getFields().size());
assertEquals(1, datum.size());
checkParquetReadResult(schema,
@@ -204,7 +204,7 @@ public void testNullProjectSelectsAllColumns() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(2, schema.getFields().size());
assertEquals("id", schema.getFields().get(0).getName());
@@ -233,18 +233,16 @@ public void testNoErrorWhenCloseAgain() throws Exception {
}

@Test
-  public void testErrorThrownWhenScanAgain() throws Exception {
+  public void testErrorThrownWhenScanTaskAgain() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
-    List<? extends NativeScanTask> taskList1 = collect(scanner.scan());
-    List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
-    NativeScanTask task1 = taskList1.get(0);
-    NativeScanTask task2 = taskList2.get(0);
+    NativeScanTask task1 = scanner.scanTask();
+    NativeScanTask task2 = scanner.scanTask();
List<ArrowRecordBatch> datum = collectTaskData(task1);

AutoCloseables.close(datum);
@@ -253,13 +251,13 @@ public void testErrorThrownWhenScanAgain() throws Exception {
Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
uoe.getMessage());

-    AutoCloseables.close(taskList1);
-    AutoCloseables.close(taskList2);
+    AutoCloseables.close(task1);
+    AutoCloseables.close(task2);
AutoCloseables.close(scanner, dataset, factory);
}

@Test
-  public void testScanInOtherThread() throws Exception {
+  public void testScanTaskInOtherThread() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

@@ -268,12 +266,11 @@ public void testScanInOtherThread() throws Exception {
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
-    List<? extends NativeScanTask> taskList = collect(scanner.scan());
-    NativeScanTask task = taskList.get(0);
+    NativeScanTask task = scanner.scanTask();
List<ArrowRecordBatch> datum = executor.submit(() -> collectTaskData(task)).get();

AutoCloseables.close(datum);
-    AutoCloseables.close(taskList);
+    AutoCloseables.close(task);
AutoCloseables.close(scanner, dataset, factory);
}

@@ -292,32 +289,30 @@ public void testErrorThrownWhenScanAfterScannerClose() throws Exception {
}

@Test
-  public void testErrorThrownWhenExecuteTaskAfterTaskClose() throws Exception {
+  public void testErrorThrownWhenExecuteScanTaskAfterScanTaskClose() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
-    List<? extends NativeScanTask> tasks = collect(scanner.scan());
-    NativeScanTask task = tasks.get(0);
+    NativeScanTask task = scanner.scanTask();
task.close();
assertThrows(NativeInstanceReleasedException.class, task::execute);
AutoCloseables.close(factory);
}

@Test
-  public void testErrorThrownWhenIterateOnIteratorAfterTaskClose() throws Exception {
+  public void testErrorThrownWhenIterateOnIteratorAfterScanTaskClose() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");

FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
FileFormat.PARQUET, writeSupport.getOutputURI());
NativeDataset dataset = factory.finish();
ScanOptions options = new ScanOptions(100);
NativeScanner scanner = dataset.newScan(options);
-    List<? extends NativeScanTask> tasks = collect(scanner.scan());
-    NativeScanTask task = tasks.get(0);
+    NativeScanTask task = scanner.scanTask();
ArrowReader reader = task.execute();
task.close();
assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch);
@@ -348,7 +343,7 @@ public void testBaseArrowIpcRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(1, schema.getFields().size());
assertEquals("ints", schema.getFields().get(0).getName());
@@ -376,7 +371,7 @@ public void testBaseOrcRead() throws Exception {
Schema schema = inferResultSchemaFromFactory(factory, options);
List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

-    assertSingleTaskProduced(factory, options);
+    assertScanTaskProduced(factory, options);
assertEquals(1, datum.size());
assertEquals(1, schema.getFields().size());
assertEquals("ints", schema.getFields().get(0).getName());
@@ -25,9 +25,9 @@
import org.junit.Assert;

public abstract class TestNativeDataset extends TestDataset {
-  protected void assertSingleTaskProduced(DatasetFactory factory, ScanOptions options) {
+  protected void assertScanTaskProduced(DatasetFactory factory, ScanOptions options) {
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
-    Assert.assertEquals(1L, stream(scanner.scan()).count());
+    Assert.assertNotNull(scanner.scanTask());
}
}