Skip to content

Commit

Permalink
Merge pull request #8388 from romayalon/romy-threaded-multi-object-de…
Browse files Browse the repository at this point in the history
…lete

NC | GPFS | Threaded Multiple Object Delete fix
  • Loading branch information
romayalon authored Sep 24, 2024
2 parents 504f3b4 + 110abc2 commit 4b1fdab
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ Attached a table with tests that where investigated and their status (this table
change in our repo) - stopped passing between the update of commit hash 6861c3d81081a6883fb90d66cb60392e1abdf3ca to da91ad8bbf899c72199df35b69e9393c706aabee |
| test_get_object_ifmodifiedsince_failed | Internal Component | | It used to pass in the past (not related to code
change in our repo) - stopped passing between the update of commit hash 6861c3d81081a6883fb90d66cb60392e1abdf3ca to da91ad8bbf899c72199df35b69e9393c706aabee |
| test_versioning_concurrent_multi_object_delete | Faulty Test | [588](https://github.com/ceph/s3-tests/issues/588) |
9 changes: 8 additions & 1 deletion src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2766,9 +2766,16 @@ class NamespaceFS {
}
return version_info;
} catch (err) {
dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err);
retries -= 1;
// there are a few concurrency scenarios that might happen we should retry for -
// 1. the version id is the latest, concurrent put will might move the version id from being the latest to .versions/ -
// will throw safe unlink failed on non matching fd (on GPFS) or inode/mtime (on POSIX).
// 2. the version id is the second latest and stays under .versions/ - on concurrent delete of the latest,
// the version id might move to be the latest and we will get ENOENT
// 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
// after we will see that the version was already deleted
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
dbg.warn(`NamespaceFS._delete_single_object_versioned: retrying retries=${retries} file_path=${file_path}`, err);
} finally {
if (gpfs_options) await this._close_files_gpfs(fs_context, gpfs_options.delete_version, undefined, true);
}
Expand Down
43 changes: 43 additions & 0 deletions src/test/unit_tests/jest_tests/test_versioning_concurrency.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const fs_utils = require('../../../util/fs_utils');
const NamespaceFS = require('../../../sdk/namespace_fs');
const buffer_utils = require('../../../util/buffer_utils');
const { TMP_PATH } = require('../../system_tests/test_utils');
const { crypto_random_string } = require('../../../util/string_utils');
const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');

function make_dummy_object_sdk(nsfs_config, uid, gid) {
Expand Down Expand Up @@ -81,4 +82,46 @@ describe('test versioning concurrency', () => {
await P.delay(1000);
expect(number_of_successful_operations.length).toBe(15);
});

// same as s3tests_boto3/functional/test_s3.py::test_versioning_concurrent_multi_object_delete,
// this test has a bug, it tries to create the bucket twice and fails
// https://github.com/ceph/s3-tests/blob/master/s3tests_boto3/functional/test_s3.py#L1642
// see - https://github.com/ceph/s3-tests/issues/588
it('concurrent multi object delete', async () => {
const bucket = 'bucket1';
const concurrency_num = 10;
const delete_objects_arr = [];
for (let i = 0; i < concurrency_num; i++) {
const key = `key${i}`;
const random_data = Buffer.from(String(crypto_random_string(7)));
const body = buffer_utils.buffer_to_read_stream(random_data);
const res = await nsfs.upload_object({ bucket: bucket, key: key, source_stream: body }, DUMMY_OBJECT_SDK);
delete_objects_arr.push({ key: key, version_id: res.version_id });
}
const versions = await nsfs.list_object_versions({ bucket: bucket }, DUMMY_OBJECT_SDK);

for (const { key, version_id } of delete_objects_arr) {
const found = versions.objects.find(object => object.key === key && object.version_id === version_id);
expect(found).toBeDefined();
}

const delete_responses = [];
const delete_errors = [];

for (let i = 0; i < concurrency_num; i++) {
nsfs.delete_multiple_objects({ bucket, objects: delete_objects_arr }, DUMMY_OBJECT_SDK)
.then(res => delete_responses.push(res))
.catch(err => delete_errors.push(err));
}
await P.delay(5000);
expect(delete_responses.length).toBe(concurrency_num);
for (const res of delete_responses) {
expect(res.length).toBe(concurrency_num);
for (const single_delete_res of res) {
expect(single_delete_res.err_message).toBe(undefined);
}
}
const list_res = await nsfs.list_objects({ bucket: bucket }, DUMMY_OBJECT_SDK);
expect(list_res.objects.length).toBe(0);
}, 8000);
});
8 changes: 4 additions & 4 deletions src/util/native_fs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ async function safe_unlink_gpfs(fs_context, to_delete_path, to_delete_file, dir_
}

function should_retry_link_unlink(is_gpfs, err) {
return is_gpfs ?
[gpfs_link_unlink_retry_err, gpfs_unlink_retry_catch].includes(err.code) :
[posix_link_retry_err, posix_unlink_retry_err].includes(err.message) ||
['EEXIST'].includes(err.code);
const should_retry_general = ['ENOENT', 'EEXIST'].includes(err.code);
const should_retry_gpfs = [gpfs_link_unlink_retry_err, gpfs_unlink_retry_catch].includes(err.code);
const should_retry_posix = [posix_link_retry_err, posix_unlink_retry_err].includes(err.message);
return should_retry_general || is_gpfs ? should_retry_gpfs : should_retry_posix;
}

////////////////////////
Expand Down

0 comments on commit 4b1fdab

Please sign in to comment.