Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix StreamConstraintsException introduced in jackson 2.15 #31580

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,36 @@
@Internal
public class RowJsonUtils {

//
private static int defaultBufferLimit;

/**
* Increase the default jackson-databind stream read constraint.
*
* <p>StreamReadConstraints was introduced in jackson 2.15 causing string > 20MB (5MB in 2.15.0)
* parsing failure. This has caused regressions in its dependencies include Beam. Here we
* overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit.
* If needed, call this method during pipeline run time, e.g. in DoFn.setup.
*/
public static void increaseDefaultStreamReadConstraints(int newLimit) {
Copy link
Contributor Author

@Abacn Abacn Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was considering adding a pipeline option to configure this. However, overrideDefaultStreamReadConstraints needs to be called as early as in static context to make it effective for ObjectMapper used in random places, makes it ineffective in non-static context, and ended up with this solution.

After all this is a "band-aid" fix for a bad upstream breaking change. Feel free to comment if there is a better solution.

if (newLimit <= defaultBufferLimit) return;
try {
Class<?> unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");

com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints(
com.fasterxml.jackson.core.StreamReadConstraints.builder()
.maxStringLength(newLimit)
.build());
} catch (ClassNotFoundException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sure pinning jackson-databind 2.14 would not break pipeline. i.e., In case there are other regression in jackson 2.15, use can still pin 2.14

// <2.15, do nothing
}
defaultBufferLimit = newLimit;
}

static {
increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
}

public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
SimpleModule module = new SimpleModule("rowDeserializationModule");
module.addDeserializer(Row.class, deserializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.RowJsonUtils;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format. */
Expand Down Expand Up @@ -69,15 +70,22 @@ public long getEncodedElementByteSize(TableRow value) throws Exception {

// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new JodaModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
private static final ObjectMapper MAPPER;;
private static final TableRowJsonCoder INSTANCE;
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;

private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
static {
RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);

MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new JodaModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
INSTANCE = new TableRowJsonCoder();
TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
}

private TableRowJsonCoder() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -67,6 +68,13 @@ public void testDecodeEncodeEqual() throws Exception {
}
}

@Test
public void testLargeRow() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before:

com.fasterxml.jackson.core.exc.StreamConstraintsException: String length (20054016) exceeds the maximum length (20000000)
	at com.fasterxml.jackson.core.StreamReadConstraints.validateStringLength(StreamReadConstraints.java:324)
	at com.fasterxml.jackson.core.util.ReadConstrainedTextBuffer.validateStringLength(ReadConstrainedTextBuffer.java:27)
	...
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3740)
	at org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder.decode(TableRowJsonCoder.java:59)
	...
	at org.apache.beam.sdk.testing.CoderProperties.coderDecodeEncodeEqual(CoderProperties.java:97)
	at org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoderTest.testLargeRow(TableRowJsonCoderTest.java:75)

after:

testLargeRow 0.615s passed

Also tested with example Pipeline (read TableRow from BigQuery with large row, then ReShuffle it).

String val = StringUtils.repeat("BEAM", 10 * 1024 * 1024); // 40 MB
TableRow testValue = new TableRowBuilder().set("a", val).set("b", "1").build();
CoderProperties.coderDecodeEncodeEqual(TEST_CODER, testValue);
}

/**
* Generated data to check that the wire format has not changed. To regenerate, see {@link
* org.apache.beam.sdk.coders.PrintBase64Encodings}.
Expand Down
Loading