Skip to content

Commit

Permalink
fix(k8s): fix various issues with Kubernetes API queries
Browse files Browse the repository at this point in the history
Basically I've completely overhauled the API wrapper, so that it should
dynamically and consistently support any API resource properly. This is
much cleaner and more similar to how kubectl works internally, for example.
  • Loading branch information
edvald committed Jun 10, 2019
1 parent bc6f5d7 commit c7839e9
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 121 deletions.
297 changes: 177 additions & 120 deletions garden-service/src/plugins/kubernetes/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import { resolve } from "url"
import {
KubeConfig,
Core_v1Api,
Expand All @@ -15,20 +16,48 @@ import {
Apiextensions_v1beta1Api,
V1Secret,
Policy_v1beta1Api,
Storage_v1Api,
CoreApi,
ApisApi,
V1APIGroup,
V1APIVersions,
V1APIResource,
} from "@kubernetes/client-node"

import AsyncLock = require("async-lock")
import request = require("request-promise")
import requestErrors = require("request-promise/errors")

import { safeLoad, safeDump } from "js-yaml"
import { zip, omitBy, isObject } from "lodash"

import { Omit } from "../../util/util"
import { zip, omitBy, isObject, keyBy } from "lodash"
import { GardenBaseError, RuntimeError, ConfigurationError } from "../../exceptions"
import { KubernetesResource } from "./types"
import { LogEntry } from "../../logger/log-entry"
import { splitLast, findByName } from "../../util/util"
import { kubectl } from "./kubectl"

interface ApiGroupMap {
[groupVersion: string]: V1APIGroup
}

interface ApiResourceMap {
[kind: string]: V1APIResource
}

interface ApiInfo {
coreApi: V1APIVersions
groups: V1APIGroup[]
groupMap: ApiGroupMap
resources: { [group: string]: ApiResourceMap }
}

interface ApiResourceInfo {
group: V1APIGroup
resource: V1APIResource
}

const cachedConfigs: { [context: string]: KubeConfig } = {}
const cachedApiInfo: { [context: string]: ApiInfo } = {}
const apiInfoLock = new AsyncLock()

// NOTE: be warned, the API of the client library is very likely to change

Expand All @@ -42,11 +71,14 @@ type K8sApiConstructor<T extends K8sApi> = new (basePath?: string) => T

const apiTypes: { [key: string]: K8sApiConstructor<any> } = {
apiExtensions: Apiextensions_v1beta1Api,
apis: ApisApi,
apps: Apps_v1Api,
core: Core_v1Api,
coreApi: CoreApi,
extensions: Extensions_v1beta1Api,
policy: Policy_v1beta1Api,
rbac: RbacAuthorization_v1Api,
storage: Storage_v1Api,
}

const crudMap = {
Expand All @@ -71,8 +103,10 @@ export class KubernetesError extends GardenBaseError {

export class KubeApi {
public apiExtensions: Apiextensions_v1beta1Api
public apis: ApisApi
public apps: Apps_v1Api
public core: Core_v1Api
public coreApi: CoreApi
public extensions: Extensions_v1beta1Api
public policy: Policy_v1beta1Api
public rbac: RbacAuthorization_v1Api
Expand All @@ -98,128 +132,144 @@ export class KubeApi {
return new KubeApi(context, config)
}

async readBySpec(namespace: string, spec: KubernetesResource, log: LogEntry) {
// this is just awful, sorry. any better ideas? - JE
const name = spec.metadata.name

switch (spec.kind) {
case "ConfigMap":
return this.core.readNamespacedConfigMap(name, namespace)
case "Endpoints":
return this.core.readNamespacedEndpoints(name, namespace)
case "LimitRange":
return this.core.readNamespacedLimitRange(name, namespace)
case "PersistentVolumeClaim":
return this.core.readNamespacedPersistentVolumeClaim(name, namespace)
case "Pod":
return this.core.readNamespacedPod(name, namespace)
case "PodTemplate":
return this.core.readNamespacedPodTemplate(name, namespace)
case "ReplicationController":
return this.core.readNamespacedReplicationController(name, namespace)
case "ResourceQuota":
return this.core.readNamespacedResourceQuota(name, namespace)
case "Secret":
return this.core.readNamespacedSecret(name, namespace)
case "Service":
return this.core.readNamespacedService(name, namespace)
case "ServiceAccount":
return this.core.readNamespacedServiceAccount(name, namespace)
case "DaemonSet":
return this.extensions.readNamespacedDaemonSet(name, namespace)
case "Deployment":
return this.extensions.readNamespacedDeployment(name, namespace)
case "Ingress":
return this.extensions.readNamespacedIngress(name, namespace)
case "ReplicaSet":
return this.extensions.readNamespacedReplicaSet(name, namespace)
case "StatefulSet":
return this.apps.readNamespacedStatefulSet(name, namespace)
case "ClusterRole":
return this.rbac.readClusterRole(name)
case "ClusterRoleBinding":
return this.rbac.readClusterRoleBinding(name)
case "Role":
return this.rbac.readNamespacedRole(name, namespace)
case "RoleBinding":
return this.rbac.readNamespacedRoleBinding(name, namespace)
case "CustomResourceDefinition":
return this.apiExtensions.readCustomResourceDefinition(name)
case "PodDisruptionBudget":
return this.policy.readNamespacedPodDisruptionBudget(name, namespace)
default:
// Handle CRDs
const apiVersion = spec.apiVersion
const baseUrl = `${this.config.getCurrentCluster()!.server}/apis/${apiVersion}`

const [group, version] = splitLast(apiVersion, "/")

if (!group || !version) {
throw new KubernetesError(`Invalid apiVersion ${apiVersion}`, { spec })
}

let url: string

if (!group.includes(".") && group.endsWith("k8s.io")) {
// Looks like a built-in object
// TODO: this is awful, need to find out where to look this up...
let plural: string
async getApiInfo(): Promise<ApiInfo> {
if (cachedApiInfo[this.context]) {
return cachedApiInfo[this.context]
}

if (spec.kind.endsWith("s")) {
plural = spec.kind + "es"
} else if (spec.kind.endsWith("y")) {
plural = spec.kind.slice(0, spec.kind.length - 1) + "ies"
} else {
plural = spec.kind + "s"
return apiInfoLock.acquire(this.context, async () => {
if (cachedApiInfo[this.context] === undefined) {
const coreApi = await this.coreApi.getAPIVersions()
const apis = await this.apis.getAPIVersions()

const coreGroups: V1APIGroup[] = coreApi.body.versions.map(version => ({
apiVersion: "v1",
kind: "ApiGroup",
name: version,
preferredVersion: {
groupVersion: version,
version,
},
versions: [
{
groupVersion: "core/" + version,
version: "core/" + version,
},
{
groupVersion: version,
version,
},
],
serverAddressByClientCIDRs: coreApi.body.serverAddressByClientCIDRs,
}))

const groups = coreGroups.concat(apis.body.groups)
const groupMap: ApiGroupMap = {}

for (const group of groups) {
for (const version of group.versions) {
groupMap[version.groupVersion] = group
}
// /apis/networking.istio.io/v1alpha3/namespaces/gis-backend/virtualservices/gis-elasticsearch-master
// /apis/networking.istio.io/v1alpha3/namespaces/gis-backend/virtualservices/gis-elasticsearch-master
url = spec.metadata.namespace
? `${baseUrl}/namespaces/${namespace}/${plural}/${name}`
: `${baseUrl}/${plural}/${name}`

} else {
// Must be a CRD then...
const crd = await this.findCrd(group, version, spec.kind)

const plural = crd.spec.names.plural
url = crd.spec.scope === "Namespaced"
? `${baseUrl}/namespaces/${namespace}/${plural}/${name}`
: `${baseUrl}/${plural}/${name}`
}

log.silly(`GET ${url}`)
const info = {
coreApi: coreApi.body,
groups,
groupMap,
resources: {},
}

const opts: request.Options = { method: "get", url, json: true, resolveWithFullResponse: true }
this.config.applyToRequest(opts)
cachedApiInfo[this.context] = info
}

try {
return await request(opts)
} catch (err) {
handleRequestPromiseError(err)
}
return cachedApiInfo[this.context]
})
}

async getApiGroup(resource: KubernetesResource) {
const apiInfo = await this.getApiInfo()
const apiVersion = resource.apiVersion
const group = apiInfo.groupMap[apiVersion]

if (!group) {
throw new KubernetesError(`Unrecognized apiVersion: ${apiVersion}`, {
apiVersion,
resource,
})
}

return group
}

async findCrd(group: string, version: string, kind: string) {
const crds = (await this.apiExtensions.listCustomResourceDefinition()).body
async getApiResourceInfo(log: LogEntry, manifest: KubernetesResource): Promise<ApiResourceInfo> {
const apiInfo = await this.getApiInfo()
const group = await this.getApiGroup(manifest)
const groupId = group.preferredVersion.groupVersion

for (const crd of crds.items) {
if (
crd.spec.group === group &&
crd.status.acceptedNames.kind === kind &&
findByName(crd.spec.versions, version)
) {
return crd
const lockKey = `${this.context}/${groupId}`
const resourceMap = apiInfo.resources[groupId] || await apiInfoLock.acquire(lockKey, async () => {
if (apiInfo.resources[groupId]) {
return apiInfo.resources[groupId]
}
}

throw new KubernetesError(`Could not find resource type ${group}/${version}/${kind}`, {
group,
version,
kind,
availableCrds: crds.items,
log.debug(`Kubernetes: Getting API resource info for group ${groupId}`)
const res = await this.request(log, getGroupBasePath(groupId))

// We're only interested in the entities themselves, not the sub-resources
const resources = res.body.resources.filter(r => !r.name.includes("/"))

apiInfo.resources[groupId] = keyBy(resources, "kind")
return apiInfo.resources[groupId]
})

const resource = resourceMap[manifest.kind]

if (!resource) {
throw new KubernetesError(`Unrecognized resource type ${manifest.apiVersion}/${manifest.kind}`, {
manifest,
})
}

return { group, resource }
}

async request(log: LogEntry, path: string, opts: Omit<request.OptionsWithUrl, "url"> = {}): Promise<any> {
const baseUrl = this.config.getCurrentCluster()!.server
const url = resolve(baseUrl, path)

// set some default values
const requestOpts = {
url,
method: "get",
json: true,
resolveWithFullResponse: true,
...opts,
}

// apply auth
this.config.applyToRequest(requestOpts)

try {
log.silly(`GET ${url}`)
return await request(requestOpts)
} catch (err) {
throw handleRequestPromiseError(err)
}
}

async readBySpec(namespace: string, manifest: KubernetesResource, log: LogEntry) {
const name = manifest.metadata.name
log.silly(`Fetching Kubernetes resource ${manifest.apiVersion}/${manifest.kind}/${name}`)

const { group, resource } = await this.getApiResourceInfo(log, manifest)
const groupId = group.preferredVersion.groupVersion
const basePath = getGroupBasePath(groupId)

const apiPath = resource.namespaced
? `${basePath}/namespaces/${namespace}/${resource.name}/${name}`
: `${basePath}/${resource.name}/${name}`

return this.request(log, apiPath)
}

async upsert<K extends keyof CrudMapType>(
Expand Down Expand Up @@ -274,7 +324,9 @@ export class KubeApi {

if (typeof output.then === "function") {
// the API errors are not properly formed Error objects
return output.catch(wrapError)
return output.catch((err: Error) => {
throw wrapError(err)
})
} else {
return output
}
Expand All @@ -284,6 +336,11 @@ export class KubeApi {
}
}

function getGroupBasePath(groupId: string) {
// Of course, Kubernetes helpfully uses a singular for the core API and not everything else. So there you go.
return groupId.includes("/") ? `/apis/${groupId}` : `/api/${groupId}`
}

export async function getKubeConfig(log: LogEntry) {
let kubeConfigStr: string

Expand Down Expand Up @@ -329,21 +386,21 @@ function wrapError(err) {
request: omitBy(err.response.request, (v, k) => isObject(v) || k[0] === "_"),
})
wrapped.code = err.response.statusCode
throw wrapped
return wrapped
} else {
throw err
return err
}
}

function handleRequestPromiseError(err) {
function handleRequestPromiseError(err: Error) {
if (err instanceof requestErrors.StatusCodeError) {
const wrapped = new KubernetesError(`StatusCodeError from Kubernetes API - ${err.message}`, {
body: err.error,
})
wrapped.code = err.statusCode

throw wrapped
return wrapped
} else {
return wrapError(err)
}

return wrapError(err)
}
Loading

0 comments on commit c7839e9

Please sign in to comment.