Skip to content

Commit

Permalink
Prepare EH for 5.3.1 release (#17021)
Browse files Browse the repository at this point in the history
* Prepare EH for 5.3.1 release
  • Loading branch information
srnagar authored Oct 30, 2020
1 parent 2c08f59 commit 9f268d8
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 20 deletions.
4 changes: 2 additions & 2 deletions eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
4 changes: 2 additions & 2 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ com.azure:azure-e2e;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-identity;1.1.3;1.2.0-beta.3
com.azure:azure-identity-perf;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-messaging-eventgrid;2.0.0-beta.3;2.0.0-beta.4
com.azure:azure-messaging-eventhubs;5.2.0;5.3.0
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.2.0;1.3.0
com.azure:azure-messaging-eventhubs;5.3.0;5.3.1
com.azure:azure-messaging-eventhubs-checkpointstore-blob;1.3.0;1.3.1
com.azure:azure-messaging-servicebus;7.0.0-beta.6;7.0.0-beta.7
com.azure:azure-search-documents;11.1.1;11.2.0-beta.3
com.azure:azure-search-perf;1.0.0-beta.1;1.0.0-beta.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History

## 1.3.1 (2020-10-30)
### Dependency Updates
- Update `azure-messaging-eventhubs` dependency to `5.3.1`.


## 1.3.0 (2020-10-12)
### Dependency Updates
- Update `azure-messaging-eventhubs` dependency to `5.3.0`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version>
<version>1.3.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->
<version>1.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;current} -->

<name>Microsoft Azure client library for storing checkpoints in Storage Blobs</name>
<description>Library for using storing checkpoints in Storage Blobs</description>
Expand All @@ -40,7 +40,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
7 changes: 7 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release History

## 5.3.1 (2020-10-30)
### Bug fixes
- Eagerly close top-level client in `EventProcessorClient` after fetching the list of partitions instead of waiting until
the connection times out.
- Added checks for matching lost link name with the current link name before propagating the error in
`AmqpReceiveLinkProcessor`.

## 5.3.0 (2020-10-12)
### New Features
- Add `clientOptions` to `EventHubClientBuilder` to support for setting user's application id in the user-agent property
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ documentation][event_hubs_product_docs] | [Samples][sample_examples]
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version>
<version>5.3.1</version>
</dependency>
```
[//]: # ({x-version-update-end})
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->
<version>5.3.1</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;current} -->

<name>Microsoft Azure client library for Event Hubs</name>
<description>Libraries built on Microsoft Azure Event Hubs</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,19 @@ void loadBalance() {
* Retrieve the list of partition ids from the Event Hub.
*/
Mono<List<String>> partitionsMono;
if (partitionsCache.get() == null || partitionsCache.get().isEmpty()) {
if (CoreUtils.isNullOrEmpty(partitionsCache.get())) {
// Call Event Hubs service to get the partition ids if the cache is empty
logger.info("Getting partitions from Event Hubs service for {}", eventHubName);
partitionsMono = eventHubAsyncClient
.getPartitionIds()
.timeout(Duration.ofMinutes(1))
.collectList();
} else {
partitionsMono = Mono.just(partitionsCache.get());
// we have the partitions, the client can be closed now
closeClient();
}


Mono.zip(partitionOwnershipMono, partitionsMono)
.flatMap(this::loadBalance)
.then()
Expand Down Expand Up @@ -170,8 +172,9 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
}
partitionsCache.set(partitionIds);
int numberOfPartitions = partitionIds.size();
logger.info("CheckpointStore returned {} ownership records", partitionOwnershipMap.size());
logger.info("Event Hubs service returned {} partitions", numberOfPartitions);
logger.info("Number of ownership records {}, number of partitions {}", partitionOwnershipMap.size(),
numberOfPartitions);

if (!isValid(partitionOwnershipMap)) {
// User data is corrupt.
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -275,6 +278,18 @@ private Mono<Void> loadBalance(final Tuple2<Map<String, PartitionOwnership>, Lis
});
}

/*
* Closes the client used by load balancer to get the partitions.
*/
private void closeClient() {
try {
// this is an idempotent operation, calling close on an already closed client is just a no-op.
this.eventHubAsyncClient.close();
} catch (Exception ex) {
logger.warning("Failed to close the client", ex);
}
}

/*
* This method renews the ownership of currently owned partitions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,19 @@ void stopAllPartitionPumps() {
* @param ownership The partition ownership information for which the connection state will be verified.
*/
void verifyPartitionConnection(PartitionOwnership ownership) {
if (partitionPumps.containsKey(ownership.getPartitionId())) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(ownership.getPartitionId());
String partitionId = ownership.getPartitionId();
if (partitionPumps.containsKey(partitionId)) {
EventHubConsumerAsyncClient consumerClient = partitionPumps.get(partitionId);
if (consumerClient.isConnectionClosed()) {
logger.info("Connection closed for {}, partition {}. Removing the consumer.",
ownership.getEventHubName(), ownership.getPartitionId());
partitionPumps.remove(ownership.getPartitionId());
ownership.getEventHubName(), partitionId);
try {
partitionPumps.get(partitionId).close();
} catch (Exception ex) {
logger.warning(Messages.FAILED_CLOSE_CONSUMER_PARTITION, partitionId, ex);
} finally {
partitionPumps.remove(partitionId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.message.Message;
Expand Down Expand Up @@ -166,6 +169,21 @@ public void onNext(AmqpReceiveLink next) {
}
},
error -> {
if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
if (amqpException.getErrorCondition() == AmqpErrorCondition.LINK_STOLEN
&& amqpException.getContext() != null
&& amqpException.getContext() instanceof LinkErrorContext) {
LinkErrorContext errorContext = (LinkErrorContext) amqpException.getContext();
if (currentLink != null
&& !currentLink.getLinkName().equals(errorContext.getTrackingId())) {
logger.info("EntityPath[{}]: Link lost signal received for a link "
+ "that is not current. Ignoring the error. Current link {}, link lost {}",
entityPath, linkName, errorContext.getTrackingId());
return;
}
}
}
currentLink = null;
logger.warning("linkName[{}] entityPath[{}]. Error occurred in link.", linkName, entityPath);
onError(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
</dependency>

<dependency>
Expand Down
4 changes: 2 additions & 2 deletions sdk/spring/azure-spring-integration-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
<version>5.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs;dependency} -->
</dependency>

<!-- Contains Azure Storage Blobs checkpoint store when using EventProcessorClient -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.2.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
<version>1.3.0</version> <!-- {x-version-update;com.azure:azure-messaging-eventhubs-checkpointstore-blob;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down

0 comments on commit 9f268d8

Please sign in to comment.