Skip to content

Commit

Permalink
Merge pull request #165 from internxt/feat/pb-3405-add-multipart-upload
Browse files Browse the repository at this point in the history
[PB-3405]: feat/add-multipart-upload
  • Loading branch information
larryrider authored Jan 16, 2025
2 parents 726b377 + 8d8575a commit 1937592
Show file tree
Hide file tree
Showing 12 changed files with 456 additions and 31 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@internxt/sdk": "1.7.0",
"@oclif/core": "4.2.3",
"@types/validator": "13.12.2",
"async": "3.2.6",
"axios": "1.7.9",
"bip39": "3.1.0",
"body-parser": "1.20.3",
Expand Down Expand Up @@ -70,6 +71,7 @@
"@internxt/prettier-config": "internxt/prettier-config#v1.0.2",
"@oclif/test": "4.1.7",
"@openpgp/web-stream-tools": "0.0.11-patch-1",
"@types/async": "3.2.24",
"@types/cli-progress": "3.11.6",
"@types/express": "5.0.0",
"@types/mime-types": "2.1.4",
Expand Down
37 changes: 29 additions & 8 deletions src/commands/upload-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,38 @@ export default class UploadFile extends Command {
linewrap: true,
});
progressBar.start(100, 0);
const [uploadPromise, abortable] = await networkFacade.uploadFromStream(
user.bucket,
user.mnemonic,
stats.size,
fileStream,
{

const minimumMultipartThreshold = 100 * 1024 * 1024;
const useMultipart = stats.size > minimumMultipartThreshold;
const partSize = 30 * 1024 * 1024;
const parts = Math.ceil(stats.size / partSize);

let uploadOperation: Promise<
[
Promise<{
fileId: string;
hash: Buffer;
}>,
AbortController,
]
>;

if (useMultipart) {
uploadOperation = networkFacade.uploadMultipartFromStream(user.bucket, user.mnemonic, stats.size, fileStream, {
parts,
progressCallback: (progress) => {
progressBar.update(progress * 0.99);
},
},
);
});
} else {
uploadOperation = networkFacade.uploadFromStream(user.bucket, user.mnemonic, stats.size, fileStream, {
progressCallback: (progress) => {
progressBar.update(progress * 0.99);
},
});
}

const [uploadPromise, abortable] = await uploadOperation;

process.on('SIGINT', () => {
abortable.abort('SIGINT received');
Expand Down
22 changes: 18 additions & 4 deletions src/services/crypto.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { CryptoProvider } from '@internxt/sdk';
import { Keys, Password } from '@internxt/sdk/dist/auth';
import { createCipheriv, createDecipheriv, createHash, Decipher, pbkdf2Sync, randomBytes } from 'node:crypto';
import { Transform } from 'node:stream';
import { Readable, Transform } from 'node:stream';
import { KeysService } from './keys.service';
import { ConfigService } from '../services/config.service';
import { StreamUtils } from '../utils/stream.utils';
Expand Down Expand Up @@ -116,12 +116,26 @@ export class CryptoService {
return Buffer.concat([decipher.update(contentsToDecrypt), decipher.final()]).toString('utf8');
};

public async decryptStream(
/**
 * Splits a plain readable into fixed-size chunks and pipes them through the
 * given AES cipher, producing an encrypted stream whose chunk boundaries line
 * up with the multipart upload parts.
 *
 * @param readable The plain content stream to encrypt
 * @param cipher The encryption Transform (e.g. from getEncryptionTransform)
 * @param size Total size of the content in bytes
 * @param parts Number of multipart parts (pre-signed URLs) available
 * @returns The cipher Transform fed with size/parts-sized chunks
 */
public encryptStreamInParts = (
  readable: Readable,
  cipher: Transform,
  size: number,
  parts: number,
): Transform => {
  // A small margin is added to each chunk so that rounding can never produce
  // one more chunk than there are pre-signed part URLs, which would cause a
  // mismatch with the URL list provided by the network.
  const marginChunkSize = 1024;
  // Math.ceil keeps the chunk size an integral byte count instead of relying
  // on Buffer.subarray silently truncating a fractional index downstream.
  const chunkSize = Math.ceil(size / parts) + marginChunkSize;
  const readableChunks = StreamUtils.streamReadableIntoChunks(readable, chunkSize);

  return readableChunks.pipe(cipher);
};

public decryptStream = (
inputSlices: ReadableStream<Uint8Array>[],
key: Buffer,
iv: Buffer,
startOffsetByte?: number,
) {
) => {
let decipher: Decipher;
if (startOffsetByte) {
const aesBlockSize = 16;
Expand Down Expand Up @@ -164,7 +178,7 @@ export class CryptoService {
});

return decryptedStream;
}
};

public getEncryptionTransform = (key: Buffer, iv: Buffer): Transform => {
const cipher = createCipheriv('aes-256-ctr', key, iv);
Expand Down
153 changes: 151 additions & 2 deletions src/services/network/network-facade.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,26 @@ import {
DownloadFileFunction,
EncryptFileFunction,
UploadFileFunction,
UploadFileMultipartFunction,
} from '@internxt/sdk/dist/network';
import { Environment } from '@internxt/inxt-js';
import { randomBytes } from 'node:crypto';
import { Readable, Transform } from 'node:stream';
import { DownloadOptions, UploadOptions, UploadProgressCallback, DownloadProgressCallback } from '../../types/network.types';
import {
DownloadOptions,
UploadOptions,
UploadProgressCallback,
DownloadProgressCallback,
UploadMultipartOptions,
UploadTask,
} from '../../types/network.types';
import { CryptoService } from '../crypto.service';
import { UploadService } from './upload.service';
import { DownloadService } from './download.service';
import { ValidationService } from '../validation.service';
import { HashStream } from '../../utils/hash.utils';
import { RangeOptions } from '../../utils/network.utils';
import { queue, QueueObject } from 'async';

export class NetworkFacade {
private readonly cryptoLib: Network.Crypto;
Expand Down Expand Up @@ -73,7 +82,7 @@ export class NetworkFacade {
if (rangeOptions) {
startOffsetByte = rangeOptions.parsed.start;
}
fileStream = await this.cryptoService.decryptStream(
fileStream = this.cryptoService.decryptStream(
encryptedContentStreams,
Buffer.from(key as ArrayBuffer),
Buffer.from(iv as ArrayBuffer),
Expand Down Expand Up @@ -183,4 +192,144 @@ export class NetworkFacade {

return [uploadOperation(), abortable];
}

/**
* Performs a multi-part upload encrypting the stream content
*
* @param bucketId The bucket where the file will be uploaded
* @param mnemonic The plain mnemonic of the user
* @param size The total size of the stream content
* @param from The source ReadStream to upload from
* @param options The upload options
* @returns A promise to execute the upload and an abort controller to cancel the upload
*/
async uploadMultipartFromStream(
  bucketId: string,
  mnemonic: string,
  size: number,
  from: Readable,
  options: UploadMultipartOptions,
): Promise<[Promise<{ fileId: string; hash: Buffer }>, AbortController]> {
  // Hashes the encrypted bytes as they flow through, so the final file hash
  // is available once the whole stream has been consumed.
  const hashStream = new HashStream();
  const abortable = options?.abortController ?? new AbortController();
  let encryptionTransform: Transform;
  // Assigned inside uploadFileMultipart after the stream is fully consumed;
  // uploadOperation reads it only after uploadMultipartFile resolves.
  let hash: Buffer;

  // Per-part loaded byte counters, keyed by part index, used to aggregate
  // overall progress across concurrent part uploads.
  const partsUploadedBytes: Record<number, number> = {};
  type Part = {
    PartNumber: number;
    ETag: string;
  };
  // ETags collected as parts finish (completion order, sorted later).
  const fileParts: Part[] = [];

  // Aggregates per-part progress into a single 0-100 percentage.
  const onProgress = (partId: number, loadedBytes: number) => {
    if (!options?.progressCallback) return;
    partsUploadedBytes[partId] = loadedBytes;
    const currentTotalLoadedBytes = Object.values(partsUploadedBytes).reduce((a, p) => a + p, 0);
    const reportedProgress = Math.round((currentTotalLoadedBytes / size) * 100);
    options.progressCallback(reportedProgress);
  };

  // Invoked by the SDK with the file key/iv; wires up the encrypt -> hash
  // pipeline that the upload loop below consumes chunk by chunk.
  const encryptFile: EncryptFileFunction = async (_, key, iv) => {
    const encryptionCipher = this.cryptoService.getEncryptionTransform(
      Buffer.from(key as ArrayBuffer),
      Buffer.from(iv as ArrayBuffer),
    );
    const streamInParts = this.cryptoService.encryptStreamInParts(from, encryptionCipher, size, options.parts);
    encryptionTransform = streamInParts.pipe(hashStream);
  };

  // Invoked by the SDK with one pre-signed URL per part; uploads each
  // encrypted chunk to its URL with bounded concurrency.
  const uploadFileMultipart: UploadFileMultipartFunction = async (urls: string[]) => {
    let partIndex = 0;
    const limitConcurrency = 6;

    // Uploads one chunk and records its ETag (PartNumber is 1-based per S3
    // multipart convention — presumably; confirm against the storage API).
    const uploadPart = async (upload: UploadTask) => {
      const { etag } = await this.uploadService.uploadFile(upload.urlToUpload, upload.contentToUpload, {
        abortController: abortable,
        progressCallback: (loadedBytes: number) => {
          onProgress(upload.index, loadedBytes);
        },
      });

      fileParts.push({
        ETag: etag,
        PartNumber: upload.index + 1,
      });
    };

    const uploadQueue: QueueObject<UploadTask> = queue<UploadTask>(function (task, callback) {
      uploadPart(task)
        .then(() => {
          callback();
        })
        .catch((e) => {
          callback(e);
        });
    }, limitConcurrency);

    // Consume encrypted chunks sequentially; each becomes one part upload.
    for await (const chunk of encryptionTransform) {
      const part: Buffer = chunk;

      // Apply backpressure: stop reading new chunks while all workers busy.
      if (uploadQueue.running() === limitConcurrency) {
        await uploadQueue.unsaturated();
      }

      if (abortable.signal.aborted) {
        throw new Error('Upload cancelled by user');
      }

      // NOTE(review): this flag is re-declared every iteration, so it only
      // de-duplicates errors within a single push's catch — confirm whether a
      // loop-wide guard was intended.
      let errorAlreadyThrown = false;

      // Fire-and-forget push; a failed part kills the queue and aborts the
      // whole upload so the loop's abort check above terminates it.
      uploadQueue
        .pushAsync({
          contentToUpload: part,
          urlToUpload: urls[partIndex],
          index: partIndex++,
        })
        .catch((err: Error) => {
          if (errorAlreadyThrown) return;

          errorAlreadyThrown = true;
          if (err) {
            uploadQueue.kill();
            if (!abortable?.signal.aborted) {
              abortable.abort();
            }
          }
        });
    }

    // Wait for in-flight part uploads to finish; the condition guards against
    // awaiting drain() on an already-idle queue, which would never resolve.
    while (uploadQueue.running() > 0 || uploadQueue.length() > 0) {
      await uploadQueue.drain();
    }

    hash = hashStream.getHash();
    // Parts must be reported in ascending PartNumber order regardless of the
    // order in which their uploads completed.
    const compareParts = (pA: Part, pB: Part) => pA.PartNumber - pB.PartNumber;
    const sortedParts = fileParts.sort(compareParts);
    return {
      hash: hash.toString('hex'),
      parts: sortedParts,
    };
  };

  // Runs the SDK multipart flow; by the time it resolves, uploadFileMultipart
  // has executed, so `hash` is set.
  const uploadOperation = async () => {
    const uploadResult = await NetworkUpload.uploadMultipartFile(
      this.network,
      this.cryptoLib,
      bucketId,
      mnemonic,
      size,
      encryptFile,
      uploadFileMultipart,
      options.parts,
    );

    return {
      fileId: uploadResult,
      hash: hash,
    };
  };

  // The upload starts immediately; callers await the promise and may cancel
  // via the returned AbortController.
  return [uploadOperation(), abortable];
}
}
2 changes: 1 addition & 1 deletion src/services/network/upload.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { UploadOptions } from '../../types/network.types';
export class UploadService {
public static readonly instance: UploadService = new UploadService();

async uploadFile(url: string, from: Readable, options: UploadOptions): Promise<{ etag: string }> {
async uploadFile(url: string, from: Readable | Buffer, options: UploadOptions): Promise<{ etag: string }> {
const response = await axios.put(url, from, {
signal: options.abortController?.signal,
onUploadProgress: (progressEvent) => {
Expand Down
10 changes: 10 additions & 0 deletions src/types/network.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ export interface SelfsignedCert {
cert: string | Buffer;
key: string | Buffer;
}

/**
 * One multipart chunk queued for upload to its pre-signed URL.
 */
export interface UploadTask {
  // Encrypted bytes of this part.
  contentToUpload: Buffer;
  // Pre-signed URL this part must be PUT to.
  urlToUpload: string;
  // Zero-based position of the part within the file.
  index: number;
}

/**
 * Options for a multipart upload; extends the single-shot upload options
 * with the number of parts the file is split into.
 */
export interface UploadMultipartOptions extends UploadOptions {
  // Total number of parts (one pre-signed URL is requested per part).
  parts: number;
}
44 changes: 43 additions & 1 deletion src/utils/stream.utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ReadStream, WriteStream } from 'node:fs';
import { Transform, TransformCallback } from 'node:stream';
import { Readable, Transform, TransformCallback } from 'node:stream';

export class StreamUtils {
static readStreamToReadableStream(readStream: ReadStream): ReadableStream<Uint8Array> {
Expand Down Expand Up @@ -64,6 +64,48 @@ export class StreamUtils {

return stream;
}

/**
* Given a readable stream, it enqueues its parts into chunks as it is being read
* @param readable Readable stream
* @param chunkSize The chunkSize in bytes that we want each chunk to be
* @returns A readable stream whose output is chunks of size chunkSize
*/
/**
 * Re-chunks a readable stream into pieces of exactly chunkSize bytes
 * (the final piece may be smaller).
 *
 * @param readable Source readable stream
 * @param chunkSize Desired size in bytes of each emitted chunk
 * @returns A readable stream emitting chunkSize-byte chunks
 */
static streamReadableIntoChunks(readable: Readable, chunkSize: number): Readable {
  // Carry-over bytes that have not yet filled a whole chunk.
  let buffer = Buffer.alloc(0);

  const outputStream = new Readable({
    read() {
      // Consumer is ready for more data: resume the source if we paused it
      // below to honor backpressure.
      readable.resume();
    },
  });

  readable.on('data', (chunk: Buffer) => {
    buffer = Buffer.concat([buffer, chunk]);

    let acceptingMore = true;
    while (buffer.length >= chunkSize) {
      acceptingMore = outputStream.push(buffer.subarray(0, chunkSize));
      buffer = buffer.subarray(chunkSize);
    }

    // push() returning false means the output's internal buffer is full;
    // pause the source so memory use stays bounded instead of buffering the
    // whole input (read() above resumes it on demand).
    if (!acceptingMore) {
      readable.pause();
    }
  });

  readable.on('end', () => {
    // Flush any remaining partial chunk before signalling end of stream.
    if (buffer.length > 0) {
      outputStream.push(buffer);
    }
    outputStream.push(null);
  });

  readable.on('error', (err) => {
    // Propagate source failures so consumers are not left hanging.
    outputStream.destroy(err);
  });

  return outputStream;
}
}

export class ProgressTransform extends Transform {
Expand Down
Loading

0 comments on commit 1937592

Please sign in to comment.