Skip to content

Commit

Permalink
rerun maven in webpubsub client sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
mssfang committed Dec 3, 2024
1 parent fe68f8f commit 4505965
Show file tree
Hide file tree
Showing 25 changed files with 284 additions and 427 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,23 @@ public String getConnectionId() {
*/
public synchronized void start() {
asyncClient.start(() -> {
this.asyncClient.receiveGroupMessageEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveGroupMessageEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(GROUP_MESSAGE_EVENT, event));
this.asyncClient.receiveServerMessageEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveServerMessageEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(SERVER_MESSAGE_EVENT, event));
this.asyncClient.receiveConnectedEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveConnectedEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(CONNECT_EVENT, event));
this.asyncClient.receiveDisconnectedEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveDisconnectedEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(DISCONNECT_EVENT, event));
this.asyncClient.receiveStoppedEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveStoppedEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(STOPPED_EVENT, event));
this.asyncClient.receiveRejoinGroupFailedEvents().publishOn(Schedulers.boundedElastic())
this.asyncClient.receiveRejoinGroupFailedEvents()
.publishOn(Schedulers.boundedElastic())
.subscribe(event -> eventHandlerCollection.fireEvent(REJOIN_GROUP_FAILED_EVENT, event));
}).block();
}
Expand All @@ -124,7 +130,6 @@ public synchronized void stop() {
asyncClient.stop().block();
}


/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -255,8 +260,7 @@ public void removeOnStoppedEventHandler(Consumer<StoppedEvent> onStoppedEventHan
*
* @param onRejoinGroupFailedEventHandler the event handler for RejoinGroupFailedEvent.
*/
public void addOnRejoinGroupFailedEventHandler(
Consumer<RejoinGroupFailedEvent> onRejoinGroupFailedEventHandler) {
public void addOnRejoinGroupFailedEventHandler(Consumer<RejoinGroupFailedEvent> onRejoinGroupFailedEventHandler) {
eventHandlerCollection.addEventHandler(REJOIN_GROUP_FAILED_EVENT, onRejoinGroupFailedEventHandler);
}

Expand All @@ -265,8 +269,8 @@ public void addOnRejoinGroupFailedEventHandler(
*
* @param onRejoinGroupFailedEventHandler the event handler for RejoinGroupFailedEvent.
*/
public void removeOnRejoinGroupFailedEventHandler(
Consumer<RejoinGroupFailedEvent> onRejoinGroupFailedEventHandler) {
public void
removeOnRejoinGroupFailedEventHandler(Consumer<RejoinGroupFailedEvent> onRejoinGroupFailedEventHandler) {
eventHandlerCollection.removeEventHandler(REJOIN_GROUP_FAILED_EVENT, onRejoinGroupFailedEventHandler);
}

Expand Down Expand Up @@ -407,7 +411,7 @@ public WebPubSubResult sendToGroup(String group, BinaryData content, WebPubSubDa
* @return the result.
*/
public WebPubSubResult sendToGroup(String group, BinaryData content, WebPubSubDataFormat dataFormat,
SendToGroupOptions options) {
SendToGroupOptions options) {
return asyncClient.sendToGroup(group, content, dataFormat, options).block();
}

Expand Down Expand Up @@ -439,14 +443,15 @@ public WebPubSubResult sendEvent(String eventName, BinaryData content, WebPubSub
* @return the result.
*/
public WebPubSubResult sendEvent(String eventName, BinaryData content, WebPubSubDataFormat dataFormat,
SendEventOptions options) {
SendEventOptions options) {
return asyncClient.sendEvent(eventName, content, dataFormat, options).block();
}

// following API is for testing
WebPubSubClientState getClientState() {
return this.asyncClient.getClientState();
}

WebSocketSession getWebsocketSession() {
return this.asyncClient.getWebsocketSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Either the credential through {@link #credential(WebPubSubClientCredential)},
* or the client access URL through {@link #clientAccessUrl(String)}.
*/
@ServiceClientBuilder(serviceClients = {WebPubSubClient.class})
@ServiceClientBuilder(serviceClients = { WebPubSubClient.class })
public final class WebPubSubClientBuilder implements ConfigurationTrait<WebPubSubClientBuilder> {

private static final ClientLogger LOGGER = new ClientLogger(WebPubSubClientBuilder.class);
Expand Down Expand Up @@ -227,17 +227,16 @@ WebPubSubAsyncClient buildAsyncClient() {
// credential
Supplier<String> clientAccessUrlSuplier;
if (credential != null && clientAccessUrl != null) {
throw LOGGER.logExceptionAsError(
new IllegalStateException("Both credential and clientAccessUrl have been set. "
throw LOGGER
.logExceptionAsError(new IllegalStateException("Both credential and clientAccessUrl have been set. "
+ "Set null to one of them to clear that option."));
} else if (credential != null) {
clientAccessUrlSuplier = credential.getClientAccessUrlSupplier();
} else if (clientAccessUrl != null) {
clientAccessUrlSuplier = () -> clientAccessUrl;
} else {
throw LOGGER.logExceptionAsError(
new IllegalStateException("Credentials have not been set. "
+ "They can be set using: clientAccessUrl(String), credential(WebPubSubClientCredential)"));
throw LOGGER.logExceptionAsError(new IllegalStateException("Credentials have not been set. "
+ "They can be set using: clientAccessUrl(String), credential(WebPubSubClientCredential)"));
}

// user-agent
Expand All @@ -247,9 +246,7 @@ WebPubSubAsyncClient buildAsyncClient() {
String userAgent = UserAgentUtil.toUserAgentString(applicationId, clientName, clientVersion,
configuration == null ? Configuration.getGlobalConfiguration() : configuration);

return new WebPubSubAsyncClient(
webSocketClient, clientAccessUrlSuplier, webPubSubProtocol,
applicationId, userAgent,
retryStrategy, autoReconnect, autoRestoreGroup);
return new WebPubSubAsyncClient(webSocketClient, clientAccessUrlSuplier, webPubSubProtocol, applicationId,
userAgent, retryStrategy, autoReconnect, autoRestoreGroup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@

package com.azure.messaging.webpubsub.client.implementation;


import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings({ "rawtypes", "unchecked" })
public final class EventHandlerCollection {

private final ConcurrentMap<String, List> collection = new ConcurrentHashMap<>();

public <T> void addEventHandler(String type, Consumer<T> eventHandler) {
List<Consumer<T>> listeners =
collection.computeIfAbsent(type, k -> new CopyOnWriteArrayList<Consumer<T>>());
List<Consumer<T>> listeners = collection.computeIfAbsent(type, k -> new CopyOnWriteArrayList<Consumer<T>>());
listeners.add(eventHandler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public static Map<String, Object> createContextWithConnectionId(String applicati
return globalLoggingContext;
}

public static LoggingEventBuilder addSignalTypeAndResult(LoggingEventBuilder logBuilder, SignalType signalType, Sinks.EmitResult result) {
public static LoggingEventBuilder addSignalTypeAndResult(LoggingEventBuilder logBuilder, SignalType signalType,
Sinks.EmitResult result) {
return logBuilder.addKeyValue("signalType", signalType).addKeyValue("emitResult", result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,14 @@ private static Object parseMessage(JsonNode jsonNode) throws IOException {
BinaryData data = parseData(jsonNode, type);
switch (jsonNode.get("from").asText()) {
case "group":
msg = new GroupDataMessage(
jsonNode.get("group").asText(),
type,
data,
msg = new GroupDataMessage(jsonNode.get("group").asText(), type, data,
jsonNode.has("fromUserId") ? jsonNode.get("fromUserId").asText() : null,
jsonNode.has("sequenceId") ? jsonNode.get("sequenceId").asLong() : null
);
jsonNode.has("sequenceId") ? jsonNode.get("sequenceId").asLong() : null);
break;

case "server":
msg = new ServerDataMessage(
type,
data,
jsonNode.has("sequenceId") ? jsonNode.get("sequenceId").asLong() : null
);
msg = new ServerDataMessage(type, data,
jsonNode.has("sequenceId") ? jsonNode.get("sequenceId").asLong() : null);
break;

default:
Expand Down Expand Up @@ -104,14 +97,12 @@ private static Object parseSystem(JsonNode jsonNode) throws IOException {
}

private static WebPubSubMessage parseAck(JsonNode jsonNode) {
AckMessage ackMessage = new AckMessage()
.setAckId(jsonNode.get("ackId").asLong())
.setSuccess(jsonNode.get("success").asBoolean());
AckMessage ackMessage
= new AckMessage().setAckId(jsonNode.get("ackId").asLong()).setSuccess(jsonNode.get("success").asBoolean());
if (jsonNode.has("error")) {
JsonNode errorNode = jsonNode.get("error");
ackMessage.setError(new AckResponseError(
errorNode.get("name").asText(),
errorNode.get("message").asText()));
ackMessage
.setError(new AckResponseError(errorNode.get("name").asText(), errorNode.get("message").asText()));
}
return ackMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public final class AckMessage extends WebPubSubMessage {

private AckResponseError error;


public long getAckId() {
return ackId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public final class GroupDataMessage {
* @param sequenceId the sequenceId.
*/
public GroupDataMessage(String group, WebPubSubDataFormat dataType, BinaryData data, String fromUserId,
Long sequenceId) {
Long sequenceId) {
this.data = data;
this.dataType = dataType;
this.fromUserId = fromUserId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
public interface WebSocketClient {

WebSocketSession connectToServer(ClientEndpointConfiguration cec, String path,
AtomicReference<ClientLogger> loggerReference,
Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler,
Consumer<CloseReason> closeHandler);
AtomicReference<ClientLogger> loggerReference, Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler, Consumer<CloseReason> closeHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ final class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private final MessageDecoder messageDecoder;
private final Consumer<Object> messageHandler;

WebSocketClientHandler(WebSocketClientHandshaker handshaker,
AtomicReference<ClientLogger> loggerReference,
MessageDecoder messageDecoder,
Consumer<Object> messageHandler) {
WebSocketClientHandler(WebSocketClientHandshaker handshaker, AtomicReference<ClientLogger> loggerReference,
MessageDecoder messageDecoder, Consumer<Object> messageHandler) {
this.handshaker = handshaker;
this.loggerReference = loggerReference;
this.messageDecoder = messageDecoder;
Expand Down Expand Up @@ -72,16 +70,16 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw loggerReference.get().logExceptionAsError(new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'));
throw loggerReference.get()
.logExceptionAsError(new IllegalStateException("Unexpected FullHttpResponse (getStatus="
+ response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'));
}

WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
// Text
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
loggerReference.get().atVerbose()
.addKeyValue("text", textFrame.text())
.log("Received TextWebSocketFrame");
loggerReference.get().atVerbose().addKeyValue("text", textFrame.text()).log("Received TextWebSocketFrame");
Object wpsMessage = messageDecoder.decode(textFrame.text());
messageHandler.accept(wpsMessage);
} else if (frame instanceof PingWebSocketFrame) {
Expand All @@ -95,7 +93,8 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
} else if (frame instanceof CloseWebSocketFrame) {
// Close
CloseWebSocketFrame closeFrame = (CloseWebSocketFrame) frame;
loggerReference.get().atVerbose()
loggerReference.get()
.atVerbose()
.addKeyValue("statusCode", closeFrame.statusCode())
.addKeyValue("reasonText", closeFrame.reasonText())
.log("Received CloseWebSocketFrame");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
public final class WebSocketClientNettyImpl implements WebSocketClient {
@Override
public WebSocketSession connectToServer(ClientEndpointConfiguration cec, String path,
AtomicReference<ClientLogger> loggerReference,
Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler,
Consumer<CloseReason> closeHandler) {
AtomicReference<ClientLogger> loggerReference, Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler, Consumer<CloseReason> closeHandler) {
try {
WebSocketSessionNettyImpl session = new WebSocketSessionNettyImpl(cec, path, loggerReference, messageHandler, openHandler, closeHandler);
WebSocketSessionNettyImpl session
= new WebSocketSessionNettyImpl(cec, path, loggerReference, messageHandler, openHandler, closeHandler);
session.connect();
return session;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public interface WebSocketSession {

// following API is for testing
void sendTextAsync(String text, Consumer<SendResult> handler);

void closeSocket();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,14 @@ protected void initChannel(SocketChannel ch) {
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
p.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), WebSocketClientCompressionHandler.INSTANCE,
handler);
}
}

WebSocketSessionNettyImpl(ClientEndpointConfiguration cec, String path,
AtomicReference<ClientLogger> loggerReference,
Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler,
Consumer<CloseReason> closeHandler) {
AtomicReference<ClientLogger> loggerReference, Consumer<Object> messageHandler,
Consumer<WebSocketSession> openHandler, Consumer<CloseReason> closeHandler) {
this.path = path;
this.loggerReference = loggerReference;
this.messageEncoder = cec.getMessageEncoder();
Expand Down Expand Up @@ -131,12 +126,10 @@ void connect() throws URISyntaxException, SSLException, InterruptedException, Ex

group = new NioEventLoopGroup();

handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, protocol, true,
handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, protocol, true,
new DefaultHttpHeaders().add(HttpHeaderName.USER_AGENT.getCaseInsensitiveName(), userAgent));

clientHandler =
new WebSocketClientHandler(handshaker, loggerReference, messageDecoder, messageHandler);
clientHandler = new WebSocketClientHandler(handshaker, loggerReference, messageDecoder, messageHandler);

Bootstrap b = new Bootstrap();
b.group(group)
Expand Down Expand Up @@ -181,8 +174,7 @@ void connect() throws URISyntaxException, SSLException, InterruptedException, Ex

@Override
public boolean isOpen() {
return ch != null && ch.isOpen()
&& handshaker != null && handshaker.isHandshakeComplete();
return ch != null && ch.isOpen() && handshaker != null && handshaker.isHandshakeComplete();
}

@Override
Expand All @@ -199,9 +191,7 @@ public void sendObjectAsync(Object data, Consumer<SendResult> handler) {
public void sendTextAsync(String text, Consumer<SendResult> handler) {
if (ch != null && ch.isOpen()) {
TextWebSocketFrame frame = new TextWebSocketFrame(text);
loggerReference.get().atVerbose()
.addKeyValue("text", frame.text())
.log("Send TextWebSocketFrame");
loggerReference.get().atVerbose().addKeyValue("text", frame.text()).log("Send TextWebSocketFrame");
ch.writeAndFlush(frame).addListener(future -> {
if (future.isSuccess()) {
handler.accept(new SendResult());
Expand Down Expand Up @@ -241,7 +231,8 @@ public void close() {
clientHandler.setClientCloseCallbackFuture(closeCallbackFuture);

CloseWebSocketFrame closeFrame = new CloseWebSocketFrame(WebSocketCloseStatus.NORMAL_CLOSURE);
loggerReference.get().atVerbose()
loggerReference.get()
.atVerbose()
.addKeyValue("statusCode", closeFrame.statusCode())
.addKeyValue("reasonText", closeFrame.reasonText())
.log("Send CloseWebSocketFrame");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public final class GroupMessageEvent {
* @param sequenceId the sequenceId.
*/
public GroupMessageEvent(String group, BinaryData data, WebPubSubDataFormat dataFormat, String fromUserId,
Long sequenceId) {
Long sequenceId) {
this.data = data;
this.dataFormat = dataFormat;
this.fromUserId = fromUserId;
Expand Down
Loading

0 comments on commit 4505965

Please sign in to comment.