Skip to content

Commit

Permalink
Merge branch 'flolom-feature/okhttp-support'
Browse files Browse the repository at this point in the history
  • Loading branch information
NaikSoftware committed Jun 17, 2017
2 parents 2045f63 + 631a4e3 commit cbe0cfa
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 10 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ Check out the full example server https://github.com/NaikSoftware/stomp-protocol

**Basic usage**
``` java
import org.java_websocket.WebSocket;

private StompClient mStompClient;

// ...
Expand All @@ -89,7 +91,8 @@ See the full example https://github.com/NaikSoftware/StompProtocolAndroid/tree/m
Method `Stomp.over` consume class for create connection as first parameter.
You must provide dependency for lib and pass class.
At now supported connection providers:
- WebSocket.class ('org.java-websocket:Java-WebSocket:1.3.0')
- `org.java_websocket.WebSocket.class` ('org.java-websocket:Java-WebSocket:1.3.0')
- `okhttp3.WebSocket.class` ('com.squareup.okhttp3:okhttp:3.8.0')

You can add own connection provider. Just implement interface `ConnectionProvider`.
If you implement new provider, please create pull request :)
Expand Down
1 change: 1 addition & 0 deletions lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
compile 'io.reactivex:rxjava:1.2.0'
// Supported transports
provided "org.java-websocket:java-websocket:1.3.2"
provided 'com.squareup.okhttp3:okhttp:3.8.0'
}

task sourcesJar(type: Jar) {
Expand Down
168 changes: 168 additions & 0 deletions lib/src/main/java/ua/naiksoftware/stomp/OkHttpConnectionProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package ua.naiksoftware.stomp;

import android.util.Log;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import rx.Observable;
import rx.Subscriber;

/* package */ class OkHttpConnectionProvider implements ConnectionProvider {

private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();

private final String mUri;
private final Map<String, String> mConnectHttpHeaders;
private final OkHttpClient mOkHttpClient;

private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private final List<Subscriber<? super String>> mMessagesSubscribers;

private WebSocket openedSocked;


/* package */ OkHttpConnectionProvider(String uri, Map<String, String> connectHttpHeaders, OkHttpClient okHttpClient) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleSubscribers = new ArrayList<>();
mMessagesSubscribers = new ArrayList<>();
mOkHttpClient = okHttpClient;
}

@Override
public Observable<String> messages() {
Observable<String> observable = Observable.<String>create(subscriber -> {
mMessagesSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super String>> iterator = mMessagesSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}

if (mMessagesSubscribers.size() < 1) {
Log.d(TAG, "Close web socket connection now in thread " + Thread.currentThread());
openedSocked.close(1000, "");
openedSocked = null;
}
});

createWebSocketConnection();
return observable;
}

private void createWebSocketConnection() {

if (openedSocked != null) {
throw new IllegalStateException("Already have connection to web socket");
}

Request.Builder requestBuilder = new Request.Builder()
.url(mUri);

addConnectionHeadersToBuilder(requestBuilder, mConnectHttpHeaders);

openedSocked = mOkHttpClient.newWebSocket(requestBuilder.build(),
new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
LifecycleEvent openEvent = new LifecycleEvent(LifecycleEvent.Type.OPENED);

TreeMap<String, String> headersAsMap = headersAsMap(response);

openEvent.setHandshakeResponseHeaders(headersAsMap);
emitLifecycleEvent(openEvent);
}

@Override
public void onMessage(WebSocket webSocket, String text) {
emitMessage(text);
}

@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
emitMessage(bytes.utf8());
}

@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.CLOSED));
openedSocked = null;
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
emitLifecycleEvent(new LifecycleEvent(LifecycleEvent.Type.ERROR, new Exception(t)));
}
}

);
}

@Override
public Observable<Void> send(String stompMessage) {
return Observable.create(subscriber -> {
if (openedSocked == null) {
subscriber.onError(new IllegalStateException("Not connected yet"));
} else {
Log.d(TAG, "Send STOMP message: " + stompMessage);
openedSocked.send(stompMessage);
subscriber.onCompleted();
}
});
}

@Override
public Observable<LifecycleEvent> getLifecycleReceiver() {
return Observable.<LifecycleEvent>create(subscriber -> {
mLifecycleSubscribers.add(subscriber);

}).doOnUnsubscribe(() -> {
Iterator<Subscriber<? super LifecycleEvent>> iterator = mLifecycleSubscribers.iterator();
while (iterator.hasNext()) {
if (iterator.next().isUnsubscribed()) iterator.remove();
}
});
}

private TreeMap<String, String> headersAsMap(Response response) {
TreeMap<String, String> headersAsMap = new TreeMap<>();
Headers headers = response.headers();
for (String key : headers.names()) {
headersAsMap.put(key, headers.get(key));
}
return headersAsMap;
}

private void addConnectionHeadersToBuilder(Request.Builder requestBuilder, Map<String, String> mConnectHttpHeaders) {
for (Map.Entry<String, String> headerEntry : mConnectHttpHeaders.entrySet()) {
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
}
}

private void emitLifecycleEvent(LifecycleEvent lifecycleEvent) {
Log.d(TAG, "Emit lifecycle event: " + lifecycleEvent.getType().name());
for (Subscriber<? super LifecycleEvent> subscriber : mLifecycleSubscribers) {
subscriber.onNext(lifecycleEvent);
}
}

private void emitMessage(String stompMessage) {
Log.d(TAG, "Emit STOMP message: " + stompMessage);
for (Subscriber<? super String> subscriber : mMessagesSubscribers) {
subscriber.onNext(stompMessage);
}
}
}
54 changes: 50 additions & 4 deletions lib/src/main/java/ua/naiksoftware/stomp/Stomp.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import java.util.Map;

import okhttp3.OkHttpClient;
import ua.naiksoftware.stomp.client.StompClient;

/**
* Supported overlays:
* - org.java_websocket.WebSocket ('org.java-websocket:Java-WebSocket:1.3.0')
* - okhttp3.WebSocket ('com.squareup.okhttp3:okhttp:3.8.0')
*
* You can add own relay, just implement ConnectionProvider for you stomp transport,
* such as web socket.
Expand All @@ -18,7 +20,7 @@
public class Stomp {

public static StompClient over(Class clazz, String uri) {
return over(clazz, uri, null);
return over(clazz, uri, null, null);
}

/**
Expand All @@ -29,14 +31,58 @@ public static StompClient over(Class clazz, String uri) {
* @return StompClient for receiving and sending messages. Call #StompClient.connect
*/
public static StompClient over(Class clazz, String uri, Map<String, String> connectHttpHeaders) {
if (clazz == WebSocket.class) {
return createStompClient(new WebSocketsConnectionProvider(uri, connectHttpHeaders));
}
return over(clazz, uri, connectHttpHeaders, null);
}

/**
* {@code webSocketClient} can accept the following type of clients:
* <ul>
* <li>{@code org.java_websocket.WebSocket}: cannot accept an existing client</li>
* <li>{@code okhttp3.WebSocket}: can accept a non-null instance of {@code okhttp3.OkHttpClient}</li>
* </ul>
* @param clazz class for using as transport
* @param uri URI to connect
* @param connectHttpHeaders HTTP headers, will be passed with handshake query, may be null
* @param webSocketClient Existing client that will be used to open the WebSocket connection, may be null to use default client
* @return StompClient for receiving and sending messages. Call #StompClient.connect
*/
public static StompClient over(Class clazz, String uri, Map<String, String> connectHttpHeaders, Object webSocketClient) {
try {
if (Class.forName("org.java_websocket.WebSocket") != null && clazz == WebSocket.class) {

if (webSocketClient != null) {
throw new IllegalArgumentException("You cannot pass a webSocketClient with 'org.java_websocket.WebSocket'. use null instead.");
}

return createStompClient(new WebSocketsConnectionProvider(uri, connectHttpHeaders));
}
} catch (ClassNotFoundException e) {}
try {
if (Class.forName("okhttp3.WebSocket") != null && clazz == okhttp3.WebSocket.class) {

OkHttpClient okHttpClient = getOkHttpClient(webSocketClient);

return createStompClient(new OkHttpConnectionProvider(uri, connectHttpHeaders, okHttpClient));
}
} catch (ClassNotFoundException e) {}

throw new RuntimeException("Not supported overlay transport: " + clazz.getName());
}

private static StompClient createStompClient(ConnectionProvider connectionProvider) {
return new StompClient(connectionProvider);
}

private static OkHttpClient getOkHttpClient(Object webSocketClient) {
if (webSocketClient != null) {
if (webSocketClient instanceof OkHttpClient) {
return (OkHttpClient) webSocketClient;
} else {
throw new IllegalArgumentException("You must pass a non-null instance of an 'okhttp3.OkHttpClient'. Or pass null to use a default websocket client.");
}
} else {
// default http client
return new OkHttpClient();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ua.naiksoftware.stomp;

import android.os.Looper;
import android.util.Log;

import org.java_websocket.WebSocket;
Expand All @@ -27,23 +26,25 @@
/**
* Created by naik on 05.05.16.
*/
public class WebSocketsConnectionProvider implements ConnectionProvider {
/* package */ class WebSocketsConnectionProvider implements ConnectionProvider {

private static final String TAG = WebSocketsConnectionProvider.class.getSimpleName();

private final String mUri;
private final Map<String, String> mConnectHttpHeaders;

private final List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private final List<Subscriber<? super String>> mMessagesSubscribers;

private WebSocketClient mWebSocketClient;
private List<Subscriber<? super LifecycleEvent>> mLifecycleSubscribers;
private List<Subscriber<? super String>> mMessagesSubscribers;
private boolean haveConnection;
private TreeMap<String, String> mServerHandshakeHeaders;

/**
* Support UIR scheme ws://host:port/path
* @param connectHttpHeaders may be null
*/
public WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
/* package */ WebSocketsConnectionProvider(String uri, Map<String, String> connectHttpHeaders) {
mUri = uri;
mConnectHttpHeaders = connectHttpHeaders != null ? connectHttpHeaders : new HashMap<>();
mLifecycleSubscribers = new ArrayList<>();
Expand Down

0 comments on commit cbe0cfa

Please sign in to comment.