
Save Jobs History on Flink #6

Merged
merged 14 commits into main on Feb 27, 2024

Conversation

Collaborator

@ranchodeluxe ranchodeluxe commented Nov 15, 2023

Mount EFS to Job Managers so they can archive jobs for historical status lookups

Addresses: pangeo-forge/pangeo-forge-runner#122

Related PR: pangeo-forge/pangeo-forge-runner#131
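
For context on what "archive jobs" means here: the job manager writes an archive of every finished job to a configured directory, which any process with the same mount can later read back. A minimal sketch of the flink-conf settings involved, with an illustrative mount path (in this repo the real path is wired through a Terraform variable):

# Illustrative flink-conf.yaml fragment; /mnt/flink-history is a placeholder path.
jobmanager.archive.fs.dir: file:///mnt/flink-history/completed-jobs
# Only relevant if a standalone HistoryServer is run; it polls the same directory.
historyserver.archive.fs.dir: file:///mnt/flink-history/completed-jobs
historyserver.archive.fs.refresh-interval: 10000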

@ranchodeluxe ranchodeluxe changed the title WIP: Save Jobs HIstory on Flink WIP: Save Jobs History on Flink Nov 15, 2023
@ranchodeluxe ranchodeluxe changed the title WIP: Save Jobs History on Flink Save Jobs History on Flink Nov 15, 2023
@ranchodeluxe ranchodeluxe changed the title Save Jobs History on Flink WIP: Save Jobs History on Flink Nov 15, 2023
@ranchodeluxe ranchodeluxe changed the title WIP: Save Jobs History on Flink Save Jobs History on Flink Nov 15, 2023
Collaborator

@yuvipanda yuvipanda left a comment

So, EFS is NFS. And NFS is one of those 'you have a problem, you think you will use NFS, and now you have two problems' situations. It plays poorly with a lot of data formats that use any kinda file locking (see https://www.sqlite.org/howtocorrupt.html#_filesystems_with_broken_or_missing_lock_implementations), and the file corruption only shows up in the worst possible times. So I think the primary, and perhaps the only, time to use NFS (and hence EFS) is when providing home directories.

Given we already have the EBS provisioner setup and use it for prometheus, can we not use EBS here too? It does mean that only one pod can write to an EBS volume at a time, but relying on NFS for multiple-replica high availability eventually only leads to tears, pain, blood, stale file handle crashes and death.
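
(For contrast, the EBS route would look roughly like the claim below: a minimal sketch with an illustrative name, size, and storageClassName, assuming whatever EBS-backed StorageClass the cluster already uses. ReadWriteOnce is what limits it to a single writer node at a time.)

# Hypothetical EBS-backed claim; name, size, and storageClassName are illustrative.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-historyserver-ebs-pvc
spec:
  accessModes:
    - ReadWriteOnce        # mountable read-write by a single node at a time
  storageClassName: gp2    # whichever EBS StorageClass the cluster already has
  resources:
    requests:
      storage: 10Gi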

Left some inline comments about the kubernetes provider.

terraform/aws/k8s_manifests.tf (outdated review thread)
terraform/aws/variables.tf (outdated review thread)
@ranchodeluxe
Collaborator Author

ranchodeluxe commented Nov 16, 2023

> So, EFS is NFS. And NFS is one of those 'you have a problem, you think you will use NFS, and now you have two problems' situations. It plays poorly with a lot of data formats that use any kinda file locking (see https://www.sqlite.org/howtocorrupt.html#_filesystems_with_broken_or_missing_lock_implementations), and the file corruption only shows up in the worst possible times. So I think the primary, and perhaps the only, time to use NFS (and hence EFS) is when providing home directories.
>
> Given we already have the EBS provisioner setup and use it for prometheus, can we not use EBS here too? It does mean that only one pod can write to an EBS volume at a time, but relying on NFS for multiple-replica high availability eventually only leads to tears, pain, blood, stale file handle crashes and death.
>
> Left some inline comments about the kubernetes provider.

Thanks for giving me the deep deets on why EFS/NFS is bad. I was going to use EBS but then I realized something when playing with multiple job managers that made me switch back to EFS:

  1. There's no reason we need to start the historyserver that the docs recommend. It seems the job manager REST API already serves the history API (that's basically how the job manager UI works).

  2. More importantly, even if a job manager DID NOT RUN a job, it can still find the archived job in the EFS mount and return information about it. This is important because it means any of the existing job manager REST APIs can tell us about all history, even if the job manager that actually ran a job is killed (hence needing multiple pods to have the EFS mount; see the sketch below). In the future we are probably going to need to create some type of kind: Job || CronJob reaper that cleans up kind: FlinkDeployment resources on a regular basis. If we do that, we can't expect job-manager pods to stick around anyway.
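
As a rough sketch of what that shared mount looks like on the FlinkDeployment side (field names are from the flink-kubernetes-operator CRD; the claim name and mount path are illustrative, and the actual wiring lives in the related pangeo-forge-runner PR):

# Sketch only: every job manager pod mounts the same EFS-backed claim, so any of
# them can serve history for jobs it did not run.
# (image, flinkVersion, serviceAccount, and resource blocks are omitted here.)
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-recipe-run            # hypothetical
spec:
  flinkConfiguration:
    jobmanager.archive.fs.dir: file:///mnt/flink-history/completed-jobs
  podTemplate:
    spec:
      containers:
        - name: flink-main-container  # the operator merges this template into the Flink pods
          volumeMounts:
            - name: flink-history
              mountPath: /mnt/flink-history
      volumes:
        - name: flink-history
          persistentVolumeClaim:
            claimName: flink-historyserver-efs-pvc   # hypothetical EFS-backed claim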

Does any of that assuage your fears and persuade you one way or the other @yuvipanda?

@ranchodeluxe
Collaborator Author

doh, so poor: hashicorp/terraform-provider-kubernetes#1775 (comment)

maybe I just write a helm config since that works

@yuvipanda
Collaborator

> maybe I just write a helm config since that works

YESSS, I always prefer this over raw manifests :)

@yuvipanda
Collaborator

Thanks for engaging with me on the EFS issues :) My goal here is not to say 'no EFS ever', but just to make sure we are only using it after we have completely determined that EBS is not an option.

So if I understand this correctly, the reasons for EFS over EBS are:

  1. Multiple pods may be writing to this filesystem.
    a. QUESTION: Will these be concurrently writing to the same filesystem, or non-concurrently? What is the 'level' of concurrency - one writer per job, or multiple writers per job?
    b. QUESTION: Will these multiple writers be writing to the same files, or different files? And concurrently, or serially?
  2. Will this reaper process require direct read and write access to the files dropped there by the flink servers? I don't think I fully understand the relationship between the reaper and EFS.

I think answers to these questions will help me a lot :)

@ranchodeluxe
Collaborator Author

ranchodeluxe commented Nov 17, 2023

> 1. Multiple pods may be writing to this filesystem.
>    a. QUESTION: Will these be _concurrently_ writing to the same filesystem, or non-concurrently? What is the 'level' of concurrency - one writer per job, or multiple writers per job?

  • Since jobs (and hence pods) can run concurrently, yes, these will be writing to the same filesystem concurrently.

  • I will need to look into the Flink source code more to discover how many writers there are per job. The logs make it seem like a single service handles the archival process, so my guess is one writer per job.

>    b. QUESTION: Will these multiple writers be writing to the _same_ files, or different files? And concurrently, or serially?

  • I don't know the answer to this question until I investigate more.

  • This question anticipates another thing to confirm in the Flink source: how Job IDs are determined. Are they unique across jobs (and hence pods), or only unique per job manager? Or are they hashes of the source? If Job IDs are not unique, then multiple writers "could" be trying to write to the same file when two jobs run simultaneously.

> 2. Will this reaper process require direct read and write access to the files dropped there by the flink servers? I don't think I fully understand the relationship between the reaper and EFS.

No, the reaper process doesn't need to access the EFS mount. It only checks kind: FlinkDeployment resources and their ages, then runs kubectl delete on any past some age expiry, roughly along the lines of the sketch below.
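
A rough sketch of what such a reaper could look like, assuming a hypothetical image that bundles kubectl, jq, and GNU date, and a ServiceAccount allowed to list and delete FlinkDeployments (all names and the one-day expiry are illustrative):

apiVersion: batch/v1
kind: CronJob
metadata:
  name: flinkdeployment-reaper        # hypothetical
spec:
  schedule: "0 * * * *"               # run hourly
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: flinkdeployment-reaper   # needs RBAC to list/delete FlinkDeployments
          restartPolicy: Never
          containers:
            - name: reaper
              image: example.org/kubectl-jq:latest      # hypothetical image with kubectl + jq + GNU date
              command:
                - /bin/sh
                - -c
                - |
                  # delete FlinkDeployments whose creationTimestamp is older than one day
                  cutoff=$(date -u -d '-1 day' +%Y-%m-%dT%H:%M:%SZ)
                  kubectl get flinkdeployments -o json \
                    | jq -r --arg cutoff "$cutoff" \
                        '.items[] | select(.metadata.creationTimestamp < $cutoff) | .metadata.name' \
                    | xargs -r kubectl delete flinkdeployment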

@ranchodeluxe
Collaborator Author

These clowns removed the 1.5.0 operator: https://downloads.apache.org/flink/flink-kubernetes-operator-1.5.0

@ranchodeluxe
Collaborator Author

ranchodeluxe commented Nov 21, 2023

> These clowns removed the 1.5.0 operator: https://downloads.apache.org/flink/flink-kubernetes-operator-1.5.0

Got confirmation from one of the devs that only the latest two operator versions are supported and one was just released. He's not sure if this documentation applies to the operators as well but it pretty much aligns:

https://flink.apache.org/downloads/#update-policy-for-old-releases

specific to the operator: https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning

apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-historyserver-efs-pv
Collaborator

I like that this has 'historyserver' in the name, so it gets used specifically just for this and not much more :)

Collaborator

PersistentVolume is also not namespaced, and should get the same treatment as StorageClass with .Release.Name. Sorry for not catching that earlier.
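
In Helm-template terms that would look something like the sketch below; only the name line is the point here, the rest of the manifest is unchanged:

apiVersion: v1
kind: PersistentVolume
metadata:
  # prefix cluster-scoped resource names with the release name so two installs
  # of the chart cannot collide on the same PersistentVolume name
  name: {{ .Release.Name }}-flink-historyserver-efs-pv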

Collaborator

@ranchodeluxe I think this still needs to be fixed?

Collaborator

@yuvipanda yuvipanda left a comment

Thanks for working with me on this, @ranchodeluxe. I think using EFS is alright here! I've left some other minor comments, but overall lgtm

helm-charts/flink-historyserver/templates/efs.yaml (outdated review thread)
helm-charts/flink-historyserver/templates/efs.yaml (outdated review thread)
helm-charts/flink-historyserver/values.yaml (outdated review thread)
@ranchodeluxe
Collaborator Author

> Thanks for working with me on this, @ranchodeluxe. I think using EFS is alright here! I've left some other minor comments, but overall lgtm

Sorry @yuvipanda I thought I muted this by turning it back into a draft so it wouldn't ping you. I'll do that now (it still needs a bit of work) and I'll incorporate your feedback before requesting another review. Here are some answers to some previous questions:

> 1. Multiple pods may be writing to this filesystem.
>    a. QUESTION: Will these be concurrently writing to the same filesystem, or non-concurrently? What is the 'level' of concurrency - one writer per job, or multiple writers per job?

The JobID(s) returned are statistically unique, and the writers of history to the NFS are a single process/thread.

@ranchodeluxe ranchodeluxe marked this pull request as draft November 24, 2023 15:14
@ranchodeluxe ranchodeluxe dismissed yuvipanda’s stale review November 28, 2023 15:42

requested changes have been made and I've requested a new review

yuvipanda pushed a commit to pangeo-forge/pangeo-forge-runner that referenced this pull request Nov 28, 2023
Mount EFS to Job Managers so they can archive jobs for historical status lookups

Addresses #122

Related PR: pangeo-forge/pangeo-forge-cloud-federation#6

Co-authored-by: ranchodeluxe <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Collaborator

@yuvipanda yuvipanda left a comment

Thank you very much for working on this, @ranchodeluxe


terraform/aws/variables.tf (outdated review thread)
terraform/aws/main.tf (outdated review thread)
"prometheus.io/port" : "9999"
}
})
value = local_file.flink_operator_config.content
Collaborator

Can this simply be templatefile("flink_operator_config.tpl", { mount_path = var.historyserver_mount_path }) instead? That way, we can get rid of having to gitignore .yaml files and save an additional resource here. It also keeps things simpler with one fewer level of indirection.

Collaborator

Actually, why is this a template at all? Can't it still just be yamlencode, with the value for jobmanager.archive.fs.dir set to var.historyserver_mount_path? I think that's much cleaner, and we'll never run into YAML indentation issues due to how templating works.

Collaborator Author

yamlencode was already creating something that couldn't be parsed by the operator:

[WARN ] Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:55: Line is not a key-value pair (missing space after ':'?)

Let me look into this one after I clean other things up

Collaborator Author

Haha, this inline version of a Map is the only one that doesn't give the parse warning 😆

kubernetes.jobmanager.annotations: {"prometheus.io/scrape": true, "prometheus.io/port": 9999}

I'm gonna keep the template file b/c IMHO yamlencode is hard to grok
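
For the record, a sketch of the difference that seems to matter here: Flink reads flink-conf.yaml as flat key: value lines rather than full YAML, so the inline JSON-style map above parses, while the nested form that yamlencode produces for a nested map triggers the warning. Both snippets below are illustrative:

# accepted by the flat parser: one key, one value, on a single line
kubernetes.jobmanager.annotations: {"prometheus.io/scrape": true, "prometheus.io/port": 9999}
# produces the "Line is not a key-value pair" warning: the first line has no value after the colon
kubernetes.jobmanager.annotations:
  prometheus.io/scrape: true
  prometheus.io/port: 9999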

Collaborator Author

Couldn't get that solution to render correctly via templatefile or yamlencode, so I've ticked this and am moving on to better problems for now. I'll have another look when I get around to making Prometheus work better.

terraform/aws/helm_historyserver.tf (outdated review thread)

locals {
  # removing lines that start with '#' b/c TF >> helm doesn't like them
  filtered_log4j_config = join("\n", [
Collaborator

I'm actually really confused about what's happening here. Are we copy pasting the default values from a configmap generated by the operator onto our helm setup, but with # removed? Why are we copy pasting them?

Collaborator Author

I had removed these changes but they were committed to some other branch, it seems. It's reusing the default flink-operator-config now.

@ranchodeluxe
Collaborator Author

@yuvipanda gentle nudge with some 🧁 for dessert 😄

Collaborator

@yuvipanda yuvipanda left a comment

Thanks for the extra ping, @ranchodeluxe :)


@@ -0,0 +1,4 @@
efsFileSystemId: ""
Collaborator

I think undoing the 'required' is much cleaner than enforcing them via minLength: 1. If there is a future change coming in that makes these required, we can modify the schema at that point, no?

terraform/aws/.gitignore (outdated review thread)
@ranchodeluxe
Collaborator Author

alrighty then @yuvipanda, back at this with recent changes so @thodson-usgs can use EFS

@moradology moradology left a comment

LGTM

@thodson-usgs
Contributor

Looks good to me

@ranchodeluxe ranchodeluxe merged commit 62cdddf into main Feb 27, 2024
@thodson-usgs thodson-usgs mentioned this pull request Mar 1, 2024