Skip to content

Commit

Permalink
NSFS | versioning | copy_object - close chunkfs read stream to preven…
Browse files Browse the repository at this point in the history
…t stream being closed after stat
  • Loading branch information
nadavMiz committed Nov 25, 2024
1 parent 412e804 commit c15af6e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1140,9 +1140,7 @@ class NamespaceFS {
// end the stream
res.end();

// in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream
// and don't care about the readable part, set readable: false
await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal });
await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal });
object_sdk.throw_if_aborted();

dbg.log0('NamespaceFS: read_object_stream completed file', file_path, {
Expand Down Expand Up @@ -1581,9 +1579,15 @@ class NamespaceFS {
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
chunk_fs.on('finish', arg => dbg.error('namespace_fs._upload_stream: finish occured on stream ChunkFS: ', arg));
chunk_fs.on('close', arg => dbg.error('namespace_fs._upload_stream: close occured on stream ChunkFS: ', arg));
if (copy_source) {
// ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part
// we need to close both read and write parts for Transform stream to properly close and release resorces
chunk_fs.resume();
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
} else if (params.source_params) {
chunk_fs.resume();
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
Expand Down
10 changes: 10 additions & 0 deletions src/test/unit_tests/test_bucketspace_versioning.js
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ mocha.describe('bucketspace namespace_fs - versioning', function() {
assert.ok(exist2);
});

mocha.it('copy object version id - version matches mtime and inode', async function() {
const key = 'copied_key5.txt';
const res = await s3_uid6.copyObject({ Bucket: bucket_name, Key: key,
CopySource: `${bucket_name}/${key1}?versionId=${key1_ver1}`});
const obj_path = path.join(full_path, key);
const stat = await nb_native().fs.stat(DEFAULT_FS_CONFIG, obj_path);
const expcted_version = 'mtime-' + stat.mtimeNsBigint.toString(36) + '-ino-' + stat.ino.toString(36);
assert.equal(expcted_version, res.VersionId);
});

mocha.it('delete object - versioning enabled - nested key (more than 1 level)- delete partial directory', async function() {
const parital_nested_directory = dir_path_complete.slice(0, -1); // the directory without the last slash
const folder_path_nested = path.join(nested_keys_full_path, dir_path_complete, NSFS_FOLDER_OBJECT_NAME);
Expand Down

0 comments on commit c15af6e

Please sign in to comment.