diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java index f93e1af006..0544373609 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java @@ -17,36 +17,21 @@ package org.apache.eventmesh.runtime.admin.controller; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; -import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientByIpPortHandler; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientByPathHandler; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectAllClientHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectClientByIpPortHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectClientBySubSystemHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemAndDcnHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler; import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; -import org.apache.eventmesh.runtime.constants.EventMeshConstants; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; -import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,713 +48,18 @@ public ClientManageController(EventMeshTCPServer eventMeshTCPServer) { public void start() throws IOException { int port = eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerAdminPort; HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); - server.createContext("/clientManage/showClient", new ShowClientHandler()); - server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler()); - server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler()); - server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler()); - server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler()); - server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler()); - server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler()); - server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler()); -// server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler()); - server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler()); + server.createContext("/clientManage/showClient", new ShowClientHandler(eventMeshTCPServer)); + server.createContext("/clientManage/showClientBySystemAndDcn", new ShowClientBySystemAndDcnHandler(eventMeshTCPServer)); + server.createContext("/clientManage/rejectAllClient", new RejectAllClientHandler(eventMeshTCPServer)); + server.createContext("/clientManage/rejectClientByIpPort", new RejectClientByIpPortHandler(eventMeshTCPServer)); + server.createContext("/clientManage/rejectClientBySubSystem", new RejectClientBySubSystemHandler(eventMeshTCPServer)); + server.createContext("/clientManage/redirectClientBySubSystem", new RedirectClientBySubSystemHandler(eventMeshTCPServer)); + server.createContext("/clientManage/redirectClientByPath", new RedirectClientByPathHandler(eventMeshTCPServer)); + server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer)); +// server.createContext("/eventMesh/msg/push", new EventMeshMsgDownStreamHandler(eventMeshTCPServer)); + server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer)); server.start(); logger.info("ClientManageController start success, port:{}", port); } - - private Map parsePostParameters(HttpExchange exchange) - throws IOException { - Map parameters = new HashMap<>(); - if ("post".equalsIgnoreCase(exchange.getRequestMethod())) { - InputStreamReader isr = - new InputStreamReader(exchange.getRequestBody(), "utf-8"); - BufferedReader br = new BufferedReader(isr); - String query = br.readLine(); - parseQuery(query, parameters); - } - return parameters; - } - - @SuppressWarnings("unchecked") - private void parseQuery(String query, Map parameters) - throws UnsupportedEncodingException { - - if (query != null) { - String pairs[] = query.split("&"); - - for (String pair : pairs) { - String param[] = pair.split("="); - - String key = null; - String value = null; - if (param.length > 0) { - key = URLDecoder.decode(param[0], "UTF-8"); - } - - if (param.length > 1) { - value = URLDecoder.decode(param[1], "UTF-8"); - } - - if (parameters.containsKey(key)) { - Object obj = parameters.get(key); - if (obj instanceof List) { - List values = (List) obj; - values.add(value); - } else if (obj instanceof String) { - List values = new ArrayList(); - values.add((String) obj); - values.add(value); - parameters.put(key, values); - } - } else { - parameters.put(key, value); - } - } - } - } - - /** - * 打印本eventMesh上所有客户端信息 - * - * @return - */ - class ShowClientHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String newLine = System.getProperty("line.separator"); - logger.info("showAllClient================="); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - Map dcnSystemMap = clientSessionGroupMapping.statDCNSystemInfo(); - if (!dcnSystemMap.isEmpty()) { - List> list = new ArrayList<>(); - ValueComparator vc = new ValueComparator(); - for (Map.Entry entry : dcnSystemMap.entrySet()) { - list.add(entry); - } - Collections.sort(list, vc); - for (Map.Entry entry : list) { - result += String.format("System=%s | ClientNum=%d", entry.getKey(), entry.getValue().intValue()) + - newLine; - } - } - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("ShowClientHandler fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - class ValueComparator implements Comparator> { - @Override - public int compare(Map.Entry x, Map.Entry y) { - return x.getValue().intValue() - y.getValue().intValue(); - } - } - - /** - * print clientInfo by subsys and dcn - * - * @return - */ - class ShowClientBySystemAndDcnHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); - String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); - - String newLine = System.getProperty("line.separator"); - logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - if (!sessionMap.isEmpty()) { - for (Session session : sessionMap.values()) { - if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { - UserAgent userAgent = session.getClient(); - result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent - .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine; - } - } - } - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("ShowClientBySystemAndDcnHandler fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - - /** - * query client subscription by topic - */ - class ShowListenClientByTopicHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC); - - String newLine = System.getProperty("line.separator"); - logger.info("showListeningClientByTopic,topic:{}=================", topic); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap clientGroupMap = clientSessionGroupMapping.getClientGroupMap(); - if (!clientGroupMap.isEmpty()) { - for (ClientGroupWrapper cgw : clientGroupMap.values()) { - Set listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic); - if (listenSessionSet != null && listenSessionSet.size() > 0) { - result += String.format("group:%s", cgw.getGroupName()) + newLine; - for (Session session : listenSessionSet) { - UserAgent userAgent = session.getClient(); - result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent - .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getVersion()) + newLine; - } - } - } - } - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("ShowListenClientByTopicHandler fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - - /** - * remove all clients accessed by eventMesh - * - * @return - */ - class RejectAllClientHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - final List successRemoteAddrs = new ArrayList(); - try { - logger.info("rejectAllClient in admin===================="); - if (!sessionMap.isEmpty()) { - for (Map.Entry entry : sessionMap.entrySet()) { - InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); - if (addr != null) { - successRemoteAddrs.add(addr); - } - } - } - } catch (Exception e) { - logger.error("clientManage|rejectAllClient|fail", e); - result = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s", - sessionMap.size(), printClients(successRemoteAddrs), e.getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - result = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessionMap.size - (), printClients(successRemoteAddrs)); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("rejectAllClient fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - /** - * remove c client by ip and port - * - * @return - */ - class RejectClientByIpPortHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP); - String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT); - - if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) { - httpExchange.sendResponseHeaders(200, 0); - result = "params illegal!"; - out.write(result.getBytes()); - return; - } - logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - final List successRemoteAddrs = new ArrayList(); - try { - if (!sessionMap.isEmpty()) { - for (Map.Entry entry : sessionMap.entrySet()) { - if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) { - InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); - if (addr != null) { - successRemoteAddrs.add(addr); - } - } - } - } - } catch (Exception e) { - logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={},errMsg={}", ip, port, e); - result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip, - port, printClients(successRemoteAddrs), e.getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - - result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port, printClients - (successRemoteAddrs)); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("rejectClientByIpPort fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - - /** - * remove c client by dcn and susysId - * - * @return - */ - class RejectClientBySubSystemHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); - String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); - - if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) { - httpExchange.sendResponseHeaders(200, 0); - result = "params illegal!"; - out.write(result.getBytes()); - return; - } - - logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - final List successRemoteAddrs = new ArrayList(); - try { - if (!sessionMap.isEmpty()) { - for (Session session : sessionMap.values()) { - if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { - InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping); - if (addr != null) { - successRemoteAddrs.add(addr); - } - } - } - } - } catch (Exception e) { - logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e); - result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%d} , {dcn=%s " + - "port=%s}, errorMsg : %s", sessionMap.size(), printClients(successRemoteAddrs), dcn, - subSystem, e.getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " + - "port=%s}", sessionMap.size(), printClients(successRemoteAddrs), dcn, subSystem); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("rejectClientBySubSystem fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - /** - * redirect subsystem for subsys and dcn - * - * @return - */ - class RedirectClientBySubSystemHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); - String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); - String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); - String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); - - if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem) - || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) - || !StringUtils.isNumeric(destEventMeshPort)) { - httpExchange.sendResponseHeaders(200, 0); - result = "params illegal!"; - out.write(result.getBytes()); - return; - } - logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - String redirectResult = ""; - try { - if (!sessionMap.isEmpty()) { - for (Session session : sessionMap.values()) { - if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { - redirectResult += "|"; - redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), - session, clientSessionGroupMapping); - } - } - } - } catch (Exception e) { - logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" + - "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e); - result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", - sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e - .getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", - sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("redirectClientBySubSystem fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - /** - * redirect subsystem for path - * - * @return - */ - class RedirectClientByPathHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH); - String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); - String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); - - if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) || - !StringUtils.isNumeric(destEventMeshPort)) { - httpExchange.sendResponseHeaders(200, 0); - result = "params illegal!"; - out.write(result.getBytes()); - return; - } - logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - String redirectResult = ""; - try { - if (!sessionMap.isEmpty()) { - for (Session session : sessionMap.values()) { - if (session.getClient().getPath().contains(path)) { - redirectResult += "|"; - redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), - session, clientSessionGroupMapping); - } - } - } - } catch (Exception e) { - logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" + - "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e); - result = String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", - sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult, e - .getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - result = String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", - sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("redirectClientByPath fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - /** - * redirect subsystem for ip and port - * - * @return - */ - class RedirectClientByIpPortHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = ""; - OutputStream out = httpExchange.getResponseBody(); - try { - String queryString = httpExchange.getRequestURI().getQuery(); - Map queryStringInfo = formData2Dic(queryString); - String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP); - String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT); - String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); - String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); - - if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port) - || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) - || !StringUtils.isNumeric(destEventMeshPort)) { - httpExchange.sendResponseHeaders(200, 0); - result = "params illegal!"; - out.write(result.getBytes()); - return; - } - logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort); - ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); - ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); - String redirectResult = ""; - try { - if (!sessionMap.isEmpty()) { - for (Session session : sessionMap.values()) { - if (session.getClient().getHost().equals(ip) && String.valueOf(session.getClient().getPort()).equals(port)) { - redirectResult += "|"; - redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), - session, clientSessionGroupMapping); - } - } - } - } catch (Exception e) { - logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" + - "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e); - result = String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", - sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult, e - .getMessage()); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - return; - } - result = String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " + - "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", - sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult); - httpExchange.sendResponseHeaders(200, 0); - out.write(result.getBytes()); - } catch (Exception e) { - logger.error("redirectClientByIpPort fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } - - private String printClients(List clients) { - if (clients.isEmpty()) { - return "no session had been closed"; - } - StringBuilder sb = new StringBuilder(); - for (InetSocketAddress addr : clients) { - sb.append(addr).append("|"); - } - return sb.toString(); - } - - private Map formData2Dic(String formData) { - Map result = new HashMap<>(); - if (formData == null || formData.trim().length() == 0) { - return result; - } - final String[] items = formData.split("&"); - Arrays.stream(items).forEach(item -> { - final String[] keyAndVal = item.split("="); - if (keyAndVal.length == 2) { - try { - final String key = URLDecoder.decode(keyAndVal[0], "utf8"); - final String val = URLDecoder.decode(keyAndVal[1], "utf8"); - result.put(key, val); - } catch (UnsupportedEncodingException e) { - logger.warn("formData2Dic:param decode failed...", e); - } - } - }); - return result; - } - - class EventMeshMsgDownStreamHandler implements HttpHandler { - @Override - public void handle(HttpExchange httpExchange) throws IOException { - String result = "false"; - OutputStream out = httpExchange.getResponseBody(); - try { -// Map queryStringInfo = parsePostParameters(httpExchange); -// String msgStr = (String)queryStringInfo.get("msg"); -// String groupName = (String)queryStringInfo.get("group"); -// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr); -// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) { -// logger.warn("msg or groupName is null"); -// httpExchange.sendResponseHeaders(200, 0); -// out.write(result.getBytes()); -// return; -// } -// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class); -// String topic = messageExt.getTopic(); -// -// if (!EventMeshUtil.isValidRMBTopic(topic)) { -// logger.warn("msg topic is illegal"); -// httpExchange.sendResponseHeaders(200, 0); -// out.write(result.getBytes()); -// return; -// } -// -// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy(); -// Set groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions(); -// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions); -// -// if(session == null){ -// logger.error("DownStream msg,retry other eventMesh found no session again"); -// httpExchange.sendResponseHeaders(200, 0); -// out.write(result.getBytes()); -// return; -// } -// -// DownStreamMsgContext downStreamMsgContext = -// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true); -// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext); -// -// if (session.isCanDownStream()) { -// session.downstreamMsg(downStreamMsgContext); -// httpExchange.sendResponseHeaders(200, 0); -// result = "true"; -// out.write(result.getBytes()); -// return; -// } -// -// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq); -// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills; -// downStreamMsgContext.delay(delayTime); -// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext); -// result = "true"; -// httpExchange.sendResponseHeaders(200, 0); -// out.write(result.getBytes()); - - } catch (Exception e) { - logger.error("EventMeshMsgDownStreamHandler handle fail...", e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException e) { - logger.warn("out close failed...", e); - } - } - } - - } - } } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java new file mode 100644 index 0000000000..cdc086e8e8 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/EventMeshMsgDownStreamHandler.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EventMeshMsgDownStreamHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(EventMeshMsgDownStreamHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public EventMeshMsgDownStreamHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = "false"; + OutputStream out = httpExchange.getResponseBody(); + try { +// Map queryStringInfo = parsePostParameters(httpExchange); +// String msgStr = (String)queryStringInfo.get("msg"); +// String groupName = (String)queryStringInfo.get("group"); +// logger.info("recieve msg from other eventMesh, group:{}, msg:{}", groupName, msgStr); +// if (StringUtils.isBlank(msgStr) || StringUtils.isBlank(groupName)) { +// logger.warn("msg or groupName is null"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// MessageExt messageExt = JSON.parseObject(msgStr, MessageExt.class); +// String topic = messageExt.getTopic(); +// +// if (!EventMeshUtil.isValidRMBTopic(topic)) { +// logger.warn("msg topic is illegal"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// +// DownstreamDispatchStrategy downstreamDispatchStrategy = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamDispatchStrategy(); +// Set groupConsumerSessions = eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getGroupConsumerSessions(); +// Session session = downstreamDispatchStrategy.select(groupName, topic, groupConsumerSessions); +// +// if(session == null){ +// logger.error("DownStream msg,retry other eventMesh found no session again"); +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); +// return; +// } +// +// DownStreamMsgContext downStreamMsgContext = +// new DownStreamMsgContext(messageExt, session, eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getPersistentMsgConsumer(), null, true); +// eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupWrapper(groupName).getDownstreamMap().putIfAbsent(downStreamMsgContext.seq, downStreamMsgContext); +// +// if (session.isCanDownStream()) { +// session.downstreamMsg(downStreamMsgContext); +// httpExchange.sendResponseHeaders(200, 0); +// result = "true"; +// out.write(result.getBytes()); +// return; +// } +// +// logger.warn("EventMeshMsgDownStreamHandler|dispatch retry, seq[{}]", downStreamMsgContext.seq); +// long delayTime = EventMeshUtil.isService(downStreamMsgContext.msgExt.getTopic()) ? 0 : eventMeshTCPServer.getAccessConfiguration().eventMeshTcpMsgRetryDelayInMills; +// downStreamMsgContext.delay(delayTime); +// eventMeshTCPServer.getEventMeshTcpRetryer().pushRetry(downStreamMsgContext); +// result = "true"; +// httpExchange.sendResponseHeaders(200, 0); +// out.write(result.getBytes()); + + } catch (Exception e) { + logger.error("EventMeshMsgDownStreamHandler handle fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } + + private Map parsePostParameters(HttpExchange exchange) + throws IOException { + Map parameters = new HashMap<>(); + if ("post".equalsIgnoreCase(exchange.getRequestMethod())) { + InputStreamReader isr = + new InputStreamReader(exchange.getRequestBody(), "utf-8"); + BufferedReader br = new BufferedReader(isr); + String query = br.readLine(); + parseQuery(query, parameters); + } + return parameters; + } + + @SuppressWarnings("unchecked") + private void parseQuery(String query, Map parameters) + throws UnsupportedEncodingException { + + if (query != null) { + String pairs[] = query.split("&"); + + for (String pair : pairs) { + String param[] = pair.split("="); + + String key = null; + String value = null; + if (param.length > 0) { + key = URLDecoder.decode(param[0], "UTF-8"); + } + + if (param.length > 1) { + value = URLDecoder.decode(param[1], "UTF-8"); + } + + if (parameters.containsKey(key)) { + Object obj = parameters.get(key); + if (obj instanceof List) { + List values = (List) obj; + values.add(value); + } else if (obj instanceof String) { + List values = new ArrayList(); + values.add((String) obj); + values.add(value); + parameters.put(key, values); + } + } else { + parameters.put(key, value); + } + } + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java new file mode 100644 index 0000000000..79ba4bd98f --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RedirectClientByIpPortHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RedirectClientByIpPortHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public RedirectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP); + String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT); + String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); + String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); + + if (StringUtils.isBlank(ip) || !StringUtils.isNumeric(port) + || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) + || !StringUtils.isNumeric(destEventMeshPort)) { + httpExchange.sendResponseHeaders(200, 0); + result = "params illegal!"; + out.write(result.getBytes()); + return; + } + logger.info("redirectClientByIpPort in admin,ip:{},port:{},destIp:{},destPort:{}====================", ip, port, destEventMeshIp, destEventMeshPort); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + String redirectResult = ""; + try { + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getHost().equals(ip) && String.valueOf(session.getClient().getPort()).equals(port)) { + redirectResult += "|"; + redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), + session, clientSessionGroupMapping); + } + } + } + } catch (Exception e) { + logger.error("clientManage|redirectClientByIpPort|fail|ip={}|port={}|destEventMeshIp" + + "={}|destEventMeshPort={},errMsg={}", ip, port, destEventMeshIp, destEventMeshPort, e); + result = String.format("redirectClientByIpPort fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", + sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult, e + .getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + result = String.format("redirectClientByIpPort success! sessionMap size {%d}, {ip=%s port=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", + sessionMap.size(), ip, port, destEventMeshIp, destEventMeshPort, redirectResult); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("redirectClientByIpPort fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java new file mode 100644 index 0000000000..b605a1cc64 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByPathHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * redirect subsystem for path + */ +public class RedirectClientByPathHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RedirectClientByPathHandler.class); + + private EventMeshTCPServer eventMeshTCPServer; + + public RedirectClientByPathHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String path = queryStringInfo.get(EventMeshConstants.MANAGE_PATH); + String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); + String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); + + if (StringUtils.isBlank(path) || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) || + !StringUtils.isNumeric(destEventMeshPort)) { + httpExchange.sendResponseHeaders(200, 0); + result = "params illegal!"; + out.write(result.getBytes()); + return; + } + logger.info("redirectClientByPath in admin,path:{},destIp:{},destPort:{}====================", path, destEventMeshIp, destEventMeshPort); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + String redirectResult = ""; + try { + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getPath().contains(path)) { + redirectResult += "|"; + redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), + session, clientSessionGroupMapping); + } + } + } + } catch (Exception e) { + logger.error("clientManage|redirectClientByPath|fail|path={}|destEventMeshIp" + + "={}|destEventMeshPort={},errMsg={}", path, destEventMeshIp, destEventMeshPort, e); + result = String.format("redirectClientByPath fail! sessionMap size {%d}, {path=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", + sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult, e + .getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + result = String.format("redirectClientByPath success! sessionMap size {%d}, {path=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", + sessionMap.size(), path, destEventMeshIp, destEventMeshPort, redirectResult); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("redirectClientByPath fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java new file mode 100644 index 0000000000..fa03e7205b --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientBySubSystemHandler.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * redirect subsystem for subsys and dcn + */ +public class RedirectClientBySubSystemHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RedirectClientBySubSystemHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public RedirectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); + String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); + String destEventMeshIp = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_IP); + String destEventMeshPort = queryStringInfo.get(EventMeshConstants.MANAGE_DEST_PORT); + + if (StringUtils.isBlank(dcn) || !StringUtils.isNumeric(subSystem) + || StringUtils.isBlank(destEventMeshIp) || StringUtils.isBlank(destEventMeshPort) + || !StringUtils.isNumeric(destEventMeshPort)) { + httpExchange.sendResponseHeaders(200, 0); + result = "params illegal!"; + out.write(result.getBytes()); + return; + } + logger.info("redirectClientBySubSystem in admin,subsys:{},dcn:{},destIp:{},destPort:{}====================", subSystem, dcn, destEventMeshIp, destEventMeshPort); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + String redirectResult = ""; + try { + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { + redirectResult += "|"; + redirectResult += EventMeshTcp2Client.redirectClient2NewEventMesh(eventMeshTCPServer, destEventMeshIp, Integer.parseInt(destEventMeshPort), + session, clientSessionGroupMapping); + } + } + } + } catch (Exception e) { + logger.error("clientManage|redirectClientBySubSystem|fail|dcn={}|subSystem={}|destEventMeshIp" + + "={}|destEventMeshPort={},errMsg={}", dcn, subSystem, destEventMeshIp, destEventMeshPort, e); + result = String.format("redirectClientBySubSystem fail! sessionMap size {%d}, {clientIp=%s clientPort=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s}, errorMsg : %s", + sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult, e + .getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + result = String.format("redirectClientBySubSystem success! sessionMap size {%d}, {dcn=%s subSystem=%s " + + "destEventMeshIp=%s destEventMeshPort=%s}, result {%s} ", + sessionMap.size(), dcn, subSystem, destEventMeshIp, destEventMeshPort, redirectResult); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("redirectClientBySubSystem fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java new file mode 100644 index 0000000000..6314c489ac --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectAllClientHandler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RejectAllClientHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RejectAllClientHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public RejectAllClientHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + /** + * remove all clients accessed by eventMesh + * + * @param httpExchange + * @throws IOException + */ + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + final List successRemoteAddrs = new ArrayList<>(); + try { + logger.info("rejectAllClient in admin===================="); + if (!sessionMap.isEmpty()) { + for (Map.Entry entry : sessionMap.entrySet()) { + InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); + if (addr != null) { + successRemoteAddrs.add(addr); + } + } + } + } catch (Exception e) { + logger.error("clientManage|rejectAllClient|fail", e); + result = String.format("rejectAllClient fail! sessionMap size {%d}, had reject {%s}, errorMsg : %s", + sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), e.getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + result = String.format("rejectAllClient success! sessionMap size {%d}, had reject {%s}", sessionMap.size(), + NetUtils.addressToString(successRemoteAddrs)); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("rejectAllClient fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java new file mode 100644 index 0000000000..3ddd45e492 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientByIpPortHandler.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RejectClientByIpPortHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RejectClientByIpPortHandler.class); + + private EventMeshTCPServer eventMeshTCPServer; + + public RejectClientByIpPortHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String ip = queryStringInfo.get(EventMeshConstants.MANAGE_IP); + String port = queryStringInfo.get(EventMeshConstants.MANAGE_PORT); + + if (StringUtils.isBlank(ip) || StringUtils.isBlank(port)) { + httpExchange.sendResponseHeaders(200, 0); + result = "params illegal!"; + out.write(result.getBytes()); + return; + } + logger.info("rejectClientByIpPort in admin,ip:{},port:{}====================", ip, port); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + final List successRemoteAddrs = new ArrayList(); + try { + if (!sessionMap.isEmpty()) { + for (Map.Entry entry : sessionMap.entrySet()) { + if (entry.getKey().getHostString().equals(ip) && String.valueOf(entry.getKey().getPort()).equals(port)) { + InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, entry.getValue(), clientSessionGroupMapping); + if (addr != null) { + successRemoteAddrs.add(addr); + } + } + } + } + } catch (Exception e) { + logger.error("clientManage|rejectClientByIpPort|fail|ip={}|port={},errMsg={}", ip, port, e); + result = String.format("rejectClientByIpPort fail! {ip=%s port=%s}, had reject {%s}, errorMsg : %s", ip, + port, NetUtils.addressToString(successRemoteAddrs), e.getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + + result = String.format("rejectClientByIpPort success! {ip=%s port=%s}, had reject {%s}", ip, port, + NetUtils.addressToString(successRemoteAddrs)); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("rejectClientByIpPort fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java new file mode 100644 index 0000000000..5b1e841bc8 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/RejectClientBySubSystemHandler.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RejectClientBySubSystemHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(RejectClientBySubSystemHandler.class); + + private EventMeshTCPServer eventMeshTCPServer; + + public RejectClientBySubSystemHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + /** + * remove c client by dcn and susysId + * @param httpExchange + * @throws IOException + */ + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); + String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); + + if (StringUtils.isBlank(dcn) || StringUtils.isBlank(subSystem)) { + httpExchange.sendResponseHeaders(200, 0); + result = "params illegal!"; + out.write(result.getBytes()); + return; + } + + logger.info("rejectClientBySubSystem in admin,subsys:{},dcn:{}====================", subSystem, dcn); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + final List successRemoteAddrs = new ArrayList<>(); + try { + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { + InetSocketAddress addr = EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, session, clientSessionGroupMapping); + if (addr != null) { + successRemoteAddrs.add(addr); + } + } + } + } + } catch (Exception e) { + logger.error("clientManage|rejectClientBySubSystem|fail|dcn={}|subSystemId={},errMsg={}", dcn, subSystem, e); + result = String.format("rejectClientBySubSystem fail! sessionMap size {%d}, had reject {%s} , {dcn=%s " + + "port=%s}, errorMsg : %s", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn, + subSystem, e.getMessage()); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + return; + } + result = String.format("rejectClientBySubSystem success! sessionMap size {%d}, had reject {%s} , {dcn=%s " + + "port=%s}", sessionMap.size(), NetUtils.addressToString(successRemoteAddrs), dcn, subSystem); + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("rejectClientBySubSystem fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java new file mode 100644 index 0000000000..9ccd547291 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientBySystemAndDcnHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class ShowClientBySystemAndDcnHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(ShowClientBySystemAndDcnHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public ShowClientBySystemAndDcnHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + /** + * print clientInfo by subsys and dcn + * + * @param httpExchange + * @throws IOException + */ + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String dcn = queryStringInfo.get(EventMeshConstants.MANAGE_DCN); + String subSystem = queryStringInfo.get(EventMeshConstants.MANAGE_SUBSYSTEM); + + String newLine = System.getProperty("line.separator"); + logger.info("showClientBySubsysAndDcn,subsys:{},dcn:{}=================", subSystem, dcn); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap sessionMap = clientSessionGroupMapping.getSessionMap(); + if (!sessionMap.isEmpty()) { + for (Session session : sessionMap.values()) { + if (session.getClient().getDcn().equals(dcn) && session.getClient().getSubsystem().equals(subSystem)) { + UserAgent userAgent = session.getClient(); + result += String.format("pid=%s | ip=%s | port=%s | path=%s | purpose=%s", userAgent.getPid(), userAgent + .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getPurpose()) + newLine; + } + } + } + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("ShowClientBySystemAndDcnHandler fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + } + + +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java new file mode 100644 index 0000000000..314b2e5635 --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowClientHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This handler used to print the total client info + */ +public class ShowClientHandler implements HttpHandler { + + private static final Logger logger = LoggerFactory.getLogger(ShowClientHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public ShowClientHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String newLine = System.getProperty("line.separator"); + logger.info("showAllClient================="); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + Map dcnSystemMap = clientSessionGroupMapping.statDCNSystemInfo(); + if (!dcnSystemMap.isEmpty()) { + List> list = new ArrayList<>(); + for (Map.Entry entry : dcnSystemMap.entrySet()) { + list.add(entry); + } + Collections.sort(list, Comparator.comparingInt(x -> x.getValue().intValue())); + for (Map.Entry entry : list) { + result += String.format("System=%s | ClientNum=%d", entry.getKey(), entry.getValue().intValue()) + + newLine; + } + } + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("ShowClientHandler fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java new file mode 100644 index 0000000000..62d76ddecf --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/handler/ShowListenClientByTopicHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.eventmesh.common.protocol.tcp.UserAgent; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.apache.eventmesh.runtime.constants.EventMeshConstants; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; +import org.apache.eventmesh.runtime.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * query client subscription by topic + */ +public class ShowListenClientByTopicHandler implements HttpHandler { + + private Logger logger = LoggerFactory.getLogger(ShowListenClientByTopicHandler.class); + + private final EventMeshTCPServer eventMeshTCPServer; + + public ShowListenClientByTopicHandler(EventMeshTCPServer eventMeshTCPServer) { + this.eventMeshTCPServer = eventMeshTCPServer; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + String result = ""; + OutputStream out = httpExchange.getResponseBody(); + try { + String queryString = httpExchange.getRequestURI().getQuery(); + Map queryStringInfo = NetUtils.formData2Dic(queryString); + String topic = queryStringInfo.get(EventMeshConstants.MANAGE_TOPIC); + + String newLine = System.getProperty("line.separator"); + logger.info("showListeningClientByTopic,topic:{}=================", topic); + ClientSessionGroupMapping clientSessionGroupMapping = eventMeshTCPServer.getClientSessionGroupMapping(); + ConcurrentHashMap clientGroupMap = clientSessionGroupMapping.getClientGroupMap(); + if (!clientGroupMap.isEmpty()) { + for (ClientGroupWrapper cgw : clientGroupMap.values()) { + Set listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic); + if (listenSessionSet != null && listenSessionSet.size() > 0) { + result += String.format("group:%s", cgw.getGroupName()) + newLine; + for (Session session : listenSessionSet) { + UserAgent userAgent = session.getClient(); + result += String.format("pid=%s | ip=%s | port=%s | path=%s | version=%s", userAgent.getPid(), userAgent + .getHost(), userAgent.getPort(), userAgent.getPath(), userAgent.getVersion()) + newLine; + } + } + } + } + httpExchange.sendResponseHeaders(200, 0); + out.write(result.getBytes()); + } catch (Exception e) { + logger.error("ShowListenClientByTopicHandler fail...", e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.warn("out close failed...", e); + } + } + } + + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java new file mode 100644 index 0000000000..a2563bce3b --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/NetUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.runtime.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.net.InetSocketAddress; +import java.net.URLDecoder; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NetUtils { + + private static final Logger logger = LoggerFactory.getLogger(NetUtils.class); + + /** + * Transform the url form string to Map + * + * @param formData + * @return + */ + public static Map formData2Dic(String formData) { + Map result = new HashMap<>(); + if (formData == null || formData.trim().length() == 0) { + return result; + } + final String[] items = formData.split("&"); + Arrays.stream(items).forEach(item -> { + final String[] keyAndVal = item.split("="); + if (keyAndVal.length == 2) { + try { + final String key = URLDecoder.decode(keyAndVal[0], "utf8"); + final String val = URLDecoder.decode(keyAndVal[1], "utf8"); + result.put(key, val); + } catch (UnsupportedEncodingException e) { + logger.warn("formData2Dic:param decode failed...", e); + } + } + }); + return result; + } + + public static String addressToString(List clients) { + if (clients.isEmpty()) { + return "no session had been closed"; + } + StringBuilder sb = new StringBuilder(); + for (InetSocketAddress addr : clients) { + sb.append(addr).append("|"); + } + return sb.toString(); + } +} diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java new file mode 100644 index 0000000000..2d0ae0391f --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/admin/handler/RedirectClientByIpPortHandlerTest.java @@ -0,0 +1,40 @@ +package org.apache.eventmesh.runtime.admin.handler; + +import com.sun.net.httpserver.HttpExchange; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; + +public class RedirectClientByIpPortHandlerTest { + + private RedirectClientByIpPortHandler redirectClientByIpPortHandler; + + @Before + public void init() { + EventMeshTCPServer mockServer = PowerMockito.mock(EventMeshTCPServer.class); + redirectClientByIpPortHandler = new RedirectClientByIpPortHandler(mockServer); + } + + @Test + public void testHandleParamIllegal() throws IOException { + OutputStream outputStream = new ByteArrayOutputStream(); + URI uri = URI.create("ip=127.0.0.1&port=1234&desteventMeshIp=127.0.0.1&desteventMeshPort="); + + HttpExchange mockExchange = PowerMockito.mock(HttpExchange.class); + PowerMockito.when(mockExchange.getResponseBody()).thenReturn(outputStream); + PowerMockito.when(mockExchange.getRequestURI()).thenReturn(uri); + + redirectClientByIpPortHandler.handle(mockExchange); + + String response = outputStream.toString(); + Assert.assertEquals("params illegal!", response); + + } +} \ No newline at end of file