Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UpdateBy gRPC #2635

Merged
merged 5 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,259 changes: 3,485 additions & 1,774 deletions go/internal/proto/table/table.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions go/internal/proto/table/table_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package io.deephaven.client;

import io.deephaven.api.TableOperations;
import io.deephaven.api.updateby.UpdateByOperation;
import io.deephaven.client.impl.TableHandle;
import io.deephaven.client.impl.TableHandle.TableHandleException;
import io.deephaven.qst.TableCreator;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.qst.table.NewTable;
import io.deephaven.qst.table.TableCreatorImpl;
import io.deephaven.qst.table.TableSpec;
import io.deephaven.qst.table.UpdateByTable;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
Expand Down Expand Up @@ -53,6 +56,28 @@ public void getStream() throws Exception {
}
}

@Test
public void updateBy() throws Exception {
final int size = 100;
final UpdateByTable spec = TableSpec.empty(size)
.view("I=i")
.updateBy(UpdateByOperation.CumSum("I"));
try (
final TableHandle handle = flightSession.session().batch().execute(spec);
final FlightStream stream = flightSession.stream(handle)) {
assertThat(stream.next()).isTrue();
final VectorSchemaRoot root = stream.getRoot();
assertThat(root.getRowCount()).isEqualTo(size);
final BigIntVector longVector = (BigIntVector) root.getVector("I");
long sum = 0;
for (int i = 0; i < size; ++i) {
sum += i;
final long actual = longVector.get(i);
assertThat(actual).isEqualTo(sum);
}
}
}

// TODO (deephaven-core#1373): Hook up doPut integration unit testing
// @Test
// public void doPutStream() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ public TableSpec apply(TableSpec spec, String[] formulas) {
public TableSpec apply(TableSpec spec, String[] formulas) {
return spec.lazyUpdate(formulas);
}
}
},
}

private final UpdateOrSelect method;


public UpdateOrSelectSessionTest(UpdateOrSelect method) {
this.method = Objects.requireNonNull(method);
}
Expand Down Expand Up @@ -98,6 +97,12 @@ public void allowTickingII() throws InterruptedException, TableHandle.TableHandl
allow(TimeTable.of(Duration.ofSeconds(1)).tail(1), "Y = ii");
}

@Test
public void allowSpecificFunctions() throws TableHandle.TableHandleException, InterruptedException {
// This test isn't meant to be exhaustive
allow(TableSpec.empty(1).view("Seconds=(long)1659095381"), "Nanos = secondsToNanos(Seconds)");
}

@Test
public void disallowCustomFunctions() throws InterruptedException {
disallow(TableSpec.empty(1), String.format("X = %s.myFunction()", UpdateOrSelectSessionTest.class.getName()));
Expand All @@ -109,25 +114,41 @@ public void disallowNew() throws InterruptedException {
}

@Test
public void allowPreviousColumn() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X + 1");
public void allowPreviousColumnName() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1).view("X = 12"), "X");
}

@Test
public void allowPreviousColumnAssignment() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1).view("X = 12"), "Y = X");
}

@Test
public void allowPreviousColumnAssignmentInline() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X");
}

@Test
public void disallowFutureColumn() throws InterruptedException {
disallow(TableSpec.empty(1), "Y = X + 1", "X = 12");
disallow(TableSpec.empty(1), "Y = X", "X = 12");
}

@Test
public void allowReassignmentColumn() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12", "Y = X + 1", "X = 42");
allow(TableSpec.empty(1), "X = 12", "Y = X", "X = 42");
}

@Test
public void disallowNonExistentColumn() throws InterruptedException {
disallow(TableSpec.empty(1), "X = 12", "Y = Z + 1");
disallow(TableSpec.empty(1), "Y = Z");
}

@Test
public void allowValue() throws TableHandle.TableHandleException, InterruptedException {
allow(TableSpec.empty(1), "X = 12");
}


private void allow(TableSpec parent, String... formulas)
throws InterruptedException, TableHandle.TableHandleException {
try (final TableHandle handle = session.batch().execute(method.apply(parent, formulas))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.backplane.grpc.TimeTableRequest;
import io.deephaven.proto.backplane.grpc.UnstructuredFilterTableRequest;
import io.deephaven.proto.backplane.grpc.UpdateByRequest;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.qst.table.AggregateAllByTable;
import io.deephaven.qst.table.AggregationTable;
Expand Down Expand Up @@ -458,7 +459,11 @@ public void visit(CountByTable countByTable) {

@Override
public void visit(UpdateByTable updateByTable) {
throw new UnsupportedOperationException("TODO(deephaven-core#2607): UpdateByTable gRPC impl");
final UpdateByRequest.Builder request = UpdateByBuilder
.adapt(updateByTable)
.setResultId(ticket)
.setSourceId(ref(updateByTable.parent()));
out = op(Builder::setUpdateBy, request);
}

private SelectOrUpdateRequest selectOrUpdate(SingleParentTable x,
Expand Down
Loading