From c3cc51be2e14e931d6e212aa30842a2c514082d1 Mon Sep 17 00:00:00 2001 From: Daisuke Taniwaki Date: Sat, 12 Jan 2019 10:18:30 +0900 Subject: [PATCH] Support HDFS Artifact (#1159) Support HDFS Artifact (#1159) --- Gopkg.lock | 102 ++++++++ Gopkg.toml | 7 + api/openapi-spec/swagger.json | 132 ++++++++++ examples/hdfs-artifact.yaml | 81 ++++++ .../workflow/v1alpha1/openapi_generated.go | 236 +++++++++++++++++- pkg/apis/workflow/v1alpha1/types.go | 68 ++++- .../v1alpha1/zz_generated.deepcopy.go | 75 ++++++ workflow/artifacts/hdfs/hdfs.go | 217 ++++++++++++++++ workflow/artifacts/hdfs/util.go | 53 ++++ workflow/common/common.go | 6 + workflow/controller/config.go | 13 + workflow/controller/workflowpod.go | 9 +- workflow/executor/executor.go | 98 ++++++-- workflow/validate/validate.go | 7 + 14 files changed, 1080 insertions(+), 24 deletions(-) create mode 100644 examples/hdfs-artifact.yaml create mode 100644 workflow/artifacts/hdfs/hdfs.go create mode 100644 workflow/artifacts/hdfs/util.go diff --git a/Gopkg.lock b/Gopkg.lock index 768463856c7e..08a10cc5dbfd 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -71,6 +71,18 @@ pruneopts = "" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" +[[projects]] + digest = "1:5cf8a8393124ac3d5632a8c51d08d8ff2aa29b6b328306cb8b7560a7e83cf760" + name = "github.com/colinmarc/hdfs" + packages = [ + ".", + "protocol/hadoop_common", + "protocol/hadoop_hdfs", + "rpc", + ] + pruneopts = "" + revision = "48eb8d6c34a97ffc73b406356f0f2e1c569b42a5" + [[projects]] digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b" name = "github.com/davecgh/go-spew" @@ -276,6 +288,14 @@ pruneopts = "" revision = "9cad4c3443a7200dd6400aef47183728de563a38" +[[projects]] + digest = "1:d35e07e002ccc51cb01fa77e932ea62206c4d3b2fb0fa1f1b052885942108a96" + name = "github.com/hashicorp/go-uuid" + packages = ["."] + pruneopts = "" + revision = "de160f5c59f693fed329e73e291bb751fe4ea4dc" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:9c776d7d9c54b7ed89f119e449983c3f24c0023e75001d6092442412ebca6b94" @@ -311,6 +331,17 @@ pruneopts = "" revision = "d14ea06fba99483203c19d92cfcd13ebe73135f4" +[[projects]] + branch = "master" + digest = "1:1c030807110db46f33e7abd02c08dd98dc2c1c6620eea6941185025f16ad8bbb" + name = "github.com/jcmturner/gofork" + packages = [ + "encoding/asn1", + "x/crypto/pbkdf2", + ] + pruneopts = "" + revision = "2aebee971930cd0dd525873330952ab7df5ac95c" + [[projects]] digest = "1:31c6f3c4f1e15fcc24fcfc9f5f24603ff3963c56d6fa162116493b4025fb6acc" name = "github.com/json-iterator/go" @@ -587,12 +618,14 @@ "ed25519/internal/edwards25519", "internal/chacha20", "internal/subtle", + "md4", "openpgp", "openpgp/armor", "openpgp/elgamal", "openpgp/errors", "openpgp/packet", "openpgp/s2k", + "pbkdf2", "poly1305", "ssh", "ssh/agent", @@ -714,6 +747,70 @@ revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" version = "v0.9.1" +[[projects]] + digest = "1:4777ba481cc12866b89aafb0a67529e7ac48b9aea06a25f3737b2cf5a3ffda12" + name = "gopkg.in/jcmturner/aescts.v1" + packages = ["."] + pruneopts = "" + revision = "f6abebb3171c4c1b1fea279cb7c7325020a26290" + version = "v1.0.1" + +[[projects]] + digest = "1:84c5b1392ef65ad1bb64da4b4d0beb2f204eefc769d6d96082347bb7057cb7b1" + name = "gopkg.in/jcmturner/dnsutils.v1" + packages = ["."] + pruneopts = "" + revision = "13eeb8d49ffb74d7a75784c35e4d900607a3943c" + version = "v1.0.1" + +[[projects]] + digest = "1:f727cb776135c090d4043eca9cd921b9967f75704a97309172fde92591b3c828" + name 
= "gopkg.in/jcmturner/gokrb5.v5" + packages = [ + "asn1tools", + "client", + "config", + "credentials", + "crypto", + "crypto/common", + "crypto/etype", + "crypto/rfc3961", + "crypto/rfc3962", + "crypto/rfc4757", + "crypto/rfc8009", + "gssapi", + "iana", + "iana/addrtype", + "iana/adtype", + "iana/asnAppTag", + "iana/chksumtype", + "iana/errorcode", + "iana/etypeID", + "iana/flags", + "iana/keyusage", + "iana/msgtype", + "iana/nametype", + "iana/patype", + "kadmin", + "keytab", + "krberror", + "messages", + "mstypes", + "pac", + "types", + ] + pruneopts = "" + revision = "32ba44ca5b42f17a4a9f33ff4305e70665a1bc0f" + version = "v5.3.0" + +[[projects]] + digest = "1:269a70a6997455a9130b3005af6d2983323e4b8c712f3288a0df0e6013c18ee1" + name = "gopkg.in/jcmturner/rpc.v0" + packages = ["ndr"] + pruneopts = "" + revision = "4480c480c9cd343b54b0acb5b62261cbd33d7adf" + version = "v0.0.2" + [[projects]] digest = "1:6715e0bec216255ab784fe04aa4d5a0a626ae07a3a209080182e469bc142761a" name = "gopkg.in/src-d/go-billy.v4" @@ -1069,6 +1166,7 @@ "github.com/argoproj/pkg/stats", "github.com/argoproj/pkg/strftime", "github.com/argoproj/pkg/time", + "github.com/colinmarc/hdfs", "github.com/evanphx/json-patch", "github.com/fsnotify/fsnotify", "github.com/ghodss/yaml", @@ -1085,6 +1183,10 @@ "github.com/tidwall/gjson", "github.com/valyala/fasttemplate", "golang.org/x/crypto/ssh", + "gopkg.in/jcmturner/gokrb5.v5/client", + "gopkg.in/jcmturner/gokrb5.v5/config", + "gopkg.in/jcmturner/gokrb5.v5/credentials", + "gopkg.in/jcmturner/gokrb5.v5/keytab", "gopkg.in/src-d/go-git.v4", "gopkg.in/src-d/go-git.v4/plumbing/transport", "gopkg.in/src-d/go-git.v4/plumbing/transport/http", diff --git a/Gopkg.toml b/Gopkg.toml index facaf7e03133..888ef9a90b8e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -55,3 +55,10 @@ required = [ name = "github.com/Azure/go-autorest" revision = "1ff28809256a84bb6966640ff3d0371af82ccba4" +[[constraint]] + name = "github.com/colinmarc/hdfs" + revision = "48eb8d6c34a97ffc73b406356f0f2e1c569b42a5" + +[[constraint]] + name = "gopkg.in/jcmturner/gokrb5.v5" + version = "5.3.0" diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 42fddd83d69b..221876f56de6 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -66,6 +66,10 @@ "description": "GlobalName exports an output artifact to the global scope, making it available as '{{workflow.outputs.artifacts.XXXX}} and in workflow.status.outputs.artifacts", "type": "string" }, + "hdfs": { + "description": "HDFS contains HDFS artifact location details", + "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.HDFSArtifact" + }, "http": { "description": "HTTP contains HTTP artifact location details", "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.HTTPArtifact" @@ -108,6 +112,10 @@ "description": "Git contains git artifact location details", "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.GitArtifact" }, + "hdfs": { + "description": "HDFS contains HDFS artifact location details", + "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.HDFSArtifact" + }, "http": { "description": "HTTP contains HTTP artifact location details", "$ref": "#/definitions/io.argoproj.workflow.v1alpha1.HTTPArtifact" @@ -249,6 +257,130 @@ } } }, + "io.argoproj.workflow.v1alpha1.HDFSArtifact": { + "description": "HDFSArtifact is the location of an HDFS artifact", + "required": [ + "addresses", + "path" + ], + "properties": { + "addresses": { + 
"description": "Addresses is accessible addresses of HDFS name nodes", + "type": "array", + "items": { + "type": "string" + } + }, + "force": { + "description": "Force copies a file forcibly even if it exists (default: false)", + "type": "boolean" + }, + "hdfsUser": { + "description": "HDFSUser is the user to access HDFS file system. It is ignored if either ccache or keytab is used.", + "type": "string" + }, + "krbCCacheSecret": { + "description": "KrbCCacheSecret is the secret selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbConfigConfigMap": { + "description": "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + "$ref": "#/definitions/io.k8s.api.core.v1.ConfigMapKeySelector" + }, + "krbKeytabSecret": { + "description": "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbRealm": { + "description": "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + }, + "krbServicePrincipalName": { + "description": "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + "type": "string" + }, + "krbUsername": { + "description": "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + }, + "path": { + "description": "Path is a file path in HDFS", + "type": "string" + } + } + }, + "io.argoproj.workflow.v1alpha1.HDFSConfig": { + "description": "HDFSConfig is configurations for HDFS", + "required": [ + "addresses" + ], + "properties": { + "addresses": { + "description": "Addresses is accessible addresses of HDFS name nodes", + "type": "array", + "items": { + "type": "string" + } + }, + "hdfsUser": { + "description": "HDFSUser is the user to access HDFS file system. 
It is ignored if either ccache or keytab is used.", + "type": "string" + }, + "krbCCacheSecret": { + "description": "KrbCCacheSecret is the secret selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbConfigConfigMap": { + "description": "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + "$ref": "#/definitions/io.k8s.api.core.v1.ConfigMapKeySelector" + }, + "krbKeytabSecret": { + "description": "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbRealm": { + "description": "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + }, + "krbServicePrincipalName": { + "description": "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + "type": "string" + }, + "krbUsername": { + "description": "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + } + } + }, + "io.argoproj.workflow.v1alpha1.HDFSKrbConfig": { + "description": "HDFSKrbConfig is auth configurations for Kerberos", + "properties": { + "krbCCacheSecret": { + "description": "KrbCCacheSecret is the secret selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbConfigConfigMap": { + "description": "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + "$ref": "#/definitions/io.k8s.api.core.v1.ConfigMapKeySelector" + }, + "krbKeytabSecret": { + "description": "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + "$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector" + }, + "krbRealm": { + "description": "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + }, + "krbServicePrincipalName": { + "description": "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + "type": "string" + }, + "krbUsername": { + "description": "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + "type": "string" + } + } + }, "io.argoproj.workflow.v1alpha1.HTTPArtifact": { "description": "HTTPArtifact allows an file served on HTTP to be placed as an input artifact in a container", "required": [ diff --git a/examples/hdfs-artifact.yaml b/examples/hdfs-artifact.yaml new file mode 100644 index 000000000000..0031b756387f --- /dev/null +++ b/examples/hdfs-artifact.yaml @@ -0,0 +1,81 @@ +# This example demonstrates the use of hdfs as the store for artifacts. This example assumes the following: +# 1. you have hdfs running in the same namespace as where this workflow will be run and you have created a repo with the name "generic-local" +# 2. you have created a kubernetes secret for storing hdfs username/password. 
To create kubernetes secret required for this example, +# run the following command: +# $ kubectl create secret generic my-hdfs-credentials --from-literal=username= --from-literal=password= + +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: hdfs-artifact- +spec: + entrypoint: artifact-example + templates: + - name: artifact-example + steps: + - - name: generate-artifact + template: whalesay + - - name: consume-artifact + template: print-message + arguments: + artifacts: + - name: message + from: "{{steps.generate-artifact.outputs.artifacts.hello-art}}" + + - name: whalesay + container: + image: docker/whalesay:latest + command: [sh, -c] + args: ["cowsay hello world | tee /tmp/hello_world.txt"] + outputs: + artifacts: + - name: hello-art + path: /tmp/hello_world.txt + hdfs: + addresses: + - my-hdfs-namenode-0.my-hdfs-namenode.default.svc.cluster.local:8020 + - my-hdfs-namenode-1.my-hdfs-namenode.default.svc.cluster.local:8020 + path: "/tmp/argo/foo" + hdfsUser: root + force: true + # krbCCacheSecret: + # name: krb + # key: krb5cc_0 + # krbKeytabSecret: + # name: krb + # key: user1.keytab + # krbUsername: "user1" + # krbRealm: "MYCOMPANY.COM" + # krbConfigConfigMap: + # name: my-hdfs-krb5-config + # key: krb5.conf + # krbServicePrincipalName: hdfs/_HOST + + - name: print-message + inputs: + artifacts: + - name: message + path: /tmp/message + hdfs: + addresses: + - my-hdfs-namenode-0.my-hdfs-namenode.default.svc.cluster.local:8020 + - my-hdfs-namenode-1.my-hdfs-namenode.default.svc.cluster.local:8020 + path: "/tmp/argo/foo" + hdfsUser: root + force: true + # krbCCacheSecret: + # name: krb + # key: krb5cc_0 + # krbKeytabSecret: + # name: krb + # key: user1.keytab + # krbUsername: "user1" + # krbRealm: "MYCOMPANY.COM" + # krbConfigConfigMap: + # name: my-hdfs-krb5-config + # key: krb5.conf + # krbServicePrincipalName: hdfs/_HOST + container: + image: alpine:latest + command: [sh, -c] + args: ["cat /tmp/message"] diff --git a/pkg/apis/workflow/v1alpha1/openapi_generated.go b/pkg/apis/workflow/v1alpha1/openapi_generated.go index d7401fb91e99..81f606863664 100644 --- a/pkg/apis/workflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/workflow/v1alpha1/openapi_generated.go @@ -22,6 +22,9 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTask": schema_pkg_apis_workflow_v1alpha1_DAGTask(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.DAGTemplate": schema_pkg_apis_workflow_v1alpha1_DAGTemplate(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact": schema_pkg_apis_workflow_v1alpha1_GitArtifact(ref), + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSArtifact": schema_pkg_apis_workflow_v1alpha1_HDFSArtifact(ref), + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSConfig": schema_pkg_apis_workflow_v1alpha1_HDFSConfig(ref), + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSKrbConfig": schema_pkg_apis_workflow_v1alpha1_HDFSKrbConfig(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HTTPArtifact": schema_pkg_apis_workflow_v1alpha1_HTTPArtifact(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Inputs": schema_pkg_apis_workflow_v1alpha1_Inputs(ref), "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.Item": schema_pkg_apis_workflow_v1alpha1_Item(ref), @@ -177,6 +180,12 @@ func schema_pkg_apis_workflow_v1alpha1_Artifact(ref common.ReferenceCallback) co 
Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact"), }, }, + "hdfs": { + SchemaProps: spec.SchemaProps{ + Description: "HDFS contains HDFS artifact location details", + Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSArtifact"), + }, + }, "raw": { SchemaProps: spec.SchemaProps{ Description: "Raw contains raw artifact location details", @@ -201,7 +210,7 @@ func schema_pkg_apis_workflow_v1alpha1_Artifact(ref common.ReferenceCallback) co }, }, Dependencies: []string{ - "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArchiveStrategy", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HTTPArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RawArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.S3Artifact"}, + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArchiveStrategy", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HTTPArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RawArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.S3Artifact"}, } } @@ -242,6 +251,12 @@ func schema_pkg_apis_workflow_v1alpha1_ArtifactLocation(ref common.ReferenceCall Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact"), }, }, + "hdfs": { + SchemaProps: spec.SchemaProps{ + Description: "HDFS contains HDFS artifact location details", + Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSArtifact"), + }, + }, "raw": { SchemaProps: spec.SchemaProps{ Description: "Raw contains raw artifact location details", @@ -252,7 +267,7 @@ func schema_pkg_apis_workflow_v1alpha1_ArtifactLocation(ref common.ReferenceCall }, }, Dependencies: []string{ - "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HTTPArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RawArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.S3Artifact"}, + "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.ArtifactoryArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.GitArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HDFSArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.HTTPArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RawArtifact", "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.S3Artifact"}, } } @@ -480,6 +495,223 @@ func schema_pkg_apis_workflow_v1alpha1_GitArtifact(ref common.ReferenceCallback) } } +func schema_pkg_apis_workflow_v1alpha1_HDFSArtifact(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "HDFSArtifact is the location of an HDFS artifact", + Properties: map[string]spec.Schema{ + "krbCCacheSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbCCacheSecret is the secret 
selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbKeytabSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbUsername": { + SchemaProps: spec.SchemaProps{ + Description: "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "krbRealm": { + SchemaProps: spec.SchemaProps{ + Description: "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "krbConfigConfigMap": { + SchemaProps: spec.SchemaProps{ + Description: "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + Ref: ref("k8s.io/api/core/v1.ConfigMapKeySelector"), + }, + }, + "krbServicePrincipalName": { + SchemaProps: spec.SchemaProps{ + Description: "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "addresses": { + SchemaProps: spec.SchemaProps{ + Description: "Addresses is accessible addresses of HDFS name nodes", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "hdfsUser": { + SchemaProps: spec.SchemaProps{ + Description: "HDFSUser is the user to access HDFS file system. It is ignored if either ccache or keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "path": { + SchemaProps: spec.SchemaProps{ + Description: "Path is a file path in HDFS", + Type: []string{"string"}, + Format: "", + }, + }, + "force": { + SchemaProps: spec.SchemaProps{ + Description: "Force copies a file forcibly even if it exists (default: false)", + Type: []string{"boolean"}, + Format: "", + }, + }, + }, + Required: []string{"addresses", "path"}, + }, + }, + Dependencies: []string{ + "k8s.io/api/core/v1.ConfigMapKeySelector", "k8s.io/api/core/v1.SecretKeySelector"}, + } +} + +func schema_pkg_apis_workflow_v1alpha1_HDFSConfig(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "HDFSConfig is configurations for HDFS", + Properties: map[string]spec.Schema{ + "krbCCacheSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbCCacheSecret is the secret selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbKeytabSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbUsername": { + SchemaProps: spec.SchemaProps{ + Description: "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "krbRealm": { + SchemaProps: spec.SchemaProps{ + Description: "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + 
Format: "", + }, + }, + "krbConfigConfigMap": { + SchemaProps: spec.SchemaProps{ + Description: "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + Ref: ref("k8s.io/api/core/v1.ConfigMapKeySelector"), + }, + }, + "krbServicePrincipalName": { + SchemaProps: spec.SchemaProps{ + Description: "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "addresses": { + SchemaProps: spec.SchemaProps{ + Description: "Addresses is accessible addresses of HDFS name nodes", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "hdfsUser": { + SchemaProps: spec.SchemaProps{ + Description: "HDFSUser is the user to access HDFS file system. It is ignored if either ccache or keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"addresses"}, + }, + }, + Dependencies: []string{ + "k8s.io/api/core/v1.ConfigMapKeySelector", "k8s.io/api/core/v1.SecretKeySelector"}, + } +} + +func schema_pkg_apis_workflow_v1alpha1_HDFSKrbConfig(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "HDFSKrbConfig is auth configurations for Kerberos", + Properties: map[string]spec.Schema{ + "krbCCacheSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbCCacheSecret is the secret selector for Kerberos ccache Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbKeytabSecret": { + SchemaProps: spec.SchemaProps{ + Description: "KrbKeytabSecret is the secret selector for Kerberos keytab Either ccache or keytab can be set to use Kerberos.", + Ref: ref("k8s.io/api/core/v1.SecretKeySelector"), + }, + }, + "krbUsername": { + SchemaProps: spec.SchemaProps{ + Description: "KrbUsername is the Kerberos username used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "krbRealm": { + SchemaProps: spec.SchemaProps{ + Description: "KrbRealm is the Kerberos realm used with Kerberos keytab It must be set if keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + "krbConfigConfigMap": { + SchemaProps: spec.SchemaProps{ + Description: "KrbConfig is the configmap selector for Kerberos config as string It must be set if either ccache or keytab is used.", + Ref: ref("k8s.io/api/core/v1.ConfigMapKeySelector"), + }, + }, + "krbServicePrincipalName": { + SchemaProps: spec.SchemaProps{ + Description: "KrbServicePrincipalName is the principal name of Kerberos service It must be set if either ccache or keytab is used.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/api/core/v1.ConfigMapKeySelector", "k8s.io/api/core/v1.SecretKeySelector"}, + } +} + func schema_pkg_apis_workflow_v1alpha1_HTTPArtifact(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/pkg/apis/workflow/v1alpha1/types.go b/pkg/apis/workflow/v1alpha1/types.go index 3b0bef13553b..22700b5b5ff9 100644 --- a/pkg/apis/workflow/v1alpha1/types.go +++ b/pkg/apis/workflow/v1alpha1/types.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "hash/fnv" + "strings" apiv1 
"k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -329,6 +330,9 @@ type ArtifactLocation struct { // Artifactory contains artifactory artifact location details Artifactory *ArtifactoryArtifact `json:"artifactory,omitempty"` + // HDFS contains HDFS artifact location details + HDFS *HDFSArtifact `json:"hdfs,omitempty"` + // Raw contains raw artifact location details Raw *RawArtifact `json:"raw,omitempty"` } @@ -653,6 +657,68 @@ func (a *ArtifactoryArtifact) String() string { return a.URL } +// HDFSArtifact is the location of an HDFS artifact +type HDFSArtifact struct { + HDFSConfig `json:",inline"` + + // Path is a file path in HDFS + Path string `json:"path"` + + // Force copies a file forcibly even if it exists (default: false) + Force bool `json:"force,omitempty"` +} + +// HDFSConfig is configurations for HDFS +type HDFSConfig struct { + HDFSKrbConfig `json:",inline"` + + // Addresses is accessible addresses of HDFS name nodes + Addresses []string `json:"addresses"` + + // HDFSUser is the user to access HDFS file system. + // It is ignored if either ccache or keytab is used. + HDFSUser string `json:"hdfsUser,omitempty"` +} + +// HDFSKrbConfig is auth configurations for Kerberos +type HDFSKrbConfig struct { + // KrbCCacheSecret is the secret selector for Kerberos ccache + // Either ccache or keytab can be set to use Kerberos. + KrbCCacheSecret *apiv1.SecretKeySelector `json:"krbCCacheSecret,omitempty"` + + // KrbKeytabSecret is the secret selector for Kerberos keytab + // Either ccache or keytab can be set to use Kerberos. + KrbKeytabSecret *apiv1.SecretKeySelector `json:"krbKeytabSecret,omitempty"` + + // KrbUsername is the Kerberos username used with Kerberos keytab + // It must be set if keytab is used. + KrbUsername string `json:"krbUsername,omitempty"` + + // KrbRealm is the Kerberos realm used with Kerberos keytab + // It must be set if keytab is used. + KrbRealm string `json:"krbRealm,omitempty"` + + // KrbConfig is the configmap selector for Kerberos config as string + // It must be set if either ccache or keytab is used. + KrbConfigConfigMap *apiv1.ConfigMapKeySelector `json:"krbConfigConfigMap,omitempty"` + + // KrbServicePrincipalName is the principal name of Kerberos service + // It must be set if either ccache or keytab is used. 
+ KrbServicePrincipalName string `json:"krbServicePrincipalName,omitempty"` +} + +func (a *HDFSArtifact) String() string { + var cred string + if a.HDFSUser != "" { + cred = fmt.Sprintf("HDFS user %s", a.HDFSUser) + } else if a.KrbCCacheSecret != nil { + cred = fmt.Sprintf("ccache %v", a.KrbCCacheSecret.Name) + } else if a.KrbKeytabSecret != nil { + cred = fmt.Sprintf("keytab %v (%s/%s)", a.KrbKeytabSecret.Name, a.KrbUsername, a.KrbRealm) + } + return fmt.Sprintf("hdfs://%s/%s with %s", strings.Join(a.Addresses, ", "), a.Path, cred) +} + // RawArtifact allows raw string content to be placed as an artifact in a container type RawArtifact struct { // Data is the string contents of the artifact @@ -840,7 +906,7 @@ func (args *Arguments) GetParameterByName(name string) *Parameter { // HasLocation whether or not an artifact has a location defined func (a *Artifact) HasLocation() bool { - return a.S3 != nil || a.Git != nil || a.HTTP != nil || a.Artifactory != nil || a.Raw != nil + return a.S3 != nil || a.Git != nil || a.HTTP != nil || a.Artifactory != nil || a.Raw != nil || a.HDFS != nil } // GetTemplate retrieves a defined template by its name diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go index e8e1f77e4cca..a95538470e8f 100644 --- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go @@ -120,6 +120,11 @@ func (in *ArtifactLocation) DeepCopyInto(out *ArtifactLocation) { *out = new(ArtifactoryArtifact) (*in).DeepCopyInto(*out) } + if in.HDFS != nil { + in, out := &in.HDFS, &out.HDFS + *out = new(HDFSArtifact) + (*in).DeepCopyInto(*out) + } if in.Raw != nil { in, out := &in.Raw, &out.Raw *out = new(RawArtifact) @@ -269,6 +274,76 @@ func (in *GitArtifact) DeepCopy() *GitArtifact { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HDFSArtifact) DeepCopyInto(out *HDFSArtifact) { + *out = *in + in.HDFSConfig.DeepCopyInto(&out.HDFSConfig) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HDFSArtifact. +func (in *HDFSArtifact) DeepCopy() *HDFSArtifact { + if in == nil { + return nil + } + out := new(HDFSArtifact) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HDFSConfig) DeepCopyInto(out *HDFSConfig) { + *out = *in + in.HDFSKrbConfig.DeepCopyInto(&out.HDFSKrbConfig) + if in.Addresses != nil { + in, out := &in.Addresses, &out.Addresses + *out = make([]string, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HDFSConfig. +func (in *HDFSConfig) DeepCopy() *HDFSConfig { + if in == nil { + return nil + } + out := new(HDFSConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *HDFSKrbConfig) DeepCopyInto(out *HDFSKrbConfig) {
+	*out = *in
+	if in.KrbCCacheSecret != nil {
+		in, out := &in.KrbCCacheSecret, &out.KrbCCacheSecret
+		*out = new(v1.SecretKeySelector)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.KrbKeytabSecret != nil {
+		in, out := &in.KrbKeytabSecret, &out.KrbKeytabSecret
+		*out = new(v1.SecretKeySelector)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.KrbConfigConfigMap != nil {
+		in, out := &in.KrbConfigConfigMap, &out.KrbConfigConfigMap
+		*out = new(v1.ConfigMapKeySelector)
+		(*in).DeepCopyInto(*out)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HDFSKrbConfig.
+func (in *HDFSKrbConfig) DeepCopy() *HDFSKrbConfig {
+	if in == nil {
+		return nil
+	}
+	out := new(HDFSKrbConfig)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *HTTPArtifact) DeepCopyInto(out *HTTPArtifact) {
 	*out = *in
diff --git a/workflow/artifacts/hdfs/hdfs.go b/workflow/artifacts/hdfs/hdfs.go
new file mode 100644
index 000000000000..2209fc0367bb
--- /dev/null
+++ b/workflow/artifacts/hdfs/hdfs.go
@@ -0,0 +1,217 @@
+package hdfs
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/argoproj/pkg/file"
+	"gopkg.in/jcmturner/gokrb5.v5/credentials"
+	"gopkg.in/jcmturner/gokrb5.v5/keytab"
+
+	"github.com/argoproj/argo/errors"
+	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
+	"github.com/argoproj/argo/util"
+	"github.com/argoproj/argo/workflow/common"
+)
+
+// ArtifactDriver is a driver for HDFS
+type ArtifactDriver struct {
+	Addresses  []string // accessible addresses of the HDFS name nodes
+	Path       string
+	Force      bool
+	HDFSUser   string
+	KrbOptions *KrbOptions
+}
+
+// KrbOptions holds the Kerberos options
+type KrbOptions struct {
+	CCacheOptions        *CCacheOptions
+	KeytabOptions        *KeytabOptions
+	Config               string
+	ServicePrincipalName string
+}
+
+// CCacheOptions holds the ccache options
+type CCacheOptions struct {
+	CCache credentials.CCache
+}
+
+// KeytabOptions holds the keytab options
+type KeytabOptions struct {
+	Keytab   keytab.Keytab
+	Username string
+	Realm    string
+}
+
+// ValidateArtifact validates an HDFS artifact
+func ValidateArtifact(errPrefix string, art *wfv1.HDFSArtifact) error {
+	if len(art.Addresses) == 0 {
+		return errors.Errorf(errors.CodeBadRequest, "%s.addresses is required", errPrefix)
+	}
+	if art.Path == "" {
+		return errors.Errorf(errors.CodeBadRequest, "%s.path is required", errPrefix)
+	}
+	if !filepath.IsAbs(art.Path) {
+		return errors.Errorf(errors.CodeBadRequest, "%s.path must be an absolute file path", errPrefix)
+	}
+
+	hasKrbCCache := art.KrbCCacheSecret != nil
+	hasKrbKeytab := art.KrbKeytabSecret != nil
+
+	if art.HDFSUser == "" && !hasKrbCCache && !hasKrbKeytab {
+		return errors.Errorf(errors.CodeBadRequest, "either %s.hdfsUser, %s.krbCCacheSecret or %s.krbKeytabSecret is required", errPrefix, errPrefix, errPrefix)
+	}
+	if hasKrbKeytab && (art.KrbServicePrincipalName == "" || art.KrbConfigConfigMap == nil || art.KrbUsername == "" || art.KrbRealm == "") {
+		return errors.Errorf(errors.CodeBadRequest, "%s.krbServicePrincipalName, %s.krbConfigConfigMap, %s.krbUsername and %s.krbRealm are required with %s.krbKeytabSecret", errPrefix, errPrefix, errPrefix, errPrefix, errPrefix)
+	}
+	if hasKrbCCache && (art.KrbServicePrincipalName == "" || art.KrbConfigConfigMap == nil) {
+		return errors.Errorf(errors.CodeBadRequest, "%s.krbServicePrincipalName and %s.krbConfigConfigMap are required with %s.krbCCacheSecret", errPrefix, errPrefix, errPrefix)
+	}
+	return nil
+}
+
+// CreateDriver constructs ArtifactDriver
+func CreateDriver(ci common.ResourceInterface, art *wfv1.HDFSArtifact) (*ArtifactDriver, error) {
+	var krbConfig string
+	var krbOptions *KrbOptions
+	var err error
+
+	namespace := ci.GetNamespace()
+
+	if art.KrbConfigConfigMap != nil && art.KrbConfigConfigMap.Name != "" {
+		krbConfig, err = ci.GetConfigMapKey(namespace, art.KrbConfigConfigMap.Name, art.KrbConfigConfigMap.Key)
+		if err != nil {
+			return nil, err
+		}
+	}
+	if art.KrbCCacheSecret != nil && art.KrbCCacheSecret.Name != "" {
+		bytes, err := ci.GetSecrets(namespace, art.KrbCCacheSecret.Name, art.KrbCCacheSecret.Key)
+		if err != nil {
+			return nil, err
+		}
+		ccache, err := credentials.ParseCCache(bytes)
+		if err != nil {
+			return nil, err
+		}
+		krbOptions = &KrbOptions{
+			CCacheOptions: &CCacheOptions{
+				CCache: ccache,
+			},
+			Config:               krbConfig,
+			ServicePrincipalName: art.KrbServicePrincipalName,
+		}
+	}
+	if art.KrbKeytabSecret != nil && art.KrbKeytabSecret.Name != "" {
+		bytes, err := ci.GetSecrets(namespace, art.KrbKeytabSecret.Name, art.KrbKeytabSecret.Key)
+		if err != nil {
+			return nil, err
+		}
+		ktb, err := keytab.Parse(bytes)
+		if err != nil {
+			return nil, err
+		}
+		krbOptions = &KrbOptions{
+			KeytabOptions: &KeytabOptions{
+				Keytab:   ktb,
+				Username: art.KrbUsername,
+				Realm:    art.KrbRealm,
+			},
+			Config:               krbConfig,
+			ServicePrincipalName: art.KrbServicePrincipalName,
+		}
+	}
+
+	driver := ArtifactDriver{
+		Addresses:  art.Addresses,
+		Path:       art.Path,
+		Force:      art.Force,
+		HDFSUser:   art.HDFSUser,
+		KrbOptions: krbOptions,
+	}
+	return &driver, nil
+}
+
+// Load downloads artifacts from HDFS storage
+func (driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
+	hdfscli, err := createHDFSClient(driver.Addresses, driver.HDFSUser, driver.KrbOptions)
+	if err != nil {
+		return err
+	}
+	defer util.Close(hdfscli)
+
+	srcStat, err := hdfscli.Stat(driver.Path)
+	if err != nil {
+		return err
+	}
+	if srcStat.IsDir() {
+		return fmt.Errorf("HDFS artifact does not support directory copy")
+	}
+
+	_, err = os.Stat(path)
+	if err != nil && !os.IsNotExist(err) {
+		return err
+	}
+
+	if os.IsNotExist(err) {
+		dirPath := filepath.Dir(path)
+		if dirPath != "." && dirPath != "/" {
+			// Follow umask for the permission
+			err = os.MkdirAll(dirPath, 0777)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		if driver.Force {
+			err = os.Remove(path)
+			if err != nil && !os.IsNotExist(err) {
+				return err
+			}
+		}
+	}
+
+	return hdfscli.CopyToLocal(driver.Path, path)
+}
+
+// Save saves an artifact to HDFS storage
+func (driver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
+	hdfscli, err := createHDFSClient(driver.Addresses, driver.HDFSUser, driver.KrbOptions)
+	if err != nil {
+		return err
+	}
+	defer util.Close(hdfscli)
+
+	isDir, err := file.IsDirectory(path)
+	if err != nil {
+		return err
+	}
+	if isDir {
+		return fmt.Errorf("HDFS artifact does not support directory copy")
+	}
+
+	_, err = hdfscli.Stat(driver.Path)
+	if err != nil && !os.IsNotExist(err) {
+		return err
+	}
+
+	if os.IsNotExist(err) {
+		dirPath := filepath.Dir(driver.Path)
+		if dirPath != "." && dirPath != "/" {
+			// Follow umask for the permission
+			err = hdfscli.MkdirAll(dirPath, 0777)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		if driver.Force {
+			err = hdfscli.Remove(driver.Path)
+			if err != nil && !os.IsNotExist(err) {
+				return err
+			}
+		}
+	}
+
+	return hdfscli.CopyToRemote(path, driver.Path)
+}
diff --git a/workflow/artifacts/hdfs/util.go b/workflow/artifacts/hdfs/util.go
new file mode 100644
index 000000000000..3af330ae012e
--- /dev/null
+++ b/workflow/artifacts/hdfs/util.go
@@ -0,0 +1,53 @@
+package hdfs
+
+import (
+	"fmt"
+
+	"github.com/colinmarc/hdfs"
+	krb "gopkg.in/jcmturner/gokrb5.v5/client"
+	"gopkg.in/jcmturner/gokrb5.v5/config"
+)
+
+func createHDFSClient(addresses []string, user string, krbOptions *KrbOptions) (*hdfs.Client, error) {
+	options := hdfs.ClientOptions{
+		Addresses: addresses,
+	}
+
+	if krbOptions != nil {
+		krbClient, err := createKrbClient(krbOptions)
+		if err != nil {
+			return nil, err
+		}
+		options.KerberosClient = krbClient
+		options.KerberosServicePrincipleName = krbOptions.ServicePrincipalName
+	} else {
+		options.User = user
+	}
+
+	return hdfs.NewClient(options)
+}
+
+func createKrbClient(krbOptions *KrbOptions) (*krb.Client, error) {
+	krbConfig, err := config.NewConfigFromString(krbOptions.Config)
+	if err != nil {
+		return nil, err
+	}
+
+	if krbOptions.CCacheOptions != nil {
+		client, err := krb.NewClientFromCCache(krbOptions.CCacheOptions.CCache)
+		if err != nil {
+			return nil, err
+		}
+		return client.WithConfig(krbConfig), nil
+	} else if krbOptions.KeytabOptions != nil {
+		client := krb.NewClientWithKeytab(krbOptions.KeytabOptions.Username, krbOptions.KeytabOptions.Realm, krbOptions.KeytabOptions.Keytab)
+		client = *client.WithConfig(krbConfig)
+		err = client.Login()
+		if err != nil {
+			return nil, err
+		}
+		return &client, nil
+	}
+
+	return nil, fmt.Errorf("failed to get a Kerberos client")
+}
diff --git a/workflow/common/common.go b/workflow/common/common.go
index ce5ce1602097..7b326186464e 100644
--- a/workflow/common/common.go
+++ b/workflow/common/common.go
@@ -119,3 +119,9 @@ type ExecutionControl struct {
 	// used to support workflow or steps/dag level timeouts.
 	Deadline *time.Time `json:"deadline,omitempty"`
 }
+
+type ResourceInterface interface {
+	GetNamespace() string
+	GetSecrets(namespace, name, key string) ([]byte, error)
+	GetConfigMapKey(namespace, name, key string) (string, error)
+}
diff --git a/workflow/controller/config.go b/workflow/controller/config.go
index 52ff479a66a6..31874163b581 100644
--- a/workflow/controller/config.go
+++ b/workflow/controller/config.go
@@ -70,6 +70,8 @@ type ArtifactRepository struct {
 	S3 *S3ArtifactRepository `json:"s3,omitempty"`
 	// Artifactory stores artifacts to JFrog Artifactory
 	Artifactory *ArtifactoryArtifactRepository `json:"artifactory,omitempty"`
+	// HDFS stores artifacts in HDFS
+	HDFS *HDFSArtifactRepository `json:"hdfs,omitempty"`
 }
 
 // S3ArtifactRepository defines the controller configuration for an S3 artifact repository
@@ -91,6 +93,17 @@ type ArtifactoryArtifactRepository struct {
 	RepoURL string `json:"repoURL,omitempty"`
 }
 
+// HDFSArtifactRepository defines the controller configuration for an HDFS artifact repository
+type HDFSArtifactRepository struct {
+	wfv1.HDFSConfig `json:",inline"`
+
+	// PathFormat defines the format of the path to store a file.
Can reference workflow variables + PathFormat string `json:"pathFormat,omitempty"` + + // Force copies a file forcibly even if it exists (default: false) + Force bool `json:"force,omitempty"` +} + // ResyncConfig reloads the controller config from the configmap func (wfc *WorkflowController) ResyncConfig() error { cmClient := wfc.kubeclientset.CoreV1().ConfigMaps(wfc.namespace) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 9bbc803b167f..4c47e23b3b8d 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -548,7 +548,7 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat ArchiveLogs: woc.controller.Config.ArtifactRepository.ArchiveLogs, } } - if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil { + if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil || tmpl.ArchiveLocation.HDFS != nil { // User explicitly set the location. nothing else to do. return nil } @@ -584,6 +584,13 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat ArtifactoryAuth: woc.controller.Config.ArtifactRepository.Artifactory.ArtifactoryAuth, URL: artURL, } + } else if hdfsLocation := woc.controller.Config.ArtifactRepository.HDFS; hdfsLocation != nil { + log.Debugf("Setting HDFS artifact repository information") + tmpl.ArchiveLocation.HDFS = &wfv1.HDFSArtifact{ + HDFSConfig: hdfsLocation.HDFSConfig, + Path: hdfsLocation.PathFormat, + Force: hdfsLocation.Force, + } } else { for _, art := range tmpl.Outputs.Artifacts { if !art.HasLocation() { diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 017d89dfad09..cc55964e265c 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -25,6 +25,7 @@ import ( artifact "github.com/argoproj/argo/workflow/artifacts" "github.com/argoproj/argo/workflow/artifacts/artifactory" "github.com/argoproj/argo/workflow/artifacts/git" + "github.com/argoproj/argo/workflow/artifacts/hdfs" "github.com/argoproj/argo/workflow/artifacts/http" "github.com/argoproj/argo/workflow/artifacts/raw" "github.com/argoproj/argo/workflow/artifacts/s3" @@ -40,6 +41,8 @@ import ( // WorkflowExecutor is program which runs as the init/wait container type WorkflowExecutor struct { + common.ResourceInterface + PodName string Template wfv1.Template ClientSet kubernetes.Interface @@ -50,8 +53,10 @@ type WorkflowExecutor struct { // memoized container ID to prevent multiple lookups mainContainerID string + // memoized configmaps + memoizedConfigMaps map[string]string // memoized secrets - memoizedSecrets map[string]string + memoizedSecrets map[string][]byte // list of errors that occurred during execution. 
// the first of these is used as the overall message of the node errors []error @@ -87,7 +92,8 @@ func NewExecutor(clientset kubernetes.Interface, podName, namespace, podAnnotati Namespace: namespace, PodAnnotationsPath: podAnnotationsPath, RuntimeExecutor: cre, - memoizedSecrets: map[string]string{}, + memoizedConfigMaps: map[string]string{}, + memoizedSecrets: map[string][]byte{}, errors: []error{}, } } @@ -253,6 +259,10 @@ func (we *WorkflowExecutor) saveArtifact(tempOutArtDir string, mainCtrID string, } artifactoryURL.Path = path.Join(artifactoryURL.Path, fileName) art.Artifactory.URL = artifactoryURL.String() + } else if we.Template.ArchiveLocation.HDFS != nil { + shallowCopy := *we.Template.ArchiveLocation.HDFS + art.HDFS = &shallowCopy + art.HDFS.Path = path.Join(art.HDFS.Path, fileName) } else { return errors.Errorf(errors.CodeBadRequest, "Unable to determine path to store %s. Archive location provided no information", art.Name) } @@ -393,6 +403,10 @@ func (we *WorkflowExecutor) SaveLogs() (*wfv1.Artifact, error) { } artifactoryURL.Path = path.Join(artifactoryURL.Path, fileName) art.Artifactory.URL = artifactoryURL.String() + } else if we.Template.ArchiveLocation.HDFS != nil { + shallowCopy := *we.Template.ArchiveLocation.HDFS + art.HDFS = &shallowCopy + art.HDFS.Path = path.Join(art.HDFS.Path, fileName) } else { return nil, errors.Errorf(errors.CodeBadRequest, "Unable to determine path to store %s. Archive location provided no information", art.Name) } @@ -415,15 +429,16 @@ func (we *WorkflowExecutor) InitDriver(art wfv1.Artifact) (artifact.ArtifactDriv var secretKey string if art.S3.AccessKeySecret.Name != "" { - var err error - accessKey, err = we.getSecrets(we.Namespace, art.S3.AccessKeySecret.Name, art.S3.AccessKeySecret.Key) + accessKeyBytes, err := we.GetSecrets(we.Namespace, art.S3.AccessKeySecret.Name, art.S3.AccessKeySecret.Key) if err != nil { return nil, err } - secretKey, err = we.getSecrets(we.Namespace, art.S3.SecretKeySecret.Name, art.S3.SecretKeySecret.Key) + accessKey = string(accessKeyBytes) + secretKeyBytes, err := we.GetSecrets(we.Namespace, art.S3.SecretKeySecret.Name, art.S3.SecretKeySecret.Key) if err != nil { return nil, err } + secretKey = string(secretKeyBytes) } driver := s3.S3ArtifactDriver{ @@ -441,45 +456,48 @@ func (we *WorkflowExecutor) InitDriver(art wfv1.Artifact) (artifact.ArtifactDriv if art.Git != nil { gitDriver := git.GitArtifactDriver{} if art.Git.UsernameSecret != nil { - username, err := we.getSecrets(we.Namespace, art.Git.UsernameSecret.Name, art.Git.UsernameSecret.Key) + usernameBytes, err := we.GetSecrets(we.Namespace, art.Git.UsernameSecret.Name, art.Git.UsernameSecret.Key) if err != nil { return nil, err } - gitDriver.Username = username + gitDriver.Username = string(usernameBytes) } if art.Git.PasswordSecret != nil { - password, err := we.getSecrets(we.Namespace, art.Git.PasswordSecret.Name, art.Git.PasswordSecret.Key) + passwordBytes, err := we.GetSecrets(we.Namespace, art.Git.PasswordSecret.Name, art.Git.PasswordSecret.Key) if err != nil { return nil, err } - gitDriver.Password = password + gitDriver.Password = string(passwordBytes) } if art.Git.SSHPrivateKeySecret != nil { - sshPrivateKey, err := we.getSecrets(we.Namespace, art.Git.SSHPrivateKeySecret.Name, art.Git.SSHPrivateKeySecret.Key) + sshPrivateKeyBytes, err := we.GetSecrets(we.Namespace, art.Git.SSHPrivateKeySecret.Name, art.Git.SSHPrivateKeySecret.Key) if err != nil { return nil, err } - gitDriver.SSHPrivateKey = sshPrivateKey + gitDriver.SSHPrivateKey = 
string(sshPrivateKeyBytes) } return &gitDriver, nil } if art.Artifactory != nil { - username, err := we.getSecrets(we.Namespace, art.Artifactory.UsernameSecret.Name, art.Artifactory.UsernameSecret.Key) + usernameBytes, err := we.GetSecrets(we.Namespace, art.Artifactory.UsernameSecret.Name, art.Artifactory.UsernameSecret.Key) if err != nil { return nil, err } - password, err := we.getSecrets(we.Namespace, art.Artifactory.PasswordSecret.Name, art.Artifactory.PasswordSecret.Key) + passwordBytes, err := we.GetSecrets(we.Namespace, art.Artifactory.PasswordSecret.Name, art.Artifactory.PasswordSecret.Key) if err != nil { return nil, err } driver := artifactory.ArtifactoryArtifactDriver{ - Username: username, - Password: password, + Username: string(usernameBytes), + Password: string(passwordBytes), } return &driver, nil } + if art.HDFS != nil { + return hdfs.CreateDriver(we, art.HDFS) + } if art.Raw != nil { return &raw.RawArtifactDriver{}, nil } @@ -508,8 +526,48 @@ func (we *WorkflowExecutor) getPod() (*apiv1.Pod, error) { return pod, nil } -// getSecrets retrieves a secret value and memoizes the result -func (we *WorkflowExecutor) getSecrets(namespace, name, key string) (string, error) { +// GetNamespace returns the namespace +func (we *WorkflowExecutor) GetNamespace() string { + return we.Namespace +} + +// GetConfigMapKey retrieves a configmap value and memoizes the result +func (we *WorkflowExecutor) GetConfigMapKey(namespace, name, key string) (string, error) { + cachedKey := fmt.Sprintf("%s/%s/%s", namespace, name, key) + if val, ok := we.memoizedConfigMaps[cachedKey]; ok { + return val, nil + } + configmapsIf := we.ClientSet.CoreV1().ConfigMaps(namespace) + var configmap *apiv1.ConfigMap + var err error + _ = wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { + configmap, err = configmapsIf.Get(name, metav1.GetOptions{}) + if err != nil { + log.Warnf("Failed to get configmap '%s': %v", name, err) + if !retry.IsRetryableKubeAPIError(err) { + return false, err + } + return false, nil + } + return true, nil + }) + if err != nil { + return "", errors.InternalWrapError(err) + } + // memoize all keys in the configmap since it's highly likely we will need to get a + // subsequent key in the configmap (e.g. username + password) and we can save an API call + for k, v := range configmap.Data { + we.memoizedConfigMaps[fmt.Sprintf("%s/%s/%s", namespace, name, k)] = v + } + val, ok := we.memoizedConfigMaps[cachedKey] + if !ok { + return "", errors.Errorf(errors.CodeBadRequest, "configmap '%s' does not have the key '%s'", name, key) + } + return val, nil +} + +// GetSecrets retrieves a secret value and memoizes the result +func (we *WorkflowExecutor) GetSecrets(namespace, name, key string) ([]byte, error) { cachedKey := fmt.Sprintf("%s/%s/%s", namespace, name, key) if val, ok := we.memoizedSecrets[cachedKey]; ok { return val, nil @@ -529,16 +587,16 @@ func (we *WorkflowExecutor) getSecrets(namespace, name, key string) (string, err return true, nil }) if err != nil { - return "", errors.InternalWrapError(err) + return []byte{}, errors.InternalWrapError(err) } // memoize all keys in the secret since it's highly likely we will need to get a // subsequent key in the secret (e.g. 
username + password) and we can save an API call for k, v := range secret.Data { - we.memoizedSecrets[fmt.Sprintf("%s/%s/%s", namespace, name, k)] = string(v) + we.memoizedSecrets[fmt.Sprintf("%s/%s/%s", namespace, name, k)] = v } val, ok := we.memoizedSecrets[cachedKey] if !ok { - return "", errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key) + return []byte{}, errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key) } return val, nil } diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index 83d813b7a485..8e39608af0a5 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -10,6 +10,7 @@ import ( "github.com/argoproj/argo/errors" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo/workflow/artifacts/hdfs" "github.com/argoproj/argo/workflow/common" "github.com/valyala/fasttemplate" apivalidation "k8s.io/apimachinery/pkg/util/validation" @@ -197,6 +198,12 @@ func validateArtifactLocation(errPrefix string, art wfv1.Artifact) error { return errors.Errorf(errors.CodeBadRequest, "%s.git.repo is required", errPrefix) } } + if art.HDFS != nil { + err := hdfs.ValidateArtifact(fmt.Sprintf("%s.hdfs", errPrefix), art.HDFS) + if err != nil { + return err + } + } // TODO: validate other artifact locations return nil }
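
For anyone exercising the new driver outside the executor, the exported `ArtifactDriver` API above is enough on its own. A minimal sketch, assuming a reachable namenode and simple (non-Kerberos) authentication; the addresses, user, and paths are illustrative placeholders, not values taken from this patch:

```go
package main

import (
	"log"

	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo/workflow/artifacts/hdfs"
)

func main() {
	// With KrbOptions left nil, createHDFSClient authenticates with the
	// plain HDFS user instead of a Kerberos ccache or keytab.
	driver := &hdfs.ArtifactDriver{
		Addresses: []string{"my-hdfs-namenode-0.my-hdfs-namenode.default.svc.cluster.local:8020"}, // placeholder
		Path:      "/tmp/argo/foo", // HDFS-side path
		HDFSUser:  "root",          // placeholder
		Force:     true,            // overwrite an existing destination
	}

	// Save uploads a local file to driver.Path on HDFS ...
	if err := driver.Save("/tmp/hello_world.txt", &wfv1.Artifact{}); err != nil {
		log.Fatalf("save: %v", err)
	}
	// ... and Load copies driver.Path back to a local file.
	if err := driver.Load(&wfv1.Artifact{}, "/tmp/hello_world.copy.txt"); err != nil {
		log.Fatalf("load: %v", err)
	}
}
```

A related design note: `GetSecrets` now returns `[]byte` rather than `string` because Kerberos keytabs and ccaches are binary blobs; `keytab.Parse` and `credentials.ParseCCache` consume the raw bytes, and round-tripping them through a string could corrupt the credential.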
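With `HDFSArtifactRepository` wired into `ArtifactRepository`, HDFS can also serve as the controller-wide default artifact repository. A sketch of the workflow-controller configmap, assuming the standard `config` key layout; the addresses and `pathFormat` values below are placeholders (`pathFormat` can reference workflow variables, per the field comment):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
data:
  config: |
    artifactRepository:
      hdfs:
        addresses:                   # HDFS name node addresses (placeholders)
        - my-hdfs-namenode-0.my-hdfs-namenode.default.svc.cluster.local:8020
        - my-hdfs-namenode-1.my-hdfs-namenode.default.svc.cluster.local:8020
        hdfsUser: root               # ignored when a Kerberos ccache or keytab is configured
        pathFormat: "/tmp/argo/{{workflow.name}}/{{pod.name}}"
        force: true
```

When a template has no explicit archive location, `addArchiveLocation` copies the inline `HDFSConfig` from this repository configuration and uses `pathFormat` as the artifact path, mirroring the existing S3 and Artifactory branches.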