Skip to content

Commit

Permalink
ARROW-4740: [Java] [flight] Update to Junit 5
Browse files Browse the repository at this point in the history
  • Loading branch information
andreoss committed Sep 13, 2022
1 parent 4ae26d1 commit 41587a7
Show file tree
Hide file tree
Showing 28 changed files with 733 additions and 638 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.Random;
import java.util.function.Function;

import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;

Expand Down Expand Up @@ -130,7 +130,7 @@ static boolean isNativeTransportAvailable() {
*/
public static CallStatus assertCode(FlightStatusCode code, Executable r) {
final FlightRuntimeException ex = Assertions.assertThrows(FlightRuntimeException.class, r);
Assert.assertEquals(code, ex.status().code());
Assertions.assertEquals(code, ex.status().code());
return ex.status();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for application-specific metadata support in Flight.
Expand All @@ -51,16 +51,16 @@ public class TestApplicationMetadata {
*/
@Test
// This test is consistently flaky on CI, unfortunately.
@Ignore
@Disabled
public void retrieveMetadata() {
test((allocator, client) -> {
try (final FlightStream stream = client.getStream(new Ticket(new byte[0]))) {
byte i = 0;
while (stream.next()) {
final IntVector vector = (IntVector) stream.getRoot().getVector("a");
Assert.assertEquals(1, vector.getValueCount());
Assert.assertEquals(10, vector.get(0));
Assert.assertEquals(i, stream.getLatestMetadata().getByte(0));
Assertions.assertEquals(1, vector.getValueCount());
Assertions.assertEquals(10, vector.get(0));
Assertions.assertEquals(i, stream.getLatestMetadata().getByte(0));
i++;
}
} catch (Exception e) {
Expand All @@ -81,7 +81,7 @@ public void arrow6136() {
final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener);
// Must attempt to retrieve the result to get any server-side errors.
final CallStatus status = FlightTestUtil.assertCode(FlightStatusCode.INTERNAL, writer::getResult);
Assert.assertEquals(MESSAGE_ARROW_6136, status.description());
Assertions.assertEquals(MESSAGE_ARROW_6136, status.description());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -92,7 +92,7 @@ public void arrow6136() {
* Ensure that a client can send metadata to the server.
*/
@Test
@Ignore
@Disabled
public void uploadMetadataAsync() {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
test((allocator, client) -> {
Expand All @@ -104,8 +104,8 @@ public void uploadMetadataAsync() {

@Override
public void onNext(PutResult val) {
Assert.assertNotNull(val);
Assert.assertEquals(counter, val.getApplicationMetadata().getByte(0));
Assertions.assertNotNull(val);
Assertions.assertEquals(counter, val.getApplicationMetadata().getByte(0));
counter++;
}
};
Expand Down Expand Up @@ -134,7 +134,7 @@ public void onNext(PutResult val) {
* Ensure that a client can send metadata to the server. Uses the synchronous API.
*/
@Test
@Ignore
@Disabled
public void uploadMetadataSync() {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
test((allocator, client) -> {
Expand All @@ -153,8 +153,8 @@ public void uploadMetadataSync() {
root.setRowCount(1);
writer.putNext(metadata);
try (final PutResult message = listener.poll(5000, TimeUnit.SECONDS)) {
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getApplicationMetadata().getByte(0));
Assertions.assertNotNull(message);
Assertions.assertEquals(i, message.getApplicationMetadata().getByte(0));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Expand All @@ -170,7 +170,7 @@ public void uploadMetadataSync() {
* Make sure that a {@link SyncPutListener} properly reclaims memory if ignored.
*/
@Test
@Ignore
@Disabled
public void syncMemoryReclaimed() {
final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
test((allocator, client) -> {
Expand Down Expand Up @@ -216,10 +216,10 @@ public void testMetadataEndianness() throws Exception {
final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, reader);
writer.completed();
try (final PutResult metadata = reader.read()) {
Assert.assertEquals(16, metadata.getApplicationMetadata().readableBytes());
Assertions.assertEquals(16, metadata.getApplicationMetadata().readableBytes());
byte[] bytes = new byte[16];
metadata.getApplicationMetadata().readBytes(bytes);
Assert.assertArrayEquals(EndianFlightProducer.EXPECTED_BYTES, bytes);
Assertions.assertArrayEquals(EndianFlightProducer.EXPECTED_BYTES, bytes);
}
writer.getResult();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,56 +24,61 @@
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestAuth {

/** An auth handler that does not send messages should not block the server forever. */
@Test(expected = RuntimeException.class)
@Test
public void noMessages() throws Exception {
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final FlightServer s = FlightTestUtil
.getStartedServer(
location -> FlightServer.builder(allocator, location, new NoOpFlightProducer()).authHandler(
new OneshotAuthHandler()).build());
final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
client.authenticate(new ClientAuthHandler() {
@Override
public void authenticate(ClientAuthSender outgoing, Iterator<byte[]> incoming) {
}
Assertions.assertThrows(RuntimeException.class, () -> {
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final FlightServer s = FlightTestUtil
.getStartedServer(
location -> FlightServer.builder(allocator, location, new NoOpFlightProducer()).authHandler(
new OneshotAuthHandler()).build());
final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
client.authenticate(new ClientAuthHandler() {
@Override
public void authenticate(ClientAuthSender outgoing, Iterator<byte[]> incoming) {
}

@Override
public byte[] getCallToken() {
return new byte[0];
}
});
}
@Override
public byte[] getCallToken() {
return new byte[0];
}
});
}
});
}

/** An auth handler that sends an error should not block the server forever. */
@Test(expected = RuntimeException.class)
@Test
public void clientError() throws Exception {
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final FlightServer s = FlightTestUtil
.getStartedServer(
location -> FlightServer.builder(allocator, location, new NoOpFlightProducer()).authHandler(
new OneshotAuthHandler()).build());
final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
client.authenticate(new ClientAuthHandler() {
@Override
public void authenticate(ClientAuthSender outgoing, Iterator<byte[]> incoming) {
outgoing.send(new byte[0]);
// Ensure the server-side runs
incoming.next();
outgoing.onError(new RuntimeException("test"));
}
Assertions.assertThrows(RuntimeException.class, () -> {
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
final FlightServer s = FlightTestUtil
.getStartedServer(
location -> FlightServer.builder(allocator, location, new NoOpFlightProducer()).authHandler(
new OneshotAuthHandler()).build());
final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
client.authenticate(new ClientAuthHandler() {
@Override
public void authenticate(ClientAuthSender outgoing, Iterator<byte[]> incoming) {
outgoing.send(new byte[0]);
// Ensure the server-side runs
incoming.next();
outgoing.onError(new RuntimeException("test"));
}

@Override
public byte[] getCallToken() {
return new byte[0];
}
});
}
@Override
public byte[] getCallToken() {
return new byte[0];
}
});
}
});
}

private static class OneshotAuthHandler implements ServerAuthHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.google.common.collect.ImmutableList;

Expand All @@ -43,7 +43,7 @@ public class TestBackPressure {
/**
* Make sure that failing to consume one stream doesn't block other streams.
*/
@Ignore
@Disabled
@Test
public void ensureIndependentSteams() throws Exception {
ensureIndependentSteams((b) -> (location -> new PerformanceTestServer(b, location)));
Expand All @@ -52,7 +52,7 @@ public void ensureIndependentSteams() throws Exception {
/**
* Make sure that failing to consume one stream doesn't block other streams.
*/
@Ignore
@Disabled
@Test
public void ensureIndependentSteamsWithCallbacks() throws Exception {
ensureIndependentSteams((b) -> (location -> new PerformanceTestServer(b, location,
Expand All @@ -62,7 +62,7 @@ public void ensureIndependentSteamsWithCallbacks() throws Exception {
/**
* Test to make sure stream doesn't go faster than the consumer is consuming.
*/
@Ignore
@Disabled
@Test
public void ensureWaitUntilProceed() throws Exception {
ensureWaitUntilProceed(new PollingBackpressureStrategy(), false);
Expand All @@ -72,7 +72,7 @@ public void ensureWaitUntilProceed() throws Exception {
* Test to make sure stream doesn't go faster than the consumer is consuming using a callback-based
* backpressure strategy.
*/
@Ignore
@Disabled
@Test
public void ensureWaitUntilProceedWithCallbacks() throws Exception {
ensureWaitUntilProceed(new RecordingCallbackBackpressureStrategy(), true);
Expand Down Expand Up @@ -177,9 +177,14 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
root.clear();
}
long expected = wait - epsilon;
Assert.assertTrue(
String.format("Expected a sleep of at least %dms but only slept for %d", expected,
bpStrategy.getSleepTime()), bpStrategy.getSleepTime() > expected);
Assertions.assertTrue(
bpStrategy.getSleepTime() > expected,
String.format(
"Expected a sleep of at least %dms but only slept for %d",
expected,
bpStrategy.getSleepTime()
)
);

}
}
Expand Down
Loading

0 comments on commit 41587a7

Please sign in to comment.