Skip to content

Commit

Permalink
[Flink] fastercopy everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
jto committed Aug 20, 2024
1 parent 4cbd239 commit 7e7b300
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.types;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
Expand All @@ -26,6 +27,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
Expand Down Expand Up @@ -109,14 +111,26 @@ public int getLength() {
/**
 * Writes {@code t} to {@code dataOutputView} using the wrapped Beam coder.
 *
 * <p>When {@code fasterCopy} is set, the element is first encoded into an in-memory buffer and
 * then written as a VarInt byte count followed by the encoded bytes. That length prefix is what
 * {@code copy(DataInputView, DataOutputView)} reads in order to forward a record without decoding
 * it. When the flag is not set, the coder writes straight to the output view with no prefix.
 *
 * <p>The two layouts are not interchangeable: {@code deserialize} only skips a length prefix when
 * {@code fasterCopy} is set, so writer and reader must agree on the flag.
 *
 * @param t element to encode
 * @param dataOutputView view the encoded element is written to
 * @throws IOException if the coder or the underlying view fails while writing
 */
@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
  if (fasterCopy) {
    // Buffer first: the encoded size has to be known before the payload can be written.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    coder.encode(t, bos);
    // ByteArrayOutputStream.size() already returns int, so no cast is needed.
    VarInt.encode(bos.size(), outputWrapper);
    bos.writeTo(outputWrapper);
  } else {
    coder.encode(t, outputWrapper);
  }
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
try {
DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
return coder.decode(inputWrapper);
if(fasterCopy) {
VarInt.decodeInt(inputWrapper); // just advance the stream to the actual encoded value
return coder.decode(inputWrapper);
} else {
return coder.decode(inputWrapper);
}
} catch (CoderException e) {
Throwable cause = e.getCause();
if (cause instanceof EOFException) {
Expand All @@ -134,7 +148,15 @@ public T deserialize(T t, DataInputView dataInputView) throws IOException {

/**
 * Copies one serialized record from {@code dataInputView} to {@code dataOutputView}.
 *
 * <p>When {@code fasterCopy} is set, the record is expected to start with the VarInt byte count
 * written by {@code serialize}. The count is read, re-emitted to the output, and that many bytes
 * are then transferred directly between the views, so the element is never decoded. When the
 * flag is not set, the record is decoded and re-encoded through {@code deserialize} and {@code
 * serialize}.
 *
 * @param dataInputView view positioned at the start of a serialized record
 * @param dataOutputView view the record is copied to
 * @throws IOException if reading from the input or writing to the output fails
 */
@Override
public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
  if (fasterCopy) {
    DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
    DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
    // Forward the length prefix unchanged, then move the payload as raw bytes.
    int size = VarInt.decodeInt(inputWrapper);
    VarInt.encode(size, outputWrapper);
    dataOutputView.write(dataInputView, size);
  } else {
    serialize(deserialize(dataInputView), dataOutputView);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.types;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
Expand All @@ -26,6 +27,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
Expand Down Expand Up @@ -106,14 +108,26 @@ public int getLength() {
/**
 * Encodes {@code t} with the wrapped Beam coder and writes it to {@code dataOutputView}.
 *
 * <p>With {@code fasterCopy} set, the element is encoded into an in-memory buffer and emitted as
 * a VarInt byte count followed by the buffered bytes; {@code copy(DataInputView, DataOutputView)}
 * relies on that prefix to forward records without decoding them. Without the flag, the coder
 * writes directly to the output view and no prefix is added.
 *
 * <p>{@code deserialize} only consumes a length prefix when {@code fasterCopy} is set, so data
 * must be read with the same flag value it was written with.
 *
 * @param t element to encode
 * @param dataOutputView view the encoded element is written to
 * @throws IOException if the coder or the underlying view fails while writing
 */
@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
  DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
  if (fasterCopy) {
    // The payload size must be known up front, hence the intermediate buffer.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    coder.encode(t, bos);
    // size() is already an int; the previous (int) cast was redundant.
    VarInt.encode(bos.size(), outputWrapper);
    bos.writeTo(outputWrapper);
  } else {
    coder.encode(t, outputWrapper);
  }
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
try {
DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
return coder.decode(inputWrapper);
if(fasterCopy) {
VarInt.decodeInt(inputWrapper); // just advance the stream to the actual encoded value
return coder.decode(inputWrapper);
} else {
return coder.decode(inputWrapper);
}
} catch (CoderException e) {
Throwable cause = e.getCause();
if (cause instanceof EOFException) {
Expand All @@ -131,7 +145,15 @@ public T deserialize(T t, DataInputView dataInputView) throws IOException {

/**
 * Transfers one serialized record from {@code dataInputView} to {@code dataOutputView}.
 *
 * <p>With {@code fasterCopy} set, the record is expected to begin with the VarInt byte count
 * produced by {@code serialize}: the count is read and written back out, and then exactly that
 * many bytes are moved from the input view to the output view without decoding the element.
 * Without the flag, the record goes through a full {@code deserialize} / {@code serialize} round
 * trip.
 *
 * @param dataInputView view positioned at the start of a serialized record
 * @param dataOutputView view the record is copied to
 * @throws IOException if reading from the input or writing to the output fails
 */
@Override
public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
  if (fasterCopy) {
    DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
    DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
    // Re-emit the length prefix as-is, then copy the payload bytes verbatim.
    int size = VarInt.decodeInt(inputWrapper);
    VarInt.encode(size, outputWrapper);
    dataOutputView.write(dataInputView, size);
  } else {
    serialize(deserialize(dataInputView), dataOutputView);
  }
}

@Override
Expand Down

0 comments on commit 7e7b300

Please sign in to comment.