Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare EH for 5.3.1 release #17021

Merged
merged 3 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ com.azure:azure-e2e;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-identity;1.1.3;1.2.0-beta.3
com.azure:azure-identity-perf;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-messaging-eventgrid;2.0.0-beta.3;2.0.0-beta.4
com.azure:azure-messaging-eventhubs;5.2.0;5.3.0
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.2.0;1.3.0
com.azure:azure-messaging-eventhubs;5.3.0;5.3.1
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.3.0;1.3.1
com.azure:azure-messaging-servicebus;7.0.0-beta.6;7.0.0-beta.7
com.azure:azure-search-documents;11.1.1;11.2.0-beta.3
com.azure:azure-search-perf;1.0.0-beta.1;1.0.0-beta.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History

## 1.3.1 (2020-10-30)
### Dependency Updates
- Update `azure-messaging-eventhubs` dependency to `5.3.1`.


## 1.3.0 (2020-10-12)
### Dependency Updates
- Update `azure-messaging-eventhubs` dependency to `5.3.0`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version>
<version>1.3.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
srnagar marked this conversation as resolved.
Show resolved Hide resolved

<name>Microsoft Azure client library for storing checkpoints in Storage Blobs</name>
<description>Library for using storing checkpoints in Storage Blobs</description>
Expand All @@ -40,7 +40,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
7 changes: 7 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release History

## 5.3.1 (2020-10-30)
### Bug fixes
- Eagerly close top-level client in `EventProcessorClient` after fetching the list of partitions instead of waiting until
the connection times out.
- Added checks for matching lost link name with the current link name before propagating the error in
`AmqpReceiveLinkProcessor`.

## 5.3.0 (2020-10-12)
### New Features
- Add `clientOptions` to `EventHubClientBuilder` to support for setting user's application id in the user-agent property
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version>
<version>5.3.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->

<name>Microsoft Azure client library for Event Hubs</name>
<description>Libraries built on Microsoft Azure Event Hubs</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,19 @@ void loadBalance() {
* Retrieve the list of partition ids from the Event Hub.
*/
Mono<List<String>> partitionsMono;
if (partitionsCache.get() == null || partitionsCache.get().isEmpty()) {
if (CoreUtils.isNullOrEmpty(partitionsCache.get())) {
// Call Event Hubs service to get the partition ids if the cache is empty
logger.info("Getting partitions from Event Hubs service for {}", eventHubName);
partitionsMono = eventHubAsyncClient
.getPartitionIds()
.timeout(Duration.ofMinutes(1))
.collectList();
} else {
partitionsMono = Mono.just(partitionsCache.get());
// we have the partitions, the client can be closed now
closeClient();
}


Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
.then()
Expand Down Expand Up @@ -170,8 +172,9 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
}
partitionsCache.set(partitionIds);
int numberOfPartitions = partitionIds.size();
logger.info("CheckpointStore returned {} ownership records", partitionOwnershipMap.size());
logger.info("Event Hubs service returned {} partitions", numberOfPartitions);
logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
numberOfPartitions);

if (!isValid(partitionOwnershipMap)) {
// User data is corrupt.
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -275,6 +278,18 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
});
}

/*
* Closes the client used by load balancer to get the partitions.
*/
private void closeClient() {
try {
// this is an idempotent operation, calling close on an already closed client is just a no-op.
this.eventHubAsyncClient.close();
} catch (Exception ex) {
logger.warning("Failed to close the client", ex);
}
}

/*
* This method renews the ownership of currently owned partitions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,19 @@ void stopAllPartitionPumps() {
* @param ownership The partition ownership information for which the connection state will be verified.
*/
void verifyPartitionConnection(PartitionOwnership ownership) {
if (partitionPumps.containsKey(ownership.getPartitionId())) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(ownership.getPartitionId());
String partitionId = ownership.getPartitionId();
if (partitionPumps.containsKey(partitionId)) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(partitionId);
if (consumerClient.isConnectionClosed()) {
logger.info("Connection closed for {}, partition {}. Removing the consumer.",
ownership.getEventHubName(), ownership.getPartitionId());
partitionPumps.remove(ownership.getPartitionId());
ownership.getEventHubName(), partitionId);
try {
partitionPumps.get(partitionId).close();
} catch (Exception ex) {
logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex);
} finally {
partitionPumps.remove(partitionId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -166,6 +169,21 @@ public void onNext(AmqpReceiveLink next) {
}
},
error -> {
if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN
&& amqpException.getContext() != null
&& amqpException.getContext() instanceof LinkErrorContext) {
LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext();
if (currentLink != null
&& !currentLink.getLinkName().equals(errorContext.getTrackingId())) {
logger.info("EntityPath[{}]: Link lost signal received for a link "
+ "that is not current. Ignoring the error. Current link {}, link lost {}",
entityPath, linkName, errorContext.getTrackingId());
return;
}
}
}
currentLink = null;
logger.warning("linkName[{}] entityPath[{}]. Error occurred in link.", linkName, entityPath);
onError(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
srnagar marked this conversation as resolved.
Show resolved Hide resolved
</dependency>

<dependency>
Expand Down
4 changes: 2 additions & 2 deletions sdk/spring/azure-spring-integration-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
</dependency>

<!-- Contains Azure Storage Blobs checkpoint store when using EventProcessorClient -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down