Skip to content

Commit

Permalink
[YAML] Clean up some confusing error messages. (apache#29481)
Browse files Browse the repository at this point in the history
* Provide sane defaults rather than NullPtrExceptions for optional BigQuery parameters.
* Better error when cross-language is used with (incompatible) local streaming Python runner.
* Add format and schema to readme PubSub examples.
  • Loading branch information
robertwb authored Dec 5, 2023
1 parent 665ede8 commit e16d401
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,21 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
Boolean autoSharding = configuration.getAutoSharding();
Integer numStreams = configuration.getNumStreams();
int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams();
boolean useAtLeastOnceSemantics =
configuration.getUseAtLeastOnceSemantics() == null
? false
: configuration.getUseAtLeastOnceSemantics();
// Triggering frequency is only applicable for exactly-once
if (configuration.getUseAtLeastOnceSemantics() == null
|| !configuration.getUseAtLeastOnceSemantics()) {
if (!useAtLeastOnceSemantics) {
write =
write.withTriggeringFrequency(
(triggeringFrequency == null || triggeringFrequency <= 0)
? DEFAULT_TRIGGERING_FREQUENCY
: Duration.standardSeconds(triggeringFrequency));
}
// set num streams if specified, otherwise default to autoSharding
if (numStreams != null && numStreams > 0) {
if (numStreams > 0) {
write = write.withNumStorageWriteApiStreams(numStreams);
} else if (autoSharding == null || autoSharding) {
write = write.withAutoSharding();
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,17 @@ def run_pipeline(self, pipeline, options):
from apache_beam.runners.direct.transform_evaluator import \
TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform

class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines.")

pipeline.visit(VerifyNoCrossLanguageTransforms())

# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
Expand Down
28 changes: 27 additions & 1 deletion sdks/python/apache_beam/yaml/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ pipeline:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: json
schema:
type: object
properties:
col1: {type: string}
col2: {type: integer}
col3: {type: number}
- type: WindowInto
windowing:
type: fixed
Expand All @@ -282,6 +289,7 @@ pipeline:
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
```

Rather than using an explicit `WindowInto` operation, one may instead tag a
Expand All @@ -295,6 +303,8 @@ pipeline:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: SomeAggregation
windowing:
type: sliding
Expand All @@ -303,6 +313,7 @@ pipeline:
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
```

Note that the `Sql` operation itself is often a from of aggregation, and
Expand All @@ -316,6 +327,8 @@ pipeline:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
Expand All @@ -325,6 +338,7 @@ pipeline:
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
```

The specified windowing is applied to all inputs, in this case resulting in
Expand All @@ -337,11 +351,15 @@ pipeline:
name: ReadLeft
config:
topic: leftTopic
format: ...
schema: ...
- type: ReadFromPubSub
name: ReadRight
config:
topic: rightTopic
format: ...
schema: ...
- type: Sql
config:
Expand All @@ -364,7 +382,9 @@ pipeline:
transforms:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 60
Expand All @@ -374,6 +394,7 @@ pipeline:
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
```

One can also specify windowing at the top level of a pipeline (or composite),
Expand All @@ -388,12 +409,15 @@ pipeline:
- type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
- type: Sql
config:
query: "select col1, count(*) as c from PCOLLECTION"
- type: WriteToPubSub
config:
topic: anotherPubSubTopic
format: json
windowing:
type: fixed
size: 60
Expand All @@ -410,6 +434,8 @@ pipeline:
type: ReadFromPubSub
config:
topic: myPubSubTopic
format: ...
schema: ...
windowing:
type: fixed
size: 10
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def guess_name_and_type(expr):


class FakeReadFromPubSub(beam.PTransform):
def __init__(self, topic):
def __init__(self, topic, format, schema):
pass

def expand(self, p):
Expand All @@ -112,7 +112,7 @@ def expand(self, p):


class FakeWriteToPubSub(beam.PTransform):
def __init__(self, topic):
def __init__(self, topic, format):
pass

def expand(self, pcoll):
Expand Down

0 comments on commit e16d401

Please sign in to comment.