Skip to content

Commit

Permalink
1. delete setJoinSubsetType
Browse files Browse the repository at this point in the history
2. move joinSubsetType to setUp method
  • Loading branch information
gabrywu committed Sep 18, 2023
1 parent e7ceff4 commit c08b522
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
* FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}.
*/
public interface BeamSqlSeekableTable extends Serializable {
default void setJoinSubsetType(Schema joinSubsetType) {}
/** prepare the instance. */
default void setUp() {}
/**
* prepare the instance
*
* @param joinSubsetType joining subset schema
*/
default void setUp(Schema joinSubsetType) {}

default void startBundle(
DoFn<Row, Row>.StartBundleContext context, PipelineOptions pipelineOptions) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public JoinAsLookup(
this.outputSchema = outputSchema;
this.factColOffset = factColOffset;
joinFieldsMapping(joinCondition, factColOffset, lkpColOffset);
this.seekableTable.setJoinSubsetType(joinSubsetType);
}

private void joinFieldsMapping(RexNode joinCondition, int factColOffset, int lkpColOffset) {
Expand Down Expand Up @@ -154,7 +153,7 @@ public PCollection<Row> expand(PCollection<Row> input) {
new DoFn<Row, Row>() {
@Setup
public void setup() {
seekableTable.setUp();
seekableTable.setUp(joinSubsetType);
}

@StartBundle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ public class BeamSideInputLookupJoinRelTest extends BaseRelTest {

/** Test table for JOIN-AS-LOOKUP. */
public static class SiteLookupTable extends SchemaBaseBeamTable implements BeamSqlSeekableTable {

private Schema joinSubsetType;

public SiteLookupTable(Schema schema) {
super(schema);
}

@Override
public void setJoinSubsetType(Schema joinSubsetType) {
public void setUp(Schema joinSubsetType) {
this.joinSubsetType = joinSubsetType;
Assert.assertNotNull(joinSubsetType);
}

@Override
Expand All @@ -77,7 +77,7 @@ public POutput buildIOWriter(PCollection<Row> input) {

@Override
public List<Row> seekRow(Row lookupSubRow) {
Assert.assertNotNull(joinSubsetType);
Assert.assertEquals(joinSubsetType, lookupSubRow.getSchema());
if (lookupSubRow.getInt32("site_id") == 2) {
return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build());
}
Expand Down

0 comments on commit c08b522

Please sign in to comment.