Skip to content

Commit

Permalink
add setJoinSubsetType to inject joinSubsetType to BeamSqlSeekableTable
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrywu committed Sep 15, 2023
1 parent 5df59a9 commit e7ceff4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

Expand All @@ -28,6 +29,7 @@
* FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}.
*/
public interface BeamSqlSeekableTable extends Serializable {
default void setJoinSubsetType(Schema joinSubsetType) {}
/** prepare the instance. */
default void setUp() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public JoinAsLookup(
this.outputSchema = outputSchema;
this.factColOffset = factColOffset;
joinFieldsMapping(joinCondition, factColOffset, lkpColOffset);
this.seekableTable.setJoinSubsetType(joinSubsetType);
}

private void joinFieldsMapping(RexNode joinCondition, int factColOffset, int lkpColOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -48,10 +49,17 @@ public class BeamSideInputLookupJoinRelTest extends BaseRelTest {
/** Test table for JOIN-AS-LOOKUP. */
public static class SiteLookupTable extends SchemaBaseBeamTable implements BeamSqlSeekableTable {

private Schema joinSubsetType;

public SiteLookupTable(Schema schema) {
super(schema);
}

@Override
public void setJoinSubsetType(Schema joinSubsetType) {
this.joinSubsetType = joinSubsetType;
}

@Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
Expand All @@ -69,6 +77,7 @@ public POutput buildIOWriter(PCollection<Row> input) {

@Override
public List<Row> seekRow(Row lookupSubRow) {
Assert.assertNotNull(joinSubsetType);
if (lookupSubRow.getInt32("site_id") == 2) {
return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build());
}
Expand Down

0 comments on commit e7ceff4

Please sign in to comment.