Skip to content

Commit

Permalink
Pass runtime configs & variables to BeamSqlSeekableTable (#28253)
Browse files Browse the repository at this point in the history
* closing #28145

* remove unsupported parameter & add parameter type to context

* remove unnecessary semicolon
  • Loading branch information
gabrywu authored Sep 5, 2023
1 parent f6c6400 commit 3364271
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;

/**
Expand All @@ -27,11 +29,17 @@
*/
public interface BeamSqlSeekableTable extends Serializable {
/** prepare the instance. */
default void setUp() {};
default void setUp() {}

default void startBundle(
DoFn<Row, Row>.StartBundleContext context, PipelineOptions pipelineOptions) {}

default void finishBundle(
DoFn<Row, Row>.FinishBundleContext context, PipelineOptions pipelineOptions) {}

/** return a list of {@code Row} with given key set. */
List<Row> seekRow(Row lookupSubRow);

/** cleanup resources of the instance. */
default void tearDown() {};
default void tearDown() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexFieldAccess;
import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexInputRef;
import org.apache.beam.sdk.extensions.sql.impl.utils.SerializableRexNode;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -155,6 +156,20 @@ public void setup() {
seekableTable.setUp();
}

@StartBundle
public void startBundle(
DoFn<Row, Row>.StartBundleContext context,
PipelineOptions pipelineOptions) {
seekableTable.startBundle(context, pipelineOptions);
}

@FinishBundle
public void finishBundle(
DoFn<Row, Row>.FinishBundleContext context,
PipelineOptions pipelineOptions) {
seekableTable.finishBundle(context, pipelineOptions);
}

@ProcessElement
public void processElement(ProcessContext context) {
Row factRow = context.element();
Expand Down

0 comments on commit 3364271

Please sign in to comment.