Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Jan 20, 2022
1 parent 5af2f5b commit 2bbf1dc
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
4 changes: 2 additions & 2 deletions @xen-orchestra/fs/src/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,11 @@ export default class RemoteHandlerAbstract {
return p
}

async copy(oldPath, newPath, { checksum = false, size } = {}) {
async copy(oldPath, newPath, { checksum = false } = {}) {
oldPath = normalizePath(oldPath)
newPath = normalizePath(newPath)

let p = timeout.call(this._copy(oldPath, newPath, { size }), this._timeout)
let p = timeout.call(this._copy(oldPath, newPath), this._timeout)
if (checksum) {
p = Promise.all([p, this._copy(checksumFile(oldPath), checksumFile(newPath))])
}
Expand Down
38 changes: 22 additions & 16 deletions @xen-orchestra/fs/src/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,9 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}

async _copy(oldPath, newPath, { size } = {}) {
// don't recompute size if it's already known
size = size || (await this._getSize(oldPath))
async _multipartCopy(oldPath, newPath) {
const size = await this._getSize(oldPath)
const CopySource = `/${this._bucket}/${this._dir}${oldPath}`
// fast path : don't create a transaction if there is only one fragment
if (size < MAX_OBJECT_SIZE) {
await this._s3.copyObject({
...this._createParams(newPath),
CopySource,
})
return
}
const multipartParams = await this._s3.createMultipartUpload({ ...this._createParams(newPath) })
const param2 = { ...multipartParams, CopySource }
try {
Expand All @@ -95,6 +86,22 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
}

async _copy(oldPath, newPath) {
const CopySource = `/${this._bucket}/${this._dir}${oldPath}`
try {
await this._s3.copyObject({
...this._createParams(newPath),
CopySource,
})
} catch (e) {
// object > 5GB must be copied part by part
if (e.code === 'EntityTooLarge') {
return this._multipartCopy(oldPath, newPath)
}
throw e
}
}

async _isNotEmptyDir(path) {
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
Expand Down Expand Up @@ -281,19 +288,18 @@ export default class S3Handler extends RemoteHandlerAbstract {
NextContinuationToken = result.isTruncated ? result.NextContinuationToken : undefined
await asyncEach(
result.Contents,
async ({Key})=>{
// _unlink will add the prefix, but Key contains everything
// also we don't need to check if we delete a directory, since the list only return files
async ({ Key }) => {
// _unlink will add the prefix, but Key contains everything
        // also we don't need to check if we delete a directory, since the list only returns files
await this._s3.deleteObject({
Bucket: this._bucket,
Key,
})
},
{
concurrency: 16
concurrency: 16,
}
)

} while (NextContinuationToken !== undefined)
}

Expand Down
3 changes: 1 addition & 2 deletions packages/vhd-lib/Vhd/VhdDirectory.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
}
await this._handler.copy(
child._getChunkPath(child._getBlockPath(blockId)),
this._getChunkPath(this._getBlockPath(blockId)),
{ size: this.fullBlockSize }
this._getChunkPath(this._getBlockPath(blockId))
)
return sectorsToBytes(this.sectorsPerBlock)
}
Expand Down
20 changes: 15 additions & 5 deletions packages/vhd-lib/merge.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ const { VhdDirectory } = require('./Vhd/VhdDirectory')

const { warn } = createLogger('vhd-lib:merge')

/**
 * Creates a rate-limited writer that persists a JSON-serializable value to a
 * file, at most once every `delay` milliseconds.
 *
 * Calls made before `delay` has elapsed since the previous write are silently
 * skipped; write errors are logged with `warn` instead of being propagated.
 *
 * @param {Object} handler - remote handler exposing `writeFile(path, data, opts)`
 * @param {string} path - destination file path
 * @param {number} delay - minimum interval between two writes, in milliseconds
 * @returns {(json: any) => Promise<void>} throttled async writer
 */
function makeThrottledWriter(handler, path, delay) {
  let lastWrite = Date.now()
  return async json => {
    if (Date.now() - lastWrite > delay) {
      // update the timestamp before awaiting the write so that concurrent
      // calls issued while the write is in flight are also throttled
      lastWrite = Date.now()
      await handler.writeFile(path, JSON.stringify(json), { flags: 'w' }).catch(warn)
    }
  }
}

// Merge vhd child into vhd parent.
//
// TODO: rename the VHD file during the merge
Expand Down Expand Up @@ -90,10 +100,9 @@ module.exports = limitConcurrency(2)(async function merge(

const merging = new Set()
let counter = 0
const mergeStateInterval = setInterval(async () => {
mergeState.currentBlock = Math.min.apply(this, [...merging])
await parentHandler.writeFile(mergeStatePath, JSON.stringify(mergeState), { flags: 'w' }).catch(warn)
}, 10e3)

const mergeStateWriter = makeThrottledWriter(parentHandler, mergeStatePath, 10e3)

await asyncEach(
toMerge,
async blockId => {
Expand All @@ -110,12 +119,13 @@ module.exports = limitConcurrency(2)(async function merge(
done: counter + 1,
})
counter++
mergeState.currentBlock = Math.min(...merging)
mergeStateWriter(merging)
},
{
concurrency,
}
)
clearInterval(mergeStateInterval)
onProgress({ total: nBlocks, done: nBlocks })
// some blocks could have been created or moved in parent : write bat
await parentVhd.writeBlockAllocationTable()
Expand Down

0 comments on commit 2bbf1dc

Please sign in to comment.