Skip to content

Commit

Permalink
fix(k8s): potential memory issue when fetching artifacts
Browse files Browse the repository at this point in the history
Basically we were buffering data that was also being directly streamed,
which might have caused issues and was definitely unnecessary + slow.
  • Loading branch information
edvald authored and thsig committed Mar 10, 2021
1 parent 3940106 commit 5ac7822
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 13 deletions.
20 changes: 11 additions & 9 deletions core/src/plugins/kubernetes/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ export class KubeApi {
* Warning: Do not use tty=true unless you're actually attaching to a terminal, since collecting output will not work.
*/
async execInPod({
buffer,
namespace,
podName,
containerName,
Expand All @@ -598,6 +599,7 @@ export class KubeApi {
tty,
timeoutSec,
}: {
buffer: boolean
namespace: string
podName: string
containerName: string
Expand All @@ -612,8 +614,8 @@ export class KubeApi {
const stderrCollector = new StringCollector()
const combinedCollector = new StringCollector()

let _stdout: Writable = stdoutCollector
let _stderr: Writable = stderrCollector
let _stdout = stdout || null
let _stderr = stderr || null

if (tty) {
// We connect stdout and stderr directly.
Expand All @@ -633,23 +635,23 @@ export class KubeApi {
const ttyStdin = stdin as ReadStream
ttyStdin.setRawMode && ttyStdin.setRawMode(true)
}
} else {
} else if (buffer) {
/**
* Unless we're attaching a TTY to the output streams, we multiplex the outputs to both a StringCollector,
* and whatever stream the caller provided.
* Unless we're attaching a TTY to the output streams or buffer=false, we multiplex the outputs to both a
* StringCollector, and whatever stream the caller provided.
*/
_stdout = new PassThrough()
_stdout.pipe(stdoutCollector)
_stdout.pipe(combinedCollector)

if (stdout) {
_stdout.pipe(stdout)
}

_stderr = new PassThrough()
_stderr.pipe(stderrCollector)
_stderr.pipe(combinedCollector)

if (stdout) {
_stdout.pipe(stdout)
}

if (stderr) {
_stderr.pipe(stderr)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ async function deleteImagesFromDaemon({
command: ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"],
containerName: dockerDaemonContainerName,
timeoutSec: 300,
buffer: true,
})

const imagesInDaemon = res.log
Expand Down Expand Up @@ -387,6 +388,7 @@ async function deleteImagesFromDaemon({
command: ["docker", "rmi", ...images],
containerName: dockerDaemonContainerName,
timeoutSec: 600,
buffer: true,
})
log.setState(deline`
Deleting images:
Expand All @@ -404,6 +406,7 @@ async function deleteImagesFromDaemon({
command: ["docker", "image", "prune", "-f"],
containerName: dockerDaemonContainerName,
timeoutSec: 1000,
buffer: true,
})

log.setSuccess()
Expand Down Expand Up @@ -441,6 +444,7 @@ async function cleanupBuildSyncVolume({
log,
command: ["sh", "-c", 'stat /data/* -c "%n %X"'],
timeoutSec: 300,
buffer: true,
})

// Remove directories last accessed more than workspaceSyncDirTtl ago
Expand Down Expand Up @@ -468,6 +472,7 @@ async function cleanupBuildSyncVolume({
log,
command: ["rm", "-rf", ...dirsToDelete],
timeoutSec: 300,
buffer: true,
}),
{ concurrency: 20 }
)
Expand Down
2 changes: 2 additions & 0 deletions core/src/plugins/kubernetes/container/build/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ const remoteBuild: BuildHandler = async (params) => {
timeoutSec: buildTimeout,
containerName,
stdout,
buffer: true,
})

buildLog = buildRes.log
Expand All @@ -144,6 +145,7 @@ const remoteBuild: BuildHandler = async (params) => {
timeoutSec: 300,
containerName,
stdout,
buffer: true,
})

buildLog += pushRes.log
Expand Down
1 change: 1 addition & 0 deletions core/src/plugins/kubernetes/container/build/buildkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ export const buildkitBuildHandler: BuildHandler = async (params) => {
timeoutSec: buildTimeout,
containerName: buildkitContainerName,
stdout,
buffer: true,
})

buildLog = buildRes.log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const getClusterDockerBuildStatus: BuildStatusHandler = async (params) =>
command: pushArgs,
timeoutSec: 300,
containerName: dockerDaemonContainerName,
buffer: true,
})
return { ready: true }
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions core/src/plugins/kubernetes/container/build/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ export async function skopeoBuildStatus({
command: podCommand,
timeoutSec: 300,
containerName,
buffer: true,
})
return { ready: true }
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions core/src/plugins/kubernetes/container/exec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export async function execInWorkload({
command,
timeoutSec: 999999,
tty: interactive,
buffer: true,
})

return { code: res.exitCode, output: res.log }
Expand Down
9 changes: 7 additions & 2 deletions core/src/plugins/kubernetes/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ async function runWithArtifacts({
stderr,
// Anything above two minutes for this would be unusual
timeoutSec: 120,
buffer: false,
})
} catch (err) {
// TODO: fall back to copying `arc` (https://github.com/mholt/archiver) or similar into the container and
Expand Down Expand Up @@ -524,6 +525,7 @@ async function runWithArtifacts({
stdout,
stderr,
timeoutSec,
buffer: true,
})
result = {
...res,
Expand Down Expand Up @@ -572,7 +574,7 @@ async function runWithArtifacts({
]

try {
await new Promise((_resolve, reject) => {
await new Promise<void>((_resolve, reject) => {
// Create an extractor to receive the tarball we will stream from the container
// and extract to the artifacts directory.
let done = 0
Expand Down Expand Up @@ -600,6 +602,7 @@ async function runWithArtifacts({
log,
stdout: extractor,
timeoutSec,
buffer: false,
})
.then(() => {
// Need to make sure both processes are complete before resolving (may happen in either order)
Expand Down Expand Up @@ -651,6 +654,7 @@ type ExecParams = StartParams & {
stderr?: Writable
stdin?: Readable
tty?: boolean
buffer: boolean
}

type RunParams = StartParams & {
Expand Down Expand Up @@ -846,7 +850,7 @@ export class PodRunner extends PodRunnerParams {
* Executes a command in the running Pod. Must be called after `start()`.
*/
async exec(params: ExecParams) {
const { command, containerName: container, timeoutSec, tty = false, log } = params
const { command, containerName: container, timeoutSec, tty = false, log, buffer = true } = params
let { stdout, stderr, stdin } = params

if (tty) {
Expand All @@ -871,6 +875,7 @@ export class PodRunner extends PodRunnerParams {
command,
stdout,
stderr,
buffer,
stdin,
tty,
timeoutSec,
Expand Down
3 changes: 3 additions & 0 deletions core/test/integ/src/plugins/kubernetes/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ describe("KubeApi", () => {
containerName,
command: ["/bin/sh", "-c", "echo some output"],
tty: false,
buffer: true,
})
expect(res.stdout).to.equal("some output\n")
expect(res.stderr).to.equal("")
Expand All @@ -131,6 +132,7 @@ describe("KubeApi", () => {
containerName,
command: ["/bin/sh", "-c", "exit 2"],
tty: false,
buffer: true,
})
expect(res.stdout).to.equal("")
expect(res.stderr).to.equal("")
Expand All @@ -156,6 +158,7 @@ describe("KubeApi", () => {
command: ["/bin/sh", "-c", "echo foo && sleep 100"],
tty: false,
timeoutSec: 2,
buffer: true,
})
expect(res.stdout).to.equal("foo\n")
expect(res.stderr).to.equal("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ describe("kubernetes build flow", () => {
command: ["docker", "rmi", remoteId],
timeoutSec: 300,
containerName: dockerDaemonContainerName,
buffer: true,
})

// This should still report the build as ready, because it's in the registry
Expand Down Expand Up @@ -246,6 +247,7 @@ describe("kubernetes build flow", () => {
command: ["docker", "rmi", remoteId],
timeoutSec: 300,
containerName: dockerDaemonContainerName,
buffer: true,
})

// This should still report the build as ready, because it's in the registry
Expand Down
5 changes: 3 additions & 2 deletions core/test/integ/src/plugins/kubernetes/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ describe("kubernetes Pod runner functions", () => {
const res = await runner.exec({
log,
command: ["echo", "foo"],
buffer: true,
})

expect(res.log.trim()).to.equal("foo")
Expand All @@ -162,7 +163,7 @@ describe("kubernetes Pod runner functions", () => {

await runner.start({ log })
await expectError(
() => runner.exec({ log, command: ["sh", "-c", "sleep 100"], timeoutSec: 1 }),
() => runner.exec({ log, command: ["sh", "-c", "sleep 100"], timeoutSec: 1, buffer: true }),
(err) => expect(err.message).to.equal("Command timed out after 1 seconds.")
)
})
Expand All @@ -180,7 +181,7 @@ describe("kubernetes Pod runner functions", () => {

await runner.start({ log })
await expectError(
() => runner.exec({ log, command: ["sh", "-c", "echo foo && exit 2"] }),
() => runner.exec({ log, command: ["sh", "-c", "echo foo && exit 2"], buffer: true }),
(err) => expect(err.message.trim()).to.equal("Command exited with code 2:\nfoo")
)
})
Expand Down

0 comments on commit 5ac7822

Please sign in to comment.