Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull latest changes from azure-event-hubs-java #3474

Merged
merged 45 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6c781cd
Update Apache Proton-J dependency (0.29.0 --> 0.31.0) (#407)
sjkwak Dec 13, 2018
71f8562
PartitionReceiver - add a method that provides an EventPosition which…
sjkwak Dec 13, 2018
0dd058d
Support IsPartitionEmpty property for PartitionRuntimeInformation (#399)
sjkwak Dec 14, 2018
0b7ca03
Move setPrefetchCount API to the ReceiverOptions class from the Parti…
sjkwak Dec 18, 2018
d1b68d1
Fixes several issues in the reactor related components (#411)
sjkwak Dec 21, 2018
9d04a98
Implement comparable on EventData (#395)
Fokko Dec 21, 2018
c37c848
Update receive/send link creation logic and improve tracing (#414)
sjkwak Jan 2, 2019
fec5a03
Prep for releasing client 2.0.0 and EPH 2.2.0 (#415)
sjkwak Jan 2, 2019
9305f06
Ensure that links are closed when transport error occurrs (#417)
sjkwak Jan 5, 2019
e6ef7f0
Prep for releasing client 2.1.0 and EPH 2.3.0 (#418)
sjkwak Jan 10, 2019
11c68b0
Update prefetch sendflow logic and increment version for new release …
sjkwak Jan 10, 2019
a1fa1a5
Fix args for proxy auth call to Authenticator (#421)
JamesBirdsall Jan 10, 2019
5fbdbe4
Prepare EPH 2.3.4 release (#423)
sjkwak Jan 15, 2019
3ddd268
Prepare EPH 2.4.0 release (#423) (#424)
sjkwak Jan 15, 2019
120231b
Handle proton:io errors with meaningful error msg (#427)
JamesBirdsall Jan 29, 2019
5826288
Minor changes to lease scanner (#428)
JamesBirdsall Feb 6, 2019
6edab2c
Make EventData.SystemProperties completely public (#435)
JamesBirdsall Mar 8, 2019
07b4072
Digest Support: init first connection with null headers (#431)
mssfang Mar 11, 2019
f4827ab
Fix lease scanner issues when Storage unreachable (#434)
JamesBirdsall Mar 12, 2019
50f99ca
message receiver - fix null pointer error and ensure that receive lin…
sjkwak Apr 4, 2019
325d471
Update version numbers for release (#440)
JamesBirdsall Apr 5, 2019
b2d7507
Update prefetch count for a receiver (#441)
sjkwak Apr 9, 2019
f5da494
Fix an issue of creating multiple sessions for $management & $cbs cha…
sjkwak Apr 15, 2019
2e81d67
Merge commit 'f5da49467dd1a48a97c57f4ef5b7f2bc46e42b91' into mergeLat…
conniey Apr 27, 2019
d54d756
Running through java files and double checking changes
conniey Apr 27, 2019
d567ad9
Fix casing on test names
conniey Apr 27, 2019
6bba56e
Fixing checkstyle issues.
conniey Apr 27, 2019
49ab1a9
SpotBug fixes.
conniey Apr 27, 2019
a03491c
Last spotbug revert/fixes
conniey Apr 27, 2019
feefc38
Creating inner static class.
conniey Apr 27, 2019
b867794
Redo changes from before.
conniey Apr 27, 2019
61c0898
Undo old change.
conniey Apr 27, 2019
dee4f2e
Ignore testcases that hang.
conniey Apr 28, 2019
6ca1460
Add Ignore to another failing test.
conniey Apr 28, 2019
be9e486
Removing commented out dependency.
conniey Apr 28, 2019
cdcdebf
Moving increment out of if statement.
conniey Apr 28, 2019
c976035
Fix NullPointerException when there is no inner exception
conniey Apr 28, 2019
461a2ac
Enable fixed tests.
conniey Apr 28, 2019
69cb46a
Merge branch 'master' of https://github.com/azure/azure-sdk-for-java …
conniey Apr 29, 2019
eb1fdf2
Move parent node to the top of the file.
conniey Apr 29, 2019
4fb29e3
Update version numbers in spotbugs-reporting
conniey Apr 29, 2019
56829c5
Increasing wait time until event hub scheduler is completed.
conniey Apr 29, 2019
9955374
Rearrange imports.
conniey Apr 29, 2019
6064875
Remove unused imports.
conniey Apr 29, 2019
972775f
Merge branch 'master' into mergeLatestChanges
conniey Apr 30, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
<version>1.0.0</version>

<properties>
<azure-keyvault.version>1.2.0</azure-keyvault.version>
<azure-batch.version>5.0.1</azure-batch.version>
<azure-eventhubs.version>2.0.0</azure-eventhubs.version>
<azure-eventhubs.version>2.3.0</azure-eventhubs.version>
<azure-eventhubs-eph.version>2.5.0</azure-eventhubs-eph.version>
<azure-keyvault.version>1.2.0</azure-keyvault.version>
<azure-storage-blob.version>10.5.0</azure-storage-blob.version>
</properties>

Expand Down Expand Up @@ -67,7 +68,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>${azure-eventhubs.version}</version>
<version>${azure-eventhubs-eph.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
8 changes: 4 additions & 4 deletions eventhubs/data-plane/ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:

```XML
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.0.0</version>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.3.0</version>
</dependency>
```

Expand Down
10 changes: 5 additions & 5 deletions eventhubs/data-plane/PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:

```XML
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.3.0</version>
</dependency>
```

For different types of build environments, the latest released JAR files can also be [explicitly obtained from the
Expand Down
16 changes: 8 additions & 8 deletions eventhubs/data-plane/azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>2.2.0</version>
<version>2.5.0</version>

<name>Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)</name>
<description>EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -331,10 +332,17 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
(bp.getLeaseState() == LeaseState.LEASED)));
});
future = CompletableFuture.completedFuture(infos);
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | StorageException | NoSuchElementException e) {
Throwable effective = e;
if (e instanceof NoSuchElementException) {
// If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
// Strip the misleading NoSuchElementException to provide a meaningful error for the user.
effective = e.getCause();
}

TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
future = new CompletableFuture<List<BaseLease>>();
future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
future = new CompletableFuture<>();
future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
}

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -23,7 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;

/***
* The main class of event processor host.
* The main class of event processor host.
*/
public final class EventProcessorHost {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class);
Expand Down Expand Up @@ -268,15 +269,14 @@ public EventProcessorHost(
if (leaseManager == null) {
throw new IllegalArgumentException("Must provide an object which implements ILeaseManager");
}
// executorService argument is allowed to be null, that is the indication to use an internal threadpool.

// executorService argument is allowed to be null, that is the indication to use an internal threadpool.

// Normally will not be null because we're using the AzureStorage implementation.
// If it is null, we're using user-supplied implementation. Establish generic defaults
// in case the user doesn't provide an options object.
this.partitionManagerOptions = new PartitionManagerOptions();


if (executorService != null) {
// User has supplied an ExecutorService, so use that.
this.weOwnExecutor = false;
Expand Down Expand Up @@ -560,7 +560,7 @@ public Thread newThread(Runnable r) {
}

private String getNamePrefix() {
return String.format("[%s|%s|%s]-%s-",
return String.format(Locale.US, "[%s|%s|%s]-%s-",
this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/***
* Options affecting the behavior of the event processor host instance in general.
*/
Expand All @@ -25,6 +28,8 @@ public final class EventProcessorOptions {
return EventPosition.fromStartOfStream();
};

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class);

public EventProcessorOptions() {
}

Expand Down Expand Up @@ -112,7 +117,7 @@ public int getPrefetchCount() {
/***
* Sets the prefetch count for the underlying event hub client.
*
* The default is 500. This controls how many events are received in advance.
* The default is 300. This controls how many events are received in advance.
*
* @param prefetchCount The new prefetch count.
*/
Expand Down Expand Up @@ -210,7 +215,11 @@ void notifyOfException(String hostname, Exception exception, String action, Stri
// Capture handler so it doesn't get set to null between test and use
Consumer<ExceptionReceivedEventArgs> handler = this.exceptionNotificationHandler;
if (handler != null) {
handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
try {
handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
} catch (Exception e) {
TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.ConcurrentHashMap;

/***
* An ILeaseManager implementation based on an in-memory store.
* An ILeaseManager implementation based on an in-memory store.
*
* THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory
* only and not persisted in any way. In addition, it is only visible within the same process: multiple
Expand Down Expand Up @@ -46,11 +46,11 @@ public InMemoryLeaseManager() {
public void initialize(HostContext hostContext) {
this.hostContext = hostContext;
}

public void setLatency(long milliseconds) {
this.millisecondsLatency = milliseconds;
}

private void latency(String caller) {
if (this.millisecondsLatency > 0) {
try {
Expand Down Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<Void> deleteLeaseStore() {
latency("deleteLeaseStore");
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<CompleteLease> getLease(String partitionId) {
TRACE_LOGGER.debug(this.hostContext.withHost("getLease()"));
Expand All @@ -110,7 +110,7 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
latency("getAllLeasesStateInfo");
return CompletableFuture.completedFuture(infos);
}

@Override
public CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds) {
ArrayList<CompletableFuture<BaseLease>> createFutures = new ArrayList<CompletableFuture<BaseLease>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,21 @@ private Void scan(boolean isFirst) {
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
try {
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) -> {
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));

if ((e != null) && !(e instanceof ClosingException)) {
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
}

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
if (!this.getIsClosingOrClosed()) {
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds()
: this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
: this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
if (isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
Expand All @@ -301,9 +306,19 @@ private Void scan(boolean isFirst) {
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());
} catch (Exception e) {
TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e);
if (!this.getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
}
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private CompletableFuture<Void> openClientsRetryWrapper() {
// trace exceptions from the final attempt, or ReceiverDisconnectedException.
return retryResult.handleAsync((r, e) -> {
if (e == null) {
// IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
// IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
// meaning it is safe to set the handler and start calling IEventProcessor.onEvents.
this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private CompletableFuture<Boolean> stealLeases(List<BaseLease> stealThese) {

return allSteals;
}

private static class AcquisitionHolder {
private CompleteLease acquiredLease;

Expand Down
14 changes: 7 additions & 7 deletions eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-extensions</artifactId>
Expand All @@ -12,13 +19,6 @@
<description>Extensions built on Microsoft Azure Event Hubs</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
14 changes: 7 additions & 7 deletions eventhubs/data-plane/azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
Expand All @@ -12,13 +19,6 @@
<description>Libraries built on Microsoft Azure Event Hubs</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ static EventData create(final ByteBuffer buffer) {
* @see SystemProperties#getEnqueuedTime
*/
SystemProperties getSystemProperties();
void setSystemProperties(SystemProperties props);

class SystemProperties extends HashMap<String, Object> {
private static final long serialVersionUID = -2827050124966993723L;
Expand All @@ -155,6 +156,13 @@ public SystemProperties(final HashMap<String, Object> map) {
super(Collections.unmodifiableMap(map));
}

public SystemProperties(final long sequenceNumber, final Instant enqueuedTimeUtc, final String offset, final String partitionKey) {
this.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, sequenceNumber);
this.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, new Date(enqueuedTimeUtc.toEpochMilli()));
this.put(AmqpConstants.OFFSET_ANNOTATION_NAME, offset);
this.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, partitionKey);
}

public String getOffset() {
return this.getSystemProperty(AmqpConstants.OFFSET_ANNOTATION_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface PartitionReceiver {

int MINIMUM_PREFETCH_COUNT = 1;
int DEFAULT_PREFETCH_COUNT = 500;
int MAXIMUM_PREFETCH_COUNT = 2000;
int MAXIMUM_PREFETCH_COUNT = 8000;

long NULL_EPOCH = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public String getIdentifier() {
* EventHubs service will throw {@link QuotaExceededException} and will include this identifier.
* So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}.
* <p>
* </p>
*
* @param value string to identify {@link PartitionReceiver}
*/
Expand Down
Loading