Skip to content

Commit

Permalink
Improve the performance of TextSource by reducing how many byte[]s ar…
Browse files Browse the repository at this point in the history
…e copied (fixes apache#23193) (apache#23196)

* Improve the performance of TextSource by reducing how many byte[]s are copied (fixes apache#23193)

This reduces the CPU time TextSource spends on decoding by about 2.3x.

Before this change:
```
TextSourceBenchmark.benchmarkTextSource        thrpt    5  0.248 ± 0.029  ops/s
```

After this change:
```
TextSourceBenchmark.benchmarkHadoopLineReader  thrpt    5  0.465 ± 0.064  ops/s
TextSourceBenchmark.benchmarkTextSource        thrpt    5  0.575 ± 0.059  ops/s
```

* Write file in pieces instead of pre-allocating entire buffer

* Address PR comments
  • Loading branch information
lukecwik authored Sep 15, 2022
1 parent 94405e6 commit 30a48f0
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)).

## New Features / Improvements

Expand Down
2 changes: 2 additions & 0 deletions sdks/java/core/jmh/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ ext.summary = "This contains JMH benchmarks for the SDK Core for Beam Java"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadowTest")
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_48_1
implementation library.java.vendored_guava_26_0_jre
implementation library.java.hadoop_common
runtimeOnly library.java.slf4j_jdk14
testImplementation library.java.junit
testImplementation library.java.hamcrest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.jmh.io;

import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.io.FileBasedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.TextIOReadTest;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

/**
 * JMH benchmarks that read a generated text file line by line, once through the text source
 * returned by {@code TextIOReadTest.getTextSource} and once through Hadoop's {@link LineReader}.
 *
 * <p>Both benchmarks decode every line to a {@link String} and verify the number of lines and the
 * total number of characters read against what was written during setup, so a mismatch fails the
 * benchmark instead of silently producing a meaningless score.
 */
public class TextSourceBenchmark {
  /** Number of lines written to the generated file. */
  private static final int NUM_LINES = 10_000_000;

  /** Template of {@code 'a'} characters; every generated line is a prefix of this array. */
  private static final char[] LINE_TEMPLATE = new char[120];

  static {
    Arrays.fill(LINE_TEMPLATE, 'a');
  }

  /** Benchmark-scoped state holding the generated input file and its expected contents size. */
  @State(Scope.Benchmark)
  public static class Data {
    /** Absolute path of the generated temporary file. */
    public Path path;

    /** {@link #path} as a string, as passed to the readers under test. */
    public String pathString;

    /** Total number of characters written, excluding the newline terminators. */
    public int length;

    /**
     * Generates a file with {@code NUM_LINES} lines, each with a random length of at least 60 and
     * strictly less than 120 characters.
     *
     * @throws Exception if the temporary file cannot be created or written
     */
    @Setup
    public void createFile() throws Exception {
      path = Files.createTempFile("benchmark", null).toAbsolutePath();
      pathString = path.toString();
      // Accumulate locally and publish once so a repeated setup never double-counts.
      int totalChars = 0;
      // try-with-resources guarantees the file handle is released even if a write fails.
      try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
        for (int i = 0; i < NUM_LINES; ++i) {
          String valueToAppend =
              String.valueOf(LINE_TEMPLATE, 0, ThreadLocalRandom.current().nextInt(60, 120));
          totalChars += valueToAppend.length();
          writer.write(valueToAppend);
          writer.write('\n');
        }
      }
      length = totalChars;
    }

    /**
     * Deletes the generated file, if one was created.
     *
     * @throws Exception if the file exists but cannot be deleted
     */
    @TearDown
    public void deleteFile() throws Exception {
      // path stays null when createFile failed before the temporary file was created.
      if (path != null) {
        Files.deleteIfExists(path);
      }
    }
  }

  /**
   * Reads the whole generated file through the text source's reader.
   *
   * @param data the benchmark state describing the generated file
   * @throws IllegalStateException if the number of lines or characters read does not match what
   *     was written
   * @throws Exception if reading fails
   */
  @Benchmark
  public void benchmarkTextSource(Data data) throws Exception {
    int length = 0;
    int linesRead = 0;
    // The reader is closed on every path, including when reading throws.
    try (Source.Reader<String> reader =
        ((FileBasedSource<String>) TextIOReadTest.getTextSource(data.pathString, null))
            .createReader(PipelineOptionsFactory.create())) {
      // advance() is only called after start() (or a previous advance()) reported a record.
      for (boolean available = reader.start(); available; available = reader.advance()) {
        linesRead += 1;
        length += reader.getCurrent().length();
      }
    }
    if (linesRead != NUM_LINES) {
      throw new IllegalStateException(
          "Expected " + NUM_LINES + " lines but read " + linesRead);
    }
    if (length != data.length) {
      throw new IllegalStateException(
          "Expected " + data.length + " characters but read " + length);
    }
  }

  /**
   * Reads the whole generated file through Hadoop's {@link LineReader} as a comparison baseline.
   *
   * @param data the benchmark state describing the generated file
   * @throws IllegalStateException if the file ends before all expected characters were read, or
   *     if the number of lines read does not match what was written
   * @throws Exception if reading fails
   */
  @Benchmark
  public void benchmarkHadoopLineReader(Data data) throws Exception {
    int length = 0;
    int linesRead = 0;
    // Both the stream and the line reader are closed on every path.
    try (FileInputStream input = new FileInputStream(data.pathString);
        LineReader reader = new LineReader(input)) {
      do {
        Text text = new Text();
        // readLine returns the number of bytes consumed; 0 means end of file. Without this check
        // a short file would make the loop spin forever.
        if (reader.readLine(text) == 0) {
          throw new IllegalStateException(
              "Unexpected end of file after "
                  + linesRead
                  + " lines and "
                  + length
                  + " of "
                  + data.length
                  + " characters");
        }
        // It is important to convert toString() here so that we force the decoding to UTF8
        // otherwise Text keeps the encoded byte[] version in memory.
        length += text.toString().length();
        linesRead += 1;
      } while (length < data.length);
    }
    if (linesRead != NUM_LINES) {
      throw new IllegalStateException(
          "Expected " + NUM_LINES + " lines but read " + linesRead);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Benchmarks for IO. */
package org.apache.beam.sdk.jmh.io;
Loading

0 comments on commit 30a48f0

Please sign in to comment.