Skip to content

Commit

Permalink
Merge pull request #42 from balena-io-modules/pipeline
Browse files Browse the repository at this point in the history
Make use of `pipeline` for piping streams together
  • Loading branch information
flowzone-app[bot] authored Nov 27, 2023
2 parents ef1d958 + 1d98cd5 commit ebd6681
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 62 deletions.
84 changes: 42 additions & 42 deletions lib/build/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import * as JSONStream from 'JSONStream';
import * as _ from 'lodash';
import * as fs from 'mz/fs';
import * as path from 'path';
import { Duplex } from 'stream';
import * as stream from 'node:stream';
import * as tar from 'tar-stream';

// Import hook definitions
Expand Down Expand Up @@ -71,7 +71,7 @@ export default class Builder {
const fromTags: Utils.FromTagInfo[] = [];

// Create a stream to be passed into the docker daemon
const inputStream = es.through<Duplex>();
const inputStream = es.through<stream.Duplex>();

// Create a bi-directional stream
const dup = duplexify();
Expand Down Expand Up @@ -194,11 +194,9 @@ export default class Builder {
// Tell the tar stream we're done
pack.finalize();
// Create a build stream to send the data to
const stream = this.createBuildStream(buildOpts, hooks, handler);
// Write the tar archive to the stream
pack.pipe(stream);
// ...and return it for reading
return stream;
const buildStream = this.createBuildStream(buildOpts, hooks, handler);
// Write the tar archive to the stream and return it for reading
return stream.pipeline(pack, buildStream, _.noop);
}

/**
Expand Down Expand Up @@ -244,44 +242,46 @@ function getDockerDaemonBuildOutputParserStream(
layers: string[],
fromImageTags: Utils.FromTagInfo[],
onError: (error: Error) => void,
): Duplex {
): stream.Duplex {
const fromAliases = new Set();
return (
daemonStream
// parse the docker daemon's output json objects
.pipe(JSONStream.parse())
// Don't use fat-arrow syntax here, to capture 'this' from es
.pipe(
es.through<Duplex>(function (data: { stream: string; error: string }) {
if (data == null) {
return;
return stream.pipeline(
daemonStream,
// parse the docker daemon's output json objects
JSONStream.parse(),
// Don't use fat-arrow syntax here, to capture 'this' from es
es.through<stream.Duplex>(function (data: {
stream: string;
error: string;
}) {
if (data == null) {
return;
}
try {
if (data.error) {
throw new Error(data.error);
} else {
// Store image layers, so that they can be
// deleted by the caller if necessary
const sha = Utils.extractLayer(data.stream);
if (sha !== undefined) {
layers.push(sha);
}
try {
if (data.error) {
throw new Error(data.error);
} else {
// Store image layers, so that they can be
// deleted by the caller if necessary
const sha = Utils.extractLayer(data.stream);
if (sha !== undefined) {
layers.push(sha);
}
const fromTag = Utils.extractFromTag(data.stream);
if (fromTag !== undefined) {
if (!fromAliases.has(fromTag.repo)) {
fromImageTags.push(fromTag);
}
if (fromTag.alias) {
fromAliases.add(fromTag.alias);
}
}
this.emit('data', data.stream);
const fromTag = Utils.extractFromTag(data.stream);
if (fromTag !== undefined) {
if (!fromAliases.has(fromTag.repo)) {
fromImageTags.push(fromTag);
}
if (fromTag.alias) {
fromAliases.add(fromTag.alias);
}
} catch (error) {
daemonStream.unpipe();
onError(error);
}
}),
)
this.emit('data', data.stream);
}
} catch (error) {
daemonStream.unpipe();
onError(error);
}
}),
_.noop,
);
}
6 changes: 4 additions & 2 deletions lib/emulate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import * as parser from 'docker-file-parser';
import * as jsesc from 'jsesc';
import * as _ from 'lodash';
import { pipeline } from 'node:stream';
import * as tar from 'tar-stream';
import { normalizeTarEntry } from 'tar-utils';

Expand Down Expand Up @@ -225,7 +226,7 @@ export function transposeTarStream(
resolve(pack);
});

tarStream.pipe(extract);
pipeline(tarStream, extract, _.noop);
});
}

Expand Down Expand Up @@ -272,7 +273,7 @@ export function getBuildThroughStream(
return data.replace(replaceRegex, '');
};

return es.pipe(
return pipeline(
es.mapSync(function (data: string | Buffer) {
data = data.toString();

Expand All @@ -282,5 +283,6 @@ export function getBuildThroughStream(
return data;
}),
es.join('\n'),
_.noop,
);
}
5 changes: 3 additions & 2 deletions lib/multibuild/build-secrets/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import * as dockerfileTemplate from '../../dockerfile';
import type BuildMetadata from '../build-metadata';
import { BuildSecretMissingError, SecretPopulationError } from '../errors';
import { PermissiveVarList, VarList } from '../validation-types/varlist';
import { pipeline } from 'stream';

export const secretType = t.interface({
source: t.string,
Expand Down Expand Up @@ -155,7 +156,7 @@ export async function populateSecrets(
await new Promise((resolve, reject) => {
builder.createBuildStream(dockerOpts, {
buildStream: (stream) => {
pack.pipe(stream);
pipeline(pack, stream, _.noop);
},
buildSuccess: resolve,
buildFailure: reject,
Expand Down Expand Up @@ -215,7 +216,7 @@ export async function removeSecrets(
await new Promise((resolve, reject) => {
builder.createBuildStream(dockerOpts, {
buildStream: (stream) => {
pack.pipe(stream);
pipeline(pack, stream, _.noop);
},
buildSuccess: resolve,
buildFailure: reject,
Expand Down
8 changes: 4 additions & 4 deletions lib/multibuild/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import type * as Dockerode from 'dockerode';
import * as _ from 'lodash';
import * as semver from 'semver';
import type * as Stream from 'stream';
import * as stream from 'stream';

import { Builder, BuildHooks, FromTagInfo } from '../build';

Expand Down Expand Up @@ -79,13 +79,13 @@ function taskHooks(
image.error = error;
resolve(image);
},
buildStream: (stream: Stream.Duplex) => {
buildStream: (buildStream: stream.Duplex) => {
startTime = Date.now();
if (typeof task.streamHook === 'function') {
task.streamHook(stream);
task.streamHook(buildStream);
}

task.buildStream!.pipe(stream);
stream.pipeline(task.buildStream!, buildStream, _.noop);
},
};
}
Expand Down
14 changes: 7 additions & 7 deletions lib/multibuild/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import type * as Dockerode from 'dockerode';
import * as _ from 'lodash';
import * as path from 'path';
import type * as Stream from 'stream';
import * as stream from 'node:stream';
import * as tar from 'tar-stream';
import * as TarUtils from 'tar-utils';

Expand Down Expand Up @@ -73,15 +73,15 @@ export { CANONICAL_HUB_URL } from './constants';
*/
export function splitBuildStream(
composition: Compose.Composition,
buildStream: Stream.Readable,
buildStream: stream.Readable,
): Promise<BuildTask[]> {
const images = Compose.parse(composition);
return fromImageDescriptors(images, buildStream);
}

export async function fromImageDescriptors(
images: Compose.ImageDescriptor[],
buildStream: Stream.Readable,
buildStream: stream.Readable,
metadataDirectories = ['.balena/', '.resin/'],
): Promise<BuildTask[]> {
const buildMetadata = new BuildMetadata(metadataDirectories);
Expand All @@ -96,7 +96,7 @@ export async function fromImageDescriptors(

const entryFn = (
header: tar.Headers,
stream: Stream.Readable,
entryStream: stream.Readable,
next: () => void,
): void => {
// Find the build context that this file should belong to
Expand All @@ -109,7 +109,7 @@ export async function fromImageDescriptors(

if (matchingTasks.length > 0) {
// Add the file to every matching context
TarUtils.streamToBuffer(stream)
TarUtils.streamToBuffer(entryStream)
.then((buf) => {
matchingTasks.forEach((task) => {
const relative = path.posix.relative(task.context!, header.name);
Expand Down Expand Up @@ -140,7 +140,7 @@ export async function fromImageDescriptors(
reject(new TarError(e));
});
} else {
TarUtils.drainStream(stream)
TarUtils.drainStream(entryStream)
.then(() => {
next();
})
Expand All @@ -161,7 +161,7 @@ export async function fromImageDescriptors(
reject(new TarError(e));
});

newStream.pipe(extract);
stream.pipeline(newStream, extract, _.noop);
}).then((tasks) => {
contracts.checkContractNamesUnique(tasks);
return tasks;
Expand Down
4 changes: 2 additions & 2 deletions lib/resolve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
import * as _ from 'lodash';
import { Readable } from 'stream';
import { Readable, pipeline } from 'stream';
import * as tar from 'tar-stream';
import * as TarUtils from 'tar-utils';

Expand Down Expand Up @@ -124,7 +124,7 @@ export function resolveInput(
}
});

bundle.tarStream.pipe(extract);
pipeline(bundle.tarStream, extract, _.noop);
return pack;
}

Expand Down
5 changes: 2 additions & 3 deletions lib/resolve/resolvers/nodeResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ const getDeviceTypeVersions = memoize(
async (deviceType: string): Promise<string[]> => {
const tags: string[] = [];
// 100 is the max page size
let nextUrl:
| string
| undefined = `https://hub.docker.com/v2/repositories/resin/${deviceType}-node/tags/?page_size=100`;
let nextUrl: string | undefined =
`https://hub.docker.com/v2/repositories/resin/${deviceType}-node/tags/?page_size=100`;
while (nextUrl != null) {
const res = (
await getAsync({
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"@types/memoizee": "^0.4.11",
"@types/mocha": "^10.0.1",
"@types/mz": "^2.7.4",
"@types/node": "^16.18.65",
"@types/proxyquire": "^1.3.28",
"@types/request": "^2.48.4",
"@types/semver": "^7.3.5",
Expand Down

0 comments on commit ebd6681

Please sign in to comment.