Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notimplemented handling #2108

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/mock/s3_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& s3_object

// Canned S3 errors handed back by the mock client instead of talking to a real S3 endpoint.
// NOTE(review): the trailing `false` argument is presumably the SDK's "is retryable" flag — confirm against
// the Aws::Client::AWSError constructor overloads.

// Error of type RESOURCE_NOT_FOUND.
const auto not_found_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false));
// Error of type UNKNOWN that carries "Precondition failed" as both of its string arguments.
const auto precondition_failed_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::UNKNOWN, "Precondition failed", "Precondition failed", false));
// Error of type UNKNOWN whose first string argument is "NotImplemented"; the second string is the
// human-readable explanation of the unsupported header.
const auto not_implemented_error = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(Aws::S3::S3Errors::UNKNOWN, "NotImplemented", "A header you provided implies functionality that is not implemented", false));

S3Result<std::monostate> MockS3Client::head_object(
const std::string& s3_object_name,
Expand Down Expand Up @@ -91,6 +92,11 @@ S3Result<std::monostate> MockS3Client::put_object(
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(s3_object_name, StorageOperation::WRITE);

if (maybe_error.has_value() && header == PutHeader::IF_NONE_MATCH) {
return {not_implemented_error};
}

if (maybe_error.has_value()) {
return {*maybe_error};
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ using Range = folly::Range<It>;
error_message_suffix));
}

if(err.GetExceptionName().find("NotImplemented") != std::string::npos) {
raise<ErrorCode::E_NOT_IMPLEMENTED>(fmt::format("Operation is not implemented for storage: {}", error_message_suffix));
}

if (err.ShouldRetry()) {
raise<ErrorCode::E_S3_RETRYABLE>(fmt::format("Retry-able error: {}",
error_message_suffix));
}

// We create a more detailed error explanation in case of NETWORK_CONNECTION errors to remedy #880.
if (type == Aws::S3::S3Errors::NETWORK_CONNECTION) {
error_message = fmt::format("Unexpected network error: {} "
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class NfsBackedStorage final : public Storage {
void do_write(KeySegmentPair&& key_seg) final;

void do_write_if_none(KeySegmentPair&& kv [[maybe_unused]]) final {
storage::raise<ErrorCode::E_UNSUPPORTED_ATOMIC_OPERATION>("Atomic operations are only supported for s3 backend");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes for NFS-backed storage were made to make testing easier and to give it the same general behavior as S3 storage.
They can be reverted if needed.

storage::raise<ErrorCode::E_NOT_IMPLEMENTED>("do_write_if_none not implemented for NFS backed storage");
};

void do_update(KeySegmentPair&& key_seg, UpdateOpts opts) final;
Expand All @@ -58,7 +58,7 @@ class NfsBackedStorage final : public Storage {
}

bool do_supports_atomic_writes() const final {
return false;
return ConfigsMap::instance()->get_int("NfsStorage.SupportsAtomicWrites", 0) == 1;
}

bool do_fast_delete() final {
Expand Down
7 changes: 7 additions & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(5020, E_UNEXPECTED_S3_ERROR) \
ERROR_CODE(5021, E_S3_RETRYABLE) \
ERROR_CODE(5022, E_ATOMIC_OPERATION_FAILED) \
ERROR_CODE(5023, E_NOT_IMPLEMENTED) \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This name sounds like something that we have not implemented yet, whereas the real issue is that the storage (PURE) does not support the operation.
What do you think about E_NOT_IMPLEMENTED_BY_STORAGE or E_NOT_SUPPORTED_BY_STORAGE?

ERROR_CODE(5030, E_UNEXPECTED_AZURE_ERROR) \
ERROR_CODE(5050, E_MONGO_BULK_OP_NO_REPLY) \
ERROR_CODE(5051, E_UNEXPECTED_MONGO_ERROR) \
Expand Down Expand Up @@ -181,6 +182,7 @@ using UnsortedDataException = ArcticSpecificException<ErrorCode::E_UNSORTED_DATA
using UserInputException = ArcticCategorizedException<ErrorCategory::USER_INPUT>;
using CompatibilityException = ArcticCategorizedException<ErrorCategory::COMPATIBILITY>;
using CodecException = ArcticCategorizedException<ErrorCategory::CODEC>;
using NotImplementedException = ArcticSpecificException<ErrorCode::E_NOT_IMPLEMENTED>;

template<ErrorCode error_code>
[[noreturn]] void throw_error(const std::string& msg) {
Expand Down Expand Up @@ -242,6 +244,11 @@ template<>
throw ArcticSpecificException<ErrorCode::E_UNSORTED_DATA>(msg);
}

// Specialisation of throw_error for E_NOT_IMPLEMENTED: throws the error-code-specific exception type
// built from the supplied message.
template<>
[[noreturn]] inline void throw_error<ErrorCode::E_NOT_IMPLEMENTED>(const std::string& msg) {
    using NotImplementedError = ArcticSpecificException<ErrorCode::E_NOT_IMPLEMENTED>;
    throw NotImplementedError(msg);
}

}

namespace fmt {
Expand Down
65 changes: 49 additions & 16 deletions cpp/arcticdb/util/reliable_storage_lock-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#pragma once

#include <arcticdb/util/reliable_storage_lock.hpp>

Expand Down Expand Up @@ -107,18 +108,34 @@ void ReliableStorageLock<ClockType>::clear_old_locks(const std::vector<AcquiredL
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_take_lock() const {
ReliableLockResult ReliableStorageLock<ClockType>::try_take_lock() const {
auto [existing_locks, latest] = get_all_locks();
if (latest.has_value()) {
auto expires = get_expiration(RefKey{get_stream_id(latest.value()), KeyType::ATOMIC_LOCK});
if (expires > ClockType::nanos_since_epoch()) {
// An unexpired lock exists
return std::nullopt;
return LockInUse{};
}
}
return try_take_next_id(existing_locks, latest);
}

inline std::optional<AcquiredLockId> parse_reliable_storage_lock_result(const ReliableLockResult& result) {
return util::variant_match(
result,
[&](const AcquiredLock &acquired_lock) -> std::optional<AcquiredLockId> {
return acquired_lock;
},
[&](const LockInUse &) -> std::optional<AcquiredLockId> {
return std::nullopt;
},
[&](const UnsupportedOperation &err) -> std::optional<AcquiredLockId> {
log::lock().error("Unsupported operation while taking lock: {}", err);
throw LostReliableLock();
}
);
}

template<class ClockType>
AcquiredLockId ReliableStorageLock<ClockType>::retry_until_take_lock() const {
// We don't use the ExponentialBackoff because we want to be able to wait indefinitely
Expand All @@ -132,24 +149,25 @@ AcquiredLockId ReliableStorageLock<ClockType>::retry_until_take_lock() const {
return current_wait * factor;
};

auto acquired_lock = try_take_lock();
std::optional<AcquiredLockId> acquired_lock = parse_reliable_storage_lock_result(try_take_lock());

while (!acquired_lock.has_value()) {
std::this_thread::sleep_for(jittered_wait());
current_wait = std::min(current_wait * 2, max_wait);
acquired_lock = try_take_lock();
acquired_lock = parse_reliable_storage_lock_result(try_take_lock());
}
return acquired_lock.value();
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_extend_lock(AcquiredLockId acquired_lock) const {
ReliableLockResult ReliableStorageLock<ClockType>::try_extend_lock(AcquiredLockId acquired_lock) const {
auto [existing_locks, latest] = get_all_locks();
util::check(latest.has_value() && latest.value() >= acquired_lock,
"We are trying to extend a newer lock_id than the existing one in storage. Extend lock_id: {}",
acquired_lock);
if (latest.value() != acquired_lock) {
// We have lost the lock while holding it (most likely due to timeout).
return std::nullopt;
return LockInUse{};
}
return try_take_next_id(existing_locks, latest);
}
Expand All @@ -170,23 +188,27 @@ void ReliableStorageLock<ClockType>::free_lock(AcquiredLockId acquired_lock) con
}

template <class ClockType>
std::optional<AcquiredLockId> ReliableStorageLock<ClockType>::try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const {
ReliableLockResult ReliableStorageLock<ClockType>::try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const {
AcquiredLockId lock_id = get_next_id(latest);
auto lock_stream_id = get_stream_id(lock_id);
auto expiration = ClockType::nanos_since_epoch() + timeout_;
try {
store_->write_if_none_sync(KeyType::ATOMIC_LOCK, lock_stream_id, lock_segment(lock_stream_id, expiration));
} catch (const NotImplementedException& e) {
auto err = fmt::format("Failed to acquire lock (storage does not support atomic writes): {}", e.what());
log::lock().debug(err);
return UnsupportedOperation{err};
} catch (const StorageException& e) {
// There is no specific Aws::S3::S3Errors for the failed atomic operation, so we catch any StorageException.
// Either way it's safe to assume we have failed to acquire the lock in case of transient S3 error.
// If error persists we'll approprieately raise in the next attempt to LIST/GET the existing lock and propagate
// If error persists we'll appropriately raise in the next attempt to LIST/GET the existing lock and propagate
// the transient error.
log::lock().debug("Failed to acquire lock (likely someone acquired it before us): {}", e.what());
return std::nullopt;
return LockInUse{};
}
// We clear old locks only after aquiring the lock to avoid duplicating the deletion work
// We clear old locks only after acquiring the lock to avoid duplicating the deletion work
clear_old_locks(existing_locks);
return lock_id;
return AcquiredLock{lock_id};
}

inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageLock<> &lock, AcquiredLockId acquired_lock, std::optional<folly::Func>&& on_lost_lock) :
Expand All @@ -198,11 +220,22 @@ inline ReliableStorageLockGuard::ReliableStorageLockGuard(const ReliableStorageL
extend_lock_heartbeat_.addFunction(
[that=this](){
if (that->acquired_lock_.has_value()) {
that->acquired_lock_ = that->lock_.try_extend_lock(that->acquired_lock_.value());
if (!that->acquired_lock_.has_value()) {
// Clean up if we have lost the lock.
that->cleanup_on_lost_lock();
}
auto result = that->lock_.try_extend_lock(that->acquired_lock_.value());
util::variant_match(
result,
[&](AcquiredLock &acquired_lock) {
that->acquired_lock_ = acquired_lock;
},
[&](LockInUse &) {
// Clean up if we have lost the lock.
that->cleanup_on_lost_lock();
},
[&](UnsupportedOperation &err) {
// This should never happen
log::lock().error("Unsupported operation while extending lock {}: {}", that->acquired_lock_.value(), err);
that->cleanup_on_lost_lock();
}
);
}
}, hearbeat_frequency, "Extend lock", hearbeat_frequency);
extend_lock_heartbeat_.start();
Expand Down
12 changes: 9 additions & 3 deletions cpp/arcticdb/util/reliable_storage_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ namespace lock {

// Numeric identifier of a lock.
using AcquiredLockId = uint64_t;

// Alternatives of ReliableLockResult:
// AcquiredLock         - the id of the lock that was taken.
// LockInUse            - carries no data; the lock could not be taken.
// UnsupportedOperation - a message describing why the operation is not supported.
using AcquiredLock = AcquiredLockId;
using LockInUse = std::monostate;
using UnsupportedOperation = std::string;

// Outcome of an attempt to take or extend a lock; holds exactly one of the alternatives above.
using ReliableLockResult = std::variant<AcquiredLock, LockInUse, UnsupportedOperation>;

// The ReliableStorageLock is a storage lock which relies on atomic If-None-Match Put and ListObject operations to
// provide a more reliable lock than the StorageLock but it requires the backend to support atomic operations. It should
// be completely consistent unless a process holding a lock gets paused for times comparable to the lock timeout.
Expand All @@ -36,12 +42,12 @@ class ReliableStorageLock {
ReliableStorageLock(const ReliableStorageLock<ClockType>& other) = default;

AcquiredLockId retry_until_take_lock() const;
std::optional<AcquiredLockId> try_take_lock() const;
std::optional<AcquiredLockId> try_extend_lock(AcquiredLockId acquired_lock) const;
ReliableLockResult try_take_lock() const;
ReliableLockResult try_extend_lock(AcquiredLockId acquired_lock) const;
void free_lock(AcquiredLockId acquired_lock) const;
timestamp timeout() const;
private:
std::optional<AcquiredLockId> try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const;
ReliableLockResult try_take_next_id(const std::vector<AcquiredLockId>& existing_locks, std::optional<AcquiredLockId> latest) const;
std::pair<std::vector<AcquiredLockId>, std::optional<AcquiredLockId>> get_all_locks() const;
timestamp get_expiration(RefKey lock_key) const;
void clear_old_locks(const std::vector<AcquiredLockId>& acquired_locks) const;
Expand Down
85 changes: 68 additions & 17 deletions cpp/arcticdb/util/test/test_reliable_storage_lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,23 @@
#include <arcticdb/util/test/gtest_utils.hpp>
#include <arcticdb/util/clock.hpp>

#include <arcticdb/storage/common.hpp>
#include <arcticdb/storage/config_resolvers.hpp>
#include <arcticdb/storage/library_index.hpp>
#include <arcticdb/storage/storage_factory.hpp>
#include <arcticdb/util/test/config_common.hpp>

#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/storage/s3/s3_storage.hpp>
#include <arcticdb/storage/s3/detail-inl.hpp>
#include <arcticdb/storage/mock/s3_mock_client.hpp>
#include <arcticdb/storage/mock/storage_mock_client.hpp>
#include <aws/core/Aws.h>

using namespace arcticdb;
using namespace lock;
namespace aa = arcticdb::async;
namespace as = arcticdb::storage;

// These tests test the actual implementation

Expand All @@ -33,45 +48,45 @@ TEST(ReliableStorageLock, SingleThreaded) {

// We take the first lock at 0 and it should not expire until 20
Clock::time_ = 0;
ASSERT_EQ(lock1.try_take_lock(), std::optional<uint64_t>{0});
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional<uint64_t>{0});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 5;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 10;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);
Clock::time_ = 19;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);

// Once the first lock has expired we can take a new lock with lock_id=1
Clock::time_ = 20;
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{1});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{1});
ASSERT_EQ(count_locks(), 2);

// We can extend the lock timeout at 25 to 35 and get a lock_id=2
Clock::time_ = 25;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(lock2.try_extend_lock(1), std::optional<uint64_t>{2});
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_extend_lock(1)), std::optional<uint64_t>{2});
ASSERT_EQ(count_locks(), 3);
Clock::time_ = 34;
ASSERT_EQ(lock1.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::nullopt);

// At time 35 the lock with lock_id=2 has expired and we can re-acquire the lock
Clock::time_ = 35;
ASSERT_EQ(lock1.try_take_lock(), std::optional<uint64_t>{3});
ASSERT_EQ(parse_reliable_storage_lock_result(lock1.try_take_lock()), std::optional<uint64_t>{3});
ASSERT_EQ(count_locks(), 4);
ASSERT_EQ(lock2.try_take_lock(), std::nullopt);
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::nullopt);

// And we can free the lock immediately to allow re-acquiring without waiting for timeout
lock1.free_lock(3);
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{4});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{4});
ASSERT_EQ(count_locks(), 5);

// But if we take a lock at 1000 all locks would have expired a 10xtimeout=100 ago, and we should clear all apart from latest lock_id=5
Clock::time_ = 1000;
ASSERT_EQ(lock2.try_take_lock(), std::optional<uint64_t>{5});
ASSERT_EQ(parse_reliable_storage_lock_result(lock2.try_take_lock()), std::optional<uint64_t>{5});
ASSERT_EQ(count_locks(), 1);
}

Expand Down Expand Up @@ -124,5 +139,41 @@ TEST(ReliableStorageLock, StressMultiThreaded) {
ASSERT_EQ(counter, threads);

// Also the lock should be free by the end (i.e. we can take a new lock)
ASSERT_EQ(lock.try_take_lock().has_value(), true);
ASSERT_EQ(parse_reliable_storage_lock_result(lock.try_take_lock()).has_value(), true);
}


// Checks that parse_reliable_storage_lock_result throws LostReliableLock when a lock is attempted
// against a mock S3 library using a lock name built from a WRITE failure trigger.
TEST(ReliableStorageLock, NotImplementedException) {
    using namespace arcticdb::async;
    namespace ap = arcticdb::pipelines;

    // Given: identifiers for an in-memory test environment
    as::EnvironmentName env_name{"research"};
    as::StorageName mock_storage_name("storage_name");
    as::LibraryPath lib_path{"a", "b"};

    // ... an S3 storage config switched to the mock storage used for testing
    auto mock_s3_config = proto::s3_storage::Config();
    mock_s3_config.set_use_mock_storage_for_testing(true);

    // ... and a library index resolving that config
    auto env_config = arcticdb::get_test_environment_config(
        lib_path, mock_storage_name, env_name, std::make_optional(mock_s3_config));
    auto config_resolver = as::create_in_memory_resolver(env_config);
    as::LibraryIndex library_index{env_name, config_resolver};

    as::UserAuth auth{"abc"};
    auto codec = std::make_shared<arcticdb::proto::encoding::VariantCodec>();

    auto library = library_index.get_library(lib_path, as::OpenMode::WRITE, auth, storage::NativeVariantStorage());
    auto store = std::make_shared<aa::AsyncStore<>>(aa::AsyncStore(library, *codec, EncodingVersion::V1));

    // When: the lock name is a failure trigger for WRITE operations
    std::string lock_name = "test_lock";
    std::string failing_lock_name = storage::s3::MockS3Client::get_failure_trigger(lock_name, storage::StorageOperation::WRITE, Aws::S3::S3Errors::UNKNOWN);

    ReliableStorageLock<> storage_lock{StringId{failing_lock_name}, store, ONE_SECOND};

    // Then: parse_reliable_storage_lock_result throws when we encounter an UnsupportedOperation
    EXPECT_THROW({
        parse_reliable_storage_lock_result(storage_lock.try_take_lock());
    }, LostReliableLock);
}
Loading