Skip to content

Commit

Permalink
NC | Threaded Multiple Object Delete fix
Browse files Browse the repository at this point in the history
Signed-off-by: Romy <[email protected]>
  • Loading branch information
romayalon committed Sep 19, 2024
1 parent 3c8a29a commit b4b2968
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2766,9 +2766,11 @@ class NamespaceFS {
}
return version_info;
} catch (err) {
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err);
retries -= 1;
// retry also on ENOENT if the version moved from .versions -> latest or the opposite/deleted concurrently
// on the next retry will return
if (retries <= 0 || (!native_fs_utils.should_retry_link_unlink(is_gpfs, err) && err.code !== 'ENOENT')) throw err;
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
}
Expand Down
43 changes: 43 additions & 0 deletions src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
const buffer_utils = require('../../../util/buffer_utils');
const { TMP_PATH } = require('../../system_tests/test_utils');
const { crypto_random_string } = require('../../../util/string_utils');
const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');

function make_dummy_object_sdk(nsfs_config, uid, gid) {
Expand Down Expand Up @@ -81,4 +82,46 @@ describe('test versioning concurrency', () => {
await P.delay(1000);
expect(number_of_successful_operations.length).toBe(15);
});

// same as s3tests_boto3/functional/test_s3.py::test_versioning_concurrent_multi_object_delete,
// this test has a bug, it tries to create the bucket twice and fails
// https://github.com/ceph/s3-tests/blob/master/s3tests_boto3/functional/test_s3.py#L1642
// see - https://github.com/ceph/s3-tests/issues/588
it('concurrent multi object delete', async () => {
const bucket = 'bucket1';
const concurrency_num = 10;
const delete_objects_arr = [];
for (let i = 0; i < concurrency_num; i++) {
const key = `key${i}`;
const random_data = Buffer.from(String(crypto_random_string(7)));
const body = buffer_utils.buffer_to_read_stream(random_data);
const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
delete_objects_arr.push({ key: key, version_id: res.version_id });
}
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);

for (const { key, version_id } of delete_objects_arr) {
const found = versions.objects.find(object => object.key === key && object.version_id === version_id);
expect(found).toBeDefined();
}

const delete_responses = [];
const delete_errors = [];

for (let i = 0; i < concurrency_num; i++) {
nsfs.delete_multiple_objects({ bucket, objects: delete_objects_arr }, DUMMY_OBJECT_SDK)
.then(res => delete_responses.push(res))
.catch(err => delete_errors.push(err));
}
await P.delay(5000);
expect(delete_responses.length).toBe(concurrency_num);
for (const res of delete_responses) {
expect(res.length).toBe(concurrency_num);
for (const single_delete_res of res) {
expect(single_delete_res.err_message).toBe(undefined);
}
}
const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(list_res.objects.length).toBe(0);
}, 8000);
});

0 comments on commit b4b2968

Please sign in to comment.