Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix : check if table meta cache should be refreshed in AT mode #4734

Merged
merged 61 commits into from
Feb 28, 2023
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
ea0daa4
todo
Bughue Jun 27, 2022
1ac4cbb
simulate nullpointerExp
Bughue Jun 27, 2022
68f2687
fix npe
Bughue Jun 30, 2022
dc21dcd
fix npe
Bughue Jun 30, 2022
42ee542
fix npe
Bughue Jun 30, 2022
7de5d38
format
Bughue Jun 30, 2022
de27bf6
format
Bughue Jun 30, 2022
af64796
先去掉单测,让构建都完成,方便review
Bughue Jul 14, 2022
a7800e2
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Jul 14, 2022
1679535
remove t-w-r
Bughue Jul 26, 2022
d6b8610
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Aug 25, 2022
fccf6a9
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Sep 7, 2022
13c8601
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Sep 7, 2022
6faf8e7
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Sep 21, 2022
fbade58
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Dec 14, 2022
329618c
fix merge
Bughue Dec 14, 2022
4056fc2
fix test
Bughue Dec 14, 2022
9397cca
fix test
Bughue Dec 14, 2022
b2d712f
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Jan 17, 2023
2689577
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 2, 2023
c3de060
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 3, 2023
8a34079
rename exp
Bughue Feb 6, 2023
db46e53
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 6, 2023
b488939
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 6, 2023
355f106
npe
Bughue Feb 6, 2023
9209696
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 6, 2023
4fbfca4
optimize
Bughue Feb 6, 2023
90ea894
ignore unrefreshable
Bughue Feb 7, 2023
57960dd
remove todo
Bughue Feb 7, 2023
363b019
checkstyle
Bughue Feb 7, 2023
ef60d17
checkstyle
Bughue Feb 7, 2023
ac4166c
Merge branch 'develop' into dev_4572_new_feild_null
funky-eyes Feb 7, 2023
fd7aad3
checkstyle
Bughue Feb 7, 2023
125f042
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 7, 2023
20c1edc
changes
Bughue Feb 7, 2023
d1ce758
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Feb 9, 2023
b64d227
optimize
Bughue Feb 10, 2023
6062d52
optimize
Bughue Feb 10, 2023
933c3b9
private tableMeta
Bughue Feb 10, 2023
8db15a3
Merge branch 'develop' of github.com:seata/seata into dev_4572_new_fe…
Bughue Feb 13, 2023
ccf9ecf
tableMeta refresh event
Bughue Feb 15, 2023
e8b9a6b
check style
Bughue Feb 15, 2023
c901589
check style
Bughue Feb 15, 2023
072f609
currentTimeMillis
Bughue Feb 15, 2023
026f1b5
check style
Bughue Feb 15, 2023
996bfb5
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 15, 2023
1ac5bf2
comment
Bughue Feb 15, 2023
711e659
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 21, 2023
eb47a94
comment
Bughue Feb 22, 2023
332081b
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 22, 2023
ef87ff3
fix test
Bughue Feb 22, 2023
3b85ba1
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 22, 2023
de81450
fix test
Bughue Feb 22, 2023
155c5f0
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 22, 2023
a8d8781
fix test
Bughue Feb 22, 2023
febc6a0
fix test
Bughue Feb 22, 2023
9672177
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 22, 2023
16ca49a
Merge branch 'develop' into dev_4572_new_feild_null
Bughue Feb 23, 2023
8b22aef
optimize
Bughue Feb 28, 2023
5b4e206
Merge remote-tracking branch 'origin/dev_4572_new_feild_null' into de…
Bughue Feb 28, 2023
96bae53
optimize
Bughue Feb 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#5299](https://github.com/seata/seata/pull/5299)] fix GlobalSession deletion when retry rollback or retry commit timeout
- [[#5307](https://github.com/seata/seata/pull/5307)] fix that keywords don't add escaped characters
- [[#5311](https://github.com/seata/seata/pull/5311)] remove RollbackRetryTimeout sessions during in file storage recover
- [[#4734](https://github.com/seata/seata/pull/4734)] check if table meta cache should be refreshed in AT mode
- [[#5316](https://github.com/seata/seata/pull/5316)] fix G1 jvm parameter in jdk8
- [[#5321](https://github.com/seata/seata/pull/5321)] fix When the rollback logic on the TC side returns RollbackFailed, the custom FailureHandler is not executed
- [[#5332](https://github.com/seata/seata/pull/5332)] fix bugs found in unit tests
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- [[#5299](https://github.com/seata/seata/pull/5299)] 修复TC端重试回滚或重试提交超时GlobalSession的删除问题
- [[#5307](https://github.com/seata/seata/pull/5307)] 修复生成update前后镜像sql不对关键字转义的bug
- [[#5311](https://github.com/seata/seata/pull/5311)] 移除基于文件存储恢复时的RollbackRetryTimeout事务
- [[#4734](https://github.com/seata/seata/pull/4734)] 修复AT模式下新增字段产生的字段找不到
- [[#5316](https://github.com/seata/seata/pull/5316)] 修复jdk8 中 G1 参数
- [[#5321](https://github.com/seata/seata/pull/5321)] 修复当TC端回滚返回RollbackFailed时,自定义FailureHandler的方法未执行
- [[#5332](https://github.com/seata/seata/pull/5332)] 修复单元测试中发现的bug
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.seata.common.Constants;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.common.ConfigurationKeys;
import io.seata.core.constants.DBType;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
Expand All @@ -36,11 +34,6 @@
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import io.seata.rm.datasource.util.JdbcUtils;
import io.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE;
import static io.seata.common.DefaultValues.DEFAULT_TABLE_META_CHECKER_INTERVAL;

/**
* The type Data source proxy.
Expand All @@ -65,21 +58,6 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource

private String version;

/**
* Enable the table meta checker
*/
private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(
ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);

/**
* Table meta checker interval
*/
private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(
ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);

private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("tableMetaChecker", 1, true));

/**
* Instantiates a new Data source proxy.
*
Expand Down Expand Up @@ -120,20 +98,18 @@ private void init(DataSource dataSource, String resourceGroupId) {
}
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原定时刷新缓存的功能在这次更新后失效了?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableMeta缓存刷新失效? #6660

tableMetaExecutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}

TableMetaCacheFactory.registerTableMeta(this);
//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}

/**
* publish tableMeta refresh event
*/
public void tableMetaRefreshEvent() {
TableMetaCacheFactory.tableMetaRefreshEvent(this.getResourceId());
}

/**
* Gets plain connection.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.seata.rm.datasource.exception;

import java.sql.SQLException;

/**
* The type TableMetaException exception.
*
* @author Bughue
*/
public class TableMetaException extends SQLException {
private String columnName;
private String tableName;

public TableMetaException(String tableName, String columnName) {
this.columnName = columnName;
this.tableName = tableName;
}

public String getTableName() {
return tableName;
}

public String getColumnName() {
return columnName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.seata.rm.datasource.ConnectionContext;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.StatementProxy;
import io.seata.rm.datasource.exception.TableMetaException;
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -93,11 +94,18 @@ public T doExecute(Object... args) throws Throwable {
* @throws Exception the exception
*/
protected T executeAutoCommitFalse(Object[] args) throws Exception {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
try {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
} catch (TableMetaException e) {
LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
e.getTableName(), e.getColumnName());
statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
throw e;
}
}

private boolean isMultiPk() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ protected TableMeta getTableMeta(String tableName) {
* @param afterImage the after image
* @return sql undo log
*/
@Override
protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
SQLType sqlType = sqlRecognizer.getSQLType();
String tableName = beforeImage.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,53 @@
*/
package io.seata.rm.datasource.sql.struct;

import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import io.seata.common.ConfigurationKeys;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.rm.datasource.DataSourceProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.common.DefaultValues.DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE;
import static io.seata.common.DefaultValues.DEFAULT_TABLE_META_CHECKER_INTERVAL;

/**
* @author guoyao
*/
public class TableMetaCacheFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(TableMetaCacheFactory.class);

private static final Map<String, TableMetaCache> TABLE_META_CACHE_MAP = new ConcurrentHashMap<>();

private static final Map<String, TableMetaRefreshHolder> TABLE_META_REFRESH_HOLDER_MAP = new ConcurrentHashMap<>();

private static final long TABLE_META_REFRESH_INTERVAL_TIME = 1000L;

/**
* Enable the table meta checker
*/
private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance()
.getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);

/**
* Table meta checker interval
*/
private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance()
.getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);


/**
* get table meta cache
*
Expand All @@ -38,4 +72,70 @@ public static TableMetaCache getTableMetaCache(String dbType) {
return CollectionUtils.computeIfAbsent(TABLE_META_CACHE_MAP, dbType,
key -> EnhancedServiceLoader.load(TableMetaCache.class, dbType));
}

/**
* register table meta
*
* @param dataSourceProxy
*/
public static void registerTableMeta(DataSourceProxy dataSourceProxy) {
TableMetaRefreshHolder holder = new TableMetaRefreshHolder(dataSourceProxy);
TABLE_META_REFRESH_HOLDER_MAP.put(dataSourceProxy.getResourceId(), holder);
}

/**
* public tableMeta refresh event
*/
public static void tableMetaRefreshEvent(String resourceId) {
TableMetaRefreshHolder refreshHolder = TABLE_META_REFRESH_HOLDER_MAP.get(resourceId);
boolean offer = refreshHolder.tableMetaRefreshQueue.offer(System.currentTimeMillis());
if (!offer) {
LOGGER.error("table refresh event offer error:{}", resourceId);
}
}

static class TableMetaRefreshHolder {
private long lastRefreshFinishTime;
private DataSourceProxy dataSource;
private BlockingQueue<Long> tableMetaRefreshQueue;


private final Executor tableMetaRefreshExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(), new NamedThreadFactory("tableMetaRefresh", 1, true));

TableMetaRefreshHolder(DataSourceProxy dataSource) {
this.dataSource = dataSource;
this.lastRefreshFinishTime = System.currentTimeMillis() - TABLE_META_REFRESH_INTERVAL_TIME;
this.tableMetaRefreshQueue = new LinkedBlockingQueue<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

limited the queue length


tableMetaRefreshExecutor.execute(() -> {
while (true) {
// 1. check table meta
if (ENABLE_TABLE_META_CHECKER_ENABLE
&& System.currentTimeMillis() - lastRefreshFinishTime > TABLE_META_CHECKER_INTERVAL) {
tableMetaRefreshEvent(dataSource.getResourceId());
}

// 2. refresh table meta
try {
Long eventTime = tableMetaRefreshQueue.take();
// if it has bean refreshed not long ago, skip
if (eventTime - lastRefreshFinishTime > TABLE_META_REFRESH_INTERVAL_TIME) {
try (Connection connection = dataSource.getConnection()) {
TableMetaCache tableMetaCache =
TableMetaCacheFactory.getTableMetaCache(dataSource.getDbType());
tableMetaCache.refresh(connection, dataSource.getResourceId());
}
lastRefreshFinishTime = System.currentTimeMillis();
}
} catch (Exception exx) {
LOGGER.error("table refresh error:{}", exx.getMessage(), exx);
}
}
});
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
import javax.sql.rowset.serial.SerialDatalink;
import javax.sql.rowset.serial.SerialJavaObject;
import javax.sql.rowset.serial.SerialRef;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.rm.datasource.exception.TableMetaException;
import io.seata.rm.datasource.sql.serial.SerialArray;
import static io.seata.rm.datasource.exec.oracle.OracleJdbcType.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
import static io.seata.rm.datasource.exec.oracle.OracleJdbcType.TIMESTAMP_WITH_TIME_ZONE;
Expand All @@ -55,6 +59,8 @@ public class TableRecords implements java.io.Serializable {

private List<Row> rows = new ArrayList<Row>();

private static final Interner<String> TABLE_NAME_POOL = Interners.newWeakInterner();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where used?


/**
* Gets table name.
*
Expand Down Expand Up @@ -196,7 +202,7 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
List<Field> fields = new ArrayList<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
String colName = resultSetMetaData.getColumnName(i);
ColumnMeta col = tmeta.getColumnMeta(colName);
ColumnMeta col = getColumnMeta(tmeta,colName);
int dataType = col.getDataType();
Field field = new Field();
field.setName(col.getColumnName());
Expand Down Expand Up @@ -259,6 +265,20 @@ public static TableRecords buildRecords(TableMeta tmeta, ResultSet resultSet) th
return records;
}

/**
* check if the column is null and return
*
* @param tmeta the table meta
* @param colName the column nmae
*/
private static ColumnMeta getColumnMeta(TableMeta tmeta , String colName) throws SQLException {
ColumnMeta col = tmeta.getColumnMeta(colName);
if (col == null) {
throw new TableMetaException(tmeta.getTableName(), colName);
}
return col;
}

/**
* since there is no parameterless constructor for Blob, Clob and NClob just like mysql,
* it needs to be converted to Serial_ type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.mock.MockDataSource;
import io.seata.rm.datasource.mock.MockDriver;
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -44,7 +45,7 @@ public void test_constructor() {
@Test
public void getResourceIdTest() throws SQLException, NoSuchFieldException, IllegalAccessException {
// Disable 'DataSourceProxy.tableMetaExecutor' to prevent unit tests from being affected
Field enableField = DataSourceProxy.class.getDeclaredField("ENABLE_TABLE_META_CHECKER_ENABLE");
Field enableField = TableMetaCacheFactory.class.getDeclaredField("ENABLE_TABLE_META_CHECKER_ENABLE");
enableField.setAccessible(true);
enableField.set(null, false);

Expand Down
Loading