Skip to content

Commit

Permalink
Split Join gRPC call into one per type of join (deephaven#905)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored and jmao-denver committed Nov 23, 2021
1 parent 32a8da6 commit 6336bf9
Show file tree
Hide file tree
Showing 17 changed files with 2,870 additions and 412 deletions.
3 changes: 2 additions & 1 deletion DB/src/main/java/io/deephaven/db/v2/CrossJoinHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
* Implementation for chunk-oriented aggregation operations, including {@link Table#join}.
*/
public class CrossJoinHelper {
// Note: This should be >= 16 to get efficient performance from Index#insert and Index#shiftInPlace.
// Note: This would be >= 16 to get efficient performance from Index#insert and Index#shiftInPlace. However, it is
// very costly for joins of many small groups for the default to be so high.
public static final int DEFAULT_NUM_RIGHT_BITS_TO_RESERVE = Configuration.getInstance().getIntegerForClassWithDefault(CrossJoinHelper.class, "numRightBitsToReserve", 10);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import dagger.Module;
import dagger.multibindings.IntoMap;
import dagger.multibindings.IntoSet;
import io.deephaven.grpc_api.table.ops.AsOfJoinTablesGrpcImpl;
import io.deephaven.grpc_api.table.ops.ComboAggregateGrpcImpl;
import io.deephaven.grpc_api.table.ops.DropColumnsGrpcImpl;
import io.deephaven.grpc_api.table.ops.EmptyTableGrpcImpl;
Expand Down Expand Up @@ -82,8 +81,17 @@ public interface TableModule {
@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.COMBO_AGGREGATE)
GrpcTableOperation<?> bindOperationComboAgg(ComboAggregateGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.JOIN)
GrpcTableOperation<?> bindOperationJoinTables(JoinTablesGrpcImpl op);
@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.CROSS_JOIN)
GrpcTableOperation<?> bindOperationCrossJoin(JoinTablesGrpcImpl.CrossJoinTablesGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.EXACT_JOIN)
GrpcTableOperation<?> bindOperationExactJoin(JoinTablesGrpcImpl.ExactJoinTablesGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.LEFT_JOIN)
GrpcTableOperation<?> bindOperationLeftJoin(JoinTablesGrpcImpl.LeftJoinTablesGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.NATURAL_JOIN)
GrpcTableOperation<?> bindOperationNaturalJoin(JoinTablesGrpcImpl.NaturalJoinTablesGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.FILTER)
GrpcTableOperation<?> bindOperationFilterTable(FilterTableGrpcImpl op);
Expand All @@ -104,7 +112,7 @@ public interface TableModule {
GrpcTableOperation<?> bindOperationFlatten(FlattenTableGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.AS_OF_JOIN)
GrpcTableOperation<?> bindOperationAsOfJoin(AsOfJoinTablesGrpcImpl op);
GrpcTableOperation<?> bindOperationAsOfJoin(JoinTablesGrpcImpl.AsOfJoinTablesGrpcImpl op);

@Binds @IntoMap @BatchOpCode(BatchTableRequest.Operation.OpCase.RUN_CHART_DOWNSAMPLE)
GrpcTableOperation<?> bindOperationRunChartDownsample(RunChartDownsampleGrpcImpl op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.deephaven.grpc_api.table;

import com.google.flatbuffers.FlatBufferBuilder;
import com.google.protobuf.ByteStringAccess;
import com.google.rpc.Code;
import io.deephaven.grpc_api.barrage.util.BarrageSchemaUtil;
import io.deephaven.grpc_api.util.ExportTicketHelper;
Expand All @@ -18,17 +20,20 @@
import io.deephaven.proto.backplane.grpc.AsOfJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.BatchTableRequest;
import io.deephaven.proto.backplane.grpc.ComboAggregateRequest;
import io.deephaven.proto.backplane.grpc.CrossJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.DropColumnsRequest;
import io.deephaven.proto.backplane.grpc.EmptyTableRequest;
import io.deephaven.proto.backplane.grpc.ExactJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.ExportedTableCreationResponse;
import io.deephaven.proto.backplane.grpc.ExportedTableUpdateMessage;
import io.deephaven.proto.backplane.grpc.ExportedTableUpdatesRequest;
import io.deephaven.proto.backplane.grpc.FilterTableRequest;
import io.deephaven.proto.backplane.grpc.FlattenRequest;
import io.deephaven.proto.backplane.grpc.HeadOrTailByRequest;
import io.deephaven.proto.backplane.grpc.HeadOrTailRequest;
import io.deephaven.proto.backplane.grpc.JoinTablesRequest;
import io.deephaven.proto.backplane.grpc.LeftJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.MergeTablesRequest;
import io.deephaven.proto.backplane.grpc.NaturalJoinTablesRequest;
import io.deephaven.proto.backplane.grpc.RunChartDownsampleRequest;
import io.deephaven.proto.backplane.grpc.SelectDistinctRequest;
import io.deephaven.proto.backplane.grpc.SelectOrUpdateRequest;
Expand Down Expand Up @@ -154,11 +159,6 @@ public void comboAggregate(final ComboAggregateRequest request, final StreamObse
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.COMBO_AGGREGATE, request, responseObserver);
}

@Override
public void joinTables(final JoinTablesRequest request, final StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.JOIN, request, responseObserver);
}

@Override
public void snapshot(final SnapshotTableRequest request, final StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.SNAPSHOT, request, responseObserver);
Expand Down Expand Up @@ -189,6 +189,26 @@ public void flatten(final FlattenRequest request, final StreamObserver<ExportedT
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.FLATTEN, request, responseObserver);
}

@Override
public void crossJoinTables(final CrossJoinTablesRequest request, final StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.CROSS_JOIN, request, responseObserver);
}

@Override
public void naturalJoinTables(final NaturalJoinTablesRequest request, final StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.NATURAL_JOIN, request, responseObserver);
}

@Override
public void exactJoinTables(final ExactJoinTablesRequest request, final StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.EXACT_JOIN, request, responseObserver);
}

@Override
public void leftJoinTables(LeftJoinTablesRequest request, StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.LEFT_JOIN, request, responseObserver);
}

@Override
public void asOfJoinTables(AsOfJoinTablesRequest request, StreamObserver<ExportedTableCreationResponse> responseObserver) {
oneShotOperationWrapper(BatchTableRequest.Operation.OpCase.AS_OF_JOIN, request, responseObserver);
Expand Down

This file was deleted.

Loading

0 comments on commit 6336bf9

Please sign in to comment.