diff --git a/CHANGES.md b/CHANGES.md index 6ed10f6c49de..f873455cd66e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -91,6 +91,7 @@ * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)). +* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java index aed03f33e6d6..b17631a8bd0a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMap.java @@ -137,12 +137,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK keyCoder.encode(key, keyStream, Coder.Context.OUTER); ByteString keyBytes = keyStream.toByteString(); // Leaving data blank means that we delete the tag. - commitBuilder - .addValueUpdatesBuilder() - .setTag(keyBytes) - .setStateFamily(stateFamily) - .getValueBuilder() - .setTimestamp(Long.MAX_VALUE); + commitBuilder.addValueUpdatesBuilder().setTag(keyBytes).setStateFamily(stateFamily); V cachedValue = cachedValues.remove(key); if (cachedValue != null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index d06ed0f526c7..8d2623c382e9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -30,6 +30,8 @@ import java.io.Closeable; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.util.AbstractMap; import java.util.AbstractMap.SimpleEntry; @@ -305,6 +307,26 @@ private K userKeyFromProtoKey(ByteString tag, Coder keyCoder) throws IOEx return keyCoder.decode(keyBytes.newInput(), Context.OUTER); } + private static void assertBuildable( + Windmill.WorkItemCommitRequest.Builder commitWorkRequestBuilder) { + Windmill.WorkItemCommitRequest.Builder clone = commitWorkRequestBuilder.clone(); + if (!clone.hasKey()) { + clone.setKey(ByteString.EMPTY); // key is required to build + } + if (!clone.hasWorkToken()) { + clone.setWorkToken(1357924680L); // workToken is required to build + } + + try { + clone.build(); + } catch (Exception e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + fail( + "Failed to build commitRequest from: " + commitWorkRequestBuilder + "\n" + sw.toString()); + } + } + @Test public void testMapAddBeforeGet() throws Exception { StateTag> addr = @@ -647,6 +669,8 @@ public void testMapAddPersist() throws Exception { .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of())) .collect(Collectors.toList()), Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new SimpleEntry<>(tag2, 2))); + + assertBuildable(commitBuilder); } @Test @@ -670,6 +694,8 @@ public void testMapRemovePersist() throws Exception { .map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of())) .collect(Collectors.toList()), Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new SimpleEntry<>(tag2, null))); + + assertBuildable(commitBuilder); } @Test @@ -695,6 +721,8 @@ public void testMapClearPersist() throws Exception { assertEquals( protoKeyFromUserKey(null, StringUtf8Coder.of()), commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix()); + + assertBuildable(commitBuilder); } @Test @@ -736,6 +764,8 @@ public void testMapComplexPersist() throws Exception { commitBuilder = Windmill.WorkItemCommitRequest.newBuilder(); assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount()); assertEquals(0, commitBuilder.getValueUpdatesCount()); + + assertBuildable(commitBuilder); } @Test @@ -953,6 +983,8 @@ public void testMultimapRemovePersistPut() { multimapState.put(key, 5); assertThat(multimapState.get(key).read(), Matchers.containsInAnyOrder(4, 5)); + + assertBuildable(commitBuilder); } @Test @@ -1766,6 +1798,8 @@ public void testMultimapPutAndPersist() { builder, new MultimapEntryUpdate(key1, Arrays.asList(1, 2), false), new MultimapEntryUpdate(key2, Collections.singletonList(2), false)); + + assertBuildable(commitBuilder); } @Test @@ -1799,6 +1833,8 @@ public void testMultimapRemovePutAndPersist() { builder, new MultimapEntryUpdate(key1, Arrays.asList(1, 2), true), new MultimapEntryUpdate(key2, Collections.singletonList(4), true)); + + assertBuildable(commitBuilder); } @Test @@ -1825,6 +1861,8 @@ public void testMultimapRemoveAndPersist() { builder, new MultimapEntryUpdate(key1, Collections.emptyList(), true), new MultimapEntryUpdate(key2, Collections.emptyList(), true)); + + assertBuildable(commitBuilder); } @Test @@ -1856,6 +1894,8 @@ public void testMultimapPutRemoveClearAndPersist() { Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList()); assertEquals(0, builder.getUpdatesCount()); assertTrue(builder.getDeleteAll()); + + assertBuildable(commitBuilder); } @Test @@ -1894,6 +1934,8 @@ false, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of())) Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList()); assertTagMultimapUpdates( builder, new MultimapEntryUpdate(key1, Collections.singletonList(4), false)); + + assertBuildable(commitBuilder); } @Test @@ -1938,6 +1980,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of())) ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), Context.OUTER); assertArrayEquals(key1, decodedKey); assertTrue(entryUpdate.getDeleteAll()); + + assertBuildable(commitBuilder); } @Test @@ -2053,6 +2097,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of())) Windmill.WorkItemCommitRequest.Builder commitBuilder = Windmill.WorkItemCommitRequest.newBuilder(); underTest.persist(commitBuilder); + + assertBuildable(commitBuilder); } @Test @@ -2253,6 +2299,8 @@ public void testOrderedListAddPersist() throws Exception { assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8()); assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey()); assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId()); + + assertBuildable(commitBuilder); } @Test @@ -2284,6 +2332,8 @@ public void testOrderedListClearPersist() throws Exception { assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId()); assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -2331,6 +2381,8 @@ public void testOrderedListDeleteRangePersist() { assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey()); assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId()); assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId()); + + assertBuildable(commitBuilder); } @Test @@ -2539,6 +2591,8 @@ public void testOrderedListPersistEmpty() throws Exception { assertEquals(1, updates.getDeletesCount()); assertEquals(WindmillOrderedList.MIN_TS_MICROS, updates.getDeletes(0).getRange().getStart()); assertEquals(WindmillOrderedList.MAX_TS_MICROS, updates.getDeletes(0).getRange().getLimit()); + + assertBuildable(commitBuilder); } @Test @@ -2653,6 +2707,8 @@ public void testBagAddPersist() throws Exception { assertEquals("hello", bagUpdates.getValues(0).toStringUtf8()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -2678,6 +2734,8 @@ public void testBagClearPersist() throws Exception { assertEquals("world", tagBag.getValues(0).toStringUtf8()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -2693,6 +2751,8 @@ public void testBagPersistEmpty() throws Exception { // 1 bag update = the clear assertEquals(1, commitBuilder.getBagUpdatesCount()); + + assertBuildable(commitBuilder); } @Test @@ -2806,6 +2866,8 @@ public void testCombiningAddPersist() throws Exception { 11, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -2835,6 +2897,8 @@ public void testCombiningAddPersistWithCompact() throws Exception { assertTrue(bagUpdates.getDeleteAll()); assertEquals( 111, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]); + + assertBuildable(commitBuilder); } @Test @@ -2862,6 +2926,8 @@ public void testCombiningClearPersist() throws Exception { 11, CoderUtils.decodeFromByteArray(accumCoder, tagBag.getValues(0).toByteArray())[0]); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -2990,6 +3056,8 @@ public void testWatermarkPersistEarliest() throws Exception { assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3016,6 +3084,8 @@ public void testWatermarkPersistLatestEmpty() throws Exception { Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3042,6 +3112,8 @@ public void testWatermarkPersistLatestWindmillWins() throws Exception { Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3068,6 +3140,8 @@ public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception { Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3091,6 +3165,8 @@ public void testWatermarkPersistEndOfWindow() throws Exception { // Blind adds should not need to read the future. Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3116,6 +3192,8 @@ public void testWatermarkClearPersist() throws Exception { assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0)); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3133,6 +3211,8 @@ public void testWatermarkPersistEmpty() throws Exception { // 1 bag update corresponds to deletion. There shouldn't be a bag update adding items. assertEquals(1, commitBuilder.getWatermarkHoldsCount()); + + assertBuildable(commitBuilder); } @Test @@ -3200,6 +3280,8 @@ public void testValueSetPersist() throws Exception { assertTrue(valueUpdate.isInitialized()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3220,6 +3302,8 @@ public void testValueClearPersist() throws Exception { assertEquals(0, valueUpdate.getValue().getData().size()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test @@ -3234,6 +3318,8 @@ public void testValueNoChangePersist() throws Exception { assertEquals(0, commitBuilder.getValueUpdatesCount()); Mockito.verifyNoMoreInteractions(mockReader); + + assertBuildable(commitBuilder); } @Test