draft 3
ranchodeluxe committed Oct 25, 2023
1 parent 23ec9a0 commit 6c28988
Showing 6 changed files with 264 additions and 100 deletions.
19 changes: 10 additions & 9 deletions .github/workflows/flink.yaml
@@ -70,35 +70,36 @@ jobs:
with:
python-version: 3.9

-   - name: 'Setup minio + mc'
+   - name: 'Setup minio cli mc'
run: |
wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
mv minio /usr/local/bin/minio
wget --quiet https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/mc
minio --version
mc --version
- name: Install dependencies & our package
run: |
python -m pip install --upgrade pip
python -m pip install -r dev-requirements.txt
python -m pip install -e .
- name: Set up min.io as a k3s service
run: |
# TODO: generate keys/secrets with `openssl rand -hex 16`
kubectl create secret generic minio-secrets --from-literal=MINIO_ACCESS_KEY=myaccesskey --from-literal=MINIO_SECRET_KEY=mysecretkey
kubectl apply -f tests/integration/flink/minio-manifest.yaml
- name: Install socat so kubectl port-forward will work
run: |
# Not sure if this is why kubectl proxy isn't working, but let's try
sudo apt update --yes && sudo apt install --yes socat
# - name: Setup upterm session
# uses: lhotari/action-upterm@v1

- name: Test with pytest
run: |
-   pytest -vvv -s --cov=pangeo_forge_runner tests/integration/test_flink.py
+   pytest -vvv -s --cov=pangeo_forge_runner tests/integration/flink/test_flink_integration.py
kubectl get pod -A
kubectl describe pod
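The workflow above hardcodes `myaccesskey`/`mysecretkey` and leaves a TODO to generate real credentials. A minimal sketch of that TODO, assuming the keys are generated in Python (`secrets.token_hex(16)` standing in for the `openssl rand -hex 16` mentioned in the comment) and fed to the same `kubectl create secret` call the workflow already uses:

import secrets
import subprocess

# generate throwaway credentials, the Python equivalent of `openssl rand -hex 16`
access_key = secrets.token_hex(16)
secret_key = secrets.token_hex(16)

# create the same `minio-secrets` secret that the manifest's secretKeyRef expects
subprocess.run(
    [
        "kubectl", "create", "secret", "generic", "minio-secrets",
        f"--from-literal=MINIO_ACCESS_KEY={access_key}",
        f"--from-literal=MINIO_SECRET_KEY={secret_key}",
    ],
    check=True,
)

The generated values would then have to reach the test fixture as well, either passed through the environment or read back from the secret (as sketched under tests/conftest.py below), instead of the hardcoded strings.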
2 changes: 1 addition & 1 deletion setup.py
@@ -18,7 +18,7 @@
"escapism",
"jsonschema",
"traitlets",
"apache-beam[gcp]",
"apache-beam[gcp]==2.47.0",
"importlib-metadata",
],
entry_points={
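Pinning `apache-beam[gcp]==2.47.0` only works if the same version shows up in the Beam SDK container image and in the Flink job-server jar (see the comment in the test below). A small, hypothetical consistency check along those lines, not enforced anywhere in this commit:

from importlib.metadata import version

# sketch: the installed apache-beam version should match the tag of the
# SDK container image used in the Bake config
beam_version = version("apache-beam")
container_image = "apache/beam_python3.9_sdk:2.47.0"
assert container_image.endswith(beam_version), (
    f"{container_image} does not match apache-beam=={beam_version}"
)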
19 changes: 19 additions & 0 deletions tests/conftest.py
@@ -57,3 +57,22 @@ def minio(local_ip):
proc.wait()

assert proc.returncode == 0


@pytest.fixture(scope="session")
def minio_service():
cmd = [
"kubectl",
"get",
"service/minio-service",
"-o=jsonpath='{.spec.clusterIP}:{.spec.ports[0].port}'",
]
proc = subprocess.run(cmd, capture_output=True, text=True)
assert proc.returncode == 0
svc_address = proc.stdout.strip('"').strip("'")
endpoint = f"http://{svc_address}"
    # setup complete: hand the connection details to the test
    # TODO: read the access/secret keys from the k3s secret instead of hardcoding them
    yield {"endpoint": endpoint, "username": "myaccesskey", "password": "mysecretkey"}
    # nothing to tear down
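The fixture hardcodes the same credentials the workflow puts into the `minio-secrets` secret. A sketch of the "get secrets from k3s" TODO, using a hypothetical `_k8s_secret` helper that reads and base64-decodes a secret value via `kubectl`:

import base64
import subprocess


def _k8s_secret(name: str, key: str) -> str:
    """Read one key from a Kubernetes secret (hypothetical helper for the TODO above)."""
    proc = subprocess.run(
        ["kubectl", "get", "secret", name, f"-o=jsonpath={{.data.{key}}}"],
        capture_output=True,
        text=True,
        check=True,
    )
    # kubectl returns the value base64-encoded
    return base64.b64decode(proc.stdout).decode()


# e.g. _k8s_secret("minio-secrets", "MINIO_ACCESS_KEY") -> "myaccesskey"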
68 changes: 68 additions & 0 deletions tests/integration/flink/minio-manifest.yaml
@@ -0,0 +1,68 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: minio-pv-claim
labels:
app: minio-storage-claim
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: minio
spec:
selector:
matchLabels:
app: minio-server
strategy:
type: Recreate
template:
metadata:
labels:
app: minio-server
spec:
containers:
- name: minio
image: minio/minio
args:
- server
- /data
env:
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
name: minio-secrets
key: MINIO_ACCESS_KEY
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
name: minio-secrets
key: MINIO_SECRET_KEY
ports:
- containerPort: 9000
name: minio
volumeMounts:
- name: data
mountPath: "/data"
volumes:
- name: data
persistentVolumeClaim:
claimName: minio-pv-claim
---
apiVersion: v1
kind: Service
metadata:
name: minio-service
spec:
type: ClusterIP
ports:
- port: 9000
targetPort: 9000
protocol: TCP
selector:
app: minio-server
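Because the Service is `ClusterIP`, MinIO is only reachable from inside the cluster; the `minio_service` fixture above resolves its address for the tests. A quick reachability check could use `s3fs` (already exercised by the test below), assuming the fixture's endpoint and credentials; the endpoint here is a placeholder:

import s3fs

# sketch: confirm the MinIO deployment is answering S3 calls
fs = s3fs.S3FileSystem(
    key="myaccesskey",
    secret="mysecretkey",
    client_kwargs={"endpoint_url": "http://<cluster-ip>:9000"},  # placeholder address
)
print(fs.ls(""))  # lists buckets; empty on a fresh deployment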
166 changes: 166 additions & 0 deletions tests/integration/flink/test_flink_integration.py
@@ -0,0 +1,166 @@
import json
import subprocess
import tempfile
import time
from importlib.metadata import version

import xarray as xr
from packaging.version import parse as parse_version


def test_flink_bake(minio_service):
fsspec_args = {
"key": minio_service["username"],
"secret": minio_service["password"],
"client_kwargs": {"endpoint_url": minio_service["endpoint"]},
}

# TODO: we need a build matrix for the flink tests: python versions, apache-beam versions, recipe versions
# and we need the integration repo to have flink-specific tags
# and apache-beam[gcp]==2.[47-51].x versions added to those tagged requirements
pfr_version = parse_version(version("pangeo-forge-recipes"))
if pfr_version >= parse_version("0.10"):
recipe_version_ref = str(pfr_version)
else:
recipe_version_ref = "0.9.x"
print(f"[ RECIPE VERSION ]: {recipe_version_ref}")

bucket = "s3://gpcp-out"
config = {
"Bake": {
"prune": True,
"job_name": "recipe",
"bakery_class": "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery",
# there must be a job-server jar available for the matching
# `apache-beam` and `FlinkOperatorBakery.flink_version` here:
# https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/
"container_image": "apache/beam_python3.9_sdk:2.47.0",
},
"TargetStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": bucket + "/target/{job_name}",
},
"InputCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": bucket + "/input-cache/{job_name}",
},
"MetadataCacheStorage": {
"fsspec_class": "s3fs.S3FileSystem",
"fsspec_args": fsspec_args,
"root_path": bucket + "/metadata-cache/{job_name}",
},
"FlinkOperatorBakery": {
"flink_version": "1.16",
"job_manager_resources": {"memory": "1024m", "cpu": 0.30},
"task_manager_resources": {"memory": "2048m", "cpu": 0.30},
"parallelism": 1,
"flink_configuration": {
"taskmanager.numberOfTaskSlots": "1",
"taskmanager.memory.jvm-overhead.max": "2048m",
},
},
}

with tempfile.NamedTemporaryFile("w", suffix=".json") as f:
json.dump(config, f)
f.flush()
cmd = [
"pangeo-forge-runner",
"bake",
"--repo",
"https://github.com/ranchodeluxe/gpcp-from-gcs-feedstock",
"--ref",
"fix/integration-test",
"-f",
f.name,
]

print("\nSubmitting job...")
timeout = 60 * 4
with subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
) as proc:
start = time.time()
for line in proc.stdout:
            # echo the runner's output so it shows up in the CI logs
print(line, end="")

elapsed_time = time.time() - start
if elapsed_time >= timeout:
raise Exception("timeout reached, exiting")

        # make sure the last line of output confirms the job was submitted
assert line.startswith("Started Flink job as")

    # use the minio cli to infer when the job has finished, after a waiting period
# TODO: we need to get the historyserver up so we can query job status async
# https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/advanced/historyserver/
time.sleep(60 * 2)
cmd = [
"mc",
"alias",
"set",
"myminio",
minio_service["endpoint"],
minio_service["username"],
minio_service["password"],
]
proc = subprocess.run(cmd, capture_output=True)
assert proc.returncode == 0

# set up path lookups for minio cli and xarray
target_path = config["TargetStorage"]["root_path"].format(
job_name=config["Bake"]["job_name"]
)
if pfr_version >= parse_version("0.10"):
# in pangeo-forge-recipes>=0.10.0, an additional `StoreToZarr.store_name` kwarg
# is appended to the formatted root path at execution time. for ref `0.10.x`,
# the value of that kwarg is "gpcp", so we append that here.
target_path += "/gpcp"

cmd = [
"mc",
"ls",
"myminio/{}/precip".format(target_path.replace("s3://", "")),
]
timeout = 60 * 5
start = time.time()
print("[ RUNNING ]: ", " ".join(cmd))
while True:
proc = subprocess.run(cmd, capture_output=True, text=True)
# purposely don't check proc.returncode since files might not exist yet

# --prune prunes to two time steps by default, so we expect 2 time steps here
# but four overall files:
#
# $ mc ls myminio/gpcp/target/recipe/gpcp/precip/
# [2023-10-24 22:42:16 UTC] 365B STANDARD .zarray
# [2023-10-24 22:42:16 UTC] 442B STANDARD .zattrs
# [2023-10-24 22:42:17 UTC] 145KiB STANDARD 0.0.0
# [2023-10-24 22:42:17 UTC] 148KiB STANDARD 1.0.0
try:
output = proc.stdout.splitlines()
print(f"[ MINIO OUTPUT ]: {output[-1]}")
if len(output) == 4:
break
        except IndexError:
            # no files listed yet; keep polling
            pass

elapsed_time = time.time() - start
if elapsed_time >= timeout:
raise Exception("timeout reached, exiting")
time.sleep(2)

gpcp = xr.open_dataset(
target_path, backend_kwargs={"storage_options": fsspec_args}, engine="zarr"
)

assert (
gpcp.title
== "Global Precipitation Climatatology Project (GPCP) Climate Data Record (CDR), Daily V1.3"
)
# --prune prunes to two time steps by default, so we expect 2 items here
assert len(gpcp.precip) == 2
print(gpcp)
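The fixed `time.sleep(60 * 2)` and the `mc ls` polling loop stand in for real job-status checks; the TODO points at the Flink history server for that. One hedged option, assuming the JobManager (or history server) REST API is reachable, e.g. via `kubectl port-forward` on localhost:8081, and that `requests` is available:

import time

import requests


def wait_for_flink_job(name, rest_url="http://localhost:8081", timeout=600):
    """Hypothetical helper: poll the Flink REST API until the named job leaves RUNNING."""
    start = time.time()
    while time.time() - start < timeout:
        jobs = requests.get(f"{rest_url}/jobs/overview", timeout=10).json().get("jobs", [])
        states = {job["name"]: job["state"] for job in jobs}
        if states.get(name) not in (None, "CREATED", "RUNNING"):
            return states[name]  # e.g. "FINISHED" or "FAILED"
        time.sleep(5)
    raise TimeoutError(f"job {name!r} did not finish within {timeout}s")

That would let the assertions on the target Zarr store run as soon as the job reports FINISHED instead of after a fixed wait.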