Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ps protocol inner 349 #3861

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
public interface PreparedResponseHandler {

void preparedOkResponse(byte[] ok, MySQLResponseService service);
void preparedOkResponse(byte[] ok, List<byte[]> fields, List<byte[]> params, MySQLResponseService service);

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'service' is never used.

void paramEofResponse(List<byte[]> params, byte[] eof, MySQLResponseService service);

void fieldEofResponse(List<byte[]> fields, byte[] eof, MySQLResponseService service);
void preparedExecuteResponse(byte[] header, List<byte[]> fields, byte[] eof, MySQLResponseService service);

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'service' is never used.
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.actiontech.dble.services.BusinessService;
import com.actiontech.dble.services.VariablesService;
import com.actiontech.dble.services.mysqlauthenticate.MySQLBackAuthService;
import com.actiontech.dble.services.rwsplit.MysqlExecuteResponseHandler;
import com.actiontech.dble.services.rwsplit.MysqlPrepareLogicHandler;
import com.actiontech.dble.services.rwsplit.MysqlStatisticsLogicHandler;
import com.actiontech.dble.services.rwsplit.RWSplitService;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class MySQLResponseService extends VariablesService {
private volatile boolean isDDL = false;
private volatile boolean prepareOK = false;
private volatile boolean statisticResponse = false;
private volatile boolean executeResponse = false;
private volatile boolean testing = false;
private volatile StatusSync statusSync;
private volatile boolean isRowDataFlowing = false;
Expand All @@ -80,6 +82,7 @@ public class MySQLResponseService extends VariablesService {
private MysqlBackendLogicHandler baseLogicHandler;
private MysqlPrepareLogicHandler prepareLogicHandler;
private MysqlStatisticsLogicHandler statisticsLogicHandler;
private MysqlExecuteResponseHandler executeResponseHandler;

private static final CommandPacket COMMIT = new CommandPacket();
private static final CommandPacket ROLLBACK = new CommandPacket();
Expand All @@ -101,7 +104,6 @@ public MySQLResponseService(AbstractConnection connection) {
initFromConfig();
this.proto = new MySQLProtoHandlerImpl();
this.baseLogicHandler = new MysqlBackendLogicHandler(this);
this.prepareLogicHandler = new MysqlPrepareLogicHandler(this);
this.statisticsLogicHandler = new MysqlStatisticsLogicHandler(this);
}

Expand Down Expand Up @@ -151,6 +153,8 @@ protected void handleInnerData(byte[] data) {
prepareLogicHandler.handleInnerData(data);
} else if (statisticResponse) {
statisticsLogicHandler.handleInnerData(data);
} else if (executeResponse) {
executeResponseHandler.handleInnerData(data);
} else {
baseLogicHandler.handleInnerData(data);
}
Expand Down Expand Up @@ -639,6 +643,10 @@ public void execute(RWSplitService service, byte[] originPacket) {
if (originPacket.length > 4) {
prepareOK = originPacket[4] == MySQLPacket.COM_STMT_PREPARE;
statisticResponse = originPacket[4] == MySQLPacket.COM_STATISTICS;
if (originPacket[4] == MySQLPacket.COM_STMT_EXECUTE) {
this.executeResponseHandler = new MysqlExecuteResponseHandler(this, originPacket[9] == (byte) 0x01);
executeResponse = true;
}
}

isExecuting = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package com.actiontech.dble.services.rwsplit;

import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.nio.handler.LoadDataResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.PreparedResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.MysqlBackendLogicHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

public class MysqlExecuteResponseHandler extends MysqlBackendLogicHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MysqlExecuteResponseHandler.class);

private final boolean cursor;
private volatile boolean okAfterEof = false;

public MysqlExecuteResponseHandler(MySQLResponseService service, boolean cursor) {
super(service);
this.cursor = cursor;
}

public void handleInnerData(byte[] data) {

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
MysqlBackendLogicHandler.handleInnerData
; it is advisable to add an Override annotation.
if (service.getConnection().isClosed()) {
if (data != null && data.length > 4 && data[4] == ErrorPacket.FIELD_COUNT) {
service.parseErrorPacket(data, "connection close");
}
return;
}
switch (resultStatus) {
case RESULT_STATUS_INIT:
if (service.getSession() != null) {
service.getSession().startExecuteBackend(service.getConnection().getId());
}
switch (data[4]) {

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable
data
may be null at this access as suggested by
this
null guard.
case OkPacket.FIELD_COUNT:
handleOkPacket(data);
break;
case ErrorPacket.FIELD_COUNT:
handleErrorPacket(data);
break;
case RequestFilePacket.FIELD_COUNT:
handleRequestPacket(data);
break;
default:
resultStatus = RESULT_STATUS_HEADER;
header = data;
fields = new ArrayList<>((int) ByteUtil.readLength(data, 4));
}
break;
case RESULT_STATUS_HEADER:
switch (data[4]) {

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable
data
may be null at this access as suggested by
this
null guard.
case ErrorPacket.FIELD_COUNT:
reset();
handleErrorPacket(data);
break;
case EOFPacket.FIELD_COUNT:
resultStatus = RESULT_STATUS_FIELD_EOF;
eof(data);
break;
default:
fields.add(data);
}
break;
case RESULT_STATUS_FIELD_EOF:
switch (data[4]) {

Check warning

Code scanning / CodeQL

Dereferenced variable may be null Warning

Variable
data
may be null at this access as suggested by
this
null guard.
case ErrorPacket.FIELD_COUNT:
reset();
handleErrorPacket(data);
break;
case EOFPacket.FIELD_COUNT:
if (data.length > MySQLPacket.MAX_EOF_SIZE) {
handleRowPacket(data);
} else {
reset();
handleRowEofPacket(data);
}
break;
default:
handleRowPacket(data);
}
break;
default:
throw new RuntimeException("unknown status!");
}
}



private void handleErrorPacket(byte[] data) {
final ResponseHandler respHand = service.getResponseHandler();
service.setExecuting(false);
service.signal();
if (respHand != null) {
respHand.errorResponse(data, service);
} else {
try {
ErrorPacket errPkg = new ErrorPacket();
errPkg.read(data);
String errMsg = "errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
LOGGER.warn("no handler process the execute sql err,just close it, sql error:{},back con:{}", errMsg, service);
if (service.getSession() != null) {
LOGGER.warn("no handler process the execute sql err,front conn {}", service.getSession().getSource());
}

} catch (RuntimeException e) {
LOGGER.info("error handle error-packet", e);
}
closeNoHandler();
}
}

private void closeNoHandler() {
if (!service.getConnection().isClosed()) {
service.getConnection().close("no handler");
LOGGER.info("no handler bind in this con " + this + " client:" + service);

Check notice

Code scanning / CodeQL

Use of default toString() Note

Default toString(): MysqlExecuteResponseHandler inherits toString() from Object, and so is not suitable for printing.
}
}

private void handleRequestPacket(byte[] data) {
ResponseHandler respHand = service.getResponseHandler();
if (respHand instanceof LoadDataResponseHandler) {
((LoadDataResponseHandler) respHand).requestDataResponse(data, service);
} else {
closeNoHandler();
}
}

private void handleRowPacket(byte[] data) {
//LOGGER.info("get into rowing data " + data.length);
ResponseHandler respHand = service.getResponseHandler();
if (respHand != null) {
respHand.rowResponse(data, null, false, service);
} else {
closeNoHandler();
}
}

private void handleRowEofPacket(byte[] data) {
if (service.getSession() != null && !service.isTesting() && service.getLogResponse().compareAndSet(false, true)) {
service.getSession().setBackendResponseEndTime(this.service);
}
service.getLogResponse().set(false);
ResponseHandler respHand = service.getResponseHandler();
if (respHand != null) {
service.backendSpecialCleanUp();
respHand.rowEofResponse(data, false, service);
} else {
closeNoHandler();
}
}

private void handleOkPacket(byte[] data) {
// if prepared statement doesn't keep cursor on, the response contains additional ok packet
if (okAfterEof) {
resultStatus = RESULT_STATUS_HEADER;
header = data;
fields = new ArrayList<>((int) ByteUtil.readLength(data, 4));
} else {
ok(data);
}
}


private void ok(byte[] data) {
ResponseHandler respHand = service.getResponseHandler();
if (respHand != null) {
respHand.okResponse(data, service);
}
}


private void eof(byte[] eof) {
if (cursor) {
handleFieldEofPacket(eof);
} else {
okAfterEof = true;
handleBackendFieldEofPacket(eof);
}
}

private void handleBackendFieldEofPacket(byte[] data) {
ResponseHandler respHand = service.getResponseHandler();
service.setRowDataFlowing(true);
if (respHand != null) {
respHand.fieldEofResponse(header, fields, null, data, false, service);
} else {
closeNoHandler();
}
}

private void handleFieldEofPacket(byte[] eof) {
ResponseHandler respHand = service.getResponseHandler();
if (respHand instanceof PreparedResponseHandler) {
((PreparedResponseHandler) respHand).preparedExecuteResponse(header, fields, eof, service);
} else {
closeNoHandler();
}
}
}
Loading
Loading