-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Flink Integration Tests #114
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
|
||
from .. import Feedstock | ||
from ..bakery.base import Bakery | ||
from ..bakery.flink import FlinkOperatorBakery | ||
from ..bakery.local import LocalDirectBakery | ||
from ..plugin import get_injections, get_injectionspecs_from_entrypoints | ||
from ..storage import InputCacheStorage, MetadataCacheStorage, TargetStorage | ||
|
@@ -108,15 +109,29 @@ def _validate_job_name(self, proposal): | |
help=""" | ||
Container image to use for this job. | ||
|
||
Defaults to letting beam automatically figure out the image to use, | ||
For GCP DataFlow leaving it blank defaults to letting beam | ||
automatically figure out the image to use for the workers | ||
based on version of beam and python in use. | ||
|
||
Should be accessible to whatever Beam runner is being used. | ||
|
||
For Flink it's required that you pass an beam image | ||
for the version of python and beam you are targeting | ||
for example: apache/beam_python3.10_sdk:2.51.0 | ||
more info: https://hub.docker.com/layers/apache/ | ||
|
||
Note that some runners (like the local one) may not support this! | ||
""", | ||
) | ||
|
||
@validate("container_image") | ||
def _validate_container_image(self, proposal): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
if self.bakery_class == FlinkOperatorBakery and not proposal.value: | ||
raise ValueError( | ||
"'container_name' is required when using the 'FlinkOperatorBakery' " | ||
"for the version of python and apache-beam you are targeting. " | ||
"See the sdk images available: https://hub.docker.com/layers/apache/" | ||
) | ||
return proposal.value | ||
|
||
def autogenerate_job_name(self): | ||
""" | ||
Autogenerate a readable job_name | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import base64 | ||
import os | ||
import secrets | ||
import signal | ||
|
@@ -57,3 +58,66 @@ def minio(local_ip): | |
proc.wait() | ||
|
||
assert proc.returncode == 0 | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def minio_service(): | ||
cmd = [ | ||
"kubectl", | ||
"get", | ||
"service/minio-service", | ||
"-o=jsonpath='{.spec.clusterIP}:{.spec.ports[0].port}'", | ||
] | ||
proc = subprocess.run(cmd, capture_output=True, text=True) | ||
assert proc.returncode == 0 | ||
svc_address = proc.stdout.strip('"').strip("'") | ||
endpoint = f"http://{svc_address}" | ||
|
||
cmd = [ | ||
"kubectl", | ||
"get", | ||
"secret/minio-secrets", | ||
"-o=jsonpath='{.data.MINIO_ACCESS_KEY}'", | ||
] | ||
proc = subprocess.run(cmd, capture_output=True, text=True) | ||
assert proc.returncode == 0 | ||
myaccesskey = proc.stdout | ||
myaccesskey = base64.b64decode(myaccesskey).decode() | ||
|
||
cmd = [ | ||
"kubectl", | ||
"get", | ||
"secret/minio-secrets", | ||
"-o=jsonpath='{.data.MINIO_SECRET_KEY}'", | ||
] | ||
proc = subprocess.run(cmd, capture_output=True, text=True) | ||
assert proc.returncode == 0 | ||
mysecretkey = proc.stdout | ||
mysecretkey = base64.b64decode(mysecretkey).decode() | ||
|
||
# enter | ||
yield {"endpoint": endpoint, "username": myaccesskey, "password": mysecretkey} | ||
|
||
# exit | ||
return | ||
|
||
|
||
def pytest_addoption(parser): | ||
parser.addoption("--flinkversion", action="store", default="1.16") | ||
parser.addoption("--pythonversion", action="store", default="3.9") | ||
parser.addoption("--beamversion", action="store", default="2.47.0") | ||
|
||
Comment on lines
+105
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another pattern that I haven't seen before, I like it! |
||
|
||
@pytest.fixture | ||
def flinkversion(request): | ||
return request.config.getoption("--flinkversion") | ||
|
||
|
||
@pytest.fixture | ||
def pythonversion(request): | ||
return request.config.getoption("--pythonversion") | ||
|
||
|
||
@pytest.fixture | ||
def beamversion(request): | ||
return request.config.getoption("--beamversion") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
apiVersion: v1 | ||
kind: PersistentVolumeClaim | ||
metadata: | ||
name: minio-pv-claim | ||
labels: | ||
app: minio-storage-claim | ||
spec: | ||
accessModes: | ||
- ReadWriteOnce | ||
resources: | ||
requests: | ||
storage: 1Gi | ||
--- | ||
apiVersion: apps/v1 | ||
kind: Deployment | ||
metadata: | ||
name: minio | ||
spec: | ||
selector: | ||
matchLabels: | ||
app: minio-server | ||
strategy: | ||
type: Recreate | ||
template: | ||
metadata: | ||
labels: | ||
app: minio-server | ||
spec: | ||
containers: | ||
- name: minio | ||
image: minio/minio | ||
args: | ||
- server | ||
- /data | ||
env: | ||
- name: MINIO_ACCESS_KEY | ||
valueFrom: | ||
secretKeyRef: | ||
name: minio-secrets | ||
key: MINIO_ACCESS_KEY | ||
- name: MINIO_SECRET_KEY | ||
valueFrom: | ||
secretKeyRef: | ||
name: minio-secrets | ||
key: MINIO_SECRET_KEY | ||
ports: | ||
- containerPort: 9000 | ||
name: minio | ||
volumeMounts: | ||
- name: data | ||
mountPath: "/data" | ||
volumes: | ||
- name: data | ||
persistentVolumeClaim: | ||
claimName: minio-pv-claim | ||
--- | ||
apiVersion: v1 | ||
kind: Service | ||
metadata: | ||
name: minio-service | ||
spec: | ||
type: ClusterIP | ||
ports: | ||
- port: 9000 | ||
targetPort: 9000 | ||
protocol: TCP | ||
selector: | ||
app: minio-server |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm learning a lot reading this PR 😄