From 9a58b000c83ba3714b37a10909765c07bddb15f9 Mon Sep 17 00:00:00 2001 From: ylz Date: Thu, 1 Feb 2024 14:20:56 +0800 Subject: [PATCH] support ps protocol inner 349 fix fix --- .../nio/handler/PreparedResponseHandler.java | 6 +- .../mysqlsharding/MySQLResponseService.java | 10 +- .../rwsplit/MysqlExecuteResponseHandler.java | 203 ++++++++++++++++++ .../rwsplit/MysqlPrepareLogicHandler.java | 99 +++++---- .../dble/services/rwsplit/RWSplitHandler.java | 68 +++--- 5 files changed, 309 insertions(+), 77 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/services/rwsplit/MysqlExecuteResponseHandler.java diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/PreparedResponseHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/PreparedResponseHandler.java index 9f2667e5bc..908dfe4c7e 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/PreparedResponseHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/PreparedResponseHandler.java @@ -15,9 +15,7 @@ */ public interface PreparedResponseHandler { - void preparedOkResponse(byte[] ok, MySQLResponseService service); + void preparedOkResponse(byte[] ok, List fields, List params, MySQLResponseService service); - void paramEofResponse(List params, byte[] eof, MySQLResponseService service); - - void fieldEofResponse(List fields, byte[] eof, MySQLResponseService service); + void preparedExecuteResponse(byte[] header, List fields, byte[] eof, MySQLResponseService service); } diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java index 9caa4a8795..71bcd65749 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLResponseService.java @@ -22,6 +22,7 @@ import com.actiontech.dble.services.BusinessService; import com.actiontech.dble.services.VariablesService; import com.actiontech.dble.services.mysqlauthenticate.MySQLBackAuthService; +import com.actiontech.dble.services.rwsplit.MysqlExecuteResponseHandler; import com.actiontech.dble.services.rwsplit.MysqlPrepareLogicHandler; import com.actiontech.dble.services.rwsplit.MysqlStatisticsLogicHandler; import com.actiontech.dble.services.rwsplit.RWSplitService; @@ -68,6 +69,7 @@ public class MySQLResponseService extends VariablesService { private volatile boolean isDDL = false; private volatile boolean prepareOK = false; private volatile boolean statisticResponse = false; + private volatile boolean executeResponse = false; private volatile boolean testing = false; private volatile StatusSync statusSync; private volatile boolean isRowDataFlowing = false; @@ -80,6 +82,7 @@ public class MySQLResponseService extends VariablesService { private MysqlBackendLogicHandler baseLogicHandler; private MysqlPrepareLogicHandler prepareLogicHandler; private MysqlStatisticsLogicHandler statisticsLogicHandler; + private MysqlExecuteResponseHandler executeResponseHandler; private static final CommandPacket COMMIT = new CommandPacket(); private static final CommandPacket ROLLBACK = new CommandPacket(); @@ -101,7 +104,6 @@ public MySQLResponseService(AbstractConnection connection) { initFromConfig(); this.proto = new MySQLProtoHandlerImpl(); this.baseLogicHandler = new MysqlBackendLogicHandler(this); - this.prepareLogicHandler = new MysqlPrepareLogicHandler(this); this.statisticsLogicHandler = new MysqlStatisticsLogicHandler(this); } @@ -151,6 +153,8 @@ protected void handleInnerData(byte[] data) { prepareLogicHandler.handleInnerData(data); } else if (statisticResponse) { statisticsLogicHandler.handleInnerData(data); + } else if (executeResponse) { + executeResponseHandler.handleInnerData(data); } else { baseLogicHandler.handleInnerData(data); } @@ -639,6 +643,10 @@ public void execute(RWSplitService service, byte[] originPacket) { if (originPacket.length > 4) { prepareOK = originPacket[4] == MySQLPacket.COM_STMT_PREPARE; statisticResponse = originPacket[4] == MySQLPacket.COM_STATISTICS; + if (originPacket[4] == MySQLPacket.COM_STMT_EXECUTE) { + this.executeResponseHandler = new MysqlExecuteResponseHandler(this, originPacket[9] == (byte) 0x01); + executeResponse = true; + } } isExecuting = true; diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/MysqlExecuteResponseHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/MysqlExecuteResponseHandler.java new file mode 100644 index 0000000000..2d8d467556 --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/rwsplit/MysqlExecuteResponseHandler.java @@ -0,0 +1,203 @@ +package com.actiontech.dble.services.rwsplit; + +import com.actiontech.dble.backend.mysql.ByteUtil; +import com.actiontech.dble.backend.mysql.nio.handler.LoadDataResponseHandler; +import com.actiontech.dble.backend.mysql.nio.handler.PreparedResponseHandler; +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; +import com.actiontech.dble.net.mysql.*; +import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; +import com.actiontech.dble.services.mysqlsharding.MysqlBackendLogicHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; + +public class MysqlExecuteResponseHandler extends MysqlBackendLogicHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MysqlExecuteResponseHandler.class); + + private final boolean cursor; + private volatile boolean okAfterEof = false; + + public MysqlExecuteResponseHandler(MySQLResponseService service, boolean cursor) { + super(service); + this.cursor = cursor; + } + + public void handleInnerData(byte[] data) { + if (service.getConnection().isClosed()) { + if (data != null && data.length > 4 && data[4] == ErrorPacket.FIELD_COUNT) { + service.parseErrorPacket(data, "connection close"); + } + return; + } + switch (resultStatus) { + case RESULT_STATUS_INIT: + if (service.getSession() != null) { + service.getSession().startExecuteBackend(service.getConnection().getId()); + } + switch (data[4]) { + case OkPacket.FIELD_COUNT: + handleOkPacket(data); + break; + case ErrorPacket.FIELD_COUNT: + handleErrorPacket(data); + break; + case RequestFilePacket.FIELD_COUNT: + handleRequestPacket(data); + break; + default: + resultStatus = RESULT_STATUS_HEADER; + header = data; + fields = new ArrayList<>((int) ByteUtil.readLength(data, 4)); + } + break; + case RESULT_STATUS_HEADER: + switch (data[4]) { + case ErrorPacket.FIELD_COUNT: + reset(); + handleErrorPacket(data); + break; + case EOFPacket.FIELD_COUNT: + resultStatus = RESULT_STATUS_FIELD_EOF; + eof(data); + break; + default: + fields.add(data); + } + break; + case RESULT_STATUS_FIELD_EOF: + switch (data[4]) { + case ErrorPacket.FIELD_COUNT: + reset(); + handleErrorPacket(data); + break; + case EOFPacket.FIELD_COUNT: + if (data.length > MySQLPacket.MAX_EOF_SIZE) { + handleRowPacket(data); + } else { + reset(); + handleRowEofPacket(data); + } + break; + default: + handleRowPacket(data); + } + break; + default: + throw new RuntimeException("unknown status!"); + } + } + + + + private void handleErrorPacket(byte[] data) { + final ResponseHandler respHand = service.getResponseHandler(); + service.setExecuting(false); + service.signal(); + if (respHand != null) { + respHand.errorResponse(data, service); + } else { + try { + ErrorPacket errPkg = new ErrorPacket(); + errPkg.read(data); + String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage()); + LOGGER.warn("no handler process the execute sql err,just close it, sql error:{},back con:{}", errMsg, service); + if (service.getSession() != null) { + LOGGER.warn("no handler process the execute sql err,front conn {}", service.getSession().getSource()); + } + + } catch (RuntimeException e) { + LOGGER.info("error handle error-packet", e); + } + closeNoHandler(); + } + } + + private void closeNoHandler() { + if (!service.getConnection().isClosed()) { + service.getConnection().close("no handler"); + LOGGER.info("no handler bind in this con " + this + " client:" + service); + } + } + + private void handleRequestPacket(byte[] data) { + ResponseHandler respHand = service.getResponseHandler(); + if (respHand instanceof LoadDataResponseHandler) { + ((LoadDataResponseHandler) respHand).requestDataResponse(data, service); + } else { + closeNoHandler(); + } + } + + private void handleRowPacket(byte[] data) { + //LOGGER.info("get into rowing data " + data.length); + ResponseHandler respHand = service.getResponseHandler(); + if (respHand != null) { + respHand.rowResponse(data, null, false, service); + } else { + closeNoHandler(); + } + } + + private void handleRowEofPacket(byte[] data) { + if (service.getSession() != null && !service.isTesting() && service.getLogResponse().compareAndSet(false, true)) { + service.getSession().setBackendResponseEndTime(this.service); + } + service.getLogResponse().set(false); + ResponseHandler respHand = service.getResponseHandler(); + if (respHand != null) { + service.backendSpecialCleanUp(); + respHand.rowEofResponse(data, false, service); + } else { + closeNoHandler(); + } + } + + private void handleOkPacket(byte[] data) { + // if prepared statement doesn't keep cursor on, the response contains additional ok packet + if (okAfterEof) { + resultStatus = RESULT_STATUS_HEADER; + header = data; + fields = new ArrayList<>((int) ByteUtil.readLength(data, 4)); + } else { + ok(data); + } + } + + + private void ok(byte[] data) { + ResponseHandler respHand = service.getResponseHandler(); + if (respHand != null) { + respHand.okResponse(data, service); + } + } + + + private void eof(byte[] eof) { + if (cursor) { + handleFieldEofPacket(eof); + } else { + okAfterEof = true; + handleBackendFieldEofPacket(eof); + } + } + + private void handleBackendFieldEofPacket(byte[] data) { + ResponseHandler respHand = service.getResponseHandler(); + service.setRowDataFlowing(true); + if (respHand != null) { + respHand.fieldEofResponse(header, fields, null, data, false, service); + } else { + closeNoHandler(); + } + } + + private void handleFieldEofPacket(byte[] eof) { + ResponseHandler respHand = service.getResponseHandler(); + if (respHand instanceof PreparedResponseHandler) { + ((PreparedResponseHandler) respHand).preparedExecuteResponse(header, fields, eof, service); + } else { + closeNoHandler(); + } + } +} diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/MysqlPrepareLogicHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/MysqlPrepareLogicHandler.java index 5aef0769dc..f87377fbe6 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/MysqlPrepareLogicHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/MysqlPrepareLogicHandler.java @@ -5,6 +5,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.net.mysql.EOFPacket; import com.actiontech.dble.net.mysql.ErrorPacket; +import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.services.mysqlsharding.MysqlBackendLogicHandler; import org.slf4j.Logger; @@ -18,7 +19,7 @@ */ public class MysqlPrepareLogicHandler extends MysqlBackendLogicHandler { private static final Logger LOGGER = LoggerFactory.getLogger(MysqlPrepareLogicHandler.class); - + private volatile byte[] ok; private volatile List params; private volatile List fields; @@ -35,65 +36,77 @@ public void handleInnerData(byte[] data) { } return; } - switch (resultStatus) { - case PREPARED_OK: - boolean executeResponse = service.syncAndExecute(); - if (executeResponse) { - // Prepare ok + byte type = data[4]; + if (type == OkPacket.FIELD_COUNT) { + boolean executeResponse = service.syncAndExecute(); + if (executeResponse) { + final int fieldCount = (int) ByteUtil.readLength(data, 9); + final int paramCount = (int) ByteUtil.readLength(data, 11); + if (fieldCount > 0) { + fields = new ArrayList<>(fieldCount + 1); + resultStatus = PREPARED_FIELD; + } + if (paramCount > 0) { + params = new ArrayList<>(paramCount + 1); + resultStatus = PREPARED_PARAM; + } + if (fieldCount == 0 && paramCount == 0) { + // handle ok packet handleOkPacket(data); - int fieldCount = ByteUtil.readUB2(data, 9); - int paramCount = ByteUtil.readUB2(data, 11); - if (fieldCount > 0) { - fields = new ArrayList<>(fieldCount); - resultStatus = PREPARED_FIELD; - } - if (paramCount > 0) { - params = new ArrayList<>(paramCount); - resultStatus = PREPARED_PARAM; - } + return; } - break; - case PREPARED_PARAM: - if (data[4] == EOFPacket.FIELD_COUNT) { + ok = data; + } + } else if (type == ErrorPacket.FIELD_COUNT) { + if (resultStatus == PREPARED_FIELD) { + fields.add(data); + // handle field eof + handleOkPacket(ok); + } else { + params.add(data); + if (fields != null) { resultStatus = PREPARED_FIELD; - handleParamEofPacket(data); } else { - params.add(data); + // handle param eof + handleOkPacket(ok); } - break; - case PREPARED_FIELD: - if (data[4] == EOFPacket.FIELD_COUNT) { - resultStatus = RESULT_STATUS_INIT; - handleFieldEofPacket(data); + } + } else if (type == EOFPacket.FIELD_COUNT) { + if (resultStatus == PREPARED_FIELD) { + fields.add(data); + // handle field eof + handleOkPacket(ok); + } else { + params.add(data); + if (fields != null) { + resultStatus = PREPARED_FIELD; } else { - fields.add(data); + // handle param eof + handleOkPacket(ok); } - break; - default: - super.handleInnerData(data); + } + } else { + data(data); } } - private void handleOkPacket(byte[] data) { - ResponseHandler respHand = service.getResponseHandler(); - if (respHand instanceof PreparedResponseHandler) { - ((PreparedResponseHandler) respHand).preparedOkResponse(data, service); - } - } - private void handleParamEofPacket(byte[] data) { - ResponseHandler respHand = service.getResponseHandler(); - if (respHand instanceof PreparedResponseHandler) { - ((PreparedResponseHandler) respHand).paramEofResponse(params, data, service); + public void data(byte[] data) { + if (resultStatus == PREPARED_FIELD) { + fields.add(data); } else { - closeNoHandler(); + params.add(data); } } - private void handleFieldEofPacket(byte[] data) { + private void handleOkPacket(byte[] okPacket) { ResponseHandler respHand = service.getResponseHandler(); if (respHand instanceof PreparedResponseHandler) { - ((PreparedResponseHandler) respHand).fieldEofResponse(fields, data, service); + ((PreparedResponseHandler) respHand).preparedOkResponse(okPacket, fields, params, service); + ok = null; + params = null; + fields = null; + resultStatus = PREPARED_FIELD; } else { closeNoHandler(); } diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java index 5dc0fc0181..5fdfe0c254 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java @@ -144,19 +144,6 @@ public void fieldEofResponse(byte[] header, List fields, List fields, byte[] eof, MySQLResponseService service) { - synchronized (this) { - for (byte[] field : fields) { - field[3] = (byte) rwSplitService.nextPacketId(); - buffer = frontedConnection.writeToBuffer(field, buffer); - } - eof[3] = (byte) rwSplitService.nextPacketId(); - buffer = frontedConnection.writeToBuffer(eof, buffer); - frontedConnection.write(buffer); - } - } @Override @@ -229,31 +216,54 @@ public void connectionClose(AbstractService service, String reason) { } @Override - public void preparedOkResponse(byte[] ok, MySQLResponseService service) { - boolean executeResponse = service.syncAndExecute(); - if (executeResponse) { - synchronized (this) { - if (buffer == null) { - buffer = frontedConnection.allocate(); + public void preparedOkResponse(byte[] ok, List fields, List params, MySQLResponseService service) { + synchronized (this) { + if (buffer == null) { + buffer = frontedConnection.allocate(); + } + if (!write2Client) { + ok[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(ok, buffer); + if (fields != null) { + for (byte[] field : fields) { + field[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(field, buffer); + } } - if (!write2Client) { - ok[3] = (byte) rwSplitService.nextPacketId(); - frontedConnection.write(ok); - write2Client = true; + if (params != null) { + for (byte[] param : params) { + param[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(param, buffer); + } } + frontedConnection.write(buffer); + write2Client = true; + buffer = null; } } } @Override - public void paramEofResponse(List params, byte[] eof, MySQLResponseService service) { + public void preparedExecuteResponse(byte[] header, List fields, byte[] eof, MySQLResponseService service) { synchronized (this) { - for (byte[] field : params) { - field[3] = (byte) rwSplitService.nextPacketId(); - buffer = frontedConnection.writeToBuffer(field, buffer); + if (buffer == null) { + buffer = frontedConnection.allocate(); + } + if (!write2Client) { + header[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(header, buffer); + if (fields != null) { + for (byte[] field : fields) { + field[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(field, buffer); + } + } + eof[3] = (byte) rwSplitService.nextPacketId(); + buffer = frontedConnection.writeToBuffer(eof, buffer); + frontedConnection.write(buffer); + write2Client = true; + buffer = null; } - eof[3] = (byte) rwSplitService.nextPacketId(); - buffer = frontedConnection.writeToBuffer(eof, buffer); } }