Skip to content

Commit

Permalink
refactor: rewrite listParts in TypeScript (#1160)
Browse files Browse the repository at this point in the history
* wip

* refactor listParts

* remove dead code

* protected

* remove dead code

* fix test

* types

* marker type

* fix s3 region

* region variable name

* Update src/internal/xml-parser.ts

* use parseInt

---------

Co-authored-by: Prakash Senthil Vel <[email protected]>
  • Loading branch information
trim21 and prakashsvmx authored Jun 21, 2023
1 parent 05880d1 commit 582fb42
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 140 deletions.
62 changes: 62 additions & 0 deletions src/internal/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@ import {
isString,
isValidBucketName,
isValidEndpoint,
isValidObjectName,
isValidPort,
isVirtualHostStyle,
makeDateLong,
toSha256,
uriEscape,
uriResourceEscape,
} from './helper.ts'
import { request } from './request.ts'
import { drainResponse, readAsString } from './response.ts'
import type { Region } from './s3-endpoints.ts'
import { getS3Endpoint } from './s3-endpoints.ts'
import type { Binary, IRequest, RequestHeaders, Transport } from './type.ts'
import type { UploadedPart } from './xml-parser.ts'
import * as xmlParsers from './xml-parser.ts'

// will be replaced by bundler.
Expand Down Expand Up @@ -740,4 +743,63 @@ export class TypedClient {
(err) => cb(err),
)
}

/**
* Get part-info of all parts of an incomplete upload specified by uploadId.
*/
/**
 * Get part-info of all parts of an incomplete upload specified by uploadId.
 * Pages through listPartsQuery until the server reports the listing is
 * no longer truncated, accumulating every part.
 */
protected async listParts(bucketName: string, objectName: string, uploadId: string): Promise<UploadedPart[]> {
  if (!isValidBucketName(bucketName)) {
    throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
  }
  if (!isValidObjectName(objectName)) {
    throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`)
  }
  if (!isString(uploadId)) {
    throw new TypeError('uploadId should be of type "string"')
  }
  if (!uploadId) {
    throw new errors.InvalidArgumentError('uploadId cannot be empty')
  }

  const allParts: UploadedPart[] = []
  let nextMarker = 0
  // Request pages until isTruncated is false; each page tells us the
  // marker to resume from.
  while (true) {
    const page = await this.listPartsQuery(bucketName, objectName, uploadId, nextMarker)
    allParts.push(...page.parts)
    if (!page.isTruncated) {
      break
    }
    nextMarker = page.marker
  }

  return allParts
}

/**
* Called by listParts to fetch a batch of part-info
*/
/**
 * Called by listParts to fetch a batch of part-info.
 * `marker` selects the page: 0 requests the first page, any other value
 * is sent as part-number-marker to resume a truncated listing.
 */
private async listPartsQuery(bucketName: string, objectName: string, uploadId: string, marker: number) {
  if (!isValidBucketName(bucketName)) {
    throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
  }
  if (!isValidObjectName(objectName)) {
    throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`)
  }
  if (!isString(uploadId)) {
    throw new TypeError('uploadId should be of type "string"')
  }
  if (!isNumber(marker)) {
    throw new TypeError('marker should be of type "number"')
  }
  if (!uploadId) {
    throw new errors.InvalidArgumentError('uploadId cannot be empty')
  }

  // Build the query string; the marker parameter is only sent when resuming.
  const queryParts = [`uploadId=${uriEscape(uploadId)}`]
  if (marker) {
    queryParts.push(`part-number-marker=${marker}`)
  }
  const query = queryParts.join('&')

  const res = await this.makeRequestAsync({ method: 'GET', bucketName, objectName, query })
  return xmlParsers.parseListParts(await readAsString(res))
}
}
6 changes: 6 additions & 0 deletions src/internal/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ export interface IRequest {
}

export type ICanonicalRequest = string

/**
 * Summary of an in-progress multipart upload, as surfaced by
 * incomplete-upload listings.
 */
export interface IncompleteUploadedBucketItem {
  // Object key (name) the multipart upload targets.
  key: string
  // Server-assigned id of the in-progress multipart upload.
  uploadId: string
  // Total bytes of the parts uploaded so far (sum of per-part sizes).
  size: number
}
62 changes: 61 additions & 1 deletion src/internal/xml-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type * as http from 'node:http'
import { XMLParser } from 'fast-xml-parser'

import * as errors from '../errors.ts'
import { parseXml } from './helper.ts'
import { parseXml, toArray } from './helper.ts'
import { readAsString } from './response.ts'

// parse XML response for bucket region
Expand Down Expand Up @@ -85,3 +85,63 @@ export async function parseResponseError(response: http.IncomingMessage) {

throw e
}

// Parsed shape of a list-multipart-uploads response.
export type Multipart = {
  uploads: Array<{
    key: string
    uploadId: string
    initiator: unknown
    owner: unknown
    storageClass: unknown
    initiated: unknown
  }>
  prefixes: { prefix: string }[]
  isTruncated: boolean
  // NOTE(review): typed as plain `undefined` — presumably these should also
  // admit the continuation-marker strings; confirm against the parser that
  // produces this value.
  nextKeyMarker: undefined
  nextUploadIdMarker: undefined
}

// One entry of a list-parts response (see parseListParts).
export type UploadedPart = {
  part: number // part number
  lastModified?: Date // LastModified timestamp, when present
  etag: string // ETag with surrounding quotes (raw or HTML-escaped) stripped
  size: number // part size in bytes
}

// parse XML response for list parts of an in progress multipart upload
export function parseListParts(xml: string): {
isTruncated: boolean
marker: number
parts: UploadedPart[]
} {
let xmlobj = parseXml(xml)
const result: { isTruncated: boolean; marker: number; parts: UploadedPart[] } = {
isTruncated: false,
parts: [],
marker: 0,
}
if (!xmlobj.ListPartsResult) {
throw new errors.InvalidXMLError('Missing tag: "ListPartsResult"')
}
xmlobj = xmlobj.ListPartsResult
if (xmlobj.IsTruncated) {
result.isTruncated = xmlobj.IsTruncated
}
if (xmlobj.NextPartNumberMarker) {
result.marker = toArray(xmlobj.NextPartNumberMarker)[0] || ''
}
if (xmlobj.Part) {
toArray(xmlobj.Part).forEach((p) => {
const part = parseInt(toArray(p.PartNumber)[0], 10)
const lastModified = new Date(p.LastModified)
const etag = p.ETag.replace(/^"/g, '')
.replace(/"$/g, '')
.replace(/^&quot;/g, '')
.replace(/&quot;$/g, '')
.replace(/^&#34;/g, '')
.replace(/&#34;$/g, '')
result.parts.push({ part, lastModified, etag, size: parseInt(p.Size, 10) })
})
}
return result
}
9 changes: 2 additions & 7 deletions src/minio.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import { TypedClient } from './internal/client.ts'
import { CopyConditions } from './internal/copy-conditions.ts'
import { PostPolicy } from './internal/post-policy.ts'
import type { Region } from './internal/s3-endpoints.ts'
import type { IncompleteUploadedBucketItem } from './internal/type.ts'

export * from './helpers.ts'
export type { Region } from './internal/s3-endpoints.ts'
export { CopyConditions, PostPolicy }
export type { ClientOptions }
export type { ClientOptions, IncompleteUploadedBucketItem }

// Exports only from typings
export type NotificationEvent =
Expand Down Expand Up @@ -98,12 +99,6 @@ export interface BucketItemStat {
metaData: ItemBucketMetadata
}

// Summary of an in-progress multipart upload: target key, server-assigned
// uploadId, and total bytes uploaded so far.
export interface IncompleteUploadedBucketItem {
key: string
uploadId: string
size: number
}

export interface BucketStream<T> extends ReadableStream {
on(event: 'data', listener: (item: T) => void): this

Expand Down
77 changes: 2 additions & 75 deletions src/minio.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,11 @@ export class Client extends TypedClient {
result.uploads,
(upload, cb) => {
// for each incomplete upload add the sizes of its uploaded parts
this.listParts(bucket, upload.key, upload.uploadId, (err, parts) => {
if (err) {
return cb(err)
}
this.listParts(bucket, upload.key, upload.uploadId).then((parts) => {
upload.size = parts.reduce((acc, item) => acc + item.size, 0)
uploads.push(upload)
cb()
})
}, cb)
},
(err) => {
if (err) {
Expand Down Expand Up @@ -1630,76 +1627,6 @@ export class Client extends TypedClient {
})
}

// Get part-info of all parts of an incomplete upload specified by uploadId.
// Callback style: cb(err) on failure, cb(null, parts) with the accumulated
// part list once the server reports the listing is no longer truncated.
listParts(bucketName, objectName, uploadId, cb) {
if (!isValidBucketName(bucketName)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
}
if (!isValidObjectName(objectName)) {
throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`)
}
if (!isString(uploadId)) {
throw new TypeError('uploadId should be of type "string"')
}
if (!uploadId) {
throw new errors.InvalidArgumentError('uploadId cannot be empty')
}
var parts = []
// Recursively fetch pages; `marker` is the part-number-marker to resume from.
var listNext = (marker) => {
this.listPartsQuery(bucketName, objectName, uploadId, marker, (e, result) => {
if (e) {
cb(e)
return
}
parts = parts.concat(result.parts)
if (result.isTruncated) {
// More parts remain — continue from the marker the server returned.
listNext(result.marker)
return
}
cb(null, parts)
})
}
listNext(0)
}

// Called by listParts to fetch a batch of part-info
// `marker` (number) is the part-number-marker; 0 requests the first page.
// Delivers the parsed page via cb(null, data) or the error via cb(e).
listPartsQuery(bucketName, objectName, uploadId, marker, cb) {
if (!isValidBucketName(bucketName)) {
throw new errors.InvalidBucketNameError('Invalid bucket name: ' + bucketName)
}
if (!isValidObjectName(objectName)) {
throw new errors.InvalidObjectNameError(`Invalid object name: ${objectName}`)
}
if (!isString(uploadId)) {
throw new TypeError('uploadId should be of type "string"')
}
if (!isNumber(marker)) {
throw new TypeError('marker should be of type "number"')
}
if (!isFunction(cb)) {
throw new TypeError('callback should be of type "function"')
}
if (!uploadId) {
throw new errors.InvalidArgumentError('uploadId cannot be empty')
}
// part-number-marker is only sent when resuming past the first page.
var query = ''
if (marker && marker !== 0) {
query += `part-number-marker=${marker}&`
}
query += `uploadId=${uriEscape(uploadId)}`

var method = 'GET'
this.makeRequest({ method, bucketName, objectName, query }, '', [200], '', true, (e, response) => {
if (e) {
return cb(e)
}
// Pipe the XML body through the list-parts transformer; a single
// 'data' event carries the parsed result object.
var transformer = transformers.getListPartsTransformer()
pipesetup(response, transformer)
.on('error', (e) => cb(e))
.on('data', (data) => cb(null, data))
})
}

// Called by listIncompleteUploads to fetch a batch of incomplete uploads.
listIncompleteUploadsQuery(bucketName, prefix, keyMarker, uploadIdMarker, delimiter) {
if (!isValidBucketName(bucketName)) {
Expand Down
37 changes: 19 additions & 18 deletions src/object-uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,27 @@ export class ObjectUploader extends Transform {
this.id = id

// Retrieve the pre-uploaded parts, if we need to resume the upload.
this.client.listParts(this.bucketName, this.objectName, id, (err, etags) => {
if (err) {
return this.emit('error', err)
}

// It is possible for no parts to be already uploaded.
if (!etags) {
etags = []
}

// oldParts will become an object, allowing oldParts[partNumber].etag
this.oldParts = etags.reduce(function (prev, item) {
if (!prev[item.part]) {
prev[item.part] = item
this.client.listParts(this.bucketName, this.objectName, id).then(
(etags) => {
// It is possible for no parts to be already uploaded.
if (!etags) {
etags = []
}
return prev
}, {})

this.emit('ready')
})
// oldParts will become an object, allowing oldParts[partNumber].etag
this.oldParts = etags.reduce(function (prev, item) {
if (!prev[item.part]) {
prev[item.part] = item
}
return prev
}, {})

this.emit('ready')
},
(err) => {
return this.emit('error', err)
},
)
})

return
Expand Down
5 changes: 0 additions & 5 deletions src/transformers.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ export function getListMultipartTransformer() {
return getConcater(xmlParsers.parseListMultipart)
}

// Parses listParts response. Returns a concatenating stream transformer that
// feeds the full response body to xmlParsers.parseListParts.
export function getListPartsTransformer() {
return getConcater(xmlParsers.parseListParts)
}

// Parses initMultipartUpload response.
export function getInitiateMultipartTransformer() {
return getConcater(xmlParsers.parseInitiateMultipart)
Expand Down
34 changes: 0 additions & 34 deletions src/xml-parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,40 +187,6 @@ export function parseBucketNotification(xml) {
return result
}

// parse XML response for list parts of an in progress multipart upload
// Returns { isTruncated, parts, marker } where marker is the numeric
// NextPartNumberMarker (left undefined when the listing is complete).
export function parseListParts(xml) {
var xmlobj = parseXml(xml)
var result = {
isTruncated: false,
parts: [],
marker: undefined,
}
if (!xmlobj.ListPartsResult) {
throw new errors.InvalidXMLError('Missing tag: "ListPartsResult"')
}
xmlobj = xmlobj.ListPartsResult
if (xmlobj.IsTruncated) {
result.isTruncated = xmlobj.IsTruncated
}
if (xmlobj.NextPartNumberMarker) {
// Unary + converts the marker text to a number.
result.marker = +toArray(xmlobj.NextPartNumberMarker)[0]
}
if (xmlobj.Part) {
toArray(xmlobj.Part).forEach((p) => {
var part = +toArray(p.PartNumber)[0]
var lastModified = new Date(p.LastModified)
// Strip surrounding quotes from the ETag, raw or HTML-escaped.
var etag = p.ETag.replace(/^"/g, '')
.replace(/"$/g, '')
.replace(/^&quot;/g, '')
.replace(/&quot;$/g, '')
.replace(/^&#34;/g, '')
.replace(/&#34;$/g, '')
result.parts.push({ part, lastModified, etag })
})
}
return result
}

// parse XML response when a new multipart upload is initiated
export function parseInitiateMultipart(xml) {
var xmlobj = parseXml(xml)
Expand Down

0 comments on commit 582fb42

Please sign in to comment.