Skip to content

Commit

Permalink
remove useless code and fix failed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyangzhong committed Sep 23, 2024
1 parent b245e1b commit 6874b0d
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,33 +106,6 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS
// Tests for inner join.
override def after(): Unit = {}

@TestTemplate
def test(): Unit = {
val tableA = failingDataSource(
Seq[(Int, Long, String)]((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 3L, "Hello world")))
.toTable(tEnv, 'a1, 'a2, 'a3)
val tableB = failingDataSource(
Seq[(Int, Long, String)]((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 3L, "Hello world")))
.toTable(tEnv, 'b1, 'b2, 'b3)
tEnv.createTemporaryView("C", tableA)
tEnv.createTemporaryView("D", tableB)

val sqlQuery = "SELECT * FROM C, D WHERE a1 = b1 and a2 = 1"

val sink = new TestingRetractSink
tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()

val expected = mutable.Seq(
"1,1,Hi,2,2,1,Hallo Welt,2",
"2,2,Hello,4,10,9,FGH,2",
"2,2,Hello,4,7,6,CDE,2",
"2,2,Hello,4,8,7,DEF,1",
"2,2,Hello,4,9,8,EFG,1")

assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testDependentConditionDerivationInnerJoin(): Unit = {
val sqlQuery = "SELECT * FROM A, B WHERE (a2 = 1 and b2 = 2) or (a1 = 2 and b1 = 4)"
Expand Down Expand Up @@ -1711,8 +1684,7 @@ object JoinITCase {
Array(MiniBatchOff, ROCKSDB_BACKEND, Boolean.box(false)),
Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(false)),
Array(MiniBatchOn, ROCKSDB_BACKEND, Boolean.box(false)),
Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true)),
Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(true))
Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1827,8 +1827,7 @@ object JoinITCase {
Array(MiniBatchOff, ROCKSDB_BACKEND, Boolean.box(false)),
Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(false)),
Array(MiniBatchOn, ROCKSDB_BACKEND, Boolean.box(false)),
Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true)),
Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(true))
Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true))
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public StateFuture<List<OuterRecord>> findMatchedRecords(
v -> {
reusedList.clear();
if (v != null) {
reusedList.add(new OuterRecord(v));
if (condition.apply(v)) {
reusedList.add(new OuterRecord(v));
}
}
return reusedList;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public StateFuture<List<OuterRecord>> findMatchedRecordsAndNumOfAssociations(
tuple -> {
reusedList.clear();
if (tuple != null) {
reusedList.add(new OuterRecord(tuple.f0, tuple.f1));
if (condition.apply(tuple.f0)) {
reusedList.add(new OuterRecord(tuple.f0, tuple.f1));
}
}
return StateFutureUtils.completedFuture(reusedList);
});
Expand Down

0 comments on commit 6874b0d

Please sign in to comment.