Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature:Support_reconnect_backoff_interface #155

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import java.util.concurrent.ThreadPoolExecutor;

import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import okhttp3.OkHttpClient;

/**
Expand All @@ -20,4 +22,5 @@ public interface ClientBuilder {
ClientBuilder okHttpClient(OkHttpClient okHttpClient);
ClientBuilder messageListener(MessageListener messageListener);
ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);
ClientBuilder backoffStrategy(BackoffStrategy backoffStrategy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@
import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.common.util.StringUtil;

import java.lang.reflect.Constructor;
import java.util.ServiceLoader;
import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientBuilderImpl implements ClientBuilder {

Expand Down Expand Up @@ -79,4 +86,10 @@ public ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool) {
this.conf.setCallbackThreadPool(callbackThreadPool);
return this;
}

@Override
public ClientBuilder backoffStrategy(BackoffStrategy backoffStrategy) {
this.conf.setBackoffStrategy(backoffStrategy);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.crossoverjie.cim.client.sdk.impl;

import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Expand Down Expand Up @@ -46,4 +48,7 @@ public static class Auth{

@JsonIgnore
private ReconnectCheck reconnectCheck = (client) -> true;

@JsonIgnore
private BackoffStrategy backoffStrategy = new RandomBackoff();
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ public void reconnect() throws Exception {

this.conf.getEvent().info("cim trigger reconnecting....");

// TODO: 2024/9/13 need a backoff interface
int random = (int) (Math.random() * 7 + 3);
TimeUnit.SECONDS.sleep(random);
this.conf.getBackoffStrategy().runBackoff();

// don't set State ready, because when connect success, the State will be set to ready automate.
connectServer(v -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.crossoverjie.cim.client.sdk.io.backoff;

import java.util.concurrent.TimeUnit;

/**
* @author:qjj
* @create: 2024-09-21 12:16
* @Description: backoff strategy interface
*/

public interface BackoffStrategy {
/**
* @return the backoff time in milliseconds
*/
long nextBackoff();

/**
* Run the backoff strategy
* @throws InterruptedException
*/
default void runBackoff() throws InterruptedException {
TimeUnit.SECONDS.sleep(nextBackoff());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.crossoverjie.cim.client.sdk.io.backoff;

import java.util.concurrent.TimeUnit;

/**
* @author:qjj
* @create: 2024-09-21 12:22
* @Description: random backoff strategy
*/

public class RandomBackoff implements BackoffStrategy {

@Override
public long nextBackoff() {
int random = (int) (Math.random() * 7 + 3);
return random;
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.crossoverjie.cim.client.sdk;

import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
Expand Down Expand Up @@ -224,11 +225,13 @@ public void testReconnect() throws Exception {
.userName(zs)
.userId(zsId)
.build();
var backoffStrategy = new RandomBackoff();

@Cleanup
Client client1 = Client.builder()
.auth(auth1)
.routeUrl(routeUrl)
.backoffStrategy(backoffStrategy)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state = client1.getState();
Expand All @@ -242,6 +245,7 @@ public void testReconnect() throws Exception {
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.backoffStrategy(backoffStrategy)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.crossoverjie.cim.client.sdk.Client;
import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.client.service.ShutDownSign;
import com.crossoverjie.cim.client.service.impl.MsgCallBackListener;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor ca
.okHttpClient(okHttpClient)
.messageListener(new MsgCallBackListener(msgLogger))
.callbackThreadPool(callbackThreadPool)
.backoffStrategy(new RandomBackoff())
.build();
}

Expand Down
74 changes: 37 additions & 37 deletions cim-client/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
spring:
application:
name: cim-client

# web port
server:
port: 8082

logging:
level:
root: error

# enable swagger
springdoc:
swagger-ui:
enabled: true

# log path
cim:
msg:
logger:
path: /opt/logs/cim/
route:
url: http://localhost:8083 # route url suggested that this is Nginx address
user: # cim userId and userName
id: 1725714450795
userName: cj4
callback:
thread:
queue:
size: 2
pool:
size: 2
heartbeat:
time: 60 # cim heartbeat time (seconds)
reconnect:
count: 3
spring:
application:
name: cim-client

# web port
server:
port: 8082

logging:
level:
root: error

# enable swagger
springdoc:
swagger-ui:
enabled: true

# log path
cim:
msg:
logger:
path: /opt/logs/cim/
route:
url: http://localhost:8083 # route url suggested that this is Nginx address
user: # cim userId and userName
id: 1725714450795
userName: cj4
callback:
thread:
queue:
size: 2
pool:
size: 2
heartbeat:
time: 60 # cim heartbeat time (seconds)
reconnect:
count: 3