From 1d98cd535a20fa67869da242b0ec7ddd713a4c7b Mon Sep 17 00:00:00 2001 From: Pagan Gazzard Date: Fri, 24 Nov 2023 17:10:53 +0000 Subject: [PATCH] Make use of `pipeline` for piping streams together Change-type: patch --- lib/build/builder.ts | 84 +++++++++++++-------------- lib/emulate/index.ts | 6 +- lib/multibuild/build-secrets/index.ts | 5 +- lib/multibuild/build.ts | 8 +-- lib/multibuild/index.ts | 14 ++--- lib/resolve/index.ts | 4 +- lib/resolve/resolvers/nodeResolver.ts | 5 +- package.json | 1 + 8 files changed, 65 insertions(+), 62 deletions(-) diff --git a/lib/build/builder.ts b/lib/build/builder.ts index d88a389..6ae638e 100644 --- a/lib/build/builder.ts +++ b/lib/build/builder.ts @@ -22,7 +22,7 @@ import * as JSONStream from 'JSONStream'; import * as _ from 'lodash'; import * as fs from 'mz/fs'; import * as path from 'path'; -import { Duplex } from 'stream'; +import * as stream from 'node:stream'; import * as tar from 'tar-stream'; // Import hook definitions @@ -71,7 +71,7 @@ export default class Builder { const fromTags: Utils.FromTagInfo[] = []; // Create a stream to be passed into the docker daemon - const inputStream = es.through(); + const inputStream = es.through(); // Create a bi-directional stream const dup = duplexify(); @@ -194,11 +194,9 @@ export default class Builder { // Tell the tar stream we're done pack.finalize(); // Create a build stream to send the data to - const stream = this.createBuildStream(buildOpts, hooks, handler); - // Write the tar archive to the stream - pack.pipe(stream); - // ...and return it for reading - return stream; + const buildStream = this.createBuildStream(buildOpts, hooks, handler); + // Write the tar archive to the stream and return it for reading + return stream.pipeline(pack, buildStream, _.noop); } /** @@ -244,44 +242,46 @@ function getDockerDaemonBuildOutputParserStream( layers: string[], fromImageTags: Utils.FromTagInfo[], onError: (error: Error) => void, -): Duplex { +): stream.Duplex { const fromAliases = new Set(); - return ( - daemonStream - // parse the docker daemon's output json objects - .pipe(JSONStream.parse()) - // Don't use fat-arrow syntax here, to capture 'this' from es - .pipe( - es.through(function (data: { stream: string; error: string }) { - if (data == null) { - return; + return stream.pipeline( + daemonStream, + // parse the docker daemon's output json objects + JSONStream.parse(), + // Don't use fat-arrow syntax here, to capture 'this' from es + es.through(function (data: { + stream: string; + error: string; + }) { + if (data == null) { + return; + } + try { + if (data.error) { + throw new Error(data.error); + } else { + // Store image layers, so that they can be + // deleted by the caller if necessary + const sha = Utils.extractLayer(data.stream); + if (sha !== undefined) { + layers.push(sha); } - try { - if (data.error) { - throw new Error(data.error); - } else { - // Store image layers, so that they can be - // deleted by the caller if necessary - const sha = Utils.extractLayer(data.stream); - if (sha !== undefined) { - layers.push(sha); - } - const fromTag = Utils.extractFromTag(data.stream); - if (fromTag !== undefined) { - if (!fromAliases.has(fromTag.repo)) { - fromImageTags.push(fromTag); - } - if (fromTag.alias) { - fromAliases.add(fromTag.alias); - } - } - this.emit('data', data.stream); + const fromTag = Utils.extractFromTag(data.stream); + if (fromTag !== undefined) { + if (!fromAliases.has(fromTag.repo)) { + fromImageTags.push(fromTag); + } + if (fromTag.alias) { + fromAliases.add(fromTag.alias); } - } catch (error) { - daemonStream.unpipe(); - onError(error); } - }), - ) + this.emit('data', data.stream); + } + } catch (error) { + daemonStream.unpipe(); + onError(error); + } + }), + _.noop, ); } diff --git a/lib/emulate/index.ts b/lib/emulate/index.ts index 527e365..3567a96 100644 --- a/lib/emulate/index.ts +++ b/lib/emulate/index.ts @@ -18,6 +18,7 @@ import * as parser from 'docker-file-parser'; import * as jsesc from 'jsesc'; import * as _ from 'lodash'; +import { pipeline } from 'node:stream'; import * as tar from 'tar-stream'; import { normalizeTarEntry } from 'tar-utils'; @@ -225,7 +226,7 @@ export function transposeTarStream( resolve(pack); }); - tarStream.pipe(extract); + pipeline(tarStream, extract, _.noop); }); } @@ -272,7 +273,7 @@ export function getBuildThroughStream( return data.replace(replaceRegex, ''); }; - return es.pipe( + return pipeline( es.mapSync(function (data: string | Buffer) { data = data.toString(); @@ -282,5 +283,6 @@ export function getBuildThroughStream( return data; }), es.join('\n'), + _.noop, ); } diff --git a/lib/multibuild/build-secrets/index.ts b/lib/multibuild/build-secrets/index.ts index ca21261..123908e 100644 --- a/lib/multibuild/build-secrets/index.ts +++ b/lib/multibuild/build-secrets/index.ts @@ -28,6 +28,7 @@ import * as dockerfileTemplate from '../../dockerfile'; import type BuildMetadata from '../build-metadata'; import { BuildSecretMissingError, SecretPopulationError } from '../errors'; import { PermissiveVarList, VarList } from '../validation-types/varlist'; +import { pipeline } from 'stream'; export const secretType = t.interface({ source: t.string, @@ -155,7 +156,7 @@ export async function populateSecrets( await new Promise((resolve, reject) => { builder.createBuildStream(dockerOpts, { buildStream: (stream) => { - pack.pipe(stream); + pipeline(pack, stream, _.noop); }, buildSuccess: resolve, buildFailure: reject, @@ -215,7 +216,7 @@ export async function removeSecrets( await new Promise((resolve, reject) => { builder.createBuildStream(dockerOpts, { buildStream: (stream) => { - pack.pipe(stream); + pipeline(pack, stream, _.noop); }, buildSuccess: resolve, buildFailure: reject, diff --git a/lib/multibuild/build.ts b/lib/multibuild/build.ts index a8450a4..7ba1b0a 100644 --- a/lib/multibuild/build.ts +++ b/lib/multibuild/build.ts @@ -18,7 +18,7 @@ import type * as Dockerode from 'dockerode'; import * as _ from 'lodash'; import * as semver from 'semver'; -import type * as Stream from 'stream'; +import * as stream from 'stream'; import { Builder, BuildHooks, FromTagInfo } from '../build'; @@ -79,13 +79,13 @@ function taskHooks( image.error = error; resolve(image); }, - buildStream: (stream: Stream.Duplex) => { + buildStream: (buildStream: stream.Duplex) => { startTime = Date.now(); if (typeof task.streamHook === 'function') { - task.streamHook(stream); + task.streamHook(buildStream); } - task.buildStream!.pipe(stream); + stream.pipeline(task.buildStream!, buildStream, _.noop); }, }; } diff --git a/lib/multibuild/index.ts b/lib/multibuild/index.ts index 762d15c..df16270 100644 --- a/lib/multibuild/index.ts +++ b/lib/multibuild/index.ts @@ -18,7 +18,7 @@ import type * as Dockerode from 'dockerode'; import * as _ from 'lodash'; import * as path from 'path'; -import type * as Stream from 'stream'; +import * as stream from 'node:stream'; import * as tar from 'tar-stream'; import * as TarUtils from 'tar-utils'; @@ -73,7 +73,7 @@ export { CANONICAL_HUB_URL } from './constants'; */ export function splitBuildStream( composition: Compose.Composition, - buildStream: Stream.Readable, + buildStream: stream.Readable, ): Promise { const images = Compose.parse(composition); return fromImageDescriptors(images, buildStream); @@ -81,7 +81,7 @@ export function splitBuildStream( export async function fromImageDescriptors( images: Compose.ImageDescriptor[], - buildStream: Stream.Readable, + buildStream: stream.Readable, metadataDirectories = ['.balena/', '.resin/'], ): Promise { const buildMetadata = new BuildMetadata(metadataDirectories); @@ -96,7 +96,7 @@ export async function fromImageDescriptors( const entryFn = ( header: tar.Headers, - stream: Stream.Readable, + entryStream: stream.Readable, next: () => void, ): void => { // Find the build context that this file should belong to @@ -109,7 +109,7 @@ export async function fromImageDescriptors( if (matchingTasks.length > 0) { // Add the file to every matching context - TarUtils.streamToBuffer(stream) + TarUtils.streamToBuffer(entryStream) .then((buf) => { matchingTasks.forEach((task) => { const relative = path.posix.relative(task.context!, header.name); @@ -140,7 +140,7 @@ export async function fromImageDescriptors( reject(new TarError(e)); }); } else { - TarUtils.drainStream(stream) + TarUtils.drainStream(entryStream) .then(() => { next(); }) @@ -161,7 +161,7 @@ export async function fromImageDescriptors( reject(new TarError(e)); }); - newStream.pipe(extract); + stream.pipeline(newStream, extract, _.noop); }).then((tasks) => { contracts.checkContractNamesUnique(tasks); return tasks; diff --git a/lib/resolve/index.ts b/lib/resolve/index.ts index 8ff4f22..e004cb8 100644 --- a/lib/resolve/index.ts +++ b/lib/resolve/index.ts @@ -15,7 +15,7 @@ * limitations under the License. */ import * as _ from 'lodash'; -import { Readable } from 'stream'; +import { Readable, pipeline } from 'stream'; import * as tar from 'tar-stream'; import * as TarUtils from 'tar-utils'; @@ -124,7 +124,7 @@ export function resolveInput( } }); - bundle.tarStream.pipe(extract); + pipeline(bundle.tarStream, extract, _.noop); return pack; } diff --git a/lib/resolve/resolvers/nodeResolver.ts b/lib/resolve/resolvers/nodeResolver.ts index 58db06b..c2d76d6 100644 --- a/lib/resolve/resolvers/nodeResolver.ts +++ b/lib/resolve/resolvers/nodeResolver.ts @@ -31,9 +31,8 @@ const getDeviceTypeVersions = memoize( async (deviceType: string): Promise => { const tags: string[] = []; // 100 is the max page size - let nextUrl: - | string - | undefined = `https://hub.docker.com/v2/repositories/resin/${deviceType}-node/tags/?page_size=100`; + let nextUrl: string | undefined = + `https://hub.docker.com/v2/repositories/resin/${deviceType}-node/tags/?page_size=100`; while (nextUrl != null) { const res = ( await getAsync({ diff --git a/package.json b/package.json index b913a7a..a1646a3 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "@types/memoizee": "^0.4.11", "@types/mocha": "^10.0.1", "@types/mz": "^2.7.4", + "@types/node": "^16.18.65", "@types/proxyquire": "^1.3.28", "@types/request": "^2.48.4", "@types/semver": "^7.3.5",