Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded support for Eventhubs management family of APIs #5315

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
41a231d
Enabled multithreaded calls to eventhubs management APIs
LarryOsterman Feb 2, 2024
ab5f7fa
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 2, 2024
14ac359
Added partition client properties tests, removed unused variables in …
LarryOsterman Feb 2, 2024
72edbb8
Restructured properties APIs in eventhubs to simplify producer client…
LarryOsterman Feb 2, 2024
40dfecb
Clang fix
LarryOsterman Feb 2, 2024
1b7f2a6
Removed AMQP code which logged an incoming management message
LarryOsterman Feb 2, 2024
d1a41e7
Removed unused lambda capture fields.
LarryOsterman Feb 3, 2024
84a5429
Fixed test crash in LinkAttachDetach AMQP test
LarryOsterman Feb 5, 2024
8a4c96e
Added test cases for management authn failures
LarryOsterman Feb 5, 2024
2ac800f
clang-format
LarryOsterman Feb 5, 2024
99cc884
Don't emit body contents in AmqpMessage insertion operator
LarryOsterman Feb 6, 2024
9089926
Don't take numeric value parameters by value to ostream insertion ope…
LarryOsterman Feb 6, 2024
847296a
clang-format
LarryOsterman Feb 6, 2024
c90ca72
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 6, 2024
924744c
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 6, 2024
b31ccd9
Updated changelog to reflect changes in this PR
LarryOsterman Feb 6, 2024
64cdeb1
Updated eventhubs dependency to match reality
LarryOsterman Feb 7, 2024
1307c98
Pull request feedback
LarryOsterman Feb 8, 2024
5964ef9
Fixed test crashes in management tests
LarryOsterman Feb 8, 2024
51e432b
clang-format
LarryOsterman Feb 8, 2024
db550b9
Improved code coverage
LarryOsterman Feb 8, 2024
198af85
compiler didnt notice an impossible branch
LarryOsterman Feb 8, 2024
60318a3
clang-format
LarryOsterman Feb 8, 2024
a373ae5
Better code coverage
LarryOsterman Feb 8, 2024
29a9c47
clang fixes
LarryOsterman Feb 8, 2024
5209f1f
amqpvalue_create_described does not clone its inputs
LarryOsterman Feb 9, 2024
395e1c3
Added value based tests for enumeration stream inserters
LarryOsterman Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

### Breaking Changes

- `ClaimBasedSecurity::PutToken` throws `Azure::Core::Credentials::AuthenticationException` instead of `std::runtime_error`.
- Claims Based Security authentication now throws `Azure::Core::Credentials::AuthenticationException` instead of `std::runtime_error`.

### Bugs Fixed

- Fixed [#5284](https://github.com/Azure/azure-sdk-for-cpp/issues/5284).
- Fixed [#5297](https://github.com/Azure/azure-sdk-for-cpp/issues/5297). Enabled multiple simultaneous `ExecuteOperation` calls.
- [#5284](https://github.com/Azure/azure-sdk-for-cpp/issues/5284): [azure-identity][azure-messaging-eventhubs] Impossible to catch exception resulting in SIGABRT signal.
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
- [#5297](https://github.com/Azure/azure-sdk-for-cpp/issues/5297): Enabled multiple simultaneous `ExecuteOperation` calls.
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
- Fixed crash when Link Detach message is received while link is being destroyed.
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Mixed,
};

std::ostream& operator<<(std::ostream& os, SenderSettleMode const mode);
std::ostream& operator<<(std::ostream& os, SenderSettleMode mode);

enum class ReceiverSettleMode
{
First,
Second,
};
std::ostream& operator<<(std::ostream& os, ReceiverSettleMode const mode);
std::ostream& operator<<(std::ostream& os, ReceiverSettleMode mode);

}}}} // namespace Azure::Core::Amqp::_internal
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Failed,
InstanceClosed,
};
std::ostream& operator<<(std::ostream& os, CbsOperationResult const operationResult);
std::ostream& operator<<(std::ostream& os, CbsOperationResult operationResult);

enum class CbsOpenResult
{
Expand All @@ -27,7 +27,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Error,
Cancelled,
};
std::ostream& operator<<(std::ostream& os, CbsOpenResult const operationResult);
std::ostream& operator<<(std::ostream& os, CbsOpenResult operationResult);

enum class CbsTokenType
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Error,
};

std::ostream& operator<<(std::ostream& stream, ConnectionState const value);
std::ostream& operator<<(std::ostream& stream, ConnectionState value);

class Connection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Error,
};

std::ostream& operator<<(std::ostream& stream, LinkState const linkState);
std::ostream& operator<<(std::ostream& stream, LinkState linkState);

enum class LinkTransferResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Closing,
Error,
};
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState const state);
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState state);

class MessageReceiver;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Closing,
Error,
};
std::ostream& operator<<(std::ostream& stream, MessageSenderState const state);
std::ostream& operator<<(std::ostream& stream, MessageSenderState state);

class MessageSender;
class MessageSenderEvents {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*
* @returns the input ostream.
*/
std::ostream& operator<<(std::ostream& os, AmqpValueType const value);
std::ostream& operator<<(std::ostream& os, AmqpValueType value);

class AmqpArray;
class AmqpMap;
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
return std::make_tuple(cbsResult, result.StatusCode, result.Error.Description);
}
}
std::ostream& operator<<(std::ostream& os, CbsOperationResult const operationResult)
std::ostream& operator<<(std::ostream& os, CbsOperationResult operationResult)
{
switch (operationResult)
{
Expand All @@ -163,7 +163,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
return os;
}

std::ostream& operator<<(std::ostream& os, CbsOpenResult const openResult)
std::ostream& operator<<(std::ostream& os, CbsOpenResult openResult)
{
switch (openResult)
{
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/src/amqp/link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
#endif

std::ostream& operator<<(std::ostream& os, LinkState const linkState)
std::ostream& operator<<(std::ostream& os, LinkState linkState)
{
switch (linkState)
{
Expand Down
12 changes: 12 additions & 0 deletions sdk/core/azure-core-amqp/src/amqp/management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

_internal::ManagementOpenStatus ManagementClientImpl::Open(Context const& context)
{
std::unique_lock<std::mutex> lock(m_openCloseLock);
if (m_isOpen)
{
throw std::runtime_error("Management object is already open.");
}

try
{
/** Authentication needs to happen *before* the links are created.
Expand Down Expand Up @@ -261,7 +267,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

void ManagementClientImpl::Close(Context const& context)
{
std::unique_lock<std::mutex> lock(m_openCloseLock);
Log::Stream(Logger::Level::Verbose) << "ManagementClient::Close" << std::endl;
if (!m_isOpen)
{
throw std::runtime_error("Management object is not open.");
}

SetState(ManagementState::Closing);
if (m_messageSender && m_messageSenderOpen)
{
Expand Down
6 changes: 3 additions & 3 deletions sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {

namespace Azure { namespace Core { namespace Amqp { namespace _internal {

std::ostream& operator<<(std::ostream& stream, ReceiverSettleMode const settleMode)
std::ostream& operator<<(std::ostream& stream, ReceiverSettleMode settleMode)
{
switch (settleMode)
{
Expand Down Expand Up @@ -109,7 +109,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
}

std::string MessageReceiver::GetLinkName() const { return m_impl->GetLinkName(); }
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState const state)
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState state)
{
switch (state)
{
Expand Down Expand Up @@ -399,7 +399,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
"MESSAGE_RECEIVER_STATE_ERROR",
};

std::ostream& operator<<(std::ostream& stream, MESSAGE_RECEIVER_STATE const state)
std::ostream& operator<<(std::ostream& stream, MESSAGE_RECEIVER_STATE state)
{
if (state < sizeof(MESSAGE_RECEIVER_STATEStrings) / sizeof(MESSAGE_RECEIVER_STATEStrings[0]))
{
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core-amqp/src/amqp/message_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::ostream& operator<<(std::ostream& stream, SenderSettleMode const settleMode)
std::ostream& operator<<(std::ostream& stream, SenderSettleMode settleMode)
{
switch (settleMode)
{
Expand Down Expand Up @@ -66,7 +66,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::uint64_t MessageSender::GetMaxMessageSize() const { return m_impl->GetMaxMessageSize(); }
std::string MessageSender::GetLinkName() const { return m_impl->GetLinkName(); }
MessageSender::~MessageSender() noexcept {}
std::ostream& operator<<(std::ostream& stream, _internal::MessageSenderState const state)
std::ostream& operator<<(std::ostream& stream, _internal::MessageSenderState state)
{
switch (state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::shared_ptr<MessageSenderImpl> m_messageSender;
std::shared_ptr<MessageReceiverImpl> m_messageReceiver;
ManagementState m_state = ManagementState::Idle;
std::mutex m_openCloseLock;
bool m_isOpen{false};
bool m_messageSenderOpen{false};
bool m_messageReceiverOpen{false};
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core-amqp/src/models/amqp_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
namespace Azure { namespace Core { namespace Amqp { namespace Models {
namespace _detail {

std::ostream& operator<<(std::ostream& os, AMQP_TYPE const value)
std::ostream& operator<<(std::ostream& os, AMQP_TYPE value)
{
switch (value)
{
Expand Down Expand Up @@ -146,7 +146,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
} // namespace _detail

std::ostream& operator<<(std::ostream& os, AmqpValueType const value)
std::ostream& operator<<(std::ostream& os, AmqpValueType value)
{

switch (value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
private:
UniqueAmqpValueHandle m_value;
};
std::ostream& operator<<(std::ostream& os, AMQP_TYPE const value);
std::ostream& operator<<(std::ostream& os, AMQP_TYPE value);
std::ostream& operator<<(std::ostream& os, AMQP_VALUE const value);

}}}}} // namespace Azure::Core::Amqp::Models::_detail
1 change: 0 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

### Other Changes


## 1.0.0-beta.6 (2024-02-06)

### Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
producerOptions.Name = "Producer for LoadBalancerTest";
ProducerClient producerClient{connectionString, eventHubName, producerOptions};

#if TRUE
std::thread processEventsThread([&]() {
std::set<std::string> partitionsAcquired;
std::vector<std::thread> processEventsThreads;
Expand Down Expand Up @@ -160,43 +159,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
processor.Run(context);

processEventsThread.join();
#else
// Run the processor on a background thread and the test on the foreground.
processor.Start(context);

std::set<std::string> partitionsAcquired;
std::vector<std::thread> processEventsThreads;
// When we exit the process thread, cancel the context to unblock the processor.
// scope_guard onExit([&context] { context.Cancel(); });

WaitGroup waitGroup;
for (auto const& partitionId : eventHubProperties.PartitionIds)
{
std::shared_ptr<ProcessorPartitionClient> partitionClient
= processor.NextPartitionClient(context);
waitGroup.AddWaiter();
ASSERT_EQ(partitionsAcquired.find(partitionId), partitionsAcquired.end())
<< "No previous client for " << partitionClient->PartitionId();
processEventsThreads.push_back(
std::thread([&waitGroup, &producerClient, partitionClient, &context, this] {
scope_guard onExit([&] { waitGroup.CompleteWaiter(); });
ProcessEventsForLoadBalancerTest(producerClient, partitionClient, context);
}));
}
// Block until all the events have been processed.
waitGroup.Wait();

// And wait until all the threads have completed.
for (auto& thread : processEventsThreads)
{
if (thread.joinable())
{
thread.join();
}
}
// Stop the processor, we're done with the test.
processor.Stop();
#endif
}

void TestWithLoadBalancerSingleThreaded(Models::ProcessorStrategy processorStrategy)
Expand Down
Loading