diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java
index 35c5557ea62..dd7334b9820 100644
--- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java
@@ -159,6 +159,7 @@ private void cleanTimeoutCache() {
* @param timeDiff 缓存对象保存时间 millis
*/
public void addCache(Object key, Object value, Long timeDiff) {
+ removeCache(key);
if (timeDiff == null) {
timeDiff = DEFAULT_CACHE_TIMEOUT;
}
diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/ssh/CommonSshClient.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/ssh/CommonSshClient.java
index b9223ec9933..f06ec8701c8 100644
--- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/ssh/CommonSshClient.java
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/ssh/CommonSshClient.java
@@ -14,23 +14,25 @@
@Slf4j
public class CommonSshClient {
- private static SshClient sshClient;
+ private static final SshClient SSH_CLIENT;
static {
- sshClient = SshClient.setUpDefaultClient();
+ SSH_CLIENT = SshClient.setUpDefaultClient();
// 接受所有服务端公钥校验,会打印warn日志 Server at {} presented unverified {} key: {}
AcceptAllServerKeyVerifier verifier = AcceptAllServerKeyVerifier.INSTANCE;
- sshClient.setServerKeyVerifier(verifier);
- // 设置链接保活心跳2000毫秒一次, 客户端等待保活心跳响应超时时间300_0000毫秒
+ SSH_CLIENT.setServerKeyVerifier(verifier);
+ // 设置链接保活心跳2000毫秒一次, 客户端等待保活心跳响应超时时间300_000毫秒
PropertyResolverUtils.updateProperty(
- sshClient, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 2000);
+ SSH_CLIENT, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 2000);
PropertyResolverUtils.updateProperty(
- sshClient, CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getName(), 300_000);
- sshClient.start();
+ SSH_CLIENT, CoreModuleProperties.HEARTBEAT_REPLY_WAIT.getName(), 300_000);
+ PropertyResolverUtils.updateProperty(
+ SSH_CLIENT, CoreModuleProperties.SOCKET_KEEPALIVE.getName(), true);
+ SSH_CLIENT.start();
}
public static SshClient getSshClient() {
- return sshClient;
+ return SSH_CLIENT;
}
}
diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java
index 487e3607865..cd4d5436a85 100644
--- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java
@@ -17,8 +17,8 @@
package org.dromara.hertzbeat.collector.collect.mongodb;
-import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
@@ -47,20 +47,18 @@
* Mongodb 单机指标收集器
*
* @author liudonghua
- * @version 1.0
- * Created by liudonghua on 2023/01/01
- * see also https://www.mongodb.com/languages/java,
- * https://www.mongodb.com/docs/manual/reference/command/serverStatus/#metrics
+ * see also https://www.mongodb.com/languages/java,
+ * https://www.mongodb.com/docs/manual/reference/command/serverStatus/#metrics
*/
@Slf4j
public class MongodbSingleCollectImpl extends AbstractCollect {
/**
* 支持的 mongodb diagnostic 命令,排除internal/deprecated相关的命令
- * 可参考 https://www.mongodb.com/docs/manual/reference/command/nav-diagnostic/,
- * https://www.mongodb.com/docs/mongodb-shell/run-commands/
+ * 可参考 ...,
+ * ...
* 注意:一些命令需要相应的权限才能执行,否则执行虽然不会报错,但是返回的结果是空的,
- * 详见 https://www.mongodb.com/docs/manual/reference/built-in-roles/
+ * 详见 ...
*/
private static final String[] SUPPORTED_MONGODB_DIAGNOSTIC_COMMANDS = {
"buildInfo",
@@ -199,14 +197,10 @@ private MongoClient getClient(Metrics metrics) {
}
// 复用失败则新建连接 connect to mongodb
String url;
- try {
- // 密码可能包含特殊字符,需要使用类似js的encodeURIComponent进行编码,这里使用java的URLEncoder
- url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
- URLEncoder.encode(mongodbProtocol.getPassword(), "UTF-8"), mongodbProtocol.getHost(), mongodbProtocol.getPort(),
- mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase());
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ // 密码可能包含特殊字符,需要使用类似js的encodeURIComponent进行编码,这里使用java的URLEncoder
+ url = String.format("mongodb://%s:%s@%s:%s/%s?authSource=%s", mongodbProtocol.getUsername(),
+ URLEncoder.encode(mongodbProtocol.getPassword(), StandardCharsets.UTF_8), mongodbProtocol.getHost(), mongodbProtocol.getPort(),
+ mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase());
mongoClient = MongoClients.create(url);
MongodbConnect mongodbConnect = new MongodbConnect(mongoClient);
CommonCache.getInstance().addCache(identifier, mongodbConnect);
diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java
index 3bc3027a432..04f8d7673e8 100644
--- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java
+++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java
@@ -17,6 +17,9 @@
package org.dromara.hertzbeat.collector.collect.ssh;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.exception.SshChannelOpenException;
+import org.apache.sshd.common.util.io.output.NoCloseOutputStream;
import org.apache.sshd.common.util.security.SecurityUtils;
import org.dromara.hertzbeat.collector.collect.AbstractCollect;
import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier;
@@ -43,9 +46,11 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.SocketTimeoutException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -83,24 +88,24 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a
return;
}
SshProtocol sshProtocol = metrics.getSsh();
+ boolean reuseConnection = Boolean.parseBoolean(sshProtocol.getReuseConnection());
int timeout = CollectUtil.getTimeout(sshProtocol.getTimeout(), DEFAULT_TIMEOUT);
ClientChannel channel = null;
+ ClientSession clientSession = null;
try {
- ClientSession clientSession = getConnectSession(sshProtocol, timeout);
+ clientSession = getConnectSession(sshProtocol, timeout, reuseConnection);
channel = clientSession.createExecChannel(sshProtocol.getScript());
ByteArrayOutputStream response = new ByteArrayOutputStream();
channel.setOut(response);
- if (!channel.open().verify(timeout).isOpened()) {
- removeConnectSessionCache(sshProtocol);
- channel.close();
- clientSession.close();
- throw new Exception("ssh channel open failed");
- }
+ channel.setErr(new NoCloseOutputStream(System.err));
+ channel.open().verify(timeout);
List list = new ArrayList<>();
list.add(ClientChannelEvent.CLOSED);
- channel.waitFor(list, timeout);
+ Collection waitEvents = channel.waitFor(list, timeout);
+ if (waitEvents.contains(ClientChannelEvent.TIMEOUT)) {
+ throw new SocketTimeoutException("Failed to retrieve command result in time: " + sshProtocol.getScript());
+ }
Long responseTime = System.currentTimeMillis() - startTime;
- channel.close();
String result = response.toString();
if (!StringUtils.hasText(result)) {
builder.setCode(CollectRep.Code.FAIL);
@@ -126,11 +131,19 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a
log.info(errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("The peer refused to connect: service port does not listening or firewall: " + errorMsg);
+ } catch (SshException sshException) {
+ Throwable throwable = sshException.getCause();
+ if (throwable instanceof SshChannelOpenException) {
+ log.warn("Remote ssh server no more session channel, please increase sshd_config MaxSessions.");
+ }
+ String errorMsg = CommonUtil.getMessageFromThrowable(sshException);
+ builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+ builder.setMsg("Peer ssh connection failed: " + errorMsg);
} catch (IOException ioException) {
String errorMsg = CommonUtil.getMessageFromThrowable(ioException);
log.info(errorMsg);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
- builder.setMsg("Peer connection failed: " + errorMsg);
+ builder.setMsg("Peer io connection failed: " + errorMsg);
} catch (Exception exception) {
String errorMsg = CommonUtil.getMessageFromThrowable(exception);
log.warn(errorMsg, exception);
@@ -144,6 +157,13 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a
log.error(e.getMessage(), e);
}
}
+ if (clientSession != null && !reuseConnection) {
+ try {
+ clientSession.close();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
}
}
@@ -247,28 +267,31 @@ private void removeConnectSessionCache(SshProtocol sshProtocol) {
CommonCache.getInstance().removeCache(identifier);
}
- private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout) throws IOException, GeneralSecurityException {
+ private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout, boolean reuseConnection)
+ throws IOException, GeneralSecurityException {
CacheIdentifier identifier = CacheIdentifier.builder()
- .ip(sshProtocol.getHost()).port(sshProtocol.getPort())
- .username(sshProtocol.getUsername()).password(sshProtocol.getPassword())
- .build();
- Optional