Skip to content

Commit

Permalink
Fix GroupTest
Browse files Browse the repository at this point in the history
  • Loading branch information
TheNeuralBit committed Apr 27, 2020
1 parent d440058 commit 633710a
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
Expand Down Expand Up @@ -390,7 +391,7 @@ public void testGloballyWithSchemaAggregateFn() {
@Test
@Category(NeedsRunner.class)
public void testSchemaAggregateFnByBaseValue() {
Schema inputSchema = Schema.of(Field.of("f_nanos", FieldType.logicalType(MillisInstant.of())));
Schema inputSchema = Schema.of(Field.of("f_millis", FieldType.logicalType(MillisInstant.of())));
Collection<Row> elements =
ImmutableList.of(
Row.withSchema(inputSchema).addValue(Instant.ofEpochMilli(2L)).build(),
Expand All @@ -403,10 +404,12 @@ public void testSchemaAggregateFnByBaseValue() {
.setRowSchema(inputSchema)
.apply(
Group.<Row>globally()
.aggregateFieldBaseValue("f_millis", Min.ofLongs(), "f_millis"));
.aggregateFieldBaseValue("max_millis", Max.ofLongs(), "f_millis")
.aggregateFieldBaseValue("min_millis", Min.ofLongs(), "f_millis"));

Schema aggregateSchema = Schema.builder().addInt64Field("f_millis").build();
Row expectedRow = Row.withSchema(aggregateSchema).addValues(1L).build();
Schema aggregateSchema =
Schema.builder().addInt64Field("max_millis").addInt64Field("min_millis").build();
Row expectedRow = Row.withSchema(aggregateSchema).addValues(1L, 3L).build();

PAssert.that(aggregate).containsInAnyOrder(expectedRow);

Expand Down

0 comments on commit 633710a

Please sign in to comment.