SingleStoreIO #23535
Conversation
Force-pushed from 5c85399 to 4952064.
Codecov Report
@@ Coverage Diff @@
## master #23535 +/- ##
==========================================
+ Coverage 73.47% 83.18% +9.70%
==========================================
Files 714 473 -241
Lines 96403 66952 -29451
==========================================
- Hits 70828 55691 -15137
+ Misses 24252 11261 -12991
+ Partials 1323 0 -1323
Assigning reviewers. If you would like to opt out of this review, comment "assign to next reviewer".
R: @kileys for label java.
The PR bot will only process comments in the main thread (not review comments).
R: @kennknowles
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
R: @johnjcasey
retest this please
Run CommunityMetrics PreCommit
Force-pushed from 1de0def to 50b669e.
Run Python PreCommit
Run Java PreCommit
1 similar comment
Run Java PreCommit
Reviewed the core code. Looks really solid so far.
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation group: 'com.singlestore', name: 'singlestore-jdbc-client', version: '1.1.4'
implementation library.java.slf4j_api
implementation "org.apache.commons:commons-dbcp2:2.8.0"
can we move these dependencies to BeamModulePlugin.groovy?
Do I understand correctly that you're proposing to add these libs to this map
// A map of maps containing common libraries used per language. To use:
// dependencies {
// compile library.java.slf4j_api
// }
project.ext.library = [
java : [
and use it here?
DataSource dataSource = dataSourceConfiguration.getDataSource();
Connection conn = dataSource.getConnection();
try {
  for (long partition = tracker.currentRestriction().getFrom();
This should probably not have partition++. In principle, this will always cause an error, because at some point tryClaim will attempt to claim an invalid partition, either because it is outside of the tracker range or because it is beyond the total partition count.
The same loop is presented in this blog https://beam.apache.org/blog/splittable-do-fn/
public void process(ProcessContext c, OffsetRangeTracker tracker) {
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
c.output(KV.of(c.element().getKey(), i));
}
}
I thought that tryClaim will just return false when an invalid partition is provided.
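For reference, a minimal self-contained sketch of this claim-loop pattern as a splittable DoFn (class and helper names are illustrative, not the PR's actual code). tryClaim returns false, rather than throwing, once the offset reaches the end of the current restriction, which is what terminates the loop cleanly; claiming backwards is what would raise an error:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class ReadPartitionsFn extends DoFn<Integer, String> {
  @ProcessElement
  public void processElement(
      ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
    // The loop ends when tryClaim returns false at the restriction boundary.
    for (long partition = tracker.currentRestriction().getFrom();
        tracker.tryClaim(partition);
        partition++) {
      // Hypothetical: read the claimed SingleStore partition and emit rows.
      context.output("partition " + partition);
    }
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element Integer numPartitions) {
    // One offset per database partition.
    return new OffsetRange(0, numPartitions);
  }
}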
final Exception[] writeException = new Exception[1];

Thread dataWritingThread =
Avoid creating threads inside of DoFns. In Beam, asynchronous and multithreaded behavior is best handled by the Beam framework itself.
Hmm... I don't know how to implement this asynchronous task using the Beam framework.
Here we have two parts: statement execution and writing data to the stream.
They are tightly coupled (writing will get stuck if the statement is not executed and the buffer is full, and the statement execution won't finish until the writing is finished).
These parts also share a common PipedOutputStream, which is not serializable.
Does the Beam framework have features that would allow something like this?
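For context, here is roughly the shape of the coupling being described: a minimal sketch, assuming the driver's statement is wired to consume the connected PipedInputStream (that stream hook is an assumption here, and `rows` stands in for the buffered elements):

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

class PipedWriteSketch {
  static void writeRows(Statement statement, String loadDataSql, List<byte[]> rows)
      throws IOException, SQLException, InterruptedException {
    PipedOutputStream out = new PipedOutputStream();
    PipedInputStream in = new PipedInputStream(out); // assumed to be handed to the driver
    Exception[] writeException = new Exception[1];

    // Writer side: pushes rows into the pipe; blocks whenever the pipe's
    // bounded buffer is full, i.e. until the statement drains it.
    Thread dataWritingThread =
        new Thread(
            () -> {
              try {
                for (byte[] row : rows) {
                  out.write(row);
                }
                out.close(); // signals end-of-data to the reading side
              } catch (IOException e) {
                writeException[0] = e;
              }
            });
    dataWritingThread.start();

    // Reader side: executing the statement consumes the piped input and only
    // finishes once the writer closes the pipe; this is the tight coupling
    // described above.
    statement.execute(loadDataSql);

    dataWritingThread.join();
    if (writeException[0] != null) {
      throw new IOException("Data writing thread failed", writeException[0]);
    }
  }
}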
This is somewhat unusual for me as well. @lukecwik do we have a pattern for something like this?
This looks good to me outside of the threading, which I'm unsure about. @chamikaramj and @Abacn, can you take a second look? As this is an entire IO, I'd like more pairs of eyes on it.
Thanks @AdalbertMemSQL! Will take a look. For now, could you please integrate Read.java and Write.java into SingleStoreIO.java, following the patterns used for other IO connectors (e.g. JdbcIO)? Users may not be able to import those classes directly, since there may be naming conflicts.
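A rough sketch of what that restructuring could look like, following JdbcIO's shape; all signatures here are illustrative, not the final API:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

// Read and Write live as nested classes, so callers write
// SingleStoreIO.read() / SingleStoreIO.write() and avoid import clashes
// with other connectors' Read/Write classes.
public class SingleStoreIO {
  private SingleStoreIO() {}

  public static <T> Read<T> read() {
    return new Read<>();
  }

  public static <T> Write<T> write() {
    return new Write<>();
  }

  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
    @Override
    public PCollection<T> expand(PBegin input) {
      throw new UnsupportedOperationException("sketch only");
    }
  }

  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
    @Override
    public PDone expand(PCollection<T> input) {
      throw new UnsupportedOperationException("sketch only");
    }
  }
}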
run seed job
Run SQL PreCommit
Run Seed Job
Hi @AdalbertMemSQL, would you mind rebasing the branch onto the latest master so I can run a seed job (to make beam_PerformanceTests_SingleStoreIO and Java_SingleStore_IO_Direct work)? Thanks. Also, please consider adding an update to https://github.com/apache/beam/blob/master/CHANGES.md
Run Java_Spark3_Versions PreCommit
Run Java SingleStoreIO Performance Test
Notice that the SingleStoreIO Performance Test actually runs on the direct runner. The direct runner is only for local debugging; it adds some redundant calculations for validation and is not performant. Could you please consider running it on the Dataflow runner (there is an …
Sure :)
@Abacn Can you now run the perf test?
Run Seed Job
Run Java SingleStoreIO Performance Test
Besides the performance test, I left some other comments here.
import org.slf4j.Logger;

/** Provides utility functions for working with {@link SingleStoreIO}. */
public class SingleStoreUtil {
Suggested change:
- public class SingleStoreUtil {
+ final class SingleStoreUtil {
We may not want to expose this as a public API (following the practice of other IOs).
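A small sketch of the suggested convention (the helper method below is hypothetical, just to show the shape): keeping the class package-private and final with a private constructor keeps it out of the connector's public API surface:

/** Provides utility functions for working with {@link SingleStoreIO}. */
final class SingleStoreUtil {
  private SingleStoreUtil() {} // non-instantiable helper

  // Hypothetical helper: quote a SQL identifier, escaping embedded backticks.
  static String escapeIdentifier(String identifier) {
    return "`" + identifier.replace("`", "``") + "`";
  }
}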
CHANGES.md (Outdated)
@@ -60,6 +60,7 @@
* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
* Support for SingleStoreDB source added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean "SingleStoreDB Source and Sink"?
singleStoreUsername : "admin",
singleStorePassword : "secretpass",
singleStorePort: "3306",
numberOfRecords: "100000",
Looks like the performance test is passing. The Write and then Read steps finish within seconds. To get a more meaningful metric, the pipeline should ideally run for several minutes. How about setting numberOfRecords to 10000000?
Sorry, I meant 10M records (100 times "100000").
The metrics shown in https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SingleStoreIO/6/console
13:54:32 Load test results for test (ID): 403b18ab-b3dc-475a-b4d9-0a77866159c8 and timestamp: 2022-11-15T18:54:32.477000000Z:
13:54:32 Metric: Value:
13:54:32 write_time 0.628
13:54:32 Load test results for test (ID): 403b18ab-b3dc-475a-b4d9-0a77866159c8 and timestamp: 2022-11-15T18:54:32.477000000Z:
13:54:33 Metric: Value:
13:54:33 read_time 4.433
13:54:33 Load test results for test (ID): 403b18ab-b3dc-475a-b4d9-0a77866159c8 and timestamp: 2022-11-15T18:54:32.477000000Z:
13:54:33 Metric: Value:
13:54:33 read_with_partitions_time 1.87
If performance is linear, 1M records would still cost only seconds to write.
SingleStoreDB is pretty performant!
Wow, it is really pretty fast :)
Increased the number of rows to 10M.
Ah, sorry. The test rows have pre-defined hashes for validation; 10000000 does not work, but 5000000 will do:
private static final ImmutableMap<Integer, String> EXPECTED_HASHES =
ImmutableMap.of(
1000, "7d94d63a41164be058a9680002914358",
100_000, "c7cbddb319209e200f1c5eebef8fe960",
600_000, "e2add2f680de9024e9bc46cd3912545e",
5_000_000, "c44f8a5648cd9207c9c6f77395a998dc");
Changed SingleStoreUtil to be a final class. Updated the CHANGES.md file. Increased the number of rows in the integration test.
One last comment from my side. Thanks!
PipelineResult writeResult = runWrite();
writeResult.waitUntilFinish();
PipelineResult readResult = runRead();
readResult.waitUntilFinish();
We should assert that the pipeline status for write and read is DONE; we may also assert that the number of records we read is consistent with the number of records we have written (some basic validation). It looks like currently, if the pipeline fails remotely, the test will still pass.
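Something along these lines, presumably; a minimal sketch using JUnit, relying on waitUntilFinish() returning the pipeline's terminal state (the runWrite/runRead bodies are placeholders for the actual test pipelines):

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.PipelineResult;
import org.junit.Test;

public class SingleStoreIOITSketch {
  @Test
  public void testWriteThenRead() {
    // waitUntilFinish() returns the terminal state, so the test fails fast
    // if the remote pipeline did not actually reach DONE.
    PipelineResult writeResult = runWrite();
    assertEquals(PipelineResult.State.DONE, writeResult.waitUntilFinish());

    PipelineResult readResult = runRead();
    assertEquals(PipelineResult.State.DONE, readResult.waitUntilFinish());
  }

  private PipelineResult runWrite() {
    throw new UnsupportedOperationException("see the actual IT for the pipeline");
  }

  private PipelineResult runRead() {
    throw new UnsupportedOperationException("see the actual IT for the pipeline");
  }
}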
The number of records is checked here (lines 248-250 of the test):

private void testReadResult(PCollection<TestRow> namesAndIds) {
  PAssert.thatSingleton(namesAndIds.apply("Count All", Count.globally()))
      .isEqualTo((long) numberOfRows);
}
Will add asserts for pipeline status. Thanks!
Run Java SingleStoreIO Performance Test
Run Java_Examples_Dataflow_Java17 PreCommit
Run Java SingleStoreIO Performance Test
1 similar comment
Run Java SingleStoreIO Performance Test
Succeeded: https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_SingleStoreIO/9/console, metrics: …
 * );
 * }</pre>
 */
final class SingleStoreIO {
Oops, I believe this is a mistake. It should be final class SingleStoreUtil and public class SingleStoreIO.
Yea, sorry
LGTM once the last typo gets resolved (see above).
Some follow-ups about tests (in separate PRs):
- Visualize the metrics in the Grafana dashboard: http://104.154.241.245/d/bnlHKP3Wz/java-io-it-tests-dataflow?orgId=1
- Set up integration tests with a local SingleStoreDB instance (either a test client provided by SingleStore or a container)
Let us ping @lukecwik and @chamikaramj, who were mentioned in this thread, again and see if they have any input. This will be a nice feature for Beam 2.44.0.
OK, let's get this in so the tests are continuously exercised.
Implemented SingleStoreIO according to this design doc:
https://docs.google.com/document/d/1WU-hkoZ93SaGXyOz_UtX0jXzIRl194hCId_IdmEV9jw/edit?usp=sharing
Addresses #22617.
In future pull requests, we plan to make it possible to use the Read, ReadWithPartitions, and Write PTransforms without setting RowMapper and UserDataMapper, and to implement SchemaTransforms for them.
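For readers landing here, a hypothetical usage sketch assembled from the pieces discussed in this thread (DataSourceConfiguration, RowMapper, the nested-transform shape); the exact builder methods may differ from the shipped API:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SingleStoreReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Illustrative configuration values; names follow the thread's file and
    // class names but are not guaranteed to match the final API exactly.
    SingleStoreIO.DataSourceConfiguration config =
        SingleStoreIO.DataSourceConfiguration.create("hostname")
            .withUsername("admin")
            .withPassword("secretpass")
            .withDatabase("db");

    PCollection<KV<Integer, String>> rows =
        pipeline.apply(
            SingleStoreIO.<KV<Integer, String>>read()
                .withDataSourceConfiguration(config)
                .withQuery("SELECT id, name FROM t")
                .withRowMapper(
                    resultSet -> KV.of(resultSet.getInt(1), resultSet.getString(2))));

    pipeline.run().waitUntilFinish();
  }
}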