
Support larger workflows #7121

Open
jli opened this issue Nov 1, 2021 · 27 comments
Labels
area/controller Controller issues, panics solution/workaround There's a workaround, might not be great, but exists type/feature Feature request

Comments

@jli

jli commented Nov 1, 2021

Summary

It seems there's a limit on the size of Argo workflows (in terms of number of tasks and task specification size). Argo compresses the workflow status.nodes field into status.compressedNodes, but the spec.templates field can still become very large if there are a lot of tasks and/or if the tasks have large specs.

https://argoproj.github.io/argo-workflows/offloading-large-workflows/ seems to only apply to the status.nodes field, not to spec.templates.

Would it be possible for the spec.templates field to also be gzip+b64 encoded, like status.nodes? I think this would allow for much larger workflows. Offloading to Postgres would be another option to scale even bigger, though that may not be necessary.
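
For illustration, here is a minimal sketch of the kind of gzip+base64 round-trip I have in mind, mirroring in spirit what is done for status.compressedNodes; the function names and any compressedTemplates field are hypothetical, not Argo's actual API:

```go
package wfcompress

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"io"
)

// compressTemplates gzips and base64-encodes a marshaled spec.templates blob,
// similar in spirit to how status.nodes is compressed into status.compressedNodes.
// A hypothetical spec.compressedTemplates field could hold the result.
func compressTemplates(templatesJSON []byte) (string, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(templatesJSON); err != nil {
		return "", err
	}
	if err := zw.Close(); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// decompressTemplates reverses the encoding so the controller could expand
// the templates back into spec.templates at runtime.
func decompressTemplates(encoded string) ([]byte, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, err
	}
	zr, err := gzip.NewReader(bytes.NewReader(raw))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}
```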

Use Cases

I'm using Argo via Kubeflow Pipelines, and a workflow with ~200 tasks exceeds the limit of what KFP/Argo can run: I get the error offload node status is not supported, more details here. KFP adds some metadata annotations which make each task template spec fairly large (the KFP devs suggested a hack here to make this a bit smaller).

Other users have run into this problem as well. Example from the CNCF slack on 2021-10-29: https://cloud-native.slack.com/archives/C01QW9QSSSK/p1635517397302400


Message from the maintainers:

Impacted by this bug? Give it a 👍. We prioritise the issues with the most 👍.

@jli jli added the type/feature Feature request label Nov 1, 2021
@alexec
Contributor

alexec commented Nov 1, 2021

Interesting. Thoughts:

  1. Shrink the size of the templates in the short term by not adding so much metadata.
  2. We could provide a spec.compressedTemplate field that is decompressed at runtime.
  3. We could fully offload the whole spec to external storage and leave just a reference to it in the spec.

Option 2 would not be too hard and could buy time for option 3.

@sarabala1979
Member

The above requirement can be met with the Workflow of Workflows pattern. This way, the load and the processing can be distributed across multiple workflows.

@jli
Author

jli commented Nov 2, 2021

@sarabala1979 thanks, though unfortunately we don't have access to workflow of workflows via KFP. But also, I think it would be nice even for direct Argo users to be able to create a much larger single workflow without figuring out how to break up their workflow into a workflow-of-workflows setup.

@alexec I'm not that familiar with the Argo code base. Do you think it would be tricky for an external contributor to implement option 2? I've briefly skimmed code related to the status.nodes compression - would a similar pattern work for spec.templates?

@alexec
Contributor

alexec commented Nov 2, 2021

I've briefly skimmed code related to the status.nodes compression - would a similar pattern work for spec.templates?

Yes. I wonder if we only need compression? That would be much easier to implement. We/you would also need to think about the interplay between offloading of nodes and offloading of templates.

@simster7
Member

simster7 commented Nov 2, 2021

Also, you could store complicated templates in WorkflowTemplates and only reference them at runtime?

@terrytangyuan
Member

We implemented an optimizer (part of the work we did for Couler) that can automatically convert a large workflow to use the "workflow of workflows" pattern based on algorithms. Unfortunately, it's not open-sourced yet. I can reach out to see if it can be open-sourced at some point (and potentially brought into the Argo code base?).

@sarabala1979
Member

@alexec Compressing/offloading templates will not work, because the client first has to create the Workflow object in k8s, and k8s will reject the object if it is greater than 1 MB. OffloadNodeStatus is processed output that is generated at runtime, which is why that offload feature was easy to implement.

Users could create WorkflowTemplates and refer to them via templateRef or workflowTemplateRef, which would save a lot of space.

One improvement we can make in the workflow controller is to compress storedTemplates and storedWorkflowSpec.

@isubasinghe
Member

isubasinghe commented Jan 18, 2022

Just a comment to generate discussion. Feel free to ignore it if my assumptions are wrong.

Assumptions:

  • This issue is due to etcd having a limit of ~1.5 MB for key/value pairs
  • This is only an etcd-related storage problem and not an application-level memory problem.
  • The only storage guaranteed to be available to Argo is etcd

Pros:

  • Workflows of practically unlimited size can be stored
  • No reliance on another database.

Cons:

  • The code for this suggestion is not difficult on the axis of "how hard is it to grasp" (in my opinion it is fairly obvious what to do), but it is error-prone, tedious, and introduces a non-trivial maintenance burden.

Suggestion:
You could alternatively deconstruct the data into a set of key/value pairs on write and reconstruct the original data on read (see the sketch below). This is commonly done in newer NoSQL-style DBs built on RocksDB, for example, so it is a proven, viable approach. You would need to do this as a transaction in etcd.

I think the code change could be fairly minimal as long as the retrieval and storage of data from etcd is fairly localised in the source code.

IMO, given that my assumptions are correct, this is the ideal solution in a perfect world with infinite time and resources. Obviously we do not live in such a world, but I thought it was still worth putting this comment up here: if someone thinks the effort is worth it, they can consider it more seriously.

I think storing in a database is far easier and the better (more pragmatic) route, but I probably don't have enough experience to make that call, so a more experienced dev's opinion would be appreciated.
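
For concreteness, a minimal sketch of the deconstruction idea under the assumptions above: split a marshaled workflow into fixed-size chunks keyed by index and reassemble them on read. The key layout and chunk size are illustrative only, not an actual Argo scheme:

```go
package wfchunks

import (
	"fmt"
	"sort"
)

// maxChunkSize keeps each value comfortably under etcd's default ~1.5 MB
// request limit; the exact number is illustrative.
const maxChunkSize = 1 << 20 // 1 MiB

// split breaks a marshaled workflow into ordered key/value chunks, e.g.
// "workflows/<name>/chunk/0003". The key layout is hypothetical.
func split(name string, data []byte) map[string][]byte {
	chunks := make(map[string][]byte)
	for i := 0; len(data) > 0; i++ {
		n := maxChunkSize
		if len(data) < n {
			n = len(data)
		}
		chunks[fmt.Sprintf("workflows/%s/chunk/%04d", name, i)] = data[:n]
		data = data[n:]
	}
	return chunks
}

// join reconstructs the original blob from the chunk map; zero-padded
// indices make lexicographic order equal to numeric order.
func join(chunks map[string][]byte) []byte {
	keys := make([]string, 0, len(chunks))
	for k := range chunks {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var out []byte
	for _, k := range keys {
		out = append(out, chunks[k]...)
	}
	return out
}
```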

@alexec

@isubasinghe
Member

I have implemented the above process of deconstructing JSON into key/value pairs in Rust. That codebase is not open source; however, I would be happy to share the key/value deconstruction part if that helps.

@pogacsa

pogacsa commented Sep 7, 2022

Has anything been done to solve this, instead of us having to rely on workarounds? Could this limit be set higher?

@agilgur5 agilgur5 added the solution/workaround There's a workaround, might not be great, but exists label Jan 20, 2024
@agilgur5

Could this limit be set higher?

Per above and the docs, this is a hard limit set by etcd, not Argo

@isubasinghe
Member

isubasinghe commented Jan 20, 2024

As I stated before, @agilgur5, we could potentially deconstruct the workflows into multiple key/value pairs. It is kind of an ugly solution, but one that would work with no issues (well, we might run into API rate limiting if we allow any size).

This is how databases are built on top of KV stores, and I've personally done this before for a little DB I built on top of RocksDB.

@agilgur5

agilgur5 commented Jan 20, 2024

I was responding to the upvoted comment above for clarity, since this is not a limit set by Argo and so is not something that can be "set higher".

It requires a workaround in user-land, such as Workflows of Workflows, or in the Controller, as you stated

As I stated before, @agilgur5, we could potentially deconstruct the workflows into multiple key/value pairs. It is kind of an ugly solution, but one that would work with no issues (well, we might run into API rate limiting if we allow any size).

Correct me if I'm wrong, but I think even this workaround will hit a limitation: the initial resource passed to k8s / etcd -- before the Controller has had a chance to process it -- cannot exceed the etcd 1MB limit. I don't think there's a way of working around that in the Controller; one way or another, at a large enough size eventually a Workflow of Workflows will be necessary

@isubasinghe
Member

You are correct that this will still be limited by the size of the initial resource passed to k8s.

It definitely isn't a nice solution (arguably, it is a hack); I was just offering a possible option if we want to push the limits a bit more.

@agilgur5

It definitely isn't a nice solution (arguably, it is a hack); I was just offering a possible option if we want to push the limits a bit more.

Since it won't push the limit that much further, I would say it's probably not worth adding such a hack/workaround.

Some more compression could potentially be added, as Bala mentioned above, for the generated pieces that are in our control.

Otherwise, the best DX may be to have a documented user-land optimizer for Workflows of Workflows, as Terry suggested. We could make that part of the CLI.

@isubasinghe
Member

@agilgur5 Makes sense, I do like Terry's suggestion the best.

@agilgur5

We could potentially deconstruct the workflows into multiple key/value pairs, @agilgur5. It is kind of an ugly solution, but one that would work with no issues (well, we might run into API rate limiting if we allow any size).

I did think of a slightly different use-case for this: it could be a different option for node status offloading that doesn't require a separate DB. That's actually probably how I would've implemented status offloading initially, as splitting into multiple keys is a more natural progression, although a DB may still eventually be necessary when the status gets too large for the key-splitting algorithm to handle.

@Joibel
Member

Joibel commented Jan 22, 2024

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

I guess you could have an index object which could be used to verify integrity (with the necessity of keeping old indexes and data around) or some kind of journaled status.

I had assumed this complexity was why offloading to a database was implemented instead.

@isubasinghe
Member

isubasinghe commented Jan 23, 2024

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

I guess you could have an index object which could be used to verify integrity (with the necessity of keeping old indexes and data around) or some kind of journaled status.

I had assumed this complexity was why offloading to a database was implemented instead.

I think if you are willing to use etcd directly, you should be able to do atomic writes/reads; etcd does have transactions.
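
As a rough sketch of how that could look (assuming a dedicated etcd the controller can reach, and that its --max-request-bytes / --max-txn-ops limits are raised enough to hold the combined chunks), all chunk writes for one workflow could go through a single transaction via the official clientv3 API:

```go
package wfchunks

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// putChunksAtomically writes every chunk of one workflow in a single etcd
// transaction, so a reader never observes a half-written set. The endpoint
// and key layout are illustrative only.
func putChunksAtomically(ctx context.Context, chunks map[string][]byte) error {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // hypothetical dedicated etcd
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return err
	}
	defer cli.Close()

	ops := make([]clientv3.Op, 0, len(chunks))
	for k, v := range chunks {
		ops = append(ops, clientv3.OpPut(k, string(v)))
	}
	// All puts commit together or not at all.
	_, err = cli.Txn(ctx).Then(ops...).Commit()
	return err
}
```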

@isubasinghe
Member

We could potentially deconstruct the workflows into multiple key/value pairs, @agilgur5. It is kind of an ugly solution, but one that would work with no issues (well, we might run into API rate limiting if we allow any size).

I did think of a slightly different use-case for this: it could be a different option for node status offloading that doesn't require a separate DB. That's actually probably how I would've implemented status offloading initially, as splitting into multiple keys is a more natural progression, although a DB may still eventually be necessary when the status gets too large for the key-splitting algorithm to handle.

Honestly, even just splitting the status from the workflow itself would probably roughly double the max size.

@Joibel
Member

Joibel commented Jan 23, 2024

We cannot use Kubernetes' own etcd directly; there is no access to it in most managed Kubernetes clusters. If we're going to install etcd as a service in the cluster, we may as well use something better suited to this problem.

@agilgur5

agilgur5 commented Jan 31, 2024

I think if you are willing to use etcd directly, you should be able to do atomic writes/reads; etcd does have transactions.

I was thinking about this too, as most key-value stores have transactions. Although I was thinking of avoiding using etcd directly if possible.

We cannot use Kubernetes' own etcd directly; there is no access to it in most managed Kubernetes clusters. If we're going to install etcd as a service in the cluster, we may as well use something better suited to this problem.

I usually run self-managed kOps clusters, so I forgot about that; good point.
But there are already tools in the ecosystem that use a secondary etcd, such as Cilium, in order to offload some of the load from the central k8s API server / etcd. kOps supports running extra etcd instances for this purpose as well.

I don't think we necessarily need to do that, though it is an interesting option. If we already support a (SQL) DB, there's not much reason to implement another alternative offload store.

How would key splitting work given that you cannot guarantee atomic writes to multiple objects?

You may end up writing some status changes but then failing to write others because of resourceVersion?

This is a good point overall, though as we already have the compressedNodes field in the status subresource, I assume this is already being handled in some places.
We also already lock each Workflow, so we could handle atomicity ourselves, though I agree it would be less complex to avoid having to write that ourselves.
I suspect the subresource logic is either already doing that or not even considering atomicity currently 😅

I had assumed this complexity was why offloading to a databases was implemented instead.

This is why I put such importance on historical context 🙂
I'm not sure; we should probably check the PR history and see if it was discussed in the repo.

@alexec
Contributor

alexec commented Feb 5, 2024

Couple of notes here. The way that we offload workflows today is that the node status ($.status.nodes), which has always been the largest component of a workflow, is either (a) compressed if it is small enough or (b) offloaded if too large.

The benefit of compression is that you don't need to operate and manage a new database. Even though garbage collection and retention of that database are managed by the controller, setting up an external database is onerous, and you can no longer use kubectl to directly interact with Argo Workflows; you must use the argo CLI, the API, and the UI, so you must deploy all of that too.

I.e. large workflows create operational overhead many users don't want.

Offloading is nothing more than saving the data and replacing it with a reference. The reference itself is a deterministic hash of the data. This makes it robust even if there is a network error when writing the status (i.e. it does an upsert).

By having a reference you keep the benefits of Kubernetes: when the reference changes, the workflow changes, and you get Kubernetes events. Watches continue to work.

I think the exact same pattern would work for workflows with a large volume of templates, but rather than offloading just the templates, offload the whole workflow and keep just a reference in the status field.
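
To make the reference-by-hash idea concrete, here is a sketch of the pattern (not Argo's actual offload code; the in-memory store and helper names are hypothetical): the payload is stored under a deterministic content hash, so a retried write after a network error is a harmless upsert.

```go
package wfoffload

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// offload stores a bulky field (e.g. the node status, or a whole workflow)
// under a deterministic content hash and returns the reference that would
// replace it in the object kept in Kubernetes. Writing the same content
// twice lands on the same key, so retries are idempotent upserts.
func offload(store map[string][]byte, payload interface{}) (string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	ref := hex.EncodeToString(sum[:])
	store[ref] = data
	return ref, nil
}

// resolve fetches the original payload back by its reference.
func resolve(store map[string][]byte, ref string, out interface{}) error {
	return json.Unmarshal(store[ref], out)
}
```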

@RyanDevlin

My team is experiencing similar issues, and we are looking for a way to reduce pressure on etcd. We run extremely large-scale batch jobs using Argo Workflows, and currently these jobs eat up 50%+ of etcd when at full scale. Ideally we'd like a mechanism to offload a big chunk of this data to a separate DB so we can run multiple jobs at once on a single cluster. Etcd consumption is currently the only scaling dimension preventing us from doing this.

It would be amazing if Argo had a feature where you could use something like DynamoDB (or any other distributed DB with performance similar to etcd) as a backing store instead of etcd. Currently the "offloading large workflows" feature doesn't necessarily support our use case, because it only offloads when workflows exceed the 1 MB limit, which ours do not. Having a way to back Argo's state with a separate DB would unlock serious scaling capabilities, where our workflows could run hundreds of thousands of Pods.

@agilgur5

Currently the "offloading large workflows" feature doesn't necessarily support our use case, because it only offloads when workflows exceed the 1 MB limit, which ours do not.

There is an existing, separate feature to offload in all cases: the environment variable ALWAYS_OFFLOAD_NODE_STATUS. There is a performance trade-off when doing that, though (since you're constantly reading and writing to a DB); see also #13107.

@RyanDevlin

RyanDevlin commented Jul 19, 2024

@agilgur5 Interesting, thanks for that pointer! Do you have any anecdotal evidence about the performance impact of offloading? We'll obviously have to test it with our own setup, but should I expect significantly degraded performance when using that flag?

Also, I'm curious: what specifically is the performance bottleneck when offloading? Shouldn't Argo be able to write out state to a separate DB with performance similar to etcd? Since etcd uses Raft, it's not as performant as comparable distributed key-value DBs, so I'm having trouble visualizing why the offloading feature is much slower than natively using etcd.

@agilgur5

agilgur5 commented Jul 19, 2024

Shouldn't Argo be able to write out state to a separate DB with similar performance as Etcd?

There are two main differences:

  1. Argo is still writing to the k8s api-server/etcd, just substantially less for the node status specifically. That means that offloading adds an additional DB rather than replacing one. It gets quite nuanced as to when that happens and how performant each DB is with large JSON payloads (see also Improve mysql write performance and stability when offloading #13290), so yes, I would suggest testing the performance before/after.
  2. The network latency also depends on where you host each; the k8s api-server typically has very close proximity within a cluster.

Also, this is better discussed in #13107, as it addresses that exact question and is a bit adjacent to this issue.
