[BEAM-10137] Add KinesisIO for cross-language usage with python wrapper #12297

Merged: 19 commits, Sep 2, 2020

@pjotrekk pjotrekk commented Jul 17, 2020

This transform sends a byte stream instead of the whole KinesisRecord.


@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch 4 times, most recently from ab6da5a to b0afbe6 Compare July 22, 2020 12:09
@pjotrekk
Author

R: @TheNeuralBit Could I ask you for review?

Member

@TheNeuralBit TheNeuralBit left a comment

Sorry for doing this out of order! I meant to look at Write but I got halfway through reviewing the Read code before realizing my mistake.

I have a few initial comments

@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch 3 times, most recently from 2c1c700 to 613cf8a Compare July 23, 2020 11:02
@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch 3 times, most recently from 4786d19 to 24e2a7f Compare July 27, 2020 17:32
@pjotrekk
Author

pjotrekk commented Jul 27, 2020

@TheNeuralBit I've moved all commits to this PR. I have a feeling that one big PR is better than a few smaller PRs that depend on each other, since those are much harder to maintain. I'm used to Gerrit, where each dependent commit is a PR rebased on the previous commit.

I tried to use testcontainers with localstack, but I ran into a problem: I can create, read and write streams via boto3, but I'm unable to use it with Beam. What I found is that the Kinesis Java SDK uses CBOR by default, which can be disabled by running the Java program that uses the Kinesis SDK with -Dcom.amazonaws.sdk.disableCbor=true. But I have no idea how to do that in Beam's execution model. Do you have any ideas or suggestions for how I could do that?

In general, the tests from before localstack was introduced work correctly. The commits after the localstack introduction don't work; I just added a vanilla Kinesis test to show that the container itself seems to work properly.

Edit: the write test now works after adding an option to disable certificate verification.

@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch 3 times, most recently from bc57f6d to ee2e78b Compare July 28, 2020 13:22
@pjotrekk
Author

pjotrekk commented Jul 29, 2020

@TheNeuralBit I did my best to make the read test work, but without success. I tried what people suggested in localstack/localstack#507 and https://stackoverflow.com/a/56867870/7946496, but nothing worked.

When I use http in service_endpoint, which works for the boto3 Python SDK, I get this error:
com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: null; Request ID: null)

When I use https in the transform argument service_endpoint, I get:
sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
I tried to run the expansion service jars with these flags: -Dcom.sun.net.ssl.checkRevocation=false, -Dtrust_all_cert=true and -Dcom.amazonaws.sdk.disableCertChecking, as mentioned here: https://stackoverflow.com/q/58070809/7946496

Setting these values in the main method of the expansion service also had no effect.

    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
    System.setProperty("com.sun.net.ssl.checkRevocation", "true");
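
For illustration only, passing the three flags quoted above on the command line might look like the sketch below. The helper name `build_java_command`, the jar path and the port are hypothetical placeholders, and the sketch assumes the expansion service jar takes its port as the first argument. As reported in this comment, these flags did not resolve the problem, so this only shows the shape of the invocation.

```python
from typing import Dict, List, Optional


def build_java_command(jar_path: str, port: int,
                       system_properties: Dict[str, Optional[str]]) -> List[str]:
    """Build a `java -D... -jar <jar> <port>` argument list (hypothetical helper)."""
    command = ["java"]
    for name, value in system_properties.items():
        # A property without a value is passed as a bare -Dname flag.
        command.append(f"-D{name}" if value is None else f"-D{name}={value}")
    command += ["-jar", jar_path, str(port)]
    return command


# The flags come from the comment above; the jar path and port are placeholders.
flags = {
    "com.sun.net.ssl.checkRevocation": "false",
    "trust_all_cert": "true",
    "com.amazonaws.sdk.disableCertChecking": None,
}
command = build_java_command("expansion-service.jar", 8097, flags)
print(" ".join(command))
```

The resulting list can be handed to `subprocess.run(command)` when a JVM and the jar are actually available.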

So I'm a bit stuck. How about skipping the read test and creating a Jira issue to make it work? I think that it's possible to run this test with localstack but I'm missing something. I've spent a lot of time on it already and I doubt I'll come up with something.

@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch from aa6090d to 18b5475 Compare July 29, 2020 15:36
@pjotrekk
Author

pjotrekk commented Jul 30, 2020

@TheNeuralBit Sorry for all the messages, but every day brings something new.

I've managed to modify the existing KinesisIOIT to work with localstack (I'll push it in a separate PR).

I've also managed to run the cross-language read test. But it required putting this code somewhere in the harness code, which is definitely not a real solution because it affects the whole of Beam:

System.setProperty("com.amazonaws.sdk.disableCertChecking", "true");
System.setProperty("com.amazonaws.sdk.disableCbor", "true");

I have no idea how to inject these dynamically, other than creating a custom Beam Java SDK Docker image with modified harness code just for this (read) test. I don't think that's an acceptable solution, though. Do you possibly have a better idea?

If this is the only thing we can do, I'd just skip the read test and run only the write test in the postcommit.

@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch from ac912f2 to 9145e29 Compare July 30, 2020 11:21
@pjotrekk pjotrekk force-pushed the BEAM-10137-kinesis-read branch from 7bf5e80 to 42c62b1 Compare August 31, 2020 06:38
@pjotrekk
Author

Please resolve the conflict.

@chamikaramj Done

@codecov

codecov bot commented Aug 31, 2020

Codecov Report

Merging #12297 into master will decrease coverage by 0.12%.
The diff coverage is 66.66%.

@@            Coverage Diff             @@
##           master   #12297      +/-   ##
==========================================
- Coverage   40.22%   40.10%   -0.13%     
==========================================
  Files         454      455       +1     
  Lines       53669    54025     +356     
==========================================
+ Hits        21587    21665      +78     
- Misses      32082    32360     +278     
Impacted Files                                            Coverage Δ
sdks/python/apache_beam/io/kinesis.py                     66.66% <66.66%> (ø)
.../runners/portability/fn_api_runner/translations.py     14.00% <0.00%> (+0.22%) ⬆️
...beam/runners/interactive/background_caching_job.py     25.92% <0.00%> (+0.68%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 9aa6c39...42c62b1.

@pjotrekk
Author

Run Python 3.8 PostCommit

@pjotrekk
Author

Run Typescript PreCommit

Contributor

@chamikaramj chamikaramj left a comment

LGTM. Thanks.

We can merge when tests pass.

@TheNeuralBit
Member

Run Python PreCommit

@TheNeuralBit
Member

Run Typescript PreCommit

@TheNeuralBit
Member

Run Python PreCommit

@TheNeuralBit
Member

Run Python PreCommit

@mwalenia mwalenia merged commit 0335ba5 into apache:master Sep 2, 2020
@aromanenko-dev
Contributor

Thanks for getting this finally merged.
Quick note: this change is quite significant, so please update CHANGES.md accordingly (if not done yet).

@hellomikelo

I'm getting this error when trying ReadDataFromKinesis,

RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kinesis-expansion-service/2.24.0/beam-sdks-java-io-kinesis-expansion-service-2.24.0.jar: HTTP Error 404: Not Found

Seems like beam-sdks-java-io-kinesis-expansion-service no longer exists on Maven?
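
The URL in the error follows the standard Maven repository layout: the group ID with dots turned into slashes, then the artifact ID, the version, and a file named `<artifact>-<version>.jar`. A minimal sketch that rebuilds it is shown here; `maven_jar_url` is a hypothetical helper for illustration, not Beam's actual lookup code.

```python
def maven_jar_url(group_id: str, artifact_id: str, version: str,
                  repo: str = "https://repo.maven.apache.org/maven2") -> str:
    """Return the conventional Maven repository URL for an artifact's jar."""
    # Dots in the group ID become path separators.
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


url = maven_jar_url(
    "org.apache.beam",
    "beam-sdks-java-io-kinesis-expansion-service",
    "2.24.0",
)
print(url)
```

A 404 on such a URL means no jar was published under that exact artifact ID and version.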

@pjotrekk
Author

pjotrekk commented Oct 7, 2020

I'm getting this error when trying ReadDataFromKinesis,

RuntimeError: Unable to fetch remote job server jar at https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-kinesis-expansion-service/2.24.0/beam-sdks-java-io-kinesis-expansion-service-2.24.0.jar: HTTP Error 404: Not Found

Seems like beam-sdks-java-io-kinesis-expansion-service no longer exists on Maven?

@yuanhunglo The ReadDataFromKinesis transform will be available in the 2.25.0 release. It was merged too late to make it into 2.24.0. If you want to run it using the 2.25 code, you need to build the expansion service using:

./gradlew :sdks:java:io:kinesis:expansion-service:shadowJar

Then ReadDataFromKinesis should be able to detect the expansion service jar in its default path.

@hellomikelo

I tried that and received this error. I'll wait till 2.25.0 release to try again. Thanks!

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1602094791.569834934","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":4133,"referenced_errors":[{"created":"@1602094791.569831898","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}"
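
`StatusCode.UNAVAILABLE` with "failed to connect to all addresses" means the gRPC client could not open a connection to its target. One quick check, sketched here with a placeholder host and port (the real address depends on how the pipeline and services are configured), is whether anything is listening there at all. `is_port_open` is a hypothetical helper that uses only the standard library.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Placeholder address; substitute the one your pipeline is configured to use.
print(is_port_open("localhost", 8097))
```

If this reports `False`, the service was most likely never started or is bound to a different address.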

@pjotrekk
Author

pjotrekk commented Oct 9, 2020

Sorry, I forgot there is some additional setup needed to run it locally.

You also need to build the jar of the runner you are using. For example, for Flink (you don't need it for the DirectRunner):

./gradlew :runners:flink:1.10:job-server:shadowJar

And the current Python container:

./gradlew :sdks:python:container:py37:docker

Building the Java SDK container should not be needed; pulling the latest (2.24) from Docker Hub should be sufficient. But to be on the safe side:

./gradlew :sdks:java:container:docker
docker tag apache/beam_java8_sdk:2.26.0.dev apache/beam_java8_sdk:2.24.0.dev 

Waiting for 2.25 will definitely be the easiest option, but on the other hand that's a long time.
