From fc71332154733f1c5137fecbd8d7b6edaff3bc0d Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:17:16 +0800 Subject: [PATCH 01/12] wip --- src/internal/as-callback.ts | 32 +++++ src/internal/client.ts | 85 +++++++++++++- src/internal/type.ts | 24 ++++ src/internal/xml-parser.ts | 62 +++++++++- src/minio.d.ts | 31 +---- src/minio.js | 225 ++---------------------------------- src/type.ts | 0 7 files changed, 208 insertions(+), 251 deletions(-) create mode 100644 src/internal/as-callback.ts create mode 100644 src/type.ts diff --git a/src/internal/as-callback.ts b/src/internal/as-callback.ts new file mode 100644 index 00000000..e08337e1 --- /dev/null +++ b/src/internal/as-callback.ts @@ -0,0 +1,32 @@ +import { isFunction } from './helper.ts' + +export function asCallback( + cb: undefined | ((err: unknown | null, result: T) => void), + promise: Promise, +): Promise | void { + if (cb === undefined) { + return promise + } + + if (!isFunction(cb)) { + throw new TypeError(`callback should be of type "function", got ${cb}`) + } + + promise.then( + (result) => { + cb(null, result) + }, + (err) => { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + cb(err) + }, + ) +} + +export function asCallbackFn( + cb: undefined | ((err: unknown | null, result: T) => void), + asyncFn: () => Promise, +): Promise | void { + return asCallback(cb, asyncFn()) +} diff --git a/src/internal/client.ts b/src/internal/client.ts index 16bc5030..9ce410a8 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -1,7 +1,8 @@ import * as http from 'node:http' import * as https from 'node:https' -import type * as stream from 'node:stream' +import * as stream from 'node:stream' +import async from 'async' import { isBrowser } from 'browser-or-node' import _ from 'lodash' @@ -21,17 +22,28 @@ import { isValidBucketName, isValidEndpoint, isValidPort, + isValidPrefix, isVirtualHostStyle, makeDateLong, toSha256, - uriResourceEscape, + uriEscape, + uriResourceEscape, isValidObjectName, isFunction, pipesetup, } from './helper.ts' import { request } from './request.ts' import { drainResponse, readAsString } from './response.ts' import type { Region } from './s3-endpoints.ts' import { getS3Endpoint } from './s3-endpoints.ts' -import type { Binary, IRequest, RequestHeaders, Transport } from './type.ts' +import type { + Binary, + BucketStream, + IncompleteUploadedBucketItem, + IRequest, + RequestHeaders, + Transport, +} from './type.ts' +import type { Multipart } from './xml-parser.ts' import * as xmlParsers from './xml-parser.ts' +import { parseListParts } from './xml-parser.ts' // will be replaced by bundler. const Package = { version: process.env.MINIO_JS_PACKAGE_VERSION || 'development' } @@ -740,4 +752,71 @@ export class TypedClient { (err) => cb(err), ) } + + + // Get part-info of all parts of an incomplete upload specified by uploadId. + listParts(bucketName, objectName, uploadId, cb) { + if (!isValidBucketName(bucketName)) { + throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) + } + if (!isValidObjectName(objectName)) { + throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`) + } + if (!isString(uploadId)) { + throw new TypeError('uploadId should be of type "string"') + } + if (!uploadId) { + throw new errors.InvalidArgumentError('uploadId cannot be empty') + } + + const exec=async ()=>{}; + + exec().then() + + var parts = [] + var listNext = (marker) => { + this.listPartsQuery(bucketName, objectName, uploadId, marker, (e, result) => { + if (e) { + cb(e) + return + } + parts = parts.concat(result.parts) + if (result.isTruncated) { + listNext(result.marker) + return + } + cb(null, parts) + }) + } + listNext(0) + } + + // Called by listParts to fetch a batch of part-info + async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker: string) { + if (!isValidBucketName(bucketName)) { + throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) + } + if (!isValidObjectName(objectName)) { + throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`) + } + if (!isString(uploadId)) { + throw new TypeError('uploadId should be of type "string"') + } + if (!isNumber(marker)) { + throw new TypeError('marker should be of type "number"') + } + if (!uploadId) { + throw new errors.InvalidArgumentError('uploadId cannot be empty') + } + + let query = '' + if (marker && marker !== 0) { + query += `part-number-marker=${marker}&` + } + query += `uploadId=${uriEscape(uploadId)}` + + const method = 'GET' + const res = await this.makeRequestAsync({ method, bucketName, objectName, query }) + return xmlParsers.parseListParts(await readAsString(res)) + } } diff --git a/src/internal/type.ts b/src/internal/type.ts index 130df6dc..6d38ed8c 100644 --- a/src/internal/type.ts +++ b/src/internal/type.ts @@ -1,4 +1,5 @@ import type * as http from 'node:http' +import { Readable as ReadableStream } from 'stream' export type Binary = string | Buffer @@ -56,3 +57,26 @@ export interface IRequest { } export type ICanonicalRequest = string + +export interface UploadedObjectInfo { + etag: string + versionId: string | null +} + +export interface IncompleteUploadedBucketItem { + key: string + uploadId: string + size: number +} + +export interface BucketStream extends ReadableStream { + on(event: 'data', listener: (item: T) => void): this + + on(event: 'end' | 'pause' | 'readable' | 'resume' | 'close', listener: () => void): this + + on(event: 'error', listener: (err: Error) => void): this + + on(event: string | symbol, listener: (...args: any[]) => void): this +} + +export type ResultCallback = (error: Error | null, result: T) => void diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index ca77830a..3a64706e 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -3,7 +3,7 @@ import type * as http from 'node:http' import { XMLParser } from 'fast-xml-parser' import * as errors from '../errors.ts' -import { parseXml } from './helper.ts' +import { parseXml, sanitizeObjectKey, toArray } from './helper.ts' import { readAsString } from './response.ts' // parse XML response for bucket region @@ -85,3 +85,63 @@ export async function parseResponseError(response: http.IncomingMessage) { throw e } + +export type Multipart = { + uploads: Array<{ + key: string + uploadId: string + initiator: unknown + owner: unknown + storageClass: unknown + initiated: unknown + }> + prefixes: { prefix: string }[] + isTruncated: boolean + nextKeyMarker: undefined + nextUploadIdMarker: undefined +} + + +export type UploadedPart = { + part: number + lastModified?: Date + etag: string +} + +// parse XML response for list parts of an in progress multipart upload +export function parseListParts(xml: string): { + isTruncated: boolean; + marker: number | undefined; + parts: UploadedPart[] +} { + let xmlobj = parseXml(xml) + const result: { isTruncated: boolean; marker: number | undefined; parts: UploadedPart[] } = { + isTruncated: false, + parts: [], + marker: undefined as number | undefined, + } + if (!xmlobj.ListPartsResult) { + throw new errors.InvalidXMLError('Missing tag: "ListPartsResult"') + } + xmlobj = xmlobj.ListPartsResult + if (xmlobj.IsTruncated) { + result.isTruncated = xmlobj.IsTruncated + } + if (xmlobj.NextPartNumberMarker) { + result.marker = toArray(xmlobj.NextPartNumberMarker)[0] + } + if (xmlobj.Part) { + toArray(xmlobj.Part).forEach((p) => { + const part = +toArray(p.PartNumber)[0] + const lastModified = new Date(p.LastModified) + const etag = p.ETag.replace(/^"/g, '') + .replace(/"$/g, '') + .replace(/^"/g, '') + .replace(/"$/g, '') + .replace(/^"/g, '') + .replace(/"$/g, '') + result.parts.push({ part, lastModified, etag }) + }) + } + return result +} diff --git a/src/minio.d.ts b/src/minio.d.ts index 37bf6168..9a1f738f 100644 --- a/src/minio.d.ts +++ b/src/minio.d.ts @@ -16,11 +16,12 @@ import { TypedClient } from './internal/client.ts' import { CopyConditions } from './internal/copy-conditions.ts' import { PostPolicy } from './internal/post-policy.ts' import type { Region } from './internal/s3-endpoints.ts' +import type { BucketStream, IncompleteUploadedBucketItem, ResultCallback,UploadedObjectInfo } from './internal/type.ts' export * from './helpers.ts' export type { Region } from './internal/s3-endpoints.ts' export { CopyConditions, PostPolicy } -export type { ClientOptions } +export type { BucketStream,ClientOptions, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } // Exports only from typings export type NotificationEvent = @@ -58,7 +59,6 @@ export type LockUnit = RETENTION_VALIDITY_UNITS export type LegalHoldStatus = LEGAL_HOLD_STATUS export type NoResultCallback = (error: Error | null) => void -export type ResultCallback = (error: Error | null, result: T) => void export type VersioningConfig = Record export type TagList = Record export type EmptyObject = Record @@ -98,22 +98,6 @@ export interface BucketItemStat { metaData: ItemBucketMetadata } -export interface IncompleteUploadedBucketItem { - key: string - uploadId: string - size: number -} - -export interface BucketStream extends ReadableStream { - on(event: 'data', listener: (item: T) => void): this - - on(event: 'end' | 'pause' | 'readable' | 'resume' | 'close', listener: () => void): this - - on(event: 'error', listener: (err: Error) => void): this - - on(event: string | symbol, listener: (...args: any[]) => void): this -} - export interface PostPolicyResult { postURL: string formData: { @@ -134,11 +118,6 @@ export interface ItemBucketMetadata { [key: string]: any } -export interface UploadedObjectInfo { - etag: string - versionId: string | null -} - export interface Tag { Key: string Value: string @@ -275,12 +254,6 @@ export class Client extends TypedClient { listObjectsV2(bucketName: string, prefix?: string, recursive?: boolean, startAfter?: string): BucketStream - listIncompleteUploads( - bucketName: string, - prefix?: string, - recursive?: boolean, - ): BucketStream - getBucketVersioning(bucketName: string, callback: ResultCallback): void getBucketVersioning(bucketName: string): Promise diff --git a/src/minio.js b/src/minio.js index 31bcfa23..c367de27 100644 --- a/src/minio.js +++ b/src/minio.js @@ -110,7 +110,7 @@ export class Client extends TypedClient { return this.partSize } var partSize = this.partSize - for (;;) { + for (; ;) { // while(true) {...} throws linting error. // If partSize is big enough to accomodate the object size, then use it. if (partSize * 10000 > size) { @@ -598,153 +598,12 @@ export class Client extends TypedClient { // Inserts correct `content-type` attribute based on metaData and filePath metaData = insertContentType(metaData, filePath) - // Updates metaData to have the correct prefix if needed - metaData = prependXAMZMeta(metaData) - var size - var partSize - - async.waterfall( - [ - (cb) => fs.stat(filePath, cb), - (stats, cb) => { - size = stats.size - var stream - var cbTriggered = false - var origCb = cb - cb = function () { - if (cbTriggered) { - return - } - cbTriggered = true - if (stream) { - stream.destroy() - } - return origCb.apply(this, arguments) - } - if (size > this.maxObjectSize) { - return cb(new Error(`${filePath} size : ${stats.size}, max allowed size : 5TB`)) - } - if (size <= this.partSize) { - // simple PUT request, no multipart - var multipart = false - var uploader = this.getUploader(bucketName, objectName, metaData, multipart) - var hash = transformers.getHashSummer(this.enableSHA256) - var start = 0 - var end = size - 1 - var autoClose = true - if (size === 0) { - end = 0 - } - var options = { start, end, autoClose } - pipesetup(fs.createReadStream(filePath, options), hash) - .on('data', (data) => { - var md5sum = data.md5sum - var sha256sum = data.sha256sum - stream = fs.createReadStream(filePath, options) - uploader(stream, size, sha256sum, md5sum, (err, objInfo) => { - callback(err, objInfo) - cb(true) - }) - }) - .on('error', (e) => cb(e)) - return - } - this.findUploadId(bucketName, objectName, cb) - }, - (uploadId, cb) => { - // if there was a previous incomplete upload, fetch all its uploaded parts info - if (uploadId) { - return this.listParts(bucketName, objectName, uploadId, (e, etags) => cb(e, uploadId, etags)) - } - // there was no previous upload, initiate a new one - this.initiateNewMultipartUpload(bucketName, objectName, metaData, (e, uploadId) => cb(e, uploadId, [])) - }, - (uploadId, etags, cb) => { - partSize = this.calculatePartSize(size) - var multipart = true - var uploader = this.getUploader(bucketName, objectName, metaData, multipart) - - // convert array to object to make things easy - var parts = etags.reduce(function (acc, item) { - if (!acc[item.part]) { - acc[item.part] = item - } - return acc - }, {}) - var partsDone = [] - var partNumber = 1 - var uploadedSize = 0 - async.whilst( - (cb) => { - cb(null, uploadedSize < size) - }, - (cb) => { - var stream - var cbTriggered = false - var origCb = cb - cb = function () { - if (cbTriggered) { - return - } - cbTriggered = true - if (stream) { - stream.destroy() - } - return origCb.apply(this, arguments) - } - var part = parts[partNumber] - var hash = transformers.getHashSummer(this.enableSHA256) - var length = partSize - if (length > size - uploadedSize) { - length = size - uploadedSize - } - var start = uploadedSize - var end = uploadedSize + length - 1 - var autoClose = true - var options = { autoClose, start, end } - // verify md5sum of each part - pipesetup(fs.createReadStream(filePath, options), hash) - .on('data', (data) => { - var md5sumHex = Buffer.from(data.md5sum, 'base64').toString('hex') - if (part && md5sumHex === part.etag) { - // md5 matches, chunk already uploaded - partsDone.push({ part: partNumber, etag: part.etag }) - partNumber++ - uploadedSize += length - return cb() - } - // part is not uploaded yet, or md5 mismatch - stream = fs.createReadStream(filePath, options) - uploader(uploadId, partNumber, stream, length, data.sha256sum, data.md5sum, (e, objInfo) => { - if (e) { - return cb(e) - } - partsDone.push({ part: partNumber, etag: objInfo.etag }) - partNumber++ - uploadedSize += length - return cb() - }) - }) - .on('error', (e) => cb(e)) - }, - (e) => { - if (e) { - return cb(e) - } - cb(null, partsDone, uploadId) - }, - ) - }, - // all parts uploaded, complete the multipart upload - (etags, uploadId, cb) => this.completeMultipartUpload(bucketName, objectName, uploadId, etags, cb), - ], - (err, ...rest) => { - if (err === true) { - return - } - callback(err, ...rest) - }, - ) + fs.lstat(filePath, (err, stat) => { + if (err) { + return callback(err) + } + return this.putObject(bucketName, objectName, fs.createReadStream(filePath), stat.size, metaData, callback) + }) } // Uploads the object. @@ -1771,76 +1630,6 @@ export class Client extends TypedClient { }) } - // Get part-info of all parts of an incomplete upload specified by uploadId. - listParts(bucketName, objectName, uploadId, cb) { - if (!isValidBucketName(bucketName)) { - throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) - } - if (!isValidObjectName(objectName)) { - throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`) - } - if (!isString(uploadId)) { - throw new TypeError('uploadId should be of type "string"') - } - if (!uploadId) { - throw new errors.InvalidArgumentError('uploadId cannot be empty') - } - var parts = [] - var listNext = (marker) => { - this.listPartsQuery(bucketName, objectName, uploadId, marker, (e, result) => { - if (e) { - cb(e) - return - } - parts = parts.concat(result.parts) - if (result.isTruncated) { - listNext(result.marker) - return - } - cb(null, parts) - }) - } - listNext(0) - } - - // Called by listParts to fetch a batch of part-info - listPartsQuery(bucketName, objectName, uploadId, marker, cb) { - if (!isValidBucketName(bucketName)) { - throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) - } - if (!isValidObjectName(objectName)) { - throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`) - } - if (!isString(uploadId)) { - throw new TypeError('uploadId should be of type "string"') - } - if (!isNumber(marker)) { - throw new TypeError('marker should be of type "number"') - } - if (!isFunction(cb)) { - throw new TypeError('callback should be of type "function"') - } - if (!uploadId) { - throw new errors.InvalidArgumentError('uploadId cannot be empty') - } - var query = '' - if (marker && marker !== 0) { - query += `part-number-marker=${marker}&` - } - query += `uploadId=${uriEscape(uploadId)}` - - var method = 'GET' - this.makeRequest({ method, bucketName, objectName, query }, '', [200], '', true, (e, response) => { - if (e) { - return cb(e) - } - var transformer = transformers.getListPartsTransformer() - pipesetup(response, transformer) - .on('error', (e) => cb(e)) - .on('data', (data) => cb(null, data)) - }) - } - // Called by listIncompleteUploads to fetch a batch of incomplete uploads. listIncompleteUploadsQuery(bucketName, prefix, keyMarker, uploadIdMarker, delimiter) { if (!isValidBucketName(bucketName)) { diff --git a/src/type.ts b/src/type.ts new file mode 100644 index 00000000..e69de29b From 8a04a812609f1c31aeda73d26ceed5d16f2ecf28 Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:32:11 +0800 Subject: [PATCH 02/12] refactor listParts --- src/internal/client.ts | 54 ++++++++++++-------------------------- src/internal/type.ts | 4 +-- src/internal/xml-parser.ts | 7 +++-- src/minio.d.ts | 4 +-- src/minio.js | 9 +++---- src/object-uploader.js | 37 +++++++++++++------------- 6 files changed, 46 insertions(+), 69 deletions(-) diff --git a/src/internal/client.ts b/src/internal/client.ts index 9ce410a8..cd8f4665 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -1,8 +1,7 @@ import * as http from 'node:http' import * as https from 'node:https' -import * as stream from 'node:stream' +import type * as stream from 'node:stream' -import async from 'async' import { isBrowser } from 'browser-or-node' import _ from 'lodash' @@ -21,29 +20,21 @@ import { isString, isValidBucketName, isValidEndpoint, + isValidObjectName, isValidPort, - isValidPrefix, isVirtualHostStyle, makeDateLong, toSha256, uriEscape, - uriResourceEscape, isValidObjectName, isFunction, pipesetup, + uriResourceEscape, } from './helper.ts' import { request } from './request.ts' import { drainResponse, readAsString } from './response.ts' import type { Region } from './s3-endpoints.ts' import { getS3Endpoint } from './s3-endpoints.ts' -import type { - Binary, - BucketStream, - IncompleteUploadedBucketItem, - IRequest, - RequestHeaders, - Transport, -} from './type.ts' -import type { Multipart } from './xml-parser.ts' +import type { Binary, IRequest, RequestHeaders, Transport } from './type.ts' +import type { UploadedPart } from './xml-parser.ts' import * as xmlParsers from './xml-parser.ts' -import { parseListParts } from './xml-parser.ts' // will be replaced by bundler. const Package = { version: process.env.MINIO_JS_PACKAGE_VERSION || 'development' } @@ -753,9 +744,8 @@ export class TypedClient { ) } - // Get part-info of all parts of an incomplete upload specified by uploadId. - listParts(bucketName, objectName, uploadId, cb) { + async listParts(bucketName: string, objectName: string, uploadId: string): Promise { if (!isValidBucketName(bucketName)) { throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) } @@ -769,30 +759,20 @@ export class TypedClient { throw new errors.InvalidArgumentError('uploadId cannot be empty') } - const exec=async ()=>{}; - - exec().then() + const parts: UploadedPart[] = [] + let marker: number | undefined = undefined + let result + do { + result = await this.listPartsQuery(bucketName, objectName, uploadId, marker) + marker = result.marker + parts.push(...result.parts) + } while (result.isTruncated) - var parts = [] - var listNext = (marker) => { - this.listPartsQuery(bucketName, objectName, uploadId, marker, (e, result) => { - if (e) { - cb(e) - return - } - parts = parts.concat(result.parts) - if (result.isTruncated) { - listNext(result.marker) - return - } - cb(null, parts) - }) - } - listNext(0) + return parts } // Called by listParts to fetch a batch of part-info - async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker: string) { + async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker?: number) { if (!isValidBucketName(bucketName)) { throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) } @@ -810,7 +790,7 @@ export class TypedClient { } let query = '' - if (marker && marker !== 0) { + if (marker) { query += `part-number-marker=${marker}&` } query += `uploadId=${uriEscape(uploadId)}` diff --git a/src/internal/type.ts b/src/internal/type.ts index 6d38ed8c..aa5b5617 100644 --- a/src/internal/type.ts +++ b/src/internal/type.ts @@ -1,5 +1,5 @@ import type * as http from 'node:http' -import { Readable as ReadableStream } from 'stream' +import type { Readable as ReadableStream } from 'node:stream' export type Binary = string | Buffer @@ -79,4 +79,4 @@ export interface BucketStream extends ReadableStream { on(event: string | symbol, listener: (...args: any[]) => void): this } -export type ResultCallback = (error: Error | null, result: T) => void +export type ResultCallback = (error: unknown, result: T) => void diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index 3a64706e..cbda21b6 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -3,7 +3,7 @@ import type * as http from 'node:http' import { XMLParser } from 'fast-xml-parser' import * as errors from '../errors.ts' -import { parseXml, sanitizeObjectKey, toArray } from './helper.ts' +import { parseXml, toArray } from './helper.ts' import { readAsString } from './response.ts' // parse XML response for bucket region @@ -101,7 +101,6 @@ export type Multipart = { nextUploadIdMarker: undefined } - export type UploadedPart = { part: number lastModified?: Date @@ -110,8 +109,8 @@ export type UploadedPart = { // parse XML response for list parts of an in progress multipart upload export function parseListParts(xml: string): { - isTruncated: boolean; - marker: number | undefined; + isTruncated: boolean + marker: number | undefined parts: UploadedPart[] } { let xmlobj = parseXml(xml) diff --git a/src/minio.d.ts b/src/minio.d.ts index 9a1f738f..1e39f078 100644 --- a/src/minio.d.ts +++ b/src/minio.d.ts @@ -16,12 +16,12 @@ import { TypedClient } from './internal/client.ts' import { CopyConditions } from './internal/copy-conditions.ts' import { PostPolicy } from './internal/post-policy.ts' import type { Region } from './internal/s3-endpoints.ts' -import type { BucketStream, IncompleteUploadedBucketItem, ResultCallback,UploadedObjectInfo } from './internal/type.ts' +import type { BucketStream, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } from './internal/type.ts' export * from './helpers.ts' export type { Region } from './internal/s3-endpoints.ts' export { CopyConditions, PostPolicy } -export type { BucketStream,ClientOptions, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } +export type { BucketStream, ClientOptions, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } // Exports only from typings export type NotificationEvent = diff --git a/src/minio.js b/src/minio.js index c367de27..c4b6222a 100644 --- a/src/minio.js +++ b/src/minio.js @@ -110,7 +110,7 @@ export class Client extends TypedClient { return this.partSize } var partSize = this.partSize - for (; ;) { + for (;;) { // while(true) {...} throws linting error. // If partSize is big enough to accomodate the object size, then use it. if (partSize * 10000 > size) { @@ -285,14 +285,11 @@ export class Client extends TypedClient { result.uploads, (upload, cb) => { // for each incomplete upload add the sizes of its uploaded parts - this.listParts(bucket, upload.key, upload.uploadId, (err, parts) => { - if (err) { - return cb(err) - } + this.listParts(bucket, upload.key, upload.uploadId).then((parts) => { upload.size = parts.reduce((acc, item) => acc + item.size, 0) uploads.push(upload) cb() - }) + }, cb) }, (err) => { if (err) { diff --git a/src/object-uploader.js b/src/object-uploader.js index b14f75c1..aa7d238e 100644 --- a/src/object-uploader.js +++ b/src/object-uploader.js @@ -142,26 +142,27 @@ export class ObjectUploader extends Transform { this.id = id // Retrieve the pre-uploaded parts, if we need to resume the upload. - this.client.listParts(this.bucketName, this.objectName, id, (err, etags) => { - if (err) { - return this.emit('error', err) - } - - // It is possible for no parts to be already uploaded. - if (!etags) { - etags = [] - } - - // oldParts will become an object, allowing oldParts[partNumber].etag - this.oldParts = etags.reduce(function (prev, item) { - if (!prev[item.part]) { - prev[item.part] = item + this.client.listParts(this.bucketName, this.objectName, id).then( + (etags) => { + // It is possible for no parts to be already uploaded. + if (!etags) { + etags = [] } - return prev - }, {}) - this.emit('ready') - }) + // oldParts will become an object, allowing oldParts[partNumber].etag + this.oldParts = etags.reduce(function (prev, item) { + if (!prev[item.part]) { + prev[item.part] = item + } + return prev + }, {}) + + this.emit('ready') + }, + (err) => { + return this.emit('error', err) + }, + ) }) return From a9c6962d4bd2905e46368425a54daa3bfc19df7c Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:32:45 +0800 Subject: [PATCH 03/12] remove dead code --- src/transformers.js | 5 ----- src/xml-parsers.js | 34 ---------------------------------- 2 files changed, 39 deletions(-) diff --git a/src/transformers.js b/src/transformers.js index fdc77ec1..044f5779 100644 --- a/src/transformers.js +++ b/src/transformers.js @@ -111,11 +111,6 @@ export function getListMultipartTransformer() { return getConcater(xmlParsers.parseListMultipart) } -// Parses listParts response. -export function getListPartsTransformer() { - return getConcater(xmlParsers.parseListParts) -} - // Parses initMultipartUpload response. export function getInitiateMultipartTransformer() { return getConcater(xmlParsers.parseInitiateMultipart) diff --git a/src/xml-parsers.js b/src/xml-parsers.js index 30095683..3420bcba 100644 --- a/src/xml-parsers.js +++ b/src/xml-parsers.js @@ -187,40 +187,6 @@ export function parseBucketNotification(xml) { return result } -// parse XML response for list parts of an in progress multipart upload -export function parseListParts(xml) { - var xmlobj = parseXml(xml) - var result = { - isTruncated: false, - parts: [], - marker: undefined, - } - if (!xmlobj.ListPartsResult) { - throw new errors.InvalidXMLError('Missing tag: "ListPartsResult"') - } - xmlobj = xmlobj.ListPartsResult - if (xmlobj.IsTruncated) { - result.isTruncated = xmlobj.IsTruncated - } - if (xmlobj.NextPartNumberMarker) { - result.marker = +toArray(xmlobj.NextPartNumberMarker)[0] - } - if (xmlobj.Part) { - toArray(xmlobj.Part).forEach((p) => { - var part = +toArray(p.PartNumber)[0] - var lastModified = new Date(p.LastModified) - var etag = p.ETag.replace(/^"/g, '') - .replace(/"$/g, '') - .replace(/^"/g, '') - .replace(/"$/g, '') - .replace(/^"/g, '') - .replace(/"$/g, '') - result.parts.push({ part, lastModified, etag }) - }) - } - return result -} - // parse XML response when a new multipart upload is initiated export function parseInitiateMultipart(xml) { var xmlobj = parseXml(xml) From ea2d6fef56e9f5a31323223c7197c0faca383237 Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:33:47 +0800 Subject: [PATCH 04/12] protected --- src/internal/client.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/internal/client.ts b/src/internal/client.ts index cd8f4665..c0cca0d0 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -744,8 +744,10 @@ export class TypedClient { ) } - // Get part-info of all parts of an incomplete upload specified by uploadId. - async listParts(bucketName: string, objectName: string, uploadId: string): Promise { + /** + * Get part-info of all parts of an incomplete upload specified by uploadId. + */ + protected async listParts(bucketName: string, objectName: string, uploadId: string): Promise { if (!isValidBucketName(bucketName)) { throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) } @@ -771,8 +773,10 @@ export class TypedClient { return parts } - // Called by listParts to fetch a batch of part-info - async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker?: number) { + /** + * Called by listParts to fetch a batch of part-info + */ + private async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker?: number) { if (!isValidBucketName(bucketName)) { throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) } From 5cae5125722d9e31e1dbe25c610999c7a8ad70fb Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:34:32 +0800 Subject: [PATCH 05/12] remove dead code --- src/internal/as-callback.ts | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 src/internal/as-callback.ts diff --git a/src/internal/as-callback.ts b/src/internal/as-callback.ts deleted file mode 100644 index e08337e1..00000000 --- a/src/internal/as-callback.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { isFunction } from './helper.ts' - -export function asCallback( - cb: undefined | ((err: unknown | null, result: T) => void), - promise: Promise, -): Promise | void { - if (cb === undefined) { - return promise - } - - if (!isFunction(cb)) { - throw new TypeError(`callback should be of type "function", got ${cb}`) - } - - promise.then( - (result) => { - cb(null, result) - }, - (err) => { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - cb(err) - }, - ) -} - -export function asCallbackFn( - cb: undefined | ((err: unknown | null, result: T) => void), - asyncFn: () => Promise, -): Promise | void { - return asCallback(cb, asyncFn()) -} From 5e469816ef029d1dc11ece325f8d4ad1a9814f65 Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:41:55 +0800 Subject: [PATCH 06/12] fix test --- src/internal/client.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/internal/client.ts b/src/internal/client.ts index c0cca0d0..cf9556a6 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -762,11 +762,11 @@ export class TypedClient { } const parts: UploadedPart[] = [] - let marker: number | undefined = undefined + let marker = 0 let result do { result = await this.listPartsQuery(bucketName, objectName, uploadId, marker) - marker = result.marker + marker = result.marker as number parts.push(...result.parts) } while (result.isTruncated) @@ -776,7 +776,7 @@ export class TypedClient { /** * Called by listParts to fetch a batch of part-info */ - private async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker?: number) { + private async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker: number) { if (!isValidBucketName(bucketName)) { throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName) } @@ -793,11 +793,10 @@ export class TypedClient { throw new errors.InvalidArgumentError('uploadId cannot be empty') } - let query = '' + let query = `uploadId=${uriEscape(uploadId)}` if (marker) { - query += `part-number-marker=${marker}&` + query += `&part-number-marker=${marker}` } - query += `uploadId=${uriEscape(uploadId)}` const method = 'GET' const res = await this.makeRequestAsync({ method, bucketName, objectName, query }) From 57478ebcc64cc90afb807bdb393266d7fbde412c Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 03:50:03 +0800 Subject: [PATCH 07/12] types --- src/internal/type.ts | 18 ------------------ src/internal/xml-parser.ts | 3 ++- src/minio.d.ts | 26 ++++++++++++++++++++++++-- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/src/internal/type.ts b/src/internal/type.ts index aa5b5617..05da7797 100644 --- a/src/internal/type.ts +++ b/src/internal/type.ts @@ -1,5 +1,4 @@ import type * as http from 'node:http' -import type { Readable as ReadableStream } from 'node:stream' export type Binary = string | Buffer @@ -58,25 +57,8 @@ export interface IRequest { export type ICanonicalRequest = string -export interface UploadedObjectInfo { - etag: string - versionId: string | null -} - export interface IncompleteUploadedBucketItem { key: string uploadId: string size: number } - -export interface BucketStream extends ReadableStream { - on(event: 'data', listener: (item: T) => void): this - - on(event: 'end' | 'pause' | 'readable' | 'resume' | 'close', listener: () => void): this - - on(event: 'error', listener: (err: Error) => void): this - - on(event: string | symbol, listener: (...args: any[]) => void): this -} - -export type ResultCallback = (error: unknown, result: T) => void diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index cbda21b6..703c5fd8 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -105,6 +105,7 @@ export type UploadedPart = { part: number lastModified?: Date etag: string + size: number } // parse XML response for list parts of an in progress multipart upload @@ -139,7 +140,7 @@ export function parseListParts(xml: string): { .replace(/"$/g, '') .replace(/^"/g, '') .replace(/"$/g, '') - result.parts.push({ part, lastModified, etag }) + result.parts.push({ part, lastModified, etag, size: parseInt(p.Size, 10) }) }) } return result diff --git a/src/minio.d.ts b/src/minio.d.ts index 1e39f078..2838bbab 100644 --- a/src/minio.d.ts +++ b/src/minio.d.ts @@ -16,12 +16,12 @@ import { TypedClient } from './internal/client.ts' import { CopyConditions } from './internal/copy-conditions.ts' import { PostPolicy } from './internal/post-policy.ts' import type { Region } from './internal/s3-endpoints.ts' -import type { BucketStream, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } from './internal/type.ts' +import type { IncompleteUploadedBucketItem } from './internal/type.ts' export * from './helpers.ts' export type { Region } from './internal/s3-endpoints.ts' export { CopyConditions, PostPolicy } -export type { BucketStream, ClientOptions, IncompleteUploadedBucketItem, ResultCallback, UploadedObjectInfo } +export type { ClientOptions, IncompleteUploadedBucketItem } // Exports only from typings export type NotificationEvent = @@ -59,6 +59,7 @@ export type LockUnit = RETENTION_VALIDITY_UNITS export type LegalHoldStatus = LEGAL_HOLD_STATUS export type NoResultCallback = (error: Error | null) => void +export type ResultCallback = (error: Error | null, result: T) => void export type VersioningConfig = Record export type TagList = Record export type EmptyObject = Record @@ -98,6 +99,16 @@ export interface BucketItemStat { metaData: ItemBucketMetadata } +export interface BucketStream extends ReadableStream { + on(event: 'data', listener: (item: T) => void): this + + on(event: 'end' | 'pause' | 'readable' | 'resume' | 'close', listener: () => void): this + + on(event: 'error', listener: (err: Error) => void): this + + on(event: string | symbol, listener: (...args: any[]) => void): this +} + export interface PostPolicyResult { postURL: string formData: { @@ -118,6 +129,11 @@ export interface ItemBucketMetadata { [key: string]: any } +export interface UploadedObjectInfo { + etag: string + versionId: string | null +} + export interface Tag { Key: string Value: string @@ -254,6 +270,12 @@ export class Client extends TypedClient { listObjectsV2(bucketName: string, prefix?: string, recursive?: boolean, startAfter?: string): BucketStream + listIncompleteUploads( + bucketName: string, + prefix?: string, + recursive?: boolean, + ): BucketStream + getBucketVersioning(bucketName: string, callback: ResultCallback): void getBucketVersioning(bucketName: string): Promise From 8dc3117a4ca6fb9dfcedf62f9ef76290a7a5fe5a Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 25 May 2023 04:07:09 +0800 Subject: [PATCH 08/12] marker type --- src/internal/client.ts | 2 +- src/internal/helper.ts | 2 +- src/internal/xml-parser.ts | 6 +++--- src/type.ts | 0 4 files changed, 5 insertions(+), 5 deletions(-) delete mode 100644 src/type.ts diff --git a/src/internal/client.ts b/src/internal/client.ts index cf9556a6..cff8d926 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -766,7 +766,7 @@ export class TypedClient { let result do { result = await this.listPartsQuery(bucketName, objectName, uploadId, marker) - marker = result.marker as number + marker = result.marker parts.push(...result.parts) } while (result.isTruncated) diff --git a/src/internal/helper.ts b/src/internal/helper.ts index de58c485..16758167 100644 --- a/src/internal/helper.ts +++ b/src/internal/helper.ts @@ -571,7 +571,7 @@ export function calculateEvenSplits( return { startIndex: startIndexParts, endIndex: endIndexParts, objInfo: objInfo } } -const fxp = new XMLParser() +const fxp = new XMLParser({}) // eslint-disable-next-line @typescript-eslint/no-explicit-any export function parseXml(xml: string): any { diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index 703c5fd8..1c8d2846 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -111,14 +111,14 @@ export type UploadedPart = { // parse XML response for list parts of an in progress multipart upload export function parseListParts(xml: string): { isTruncated: boolean - marker: number | undefined + marker: number parts: UploadedPart[] } { let xmlobj = parseXml(xml) - const result: { isTruncated: boolean; marker: number | undefined; parts: UploadedPart[] } = { + const result: { isTruncated: boolean; marker: number; parts: UploadedPart[] } = { isTruncated: false, parts: [], - marker: undefined as number | undefined, + marker: 0, } if (!xmlobj.ListPartsResult) { throw new errors.InvalidXMLError('Missing tag: "ListPartsResult"') diff --git a/src/type.ts b/src/type.ts deleted file mode 100644 index e69de29b..00000000 From 0649608200efdbe682104eeafb75141430a39a1d Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 1 Jun 2023 19:02:34 +0800 Subject: [PATCH 09/12] fix s3 region --- src/internal/client.ts | 21 ++++++++++++++------- src/internal/helper.ts | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/internal/client.ts b/src/internal/client.ts index cff8d926..30d773ee 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -80,7 +80,6 @@ export type RequestOption = Partial & { method: string bucketName?: string objectName?: string - region?: string query?: string pathStyle?: boolean } @@ -279,7 +278,9 @@ export class TypedClient { * returns options object that can be used with http.request() * Takes care of constructing virtual-host-style or path-style hostname */ - protected getRequestOptions(opts: RequestOption): IRequest & { host: string; headers: Record } { + protected getRequestOptions( + opts: RequestOption & { region: string }, + ): IRequest & { host: string; headers: Record } { const method = opts.method const region = opts.region const bucketName = opts.bucketName @@ -319,8 +320,7 @@ export class TypedClient { if (accelerateEndPoint) { host = `${accelerateEndPoint}` } else { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - host = getS3Endpoint(region!) + host = getS3Endpoint(region) } } @@ -559,9 +559,9 @@ export class TypedClient { await this.checkAndRefreshCreds() // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - region = region || (await this.getBucketRegionAsync(options.bucketName!)) + const finalRegion = region || (await this.getBucketRegionAsync(options.bucketName!)) - const reqOptions = this.getRequestOptions(options) + const reqOptions = this.getRequestOptions({ ...options, region: finalRegion }) if (!this.anonymous) { // For non-anonymous https requests sha256sum is 'UNSIGNED-PAYLOAD' for signature calculation. if (!this.enableSHA256) { @@ -573,7 +573,14 @@ export class TypedClient { if (this.sessionToken) { reqOptions.headers['x-amz-security-token'] = this.sessionToken } - reqOptions.headers.authorization = signV4(reqOptions, this.accessKey, this.secretKey, region, date, sha256sum) + reqOptions.headers.authorization = signV4( + reqOptions, + this.accessKey, + this.secretKey, + finalRegion, + date, + sha256sum, + ) } const response = await request(this.transport, reqOptions, body) diff --git a/src/internal/helper.ts b/src/internal/helper.ts index 16758167..de58c485 100644 --- a/src/internal/helper.ts +++ b/src/internal/helper.ts @@ -571,7 +571,7 @@ export function calculateEvenSplits( return { startIndex: startIndexParts, endIndex: endIndexParts, objInfo: objInfo } } -const fxp = new XMLParser({}) +const fxp = new XMLParser() // eslint-disable-next-line @typescript-eslint/no-explicit-any export function parseXml(xml: string): any { From 1c41f52b1b19cc0ee3f9dffdcb998311fb4b461c Mon Sep 17 00:00:00 2001 From: Trim21 Date: Thu, 1 Jun 2023 19:05:00 +0800 Subject: [PATCH 10/12] region variable name --- src/internal/client.ts | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/internal/client.ts b/src/internal/client.ts index 30d773ee..0907c698 100644 --- a/src/internal/client.ts +++ b/src/internal/client.ts @@ -559,9 +559,9 @@ export class TypedClient { await this.checkAndRefreshCreds() // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const finalRegion = region || (await this.getBucketRegionAsync(options.bucketName!)) + region = region || (await this.getBucketRegionAsync(options.bucketName!)) - const reqOptions = this.getRequestOptions({ ...options, region: finalRegion }) + const reqOptions = this.getRequestOptions({ ...options, region }) if (!this.anonymous) { // For non-anonymous https requests sha256sum is 'UNSIGNED-PAYLOAD' for signature calculation. if (!this.enableSHA256) { @@ -573,14 +573,7 @@ export class TypedClient { if (this.sessionToken) { reqOptions.headers['x-amz-security-token'] = this.sessionToken } - reqOptions.headers.authorization = signV4( - reqOptions, - this.accessKey, - this.secretKey, - finalRegion, - date, - sha256sum, - ) + reqOptions.headers.authorization = signV4(reqOptions, this.accessKey, this.secretKey, region, date, sha256sum) } const response = await request(this.transport, reqOptions, body) From c775086ce17eb8d92c90753210cf9d3daaf81190 Mon Sep 17 00:00:00 2001 From: Trim21 Date: Sun, 11 Jun 2023 06:25:10 +0800 Subject: [PATCH 11/12] Update src/internal/xml-parser.ts --- src/internal/xml-parser.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index 1c8d2846..ac3ba004 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -128,7 +128,7 @@ export function parseListParts(xml: string): { result.isTruncated = xmlobj.IsTruncated } if (xmlobj.NextPartNumberMarker) { - result.marker = toArray(xmlobj.NextPartNumberMarker)[0] + result.marker = toArray(xmlobj.NextPartNumberMarker)[0] || '' } if (xmlobj.Part) { toArray(xmlobj.Part).forEach((p) => { From 50e74d5329b82e862ec00085263ef09c8bf1fbb3 Mon Sep 17 00:00:00 2001 From: Trim21 Date: Mon, 12 Jun 2023 17:51:08 +0800 Subject: [PATCH 12/12] use parseInt --- src/internal/xml-parser.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal/xml-parser.ts b/src/internal/xml-parser.ts index ac3ba004..1202c121 100644 --- a/src/internal/xml-parser.ts +++ b/src/internal/xml-parser.ts @@ -132,7 +132,7 @@ export function parseListParts(xml: string): { } if (xmlobj.Part) { toArray(xmlobj.Part).forEach((p) => { - const part = +toArray(p.PartNumber)[0] + const part = parseInt(toArray(p.PartNumber)[0], 10) const lastModified = new Date(p.LastModified) const etag = p.ETag.replace(/^"/g, '') .replace(/"$/g, '')