Skip to content

Commit

Permalink
UpdateBy gRPC (#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith authored Aug 29, 2022
1 parent 1d2721e commit dffa8cc
Show file tree
Hide file tree
Showing 17 changed files with 4,787 additions and 1,883 deletions.
5,259 changes: 3,485 additions & 1,774 deletions go/internal/proto/table/table.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions go/internal/proto/table/table_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package io.deephaven.client;

import io.deephaven.api.TableOperations;
import io.deephaven.api.updateby.UpdateByOperation;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.TableCreatorImpl;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.UpdateByTable;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand Down Expand Up @@ -54,6 +57,28 @@ public void getStream() throws Exception {
}
}

@Test
public void updateBy() throws Exception {
final int size = 100;
final UpdateByTable spec = TableSpec.empty(size)
.view("I=i")
.updateBy(UpdateByOperation.CumSum("I"));
try (
final TableHandle handle = flightSession.session().batch().execute(spec);
final FlightStream stream = flightSession.stream(handle)) {
assertThat(stream.next()).isTrue();
final VectorSchemaRoot root = stream.getRoot();
assertThat(root.getRowCount()).isEqualTo(size);
final BigIntVector longVector = (BigIntVector) root.getVector("I");
long sum = 0;
for (int i = 0; i < size; ++i) {
sum += i;
final long actual = longVector.get(i);
assertThat(actual).isEqualTo(sum);
}
}
}

// TODO (deephaven-core#1373): Hook up doPut integration unit testing
// @Test
// public void doPutStream() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ public TableSpec apply(TableSpec spec, String[] formulas) {
public TableSpec apply(TableSpec spec, String[] formulas) {
return spec.lazyUpdate(formulas);
}
}
},
}

private final UpdateOrSelect method;


public UpdateOrSelectSessionTest(UpdateOrSelect method) {
this.method = Objects.requireNonNull(method);
}
Expand Down Expand Up @@ -98,6 +97,12 @@ public void allowTickingII() throws InterruptedException, TableHandle.TableHandl
allow(TimeTable.of(Duration.ofSeconds(1)).tail(1), "Y = ii");
}

@Test
public void allowSpecificFunctions() throws TableHandle.TableHandleException, InterruptedException {
// This test isn't meant to be exhaustive
allow(TableSpec.empty(1).view("Seconds=(long)1659095381"), "Nanos = secondsToNanos(Seconds)");
}

@Test
public void disallowCustomFunctions() throws InterruptedException {
disallow(TableSpec.empty(1), String.format("X = %s.myFunction()", UpdateOrSelectSessionTest.class.getName()));
Expand All @@ -109,25 +114,41 @@ public void disallowNew() throws InterruptedException {
}

@Test
public void allowPreviousColumn() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X + 1");
public void allowPreviousColumnName() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1).view("X = 12"), "X");
}

@Test
public void allowPreviousColumnAssignment() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1).view("X = 12"), "Y = X");
}

@Test
public void allowPreviousColumnAssignmentInline() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X");
}

@Test
public void disallowFutureColumn() throws InterruptedException {
disallow(TableSpec.empty(1), "Y = X + 1", "X = 12");
disallow(TableSpec.empty(1), "Y = X", "X = 12");
}

@Test
public void allowReassignmentColumn() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X + 1", "X = 42");
allow(TableSpec.empty(1), "X = 12", "Y = X", "X = 42");
}

@Test
public void disallowNonExistentColumn() throws InterruptedException {
disallow(TableSpec.empty(1), "X = 12", "Y = Z + 1");
disallow(TableSpec.empty(1), "Y = Z");
}

@Test
public void allowValue() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12");
}


private void allow(TableSpec parent, String... formulas)
throws InterruptedException, TableHandle.TableHandleException {
try (final TableHandle handle = session.batch().execute(method.apply(parent, formulas))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.backplane.grpc.TimeTableRequest;
import io.deephaven.proto.backplane.grpc.UnstructuredFilterTableRequest;
import io.deephaven.proto.backplane.grpc.UpdateByRequest;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.qst.table.AggregateAllByTable;
import io.deephaven.qst.table.AggregationTable;
Expand Down Expand Up @@ -458,7 +459,11 @@ public void visit(CountByTable countByTable) {

@Override
public void visit(UpdateByTable updateByTable) {
throw new UnsupportedOperationException("TODO(deephaven-core#2607): UpdateByTable gRPC impl");
final UpdateByRequest.Builder request = UpdateByBuilder
.adapt(updateByTable)
.setResultId(ticket)
.setSourceId(ref(updateByTable.parent()));
out = op(Builder::setUpdateBy, request);
}

private SelectOrUpdateRequest selectOrUpdate(SingleParentTable x,
Expand Down
Loading

0 comments on commit dffa8cc

Please sign in to comment.