Skip to content

Commit

Permalink
replace explode with explode_outer (#404)
Browse files Browse the repository at this point in the history
* replace explode with explode_outer

* rename test and add comments
  • Loading branch information
elliVM authored Nov 5, 2024
1 parent 2b79853 commit 17f2ce6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private Dataset<Row> updateBloomFilter(Dataset<Row> dataset) {

private Dataset<Row> estimateSize(Dataset<Row> dataset) {
return dataset
.select(functions.col("partition"), functions.explode(functions.col(inputCol)).as("token"))
.select(functions.col("partition"), functions.explode_outer(functions.col(inputCol)).as("token"))
.groupBy("partition")
.agg(functions.approxCountDistinct("token").as(outputCol));
}
Expand Down
28 changes: 28 additions & 0 deletions src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,34 @@ public void estimateTest() {
);
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void testEstimateOnEmptyArray() {
streamingTestUtil
.performDPLTest(
// index_Empty _raw = "" so tokenizer step will produce an empty array
"index=index_Empty earliest=2020-01-01T00:00:00z latest=2023-01-01T00:00:00z | teragrep exec tokenizer | teragrep exec bloom estimate",
testFile, ds -> {
Assertions
.assertEquals("[partition, estimate(tokens)]", Arrays.toString(ds.columns()), "Batch handler dataset contained an unexpected column arrangement !");
List<Integer> results = ds
.select("estimate(tokens)")
.collectAsList()
.stream()
.map(r -> Integer.parseInt(r.get(0).toString()))
.collect(Collectors.toList());

// assert that a row is produced and not an empty dataframe
Assertions.assertEquals(1, results.size());
// assert that estimate is 0 and not empty or null
Assertions.assertEquals(0, results.get(0));
}
);
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"_time": "2022-09-06 09:00:00.000", "id": 1, "_raw": "one", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "1", "offset": 1}
{"_time": "2022-09-06 09:00:00.000", "id": 2, "_raw": "one.two", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "2", "offset": 1}
{"_time": "2022-09-06 09:00:00.000", "id": 2, "_raw": "one.two", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "2", "offset": 1}
{"_time": "2022-09-06 09:00:00.000", "id": 3, "_raw": "", "index": "index_Empty", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "3", "offset": 1}

0 comments on commit 17f2ce6

Please sign in to comment.